diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/decode.c | 23 | ||||
-rw-r--r-- | src/backend/replication/logical/message.c | 6 |
2 files changed, 20 insertions, 9 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 3e80c4a0d86..0cdb0b8a92b 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +static inline bool +FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) +{ + if (ctx->callbacks.filter_by_origin_cb == NULL) + return false; + + return filter_by_origin_cb_wrapper(ctx, origin_id); +} + /* * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). */ @@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; TransactionId xid = XLogRecGetXid(r); uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + RepOriginId origin_id = XLogRecGetOrigin(r); Snapshot snapshot; xl_logical_message *message; @@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message = (xl_logical_message *) XLogRecGetData(r); + if (message->dbId != ctx->slot->data.database || + FilterByOrigin(ctx, origin_id)) + return; + if (message->transactional && !SnapBuildProcessChange(builder, xid, buf->origptr)) return; @@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) message->message + message->prefix_size); } -static inline bool -FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) -{ - if (ctx->callbacks.filter_by_origin_cb == NULL) - return false; - - return filter_by_origin_cb_wrapper(ctx, origin_id); -} - /* * Consolidated commit record handling between the different form of commit * records. diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c index 684f7998263..efcc25ae957 100644 --- a/src/backend/replication/logical/message.c +++ b/src/backend/replication/logical/message.c @@ -31,6 +31,8 @@ #include "postgres.h" +#include "miscadmin.h" + #include "access/xact.h" #include "catalog/indexing.h" @@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, GetCurrentTransactionId(); } + xlrec.dbId = MyDatabaseId; xlrec.transactional = transactional; xlrec.prefix_size = strlen(prefix) + 1; xlrec.message_size = size; @@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size, XLogRegisterData((char *) prefix, xlrec.prefix_size); XLogRegisterData((char *) message, size); + /* allow origin filtering */ + XLogIncludeOrigin(); + return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE); } |