A library that wraps PGMQ, a PostgreSQL message queue implementation, to make it easier to integrate into your application. By design, you can supply your own database access layer by implementing an Adapter that conforms to a simple protocol. The provided adapter uses HikariCP and next.jdbc.
- Documentation: explanations of the functions, with usage examples where appropriate. It is generated by quickdoc from the docstrings in the code and includes a complete table of contents.
- Specs: this project uses clojure.spec.alpha to validate expectations. Using clojure.spec.alpha/describe, documentation is generated that describes each function's expected inputs and return type. Entries are organized by namespaced function name, followed by the arguments of that function.
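To illustrate how clojure.spec.alpha/describe turns a spec into documentation-friendly data, here is a minimal sketch. The spec name `::delay` is local to this snippet and hypothetical; the project's real specs live in com.thirstysink.pgmq-clj.specs.

```clojure
(require '[clojure.spec.alpha :as s])

;; Hypothetical spec for illustration only: a non-negative integer.
(s/def ::delay (s/and int? #(>= % 0)))

;; Validate values against the spec.
(def zero-ok?     (s/valid? ::delay 0))
(def negative-ok? (s/valid? ::delay -1))

;; describe returns an abbreviated form of the spec as plain data,
;; which is the kind of output a spec listing is built from.
(def delay-description (s/describe ::delay))
(println delay-description)
```

The printed form has the same shape as the entries in the spec listing at the end of this document.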
This will build a jar that can be used directly in your applications.
# babashka
bb jar
# build tools
clj -T:build all
This will run all tests. Tests currently include an integration test that uses test containers against PostgreSQL 15, 16, and 17.
# babashka
bb test
bb test coverage
bb test watch
# build tools
clj -M:test --profile test
clj -M:test --profile coverage
clj -M:test --profile watch
This will update the local dependencies in deps.edn and bb.edn.
# babashka
bb upgrade
# build tools
clj -M:upgrade
- com.thirstysink.pgmq-clj.core
  - archive-messages - Archives messages msg-ids in a queue named queue-name using a given adapter.
  - create-queue - Create a queue named queue-name using a given adapter.
  - delete-message - Permanently deletes message with id msg-id in the queue named queue-name using a given adapter.
  - delete-message-batch - Deletes all msg-ids messages in queue queue-name using a given adapter.
  - drop-queue - Drop queue named queue-name using a given adapter.
  - list-queues - List all queues using a given adapter.
  - pop-message - Pops one message from the queue named queue-name using a given adapter.
  - purge-queue - Purge queue named queue-name contents using a given adapter and then return the count of purged items.
  - read-message - Read a quantity of messages from queue-name marking them invisible for visible_time seconds using a given adapter.
  - send-message - Send one message to a queue queue-name with a payload that will not be read for delay seconds using a given adapter.
  - send-message-batch - Sends payload to the queue named queue-name as a collection of messages that cannot be read for delay seconds using a given adapter.
- com.thirstysink.pgmq-clj.db.adapter
  - Adapter
    - close - Performs database connection cleanup using this.
    - execute! - Execute a sql statement and params with 0 or more return values using this.
    - execute-one! - Execute a sql statement and params with 0 or 1 return values using this.
    - query - Query the database with a given sql, params, and return results using this.
    - with-transaction - Wrap a function f in a database transaction using this.
- com.thirstysink.pgmq-clj.db.adapters.hikari-adapter
  - ->pgobject - Transforms Clojure data to a PGobject x that contains the data as JSON.
  - <-pgobject - Transform PGobject v containing json or jsonb value to Clojure data.
  - ensure-pgmq-extension - Checks the database to verify that the pgmq extension is installed using the adapter.
  - make-hikari-adapter - Create a new HikariAdapter instance.
- com.thirstysink.pgmq-clj.instrumentation
  - disable-instrumentation - Disables clojure.spec.alpha specs instrumentation.
  - enable-instrumentation - Enables clojure.spec.alpha specs instrumentation.
  - instrumentation-enabled? - A flag that indicates if instrumentation is enabled.
- com.thirstysink.pgmq-clj.json
  - ->json - Returns a JSON-encoding String for the given Clojure object.
- com.thirstysink.pgmq-clj.specs
(archive-messages adapter queue-name msg-ids)Function.
Archives messages msg-ids in a queue named queue-name using a given adapter.
This removes the messages from queue-name and places them in an archive table
named a_{queue-name}.
Example: (core/archive-messages adapter "test-queue" [3]) ;; => ()
(create-queue adapter queue-name)Function.
Create a queue named queue-name using a given adapter.
Example:
(core/create-queue adapter "test-queue")
;; => nil
(delete-message adapter queue-name msg-id)Function.
Permanently deletes message with id msg-id in the queue
named queue-name using a given adapter.
Example: (core/delete-message adapter "test-queue" 3) ;; => true
(delete-message-batch adapter queue-name msg-ids)Function.
Deletes all msg-ids messages in queue queue-name using a given adapter.
Example: (core/delete-message-batch adapter "test-queue" [2 5 6]) ;; => [2 5 6]
(drop-queue adapter queue-name)Function.
Drop queue named queue-name using a given adapter.
Example:
(core/drop-queue adapter "test-queue-2")
;; => true
(list-queues adapter)Function.
List all queues using a given adapter.
Example:
(core/list-queues adapter)
;; => [{:queue-name "test-queue",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x680b0f16 "2025-03-20T01:01:42.842248Z"]}
{:queue-name "test-queue-2",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x45e79bdf "2025-03-20T01:01:46.292274Z"]}
{:queue-name "test-queue-3",
:is-partitioned false,
:is-unlogged false,
:created-at
#object[java.time.Instant 0x19767429 "2025-03-20T01:01:54.665295Z"]}]
(pop-message adapter queue-name)Function.
Pops one message from the queue named queue-name using a given adapter. The side-effect of
this function is equivalent to reading and deleting a message. See also
[[read-message]] and [[delete-message]].
Example: (core/pop-message adapter "test-queue") ;; => {:msg-id 1, :read-ct 0, :enqueued-at #object[java.time.Instant 0x79684534 "2025-03-20T01:29:15.298975Z"], :vt #object[java.time.Instant 0x391acb50 "2025-03-20T01:30:45.300696Z"], :message {:user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4", :order-count 12}, :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}}
(purge-queue adapter queue-name)Function.
Purge the contents of the queue named queue-name using a given adapter,
then return the count of purged items.
Example:
(core/purge-queue adapter queue-name)
;; => 2
(read-message adapter queue-name visible_time quantity filter)Function.
Read a quantity of messages from queue-name marking them invisible for
visible_time seconds using a given adapter. This function supports the
ability to filter messages received when making a read request.
Here are some examples of how the filter, referred to below as conditional, works:
- If conditional is an empty JSON object ('{}'::jsonb), the condition always evaluates to TRUE, and all messages are considered matching.
- If conditional is a JSON object with a single key-value pair, such as {'key': 'value'}, the condition checks if the message column contains a JSON object with the same key-value pair. For example:
  message = {'key': 'value', 'other_key': 'other_value'}: // matches
  message = {'other_key': 'other_value'}: // does not match
- If conditional is a JSON object with multiple key-value pairs, such as {'key1': 'value1', 'key2': 'value2'}, the condition checks if the message column contains a JSON object with all the specified key-value pairs. For example:
  message = {'key1': 'value1', 'key2': 'value2', 'other_key': 'other_value'}: // matches
  message = {'key1': 'value1', 'other_key': 'other_value'}: // does not match
Some examples of conditional JSONB values and their effects on the query:
- {}: matches all messages
- {'type': 'error'}: matches messages with a type key equal to 'error'
- {'type': 'error', 'severity': 'high'}: matches messages with both type equal to 'error' and severity equal to 'high'
- {'user_id': 123}: matches messages with a user_id key equal to 123
Example: (core/read-message adapter "test-queue" 10 88 nil) ;; => ({:msg-id 2, :read-ct 1, :enqueued-at #object[java.time.Instant 0x5f794b3d "2025-03-21T01:14:00.831673Z"], :vt #object[java.time.Instant 0x3fcde164 "2025-03-21T01:15:32.988540Z"], :message {:user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4", :order-count 12}, :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}})
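The matching rules described for the filter can be emulated in memory to build intuition. This is only a sketch of the documented behaviour for flat key-value pairs, not how PGMQ or this library evaluates the condition; `matches-filter?` is a hypothetical helper, and nested values are compared by plain equality.

```clojure
(defn matches-filter?
  "Hypothetical helper: true when every key-value pair in `conditional`
  also appears in `message`. An empty `conditional` matches everything."
  [conditional message]
  (every? (fn [[k v]]
            (and (contains? message k)
                 (= v (get message k))))
          conditional))

;; Empty conditional: all messages match.
(matches-filter? {} {"type" "error"})

;; Single pair: the message must contain the same pair.
(matches-filter? {"key" "value"} {"key" "value" "other_key" "other_value"})
(matches-filter? {"key" "value"} {"other_key" "other_value"})

;; Multiple pairs: all of them must be present.
(matches-filter? {"key1" "value1" "key2" "value2"}
                 {"key1" "value1" "other_key" "other_value"})
```

The first two calls return true and the last two return false, mirroring the "matches" and "does not match" cases listed for read-message.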
(send-message adapter queue-name payload delay)Function.
Send one message to a queue queue-name with a payload
that will not be read for delay seconds using a given adapter.
A delay of 0 indicates it may be read immediately.
Example Payloads:
- {:data {:foo "bad"} :headers {:x-data "baz"}}
- {:data "feed" :headers {:version "3"}}
Example: (core/send-message adapter "test-queue" {:data {:order-count 12 :user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4"} :headers {:TENANT "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}} 90) ;; => 1
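A payload is a map with :data and :headers keys. As a rough sketch of how that shape could be checked before sending, the snippet below defines local specs modeled on the forms in the spec listing at the end of this document. The spec names are local to this snippet and hypothetical, and which listed form belongs to which project spec is an assumption.

```clojure
(require '[clojure.spec.alpha :as s])

;; Local, hypothetical specs modeled on the documented spec forms.
(s/def ::data
  (fn [x] (or (map? x) (vector? x) (string? x)
              (number? x) (boolean? x) (nil? x))))
(s/def ::header-key (s/or :string string? :keyword keyword?))
(s/def ::header-value
  (s/or :string string?
        :number number?
        :list (s/coll-of (s/or :string string? :number number?))))
(s/def ::headers (s/nilable (s/map-of ::header-key ::header-value)))
(s/def ::payload-object (s/keys :req-un [::data ::headers]))

;; Both keys present: valid.
(s/valid? ::payload-object {:data {:foo "bar"} :headers {:x-data "baz"}})

;; Missing :headers: invalid, because :req-un requires the key.
(s/valid? ::payload-object {:data "feed"})
```

Under these assumed specs, :headers may be nil or an empty map, but the key itself must be present.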
(send-message-batch adapter queue-name payload delay)Function.
Sends payload to the queue named queue-name as a collection of messages
that cannot be read for delay seconds using a given adapter. The payload
should be a sequence of valid JSON objects. See also [[send-message]].
Example Payloads:
- [{:data {:foo "bar"} :headers {:x-data "bat"}}]
- [{:data 10002 :headers {}} {:data "feed" :headers {:version "2"}}]
Example: (core/send-message-batch adapter "test-queue" [{:data {:order-count 12 :user-id "0f83fbeb-345b-41ca-bbec-3bace0cff5b4"} :headers {:X-SESS-ID "b5bda77b-8283-4a6d-8de8-40a5041a60ee"}} {:data {:order-count 12 :user-id "da04bf11-018f-45c4-908f-62c33b6e8aa6"} :headers {:X-SESS-ID "b0ef0d6a-e587-4c28-b995-1efe8cb31c9e"}}] 15) ;; => [5 6]
(close this)Function.
Performs database connection cleanup using this.
(execute! this sql params)Function.
Execute a sql statement and params with 0 or more return values using this.
(execute-one! this sql params)Function.
Execute a sql statement and params with 0 or 1 return values using this.
(query this sql params)Function.
Query the database with a given sql, params, and return results using this.
(with-transaction this f)Function.
Wrap a function f in a database transaction using this.
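To show what implementing the five protocol functions might look like, here is a self-contained sketch. It defines a stand-in protocol with the documented names and arities so that it runs on its own; in real code you would implement the library's Adapter protocol instead. The `RecordingAdapter` type, its return values, and the choice to call `f` with the adapter inside `with-transaction` are all assumptions made for illustration.

```clojure
;; Stand-in protocol mirroring the documented function names and arities.
(defprotocol Adapter
  (close [this])
  (execute! [this sql params])
  (execute-one! [this sql params])
  (query [this sql params])
  (with-transaction [this f]))

;; Hypothetical adapter that records each call in an atom instead of
;; talking to a database. Useful as a test double.
(defrecord RecordingAdapter [log]
  Adapter
  (close [_]
    (swap! log conj [:close])
    nil)
  (execute! [_ sql params]
    (swap! log conj [:execute! sql params])
    [])
  (execute-one! [_ sql params]
    (swap! log conj [:execute-one! sql params])
    nil)
  (query [_ sql params]
    (swap! log conj [:query sql params])
    [])
  ;; Assumption: f receives the adapter. No real transaction is opened.
  (with-transaction [this f]
    (f this)))

(def recording-adapter (->RecordingAdapter (atom [])))

(query recording-adapter "SELECT 1" [])
(with-transaction recording-adapter #(execute! % "SELECT 2" []))

(println @(:log recording-adapter))
```

A recording adapter like this can stand in for a database in unit tests, since it captures the SQL and parameters it was asked to run.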
(->pgobject x)Function.
Transforms Clojure data to a PGobject x that contains the data as
JSON. The PGobject type defaults to jsonb but can be changed via the
metadata key :pgtype.
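Clojure metadata travels with a value without changing its contents, which is what makes a :pgtype hint possible. The sketch below shows only the metadata mechanics; `pgtype-of` is a hypothetical helper, and how the library itself reads the key is an assumption.

```clojure
(defn pgtype-of
  "Hypothetical helper: reads :pgtype from the value's metadata,
  falling back to \"jsonb\" when no hint is attached."
  [x]
  (or (:pgtype (meta x)) "jsonb"))

(def as-default {:order-count 12})
(def as-json    (with-meta {:order-count 12} {:pgtype "json"}))

(pgtype-of as-default)   ; no metadata, so the default applies
(pgtype-of as-json)      ; the hint from metadata wins

;; Metadata does not take part in equality: both maps hold the same data.
(= as-default as-json)
```

Presumably a value prepared with `with-meta` in this way is what you would pass to ->pgobject to request json instead of jsonb.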
(<-pgobject v)Function.
Transform PGobject v containing json or jsonb value to Clojure data.
(ensure-pgmq-extension adapter)Function.
Checks the database to verify that the pgmq extension is installed
using the adapter. If it is not then it will throw an exception.
Example:
(hikari/ensure-pgmq-extension adapter)
;; => nil
(make-hikari-adapter config)Function.
Create a new HikariAdapter instance. The argument config
provides database connection values. See https://github.com/tomekw/hikari-cp
for additional details on the configuration options.
| Setting | Description |
|---|---|
| jdbc-url | This property sets the JDBC connection URL. |
| username | This property sets the default authentication username used when obtaining Connections from the underlying driver. |
| password | This property sets the default authentication password used when obtaining Connections from the underlying driver. |
| maximum-pool-size | This property controls the maximum size that the pool is allowed to reach, including both idle and in-use connections. |
| minimum-idle | This property controls the minimum number of idle connections that HikariCP tries to maintain in the pool. |
Example:
(def adapter (hikari/make-hikari-adapter {:jdbc-url "jdbc:postgresql://0.0.0.0:5432/postgres" :username "postgres" :password "postgres"}))
;; => #'user/adapter
(disable-instrumentation)
(disable-instrumentation ns)Function.
Disables clojure.spec.alpha specs instrumentation. If
no namespace ns is provided, it will disable instrumentation
for com.thirstysink.pgmq-clj.core.
(enable-instrumentation)
(enable-instrumentation ns)Function.
Enables clojure.spec.alpha specs instrumentation. If
no namespace ns is provided, it will instrument
com.thirstysink.pgmq-clj.core.
instrumentation-enabled?
A flag that indicates if instrumentation is enabled.
This is determined by the value of the environment variable PGMQCLJ_INSTRUMENTAION_ENABLED.
If the environment variable is set, the value will be true; otherwise, false.
->json
Returns a JSON-encoding String for the given Clojure object. Takes an optional date format string that Date objects will be encoded with.
The default date format (in UTC) is: yyyy-MM-dd'T'HH:mm:ss'Z'
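To see what the default pattern produces, the sketch below formats a java.util.Date with java.text.SimpleDateFormat in UTC. It illustrates the pattern only; `format-utc` is a hypothetical helper, and whether the library formats dates through this class is an assumption.

```clojure
(import '(java.text SimpleDateFormat)
        '(java.util Date TimeZone))

(defn format-utc
  "Hypothetical helper: formats `d` with the documented default pattern,
  using the UTC time zone."
  [^Date d]
  (let [fmt (doto (SimpleDateFormat. "yyyy-MM-dd'T'HH:mm:ss'Z'")
              (.setTimeZone (TimeZone/getTimeZone "UTC")))]
    (.format fmt d)))

;; The Unix epoch rendered with the default pattern.
(println (format-utc (Date. 0)))
;; prints 1970-01-01T00:00:00Z
```

Note that the trailing Z in the pattern is a quoted literal, so the output is only correct when the formatter is set to UTC, as it is here.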
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-id :com.thirstysink.pgmq-clj.specs/msg-id) :ret boolean? :fn nil)
(or :string string? :keyword keyword?)
boolean?
(instance? java.time.Instant %)
(and number? pos?)
(instance? java.time.Instant %)
int?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :visibility_time :com.thirstysink.pgmq-clj.specs/visibility_time :quantity :com.thirstysink.pgmq-clj.specs/quantity :filter :com.thirstysink.pgmq-clj.specs/json) :ret :com.thirstysink.pgmq-clj.specs/message-records :fn nil)
(conformer (zipmap (map :clojure.spec.alpha/k %) (map :clojure.spec.alpha/v %)) (map (fn [[k v]] #:clojure.spec.alpha{:k k, :v v}) %))
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(coll-of :com.thirstysink.pgmq-clj.specs/msg-id)
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :payload :com.thirstysink.pgmq-clj.specs/payload-objects :delay :com.thirstysink.pgmq-clj.specs/delay) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
(fn [x] (fn* [] (instance? java.time.Instant x)))
(nilable (map-of :com.thirstysink.pgmq-clj.specs/header-key :com.thirstysink.pgmq-clj.specs/header-value :min-count 0))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-ids :com.thirstysink.pgmq-clj.specs/msg-ids) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
valid-queue-name?
boolean?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret nil :fn nil)
(and :com.thirstysink.pgmq-clj.specs/msg-ids (complement empty?))
(coll-of :com.thirstysink.pgmq-clj.specs/queue-record)
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret :com.thirstysink.pgmq-clj.specs/message-record :fn nil)
(and int? (>= % 0))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :msg-ids :com.thirstysink.pgmq-clj.specs/non-empty-msg-ids) :ret :com.thirstysink.pgmq-clj.specs/msg-ids :fn nil)
(coll-of :com.thirstysink.pgmq-clj.specs/payload-object)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/data :com.thirstysink.pgmq-clj.specs/headers])
(satisfies? Adapter %)
int?
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret boolean? :fn nil)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/msg-id :com.thirstysink.pgmq-clj.specs/read-ct :com.thirstysink.pgmq-clj.specs/enqueued-at :com.thirstysink.pgmq-clj.specs/vt :com.thirstysink.pgmq-clj.specs/message] :opt-un [:com.thirstysink.pgmq-clj.specs/headers])
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter) :ret :com.thirstysink.pgmq-clj.specs/queue-result :fn nil)
(or :string string? :number number? :list (coll-of (or :string string? :number number?)))
(instance? java.time.Instant %)
(fn [x] (or (map? x) (vector? x) (string? x) (number? x) (boolean? x) (nil? x)))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name :payload :com.thirstysink.pgmq-clj.specs/payload-object :delay :com.thirstysink.pgmq-clj.specs/delay) :ret :com.thirstysink.pgmq-clj.specs/msg-id :fn nil)
(keys :req-un [:com.thirstysink.pgmq-clj.specs/queue-name :com.thirstysink.pgmq-clj.specs/is-partitioned :com.thirstysink.pgmq-clj.specs/is-unlogged :com.thirstysink.pgmq-clj.specs/created-at])
(and int? (> % 0))
(fspec :args (cat :adapter :com.thirstysink.pgmq-clj.specs/adapter :queue-name :com.thirstysink.pgmq-clj.specs/queue-name) :ret integer? :fn nil)
(coll-of :com.thirstysink.pgmq-clj.specs/mesage-record)