diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/decode.c | 41 | ||||
-rw-r--r-- | src/backend/replication/logical/logical.c | 43 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 35 |
3 files changed, 119 insertions, 0 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 84183f82031..30d80e7c542 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -65,6 +65,7 @@ static void DecodeLogicalMsgOp(LogicalDecodingContext *ctx, XLogRecordBuffer *bu static void DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); +static void DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); static void DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf); @@ -450,6 +451,11 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) DecodeDelete(ctx, buf); break; + case XLOG_HEAP_TRUNCATE: + if (SnapBuildProcessChange(builder, xid, buf->origptr)) + DecodeTruncate(ctx, buf); + break; + case XLOG_HEAP_INPLACE: /* @@ -827,6 +833,41 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) } /* + * Parse XLOG_HEAP_TRUNCATE from wal + */ +static void +DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) +{ + XLogReaderState *r = buf->record; + xl_heap_truncate *xlrec; + ReorderBufferChange *change; + + xlrec = (xl_heap_truncate *) XLogRecGetData(r); + + /* only interested in our database */ + if (xlrec->dbId != ctx->slot->data.database) + return; + + /* output plugin doesn't look for this origin, no need to queue */ + if (FilterByOrigin(ctx, XLogRecGetOrigin(r))) + return; + + change = ReorderBufferGetChange(ctx->reorder); + change->action = REORDER_BUFFER_CHANGE_TRUNCATE; + change->origin_id = XLogRecGetOrigin(r); + if (xlrec->flags & XLH_TRUNCATE_CASCADE) + change->data.truncate.cascade = true; + if (xlrec->flags & XLH_TRUNCATE_RESTART_SEQS) + change->data.truncate.restart_seqs = true; + change->data.truncate.nrelids = xlrec->nrelids; + change->data.truncate.relids = palloc(xlrec->nrelids * sizeof(Oid)); + memcpy(change->data.truncate.relids, xlrec->relids, + xlrec->nrelids * sizeof(Oid)); + ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), + buf->origptr, change); +} + +/* * Decode XLOG_HEAP2_MULTI_INSERT_insert record into multiple tuplebufs. * * Currently MULTI_INSERT will always contain the full tuples. diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 3d8ad7ddf82..0737c7b1e75 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -62,6 +62,8 @@ static void commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); static void change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, Relation relation, ReorderBufferChange *change); +static void truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change); static void message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size message_size, const char *message); @@ -183,6 +185,7 @@ StartupDecodingContext(List *output_plugin_options, /* wrap output plugin callbacks, so we can add error context information */ ctx->reorder->begin = begin_cb_wrapper; ctx->reorder->apply_change = change_cb_wrapper; + ctx->reorder->apply_truncate = truncate_cb_wrapper; ctx->reorder->commit = commit_cb_wrapper; ctx->reorder->message = message_cb_wrapper; @@ -734,6 +737,46 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, error_context_stack = errcallback.previous; } +static void +truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn, + int nrelations, Relation relations[], ReorderBufferChange *change) +{ + LogicalDecodingContext *ctx = cache->private_data; + LogicalErrorCallbackState state; + ErrorContextCallback errcallback; + + Assert(!ctx->fast_forward); + + if (!ctx->callbacks.truncate_cb) + return; + + /* Push callback + info on the error context stack */ + state.ctx = ctx; + state.callback_name = "truncate"; + state.report_location = change->lsn; + errcallback.callback = output_plugin_error_callback; + errcallback.arg = (void *) &state; + errcallback.previous = error_context_stack; + error_context_stack = &errcallback; + + /* set output state */ + ctx->accept_writes = true; + ctx->write_xid = txn->xid; + + /* + * report this change's lsn so replies from clients can give an up2date + * answer. This won't ever be enough (and shouldn't be!) to confirm + * receipt of this transaction, but it might allow another transaction's + * commit to be confirmed with one message. + */ + ctx->write_location = change->lsn; + + ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change); + + /* Pop the error context stack */ + error_context_stack = errcallback.previous; +} + bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id) { diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index b4016ed52b0..596c91e9a95 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -415,6 +415,7 @@ ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change) case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: + case REORDER_BUFFER_CHANGE_TRUNCATE: break; } @@ -1491,6 +1492,38 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, specinsert = change; break; + case REORDER_BUFFER_CHANGE_TRUNCATE: + { + int i; + int nrelids = change->data.truncate.nrelids; + int nrelations = 0; + Relation *relations; + + relations = palloc0(nrelids * sizeof(Relation)); + for (i = 0; i < nrelids; i++) + { + Oid relid = change->data.truncate.relids[i]; + Relation relation; + + relation = RelationIdGetRelation(relid); + + if (relation == NULL) + elog(ERROR, "could not open relation with OID %u", relid); + + if (!RelationIsLogicallyLogged(relation)) + continue; + + relations[nrelations++] = relation; + } + + rb->apply_truncate(rb, txn, nrelations, relations, change); + + for (i = 0; i < nrelations; i++) + RelationClose(relations[i]); + + break; + } + case REORDER_BUFFER_CHANGE_MESSAGE: rb->message(rb, txn, change->lsn, true, change->data.msg.prefix, @@ -2255,6 +2288,7 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, } break; } + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: @@ -2534,6 +2568,7 @@ ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, break; } /* the base struct contains all the data, easy peasy */ + case REORDER_BUFFER_CHANGE_TRUNCATE: case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |