aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/decode.c41
-rw-r--r--src/backend/replication/logical/logical.c43
-rw-r--r--src/backend/replication/logical/reorderbuffer.c35
3 files changed, 119 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 84183f82031..30d80e7c542 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu
static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
+static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf);
@@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
DecodeDelete(ctx, buf);
break;
+ case XLOG_HEAP_TRUNCATE:
+ if (SnapBuildProcessChange(builder, xid, buf->origptr))
+ DecodeTruncate(ctx, buf);
+ break;
+
case XLOG_HEAP_INPLACE:
/*
@@ -827,6 +833,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
}
/*
+ * Parse XLOG_HEAP_TRUNCATE from wal
+ */
+static void
+DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
+{
+ XLogReaderState *r = buf->record;
+ xl_heap_truncate *xlrec;
+ ReorderBufferChange *change;
+
+ xlrec = (xl_heap_truncate *) XLogRecGetData(r);
+
+ /* only interested in our database */
+ if (xlrec->dbId != ctx->slot->data.database)
+ return;
+
+ /* output plugin doesn't look for this origin, no need to queue */
+ if (FilterByOrigin(ctx, XLogRecGetOrigin(r)))
+ return;
+
+ change = ReorderBufferGetChange(ctx->reorder);
+ change->action = REORDER_BUFFER_CHANGE_TRUNCATE;
+ change->origin_id = XLogRecGetOrigin(r);
+ if (xlrec->flags & XLH_TRUNCATE_CASCADE)
+ change->data.truncate.cascade = true;
+ if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS)
+ change->data.truncate.restart_seqs = true;
+ change->data.truncate.nrelids = xlrec->nrelids;
+ change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid));
+ memcpy(change->data.truncate.relids, xlrec->relids,
+ xlrec->nrelids * sizeof(Oid));
+ ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
+ buf->origptr, change);
+}
+
+/*
* Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs.
*
* Currently MULTI_INSERT will always contain the full tuples.
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3d8ad7ddf82..0737c7b1e75 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
Relation relation, ReorderBufferChange *change);
+static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change);
static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
XLogRecPtr message_lsn, bool transactional,
const char *prefix, Size message_size, const char *message);
@@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options,
/* wrap output plugin callbacks, so we can add error context information */
ctx->reorder->begin = begin_cb_wrapper;
ctx->reorder->apply_change = change_cb_wrapper;
+ ctx->reorder->apply_truncate = truncate_cb_wrapper;
ctx->reorder->commit = commit_cb_wrapper;
ctx->reorder->message = message_cb_wrapper;
@@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
error_context_stack = errcallback.previous;
}
+static void
+truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
+ int nrelations, Relation relations[], ReorderBufferChange *change)
+{
+ LogicalDecodingContext *ctx = cache->private_data;
+ LogicalErrorCallbackState state;
+ ErrorContextCallback errcallback;
+
+ Assert(!ctx->fast_forward);
+
+ if (!ctx->callbacks.truncate_cb)
+ return;
+
+ /* Push callback + info on the error context stack */
+ state.ctx = ctx;
+ state.callback_name = "truncate";
+ state.report_location = change->lsn;
+ errcallback.callback = output_plugin_error_callback;
+ errcallback.arg = (void *) &state;
+ errcallback.previous = error_context_stack;
+ error_context_stack = &errcallback;
+
+ /* set output state */
+ ctx->accept_writes = true;
+ ctx->write_xid = txn->xid;
+
+ /*
+ * report this change's lsn so replies from clients can give an up2date
+ * answer. This won't ever be enough (and shouldn't be!) to confirm
+ * receipt of this transaction, but it might allow another transaction's
+ * commit to be confirmed with one message.
+ */
+ ctx->write_location = change->lsn;
+
+ ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
+
+ /* Pop the error context stack */
+ error_context_stack = errcallback.previous;
+}
+
bool
filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
{
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index b4016ed52b0..596c91e9a95 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
break;
}
@@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
specinsert = change;
break;
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
+ {
+ int i;
+ int nrelids = change->data.truncate.nrelids;
+ int nrelations = 0;
+ Relation *relations;
+
+ relations = palloc0(nrelids * sizeof(Relation));
+ for (i = 0; i < nrelids; i++)
+ {
+ Oid relid = change->data.truncate.relids[i];
+ Relation relation;
+
+ relation = RelationIdGetRelation(relid);
+
+ if (relation == NULL)
+ elog(ERROR, "could not open relation with OID %u", relid);
+
+ if (!RelationIsLogicallyLogged(relation))
+ continue;
+
+ relations[nrelations++] = relation;
+ }
+
+ rb->apply_truncate(rb, txn, nrelations, relations, change);
+
+ for (i = 0; i < nrelations; i++)
+ RelationClose(relations[i]);
+
+ break;
+ }
+
case REORDER_BUFFER_CHANGE_MESSAGE:
rb->message(rb, txn, change->lsn, true,
change->data.msg.prefix,
@@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
}
break;
}
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
@@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
break;
}
/* the base struct contains all the data, easy peasy */
+ case REORDER_BUFFER_CHANGE_TRUNCATE:
case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: