diff options
Diffstat (limited to 'src/backend/replication/logical/logical.c')
-rw-r--r-- | src/backend/replication/logical/logical.c | 25 |
1 files changed, 17 insertions, 8 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 1512be5322c..40432fe6407 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -210,6 +210,7 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -267,23 +268,31 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that, temporarily, the data, not just the catalog, xmin has to be + * reserved if a data snapshot is to be exported. Otherwise the initial + * data snapshot created here is not guaranteed to be valid. After that + * the data xmin doesn't need to be managed anymore and the global xmin + * should be recomputed. As we are fine with losing the pegged data xmin + * after crash - no chance a snapshot would get exported anymore - we can + * get away with just setting the slot's + * effective_xmin. ReplicationSlotRelease will reset it again. + * * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); - slot->data.catalog_xmin = slot->effective_catalog_xmin; + xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot); + + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + if (need_full_snapshot) + slot->effective_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - /* - * tell the snapshot builder to only assemble snapshot once reaching the - * running_xact's record with the respective xmin. - */ - xmin_horizon = slot->data.catalog_xmin; - ReplicationSlotMarkDirty(); ReplicationSlotSave(); |