diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 158 |
1 files changed, 72 insertions, 86 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 8e78aafda7c..1c7dac38fc9 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -31,7 +31,9 @@ #include "access/transam.h" #include "access/xact.h" #include "access/xlog_internal.h" +#include "access/xlogutils.h" #include "access/xlogreader.h" +#include "access/xlogrecord.h" #include "catalog/pg_control.h" @@ -46,8 +48,7 @@ typedef struct XLogRecordBuffer { XLogRecPtr origptr; XLogRecPtr endptr; - XLogRecord record; - char *record_data; + XLogReaderState *record; } XLogRecordBuffer; /* RMGR Handlers */ @@ -79,17 +80,16 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * context. */ void -LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) +LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) { XLogRecordBuffer buf; buf.origptr = ctx->reader->ReadRecPtr; buf.endptr = ctx->reader->EndRecPtr; - buf.record = *record; - buf.record_data = XLogRecGetData(record); + buf.record = record; /* cast so we get a warning when new rmgrs are added */ - switch ((RmgrIds) buf.record.xl_rmid) + switch ((RmgrIds) XLogRecGetRmid(record)) { /* * Rmgrs we care about for logical decoding. Add new rmgrs in @@ -135,7 +135,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogRecord *record) case RM_BRIN_ID: break; case RM_NEXT_ID: - elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) buf.record.xl_rmid); + elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); } } @@ -146,7 +146,7 @@ static void DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; - uint8 info = buf->record.xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; switch (info) { @@ -185,8 +185,8 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; ReorderBuffer *reorder = ctx->reorder; - XLogRecord *r = &buf->record; - uint8 info = r->xl_info & ~XLR_INFO_MASK; + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; /* no point in doing anything yet, data could not be decoded anyway */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) @@ -200,12 +200,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId *subxacts = NULL; SharedInvalidationMessage *invals = NULL; - xlrec = (xl_xact_commit *) buf->record_data; + xlrec = (xl_xact_commit *) XLogRecGetData(r); subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); invals = (SharedInvalidationMessage *) &(subxacts[xlrec->nsubxacts]); - DecodeCommit(ctx, buf, r->xl_xid, xlrec->dbId, + DecodeCommit(ctx, buf, XLogRecGetXid(r), xlrec->dbId, xlrec->xact_time, xlrec->nsubxacts, subxacts, xlrec->nmsgs, invals); @@ -220,7 +220,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SharedInvalidationMessage *invals = NULL; /* Prepared commits contain a normal commit record... */ - prec = (xl_xact_commit_prepared *) buf->record_data; + prec = (xl_xact_commit_prepared *) XLogRecGetData(r); xlrec = &prec->crec; subxacts = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); @@ -237,9 +237,9 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { xl_xact_commit_compact *xlrec; - xlrec = (xl_xact_commit_compact *) buf->record_data; + xlrec = (xl_xact_commit_compact *) XLogRecGetData(r); - DecodeCommit(ctx, buf, r->xl_xid, InvalidOid, + DecodeCommit(ctx, buf, XLogRecGetXid(r), InvalidOid, xlrec->xact_time, xlrec->nsubxacts, xlrec->subxacts, 0, NULL); @@ -250,11 +250,11 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) xl_xact_abort *xlrec; TransactionId *sub_xids; - xlrec = (xl_xact_abort *) buf->record_data; + xlrec = (xl_xact_abort *) XLogRecGetData(r); sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); - DecodeAbort(ctx, buf->origptr, r->xl_xid, + DecodeAbort(ctx, buf->origptr, XLogRecGetXid(r), sub_xids, xlrec->nsubxacts); break; } @@ -265,7 +265,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId *sub_xids; /* prepared abort contain a normal commit abort... */ - prec = (xl_xact_abort_prepared *) buf->record_data; + prec = (xl_xact_abort_prepared *) XLogRecGetData(r); xlrec = &prec->arec; sub_xids = (TransactionId *) &(xlrec->xnodes[xlrec->nrels]); @@ -282,7 +282,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) int i; TransactionId *sub_xid; - xlrec = (xl_xact_assignment *) buf->record_data; + xlrec = (xl_xact_assignment *) XLogRecGetData(r); sub_xid = &xlrec->xsub[0]; @@ -316,14 +316,14 @@ static void DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { SnapBuild *builder = ctx->snapshot_builder; - XLogRecord *r = &buf->record; - uint8 info = r->xl_info & ~XLR_INFO_MASK; + XLogReaderState *r = buf->record; + uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; switch (info) { case XLOG_RUNNING_XACTS: { - xl_running_xacts *running = (xl_running_xacts *) buf->record_data; + xl_running_xacts *running = (xl_running_xacts *) XLogRecGetData(r); SnapBuildProcessRunningXacts(builder, buf->origptr, running); @@ -352,8 +352,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK; - TransactionId xid = buf->record.xl_xid; + uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; + TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; /* no point in doing anything yet */ @@ -370,7 +370,7 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { xl_heap_new_cid *xlrec; - xlrec = (xl_heap_new_cid *) buf->record_data; + xlrec = (xl_heap_new_cid *) XLogRecGetData(buf->record); SnapBuildProcessNewCid(builder, xid, buf->origptr, xlrec); break; @@ -405,8 +405,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - uint8 info = buf->record.xl_info & XLOG_HEAP_OPMASK; - TransactionId xid = buf->record.xl_xid; + uint8 info = XLogRecGetInfo(buf->record) & XLOG_HEAP_OPMASK; + TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; /* no point in doing anything yet */ @@ -576,34 +576,35 @@ DecodeAbort(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_insert *xlrec; ReorderBufferChange *change; + RelFileNode target_node; - xlrec = (xl_heap_insert *) buf->record_data; + xlrec = (xl_heap_insert *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - Assert(r->xl_len > (SizeOfHeapInsert + SizeOfHeapHeader)); + Size tuplelen; + char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple((char *) xlrec + SizeOfHeapInsert, - r->xl_len - SizeOfHeapInsert, - change->data.tp.newtuple); + DecodeXLogTuple(tupledata, tuplelen, change->data.tp.newtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -615,62 +616,47 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_update *xlrec; - xl_heap_header_len xlhdr; ReorderBufferChange *change; char *data; + Size datalen; + RelFileNode target_node; - xlrec = (xl_heap_update *) buf->record_data; + xlrec = (xl_heap_update *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_UPDATE; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); - - /* caution, remaining data in record is not aligned */ - data = buf->record_data + SizeOfHeapUpdate; + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) { - Assert(r->xl_len > (SizeOfHeapUpdate + SizeOfHeapHeaderLen)); - - memcpy(&xlhdr, data, sizeof(xlhdr)); - data += offsetof(xl_heap_header_len, header); + data = XLogRecGetBlockData(r, 0, &datalen); change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.newtuple); - /* skip over the rest of the tuple header */ - data += SizeOfHeapHeader; - /* skip over the tuple data */ - data += xlhdr.t_len; + DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { - memcpy(&xlhdr, data, sizeof(xlhdr)); - data += offsetof(xl_heap_header_len, header); + /* caution, remaining data in record is not aligned */ + data = XLogRecGetData(r) + SizeOfHeapUpdate; + datalen = XLogRecGetDataLen(r) - SizeOfHeapUpdate; change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); - DecodeXLogTuple(data, - xlhdr.t_len + SizeOfHeapHeader, - change->data.tp.oldtuple); -#ifdef NOT_USED - data += SizeOfHeapHeader; - data += xlhdr.t_len; -#endif + DecodeXLogTuple(data, datalen, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -681,36 +667,38 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_delete *xlrec; ReorderBufferChange *change; + RelFileNode target_node; - xlrec = (xl_heap_delete *) buf->record_data; + xlrec = (xl_heap_delete *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->target.node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) return; change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_DELETE; - memcpy(&change->data.tp.relnode, &xlrec->target.node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); /* old primary key stored */ if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) { - Assert(r->xl_len > (SizeOfHeapDelete + SizeOfHeapHeader)); + Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); change->data.tp.oldtuple = ReorderBufferGetTupleBuf(ctx->reorder); DecodeXLogTuple((char *) xlrec + SizeOfHeapDelete, - r->xl_len - SizeOfHeapDelete, + XLogRecGetDataLen(r) - SizeOfHeapDelete, change->data.tp.oldtuple); } change->data.tp.clear_toast_afterwards = true; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, buf->origptr, change); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } /* @@ -721,27 +709,24 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) { - XLogRecord *r = &buf->record; + XLogReaderState *r = buf->record; xl_heap_multi_insert *xlrec; int i; char *data; - bool isinit = (r->xl_info & XLOG_HEAP_INIT_PAGE) != 0; + char *tupledata; + Size tuplelen; + RelFileNode rnode; - xlrec = (xl_heap_multi_insert *) buf->record_data; + xlrec = (xl_heap_multi_insert *) XLogRecGetData(r); /* only interested in our database */ - if (xlrec->node.dbNode != ctx->slot->data.database) + XLogRecGetBlockTag(r, 0, &rnode, NULL, NULL); + if (rnode.dbNode != ctx->slot->data.database) return; - data = buf->record_data + SizeOfHeapMultiInsert; - - /* - * OffsetNumbers (which are not of interest to us) are stored when - * XLOG_HEAP_INIT_PAGE is not set -- skip over them. - */ - if (!isinit) - data += sizeof(OffsetNumber) * xlrec->ntuples; + tupledata = XLogRecGetBlockData(r, 0, &tuplelen); + data = tupledata; for (i = 0; i < xlrec->ntuples; i++) { ReorderBufferChange *change; @@ -751,7 +736,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change = ReorderBufferGetChange(ctx->reorder); change->action = REORDER_BUFFER_CHANGE_INSERT; - memcpy(&change->data.tp.relnode, &xlrec->node, sizeof(RelFileNode)); + memcpy(&change->data.tp.relnode, &rnode, sizeof(RelFileNode)); /* * CONTAINS_NEW_TUPLE will always be set currently as multi_insert @@ -806,9 +791,10 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) else change->data.tp.clear_toast_afterwards = false; - ReorderBufferQueueChange(ctx->reorder, r->xl_xid, + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); } + Assert(data == tupledata + tuplelen); } /* |