aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c17
-rw-r--r--src/backend/replication/logical/logical.c5
-rw-r--r--src/include/replication/logical.h3
-rw-r--r--src/include/replication/output_plugin.h1
4 files changed, 17 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5f596135b15..97be4b0f23f 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -80,7 +80,8 @@ static void DecodePrepare(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
/* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf, Oid dbId,
RepOriginId origin_id);
@@ -271,7 +272,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_COMMIT_PREPARED)
- two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+ two_phase = !(FilterPrepare(ctx, xid,
+ parsed.twophase_gid));
DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break;
@@ -298,7 +300,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_ABORT_PREPARED)
- two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+ two_phase = !(FilterPrepare(ctx, xid,
+ parsed.twophase_gid));
DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break;
@@ -355,7 +358,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* manner iff output plugin supports two-phase commits and
* doesn't filter the transaction at prepare time.
*/
- if (FilterPrepare(ctx, parsed.twophase_gid))
+ if (FilterPrepare(ctx, parsed.twophase_xid,
+ parsed.twophase_gid))
{
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
buf->origptr);
@@ -581,7 +585,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* this transaction as a regular commit later.
*/
static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+ const char *gid)
{
/*
* Skip if decoding of two-phase transactions at PREPARE time is not
@@ -599,7 +604,7 @@ FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
if (ctx->callbacks.filter_prepare_cb == NULL)
return false;
- return filter_prepare_cb_wrapper(ctx, gid);
+ return filter_prepare_cb_wrapper(ctx, xid, gid);
}
static inline bool
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 37b75deb728..2f6803637bf 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -1083,7 +1083,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
}
bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+ const char *gid)
{
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
@@ -1104,7 +1105,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
ctx->accept_writes = false;
/* do the actual work: call callback */
- ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index c2534033723..af551d6f4ee 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -125,7 +125,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
diff --git a/src/include/replication/output_plugin.h b/src/include/replication/output_plugin.h
index 2c2c964c55f..810495ed0e4 100644
--- a/src/include/replication/output_plugin.h
+++ b/src/include/replication/output_plugin.h
@@ -106,6 +106,7 @@ typedef void (*LogicalDecodeShutdownCB) (struct LogicalDecodingContext *ctx);
* and sent as usual transaction.
*/
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
const char *gid);
/*