aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2022-05-11 10:51:04 +0530
committerAmit Kapila <akapila@postgresql.org>2022-05-11 10:51:04 +0530
commitd6da71fa8f28faa68823e163f318ffb38a7a9a54 (patch)
tree6a384644d3e56ec4f4ccc1232d87f1eff0a41df8 /src/backend/replication/logical/logical.c
parentca9e9b08e453523314a3b8e87d1894edb23b6e8d (diff)
downloadpostgresql-d6da71fa8f28faa68823e163f318ffb38a7a9a54.tar.gz
postgresql-d6da71fa8f28faa68823e163f318ffb38a7a9a54.zip
Fix the logical replication timeout during large transactions.
The problem is that we don't send keep-alive messages for a long time while processing large transactions during logical replication where we don't send any data of such transactions. This can happen when the table modified in the transaction is not published or because all the changes got filtered. We do try to send the keep_alive if necessary at the end of the transaction (via WalSndWriteData()) but by that time the subscriber-side can timeout and exit. To fix this we try to send the keepalive message if required after processing certain threshold of changes. Reported-by: Fabrice Chapuis Author: Wang wei and Amit Kapila Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda Backpatch-through: 10 Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c27
1 files changed, 27 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d536a5f3ba3..f7d14919077 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/*
* If the plugin supports two-phase commits then begin prepare callback is
@@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin supports two-phase commits then prepare callback is
@@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then commit prepared callback
@@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then rollback prepared callback
@@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = first_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_start_cb is required */
if (ctx->callbacks.stream_start_cb == NULL)
ereport(ERROR,
@@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = last_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_stop_cb is required */
if (ctx->callbacks.stream_stop_cb == NULL)
ereport(ERROR,
@@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = abort_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_abort_cb is required */
if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_commit_cb is required */
if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_change_cb is required */
if (ctx->callbacks.stream_change_cb == NULL)
ereport(ERROR,
@@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */