diff options
author | Amit Kapila <akapila@postgresql.org> | 2022-08-11 09:30:55 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2022-08-11 09:30:55 +0530 |
commit | 547b963683e34e9f26401093131f9bea89d48968 (patch) | |
tree | b609091084f948d3fe4709149bde69f346faa26f /src | |
parent | 71caf3c4da1c85a2ec7cfce914f1ccb723c8e991 (diff) | |
download | postgresql-547b963683e34e9f26401093131f9bea89d48968.tar.gz postgresql-547b963683e34e9f26401093131f9bea89d48968.zip |
Fix catalog lookup with the wrong snapshot during logical decoding.
Previously, we relied on HEAP2_NEW_CID records and XACT_INVALIDATION
records to know if the transaction has modified the catalog, and that
information is not serialized to the snapshot. Therefore, after a restart,
if the logical decoding decodes only the commit record of the transaction
that has actually modified a catalog, we will miss adding its XID to the
snapshot. Thus, we will end up looking at catalogs with the wrong
snapshot.
To fix this problem, this changes the snapshot builder so that it
remembers the last-running-xacts list of the decoded RUNNING_XACTS record
after restoring the previously serialized snapshot. Then, we mark the
transaction as containing catalog changes if it's in the list of initial
running transactions and its commit record has XACT_XINFO_HAS_INVALS. To
avoid ABI breakage, we store the array of the initial running transactions
in the static variables InitialRunningXacts and NInitialRunningXacts,
instead of storing those in SnapBuild or ReorderBuffer.
This approach has a false positive; we could end up adding a transaction
that didn't change the catalog to the snapshot since we cannot distinguish
whether the transaction has catalog changes only by checking the COMMIT
record. It doesn't have the information on which (sub) transaction has
catalog changes, and XACT_XINFO_HAS_INVALS doesn't necessarily indicate
that the transaction has catalog changes. But that won't be a problem since
we use the snapshot built during decoding only to read system catalogs.
On the master branch, we took a more future-proof approach by writing
catalog modifying transactions to the serialized snapshot which avoids the
above false positive. But we cannot backpatch it because it requires a
change to the SnapBuild struct.
Reported-by: Mike Oh
Author: Masahiko Sawada
Reviewed-by: Amit Kapila, Shi yu, Takamichi Osumi, Kyotaro Horiguchi, Bertrand Drouvot, Ahsan Hadi
Backpatch-through: 10
Discussion: https://postgr.es/m/81D0D8B0-E7C4-4999-B616-1E5004DBDCD2%40amazon.com
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 15 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 133 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 3 |
3 files changed, 143 insertions, 8 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 5a2b828aa3f..87cbd08e858 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -582,7 +582,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (!ctx->fast_forward) ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. It is possible that we didn't mark this transaction and + * its subtransactions as containing catalog changes when the decoding + * starts from a commit record without decoding the transaction's other + * changes. Therefore, we ensure to mark such transactions as containing + * catalog change. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * these transactions in the historic snapshot. + */ + SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, + parsed->nsubxacts, parsed->subxacts, + buf->origptr); } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index be46bf0363d..d407fb3440e 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -252,8 +252,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded was written. The array is + * sorted in xidComparator order. We remove xids from this array when + * they become old enough to matter, and then it eventually becomes empty. 
+ * This array is allocated in builder->context so its lifetime is the same + * as the snapshot builder. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that the + * logical decoding decodes only the commit record of the transaction after + * restoring the previously serialized snapshot in which case we will miss + * adding the xid to the snapshot and end up looking at the catalogs with the + * wrong snapshot. + * + * Now to avoid the above problem, if the COMMIT record of the xid listed in + * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top + * transaction and its substransactions as containing catalog changes. + * + * We could end up adding the transaction that didn't change catalog + * to the snapshot since we cannot distinguish whether the transaction + * has catalog changes only by checking the COMMIT record. It doesn't + * have the information on which (sub) transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction has catalog change. But that won't be a problem since we + * use snapshot built during decoding only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitailRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -890,12 +920,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. 
+ * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -930,6 +965,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (NInitialRunningXacts == 0) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also + * be sorted in xidComparator order. 
+ */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1137,7 +1215,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1288,6 +1366,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded was written. We use + * this later to identify the transactions have performed catalog + * changes. See SnapBuildXidSetCatalogChanges. 
+ */ + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + /* there won't be any state to cleanup */ return false; } @@ -2030,3 +2122,30 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * Mark the transaction as containing catalog changes. In addition, if the + * given xid is in the list of the initial running xacts, we mark its + * subtransactions as well. See comments for NInitialRunningXacts and + * InitialRunningXacts for additional info. + */ +void +SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt, + TransactionId *subxacts, XLogRecPtr lsn) +{ + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + /* Skip if there is no initial running xacts information */ + if (NInitialRunningXacts == 0) + return; + + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index b048dc7484c..17d2f933004 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */ |