2.1. Shardman Architecture

2.1.1. Table Types
2.1.2. Query Processing
2.1.3. Distributed Transactions
2.1.4. Silk
2.1.5. Distributed Deadlock Detection
2.1.6. Resource control?
2.1.7. Global Sequences
2.1.8. Syncpoints and consistent backup

The Shardman software comprises these main components: PostgreSQL core with additional features, shardman extension, management services and utilities. This section considers Shardman cluster as a group of PostgreSQL instances or shards. Each shard may also have one or more replicas and to emphasize this the term replication group is used. The support for highly available configurations is currently done on the level of tools and services and will be covered in the Management section.

2.1.1. Table Types

In a distributed database managed by Shardman the following special table types are used: sharded tables and global tables.

2.1.1.1. Sharded Tables

Sharded tables are just usual PostgreSQL partitioned tables where a few partitions, making up a shard, are regular local tables and the other partitions are foreign tables available from remote servers via postgres_fdw. Sharded tables are registered in the shardman.sharded_tables dictionary. Use the CREATE TABLE statement with the distributed_by parameter to create a sharded table. Several sharded tables can be created as colocated. This means that they have the same number of partitions and that their partitions corresponding to the same sharding key should reside together. During a rebalance, Shardman management utilities ensure that corresponding partitions of colocated tables are moved to the same node. (Such a rebalance happens, for example, when a new node is added to the cluster). Colocation is necessary to ensure that joins of several tables are propagated to the node where the actual data resides. To define one sharded table colocated with another one, first, create one table and then use the colocate_with parameter of the CREATE TABLE statement while creating the second table. Chains of colocated tables are not supported, all related tables should be marked as colocated to one of the tables instead. Note that colocate_with property is symmetric and transitive.

2.1.1.1.1. Partitions

A sharded table consists of several partitions. Some of them are regular tables, and others are foreign tables. By default, the number of partitions is determined by the shardman.num_parts parameter, but it can be overwritten by the num_parts CREATE TABLE parameter. Most of DDL operations are restricted on partitions of a sharded table. You should modify the parent table instead.

The number of partitions in a sharded table is defined when it is created and cannot be changed afterwards. When new nodes are added to the cluster, some partitions are moved from existing nodes to the new ones to balance the load. So, to allow scaling of clusters, the initial number of partitions should be high enough, but not too high since an extremely large number of partitions significantly slows down query planning. For example, if you expect the number of nodes in your cluster to grow by 4 times at a maximum, create sharded tables with the number of partitions equal to 4 * N, where N is the number of nodes. A cluster becomes unable to scale when the number of cluster nodes reaches the number of partitions in the sharded table with the minimal number of them.

2.1.1.1.2. Subpartitions

Partitions of a sharded table can be partitioned by range. In this case, each partition of a sharded table is a partitioned table consisting only of regular or only of foreign subpartitions. All subpartitions of a partition are located on the same node. Use the partition_by CREATE TABLE parameter to specify a column that should be used as a subpartition key column and the partition_bounds parameter to set bounds of the second-level table partitions. New subpartitions can be added or removed from a table as necessary. So you can omit the partition_bounds parameter during table creation and create partitions later using the shardman.create_subpart() function. Other subpartition management functions allow you to drop, detach or attach subpartitions of a sharded table. Subpartition management is cluster-wide.

2.1.1.2. Global Tables

Global tables are available to all nodes of a cluster. Now a global table is a set of regular tables synchronized by triggers. The main use case for a global table is to store a relatively rarely updated set of data that is used by all cluster nodes. When a sharded table is joined to a global table, joins between sharded table partitions and the global table can be performed on nodes where individual partitions reside. The implementation of trigger-based replication requires a non-deferrable primary key on a global table to be defined. Currently when a global table is modified, an after-statement trigger fires and propagates changes to other nodes of the cluster via foreign tables. When new nodes are added to a cluster, global table data is transferred to the new nodes via logical replication. When some nodes are removed from a cluster, global tables get locked for writes for a brief time. Use the global CREATE TABLE parameter to create a global table. Global tables are registered in the shardman.global_tables dictionary. Partitioned global tables are not supported.

2.1.1.3. Distributed DDL

Shardman extension allows creating several kinds of global objects. These are sharded and global tables, roles and tablespaces. The list of operations allowed on global objects is limited particularly to protect consistency of a global schema. For the same reason, most operations on global objects are cluster-wide. The list of cluster-wide operations includes:

  • CREATE for sharded and global tables, global roles and tablespaces or indexes on sharded or global tables.

  • DROP for sharded and global tables, global roles and tablespaces or indexes on sharded or global tables.

  • ALTER TABLE for sharded and global tables.

  • ALTER TABLESPACE for global tablespaces.

  • ALTER ROLE for global roles.

  • RENAME for sharded and global tables or indexes on them.

  • SET CONSTRAINTS ALL inside a transaction block.

These configuration settings control execution of the distributed DDL: shardman.broadcast_ddl and shardman.sync_schema. The first one can be used for a cluster-wide broadcast of all regular DDL operations (for example, creating schemas or functions). The second one controls broadcasting of statements related to global objects and should never be turned off without consulting the Postgres Pro Shardman support team.

2.1.2. Query Processing

Shardman uses the standard PostgreSQL query execution pipeline. Other nodes in the cluster are accessed via the modified postgres_fdw extension.

Shardman query planner takes the query abstract syntax tree (AST) and creates a query plan, which is used by the executor. While evaluating query execution methods, the planner operates with so-called paths, which specify how relations should be accessed. Processing a query join tree, the planner looks at different combinations of how relations can be joined. Each time, it examines a join of two relations, one of which can be a join relation itself. After choosing the order and strategies for joining relations, the planner considers the group by, order by and limit operations. When the cheapest path is selected, it is transformed to a query plan. A plan consists of a tree of nodes, each of which has methods to get one next result row (or NULL if there are no more results).

2.1.2.1. Push down technique

2.1.2.1.1. Joins

The efficiency of query execution in a distributed DBMS is determined by how many operations can be executed on nodes that hold the actual data. For Shardman, a lot of effort is devoted to pushing down join operations. When a planner finds a relation that is accessible via a foreign data wrapper (FDW), it creates ForeignPath to access it. Later, when it examines a join of two relations and both of them are available via ForeignPath from the same foreign server, it can consider pushing down this join to the server and generating a so-called ForeignJoinPath. The planner can fail to do it if the join type is not supported, if filters attached to the relation should be applied locally, or if the relation scan result contains fields that cannot be evaluated on the remote server. An example of a currently unsupported join type is anti-join. Local filters attached to the relation should be applied locally when remote execution can lead to a different result or if the postgres_fdw module cannot create SQL expressions to apply some of the filters. An example of fields that cannot be evaluated on a remote server are attributes of semi-join inner relation that are not accessible via an outer relation. If the foreign_join_fast_path configuration parameter is set to on (which is the default value), the Shardman planner stops searching for other join strategies of two relations once it finds foreign join possible for them. When the postgres_fdw.enforce_foreign_join configuration parameter is set to on (which is also the default), the cost of a foreign join is estimated so as to be always less than the cost of a local join.

When several sharded tables are joined on a sharding key, a partitionwise join can be possible. This means that instead of joining original tables, we can join their matching partitions. Partitionwise join currently applies only when the join conditions include all the partition keys, which must be of the same data type and have exactly matching sets of child partitions. Partitionwise join is crucial to the efficient query execution as it allows pushing down joins of table partitions. Evidently, to push down join of several partitions, these partitions should reside on the same node. This is usually the case when sharded tables are created with the same num_parts parameter. However, for a rebalance process to move the corresponding partitions to the same nodes, sharded tables should be marked as colocated when created (see Section 2.1.1.1). Partitionwise join is enabled with the enable_partitionwise_join configuration parameter, which is turned on by default in Shardman.

When a sharded table is joined to a plain global table, asymmetric partitionwise join is possible. This means that instead of joining original tables, we can join each partition of the sharded table with the global table. This makes it possible to push down a join of sharded table partitions - with a global table to the foreign server.

2.1.2.1.2. Aggregations

After planning joins, the planner considers paths for post-join operations, such as aggregations, limiting, sorting and grouping. Not all such operations reach FDW pushdown logic. For example, currently partitioning efficiently prevents the LIMIT clause from being pushed down. There are two efficient strategies for executing aggregates on remote nodes. The first one is a partitionwise aggregation — when a GROUP BY clause includes a partitioning key, the aggregate can be pushed down together with the GROUP BY clause (this behavior is controlled by the enable_partitionwise_aggregate configuration parameter, which is turned on by default in Shardman). Alternatively, the planner can decide to execute partial aggregation on each partition of a sharded table and then combine the results. In Shardman, such a partial aggregate can be pushed down if the partial aggregate efficiently matches the main aggregate. For example, partial sum() aggregate can always be pushed down, but avg() cannot. Also the planner refuses pushing down partial aggregates if they contain additional clauses, such as ORDER BY or DISTINCT, or if the statement has the HAVING clause.

2.1.2.1.3. Subqueries

Generally, subqueries cannot be pushed down to other cluster nodes. However, Shardman uses two approaches to alleviate this limitation.

The first is subquery unnesting. In PostgreSQL, non-correlated subqueries can be transformed into semi-joins. In the following example, ANY subquery on non-partitioned tables is transformed to Hash Semi Join:

EXPLAIN (COSTS OFF) SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers);
                        QUERY PLAN                         
-----------------------------------------------------------
 Hash Semi Join
   Hash Cond: (pgbench_branches.bid = pgbench_tellers.bid)
   ->  Seq Scan on pgbench_branches
   ->  Hash
         ->  Seq Scan on pgbench_tellers

When optimize_correlated_subqueries is on (which is the default), Shardman planner also tries to convert correlated subqueries (i.e., subqueries that reference upper-level relations) into semi-joins. This optimization works for IN and = operators. The transformation has some restrictions. For example, it is not considered if a subquery contains aggregates or references upper-level relations from outside of a WHERE clause. This optimization allows transforming more complex subqueries into semi-joins, like in the following example:

EXPLAIN (COSTS OFF) SELECT * FROM pgbench_branches WHERE bid = ANY (SELECT bid FROM pgbench_tellers WHERE tbalance = bbalance);
                                                       QUERY PLAN                                                       
------------------------------------------------------------------------------------------------------------------------
 Hash Semi Join
   Hash Cond: ((pgbench_branches.bid = pgbench_tellers.bid) AND (pgbench_branches.bbalance = pgbench_tellers.tbalance))
   ->  Seq Scan on pgbench_branches
   ->  Hash
         ->  Seq Scan on pgbench_tellers
(5 rows)

After applying subquery unnesting, semi-join can be pushed down for execution to a remote node.

The second approach is to push down the entire subquery. This is possible when the optimizer has already figured out that the subquery references only partitions from the same foreign server as the upper-level query and corresponding foreign scans do not have local conditions. The optimization is controlled by postgres_fdw.subplan_pushdown (which is off by default). When a decision to push down a subquery is made by postgres_fdw, it has to deparse this subquery. A subquery that contains plan nodes for which deparsing is not implemented will not be pushed down. An example of a subquery pushdown looks as follows:

EXPLAIN (VERBOSE ON, COSTS OFF)
SELECT * FROM pgbench_accounts a WHERE a.bid=90 AND abalance =
    (SELECT min(tbalance) FROM pgbench_tellers t WHERE t.bid=90 and a.bid=t.bid);
                                     QUERY PLAN                                                                                                                   
--------------------------------------------------------------------------------------
 Foreign Scan on public.pgbench_accounts_5_fdw a
   Output: a.aid, a.bid, a.abalance, a.filler
   Remote SQL: SELECT aid, bid, abalance, filler FROM public.pgbench_accounts_5 r2 WHERE ((r2.bid = 90)) AND ((r2.abalance = ((SELECT min(sp0_2.tbalance) FROM public.pgbench_tellers_5 sp0_2 WHERE ((sp0_2.bid = 90)) AND ((r2.bid = 90))))))
   Transport: Silk
   SubPlan 1
     ->  Finalize Aggregate
           Output: min(t.tbalance)
           ->  Foreign Scan
                 Output: (PARTIAL min(t.tbalance))
                 Relations: Aggregate on (public.pgbench_tellers_5_fdw t)
                 Remote SQL: SELECT min(tbalance) FROM public.pgbench_tellers_5 WHERE ((bid = 90)) AND (($1::integer = 90))
                 Transport: Silk

Note that in the plan above there are no references to SubPlan 1.

2.1.2.2. Asynchronous execution

When a sharded table is queried, the Shardman planner creates Append plans to scan all partitions of the table and combine the result. When some of partitions are foreign tables, the planner can decide to use an asynchronous execution. This means that when an Append node for the first time after initialization is asked for the tuples, it asks asynchronous child nodes to start fetching the result. For postgres_fdw async ForeignScan nodes, it means that a remote cursor is declared and a fetch request is sent to the remote server. If Silk transport is used, this means that the query is sent for execution to the remote server as an MT_SPI message.

After sending a request to the remote servers, Append returns to fetching data from synchronous child nodes — local scan nodes or synchronous ForeignScan nodes. Data from such nodes is fetched in a blocking manner. When Append ends getting data from synchronous nodes, it looks if async nodes have some data. If they do not, it waits for async nodes to produce results.

Shardman can execute several types of plans asynchronously. These are asynchronous ForeignScans, projections and trivial subquery scans (select * from subquery) over asynchronous plans.

The asynchronous execution is turned on by default on the level of a foreign server. This is controlled by async_capable postgres_fdw option. For now, only Append plans support asynchronous execution. MergeAppend does not support asynchronous execution.

While examining query plans, pay attention to the presence of non-asynchronous ForeignScan nodes in the plan. Asynchronous execution can significantly increase query execution time.

Examples:

EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts;
                               QUERY PLAN                                
-------------------------------------------------------------------------
 Append
   ->  Seq Scan on pgbench_accounts_0 pgbench_accounts_1
   ->  Async Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
   ->  Async Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
   ->  Seq Scan on pgbench_accounts_3 pgbench_accounts_4
   ->  Async Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
   ->  Async Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
   ->  Seq Scan on pgbench_accounts_6 pgbench_accounts_7
   ->  Async Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
   ->  Async Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
   ->  Seq Scan on pgbench_accounts_9 pgbench_accounts_10
   ->  Async Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
   ->  Async Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
   ->  Seq Scan on pgbench_accounts_12 pgbench_accounts_13
   ->  Async Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
   ->  Async Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
   ->  Seq Scan on pgbench_accounts_15 pgbench_accounts_16
   ->  Async Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
   ->  Async Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
   ->  Seq Scan on pgbench_accounts_18 pgbench_accounts_19
   ->  Async Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20

Here we see a typical asynchronous plan. There are asynchronous foreign scans and local sequential scans, which are executed synchronously.

EXPLAIN (COSTS OFF) SELECT * FROM pgbench_accounts ORDER BY aid;
                            QUERY PLAN                             
-------------------------------------------------------------------
 Merge Append
   Sort Key: pgbench_accounts.aid
   ->  Sort
         Sort Key: pgbench_accounts_1.aid
         ->  Seq Scan on pgbench_accounts_0 pgbench_accounts_1
   ->  Foreign Scan on pgbench_accounts_1_fdw pgbench_accounts_2
   ->  Foreign Scan on pgbench_accounts_2_fdw pgbench_accounts_3
   ->  Sort
         Sort Key: pgbench_accounts_4.aid
         ->  Seq Scan on pgbench_accounts_3 pgbench_accounts_4
   ->  Foreign Scan on pgbench_accounts_4_fdw pgbench_accounts_5
   ->  Foreign Scan on pgbench_accounts_5_fdw pgbench_accounts_6
   ->  Sort
         Sort Key: pgbench_accounts_7.aid
         ->  Seq Scan on pgbench_accounts_6 pgbench_accounts_7
   ->  Foreign Scan on pgbench_accounts_7_fdw pgbench_accounts_8
   ->  Foreign Scan on pgbench_accounts_8_fdw pgbench_accounts_9
   ->  Sort
         Sort Key: pgbench_accounts_10.aid
         ->  Seq Scan on pgbench_accounts_9 pgbench_accounts_10
   ->  Foreign Scan on pgbench_accounts_10_fdw pgbench_accounts_11
   ->  Foreign Scan on pgbench_accounts_11_fdw pgbench_accounts_12
   ->  Sort
         Sort Key: pgbench_accounts_13.aid
         ->  Seq Scan on pgbench_accounts_12 pgbench_accounts_13
   ->  Foreign Scan on pgbench_accounts_13_fdw pgbench_accounts_14
   ->  Foreign Scan on pgbench_accounts_14_fdw pgbench_accounts_15
   ->  Sort
         Sort Key: pgbench_accounts_16.aid
         ->  Seq Scan on pgbench_accounts_15 pgbench_accounts_16
   ->  Foreign Scan on pgbench_accounts_16_fdw pgbench_accounts_17
   ->  Foreign Scan on pgbench_accounts_17_fdw pgbench_accounts_18
   ->  Sort
         Sort Key: pgbench_accounts_19.aid
         ->  Seq Scan on pgbench_accounts_18 pgbench_accounts_19
   ->  Foreign Scan on pgbench_accounts_19_fdw pgbench_accounts_20

Here merge append is used, and so the execution cannot be asynchronous.

2.1.2.3. Fetch-all Fallback

There are a lot of cases when operations on data cannot be executed remotely (for example, when some non-immutable function is used in filters, when several sharded tables are joined by an attribute that is not a sharding key, when pushdown of a particular join type is not supported) or when the planner considers local execution to be cheaper. In such cases different operations (selection, joins or aggregations) are not pushed down, but executed locally. This can lead to inefficient query execution due to large inter-cluster traffic and high processing cost on a coordinator. When this happens, you should check if an optimizer has fresh statistics, consider rewriting a query to benefit from different forms of pushdown or at least check that the suggested query plan is reasonable enough. To make DBMS analyze data for the whole cluster, you can use shardman.global_analyze function.

2.1.3. Distributed Transactions

2.1.3.1. Visibility and CSN

2.1.3.1.1. Commit Sequence Number (CSN)

A Shardman cluster uses a snapshot isolation mechanism for distributed transactions. The mechanism provides a way to synchronize snapshots between different nodes of a cluster and a way to atomically commit such a transaction with respect to other concurrent global and local transactions. These global transactions can be coordinated by using provided SQL functions or through postgres_fdw, which uses these functions on remote nodes transparently.

Clock-SI is described in https://dl.acm.org/citation.cfm?id=2553434. Assume that each node uses the CSN-based visibility: the database tracks the counter for each transaction commit (CSN). With such a setting, a snapshot is just a single number — a copy of the current CSN at the moment when the snapshot was taken. Visibility rules are boiled down to checking whether the current tuple's CSN is less than our snapshot's CSN.

Let's assume that CSN is the current physical time on the node and call it GlobalCSN. If the physical time on different nodes is perfectly synchronized, then such a snapshot obtained on one node can be used on other nodes to provide the necessary level of transaction isolation. But unfortunately physical time is never perfectly sync and can drift, and this should be taken into account. Also, there is no easy notion of lock or atomic operation in the distributed environment, so commit atomicity on different nodes with respect to concurrent snapshot acquisition should be handled somehow. Clock-SI addresses that in the following way:

  1. To achieve commit atomicity of different nodes, intermediate step is introduced: at the first run, a transaction is marked as InDoubt on all nodes, and only after that each node commits it and stamps with a given GlobalCSN. All readers that ran into tuples of an InDoubt transaction should wait until it ends and recheck the visibility.

  2. When the coordinator is marking transactions as InDoubt on other nodes, it collects ProposedGlobalCSN from each participant, which is the local time on those nodes. Next, it selects the maximal value of all ProposedGlobalCSNs and commits the transaction on all nodes with that maximal GlobalCSN even if that value is greater than the current time on this node due to clock drift. So the GlobalCSN for the given transaction will be the same on all nodes. Each node records its last generated CSN (last_csn) and cannot generate CSNlast_csn. When a node commits a transaction with CSN > last_csn, last_csn is adjusted to record this CSN. Due to this mechanism, a node cannot generate a CSN, that is less than CSNs of already committed transactions.

  3. When a local transaction imports a foreign global snapshot with some GlobalCSN and the current time on this node is smaller than the incoming GlobalCSN, then the transaction should wait until this GlobalCSN time comes to the local clock.

The two last rules provide protection against time drift.

2.1.3.1.2. Commit delay and external consistency

The rules above still do not provide recency guaranty for snapshots genereted on nodes that are not participated in a transaction. Read operation originated from such node can see stale data. The probability of anomaly directly depends on the system clock skew in Shardman cluster.

Particular attention should be paid to the synchronization of the system clocks on all cluster nodes. The size of the clock skew must be measured. If external consistency is required, then the clock skew can be compensated with a commit delay. This delay is added before every commit in the system, so it has a negative impact on the latency of transactions. The read-only transactions are not affected by this delay. The delay can be set using the GUC csn_commit_delay.

2.1.3.1.3. CSN Map

The CSN visibility mechanism described above is not a general way to check the visibility of all transactions. It is used to provide isolation only for distributed transactions. As a result, each cluster node uses a visibility checking mechanism based on xid and xmin. To be able to use the CSN snapshot that points to the past, we need to keep old versions of tuples on all nodes and therefore defer vacuuming them. To do this, each node in a Shardman cluster maintains a CSN to xid mapping. The map is called CSNSnapshotXidMap. This map is a ring buffer, and it stores the correspondence between the current snapshot_csn and xmin in a sparse way: snapshot_csn is rounded to seconds (and here we use the fact that snapshot_csn is just a timestamp), and xmin is stored in the circular buffer where rounded snapshot_csn acts as an offset from the current circular buffer head. The size of the circular buffer is controlled by the csn_snapshot_defer_time configuration setting. VACUUM is not allowed to clean up tuples whose xmax is newer than the oldest xmin in CSNSnapshotXidMap.

When a CSN snapshot arrives, we check that its snapshot_csn is still in our map, otherwise, we will error out with snapshot too old message. If the snapshot_csn is successfully mapped, we fill backend's xmin with the value from the map. That way we can take into account backends with an imported CSN snapshot, and old tuple versions will be preserved.

2.1.3.1.4. CSN Map Trimming

To support global transactions, each node keeps old versions of tuples for at least csn_snapshot_defer_time seconds. With large values of csn_snapshot_defer_time, this negatively affects performance. This is because nodes save all row versions during the last csn_snapshot_defer_time seconds, but there may not be more transactions in the cluster that can read them. A special task of the monitor periodically recalculates xmin in the cluster and sets it on all nodes to the minimum possible value. This allows the vacuuming routine to remove a row version that is no longer of interest to any transaction. The shardman.monitor_trim_csnxid_map_interval configuration setting controls the worker. The worker wakes up every monitor_interval seconds and performs the following operations:

  1. Checks if the current node's repgroup ID is the smallest among all IDs in the cluster. If this condition is not met, then the work on the current node is terminated. So only one node in the cluster can perform a horizon negotiation.

  2. From each node of the Shardman cluster, the coordinator collects the oldest snapshot CSN among all active transactions on the node.

  3. The coordinator chooses the smallest CSN and sends it to each node. Each node discards its csnXidMap values that are less than this value.

2.1.3.2. 2PC and Prepared Transaction Resolution

Shardman implements a two-phase commit protocol to ensure the atomicity of distributed transactions. During the execution of a distributed transaction, the coordinator node sends the command BEGIN to participant nodes to initiate their local transactions.

The term "participant nodes" herein and subsequently refers to a subset of cluster nodes that participate in the execution of a transaction's command while the node is engaged in writing activity.

Additionally, a local transaction is created on the coordinator node. This ensures that there are corresponding local transactions on all nodes participating in the distributed transaction.

During the two-phase transaction commit, the coordinator node sends the command PREPARE TRANSACTION to the participant nodes to initiate the preparation of their local transactions for commit. If the preparation is successful, the local transaction data is stored in a disk storage, making it persistent. If all participant nodes report successful preparation to the coordinator node, the coordinator node will commit its local transaction. Subsequently, the coordinator node will also commit the previously prepared transactions on the participant nodes using the command COMMIT PREPARED.

If a failure occurs during the PREPARE TRANSACTION command on any of the participant nodes, the distributed transaction is considered aborted. The coordinator node then broadcasts the command to abort the previously prepared transactions using the ROLLBACK PREPARED command. If the local transaction was already prepared, it is aborted. However, if there was no prepared transaction with the specified name, the command to rollback is simply ignored. Subsequently, the coordinator node rolls back its local transaction.

After a successful preparation phase, there will be an object prepared transaction on the each of participant nodes. These objects are actually disk files and records in the server memory.

It is possible to have a prepared transaction that was created earlier through a two-phase operation and will never be completed. This can occur, for example, if the coordinator node fails exactly after the preparation step but before the commit step. It can also occur as a result of network connectivity issues. For instance, if the command COMMIT PREPARED from the coordinator node to a participant node ends with an error, local transactions will be committed on all participant nodes except for the one with the error. The local transaction will also be committed on the coordinator node. All participants, except for the one with the error, believe that the distributed transaction was completed. However, the one participant still waiting for COMMIT PREPARED will never receive it, resulting in a prepared transaction that will never be completed.

A prepared transaction consumes system resources, such as memory and disk space. An incomplete prepared transaction causes other transactions that access rows modified by that transaction to wait until the distributed operation completes. Therefore, it is necessary to complete prepared transactions, even in cases where there were failures during commit, to free up resources and ensure that other transactions can proceed.

To resolve such situations, there is a mechanism for resolving prepared transactions that is implemented as part of the Shardman monitor. It is implemented as a background worker that wakes up periodically, acting as an internal crontab job. By default, the period is set to 5 seconds, but it can be configured using the shardman.monitor_dxact_interval configuration parameter. The worker checks the presence of prepared transactions that were created earlier by a certain amount of time, specified by the shardman.monitor_dxact_timeout configuration parameter (which is also set to 5 seconds by default), on the same node where the Shardman monitor is running.

When the PREPARE TRANSACTION command is sent to a participant node, a special name is assigned to the prepared transaction. This name encodes useful information, which allows identifying the coordinator node and its local transaction.

If the Shardman monitor finds outdated prepared transactions, it extracts the coordinator's replication group ID and transaction ID of the coordinator's local transaction. The monitor then sends a query to the coordinator

SELECT shardman.xact_status(TransactionId)

which requests the current status of the coordinator's local transaction. If the query fails, for example, due to network connectivity issues, then the prepared transaction will remain untouched until the next time when the monitor wakes up.

In the case of a successful query, the coordinator node can reply with one of the following statuses:

committed

The local transaction on the coordinator node was completed successfully. Therefore, the Shardman monitor also commits this prepared transaction using the COMMIT PREPARED command.

aborted

The local transaction on the coordinator node was aborted. Therefore, the monitor also aborts this transaction using the ROLLBACK PREPARED command.

unknown

The transaction with such an identifier never existed on the coordinator node. Therefore, the monitor aborts this transaction using the ROLLBACK PREPARED command.

active

The local transaction on the coordinator node is still somewhere inside the CommitTransaction() flow. Therefore, the monitor does nothing with this transaction. The monitor will try again with this transaction at the next wake-up.

ambiguous

This status can be returned when CLOG's truncating is enabled on the coordinator node. The CLOG is a bitmap that stores the status of completed local transactions. When a transaction is committed or aborted, its status is marked in the CLOG. However, the CLOG can be truncated (garbage collected) by the VACUUM process to discard statuses of old transactions that do not affect the visibility of data for any existing transaction.

When the CLOG is truncated, there is a possibility that the shardman.xact_status() function may not be able to unambiguously decide if a transaction exists in the past (with some status) or if it never existed. In such cases, the function returns an ambiguous status. This can lead to uncertainty about the actual status of the transaction and can make it difficult to resolve the prepared transaction.

When the shardman.xact_status() function returns the ambiguous status for a prepared transaction, the monitor node logs a warning message indicating that the status could not be determined unambiguously. The prepared transaction is left untouched, and the monitor will try again with this transaction at the next wake-up. It is important to properly configure the min_clog_size parameter with the value of 1024000 (which means "never truncate CLOG") to avoid ambiguity in the status of prepared transactions.

In situations where the prepared transaction resolution mechanism is unable to resolve prepared transactions due to constant errors or ambiguous status, the administrator will need to manually intervene to resolve these transactions. This may involve examining the server logs and performing a manual rollback or commit operation on the prepared transaction. Note that leaving prepared transactions unresolved can lead to resource-consumption and performance issues, so it is important to address these situations as soon as possible.

2.1.4. Silk

2.1.4.1. Concept

Silk (Shardman InterLinK) is an experimental transport feature. It is injected at the point where postgres_fdw decides to transmit deparsed piece of query through libpq connection to the remote node, replacing libpq connection with itself. It is designed to decrease the count of idle postgres_fdw connections during transaction execution, minimize latency and boost overall throughput.

Silk implementation uses several background processes. The main routing/multiplexing process (one per PostgreSQL instance) named silkroad, and a bunch of background workers named silkworms. While postgres_fdw uses libpq, it spawns multiple libpq connections from each backend to the remote node (where multiple backend processes are spawned accordingly). But if silk replaces libpq - every silkroad process is connected to only one remote silkroad. In this scheme, remote silkworms play the role of remote backends otherwise spawned by postgres_fdw.

Silkroad wires local backend with remote node's workers this way:

  1. Backend process uses regular postgres_fdw API to access remote data as usual. But postgres_fdw, when silk is enabled, writes the query into shared memory queue instead of libpq connection;

  2. Silkroad process parses incoming shared memory queue from that backend and routes the message to appropriate network connection with remote silkroad process.

  3. Remote silkroad process grabs incoming message from network and (if it is a new one) redirects it to available worker's shared memory queue (or in a special "unassigned jobs" queue if all of the workers are busy).

  4. At last, remote worker gets the message through its shared memory queue, executes it and sends back the result tuples (or an error) the same way.

Silkroad acts here like a common network switch, tossing packets between backend's shared memory and appropriate network socket. It knows nothing about content of a message relying only on the message header.

2.1.4.2. Event Loop

Silkroad process runs an event loop powered by the libev library. Each backend's shared memory queue is exposed at the event loop with the eventfd descriptor, and each network connection - with a socket descriptor.

During startup, the backend registers itself (its eventfd descriptors) at a local silkroad process. Silkroad responds by specifying which memory segments to use for the backend's message queue. From this moment silkroad will respond to events from the queue associated with this backend. Network connections between local and remote silkroads will be established at once on the first request from the backend to the remote node and stay alive until both of participants (silkroad processes) exist.

2.1.4.3. Routing and Multiplexing

For each subquery, we expect a subset of tuples, and therefore represent the interaction within the subquery as a bidirectional data stream. Silkroad uses an internal routing table to register these streams. A unique stream ID (within the Shardman cluster) is formed as a pair of "origin node address, target node address" and a locally (within the node) unique number. Each particular subquery from a backend to remote nodes will be registered by silkroad as such a stream. So, any backend can be associated with many streams at the time.

When a local silkroad process got a message with a new stream ID from backend, it registers it in a local routing table and then redirects this message to an appropriate socket. If the connection with the remote silkroad does not exist, it is established using a handshake procedure. The original message that initiated a handshake is placed into a special internal buffer until the handshake succeeds. The remote silkroad process receiving a packet with the new ID registers it in its own table, then assigns a silkworm worker from a pool of available workers and places the message into the worker's shared memory queue. If all of the silkworm workers are busy at the moment, the message will be postponed, i.e., placed into a special "unassigned jobs queue" (note that the configuration parameter shardman.silk_unassigned_job_queue_size is 1024). If there is no free space in the queue, an error message will be generated and sent back to the source backend. A job from this queue will be assigned later to the first available worker when it gets rid of the previous job.

When the worker got a new job, it executes it through SPI subsystem, organizing result tuples into batches and sends them back through shared memory to the local silkroad process. The rest is trivial due to the whole route is known. The last resulting packet with tuples in a stream is marked as closing. It is an order to silkroads to wipe out this route from their tables.

Note that backend and remote workers stay subscribed to their streams until they are explicitly closed. So the backend has the opportunity to send an abort message or notify the remote worker to prematurely close the transaction. And it makes it possible to discard obsolete data packets, possibly from previous aborted transactions.

2.1.4.4. Error Handling and Route Integrity

Besides the routing table silkroad tracks endpoints (backends and network connections) that were involved in some particular stream. So when some connection is closed, all the involved backends (and/or workers) will be notified of that event with a special error message, and all routes/streams related to this connection will be dismissed. The same way, if the backend crashes, its shared memory queue become detached and silkroad reacts by sending error messages to remote participants of every stream related to the crashed backend. So remote workers are not left doing useless work when the requester has already died.

2.1.4.5. Data Transmitting/batching/splitting Oversized Tuples

The resulting tuples are transmitted by silkworm in a native binary mode. Tuples with external storage attribute will be deTOASTed, but those that were compressed stay compressed.

Small tuples will be organized in batches (about 256k). Big tuples will be cut into pieces by the sender and assembled into a whole by the receiving backend.

2.1.4.6. Streams Flow Control

It may happen that when the next message is received from a backend, it will not fit the target network buffer. Or the message received from the network does not fit into the target shared memory queue. In such a case, the stream that caused this situation will be suspended. This means that the silkroad pauses the reaction to events from the source endpoint (connection or backend) until the target endpoint drains their messages. The rest backends and connections not affected by this route are kept working. Receiving modules of backends are designed to minimize these situations. The backend periodically checks and drains the incoming queue even when the plan executor is busy processing other plan nodes. Received tuples are stored in backend's tuplestores according the plan nodes until the executor requests the next tuple for a particular plan node execution.

When enough space is freed on the target queue, the suspended stream gets resumed, endpoint's events get unblocked and the process of receiving and sorting packets continues.

2.1.4.7. Implementation details

2.1.4.7.1. State Transferring and CSNs

When postgres_fdw works over Silk transport, only one connection between silkroad routing daemons is used to transfer user requests to silkworm workers and get their responses. Each request contains a transaction state, a replication group ID of the node where the request is formed (coordinator), a query itself and query parameters (if present). A response is either an error response message with a specific error message and error code or a bunch of tuples followed by end of tuples message. This means that silkworm has to switch to the transaction state coming with the request prior to executing the request.

For now, Silk transport is used only for read-only SELECT queries. All modifying requests are processed via a usual libpq connection and handled mostly as all other DML requests in PostgreSQL postgres_fdw. The only distinction is that when a DML request is processed by postgres_fdw, it resets the saved transaction state for the connection cache entry corresponding to the connection where this request is sent. Also a read-only flag is set to false for such a connection cache entry. When a request is sent over Silk transport, Shardman extension asks for the transaction state for a pair of serverid and userid from postgres_fdw. If such a connection cache entry is found in the postgres_fdw connection cache, it is not a read-only cache entry and transaction state is present in this entry, the state is returned. If it is not present, postgres_fdw retreives a full transaction state from the remote server, saves it in the connection cache entry and returns to the Shardman extension.

The full transaction state is similar to the parallel worker transaction state and contains:

  • information related to the current user (uid, username)

  • pid of the current backend

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

  • backend private state:

    • array of ComboCIDs

    • internal transaction state (full transaction ID, isolation level, current command ID, etc.)

    • information about reindexed indexes

If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:

  • information related to the current user (username)

  • transaction start timestamp

  • current snapshot CSN

  • flags indicating that invalidation messages are present

Using such transaction states, silkworm can attach to a running transaction or start a new read-only transaction with the provided snapshot CSN and retreive the result.

Note that the full transaction state can be imported only on the server that exported it. Also note that due to this transaction state transferring method, you cannot use Silk transport without enabling CSN snapshots.

2.1.4.7.2. Integration with Asynchronous FDW Engine

In the Section 2.1.2.2, asynchronous ForeignScan plan nodes were presented as a way to optimize data retrieval from multiple hosts while these plan nodes were located under a single Append node. In the standard PostgreSQL architecture, the execution of ForeignScan plan nodes is implemented using the network protocol based on libpq. To improve the system performance during data transfer and reduce resource consumption, Shardman employs a different method for exchanging data with remote hosts. The mechanism for executing ForeignScan nodes is implemented using the Silk protocol.

To incorporate Silk transport into the asynchronous executor, modifications were made to the postgres_fdw extension. A pluggable transport was implemented as a set of interface functions included as part of the Shardman extension. During execution of callbacks that interact with remote hosts, these functions are called by the postgres_fdw extension. The pluggable Silk transport is activated if the Shardman extension is preloaded and if the foreign server has the attribute extended_features (applicable for any FDW server in the Shardman cluster). For all other cases, the postgres_fdw extension uses the standard exchange protocol based on libpq.

To disable the pluggable Silk transport in the Shardman cluster, it is necessary to set the query_engine_mode configuration parameter to the value of ENGINE_NONE.

In the current implementation, the pluggable Silk transport is only used for read-only queries, specifically during the execution of the ForeignScan node. The standard exchange protocol based on libpq is used for modifying queries.

When receiving query execution result rows using the Silk transport, the data is stored in a TupleStoreState storage as a complete result set, which is the same size as that returned by the remote host. The TupleStoreState is implemented as a data structure that can spill data to the disk in case of memory shortage. If the remote host returns a large result set, it does not lead to an out-of-memory (OOM) condition. Once the result set is received in the TupleStoreState, the data is copied into the ForeignScan executor's in-memory buffer. The size of this buffer is defined by the fetch_size attribute of the foreign server. The default value of 50000 rows can be adjusted to find a balance between the performance (number of ForeignScan node calls) and memory consumption.

Utilizing the pluggable Silk transport for the asynchronous FDW engine results in an increase of the network exchange performance and a reduction of the system resource consumption due to better utilization of system resources, including the number of network connections.

2.1.5. Distributed Deadlock Detection

Distributed deadlocks may occur during the processing of distributed transactions. Let us consider the following example:

    create table players(id int, username text, pass text) with (distributed_by='id');
    insert into players select id, 'user_' || id, 'pass_' || id from generate_series(1,1000) id;

Assume that the record with id=2 belongs to node1 and the record with id=3 belongs to node2.

Let us execute the following commands on different nodes:

    node1=# begin;
    node1=# update players set pass='someval' where id=3;

    node2=# begin;
    node2=# update players set pass='someval' where id=2;

    -- it should stuck because transaction on node1 locked record with id=3
    node2=# update players set pass='someval2' where id=3;

    -- it should stuck because transaction on node2 locked record with id=2
    node1=# update players set pass='someval2' where id=2;

A distributed deadlock situation arises when transactions are mutually locked by each other. PostgreSQL has an internal mechanism for deadlock detection, which detects mutual locking between child processes of a single PostgreSQL instance (backend) and resolves it. However, this mechanism is not applicable to the discovered situation because mutual locking is distributed, i.e., backends from different nodes are involved. From the point of view of the PostgreSQL lock manager, there is no deadlock condition because processes of the single instance are not locking each other. Therefore, Shardman has its own mechanism for distributed deadlock resolution.

We can represent the interaction between processes in the entire cluster as a graph. A graph vertex represents a process (backend), which we can identify with a couple of attributes {rgid; vxid}, where rgid is the replication group ID, and vxid is the virtual transaction ID of the currently executed transaction. Graph edges represent directional connections between vertices. Each connection is directed from the locked process to the locking process.

It is obvious that any process can be locked by only one process. In other words, if the backend is waiting for a lock, it can only wait for a specific lock. On the other hand, a locking process can acquire multiple locks, meaning that it can lock multiple backends simultaneously.

With that said, the lock graph acts as a singly linked list. If this list contains a closed loop, then here is a deadlock condition. To detect a deadlock, it is necessary to build such a list and detect closed loops in it.

The distributed deadlock detector in Shardman is implemented as a separate task inside the Shardman monitor. If a process is unable to acquire a lock within a specified amount of time (which is one second by default, but can be adjusted using the deadlock_timeout configuration parameter), the internal PostgreSQL deadlock detector attempts to detect a local deadlock. If no local deadlock is found, the distributed deadlock detector is activated.

The distributed deadlock detector builds a graph (list) of locks in the cluster. It queries views pg_locks and pg_stat_activity on the local node and on each of the remote cluster nodes.

The process of building the list of locks involves sequentially querying nodes in the cluster, and it is not atomic, so the list is not consistent. This means that the distributed deadlock detector may produce false positives. During the building of the list, we can store a lock that can disappear before the end of the list building process. To guarantee the reliability of deadlock detection, after the detection of a closed loop, it is necessary to re-query the nodes involved in the closed loop.

After finding the closed loop, the distributed deadlock detector chooses the process belonging to the local node and cancels it. The user process served by the cancelled backend will receive a message:

    canceling statement due distributed deadlock was found

A verbose message about the detected deadlock will be recorded in the server logs:

    LOG:  distributed deadlock detected
    DETAIL:  repgroup 1, PID 95264 (application 'psql'), executed query 'update players set pass='qqq' where id=2;' is blocked by repgroup 1, PID 95283 (application 'pgfdw:2:95278:9/2'), executed query 'UPDATE public.players_0 SET pass = 'qqq'::text WHERE ((id = 2))'
    repgroup 1, PID 95283 (application 'pgfdw:2:95278:9/2'), executed query 'UPDATE public.players_0 SET pass = 'qqq'::text WHERE ((id = 2))' is blocked by repgroup 2, PID 95278 (application 'psql'), executed query 'update players set pass='qqq' where id=3;'
    repgroup 2, PID 95278 (application 'psql'), executed query 'update players set pass='qqq' where id=3;' is blocked by repgroup 2, PID 95267 (application 'pgfdw:1:95264:8/4'), executed query 'UPDATE public.players_1 SET pass = 'qqq'::text WHERE ((id = 3))'
    repgroup 2, PID 95267 (application 'pgfdw:1:95264:8/4'), executed query 'UPDATE public.players_1 SET pass = 'qqq'::text WHERE ((id = 3))' is blocked by repgroup 1, PID 95264 (application 'psql'), executed query 'update players set pass='qqq' where id=2;'

2.1.6. Resource control?

2.1.7. Global Sequences

Global sequences in Shardman are implemented on top of regular PostgreSQL sequences with some additional cluster-wide metadata, which among other things holds the interval of globally unused sequence elements.

When CREATE SEQUENCE is issued, an ordinary PostgreSQL sequence with the same name is created on every cluster node. The range of this local sequence is a bounded sub-interval of the global sequence (as defined by MINVALUE and MAXVALUE parameters), and it contains at most block_size elements. The shardman.next_value function returns values from the local sequence until it runs out, then a new sub-interval with block_size elements is allocated from the global sequence using a broadcast query involving all cluster nodes. So, smaller block size values make the generated numbers more monotonic across the cluster, but incur a performance penalty since the broadcast query may be rather expensive. Another way to describe the block size parameter is to say that it controls the size of the second cache level, similarly to how the CACHE parameter works, except at the level of an entire Shardman cluster.

Also note, that every time a new sub-interval is allocated the underlying local sequence is modified (as in ALTER SEQUENCE), which will lock it for the transaction duration, preventing any other local concurrent transactions from obtaining next sequence values.

2.1.8. Syncpoints and consistent backup

To ensure that cluster binary backup is consistent, Shardman implements the syncpoints mechanism.

To achieve consistent visibility of distributed transactions, the technique of global snapshots based on physical clocks is used (Clock-Si). Similarly, it is possible to get a consistent snapshot for backups, only the time corresponding to the global snapshot must be mapped to a set of LSN for each node. Such a set of consistent LSN in a cluster is called a syncpoint.

In a Shardman cluster, each node can generate its own independent local CSN, which does not guarantee the global ordering of values in time. Therefore, we cannot take this arbitrary local CSN as the basis for a syncpoint. Instead, Shardman choses only those CSNs that match distributed transaction commit records as the basis of the syncpoint. These CSNs have the property of global ordering to the Clock-Si algorithm and can be used to obtain a syncpoint. The main points of this mechanism are described below.

The commit record of each completed transaction in Shardman contains the assigned CSN for this transaction. This value, together with the LSN of this record, forms a pair of values (CSN, LSN). Each of the cluster nodes stores a certain number of such pairs in RAM in a special structure - the CSNLSNMap. CSNLSNMap is a circular buffer. Each element of the map is a (CSN, LSN) pair. The map size is set by the configuration settings csn_lsn_map_size. A (CSN, LSN) pair can be added to the map only if there are no transactions on the node that can receive a CSN less than the one added. This important condition guarantees monotonous growth of CSN and LSN in CSNLSNmap, but does not guarantee that every commit record will get into the map.

When a user submits a request to create a syncpoint, a search by every CSNLSNMap is made for a largest possible CSNg for which there is an entry (CSN n, LSN) in each node and the condition CSNn <= CSNg is true. The monotonic growth property of every CSNLSNMap ensures that each found pair (CSNn, LSN) corresponds to the state of the global data at the time corresponding to CSNg. If no such value of CSNg is found, the get syncpoint operation fails and can be retried later. If such a value CSNg is found, then a syncpoint is generated as a special type of WAL record, which is duplicated on all nodes of the cluster.

By getting a syncpoint and taking the LSN for each node in the cluster from it, we can make a backup of each node, which must necessarily contain that LSN. We can also recover to this LSN using the point in time recovery (PITR) mechanism.