aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c54
1 files changed, 6 insertions, 48 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4938d8888f..73b080060da 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
static void send_repl_origin(LogicalDecodingContext *ctx,
RepOriginId origin_id, XLogRecPtr origin_lsn,
bool send_origin);
-static void update_replication_progress(LogicalDecodingContext *ctx,
- bool skipped_xact);
/*
* Only 3 publication actions are used for row filtering ("insert", "update",
@@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
* from this transaction has been sent to the downstream.
*/
sent_begin_txn = txndata->sent_begin_txn;
- update_replication_progress(ctx, !sent_begin_txn);
+ OutputPluginUpdateProgress(ctx, !sent_begin_txn);
pfree(txndata);
txn->output_plugin_private = NULL;
@@ -625,7 +623,7 @@ static void
pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr prepare_lsn)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -639,7 +637,7 @@ static void
pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
XLogRecPtr prepare_end_lsn,
TimestampTz prepare_time)
{
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleTableSlot *old_slot = NULL;
TupleTableSlot *new_slot = NULL;
- update_replication_progress(ctx, false);
-
if (!is_publishable_relation(relation))
return;
@@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
Oid *relids;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx, false);
-
/* Remember the xid for the change in streaming mode. See pgoutput_change. */
if (in_streaming)
xid = change->txn->xid;
@@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
TransactionId xid = InvalidTransactionId;
- update_replication_progress(ctx, false);
-
if (!data->messages)
return;
@@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
Assert(!in_streaming);
Assert(rbtxn_is_streamed(txn));
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
{
Assert(rbtxn_is_streamed(txn));
- update_replication_progress(ctx, false);
+ OutputPluginUpdateProgress(ctx, false);
OutputPluginPrepareWrite(ctx, true);
logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
OutputPluginWrite(ctx, true);
@@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
}
}
}
-
-/*
- * Try to update progress and send a keepalive message if too many changes were
- * processed.
- *
- * For a large transaction, if we don't send any change to the downstream for a
- * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
- * This can happen when all or most of the changes are either not published or
- * got filtered out.
- */
-static void
-update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
-{
- static int changes_count = 0;
-
- /*
- * We don't want to try sending a keepalive message after processing each
- * change as that can have overhead. Tests revealed that there is no
- * noticeable overhead in doing it after continuously processing 100 or so
- * changes.
- */
-#define CHANGES_THRESHOLD 100
-
- /*
- * If we are at the end of transaction LSN, update progress tracking.
- * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
- * try to send a keepalive message if required.
- */
- if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
- {
- OutputPluginUpdateProgress(ctx, skipped_xact);
- changes_count = 0;
- }
-}