aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c23
-rw-r--r--src/backend/replication/logical/message.c6
-rw-r--r--src/include/access/xlog_internal.h2
-rw-r--r--src/include/replication/message.h1
4 files changed, 22 insertions, 10 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 3e80c4a0d86..0cdb0b8a92b 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -464,6 +464,15 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+static inline bool
+FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
+{
+ if (ctx->callbacks.filter_by_origin_cb == NULL)
+ return false;
+
+ return filter_by_origin_cb_wrapper(ctx, origin_id);
+}
+
/*
* Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
*/
@@ -474,6 +483,7 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
TransactionId xid = XLogRecGetXid(r);
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ RepOriginId origin_id = XLogRecGetOrigin(r);
Snapshot snapshot;
xl_logical_message *message;
@@ -488,6 +498,10 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message = (xl_logical_message *) XLogRecGetData(r);
+ if (message->dbId != ctx->slot->data.database ||
+ FilterByOrigin(ctx, origin_id))
+ return;
+
if (message->transactional &&
!SnapBuildProcessChange(builder, xid, buf->origptr))
return;
@@ -504,15 +518,6 @@ DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
message->message + message->prefix_size);
}
-static inline bool
-FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
-{
- if (ctx->callbacks.filter_by_origin_cb == NULL)
- return false;
-
- return filter_by_origin_cb_wrapper(ctx, origin_id);
-}
-
/*
* Consolidated commit record handling between the different form of commit
* records.
diff --git a/src/backend/replication/logical/message.c b/src/backend/replication/logical/message.c
index 684f7998263..efcc25ae957 100644
--- a/src/backend/replication/logical/message.c
+++ b/src/backend/replication/logical/message.c
@@ -31,6 +31,8 @@
#include "postgres.h"
+#include "miscadmin.h"
+
#include "access/xact.h"
#include "catalog/indexing.h"
@@ -60,6 +62,7 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
GetCurrentTransactionId();
}
+ xlrec.dbId = MyDatabaseId;
xlrec.transactional = transactional;
xlrec.prefix_size = strlen(prefix) + 1;
xlrec.message_size = size;
@@ -69,6 +72,9 @@ LogLogicalMessage(const char *prefix, const char *message, size_t size,
XLogRegisterData((char *) prefix, xlrec.prefix_size);
XLogRegisterData((char *) message, size);
+ /* allow origin filtering */
+ XLogIncludeOrigin();
+
return XLogInsert(RM_LOGICALMSG_ID, XLOG_LOGICAL_MESSAGE);
}
diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h
index 749060f9c75..7089a1c48f7 100644
--- a/src/include/access/xlog_internal.h
+++ b/src/include/access/xlog_internal.h
@@ -31,7 +31,7 @@
/*
* Each page of XLOG file has a header like this:
*/
-#define XLOG_PAGE_MAGIC 0xD090 /* can be used as WAL version indicator */
+#define XLOG_PAGE_MAGIC 0xD091 /* can be used as WAL version indicator */
typedef struct XLogPageHeaderData
{
diff --git a/src/include/replication/message.h b/src/include/replication/message.h
index 8b968d5288e..23b9cdb268b 100644
--- a/src/include/replication/message.h
+++ b/src/include/replication/message.h
@@ -19,6 +19,7 @@
*/
typedef struct xl_logical_message
{
+ Oid dbId; /* database Oid emitted from */
bool transactional; /* is message transactional? */
Size prefix_size; /* length of prefix */
Size message_size; /* size of the message */