diff options
Diffstat (limited to 'src/backend/replication/logical/snapbuild.c')
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 273 |
1 files changed, 188 insertions, 85 deletions
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 73c0f15214a..1ff2c12240d 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -241,6 +241,33 @@ struct SnapBuild */ TransactionId *xip; } committed; + + /* + * Array of transactions and subtransactions that had modified catalogs + * and were running when the snapshot was serialized. + * + * We normally rely on some WAL record types such as HEAP2_NEW_CID to know + * if the transaction has changed the catalog. But it could happen that + * the logical decoding decodes only the commit record of the transaction + * after restoring the previously serialized snapshot in which case we + * will miss adding the xid to the snapshot and end up looking at the + * catalogs with the wrong snapshot. + * + * Now to avoid the above problem, we serialize the transactions that had + * modified the catalogs and are still running at the time of snapshot + * serialization. We fill this array while restoring the snapshot and then + * refer to it while decoding commit to check whether the xact modified the + * catalog. We discard this array once all the xids in the list are old + * enough to no longer matter. See SnapBuildPurgeOlderTxn for details. 
+ */ + struct + { + /* number of transactions */ + size_t xcnt; + + /* This array must be sorted in xidComparator order */ + TransactionId *xip; + } catchange; }; /* @@ -250,8 +277,8 @@ struct SnapBuild static ResourceOwner SavedResourceOwnerDuringExport = NULL; static bool ExportInProgress = false; -/* ->committed manipulation */ -static void SnapBuildPurgeCommittedTxn(SnapBuild *builder); +/* ->committed and ->catchange manipulation */ +static void SnapBuildPurgeOlderTxn(SnapBuild *builder); /* snapshot building/manipulation/distribution functions */ static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); @@ -262,6 +289,9 @@ static void SnapBuildSnapIncRefcount(Snapshot snap); static void SnapBuildDistributeNewCatalogSnapshot(SnapBuild *builder, XLogRecPtr lsn); +static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo); + /* xlog reading helper functions for SnapBuildProcessRunningXacts */ static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); @@ -269,6 +299,7 @@ static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutof /* serialization functions */ static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); +static void SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path); /* * Allocate a new snapshot builder. 
@@ -306,6 +337,9 @@ AllocateSnapshotBuilder(ReorderBuffer *reorder, palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); builder->committed.includes_all_transactions = true; + builder->catchange.xcnt = 0; + builder->catchange.xip = NULL; + builder->initial_xmin_horizon = xmin_horizon; builder->start_decoding_at = start_lsn; builder->building_full_snapshot = need_full_snapshot; @@ -888,12 +922,17 @@ SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) } /* - * Remove knowledge about transactions we treat as committed that are smaller - * than ->xmin. Those won't ever get checked via the ->committed array but via - * the clog machinery, so we don't need to waste memory on them. + * Remove knowledge about transactions we treat as committed or containing catalog + * changes that are smaller than ->xmin. Those won't ever get checked via + * the ->committed or ->catchange array, respectively. The committed xids will + * get checked via the clog machinery. + * + * We can ideally remove the transaction from catchange array once it is + * finished (committed/aborted) but that could be costly as we need to maintain + * the xids order in the array. */ static void -SnapBuildPurgeCommittedTxn(SnapBuild *builder) +SnapBuildPurgeOlderTxn(SnapBuild *builder) { int off; TransactionId *workspace; @@ -928,6 +967,30 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) builder->committed.xcnt = surviving_xids; pfree(workspace); + + /* + * Either all the xacts got purged or none. It is only possible to + * partially remove the xids from this array if one or more of the xids + * are still running but not all. That can happen if we start decoding + * from a point (LSN where the snapshot state became consistent) where all + * the xacts in this were running and then at least one of those got + * committed and a few are still running. 
We will never start from such a + * point because we won't move the slot's restart_lsn past the point where + * the oldest running transaction's restart_decoding_lsn is. + */ + if (builder->catchange.xcnt == 0 || + TransactionIdFollowsOrEquals(builder->catchange.xip[0], + builder->xmin)) + return; + + Assert(TransactionIdFollows(builder->xmin, + builder->catchange.xip[builder->catchange.xcnt - 1])); + pfree(builder->catchange.xip); + builder->catchange.xip = NULL; + builder->catchange.xcnt = 0; + + elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u", + builder->xmin); } /* @@ -935,7 +998,7 @@ SnapBuildPurgeCommittedTxn(SnapBuild *builder) */ void SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, - int nsubxacts, TransactionId *subxacts) + int nsubxacts, TransactionId *subxacts, uint32 xinfo) { int nxact; @@ -983,7 +1046,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, * Add subtransaction to base snapshot if catalog modifying, we don't * distinguish to toplevel transactions there. */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, subxid)) + if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo)) { sub_needs_timetravel = true; needs_snapshot = true; @@ -1012,7 +1075,7 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } /* if top-level modified catalog, it'll need a snapshot */ - if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) { elog(DEBUG2, "found top level transaction %u, with catalog changes", xid); @@ -1089,6 +1152,29 @@ SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, } } +/* + * Check the reorder buffer and the snapshot to see if the given transaction has + * modified catalogs. 
+ */ +static inline bool +SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, + uint32 xinfo) +{ + if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) + return true; + + /* + * The transactions that have changed catalogs must have invalidation + * info. + */ + if (!(xinfo & XACT_XINFO_HAS_INVALS)) + return false; + + /* Check the catchange XID array */ + return ((builder->catchange.xcnt > 0) && + (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt, + sizeof(TransactionId), xidComparator) != NULL)); +} /* ----------------------------------- * Snapshot building functions dealing with xlog records @@ -1135,7 +1221,7 @@ SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xact builder->xmin = running->oldestRunningXid; /* Remove transactions we don't need to keep track off anymore */ - SnapBuildPurgeCommittedTxn(builder); + SnapBuildPurgeOlderTxn(builder); /* * Advance the xmin limit for the current replication slot, to allow @@ -1438,6 +1524,7 @@ SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) * * struct SnapBuildOnDisk; * TransactionId * committed.xcnt; (*not xcnt_space*) + * TransactionId * catchange.xcnt; * */ typedef struct SnapBuildOnDisk @@ -1467,7 +1554,7 @@ typedef struct SnapBuildOnDisk offsetof(SnapBuildOnDisk, version) #define SNAPBUILD_MAGIC 0x51A1E001 -#define SNAPBUILD_VERSION 4 +#define SNAPBUILD_VERSION 5 /* * Store/Load a snapshot from disk, depending on the snapshot builder's state. 
@@ -1493,6 +1580,9 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) { Size needed_length; SnapBuildOnDisk *ondisk = NULL; + TransactionId *catchange_xip = NULL; + MemoryContext old_ctx; + size_t catchange_xcnt; char *ondisk_c; int fd; char tmppath[MAXPGPATH]; @@ -1578,10 +1668,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) (errcode_for_file_access(), errmsg("could not remove file \"%s\": %m", tmppath))); + old_ctx = MemoryContextSwitchTo(builder->context); + + /* Get the catalog modifying transactions that are yet not committed */ + catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder); + catchange_xcnt = builder->reorder->catchange_ntxns; + needed_length = sizeof(SnapBuildOnDisk) + - sizeof(TransactionId) * builder->committed.xcnt; + sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt); - ondisk_c = MemoryContextAllocZero(builder->context, needed_length); + ondisk_c = palloc0(needed_length); ondisk = (SnapBuildOnDisk *) ondisk_c; ondisk->magic = SNAPBUILD_MAGIC; ondisk->version = SNAPBUILD_VERSION; @@ -1598,16 +1694,31 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) ondisk->builder.snapshot = NULL; ondisk->builder.reorder = NULL; ondisk->builder.committed.xip = NULL; + ondisk->builder.catchange.xip = NULL; + /* update catchange only on disk data */ + ondisk->builder.catchange.xcnt = catchange_xcnt; COMP_CRC32C(ondisk->checksum, &ondisk->builder, sizeof(SnapBuild)); /* copy committed xacts */ - sz = sizeof(TransactionId) * builder->committed.xcnt; - memcpy(ondisk_c, builder->committed.xip, sz); - COMP_CRC32C(ondisk->checksum, ondisk_c, sz); - ondisk_c += sz; + if (builder->committed.xcnt > 0) + { + sz = sizeof(TransactionId) * builder->committed.xcnt; + memcpy(ondisk_c, builder->committed.xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } + + /* copy catalog modifying xacts */ + if (catchange_xcnt > 0) + { + sz = sizeof(TransactionId) * catchange_xcnt; + memcpy(ondisk_c, 
catchange_xip, sz); + COMP_CRC32C(ondisk->checksum, ondisk_c, sz); + ondisk_c += sz; + } FIN_CRC32C(ondisk->checksum); @@ -1688,12 +1799,16 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn) */ builder->last_serialized_snapshot = lsn; + MemoryContextSwitchTo(old_ctx); + out: ReorderBufferSetRestartPoint(builder->reorder, builder->last_serialized_snapshot); /* be tidy */ if (ondisk) pfree(ondisk); + if (catchange_xip) + pfree(catchange_xip); } /* @@ -1707,7 +1822,6 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) int fd; char path[MAXPGPATH]; Size sz; - int readBytes; pg_crc32c checksum; /* no point in loading a snapshot if we're already there */ @@ -1739,29 +1853,7 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) /* read statically sized portion of snapshot */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk, SnapBuildOnDiskConstantSize); - pgstat_report_wait_end(); - if (readBytes != SnapBuildOnDiskConstantSize) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, - (Size) SnapBuildOnDiskConstantSize))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk, SnapBuildOnDiskConstantSize, path); if (ondisk.magic != SNAPBUILD_MAGIC) ereport(ERROR, @@ -1781,56 +1873,26 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); /* read SnapBuild */ - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, &ondisk.builder, sizeof(SnapBuild)); - pgstat_report_wait_end(); - if (readBytes != sizeof(SnapBuild)) - { - int save_errno = errno; - - CloseTransientFile(fd); - - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - 
(errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sizeof(SnapBuild)))); - } + SnapBuildRestoreContents(fd, (char *) &ondisk.builder, sizeof(SnapBuild), path); COMP_CRC32C(checksum, &ondisk.builder, sizeof(SnapBuild)); /* restore committed xacts information */ - sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; - ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); - pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); - readBytes = read(fd, ondisk.builder.committed.xip, sz); - pgstat_report_wait_end(); - if (readBytes != sz) + if (ondisk.builder.committed.xcnt > 0) { - int save_errno = errno; - - CloseTransientFile(fd); + sz = sizeof(TransactionId) * ondisk.builder.committed.xcnt; + ondisk.builder.committed.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.committed.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); + } - if (readBytes < 0) - { - errno = save_errno; - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not read file \"%s\": %m", path))); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("could not read file \"%s\": read %d of %zu", - path, readBytes, sz))); + /* restore catalog modifying xacts information */ + if (ondisk.builder.catchange.xcnt > 0) + { + sz = sizeof(TransactionId) * ondisk.builder.catchange.xcnt; + ondisk.builder.catchange.xip = MemoryContextAllocZero(builder->context, sz); + SnapBuildRestoreContents(fd, (char *) ondisk.builder.catchange.xip, sz, path); + COMP_CRC32C(checksum, ondisk.builder.catchange.xip, sz); } - COMP_CRC32C(checksum, ondisk.builder.committed.xip, sz); if (CloseTransientFile(fd) != 0) ereport(ERROR, @@ -1885,6 +1947,13 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) } ondisk.builder.committed.xip 
= NULL; + /* set catalog modifying transactions */ + if (builder->catchange.xip) + pfree(builder->catchange.xip); + builder->catchange.xcnt = ondisk.builder.catchange.xcnt; + builder->catchange.xip = ondisk.builder.catchange.xip; + ondisk.builder.catchange.xip = NULL; + /* our snapshot is not interesting anymore, build a new one */ if (builder->snapshot != NULL) { @@ -1906,10 +1975,44 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) snapshot_not_interesting: if (ondisk.builder.committed.xip != NULL) pfree(ondisk.builder.committed.xip); + if (ondisk.builder.catchange.xip != NULL) + pfree(ondisk.builder.catchange.xip); return false; } /* + * Read 'size' bytes of the serialized snapshot from 'fd' into 'dest', or ERROR. + */ +static void +SnapBuildRestoreContents(int fd, char *dest, Size size, const char *path) +{ + int readBytes; + + pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); + readBytes = read(fd, dest, size); + pgstat_report_wait_end(); + if (readBytes != size) + { + int save_errno = errno; + + CloseTransientFile(fd); + + if (readBytes < 0) + { + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not read file \"%s\": %m", path))); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("could not read file \"%s\": read %d of %zu", + path, readBytes, size))); + } +} + +/* * Remove all serialized snapshots that are not required anymore because no * slot can need them. This doesn't actually have to run during a checkpoint, * but it's a convenient point to schedule this. |