49.6. Модули вывода логического декодирования

49.6.1. Функция инициализации
49.6.2. Возможности
49.6.3. Режимы вывода
49.6.4. Обработчики в модуле вывода
49.6.5. Функции для формирования вывода

Пример модуля вывода можно найти в подкаталоге contrib/test_decoding в дереве исходного кода Postgres Pro Enterprise.

49.6.1. Функция инициализации

Модуль вывода загружается в результате динамической загрузки разделяемой библиотеки (при этом в качестве имени библиотеки задаётся имя модуля). Для нахождения библиотеки применяется обычный путь поиска библиотек. В этой библиотеке должна быть функция _PG_output_plugin_init, которая показывает, что библиотека на самом деле представляет собой модуль вывода, и устанавливает требуемые обработчики модуля вывода. Этой функции передаётся структура, в которой должны быть заполнены указатели на функции-обработчики отдельных действий.

typedef struct OutputPluginCallbacks
{
    LogicalDecodeStartupCB startup_cb;
    LogicalDecodeBeginCB begin_cb;
    LogicalDecodeChangeCB change_cb;
    LogicalDecodeCommitCB commit_cb;
    LogicalDecodeAbortCB abort_cb;
    LogicalDecodeMessageCB message_cb;
    LogicalDecodeFilterPrepareCB filter_prepare_cb;
    LogicalDecodePrepareCB prepare_cb;
    LogicalDecodeCommitPreparedCB commit_prepared_cb;
    LogicalDecodeAbortPreparedCB abort_prepared_cb;
    LogicalDecodeFilterByOriginCB filter_by_origin_cb;
    LogicalDecodeFilterDecodeTxnCB filter_decode_txn_cb;
    LogicalDecodeShutdownCB shutdown_cb;
} OutputPluginCallbacks;

typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);

Обработчики begin_cb, change_cb и commit_cb должны устанавливаться обязательно, а startup_cb, filter_by_origin_cb и shutdown_cb могут отсутствовать.

49.6.2. Возможности

Для декодирования, форматирования и вывода изменений модули вывода могут использовать практически всю обычную инфраструктуру сервера, включая вызов функций вывода типов. К отношениям разрешается доступ только на чтение, если только эти отношения были созданы программой initdb в схеме pg_catalog, либо помечены как пользовательские таблицы каталогов командами

ALTER TABLE user_catalog_table SET (user_catalog_table = true);
CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);

Любые действия, которые требует присвоения идентификатора транзакции, запрещаются. В частности, к этим действиям относятся операции записи в таблицы, изменения DDL и вызов txid_current().

49.6.3. Режимы вывода

Обработчики в модуле вывода могут передавать данные потребителю в практически любых форматах. Для некоторых вариантов использования, например, просмотра изменений через SQL, вывод информации в типах, которые могут содержать произвольные данные (например, bytea), может быть неудобоваримым. Если модуль вывода выводит только текстовые данные в кодировке сервера, он может объявить это, установив в OutputPluginOptions.output_type значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT вместо OUTPUT_PLUGIN_BINARY_OUTPUT в обработчике запуска. В этом случае все данные должны быть в кодировке сервера, чтобы их можно было передать в значении типа text. Это контролируется в сборках с включёнными проверочными утверждениями.

49.6.4. Обработчики в модуле вывода

Модуль вывода уведомляется о происходящих изменениях через различные обработчики, которые он должен установить.

Concurrent transactions are decoded in commit order, and only changes belonging to a specific transaction are decoded between the begin and commit callbacks. Transactions that were rolled back explicitly or implicitly never get decoded. Successful savepoints are folded into the transaction containing them in the order they were executed within that transaction. A transaction that is prepared for a two-phase commit using PREPARE TRANSACTION will also be decoded if the output plugin callbacks needed for decoding them are provided. It is possible that the current transaction which is being decoded is aborted concurrently via a ROLLBACK PREPARED command. In that case, the logical decoding will be aborted midways.

Примечание

Декодироваться будут только те транзакции, которые уже успешно сброшены на диск. Вследствие этого, COMMIT может не декодироваться в следующем сразу за ним вызове pg_logical_slot_get_changes(), когда synchronous_commit имеет значение off.

49.6.4.1. Обработчик запуска

Необязательный обработчик startup_cb вызывается, когда слот репликации создаётся или через него запрашивается передача изменений, независимо от того, в каком количестве изменения готовы к передаче.

typedef void (*LogicalDecodeStartupCB) (struct LogicalDecodingContext *ctx,
                                        OutputPluginOptions *options,
                                        bool is_init);

Параметр is_init будет равен true, когда слот репликации создаётся, и false в противном случае. Параметр options указывает на структуру параметров, которые могут устанавливать модули вывода:

typedef struct OutputPluginOptions
{
    OutputPluginOutputType output_type;
} OutputPluginOptions;

В поле output_type должно быть значение OUTPUT_PLUGIN_TEXTUAL_OUTPUT или OUTPUT_PLUGIN_BINARY_OUTPUT. См. также Подраздел 49.6.3.

Обработчик запуска должен проверить параметры, представленные в ctx->output_plugin_options. Если модулю вывода требуется поддерживать состояние, он может сохранить его в ctx->output_plugin_private.

49.6.4.2. Обработчик выключения

Необязательный обработчик shutdown_cb вызывается, когда ранее активный слот репликации перестаёт использоваться, так что ресурсы, занятые модулем вывода, можно освободить. При этом слот не обязательно удаляется, прекращается только потоковая передача через него.

typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);

49.6.4.3. Обработчик начала транзакции

Обязательный обработчик begin_cb вызывается, когда декодируется начало зафиксированной транзакции. Прерванные транзакции и их содержимое никогда не декодируется.

typedef void (*LogicalDecodeBeginCB) (struct LogicalDecodingContext *ctx,
                                      ReorderBufferTXN *txn);

Параметр txn содержит метаинформацию о транзакции, в частности её идентификатор и время её фиксирования.

49.6.4.4. Обработчик завершения транзакции

Обязательный обработчик commit_cb вызывается, когда декодируется фиксирование транзакции. Перед этим обработчиком будет вызываться обработчик change_cb для всех изменённых строк (если строки были изменены).

typedef void (*LogicalDecodeCommitCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr commit_lsn);

49.6.4.5. Transaction Prepare Callback

The optional prepare_cb callback is called whenever a transaction which is prepared for two-phase commit has been decoded. The change_cb callbacks for all modified rows will have been called before this, if there have been any modified rows.

typedef void (*LogicalDecodePrepareCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr prepare_lsn);

49.6.4.6. Commit Prepared Transaction Callback

The optional commit_prepared_cb callback is called whenever a commit prepared transaction has been decoded. The gid field, which is part of the txn parameter can be used in this callback.

typedef void (*LogicalDecodeCommitPreparedCB) (struct LogicalDecodingContext *ctx,
                                               ReorderBufferTXN *txn,
                                               XLogRecPtr commit_lsn);

49.6.4.7. Rollback Prepared Transaction Callback

The optional abort_prepared_cb callback is called whenever a rollback prepared transaction has been decoded. The gid field, which is part of the txn parameter can be used in this callback.

typedef void (*LogicalDecodeAbortPreparedCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              XLogRecPtr abort_lsn);

49.6.4.8. Transaction Abort Callback

The required abort_cb callback is called whenever a transaction abort has to be initiated. This can happen if we are decoding a transaction that has been prepared for two-phase commit and a concurrent rollback happens while we are decoding it. It might make sense, even before we commence decoding, in such cases to check if the rollback happened even before we start looking at the changes to completely avoid the decoding of such transactions.

typedef void (*LogicalDecodeAbortCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       XLogRecPtr abort_lsn);

49.6.4.9. Обработчик изменения

Обязательный обработчик change_cb вызывается для каждого отдельного изменения строки в транзакции, производимого командами INSERT, UPDATE или DELETE. Даже если команда изменила несколько строк сразу, этот обработчик будет вызываться для каждой отдельной строки.

typedef void (*LogicalDecodeChangeCB) (struct LogicalDecodingContext *ctx,
                                       ReorderBufferTXN *txn,
                                       Relation relation,
                                       ReorderBufferChange *change);

Параметры ctx и txn имеют то же содержимое, что и для обработчиков begin_cb и commit_cb; дополнительный дескриптор отношения relation указывает на отношение, к которому принадлежит строка, а структура change описывает передаваемое изменение строки.

Примечание

В процессе логического декодирования могут быть обработаны изменения только в таблицах, не являющихся нежурналируемыми (см. описание UNLOGGED) или временными (см. описание TEMPORARY или TEMP).

49.6.4.10. Обработчик фильтрации источника

Необязательный обработчик filter_by_origin_cb вызывается, чтобы отметить, интересуют ли модуль вывода изменения, воспроизводимые из указанного источника (origin_id).

typedef bool (*LogicalDecodeFilterByOriginCB) (struct LogicalDecodingContext *ctx,
                                               RepOriginId origin_id);

В параметре ctx передаётся та же информация, что и для других обработчиков. Чтобы отметить, что изменения, поступающие из переданного узла, не представляют интереса, модуль должен вернуть true, вследствие чего эти изменения будут фильтроваться; в противном случае он должен вернуть false. Другие обработчики для фильтруемых транзакций и изменений вызываться не будут.

Это полезно при реализации каскадной или разнонаправленной репликации. Фильтрация по источнику в таких конфигурациях позволяет предотвратить передачу взад-вперёд одних и тех же изменений. Хотя информацию об источнике можно также извлечь из транзакций и изменений, фильтрация с помощью этого обработчика гораздо более эффективна.

49.6.4.11. Decode Filter Callback

The optional filter_decode_txn_cb callback is called to determine whether data that is part of the current transaction should be continued to be decoded.

typedef bool (*LogicalDecodeFilterDecodeTxnCB) (struct LogicalDecodingContext *ctx,
                                                ReorderBufferTXN *txn);

The ctx parameter has the same contents as for the other callbacks. The txn parameter contains meta information about the transaction, like its XID. Note however that it can be NULL in some cases. To signal that decoding process should terminate, return true; false otherwise.

49.6.4.12. Prepare Filter Callback

The optional filter_prepare_cb callback is called to determine whether data that is part of the current two-phase commit transaction should be considered for decode at this prepare stage or as a regular one-phase transaction at COMMIT PREPARED time later. To signal that decoding should be skipped, return true; false otherwise.

typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
                                              ReorderBufferTXN *txn,
                                              TransactionId xid,
                                              const char *gid);

The ctx parameter has the same contents as for the other callbacks. The txn parameter contains meta information about the transaction. The xid contains the XID because txn can be NULL in some cases. The gid is the identifier that later identifies this transaction for COMMIT PREPARED or ROLLBACK PREPARED. The callback has to provide the same static answer for a given combination of xid and gid every time it is called. To signal that decoding should be skipped, return true; false otherwise.

49.6.4.13. Обработчик произвольных сообщений

Необязательный обработчик message_cb вызывается при получении сообщения логического декодирования.

typedef void (*LogicalDecodeMessageCB) (struct LogicalDecodingContext *ctx,
                                        ReorderBufferTXN *txn,
                                        XLogRecPtr message_lsn,
                                        bool transactional,
                                        const char *prefix,
                                        Size message_size,
                                        const char *message);

Параметр txn содержит метаинформацию о транзакции, включая время её фиксации и её XID. Заметьте, однако, что в нём может передаваться NULL, когда сообщение нетранзакционное и транзакции, в которой было выдано сообщение, ещё не назначен XID. В параметре lsn отмечается позиция сообщения в WAL. Параметр transactional показывает, было ли сообщение передано как транзакционное. В параметре prefix передаётся некоторый префикс (завершающийся нулём), по которому текущий модуль может выделять интересующие его сообщения. И наконец, параметр message содержит само сообщение размером message_size байт.

Необходимо дополнительно позаботиться о том, чтобы префикс, определяющий интересующие модуль вывода сообщения, был уникальным. Удачным выбором обычно будет имя расширения или самого модуля вывода.

49.6.5. Функции для формирования вывода

Чтобы действительно вывести данные, модули вывода могут записывать их в буфер StringInfo через ctx->out, внутри обработчиков begin_cb, commit_cb или change_cb. Прежде чем записывать данные в этот буфер, необходимо вызвать OutputPluginPrepareWrite(ctx, last_write), а завершив запись в буфер, нужно вызвать OutputPluginWrite(ctx, last_write), чтобы собственно произвести запись. Параметр last_write указывает, была ли эта определённая операция записи последней в данном обработчике.

Следующий пример показывает, как вывести данные для потребителя модуля вывода:

OutputPluginPrepareWrite(ctx, true);
appendStringInfo(ctx->out, "BEGIN %u", txn->xid);
OutputPluginWrite(ctx, true);