diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xact.c | 4 | ||||
-rw-r--r-- | src/backend/bootstrap/bootstrap.c | 2 | ||||
-rw-r--r-- | src/backend/postmaster/bgwriter.c | 2 | ||||
-rw-r--r-- | src/backend/postmaster/checkpointer.c | 2 | ||||
-rw-r--r-- | src/backend/postmaster/walwriter.c | 2 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 2 | ||||
-rw-r--r-- | src/backend/storage/lmgr/Makefile | 2 | ||||
-rw-r--r-- | src/backend/storage/lmgr/condition_variable.c | 225 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 7 | ||||
-rw-r--r-- | src/include/storage/condition_variable.h | 59 | ||||
-rw-r--r-- | src/include/storage/proc.h | 3 | ||||
-rw-r--r-- | src/include/storage/proclist.h | 56 |
12 files changed, 364 insertions, 2 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 95805963af1..d6432165f1e 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -45,6 +45,7 @@ #include "replication/origin.h" #include "replication/syncrep.h" #include "replication/walsender.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/lmgr.h" #include "storage/predicate.h" @@ -2472,6 +2473,9 @@ AbortTransaction(void) /* Reset WAL record construction state */ XLogResetInsertion(); + /* Cancel condition variable sleep */ + ConditionVariableCancelSleep(); + /* * Also clean up any open wait for lock, since the lock manager will choke * if we try to wait for another lock before doing this. diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 3870a4deb97..5c5ba7bec20 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -33,6 +33,7 @@ #include "replication/walreceiver.h" #include "storage/bufmgr.h" #include "storage/bufpage.h" +#include "storage/condition_variable.h" #include "storage/ipc.h" #include "storage/proc.h" #include "tcop/tcopprot.h" @@ -536,6 +537,7 @@ static void ShutdownAuxiliaryProcess(int code, Datum arg) { LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); } diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index c3f33561da2..a31d44e799c 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -46,6 +46,7 @@ #include "postmaster/bgwriter.h" #include "storage/bufmgr.h" #include "storage/buf_internals.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -187,6 +188,7 @@ BackgroundWriterMain(void) * about in bgwriter, but we do have LWLocks, buffers, and temp files. 
*/ LWLockReleaseAll(); + ConditionVariableCancelSleep(); AbortBufferIO(); UnlockBuffers(); /* buffer pins are released here: */ diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c index 397267c6b74..92b0a9416d9 100644 --- a/src/backend/postmaster/checkpointer.c +++ b/src/backend/postmaster/checkpointer.c @@ -49,6 +49,7 @@ #include "postmaster/bgwriter.h" #include "replication/syncrep.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -271,6 +272,7 @@ CheckpointerMain(void) * files. */ LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); AbortBufferIO(); UnlockBuffers(); diff --git a/src/backend/postmaster/walwriter.c b/src/backend/postmaster/walwriter.c index 67dcff63b1b..7803af4abae 100644 --- a/src/backend/postmaster/walwriter.c +++ b/src/backend/postmaster/walwriter.c @@ -50,6 +50,7 @@ #include "pgstat.h" #include "postmaster/walwriter.h" #include "storage/bufmgr.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/lwlock.h" @@ -167,6 +168,7 @@ WalWriterMain(void) * about in walwriter, but we do have LWLocks, and perhaps buffers? 
*/ LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); AbortBufferIO(); UnlockBuffers(); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc5e50807af..aa42d596104 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -66,6 +66,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" +#include "storage/condition_variable.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pmsignal.h" @@ -253,6 +254,7 @@ void WalSndErrorCleanup(void) { LWLockReleaseAll(); + ConditionVariableCancelSleep(); pgstat_report_wait_end(); if (sendFile >= 0) diff --git a/src/backend/storage/lmgr/Makefile b/src/backend/storage/lmgr/Makefile index cd6ec73f08f..e1b787e838f 100644 --- a/src/backend/storage/lmgr/Makefile +++ b/src/backend/storage/lmgr/Makefile @@ -13,7 +13,7 @@ top_builddir = ../../../.. include $(top_builddir)/src/Makefile.global OBJS = lmgr.o lock.o proc.o deadlock.o lwlock.o lwlocknames.o spin.o \ - s_lock.o predicate.o + s_lock.o predicate.o condition_variable.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c new file mode 100644 index 00000000000..2710b0bb622 --- /dev/null +++ b/src/backend/storage/lmgr/condition_variable.c @@ -0,0 +1,225 @@ +/*------------------------------------------------------------------------- + * + * condition_variable.c + * Implementation of condition variables. Condition variables provide + * a way for one process to wait until a specific condition occurs, + * without needing to know the specific identity of the process for + * which they are waiting. Waits for condition variables can be + * interrupted, unlike LWLock waits. Condition variables are safe + * to use within dynamic shared memory segments. 
+ * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/backend/storage/lmgr/condition_variable.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "miscadmin.h" +#include "storage/condition_variable.h" +#include "storage/ipc.h" +#include "storage/proc.h" +#include "storage/proclist.h" +#include "storage/spin.h" +#include "utils/memutils.h" + +/* Initially, we are not prepared to sleep on any condition variable. */ +static ConditionVariable *cv_sleep_target = NULL; + +/* Reusable WaitEventSet. */ +static WaitEventSet *cv_wait_event_set = NULL; + +/* + * Initialize a condition variable. + */ +void +ConditionVariableInit(ConditionVariable *cv) +{ + SpinLockInit(&cv->mutex); + proclist_init(&cv->wakeup); +} + +/* + * Prepare to wait on a given condition variable. This can optionally be + * called before entering a test/sleep loop. Alternatively, the call to + * ConditionVariablePrepareToSleep can be omitted. The only advantage of + * calling ConditionVariablePrepareToSleep is that it avoids an initial + * double-test of the user's predicate in the case that we need to wait. + */ +void +ConditionVariablePrepareToSleep(ConditionVariable *cv) +{ + int pgprocno = MyProc->pgprocno; + + /* + * It's not legal to prepare a sleep until the previous sleep has been + * completed or canceled. + */ + Assert(cv_sleep_target == NULL); + + /* Record the condition variable on which we will sleep. */ + cv_sleep_target = cv; + + /* Create a reusable WaitEventSet. */ + if (cv_wait_event_set == NULL) + { + cv_wait_event_set = CreateWaitEventSet(TopMemoryContext, 1); + AddWaitEventToSet(cv_wait_event_set, WL_LATCH_SET, PGINVALID_SOCKET, + &MyProc->procLatch, NULL); + } + + /* Add myself to the wait queue. 
*/ + SpinLockAcquire(&cv->mutex); + if (!proclist_contains(&cv->wakeup, pgprocno, cvWaitLink)) + proclist_push_tail(&cv->wakeup, pgprocno, cvWaitLink); + SpinLockRelease(&cv->mutex); + + /* Reset my latch before entering the caller's predicate loop. */ + ResetLatch(&MyProc->procLatch); +} + +/*-------------------------------------------------------------------------- + * Wait for the given condition variable to be signaled. This should be + * called in a predicate loop that tests for a specific exit condition and + * otherwise sleeps, like so: + * + * ConditionVariablePrepareToSleep(cv); [optional] + * while (condition for which we are waiting is not true) + * ConditionVariableSleep(cv, wait_event_info); + * ConditionVariableCancelSleep(); + * + * Supply a value from one of the WaitEventXXX enums defined in pgstat.h to + * control the contents of pg_stat_activity's wait_event_type and wait_event + * columns while waiting. + *-------------------------------------------------------------------------*/ +void +ConditionVariableSleep(ConditionVariable *cv, uint32 wait_event_info) +{ + WaitEvent event; + bool done = false; + + /* + * If the caller didn't prepare to sleep explicitly, then do so now and + * return immediately. The caller's predicate loop should immediately + * call again if its exit condition is not yet met. This initial spurious + * return can be avoided by calling ConditionVariablePrepareToSleep(cv) + * first. Whether it's worth doing that depends on whether you expect the + * condition to be met initially, in which case skipping the prepare + * allows you to skip manipulation of the wait list, or not met initially, + * in which case preparing first allows you to skip a spurious test of the + * caller's exit condition. + */ + if (cv_sleep_target == NULL) + { + ConditionVariablePrepareToSleep(cv); + return; + } + + /* Any earlier condition variable sleep must have been canceled. 
*/ + Assert(cv_sleep_target == cv); + + while (!done) + { + CHECK_FOR_INTERRUPTS(); + + /* + * Wait for latch to be set. We don't care about the result because + * our contract permits spurious returns. + */ + WaitEventSetWait(cv_wait_event_set, -1, &event, 1, wait_event_info); + + /* Reset latch before testing whether we can return. */ + ResetLatch(&MyProc->procLatch); + + /* + * If this process has been taken out of the wait list, then we know + * that it has been signaled by ConditionVariableSignal. We put it + * back into the wait list, so we don't miss any further signals while + * the caller's loop checks its condition. If it hasn't been taken + * out of the wait list, then the latch must have been set by + * something other than ConditionVariableSignal; though we don't + * guarantee not to return spuriously, we'll avoid these obvious + * cases. + */ + SpinLockAcquire(&cv->mutex); + if (!proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) + { + done = true; + proclist_push_tail(&cv->wakeup, MyProc->pgprocno, cvWaitLink); + } + SpinLockRelease(&cv->mutex); + } +} + +/* + * Cancel any pending sleep operation. We just need to remove ourselves + * from the wait queue of any condition variable for which we have previously + * prepared a sleep. + */ +void +ConditionVariableCancelSleep(void) +{ + ConditionVariable *cv = cv_sleep_target; + + if (cv == NULL) + return; + + SpinLockAcquire(&cv->mutex); + if (proclist_contains(&cv->wakeup, MyProc->pgprocno, cvWaitLink)) + proclist_delete(&cv->wakeup, MyProc->pgprocno, cvWaitLink); + SpinLockRelease(&cv->mutex); + + cv_sleep_target = NULL; +} + +/* + * Wake up one sleeping process, assuming there is at least one. + * + * The return value indicates whether or not we woke somebody up. + */ +bool +ConditionVariableSignal(ConditionVariable *cv) +{ + PGPROC *proc = NULL; + + /* Remove the first process from the wakeup queue (if any). 
*/ + SpinLockAcquire(&cv->mutex); + if (!proclist_is_empty(&cv->wakeup)) + proc = proclist_pop_head_node(&cv->wakeup, cvWaitLink); + SpinLockRelease(&cv->mutex); + + /* If we found someone sleeping, set their latch to wake them up. */ + if (proc != NULL) + { + SetLatch(&proc->procLatch); + return true; + } + + /* No sleeping processes. */ + return false; +} + +/* + * Wake up all sleeping processes. + * + * The return value indicates the number of processes we woke. + */ +int +ConditionVariableBroadcast(ConditionVariable *cv) +{ + int nwoken = 0; + + /* + * Let's just do this the dumbest way possible. We could try to dequeue + * all the sleepers at once to save spinlock cycles, but it's a bit hard + * to get that right in the face of possible sleep cancelations, and + * we don't want to loop holding the mutex. + */ + while (ConditionVariableSignal(cv)) + ++nwoken; + + return nwoken; +} diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index b2016312a51..83e9ca15d18 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -43,6 +43,7 @@ #include "postmaster/autovacuum.h" #include "replication/slot.h" #include "replication/syncrep.h" +#include "storage/condition_variable.h" #include "storage/standby.h" #include "storage/ipc.h" #include "storage/lmgr.h" @@ -802,6 +803,9 @@ ProcKill(int code, Datum arg) */ LWLockReleaseAll(); + /* Cancel any pending condition variable sleep, too */ + ConditionVariableCancelSleep(); + /* Make sure active replication slots are released */ if (MyReplicationSlot != NULL) ReplicationSlotRelease(); @@ -907,6 +911,9 @@ AuxiliaryProcKill(int code, Datum arg) /* Release any LW locks I am holding (see notes above) */ LWLockReleaseAll(); + /* Cancel any pending condition variable sleep, too */ + ConditionVariableCancelSleep(); + /* * Reset MyLatch to the process local one. 
This is so that signal * handlers et al can continue using the latch after the shared latch diff --git a/src/include/storage/condition_variable.h b/src/include/storage/condition_variable.h new file mode 100644 index 00000000000..685df4323d8 --- /dev/null +++ b/src/include/storage/condition_variable.h @@ -0,0 +1,59 @@ +/*------------------------------------------------------------------------- + * + * condition_variable.h + * Condition variables + * + * A condition variable is a method of waiting until a certain condition + * becomes true. Conventionally, a condition variable supports three + * operations: (1) sleep; (2) signal, which wakes up one process sleeping + * on the condition variable; and (3) broadcast, which wakes up every + * process sleeping on the condition variable. In our implementation, + * condition variables put a process into an interruptible sleep (so it + * can be cancelled prior to the fulfillment of the condition) and do not + * use pointers internally (so that they are safe to use within DSMs). + * + * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/condition_variable.h + * + *------------------------------------------------------------------------- + */ +#ifndef CONDITION_VARIABLE_H +#define CONDITION_VARIABLE_H + +#include "storage/s_lock.h" +#include "storage/proclist_types.h" + +typedef struct +{ + slock_t mutex; + proclist_head wakeup; +} ConditionVariable; + +/* Initialize a condition variable. */ +extern void ConditionVariableInit(ConditionVariable *); + +/* + * To sleep on a condition variable, a process should use a loop which first + * checks the condition, exiting the loop if it is met, and then calls + * ConditionVariableSleep. Spurious wakeups are possible, but should be + * infrequent. 
After exiting the loop, ConditionVariableCancelSleep should + * be called to ensure that the process is no longer in the wait list for + * the condition variable. + */ +extern void ConditionVariableSleep(ConditionVariable *, uint32 wait_event_info); +extern void ConditionVariableCancelSleep(void); + +/* + * The use of this function is optional and not necessary for correctness; + * for efficiency, it should be called prior to entering the loop described above + * if it is thought that the condition is unlikely to hold immediately. + */ +extern void ConditionVariablePrepareToSleep(ConditionVariable *); + +/* Wake up a single waiter (via signal) or all waiters (via broadcast). */ +extern bool ConditionVariableSignal(ConditionVariable *); +extern int ConditionVariableBroadcast(ConditionVariable *); + +#endif /* CONDITION_VARIABLE_H */ diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 7dc8dac6d1e..6fa71253d86 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -115,6 +115,9 @@ struct PGPROC uint8 lwWaitMode; /* lwlock mode being waited for */ proclist_node lwWaitLink; /* position in LW lock wait list */ + /* Support for condition variables. */ + proclist_node cvWaitLink; /* position in CV wait list */ + /* Info about lock the process is currently waiting for, if any. */ /* waitLock and waitProcLock are NULL if not currently waiting. */ LOCK *waitLock; /* Lock object we're sleeping on ... 
*/ diff --git a/src/include/storage/proclist.h b/src/include/storage/proclist.h index 2013a406a3c..8666c27cf8c 100644 --- a/src/include/storage/proclist.h +++ b/src/include/storage/proclist.h @@ -69,6 +69,8 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset) else { Assert(list->tail != INVALID_PGPROCNO); + Assert(list->head != procno); + Assert(list->tail != procno); node->next = list->head; proclist_node_get(node->next, node_offset)->prev = procno; node->prev = INVALID_PGPROCNO; @@ -77,7 +79,7 @@ proclist_push_head_offset(proclist_head *list, int procno, size_t node_offset) } /* - * Insert a node a the end of a list. + * Insert a node at the end of a list. */ static inline void proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset) @@ -93,6 +95,8 @@ proclist_push_tail_offset(proclist_head *list, int procno, size_t node_offset) else { Assert(list->head != INVALID_PGPROCNO); + Assert(list->head != procno); + Assert(list->tail != procno); node->prev = list->tail; proclist_node_get(node->prev, node_offset)->next = procno; node->next = INVALID_PGPROCNO; @@ -117,6 +121,52 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset) list->tail = node->prev; else proclist_node_get(node->next, node_offset)->prev = node->prev; + + node->next = node->prev = INVALID_PGPROCNO; +} + +/* + * Check if a node is currently in a list. It must be known that the node is + * not in any _other_ proclist that uses the same proclist_node, so that the + * only possibilities are that it is in this list or none. + */ +static inline bool +proclist_contains_offset(proclist_head *list, int procno, + size_t node_offset) +{ + proclist_node *node = proclist_node_get(procno, node_offset); + + /* + * If this is not a member of a proclist, then the next and prev pointers + * should be 0. Circular lists are not allowed so this condition is not + * confusable with a real pgprocno 0. 
+ */ + if (node->prev == 0 && node->next == 0) + return false; + + /* If there is a previous node, then this node must be in the list. */ + if (node->prev != INVALID_PGPROCNO) + return true; + + /* + * There is no previous node, so the only way this node can be in the list + * is if it's the head node. + */ + return list->head == procno; +} + +/* + * Remove and return the first node from a list (there must be one). + */ +static inline PGPROC * +proclist_pop_head_node_offset(proclist_head *list, size_t node_offset) +{ + PGPROC *proc; + + Assert(!proclist_is_empty(list)); + proc = GetPGProcByNumber(list->head); + proclist_delete_offset(list, list->head, node_offset); + return proc; } /* @@ -129,6 +179,10 @@ proclist_delete_offset(proclist_head *list, int procno, size_t node_offset) proclist_push_head_offset((list), (procno), offsetof(PGPROC, link_member)) #define proclist_push_tail(list, procno, link_member) \ proclist_push_tail_offset((list), (procno), offsetof(PGPROC, link_member)) +#define proclist_pop_head_node(list, link_member) \ + proclist_pop_head_node_offset((list), offsetof(PGPROC, link_member)) +#define proclist_contains(list, procno, link_member) \ + proclist_contains_offset((list), (procno), offsetof(PGPROC, link_member)) /* * Iterate through the list pointed at by 'lhead', storing the current |