Расширение pgpro_queue включено в состав Postgres Pro Enterprise как стандартное расширение для управления очередями сообщений. Оно обеспечивает надёжное взаимодействие между приложениями с использованием очередей в базе данных.
pgpro_queue позволяет управлять очередями сообщений непосредственно в базе данных. С помощью pgpro_queue можно создавать очереди, добавлять в них сообщения и эффективно обрабатывать их, обеспечивая целостность и надёжность данных. Сообщение из очереди обрабатывается одним из потребителей. Полученное из очереди сообщение становится недоступным для других потребителей.
Благодаря тому, что pgpro_queue обеспечивает интегрированное в базу данных управление очередями сообщений, стандартные возможности, такие как восстановление транзакций, перезапуск после сбоя и синхронизация с резервным сервером, полностью поддерживаются.
Сообщения в очереди обрабатываются на основе назначенных приоритетов, что позволяет обрабатывать сообщения с более высоким приоритетом раньше остальных, даже если они поступили позже.
Диаграмма ниже демонстрирует практический пример запроса на создание файла в формате PDF. Вместо немедленной обработки такие запросы могут быть отправлены в очередь, что позволяет эффективно справляться с большим объёмом задач, улучшать пользовательский опыт благодаря асинхронной обработке и обеспечивать большую гибкость за счёт раздельной архитектуры.
Рисунок F.1. Пример создания PDF-файла
Расширение pgpro_queue включено в состав Postgres Pro Enterprise. Установив Postgres Pro Enterprise, выполните следующие действия, чтобы подготовить pgpro_queue:
Добавьте pgpro_queue в параметр shared_preload_libraries в файле postgresql.conf:
shared_preload_libraries = 'pgpro_queue'
Создайте расширение pgpro_queue, выполнив следующий запрос:
CREATE EXTENSION pgpro_queue;
Будет создан набор API-функций в текущей схеме.
Инициализируйте расширение:
SELECT queue_initialize();
Эта функция создаёт выделенную схему с именем pgpro_queue_data, в которой хранятся все служебные объекты, такие как таблицы метаданных, представления и таблицы для очередей. При инициализации объекты очереди отделяются от самого расширения, что обеспечивает корректную репликацию и выгрузку базы данных.
Чтобы удалить все объекты, созданные расширением, выполните следующую команду (доступно только суперпользователям):
SELECT queue_deinitialize();
Эта функция выполняет каскадное удаление схемы pgpro_queue_data.
pgpro_queue.database_with_managed_retry (text) #Указывает список имён баз данных, разделённых запятыми, для которых включена логика управления повторными попытками. Если имя базы данных отсутствует в списке, указано неправильно или используется недопустимый разделитель, все сообщения, неудачно обработанные в рамках транзакции (то есть отправленные на повторную попытку выполнения при откате), будут удалены, независимо от параметров очереди или сообщения. Значение по умолчанию — postgres. Этот параметр можно установить как при запуске сервера, так и при перезагрузке конфигурации.
pgpro_queue.shared_retry_list_size (integer) #Указывает размер хеш-таблицы в общей памяти для хранения списка повторных запросов (Shared Retry Pending List). Этот параметр в сущности определяет максимальное количество запросов на повторную попытку, которые процессы пользователя могут одновременно ставить в очередь для обработки процессом запуска. Значение по умолчанию — 10000.
pgpro_queue предоставляет следующие процедуры для управления очередями.
CREATE PROCEDURE CREATE_QUEUE( IN q_name name, IN q_type char DEFAULT 'N'::char, IN q_dlq name DEFAULT null, IN q_retries int DEFAULT 10, IN q_retrydelay int DEFAULT 30 )
#Создаёт очередь в существующей таблице очереди. Владелец очереди может ограничить доступ других пользователей к очереди, отказав им в доступе к таблице очереди на уровне списка управления доступом. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию.
q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.
q_type: Тип новой очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)
q_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)
q_retries: Максимальное количество повторных попыток. Значение по умолчанию — 10.
q_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.
CREATE PROCEDURE ALTER_QUEUE( IN q_name name, IN new_type char DEFAULT null, IN new_dlq name DEFAULT null, IN new_retries int DEFAULT null, IN new_retrydelay int DEFAULT null )
#Изменяет параметры существующей очереди. Если какой-либо параметр не указан, для всех новых сообщений применяется его значение по умолчанию. Это изменение не повлияет на уже существующие сообщения. Изменять параметры очереди может только создавший её владелец.
q_name: Имя очереди, которое может содержать только буквы, цифры и подчёркивания.
new_type: Новый тип очереди. Допустимые значения: N для обычной очереди (по умолчанию) и D для очереди недоставленных сообщений (dead-letter queue). (Очередь недоставленных сообщений в данный момент не реализована, значение параметра зарезервировано для использования в будущих версиях.)
new_dlq: Существующая очередь недоставленных сообщений. (Очередь недоставленных сообщений в данный момент не реализована, параметр зарезервирован для использования в будущих версиях.)
new_retries: Максимальное количество повторных попыток.
new_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Укажите 0, чтобы повторить обработку сообщения немедленно.
CREATE PROCEDURE DROP_QUEUE(IN q_name name)
#Удаляет очередь с указанным именем.
CREATE FUNCTION GET_QUEUE_TABLE(IN q_name name)
#Возвращает OID таблицы очереди, используемой для очереди.
pgpro_queue предоставляет следующие процедуры для отправки сообщений в очередь. Не рекомендуется совмещать строки сообщений в форматах XML и JSONB в одной и той же очереди.
CREATE PROCEDURE INSERT_MESSAGE( IN q_name name, IN q_msg_body jsonb, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
CREATE PROCEDURE INSERT_MESSAGE_XML( IN q_name name, IN q_msg_body xml, IN q_msg_priority int DEFAULT 0, IN q_msg_properties jsonb DEFAULT '{}'::jsonb, IN q_msg_retries int DEFAULT null, IN q_msg_retrydelay int DEFAULT null, IN q_msg_enable_time timestamptz DEFAULT null )
#Вставляет в очередь сообщение в формате JSON/XML.
q_name: Имя очереди.
q_msg_body: Строка сообщения (JSONB или XML).
q_msg_priority: Приоритет сообщения.
q_msg_properties: Свойств по умолчанию нет.
q_msg_retries: Максимальное количество повторных попыток. Если не указано, используется значение по умолчанию из очереди.
q_msg_retrydelay: Количество секунд до того, как сообщение будет запланировано для повторной обработки после выполнения ROLLBACK. Если не указано, используется значение по умолчанию из очереди.
q_msg_enable_time: Время задержки сообщения.
pgpro_queue предоставляет следующие функции для чтения сообщений из очереди.
CREATE FUNCTION READ_MESSAGE( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_XML( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_ANY( IN q_name name, IN q_msg_hfilter jsonb DEFAULT null, IN q_msg_pfilter jsonb DEFAULT null, OUT q_jsonb jsonb, OUT q_xml xml ) RETURNS SETOF RECORD
#Извлекает сообщение из очереди в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name: Имя очереди.
q_msg_hfilter: Фильтр заголовков, применяемый дополнительно к стандартному фильтру.
q_msg_pfilter: Фильтр свойств, применяемый дополнительно к стандартному фильтру.
q_jsonb: Возвращаемое значение тела сообщения в формате JSON.
q_xml: Возвращаемое значение тела сообщения в формате XML.
CREATE FUNCTION READ_MESSAGE_BY_ID( IN q_name name, IN msgid bigint ) RETURNS SETOF jsonb
CREATE FUNCTION READ_MESSAGE_BY_ID_XML( IN q_name name, IN msgid bigint ) RETURNS SETOF xml
CREATE FUNCTION READ_MESSAGE_BY_ID_ANY( IN q_name name, IN msgid bigint, OUT body jsonb, OUT body_xml xml ) RETURNS SETOF RECORD
#Извлекает конкретное сообщение из очереди по его ID в режиме чтения без ожидания. Возвращает строку сообщения или null, если очередь пуста. Если сообщение прочитано успешно, оно удаляется из таблицы очереди.
q_name: Имя очереди.
msgid: ID сообщения.
body: Возвращаемое значение тела сообщения в формате JSON.
body_xml: Возвращаемое значение тела сообщения в формате XML.
Postgres Professional, Москва, Россия