Below are performance tuning tips for distributed systems.
Query execution is best tuned on a subset of production data that reflects the actual data distribution. Let's look at some sample plans.
EXPLAIN VERBOSE
SELECT bid,avg(abalance) FROM pgbench_accounts
WHERE bid IN (10,20,30,40)
GROUP BY bid;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------
 Append  (cost=0.29..21.98 rows=4 width=36)
   ->  GroupAggregate  (cost=0.29..18.98 rows=1 width=36)
         Output: pgbench_accounts.bid, avg(pgbench_accounts.abalance)
         Group Key: pgbench_accounts.bid
         ->  Index Scan using pgbench_accounts_15_pkey on public.pgbench_accounts_15 pgbench_accounts  (cost=0.29..18.96 rows=1 width=8)
               Output: pgbench_accounts.bid, pgbench_accounts.abalance
               Index Cond: (pgbench_accounts.bid = ANY ('{10,20,30,40}'::integer[]))
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_1.bid, (avg(pgbench_accounts_1.abalance))
         Relations: Aggregate on (public.pgbench_accounts_16_fdw pgbench_accounts_1)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_16 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=36)
         Output: pgbench_accounts_2.bid, (avg(pgbench_accounts_2.abalance))
         Relations: Aggregate on (public.pgbench_accounts_17_fdw pgbench_accounts_2)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_17 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
   ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=36)
         Output: pgbench_accounts_3.bid, (avg(pgbench_accounts_3.abalance))
         Relations: Aggregate on (public.pgbench_accounts_19_fdw pgbench_accounts_3)
         Remote SQL: SELECT bid, avg(abalance) FROM public.pgbench_accounts_19 WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1
         Transport: Silk
 Query Identifier: -1714706980364121548
Here we see that the queries scanning three partitions will be sent to other nodes, while the coordinator's own data will be scanned using an index scan. We do not know which plan will be used on the remote side, but we can see which queries will be sent (marked with Remote SQL).
Note that the Transport: Silk line is present in the foreign scan description. It indicates that the Silk transport will be used to transfer results.
We also see that asynchronous foreign scans will be used, which is desirable.
To discover which servers are used in the query, we should look at the foreign table definitions.
For example, we can find out that public.pgbench_accounts_19_fdw is
located on the shardman_rg_2 server listening on 127.0.0.2:65432:
SELECT srvname,srvoptions FROM pg_foreign_server s JOIN pg_foreign_table ON ftserver = s.oid
WHERE ftrelid = 'public.pgbench_accounts_19_fdw'::regclass;
-[ RECORD 1 ]-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
srvname | shardman_rg_2
srvoptions | {async_capable=on,batch_size=100,binary_format=on,connect_timeout=5,dbname=postgres,fdw_tuple_cost=0.2,fetch_size=50000,host=127.0.0.2,port=65432,silk_port=8000,tcp_user_timeout=10000}
Now we can connect to the shardman_rg_2 server and find out which plan is used for the local query
shown in the Remote SQL field of the above EXPLAIN output:
EXPLAIN SELECT bid, avg(abalance)
FROM public.pgbench_accounts_19
WHERE ((bid = ANY ('{10,20,30,40}'::integer[]))) GROUP BY 1;
QUERY PLAN
---------------------------------------------------------------------------------
 HashAggregate  (cost=3641.00..3641.01 rows=1 width=36)
   Group Key: bid
   ->  Seq Scan on pgbench_accounts_19  (cost=0.00..3141.00 rows=100000 width=8)
         Filter: (bid = ANY ('{10,20,30,40}'::integer[]))
While looking at distributed query plans, we can see that sometimes aggregates are not pushed down:
EXPLAIN (ANALYZE, VERBOSE)
SELECT avg(abalance) FROM pgbench_accounts;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=590.359..590.371 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=56.815..590.341 rows=20 loops=1)
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=56.812..56.813 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.018..38.478 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=75.133..75.134 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=41.281..67.293 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
.....
Here avg() is calculated on the coordinator side. This can lead to a
significant increase in data transfer between nodes. The actual data transfer can be monitored with the
NETWORK parameter of EXPLAIN ANALYZE (look at the
received value in the Network field of the topmost plan node):
EXPLAIN (ANALYZE, VERBOSE, NETWORK)
SELECT avg(abalance) FROM pgbench_accounts;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=156209.38..156209.39 rows=1 width=32) (actual time=589.014..589.027 rows=1 loops=1)
   Output: avg(pgbench_accounts.abalance)
   Network: FDW bytes sent=3218 received=14402396
   ->  Append  (cost=2891.00..156209.33 rows=20 width=32) (actual time=52.111..588.999 rows=20 loops=1)
         Network: FDW bytes sent=3218 received=14402396
         ->  Partial Aggregate  (cost=2891.00..2891.01 rows=1 width=32) (actual time=52.109..52.109 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.020..34.472 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
         ->  Partial Aggregate  (cost=23991.00..23991.01 rows=1 width=32) (actual time=78.616..78.617 rows=1 loops=1)
               Output: PARTIAL avg(pgbench_accounts_1.abalance)
               Network: FDW bytes sent=247 received=2400360
               ->  Foreign Scan on public.pgbench_accounts_1_fdw pgbench_accounts_1  (cost=100.00..23741.00 rows=100000 width=4) (actual time=42.359..69.984 rows=100000 loops=1)
                     Output: pgbench_accounts_1.abalance
                     Remote SQL: SELECT abalance FROM public.pgbench_accounts_1
                     Transport: Silk
                     Network: FDW bytes sent=247 received=2400360
.....
In such cases, we can sometimes rewrite the query:
EXPLAIN (ANALYZE, NETWORK, VERBOSE)
SELECT sum(abalance)::float/count(abalance) FROM pgbench_accounts where abalance is not null;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=12577.20..12577.22 rows=1 width=8) (actual time=151.632..151.639 rows=1 loops=1)
   Output: ((sum(pgbench_accounts.abalance))::double precision / (count(pgbench_accounts.abalance))::double precision)
   Network: FDW bytes sent=3907 received=872
   ->  Append  (cost=3141.00..12577.10 rows=20 width=16) (actual time=55.589..151.621 rows=20 loops=1)
         Network: FDW bytes sent=3907 received=872
         ->  Partial Aggregate  (cost=3141.00..3141.01 rows=1 width=16) (actual time=55.423..55.424 rows=1 loops=1)
               Output: PARTIAL sum(pgbench_accounts.abalance), PARTIAL count(pgbench_accounts.abalance)
               ->  Seq Scan on public.pgbench_accounts_0 pgbench_accounts  (cost=0.00..2641.00 rows=100000 width=4) (actual time=0.023..37.212 rows=100000 loops=1)
                     Output: pgbench_accounts.abalance
                     Filter: (pgbench_accounts.abalance IS NOT NULL)
         ->  Async Foreign Scan  (cost=1.00..1.00 rows=1 width=16) (actual time=0.055..0.089 rows=1 loops=1)
               Output: (PARTIAL sum(pgbench_accounts_1.abalance)), (PARTIAL count(pgbench_accounts_1.abalance))
               Relations: Aggregate on (public.pgbench_accounts_1_fdw pgbench_accounts_1)
               Remote SQL: SELECT sum(abalance), count(abalance) FROM public.pgbench_accounts_1 WHERE ((abalance IS NOT NULL))
               Transport: Silk
               Network: FDW bytes sent=300 received=800
....
By rewriting the query, we decreased the incoming network traffic generated by the query from about 13 MB to 872 bytes.
Now let's look at two nearly identical joins.
EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamptz AND b.bbalance > 0;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=8125.68..8125.69 rows=1 width=8) (actual time=27.464..27.543 rows=1 loops=1)
   ->  Append  (cost=3.85..8125.63 rows=20 width=8) (actual time=0.036..27.475 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.033..0.036 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.025..0.027 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.024 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=222.65..222.66 rows=1 width=8) (actual time=3.969..3.973 rows=1 loops=1)
               ->  Nested Loop  (cost=200.00..222.43 rows=86 width=0) (actual time=3.736..3.920 rows=86 loops=1)
                     Join Filter: (b_1.bid = h_1.bid)
                     ->  Foreign Scan on pgbench_branches_1_fdw b_1  (cost=100.00..101.22 rows=1 width=4) (actual time=1.929..1.932 rows=1 loops=1)
                     ->  Foreign Scan on pgbench_history_1_fdw h_1  (cost=100.00..120.14 rows=86 width=4) (actual time=1.795..1.916 rows=86 loops=1)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         ->  Partial Aggregate  (cost=864.54..864.55 rows=1 width=8) (actual time=1.780..1.786 rows=1 loops=1)
               ->  Hash Join  (cost=200.01..864.53 rows=5 width=0) (actual time=1.769..1.773 rows=0 loops=1)
                     Hash Cond: (h_2.bid = b_2.bid)
                     ->  Foreign Scan on pgbench_history_2_fdw h_2  (cost=100.00..760.81 rows=975 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
                     ->  Hash  (cost=100.00..100.00 rows=1 width=4) (actual time=1.740..1.742 rows=0 loops=1)
                           Buckets: 1024  Batches: 1  Memory Usage: 8kB
                           ->  Foreign Scan on pgbench_branches_2_fdw b_2  (cost=100.00..100.00 rows=1 width=4) (actual time=1.738..1.738 rows=0 loops=1)
....
Planning Time: 6.066 ms
Execution Time: 33.851 ms
Note that the joins of pgbench_branches
and pgbench_history partitions happen locally. This is a fetch-all plan:
you can tell because the join nodes are located above the foreign scans. It is not always evident
why a join is not pushed down. However, if we look at the pgbench_history
definition, we can see that mtime has the timestamp without time zone
type.
\d pgbench_history
Partitioned table "public.pgbench_history"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
tid | integer | | |
bid | integer | | |
aid | integer | | |
delta | integer | | |
mtime | timestamp without time zone | | |
filler | character(22) | | |
Partition key: HASH (bid)
Number of partitions: 20 (Use \d+ to list them.)
In the above query, the time string is converted to timestamp with
time zone. This requires comparing the mtime column
(of timestamp type) with a timestamptz
value. The comparison is implicitly performed using the stable function
timestamp_gt_timestamptz. A filter containing a non-immutable
function cannot be pushed down to the foreign server, so the join is executed locally.
If we rewrite the query, converting the string to a timestamp, we can see not only that the joins
are pushed down, but also that the remote queries can be executed asynchronously, because the
foreign scans in the plan tree are located immediately below Append:
EXPLAIN ANALYZE SELECT count(*) FROM pgbench_branches b
JOIN pgbench_history h ON b.bid = h.bid
WHERE mtime > '2023-03-14 10:00:00'::timestamp AND b.bbalance > 0;
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=84.30..84.31 rows=1 width=8) (actual time=22.962..22.990 rows=1 loops=1)
   ->  Append  (cost=3.85..84.25 rows=20 width=8) (actual time=0.196..22.927 rows=20 loops=1)
         ->  Partial Aggregate  (cost=3.85..3.86 rows=1 width=8) (actual time=0.032..0.034 rows=1 loops=1)
               ->  Nested Loop  (cost=0.00..3.69 rows=67 width=0) (actual time=0.024..0.026 rows=0 loops=1)
                     Join Filter: (b.bid = h.bid)
                     ->  Seq Scan on pgbench_branches_0 b  (cost=0.00..1.01 rows=1 width=4) (actual time=0.023..0.023 rows=0 loops=1)
                           Filter: (bbalance > 0)
                           Rows Removed by Filter: 1
                     ->  Seq Scan on pgbench_history_0 h  (cost=0.00..1.84 rows=67 width=4) (never executed)
                           Filter: (mtime > '2023-03-14 10:00:00'::timestamp without time zone)
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=10.870..10.871 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_1_fdw b_1) INNER JOIN (pgbench_history_1_fdw h_1))
         ->  Async Foreign Scan  (cost=0.99..0.99 rows=1 width=8) (actual time=0.016..0.017 rows=1 loops=1)
               Relations: Aggregate on ((pgbench_branches_2_fdw b_2) INNER JOIN (pgbench_history_2_fdw h_2))
...
Planning Time: 7.729 ms
Execution Time: 14.603 ms
Note that the foreign scans here include a list of joined relations. The expected cost of a foreign join is below 1.0 due to an optimistic technique of foreign join cost estimation, which is applied automatically. Comparing the total time (planning time plus execution time) of the original and modified queries, we decreased it from about 40 ms to 22 ms.
Overall, while examining query plans, pay attention to which queries are actually pushed down. Some of the common reasons why joins cannot be pushed down are the absence of equi-joins on the sharding key and filters that contain non-immutable functions (possibly implicitly). If data is fetched from multiple replication groups, check that execution is mostly asynchronous.
EXPLAIN Parameters #
This section lists EXPLAIN parameters for a
distributed system.
NETWORK (boolean) #
Include the actual data transfer between nodes in the
EXPLAIN ANALYZE output. If this parameter is not
specified, off is assumed. If the parameter is specified
without a value, on is assumed.
REMOTE (boolean) #
Include plans for queries executed on foreign servers. If this parameter or
its value is not specified, on is assumed.
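For instance, the two parameters can be combined; the following sketch shows network metrics while suppressing remote plans (the exact output depends on your cluster):

```sql
-- Show per-node traffic, but do not query remote servers for their plans
EXPLAIN (ANALYZE, NETWORK, REMOTE off)
SELECT avg(abalance) FROM pgbench_accounts;
```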
While evaluating performance of DML statements, it is important to understand how they are processed in Postgres Pro Shardman.
First of all, the execution of INSERT significantly
differs from the execution of UPDATE and DELETE
statements. The behavior of INSERT for sharded tables is
controlled by the batch_size foreign server option, which
can be set in the FDWOptions section of the
Postgres Pro Shardman configuration
file.
If batch_size is greater than 0,
inserting several values that fall into the same foreign partition within one
INSERT statement causes the values to be grouped
into batches of the specified size. Remote INSERT statements are prepared
with the necessary number of parameters and then executed with the
given values. If the number of values does not match the number of prepared
arguments, a modified statement with the necessary number of parameters is prepared
again. The batch insert optimization can fail if a transaction inserts records one by one,
or if records routed to different foreign tables are intermixed
in one INSERT statement. A batch is formed for a single
foreign modify operation. It is sent to the remote server when the batch is filled
or when the modify operation is over. The modify operation is over when
we start routing tuples to another sharded table partition.
So, for bulk load, inserting multiple values in a single INSERT
command or using COPY is recommended (as COPY
is optimized in a similar way). Large batch_size
values allow issuing fewer INSERT
statements on the remote side and so significantly reduce communication
costs. However, during construction of parameters for
prepared INSERT statements, all
inserted values are copied into libpq-allocated
memory. This can lead to unrestricted memory usage on the query coordinator
side when several large text
or bytea objects are loaded.
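Given the above, a bulk load might look like the following sketch (the table columns match the pgbench schema; the CSV path is purely illustrative):

```sql
-- Multi-row INSERT: values routed to the same foreign partition
-- are grouped into batches of up to batch_size rows
INSERT INTO pgbench_accounts (aid, bid, abalance, filler)
VALUES (1, 1, 0, ''),
       (2, 1, 0, ''),
       (3, 1, 0, '');

-- COPY is optimized in a similar way and is preferable for large loads
COPY pgbench_accounts FROM '/path/to/accounts.csv' WITH (FORMAT csv);
```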
UPDATE and DELETE statements
can be executed in a direct or indirect mode. The direct mode is used
when a statement can be sent directly to a foreign server.
In this mode, to modify a table on a remote server, a new statement
is created based on the original ModifyTable
plan node. A direct update is not always possible.
In particular, it is impossible when some conditions have to be evaluated
locally. In this case, a much less efficient indirect modification
is used. An indirect modification consists of several statements.
The first one is SELECT FOR UPDATE to lock remote
rows. The second one is an actual UPDATE or
DELETE, which is prepared once and then
executed with different parameters for each row of the
SELECT FOR UPDATE statement result
after local filters are applied to the result.
Evidently, direct modifications are much more efficient.
You can easily identify whether a DML statement is going to be executed in a direct or indirect mode by looking at the query plan. A typical example of an indirect modification is:
EXPLAIN VERBOSE DELETE FROM pgbench_history
WHERE bid = 20 AND mtime > '2023-03-14 10:00:00'::timestamptz;
QUERY PLAN
--------------------------------------------------------------------------------------------------------------
 Delete on public.pgbench_history  (cost=100.00..142.66 rows=0 width=0)
   Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1
     Remote SQL: DELETE FROM public.pgbench_history_17 WHERE ctid = $1
   ->  Foreign Scan on public.pgbench_history_17_fdw pgbench_history_1  (cost=100.00..142.66 rows=4 width=10)
         Output: pgbench_history_1.tableoid, pgbench_history_1.ctid
         Filter: (pgbench_history_1.mtime > '2023-03-14 10:00:00+03'::timestamp with time zone)
         Remote SQL: SELECT mtime, ctid FROM public.pgbench_history_17 WHERE ((bid = 20)) FOR UPDATE
If we choose another type for the string constant, this becomes a direct modification:
EXPLAIN VERBOSE DELETE FROM pgbench_history
WHERE bid = 20 AND mtime > '2023-03-14 10:00:00'::timestamp;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------------------------------
 Delete on public.pgbench_history  (cost=100.00..146.97 rows=0 width=0)
   Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1
   ->  Foreign Delete on public.pgbench_history_17_fdw pgbench_history_1  (cost=100.00..146.97 rows=4 width=10)
         Remote SQL: DELETE FROM public.pgbench_history_17 WHERE ((mtime > '2023-03-14 10:00:00'::timestamp without time zone)) AND ((bid = 20))
We see that in the direct mode, only one statement is executed on the remote server.
The shardman.gt_batch_size configuration parameter, which you can tune, defines the size of an intermediate buffer used before sending data to a remote server.
INSERT uses the binary protocol and creates batches of
the shardman.gt_batch_size size. Large values of
the buffer size allow issuing fewer network
requests to the remote side and thus substantially reduce the communication
costs. On the other hand, large values of this parameter can increase
memory consumption on the query coordinator side. Therefore, when specifying
the buffer size, it is important to find a compromise between the
communication costs and the allocated memory size.
For UPDATE, a query for each column and each row is
created on the coordinator and sent to remote nodes.
For DELETE, a query for a batch of data of the
shardman.gt_batch_size size is
created on the coordinator and sent to remote nodes.
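As a sketch, the buffer size can be adjusted per session before a bulk DML operation (the value 1000 is purely illustrative; tune it against your row width and available coordinator memory):

```sql
SET shardman.gt_batch_size = 1000;  -- illustrative value, not a recommendation
DELETE FROM pgbench_history WHERE mtime < '2023-01-01'::timestamp;
```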
SELECT FOR UPDATE Tuning #
SELECT FOR UPDATE for distributed tables differs from that
for regular tables. It may lock more records than expected if the records are
filtered locally, if the LIMIT clause is used, or
if the tables are joined. This may result in the entire table being locked despite
the specified filters.
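For example, a query of the following shape may lock more rows than the LIMIT suggests, because the limit is applied on the coordinator after remote rows have been fetched and locked (a hedged sketch against the pgbench schema):

```sql
-- May lock remote rows beyond the 10 that are returned
SELECT aid, abalance FROM pgbench_accounts
WHERE abalance > 0
ORDER BY aid
LIMIT 10
FOR UPDATE;
```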
The algorithm that provides data consistency on all the cluster nodes uses the system clock of the hosts. Therefore, the transaction commit latency depends on clock drift between hosts, as the coordinator always waits for the most lagging host to catch up. This makes it crucial that the time on all the connected nodes of a Postgres Pro Shardman cluster is synchronized; lack of synchronization may hurt Postgres Pro Shardman performance by increasing query latency.
First, to ensure time synchronization on all cluster nodes, install chrony daemon when deploying a new cluster.
sudo apt update
sudo apt install -y chrony
sudo systemctl enable --now chrony
Check that chrony is working properly.
chronyc tracking
Expected output:
Reference ID : C0248F82 (Time100.Stupi.SE)
Stratum : 2
Ref time (UTC) : Tue Apr 18 11:50:44 2023
System time : 0.000019457 seconds slow of NTP time
Last offset : -0.000005579 seconds
RMS offset : 0.000089375 seconds
Frequency : 30.777 ppm fast
Residual freq : -0.000 ppm
Skew : 0.003 ppm
Root delay : 0.018349268 seconds
Root dispersion : 0.000334640 seconds
Update interval : 1039.1 seconds
Leap status : Normal
Note that managing clock drift should be performed using OS tools. Postgres Pro Shardman diagnostic tools cannot be considered the sole and authoritative measurement utility.
To see if any major drift already exists, use
the shardman.pg_stat_csn
view that shows statistics on delays that take place during import of CSN snapshots.
Its values are calculated when any related action is performed, or if
any of the shardman.trim_csnxid_map() or
shardman.pg_oldest_csn_snapshot()
functions are called. These functions are called from the
csn trimmer routine worker, therefore disabling this
worker will result in these statistics not being collected.
The csn_max_shift field of the shardman.pg_stat_csn
view shows the maximum registered snapshot CSN shift that caused
a delay. This value reflects the clock drift between the nodes in the cluster.
A continuous increase of this value means that the system clock of at least one
cluster node is out of sync. If this value exceeds
1000 (microseconds), it is recommended to
check the time synchronization settings.
The same conclusion can be drawn if the csn_total_import_delay value
increases while csn_max_shift remains unchanged.
However, a one-time increase may be caused by
isolated failures unrelated to time issues.
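A minimal monitoring query might look as follows (the column list is assumed from the fields described above):

```sql
SELECT csn_max_shift, csn_total_import_delay
FROM shardman.pg_stat_csn;
```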
Also, if the difference between CSNXidMap_head_csn
and shardman.oldest_csn exceeds the csn_snapshot_defer_time
parameter value and stays the same for a long time, it means that
the CSNSnapshotXidMap map is full. It can result in
a global transaction failure.
There are two main reasons for this issue:
- A transaction runs for more than csn_snapshot_defer_time
seconds and holds back the entire cluster, blocking the VACUUM process.
In this case, use the xid field of the
shardman.oldest_csn view
to determine the transaction ID of this transaction, and the rgid field to determine
the cluster node where this transaction runs.
- The CSNSnapshotXidMap map lacks capacity. During
normal operation, the system might have transactions that
exceed the csn_snapshot_defer_time
value. To fix this, increase csn_snapshot_defer_time
so that the duration of such transactions stays below its value.
If the shardman.silk_tracepoints configuration parameter
is enabled, executing the EXPLAIN command for
distributed queries outputs rows with information
about how much time was
spent on query execution in each system component and what result it ended with.
These rows show metric values for the time spent in each component.
The net (qry),
net (1st tup), net (last tup)
metrics calculate the difference between timestamps on different servers.
This difference includes both time spent on a message transfer and the clock drift
(positive or negative) between these servers. Therefore,
these metrics can also help to determine whether there is
any clock drift.
Postgres Pro Shardman enhances the
EXPLAIN command so that it can provide
additional information about a query if it is distributed.
Work with distributed tables is based on plan
nodes of the ForeignScan type.
A query to each remote partition is represented by a single plan node of this type, and
Postgres Pro Shardman adds extra information to the
EXPLAIN blocks describing such nodes.
When executing a distributed query, the part of the plan (a subtree) that
relates to a specific remote partition is serialized into an SQL statement.
This process is known as deparsing. Then, this statement is sent to a remote server.
The result of this query is the output of a ForeignScan node.
It is used to gather the final results of the distributed query execution.
When the VERBOSE option of the EXPLAIN
command is set to on, the Remote SQL
field of the ForeignScan node block shows
the statement sent to the remote server. Also, the Server
field indicates the name of the server as it was specified during
the cluster configuration and as it is displayed in pg_foreign_server,
along with the transport method used to send this statement.
The transport field can take two values: silk
for the enhanced Postgres Pro Shardman interconnect mechanism,
or libpq for sending via the standard
PostgreSQL protocol.
To see the execution plan that will be used on the remote server under
the EXPLAIN block of the ForeignScan node,
use the shardman.foreign_explain configuration parameter.
The possible values are: none to exclude
the EXPLAIN output from the remote servers, full
to include the EXPLAIN output from the remote servers,
collapsed to include the EXPLAIN output
only for the first ForeignScan node under its
Append/MergeAppend.
In production, it is recommended to disable this parameter (set it
to none) or set it to collapsed,
because obtaining any EXPLAIN information
results in an additional implicit request to the server. Moreover, this
request is executed in a synchronous mode, meaning the overall
EXPLAIN output is built only after all the servers
are sequentially queried.
This can be a costly operation in the case of a table with a large number of partitions.
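For example, remote plans can be enabled only for a troubleshooting session (a sketch using the values described above):

```sql
SET shardman.foreign_explain = 'full';       -- include all remote plans while debugging
EXPLAIN VERBOSE SELECT avg(abalance) FROM pgbench_accounts;
SET shardman.foreign_explain = 'collapsed';  -- cheaper setting for routine use
```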
Note that in case of the internal request for obtaining the EXPLAIN
blocks for a remote plan, certain parameters are forcibly disabled, regardless
of the parameters specified by a user when requesting EXPLAIN
from the coordinator: ANALYZE OFF, TIMING OFF,
SUMMARY OFF, SETTINGS OFF, NETWORK OFF.
In this case, the EXPLAIN block of a remote plan will lack
the corresponding metrics. Other EXPLAIN parameters
(FORMAT, VERBOSE, COSTS, BUFFERS,
WAL) are inherited from the coordinator.
The internal EXPLAIN request to a remote server is made with the
GENERIC_PLAN option. Therefore, when analyzing the plans from the
remote side, note that the EXPLAIN blocks display the generic plan.
Setting the NETWORK option of the EXPLAIN
command to on shows the network operation metrics
for the plan nodes, including individual ForeignScan nodes
and general nodes Append or MergeAppend.
For each plan node, the FDW bytes sent
and received values are displayed for the outgoing
and incoming traffic generated when the node is executed (regardless of the transport type).
Note that these metrics are only output when the ANALYZE
option of the EXPLAIN
command is set to on.
When the track_fdw_wait_timing configuration parameter
is enabled, the wait_time metric is also output.
This metric summarizes all stages of the plan node execution,
starting from the time the request is sent to the remote server,
including the time spent on the execution itself
and all the time until the complete set of results for that plan node is received.
Note that the ForeignScan node can operate in both
synchronous and asynchronous modes. For the asynchronous execution,
the node's execution function sends a request to the remote server
and completes its execution without waiting for the result.
The result is considered and processed later, upon receipt.
In this scenario, the wait_time metric may not
accurately reflect the actual execution time.
For the Silk transport, there is an option
to output extended debug information tracing a query as it passes
from the coordinator to the remote server and back, including
the delivery of results from the remote server. This information is only
available if the ANALYZE option of the EXPLAIN
command is set to on and
the shardman.silk_tracepoints configuration parameter
is enabled.
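Putting the two requirements together, a tracing session might be sketched as:

```sql
SET shardman.silk_tracepoints = on;
EXPLAIN (ANALYZE, VERBOSE)
SELECT count(*) FROM pgbench_accounts;
-- the plan output now includes rows starting with "Trace"
```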
When these parameters are enabled, each message transferred through
the Silk transport (sending the SQL query,
delivering it to the recipient, executing the query, and
returning the execution result) is accompanied by an array of
the timestamps measured at certain points in the pipeline. Once the query
is executed, this information is displayed in the
EXPLAIN block as rows starting with the word Trace.
Each metric represents the difference between the timestamps at
different points, in milliseconds:
Table 14.1. Query Tracing for Silk Transport Metrics
| Interval | Description |
|---|---|
| bk shm->mp1 (qry) | The time taken to transfer an SQL query from the coordinator to its multiplexer via the shared memory. |
| mp1 shm->net (qry) | The time between receiving a query within the multiplexer from the shared memory and transferring it over the network. |
| net (qry) | The time spent by an SQL query to transfer over the network between the multiplexers. |
| mp2 recv->shm (qry) | The time between receiving an SQL query from the network and placing it in the queue in the shared memory on a remote multiplexer. |
| wk exec (1st tup) | The time spent to execute a query in Silkworm until the first row of the result is received. |
| wk exec (all tups) | The time spent to execute a query in Silkworm until the complete result is received. |
| wk->shm (1st tup) | The time taken to place the first row of the result into the Silkworm queue. |
| wk->shm (last tup) | The time taken to place the last row of the result into the Silkworm queue. |
| mp2 shm->net (1st tup) | The time between reading the first row of the result from the queue by the remote multiplexer and transferring it over the network. |
| net (1st tup) | The time spent to transfer the first row of the result over the network between the multiplexers. |
| mp1 recv->shm (1st tup) | The time between receiving the first row of the result from the network and placing it in the queue by the local multiplexer. |
| mp1 shm->bk (1st tup) | The time spent to retrieve the first row of the result from the queue by the coordinator. |
| mp2 shm->net (last tup) | The time between reading the last row of the result from the queue by the remote multiplexer and transferring it over the network. |
| net (last tup) | The time spent to transfer the last row of the result over the network between the multiplexers. |
| mp1 recv->shm (last tup) | The time between receiving the last row of the result from the network and placing it in the queue by the local multiplexer. |
| mp1 shm->bk (last tup) | The time taken by the coordinator to retrieve the last row of the result from the queue. |
| END-TO-END | The total time from sending the query to receiving the last row of the result. This approximately corresponds to wait_time. |
For the net (qry), net (1st tup),
and net (last tup) metrics, the interval value is calculated
as the difference between timestamps on different servers,
so negative values may appear in these rows.
This difference includes both the time spent on message transfer and the clock drift
(positive or negative) between these servers.
Thus, even a slight drift produces negative values if its absolute value exceeds the duration
of the network transfer. Although this is not a bug, you should pay close
attention to whether the cluster clocks are synchronized. For more information,
see Section 14.6.3.