aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/logical.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r--src/backend/replication/logical/logical.c27
1 files changed, 27 insertions, 0 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index d536a5f3ba3..f7d14919077 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -724,6 +724,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -751,6 +752,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.shutdown_cb(ctx);
@@ -786,6 +788,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.begin_cb(ctx, txn);
@@ -817,6 +820,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/* do the actual work: call callback */
ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -857,6 +861,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->first_lsn;
+ ctx->end_xact = false;
/*
* If the plugin supports two-phase commits then begin prepare callback is
@@ -901,6 +906,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin supports two-phase commits then prepare callback is
@@ -945,6 +951,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then commit prepared callback
@@ -990,6 +997,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn; /* points to the end of the record */
+ ctx->end_xact = true;
/*
* If the plugin support two-phase commits then rollback prepared callback
@@ -1040,6 +1048,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.change_cb(ctx, txn, relation, change);
/* Pop the error context stack */
@@ -1080,6 +1090,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */
@@ -1107,6 +1119,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1137,6 +1150,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
/* set output state */
ctx->accept_writes = false;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1174,6 +1188,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1217,6 +1232,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = first_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_start_cb is required */
if (ctx->callbacks.stream_start_cb == NULL)
ereport(ERROR,
@@ -1264,6 +1281,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = last_lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_stop_cb is required */
if (ctx->callbacks.stream_stop_cb == NULL)
ereport(ERROR,
@@ -1303,6 +1322,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = abort_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_abort_cb is required */
if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1347,6 +1367,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode with two-phase commits, stream_prepare_cb is required */
if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1387,6 +1408,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn->xid;
ctx->write_location = txn->end_lsn;
+ ctx->end_xact = true;
/* in streaming mode, stream_commit_cb is required */
if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1435,6 +1457,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
/* in streaming mode, stream_change_cb is required */
if (ctx->callbacks.stream_change_cb == NULL)
ereport(ERROR,
@@ -1479,6 +1503,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
ctx->accept_writes = true;
ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
ctx->write_location = message_lsn;
+ ctx->end_xact = false;
/* do the actual work: call callback */
ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1527,6 +1552,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
*/
ctx->write_location = change->lsn;
+ ctx->end_xact = false;
+
ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
/* Pop the error context stack */