diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7781ebcae0b..3e80c4a0d86 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -39,6 +39,7 @@ #include "replication/decode.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" @@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeHeapOp(ctx, &buf); break; + case RM_LOGICALMSG_ID: + DecodeLogicalMsgOp(ctx, &buf); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's @@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +static void +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + Snapshot snapshot; + xl_logical_message *message; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* No point in doing anything yet. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + return; + + message = (xl_logical_message *) XLogRecGetData(r); + + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!message->transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, + message->transactional, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { |