The wal2json module is a
Postgres Pro extension for logical decoding that
converts database changes from the write-ahead log (WAL) into
JSON format. Through logical decoding, the module has
access to tuples generated by
INSERT and
UPDATE operations.
Depending on the configured replica identity, it can also access previous row
versions for UPDATE and DELETE operations. Changes
can be consumed either through the streaming protocol (logical replication
slots) or through a specialized SQL API.
Format version 1 generates a JSON object per transaction. This object contains all new and old tuples, with optional inclusion of properties such as transaction timestamps, schema-qualified names, data types, and transaction identifiers.
Format version 2 generates a JSON object per tuple, with optional JSON objects marking transaction start and end. Different tuple properties can also be included.
The wal2json extension is provided with
Postgres Pro Standard as a separate pre-built package
wal2json-std-17 (for the detailed
installation instructions, see Chapter 16). Once you
have Postgres Pro Standard installed, complete the
following steps to enable wal2json:
Enable logical decoding by setting
wal_level to
logical in the postgresql.conf
file.
Optionally, you can also edit values of the
max_replication_slots
and
max_wal_senders
parameters.
Restart the database server for the changes to take effect.
The wal2json module provides a variety of options for managing logical decoding:
include-xids
Add transaction IDs to each set of changes in the JSON output.
The default is off.
include-timestamp
Add a timestamp to each set of changes in the JSON
output. The default is off.
include-schemas
Add the schema name to each change record in the JSON output. The default is
on.
include-types
Add the data type of each column to each change record in the JSON output.
The default is on.
include-typmod
Add type modifiers for columns that have them. The default is
on.
include-type-oids
Add type OIDs. The default is off.
include-domain-data-type
Replace a domain name with the underlying data type. The default is
off.
include-column-positions
Add the column position (pg_attribute.attnum). The
default is off.
include-origin
Add the origin of each change. The default is off.
include-not-null
Add information about whether a column is marked as
not null to the columnoptionals
field of the JSON output. The default is off.
include-default
Add default expressions to the JSON output. The default is
off.
include-pk
Add primary key information, including column names and data types, to
the pk field of the JSON output. The default is
off.
numeric-data-types-as-string
Convert values of numeric data types to strings in the JSON output.
This is useful because the JSON specification does not support Infinity
and NaN as valid numeric values, and double precision
numbers can cause interoperability problems between JSON implementations.
The default is off.
pretty-print
Add spaces and indentation to the JSON output structure. The default
is off.
write-in-chunks
Enable writing the JSON output in smaller chunks instead of writing
the entire changeset at once. This option is used only when
format-version is set to 1.
The default is off.
include-lsn
Add the nextlsn field to each changeset. The
default is off.
include-transaction
Add records denoting the start and end of each transaction to the
JSON output. The default is on.
filter-origins
Exclude changes from the specified origins. The default is empty, which
means that no origin will be filtered. The value is a comma-separated
list.
filter-tables
Exclude rows from the specified tables. The default is empty, which
means that no table will be filtered. The value is a comma-separated
list. The tables should be schema-qualified. *.foo
means the foo table in all schemas, and
bar.* means all tables in the
bar schema. Special characters (space, single
quote, comma, period, asterisk) must be escaped with backslash.
Schema and table names are case-sensitive. For example, table
"public"."Foo bar"
should be specified as public.Foo\ bar.
add-tables
Include rows only from the specified tables. The default
is empty, which means that all tables from all schemas are included.
It follows the same rules as filter-tables.
filter-msg-prefixes
Exclude messages whose prefix is specified in the option value. The
default is empty, which means that no messages will be filtered. The
value is a comma-separated list.
add-msg-prefixes
Include only messages whose prefix is specified in the
option value. The default is all prefixes. The value is a
comma-separated list. wal2json applies
filter-msg-prefixes before this option.
format-version
Define which output format to use. The default is
1.
actions
Define which operations will be included in the JSON
output. The default is all actions (INSERT,
UPDATE, DELETE, and
TRUNCATE). However, if you are using
format-version = 1, TRUNCATE
is not enabled (for backward compatibility).
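The escaping rules described for filter-tables and add-tables can be sketched as a small Python helper. This is only an illustration under stated assumptions: the names escape_identifier, table_pattern, and table_list are hypothetical and are not part of wal2json, and the helper covers only the five special characters listed above.

```python
# Characters that filter-tables / add-tables require to be escaped:
# space, single quote, comma, period, asterisk.
SPECIAL_CHARS = " ',.*"


def escape_identifier(name: str) -> str:
    """Prefix every special character in a schema or table name with a backslash."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in name)


def table_pattern(schema: str, table: str) -> str:
    """Build one schema-qualified entry.

    Passing "*" for either part produces the wildcard form, so this sketch
    cannot express an object that is literally named "*".
    """
    left = "*" if schema == "*" else escape_identifier(schema)
    right = "*" if table == "*" else escape_identifier(table)
    return f"{left}.{right}"


def table_list(pairs) -> str:
    """Join (schema, table) pairs into a comma-separated option value."""
    return ",".join(table_pattern(schema, table) for schema, table in pairs)


print(table_pattern("public", "Foo bar"))          # public.Foo\ bar
print(table_list([("*", "foo"), ("bar", "*")]))    # *.foo,bar.*
```

The resulting string can be passed as the value of filter-tables or add-tables. Names are kept exactly as given because schema and table names are case-sensitive.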
There are two ways to obtain the changes (JSON objects) from wal2json: by calling functions via SQL or by using pg_recvlogical.
The following example shows how to obtain JSON objects from
wal2json using
pg_recvlogical.
Besides the configuration mentioned above, it is necessary to configure
a replication connection to use pg_recvlogical.
Since Postgres Pro version 10, a logical
replication connection matches a regular pg_hba.conf entry that specifies a
database name or a keyword such as all.
To configure a replication connection and database parameters:
Add a replication connection rule to pg_hba.conf:
local mydatabase myuser trust
Optionally, set max_wal_senders in
postgresql.conf:
max_wal_senders = 1
Restart the database server if you changed
max_wal_senders.
To obtain JSON objects from wal2json:
Open a terminal, create a replication slot, and start streaming changes to standard output:
$ pg_recvlogical -d postgres --slot test_slot --create-slot -P wal2json
$ pg_recvlogical -d postgres --slot test_slot --start -o pretty-print=1 -o add-msg-prefixes=wal2json -f -
In another terminal:
$ cat /tmp/example1.sql
CREATE TABLE table1_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table1_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
BEGIN;
INSERT INTO table1_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table1_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table1_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table1_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table1_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table1_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
DROP TABLE table1_with_pk;
DROP TABLE table1_without_pk;
$ psql -At -f /tmp/example1.sql postgres
CREATE TABLE
CREATE TABLE
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78BFC828
3/78BFC880
DELETE 2
3/78BFC990
INSERT 0 1
UPDATE 1
COMMIT
DROP TABLE
DROP TABLE
The first terminal prints the decoded changes as format version 1 JSON objects:
one object per transaction, plus a separate object for the non-transactional
message. Apart from the table names (table1_with_pk and table1_without_pk),
the JSON objects have the same structure as those in the format version 1
output of the SQL example below.
To drop the slot in the first terminal:
Ctrl+C
$ pg_recvlogical -d postgres --slot test_slot --drop-slot
The following examples show how to obtain changes from wal2json via SQL.
If format-version is set to 1, the
script might look like this:
$ cat /tmp/example2.sql
CREATE TABLE table2_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table2_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table2_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table2_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table2_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table2_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table2_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table2_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'pretty-print', '1', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table2_with_pk;
DROP TABLE table2_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example2.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78C2CA50
3/78C2CAA8
DELETE 2
3/78C2CBD8
INSERT 0 1
UPDATE 1
COMMIT
{
"change": [
{
"kind": "message",
"transactional": false,
"prefix": "wal2json",
"content": "this non-transactional message will be delivered even if you rollback the transaction"
}
]
}
psql:/tmp/example2.sql:17: WARNING: table "table2_without_pk" without primary key or replica identity is nothing
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [2, "Tuning", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_with_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
"columnvalues": [3, "Replication", "2018-03-27 12:05:29.914496"]
}
,{
"kind": "message",
"transactional": true,
"prefix": "wal2json",
"content": "this message will be delivered"
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [1, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "delete",
"schema": "public",
"table": "table2_with_pk",
"oldkeys": {
"keynames": ["a", "c"],
"keytypes": ["integer", "timestamp without time zone"],
"keyvalues": [2, "2018-03-27 12:05:29.914496"]
}
}
,{
"kind": "insert",
"schema": "public",
"table": "table2_without_pk",
"columnnames": ["a", "b", "c"],
"columntypes": ["integer", "numeric(5,2)", "text"],
"columnvalues": [1, 2.34, "Tapir"]
}
]
}
stop
DROP TABLE
DROP TABLE
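A consumer usually needs to turn such a changeset into something easier to work with. The following Python sketch shows one possible way to do that for format version 1. It assumes the default options (so the schema, columnnames, and columnvalues fields are present), uses a shortened copy of the output above as sample data, and the name summarize_v1 is hypothetical.

```python
import json

# A shortened format version 1 changeset, modelled on the output above.
SAMPLE = """
{
  "change": [
    {
      "kind": "insert",
      "schema": "public",
      "table": "table2_with_pk",
      "columnnames": ["a", "b", "c"],
      "columntypes": ["integer", "character varying(30)", "timestamp without time zone"],
      "columnvalues": [1, "Backup and Restore", "2018-03-27 12:05:29.914496"]
    },
    {
      "kind": "message",
      "transactional": true,
      "prefix": "wal2json",
      "content": "this message will be delivered"
    },
    {
      "kind": "delete",
      "schema": "public",
      "table": "table2_with_pk",
      "oldkeys": {
        "keynames": ["a", "c"],
        "keytypes": ["integer", "timestamp without time zone"],
        "keyvalues": [1, "2018-03-27 12:05:29.914496"]
      }
    }
  ]
}
"""


def summarize_v1(changeset_text):
    """Turn one format version 1 changeset into (kind, target, values) tuples."""
    rows = []
    for change in json.loads(changeset_text)["change"]:
        kind = change["kind"]
        if kind == "message":
            # Messages carry a prefix and content instead of table data.
            rows.append((kind, change["prefix"], change["content"]))
            continue
        target = f'{change["schema"]}.{change["table"]}'
        if "columnnames" in change:
            # New row version (present for insert and update).
            values = dict(zip(change["columnnames"], change["columnvalues"]))
        else:
            # Only the old key is available (delete).
            old = change.get("oldkeys", {})
            values = dict(zip(old.get("keynames", []), old.get("keyvalues", [])))
        rows.append((kind, target, values))
    return rows


for row in summarize_v1(SAMPLE):
    print(row)
```

Because each format version 1 object covers a whole transaction, the text must be complete before json.loads can parse it. With large transactions, the per-tuple objects of format version 2 may be easier to process incrementally.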
If format-version is set to 2, the
script might look like this:
$ cat /tmp/example3.sql
CREATE TABLE table3_with_pk (a SERIAL, b VARCHAR(30), c TIMESTAMP NOT NULL, PRIMARY KEY(a, c));
CREATE TABLE table3_without_pk (a SERIAL, b NUMERIC(5,2), c TEXT);
SELECT 'init' FROM pg_create_logical_replication_slot('test_slot', 'wal2json');
BEGIN;
INSERT INTO table3_with_pk (b, c) VALUES('Backup and Restore', now());
INSERT INTO table3_with_pk (b, c) VALUES('Tuning', now());
INSERT INTO table3_with_pk (b, c) VALUES('Replication', now());
SELECT pg_logical_emit_message(true, 'wal2json', 'this message will be delivered');
SELECT pg_logical_emit_message(true, 'pgoutput', 'this message will be filtered');
DELETE FROM table3_with_pk WHERE a < 3;
SELECT pg_logical_emit_message(false, 'wal2json', 'this non-transactional message will be delivered even if you rollback the transaction');
INSERT INTO table3_without_pk (b, c) VALUES(2.34, 'Tapir');
-- it is not added to stream because there isn't a pk or a replica identity
UPDATE table3_without_pk SET c = 'Anta' WHERE c = 'Tapir';
COMMIT;
SELECT data FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'format-version', '2', 'add-msg-prefixes', 'wal2json');
SELECT 'stop' FROM pg_drop_replication_slot('test_slot');
DROP TABLE table3_with_pk;
DROP TABLE table3_without_pk;
The expected output might look like this:
$ psql -At -f /tmp/example3.sql postgres
CREATE TABLE
CREATE TABLE
init
BEGIN
INSERT 0 1
INSERT 0 1
INSERT 0 1
3/78CB8F30
3/78CB8F88
DELETE 2
3/78CB90B8
INSERT 0 1
UPDATE 1
COMMIT
psql:/tmp/example3.sql:20: WARNING: no tuple identifier for UPDATE in table "public"."table3_without_pk"
{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}
{"action":"B"}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"character varying(30)","value":"Backup and Restore"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":2},{"name":"b","type":"character varying(30)","value":"Tuning"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"M","transactional":true,"prefix":"wal2json","content":"this message will be delivered"}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":2},{"name":"c","type":"timestamp without time zone","value":"2019-12-29 04:58:34.806671"}]}
{"action":"I","schema":"public","table":"table3_without_pk","columns":[{"name":"a","type":"integer","value":1},{"name":"b","type":"numeric(5,2)","value":2.34},{"name":"c","type":"text","value":"Tapir"}]}
{"action":"C"}
stop
DROP TABLE
DROP TABLE
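Since format version 2 emits one JSON object per line, a consumer can rebuild transaction boundaries from the B and C records. The following Python sketch shows one possible approach. It assumes that the begin and commit records are present, as in the output above, and uses shortened sample records; the names columns_to_dict and group_transactions are hypothetical.

```python
import json

# A few format version 2 records, shortened from the output above.
LINES = [
    '{"action":"M","transactional":false,"prefix":"wal2json","content":"this non-transactional message will be delivered even if you rollback the transaction"}',
    '{"action":"B"}',
    '{"action":"I","schema":"public","table":"table3_with_pk","columns":[{"name":"a","type":"integer","value":3},{"name":"b","type":"character varying(30)","value":"Replication"}]}',
    '{"action":"D","schema":"public","table":"table3_with_pk","identity":[{"name":"a","type":"integer","value":1}]}',
    '{"action":"C"}',
]


def columns_to_dict(items):
    """Flatten a "columns" or "identity" array into a name -> value mapping."""
    return {item["name"]: item["value"] for item in items}


def group_transactions(lines):
    """Group format version 2 records into lists, one list per transaction.

    Records that arrive outside a B ... C pair (for example, a
    non-transactional message) are returned as single-record groups.
    A transaction without a closing C record is discarded.
    """
    transactions = []
    current = None
    for line in lines:
        record = json.loads(line)
        action = record["action"]
        if action == "B":
            current = []
        elif action == "C":
            if current is not None:
                transactions.append(current)
            current = None
        elif current is not None:
            current.append(record)
        else:
            transactions.append([record])
    return transactions


for number, records in enumerate(group_transactions(LINES), start=1):
    print(number, [record["action"] for record in records])
```

For the sample records, the first group contains only the non-transactional message, and the second group contains the insert and delete that were committed together.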