diff options
author | Amit Kapila <akapila@postgresql.org> | 2022-03-30 07:41:05 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2022-03-30 07:41:05 +0530 |
commit | d5a9d86d8ffcadc52ff3729cd00fbd83bc38643c (patch) | |
tree | 2b5497206cf558509284df8817bd350e4d27dbb1 /src/backend/replication/pgoutput/pgoutput.c | |
parent | ad4f2c47de440cdd5d58cf9ffea09afa0da04d6c (diff) | |
download | postgresql-d5a9d86d8ffcadc52ff3729cd00fbd83bc38643c.tar.gz postgresql-d5a9d86d8ffcadc52ff3729cd00fbd83bc38643c.zip |
Skip empty transactions for logical replication.
The current logical replication behavior is to send every transaction to
subscriber even if the transaction is empty. This can happen because
transaction doesn't contain changes from the selected publications or all
the changes got filtered. It is a waste of CPU cycles and network
bandwidth to build/transmit these empty transactions.
This patch addresses the above problem by postponing the BEGIN message
until the first change is sent. While processing a COMMIT message, if
there was no other change for that transaction, do not send the COMMIT
message. This allows us to skip sending BEGIN/COMMIT messages for empty
transactions.
When skipping empty transactions in synchronous replication mode, we send
a keepalive message to avoid delaying such transactions.
Author: Ajin Cherian, Hou Zhijie, Euler Taveira
Reviewed-by: Peter Smith, Takamichi Osumi, Shi Yu, Masahiko Sawada, Greg Nancarrow, Vignesh C, Amit Kapila
Discussion: https://postgr.es/m/CAMkU=1yohp9-dv48FLoSPrMqYEyyS5ZWkaZGD41RJr10xiNo_Q@mail.gmail.com
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 141 |
1 files changed, 133 insertions, 8 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 893833ea83c..20d0b1e1253 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -183,6 +183,36 @@ typedef struct RelationSyncEntry MemoryContext entry_cxt; } RelationSyncEntry; +/* + * Maintain a per-transaction level variable to track whether the transaction + * has sent BEGIN. BEGIN is only sent when the first change in a transaction + * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT + * messages for empty transactions which saves network bandwidth. + * + * This optimization is not used for prepared transactions because if the + * WALSender restarts after prepare of a transaction and before commit prepared + * of the same transaction then we won't be able to figure out if we have + * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is + * because we would have lost the in-memory txndata information that was + * present prior to the restart. This will result in sending a spurious + * COMMIT PREPARED without a corresponding prepared transaction at the + * downstream which would lead to an error when it tries to process it. + * + * XXX We could achieve this optimization by changing protocol to send + * additional information so that downstream can detect that the corresponding + * prepare has not been sent. However, adding such a check for every + * transaction in the downstream could be costly so we might want to do it + * optionally. + * + * We also don't have this optimization for streamed transactions because + * they can contain prepared transactions. + */ +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether BEGIN has + * been sent */ +} PGOutputTxnData; + /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; @@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } /* - * BEGIN callback + * BEGIN callback. + * + * Don't send the BEGIN message here instead postpone it until the first + * change. In logical replication, a common scenario is to replicate a set of + * tables (instead of all tables) and transactions whose changes were on + * the table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. */ static void -pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn) +{ + PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + txn->output_plugin_private = txndata; +} + +/* + * Send BEGIN. + * + * This is called while processing the first change of the transaction. + */ +static void +pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + Assert(txndata); + Assert(!txndata->sent_begin_txn); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + txndata->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -511,7 +567,25 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + bool sent_begin_txn; + + Assert(txndata); + + /* + * We don't need to send the commit message unless some relevant change + * from this transaction has been sent to the downstream. + */ + sent_begin_txn = txndata->sent_begin_txn; + OutputPluginUpdateProgress(ctx, !sent_begin_txn); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid); + return; + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -542,7 +616,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -556,7 +630,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; @@ -1371,6 +1446,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; /* + * Send BEGIN if we haven't yet. + * + * We send the BEGIN message after ensuring that we will actually + * send the change. This avoids sending a pair of BEGIN/COMMIT + * messages for empty transactions. + */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + /* * Schema should be sent using the original relation because it * also sends the ancestor's relation. */ @@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); } @@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * messages. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, @@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx, if (!relentry->pubactions.pubsequence) return; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * sequence changes. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_sequence(ctx->out, relation, @@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); |