Silk (Shardman InterLinK)
is an experimental transport feature. It is injected at the point where
postgres_fdw decides to transmit a deparsed query fragment
through a libpq connection to the remote node, replacing
the libpq connection with itself. It is designed to reduce
the number of idle postgres_fdw connections during
transaction execution, minimize latency, and boost overall throughput.
The Silk implementation uses several background processes:
the main routing/multiplexing process (one per PostgreSQL
instance), called silkroad, and a set of background workers, called
silkworms. When postgres_fdw uses plain
libpq, each backend opens its own libpq connections
to the remote node, where a matching number of backend processes is spawned.
When Silk replaces libpq,
each silkroad process holds a single connection to the remote
silkroad, and the remote silkworms
play the role of the remote backends that postgres_fdw would otherwise spawn.
Silkroad wires a local backend to the remote node's workers as follows:
1. The backend uses the regular postgres_fdw API to access
remote data as usual. But with Silk enabled, postgres_fdw
writes the query into a shared memory queue instead of a libpq
connection.
2. The local silkroad process reads the backend's shared memory queue
and routes the message to the network connection with the appropriate
remote silkroad process.
3. The remote silkroad process takes the incoming message from the network
and (if it is a new one) redirects it to an available worker's shared memory queue
(or to a special "unassigned jobs" queue if all of the workers are busy).
4. Finally, the remote worker receives the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.
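The routing role described above can be sketched as a toy model. This is a minimal illustration in Python, not the actual implementation: the class and queue structures are hypothetical stand-ins for the shared memory queues and network sockets silkroad actually uses.

```python
from collections import deque

class SilkroadSketch:
    """Toy model of a silkroad process: it moves messages between backend
    shared-memory queues and per-node network connections, looking only at
    the message header (stream ID and target node), never at the payload."""

    def __init__(self):
        self.backend_queues = {}   # backend pid -> deque (shared memory queue stand-in)
        self.sockets = {}          # remote node -> deque (network connection stand-in)

    def register_backend(self, pid):
        self.backend_queues[pid] = deque()

    def send_from_backend(self, pid, stream_id, target_node, payload):
        # postgres_fdw writes into the shared memory queue, not into libpq
        self.backend_queues[pid].append((stream_id, target_node, payload))

    def pump(self, pid):
        """Route one queued message to the socket of its target node."""
        stream_id, target_node, payload = self.backend_queues[pid].popleft()
        self.sockets.setdefault(target_node, deque())  # connection made on first use
        self.sockets[target_node].append((stream_id, payload))
```

Like a network switch, the sketch never inspects the payload; only the header fields decide where a message goes.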
Silkroad acts here like a common network switch, tossing packets
between backends' shared memory and the appropriate network sockets. It knows nothing about
the content of a message, relying only on the message header.
The silkroad process runs an event loop powered by the libev
library. Each backend's shared memory queue is exposed to the event loop via
an eventfd descriptor, and each network connection
via a socket descriptor.
During startup, the backend registers itself (its eventfd
descriptors) with the local silkroad process. Silkroad
responds by specifying which memory segments to use for the backend's message queue.
From this moment, silkroad reacts to events from the queue
associated with this backend. A network connection between the local and remote
silkroads is established once, on the first request from
the backend to the remote node, and stays alive as long as both participants (the silkroad
processes) exist.
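The descriptor-driven loop can be illustrated with Python's standard selectors module. This is only a sketch: a socketpair stands in for the eventfd a backend queue would signal, and another for the network connection to a remote silkroad.

```python
import selectors
import socket

# Stand-ins for the descriptors silkroad watches: an eventfd signalled by a
# backend's shared memory queue, and a network socket to a remote silkroad.
sel = selectors.DefaultSelector()
backend_evt, backend_signal = socket.socketpair()   # eventfd stand-in
net_local, net_remote = socket.socketpair()         # network connection stand-in

sel.register(backend_evt, selectors.EVENT_READ, data="backend-queue")
sel.register(net_local, selectors.EVENT_READ, data="remote-silkroad")

backend_signal.send(b"\x01")     # backend signals: "new message in my queue"
ready = [key.data for key, _ in sel.select(timeout=1)]
```

Only the endpoints with pending events wake the loop; the real implementation reacts the same way, but through libev.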
For each subquery, a subset of tuples is expected, so the interaction
within the subquery is represented as a bidirectional data stream. Silkroad
uses an internal routing table to register these streams. A stream ID, unique within
the Shardman cluster, is formed from the pair of origin
node address and target node address plus a number unique within the node.
Each particular subquery from a backend to a remote node is registered by
silkroad as such a stream. So, any backend can be associated
with many streams at a time.
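The stream ID construction can be sketched as follows; the class and field names are illustrative, not taken from the actual source.

```python
from itertools import count

class StreamRegistry:
    """Toy stream table: a cluster-unique stream ID is the pair of origin and
    target node addresses plus a number unique within the origin node."""

    def __init__(self, local_addr):
        self.local_addr = local_addr
        self._seq = count(1)          # locally unique stream numbers
        self.streams = {}             # stream ID -> owning backend

    def open_stream(self, target_addr, backend_pid):
        stream_id = (self.local_addr, target_addr, next(self._seq))
        self.streams[stream_id] = backend_pid
        return stream_id
```

Because the local counter is combined with both node addresses, two nodes can never mint the same ID, and one backend can own many streams at once.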
When a local silkroad process gets a message with a new stream ID
from a backend, it registers the stream in its local routing table and then redirects the message
to the appropriate socket. If a connection with the remote silkroad does not exist yet,
it is established using a handshake procedure. The original message that initiated
the handshake is kept in a special internal buffer until the handshake succeeds.
The remote silkroad process, on receiving a packet with a new stream ID, registers
it in its own table, then assigns a silkworm worker from the pool of available workers
and places the message into that worker's shared memory queue. If all of the
silkworm workers are busy at the moment, the message is
postponed, i.e., placed into a special "unassigned jobs" queue,
whose size is set by the shardman.silk_unassigned_job_queue_size configuration parameter (1024 by default).
If there is no free space in the queue, an error message is generated and sent
back to the source backend. A job from this queue is assigned later to the first
worker that finishes its previous job.
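The dispatch policy can be sketched like this; the class, method names, and the tiny queue size are illustrative, with the constant standing in for shardman.silk_unassigned_job_queue_size.

```python
from collections import deque

class WorkerPool:
    """Toy dispatcher: assign a job to an idle silkworm, postpone it into a
    bounded "unassigned jobs" queue, or report an error to the backend."""

    def __init__(self, n_workers, queue_size):
        self.idle = deque(range(n_workers))
        self.unassigned = deque()
        self.queue_size = queue_size
        self.assigned = {}           # worker -> job

    def dispatch(self, job):
        if self.idle:
            w = self.idle.popleft()
            self.assigned[w] = job
            return ("assigned", w)
        if len(self.unassigned) < self.queue_size:
            return self.unassigned.append(job) or ("postponed", None)
        return ("error", None)       # sent back to the source backend

    def finish(self, w):
        """Worker w completes; it immediately picks up a postponed job if any."""
        del self.assigned[w]
        if self.unassigned:
            self.assigned[w] = self.unassigned.popleft()
        else:
            self.idle.append(w)
```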
When a worker gets a new “job”, it executes it through the SPI subsystem,
organizes the result tuples into batches, and sends them back through shared memory to the local
silkroad process. The rest is straightforward, since the whole route is known.
The last packet with tuples in a stream is marked as “closing”. It is an order
to the silkroads to wipe this route from their tables.
Note that the backend and remote workers stay “subscribed” to their streams until the streams are explicitly closed. So the backend has the opportunity to send an abort message, notifying the remote worker to close the transaction prematurely. This also makes it possible to discard obsolete data packets, possibly left over from previously aborted transactions.
Besides the routing table, silkroad tracks the endpoints (backends and network connections)
involved in each particular stream. So when some connection is closed, all
the involved backends (and/or workers) are notified of that event with a special
error message, and all routes/streams related to this connection are dismissed.
In the same way, if a backend crashes, its shared memory queue becomes detached,
and silkroad reacts by sending error messages to the remote participants
of every stream related to the crashed backend. So remote workers are not left doing
useless work when the requester has already died.
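The endpoint bookkeeping can be sketched as follows; the names are hypothetical, and real silkroad of course pushes error messages through queues and sockets rather than returning them.

```python
class EndpointTracker:
    """Toy version of silkroad's endpoint tracking: when a connection or
    backend dies, every stream it participated in is dismissed and the
    surviving peers get an error message."""

    def __init__(self):
        self.streams = {}   # stream ID -> set of endpoint names

    def join(self, stream_id, endpoint):
        self.streams.setdefault(stream_id, set()).add(endpoint)

    def endpoint_closed(self, endpoint):
        errors = []
        for sid in [s for s, eps in self.streams.items() if endpoint in eps]:
            peers = self.streams.pop(sid) - {endpoint}   # dismiss the route
            errors += [(peer, sid, "peer gone") for peer in peers]
        return errors
```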
The result tuples are transmitted by silkworm in native binary mode.
Tuples with an external storage attribute are deTOASTed, but those that were compressed
stay compressed.
Small tuples are organized into batches (about 256K). Big tuples are cut into pieces by the sender and reassembled into a whole by the receiving backend.
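The cut-and-reassemble step can be sketched like this; the framing (a final-piece flag per chunk) is illustrative, not the actual wire format.

```python
BATCH_LIMIT = 256 * 1024   # roughly the batch size mentioned above

def split_big_tuple(stream_id, payload, chunk=BATCH_LIMIT):
    """Sender side: cut an oversized tuple into pieces; the last piece
    carries a flag so the receiver knows the tuple is complete."""
    parts = [payload[i:i + chunk] for i in range(0, len(payload), chunk)] or [b""]
    return [(stream_id, idx == len(parts) - 1, part)
            for idx, part in enumerate(parts)]

def reassemble(pieces):
    """Receiver side: glue the pieces of one tuple back together."""
    assert pieces[-1][1], "last piece must be marked final"
    return b"".join(part for _, _, part in pieces)
```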
It may happen that the next message received from a backend does not fit
into the target network buffer, or that a message received from the network does not fit
into the target shared memory queue. In such a case, the stream that caused
this situation is “suspended”. This means that silkroad
stops reacting to events from the source endpoint (connection or backend) until
the target endpoint drains its messages. Other backends and connections
not involved in this route keep working. The receiving modules of backends are designed
to minimize such situations: a backend periodically checks and drains its incoming
queue even when the plan executor is busy processing other plan nodes. Received tuples
are stored in the backend's tuplestores, according to their plan nodes, until the executor
requests the next tuple for a particular plan node.
When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
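The suspend/resume behavior amounts to simple per-stream backpressure, sketched below with hypothetical names and a deque standing in for the target queue or network buffer.

```python
from collections import deque

class FlowControlledStream:
    """Toy model of stream suspension: if the target queue cannot take the
    next message, events from the source endpoint are paused until the
    target drains below its capacity."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.target = deque()
        self.suspended = False

    def forward(self, msg):
        if len(self.target) >= self.capacity:
            self.suspended = True       # stop reacting to source events
            return False
        self.target.append(msg)
        return True

    def drain(self, n=1):
        for _ in range(n):
            if self.target:
                self.target.popleft()
        if len(self.target) < self.capacity:
            self.suspended = False      # resume the stream
```

Only the offending stream is paused; in the real system, all other streams sharing the silkroad keep flowing.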
When postgres_fdw works over the Silk transport, only one connection
between the silkroad routing daemons is used to transfer user requests
to silkworm workers and get their responses.
Each request contains a transaction state, the replication group ID of the node
where the request was formed (the coordinator), the query itself, and the query parameters (if present).
A response is either an error message with a specific error text and error code,
or a bunch of tuples followed by an “end of tuples” message.
This means that silkworm has to switch to the transaction state
that came with the request before executing it.
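The request and response shapes can be sketched as below. The field names and the JSON encoding are purely illustrative; Silk's actual wire format is binary and not described here.

```python
import json

def build_silk_request(txn_state, group_id, query, params=None):
    """Toy request frame: the transaction state, the coordinator's
    replication group ID, the query text, and its parameters."""
    return json.dumps({
        "txn_state": txn_state,
        "coordinator_group": group_id,
        "query": query,
        "params": params or [],
    }).encode()

def build_response(tuples=None, error=None):
    """A response is either an error message or a series of tuples
    followed by an "end of tuples" message."""
    if error is not None:
        return [{"type": "error", "message": error}]
    return [{"type": "tuple", "data": t} for t in tuples] + [{"type": "end_of_tuples"}]
```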
For now, the Silk transport is used only for read-only SELECT queries.
All modifying requests are processed via a usual libpq connection and handled
mostly like all other DML requests in PostgreSQL postgres_fdw. The only distinction is that
when a DML request is processed by postgres_fdw, it resets the saved transaction
state for the connection cache entry corresponding to the connection where this request is sent.
Also, the read-only flag is set to false for such a connection cache entry.
When a request is sent over the Silk transport, the Shardman extension
asks postgres_fdw for the transaction state for a pair of serverid and userid.
If such a connection cache entry is found in the postgres_fdw connection cache, is not read-only,
and already holds a transaction state, that state
is returned. If the state is not present, postgres_fdw retrieves the full transaction state
from the remote server, saves it in the connection cache entry, and returns it to the
Shardman extension.
The full transaction state is similar to the parallel worker transaction state and contains:
- information related to the current user (uid, username)
- pid of the current backend
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
- backend private state:
  - array of ComboCIDs
  - internal transaction state (full transaction ID, isolation level, current command ID, etc.)
  - information about reindexed indexes
If the connection is not found in the postgres_fdw connection cache (i.e., it is a new connection), or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:
- information related to the current user (username)
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
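The choice between the two state shapes can be sketched as below; the dictionary keys are illustrative names, not the real field names of the exported state.

```python
REDUCED_KEYS = ("username", "start_timestamp", "snapshot_csn",
                "has_inval_messages")

def transaction_state_to_send(cache_entry, full_state):
    """Sketch of the choice described above: ship the full state for an
    existing read-write connection cache entry, a reduced state otherwise."""
    if cache_entry is not None and not cache_entry.get("read_only"):
        return full_state
    return {k: full_state[k] for k in REDUCED_KEYS}
```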
Using such transaction states, silkworm can attach to a running transaction,
or start a new read-only transaction with the provided snapshot CSN, and retrieve the result.
Note that the full transaction state can be imported only on the server that exported it. Also note that, due
to this method of transferring transaction state, you cannot use the Silk transport
without enabling CSN snapshots.
In Section 7.2.2, asynchronous ForeignScan plan nodes
were presented as a way to optimize data retrieval from multiple hosts when these plan nodes
are located under a single Append node. In the standard PostgreSQL
architecture, the execution of ForeignScan plan nodes relies on
the network protocol based on libpq. To improve system performance during data
transfer and reduce resource consumption, Shardman employs a different
method of exchanging data with remote hosts: the mechanism for executing ForeignScan
nodes is implemented using the Silk protocol.
To incorporate Silk transport into the asynchronous executor, modifications
were made to the postgres_fdw extension. A pluggable transport was implemented as
a set of interface functions included as part of the Shardman extension.
These functions are called by the postgres_fdw extension
during execution of callbacks that interact with remote hosts. The pluggable Silk transport is activated
if the Shardman extension is preloaded and the foreign server has
the extended_features attribute (which applies to any FDW server in the
Shardman cluster). In all other cases, the postgres_fdw extension
uses the standard exchange protocol based on libpq.
To disable the pluggable Silk transport in the Shardman
cluster, set the query_engine_mode configuration parameter to
ENGINE_NONE.
In the current implementation, the pluggable Silk transport is only used for
read-only queries, specifically during the execution of the ForeignScan node.
The standard exchange protocol based on libpq is used for modifying queries.
When query execution result rows are received over the Silk transport, the data
is stored in a TupleStoreState as a complete result set of
the same size as the one returned by the remote host. TupleStoreState is
implemented as a data structure that can spill data to disk in case of memory shortage,
so a large result set from the remote host does not lead to an out-of-memory (OOM) condition. Once the result
set is received into the TupleStoreState, the data is copied into the
ForeignScan executor's in-memory buffer. The size of this buffer is defined by the
fetch_size attribute of the foreign server. The default value of 50000 rows
can be adjusted to find a balance between performance (the number of ForeignScan node
calls) and memory consumption.
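The buffer refill pattern can be sketched as follows, with a plain list standing in for the (possibly disk-backed) TupleStoreState; names are illustrative.

```python
FETCH_SIZE = 50000   # default value of the foreign server's fetch_size attribute

def fetch_batches(tuplestore, fetch_size=FETCH_SIZE):
    """Copy the complete, safely stored result set into fixed-size in-memory
    batches, the way the ForeignScan buffer is refilled fetch_size rows
    at a time."""
    for i in range(0, len(tuplestore), fetch_size):
        yield tuplestore[i:i + fetch_size]
```

A smaller fetch_size lowers peak memory use at the cost of more ForeignScan node calls, which is exactly the trade-off the attribute exposes.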
Utilizing the pluggable Silk transport for the asynchronous FDW engine increases network exchange performance and reduces consumption of system resources, including the number of network connections.