Silk (Postgres Pro Shardman InterLinK)
is a transport feature used for communication between
cluster nodes. It is injected at the point where
Postgres Pro Shardman decides to transmit a deparsed query fragment
through a libpq connection to a remote node, replacing the
libpq connection with itself. It is designed to decrease
the number of idle shardman_fdw connections during
transaction execution, minimize latency, and boost overall throughput.
The Silk implementation uses several background processes:
the main routing/multiplexing process (one per PostgreSQL
instance), called silkroad, and a number of background workers, called
silkworms. Every silkroad process is connected to each remote
silkroad by only one network connection. In this scheme, remote silkworms
play the role of remote backends.
Silkroad wires a local backend to a remote node's workers as follows:

1. The backend process uses the regular API to access
remote data as usual, but when Silk is enabled, Postgres Pro Shardman
writes the query into a shared memory queue instead of a libpq
connection.
2. The local silkroad process reads the incoming shared memory queue from
that backend and routes the message to the appropriate network connection with
the remote silkroad process.
3. The remote silkroad process grabs the incoming message from the network
and (if it is a new one) redirects it to an available worker's shared memory queue
(or to a special "unassigned jobs" queue if all of the workers are busy).
4. Finally, the remote worker gets the message through its shared memory queue, executes it, and sends back the result tuples (or an error) the same way.
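For instance, the worker side of this scheme can be observed through the diagnostics views described later in this section. A minimal sketch, assuming a psql session on a cluster node:

```sql
-- List silkworm workers currently attached to the local multiplexer
-- (is_worker and attached are columns of the view documented below).
SELECT backend_id, pid
FROM shardman.silk_backends
WHERE is_worker AND attached;
```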
Silkroad acts here like a common network switch, tossing packets
between a backend's shared memory and the appropriate network socket. It knows nothing about
the content of a message, relying only on the message header.
The silkroad process runs an event loop powered by the libev
library. Each backend's shared memory queue is exposed to the event loop via
an eventfd descriptor, and each network connection via a socket descriptor.
During startup, the backend registers itself (its eventfd
descriptors) with the local silkroad process. Silkroad
responds by specifying which memory segments to use for the backend's message queue.
From this moment, silkroad responds to events from the queue
associated with this backend. The network connection between local and remote
silkroads is established once, on the first request from
a backend to the remote node, and stays alive as long as both participants (the silkroad
processes) exist.
For each subquery, we expect a subset of tuples, and therefore represent the interaction
within the subquery as a bidirectional data stream. Silkroad
uses an internal routing table to register these streams. A stream ID, unique within
the Postgres Pro Shardman cluster, is formed as a pair of "origin
node address, target node address" and a number unique within the node.
Each particular subquery from a backend to remote nodes is registered by
silkroad as such a stream, so any backend can be associated
with many streams at a time.
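The stream registry is visible through the shardman.silk_routes view described later in this section; a quick sketch of counting streams per local process:

```sql
-- Count routes (streams) currently associated with each local
-- backend or silkworm worker.
SELECT backend_id, count(*) AS streams
FROM shardman.silk_routes
GROUP BY backend_id
ORDER BY streams DESC;
```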
When a local silkroad process gets a message with a new stream ID
from a backend, it registers the stream in its local routing table and then redirects the message
to the appropriate socket. If a connection with the remote silkroad does not exist yet,
it is established using a handshake procedure. The original message that initiated
the handshake is kept in a special internal buffer until the handshake succeeds.
The remote silkroad process, receiving a packet with a new ID, registers
it in its own table, then assigns a silkworm worker from the pool of available workers
and places the message into the worker's shared memory queue. If all of the
silkworm workers are busy at the moment, the message is
postponed, i.e., placed into a special "unassigned jobs" queue
(its size is limited by the shardman.silk_unassigned_job_queue_size configuration parameter, 1024 by default).
If there is no free space in the queue, an error message is generated and sent
back to the source backend. A job from this queue is assigned later to the first
worker that completes its previous job.
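The contents of this queue can be inspected through the shardman.silk_pending_jobs view documented below, for example:

```sql
-- Jobs waiting for a free silkworm worker on this node.
SELECT origin_ip, origin_port, channel_id,
       pending_queue_messages, pending_queue_bytes
FROM shardman.silk_pending_jobs;
```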
When a worker gets a new “job”, it executes it through the SPI subsystem,
organizes the result tuples into batches, and sends them back through shared memory to the local
silkroad process. The rest is straightforward since the whole route is already known.
The last packet with resulting tuples in a stream is marked as “closing”; it is an order
for the silkroads to wipe this route out of their tables.
Note that the backend and remote workers stay “subscribed” to their streams until the streams are explicitly closed. This gives the backend the opportunity to send an abort message notifying the remote worker to close the transaction prematurely, and it makes it possible to discard obsolete data packets, for example, ones from previously aborted transactions.
To observe the current state of the silkroad multiplexer process,
Silk diagnostics views are available.
Besides the routing table, silkroad tracks the endpoints (backends and network connections)
involved in each particular stream. So when some connection is closed, all
the involved backends (and/or workers) are notified of that event with a special
error message, and all routes/streams related to this connection are dismissed.
In the same way, if a backend crashes, its shared memory queue becomes detached,
and silkroad reacts by sending error messages to the remote participants
of every stream related to the crashed backend. So remote workers are not left doing
useless work when the requester has already died.
The resulting tuples are transmitted by silkworm in native binary mode.
Tuples with the external storage attribute are deTOASTed, but those that were compressed
stay compressed.
Small tuples are organized into batches (about 256 kB); big tuples are cut into pieces by the sender and reassembled by the receiving backend.
It may happen that the next message received from a backend does not fit
into the target network buffer, or that a message received from the network does not fit
into the target shared memory queue. In such a case, the stream that caused
this situation is “suspended”: silkroad
pauses its reaction to events from the source endpoint (connection or backend) until
the target endpoint drains its messages. The other backends and connections
not involved in this route keep working. The receiving modules of backends are designed
to minimize these situations: the backend periodically checks and drains the incoming
queue even when the plan executor is busy processing other plan nodes. Received tuples
are stored in the backend's tuplestores according to the plan nodes until the executor
requests the next tuple for a particular plan node.
When enough space is freed in the target queue, the suspended stream is resumed, the endpoint's events are unblocked, and the process of receiving and sorting packets continues.
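Suspended streams manifest themselves in the diagnostics views as mutual blocking between backends/workers and connects; a sketch using the shardman.silk_backends view described below:

```sql
-- Check which backends/workers are currently blocked by a connect
-- and which connects they block in turn.
SELECT backend_id, pid, blocked_by_connect, blocks_connects
FROM shardman.silk_backends;
```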
When Postgres Pro Shardman works over the Silk transport, only one connection
between the silkroad routing daemons is used to transfer user requests
to silkworm workers and get their responses.
Each request contains a transaction state, the replication group ID of the node
where the request was formed (the coordinator), the query itself, and query parameters (if present).
A response is either an error response message with a specific error message and error code,
or a bunch of tuples followed by an “end of tuples” message.
This means that silkworm has to switch to the transaction state
that comes with the request prior to executing it.
For now, the Silk transport is used only for read-only SELECT queries.
All modifying requests are processed via a usual libpq connection and handled
mostly like all other DML requests in PostgreSQL. The only distinction is that
when a DML request is processed by shardman_fdw, it resets the saved transaction
state for the connection cache entry corresponding to the connection where this request is sent.
Also, the read-only flag is set to false for such a connection cache entry.
When a request is sent over the Silk transport, the Postgres Pro Shardman extension
asks for the transaction state for a pair of serverid and userid.
If such a connection cache entry is found, it is not a read-only
entry, and a transaction state is present in it, that state
is returned. If it is not present, Postgres Pro Shardman retrieves a full transaction state
from the remote server and saves it in the connection cache entry.
The full transaction state is similar to the parallel worker transaction state and contains:

- information related to the current user (uid, username)
- PID of the current backend
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
- set of configuration settings set in the current session (including the application name)
- backend private state:
  - array of ComboCIDs
  - internal transaction state (full transaction ID, isolation level, current command ID, etc.)
  - information about reindexed indexes
If the connection is not found in the connection cache (i.e., it is a new connection) or the entry in the connection cache is marked as read-only, only these characteristics form the transaction state:

- information related to the current user (username)
- transaction start timestamp
- current snapshot CSN
- flags indicating that invalidation messages are present
- set of configuration settings set in the current session (including the application name)
Using such transaction states, silkworm can attach to a running transaction
or start a new read-only transaction with the provided snapshot CSN and retrieve the result.
Note that the full transaction state can be imported only on the server that exported it. Also note that, due
to this method of transferring the transaction state, you cannot use the Silk transport
without enabling CSN snapshots.
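A quick way to verify this prerequisite, assuming CSN snapshots are governed by the enable_csn_snapshot configuration parameter (the parameter name is an assumption here, not taken from this section):

```sql
-- The Silk transport requires CSN snapshots to be enabled.
SHOW enable_csn_snapshot;  -- hypothetical parameter name
```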
In Section 54.2.2, asynchronous ForeignScan plan nodes
were presented as a way to optimize data retrieval from multiple hosts when these plan nodes
are located under a single Append node. In the standard PostgreSQL
architecture, the execution of ForeignScan plan nodes is implemented using
the libpq-based network protocol. To improve system performance during data
transfer and reduce resource consumption, Postgres Pro Shardman employs a different
method for exchanging data with remote hosts: the mechanism for executing ForeignScan
nodes is implemented using the Silk protocol.
To disable the Silk transport for
query execution, set the query_engine_mode configuration parameter to
ENGINE_NONE.
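For example, for the current session:

```sql
-- Fall back to the libpq-based protocol for this session
-- (assuming the parameter is settable at the session level).
SET query_engine_mode = 'ENGINE_NONE';
```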
In the current implementation, the Silk transport is only used for
read-only queries, specifically during the execution of ForeignScan nodes.
The standard libpq-based exchange protocol is used for modifying queries.
When query execution result rows are received over the Silk transport, the data
is stored in a TupleStoreState storage as a complete result set of
the same size as that returned by the remote host. The TupleStoreState is
implemented as a data structure that can spill data to disk in case of memory shortage,
so even a large result set from the remote host does not lead to an out-of-memory (OOM) condition. Once the result
set is received in the TupleStoreState, the data is copied into the
ForeignScan executor's in-memory buffer. The size of this buffer is defined by the
fetch_size attribute of the foreign server. The default value of 50000 rows
can be adjusted to find a balance between performance (the number of ForeignScan node
calls) and memory consumption.
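A sketch of tuning this attribute, assuming a foreign server named shard_1 (a hypothetical name):

```sql
-- Shrink the ForeignScan buffer to trade performance for memory.
-- Use ADD instead of SET if fetch_size has not been set on this server before.
ALTER SERVER shard_1 OPTIONS (SET fetch_size '10000');
```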
Utilizing the Silk transport for the asynchronous FDW engine increases network exchange performance and reduces system resource consumption, including the number of network connections.
Views in this section provide various information related to
Silk multiplexing. See Section 53.3.3
for details of the silkroad multiplexing process.
shardman.silk_routes #
The shardman.silk_routes view displays the current
snapshot of the multiplexer routing table. The columns of
the view are shown in Table 53.1.
Table 53.1. shardman.silk_routes Columns
| Name | Type | Description |
|---|---|---|
| hashvalue | integer | Internal unique route identifier. Can be used to join with other Silk diagnostics views. |
| origin_ip | inet | IP address of the source node that generated this route |
| origin_port | int2 | External TCP connection port of the source node that generated this route |
| channel_id | integer | Route sequential number within the node that generated this route. channel_id is unique for the origin_ip + origin_port pair. This pair is a unique node identifier within the Postgres Pro Shardman cluster, hence the origin_ip + origin_port + channel_id tuple is a unique route identifier within the Postgres Pro Shardman cluster. |
| from_cn | integer | Connect index in the shardman.silk_connects view for incoming routes, that is, those not generated by this node, and -1 for routes generated by this node |
| backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet |
| pending_queue_bytes | bigint | Size of the queue of delayed messages (awaiting a free worker) for this route, in bytes. This value is only meaningful for incoming routes that are not assigned to a worker yet. |
| pending_queue_messages | bigint | Number of messages in the queue of delayed messages (awaiting a free worker) for this route. This value is only meaningful for incoming routes that are not assigned to a worker yet. |
| connects | integer[] | List of indexes of connects that are currently using this route |
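For example, the cluster-wide unique identifier of each route is the (origin_ip, origin_port, channel_id) triple:

```sql
-- Current routes with their cluster-wide identifiers and local users.
SELECT origin_ip, origin_port, channel_id, backend_id, connects
FROM shardman.silk_routes;
```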
shardman.silk_connects #
The shardman.silk_connects view displays the current
list of multiplexer connects. The columns of
the view are shown in Table 53.2.
Table 53.2. shardman.silk_connects Columns
| Name | Type | Description |
|---|---|---|
| cn_index | integer | Unique connect index |
| reg_ip | inet | “Registration” IP address of the node with which the connection is established. See Notes for details. |
| reg_port | int2 | “Registration” TCP port of the node with which the connection is established. See Notes for details. |
| read_ev_active | boolean | true if the multiplexer is ready to receive data into the incoming queue. See Notes for details. |
| write_ev_active | boolean | true if the multiplexer has filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
| is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during handshaking. |
| state | text | Current state of the connect: connected if the connection is established, in progress if the client has already connected but handshaking has not happened yet, free if the client has already disconnected but the connect structure for the disconnected client has not been destroyed yet |
| pending_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes |
| pending_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect |
| blocked_by_backend | integer | ID of the backend that blocked this connect |
| blocks_backends | integer[] | List of IDs of backends that are blocked by this connect |
| routes | integer[] | List of unique IDs of routes that use this connect |
| elapsed_time_write | bigint | Time since the last write event of the connect, in microseconds |
| elapsed_time_read | bigint | Time since the last read event of the connect, in microseconds |
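A sketch of spotting connects with an unsent backlog:

```sql
-- Connects ordered by the amount of data waiting to be sent.
SELECT cn_index, state, pending_queue_bytes,
       elapsed_time_read, elapsed_time_write
FROM shardman.silk_connects
ORDER BY pending_queue_bytes DESC;
```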
shardman.silk_backends #
The shardman.silk_backends view displays the current
list of processes of two kinds: backends that serve client connections and
silkworm multiplexer workers, which interact with the
multiplexer. The columns of the view are shown in
Table 53.3.
Table 53.3. shardman.silk_backends Columns
| Name | Type | Description |
|---|---|---|
| backend_id | integer | Unique backend/worker identifier |
| pid | integer | OS process ID |
| attached | boolean | true if the backend is attached to the multiplexer, false otherwise |
| read_ev_active | boolean | true if the backend/worker is ready to receive data into the incoming queue. See Notes for details. |
| write_ev_active | boolean | true if the backend/worker has filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
| is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise |
| pending_queue_bytes | bigint | Size of the queue of messages being sent to this backend/worker, in bytes |
| pending_queue_messages | bigint | Number of messages in the queue of messages being sent to this backend/worker |
| blocked_by_connect | integer | Index of the connect that blocks this backend/worker |
| blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker |
| routes | integer[] | List of unique IDs of routes that are used by this backend/worker |
| in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and the multiplexer |
| out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and the multiplexer |
| elapsed_time_write | bigint | Time since the last write event of the backend, in microseconds |
| elapsed_time_read | bigint | Time since the last read event of the backend, in microseconds |
shardman.silk_routing #
The shardman.silk_routing view displays the results
of the shardman.silk_routing function. The columns of
the view are shown in Table 53.4.
Table 53.4. shardman.silk_routing Columns
| Name | Type | Description |
|---|---|---|
| hashvalue | integer | Internal unique route identifier |
| origin_ip | inet | IP address of the node that generated this route |
| origin_port | int2 | External TCP connection port of the source node that generated this route |
| channel_id | integer | Route sequential number within the node that generated this route |
| is_outgoing | boolean | true if this route was produced by an outgoing network connection, false if it was produced by an incoming network connection |
| pending_queue_bytes | bigint | Pending queue size, in bytes |
| pending_queue_messages | bigint | Number of pending queue messages |
| backend_id | integer | ID of the local process that is currently using this route: either the ID of the backend that generated this route or the ID of the silkworm worker assigned to this route. Equals -1 for queued incoming routes that have not been assigned a worker yet |
| backend_pid | integer | OS process ID of the backend/worker using this route |
| attached | boolean | true if the backend is attached to the multiplexer, false otherwise |
| backend_rd_active | boolean | true if the backend/worker is ready to receive data into the incoming queue. See Notes for details. |
| backend_wr_active | boolean | true if the backend/worker has filled the queue of non-sent messages and is waiting for it to get free. See Notes for details. |
| is_worker | boolean | true if this process is a silkworm multiplexer worker and false otherwise |
| backend_blocked_by_cn | integer | Index of the connect that blocks this backend/worker |
| blocks_connects | integer[] | List of indexes of connects that are blocked by this backend/worker |
| in_queue_used | bigint | Number of queued data bytes in the incoming queue in the shared memory between the backend and the multiplexer |
| out_queue_used | bigint | Number of queued data bytes in the outgoing queue in the shared memory between the backend and the multiplexer |
| connect_id | integer | Unique connect index |
| reg_ip | inet | “Registration” IP address of the node with which the connection is established |
| reg_port | int2 | “Registration” TCP port of the node with which the connection is established |
| connect_rd_active | boolean | true if the multiplexer is ready to receive data into the incoming queue |
| connect_wr_active | boolean | true if the multiplexer has filled the queue of non-sent messages and is waiting for it to get free |
| connect_is_outgoing | boolean | true if the connection is outgoing, that is, created by connect, and false for incoming connects, that is, created by accept. Only used during handshaking. |
| connect_state | text | Current state of the connect: connected if the connection is established, in progress if the client has already connected but handshaking has not happened yet, free if the client has already disconnected but the connect structure for the disconnected client has not been destroyed yet |
| connect_outgoing_queue_bytes | bigint | Size of the queue of non-sent messages for this connect, in bytes |
| connect_outgoing_queue_messages | bigint | Number of messages in the queue of non-sent messages for this connect |
| connect_blocked_by_bk | integer | ID of the backend that blocked this connect |
| blocks_backends | integer[] | List of IDs of backends that are blocked by this connect |
| connect_elapsed_time_write | bigint | Time since the last write event of the connect, in microseconds |
| connect_elapsed_time_read | bigint | Time since the last read event of the connect, in microseconds |
| backend_elapsed_time_write | bigint | Time since the last write event of the backend, in microseconds |
| backend_elapsed_time_read | bigint | Time since the last read event of the backend, in microseconds |
shardman.silk_pending_jobs #
The shardman.silk_pending_jobs view displays the current
list of routes in the queue of delayed multiplexer jobs, that is, jobs
that are not assigned to workers yet. The columns of
the view are shown in Table 53.5.
Table 53.5. shardman.silk_pending_jobs Columns
| Name | Type | Description |
|---|---|---|
| hashvalue | integer | Internal unique route identifier |
| origin_ip | inet | IP address of the node that generated this route |
| origin_port | int2 | TCP connection port of the node that generated this route |
| channel_id | integer | Route sequential number within the node that generated this route |
| query | text | The first queued message |
| pending_queue_bytes | bigint | Pending queue size, in bytes |
| pending_queue_messages | bigint | Number of pending queue messages |
shardman.silk_statinfo #
The shardman.silk_statinfo view displays the current
multiplexer state information. The columns of the view are shown in
Table 53.6.
Table 53.6. shardman.silk_statinfo Columns
| Name | Type | Description |
|---|---|---|
| pid | integer | silkroad process ID |
| started_at | timestamp with time zone | Time when the silkroad process was started |
| transferred_bytes | json | JSON object of key/value pairs, where the key is the name of a message type and the value is the total number of bytes sent, for message types with at least one message sent |
| transferred_pkts | json | JSON object of key/value pairs, where the key is the name of a message type and the value is the total number of sent messages, for message types with at least one message sent |
| transferred_max | json | JSON object of key/value pairs, where the key is the name of a message type and the value is the maximum size of a message, for message types with at least one message sent |
| memcxt_dpg_allocated | bigint | The mem_allocated value of the process in DPGMemoryContext, in bytes |
| memcxt_top_allocated | bigint | The mem_allocated value of the process in TopMemoryContext, in bytes |
| read_efd_max | bigint | Maximum reading time of the eventfd since reset, in microseconds |
| write_efd_max | bigint | Maximum writing time of the eventfd since reset, in microseconds |
| read_efd_total | bigint | Total reading time of the eventfd since reset, in microseconds |
| write_efd_total | bigint | Total writing time of the eventfd since reset, in microseconds |
| read_efd_count | bigint | Total number of reading events of the eventfd since reset |
| write_efd_count | bigint | Total number of writing events of the eventfd since reset |
| sort_time_max | bigint | Maximum time of sorting operations with silk_flow_control enabled (any value other than none), in microseconds |
| sort_time_total | bigint | Total time of sorting operations with silk_flow_control enabled (any value other than none), in microseconds |
| sort_time_count | bigint | Total number of sorting operations with silk_flow_control enabled (any value other than none) |
Note that read_efd_max, write_efd_max,
read_efd_total, write_efd_total,
read_efd_count, write_efd_count,
sort_time_max, sort_time_total,
and sort_time_count are only calculated
if the shardman.silk_track_time
configuration parameter is enabled.
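A sketch of collecting the timing metrics (whether the parameter can be set at the session level, rather than in postgresql.conf, is an assumption here):

```sql
-- Enable timing collection, then read the eventfd and sorting metrics.
SET shardman.silk_track_time = on;
SELECT pid, read_efd_max, write_efd_max, sort_time_total
FROM shardman.silk_statinfo;
```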
shardman.silk_state #
The shardman.silk_state view displays the current
silkroad process state. The columns of the view are shown in
Table 53.7.
Table 53.7. shardman.silk_state Columns
| Name | Type | Description |
|---|---|---|
| state | text | State of the silkroad process |
shardman.pg_stat_silk_msg_components #
The shardman.pg_stat_silk_msg_components view displays
statistics on the size of the MT_SPI message components (in bytes) sent
from the node the view is queried on. It helps
understand the sizes of queries and their parameters, along with
the transaction state. The columns of the view are shown in
Table 53.8.
Table 53.8. shardman.pg_stat_silk_msg_components Columns
| Name | Type | Description |
|---|---|---|
| msgs_sent | bigint | Total number of sent MT_SPI messages |
| query_size | bigint | Query size, bytes |
| param_size | bigint | Size of parameters, bytes |
| num_params | bigint | Number of parameters |
| transaction_state_size | bigint | Transaction state size, bytes |
| silk_tracepoints_size | bigint | Tracepoint information size, bytes |
| max_query_size | bigint | Maximum query size, bytes |
| max_param_size | bigint | Maximum parameter size, bytes |
| max_num_params | bigint | Maximum number of parameters |
| max_transaction_state_size | bigint | Maximum transaction state size, bytes |
| max_silk_tracepoints_size | bigint | Maximum tracepoint information size, bytes |
| stats_reset | timestamp with time zone | Time when the statistics were reset |
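Assuming the *_size columns accumulate totals over all sent messages, average component sizes can be derived as follows:

```sql
-- Average per-message component sizes, guarding against division by zero.
SELECT query_size / NULLIF(msgs_sent, 0) AS avg_query_size,
       param_size / NULLIF(msgs_sent, 0) AS avg_param_size,
       transaction_state_size / NULLIF(msgs_sent, 0) AS avg_txn_state_size
FROM shardman.pg_stat_silk_msg_components;
```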
reg_ip and reg_port
values are not actual network addresses, but the addresses by which
the multiplexer accesses the node. They are determined during a handshake
between multiplexer nodes and are equal to the corresponding parameters of
an appropriate server in the
pg_foreign_server
table.
All the read_ev_active values are
true and all the
write_ev_active values are
false when the multiplexer is in the
idle state.
Postgres Pro Shardman has a list of global views based on
the corresponding local views. The definition of global view columns is the
same as in the corresponding local view. Fetching from a global view returns
a union of rows from the corresponding local views, fetched
from each of the cluster nodes. Another difference is that the global views
have an additional rgid column. The rgid
value shows the replication group ID of the cluster node from which a row
is fetched.
Below is the list of the global views with links to their corresponding local views:
Table 53.9. Silk-related global and local views
| Global view | Local view | Description |
|---|---|---|
| shardman.gv_silk_routes | shardman.silk_routes | The current snapshot of the multiplexer routing table |
| shardman.gv_silk_connects | shardman.silk_connects | The current list of multiplexer connects |
| shardman.gv_silk_backends | shardman.silk_backends | The current list of processes of two kinds: backends that serve client connections and silkworm multiplexer workers, which interact with the multiplexer |
| shardman.gv_silk_pending_jobs | shardman.silk_pending_jobs | The current list of routes in the queue of multiplexer jobs that are not assigned to workers yet |
| shardman.gv_silk_routing | shardman.silk_routing | The results of the shardman.silk_routing function |
| shardman.gv_stat_silk_msg_components | shardman.pg_stat_silk_msg_components | Statistics on the size of the MT_SPI message components |
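For example, a per-node summary using one of the global views:

```sql
-- Number of backends/workers known to the multiplexer on each node;
-- rgid identifies the replication group the row was fetched from.
SELECT rgid, count(*) AS processes
FROM shardman.gv_silk_backends
GROUP BY rgid
ORDER BY rgid;
```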
shardman.silk_statinfo_reset()
#
Resets the values of the metrics with prefix transferred_
and time-based metrics (with prefixes read_efd_,
write_efd_, and sort_time_)
in the shardman.silk_statinfo
view.
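Usage is a plain function call:

```sql
-- Reset transferred_*, read_efd_*, write_efd_*, and sort_time_* metrics.
SELECT shardman.silk_statinfo_reset();
```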
shardman.silk_routing()
#
Retrieves the combined results of the silk_connects,
silk_backends,
and silk_routes multiplexer functions.
shardman.silk_rbc_snap()
#
Retrieves a consistent snapshot of all the connects, backends, and
routes that can be used by the silk_connects, silk_backends,
and silk_routes functions.