aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c66
1 files changed, 59 insertions, 7 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 88424964ef3..ea388182692 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -64,6 +64,8 @@ static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+
static void DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
xl_xact_parsed_commit *parsed, TransactionId xid);
static void DecodeAbort(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
@@ -414,6 +416,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
break;
+ case XLOG_HEAP_CONFIRM:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeSpecConfirm(ctx, buf);
+ break;
+
case XLOG_HEAP_LOCK:
/* we don't care about row level locks for now */
break;
@@ -564,11 +571,15 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
return;
change = ReorderBufferGetChange(ctx->reorder);
- change->action = REORDER_BUFFER_CHANGE_INSERT;
+ if (!(xlrec->flags & XLH_INSERT_IS_SPECULATIVE))
+ change->action = REORDER_BUFFER_CHANGE_INSERT;
+ else
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT;
change->origin_id = XLogRecGetOrigin(r);
+
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
Size tuplelen;
char *tupledata = XLogRecGetBlockData(r, 0, &tuplelen);
@@ -615,7 +626,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
change->origin_id = XLogRecGetOrigin(r);
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_NEW_TUPLE)
{
data = XLogRecGetBlockData(r, 0, &datalen);
@@ -624,7 +635,7 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeXLogTuple(data, datalen, change->data.tp.newtuple);
}
- if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
+ if (xlrec->flags & XLH_UPDATE_CONTAINS_OLD)
{
/* caution, remaining data in record is not aligned */
data = XLogRecGetData(r) + SizeOfHeapUpdate;
@@ -660,6 +671,13 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
if (target_node.dbNode != ctx->slot->data.database)
return;
+ /*
+ * Super deletions are irrelevant for logical decoding, it's driven by the
+ * confirmation records.
+ */
+ if (xlrec->flags & XLH_DELETE_IS_SUPER)
+ return;
+
/* output plugin doesn't look for this origin, no need to queue */
if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
return;
@@ -671,7 +689,7 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
/* old primary key stored */
- if (xlrec->flags & XLOG_HEAP_CONTAINS_OLD)
+ if (xlrec->flags & XLH_DELETE_CONTAINS_OLD)
{
Assert(XLogRecGetDataLen(r) > (SizeOfHeapDelete + SizeOfHeapHeader));
@@ -737,7 +755,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* We decode the tuple in pretty much the same way as DecodeXLogTuple,
* but since the layout is slightly different, we can't use it here.
*/
- if (xlrec->flags & XLOG_HEAP_CONTAINS_NEW_TUPLE)
+ if (xlrec->flags & XLH_INSERT_CONTAINS_NEW_TUPLE)
{
change->data.tp.newtuple = ReorderBufferGetTupleBuf(ctx->reorder);
@@ -775,7 +793,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* xl_multi_insert_tuple record emitted by one heap_multi_insert()
* call.
*/
- if (xlrec->flags & XLOG_HEAP_LAST_MULTI_INSERT &&
+ if (xlrec->flags & XLH_INSERT_LAST_IN_MULTI &&
(i + 1) == xlrec->ntuples)
change->data.tp.clear_toast_afterwards = true;
else
@@ -788,6 +806,40 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
/*
+ * Parse XLOG_HEAP_CONFIRM from wal into a confirmation change.
+ *
+ * This is pretty trivial, all the state essentially already setup by the
+ * speculative insertion.
+ */
+static void
+DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ ReorderBufferChange *change;
+ RelFileNode target_node;
+
+ /* only interested in our database */
+ XLogRecGetBlockTag(r, 0, &target_node, NULL, NULL);
+ if (target_node.dbNode != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM;
+ change->origin_id = XLogRecGetOrigin(r);
+
+ memcpy(&change->data.tp.relnode, &target_node, sizeof(RelFileNode));
+
+ change->data.tp.clear_toast_afterwards = true;
+
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+}
+
+
+/*
* Read a HeapTuple as WAL logged by heap_insert, heap_update and heap_delete
* (but not by heap_multi_insert) into a tuplebuf.
*