diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 141 |
1 files changed, 133 insertions, 8 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 893833ea83c..20d0b1e1253 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -183,6 +183,36 @@ typedef struct RelationSyncEntry MemoryContext entry_cxt; } RelationSyncEntry; +/* + * Maintain a per-transaction level variable to track whether the transaction + * has sent BEGIN. BEGIN is only sent when the first change in a transaction + * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT + * messages for empty transactions which saves network bandwidth. + * + * This optimization is not used for prepared transactions because if the + * WALSender restarts after prepare of a transaction and before commit prepared + * of the same transaction then we won't be able to figure out if we have + * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is + * because we would have lost the in-memory txndata information that was + * present prior to the restart. This will result in sending a spurious + * COMMIT PREPARED without a corresponding prepared transaction at the + * downstream which would lead to an error when it tries to process it. + * + * XXX We could achieve this optimization by changing protocol to send + * additional information so that downstream can detect that the corresponding + * prepare has not been sent. However, adding such a check for every + * transaction in the downstream could be costly so we might want to do it + * optionally. + * + * We also don't have this optimization for streamed transactions because + * they can contain prepared transactions. + */ +typedef struct PGOutputTxnData +{ + bool sent_begin_txn; /* flag indicating whether BEGIN has + * been sent */ +} PGOutputTxnData; + /* Map used to remember which relation schemas we sent. */ static HTAB *RelationSyncCache = NULL; @@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } /* - * BEGIN callback + * BEGIN callback. + * + * Don't send the BEGIN message here instead postpone it until the first + * change. In logical replication, a common scenario is to replicate a set of + * tables (instead of all tables) and transactions whose changes were on + * the table(s) that are not published will produce empty transactions. These + * empty transactions will send BEGIN and COMMIT messages to subscribers, + * using bandwidth on something with little/no use for logical replication. */ static void -pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn) +{ + PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context, + sizeof(PGOutputTxnData)); + + txn->output_plugin_private = txndata; +} + +/* + * Send BEGIN. + * + * This is called while processing the first change of the transaction. + */ +static void +pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { bool send_replication_origin = txn->origin_id != InvalidRepOriginId; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + Assert(txndata); + Assert(!txndata->sent_begin_txn); OutputPluginPrepareWrite(ctx, !send_replication_origin); logicalrep_write_begin(ctx->out, txn); + txndata->sent_begin_txn = true; send_repl_origin(ctx, txn->origin_id, txn->origin_lsn, send_replication_origin); @@ -511,7 +567,25 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + bool sent_begin_txn; + + Assert(txndata); + + /* + * We don't need to send the commit message unless some relevant change + * from this transaction has been sent to the downstream. + */ + sent_begin_txn = txndata->sent_begin_txn; + OutputPluginUpdateProgress(ctx, !sent_begin_txn); + pfree(txndata); + txn->output_plugin_private = NULL; + + if (!sent_begin_txn) + { + elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid); + return; + } OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -542,7 +616,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -556,7 +630,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; TransactionId xid = InvalidTransactionId; @@ -1371,6 +1446,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, break; /* + * Send BEGIN if we haven't yet. + * + * We send the BEGIN message after ensuring that we will actually + * send the change. This avoids sending a pair of BEGIN/COMMIT + * messages for empty transactions. + */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + + /* * Schema should be sent using the original relation because it * also sends the ancestor's relation. */ @@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, relentry, &action)) break; + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); OutputPluginPrepareWrite(ctx, true); @@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], ReorderBufferChange *change) { PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; MemoryContext old; RelationSyncEntry *relentry; int i; @@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, continue; relids[nrelids++] = relid; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + maybe_send_schema(ctx, change, relation, relentry); } @@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, if (in_streaming) xid = txn->xid; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * messages. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_message(ctx->out, xid, @@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx, if (!relentry->pubactions.pubsequence) return; + /* + * Output BEGIN if we haven't yet. Avoid for non-transactional + * sequence changes. + */ + if (transactional) + { + PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private; + + /* Send BEGIN if we haven't yet */ + if (txndata && !txndata->sent_begin_txn) + pgoutput_send_begin(ctx, txn); + } + OutputPluginPrepareWrite(ctx, true); logicalrep_write_sequence(ctx->out, relation, @@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); |