aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/decode.c15
-rw-r--r--src/backend/replication/logical/snapbuild.c133
-rw-r--r--src/include/replication/snapbuild.h3
3 files changed, 143 insertions, 8 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 60d07ce4eb5..19cd0bf76ac 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -585,7 +585,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
if (!ctx->fast_forward)
ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr,
parsed->nmsgs, parsed->msgs);
- ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr);
+ /*
+ * If the COMMIT record has invalidation messages, it could have catalog
+ * changes. It is possible that we didn't mark this transaction and
+ * its subtransactions as containing catalog changes when the decoding
+ * starts from a commit record without decoding the transaction's other
+ * changes. Therefore, we ensure to mark such transactions as containing
+ * catalog change.
+ *
+ * This must be done before SnapBuildCommitTxn() so that we can include
+ * these transactions in the historic snapshot.
+ */
+ SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid,
+ parsed->nsubxacts, parsed->subxacts,
+ buf->origptr);
}
SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid,
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 5a1bce5acc0..cd091bb724b 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -257,8 +257,38 @@ struct SnapBuild
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
static bool ExportInProgress = false;
-/* ->committed manipulation */
-static void SnapBuildPurgeCommittedTxn(SnapBuild *builder);
+/*
+ * Array of transactions and subtransactions that were running when
+ * the xl_running_xacts record that we decoded was written. The array is
+ * sorted in xidComparator order. We remove xids from this array when
+ * they become old enough to matter, and then it eventually becomes empty.
+ * This array is allocated in builder->context so its lifetime is the same
+ * as the snapshot builder.
+ *
+ * We normally rely on some WAL record types such as HEAP2_NEW_CID to know
+ * if the transaction has changed the catalog. But it could happen that the
+ * logical decoding decodes only the commit record of the transaction after
+ * restoring the previously serialized snapshot in which case we will miss
+ * adding the xid to the snapshot and end up looking at the catalogs with the
+ * wrong snapshot.
+ *
+ * Now to avoid the above problem, if the COMMIT record of the xid listed in
+ * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top
+ * transaction and its substransactions as containing catalog changes.
+ *
+ * We could end up adding the transaction that didn't change catalog
+ * to the snapshot since we cannot distinguish whether the transaction
+ * has catalog changes only by checking the COMMIT record. It doesn't
+ * have the information on which (sub) transaction has catalog changes,
+ * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the
+ * transaction has catalog change. But that won't be a problem since we
+ * use snapshot built during decoding only for reading system catalogs.
+ */
+static TransactionId *InitialRunningXacts = NULL;
+static int NInitialRunningXacts = 0;
+
+/* ->committed and InitailRunningXacts manipulation */
+static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
/* snapshot building/manipulation/distribution functions */
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
@@ -895,12 +925,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
}
/*
- * Remove knowledge about transactions we treat as committed that are smaller
- * than ->xmin. Those won't ever get checked via the ->committed array but via
- * the clog machinery, so we don't need to waste memory on them.
+ * Remove knowledge about transactions we treat as committed and the initial
+ * running transactions that are smaller than ->xmin. Those won't ever get
+ * checked via the ->committed or InitialRunningXacts array, respectively.
+ * The committed xids will get checked via the clog machinery.
+ *
+ * We can ideally remove the transaction from InitialRunningXacts array
+ * once it is finished (committed/aborted) but that could be costly as we need
+ * to maintain the xids order in the array.
*/
static void
-SnapBuildPurgeCommittedTxn(SnapBuild *builder)
+SnapBuildPurgeOlderTxn(SnapBuild *builder)
{
int off;
TransactionId *workspace;
@@ -935,6 +970,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder)
builder->committed.xcnt = surviving_xids;
pfree(workspace);
+
+ /* Quick exit if there is no initial running transactions */
+ if (NInitialRunningXacts == 0)
+ return;
+
+ /* bound check if there is at least one transaction to remove */
+ if (!NormalTransactionIdPrecedes(InitialRunningXacts[0],
+ builder->xmin))
+ return;
+
+ /*
+ * purge xids in InitialRunningXacts as well. The purged array must also
+ * be sorted in xidComparator order.
+ */
+ workspace =
+ MemoryContextAlloc(builder->context,
+ NInitialRunningXacts * sizeof(TransactionId));
+ surviving_xids = 0;
+ for (off = 0; off < NInitialRunningXacts; off++)
+ {
+ if (NormalTransactionIdPrecedes(InitialRunningXacts[off],
+ builder->xmin))
+ ; /* remove */
+ else
+ workspace[surviving_xids++] = InitialRunningXacts[off];
+ }
+
+ if (surviving_xids > 0)
+ memcpy(InitialRunningXacts, workspace,
+ sizeof(TransactionId) * surviving_xids);
+ else
+ {
+ pfree(InitialRunningXacts);
+ InitialRunningXacts = NULL;
+ }
+
+ elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u",
+ (uint32) NInitialRunningXacts,
+ (uint32) surviving_xids,
+ builder->xmin);
+
+ NInitialRunningXacts = surviving_xids;
+ pfree(workspace);
}
/*
@@ -1142,7 +1220,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact
builder->xmin = running->oldestRunningXid;
/* Remove transactions we don't need to keep track off anymore */
- SnapBuildPurgeCommittedTxn(builder);
+ SnapBuildPurgeOlderTxn(builder);
/*
* Advance the xmin limit for the current replication slot, to allow
@@ -1293,6 +1371,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
else if (!builder->building_full_snapshot &&
SnapBuildRestore(builder, lsn))
{
+ int nxacts = running->subxcnt + running->xcnt;
+ Size sz = sizeof(TransactionId) * nxacts;
+
+ /*
+ * Remember the transactions and subtransactions that were running
+ * when xl_running_xacts record that we decoded was written. We use
+ * this later to identify the transactions have performed catalog
+ * changes. See SnapBuildXidSetCatalogChanges.
+ */
+ NInitialRunningXacts = nxacts;
+ InitialRunningXacts = MemoryContextAlloc(builder->context, sz);
+ memcpy(InitialRunningXacts, running->xids, sz);
+ qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator);
+
/* there won't be any state to cleanup */
return false;
}
@@ -2035,3 +2127,30 @@ CheckPointSnapBuild(void)
}
FreeDir(snap_dir);
}
+
+/*
+ * Mark the transaction as containing catalog changes. In addition, if the
+ * given xid is in the list of the initial running xacts, we mark its
+ * subtransactions as well. See comments for NInitialRunningXacts and
+ * InitialRunningXacts for additional info.
+ */
+void
+SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt,
+ TransactionId *subxacts, XLogRecPtr lsn)
+{
+ ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
+
+ /* Skip if there is no initial running xacts information */
+ if (NInitialRunningXacts == 0)
+ return;
+
+ if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts,
+ sizeof(TransactionId), xidComparator) != NULL)
+ {
+ for (int i = 0; i < subxcnt; i++)
+ {
+ ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn);
+ ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn);
+ }
+ }
+}
diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h
index 3acf68f5bdf..2eb9532a1bb 100644
--- a/src/include/replication/snapbuild.h
+++ b/src/include/replication/snapbuild.h
@@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn,
struct xl_running_xacts *running);
extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn);
+extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid,
+ int subxcnt, TransactionId *subxacts,
+ XLogRecPtr lsn);
#endif /* SNAPBUILD_H */