diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 49 |
1 files changed, 46 insertions, 3 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index eb7293f2f33..88424964ef3 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -40,6 +40,7 @@ #include "replication/decode.h" #include "replication/logical.h" #include "replication/reorderbuffer.h" +#include "replication/origin.h" #include "replication/snapbuild.h" #include "storage/standby.h" @@ -131,6 +132,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_SPGIST_ID: case RM_BRIN_ID: case RM_COMMIT_TS_ID: + case RM_REPLORIGIN_ID: break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -422,6 +424,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +static inline bool +FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (ctx->callbacks.filter_by_origin_cb == NULL) + return false; + + return filter_by_origin_cb_wrapper(ctx, origin_id); +} + /* * Consolidated commit record handling between the different form of commit * records. @@ -430,8 +441,17 @@ static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid) { + XLogRecPtr origin_lsn = InvalidXLogRecPtr; + XLogRecPtr commit_time = InvalidXLogRecPtr; + XLogRecPtr origin_id = InvalidRepOriginId; int i; + if (parsed->xinfo & XACT_XINFO_HAS_ORIGIN) + { + origin_lsn = parsed->origin_lsn; + commit_time = parsed->origin_timestamp; + } + /* * Process invalidation messages, even if we're not interested in the * transaction's contents, since the various caches need to always be @@ -452,12 +472,13 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * the reorderbuffer to forget the content of the (sub-)transactions * if not. * - * There basically two reasons we might not be interested in this + * There can be several reasons we might not be interested in this * transaction: * 1) We might not be interested in decoding transactions up to this * LSN. This can happen because we previously decoded it and now just * are restarting or if we haven't assembled a consistent snapshot yet. * 2) The transaction happened in another database. + * 3) The output plugin is not interested in the origin. * * We can't just use ReorderBufferAbort() here, because we need to execute * the transaction's invalidations. This currently won't be needed if @@ -472,7 +493,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, * --- */ if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) || - (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database)) + (parsed->dbId != InvalidOid && parsed->dbId != ctx->slot->data.database) || + FilterByOrigin(ctx, origin_id)) { for (i = 0; i < parsed->nsubxacts; i++) { @@ -492,7 +514,7 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, /* replay actions of all transaction + subtransactions in order */ ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr, - parsed->xact_time); + commit_time, origin_id, origin_lsn); } /* @@ -537,8 +559,13 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -579,8 +606,13 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) @@ -628,8 +660,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; + change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); @@ -673,6 +710,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (rnode.dbNode != ctx->slot->data.database) return; + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + tupledata = XLogRecGetBlockData(r, 0, &tuplelen); data = tupledata; @@ -685,6 +726,8 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; + change->origin_id = XLogRecGetOrigin(r); + memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); /* |