aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c46
1 files changed, 46 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 7781ebcae0b..3e80c4a0d86 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -39,6 +39,7 @@
#include "replication/decode.h"
#include "replication/logical.h"
+#include "replication/message.h"
#include "replication/reorderbuffer.h"
#include "replication/origin.h"
#include "replication/snapbuild.h"
@@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
/* individual record(group)'s handlers */
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
DecodeHeapOp(ctx, &buf);
break;
+ case RM_LOGICALMSG_ID:
+ DecodeLogicalMsgOp(ctx, &buf);
+ break;
+
/*
* Rmgrs irrelevant for logical decoding; they describe stuff not
* represented in logical decoding. Add new rmgrs in rmgrlist.h's
@@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
}
+/*
+ * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer().
+ */
+static void
+DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ SnapBuild *builder = ctx->snapshot_builder;
+ XLogReaderState *r = buf->record;
+ TransactionId xid = XLogRecGetXid(r);
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ Snapshot snapshot;
+ xl_logical_message *message;
+
+ if (info != XLOG_LOGICAL_MESSAGE)
+ elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info);
+
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
+ /* No point in doing anything yet. */
+ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
+ return;
+
+ message = (xl_logical_message *) XLogRecGetData(r);
+
+ if (message->transactional &&
+ !SnapBuildProcessChange(builder, xid, buf->origptr))
+ return;
+ else if (!message->transactional &&
+ (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT ||
+ SnapBuildXactNeedsSkip(builder, buf->origptr)))
+ return;
+
+ snapshot = SnapBuildGetOrBuildSnapshot(builder, xid);
+ ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr,
+ message->transactional,
+ message->message, /* first part of message is prefix */
+ message->message_size,
+ message->message + message->prefix_size);
+}
+
static inline bool
FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id)
{