about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/replication/logical/logicalfuncs.c | 2
-rw-r--r-- src/backend/replication/slot.c | 122
-rw-r--r-- src/backend/replication/slotfuncs.c | 34
-rw-r--r-- src/backend/replication/walsender.c | 6
-rw-r--r-- src/include/replication/slot.h | 10
5 files changed, 118 insertions, 56 deletions
diff --git a/src/backend/replication/logical/logicalfuncs.c b/src/backend/replication/logical/logicalfuncs.c
index 363ca82cb0b..a3ba2b1266c 100644
--- a/src/backend/replication/logical/logicalfuncs.c
+++ b/src/backend/replication/logical/logicalfuncs.c
@@ -244,7 +244,7 @@ pg_logical_slot_get_changes_guts(FunctionCallInfo fcinfo, bool confirm, bool bin
else
end_of_wal = GetXLogReplayRecPtr(&ThisTimeLineID);
- ReplicationSlotAcquire(NameStr(*name));
+ ReplicationSlotAcquire(NameStr(*name), true);
PG_TRY();
{
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index dc7de20e113..08c0b1b285f 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -157,6 +157,7 @@ ReplicationSlotsShmemInit(void)
/* everything else is zeroed by the memset above */
SpinLockInit(&slot->mutex);
LWLockInitialize(&slot->io_in_progress_lock, LWTRANCHE_REPLICATION_SLOT_IO_IN_PROGRESS);
+ ConditionVariableInit(&slot->active_cv);
}
}
}
@@ -313,25 +314,35 @@ ReplicationSlotCreate(const char *name, bool db_specific,
LWLockRelease(ReplicationSlotControlLock);
/*
- * Now that the slot has been marked as in_use and in_active, it's safe to
+ * Now that the slot has been marked as in_use and active, it's safe to
* let somebody else try to allocate a slot.
*/
LWLockRelease(ReplicationSlotAllocationLock);
+
+ /* Let everybody know we've modified this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
}
/*
* Find a previously created slot and mark it as used by this backend.
*/
void
-ReplicationSlotAcquire(const char *name)
+ReplicationSlotAcquire(const char *name, bool nowait)
{
- ReplicationSlot *slot = NULL;
+ ReplicationSlot *slot;
+ int active_pid;
int i;
- int active_pid = 0; /* Keep compiler quiet */
+retry:
Assert(MyReplicationSlot == NULL);
- /* Search for the named slot and mark it active if we find it. */
+ /*
+ * Search for the named slot and mark it active if we find it. If the
+ * slot is already active, we exit the loop with active_pid set to the PID
+ * of the backend that owns it.
+ */
+ active_pid = 0;
+ slot = NULL;
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
@@ -339,35 +350,66 @@ ReplicationSlotAcquire(const char *name)
if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
{
+ /*
+ * This is the slot we want. We don't know yet if it's active,
+ * so get ready to sleep on it in case it is. (We may end up not
+ * sleeping, but we don't want to do this while holding the
+ * spinlock.)
+ */
+ ConditionVariablePrepareToSleep(&s->active_cv);
+
SpinLockAcquire(&s->mutex);
+
active_pid = s->active_pid;
if (active_pid == 0)
active_pid = s->active_pid = MyProcPid;
+
SpinLockRelease(&s->mutex);
slot = s;
+
break;
}
}
LWLockRelease(ReplicationSlotControlLock);
- /* If we did not find the slot or it was already active, error out. */
+ /* If we did not find the slot, error out. */
if (slot == NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_OBJECT),
errmsg("replication slot \"%s\" does not exist", name)));
+
+ /*
+ * If we found the slot but it's already active in another backend, we
+ * either error out or wait until signaled and retry, as caller specified.
+ */
if (active_pid != MyProcPid)
- ereport(ERROR,
- (errcode(ERRCODE_OBJECT_IN_USE),
- errmsg("replication slot \"%s\" is active for PID %d",
- name, active_pid)));
+ {
+ if (nowait)
+ ereport(ERROR,
+ (errcode(ERRCODE_OBJECT_IN_USE),
+ errmsg("replication slot \"%s\" is active for PID %d",
+ name, active_pid)));
+
+ /* Wait here until we get signaled, and then restart */
+ ConditionVariableSleep(&slot->active_cv, PG_WAIT_LOCK);
+ ConditionVariableCancelSleep();
+ goto retry;
+ }
+ else
+ ConditionVariableCancelSleep(); /* no sleep needed after all */
+
+ /* Let everybody know we've modified this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
/* We made this slot active, so it's ours now. */
MyReplicationSlot = slot;
}
/*
- * Release a replication slot, this or another backend can ReAcquire it
- * later. Resources this slot requires will be preserved.
+ * Release the replication slot that this backend considers itself to own.
+ *
+ * This or another backend can re-acquire the slot later.
+ * Resources this slot requires will be preserved.
*/
void
ReplicationSlotRelease(void)
@@ -385,17 +427,6 @@ ReplicationSlotRelease(void)
*/
ReplicationSlotDropAcquired();
}
- else if (slot->data.persistency == RS_PERSISTENT)
- {
- /*
- * Mark persistent slot inactive. We're not freeing it, just
- * disconnecting.
- */
- SpinLockAcquire(&slot->mutex);
- slot->active_pid = 0;
- SpinLockRelease(&slot->mutex);
- }
-
/*
* If slot needed to temporarily restrain both data and catalog xmin to
@@ -412,6 +443,18 @@ ReplicationSlotRelease(void)
ReplicationSlotsComputeRequiredXmin(false);
}
+ if (slot->data.persistency == RS_PERSISTENT)
+ {
+ /*
+ * Mark persistent slot inactive. We're not freeing it, just
+ * disconnecting, but wake up others that may be waiting for it.
+ */
+ SpinLockAcquire(&slot->mutex);
+ slot->active_pid = 0;
+ SpinLockRelease(&slot->mutex);
+ ConditionVariableBroadcast(&slot->active_cv);
+ }
+
MyReplicationSlot = NULL;
/* might not have been set when we've been a plain slot */
@@ -430,32 +473,43 @@ ReplicationSlotCleanup(void)
Assert(MyReplicationSlot == NULL);
- /*
- * No need for locking as we are only interested in slots active in
- * current process and those are not touched by other processes.
- */
+restart:
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (i = 0; i < max_replication_slots; i++)
{
ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+ if (!s->in_use)
+ continue;
+
+ SpinLockAcquire(&s->mutex);
if (s->active_pid == MyProcPid)
{
- Assert(s->in_use && s->data.persistency == RS_TEMPORARY);
+ Assert(s->data.persistency == RS_TEMPORARY);
+ SpinLockRelease(&s->mutex);
+ LWLockRelease(ReplicationSlotControlLock); /* avoid deadlock */
ReplicationSlotDropPtr(s);
+
+ ConditionVariableBroadcast(&s->active_cv);
+ goto restart;
}
+ else
+ SpinLockRelease(&s->mutex);
}
+
+ LWLockRelease(ReplicationSlotControlLock);
}
/*
* Permanently drop replication slot identified by the passed in name.
*/
void
-ReplicationSlotDrop(const char *name)
+ReplicationSlotDrop(const char *name, bool nowait)
{
Assert(MyReplicationSlot == NULL);
- ReplicationSlotAcquire(name);
+ ReplicationSlotAcquire(name, nowait);
ReplicationSlotDropAcquired();
}
@@ -527,6 +581,9 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
slot->active_pid = 0;
SpinLockRelease(&slot->mutex);
+ /* wake up anyone waiting on this slot */
+ ConditionVariableBroadcast(&slot->active_cv);
+
ereport(fail_softly ? WARNING : ERROR,
(errcode_for_file_access(),
errmsg("could not rename file \"%s\" to \"%s\": %m",
@@ -535,15 +592,18 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
/*
* The slot is definitely gone. Lock out concurrent scans of the array
- * long enough to kill it. It's OK to clear the active flag here without
+ * long enough to kill it. It's OK to clear the active PID here without
* grabbing the mutex because nobody else can be scanning the array here,
* and nobody can be attached to this slot and thus access it without
* scanning the array.
+ *
+ * Also wake up processes waiting for it.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
slot->active_pid = 0;
slot->in_use = false;
LWLockRelease(ReplicationSlotControlLock);
+ ConditionVariableBroadcast(&slot->active_cv);
/*
* Slot is dead and doesn't prevent resource removal anymore, recompute
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index 6dc808874d6..d4cbd83bde1 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -171,7 +171,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
CheckSlotRequirements();
- ReplicationSlotDrop(NameStr(*name));
+ ReplicationSlotDrop(NameStr(*name), false);
PG_RETURN_VOID();
}
@@ -221,6 +221,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
MemoryContextSwitchTo(oldcontext);
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
for (slotno = 0; slotno < max_replication_slots; slotno++)
{
ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[slotno];
@@ -238,25 +239,21 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
NameData plugin;
int i;
- SpinLockAcquire(&slot->mutex);
if (!slot->in_use)
- {
- SpinLockRelease(&slot->mutex);
continue;
- }
- else
- {
- xmin = slot->data.xmin;
- catalog_xmin = slot->data.catalog_xmin;
- database = slot->data.database;
- restart_lsn = slot->data.restart_lsn;
- confirmed_flush_lsn = slot->data.confirmed_flush;
- namecpy(&slot_name, &slot->data.name);
- namecpy(&plugin, &slot->data.plugin);
-
- active_pid = slot->active_pid;
- persistency = slot->data.persistency;
- }
+
+ SpinLockAcquire(&slot->mutex);
+
+ xmin = slot->data.xmin;
+ catalog_xmin = slot->data.catalog_xmin;
+ database = slot->data.database;
+ restart_lsn = slot->data.restart_lsn;
+ confirmed_flush_lsn = slot->data.confirmed_flush;
+ namecpy(&slot_name, &slot->data.name);
+ namecpy(&plugin, &slot->data.plugin);
+ active_pid = slot->active_pid;
+ persistency = slot->data.persistency;
+
SpinLockRelease(&slot->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -309,6 +306,7 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
}
+ LWLockRelease(ReplicationSlotControlLock);
tuplestore_donestoring(tupstore);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 002143b26a2..9a2babef1e6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -541,7 +541,7 @@ StartReplication(StartReplicationCmd *cmd)
if (cmd->slotname)
{
- ReplicationSlotAcquire(cmd->slotname);
+ ReplicationSlotAcquire(cmd->slotname, true);
if (SlotIsLogical(MyReplicationSlot))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
@@ -1028,7 +1028,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
static void
DropReplicationSlot(DropReplicationSlotCmd *cmd)
{
- ReplicationSlotDrop(cmd->slotname);
+ ReplicationSlotDrop(cmd->slotname, false);
EndCommand("DROP_REPLICATION_SLOT", DestRemote);
}
@@ -1046,7 +1046,7 @@ StartLogicalReplication(StartReplicationCmd *cmd)
Assert(!MyReplicationSlot);
- ReplicationSlotAcquire(cmd->slotname);
+ ReplicationSlotAcquire(cmd->slotname, true);
/*
* Force a disconnect, so that the decoding code doesn't need to care
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index a283f4e2b86..0bf2611fe9c 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -12,6 +12,7 @@
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogreader.h"
+#include "storage/condition_variable.h"
#include "storage/lwlock.h"
#include "storage/shmem.h"
#include "storage/spin.h"
@@ -19,7 +20,7 @@
/*
* Behaviour of replication slots, upon release or crash.
*
- * Slots marked as PERSISTENT are crashsafe and will not be dropped when
+ * Slots marked as PERSISTENT are crash-safe and will not be dropped when
* released. Slots marked as EPHEMERAL will be dropped when released or after
* restarts.
*
@@ -117,6 +118,9 @@ typedef struct ReplicationSlot
/* is somebody performing io on this slot? */
LWLock io_in_progress_lock;
+ /* Condition variable signalled when active_pid changes */
+ ConditionVariable active_cv;
+
/* all the remaining data is only used for logical slots */
/*
@@ -162,9 +166,9 @@ extern void ReplicationSlotsShmemInit(void);
extern void ReplicationSlotCreate(const char *name, bool db_specific,
ReplicationSlotPersistency p);
extern void ReplicationSlotPersist(void);
-extern void ReplicationSlotDrop(const char *name);
+extern void ReplicationSlotDrop(const char *name, bool nowait);
-extern void ReplicationSlotAcquire(const char *name);
+extern void ReplicationSlotAcquire(const char *name, bool nowait);
extern void ReplicationSlotRelease(void);
extern void ReplicationSlotCleanup(void);
extern void ReplicationSlotSave(void);