diff options
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r-- | src/backend/replication/logical/logical.c | 25 | ||||
-rw-r--r-- | src/backend/replication/logical/snapbuild.c | 12 |
2 files changed, 29 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 24e5960e29f..e32f773e187 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -208,6 +208,7 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -310,23 +311,31 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that, temporarily, the data, not just the catalog, xmin has to be + * reserved if a data snapshot is to be exported. Otherwise the initial + * data snapshot created here is not guaranteed to be valid. After that + * the data xmin doesn't need to be managed anymore and the global xmin + * should be recomputed. As we are fine with losing the pegged data xmin + * after crash - no chance a snapshot would get exported anymore - we can + * get away with just setting the slot's + * effective_xmin. ReplicationSlotRelease will reset it again. + * * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); - slot->data.catalog_xmin = slot->effective_catalog_xmin; + xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot); + + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + if (need_full_snapshot) + slot->effective_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - /* - * tell the snapshot builder to only assemble snapshot once reaching the a - * running_xact's record with the respective xmin. - */ - xmin_horizon = slot->data.catalog_xmin; - ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 513c5c37c66..e70e525404c 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -553,6 +553,18 @@ SnapBuildExportSnapshot(SnapBuild *builder) * mechanism. Due to that we can do this without locks, we're only * changing our own value. */ +#ifdef USE_ASSERT_CHECKING + { + TransactionId safeXid; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + safeXid = GetOldestSafeDecodingTransactionId(true); + LWLockRelease(ProcArrayLock); + + Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin)); + } +#endif + MyPgXact->xmin = snap->xmin; /* allocate in transaction context */ |