aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/logical.c10
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c40
-rw-r--r--src/backend/replication/walsender.c38
-rw-r--r--src/include/replication/logical.h3
4 files changed, 85 insertions, 6 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index a0fc480646f..8b79d670012 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -624,6 +624,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -651,6 +652,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -686,6 +688,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -717,6 +720,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -756,6 +760,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -796,6 +802,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -822,6 +830,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -859,6 +868,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index d317fd70063..6710f983ea4 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -50,6 +50,7 @@ static bool publications_valid;
static List *LoadPublications(List *pubnames);
static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
+static void update_replication_progress(LogicalDecodingContext *ctx);
/* Entry in the map used to remember which relation schemas we sent. */
typedef struct RelationSyncEntry
@@ -247,7 +248,7 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ update_replication_progress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -309,6 +310,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
MemoryContext old;
RelationSyncEntry *relentry;
+ update_replication_progress(ctx);
+
if (!is_publishable_relation(relation))
return;
@@ -389,6 +392,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
int nrelids;
Oid *relids;
+ update_replication_progress(ctx);
+
old = MemoryContextSwitchTo(data->context);
relids = palloc0(nrelations * sizeof(Oid));
@@ -660,3 +665,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
while ((entry = (RelationSyncEntry *) hash_seq_search(&status)) != NULL)
entry->replicate_valid = false;
}
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are not published.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx)
+{
+ static int changes_count = 0;
+
+ /*
+ * We don't want to try sending a keepalive message after processing each
+ * change as that can have overhead. Tests revealed that there is no
+ * noticeable overhead in doing it after continuously processing 100 or so
+ * changes.
+ */
+#define CHANGES_THRESHOLD 100
+
+ /*
+ * If we are at the end of transaction LSN, update progress tracking.
+ * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+ * try to send a keepalive message if required.
+ */
+ if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+ {
+ OutputPluginUpdateProgress(ctx);
+ changes_count = 0;
+ }
+}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 3698135e493..c2c0c7df430 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -244,6 +244,7 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
+static void ProcessPendingWrites(void);
static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
@@ -1214,6 +1215,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/* If we have pending write here, go to slow path */
+ ProcessPendingWrites();
+}
+
+/*
+ * Wait until there is no pending write. Also process replies from the other
+ * side and check timeouts during that.
+ */
+static void
+ProcessPendingWrites(void)
+{
for (;;)
{
int wakeEvents;
@@ -1273,18 +1284,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
+ bool end_xact = ctx->end_xact;
/*
* Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
* avoid flooding the lag tracker when we commit frequently.
+ *
+ * We don't have a mechanism to get the ack for any LSN other than end
+ * xact LSN from the downstream. So, we track lag only for end of
+ * transaction LSN.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
- if (!TimestampDifferenceExceeds(sendTime, now,
- WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
- return;
+ if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ {
+ LagTrackerWrite(lsn, now);
+ sendTime = now;
+ }
- LagTrackerWrite(lsn, now);
- sendTime = now;
+ /*
+ * Try to send a keepalive if required. We don't need to try sending keep
+ * alive messages at the transaction end as that will be done at a later
+ * point in time. This is required only for large transactions where we
+ * don't send any changes to the downstream and the receiver can timeout
+ * due to that.
+ */
+ if (!end_xact &&
+ now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+ wal_sender_timeout / 2))
+ ProcessPendingWrites();
}
/*
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index 31c796b7651..718080d54a7 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -50,6 +50,9 @@ typedef struct LogicalDecodingContext
*/
bool fast_forward;
+ /* Are we processing the end LSN of a transaction? */
+ bool end_xact;
+
OutputPluginCallbacks callbacks;
OutputPluginOptions options;