aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/slot.c226
1 files changed, 154 insertions, 72 deletions
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index 505445f2dc8..a7bbcf34991 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -99,6 +99,9 @@ ReplicationSlot *MyReplicationSlot = NULL;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
+static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
+static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
+ const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
@@ -322,77 +325,117 @@ ReplicationSlotCreate(const char *name, bool db_specific,
}
/*
- * Find a previously created slot and mark it as used by this backend.
+ * Search for the named replication slot.
*
- * The return value is only useful if behavior is SAB_Inquire, in which
- * it's zero if we successfully acquired the slot, or the PID of the
- * owning process otherwise. If behavior is SAB_Error, then trying to
- * acquire an owned slot is an error. If SAB_Block, we sleep until the
- * slot is released by the owning process.
+ * Return the replication slot if found, otherwise NULL.
+ *
+ * The caller must hold ReplicationSlotControlLock in shared mode.
*/
-int
-ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
+static ReplicationSlot *
+SearchNamedReplicationSlot(const char *name)
{
- ReplicationSlot *slot;
- int active_pid;
int i;
+ ReplicationSlot *slot = NULL;
-retry:
- Assert(MyReplicationSlot == NULL);
+ Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock,
+ LW_SHARED));
- /*
- * Search for the named slot and mark it active if we find it. If the
- * slot is already active, we exit the loop with active_pid set to the PID
- * of the backend that owns it.
- */
- active_pid = 0;
- slot = NULL;
- LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
{
- /*
- * This is the slot we want; check if it's active under some other
- * process. In single user mode, we don't need this check.
- */
- if (IsUnderPostmaster)
- {
- /*
- * Get ready to sleep on it in case it is active. (We may end
- * up not sleeping, but we don't want to do this while holding
- * the spinlock.)
- */
- ConditionVariablePrepareToSleep(&s->active_cv);
-
- SpinLockAcquire(&s->mutex);
-
- active_pid = s->active_pid;
- if (active_pid == 0)
- active_pid = s->active_pid = MyProcPid;
-
- SpinLockRelease(&s->mutex);
- }
- else
- active_pid = MyProcPid;
slot = s;
-
break;
}
}
- LWLockRelease(ReplicationSlotControlLock);
- /* If we did not find the slot, error out. */
- if (slot == NULL)
+ return slot;
+}
+
+/*
+ * Find a previously created slot and mark it as used by this process.
+ *
+ * The return value is only useful if behavior is SAB_Inquire, in which
+ * it's zero if we successfully acquired the slot, -1 if the slot no longer
+ * exists, or the PID of the owning process otherwise. If behavior is
+ * SAB_Error, then trying to acquire an owned slot is an error.
+ * If SAB_Block, we sleep until the slot is released by the owning process.
+ */
+int
+ReplicationSlotAcquire(const char *name, SlotAcquireBehavior behavior)
+{
+ return ReplicationSlotAcquireInternal(NULL, name, behavior);
+}
+
+/*
+ * Mark the specified slot as used by this process.
+ *
+ * Only one of slot and name can be specified.
+ * If slot == NULL, search for the slot with the given name.
+ *
+ * See comments about the return value in ReplicationSlotAcquire().
+ */
+static int
+ReplicationSlotAcquireInternal(ReplicationSlot *slot, const char *name,
+ SlotAcquireBehavior behavior)
+{
+ ReplicationSlot *s;
+ int active_pid;
+
+ AssertArg((slot == NULL) ^ (name == NULL));
+
+retry:
+ Assert(MyReplicationSlot == NULL);
+
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+
+ /*
+ * Search for the slot with the specified name if the slot to acquire is
+ * not given. If the slot is not found, we either return -1 or error out.
+ */
+ s = slot ? slot : SearchNamedReplicationSlot(name);
+ if (s == NULL || !s->in_use)
+ {
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (behavior == SAB_Inquire)
+ return -1;
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("replication slot \"%s\" does not exist", name)));
+ errmsg("replication slot \"%s\" does not exist",
+ name ? name : NameStr(slot->data.name))));
+ }
/*
- * If we found the slot but it's already active in another backend, we
- * either error out or retry after a short wait, as caller specified.
+ * This is the slot we want; check if it's active under some other
+ * process. In single user mode, we don't need this check.
+ */
+ if (IsUnderPostmaster)
+ {
+ /*
+ * Get ready to sleep on the slot in case it is active if SAB_Block.
+ * (We may end up not sleeping, but we don't want to do this while
+ * holding the spinlock.)
+ */
+ if (behavior == SAB_Block)
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
+ SpinLockAcquire(&s->mutex);
+ if (s->active_pid == 0)
+ s->active_pid = MyProcPid;
+ active_pid = s->active_pid;
+ SpinLockRelease(&s->mutex);
+ }
+ else
+ active_pid = MyProcPid;
+ LWLockRelease(ReplicationSlotControlLock);
+
+ /*
+ * If we found the slot but it's already active in another process, we
+ * either error out, return the PID of the owning process, or retry
+ * after a short wait, as caller specified.
*/
if (active_pid != MyProcPid)
{
@@ -400,24 +443,24 @@ retry:
ereport(ERROR,
(errcode(ERRCODE_OBJECT_IN_USE),
errmsg("replication slot \"%s\" is active for PID %d",
- name, active_pid)));
+ NameStr(s->data.name), active_pid)));
else if (behavior == SAB_Inquire)
return active_pid;
/* Wait here until we get signaled, and then restart */
- ConditionVariableSleep(&slot->active_cv,
+ ConditionVariableSleep(&s->active_cv,
WAIT_EVENT_REPLICATION_SLOT_DROP);
ConditionVariableCancelSleep();
goto retry;
}
- else
- ConditionVariableCancelSleep(); /* no sleep needed after all */
+ else if (behavior == SAB_Block)
+ ConditionVariableCancelSleep(); /* no sleep needed after all */
/* Let everybody know we've modified this slot */
- ConditionVariableBroadcast(&slot->active_cv);
+ ConditionVariableBroadcast(&s->active_cv);
/* We made this slot active, so it's ours now. */
- MyReplicationSlot = slot;
+ MyReplicationSlot = s;
/* success */
return 0;
@@ -1100,43 +1143,82 @@ restart:
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
XLogRecPtr restart_lsn = InvalidXLogRecPtr;
NameData slotname;
+ int wspid;
+ int last_signaled_pid = 0;
if (!s->in_use)
continue;
SpinLockAcquire(&s->mutex);
- if (s->data.restart_lsn == InvalidXLogRecPtr ||
- s->data.restart_lsn >= oldestLSN)
- {
- SpinLockRelease(&s->mutex);
- continue;
- }
-
slotname = s->data.name;
restart_lsn = s->data.restart_lsn;
-
SpinLockRelease(&s->mutex);
+
+ if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn >= oldestLSN)
+ continue;
LWLockRelease(ReplicationSlotControlLock);
+ /* Get ready to sleep on the slot in case it is active */
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
for (;;)
{
- int wspid = ReplicationSlotAcquire(NameStr(slotname),
- SAB_Inquire);
+ /*
+ * Try to mark this slot as used by this process.
+ *
+ * Note that ReplicationSlotAcquireInternal(SAB_Inquire)
+ * should not cancel the prepared condition variable
+ * if this slot is active in other process. Because in this case
+ * we have to wait on that CV for the process owning
+ * the slot to be terminated, later.
+ */
+ wspid = ReplicationSlotAcquireInternal(s, NULL, SAB_Inquire);
- /* no walsender? success! */
- if (wspid == 0)
+ /*
+ * Exit the loop if we successfully acquired the slot or
+ * the slot was dropped during waiting for the owning process
+ * to be terminated. For example, the latter case is likely to
+ * happen when the slot is temporary because it's automatically
+ * dropped by the termination of the owning process.
+ */
+ if (wspid <= 0)
break;
- ereport(LOG,
- (errmsg("terminating walsender %d because replication slot \"%s\" is too far behind",
- wspid, NameStr(slotname))));
- (void) kill(wspid, SIGTERM);
+ /*
+ * Signal to terminate the process that owns the slot.
+ *
+ * There is the race condition where other process may own
+ * the slot after the process using it was terminated and before
+ * this process owns it. To handle this case, we signal again
+ * if the PID of the owning process is changed than the last.
+ *
+ * XXX This logic assumes that the same PID is not reused
+ * very quickly.
+ */
+ if (last_signaled_pid != wspid)
+ {
+ ereport(LOG,
+ (errmsg("terminating process %d because replication slot \"%s\" is too far behind",
+ wspid, NameStr(slotname))));
+ (void) kill(wspid, SIGTERM);
+ last_signaled_pid = wspid;
+ }
ConditionVariableTimedSleep(&s->active_cv, 10,
WAIT_EVENT_REPLICATION_SLOT_DROP);
}
ConditionVariableCancelSleep();
+ /*
+ * Do nothing here and start from scratch if the slot has
+ * already been dropped.
+ */
+ if (wspid == -1)
+ {
+ CHECK_FOR_INTERRUPTS();
+ goto restart;
+ }
+
ereport(LOG,
(errmsg("invalidating slot \"%s\" because its restart_lsn %X/%X exceeds max_slot_wal_keep_size",
NameStr(slotname),