aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/logical.c25
-rw-r--r--src/backend/replication/logical/snapbuild.c12
-rw-r--r--src/backend/replication/slot.c25
-rw-r--r--src/backend/replication/slotfuncs.c4
-rw-r--r--src/backend/replication/walsender.c1
-rw-r--r--src/backend/storage/ipc/procarray.c14
-rw-r--r--src/include/replication/logical.h1
-rw-r--r--src/include/storage/procarray.h2
8 files changed, 66 insertions, 18 deletions
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 24e5960e29f..e32f773e187 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -208,6 +208,7 @@ StartupDecodingContext(List *output_plugin_options,
LogicalDecodingContext *
CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
+ bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write)
@@ -310,23 +311,31 @@ CreateInitDecodingContext(char *plugin,
* the slot machinery about the new limit. Once that's done the
* ProcArrayLock can be released as the slot machinery now is
* protecting against vacuum.
+ *
+ * Note that, temporarily, the data, not just the catalog, xmin has to be
+ * reserved if a data snapshot is to be exported. Otherwise the initial
+ * data snapshot created here is not guaranteed to be valid. After that
+ * the data xmin doesn't need to be managed anymore and the global xmin
+ * should be recomputed. As we are fine with losing the pegged data xmin
+ * after crash - no chance a snapshot would get exported anymore - we can
+ * get away with just setting the slot's
+ * effective_xmin. ReplicationSlotRelease will reset it again.
+ *
* ----
*/
LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
- slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId();
- slot->data.catalog_xmin = slot->effective_catalog_xmin;
+ xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot);
+
+ slot->effective_catalog_xmin = xmin_horizon;
+ slot->data.catalog_xmin = xmin_horizon;
+ if (need_full_snapshot)
+ slot->effective_xmin = xmin_horizon;
ReplicationSlotsComputeRequiredXmin(true);
LWLockRelease(ProcArrayLock);
- /*
- * tell the snapshot builder to only assemble snapshot once reaching the a
- * running_xact's record with the respective xmin.
- */
- xmin_horizon = slot->data.catalog_xmin;
-
ReplicationSlotMarkDirty();
ReplicationSlotSave();
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index 513c5c37c66..e70e525404c 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -553,6 +553,18 @@ SnapBuildExportSnapshot(SnapBuild *builder)
* mechanism. Due to that we can do this without locks, we're only
* changing our own value.
*/
+#ifdef USE_ASSERT_CHECKING
+ {
+ TransactionId safeXid;
+
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+ safeXid = GetOldestSafeDecodingTransactionId(true);
+ LWLockRelease(ProcArrayLock);
+
+ Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin));
+ }
+#endif
+
MyPgXact->xmin = snap->xmin;
/* allocate in transaction context */
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 8ae0acd61f5..12779029102 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -396,6 +396,22 @@ ReplicationSlotRelease(void)
SpinLockRelease(&slot->mutex);
}
+
+ /*
+ * If slot needed to temporarily restrain both data and catalog xmin to
+ * create the catalog snapshot, remove that temporary constraint.
+ * Snapshots can only be exported while the initial snapshot is still
+ * acquired.
+ */
+ if (!TransactionIdIsValid(slot->data.xmin) &&
+ TransactionIdIsValid(slot->effective_xmin))
+ {
+ SpinLockAcquire(&slot->mutex);
+ slot->effective_xmin = InvalidTransactionId;
+ SpinLockRelease(&slot->mutex);
+ ReplicationSlotsComputeRequiredXmin(false);
+ }
+
MyReplicationSlot = NULL;
/* might not have been set when we've been a plain slot */
@@ -580,6 +596,9 @@ ReplicationSlotPersist(void)
/*
* Compute the oldest xmin across all slots and store it in the ProcArray.
+ *
+ * If already_locked is true, ProcArrayLock has already been acquired
+ * exclusively.
*/
void
ReplicationSlotsComputeRequiredXmin(bool already_locked)
@@ -590,8 +609,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
Assert(ReplicationSlotCtl != NULL);
- if (!already_locked)
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
@@ -624,8 +642,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked)
agg_catalog_xmin = effective_catalog_xmin;
}
- if (!already_locked)
- LWLockRelease(ReplicationSlotControlLock);
+ LWLockRelease(ReplicationSlotControlLock);
ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
}
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 348c7fe9fce..da874d5ada1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -109,8 +109,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
/*
* Create logical decoding context, to build the initial snapshot.
*/
- ctx = CreateInitDecodingContext(
- NameStr(*plugin), NIL,
+ ctx = CreateInitDecodingContext(NameStr(*plugin), NIL,
+ false, /* do not build snapshot */
logical_read_local_xlog_page, NULL, NULL);
/* build initial snapshot, might take a while */
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index b4c5a7d3b3f..31e9c8ad6de 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -812,6 +812,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
LogicalDecodingContext *ctx;
ctx = CreateInitDecodingContext(cmd->plugin, NIL,
+ true, /* build snapshot */
logical_read_xlog_page,
WalSndPrepareWrite, WalSndWriteData);
diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c
index 4f3c5c9dec9..12b74da41de 100644
--- a/src/backend/storage/ipc/procarray.c
+++ b/src/backend/storage/ipc/procarray.c
@@ -1978,7 +1978,7 @@ GetOldestActiveTransactionId(void)
* that the caller will immediately use the xid to peg the xmin horizon.
*/
TransactionId
-GetOldestSafeDecodingTransactionId(void)
+GetOldestSafeDecodingTransactionId(bool catalogOnly)
{
ProcArrayStruct *arrayP = procArray;
TransactionId oldestSafeXid;
@@ -2001,9 +2001,17 @@ GetOldestSafeDecodingTransactionId(void)
/*
* If there's already a slot pegging the xmin horizon, we can start with
* that value, it's guaranteed to be safe since it's computed by this
- * routine initially and has been enforced since.
+ * routine initially and has been enforced since. We can always use the
+ * slot's general xmin horizon, but the catalog horizon is only usable
+ * when we only catalog data is going to be looked at.
*/
- if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
+ if (TransactionIdIsValid(procArray->replication_slot_xmin) &&
+ TransactionIdPrecedes(procArray->replication_slot_xmin,
+ oldestSafeXid))
+ oldestSafeXid = procArray->replication_slot_xmin;
+
+ if (catalogOnly &&
+ TransactionIdIsValid(procArray->replication_slot_catalog_xmin) &&
TransactionIdPrecedes(procArray->replication_slot_catalog_xmin,
oldestSafeXid))
oldestSafeXid = procArray->replication_slot_catalog_xmin;
diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h
index dfdbe6535f1..73b28360644 100644
--- a/src/include/replication/logical.h
+++ b/src/include/replication/logical.h
@@ -79,6 +79,7 @@ extern void CheckLogicalDecodingRequirements(void);
extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin,
List *output_plugin_options,
+ bool need_full_snapshot,
XLogPageReadCB read_page,
LogicalOutputPluginWriterPrepareWrite prepare_write,
LogicalOutputPluginWriterWrite do_write);
diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h
index a9b40ed944f..d40ef3eb570 100644
--- a/src/include/storage/procarray.h
+++ b/src/include/storage/procarray.h
@@ -54,7 +54,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid);
extern bool TransactionIdIsActive(TransactionId xid);
extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum);
extern TransactionId GetOldestActiveTransactionId(void);
-extern TransactionId GetOldestSafeDecodingTransactionId(void);
+extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids);
extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);