diff options
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r-- | src/backend/replication/logical/decode.c | 66 |
1 files changed, 59 insertions, 7 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 88424964ef3..ea388182692 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -64,6 +64,8 @@ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); + static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, xl_xact_parsed_commit *parsed, TransactionId xid); static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, @@ -414,6 +416,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); break; + case XLOG_HEAP_CONFIRM: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeSpecConfirm(ctx, buf); + break; + case XLOG_HEAP_LOCK: /* we don't care about row level locks for now */ break; @@ -564,11 +571,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) return; change = ReorderBufferGetChange(ctx->reorder); - change->action = REORDER_BUFFER_CHANGE_INSERT; + if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE)) + change->action = REORDER_BUFFER_CHANGE_INSERT; + else + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT; change->origin_id = XLogRecGetOrigin(r); + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); - if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) + if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { Size tuplelen; char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen); @@ -615,7 +626,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) change->origin_id = XLogRecGetOrigin(r); memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); - if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) + if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE) { data = XLogRecGetBlockData(r, 0, &datalen); @@ -624,7 +635,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeXLogTuple(data, datalen, change->data.tp.newtuple); } - if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) + if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD) { /* caution, remaining data in record is not aligned */ data = XLogRecGetData(r) + SizeOfHeapUpdate; @@ -660,6 +671,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) if (target_node.dbNode != ctx->slot->data.database) return; + /* + * Super deletions are irrelevant for logical decoding, it's driven by the + * confirmation records. + */ + if (xlrec->flags & XLH_DELETE_IS_SUPER) + return; + /* output plugin doesn't look for this origin, no need to queue */ if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) return; @@ -671,7 +689,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); /* old primary key stored */ - if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD) + if (xlrec->flags & XLH_DELETE_CONTAINS_OLD) { Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader)); @@ -737,7 +755,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * We decode the tuple in pretty much the same way as DecodeXLogTuple, * but since the layout is slightly different, we can't use it here. */ - if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE) + if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE) { change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder); @@ -775,7 +793,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * xl_multi_insert_tuple record emitted by one heap_multi_insert() * call. */ - if (xlrec->flags & XLOG_HEAP_LAST_MULTI_INSERT && + if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI && (i + 1) == xlrec->ntuples) change->data.tp.clear_toast_afterwards = true; else @@ -788,6 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } /* + * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change. + * + * This is pretty trivial, all the state essentially already setup by the + * speculative insertion. + */ +static void +DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + ReorderBufferChange *change; + RelFileNode target_node; + + /* only interested in our database */ + XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL); + if (target_node.dbNode != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM; + change->origin_id = XLogRecGetOrigin(r); + + memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode)); + + change->data.tp.clear_toast_afterwards = true; + + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change); +} + + +/* * Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete * (but not by heap_multi_insert) into a tuplebuf. * |