aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c44
1 files changed, 42 insertions, 2 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 2ea540ce4d2..ff9cf5d406d 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -70,6 +70,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid,
uint32 hashvalue);
static void send_relation_and_attrs(Relation relation, TransactionId xid,
LogicalDecodingContext *ctx);
+static void update_replication_progress(LogicalDecodingContext *ctx);
/*
* Entry in the map used to remember which relation schemas we sent.
@@ -381,7 +382,7 @@ static void
pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- OutputPluginUpdateProgress(ctx);
+ update_replication_progress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit(ctx->out, txn, commit_lsn);
@@ -535,6 +536,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TransactionId xid = InvalidTransactionId;
Relation ancestor = NULL;
+ update_replication_progress(ctx);
+
if (!is_publishable_relation(relation))
return;
@@ -677,6 +680,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;
+ update_replication_progress(ctx);
+
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
@@ -735,6 +740,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
+ update_replication_progress(ctx);
+
if (!data->messages)
return;
@@ -921,7 +928,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- OutputPluginUpdateProgress(ctx);
+ update_replication_progress(ctx);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1304,3 +1311,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue)
entry->pubactions.pubtruncate = false;
}
}
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are not published.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx)
+{
+ static int changes_count = 0;
+
+ /*
+ * We don't want to try sending a keepalive message after processing each
+ * change as that can have overhead. Tests revealed that there is no
+ * noticeable overhead in doing it after continuously processing 100 or so
+ * changes.
+ */
+#define CHANGES_THRESHOLD 100
+
+ /*
+ * If we are at the end of transaction LSN, update progress tracking.
+ * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+ * try to send a keepalive message if required.
+ */
+ if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+ {
+ OutputPluginUpdateProgress(ctx);
+ changes_count = 0;
+ }
+}