A Dart client for Postgres Message Queue (PGMQ).
# Start a Postgres instance
docker run -d --name postgres -e POSTGRES_PASSWORD=postgres -p 5432:5432 ofceab/pgmq:pg18-1.9.0

# Connect to Postgres
psql postgres://postgres:postgres@0.0.0.0:5432/postgres

-- create the pgmq schema
CREATE SCHEMA pgmq;

-- create the extension in the "pgmq" schema
CREATE EXTENSION pgmq;

- With PostgreSQL
// Create a database connection
final databaseParam = DatabaseConnection(
  host: 'localhost',
  database: 'postgres',
  password: 'postgres',
  username: 'postgres',
  ssl: false,
  port: 5432);

// Create a pgmq connection with DatabaseConnection param
final pgmq = await Pgmq.createConnection(param: databaseParam);

// Create a queue [queueName]
final queue = await pgmq.createQueue(queueName: queueName);

- With Prisma
// Create a pgmq connection with PrismaClient
final pgmq = Pgmq.createConnectionUsingPrisma(prismaClient: PrismaClient());

// Create a queue [queueName]
final queue = await pgmq.createQueue(queueName: queueName);

// Send a message
queue.send({"foo": "bar"});

// Read a message with visibilityTimeOut
queue.read(visibilityTimeOut: vt);

// Read messages with advanced JSON filter using the Filter API
// Example: Filter messages where user.age > 30
queue.read(
  visibilityTimeOut: vt,
  maxReadNumber: 10,
  conditional: Filter.gt('user.age', 30)
);

// More filter examples
// Equality filter
queue.read(conditional: Filter.eq('status', 'pending'));
// Greater than or equal
queue.read(conditional: Filter.gte('score', 80));
// Less than
queue.read(conditional: Filter.lt('age', 65));
// Not equal
queue.read(conditional: Filter.ne('type', 'archived'));
// Check if field exists
queue.read(conditional: Filter.exists('priority'));

// Read messages with polling and conditional filter
queue.readWithPoll(
  visibilityTimeOut: vt,
  maxReadNumber: 10,
  pollInterval: Duration(seconds: 1),
  conditional: Filter.eq('status', 'pending')
);

// Archive a message [messageID]
queue.archive(messageID);

// Delete a message [messageID]
queue.delete(messageID);

// Pull messages from queue with a specified polling duration
queue.pull(duration: duration);

// Pausable pull
final (pausableTimer, stream) = queue.pausablePull(duration: duration);
// Start the pausable pull
pausableTimer.start();
// Pause the pulling
pausableTimer.pause();

// Read a message from queue and delete it upon read
queue.pop();

// Purge all messages from queue
queue.purgeQueue();

// Drops the queue
queue.dropQueue();

- Sending Messages
- Reading Messages
  - read (with advanced JSON filter support)
  - read_with_poll (with advanced JSON filter support)
  - pop
- Deleting/Archiving Messages
- Queue Management
- Utilities
- Pausable Queue