aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/decode.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/decode.c')
-rw-r--r--src/backend/replication/logical/decode.c41
1 files changed, 41 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 84183f82031..30d80e7c542 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeDelete(ctx, buf);
break;
+ case XLOG_HEAP_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
case XLOG_HEAP_INPLACE:
/*
@@ -827,6 +833,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_truncate *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->dbId != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+ change->data.truncate.cascade = true;
+ if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+ change->data.truncate.restart_seqs = true;
+ change->data.truncate.nrelids = xlrec->nrelids;
+ change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
+ memcpy(change->data.truncate.relids, xlrec->relids,
+ xlrec->nrelids * sizeof(Oid));
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change);
+}
+
+/*
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
*
* Currently MULTI_INSERT will always contain the full tuples.