pgpro_queue is a built-in Postgres Pro Enterprise extension for message queueing management. It enables reliable communication between applications using database queues.
pgpro_queue allows managing message queues directly within the database. With pgpro_queue, you can create queues, add messages to them, and process messages efficiently while ensuring data integrity and reliability. A message from the queue is processed by one of the consumers. Once a message is retrieved from the queue, it is no longer available to other consumers.
pgpro_queue also supports message queueing management for subtransactions and autonomous transactions even if they have multiple nested levels.
Because pgpro_queue provides database-integrated message queuing, standard features, such as transaction recovery, crash restart, and standby synchronization, are fully supported.
Messages in the queue are processed based on assigned priorities, allowing higher-priority messages to be handled ahead of others, even if they arrive later.
To optimize message reading, pgpro_queue uses callback functions handled by dedicated workers. These functions provide asynchronous message processing: they are triggered automatically when relevant messages arrive in the queue. Callback functions also enable parallel message processing, because multiple functions can be registered for different queues and executed independently, allowing similar queries to be processed concurrently.
The pgpro_queue extension can be used only for
transactions running at the READ COMMITTED
isolation level.
The diagram below shows a practical example of requesting the creation of a PDF file. Rather than being processed directly, these requests can be sent to a queue, which allows efficient handling of high volumes, improves user experience through asynchronous processing, and adds flexibility through a decoupled architecture.
Figure F.1. PDF File Creation Example
In addition to the basic message queueing management described above, pgpro_queue also uses exchanges to support efficient message distribution based on the publish-subscribe model. In this model, messages are first published to an exchange — a component that receives messages and then routes them to one or more queues. Routing is determined by the following elements:
bindings: links between an exchange and a queue
exchange type: the routing algorithm
routing keys: message attributes that specify a destination or category
This method allows sending messages to multiple consumers at once instead of sending messages directly to specific point-to-point queues. It also allows consumers to receive messages selectively from specific queues by binding a queue to a selected exchange (i.e. creating a subscription).
The pgpro_queue extension is included in Postgres Pro Enterprise. Once you have Postgres Pro Enterprise installed, complete the following steps to enable pgpro_queue:
Add pgpro_queue to the
shared_preload_libraries parameter
in the postgresql.conf file:
shared_preload_libraries = 'pgpro_queue'
Create the pgpro_queue extension using
the following query:
CREATE EXTENSION pgpro_queue;
A set of API functions will be created in the current schema.
Initialize the extension:
SELECT queue_initialize();
This function creates a dedicated schema called
pgpro_queue_data where all service objects,
such as metadata tables, views, and queue-specific tables, are
stored. The initialization decouples queue objects from the
extension itself, ensuring proper replication and database dumps.
To remove all objects created by the extension, execute the following command (available only to superusers):
SELECT queue_deinitialize();
This function performs a cascaded delete of the
pgpro_queue_data schema.
When using pgpro_queue for prepared transactions, note the following considerations:
If a transaction that calls the
READ_MESSAGE()
function is prepared for a two-phase commit using
PREPARE TRANSACTION, the corresponding message
becomes locked in the queue table until you execute the
COMMIT PREPARED or ROLLBACK PREPARED
command. In other words, this message will be shown in the queue table
but will not be available for further read operations.
For pgpro_queue, the
COMMIT PREPARED command works like the regular
COMMIT command. If a message was read in a prepared
transaction successfully, the message is removed from the queue table
once the COMMIT PREPARED command is executed.
Unlike the ROLLBACK command, the
ROLLBACK PREPARED command does not enable
retry-management logic for a message that was read in a prepared
transaction. Instead, ROLLBACK PREPARED only unlocks
the message in the queue table.
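The considerations above can be traced in a short session. This is only a sketch: it assumes a hypothetical queue named pdf_requests that already holds at least one message, a hypothetical transaction identifier pdf_read_1, and a server with max_prepared_transactions set above zero.

```sql
BEGIN;
-- Read one message. After PREPARE TRANSACTION it remains visible in the
-- queue table but is locked and unavailable to other readers.
SELECT read_message('pdf_requests');
PREPARE TRANSACTION 'pdf_read_1';

-- Later, possibly from another session, finish the two-phase commit.
-- COMMIT PREPARED removes the message from the queue table.
COMMIT PREPARED 'pdf_read_1';

-- Alternative outcome (instead of COMMIT PREPARED):
-- ROLLBACK PREPARED 'pdf_read_1';
-- This only unlocks the message; retry-management logic is not applied.
```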
pgpro_queue.database_with_managed_retry (text)
Specifies a comma-separated list of database names for which
retry-management logic is enabled. If a database name is missing
from the list, incorrectly specified, or uses an invalid delimiter,
any messages unsuccessfully processed within a transaction, i.e.
sent to retry on rollback, will be deleted, regardless of the queue
or message parameters. Be careful when changing this parameter because
an incorrect value may cause messages in the retry queue to be
lost. The default value is postgres. This parameter
can be set either at server start or when the configuration is reloaded.
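Because this parameter can be changed on a configuration reload, one way to set it is with ALTER SYSTEM followed by pg_reload_conf(). In the sketch below, appdb is a hypothetical database name, and the list is written without spaces around the comma as a precaution, since the accepted delimiter variants are not described here.

```sql
-- Requires superuser privileges. 'appdb' is a hypothetical database name.
ALTER SYSTEM SET pgpro_queue.database_with_managed_retry = 'postgres,appdb';

-- Apply the change without restarting the server.
SELECT pg_reload_conf();

-- Verify the value that is now in effect.
SHOW pgpro_queue.database_with_managed_retry;
```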
pgpro_queue.shared_retry_list_size (integer)
Specifies the capacity of the hash table in shared memory for the Shared Retry Pending List. This parameter essentially determines the maximum number of retry requests that user backend processes can simultaneously queue for processing by the launcher. The default value is 10000.
pgpro_queue.callback (boolean)
Enables the callback functions. The default value is
false.
pgpro_queue.callback_min_workers (integer)
Specifies the minimum number of workers per database. The default value
is 3.
pgpro_queue.callback_max_workers (integer)
Specifies the maximum number of workers across the entire cluster.
The default value is 10.
pgpro_queue.callback_naptime (integer)
Specifies the delay in milliseconds between activity rounds for workers and
the leader worker within a database. The default value is
150.
pgpro_queue.callback_check_timeout (integer)
Specifies the time in seconds between checks performed by the leader worker
for properly configured callback functions in the database. The default value is
60.
pgpro_queue.callback_worker_idle_timeout (integer)
Specifies the timeout in seconds after which one of the idle workers is
shut down by a leader worker. The default value is 60.
pgpro_queue.callback_worker_start_delay (integer)
Specifies the time in seconds that a worker remains busy processing a callback function
before a new worker is started within a database. This helps maintain
performance under changing load conditions. The default value is
5.
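As an illustration, the callback parameters could be set with ALTER SYSTEM. The values are arbitrary examples, not recommendations. This section does not state whether these parameters take effect on reload or only after a restart, so the sketch makes no assumption about that step.

```sql
-- Requires superuser privileges. Example values only.
ALTER SYSTEM SET pgpro_queue.callback = on;                    -- default: false
ALTER SYSTEM SET pgpro_queue.callback_naptime = 100;           -- milliseconds, default: 150
ALTER SYSTEM SET pgpro_queue.callback_worker_idle_timeout = 120;  -- seconds, default: 60

-- After reloading or restarting the server as required, check the result:
-- SHOW pgpro_queue.callback;
```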
pgpro_queue provides the following procedures for queue management.
CREATE PROCEDURE CREATE_QUEUE(
IN queue_name name,
IN queue_type char DEFAULT 'N'::char,
IN queue_dlq name DEFAULT null,
IN queue_retries int DEFAULT 10,
IN queue_retry_delay int DEFAULT 30
)
Creates a queue in an existing queue table. The queue owner can
restrict other users from accessing a queue by denying them
access to the queue table at the ACL level or by using the reset_queue_access
and revoke_queue_access
procedures. If any parameter is not specified, its default value is
applied for all new messages.
queue_name: The name of the queue, which
can only contain letters, digits, and underscores, and cannot
exceed 54 characters.
queue_type: The type of the new queue. The
valid values are: N for a normal queue
(the default), and D for the dead-letter
queue. (The dead-letter queue is not implemented at the moment;
the parameter value is reserved for use in future releases.)
queue_dlq: The existing dead-letter queue.
(The dead-letter queue is not implemented at the moment; the
parameter is reserved for use in future releases.)
queue_retries: The maximum number of retries.
The default value is 10.
queue_retry_delay: The number of seconds
until a message is scheduled for reprocessing after a
ROLLBACK. Specify 0 to retry the message
immediately.
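A minimal sketch of creating queues follows. The queue names pdf_requests and thumbnail_requests are hypothetical, and the calls assume that the schema containing the pgpro_queue API is in the search path.

```sql
-- Queue with all defaults: normal type, 10 retries, 30-second retry delay.
CALL create_queue('pdf_requests');

-- Queue with custom retry settings, passed in named notation:
-- at most 3 retries, each attempted immediately after a ROLLBACK.
CALL create_queue('thumbnail_requests',
                  queue_retries     => 3,
                  queue_retry_delay => 0);
```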
CREATE PROCEDURE ALTER_QUEUE(
IN queue_name name,
IN new_type char DEFAULT null,
IN new_dlq name DEFAULT null,
IN new_retries int DEFAULT null,
IN new_retrydelay int DEFAULT null
)
Modifies the parameters of an existing queue. If any parameter is not specified, its default value is applied for all new messages. This change will not affect any existing messages. Only the queue owner who created the queue can change its parameters.
queue_name: The name of the queue, which
can only contain letters, digits, and underscores.
new_type: The new type of the queue. The
valid values are: N for a normal queue
(the default), and D for the dead-letter
queue. (The dead-letter queue is not implemented at the moment;
the parameter value is reserved for use in future releases.)
new_dlq: The existing dead-letter queue.
(The dead-letter queue is not implemented at the moment; the
parameter is reserved for use in future releases.)
new_retries: The maximum number of retries.
new_retrydelay: The number of seconds
until a message is scheduled for reprocessing after a
ROLLBACK. Specify 0 to retry the message
immediately.
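For example, the retry settings of a queue might be changed as sketched here. The queue name pdf_requests is hypothetical, and the call must be made by the queue owner.

```sql
-- Affects only messages inserted after this call:
-- allow up to 5 retries, each scheduled 60 seconds after a ROLLBACK.
CALL alter_queue('pdf_requests',
                 new_retries    => 5,
                 new_retrydelay => 60);
```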
CREATE PROCEDURE DROP_QUEUE(IN queue_name name)
Deletes a queue with the specified name. The user executing this
procedure must own this queue or have the privileges on it granted
using the grant_queue_access
procedure.
CREATE FUNCTION GET_QUEUE_TABLE(IN queue_name name)
Returns the OID of the queue table used for the queue.
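Since the result is an OID, casting it to regclass is a convenient way to see the table name. A short sketch, assuming a hypothetical queue named pdf_requests:

```sql
-- The regclass cast turns the returned OID into a readable table name.
SELECT get_queue_table('pdf_requests')::regclass AS queue_table;
```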
CREATE PROCEDURE RESET_QUEUE_ACCESS(IN queue_name name)
Restricts access to a queue for all users, except the queue owner. This procedure can only be executed by the queue owner. If a superuser who created the extension is deleted, the procedure does nothing.
CREATE PROCEDURE GRANT_QUEUE_ACCESS(
IN queue_name name,
IN role_name name
)
CREATE PROCEDURE GRANT_QUEUE_ACCESS(
IN queue_name name,
IN role_oid oid
)
Grants access to a queue to the specified role.
queue_name: The name of the queue.
role_name: The name of the role.
role_oid: The OID of the role.
CREATE PROCEDURE REVOKE_QUEUE_ACCESS(
IN queue_name name,
IN role_name name
)
CREATE PROCEDURE REVOKE_QUEUE_ACCESS(
IN queue_name name,
IN role_oid oid
)
Revokes access to a queue from the specified role.
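The access procedures might be combined as in the following sketch. The queue pdf_requests and the role pdf_worker are hypothetical. The explicit casts are there only to make it obvious which overload (role name or role OID) is being called.

```sql
-- Start from a clean state: only the queue owner keeps access.
CALL reset_queue_access('pdf_requests');

-- Grant access by role name.
CALL grant_queue_access('pdf_requests', 'pdf_worker'::name);

-- Equivalent call using the role OID overload.
CALL grant_queue_access('pdf_requests', 'pdf_worker'::regrole::oid);

-- Take the access away again.
CALL revoke_queue_access('pdf_requests', 'pdf_worker'::name);
```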
pgpro_queue provides the following procedures for sending messages to the queue. It is not recommended to mix XML and JSONB payloads within the same queue.
CREATE PROCEDURE INSERT_MESSAGE(
IN q_name name,
IN q_msg_body jsonb,
IN q_msg_priority int DEFAULT 0,
IN q_msg_properties jsonb DEFAULT '{}'::jsonb,
IN q_msg_retries int DEFAULT null,
IN q_msg_retrydelay int DEFAULT null,
IN q_msg_enable_time timestamptz DEFAULT null
)
CREATE PROCEDURE INSERT_MESSAGE_XML(
IN q_name name,
IN q_msg_body xml,
IN q_msg_priority int DEFAULT 0,
IN q_msg_properties jsonb DEFAULT '{}'::jsonb,
IN q_msg_retries int DEFAULT null,
IN q_msg_retrydelay int DEFAULT null,
IN q_msg_enable_time timestamptz DEFAULT null
)
Inserts a JSON/XML message into the queue.
q_name: The name of the queue.
q_msg_body: Message payload (JSONB or XML).
q_msg_priority: Message priority. The
lower the number, the higher the priority. The default value
is 0, which means the highest priority.
q_msg_properties: Message properties, for
example, the encoding name or the application-level routing
information. No properties by default.
q_msg_retries: The maximum number of
retries. If not specified, the default value from the queue
is used.
q_msg_retrydelay: The number of seconds
until a message is scheduled for reprocessing after a
ROLLBACK.
If not specified, the default value from the queue is used.
q_msg_enable_time: Time of message delay.
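A few illustrative calls follow. The queue names and payloads are hypothetical. A separate queue is used for XML messages, in line with the recommendation not to mix payload types in one queue.

```sql
-- Simplest form: JSONB payload, highest priority (0), queue-level retry settings.
CALL insert_message('pdf_requests', '{"doc_id": 42, "format": "A4"}'::jsonb);

-- Lower priority (a larger number) with per-message retry settings.
CALL insert_message('pdf_requests',
                    '{"doc_id": 43}'::jsonb,
                    q_msg_priority   => 10,
                    q_msg_retries    => 3,
                    q_msg_retrydelay => 5);

-- XML payload sent to a queue reserved for XML messages.
CALL insert_message_xml('pdf_requests_xml',
                        '<request><doc_id>44</doc_id></request>'::xml);
```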
pgpro_queue provides the following functions for reading messages from the queue.
CREATE FUNCTION READ_MESSAGE(
IN q_name name,
IN q_msg_hfilter jsonb DEFAULT null,
IN q_msg_pfilter jsonb DEFAULT null
) RETURNS jsonb
CREATE FUNCTION READ_MESSAGE_XML(
IN q_name name,
IN q_msg_hfilter jsonb DEFAULT null,
IN q_msg_pfilter jsonb DEFAULT null
) RETURNS xml
CREATE FUNCTION READ_MESSAGE_ANY(
IN q_name name,
IN q_msg_hfilter jsonb DEFAULT null,
IN q_msg_pfilter jsonb DEFAULT null
) RETURNS RECORD
Retrieves a message from the queue without waiting. Returns the message payload or null if the queue is empty. On a successful read, the message is removed from the queue table.
q_name: The name of the queue.
q_msg_hfilter: A header filter, applied in
addition to a standard filter.
q_msg_pfilter: A properties filter,
applied in addition to a standard filter.
q_jsonb: Returned JSON body value.
q_xml: Returned XML body value.
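A typical consumer reads and processes a message inside one transaction, so that a failure rolls the read back. The sketch below assumes a hypothetical queue named pdf_requests and does not use the optional filters, whose format is not described here.

```sql
BEGIN;

-- Returns the payload of the next message, or NULL if the queue is empty.
SELECT read_message('pdf_requests') AS body;

-- ... process the payload here ...

-- COMMIT confirms the read and the message leaves the queue table.
-- A ROLLBACK instead would send the message to retry.
COMMIT;
```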
CREATE FUNCTION READ_MESSAGE_BY_ID(
IN q_name name,
IN msgid bigint
) RETURNS jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML(
IN q_name name,
IN msgid bigint
) RETURNS xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY(
IN q_name name,
IN msgid bigint
) RETURNS RECORD
Retrieves a specific message from the queue by its ID as a read operation without waiting. Returns the message payload or null if the queue is empty. On a successful read, the message is removed from the queue table.
q_name: The name of the queue.
msgid: The message ID.
body: Returned JSON body value.
body_xml: Returned XML body value.
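For illustration, a read by ID could look like this. Both the queue name pdf_requests and the message ID 1001 are hypothetical. How an application learns message IDs is outside the scope of this sketch.

```sql
BEGIN;
-- Fetch the JSONB payload of one specific message.
SELECT read_message_by_id('pdf_requests', 1001) AS body;
COMMIT;
```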
pgpro_queue provides the following functions and procedures for managing exchanges.
CREATE FUNCTION CREATE_EXCHANGE(
IN e_name name,
IN e_type text
)
Creates an exchange with the specified name and type. The exchange
owner can restrict other users from accessing the exchange by using
the revoke_publish_access
procedure.
e_name: The name of the exchange, which can
only contain letters, digits, and underscores, and cannot exceed
63 characters.
e_type: The type of the new exchange. Possible
values are:
fanout (default): Routes messages to all
bound queues.
direct: Routes messages to queues based
on a matching routing key.
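Because CREATE_EXCHANGE is declared as a function, it is invoked with SELECT rather than CALL. A sketch with hypothetical exchange names:

```sql
-- A direct exchange: messages reach only queues bound with a matching routing key.
SELECT create_exchange('doc_events', 'direct');

-- A fanout exchange: every bound queue receives each message.
SELECT create_exchange('audit_events', 'fanout');
```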
CREATE FUNCTION DROP_EXCHANGE(
IN e_name name,
IN do_cascade bool
)
Deletes a specified exchange.
e_name: The name of the exchange.
do_cascade: When set to true,
all the bound queues are also deleted. When set to
false, the function fails if any queues are
bound to the exchange.
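The two modes of do_cascade can be sketched as follows, using hypothetical exchange names:

```sql
-- Safe variant: fails if any queue is still bound to the exchange.
SELECT drop_exchange('doc_events', false);

-- Cascading variant: also deletes all queues bound to the exchange.
SELECT drop_exchange('audit_events', true);
```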
CREATE FUNCTION BIND_QUEUE(
IN q_name name,
IN e_name name,
IN e_key text
)
Binds a queue to an exchange. For direct exchanges,
bindings are based on routing keys. One queue can be bound to multiple
exchanges.
To avoid operational problems, the user executing this function must
own the specified exchange and queue or have the privileges on them
granted using the grant_publish_access
and
grant_queue_access
procedures.
q_name: The name of the queue.
e_name: The name of the exchange.
e_key: The routing key for the binding.
NULL (default) is used only for fanout exchanges.
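A sketch of both binding styles, assuming that the hypothetical queues and exchanges named here already exist:

```sql
-- Direct exchange: the queue receives messages published with routing key 'pdf'.
SELECT bind_queue('pdf_requests', 'doc_events', 'pdf');

-- Fanout exchange: no routing key is needed, so NULL is passed.
SELECT bind_queue('audit_log', 'audit_events', NULL);
```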
CREATE FUNCTION UNBIND_QUEUE(
IN q_name name,
IN e_name name,
IN e_key text
)
Deletes a binding between a queue and an exchange. If a routing key is provided, only that specific key is deleted from the binding. Deleting the last key unbinds the queue.
To avoid operational problems, the user executing this function must
own the specified exchange and queue or have the privileges on
them granted using the grant_publish_access
and
grant_queue_access
procedures.
If a queue is deleted using DROP_QUEUE(), it is automatically unbound from all exchanges.
q_name: The name of the queue.
e_name: The name of the exchange.
e_key: The routing key to be deleted. If
NULL (default), the queue is unbound completely.
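The difference between removing one key and unbinding completely might look like this, with hypothetical queue and exchange names:

```sql
-- Remove only the 'pdf' routing key; other keys of this binding remain.
SELECT unbind_queue('pdf_requests', 'doc_events', 'pdf');

-- Remove the binding between the queue and the exchange entirely.
SELECT unbind_queue('audit_log', 'audit_events', NULL);
```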
CREATE PROCEDURE GRANT_PUBLISH_ACCESS(
IN exchange_name name,
IN role_name name
)
Grants the INSERT, UPDATE, and
DELETE privileges on an exchange and a binding table
to the specified role. This procedure can only be executed by a
superuser.
exchange_name: The name of the exchange.
role_name: The name of the role.
CREATE PROCEDURE REVOKE_PUBLISH_ACCESS(
IN exchange_name name,
IN role_name name
)
Revokes all privileges on the exchange and the binding table from the specified role. This procedure can only be executed by a superuser.
exchange_name: The name of the exchange.
role_name: The name of the role.
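A short sketch of managing publish access, run as a superuser. The exchange doc_events and the role pdf_producer are hypothetical.

```sql
-- Allow the role to publish to the exchange and manage its bindings.
CALL grant_publish_access('doc_events', 'pdf_producer');

-- Withdraw those privileges again.
CALL revoke_publish_access('doc_events', 'pdf_producer');
```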
pgpro_queue provides the following procedures for publishing messages to exchanges. It is not recommended to mix XML and JSONB payloads within the same queue.
CREATE PROCEDURE PUBLISH_MESSAGE(
IN e_name name,
IN e_routing_key text,
IN q_msg_body jsonb,
IN q_msg_priority int DEFAULT 0,
IN q_msg_properties jsonb DEFAULT '{}'::jsonb,
IN q_msg_retries int DEFAULT null,
IN q_msg_retrydelay int DEFAULT null,
IN q_msg_enable_time timestamptz DEFAULT null
)
CREATE PROCEDURE PUBLISH_MESSAGE_XML(
IN e_name name,
IN e_routing_key text,
IN q_msg_body xml,
IN q_msg_priority int DEFAULT 0,
IN q_msg_properties jsonb DEFAULT '{}'::jsonb,
IN q_msg_retries int DEFAULT null,
IN q_msg_retrydelay int DEFAULT null,
IN q_msg_enable_time timestamptz DEFAULT null
)
Publishes a JSON/XML message via an exchange.
e_name: The name of the exchange.
e_routing_key: The routing key.
q_msg_body: Message payload (JSONB or XML).
q_msg_priority: Message priority. The
lower the number, the higher the priority. The default value
is 0, which means the highest priority.
q_msg_properties: Message properties, for
example, the encoding name or the application-level routing
information. No properties by default.
q_msg_retries: The maximum number of
retries. If not specified, the default value from the queue
is used.
q_msg_retrydelay: The number of seconds
until a message is scheduled for reprocessing after a
ROLLBACK.
If not specified, the default value from the queue is used.
q_msg_enable_time: Time of message delay.
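Publishing through an exchange could be sketched as follows. It assumes a hypothetical direct exchange doc_events with at least one queue bound using the routing key pdf.

```sql
-- Routed to every queue bound to doc_events with the key 'pdf'.
CALL publish_message('doc_events', 'pdf', '{"doc_id": 42}'::jsonb);

-- The same, with a lower priority and custom message properties.
CALL publish_message('doc_events', 'pdf',
                     '{"doc_id": 43}'::jsonb,
                     q_msg_priority   => 10,
                     q_msg_properties => '{"encoding": "UTF-8"}'::jsonb);
```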
pgpro_queue provides the following procedures for callback function management.
CREATE PROCEDURE REGISTER_CALLBACK(
IN q_name text,
IN callback text
)
Registers a new callback function for the specified queue and creates a new worker for this function.
Only one callback function can be registered per queue.
q_name: The name of the queue.
callback: The procedure signature, for
example, new_callback_proc(JSONB).
CREATE PROCEDURE UNREGISTER_CALLBACK(
IN q_name text,
IN callback text
)
Unregisters the specified callback function.
If a queue is deleted using DROP_QUEUE(), its callback function is unregistered automatically.
q_name: The name of the queue.
callback: The procedure signature.
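For example, a callback registered under the hypothetical signature new_callback_proc(JSONB) on the hypothetical queue pdf_requests could be detached like this:

```sql
-- The signature string should match the one used at registration.
CALL unregister_callback('pdf_requests', 'new_callback_proc(JSONB)');
```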
Postgres Professional, Moscow, Russia