Модуль wal2json — это расширение Postgres Pro для логического декодирования, которое преобразует изменения базы данных из журнала предзаписи (WAL) в формат JSON. Postgres Pro имеет доступ к кортежам, сгенерированным командами INSERT и UPDATE. В зависимости от выбора варианта идентификации реплики также могут быть доступны предыдущие версии строк для команд UPDATE и DELETE. Изменения можно получить либо по протоколу потоковой репликации (слоты логической репликации), либо через специальный SQL API.
Формат версии 1 создаёт JSON-объект для каждой транзакции. Этот объект содержит все новые и старые кортежи. Также есть возможность включить в него такие свойства, как метки времени транзакций, имена, дополненные именами схем, типы данных и идентификаторы транзакций.
Формат версии 2 создаёт JSON-объект для каждого кортежа с необязательными JSON-объектами, обозначающими начало и конец транзакции. Также можно включить разные свойства транзакции.
wal2json поставляется вместе с Postgres Pro Standard в виде отдельного пакета wal2json-std-16 (подробные инструкции по установке приведены в Главе 16). После установки Postgres Pro Standard выполните следующие действия, чтобы подготовить wal2json к работе:
Включите логическое декодирование, задав для параметра wal_level значение logical в файле postgresql.conf.
Дополнительно вы можете задать значения для параметров max_replication_slots и max_wal_senders.
Перезапустите сервер баз данных, чтобы изменения вступили в силу.
Модуль wal2json предоставляет различные параметры для управления логическим декодированием:
include-xidsДобавить идентификатор транзакции к каждому набору изменений в JSON-выводе. Значение по умолчанию — off.
include-timestampДобавить timestamp к каждому набору изменений в JSON-выводе. Значение по умолчанию — off.
include-schemasДобавить имя схемы к каждой записи об изменениях в JSON-выводе. Значение по умолчанию — on.
include-typesДобавить type к каждой записи об изменениях в JSON-выводе. Значение по умолчанию — on.
include-typmodДобавить модификатор типа для столбцов, у которых они есть. Значение по умолчанию — on.
include-type-oidsДобавить OID типа. Значение по умолчанию — off.
include-domain-data-typeЗаменить доменное имя нижележащим типом данных. Значение по умолчанию — off.
include-column-positionsДобавить позицию столбца (pg_attribute.attnum). Значение по умолчанию — off.
include-originДобавить источник каждого изменения. Значение по умолчанию — off.
include-not-nullДобавить информацию, помечен ли столбец как not null в поле columnoptionals JSON-вывода. Значение по умолчанию — off.
include-defaultДобавить выражения по умолчанию в JSON-вывод. Значение по умолчанию — off.
include-pkДобавить информацию о первичном ключе, включая имена столбцов и типы данных, в поле pk JSON-вывода. Значение по умолчанию — off.
numeric-data-types-as-stringПреобразовать значения числовых типов данных в строки в JSON-выводе. Спецификация JSON не поддерживает Infinity и NaN как допустимые числовые значения. Для чисел двойной точности могут возникнуть потенциальные проблемы совместимости. Значение по умолчанию — off.
pretty-printДобавить пробелы и отступы в структуру JSON-вывода. Значение по умолчанию — off.
write-in-chunksВключить запись JSON-вывода небольшими фрагментами вместо целого набора изменений. Этот параметр используется только когда для format-version задано значение 1. Значение по умолчанию — off.
include-lsnДобавить поле nextlsn к каждому набору изменений. Значение по умолчанию — off.
include-transactionДобавить записи о начале и конце каждой транзакции в JSON-вывод. Значение по умолчанию — off.
filter-originsИсключить изменения из указанных источников. По умолчанию этот параметр пуст, что означает, что источники не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
filter-tablesИсключить строки из указанных таблиц. По умолчанию этот параметр пуст, что означает, что таблицы не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений. Имена таблиц должны быть дополнены схемой. Имя вида *.foo означает таблицу foo во всех схемах, а имя вида bar.* означает все таблицы в схеме bar. Специальные символы (пробел, одинарная кавычка, запятая, точка, звёздочка) должны быть экранированы обратной косой чертой. Схема и таблица чувствительны к регистру. Например, таблица "public"."Foo bar" должна быть указана как public.Foo\ bar.
add-tablesВключить строки только из указанных таблиц. По умолчанию этот параметр пуст, что означает, что включены все таблицы из всех схем. Для этого параметра действуют те же правила, что и для filter-tables.
filter-msg-prefixesИсключить сообщения, чей префикс указан в значении параметра. По умолчанию этот параметр пуст, что означает, что сообщения не будут отфильтрованы. Задаётся в виде разделённого запятыми списка значений.
add-msg-prefixesВключить только те сообщения, чей префикс указан в значении параметра. По умолчанию используются все префиксы. Задаётся в виде разделённого запятыми списка значений. wal2json применяет filter-msg-prefixes до этого параметра.
format-versionОпределить, какой формат вывода использовать. Значение по умолчанию — 1.
actionsОпределить, какие операции будут включены в JSON-вывод. По умолчанию используются все действия (INSERT, UPDATE, DELETE и TRUNCATE). Однако, если используется format-version = 1, операция TRUNCATE не включается (в целях обратной совместимости).
Существует два способа получить изменения (JSON-объекты) в wal2json: через функции, вызываемые в SQL или с помощью pg_recvlogical.
Ниже представлен пример, как получить JSON-объекты в wal2json с помощью pg_recvlogical. Помимо вышеуказанной настройки необходимо настроить подключение для репликации, чтобы использовать pg_recvlogical. Начиная с Postgres Pro версии 10, логическая репликация сопоставляет обычные записи с именем базы данных или ключевыми словами, такими как all.
Чтобы настроить подключение для репликации и параметры базы данных:
Добавьте правило подключения для репликации в файле pg_hba.conf:
local mydatabase myuser trust
Дополнительно можно задать max_wal_senders в файле postgresql.conf:
max_wal_senders = 1
Перезапустите сервер баз данных, если изменили max_wal_senders.
Чтобы получить JSON-объекты в wal2json:
Откройте терминал и подключитесь к базе данных:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
В другом терминале:
$ cat /tmp/example1.sql
CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table1_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;
$ psql -At -f /tmp/example1.sql postgres
CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE
Вывод в первом терминале может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
Чтобы удалить слот в первом терминале:
Ctrl+C
$ pg_recvlogical -d postgres --slot test_slot --drop-slot
Ниже представлены примеры, как получить изменения в wal2json с помощью SQL.
Если для format-version задано значение 1, скрипт может выглядеть так:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
Если для format-version задано значение 2, скрипт может выглядеть так:
$ cat /tmp/example3.sql
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- эта запись не добавляется в поток, так как отсутствует первичный ключ или репликационный идентификатор
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
Ожидаемый вывод может выглядеть так:
$ psql -At -f /tmp/example3.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE