aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c141
1 files changed, 133 insertions, 8 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 893833ea83c..20d0b1e1253 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -183,6 +183,36 @@ typedef struct RelationSyncEntry
MemoryContext entry_cxt;
} RelationSyncEntry;
+/*
+ * Maintain a per-transaction level variable to track whether the transaction
+ * has sent BEGIN. BEGIN is only sent when the first change in a transaction
+ * is processed. This makes it possible to skip sending a pair of BEGIN/COMMIT
+ * messages for empty transactions which saves network bandwidth.
+ *
+ * This optimization is not used for prepared transactions because if the
+ * WALSender restarts after prepare of a transaction and before commit prepared
+ * of the same transaction then we won't be able to figure out if we have
+ * skipped sending BEGIN/PREPARE of a transaction as it was empty. This is
+ * because we would have lost the in-memory txndata information that was
+ * present prior to the restart. This will result in sending a spurious
+ * COMMIT PREPARED without a corresponding prepared transaction at the
+ * downstream which would lead to an error when it tries to process it.
+ *
+ * XXX We could achieve this optimization by changing protocol to send
+ * additional information so that downstream can detect that the corresponding
+ * prepare has not been sent. However, adding such a check for every
+ * transaction in the downstream could be costly so we might want to do it
+ * optionally.
+ *
+ * We also don't have this optimization for streamed transactions because
+ * they can contain prepared transactions.
+ */
+typedef struct PGOutputTxnData
+{
+ bool sent_begin_txn; /* flag indicating whether BEGIN has
+ * been sent */
+} PGOutputTxnData;
+
/* Map used to remember which relation schemas we sent. */
static HTAB *RelationSyncCache = NULL;
@@ -488,15 +518,41 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
/*
- * BEGIN callback
+ * BEGIN callback.
+ *
+ * Don't send the BEGIN message here instead postpone it until the first
+ * change. In logical replication, a common scenario is to replicate a set of
+ * tables (instead of all tables) and transactions whose changes were on
+ * the table(s) that are not published will produce empty transactions. These
+ * empty transactions will send BEGIN and COMMIT messages to subscribers,
+ * using bandwidth on something with little/no use for logical replication.
*/
static void
-pgoutput_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
+pgoutput_begin_txn(LogicalDecodingContext * ctx, ReorderBufferTXN * txn)
+{
+ PGOutputTxnData *txndata = MemoryContextAllocZero(ctx->context,
+ sizeof(PGOutputTxnData));
+
+ txn->output_plugin_private = txndata;
+}
+
+/*
+ * Send BEGIN.
+ *
+ * This is called while processing the first change of the transaction.
+ */
+static void
+pgoutput_send_begin(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
{
bool send_replication_origin = txn->origin_id != InvalidRepOriginId;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ Assert(txndata);
+ Assert(!txndata->sent_begin_txn);
OutputPluginPrepareWrite(ctx, !send_replication_origin);
logicalrep_write_begin(ctx->out, txn);
+ txndata->sent_begin_txn = true;
send_repl_origin(ctx, txn->origin_id, txn->origin_lsn,
send_replication_origin);
@@ -511,7 +567,25 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+ bool sent_begin_txn;
+
+ Assert(txndata);
+
+ /*
+ * We don't need to send the commit message unless some relevant change
+ * from this transaction has been sent to the downstream.
+ */
+ sent_begin_txn = txndata->sent_begin_txn;
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+ pfree(txndata);
+ txn->output_plugin_private = NULL;
+
+ if (!sent_begin_txn)
+ {
+ elog(DEBUG1, "skipped replication of an empty transaction with XID: %u", txn->xid);
+ return;
+ }
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -542,7 +616,7 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -556,7 +630,7 @@ static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -572,7 +646,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1295,6 +1369,7 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
TransactionId xid = InvalidTransactionId;
@@ -1371,6 +1446,16 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
break;
/*
+ * Send BEGIN if we haven't yet.
+ *
+ * We send the BEGIN message after ensuring that we will actually
+ * send the change. This avoids sending a pair of BEGIN/COMMIT
+ * messages for empty transactions.
+ */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
+ /*
* Schema should be sent using the original relation because it
* also sends the ancestor's relation.
*/
@@ -1420,6 +1505,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry, &action))
break;
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
@@ -1480,6 +1569,10 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
relentry, &action))
break;
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
OutputPluginPrepareWrite(ctx, true);
@@ -1510,6 +1603,7 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelations, Relation relations[], ReorderBufferChange *change)
{
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
MemoryContext old;
RelationSyncEntry *relentry;
int i;
@@ -1548,6 +1642,11 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
continue;
relids[nrelids++] = relid;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+
maybe_send_schema(ctx, change, relation, relentry);
}
@@ -1585,6 +1684,19 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
if (in_streaming)
xid = txn->xid;
+ /*
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
+ * messages.
+ */
+ if (transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_message(ctx->out,
xid,
@@ -1629,6 +1741,19 @@ pgoutput_sequence(LogicalDecodingContext *ctx,
if (!relentry->pubactions.pubsequence)
return;
+ /*
+ * Output BEGIN if we haven't yet. Avoid for non-transactional
+ * sequence changes.
+ */
+ if (transactional)
+ {
+ PGOutputTxnData *txndata = (PGOutputTxnData *) txn->output_plugin_private;
+
+ /* Send BEGIN if we haven't yet */
+ if (txndata && !txndata->sent_begin_txn)
+ pgoutput_send_begin(ctx, txn);
+ }
+
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_sequence(ctx->out,
relation,
@@ -1799,7 +1924,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1820,7 +1945,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);