# Ruby client for PGMQ - PostgreSQL Message Queue
PGMQ-Ruby is a Ruby client for PGMQ (PostgreSQL Message Queue). It provides direct access to all PGMQ operations with a clean, minimal API - similar to how rdkafka-ruby relates to Kafka.
Think of it as:
- Like AWS SQS - but running entirely in PostgreSQL with no external dependencies
- Like Sidekiq/Resque - but without Redis, using PostgreSQL for both data and queues
- Like rdkafka-ruby - a thin, efficient wrapper around the underlying system (PGMQ SQL functions)
**Architecture Note**: This library follows the rdkafka-ruby/Karafka pattern - `pgmq-ruby` is the low-level foundation, while higher-level features (job processing, Rails integration, retry strategies) will live in `pgmq-framework` (similar to how Karafka builds on rdkafka-ruby).
- PGMQ Feature Support
- Requirements
- Installation
- Quick Start
- Configuration
- API Reference
- Message Object
- Serializers
- Rails Integration
- Development
- License
- Author
## PGMQ Feature Support

This gem provides complete support for all core PGMQ SQL functions, based on the official PGMQ API:
| Category | Method | Description | Status |
|---|---|---|---|
| Producing | `produce` | Send single message with optional delay and headers | ✅ |
| | `produce_batch` | Send multiple messages atomically with headers | ✅ |
| Reading | `read` | Read single message with visibility timeout | ✅ |
| | `read_batch` | Read multiple messages with visibility timeout | ✅ |
| | `read_with_poll` | Long-polling for efficient message consumption | ✅ |
| | `pop` | Atomic read + delete operation | ✅ |
| | `pop_batch` | Atomic batch read + delete operation | ✅ |
| Deleting/Archiving | `delete` | Delete single message | ✅ |
| | `delete_batch` | Delete multiple messages | ✅ |
| | `archive` | Archive single message for long-term storage | ✅ |
| | `archive_batch` | Archive multiple messages | ✅ |
| | `purge_queue` | Remove all messages from queue | ✅ |
| Queue Management | `create` | Create standard queue | ✅ |
| | `create_partitioned` | Create partitioned queue (requires pg_partman) | ✅ |
| | `create_unlogged` | Create unlogged queue (faster, no crash recovery) | ✅ |
| | `drop_queue` | Delete queue and all messages | ✅ |
| | `detach_archive` | Detach archive table from queue | ✅ |
| Utilities | `set_vt` | Update message visibility timeout | ✅ |
| | `set_vt_batch` | Batch update visibility timeouts | ✅ |
| | `set_vt_multi` | Update visibility timeouts across multiple queues | ✅ |
| | `list_queues` | List all queues with metadata | ✅ |
| | `metrics` | Get queue metrics (length, age, total messages) | ✅ |
| | `metrics_all` | Get metrics for all queues | ✅ |
| | `enable_notify_insert` | Enable PostgreSQL NOTIFY on insert | ✅ |
| | `disable_notify_insert` | Disable notifications | ✅ |
| Ruby Enhancements | Transaction Support | Atomic operations via `client.transaction do \|txn\|` | ✅ |
| | Conditional Filtering | Server-side JSONB filtering with `conditional:` | ✅ |
| | Multi-Queue Ops | Read/pop/delete/archive from multiple queues | ✅ |
| | Queue Validation | 48-character limit and name validation | ✅ |
| | Connection Pooling | Thread-safe connection pool for concurrency | ✅ |
| | Pluggable Serializers | JSON (default) with custom serializer support | ✅ |
## Requirements

- Ruby 3.2+
- PostgreSQL 14-18 with the PGMQ extension installed
## Installation

### PGMQ Extension

PGMQ can be installed on your PostgreSQL instance in several ways:
For self-hosted PostgreSQL instances with filesystem access, install via PGXN:
```bash
pgxn install pgmq
```

Or build from source:

```bash
git clone https://github.com/pgmq/pgmq.git
cd pgmq/pgmq-extension
make && make install
```

Then enable the extension:

```sql
CREATE EXTENSION pgmq;
```

For managed PostgreSQL services that don't allow native extension installation, PGMQ provides a SQL-only installation that works without filesystem access:

```bash
git clone https://github.com/pgmq/pgmq.git
cd pgmq
psql -f pgmq-extension/sql/pgmq.sql postgres://user:pass@your-rds-host:5432/database
```

This creates a `pgmq` schema with all required functions. See the PGMQ Installation Guide for details.
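Either way, you can verify from Ruby that PGMQ is available before wiring up the client. A small sketch using the `pg` gem directly (the connection string is illustrative):

```ruby
require 'pg'

conn = PG.connect('postgres://user:pass@localhost:5432/mydb')

# The native extension registers itself in pg_extension; the SQL-only
# install only creates the pgmq schema, so check for that as a fallback.
ext = conn.exec("SELECT extversion FROM pg_extension WHERE extname = 'pgmq'")
schema = conn.exec("SELECT 1 FROM pg_namespace WHERE nspname = 'pgmq'")
puts (ext.ntuples.positive? || schema.ntuples.positive?) ? 'PGMQ available' : 'PGMQ missing'
```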
Comparison:
| Feature | Extension | SQL-only |
|---|---|---|
| Version tracking | Yes | No |
| Upgrade path | Yes | Manual |
| Filesystem access | Required | Not needed |
| Managed cloud services | Limited | Full support |
If your managed PostgreSQL service supports pg_tle (available on AWS RDS PostgreSQL 14.5+ and Aurora), you can potentially install PGMQ as a Trusted Language Extension since PGMQ is written in PL/pgSQL and SQL (both supported by pg_tle).
To use pg_tle:

- Enable pg_tle on your instance (add it to `shared_preload_libraries`)
- Create the pg_tle extension: `CREATE EXTENSION pg_tle;`
- Use `pgtle.install_extension()` to install PGMQ's SQL functions (see the sketch below)
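For illustration only, the pg_tle route could be scripted from Ruby with the `pg` gem; the version label and description below are arbitrary placeholders, not official PGMQ values:

```ruby
require 'pg'

conn = PG.connect('postgres://user:pass@your-rds-host:5432/database')

# Register PGMQ's SQL body as a pg_tle extension, then create it
# like any other extension.
pgmq_sql = File.read('pgmq-extension/sql/pgmq.sql')
conn.exec_params(
  'SELECT pgtle.install_extension($1, $2, $3, $4)',
  ['pgmq', '1.0.0', 'PostgreSQL Message Queue', pgmq_sql]
)
conn.exec('CREATE EXTENSION pgmq')
```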
See AWS pg_tle documentation for setup instructions.
Note: The SQL-only installation is simpler and recommended for most managed service use cases. pg_tle provides additional version management and extension lifecycle features if needed.
Add to your Gemfile:
```ruby
gem 'pgmq-ruby'
```

Or install directly:

```bash
gem install pgmq-ruby
```

## Quick Start

```ruby
require 'pgmq'
# Connect to database
client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret'
)
# Create a queue
client.create('orders')
# Send a message (must be JSON string)
msg_id = client.produce('orders', '{"order_id":123,"total":99.99}')
# Read a message (30 second visibility timeout)
msg = client.read('orders', vt: 30)
puts msg.message # => "{\"order_id\":123,\"total\":99.99}" (raw JSON string)
# Parse and process (you handle deserialization)
data = JSON.parse(msg.message)
process_order(data)
client.delete('orders', msg.msg_id)
# Or archive for long-term storage
client.archive('orders', msg.msg_id)
# Clean up
client.drop_queue('orders')
client.close
```

In a Rails app, reuse the framework's connection pool instead of opening a separate one:

```ruby
# config/initializers/pgmq.rb or in your model
class OrderProcessor
  def initialize
    # Reuse Rails' connection pool - no separate connection needed!
    @client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })
  end

  def process_orders
    loop do
      msg = @client.read('orders', vt: 30)
      break unless msg

      # Parse JSON yourself
      data = JSON.parse(msg.message)
      process_order(data)

      @client.delete('orders', msg.msg_id)
    end
  end
end
```

## Configuration

PGMQ-Ruby supports multiple ways to connect.

Direct connection parameters:

```ruby
client = PGMQ::Client.new(
  host: 'localhost',
  port: 5432,
  dbname: 'mydb',
  user: 'postgres',
  password: 'secret',
  pool_size: 5,    # Default: 5
  pool_timeout: 5  # Default: 5 seconds
)
```

A connection string:

```ruby
client = PGMQ::Client.new('postgres://user:pass@localhost:5432/dbname')
```

A connection provider (for example, Rails):

```ruby
# Reuses Rails connection pool - no additional connections needed
client = PGMQ::Client.new(-> { ActiveRecord::Base.connection.raw_connection })
```

An existing connection object:

```ruby
# Bring your own connection management
connection = PGMQ::Connection.new('postgres://localhost/mydb', pool_size: 10)
client = PGMQ::Client.new(connection)
```

### Connection Pooling

PGMQ-Ruby includes connection pooling with resilience:

```ruby
# Configure pool size and timeouts
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  pool_size: 10,        # Number of connections (default: 5)
  pool_timeout: 5,      # Timeout in seconds (default: 5)
  auto_reconnect: true  # Auto-reconnect on connection loss (default: true)
)
# Monitor connection pool health
stats = client.stats
puts "Pool size: #{stats[:size]}" # => 10
puts "Available: #{stats[:available]}" # => 8 (2 in use)
# Disable auto-reconnect if you prefer explicit error handling
client = PGMQ::Client.new(
  'postgres://localhost/mydb',
  auto_reconnect: false
)
```

Connection Pool Benefits:
- Thread-safe - Multiple threads can safely share a single client
- Fiber-aware - Works with Ruby 3.0+ Fiber Scheduler for non-blocking I/O
- Auto-reconnect - Recovers from lost connections (configurable)
- Health checks - Verifies connections before use to prevent stale connection errors
- Monitoring - Track pool utilization with `client.stats`
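Because a single client is thread-safe, one instance can back a pool of workers. A minimal sketch using only methods documented in this README (queue name, worker count, and processing logic are illustrative):

```ruby
require 'pgmq'
require 'json'

client = PGMQ::Client.new('postgres://localhost/mydb', pool_size: 10)
client.create('jobs')

# Four workers share one client; each call checks out a pooled connection.
workers = Array.new(4) do
  Thread.new do
    while (msg = client.read('jobs', vt: 30))
      data = JSON.parse(msg.message)
      handle_job(data) # your processing logic
      client.delete('jobs', msg.msg_id)
    end
  end
end
workers.each(&:join)
```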
## API Reference

### Queue Management

```ruby
# Create a queue (returns true if created, false if already exists)
client.create("queue_name") # => true
client.create("queue_name") # => false (idempotent)
# Create partitioned queue (requires pg_partman)
client.create_partitioned("queue_name",
  partition_interval: "daily",
  retention_interval: "7 days"
) # => true/false
# Create unlogged queue (faster, no crash recovery)
client.create_unlogged("queue_name") # => true/false
# Drop queue (returns true if dropped, false if didn't exist)
client.drop_queue("queue_name") # => true/false
# List all queues
queues = client.list_queues
# => [#<PGMQ::QueueMetadata queue_name="orders" created_at=...>, ...]
```

Queue names must follow PostgreSQL identifier rules with PGMQ-specific constraints:
- Fewer than 48 characters (PGMQ enforces this limit for table prefixes)
- Must start with a letter or underscore
- Can contain only letters, digits, and underscores
- Case-sensitive
Valid Queue Names:
client.create("orders") # âś“ Simple name
client.create("high_priority") # âś“ With underscore
client.create("Queue123") # âś“ With numbers
client.create("_internal") # âś“ Starts with underscore
client.create("a" * 47) # âś“ Maximum length (47 chars)Invalid Queue Names:
client.create("123orders") # âś— Starts with number
client.create("my-queue") # âś— Contains hyphen
client.create("my.queue") # âś— Contains period
client.create("a" * 48) # âś— Too long (48+ chars)
# Raises PGMQ::Errors::InvalidQueueNameError# Send single message (must be JSON string)
msg_id = client.produce("queue_name", '{"data":"value"}')
# Send with delay (seconds)
msg_id = client.produce("queue_name", '{"data":"value"}', delay: 60)
# Send with headers (for routing, tracing, correlation)
msg_id = client.produce("queue_name", '{"data":"value"}',
headers: '{"trace_id":"abc123","priority":"high"}')
# Send with headers and delay
msg_id = client.produce("queue_name", '{"data":"value"}',
headers: '{"correlation_id":"req-456"}',
delay: 60)
# Send batch (array of JSON strings)
msg_ids = client.produce_batch("queue_name", [
'{"order":1}',
'{"order":2}',
'{"order":3}'
])
# => ["101", "102", "103"]
# Send batch with headers (one per message)
msg_ids = client.produce_batch("queue_name",
['{"order":1}', '{"order":2}'],
headers: ['{"priority":"high"}', '{"priority":"low"}'])# Read single message
msg = client.read("queue_name", vt: 30)
# => #<PGMQ::Message msg_id="1" message="{...}">
# Read batch
messages = client.read_batch("queue_name", vt: 30, qty: 10)
# Read with long-polling
msg = client.read_with_poll("queue_name",
  vt: 30,
  qty: 1,
  max_poll_seconds: 5,
  poll_interval_ms: 100
)
# Pop (atomic read + delete)
msg = client.pop("queue_name")
# Pop batch (atomic read + delete for multiple messages)
messages = client.pop_batch("queue_name", 10)
```

### Conditional Filtering

Filter messages by JSON payload content using server-side JSONB queries:

```ruby
# Filter by single condition
msg = client.read("orders", vt: 30, conditional: { status: "pending" })
# Filter by multiple conditions (AND logic)
msg = client.read("orders", vt: 30, conditional: {
status: "pending",
priority: "high"
})
# Filter by nested properties
msg = client.read("orders", vt: 30, conditional: {
user: { role: "admin" }
})
# Works with read_batch
messages = client.read_batch("orders",
vt: 30,
qty: 10,
conditional: { type: "priority" }
)
# Works with long-polling
messages = client.read_with_poll("orders",
vt: 30,
max_poll_seconds: 5,
conditional: { status: "ready" }
)How Filtering Works:
- Filtering happens in PostgreSQL using the JSONB containment operator (`@>`)
- Only messages matching ALL conditions are returned (AND logic)
- The `qty` parameter applies after filtering
- Empty conditions (`{}`) mean no filtering (same as omitting the parameter)
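For intuition: a condition like `conditional: { status: "pending" }` corresponds to a server-side containment test of the form `message @> '{"status":"pending"}'` (a sketch of the semantics, not necessarily the exact SQL the gem generates).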
Performance Tip: For frequently filtered fields, add JSONB indexes:
```sql
CREATE INDEX idx_orders_status
  ON pgmq.q_orders USING gin ((message->'status'));
```

### Deleting, Archiving, and Utilities

```ruby
# Delete message
client.delete("queue_name", msg_id)
# Delete batch
deleted_ids = client.delete_batch("queue_name", [101, 102, 103])
# Archive message
client.archive("queue_name", msg_id)
# Archive batch
archived_ids = client.archive_batch("queue_name", [101, 102, 103])
# Update visibility timeout
msg = client.set_vt("queue_name", msg_id, vt_offset: 60)
# Batch update visibility timeout
updated_msgs = client.set_vt_batch("queue_name", [101, 102, 103], vt_offset: 60)
# Update visibility timeout across multiple queues
client.set_vt_multi({
  "orders" => [1, 2, 3],
  "notifications" => [5, 6]
}, vt_offset: 120)
# Purge all messages
count = client.purge_queue("queue_name")
# Enable PostgreSQL NOTIFY for a queue (for LISTEN-based consumers)
client.enable_notify_insert("queue_name", throttle_interval_ms: 250)
# Disable notifications
client.disable_notify_insert("queue_name")
```
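The NOTIFY integration pairs with PostgreSQL's `LISTEN`. A rough sketch of a notification-driven consumer built directly on the `pg` gem; the channel name below is a placeholder, check the PGMQ documentation for the channel your version actually notifies on:

```ruby
require 'pg'
require 'pgmq'
require 'json'

client = PGMQ::Client.new('postgres://localhost/mydb')
client.enable_notify_insert('orders', throttle_interval_ms: 250)

listener = PG.connect('postgres://localhost/mydb')
listener.exec('LISTEN "pgmq_orders_inserts"') # placeholder channel name

loop do
  # Block until a notification arrives (or 60s elapses), then drain the queue.
  listener.wait_for_notify(60)
  while (msg = client.read('orders', vt: 30))
    handle_order(JSON.parse(msg.message)) # your processing logic
    client.delete('orders', msg.msg_id)
  end
end
```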
### Metrics

```ruby
# Get queue metrics
metrics = client.metrics("queue_name")
puts metrics.queue_length # => 42
puts metrics.oldest_msg_age_sec # => 120
puts metrics.newest_msg_age_sec # => 5
puts metrics.total_messages # => 1000
# Get all queue metrics
all_metrics = client.metrics_all
all_metrics.each do |m|
puts "#{m.queue_name}: #{m.queue_length} messages"
end
```

### Transactions

Low-level PostgreSQL transaction support for atomic operations. Transactions are a database primitive provided by PostgreSQL - this is a thin wrapper for convenience.
Execute atomic operations across multiple queues or combine queue operations with application data updates:
```ruby
# Atomic operations across multiple queues
client.transaction do |txn|
  # Send to multiple queues atomically
  txn.produce("orders", '{"order_id":123}')
  txn.produce("notifications", '{"user_id":456,"type":"order_created"}')
  txn.produce("analytics", '{"event":"order_placed"}')
end

# Process message and update application state atomically
client.transaction do |txn|
  # Read and process message
  msg = txn.read("orders", vt: 30)
  if msg
    # Parse and update your database
    data = JSON.parse(msg.message)
    Order.create!(external_id: data["order_id"])
    # Delete message only if database update succeeds
    txn.delete("orders", msg.msg_id)
  end
end

# Automatic rollback on errors
client.transaction do |txn|
  txn.produce("queue1", '{"data":"message1"}')
  txn.produce("queue2", '{"data":"message2"}')
  raise "Something went wrong!"
  # Both messages are rolled back - neither queue receives anything
end

# Move messages between queues atomically
client.transaction do |txn|
  msg = txn.read("pending_orders", vt: 30)
  if msg
    data = JSON.parse(msg.message)
    if data["priority"] == "high"
      # Move to high-priority queue
      txn.produce("priority_orders", msg.message)
      txn.delete("pending_orders", msg.msg_id)
    end
  end
end
```

How Transactions Work:
- Wraps PostgreSQL's native transaction support (similar to rdkafka-ruby providing Kafka transactions)
- All operations within the block execute in a single PostgreSQL transaction
- If any operation fails, the entire transaction is rolled back automatically
- The transactional client delegates all `PGMQ::Client` methods for convenience
Use Cases:
- Multi-queue coordination: Send related messages to multiple queues atomically
- Exactly-once processing: Combine message deletion with application state updates
- Message routing: Move messages between queues without losing data
- Batch operations: Ensure all-or-nothing semantics for bulk operations
Important Notes:
- Transactions hold database locks - keep them short to avoid blocking
- Long transactions can impact queue throughput
- Read operations with long visibility timeouts may cause lock contention
- Consider using `pop()` for atomic read + delete in simple cases
## Message Object

PGMQ-Ruby is a low-level transport library - it returns raw values from PostgreSQL without any transformation. You are responsible for parsing JSON and type conversion.

```ruby
msg = client.read("queue", vt: 30)
# All values are strings as returned by PostgreSQL
msg.msg_id # => "123" (String, not Integer)
msg.id # => "123" (alias for msg_id)
msg.read_ct # => "1" (String, not Integer)
msg.enqueued_at # => "2025-01-15 10:30:00+00" (String, not Time)
msg.vt # => "2025-01-15 10:30:30+00" (String, not Time)
msg.message # => "{\"data\":\"value\"}" (Raw JSONB as JSON string)
msg.headers # => "{\"trace_id\":\"abc123\"}" (Raw JSONB as JSON string, optional)
msg.queue_name # => "my_queue" (only present for multi-queue operations, otherwise nil)
# You handle JSON parsing
data = JSON.parse(msg.message) # => { "data" => "value" }
metadata = JSON.parse(msg.headers) if msg.headers # => { "trace_id" => "abc123" }
# You handle type conversion if needed
id = msg.msg_id.to_i # => 123
read_count = msg.read_ct.to_i # => 1
enqueued = Time.parse(msg.enqueued_at)  # => 2025-01-15 10:30:00 UTC
```

### Message Headers

PGMQ supports optional message headers via the `headers` JSONB column. Headers are useful for metadata like routing information, correlation IDs, and distributed tracing:

```ruby
# Sending a message with headers
message = '{"order_id":123}'
headers = '{"trace_id":"abc123","priority":"high","correlation_id":"req-456"}'
msg_id = client.produce("orders", message, headers: headers)
# Sending with headers and delay
msg_id = client.produce("orders", message, headers: headers, delay: 60)
# Batch produce with headers (one header object per message)
messages = ['{"id":1}', '{"id":2}', '{"id":3}']
headers = [
  '{"priority":"high"}',
  '{"priority":"medium"}',
  '{"priority":"low"}'
]
msg_ids = client.produce_batch("orders", messages, headers: headers)
# Reading messages with headers
msg = client.read("orders", vt: 30)
if msg.headers
metadata = JSON.parse(msg.headers)
trace_id = metadata["trace_id"]
priority = metadata["priority"]
correlation_id = metadata["correlation_id"]
endCommon header use cases:
- Distributed tracing: `trace_id`, `span_id`, `parent_span_id`
- Request correlation: `correlation_id`, `causation_id`
- Routing: `priority`, `region`, `tenant_id`
- Content metadata: `content_type`, `encoding`, `version`
This library follows the rdkafka-ruby philosophy - provide a thin, performant wrapper around the underlying system:
- No assumptions - Your application decides how to parse timestamps, convert types, etc.
- Framework-agnostic - Works equally well with Rails, Sinatra, or plain Ruby
- Zero overhead - No hidden type conversion or object allocation
- Explicit control - You see exactly what PostgreSQL returns
Higher-level features (automatic deserialization, type conversion, instrumentation) belong in framework layers built on top of this library.
## Serializers

PGMQ stores messages as JSONB in PostgreSQL. You must handle JSON serialization yourself:

```ruby
# Simple hash
msg = { order_id: 123, status: "pending" }
client.produce("orders", msg.to_json)
# Using JSON.generate for explicit control
client.produce("orders", JSON.generate(order_id: 123, status: "pending"))
# Pre-serialized JSON string
json_str = '{"order_id":123,"status":"pending"}'
client.produce("orders", json_str)msg = client.read("orders", vt: 30)
# Parse JSON yourself
data = JSON.parse(msg.message)
puts data["order_id"] # => 123
puts data["status"] # => "pending"
# Handle parsing errors
begin
data = JSON.parse(msg.message)
rescue JSON::ParserError => e
logger.error "Invalid JSON in message #{msg.msg_id}: #{e.message}"
client.delete("orders", msg.msg_id) # Remove invalid message
end
```

For convenience, you can wrap the client in your own helper:

```ruby
require 'json'
require 'ostruct'

class QueueHelper
  def initialize(client)
    @client = client
  end

  def produce(queue, data)
    @client.produce(queue, data.to_json)
  end

  def read(queue, vt:)
    msg = @client.read(queue, vt: vt)
    return nil unless msg

    OpenStruct.new(
      id: msg.msg_id.to_i,
      data: JSON.parse(msg.message),
      read_count: msg.read_ct.to_i,
      raw: msg
    )
  end
end

helper = QueueHelper.new(client)
helper.produce("orders", { order_id: 123 })

msg = helper.read("orders", vt: 30)
puts msg.data["order_id"] # => 123
```

## Development

```bash
# Clone repository
git clone https://github.com/mensfeld/pgmq-ruby.git
cd pgmq-ruby
# Install dependencies
bundle install
# Start PostgreSQL with PGMQ
docker compose up -d
# Run tests
bundle exec rspec
# Run console
bundle exec bin/console
```

## Author

Maintained by Maciej Mensfeld
Also check out Karafka - High-performance Apache Kafka framework for Ruby.