# PostgreSQL extension litmus-pgnats
## Overview

litmus-pgnats is a PostgreSQL extension (written in Rust) that integrates PostgreSQL with NATS messaging. It exposes SQL functions for publishing, subscribing, request/reply, JetStream, key-value store, object store, and responder operations.

**Note:** When connected to Litmus Edge, only the subscribe functionality is currently supported.

## Deployment options

Litmus provides two options for deploying the litmus-pgnats extension.

### Marketplace deployment with PostgreSQL (PG18)

The image is based on the official Postgres 18 image, with the litmus-pgnats extension pre-installed and pre-loaded.

What's included:

- litmus-pgnats extension files installed into the PostgreSQL directories
- `shared_preload_libraries=litmus_pgnats` set at server start, so no `postgresql.conf` change is needed for background workers

### Self-deployment on an externally hosted PostgreSQL server

For users with externally deployed PostgreSQL servers, a zip file is available for each supported PostgreSQL version (PG15 to PG18), so the extension can be added to an existing deployment.

Prerequisites:

- Linux (amd64)
- PostgreSQL 15, 16, 17, or 18
- Root or sudo access to the PostgreSQL server

What's included:

- `install.sh`: a shell script that copies the required extension files into the correct locations of your PostgreSQL setup
- `litmus_pgnats.so`: the Rust extension
- `litmus_pgnats.control`: the extension control file
- `litmus_pgnats--*.sql`: SQL script(s) that set up all the PostgreSQL infrastructure

## Installation

### Marketplace deployment

On Litmus Edge, go to Applications > Marketplace and select the catalog app "postgres18 with litmus-pgnats". Follow the deployment instructions and launch the catalog app.

| Parameter | Description | Required |
| --- | --- | --- |
| name | Name of the Docker container | Yes |
| description | A description for the catalog app | No |
| dbname | Postgres database name | No (default: postgres) |
| dbuser | Used in conjunction with POSTGRES_PASSWORD to set a user and its password. Creates the specified user with superuser power and a database with the same name. If not specified, the default user `postgres` is used. | No (default: postgres) |
| dbpassword | Sets the superuser password for PostgreSQL. The default superuser is defined by the POSTGRES_USER variable. | No (default: postgres) |
| restart | Application restart option: no, always, on-failure | No (default: always) |
| port | Destination port for the application | No (default: 5432) |

### Self-deployment

#### 1. Download the zip file

Download the zip for your PostgreSQL version from the Litmus Accelerators page: https://portal.litmus.io/accelerators

The filename follows this pattern:

```
litmus-pgnats-<version>-pg<pg version>-linux-amd64.zip
```

#### 2. Extract

Extract the zip file:

```shell
unzip litmus-pgnats-*.zip
```

#### 3. Install

Run the included install script (requires root). It copies the extension files to the correct PostgreSQL directories automatically.

```shell
sudo ./install.sh
```

If `pg_config` is not in your PATH, pass the full path:

```shell
sudo ./install.sh /usr/lib/postgresql/18/bin/pg_config
```

#### 4. Enable background workers (subscriptions and responders)

Add the following to `postgresql.conf`:

```conf
shared_preload_libraries = 'litmus_pgnats'
max_worker_processes = 32
```

Then restart PostgreSQL for the change to take effect.

## First-time setup

### 1. Enable the extension

Connect to your database and run:

```sql
CREATE EXTENSION IF NOT EXISTS litmus_pgnats;
```

This must be run once in each database where you want to use the extension.

**Note:** litmus-pgnats and the original pgnats extension cannot be installed in the same database, because their `nats_*` functions would conflict. They can coexist in separate databases within the same PostgreSQL cluster.

### 2. Configure the NATS connection

Connection settings are stored in PostgreSQL as a foreign server. Run this once per database.

#### 2.1 Connecting to Litmus Edge

Before connecting to Litmus Edge, first create and configure an access account and enable the NATS proxy server. After that, create the foreign server in PostgreSQL:

```sql
CREATE SERVER le_nats_fdw_server
FOREIGN DATA WRAPPER litmus_pgnats_fdw
OPTIONS (
    host '<litmus edge ip>',
    port '4222',
    capacity '128',
    username '',
    password '<access account token>'
);
```

#### 2.2 Connecting to an external NATS server

To configure the NATS connection, create a foreign server. The listing below shows every available option; set only the ones your setup needs.

```sql
CREATE SERVER nats_fdw_server
FOREIGN DATA WRAPPER litmus_pgnats_fdw
OPTIONS (
    -- IP/hostname of the NATS message server (default: 127.0.0.1)
    host 'localhost',

    -- TCP port for NATS connections (default: 4222)
    port '4222',

    -- Internal command buffer size in messages (default: 128)
    capacity '128',

    -- Path to the CA certificate used to verify the NATS server certificate.
    -- Required for TLS when tls_insecure is not set.
    tls_ca_path '/path/ca',

    -- Path to the client certificate for mutual TLS authentication.
    -- Optional unless the server requires client auth.
    tls_cert_path '/path/cert',

    -- Path to the client private key. Required when tls_cert_path is set.
    tls_key_path '/path/key',

    -- Optional
    -- Skip TLS certificate verification entirely.
    -- For development or servers with self-signed/private CA certs (e.g. Litmus Edge).
    tls_insecure 'true',

    -- Optional
    -- Default JetStream domain for all KV, object store, and stream publish calls.
    -- Can be overridden per call via the domain parameter on individual functions.
    domain 'hub',

    -- Optional
    -- NATS username for username/password authentication.
    -- Requires password or password_path.
    username 'myuser',

    -- Optional
    -- Absolute path to a file containing the NATS password.
    -- File contents are trimmed of surrounding whitespace.
    -- Preferred over the inline password option: rotation takes effect on reconnect.
    password_path '/run/secrets/nats_password',

    -- Optional
    -- Inline NATS password stored in pg_foreign_server options (visible to superusers)
    password 'hunter2',

    -- Optional
    -- Absolute path to a file containing an NKey seed string.
    -- Takes precedence over username/password when set.
    nkey_seed_path '/run/secrets/nats_nkey_seed',

    -- Optional
    -- Inline NKey seed stored in pg_foreign_server options (visible to superusers)
    nkey_seed 'SUABC...',

    -- Optional
    -- Absolute path to a file containing a NATS token (no username required).
    -- Takes precedence over username/password; use for NATS token auth mode.
    token_path '/run/secrets/nats_token',

    -- Optional
    -- Inline NATS token stored in pg_foreign_server options (visible to superusers)
    token 'my-token',

    -- Optional
    -- NATS subject for Patroni role change notifications
    notify_subject 'my.subject',

    -- Optional
    -- Patroni REST API URL for role change detection
    patroni_url 'http://localhost:8008/patroni',

    -- Optional
    -- Statement timeout (ms) applied to responder handler SPI calls (default: 5000)
    responder_timeout_ms '5000'
);
```

#### Authentication parameters

| Parameter | Description |
| --- | --- |
| host | IP/hostname of the NATS message server (default: 127.0.0.1) |
| port | TCP port for NATS connections (default: 4222) |
| capacity | Internal command buffer size in messages (default: 128) |
| tls_ca_path | Path to the CA certificate used to verify the NATS server certificate. Required for TLS when tls_insecure is not set. |
| tls_cert_path | Path to the client certificate for mutual TLS authentication. Optional unless the server requires client auth. |
| tls_key_path | Path to the client private key. Required when tls_cert_path is set. |
| tls_insecure | Skip TLS certificate verification entirely. For development or servers with self-signed/private CA certs (e.g. Litmus Edge). |
| domain | Default JetStream domain for all KV, object store, and stream publish calls. Can be overridden per call via the domain parameter on individual functions. |
| username | NATS username for username/password authentication. Requires password or password_path. |
| password_path | Absolute path to a file containing the NATS password. File contents are trimmed of surrounding whitespace. Preferred over the inline password option: rotation takes effect on reconnect. |
| password | Inline NATS password stored in pg_foreign_server options (visible to superusers) |
| nkey_seed_path | Absolute path to a file containing an NKey seed string. Takes precedence over username/password when set. |
| nkey_seed | Inline NKey seed stored in pg_foreign_server options (visible to superusers) |
| token_path | Absolute path to a file containing a NATS token (no username required). Takes precedence over username/password; use for NATS token auth mode. |
| token | Inline NATS token stored in pg_foreign_server options (visible to superusers) |
| notify_subject | NATS subject for Patroni role change notifications |
| patroni_url | Patroni REST API URL for role change detection |
| responder_timeout_ms | Statement timeout (ms) applied to responder handler SPI calls (default: 5000) |

#### Authentication priority

Options are evaluated in this order; the first match wins.

| Priority | Mode | Required options |
| --- | --- | --- |
| 1 | NKey (file) | nkey_seed_path |
| 2 | NKey (inline) | nkey_seed |
| 3 | Token (file) | token_path |
| 4 | Token (inline) | token |
| 5 | Username/password (file) | username + password_path |
| 6 | Username/password (inline) | username + password |
| - | No auth | (none set) |

mTLS (`tls_cert_path` + `tls_key_path`) is independent of the credential mode and can be combined with any row above.

`tls_insecure 'true'` is mutually exclusive with `tls_ca_path`: when it is set, no CA certificate is loaded and all server certificates are accepted.

### 3. Reload configuration

After changing foreign server options, reload without restarting:

```sql
SELECT litmus_pgnats_reload_conf();
```

Force a full reconnect (drops and re-establishes the NATS connection):

```sql
SELECT litmus_pgnats_reload_conf_force();
```

## Background workers (subscriptions & responders)

The image already sets `shared_preload_libraries=litmus_pgnats`, so background workers start automatically. If you need more than the default number of workers, set this in your PostgreSQL configuration:

```conf
# Set this to the number of workers you want to support
# Example:
max_worker_processes = 32
```

## SQL functions

### Publish

```sql
-- Core NATS (fire and forget)
SELECT nats_publish_text('sub.ject', 'hello');
SELECT nats_publish_json('sub.ject', '{"key": "value"}'::json);
SELECT nats_publish_jsonb('sub.ject', '{"key": "value"}'::jsonb);
SELECT nats_publish_binary('sub.ject', 'data'::bytea);

-- With reply subject
SELECT nats_publish_text('sub.ject', 'hello', 'reply.subject');

-- With headers
SELECT nats_publish_text('sub.ject', 'hello', NULL, '{"Nats-Msg-Id": "1"}'::jsonb);

-- JetStream (sync)
SELECT nats_publish_text_stream('sub.ject', 'hello');
SELECT nats_publish_json_stream('sub.ject', '{"key": "value"}'::json);
SELECT nats_publish_jsonb_stream('sub.ject', '{"key": "value"}'::jsonb);
SELECT nats_publish_binary_stream('sub.ject', 'data'::bytea);

-- JetStream with domain override
SELECT nats_publish_text_stream('sub.ject', 'hello', NULL, 'hub');
```

### Subscribe

The callback function must accept a single argument of type `bytea`.

```sql
-- Subscribe a PostgreSQL function to a NATS subject
SELECT nats_subscribe('events.user.created', 'schema.handle_user_created'::regproc);

-- Multiple functions can be subscribed to the same subject
SELECT nats_subscribe('events.user.created', 'schema.log_user_created'::regproc);

-- Unsubscribe
SELECT nats_unsubscribe('events.user.created', 'schema.handle_user_created'::regproc);
```

### Request/reply

To use the `nats_request_*` functions, a responder must be set up for the subject.

```sql
SELECT nats_request_text('rpc.subject', 'payload', 5000);  -- timeout in ms
SELECT nats_request_json('rpc.subject', '{"q": "v"}'::json, 5000);
SELECT nats_request_jsonb('rpc.subject', '{"q": "v"}'::jsonb, 5000);
SELECT nats_request_binary('rpc.subject', 'data'::bytea, 5000);
```

### Responders

Register a PostgreSQL function as a persistent NATS request handler. The function must accept `bytea` and return `bytea`.

```sql
SELECT litmus_pgnats_register_responder('rpc.ping', 'public.handle_ping'::regproc);
SELECT litmus_pgnats_unregister_responder('rpc.ping');
```

#### Example

This is an example of a simple responder function:

```sql
-- Simple responder function
CREATE OR REPLACE FUNCTION litmus_pgnats_test_responder(
    payload text,
    subject text,
    reply_to text
)
RETURNS text
LANGUAGE sql
AS $$
    SELECT 'echo ' || payload;
$$;

-- Adding responder
SELECT litmus_pgnats_register_responder('litmus.test.responder', 'litmus_pgnats_test_responder'::regproc);
```

### Key value

All KV functions accept an optional domain argument. When omitted, the FDW-level `domain` option is used.

```sql
-- Write
SELECT nats_put_text('bucket', 'key', 'value');
SELECT nats_put_json('bucket', 'key', '{"x": 1}'::json);
SELECT nats_put_jsonb('bucket', 'key', '{"x": 1}'::jsonb);
SELECT nats_put_binary('bucket', 'key', 'data'::bytea);

-- Write to a specific domain
SELECT nats_put_text('bucket', 'key', 'value', 'hub');

-- Read
SELECT nats_get_text('bucket', 'key');
SELECT nats_get_json('bucket', 'key');
SELECT nats_get_jsonb('bucket', 'key');
SELECT nats_get_binary('bucket', 'key');

-- Read from a specific domain
SELECT nats_get_text('bucket', 'key', 'hub');

-- Delete
SELECT nats_delete_value('bucket', 'key');
SELECT nats_delete_value('bucket', 'key', 'hub');
```

### Object store

All object store functions accept an optional domain argument.

```sql
-- Upload
SELECT nats_put_file('store', 'file.txt', 'content'::bytea);
SELECT nats_put_file('store', 'file.txt', 'content'::bytea, 'hub');

-- Download
SELECT nats_get_file('store', 'file.txt');

-- Delete
SELECT nats_delete_file('store', 'file.txt');

-- Metadata and listing
SELECT * FROM nats_get_file_info('store', 'file.txt');
SELECT * FROM nats_get_file_list('store');
SELECT * FROM nats_get_file_list('store', 'hub');
```

### Utility

```sql
SELECT * FROM litmus_pgnats_version();   -- extension version
SELECT * FROM nats_get_server_info();    -- NATS server connection info
```

## Example

This is a simple example that connects directly to Litmus Edge and captures payloads as plain strings.

```sql
-- Enable the extension in the database where you want to implement your use case
CREATE EXTENSION IF NOT EXISTS litmus_pgnats;

-- Set up the foreign server connection
CREATE SERVER nats_fdw_server_test
FOREIGN DATA WRAPPER litmus_pgnats_fdw
OPTIONS (
    -- NATS external is the primary NATS server (JetStream domain hub)
    host '10.30.50.1',
    -- TCP port for NATS connections
    port '4222',
    -- Internal command buffer size in messages
    capacity '128',
    -- NATS username (for a Litmus Edge access token, leave set to an empty string)
    username '',
    -- Litmus Edge access account token
    password '<access account token>'
);

-- Reload config
SELECT litmus_pgnats_reload_conf();

-- Verify connection
SELECT * FROM nats_get_server_info();

-- Create a table to store Litmus Edge payloads as plain strings
CREATE TABLE IF NOT EXISTS litmus_pgnats_test_messages (
    received_at timestamptz DEFAULT now(),
    payload text
);

-- Create a function used by the subscriber to process the incoming payload
CREATE OR REPLACE FUNCTION litmus_pgnats_test_capture(msg bytea)
RETURNS void
LANGUAGE sql
AS $$
    INSERT INTO litmus_pgnats_test_messages(payload)
    VALUES (convert_from(msg, 'UTF8'));
$$;

-- Subscribe to a topic from Litmus Edge
SELECT nats_subscribe('devicehub.alias.<mydevice>.<mytag>', 'litmus_pgnats_test_capture'::regproc);

-- Verify data is captured
SELECT * FROM litmus_pgnats_test_messages ORDER BY received_at DESC LIMIT 10;

-- Cleanup
SELECT nats_unsubscribe('devicehub.alias.<mydevice>.<mytag>', 'litmus_pgnats_test_capture'::regproc);
```
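
The example above stores each payload as a plain string. When the publisher sends JSON, a variant of the capture function could parse the payload into a `jsonb` column instead. The following is a minimal sketch under stated assumptions: the table name `litmus_pgnats_test_json_messages`, the function name `litmus_pgnats_test_capture_json`, and the `value` field are hypothetical and chosen only for illustration; the real payload layout depends on what is published on the subject. The block uses only built-in PostgreSQL functions, so the handler can be tried by calling it directly before it is subscribed.

```sql
-- Hypothetical table for JSON payloads (name and columns are illustrative)
CREATE TABLE IF NOT EXISTS litmus_pgnats_test_json_messages (
    received_at timestamptz DEFAULT now(),
    payload     jsonb
);

-- Hypothetical handler: takes a single bytea argument, as subscribe callbacks require
CREATE OR REPLACE FUNCTION litmus_pgnats_test_capture_json(msg bytea)
RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
    -- Decode the raw bytes as UTF-8 text, then parse the text as JSON
    INSERT INTO litmus_pgnats_test_json_messages(payload)
    VALUES (convert_from(msg, 'UTF8')::jsonb);
EXCEPTION
    -- Invalid JSON or invalid UTF-8: log a warning and skip the message
    WHEN invalid_text_representation OR character_not_in_repertoire THEN
        RAISE WARNING 'skipping payload that is not valid UTF-8 JSON: %', SQLERRM;
END;
$$;

-- Exercise the handler directly, without NATS (the payload shape is an assumption)
SELECT litmus_pgnats_test_capture_json(convert_to('{"value": 42}', 'UTF8'));

-- Read one field from the stored JSON
SELECT received_at, payload->>'value' AS value
FROM litmus_pgnats_test_json_messages
ORDER BY received_at DESC
LIMIT 10;

-- Once the handler behaves as expected, it would be subscribed in the same way
-- as the string-based capture function in the example above:
-- SELECT nats_subscribe('devicehub.alias.<mydevice>.<mytag>', 'litmus_pgnats_test_capture_json'::regproc);
```

Catching the parse error inside the handler keeps one malformed message from raising an error in the subscriber; whether to skip, log, or store such messages in a separate table is a design choice that depends on the use case.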