aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile4
-rw-r--r--contrib/test_decoding/expected/xact.out42
-rw-r--r--contrib/test_decoding/sql/xact.sql22
-rw-r--r--src/backend/replication/logical/decode.c28
-rw-r--r--src/backend/replication/logical/reorderbuffer.c21
-rw-r--r--src/backend/replication/logical/snapbuild.c6
-rw-r--r--src/include/replication/reorderbuffer.h2
7 files changed, 108 insertions, 17 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 78816bfe2f8..200c43efd6f 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,8 +37,8 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
- binary prepared replorigin time
+REGRESSCHECKS=ddl xact rewrite toast permissions decoding_in_xact \
+ decoding_into_rel binary prepared replorigin time
regresscheck: | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/xact.out b/contrib/test_decoding/expected/xact.out
new file mode 100644
index 00000000000..507b701c3ab
--- /dev/null
+++ b/contrib/test_decoding/expected/xact.out
@@ -0,0 +1,42 @@
+-- predictability
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- bug #13844, xids in non-decoded records need to be inspected
+CREATE TABLE xact_test(data text);
+INSERT INTO xact_test VALUES ('before-test');
+BEGIN;
+-- perform operation in xact that creates and logs xid, but isn't decoded
+SELECT * FROM xact_test FOR UPDATE;
+ data
+-------------
+ before-test
+(1 row)
+
+SAVEPOINT foo;
+-- and now actually insert in subxact, xid is expected to be known
+INSERT INTO xact_test VALUES ('after-assignment');
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+---------------------------------------------------------------
+ BEGIN
+ table public.xact_test: INSERT: data[text]:'before-test'
+ COMMIT
+ BEGIN
+ table public.xact_test: INSERT: data[text]:'after-assignment'
+ COMMIT
+(6 rows)
+
+DROP TABLE xact_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/xact.sql b/contrib/test_decoding/sql/xact.sql
new file mode 100644
index 00000000000..9ce238f62df
--- /dev/null
+++ b/contrib/test_decoding/sql/xact.sql
@@ -0,0 +1,22 @@
+-- predictability
+SET synchronous_commit = on;
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- bug #13844, xids in non-decoded records need to be inspected
+CREATE TABLE xact_test(data text);
+INSERT INTO xact_test VALUES ('before-test');
+
+BEGIN;
+-- perform operation in xact that creates and logs xid, but isn't decoded
+SELECT * FROM xact_test FOR UPDATE;
+SAVEPOINT foo;
+-- and now actually insert in subxact, xid is expected to be known
+INSERT INTO xact_test VALUES ('after-assignment');
+COMMIT;
+-- and now show those changes
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+DROP TABLE xact_test;
+
+SELECT pg_drop_replication_slot('regression_slot');
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 61c3ed740bb..c530fe9ba09 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -78,6 +78,14 @@ static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
* Take every XLogReadRecord()ed record and perform the actions required to
* decode it using the output plugin already setup in the logical decoding
* context.
+ *
+ * NB: Note that every record's xid needs to be processed by reorderbuffer
+ * (xids contained in the content of records are not relevant for this rule).
+ * That means that for records which'd otherwise not go through the
+ * reorderbuffer ReorderBufferProcessXid() has to be called. We don't want to
+ * call ReorderBufferProcessXid for each record type by default, because
+ * e.g. empty xacts can be handled more efficiently if there's no previous
+ * state for them.
*/
void
LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *record)
@@ -135,6 +143,9 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
case RM_BRIN_ID:
case RM_COMMIT_TS_ID:
case RM_REPLORIGIN_ID:
+ /* just deal with xid, and done */
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(record),
+ buf.origptr);
break;
case RM_NEXT_ID:
elog(ERROR, "unexpected RM_NEXT_ID rmgr_id: %u", (RmgrIds) XLogRecGetRmid(buf.record));
@@ -150,6 +161,9 @@ DecodeXLogOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuild *builder = ctx->snapshot_builder;
uint8 info = XLogRecGetInfo(buf->record) & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(buf->record),
+ buf->origptr);
+
switch (info)
{
/* this is also used in END_OF_RECOVERY checkpoints */
@@ -191,7 +205,12 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & XLOG_XACT_OPMASK;
- /* no point in doing anything yet, data could not be decoded anyway */
+ /*
+ * No point in doing anything yet, data could not be decoded anyway. It's
+ * ok not to call ReorderBufferProcessXid() in that case, except in the
+ * assignment case there'll not be any later records with the same xid;
+ * and in the assignment case we'll not decode those xacts.
+ */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -260,6 +279,7 @@ DecodeXactOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* transactions in the changestream allowing for a kind of
* distributed 2PC.
*/
+ ReorderBufferProcessXid(reorder, XLogRecGetXid(r), buf->origptr);
break;
default:
elog(ERROR, "unexpected RM_XACT_ID record type: %u", info);
@@ -276,6 +296,8 @@ DecodeStandbyOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
XLogReaderState *r = buf->record;
uint8 info = XLogRecGetInfo(r) & ~XLR_INFO_MASK;
+ ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);
+
switch (info)
{
case XLOG_RUNNING_XACTS:
@@ -313,6 +335,8 @@ DecodeHeap2Op(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
@@ -366,6 +390,8 @@ DecodeHeapOp(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
TransactionId xid = XLogRecGetXid(buf->record);
SnapBuild *builder = ctx->snapshot_builder;
+ ReorderBufferProcessXid(ctx->reorder, xid, buf->origptr);
+
/* no point in doing anything yet */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 478c3e874af..b3276c74c7c 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1741,16 +1741,21 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
/*
- * Check whether a transaction is already known in this module.xs
+ * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
+ * least once for every xid in XLogRecord->xl_xid (other places in records
+ * may, but do not have to be passed through here).
+ *
+ * Reorderbuffer keeps some datastructures about transactions in LSN order,
+ * for efficiency. To do that it has to know about when transactions are seen
+ * first in the WAL. As many types of records are not actually interesting for
+ * logical decoding, they do not necessarily pass though here.
*/
-bool
-ReorderBufferIsXidKnown(ReorderBuffer *rb, TransactionId xid)
+void
+ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
{
- ReorderBufferTXN *txn;
-
- txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
- false);
- return txn != NULL;
+ /* many records won't have an xid assigned, centralize check here */
+ if (xid != InvalidTransactionId)
+ ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
}
/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 16100538663..eb571045fea 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -635,8 +635,6 @@ SnapBuildClearExportedSnapshot()
bool
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
{
- bool is_old_tx;
-
/*
* We can't handle data in transactions if we haven't built a snapshot
* yet, so don't store them.
@@ -657,9 +655,7 @@ SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
* If the reorderbuffer doesn't yet have a snapshot, add one now, it will
* be needed to decode the change we're currently processing.
*/
- is_old_tx = ReorderBufferIsXidKnown(builder->reorder, xid);
-
- if (!is_old_tx || !ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
+ if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
{
/* only build a new snapshot if we don't have a prebuilt one */
if (builder->snapshot == NULL)
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index bf0d790053b..cf47a117200 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -369,7 +369,7 @@ void ReorderBufferAddNewTupleCids(ReorderBuffer *, TransactionId, XLogRecPtr lsn
CommandId cmin, CommandId cmax, CommandId combocid);
void ReorderBufferAddInvalidations(ReorderBuffer *, TransactionId, XLogRecPtr lsn,
Size nmsgs, SharedInvalidationMessage *msgs);
-bool ReorderBufferIsXidKnown(ReorderBuffer *, TransactionId xid);
+void ReorderBufferProcessXid(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
void ReorderBufferXidSetCatalogChanges(ReorderBuffer *, TransactionId xid, XLogRecPtr lsn);
bool ReorderBufferXidHasCatalogChanges(ReorderBuffer *, TransactionId xid);
bool ReorderBufferXidHasBaseSnapshot(ReorderBuffer *, TransactionId xid);