diff options
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 54 |
1 files changed, 6 insertions, 48 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4938d8888f..73b080060da 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -92,8 +92,6 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid, static void send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, XLogRecPtr origin_lsn, bool send_origin); -static void update_replication_progress(LogicalDecodingContext *ctx, - bool skipped_xact); /* * Only 3 publication actions are used for row filtering ("insert", "update", @@ -586,7 +584,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, * from this transaction has been sent to the downstream. */ sent_begin_txn = txndata->sent_begin_txn; - update_replication_progress(ctx, !sent_begin_txn); + OutputPluginUpdateProgress(ctx, !sent_begin_txn); pfree(txndata); txn->output_plugin_private = NULL; @@ -625,7 +623,7 @@ static void pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr prepare_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_prepare(ctx->out, txn, prepare_lsn); @@ -639,7 +637,7 @@ static void pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn); @@ -655,7 +653,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx, XLogRecPtr prepare_end_lsn, TimestampTz prepare_time) { - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn, @@ -1401,8 +1399,6 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleTableSlot *old_slot = NULL; TupleTableSlot *new_slot = NULL; - update_replication_progress(ctx, false); - if (!is_publishable_relation(relation)) return; @@ -1637,8 +1633,6 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -1702,8 +1696,6 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; - update_replication_progress(ctx, false); - if (!data->messages) return; @@ -1903,7 +1895,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1924,7 +1916,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, { Assert(rbtxn_is_streamed(txn)); - update_replication_progress(ctx, false); + OutputPluginUpdateProgress(ctx, false); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); OutputPluginWrite(ctx, true); @@ -2424,37 +2416,3 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id, } } } - -/* - * Try to update progress and send a keepalive message if too many changes were - * processed. - * - * For a large transaction, if we don't send any change to the downstream for a - * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. - * This can happen when all or most of the changes are either not published or - * got filtered out. - */ -static void -update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact) -{ - static int changes_count = 0; - - /* - * We don't want to try sending a keepalive message after processing each - * change as that can have overhead. Tests revealed that there is no - * noticeable overhead in doing it after continuously processing 100 or so - * changes. - */ -#define CHANGES_THRESHOLD 100 - - /* - * If we are at the end of transaction LSN, update progress tracking. - * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we - * try to send a keepalive message if required. - */ - if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) - { - OutputPluginUpdateProgress(ctx, skipped_xact); - changes_count = 0; - } -} |