diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 15 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 137 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 3 |
3 files changed, 148 insertions, 7 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 92dfafc6329..5a440e6eb7a 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -691,6 +691,21 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, commit_time = parsed->origin_timestamp; } + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. It is possible that we didn't mark this transaction as + * containing catalog changes when the decoding starts from a commit + * record without decoding the transaction's other changes. So, we ensure + * to mark such transactions as containing catalog change. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * these transactions in the historic snapshot. + */ + if (parsed->xinfo & XACT_XINFO_HAS_INVALS) + SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, + parsed->nsubxacts, parsed->subxacts, + buf->origptr); + SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, parsed->nsubxacts, parsed->subxacts); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 6df602485b3..ac09f0e4f9d 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -250,8 +250,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded was written. The array is + * sorted in xidComparator order. We remove xids from this array when + * they become old enough to matter, and then it eventually becomes empty. + * This array is allocated in builder->context so its lifetime is the same + * as the snapshot builder. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that the + * logical decoding decodes only the commit record of the transaction after + * restoring the previously serialized snapshot in which case we will miss + * adding the xid to the snapshot and end up looking at the catalogs with the + * wrong snapshot. + * + * Now to avoid the above problem, if the COMMIT record of the xid listed in + * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top + * transaction and its substransactions as containing catalog changes. + * + * We could end up adding the transaction that didn't change catalog + * to the snapshot since we cannot distinguish whether the transaction + * has catalog changes only by checking the COMMIT record. It doesn't + * have the information on which (sub) transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction has catalog change. But that won't be a problem since we + * use snapshot built during decoding only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitailRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -879,12 +909,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -919,6 +954,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (NInitialRunningXacts == 0) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also + * be sorted in xidComparator order. + */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1126,7 +1204,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1277,6 +1355,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded was written. We use + * this later to identify the transactions have performed catalog + * changes. See SnapBuildXidSetCatalogChanges. + */ + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + /* there won't be any state to cleanup */ return false; } @@ -1993,3 +2085,34 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * If the given xid is in the list of the initial running xacts, we mark the + * transaction and its subtransactions as containing catalog changes. See + * comments for NInitialRunningXacts and InitialRunningXacts for additional + * info. + */ +void +SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt, + TransactionId *subxacts, XLogRecPtr lsn) +{ + /* + * Skip if there is no initial running xacts information or the + * transaction is already marked as containing catalog changes. + */ + if (NInitialRunningXacts == 0 || + ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + return; + + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 3604621e888..a19b59e1008 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -90,4 +90,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */ |