A TypeScript library that provides type-safe methods for PostgreSQL Message Queue (PGMQ) operations in your Prisma-based applications.
- 🔒 Type-safe: Full TypeScript support with proper type definitions
- 📦 Easy to use: Simple API with functional methods
- 🔌 Prisma Integration: Seamless integration with your existing Prisma setup
npm install prisma-pgmq
# or
pnpm add prisma-pgmq
# or
yarn add prisma-pgmq
- PostgreSQL database with the PGMQ extension installed
- Prisma Client v5.0.0 or higher
- Node.js 16+
Enabling the PGMQ extension via Prisma
You can manage PostgreSQL extensions (including PGMQ) directly in your Prisma schema using the postgresqlExtensions preview feature. Add the extension to your datasource block in schema.prisma:

generator client {
  provider        = "prisma-client-js"
  previewFeatures = ["postgresqlExtensions"]
}

datasource db {
  provider   = "postgresql"
  url        = env("DATABASE_URL")
  extensions = [pgmq]
}

For more details, see the Prisma documentation on PostgreSQL extensions.
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Create a queue
await pgmq.createQueue(prisma, 'my-work-queue');
// Send a message
await pgmq.send(prisma, 'my-work-queue', {
userId: 123,
action: 'send-email',
email: 'user@example.com'
});
Send a single message to a queue.
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' });
// Send with delay (seconds)
const msgId = await pgmq.send(tx, 'my-queue', { data: 'hello' }, 30);
// Send with specific time
const msgId = await pgmq.send(
tx,
'my-queue',
{ data: 'hello' },
new Date('2024-01-01T10:00:00Z')
);
Send multiple messages to a queue in a single operation.
const msgIds = await pgmq.sendBatch(tx, 'my-queue', [
{ id: 1, data: 'message 1' },
{ id: 2, data: 'message 2' },
{ id: 3, data: 'message 3' }
]);
Read messages from a queue with visibility timeout.
// Read up to 5 messages with 30 second visibility timeout
const messages = await pgmq.read(tx, 'my-queue', 30, 5);
// Read with conditional filtering
const messages = await pgmq.read(tx, 'my-queue', 30, 5, { priority: 'high' });
Read messages with polling (wait for messages if none available).
// Poll for up to 10 seconds, checking every 500ms
const messages = await pgmq.readWithPoll(tx, 'my-queue', 30, 1, 10, 500);
Read and immediately delete a message (atomic operation).
const messages = await pgmq.pop(tx, 'my-queue');
Delete a specific message.
const deleted = await pgmq.deleteMessage(tx, 'my-queue', 123);
Delete multiple messages.
const deletedIds = await pgmq.deleteBatch(tx, 'my-queue', [123, 124, 125]);
Archive a message (move to archive table).
const archived = await pgmq.archive(tx, 'my-queue', 123);
Archive multiple messages.
const archivedIds = await pgmq.archiveBatch(tx, 'my-queue', [123, 124, 125]);
Create a new queue.
await pgmq.createQueue(tx, 'my-new-queue');
Create a partitioned queue for high-throughput scenarios.
await pgmq.createPartitionedQueue(tx, 'high-volume-queue', '10000', '100000');
Create an unlogged queue (better performance, less durability).
await pgmq.createUnloggedQueue(tx, 'temp-queue');
Delete a queue and all its messages.
const dropped = await pgmq.dropQueue(tx, 'old-queue');
Remove all messages from a queue.
const messageCount = await pgmq.purgeQueue(tx, 'my-queue');
Set visibility timeout for a specific message.
const message = await pgmq.setVt(tx, 'my-queue', 123, 60); // 60 seconds
Get information about all queues.
const queues = await pgmq.listQueues(tx);
console.log(queues); // [{ queue_name: 'my-queue', created_at: ..., is_partitioned: false }]
Get metrics for a specific queue.
const metrics = await pgmq.metrics(tx, 'my-queue');
console.log(metrics);
// {
// queue_name: 'my-queue',
// queue_length: 5,
// newest_msg_age_sec: 10,
// oldest_msg_age_sec: 300,
// total_messages: 1000,
// scrape_time: 2024-01-01T10:00:00.000Z
// }
Get metrics for all queues.
const allMetrics = await pgmq.metricsAll(tx);
/** Payload type of a queue message: an object with string keys whose values are not statically known. */
type Task = Record<string, unknown>;
/**
 * A message as stored in a queue.
 *
 * NOTE(review): this appears to be the element type returned by the read
 * operations (the consumer example accesses `msg_id` and `message` on the
 * results of `readWithPoll`) — confirm against the library's signatures.
 */
interface MessageRecord {
/** Numeric message identifier; the examples pass this value to `deleteMessage` and `archive`. */
msg_id: number;
/** NOTE(review): presumably the number of times the message has been read — confirm against the PGMQ docs. */
read_ct: number;
/** NOTE(review): presumably the time the message was added to the queue — confirm. */
enqueued_at: Date;
/** NOTE(review): presumably the visibility-timeout timestamp until which the message stays hidden from readers — confirm. */
vt: Date;
/** The payload carried by the message. */
message: Task;
}
/**
 * Metrics for a single queue.
 *
 * NOTE(review): field meanings below are inferred from the field names and
 * the sample output shown for `pgmq.metrics` — confirm against the PGMQ docs.
 */
interface QueueMetrics {
/** Name of the queue the metrics belong to. */
queue_name: string;
/** NOTE(review): presumably the number of messages currently in the queue — confirm. */
queue_length: number;
/** Age of the newest message, in seconds per the `_sec` suffix. Nullable; presumably null when the queue is empty — confirm. */
newest_msg_age_sec: number | null;
/** Age of the oldest message, in seconds per the `_sec` suffix. Nullable; presumably null when the queue is empty — confirm. */
oldest_msg_age_sec: number | null;
/** NOTE(review): presumably the total number of messages ever sent to the queue — confirm. */
total_messages: number;
/** NOTE(review): presumably the time at which these metrics were collected — confirm. */
scrape_time: Date;
}
/**
 * Description of a single queue.
 *
 * NOTE(review): appears to be the element type returned by `pgmq.listQueues`,
 * whose sample output shows `queue_name`, `created_at` and `is_partitioned` —
 * confirm against the library's signatures.
 */
interface QueueInfo {
/** Name of the queue. */
queue_name: string;
/** NOTE(review): presumably the time the queue was created — confirm. */
created_at: Date;
/** NOTE(review): presumably true for queues created as partitioned queues — confirm. */
is_partitioned: boolean;
/** NOTE(review): presumably true for queues created as unlogged queues — confirm. */
is_unlogged: boolean;
}
import { PrismaClient } from '@prisma/client';
import { pgmq } from 'prisma-pgmq';
const prisma = new PrismaClient();
// Producer
/**
 * Producer: enqueues one task on the `work-queue` queue.
 *
 * The caller's data is wrapped in an envelope with a fixed `type`
 * (`'process-user-data'`), the data itself under `data`, and the current
 * time in epoch milliseconds under `timestamp`.
 *
 * @param taskData - Data to attach to the task. It is forwarded untouched,
 *   so it is typed `unknown` rather than `any` to keep type checking intact.
 * @returns A promise that settles once `pgmq.send` has completed; the id
 *   returned by `pgmq.send` is discarded.
 */
async function sendTask(taskData: unknown): Promise<void> {
  await pgmq.send(prisma, 'work-queue', {
    type: 'process-user-data',
    data: taskData,
    timestamp: Date.now(),
  });
}
// Consumer
/**
 * Consumer: reads one batch from `work-queue` and handles each message in order.
 *
 * For every message, `handleTask` is awaited with the message payload. On
 * success the message is deleted from the queue; if handling (or the delete)
 * throws, the error is logged and the message is archived instead.
 *
 * An error thrown by `pgmq.archive` itself is not caught here and will abort
 * the remaining iteration.
 */
async function processMessages() {
  const queue = 'work-queue';

  // Arguments after the queue name follow the readWithPoll example in this
  // README: visibility timeout (30 s), batch size (5), max poll time (10 s),
  // poll interval (1000 ms) — confirm against the library signature.
  const batch = await pgmq.readWithPoll(prisma, queue, 30, 5, 10, 1000);

  for (const entry of batch) {
    try {
      // Handle the payload first; only a successful run removes the message.
      await handleTask(entry.message);
      await pgmq.deleteMessage(prisma, queue, entry.msg_id);
    } catch (failure) {
      console.error('Task failed:', failure);
      // Keep failed messages in the archive for later analysis.
      await pgmq.archive(prisma, queue, entry.msg_id);
    }
  }
}
/**
 * Placeholder task handler used by the consumer example.
 *
 * Logs the task's `type` property; replace the body with real processing.
 *
 * @param task - The task to handle (the consumer passes a message payload).
 */
async function handleTask(task: any) {
  // Your business logic here
  const { type } = task;
  console.log('Processing task:', type);
}
// Schedule a message for later processing
const futureDate = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24 hours
await pgmq.send(prisma, 'scheduled-tasks', {
type: 'send-reminder',
userId: 123,
reminder: 'Your subscription expires tomorrow'
}, futureDate);
- Fork the repository
- Create your feature branch (git checkout -b feature/amazing-feature)
- Commit your changes (git commit -m 'feat: add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
# Clone the repository
git clone https://github.com/dvlkv/prisma-pgmq.git
cd prisma-pgmq
# Install dependencies
pnpm install
# Run tests
pnpm test
# Build the library
pnpm build
# Watch for changes during development
pnpm dev
This project is licensed under the MIT License - see the LICENSE file for details.