diff options
-rw-r--r-- | contrib/test_decoding/Makefile | 4 | ||||
-rw-r--r-- | contrib/test_decoding/expected/xact.out | 42 | ||||
-rw-r--r-- | contrib/test_decoding/sql/xact.sql | 22 | ||||
-rw-r--r-- | src/backend/replication/logical/decode.c | 28 | ||||
-rw-r--r-- | src/backend/replication/logical/reorderbuffer.c | 21 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 6 | ||||
-rw-r--r-- | src/include/replication/reorderbuffer.h | 2 |
7 files changed, 108 insertions, 17 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 78816bfe2f8..200c43efd6f 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,8 +37,8 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \ - binary prepared replorigin time +REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \ + decoding_into_rel binary prepared replorigin time regresscheck: | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/xact.out b/contrib/test_decoding/expected/xact.out new file mode 100644 index 00000000000..507b701c3ab --- /dev/null +++ b/contrib/test_decoding/expected/xact.out @@ -0,0 +1,42 @@ +-- predictability +SET synchronous_commit = on; +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- bug #13844, xids in non-decoded records need to be inspected +CREATE TABLE xact_test(data text); +INSERT INTO xact_test VALUES ('before-test'); +BEGIN; +-- perform operation in xact that creates and logs xid, but isn't decoded +SELECT * FROM xact_test FOR UPDATE; + data +------------- + before-test +(1 row) + +SAVEPOINT foo; +-- and now actually insert in subxact, xid is expected to be known +INSERT INTO xact_test VALUES ('after-assignment'); +COMMIT; +-- and now show those changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +--------------------------------------------------------------- + BEGIN + table public.xact_test: INSERT: data[text]:'before-test' + COMMIT + BEGIN + table public.xact_test: INSERT: data[text]:'after-assignment' + COMMIT +(6 rows) + +DROP TABLE xact_test; +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql new file mode 100644 index 00000000000..9ce238f62df --- /dev/null +++ b/contrib/test_decoding/sql/xact.sql @@ -0,0 +1,22 @@ +-- predictability +SET synchronous_commit = on; + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- bug #13844, xids in non-decoded records need to be inspected +CREATE TABLE xact_test(data text); +INSERT INTO xact_test VALUES ('before-test'); + +BEGIN; +-- perform operation in xact that creates and logs xid, but isn't decoded +SELECT * FROM xact_test FOR UPDATE; +SAVEPOINT foo; +-- and now actually insert in subxact, xid is expected to be known +INSERT INTO xact_test VALUES ('after-assignment'); +COMMIT; +-- and now show those changes +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +DROP TABLE xact_test; + +SELECT pg_drop_replication_slot('regression_slot'); diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 61c3ed740bb..c530fe9ba09 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup); * Take every XLogReadRecord()ed record and perform the actions required to * decode it using the output plugin already setup in the logical decoding * context. + * + * NB: Note that every record's xid needs to be processed by reorderbuffer + * (xids contained in the content of records are not relevant for this rule). + * That means that for records which'd otherwise not go through the + * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to + * call ReorderBufferProcessXid for each record type by default, because + * e.g. empty xacts can be handled more efficiently if there's no previous + * state for them. */ void LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record) @@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor case RM_BRIN_ID: case RM_COMMIT_TS_ID: case RM_REPLORIGIN_ID: + /* just deal with xid, and done */ + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record), + buf.origptr); break; case RM_NEXT_ID: elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record)); @@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) SnapBuild *builder = ctx->snapshot_builder; uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record), + buf->origptr); + switch (info) { /* this is also used in END_OF_RECOVERY checkpoints */ @@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK; - /* no point in doing anything yet, data could not be decoded anyway */ + /* + * No point in doing anything yet, data could not be decoded anyway. It's + * ok not to call ReorderBufferProcessXid() in that case, except in the + * assignment case there'll not be any later records with the same xid; + * and in the assignment case we'll not decode those xacts. + */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) * transactions in the changestream allowing for a kind of * distributed 2PC. */ + ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr); break; default: elog(ERROR, "unexpected RM_XACT_ID record type: %u", info); @@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) XLogReaderState *r = buf->record; uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK; + ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr); + switch (info) { case XLOG_RUNNING_XACTS: @@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; @@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf) TransactionId xid = XLogRecGetXid(buf->record); SnapBuild *builder = ctx->snapshot_builder; + ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr); + /* no point in doing anything yet */ if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT) return; diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c index 478c3e874af..b3276c74c7c 100644 --- a/src/backend/replication/logical/reorderbuffer.c +++ b/src/backend/replication/logical/reorderbuffer.c @@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) /* - * Check whether a transaction is already known in this module.xs + * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at + * least once for every xid in XLogRecord->xl_xid (other places in records + * may, but do not have to be passed through here). + * + * Reorderbuffer keeps some datastructures about transactions in LSN order, + * for efficiency. To do that it has to know about when transactions are seen + * first in the WAL. As many types of records are not actually interesting for + * logical decoding, they do not necessarily pass though here. */ -bool -ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid) +void +ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) { - ReorderBufferTXN *txn; - - txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, - false); - return txn != NULL; + /* many records won't have an xid assigned, centralize check here */ + if (xid != InvalidTransactionId) + ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); } /* diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 16100538663..eb571045fea 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot() bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) { - bool is_old_tx; - /* * We can't handle data in transactions if we haven't built a snapshot * yet, so don't store them. @@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) * If the reorderbuffer doesn't yet have a snapshot, add one now, it will * be needed to decode the change we're currently processing. */ - is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid); - - if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) + if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) { /* only build a new snapshot if we don't have a prebuilt one */ if (builder->snapshot == NULL) diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h index bf0d790053b..cf47a117200 100644 --- a/src/include/replication/reorderbuffer.h +++ b/src/include/replication/reorderbuffer.h @@ -369,7 +369,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn CommandId cmin, CommandId cmax, CommandId combocid); void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn, Size nmsgs, SharedInvalidationMessage *msgs); -bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid); +void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn); bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid); bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid); |