aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r-- contrib/test_decoding/Makefile 2
-rw-r--r-- contrib/test_decoding/expected/catalog_change_snapshot.out 44
-rw-r--r-- contrib/test_decoding/specs/catalog_change_snapshot.spec 39
-rw-r--r-- src/backend/replication/logical/decode.c 15
-rw-r--r-- src/backend/replication/logical/snapbuild.c 133
-rw-r--r-- src/include/replication/snapbuild.h 3
6 files changed, 227 insertions, 9 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index f439c582a5f..6ec09ab192e 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -7,7 +7,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
decoding_into_rel binary prepared replorigin time messages \
spill slot truncate
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
- oldest_xmin snapshot_transfer subxact_without_top
+ oldest_xmin snapshot_transfer subxact_without_top catalog_change_snapshot
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644
index 00000000000..dc4f9b7018f
--- /dev/null
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 00000000000..2971ddc69cb
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,39 @@
+# Test decoding only the commit record of a transaction that has
+# modified catalogs.
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution. This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter. One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 5a2b828aa3f..87cbd08e858 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -582,7 +582,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (!ctx->fast_forward)
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
parsed->nmsgs, parsed->msgs);
- ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ /*
+ * If the COMMIT record has invalidation messages, it could have catalog
+ * changes. It is possible that we didn't mark this transaction and
+ * its subtransactions as containing catalog changes when the decoding
+ * starts from a commit record without decoding the transaction's other
+ * changes. Therefore, we make sure to mark such transactions as containing
+ * catalog changes.
+ *
+ * This must be done before SnapBuildCommitTxn() so that we can include
+ * these transactions in the historic snapshot.
+ */
+ SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid,
+ parsed->nsubxacts, parsed->subxacts,
+ buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index be46bf0363d..d407fb3440e 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -252,8 +252,38 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/*
+ * Array of transactions and subtransactions that were running when
+ * the xl_running_xacts record that we decoded was written. The array is
+ * sorted in xidComparator order. We remove xids from this array once
+ * they are too old to matter, so it eventually becomes empty.
+ * This array is allocated in builder->context so its lifetime is the same
+ * as the snapshot builder.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that the
+ * logical decoding decodes only the commit record of the transaction after
+ * restoring the previously serialized snapshot in which case we will miss
+ * adding the xid to the snapshot and end up looking at the catalogs with the
+ * wrong snapshot.
+ *
+ * Now to avoid the above problem, if the COMMIT record of the xid listed in
+ * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top
+ * transaction and its subtransactions as containing catalog changes.
+ *
+ * We could end up adding the transaction that didn't change catalog
+ * to the snapshot since we cannot distinguish whether the transaction
+ * has catalog changes only by checking the COMMIT record. It doesn't
+ * have the information on which (sub) transaction has catalog changes,
+ * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
+ * transaction has catalog change. But that won't be a problem since we
+ * use snapshot built during decoding only for reading system catalogs.
+ */
+static TransactionId *InitialRunningXacts = NULL;
+static int NInitialRunningXacts = 0;
+
+/* ->committed and InitialRunningXacts manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -890,12 +920,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
}
/*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed and the initial
+ * running transactions that are smaller than ->xmin. Those won't ever get
+ * checked via the ->committed or InitialRunningXacts array, respectively.
+ * The committed xids will get checked via the clog machinery.
+ *
+ * We can ideally remove the transaction from InitialRunningXacts array
+ * once it is finished (committed/aborted) but that could be costly as we need
+ * to maintain the xids order in the array.
*/
static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
@@ -930,6 +965,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
builder->committed.xcnt = surviving_xids;
pfree(workspace);
+
+ /* Quick exit if there are no initial running transactions */
+ if (NInitialRunningXacts == 0)
+ return;
+
+ /* bound check if there is at least one transaction to remove */
+ if (!NormalTransactionIdPrecedes(InitialRunningXacts[0],
+ builder->xmin))
+ return;
+
+ /*
+ * purge xids in InitialRunningXacts as well. The purged array must also
+ * be sorted in xidComparator order.
+ */
+ workspace =
+ MemoryContextAlloc(builder->context,
+ NInitialRunningXacts * sizeof(TransactionId));
+ surviving_xids = 0;
+ for (off = 0; off < NInitialRunningXacts; off++)
+ {
+ if (NormalTransactionIdPrecedes(InitialRunningXacts[off],
+ builder->xmin))
+ ; /* remove */
+ else
+ workspace[surviving_xids++] = InitialRunningXacts[off];
+ }
+
+ if (surviving_xids > 0)
+ memcpy(InitialRunningXacts, workspace,
+ sizeof(TransactionId) * surviving_xids);
+ else
+ {
+ pfree(InitialRunningXacts);
+ InitialRunningXacts = NULL;
+ }
+
+ elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u",
+ (uint32) NInitialRunningXacts,
+ (uint32) surviving_xids,
+ builder->xmin);
+
+ NInitialRunningXacts = surviving_xids;
+ pfree(workspace);
}
/*
@@ -1137,7 +1215,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
- SnapBuildPurgeCommittedTxn(builder);
+ SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
@@ -1288,6 +1366,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
+ int nxacts = running->subxcnt + running->xcnt;
+ Size sz = sizeof(TransactionId) * nxacts;
+
+ /*
+ * Remember the transactions and subtransactions that were running
+ * when the xl_running_xacts record that we decoded was written. We use
+ * this later to identify the transactions that have performed catalog
+ * changes. See SnapBuildXidSetCatalogChanges.
+ */
+ NInitialRunningXacts = nxacts;
+ InitialRunningXacts = MemoryContextAlloc(builder->context, sz);
+ memcpy(InitialRunningXacts, running->xids, sz);
+ qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator);
+
/* there won't be any state to cleanup */
return false;
}
@@ -2030,3 +2122,30 @@ CheckPointSnapBuild(void)
}
FreeDir(snap_dir);
}
+
+/*
+ * Mark xid as containing catalog changes. In addition, if xid is found in
+ * InitialRunningXacts, assign each of the subxcnt entries of subxacts to
+ * xid as a child and mark it as containing catalog changes too. See the
+ * comments on InitialRunningXacts for why this is needed.
+ */
+void
+SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt,
+ TransactionId *subxacts, XLogRecPtr lsn)
+{
+ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+
+ /* Only xid needs marking if no initial running xacts are tracked */
+ if (NInitialRunningXacts == 0)
+ return;
+
+ if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,
+ sizeof(TransactionId), xidComparator) != NULL)
+ {
+ for (int i = 0; i < subxcnt; i++)
+ {
+ ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn);
+ ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn);
+ }
+ }
+}
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index b048dc7484c..17d2f933004 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
struct xl_running_xacts *running);
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid,
+ int subxcnt, TransactionId *subxacts,
+ XLogRecPtr lsn);
#endif /* SNAPBUILD_H */