The Shardman software comprises these main components: the PostgreSQL core with additional features, the shardman extension, and management services and utilities. This section considers a Shardman cluster as a group of PostgreSQL instances, or shards. Each shard may also have one or more replicas; to emphasize this, the term replication group is used. Support for highly available configurations is currently implemented at the level of tools and services and is covered in the Management section.
A distributed database managed by Shardman uses two special table types: sharded tables and global tables.
Sharded tables are ordinary PostgreSQL partitioned tables in which some partitions, making up a shard, are regular local tables, while the other partitions are foreign tables accessed from remote servers via postgres_fdw. Sharded tables are registered in the shardman.sharded_tables dictionary. Use the CREATE TABLE statement with the distributed_by option to create a sharded table.
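For example, a sharded table might be created as follows (the table and column names are illustrative, and the sharding option is assumed to be passed via the WITH clause):

```sql
-- Create a sharded table distributed by the bid column.
CREATE TABLE branches (
    bid      integer PRIMARY KEY,
    bbalance integer
) WITH (distributed_by = 'bid');
```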
Several sharded tables can be created as colocated. This means that they have the same number of partitions and that their partitions corresponding to the same sharding key reside together. During rebalance, Shardman management utilities ensure that corresponding partitions of colocated tables are moved to the same node. (Such a rebalance happens, for example, when a new node is added to the cluster.) Colocation is necessary to ensure that joins of several tables are propagated to the node where the actual data resides. To define one sharded table as colocated with another, first create one table, and then use the colocate_with option of the CREATE TABLE statement while creating the second. Chains of colocated tables are not supported; instead, all related tables should be marked as colocated with one of them. Note that the colocate_with property is symmetric and transitive.
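As a sketch, a second table can be colocated with an existing one like this (names are illustrative; both tables are distributed by the same key):

```sql
-- Partitions of tellers with the same sharding key values will be
-- kept on the same nodes as the matching partitions of branches.
CREATE TABLE tellers (
    tid      integer PRIMARY KEY,
    bid      integer,
    tbalance integer
) WITH (distributed_by = 'bid', colocate_with = 'branches');
```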
A sharded table consists of several partitions: some of them are regular tables, others are foreign tables. By default, the number of partitions is determined by the shardman.num_parts parameter, but it can be overridden with the num_parts CREATE TABLE option. Most DDL operations are restricted on partitions of a sharded table; modify the parent table instead.
The number of partitions in a sharded table is defined when it is created and cannot be changed afterwards. When new nodes are added to the cluster, some partitions are moved from existing nodes to the new ones to balance the load. So, to allow scaling of clusters, the initial number of partitions should be high enough, but not too high, since an extremely large number of partitions significantly slows down query planning. For example, if you expect the number of nodes in your cluster to grow at most fourfold, create sharded tables with the number of partitions equal to 4 * N, where N is the number of nodes. A cluster becomes unable to scale when the number of cluster nodes reaches the number of partitions in the sharded table with the fewest partitions.
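Following the sizing rule above, for a 4-node cluster expected to grow at most fourfold, a table might be created with 4 * 4 = 16 partitions (a sketch; names are illustrative, and the options are assumed to be passed via the WITH clause):

```sql
CREATE TABLE accounts (
    aid      integer PRIMARY KEY,
    abalance integer
) WITH (distributed_by = 'aid', num_parts = 16);
```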
Partitions of a sharded table can themselves be partitioned by range. In this case each partition of the sharded table is a partitioned table consisting only of regular or only of foreign subpartitions. All subpartitions of a partition are located on the same node. Use the partition_by CREATE TABLE option to specify a column that should be used as the subpartition key, and the partition_bounds option to set the bounds of the second-level table partitions. New subpartitions can be added to or removed from the table as necessary, so you can omit the partition_bounds option during table creation and create partitions later using the shardman.create_subpart() function. Other subpartition management functions allow you to drop, detach, or attach subpartitions of a sharded table. Subpartition management is performed cluster-wide.
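A sketch of two-level partitioning might look like the following. The table and column names are illustrative, and the argument list of shardman.create_subpart() shown here is an assumption; check the function reference for the exact signature:

```sql
-- A sharded table whose partitions are additionally range-partitioned
-- by a timestamp column; bounds are omitted here and added later.
CREATE TABLE events (
    id      bigint,
    ts      timestamptz,
    payload text
) WITH (distributed_by = 'id', partition_by = 'ts');

-- Add a subpartition for one month of data; the operation is cluster-wide.
SELECT shardman.create_subpart('events', '2024-01-01', '2024-02-01');
```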
Global tables are available to all nodes of a cluster. Currently, a global table is a set of regular tables synchronized by triggers. The main use case for a global table is to store a relatively rarely updated set of data that is used by all cluster nodes. When a sharded table is joined to a global table, joins between sharded table partitions and the global table can be performed on the nodes where the individual partitions reside. The implementation of trigger-based replication requires a non-deferrable primary key to be defined on a global table. Currently, when a global table is modified, an after-statement trigger fires and propagates the changes to the other nodes of the cluster via foreign tables. When new nodes are added to the cluster, global table data is transferred to the new nodes via logical replication. When some nodes are removed from the cluster, global tables are locked for writes for a brief time. Use the global CREATE TABLE option to create a global table. Global tables are registered in the shardman.global_tables dictionary. Partitioned global tables are not supported.
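For example, a small reference table could be made global as follows (the names are illustrative, and the option is assumed to be passed via the WITH clause):

```sql
CREATE TABLE currencies (
    code text PRIMARY KEY,  -- a non-deferrable primary key is required
    rate numeric
) WITH (global);
```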
The Shardman extension allows creating several kinds of global objects: sharded and global tables, roles, and tablespaces. The list of operations allowed on global objects is limited, in particular to protect the consistency of the global schema. For the same reason, most operations on global objects are performed cluster-wide. The list of cluster-wide operations includes:
CREATE for sharded, global tables, global roles and tablespaces or indexes
on sharded or global tables;
DROP for sharded, global tables, global roles and tablespaces or indexes
on sharded or global tables;
ALTER TABLE for sharded and global tables;
ALTER TABLESPACE for global tablespaces;
ALTER ROLE for global roles;
RENAME for sharded and global tables or indexes on them;
SET CONSTRAINTS ALL inside a transaction block.
Two GUCs control execution of distributed DDL: shardman.broadcast_ddl and shardman.sync_schema. The first can be used to broadcast all safe DDL cluster-wide (for example, creating schemas or functions). The second controls broadcasting of statements related to global objects and should never be turned off without consulting the Postgres Pro Shardman support team.
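For instance, to broadcast a schema creation cluster-wide, shardman.broadcast_ddl can be enabled for the duration of the statement (a sketch):

```sql
SET shardman.broadcast_ddl TO on;
CREATE SCHEMA analytics;   -- executed on every replication group
RESET shardman.broadcast_ddl;
```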
Shardman uses the standard PostgreSQL query execution pipeline. Other nodes in the cluster are accessed via a modified postgres_fdw extension.
The Shardman query planner takes the query abstract syntax tree (AST) and creates a query plan, which is then used by the executor. While evaluating query execution methods, the planner operates with so-called paths, which specify how relations should be accessed. Processing the query join tree, the planner looks at different combinations of how relations can be joined. Each time it examines a join of two relations, one of which can itself be a join relation. After choosing the order and strategies for joining relations, the planner considers GROUP BY, ORDER BY, and LIMIT operations. When the cheapest path is selected, it is transformed into a query plan. A plan is a tree of nodes, each of which has methods to produce the next result row (or NULL if there are no more results).
The efficiency of query execution in a distributed DBMS is determined by the degree to which operations can be executed on the nodes that hold the actual data. In Shardman, a lot of effort is therefore devoted to pushing down join operations. When the planner finds a relation that is accessible via FDW, it creates a ForeignPath to access it. Later, when it examines a join of two relations and both of them are available via ForeignPath from the same foreign server, it can consider pushing this join down to the server and generate a so-called ForeignJoinPath. The planner can fail to do this if the join type is not supported, if filters attached to the relation must be applied locally, or if the relation scan result contains fields that can't be evaluated on the remote server. An example of a currently unsupported join type is the anti-join. Filters attached to the relation must be applied locally when remote execution could lead to a different result, or when the postgres_fdw module can't create SQL expressions for some of the filters. An example of fields that can't be evaluated on the remote server are attributes of the semi-join inner relation that are not accessible via the outer relation.
If the foreign_join_fast_path parameter is set to on (the default), the Shardman planner stops searching for other join strategies for two relations once it finds that a foreign join is possible for them. When the postgres_fdw.enforce_foreign_join parameter is set to on (also the default), the cost of a foreign join is estimated so that it is always less than the cost of a local join.
When several sharded tables are joined on the sharding key, a partitionwise join may be possible. This means that instead of joining the original tables, we can join their matching partitions. Partitionwise join currently applies only when the join conditions include all the partition keys, which must be of the same data type and have exactly matching sets of child partitions. Partitionwise join is crucial to efficient query execution, as it allows pushing down joins of table partitions. Evidently, to push down a join of several partitions, these partitions should reside on the same node. This is usually the case when sharded tables are created with the same num_parts option. However, for the rebalance process to move the corresponding partitions to the same nodes, sharded tables should be marked as colocated when created (see Section 2.1.1.1). Partitionwise join is enabled with the enable_partitionwise_join option, which is turned on by default in Shardman.
When a sharded table is joined to a plain global table, an asymmetric partitionwise join is possible. This means that instead of joining the original tables, we can join each partition of the sharded table with the global table. This makes it possible to push down joins of sharded table partitions with the global table to the foreign server.
After planning joins, the planner considers paths for post-join operations, such as aggregation, limiting, sorting, and grouping. Not all such operations reach the FDW pushdown logic. For example, partitioning currently effectively prevents the LIMIT clause from being pushed down. There are two efficient strategies for executing aggregates on remote nodes. The first is partitionwise aggregation: when a GROUP BY clause includes the partitioning key, the aggregate can be pushed down together with the GROUP BY clause (this behavior is controlled by the enable_partitionwise_aggregate option, which is turned on by default in Shardman). Alternatively, the planner can decide to execute partial aggregation on each partition of a sharded table and then combine the results. In Shardman, such a partial aggregate can be pushed down if it effectively matches the main aggregate. For example, a partial sum() aggregate can always be pushed down, but avg() can't. Also, the planner refuses to push down partial aggregates if they contain additional clauses, like ORDER BY or DISTINCT, or if the statement has a HAVING clause.
Generally, subqueries can't be pushed down to other cluster nodes. However, Shardman uses two approaches to alleviate this limitation.
The first is subquery unnesting. In PostgreSQL, non-correlated subqueries can be transformed into semi-joins. In the following example, an ANY subquery on non-partitioned tables is transformed to a Hash Semi Join:
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers);
QUERY PLAN
-----------------------------------------------------------
Hash Semi Join
Hash Cond: (pgbench_branches.bid = pgbench_tellers.bid)
-> Seq Scan on pgbench_branches
-> Hash
-> Seq Scan on pgbench_tellers
When optimize_correlated_subqueries is on (the default),
Shardman planner also tries to convert correlated subqueries
(i.e. subqueries that reference upper-level relations) into semi-joins. This optimization
works for the IN and = operators. The transformation has some restrictions. For example, it is
not considered if the subquery contains aggregates or references upper-level relations outside
of the WHERE clause. This optimization allows transforming more complex subqueries
into semi-joins, as in the following example.
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers WHERE tbalance = bbalance);
QUERY PLAN
------------------------------------------------------------------------------------------------------------------------
Hash Semi Join
Hash Cond: ((pgbench_branches.bid = pgbench_tellers.bid) AND (pgbench_branches.bbalance = pgbench_tellers.tbalance))
-> Seq Scan on pgbench_branches
-> Hash
-> Seq Scan on pgbench_tellers
(5 rows)
After subquery unnesting is applied, the semi-join can be pushed down for execution to a remote node.
The second approach is to push down the entire subquery. This is possible when the optimizer has already
recognized that the subquery references only partitions from the same foreign server as the upper-level query
and the corresponding foreign scans don't have local conditions. The optimization is controlled by
postgres_fdw.subplan_pushdown (which is off by default).
When postgres_fdw decides to push down a subquery,
it has to deparse the subquery. A subquery that contains plan nodes for which deparsing is not
implemented will not be pushed down. An example of subquery pushdown looks like the following:
EXPLAIN (VERBOSE ON, COSTS OFF)
SELECT * FROM pgbench_accounts a WHERE a.bid=90 AND abalance =
(SELECT min(tbalance) FROM pgbench_tellers t WHERE t.bid=90 and a.bid=t.bid);
QUERY PLAN
--------------------------------------------------------------------------------------
Foreign Scan on public.pgbench_accounts_5_fdw a
Output: a.aid, a.bid, a.abalance, a.filler
Remote SQL: SELECT aid, bid, abalance, filler FROM public.pgbench_accounts_5 r2 WHERE ((r2.bid = 90)) AND ((r2.abalance = ((SELECT min(sp0_2.tbalance) FROM public.pgbench_tellers_5 sp0_2 WHERE ((sp0_2.bid = 90)) AND ((r2.bid = 90))))))
Transport: Silk
SubPlan 1
-> Finalize Aggregate
Output: min(t.tbalance)
-> Foreign Scan
Output: (PARTIAL min(t.tbalance))
Relations: Aggregate on (public.pgbench_tellers_5_fdw t)
Remote SQL: SELECT min(tbalance) FROM public.pgbench_tellers_5 WHERE ((bid = 90)) AND (($1::integer = 90))
Transport: Silk
Note that in the plan above there are no references to SubPlan 1.
When a sharded table is queried, the Shardman planner creates Append
plans to scan all partitions of the table and combine the results. When some of the partitions are foreign tables,
the planner can decide to use asynchronous execution. This means that when the Append
node is asked for tuples for the first time after initialization, it asks asynchronous child nodes
to start fetching results. For postgres_fdw async ForeignScan nodes this means
that a remote cursor is declared and a fetch request is sent to the remote server. If the Silk
transport is used, the query is sent for execution to the remote server as an
MT_SPI message.
After sending requests to the remote servers, Append returns to fetching data from
synchronous child nodes: local scan nodes or synchronous ForeignScan nodes. Data from such nodes
is fetched in a blocking manner. When Append finishes getting data from synchronous nodes,
it checks whether the async nodes have produced some data. If they have not, it waits for the async nodes to produce results.
Shardman can execute several types of plans asynchronously: asynchronous ForeignScans, projections, and trivial subquery scans (SELECT * FROM subquery) over asynchronous plans.
Asynchronous execution is turned on by default at the level of a foreign server. This is controlled by
the async_capable postgres_fdw option. For now, only Append
plans support asynchronous execution; MergeAppend does not.
While examining query plans, pay attention to the presence of non-asynchronous ForeignScan nodes in the plan: falling back to synchronous execution can significantly increase query execution time.
Examples:
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts;
QUERY PLAN
-------------------------------------------------------------------------
Append
-> Seq Scan on pgbench_accounts_0 pgbench_accounts_1
-> Async Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
-> Async Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
-> Seq Scan on pgbench_accounts_3 pgbench_accounts_4
-> Async Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
-> Async Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
-> Seq Scan on pgbench_accounts_6 pgbench_accounts_7
-> Async Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
-> Async Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
-> Seq Scan on pgbench_accounts_9 pgbench_accounts_10
-> Async Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
-> Async Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
-> Seq Scan on pgbench_accounts_12 pgbench_accounts_13
-> Async Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
-> Async Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
-> Seq Scan on pgbench_accounts_15 pgbench_accounts_16
-> Async Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
-> Async Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
-> Seq Scan on pgbench_accounts_18 pgbench_accounts_19
-> Async Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20
Here we see a typical asynchronous plan: asynchronous foreign scans are combined with local sequential scans, which are executed synchronously.
EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts ORDER BY aid;
QUERY PLAN
-------------------------------------------------------------------
Merge Append
Sort Key: pgbench_accounts.aid
-> Sort
Sort Key: pgbench_accounts_1.aid
-> Seq Scan on pgbench_accounts_0 pgbench_accounts_1
-> Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
-> Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
-> Sort
Sort Key: pgbench_accounts_4.aid
-> Seq Scan on pgbench_accounts_3 pgbench_accounts_4
-> Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
-> Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
-> Sort
Sort Key: pgbench_accounts_7.aid
-> Seq Scan on pgbench_accounts_6 pgbench_accounts_7
-> Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
-> Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
-> Sort
Sort Key: pgbench_accounts_10.aid
-> Seq Scan on pgbench_accounts_9 pgbench_accounts_10
-> Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
-> Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
-> Sort
Sort Key: pgbench_accounts_13.aid
-> Seq Scan on pgbench_accounts_12 pgbench_accounts_13
-> Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
-> Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
-> Sort
Sort Key: pgbench_accounts_16.aid
-> Seq Scan on pgbench_accounts_15 pgbench_accounts_16
-> Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
-> Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
-> Sort
Sort Key: pgbench_accounts_19.aid
-> Seq Scan on pgbench_accounts_18 pgbench_accounts_19
-> Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20
Here MergeAppend is used, so execution can't be asynchronous.
There are many cases when operations on data can't be executed remotely (for example, when a non-immutable function is used in filters, when several sharded tables are joined by an attribute that is not a sharding key, or when pushdown of a particular join type is not supported), or when the planner considers local execution cheaper. In such cases various operations (selection, joins, or aggregations) are not pushed down but executed locally. This can lead to inefficient query execution due to heavy inter-cluster traffic and high processing cost on the coordinator. When this happens, you should check that the optimizer has fresh statistics, consider rewriting the query to benefit from different forms of pushdown, or at least check that the suggested query plan is reasonable. To make the DBMS analyze data cluster-wide, you can use the shardman.global_analyze function.
A Shardman cluster uses a snapshot isolation mechanism for distributed transactions. The mechanism provides a way to synchronize snapshots between different nodes of the cluster and a way to commit such a transaction atomically with respect to other concurrent global and local transactions. These global transactions can be coordinated by using the provided SQL functions or through postgres_fdw, which uses these functions on remote nodes transparently.
Clock-SI is described in https://dl.acm.org/citation.cfm?id=2553434. Assume that each node
runs Commit Sequence Number (CSN) based visibility: the database tracks a counter for each transaction
commit (the CSN). In such a setting, a snapshot is just a single number: a copy of the current
CSN at the moment when the snapshot was taken. Visibility rules boil down to checking
whether the current tuple's CSN is less than our snapshot's CSN.
Let's assume that the CSN is the current physical time on the node and call it GlobalCSN.
If the physical time on different nodes were perfectly synchronized, then a snapshot obtained on one node could be
used on other nodes to provide the necessary level of transaction isolation. Unfortunately, physical clocks are never
perfectly in sync and can drift, so this fact must be taken into account. Also, there is no easy notion of a lock or an atomic
operation in a distributed environment, so commit atomicity on different nodes with respect to concurrent snapshot
acquisition must be handled somehow. Clock-SI addresses this with the following rules:
1. To achieve commit atomicity across nodes, an intermediate step is introduced: first, the running transaction
is marked as InDoubt on all nodes, and only after that does each node commit it and stamp it with the
given GlobalCSN. All readers that run into tuples of an InDoubt transaction
must wait until it ends and then recheck visibility.
2. When the coordinator marks a transaction as InDoubt on other nodes, it collects a
ProposedGlobalCSN from each participant, which is the local time on that node. Next, it selects
the maximum of all ProposedGlobalCSNs and commits the transaction on all nodes with
that maximum GlobalCSN, even if that value is greater than the current time on a node due
to clock drift. So the GlobalCSN for the given transaction is the same on all nodes.
Each node records its last generated CSN (last_csn) and cannot generate a CSN
<= last_csn. When a node commits a transaction with a CSN > last_csn, last_csn is
adjusted to record this CSN. Due to this mechanism, a node can't generate a CSN
less than the CSNs of already committed transactions.
3. When a local transaction imports a foreign global snapshot with some GlobalCSN and the current time
on this node is smaller than the incoming GlobalCSN, the transaction waits until this
GlobalCSN time comes on the local clock.
Rules 2 and 3 provide protection against time drift.
The rules above still do not provide a recency guarantee for snapshots generated on nodes that did not participate in a transaction. A read operation originating from such a node can see stale data. The probability of this anomaly directly depends on the system clock skew in the Shardman cluster.
Particular attention should be paid to the synchronization of the system clocks on all cluster nodes. The size of the clock skew must be measured. If external consistency is required, the clock skew can be compensated for with a commit delay. This delay is added before every commit in the system, so it has a negative impact on the latency of transactions. Read-only transactions are not affected by this delay. The delay can be set using the GUC csn_commit_delay.
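Assuming a measured clock skew of up to 10 ms, the delay could be configured like this (a sketch; the value and units shown are illustrative and must match the measured skew):

```sql
ALTER SYSTEM SET csn_commit_delay = '10ms';
SELECT pg_reload_conf();
```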
The CSN visibility mechanism described earlier is not a general way to check the visibility of all
transactions; it is used to provide isolation only for distributed transactions. As a result, each cluster node uses a visibility
check mechanism based on xid and xmin. To be able to use a
CSN snapshot that points to the past, we need to keep old versions of tuples on all nodes and therefore defer vacuuming them.
To do this, each node in a Shardman cluster maintains a CSN-to-xid
mapping called CSNSnapshotXidMap. This map is a ring buffer that stores the correspondence between
the current snapshot_csn and xmin in a sparse way: snapshot_csn
is rounded to seconds (here we exploit the fact that snapshot_csn is just a timestamp), and
xmin is stored in the circular buffer, with the rounded snapshot_csn acting as an
offset from the current circular buffer head. The size of the circular buffer is controlled by the csn_snapshot_defer_time
GUC. VACUUM is not allowed to clean up tuples whose xmax is newer than the oldest xmin in CSNSnapshotXidMap.
When a CSN snapshot arrives, we check that its snapshot_csn is still in the map; otherwise,
we error out with a "snapshot too old" message. If snapshot_csn is successfully mapped, the backend's xmin is filled with the value from the map. That way, backends with imported CSN snapshots are taken into account, and old tuple versions are preserved.
To support global transactions, each node keeps old versions of tuples for at least csn_snapshot_defer_time seconds.
Large values of csn_snapshot_defer_time negatively affect performance,
because nodes keep all row versions from the last csn_snapshot_defer_time seconds
even when there may no longer be any transactions in the cluster that can read them.
A special monitor task periodically recalculates xmin in the cluster and sets
it on all nodes to the minimum possible value.
This allows vacuuming to remove row versions that are no longer of interest to any transaction.
The GUC shardman.monitor_trim_csnxid_map_interval controls this worker.
The worker wakes up at the configured interval and performs the following operations:
Checks whether the current node's replication group ID is the smallest among all IDs in the cluster. If this condition is not met, work on the current node terminates, so only one node in the cluster performs horizon negotiation.
The coordinator collects from each node of the Shardman cluster the oldest snapshot CSN among all active transactions on that node.
The coordinator chooses the smallest CSN and sends it to each node. Each node discards its
csnXidMap values that are less than this value.
Shardman implements a two-phase commit protocol to ensure the atomicity
of distributed transactions. During the execution of a distributed transaction, the coordinator node
sends the BEGIN command to the participant nodes to initiate their local transactions.
The term "participant nodes" here and below refers to the subset of cluster nodes that take part in executing a transaction's commands and perform write activity.
Additionally, a local transaction is created on the coordinator node. This ensures that there are corresponding local transactions on all nodes participating in the distributed transaction.
During the two-phase transaction commit, the coordinator node sends the command
PREPARE TRANSACTION to the participant nodes to initiate the preparation of their
local transactions for commit. If the preparation is successful, the local transaction data is stored
in disk storage, making it persistent. If all participant nodes report successful preparation to the
coordinator node, the coordinator node will commit its local transaction. Subsequently,
the coordinator node will also commit the previously prepared transactions on the participant nodes
using the command COMMIT PREPARED.
If a failure occurs during the PREPARE TRANSACTION command on any of the participant
nodes, the distributed transaction is considered aborted. The coordinator node then broadcasts
the command to abort the previously prepared transactions using the ROLLBACK PREPARED
command. If the local transaction was already prepared, it is aborted. However, if there was no
prepared transaction with the specified name, the command to rollback is simply ignored.
Subsequently, the coordinator node rolls back its local transaction.
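On a single participant node, the sequence issued by the coordinator is equivalent to the following standard PostgreSQL two-phase commit statements (the transaction name here is illustrative; real names encode the coordinator's replication group and transaction IDs):

```sql
BEGIN;
-- ... statements of the distributed transaction ...
PREPARE TRANSACTION 'sdm-tx-42';  -- phase 1: make the transaction durable
-- Once every participant has prepared successfully:
COMMIT PREPARED 'sdm-tx-42';      -- phase 2: finalize the commit
-- On a failure during phase 1, the coordinator instead broadcasts:
-- ROLLBACK PREPARED 'sdm-tx-42';
```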
After a successful preparation phase, there will be a prepared transaction object
on each of the participant nodes. These objects are backed by disk files and records in server memory.
It is possible to have a prepared transaction that was created earlier through a two-phase operation
and will never be completed. This can occur, for example, if the coordinator node fails exactly after
the preparation step but before the commit step. It can also occur as a result of network connectivity
issues. For instance, if the command COMMIT PREPARED from the coordinator node to
a participant node ends with an error, local transactions will be committed on all participant nodes
except for the one with the error. The local transaction will also be committed on the coordinator node.
All participants, except for the one with the error, believe that the distributed transaction
was completed. However, the one participant still waiting for COMMIT PREPARED
will never receive it, resulting in a prepared transaction that will never be completed.
A prepared transaction consumes system resources, such as memory and disk space. An incomplete prepared transaction causes other transactions that access rows modified by that transaction to wait until the distributed operation completes. Therefore, it is necessary to complete prepared transactions, even in cases where there were failures during commit, to free up resources and ensure that other transactions can proceed.
To resolve such situations, a mechanism for resolving prepared transactions is implemented as
part of the Shardman monitor. It is a background worker that
wakes up periodically, acting as an internal "crontab" job. By default, the period is 5 seconds,
but it can be configured using the shardman.monitor_dxact_interval configuration parameter. The worker
checks for prepared transactions on the node where the Shardman monitor is running that were created more than a certain amount of time ago, specified by the shardman.monitor_dxact_timeout configuration parameter (also 5 seconds by default).
When the PREPARE TRANSACTION command is sent to the participant node, a special
name is assigned to the prepared transaction. This name encodes useful information that allows for
the identification of the coordinator node and its local transaction.
If the Shardman monitor finds outdated prepared transactions, it extracts the coordinator's replication group ID and transaction ID of the coordinator's local transaction. The Shardman monitor then sends a query to the coordinator
SELECT shardman.xact_status(TransactionId)
which requests the current status of the coordinator's local transaction. If the query fails, such as due to network connectivity issues, then the prepared transaction will remain untouched until the next time the Shardman monitor wakes up.
In the case of a successful query, the coordinator node can reply with one of the following statuses:
committed
The local transaction on the coordinator node was completed
successfully. Therefore, the Shardman monitor commits this
prepared transaction as well using the COMMIT PREPARED command.
aborted
The local transaction on the coordinator node was aborted.
Therefore, the Shardman monitor aborts this transaction as well
using the ROLLBACK PREPARED command.
unknown
The transaction with such an identifier never existed on
the coordinator node. Therefore, the Shardman monitor aborts
this transaction using the ROLLBACK PREPARED command.
active
The local transaction on the coordinator node is still somewhere
inside the CommitTransaction() flow. Therefore, the Shardman monitor
does nothing with this transaction. The Shardman monitor will
try again with this transaction at the next wake-up.
ambiguous
The ambiguous status can be returned when CLOG
truncation is enabled on the coordinator node. The CLOG is a bitmap that
stores the statuses of completed local transactions. When a transaction is committed or aborted,
its status is marked in the CLOG. However, the CLOG can be
truncated (garbage collected) by the VACUUM process to discard the statuses
of old transactions that don't affect the visibility of data for any existing transaction.
When the CLOG is truncated, the
shardman.xact_status() function may not be able to decide unambiguously whether
a transaction existed in the past (with some status) or never existed. In such cases,
the function returns the ambiguous status. This leads to uncertainty about the actual status
of the transaction and can make it difficult to resolve the prepared transaction.
When the shardman.xact_status() function returns the ambiguous status for
a prepared transaction, the monitor node logs a warning message indicating that the status
could not be determined unambiguously. The prepared transaction is left untouched, and
the monitor will try again with this transaction at the next wake-up. To avoid
ambiguity in the status of prepared transactions, set the
min_clog_size configuration parameter to 1024000, which means
"never truncate the CLOG".
In situations where the prepared transaction resolution mechanism is unable to resolve prepared transactions due to persistent errors or an ambiguous status, the administrator will need to intervene manually. This may involve examining the server logs and performing a manual rollback or commit operation on the prepared transaction. Note that leaving prepared transactions unresolved can lead to resource consumption and performance issues, so such situations should be addressed as soon as possible.
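The resolution rules above can be summarized in a short sketch. This is an illustrative Python model of the decision table, not Shardman code; the function name is an assumption, and the status strings follow the list given in the text.

```python
# Illustrative sketch of the prepared-transaction resolution rules
# described above: map the status reported by the coordinator's
# shardman.xact_status() to the action taken on the prepared
# transaction at the participant node.
def resolve_prepared_xact(status):
    if status == "committed":
        return "COMMIT PREPARED"    # finish the two-phase commit
    if status in ("aborted", "unknown"):
        return "ROLLBACK PREPARED"  # safe to roll back
    if status in ("active", "ambiguous"):
        return None                 # leave untouched, retry at next wake-up
    raise ValueError(f"unexpected status: {status}")
```

The ambiguous case additionally logs a warning in the real monitor; the sketch only shows that no action is taken on the transaction itself.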
Silk (Shardman InterLink)
is an experimental transport feature. It is injected at the point where
postgres_fdw would transmit a deparsed query fragment
over a libpq connection to the remote node, replacing
the libpq connection with itself. It is designed to reduce
the number of idle postgres_fdw connections during
transaction execution, minimize latency, and boost overall throughput.
The Silk implementation uses several background processes:
the main routing/multiplexing process (one per PostgreSQL
instance), named silkroad, and a number of background workers, named
silkworms. When postgres_fdw uses
libpq, each backend spawns multiple libpq connections
to the remote node (where a corresponding number of backend processes are spawned).
When Silk replaces libpq,
every silkroad process is connected to only one remote
silkroad. In this scheme, remote silkworms
play the role of the remote backends that postgres_fdw would otherwise spawn.
Silkroad wires a local backend to a remote node's workers as follows:
The backend process uses the regular postgres_fdw API to access
remote data as usual. However, with Silk enabled, postgres_fdw
writes the query into a shared memory queue instead of a libpq
connection.
The local silkroad process reads the incoming message from
that backend's shared memory queue and routes it to the network connection with
the appropriate remote silkroad process.
The remote silkroad process grabs the incoming message from the network
and (if it is a new one) redirects it to an available worker's shared memory queue
(or to a special "unassigned jobs" queue if all of the workers are busy).
Finally, the remote worker receives the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.
Silkroad acts here like a common network switch, tossing packets
between a backend's shared memory and the appropriate network socket. It knows nothing about
the content of a message, relying only on the message header.
The silkroad process runs an event loop powered by the libev
library. Each backend's shared memory queue is exposed to the event loop via an
eventfd descriptor, and each network connection via a
socket descriptor.
During startup, the backend registers itself (its eventfd
descriptors) with the local silkroad process. Silkroad
responds by specifying which memory segments to use for the backend's message queue.
From this moment on, silkroad responds to events from the queue
associated with this backend. Network connections between local and remote
silkroads are established on the first request from a
backend to a remote node and stay alive as long as both participants (the silkroad
processes) exist.
For each subquery, a subset of tuples is expected, so the interaction
within the subquery is represented as a bidirectional data stream. Silkroad
uses an internal routing table to register these streams. A stream ID, unique within
the Shardman cluster, is formed as a pair of origin
and target node addresses plus a number that is unique within the node.
Each subquery sent from a backend to remote nodes is registered by
silkroad as such a stream, so any backend can be associated
with many streams at a time.
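The structure of such a cluster-unique stream ID can be sketched as follows. This is a model of the scheme described above, not the actual Silk wire encoding; the function name and address format are illustrative.

```python
import itertools

# Sketch of a cluster-unique stream ID: the (origin, target) node
# address pair plus a counter that is unique within the origin node.
_local_counter = itertools.count(1)

def new_stream_id(origin_addr, target_addr):
    """Return a stream ID of the shape described in the text."""
    return (origin_addr, target_addr, next(_local_counter))
```

Two streams between the same pair of nodes differ only in the local counter, which is what lets one silkroad-to-silkroad connection multiplex many concurrent subqueries.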
When the local silkroad process receives a message with a new stream ID
from a backend, it registers the stream in its local routing table, then redirects the message
to the appropriate socket. If a connection with the remote silkroad does not exist yet,
it is established using a handshake procedure. The original message that initiated
the handshake is kept in a special internal buffer until the handshake succeeds.
A remote silkroad process receiving a packet with a new stream ID registers
it in its own table, then assigns a silkworm worker from the pool of available workers
and places the message into the worker's shared memory queue. If all of the
silkworm workers are busy at the moment, the message is
postponed into a special "unassigned jobs" queue, whose size is controlled by the
shardman.silk_unassigned_job_queue_size configuration parameter (1024 by default).
If there is no free space in the queue, an error message is generated and sent
back to the source backend. A job from this queue is assigned later to the first
worker that finishes its previous job.
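The behavior of the "unassigned jobs" queue can be sketched as a bounded FIFO. This is an illustrative model, not Shardman code; the class and method names are assumptions, and the size limit stands in for shardman.silk_unassigned_job_queue_size.

```python
from collections import deque

# Sketch of the "unassigned jobs" queue described above: a bounded
# FIFO holding messages while all silkworm workers are busy.
class UnassignedJobQueue:
    def __init__(self, maxsize=1024):
        self.maxsize = maxsize
        self.jobs = deque()

    def enqueue(self, job):
        """Queue a job; report an error back to the backend if full."""
        if len(self.jobs) >= self.maxsize:
            return "error: unassigned job queue is full"
        self.jobs.append(job)
        return "queued"

    def assign_to(self, worker):
        """Hand the oldest waiting job to a worker that just freed up."""
        if self.jobs:
            return worker, self.jobs.popleft()
        return worker, None
```

The error branch corresponds to the error message sent back to the source backend when the queue overflows.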
When a worker gets a new job, it executes it through the SPI subsystem,
forms the result tuples into batches, and sends them back through shared memory to the local
silkroad process. The rest is trivial because the whole route is known.
The last packet with result tuples in a stream is marked as "closing"; it instructs
the silkroads to wipe this route from their tables.
Note that the backend and remote workers stay "subscribed" to their streams until the streams are explicitly closed. This gives the backend the opportunity to send an abort message or notify the remote worker to close the transaction prematurely, and it makes it possible to discard obsolete data packets, such as those from previously aborted transactions.
Besides the routing table, silkroad tracks the endpoints (backends and network connections)
involved in each particular stream. When a connection is closed, all
of the involved backends (and/or workers) are notified of that event with a special
error message, and all routes/streams related to this connection are dismissed.
In the same way, if a backend crashes, its shared memory queue becomes detached,
and silkroad reacts by sending error messages to the remote participants
of every stream related to the crashed backend, so the remote workers are not left doing
useless work while the requester has already died.
The result tuples are transmitted by silkworm in native binary mode.
Tuples with the 'external' storage attribute are detoasted, but those that were compressed
stay compressed.
Small tuples are organized into batches (about 256K). Big tuples are cut into pieces by the sender and reassembled into a whole by the receiving backend.
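The batching and chunking rules can be sketched as follows. This is an illustrative model under the stated assumptions: the 256K figure comes from the text, while the function name and the ('batch'/'chunk') tags are invented for the sketch.

```python
# Sketch of the sender-side batching described above: small tuples
# are packed into batches of roughly BATCH_SIZE bytes, while a tuple
# larger than the batch size is cut into pieces for reassembly by
# the receiving backend.
BATCH_SIZE = 256 * 1024

def pack(tuples):
    """Yield ('batch', [tuples...]) and ('chunk', piece) items."""
    batch, batch_bytes = [], 0
    for t in tuples:
        if len(t) > BATCH_SIZE:
            if batch:                        # flush pending small tuples
                yield ("batch", batch)
                batch, batch_bytes = [], 0
            for i in range(0, len(t), BATCH_SIZE):
                yield ("chunk", t[i:i + BATCH_SIZE])
            continue
        if batch_bytes + len(t) > BATCH_SIZE:
            yield ("batch", batch)
            batch, batch_bytes = [], 0
        batch.append(t)
        batch_bytes += len(t)
    if batch:
        yield ("batch", batch)
```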
It may happen that the next message received from a backend does not fit
into the target network buffer, or that a message received from the network does not fit
into the target shared memory queue. In such a case, the stream that caused
this situation is "suspended". This means that silkroad
pauses its reaction to events from the source endpoint (connection or backend) until
the target endpoint drains its messages. The other backends and connections
not affected by this route keep working. The receiving modules of backends are designed
to minimize these situations: the backend periodically checks and drains the incoming
queue even when the plan executor is busy processing other plan nodes. Received tuples
are stored in the backend's tuplestores according to the plan nodes until the executor
requests the next tuple for a particular plan node.
When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
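The suspend/resume flow control described above can be sketched per stream. This is a simplified model, not Silk code; the class and method names are assumptions, and real buffers are byte queues rather than counters.

```python
# Sketch of stream suspension: if a message does not fit into the
# target buffer, the stream is suspended (source events are ignored)
# until the target drains enough space.
class Stream:
    def __init__(self, capacity):
        self.capacity = capacity
        self.buffered = 0
        self.suspended = False

    def deliver(self, size):
        """Try to place a message into the target buffer."""
        if self.buffered + size > self.capacity:
            self.suspended = True      # pause reaction to source events
            return False
        self.buffered += size
        return True

    def drain(self, size):
        """Target consumed some bytes; resume the stream if possible."""
        self.buffered = max(0, self.buffered - size)
        if self.suspended and self.buffered < self.capacity:
            self.suspended = False     # unblock source events
```

Only the stream that overflowed is suspended; in the real system, other streams multiplexed over the same connection continue to be serviced.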
When postgres_fdw works over the Silk transport, only one connection
between the silkroad routing daemons is used to transfer user requests
to silkworm workers and get their responses.
Each request contains the transaction state, the replication group ID of the node
where the request was formed (the coordinator), the query itself, and the query parameters (if present).
A response is either an error response message with a specific error message and error code,
or a bunch of tuples followed by an "end of tuples" message.
This means that silkworm has to switch to the transaction state
that comes with the request before executing the request.
For now, the Silk transport is used only for read-only SELECT queries.
All modifying requests are processed via the usual libpq connection and handled
mostly like all other DML in upstream postgres_fdw. The only distinction is that
when a DML request is processed by postgres_fdw, it resets the saved transaction
state for the connection cache entry corresponding to the connection where this request is sent.
The read-only flag is also set to false for such a connection cache entry.
When a request is sent over the Silk transport, the shardman extension
asks postgres_fdw for the transaction state for a pair of serverid and userid.
If such a connection cache entry is found in the postgres_fdw connection cache, it is not a read-only
cache entry, and the transaction state is present in this entry, the state
is returned. If it is not present, postgres_fdw retrieves the full transaction state
from the remote server, saves it in the connection cache entry, and returns it to the
shardman extension.
The full transaction state is similar to the parallel worker transaction state and contains:
information related to the current user (uid, username);
the pid of the current backend;
the transaction start timestamp;
the current snapshot CSN;
a flag indicating that invalidation messages are present;
the backend private state:
array of ComboCIDs;
internal transaction state (full transaction ID, isolation level, current command ID, etc.);
information about reindexed indexes.
If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection), or the entry in the connection cache is marked as read-only, only
information related to the current user (username);
the transaction start timestamp;
the current snapshot CSN;
a flag indicating that invalidation messages are present
form the transaction state.
Using such a transaction state, silkworm is able to attach to the running transaction
or start a new read-only transaction with the provided snapshot CSN and retrieve the result.
Note that the full transaction state can be imported only on the server that exported it. Also note that, due
to this method of transferring the transaction state, you cannot use the Silk transport
without enabling CSN snapshots.
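The two transaction-state payloads described above can be sketched as data structures. Field names here are illustrative, not Shardman's actual wire format; only the field lists themselves come from the text.

```python
from dataclasses import dataclass

# Sketch of the two transaction-state payloads described above: the
# reduced state sent for new/read-only connections, and the full
# state (similar to a parallel worker's) for established ones.
@dataclass
class ReadOnlyXactState:
    username: str
    start_timestamp: float
    snapshot_csn: int
    has_inval_messages: bool

@dataclass
class FullXactState(ReadOnlyXactState):
    uid: int = 0
    backend_pid: int = 0
    combo_cids: tuple = ()           # backend private state
    full_xid: int = 0
    isolation_level: str = "read committed"
    current_command_id: int = 0
    reindexed_indexes: tuple = ()
```

As the text notes, a full state is only importable on the server that exported it; the reduced state is enough to start a new read-only transaction with the provided snapshot CSN.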
In Section 2.1.2.2, asynchronous ForeignScan plan nodes
were discussed as a way to optimize data retrieval from multiple hosts when these plan nodes
are located under a single Append node. In the standard Postgres
architecture, the execution of ForeignScan plan nodes is implemented using
the network protocol based on libpq. To improve system performance during data
transfer and reduce resource consumption, Shardman employs a different
method of exchanging data with remote hosts: the mechanism for executing ForeignScan
nodes is implemented using the Silk protocol.
In order to incorporate Silk transport into the asynchronous executor, modifications
were made to the postgres_fdw extension. A pluggable transport was implemented as
a set of interface functions included as part of the Shardman extension.
During execution callbacks that interact with remote hosts, these functions are called by the
postgres_fdw extension. The pluggable Silk transport is activated
if the Shardman extension is preloaded and if the foreign server has
the attribute extended_features (applicable for any FDW server in the Shardman cluster). For all other cases, the postgres_fdw extension
uses the standard exchange protocol based on libpq.
To disable the pluggable Silk transport in the Shardman
cluster, set the query_engine_mode configuration parameter to
ENGINE_NONE.
In the current implementation, the pluggable Silk transport is only used for
read-only queries, specifically during the execution of the ForeignScan node.
The standard exchange protocol based on libpq is used for modifying queries.
When receiving query execution result rows using the Silk transport, the data
is stored in a TupleStoreState storage as a complete result set, which is
the same size as that returned by the remote host. The TupleStoreState is
implemented as a data structure that can spill data to the disk in case of memory shortage.
If the remote host returns a large result set, it does not lead to an OOM condition. Once the result
set is received in the TupleStoreState, the data is copied into the
ForeignScan executor's in-memory buffer. The size of this buffer is defined by the
fetch_size attribute of the foreign server. The default value of 50000 rows
can be adjusted to find a balance between performance (number of ForeignScan node
calls) and memory consumption.
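The two-level buffering described above can be sketched as follows. This is an illustrative model: the list stands in for the spillable TupleStoreState, and the function name is invented for the sketch.

```python
# Sketch of the buffering described above: the complete result set
# first lands in a spillable store (TupleStoreState in the text),
# then the executor copies it out in fetch_size-row batches.
def fetch_batches(result_set, fetch_size=50000):
    """Yield the stored result set in fetch_size-row chunks."""
    store = list(result_set)       # stands in for the tuplestore
    for i in range(0, len(store), fetch_size):
        yield store[i:i + fetch_size]
```

A smaller fetch_size lowers the in-memory buffer at the cost of more ForeignScan node calls, which is the trade-off the text describes.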
Utilizing the pluggable Silk transport for the asynchronous FDW engine increases
network exchange performance and reduces the consumption of system resources,
notably the number of network connections.
Distributed deadlocks may occur during the processing of distributed transactions. Let us consider the following example:
create table players(id int, username text, pass text) with (distributed_by='id');
insert into players select id, 'user_' || id, 'pass_' || id from generate_series(1,1000) id;
Assume that the record with id=2 belongs to node1 and the record with
id=3 belongs to node2.
Let us execute the following commands on different nodes:
node1=# begin;
node1=# update players set pass='someval' where id=3;
node2=# begin;
node2=# update players set pass='someval' where id=2;
-- this will block because node1's transaction has locked the record with id=3
node2=# update players set pass='someval2' where id=3;
-- this will block because node2's transaction has locked the record with id=2
node1=# update players set pass='someval2' where id=2;
A distributed deadlock situation arises when transactions are mutually locked by each other. Postgres has an internal deadlock detection mechanism, which detects mutual locking between child processes (backends) of a single Postgres instance and resolves it. However, this mechanism is not applicable to the situation above, because the mutual locking is distributed, i.e., backends from different nodes are involved. From the point of view of the Postgres lock manager, there is no deadlock condition, because processes of a single instance are not locking each other. Therefore, Shardman has a separate mechanism for distributed deadlock resolution.
We can represent the interaction between processes in the entire cluster as a graph. A graph vertex
represents a process (backend), which we can identify with a couple of attributes
{rgid; vxid}, where rgid is the replication group ID, and
vxid is the virtual transaction ID of the currently executed transaction.
Graph edges represent directional connections between vertices. It is directed from the locked process
to the locking process.
It is obvious that any process can be blocked by only one process at a time. In other words, if a backend is waiting for a lock, it waits for one specific lock. On the other hand, a locking process can hold multiple locks, meaning that it can block multiple backends simultaneously.
With that said, the lock graph acts as a singly linked list. If this list contains a closed loop, then there is a deadlock condition. To detect a deadlock, it is necessary to build such a list and find closed loops in it.
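The closed-loop detection over this singly linked list can be sketched as follows. This is an illustrative model of the idea, not the Shardman detector; vertices are (rgid, vxid) pairs as defined above, and the mapping and function names are invented for the sketch.

```python
# Sketch of deadlock detection over the lock list described above.
# Each vertex is a (rgid, vxid) pair; wait_for maps a blocked
# process to the single process it waits on, so the graph is a
# singly linked list that we follow until a vertex repeats.
def find_deadlock(wait_for, start):
    """Follow the waits-for chain from start. Return the closed
    loop as a list of vertices, or None if the chain ends."""
    seen, node = [], start
    while node in wait_for:
        if node in seen:
            return seen[seen.index(node):]   # the loop itself
        seen.append(node)
        node = wait_for[node]
    return None                              # chain ended: no deadlock
```

In the real detector, a loop found on an inconsistent snapshot is re-verified by querying the involved nodes again, as described below.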
The distributed deadlock detector in Shardman is implemented as a separate task inside the Shardman monitor. If a process is unable to acquire a lock within
a specified amount of time (which is one second by default, but can be adjusted using
the deadlock_timeout configuration parameter), the internal
Postgres deadlock detector attempts to detect a local deadlock.
If no local deadlock is found, the distributed deadlock detector is activated.
The distributed deadlock detector builds a graph (list) of locks in the cluster. It queries views
pg_locks and pg_stat_activity on the local node and on each of
the remote cluster nodes.
The process of building the list of locks involves sequentially querying the nodes in the cluster; it is not atomic, so the list is not consistent. This means that the distributed deadlock detector may produce false positives: while the list is being built, it may record a lock that disappears before the building completes. To guarantee the reliability of deadlock detection, after a closed loop is detected, the nodes involved in it must be queried again.
After finding the closed loop, the distributed deadlock detector chooses the process belonging to the local node and cancels it. The user process served by the cancelled backend will receive a message:
canceling statement due distributed deadlock was found
A verbose message about the detected deadlock will be recorded in the server logs:
LOG: distributed deadlock detected
DETAIL: repgroup 1, PID 95264 (application 'psql'), executed query 'update players set pass='qqq' where id=2;' is blocked by repgroup 1, PID 95283 (application 'pgfdw:2:95278:9/2'), executed query 'UPDATE public.players_0 SET pass = 'qqq'::text WHERE ((id = 2))'
repgroup 1, PID 95283 (application 'pgfdw:2:95278:9/2'), executed query 'UPDATE public.players_0 SET pass = 'qqq'::text WHERE ((id = 2))' is blocked by repgroup 2, PID 95278 (application 'psql'), executed query 'update players set pass='qqq' where id=3;'
repgroup 2, PID 95278 (application 'psql'), executed query 'update players set pass='qqq' where id=3;' is blocked by repgroup 2, PID 95267 (application 'pgfdw:1:95264:8/4'), executed query 'UPDATE public.players_1 SET pass = 'qqq'::text WHERE ((id = 3))'
repgroup 2, PID 95267 (application 'pgfdw:1:95264:8/4'), executed query 'UPDATE public.players_1 SET pass = 'qqq'::text WHERE ((id = 3))' is blocked by repgroup 1, PID 95264 (application 'psql'), executed query 'update players set pass='qqq' where id=2;'
Global sequences in Shardman are implemented on top of the regular PostgreSQL sequences with some additional cluster-wide metadata which among other things holds the interval of globally unused sequence elements.
When CREATE SEQUENCE is issued, an ordinary
PostgreSQL sequence with the same
name is created on every cluster node. The range of this local
sequence is a bounded sub-interval of the global sequence (as
defined by MINVALUE and
MAXVALUE parameters), and contains at most
block_size elements. The
shardman.next_value function returns
values from the local sequence until it runs out, then a new
sub-interval with block_size elements is
allocated from the global sequence using a broadcast query
involving all cluster nodes. So, smaller block size values
make the generated numbers more monotonic across the cluster,
but incur a performance penalty, since the broadcast query may
be rather expensive. Another way to describe the block size
parameter is to say that it controls the size of a second
cache level, similarly to how the CACHE
parameter works, except at the level of an entire
Shardman cluster.
Also note that every time a new sub-interval is allocated, the
underlying local sequence is modified (as in ALTER
SEQUENCE), which locks it for the transaction
duration, preventing any other local, concurrent transactions
from obtaining next sequence values.
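The two-level allocation scheme described above can be sketched as follows. This is an illustrative Python model under stated assumptions: the global counter stands in for the cluster-wide metadata, the allocate_block call stands in for the broadcast query, and the class names are invented.

```python
# Sketch of the global sequence scheme described above: each node
# serves values from a local block and allocates a new block of
# block_size elements from the shared global counter (the broadcast
# query in the text) when the local block runs out.
class GlobalSequence:
    def __init__(self, block_size):
        self.block_size = block_size
        self.global_next = 1          # stands in for cluster-wide metadata

    def allocate_block(self):
        start = self.global_next
        self.global_next += self.block_size
        return start, start + self.block_size - 1

class LocalSequence:
    def __init__(self, glob):
        self.glob = glob
        self.next_val, self.last = 1, 0   # empty local block

    def next_value(self):
        if self.next_val > self.last:     # local block exhausted
            self.next_val, self.last = self.glob.allocate_block()
        v = self.next_val
        self.next_val += 1
        return v
```

With a small block_size, the values generated across nodes stay closer to monotonic but blocks are allocated (and the expensive broadcast runs) more often, which is the trade-off the text describes.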
To ensure that a cluster binary backup is consistent, Shardman implements the syncpoint mechanism.
To achieve consistent visibility of distributed transactions,
the technique of global snapshots based on physical clocks (Clock-SI) is used.
Similarly, it is possible to get a consistent snapshot for backups,
except that the time corresponding to the global snapshot must be mapped to
a set of LSNs, one for each node. Such a set of consistent LSNs in a
cluster is called a syncpoint.
In a Shardman cluster, each node generates its own independent local CSNs, which does not guarantee the global ordering of values in time. Therefore, we cannot take an arbitrary local CSN as the basis for a syncpoint. Instead, Shardman chooses only those CSNs that match distributed transaction commit records as the basis of the syncpoint. These CSNs have the property of global ordering according to the Clock-SI algorithm and can be used to obtain a syncpoint. The main points of this mechanism are described below.
The commit record of each completed transaction in Shardman contains the CSN
assigned to this transaction. This value, together with the LSN of this record,
forms a (CSN, LSN) pair. Each of the cluster nodes stores a certain number of such
pairs in RAM in a special structure, the CSNLSNMap. The CSNLSNMap is a circular
buffer whose elements are (CSN, LSN) pairs. The map size is set by the csn_lsn_map_size configuration parameter.
A (CSN, LSN) pair can be added to the map only if there are no transactions on the node that
can receive a CSN less than the one being added. This important condition guarantees the monotonic growth of CSNs and LSNs in the CSNLSNMap,
but does not guarantee that every commit record will get into the map.
When a user submits a request to create a syncpoint, a search over every CSNLSNMap is
made for the largest possible CSNg for which each node has an entry
(CSNn, LSN) satisfying the condition CSNn <= CSNg.
The monotonic growth property of every CSNLSNMap ensures that each found pair
(CSNn, LSN) corresponds to the state of the global data at the time
corresponding to CSNg. If no such value of CSNg
is found, the syncpoint operation fails and can be retried later. If such a value CSNg is found, then a syncpoint is generated as a special type of WAL record, which is
duplicated on all nodes of the cluster.
By getting a syncpoint and taking from it the LSN for each node in the cluster,
we can make a backup of each node that necessarily contains that LSN.
We can also recover to this LSN using the point-in-time recovery (PITR) mechanism.
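The syncpoint search over the per-node CSNLSNMaps can be sketched as follows. This is an illustrative model under stated assumptions: each map is given as a CSN-sorted list of (CSN, LSN) pairs, and the function name and return shape are invented for the sketch.

```python
# Sketch of the syncpoint search described above: given each node's
# CSNLSNMap (a monotonically growing list of (CSN, LSN) pairs), pick
# a CSNg such that every node has an entry with CSN <= CSNg, and
# take that entry's LSN for each node.
def find_syncpoint(maps):
    """maps: list of per-node [(csn, lsn), ...] sorted by CSN.
    Return (csn_g, [lsn per node]) or None if impossible."""
    if any(not m for m in maps):
        return None                   # some node has no entries yet
    # The largest CSN present anywhere works as CSNg: each node then
    # contributes its freshest entry not exceeding CSNg.
    csn_g = max(m[-1][0] for m in maps)
    lsns = []
    for m in maps:
        _, lsn = max((e for e in m if e[0] <= csn_g), key=lambda e: e[0])
        lsns.append(lsn)
    return csn_g, lsns
```

The None branch corresponds to the case in the text where no suitable CSNg is found and the syncpoint operation is retried later.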