aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/logical.c27
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c54
-rw-r--r--src/backend/replication/walsender.c24
-rw-r--r--src/include/replication/logical.h2
4 files changed, 98 insertions, 9 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 788769dd738..625a7f42730 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/*
* If the plugin supports two-phase commits then begin prepare callback is
@@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin supports two-phase commits then prepare callback is
@@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then commit prepared callback
@@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then rollback prepared callback
@@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = first_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_start_cb is required */
if (ctx->callbacks.stream_start_cb == NULL)
ereport(ERROR,
@@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = last_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_stop_cb is required */
if (ctx->callbacks.stream_stop_cb == NULL)
ereport(ERROR,
@@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = abort_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_abort_cb is required */
if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_commit_cb is required */
if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_change_cb is required */
if (ctx->callbacks.stream_change_cb == NULL)
ereport(ERROR,
@@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index b197bfd565d..406ad84e1d6 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -91,6 +91,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
+static void update_replication_progress(LogicalDecodingContext *ctx,
+ bool skipped_xact);
/*
* Only 3 publication actions are used for row filtering ("insert", "update",
@@ -558,7 +560,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* from this transaction has been sent to the downstream.
*/
sent_begin_txn = txndata->sent_begin_txn;
- OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+ update_replication_progress(ctx, !sent_begin_txn);
pfree(txndata);
txn->output_plugin_private = NULL;
@@ -597,7 +599,7 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
- OutputPluginUpdateProgress(ctx, false);
+ update_replication_progress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -611,7 +613,7 @@ static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx, false);
+ update_replication_progress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -627,7 +629,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
- OutputPluginUpdateProgress(ctx, false);
+ update_replication_progress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1360,6 +1362,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
+ update_replication_progress(ctx, false);
+
if (!is_publishable_relation(relation))
return;
@@ -1592,6 +1596,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;
+ update_replication_progress(ctx, false);
+
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
@@ -1655,6 +1661,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
+ update_replication_progress(ctx, false);
+
if (!data->messages)
return;
@@ -1847,7 +1855,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx, false);
+ update_replication_progress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1868,7 +1876,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx, false);
+ update_replication_progress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
@@ -2361,3 +2369,37 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
}
}
}
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are either not published or
+ * got filtered out.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
+{
+ static int changes_count = 0;
+
+ /*
+ * We don't want to try sending a keepalive message after processing each
+ * change as that can have overhead. Tests revealed that there is no
+ * noticeable overhead in doing it after continuously processing 100 or so
+ * changes.
+ */
+#define CHANGES_THRESHOLD 100
+
+ /*
+ * If we are at the end of transaction LSN, update progress tracking.
+ * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+ * try to send a keepalive message if required.
+ */
+ if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+ {
+ OutputPluginUpdateProgress(ctx, skipped_xact);
+ changes_count = 0;
+ }
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 63a818140bd..c6c196b2fab 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1482,14 +1482,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
+ bool pending_writes = false;
+ bool end_xact = ctx->end_xact;
/*
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
* avoid flooding the lag tracker when we commit frequently.
+ *
+ * We don't have a mechanism to get the ack for any LSN other than end
+ * xact LSN from the downstream. So, we track lag only for end of
+ * transaction LSN.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
- if (TimestampDifferenceExceeds(sendTime, now,
- WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
{
LagTrackerWrite(lsn, now);
sendTime = now;
@@ -1515,8 +1521,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
/* If we have pending write here, make sure it's actually flushed */
if (pq_is_send_pending())
- ProcessPendingWrites();
+ pending_writes = true;
}
+
+ /*
+ * Process pending writes if any or try to send a keepalive if required.
+ * We don't need to try sending keep alive messages at the transaction end
+ * as that will be done at a later point in time. This is required only
+ * for large transactions where we don't send any changes to the
+ * downstream and the receiver can timeout due to that.
+ */
+ if (pending_writes || (!end_xact &&
+ now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2)))
+ ProcessPendingWrites();
}
/*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index a6ef16ad5b1..edadacd5893 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -107,6 +107,8 @@ typedef struct LogicalDecodingContext
bool prepared_write;
XLogRecPtr write_location;
TransactionId write_xid;
+ /* Are we processing the end LSN of a transaction? */
+ bool end_xact;
} LogicalDecodingContext;