diff options
author | Amit Kapila <akapila@postgresql.org> | 2022-05-11 11:11:44 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2022-05-11 11:11:44 +0530 |
commit | f95d53eded55ecbf037f6416ced6af29a2c3caca (patch) | |
tree | 300a90851aa4256f9c5cdf0e8f88e9d8efdf40ba /src/backend/replication/logical/logical.c | |
parent | 8bbf8461a3a2a38ce5f2952a025385b6938a61f7 (diff) | |
download | postgresql-f95d53eded55ecbf037f6416ced6af29a2c3caca.tar.gz postgresql-f95d53eded55ecbf037f6416ced6af29a2c3caca.zip |
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.
To fix this we try to send the keepalive message if required after
processing certain threshold of changes.
Reported-by: Fabrice Chapuis
Author: Wang wei and Amit Kapila
Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda
Backpatch-through: 10
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 27 |
1 files changed, 27 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 788769dd738..625a7f42730 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.startup_cb(ctx, opt, is_init); @@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.shutdown_cb(ctx); @@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.begin_cb(ctx, txn); @@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* do the actual work: call callback */ ctx->callbacks.commit_cb(ctx, txn, commit_lsn); @@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn) ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->first_lsn; + ctx->end_xact = false; /* * If the plugin supports two-phase commits then begin prepare callback is @@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin supports two-phase commits then prepare callback is @@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then commit prepared callback @@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; /* points to the end of the record */ + ctx->end_xact = true; /* * If the plugin support two-phase commits then rollback prepared callback @@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.change_cb(ctx, txn, relation, change); /* Pop the error context stack */ @@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ @@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid, /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid); @@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) /* set output state */ ctx->accept_writes = false; + ctx->end_xact = false; /* do the actual work: call callback */ ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id); @@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = first_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_start_cb is required */ if (ctx->callbacks.stream_start_cb == NULL) ereport(ERROR, @@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = last_lsn; + ctx->end_xact = false; + /* in streaming mode, stream_stop_cb is required */ if (ctx->callbacks.stream_stop_cb == NULL) ereport(ERROR, @@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = abort_lsn; + ctx->end_xact = true; /* in streaming mode, stream_abort_cb is required */ if (ctx->callbacks.stream_abort_cb == NULL) @@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode with two-phase commits, stream_prepare_cb is required */ if (ctx->callbacks.stream_prepare_cb == NULL) @@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn->xid; ctx->write_location = txn->end_lsn; + ctx->end_xact = true; /* in streaming mode, stream_commit_cb is required */ if (ctx->callbacks.stream_commit_cb == NULL) @@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + /* in streaming mode, stream_change_cb is required */ if (ctx->callbacks.stream_change_cb == NULL) ereport(ERROR, @@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, ctx->accept_writes = true; ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId; ctx->write_location = message_lsn; + ctx->end_xact = false; /* do the actual work: call callback */ ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix, @@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, */ ctx->write_location = change->lsn; + ctx->end_xact = false; + ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change); /* Pop the error context stack */ |