aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/test_decoding/Makefile2
-rw-r--r--contrib/test_decoding/expected/catalog_change_snapshot.out44
-rw-r--r--contrib/test_decoding/specs/catalog_change_snapshot.spec39
-rw-r--r--src/backend/replication/logical/decode.c3
-rw-r--r--src/backend/replication/logical/reorderbuffer.c71
-rw-r--r--src/backend/replication/logical/snapbuild.c273
-rw-r--r--src/include/replication/reorderbuffer.h12
-rw-r--r--src/include/replication/snapbuild.h2
8 files changed, 353 insertions, 93 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index b2209064790..c7ce6037064 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -8,7 +8,7 @@ REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
spill slot truncate stream stats twophase twophase_stream
ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
oldest_xmin snapshot_transfer subxact_without_top concurrent_stream \
- twophase_snapshot slot_creation_error
+ twophase_snapshot slot_creation_error catalog_change_snapshot
REGRESS_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/test_decoding/logical.conf
diff --git a/contrib/test_decoding/expected/catalog_change_snapshot.out b/contrib/test_decoding/expected/catalog_change_snapshot.out
new file mode 100644
index 00000000000..dc4f9b7018f
--- /dev/null
+++ b/contrib/test_decoding/expected/catalog_change_snapshot.out
@@ -0,0 +1,44 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_savepoint: SAVEPOINT sp1;
+step s0_truncate: TRUNCATE tbl1;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+---------------------------------------
+BEGIN
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+-------------------------------------------------------------
+BEGIN
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT
+(3 rows)
+
+?column?
+--------
+stop
+(1 row)
+
diff --git a/contrib/test_decoding/specs/catalog_change_snapshot.spec b/contrib/test_decoding/specs/catalog_change_snapshot.spec
new file mode 100644
index 00000000000..2971ddc69cb
--- /dev/null
+++ b/contrib/test_decoding/specs/catalog_change_snapshot.spec
@@ -0,0 +1,39 @@
+# Test decoding only the commit record of the transaction that have
+# modified catalogs.
+setup
+{
+ DROP TABLE IF EXISTS tbl1;
+ CREATE TABLE tbl1 (val1 integer, val2 integer);
+}
+
+teardown
+{
+ DROP TABLE tbl1;
+ SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
+}
+
+session "s0"
+setup { SET synchronous_commit=on; }
+step "s0_init" { SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); }
+step "s0_begin" { BEGIN; }
+step "s0_savepoint" { SAVEPOINT sp1; }
+step "s0_truncate" { TRUNCATE tbl1; }
+step "s0_insert" { INSERT INTO tbl1 VALUES (1); }
+step "s0_commit" { COMMIT; }
+
+session "s1"
+setup { SET synchronous_commit=on; }
+step "s1_checkpoint" { CHECKPOINT; }
+step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
+
+# For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
+# only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
+# during the first checkpoint execution. This transaction must be marked as
+# containing catalog changes while decoding the COMMIT record and the decoding
+# of the INSERT record must read the pg_class with the correct historic snapshot.
+#
+# Note that in a case where bgwriter wrote the RUNNING_XACTS record between "s0_commit"
+# and "s0_begin", this doesn't happen as the decoding starts from the RUNNING_XACTS
+# record written by bgwriter. One might think we can either stop the bgwriter or
+# increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
+permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index c5c6a2ba689..1667d720b11 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -628,7 +628,8 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
- parsed->nsubxacts, parsed->subxacts);
+ parsed->nsubxacts, parsed->subxacts,
+ parsed->xinfo);
/* ----
* Check whether we are interested in this specific transaction, and tell
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index 88a37fde722..1c21a1d14b6 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -349,6 +349,8 @@ ReorderBufferAllocate(void)
buffer->by_txn_last_xid = InvalidTransactionId;
buffer->by_txn_last_txn = NULL;
+ buffer->catchange_ntxns = 0;
+
buffer->outbuf = NULL;
buffer->outbufsize = 0;
buffer->size = 0;
@@ -366,6 +368,7 @@ ReorderBufferAllocate(void)
dlist_init(&buffer->toplevel_by_lsn);
dlist_init(&buffer->txns_by_base_snapshot_lsn);
+ dlist_init(&buffer->catchange_txns);
/*
* Ensure there's no stale data from prior uses of this slot, in case some
@@ -1526,14 +1529,22 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
}
/*
- * Remove TXN from its containing list.
+ * Remove TXN from its containing lists.
*
* Note: if txn is known as subxact, we are deleting the TXN from its
* parent's list of known subxacts; this leaves the parent's nsubxacts
* count too high, but we don't care. Otherwise, we are deleting the TXN
- * from the LSN-ordered list of toplevel TXNs.
+ * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
+ * list of catalog modifying transactions as well.
*/
dlist_delete(&txn->node);
+ if (rbtxn_has_catalog_changes(txn))
+ {
+ dlist_delete(&txn->catchange_node);
+ rb->catchange_ntxns--;
+
+ Assert(rb->catchange_ntxns >= 0);
+ }
/* now remove reference from buffer */
hash_search(rb->by_txn,
@@ -3275,10 +3286,16 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
XLogRecPtr lsn)
{
ReorderBufferTXN *txn;
+ ReorderBufferTXN *toptxn;
txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
- txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ if (!rbtxn_has_catalog_changes(txn))
+ {
+ txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &txn->catchange_node);
+ rb->catchange_ntxns++;
+ }
/*
* Mark top-level transaction as having catalog changes too if one of its
@@ -3286,8 +3303,52 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
* conveniently check just top-level transaction and decide whether to
* build the hash table or not.
*/
- if (txn->toptxn != NULL)
- txn->toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ toptxn = txn->toptxn;
+ if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+ {
+ toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+ dlist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+ rb->catchange_ntxns++;
+ }
+}
+
+/*
+ * Return palloc'ed array of the transactions that have changed catalogs.
+ * The returned array is sorted in xidComparator order.
+ *
+ * The caller must free the returned array when done with it.
+ */
+TransactionId *
+ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
+{
+ dlist_iter iter;
+ TransactionId *xids = NULL;
+ size_t xcnt = 0;
+
+ /* Quick return if the list is empty */
+ if (rb->catchange_ntxns == 0)
+ {
+ Assert(dlist_is_empty(&rb->catchange_txns));
+ return NULL;
+ }
+
+ /* Initialize XID array */
+ xids = (TransactionId *) palloc(sizeof(TransactionId) * rb->catchange_ntxns);
+ dlist_foreach(iter, &rb->catchange_txns)
+ {
+ ReorderBufferTXN *txn = dlist_container(ReorderBufferTXN,
+ catchange_node,
+ iter.cur);
+
+ Assert(rbtxn_has_catalog_changes(txn));
+
+ xids[xcnt++] = txn->xid;
+ }
+
+ qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
+
+ Assert(xcnt == rb->catchange_ntxns);
+ return xids;
}
/*
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 73c0f15214a..1ff2c12240d 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -241,6 +241,33 @@ struct SnapBuild
*/
TransactionId *xip;
} committed;
+
+ /*
+ * Array of transactions and subtransactions that had modified catalogs
+ * and were running when the snapshot was serialized.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that
+ * the logical decoding decodes only the commit record of the transaction
+ * after restoring the previously serialized snapshot in which case we
+ * will miss adding the xid to the snapshot and end up looking at the
+ * catalogs with the wrong snapshot.
+ *
+ * Now to avoid the above problem, we serialize the transactions that had
+ * modified the catalogs and are still running at the time of snapshot
+ * serialization. We fill this array while restoring the snapshot and then
+ * refer it while decoding commit to ensure if the xact has modified the
+ * catalog. We discard this array when all the xids in the list become old
+ * enough to matter. See SnapBuildPurgeOlderTxn for details.
+ */
+ struct
+ {
+ /* number of transactions */
+ size_t xcnt;
+
+ /* This array must be sorted in xidComparator order */
+ TransactionId *xip;
+ } catchange;
};
/*
@@ -250,8 +277,8 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/* ->committed and ->catchange manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap);
static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn);
+static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+ uint32 xinfo);
+
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
@@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof
/* serialization functions */
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
+static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path);
/*
* Allocate a new snapshot builder.
@@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder,
palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
builder->committed.includes_all_transactions = true;
+ builder->catchange.xcnt = 0;
+ builder->catchange.xip = NULL;
+
builder->initial_xmin_horizon = xmin_horizon;
builder->start_decoding_at = start_lsn;
builder->building_full_snapshot = need_full_snapshot;
@@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
}
/*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed or containing catalog
+ * changes that are smaller than ->xmin. Those won't ever get checked via
+ * the ->committed or ->catchange array, respectively. The committed xids will
+ * get checked via the clog machinery.
+ *
+ * We can ideally remove the transaction from catchange array once it is
+ * finished (committed/aborted) but that could be costly as we need to maintain
+ * the xids order in the array.
*/
static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
@@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
builder->committed.xcnt = surviving_xids;
pfree(workspace);
+
+ /*
+ * Either all the xacts got purged or none. It is only possible to
+ * partially remove the xids from this array if one or more of the xids
+ * are still running but not all. That can happen if we start decoding
+ * from a point (LSN where the snapshot state became consistent) where all
+ * the xacts in this were running and then at least one of those got
+ * committed and a few are still running. We will never start from such a
+ * point because we won't move the slot's restart_lsn past the point where
+ * the oldest running transaction's restart_decoding_lsn is.
+ */
+ if (builder->catchange.xcnt == 0 ||
+ TransactionIdFollowsOrEquals(builder->catchange.xip[0],
+ builder->xmin))
+ return;
+
+ Assert(TransactionIdFollows(builder->xmin,
+ builder->catchange.xip[builder->catchange.xcnt - 1]));
+ pfree(builder->catchange.xip);
+ builder->catchange.xip = NULL;
+ builder->catchange.xcnt = 0;
+
+ elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
+ builder->xmin);
}
/*
@@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
*/
void
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
- int nsubxacts, TransactionId *subxacts)
+ int nsubxacts, TransactionId *subxacts, uint32 xinfo)
{
int nxact;
@@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
* Add subtransaction to base snapshot if catalog modifying, we don't
* distinguish to toplevel transactions there.
*/
- if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid))
+ if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
{
sub_needs_timetravel = true;
needs_snapshot = true;
@@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
/* if top-level modified catalog, it'll need a snapshot */
- if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
{
elog(DEBUG2, "found top level transaction %u, with catalog changes",
xid);
@@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
}
}
+/*
+ * Check the reorder buffer and the snapshot to see if the given transaction has
+ * modified catalogs.
+ */
+static inline bool
+SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
+ uint32 xinfo)
+{
+ if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
+ return true;
+
+ /*
+ * The transactions that have changed catalogs must have invalidation
+ * info.
+ */
+ if (!(xinfo & XACT_XINFO_HAS_INVALS))
+ return false;
+
+ /* Check the catchange XID array */
+ return ((builder->catchange.xcnt > 0) &&
+ (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
+ sizeof(TransactionId), xidComparator) != NULL));
+}
/* -----------------------------------
* Snapshot building functions dealing with xlog records
@@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
- SnapBuildPurgeCommittedTxn(builder);
+ SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
@@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
*
* struct SnapBuildOnDisk;
* TransactionId * committed.xcnt; (*not xcnt_space*)
+ * TransactionId * catchange.xcnt;
*
*/
typedef struct SnapBuildOnDisk
@@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk
offsetof(SnapBuildOnDisk, version)
#define SNAPBUILD_MAGIC 0x51A1E001
-#define SNAPBUILD_VERSION 4
+#define SNAPBUILD_VERSION 5
/*
* Store/Load a snapshot from disk, depending on the snapshot builder's state.
@@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
Size needed_length;
SnapBuildOnDisk *ondisk = NULL;
+ TransactionId *catchange_xip = NULL;
+ MemoryContext old_ctx;
+ size_t catchange_xcnt;
char *ondisk_c;
int fd;
char tmppath[MAXPGPATH];
@@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", tmppath)));
+ old_ctx = MemoryContextSwitchTo(builder->context);
+
+ /* Get the catalog modifying transactions that are yet not committed */
+ catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
+ catchange_xcnt = builder->reorder->catchange_ntxns;
+
needed_length = sizeof(SnapBuildOnDisk) +
- sizeof(TransactionId) * builder->committed.xcnt;
+ sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
- ondisk_c = MemoryContextAllocZero(builder->context, needed_length);
+ ondisk_c = palloc0(needed_length);
ondisk = (SnapBuildOnDisk *) ondisk_c;
ondisk->magic = SNAPBUILD_MAGIC;
ondisk->version = SNAPBUILD_VERSION;
@@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
ondisk->builder.snapshot = NULL;
ondisk->builder.reorder = NULL;
ondisk->builder.committed.xip = NULL;
+ ondisk->builder.catchange.xip = NULL;
+ /* update catchange only on disk data */
+ ondisk->builder.catchange.xcnt = catchange_xcnt;
COMP_CRC32C(ondisk->checksum,
&ondisk->builder,
sizeof(SnapBuild));
/* copy committed xacts */
- sz = sizeof(TransactionId) * builder->committed.xcnt;
- memcpy(ondisk_c, builder->committed.xip, sz);
- COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
- ondisk_c += sz;
+ if (builder->committed.xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * builder->committed.xcnt;
+ memcpy(ondisk_c, builder->committed.xip, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+ ondisk_c += sz;
+ }
+
+ /* copy catalog modifying xacts */
+ if (catchange_xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * catchange_xcnt;
+ memcpy(ondisk_c, catchange_xip, sz);
+ COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
+ ondisk_c += sz;
+ }
FIN_CRC32C(ondisk->checksum);
@@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
*/
builder->last_serialized_snapshot = lsn;
+ MemoryContextSwitchTo(old_ctx);
+
out:
ReorderBufferSetRestartPoint(builder->reorder,
builder->last_serialized_snapshot);
/* be tidy */
if (ondisk)
pfree(ondisk);
+ if (catchange_xip)
+ pfree(catchange_xip);
}
/*
@@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
int fd;
char path[MAXPGPATH];
Size sz;
- int readBytes;
pg_crc32c checksum;
/* no point in loading a snapshot if we're already there */
@@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
/* read statically sized portion of snapshot */
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize);
- pgstat_report_wait_end();
- if (readBytes != SnapBuildOnDiskConstantSize)
- {
- int save_errno = errno;
-
- CloseTransientFile(fd);
-
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes,
- (Size) SnapBuildOnDiskConstantSize)));
- }
+ SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path);
if (ondisk.magic != SNAPBUILD_MAGIC)
ereport(ERROR,
@@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
/* read SnapBuild */
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild));
- pgstat_report_wait_end();
- if (readBytes != sizeof(SnapBuild))
- {
- int save_errno = errno;
-
- CloseTransientFile(fd);
-
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes, sizeof(SnapBuild))));
- }
+ SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path);
COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild));
/* restore committed xacts information */
- sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
- ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
- pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
- readBytes = read(fd, ondisk.builder.committed.xip, sz);
- pgstat_report_wait_end();
- if (readBytes != sz)
+ if (ondisk.builder.committed.xcnt > 0)
{
- int save_errno = errno;
-
- CloseTransientFile(fd);
+ sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt;
+ ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
+ }
- if (readBytes < 0)
- {
- errno = save_errno;
- ereport(ERROR,
- (errcode_for_file_access(),
- errmsg("could not read file \"%s\": %m", path)));
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("could not read file \"%s\": read %d of %zu",
- path, readBytes, sz)));
+ /* restore catalog modifying xacts information */
+ if (ondisk.builder.catchange.xcnt > 0)
+ {
+ sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt;
+ ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz);
+ SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path);
+ COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz);
}
- COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz);
if (CloseTransientFile(fd) != 0)
ereport(ERROR,
@@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
}
ondisk.builder.committed.xip = NULL;
+ /* set catalog modifying transactions */
+ if (builder->catchange.xip)
+ pfree(builder->catchange.xip);
+ builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
+ builder->catchange.xip = ondisk.builder.catchange.xip;
+ ondisk.builder.catchange.xip = NULL;
+
/* our snapshot is not interesting anymore, build a new one */
if (builder->snapshot != NULL)
{
@@ -1906,10 +1975,44 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
snapshot_not_interesting:
if (ondisk.builder.committed.xip != NULL)
pfree(ondisk.builder.committed.xip);
+ if (ondisk.builder.catchange.xip != NULL)
+ pfree(ondisk.builder.catchange.xip);
return false;
}
/*
+ * Read the contents of the serialized snapshot to 'dest'.
+ */
+static void
+SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path)
+{
+ int readBytes;
+
+ pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
+ readBytes = read(fd, dest, size);
+ pgstat_report_wait_end();
+ if (readBytes != size)
+ {
+ int save_errno = errno;
+
+ CloseTransientFile(fd);
+
+ if (readBytes < 0)
+ {
+ errno = save_errno;
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m", path)));
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("could not read file \"%s\": read %d of %zu",
+ path, readBytes, sizeof(SnapBuild))));
+ }
+}
+
+/*
* Remove all serialized snapshots that are not required anymore because no
* slot can need them. This doesn't actually have to run during a checkpoint,
* but it's a convenient point to schedule this.
diff --git a/src/include/replication/reorderbuffer.h b/src/include/replication/reorderbuffer.h
index 2c9206ace41..8695901ba71 100644
--- a/src/include/replication/reorderbuffer.h
+++ b/src/include/replication/reorderbuffer.h
@@ -381,6 +381,11 @@ typedef struct ReorderBufferTXN
dlist_node node;
/*
+ * A node in the list of catalog modifying transactions
+ */
+ dlist_node catchange_node;
+
+ /*
* Size of this transaction (changes currently in memory, in bytes).
*/
Size size;
@@ -527,6 +532,12 @@ struct ReorderBuffer
dlist_head txns_by_base_snapshot_lsn;
/*
+ * Transactions and subtransactions that have modified system catalogs.
+ */
+ dlist_head catchange_txns;
+ int catchange_ntxns;
+
+ /*
* one-entry sized cache for by_txn. Very frequently the same txn gets
* looked up over and over again.
*/
@@ -677,6 +688,7 @@ extern void ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid);
extern void ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, char *gid);
extern ReorderBufferTXN *ReorderBufferGetOldestTXN(ReorderBuffer *);
extern TransactionId ReorderBufferGetOldestXmin(ReorderBuffer *rb);
+extern TransactionId *ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb);
extern void ReorderBufferSetRestartPoint(ReorderBuffer *, XLogRecPtr ptr);
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index d179251aad9..e6adea24f22 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -82,7 +82,7 @@ extern void SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr);
extern void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn,
TransactionId xid, int nsubxacts,
- TransactionId *subxacts);
+ TransactionId *subxacts, uint32 xinfo);
extern bool SnapBuildProcessChange(SnapBuild *builder, TransactionId xid,
XLogRecPtr lsn);
extern void SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,