diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/logical/decode.c | 15 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 133 | ||||
-rw-r--r-- | src/include/replication/snapbuild.h | 3 |
3 files changed, 143 insertions, 8 deletions
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 60d07ce4eb5..19cd0bf76ac 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -585,7 +585,20 @@ DecodeCommit(LogicalDecodingContext *ctx, XLogRecordBuffer *buf, if (!ctx->fast_forward) ReorderBufferAddInvalidations(ctx->reorder, xid, buf->origptr, parsed->nmsgs, parsed->msgs); - ReorderBufferXidSetCatalogChanges(ctx->reorder, xid, buf->origptr); + /* + * If the COMMIT record has invalidation messages, it could have catalog + * changes. It is possible that we didn't mark this transaction and + * its subtransactions as containing catalog changes when the decoding + * starts from a commit record without decoding the transaction's other + * changes. Therefore, we ensure to mark such transactions as containing + * catalog change. + * + * This must be done before SnapBuildCommitTxn() so that we can include + * these transactions in the historic snapshot. + */ + SnapBuildXidSetCatalogChanges(ctx->snapshot_builder, xid, + parsed->nsubxacts, parsed->subxacts, + buf->origptr); } SnapBuildCommitTxn(ctx->snapshot_builder, buf->origptr, xid, diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 5a1bce5acc0..cd091bb724b 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -257,8 +257,38 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* + * Array of transactions and subtransactions that were running when + * the xl_running_xacts record that we decoded was written. The array is + * sorted in xidComparator order. We remove xids from this array when + * they become old enough to matter, and then it eventually becomes empty. + * This array is allocated in builder->context so its lifetime is the same + * as the snapshot builder. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that the + * logical decoding decodes only the commit record of the transaction after + * restoring the previously serialized snapshot in which case we will miss + * adding the xid to the snapshot and end up looking at the catalogs with the + * wrong snapshot. + * + * Now to avoid the above problem, if the COMMIT record of the xid listed in + * InitialRunningXacts has XACT_XINFO_HAS_INVALS flag, we mark both the top + * transaction and its substransactions as containing catalog changes. + * + * We could end up adding the transaction that didn't change catalog + * to the snapshot since we cannot distinguish whether the transaction + * has catalog changes only by checking the COMMIT record. It doesn't + * have the information on which (sub) transaction has catalog changes, + * and XACT_XINFO_HAS_INVALS doesn't necessarily indicate that the + * transaction has catalog change. But that won't be a problem since we + * use snapshot built during decoding only for reading system catalogs. + */ +static TransactionId *InitialRunningXacts = NULL; +static int NInitialRunningXacts = 0; + +/* ->committed and InitailRunningXacts manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -895,12 +925,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed and the initial + * running transactions that are smaller than ->xmin. Those won't ever get + * checked via the ->committed or InitialRunningXacts array, respectively. + * The committed xids will get checked via the clog machinery. + * + * We can ideally remove the transaction from InitialRunningXacts array + * once it is finished (committed/aborted) but that could be costly as we need + * to maintain the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -935,6 +970,49 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* Quick exit if there is no initial running transactions */ + if (NInitialRunningXacts == 0) + return; + + /* bound check if there is at least one transaction to remove */ + if (!NormalTransactionIdPrecedes(InitialRunningXacts[0], + builder->xmin)) + return; + + /* + * purge xids in InitialRunningXacts as well. The purged array must also + * be sorted in xidComparator order. + */ + workspace = + MemoryContextAlloc(builder->context, + NInitialRunningXacts * sizeof(TransactionId)); + surviving_xids = 0; + for (off = 0; off < NInitialRunningXacts; off++) + { + if (NormalTransactionIdPrecedes(InitialRunningXacts[off], + builder->xmin)) + ; /* remove */ + else + workspace[surviving_xids++] = InitialRunningXacts[off]; + } + + if (surviving_xids > 0) + memcpy(InitialRunningXacts, workspace, + sizeof(TransactionId) * surviving_xids); + else + { + pfree(InitialRunningXacts); + InitialRunningXacts = NULL; + } + + elog(DEBUG3, "purged initial running transactions from %u to %u, oldest running xid %u", + (uint32) NInitialRunningXacts, + (uint32) surviving_xids, + builder->xmin); + + NInitialRunningXacts = surviving_xids; + pfree(workspace); } /* @@ -1142,7 +1220,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1293,6 +1371,20 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn else if (!builder->building_full_snapshot && SnapBuildRestore(builder, lsn)) { + int nxacts = running->subxcnt + running->xcnt; + Size sz = sizeof(TransactionId) * nxacts; + + /* + * Remember the transactions and subtransactions that were running + * when xl_running_xacts record that we decoded was written. We use + * this later to identify the transactions have performed catalog + * changes. See SnapBuildXidSetCatalogChanges. + */ + NInitialRunningXacts = nxacts; + InitialRunningXacts = MemoryContextAlloc(builder->context, sz); + memcpy(InitialRunningXacts, running->xids, sz); + qsort(InitialRunningXacts, nxacts, sizeof(TransactionId), xidComparator); + /* there won't be any state to cleanup */ return false; } @@ -2035,3 +2127,30 @@ CheckPointSnapBuild(void) } FreeDir(snap_dir); } + +/* + * Mark the transaction as containing catalog changes. In addition, if the + * given xid is in the list of the initial running xacts, we mark its + * subtransactions as well. See comments for NInitialRunningXacts and + * InitialRunningXacts for additional info. + */ +void +SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, int subxcnt, + TransactionId *subxacts, XLogRecPtr lsn) +{ + ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); + + /* Skip if there is no initial running xacts information */ + if (NInitialRunningXacts == 0) + return; + + if (bsearch(&xid, InitialRunningXacts, NInitialRunningXacts, + sizeof(TransactionId), xidComparator) != NULL) + { + for (int i = 0; i < subxcnt; i++) + { + ReorderBufferAssignChild(builder->reorder, xid, subxacts[i], lsn); + ReorderBufferXidSetCatalogChanges(builder->reorder, subxacts[i], lsn); + } + } +} diff --git a/src/include/replication/snapbuild.h b/src/include/replication/snapbuild.h index 3acf68f5bdf..2eb9532a1bb 100644 --- a/src/include/replication/snapbuild.h +++ b/src/include/replication/snapbuild.h @@ -88,4 +88,7 @@ extern void SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, struct xl_running_xacts *running); extern void SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn); +extern void SnapBuildXidSetCatalogChanges(SnapBuild *builder, TransactionId xid, + int subxcnt, TransactionId *subxacts, + XLogRecPtr lsn); #endif /* SNAPBUILD_H */ |