about | summary | refs | log | tree | commit | diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/access/transam/xact.c 4
-rw-r--r-- src/backend/bootstrap/bootstrap.c 2
-rw-r--r-- src/backend/postmaster/bgwriter.c 2
-rw-r--r-- src/backend/postmaster/checkpointer.c 2
-rw-r--r-- src/backend/postmaster/walwriter.c 2
-rw-r--r-- src/backend/replication/walsender.c 2
-rw-r--r-- src/backend/storage/lmgr/Makefile 2
-rw-r--r-- src/backend/storage/lmgr/condition_variable.c 225
-rw-r--r-- src/backend/storage/lmgr/proc.c 7
-rw-r--r-- src/include/storage/condition_variable.h 59
-rw-r--r-- src/include/storage/proc.h 3
-rw-r--r-- src/include/storage/proclist.h 56
12 files changed, 364 insertions, 2 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 95805963af1..d6432165f1e 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -45,6 +45,7 @@
#include "replication/origin.h"
#include "replication/syncrep.h"
#include "replication/walsender.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/lmgr.h"
#include "storage/predicate.h"
@@ -2472,6 +2473,9 @@ AbortTransaction(void)
/* Reset WAL record construction state */
XLogResetInsertion();
+ /* Cancel condition variable sleep */
+ ConditionVariableCancelSleep();
+
/*
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 3870a4deb97..5c5ba7bec20 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -33,6 +33,7 @@
#include "replication/walreceiver.h"
#include "storage/bufmgr.h"
#include "storage/bufpage.h"
+#include "storage/condition_variable.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
@@ -536,6 +537,7 @@ static void
ShutdownAuxiliaryProcess(int code, Datum arg)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
}
diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c
index c3f33561da2..a31d44e799c 100644
--- a/src/backend/postmaster/bgwriter.c
+++ b/src/backend/postmaster/bgwriter.c
@@ -46,6 +46,7 @@
#include "postmaster/bgwriter.h"
#include "storage/bufmgr.h"
#include "storage/buf_internals.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -187,6 +188,7 @@ BackgroundWriterMain(void)
* about in bgwriter, but we do have LWLocks, buffers, and temp files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
AbortBufferIO();
UnlockBuffers();
/* buffer pins are released here: */
diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 397267c6b74..92b0a9416d9 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -49,6 +49,7 @@
#include "postmaster/bgwriter.h"
#include "replication/syncrep.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -271,6 +272,7 @@ CheckpointerMain(void)
* files.
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c
index 67dcff63b1b..7803af4abae 100644
--- a/src/backend/postmaster/walwriter.c
+++ b/src/backend/postmaster/walwriter.c
@@ -50,6 +50,7 @@
#include "pgstat.h"
#include "postmaster/walwriter.h"
#include "storage/bufmgr.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/lwlock.h"
@@ -167,6 +168,7 @@ WalWriterMain(void)
* about in walwriter, but we do have LWLocks, and perhaps buffers?
*/
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
AbortBufferIO();
UnlockBuffers();
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5e50807af..aa42d596104 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -66,6 +66,7 @@
#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "replication/walsender_private.h"
+#include "storage/condition_variable.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -253,6 +254,7 @@ void
WalSndErrorCleanup(void)
{
LWLockReleaseAll();
+ ConditionVariableCancelSleep();
pgstat_report_wait_end();
if (sendFile >= 0)
diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile
index cd6ec73f08f..e1b787e838f 100644
--- a/src/backend/storage/lmgr/Makefile
+++ b/src/backend/storage/lmgr/Makefile
@@ -13,7 +13,7 @@ top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \
- s_lock.o predicate.o
+ s_lock.o predicate.o condition_variable.o
include $(top_srcdir)/src/backend/common.mk
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
new file mode 100644
index 00000000000..2710b0bb622
--- /dev/null
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -0,0 +1,225 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.c
+ * Implementation of condition variables. Condition variables provide
+ * a way for one process to wait until a specific condition occurs,
+ * without needing to know the specific identity of the process for
+ * which it is waiting. Waits for condition variables can be
+ * interrupted, unlike LWLock waits. Condition variables are safe
+ * to use within dynamic shared memory segments.
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/backend/storage/lmgr/condition_variable.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "miscadmin.h"
+#include "storage/condition_variable.h"
+#include "storage/ipc.h"
+#include "storage/proc.h"
+#include "storage/proclist.h"
+#include "storage/spin.h"
+#include "utils/memutils.h"
+
+/* Initially, we are not prepared to sleep on any condition variable. */
+static ConditionVariable *cv_sleep_target = NULL;
+
+/* Reusable WaitEventSet. */
+static WaitEventSet *cv_wait_event_set = NULL;
+
+/*
+ * Initialize a condition variable.
+ */
+void
+ConditionVariableInit(ConditionVariable *cv)
+{
+ SpinLockInit(&cv->mutex);
+ proclist_init(&cv->wakeup);
+}
+
+/*
+ * Prepare to wait on a given condition variable. This can optionally be
+ * called before entering a test/sleep loop. Alternatively, the call to
+ * ConditionVariablePrepareToSleep can be omitted. The only advantage of
+ * calling ConditionVariablePrepareToSleep is that it avoids an initial
+ * double-test of the user's predicate in the case that we need to wait.
+ */
+void
+ConditionVariablePrepareToSleep(ConditionVariable *cv)
+{
+ int pgprocno = MyProc->pgprocno;
+
+ /*
+ * It's not legal to prepare a sleep until the previous sleep has been
+ * completed or canceled.
+ */
+ Assert(cv_sleep_target == NULL);
+
+ /* Record the condition variable on which we will sleep. */
+ cv_sleep_target = cv;
+
+ /* Create a reusable WaitEventSet. */
+ if (cv_wait_event_set == NULL)
+ {
+ cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1);
+ AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
+ &MyProc->procLatch, NULL);
+ }
+
+ /* Add myself to the wait queue. */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink))
+ proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* Reset my latch before entering the caller's predicate loop. */
+ ResetLatch(&MyProc->procLatch);
+}
+
+/*--------------------------------------------------------------------------
+ * Wait for the given condition variable to be signaled. This should be
+ * called in a predicate loop that tests for a specific exit condition and
+ * otherwise sleeps, like so:
+ *
+ * ConditionVariablePrepareToSleep(cv); [optional]
+ * while (condition for which we are waiting is not true)
+ * ConditionVariableSleep(cv, wait_event_info);
+ * ConditionVariableCancelSleep();
+ *
+ * Supply a value from one of the WaitEventXXX enums defined in pgstat.h to
+ * control the contents of pg_stat_activity's wait_event_type and wait_event
+ * columns while waiting.
+ *-------------------------------------------------------------------------*/
+void
+ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info)
+{
+ WaitEvent event;
+ bool done = false;
+
+ /*
+ * If the caller didn't prepare to sleep explicitly, then do so now and
+ * return immediately. The caller's predicate loop should immediately
+ * call again if its exit condition is not yet met. This initial spurious
+ * return can be avoided by calling ConditionVariablePrepareToSleep(cv)
+ * first. Whether it's worth doing that depends on whether you expect the
+ * condition to be met initially, in which case skipping the prepare
+ * allows you to skip manipulation of the wait list, or not met initially,
+ * in which case preparing first allows you to skip a spurious test of the
+ * caller's exit condition.
+ */
+ if (cv_sleep_target == NULL)
+ {
+ ConditionVariablePrepareToSleep(cv);
+ return;
+ }
+
+ /* Any earlier condition variable sleep must have been canceled. */
+ Assert(cv_sleep_target == cv);
+
+ while (!done)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /*
+ * Wait for latch to be set. We don't care about the result because
+ * our contract permits spurious returns.
+ */
+ WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info);
+
+ /* Reset latch before testing whether we can return. */
+ ResetLatch(&MyProc->procLatch);
+
+ /*
+ * If this process has been taken out of the wait list, then we know
+ * that it has been signaled by ConditionVariableSignal. We put it
+ * back into the wait list, so we don't miss any further signals while
+ * the caller's loop checks its condition. If it hasn't been taken
+ * out of the wait list, then the latch must have been set by
+ * something other than ConditionVariableSignal; though we don't
+ * guarantee not to return spuriously, we'll avoid these obvious
+ * cases.
+ */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ {
+ done = true;
+ proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ }
+ SpinLockRelease(&cv->mutex);
+ }
+}
+
+/*
+ * Cancel any pending sleep operation. We just need to remove ourselves
+ * from the wait queue of any condition variable for which we have previously
+ * prepared a sleep.
+ */
+void
+ConditionVariableCancelSleep(void)
+{
+ ConditionVariable *cv = cv_sleep_target;
+
+ if (cv == NULL)
+ return;
+
+ SpinLockAcquire(&cv->mutex);
+ if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink))
+ proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ cv_sleep_target = NULL;
+}
+
+/*
+ * Wake up one sleeping process, assuming there is at least one.
+ *
+ * The return value indicates whether or not we woke somebody up.
+ */
+bool
+ConditionVariableSignal(ConditionVariable *cv)
+{
+ PGPROC *proc = NULL;
+
+ /* Remove the first process from the wakeup queue (if any). */
+ SpinLockAcquire(&cv->mutex);
+ if (!proclist_is_empty(&cv->wakeup))
+ proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink);
+ SpinLockRelease(&cv->mutex);
+
+ /* If we found someone sleeping, set their latch to wake them up. */
+ if (proc != NULL)
+ {
+ SetLatch(&proc->procLatch);
+ return true;
+ }
+
+ /* No sleeping processes. */
+ return false;
+}
+
+/*
+ * Wake up all sleeping processes.
+ *
+ * The return value indicates the number of processes we woke.
+ */
+int
+ConditionVariableBroadcast(ConditionVariable *cv)
+{
+ int nwoken = 0;
+
+ /*
+ * Let's just do this the dumbest way possible. We could try to dequeue
+ * all the sleepers at once to save spinlock cycles, but it's a bit hard
+ * to get that right in the face of possible sleep cancelations, and
+ * we don't want to loop holding the mutex.
+ */
+ while (ConditionVariableSignal(cv))
+ ++nwoken;
+
+ return nwoken;
+}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index b2016312a51..83e9ca15d18 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -43,6 +43,7 @@
#include "postmaster/autovacuum.h"
#include "replication/slot.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/standby.h"
#include "storage/ipc.h"
#include "storage/lmgr.h"
@@ -802,6 +803,9 @@ ProcKill(int code, Datum arg)
*/
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/* Make sure active replication slots are released */
if (MyReplicationSlot != NULL)
ReplicationSlotRelease();
@@ -907,6 +911,9 @@ AuxiliaryProcKill(int code, Datum arg)
/* Release any LW locks I am holding (see notes above) */
LWLockReleaseAll();
+ /* Cancel any pending condition variable sleep, too */
+ ConditionVariableCancelSleep();
+
/*
* Reset MyLatch to the process local one. This is so that signal
* handlers et al can continue using the latch after the shared latch
diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h
new file mode 100644
index 00000000000..685df4323d8
--- /dev/null
+++ b/src/include/storage/condition_variable.h
@@ -0,0 +1,59 @@
+/*-------------------------------------------------------------------------
+ *
+ * condition_variable.h
+ * Condition variables
+ *
+ * A condition variable is a method of waiting until a certain condition
+ * becomes true. Conventionally, a condition variable supports three
+ * operations: (1) sleep; (2) signal, which wakes up one process sleeping
+ * on the condition variable; and (3) broadcast, which wakes up every
+ * process sleeping on the condition variable. In our implementation,
+ * condition variables put a process into an interruptible sleep (so it
+ * can be cancelled prior to the fulfillment of the condition) and do not
+ * use pointers internally (so that they are safe to use within DSMs).
+ *
+ * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * src/include/storage/condition_variable.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef CONDITION_VARIABLE_H
+#define CONDITION_VARIABLE_H
+
+#include "storage/s_lock.h"
+#include "storage/proclist_types.h"
+
+typedef struct
+{
+ slock_t mutex;
+ proclist_head wakeup;
+} ConditionVariable;
+
+/* Initialize a condition variable. */
+extern void ConditionVariableInit(ConditionVariable *);
+
+/*
+ * To sleep on a condition variable, a process should use a loop which first
+ * checks the condition, exiting the loop if it is met, and then calls
+ * ConditionVariableSleep. Spurious wakeups are possible, but should be
+ * infrequent. After exiting the loop, ConditionVariableCancelSleep should
+ * be called to ensure that the process is no longer in the wait list for
+ * the condition variable.
+ */
+extern void ConditionVariableSleep(ConditionVariable *, uint32 wait_event_info);
+extern void ConditionVariableCancelSleep(void);
+
+/*
+ * The use of this function is optional and not necessary for correctness;
+ * for efficiency, it should be called prior to entering the loop described
+ * above if it is thought that the condition is unlikely to hold immediately.
+ */
+extern void ConditionVariablePrepareToSleep(ConditionVariable *);
+
+/* Wake up a single waiter (via signal) or all waiters (via broadcast). */
+extern bool ConditionVariableSignal(ConditionVariable *);
+extern int ConditionVariableBroadcast(ConditionVariable *);
+
+#endif /* CONDITION_VARIABLE_H */
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 7dc8dac6d1e..6fa71253d86 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -115,6 +115,9 @@ struct PGPROC
uint8 lwWaitMode; /* lwlock mode being waited for */
proclist_node lwWaitLink; /* position in LW lock wait list */
+ /* Support for condition variables. */
+ proclist_node cvWaitLink; /* position in CV wait list */
+
/* Info about lock the process is currently waiting for, if any. */
/* waitLock and waitProcLock are NULL if not currently waiting. */
LOCK *waitLock; /* Lock object we're sleeping on ... */
diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h
index 2013a406a3c..8666c27cf8c 100644
--- a/src/include/storage/proclist.h
+++ b/src/include/storage/proclist.h
@@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->tail != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->next = list->head;
proclist_node_get(node->next, node_offset)->prev = procno;
node->prev = INVALID_PGPROCNO;
@@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset)
}
/*
- * Insert a node a the end of a list.
+ * Insert a node at the end of a list.
*/
static inline void
proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
@@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset)
else
{
Assert(list->head != INVALID_PGPROCNO);
+ Assert(list->head != procno);
+ Assert(list->tail != procno);
node->prev = list->tail;
proclist_node_get(node->prev, node_offset)->next = procno;
node->next = INVALID_PGPROCNO;
@@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
list->tail = node->prev;
else
proclist_node_get(node->next, node_offset)->prev = node->prev;
+
+ node->next = node->prev = INVALID_PGPROCNO;
+}
+
+/*
+ * Check if a node is currently in a list. It must be known that the node is
+ * not in any _other_ proclist that uses the same proclist_node, so that the
+ * only possibilities are that it is in this list or none.
+ */
+static inline bool
+proclist_contains_offset(proclist_head *list, int procno,
+ size_t node_offset)
+{
+ proclist_node *node = proclist_node_get(procno, node_offset);
+
+ /*
+ * If this is not a member of a proclist, then the next and prev pointers
+ * should be 0. Circular lists are not allowed so this condition is not
+ * confusable with a real pgprocno 0.
+ */
+ if (node->prev == 0 && node->next == 0)
+ return false;
+
+ /* If there is a previous node, then this node must be in the list. */
+ if (node->prev != INVALID_PGPROCNO)
+ return true;
+
+ /*
+ * There is no previous node, so the only way this node can be in the list
+ * is if it's the head node.
+ */
+ return list->head == procno;
+}
+
+/*
+ * Remove and return the first node from a list (there must be one).
+ */
+static inline PGPROC *
+proclist_pop_head_node_offset(proclist_head *list, size_t node_offset)
+{
+ PGPROC *proc;
+
+ Assert(!proclist_is_empty(list));
+ proc = GetPGProcByNumber(list->head);
+ proclist_delete_offset(list, list->head, node_offset);
+ return proc;
}
/*
@@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset)
proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member))
#define proclist_push_tail(list, procno, link_member) \
proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member))
+#define proclist_pop_head_node(list, link_member) \
+ proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member))
+#define proclist_contains(list, procno, link_member) \
+ proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member))
/*
* Iterate through the list pointed at by 'lhead', storing the current