diff options
author | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
---|---|---|
committer | Simon Riggs <simon@2ndQuadrant.com> | 2016-04-06 10:05:41 +0100 |
commit | 3fe3511d05127cc024b221040db2eeb352e7d716 (patch) | |
tree | b17a084bec318a70a1c0fcd755596b771871bce7 /src/backend/replication/logical/decode.c | |
parent | 989be0810dffd08b54e1caecec0677608211c339 (diff) | |
download | postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.tar.gz postgresql-3fe3511d05127cc024b221040db2eeb352e7d716.zip |
Generic Messages for Logical Decoding
API and mechanism to allow generic messages to be inserted into WAL that are
intended to be read by logical decoding plugins. This commit adds an optional
new callback to the logical decoding API.
Messages are either text or bytea. Messages can be transactional, or not, and
are identified by a prefix to allow multiple concurrent decoding plugins.
(Not to be confused with Generic WAL records, which are intended to allow crash
recovery of extensible objects.)
Author: Petr Jelinek and Andres Freund
Reviewers: Artur Zakirov, Tomas Vondra, Simon Riggs
Discussion: 5685F999.6010202@2ndquadrant.com
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 46 |
1 files changed, 46 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 7781ebcae0b..3e80c4a0d86 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -39,6 +39,7 @@ #include "replication/decode.h" #include "replication/logical.h" +#include "replication/message.h" #include "replication/reorderbuffer.h" #include "replication/origin.h" #include "replication/snapbuild.h" @@ -58,6 +59,7 @@ static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); /* individual record(group)'s handlers */ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -123,6 +125,10 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor DecodeHeapOp(ctx, &buf); break; + case RM_LOGICALMSG_ID: + DecodeLogicalMsgOp(ctx, &buf); + break; + /* * Rmgrs irrelevant for logical decoding; they describe stuff not * represented in logical decoding. Add new rmgrs in rmgrlist.h's @@ -458,6 +464,46 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } } +/* + * Handle rmgr LOGICALMSG_ID records for DecodeRecordIntoReorderBuffer(). + */ +static void +DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + SnapBuild *builder = ctx->snapshot_builder; + XLogReaderState *r = buf->record; + TransactionId xid = XLogRecGetXid(r); + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + Snapshot snapshot; + xl_logical_message *message; + + if (info != XLOG_LOGICAL_MESSAGE) + elog(ERROR, "unexpected RM_LOGICALMSG_ID record type: %u", info); + + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + + /* No point in doing anything yet. */ + if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) + return; + + message = (xl_logical_message *) XLogRecGetData(r); + + if (message->transactional && + !SnapBuildProcessChange(builder, xid, buf->origptr)) + return; + else if (!message->transactional && + (SnapBuildCurrentState(builder) != SNAPBUILD_CONSISTENT || + SnapBuildXactNeedsSkip(builder, buf->origptr))) + return; + + snapshot = SnapBuildGetOrBuildSnapshot(builder, xid); + ReorderBufferQueueMessage(ctx->reorder, xid, snapshot, buf->endptr, + message->transactional, + message->message, /* first part of message is prefix */ + message->message_size, + message->message + message->prefix_size); +} + static inline bool FilterByOrigin(LogicalDecodingContext *ctx, RepOriginId origin_id) { |