aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c158
1 files changed, 72 insertions, 86 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 8e78aafda7c..1c7dac38fc9 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -31,7 +31,9 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogutils.h"
#include "access/xlogreader.h"
+#include "access/xlogrecord.h"
#include "catalog/pg_control.h"
@@ -46,8 +48,7 @@ typedef struct XLogRecordBuffer
{
XLogRecPtr origptr;
XLogRecPtr endptr;
- XLogRecord record;
- char *record_data;
+ XLogReaderState *record;
} XLogRecordBuffer;
/* RMGR Handlers */
@@ -79,17 +80,16 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* context.
*/
void
-LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
+LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
{
XLogRecordBuffer buf;
buf.origptr = ctx->reader->ReadRecPtr;
buf.endptr = ctx->reader->EndRecPtr;
- buf.record = *record;
- buf.record_data = XLogRecGetData(record);
+ buf.record = record;
/* cast so we get a warning when new rmgrs are added */
- switch ((RmgrIds) buf.record.xl_rmid)
+ switch ((RmgrIds) XLogRecGetRmid(record))
{
/*
* Rmgrs we care about for logical decoding. Add new rmgrs in
@@ -135,7 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record)
case RM_BRIN_ID:
break;
case RM_NEXT_ID:
- elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid);
+ elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
}
}
@@ -146,7 +146,7 @@ static void
DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
- uint8 info = buf->record.xl_info & ~XLR_INFO_MASK;
+ uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
switch (info)
{
@@ -185,8 +185,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
ReorderBuffer *reorder = ctx->reorder;
- XLogRecord *r = &buf->record;
- uint8 info = r->xl_info & ~XLR_INFO_MASK;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
/* no point in doing anything yet, data could not be decoded anyway */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
@@ -200,12 +200,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId *subxacts = NULL;
SharedInvalidationMessage *invals = NULL;
- xlrec = (xl_xact_commit *) buf->record_data;
+ xlrec = (xl_xact_commit *) XLogRecGetData(r);
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]);
- DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId,
+ DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId,
xlrec->xact_time,
xlrec->nsubxacts, subxacts,
xlrec->nmsgs, invals);
@@ -220,7 +220,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SharedInvalidationMessage *invals = NULL;
/* Prepared commits contain a normal commit record... */
- prec = (xl_xact_commit_prepared *) buf->record_data;
+ prec = (xl_xact_commit_prepared *) XLogRecGetData(r);
xlrec = &prec->crec;
subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
@@ -237,9 +237,9 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_xact_commit_compact *xlrec;
- xlrec = (xl_xact_commit_compact *) buf->record_data;
+ xlrec = (xl_xact_commit_compact *) XLogRecGetData(r);
- DecodeCommit(ctx, buf, r->xl_xid, InvalidOid,
+ DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid,
xlrec->xact_time,
xlrec->nsubxacts, xlrec->subxacts,
0, NULL);
@@ -250,11 +250,11 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
xl_xact_abort *xlrec;
TransactionId *sub_xids;
- xlrec = (xl_xact_abort *) buf->record_data;
+ xlrec = (xl_xact_abort *) XLogRecGetData(r);
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
- DecodeAbort(ctx, buf->origptr, r->xl_xid,
+ DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r),
sub_xids, xlrec->nsubxacts);
break;
}
@@ -265,7 +265,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId *sub_xids;
/* prepared abort contain a normal commit abort... */
- prec = (xl_xact_abort_prepared *) buf->record_data;
+ prec = (xl_xact_abort_prepared *) XLogRecGetData(r);
xlrec = &prec->arec;
sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]);
@@ -282,7 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
int i;
TransactionId *sub_xid;
- xlrec = (xl_xact_assignment *) buf->record_data;
+ xlrec = (xl_xact_assignment *) XLogRecGetData(r);
sub_xid = &xlrec->xsub[0];
@@ -316,14 +316,14 @@ static void
DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
SnapBuild *builder = ctx->snapshot_builder;
- XLogRecord *r = &buf->record;
- uint8 info = r->xl_info & ~XLR_INFO_MASK;
+ XLogReaderState *r = buf->record;
+ uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
switch (info)
{
case XLOG_RUNNING_XACTS:
{
- xl_running_xacts *running = (xl_running_xacts *) buf->record_data;
+ xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r);
SnapBuildProcessRunningXacts(builder, buf->origptr, running);
@@ -352,8 +352,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
- TransactionId xid = buf->record.xl_xid;
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
/* no point in doing anything yet */
@@ -370,7 +370,7 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
xl_heap_new_cid *xlrec;
- xlrec = (xl_heap_new_cid *) buf->record_data;
+ xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record);
SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec);
break;
@@ -405,8 +405,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK;
- TransactionId xid = buf->record.xl_xid;
+ uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK;
+ TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
/* no point in doing anything yet */
@@ -576,34 +576,35 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
static void
DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_insert *xlrec;
ReorderBufferChange *change;
+ RelFileNode target_node;
- xlrec = (xl_heap_insert *) buf->record_data;
+ xlrec = (xl_heap_insert *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader));
+ Size tuplelen;
+ char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert,
- r->xl_len - SizeOfHeapInsert,
- change->data.tp.newtuple);
+ DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -615,62 +616,47 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_update *xlrec;
- xl_heap_header_len xlhdr;
ReorderBufferChange *change;
char *data;
+ Size datalen;
+ RelFileNode target_node;
- xlrec = (xl_heap_update *) buf->record_data;
+ xlrec = (xl_heap_update *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_UPDATE;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
-
- /* caution, remaining data in record is not aligned */
- data = buf->record_data + SizeOfHeapUpdate;
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
{
- Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen));
-
- memcpy(&xlhdr, data, sizeof(xlhdr));
- data += offsetof(xl_heap_header_len, header);
+ data = XLogRecGetBlockData(r, 0, &datalen);
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.newtuple);
- /* skip over the rest of the tuple header */
- data += SizeOfHeapHeader;
- /* skip over the tuple data */
- data += xlhdr.t_len;
+ DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
- memcpy(&xlhdr, data, sizeof(xlhdr));
- data += offsetof(xl_heap_header_len, header);
+ /* caution, remaining data in record is not aligned */
+ data = XLogRecGetData(r) + SizeOfHeapUpdate;
+ datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate;
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
- DecodeXLogTuple(data,
- xlhdr.t_len + SizeOfHeapHeader,
- change->data.tp.oldtuple);
-#ifdef NOT_USED
- data += SizeOfHeapHeader;
- data += xlhdr.t_len;
-#endif
+ DecodeXLogTuple(data, datalen, change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -681,36 +667,38 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_delete *xlrec;
ReorderBufferChange *change;
+ RelFileNode target_node;
- xlrec = (xl_heap_delete *) buf->record_data;
+ xlrec = (xl_heap_delete *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->target.node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
return;
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_DELETE;
- memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
/* old primary key stored */
if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
{
- Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader));
+ Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder);
DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete,
- r->xl_len - SizeOfHeapDelete,
+ XLogRecGetDataLen(r) - SizeOfHeapDelete,
change->data.tp.oldtuple);
}
change->data.tp.clear_toast_afterwards = true;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change);
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
}
/*
@@ -721,27 +709,24 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
static void
DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
{
- XLogRecord *r = &buf->record;
+ XLogReaderState *r = buf->record;
xl_heap_multi_insert *xlrec;
int i;
char *data;
- bool isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0;
+ char *tupledata;
+ Size tuplelen;
+ RelFileNode rnode;
- xlrec = (xl_heap_multi_insert *) buf->record_data;
+ xlrec = (xl_heap_multi_insert *) XLogRecGetData(r);
/* only interested in our database */
- if (xlrec->node.dbNode != ctx->slot->data.database)
+ XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL);
+ if (rnode.dbNode != ctx->slot->data.database)
return;
- data = buf->record_data + SizeOfHeapMultiInsert;
-
- /*
- * OffsetNumbers (which are not of interest to us) are stored when
- * XLOG_HEAP_INIT_PAGE is not set -- skip over them.
- */
- if (!isinit)
- data += sizeof(OffsetNumber) * xlrec->ntuples;
+ tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
+ data = tupledata;
for (i = 0; i < xlrec->ntuples; i++)
{
ReorderBufferChange *change;
@@ -751,7 +736,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change = ReorderBufferGetChange(ctx->reorder);
change->action = REORDER_BUFFER_CHANGE_INSERT;
- memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode));
+ memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode));
/*
* CONTAINS_NEW_TUPLE will always be set currently as multi_insert
@@ -806,9 +791,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
else
change->data.tp.clear_toast_afterwards = false;
- ReorderBufferQueueChange(ctx->reorder, r->xl_xid,
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
buf->origptr, change);
}
+ Assert(data == tupledata + tuplelen);
}
/*