diff options
author | Amit Kapila <akapila@postgresql.org> | 2022-05-11 10:51:04 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2022-05-11 10:51:04 +0530 |
commit | d6da71fa8f28faa68823e163f318ffb38a7a9a54 (patch) | |
tree | 6a384644d3e56ec4f4ccc1232d87f1eff0a41df8 /src/backend/replication/pgoutput/pgoutput.c | |
parent | ca9e9b08e453523314a3b8e87d1894edb23b6e8d (diff) | |
download | postgresql-d6da71fa8f28faa68823e163f318ffb38a7a9a54.tar.gz postgresql-d6da71fa8f28faa68823e163f318ffb38a7a9a54.zip |
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.
To fix this we try to send the keepalive message if required after
processing certain threshold of changes.
Reported-by: Fabrice Chapuis
Author: Wang wei and Amit Kapila
Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda
Backpatch-through: 10
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 44 |
1 files changed, 42 insertions, 2 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index 2ea540ce4d2..ff9cf5d406d 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -70,6 +70,7 @@ static void publication_invalidation_cb(Datum arg, int cacheid, uint32 hashvalue); static void send_relation_and_attrs(Relation relation, TransactionId xid, LogicalDecodingContext *ctx); +static void update_replication_progress(LogicalDecodingContext *ctx); /* * Entry in the map used to remember which relation schemas we sent. @@ -381,7 +382,7 @@ static void pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn) { - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_commit(ctx->out, txn, commit_lsn); @@ -535,6 +536,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TransactionId xid = InvalidTransactionId; Relation ancestor = NULL; + update_replication_progress(ctx); + if (!is_publishable_relation(relation)) return; @@ -677,6 +680,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Oid *relids; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx); + /* Remember the xid for the change in streaming mode. See pgoutput_change. */ if (in_streaming) xid = change->txn->xid; @@ -735,6 +740,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, PGOutputData *data = (PGOutputData *) ctx->output_plugin_private; TransactionId xid = InvalidTransactionId; + update_replication_progress(ctx); + if (!data->messages) return; @@ -921,7 +928,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, Assert(!in_streaming); Assert(rbtxn_is_streamed(txn)); - OutputPluginUpdateProgress(ctx); + update_replication_progress(ctx); OutputPluginPrepareWrite(ctx, true); logicalrep_write_stream_commit(ctx->out, txn, commit_lsn); @@ -1304,3 +1311,36 @@ rel_sync_cache_publication_cb(Datum arg, int cacheid, uint32 hashvalue) entry->pubactions.pubtruncate = false; } } + +/* + * Try to update progress and send a keepalive message if too many changes were + * processed. + * + * For a large transaction, if we don't send any change to the downstream for a + * long time (exceeds the wal_receiver_timeout of standby) then it can timeout. + * This can happen when all or most of the changes are not published. + */ +static void +update_replication_progress(LogicalDecodingContext *ctx) +{ + static int changes_count = 0; + + /* + * We don't want to try sending a keepalive message after processing each + * change as that can have overhead. Tests revealed that there is no + * noticeable overhead in doing it after continuously processing 100 or so + * changes. + */ +#define CHANGES_THRESHOLD 100 + + /* + * If we are at the end of transaction LSN, update progress tracking. + * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we + * try to send a keepalive message if required. + */ + if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD) + { + OutputPluginUpdateProgress(ctx); + changes_count = 0; + } +} |