diff options
Diffstat (limited to 'src/backend/storage/lmgr/lwlock.c')
-rw-r--r-- | src/backend/storage/lmgr/lwlock.c | 166 |
1 files changed, 160 insertions, 6 deletions
diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index cc4156826b5..bee35b8c1cf 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -430,7 +430,7 @@ LWLockAcquire(LWLockId lockid, LWLockMode mode) elog(PANIC, "cannot wait without a PGPROC structure"); proc->lwWaiting = true; - proc->lwExclusive = (mode == LW_EXCLUSIVE); + proc->lwWaitMode = mode; proc->lwWaitLink = NULL; if (lock->head == NULL) lock->head = proc; @@ -565,6 +565,144 @@ LWLockConditionalAcquire(LWLockId lockid, LWLockMode mode) } /* + * LWLockWaitUntilFree - Wait until a lock is free + * + * The semantics of this function are a bit funky. If the lock is currently + * free, it is acquired in the given mode, and the function returns true. If + * the lock isn't immediately free, the function waits until it is released + * and returns false, but does not acquire the lock. + * + * This is currently used for WALWriteLock: when a backend flushes the WAL, + * holding WALWriteLock, it can flush the commit records of many other + * backends as a side-effect. Those other backends need to wait until the + * flush finishes, but don't need to acquire the lock anymore. They can just + * wake up, observe that their records have already been flushed, and return. + */ +bool +LWLockWaitUntilFree(LWLockId lockid, LWLockMode mode) +{ + volatile LWLock *lock = &(LWLockArray[lockid].lock); + PGPROC *proc = MyProc; + bool mustwait; + int extraWaits = 0; + + PRINT_LWDEBUG("LWLockWaitUntilFree", lockid, lock); + + /* Ensure we will have room to remember the lock */ + if (num_held_lwlocks >= MAX_SIMUL_LWLOCKS) + elog(ERROR, "too many LWLocks taken"); + + /* + * Lock out cancel/die interrupts until we exit the code section protected + * by the LWLock. This ensures that interrupts will not interfere with + * manipulations of data structures in shared memory. + */ + HOLD_INTERRUPTS(); + + /* Acquire mutex. Time spent holding mutex should be short! */ + SpinLockAcquire(&lock->mutex); + + /* If I can get the lock, do so quickly. */ + if (mode == LW_EXCLUSIVE) + { + if (lock->exclusive == 0 && lock->shared == 0) + { + lock->exclusive++; + mustwait = false; + } + else + mustwait = true; + } + else + { + if (lock->exclusive == 0) + { + lock->shared++; + mustwait = false; + } + else + mustwait = true; + } + + if (mustwait) + { + /* + * Add myself to wait queue. + * + * If we don't have a PGPROC structure, there's no way to wait. This + * should never occur, since MyProc should only be null during shared + * memory initialization. + */ + if (proc == NULL) + elog(PANIC, "cannot wait without a PGPROC structure"); + + proc->lwWaiting = true; + proc->lwWaitMode = LW_WAIT_UNTIL_FREE; + proc->lwWaitLink = NULL; + if (lock->head == NULL) + lock->head = proc; + else + lock->tail->lwWaitLink = proc; + lock->tail = proc; + + /* Can release the mutex now */ + SpinLockRelease(&lock->mutex); + + /* + * Wait until awakened. Like in LWLockAcquire, be prepared for bogus + * wakups, because we share the semaphore with ProcWaitForSignal. + */ + LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "waiting"); + +#ifdef LWLOCK_STATS + block_counts[lockid]++; +#endif + + TRACE_POSTGRESQL_LWLOCK_WAIT_START(lockid, mode); + + for (;;) + { + /* "false" means cannot accept cancel/die interrupt here. */ + PGSemaphoreLock(&proc->sem, false); + if (!proc->lwWaiting) + break; + extraWaits++; + } + + TRACE_POSTGRESQL_LWLOCK_WAIT_DONE(lockid, mode); + + LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "awakened"); + } + else + { + /* We are done updating shared state of the lock itself. */ + SpinLockRelease(&lock->mutex); + } + + /* + * Fix the process wait semaphore's count for any absorbed wakeups. + */ + while (extraWaits-- > 0) + PGSemaphoreUnlock(&proc->sem); + + if (mustwait) + { + /* Failed to get lock, so release interrupt holdoff */ + RESUME_INTERRUPTS(); + LOG_LWDEBUG("LWLockWaitUntilFree", lockid, "failed"); + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE_FAIL(lockid, mode); + } + else + { + /* Add lock to list of locks held by this backend */ + held_lwlocks[num_held_lwlocks++] = lockid; + TRACE_POSTGRESQL_LWLOCK_WAIT_UNTIL_FREE(lockid, mode); + } + + return !mustwait; +} + +/* * LWLockRelease - release a previously acquired lock */ void @@ -618,20 +756,36 @@ LWLockRelease(LWLockId lockid) /* * Remove the to-be-awakened PGPROCs from the queue. If the front * waiter wants exclusive lock, awaken him only. Otherwise awaken - * as many waiters as want shared access. + * as many waiters as want shared access (or just want to be + * woken up when the lock becomes free without acquiring it, + * ie. LWLockWaitUntilFree). */ + bool releaseOK = true; + proc = head; - if (!proc->lwExclusive) + if (proc->lwWaitMode != LW_EXCLUSIVE) { while (proc->lwWaitLink != NULL && - !proc->lwWaitLink->lwExclusive) + proc->lwWaitLink->lwWaitMode != LW_EXCLUSIVE) + { proc = proc->lwWaitLink; + if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE) + releaseOK = false; + } } /* proc is now the last PGPROC to be released */ lock->head = proc->lwWaitLink; proc->lwWaitLink = NULL; - /* prevent additional wakeups until retryer gets to run */ - lock->releaseOK = false; + /* + * Prevent additional wakeups until retryer gets to run. Backends + * that are just waiting for the lock to become free don't prevent + * wakeups, because they might decide that they don't want the + * lock, after all. + */ + if (proc->lwWaitMode != LW_WAIT_UNTIL_FREE) + releaseOK = false; + + lock->releaseOK = releaseOK; } else { |