From 216a784829c2c5f03ab0c43e009126cbb819e9b2 Mon Sep 17 00:00:00 2001
From: Amit Kapila
Date: Mon, 9 Jan 2023 07:00:39 +0530
Subject: Perform apply of large transactions by parallel workers.

Currently, for large transactions, the publisher sends the data in multiple
streams (changes divided into chunks depending upon
logical_decoding_work_mem), and then on the subscriber side, the apply worker
writes the changes into temporary files and, once it receives the commit,
reads from those files and applies the entire transaction. To improve the
performance of such transactions, we can instead allow them to be applied via
parallel workers.

In this approach, we assign a new parallel apply worker (if available) as soon
as the xact's first stream is received, and the leader apply worker sends
changes to this new worker via shared memory. The parallel apply worker
directly applies each change instead of writing it to temporary files.
However, if the leader apply worker times out while attempting to send a
message to the parallel apply worker, it will switch to "partial serialize"
mode. In this mode, the leader serializes all remaining changes to a file and
notifies the parallel apply worker to read and apply them at the end of the
transaction. We use a non-blocking way to send the messages from the leader
apply worker to the parallel apply worker to avoid deadlocks. We keep this
parallel apply worker assigned till the transaction commit is received and
also wait for the worker to finish at commit. This preserves commit ordering
and avoids writing to and reading from files in most cases. We still need to
spill if there is no worker available.

This patch also extends the SUBSCRIPTION 'streaming' parameter so that the
user can control whether to apply the streaming transaction in a parallel
apply worker or spill the change to disk. The user can set the streaming
parameter to 'on', 'off', or 'parallel'.
The parameter value 'parallel' means the streaming transaction will be applied
via a parallel apply worker, if available. The parameter value 'on' means the
streaming transaction will be spilled to disk. The default value is 'off'
(same as current behaviour).

In addition, the patch extends the logical replication STREAM_ABORT message so
that abort_lsn and abort_time can also be sent, which can be used to update
the replication origin in the parallel apply worker when the streaming
transaction is aborted. Because this message extension is needed to support
parallel streaming, parallel streaming is not supported for publications on
servers < PG16.

Author: Hou Zhijie, Wang wei, Amit Kapila with design inputs from Sawada Masahiko
Reviewed-by: Sawada Masahiko, Peter Smith, Dilip Kumar, Shi yu, Kuroda Hayato, Shveta Mallik
Discussion: https://postgr.es/m/CAA4eK1+wyN6zpaHUkCLorEWNx75MG0xhMwcFhvjqm2KURZEAGw@mail.gmail.com
---
 doc/src/sgml/catalogs.sgml                | 11 ++++++++---
 doc/src/sgml/config.sgml                  | 28 +++++++++++++++++++++++++++-
 doc/src/sgml/logical-replication.sgml     | 22 ++++++++++++++++++++--
 doc/src/sgml/monitoring.sgml              |  5 +++++
 doc/src/sgml/protocol.sgml                | 29 ++++++++++++++++++++++++++++-
 doc/src/sgml/ref/create_subscription.sgml | 24 ++++++++++++++++++++----
 doc/src/sgml/system-views.sgml            | 14 ++++++++++++--
 7 files changed, 120 insertions(+), 13 deletions(-)

(limited to 'doc/src')

diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index 9316b811ac3..c1e4048054e 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -7913,11 +7913,16 @@ SCRAM-SHA-256$<iteration count>:&l
- substream bool
+ substream char
- If true, the subscription will allow streaming of in-progress
- transactions
+ Controls how to handle the streaming of in-progress transactions:
+ f = disallow streaming of in-progress transactions,
+ t = spill the changes of in-progress transactions to
+ disk and apply at once after the transaction is committed on the
+ publisher and received by the subscriber,
+ p = apply changes directly using a parallel apply
+ worker if available (same as 't' if no worker is available)
diff --git a/doc/src/sgml/config.sgml b/doc/src/sgml/config.sgml
index 05b3862d09f..a33a93570bd 100644
--- a/doc/src/sgml/config.sgml
+++ b/doc/src/sgml/config.sgml
@@ -4968,7 +4968,8 @@ ANY num_sync ( num_sync (
+ max_parallel_apply_workers_per_subscription (integer)
+
+ max_parallel_apply_workers_per_subscription configuration parameter
+
+
+
+
+ Maximum number of parallel apply workers per subscription. This
+ parameter controls the amount of parallelism for streaming of
+ in-progress transactions with subscription parameter
+ streaming = parallel.
+
+
+ The parallel apply workers are taken from the pool defined by
+ max_logical_replication_workers.
+
+
+ The default value is 2. This parameter can only be set in the
+ postgresql.conf file or on the server command
+ line.
+
+
+
+
diff --git a/doc/src/sgml/logical-replication.sgml b/doc/src/sgml/logical-replication.sgml
index 7b9bb00e5ae..54f48be87f3 100644
--- a/doc/src/sgml/logical-replication.sgml
+++ b/doc/src/sgml/logical-replication.sgml
@@ -1501,6 +1501,16 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
 might not violate any constraint. This can easily make the subscriber inconsistent.
+
+
+ When the streaming mode is parallel, the finish LSN of
+ failed transactions may not be logged. In that case, it may be necessary to
+ change the streaming mode to on or off and
+ cause the same conflicts again so the finish LSN of the failed transaction will
+ be written to the server log. For the usage of finish LSN, please refer to ALTER SUBSCRIPTION ...
+ SKIP.
+
@@ -1809,8 +1819,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
 max_logical_replication_workers
- must be set to at least the number of subscriptions (for apply workers), plus
- some reserve for the table synchronization workers.
+ must be set to at least the number of subscriptions (for leader apply
+ workers), plus some reserve for the table synchronization workers and
+ parallel apply workers.
@@ -1827,6 +1838,13 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
 subscription initialization or when new tables are added.
+
+ max_parallel_apply_workers_per_subscription
+ controls the amount of parallelism for streaming of in-progress
+ transactions with subscription parameter
+ streaming = parallel.
+
+
 Logical replication workers are also affected by wal_receiver_timeout,
diff --git a/doc/src/sgml/monitoring.sgml b/doc/src/sgml/monitoring.sgml
index 5bcba0fdec4..cf220c3bcb4 100644
--- a/doc/src/sgml/monitoring.sgml
+++ b/doc/src/sgml/monitoring.sgml
@@ -1858,6 +1858,11 @@ postgres 27093 0.0 0.0 30096 2752 ? Ss 11:34 0:00 postgres: ser
 advisory Waiting to acquire an advisory user lock.
+
+ applytransaction
+ Waiting to acquire a lock on a remote transaction being applied
+ by a logical replication subscriber.
+
 extend Waiting to extend a relation.
diff --git a/doc/src/sgml/protocol.sgml b/doc/src/sgml/protocol.sgml
index 03312e07e25..439ef675f3e 100644
--- a/doc/src/sgml/protocol.sgml
+++ b/doc/src/sgml/protocol.sgml
@@ -3103,7 +3103,7 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
 Protocol version. Currently versions 1, 2,
- and 3 are supported.
+ 3, and 4 are supported.
 Version 2 is supported only for server version 14
@@ -3113,6 +3113,11 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
 Version 3 is supported only for server version 15 and above, and it allows streaming of two-phase commits.
+
+ Version 4 is supported only for server version 16
+ and above, and it allows streams of large in-progress transactions to
+ be applied in parallel.
+
@@ -6883,6 +6888,28 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
+
+
+ Int64 (XLogRecPtr)
+
+
+ The LSN of the abort. This field is available since protocol version
+ 4.
+
+
+
+
+
+ Int64 (TimestampTz)
+
+
+ Abort timestamp of the transaction. The value is in number
+ of microseconds since PostgreSQL epoch (2000-01-01). This field is
+ available since protocol version 4.
+
+
+
+
diff --git a/doc/src/sgml/ref/create_subscription.sgml b/doc/src/sgml/ref/create_subscription.sgml
index 90bd70050dd..eba72c6af62 100644
--- a/doc/src/sgml/ref/create_subscription.sgml
+++ b/doc/src/sgml/ref/create_subscription.sgml
@@ -228,13 +228,29 @@ CREATE SUBSCRIPTION subscription_name
- streaming (boolean)
+ streaming (enum)
 Specifies whether to enable streaming of in-progress transactions
- for this subscription. By default, all transactions
- are fully decoded on the publisher and only then sent to the
- subscriber as a whole.
+ for this subscription. The default value is off,
+ meaning all transactions are fully decoded on the publisher and only
+ then sent to the subscriber as a whole.
+
+
+
+ If set to on, the incoming changes are written to
+ temporary files and then applied only after the transaction is
+ committed on the publisher and received by the subscriber.
+
+
+
+ If set to parallel, incoming changes are directly
+ applied via one of the parallel apply workers, if available. If no
+ parallel apply worker is free to handle streaming transactions then
+ the changes are written to temporary files and applied after the
+ transaction is committed. Note that if an error happens in a
+ parallel apply worker, the finish LSN of the remote transaction
+ might not be reported in the server log.
diff --git a/doc/src/sgml/system-views.sgml b/doc/src/sgml/system-views.sgml
index 143ae5b7bba..1d1b7de8f1e 100644
--- a/doc/src/sgml/system-views.sgml
+++ b/doc/src/sgml/system-views.sgml
@@ -1379,8 +1379,9 @@
 virtualxid,
 spectoken,
 object,
- userlock, or
- advisory.
+ userlock,
+ advisory, or
+ applytransaction.
 (See also .)
@@ -1594,6 +1595,15 @@
 so the database column is meaningful for an advisory lock.
+
+ Apply transaction locks are used in parallel mode to apply the transaction
+ in logical replication. The remote transaction id is displayed in the
+ transactionid column. The objsubid
+ displays the lock subtype which is 0 for the lock used to synchronize the
+ set of changes, and 1 for the lock used to wait for the transaction to
+ finish to ensure commit order.
+
+
 pg_locks provides a global view of all locks in the database cluster, not only those relevant to the current database.
--
cgit v1.2.3
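
Illustrative note (not part of the patch): the protocol.sgml hunk adds two trailing fields to the Stream Abort message, an Int64 abort LSN and an Int64 abort timestamp in microseconds since 2000-01-01. The Python sketch below shows how a client might decode them. It rests on assumptions that this diff does not show: the leading layout (a 'A' type byte followed by two Int32 transaction ids) is taken from the existing protocol documentation, and the names parse_stream_abort, format_lsn, and has_abort_info are hypothetical. The caller states explicitly whether the extended fields are present rather than the code inferring it.

```python
import struct
from datetime import datetime, timedelta, timezone

# PostgreSQL timestamps count microseconds from 2000-01-01 00:00:00 UTC.
PG_EPOCH = datetime(2000, 1, 1, tzinfo=timezone.utc)


def format_lsn(lsn: int) -> str:
    """Render a 64-bit LSN in the usual 'high/low' hexadecimal form."""
    return f"{lsn >> 32:X}/{lsn & 0xFFFFFFFF:X}"


def parse_stream_abort(payload: bytes, has_abort_info: bool) -> dict:
    """Decode a Stream Abort message body (hypothetical helper).

    Assumed layout: Byte1('A'), Int32 xid, Int32 subxid, and, when
    has_abort_info is true, Int64 abort LSN plus Int64 abort timestamp.
    All integers are in network byte order.
    """
    if payload[:1] != b"A":
        raise ValueError("not a Stream Abort message")
    xid, subxid = struct.unpack_from("!II", payload, 1)
    msg = {"xid": xid, "subxid": subxid}
    if has_abort_info:
        # The two fields added by this patch follow the transaction ids.
        abort_lsn, abort_time = struct.unpack_from("!Qq", payload, 9)
        msg["abort_lsn"] = format_lsn(abort_lsn)
        msg["abort_time"] = PG_EPOCH + timedelta(microseconds=abort_time)
    return msg
```

A caller would pass has_abort_info=True only when it knows the publisher sends the extended fields, which the documentation text above ties to protocol version 4.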