diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 119 |
1 files changed, 69 insertions, 50 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index 0d93932d8d3..5022a50dd7b 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -360,7 +360,8 @@ static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); static void FinishStrongLockAcquire(void); -static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); +static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, + bool dontWait); static void ReleaseLockIfHeld(LOCALLOCK *locallock, bool sessionLock); static void LockReassignOwner(LOCALLOCK *locallock, ResourceOwner parent); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, @@ -1025,49 +1026,14 @@ LockAcquireExtended(const LOCKTAG *locktag, else { /* - * We can't acquire the lock immediately. If caller specified no - * blocking, remove useless table entries and return - * LOCKACQUIRE_NOT_AVAIL without waiting. - */ - if (dontWait) - { - AbortStrongLockAcquire(); - if (proclock->holdMask == 0) - { - uint32 proclock_hashcode; - - proclock_hashcode = ProcLockHashCode(&proclock->tag, hashcode); - dlist_delete(&proclock->lockLink); - dlist_delete(&proclock->procLink); - if (!hash_search_with_hash_value(LockMethodProcLockHash, - &(proclock->tag), - proclock_hashcode, - HASH_REMOVE, - NULL)) - elog(PANIC, "proclock table corrupted"); - } - else - PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); - lock->nRequested--; - lock->requested[lockmode]--; - LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); - Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); - Assert(lock->nGranted <= lock->nRequested); - LWLockRelease(partitionLock); - if (locallock->nLocks == 0) - RemoveLocalLock(locallock); - if (locallockp) - *locallockp = NULL; - return LOCKACQUIRE_NOT_AVAIL; - } - - /* * Set bitmask of locks this process already holds on this object. */ MyProc->heldLocks = proclock->holdMask; /* - * Sleep till someone wakes me up. + * Sleep till someone wakes me up. We do this even in the dontWait + * case, beause while trying to go to sleep, we may discover that we + * can acquire the lock immediately after all. */ TRACE_POSTGRESQL_LOCK_WAIT_START(locktag->locktag_field1, @@ -1077,7 +1043,7 @@ LockAcquireExtended(const LOCKTAG *locktag, locktag->locktag_type, lockmode); - WaitOnLock(locallock, owner); + WaitOnLock(locallock, owner, dontWait); TRACE_POSTGRESQL_LOCK_WAIT_DONE(locktag->locktag_field1, locktag->locktag_field2, @@ -1093,17 +1059,63 @@ LockAcquireExtended(const LOCKTAG *locktag, */ /* - * Check the proclock entry status, in case something in the ipc - * communication doesn't work correctly. + * Check the proclock entry status. If dontWait = true, this is an + * expected case; otherwise, it will open happen if something in the + * ipc communication doesn't work correctly. */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { AbortStrongLockAcquire(); - PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); - LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); - /* Should we retry ? */ - LWLockRelease(partitionLock); - elog(ERROR, "LockAcquire failed"); + + if (dontWait) + { + /* + * We can't acquire the lock immediately. If caller specified + * no blocking, remove useless table entries and return + * LOCKACQUIRE_NOT_AVAIL without waiting. + */ + if (proclock->holdMask == 0) + { + uint32 proclock_hashcode; + + proclock_hashcode = ProcLockHashCode(&proclock->tag, + hashcode); + dlist_delete(&proclock->lockLink); + dlist_delete(&proclock->procLink); + if (!hash_search_with_hash_value(LockMethodProcLockHash, + &(proclock->tag), + proclock_hashcode, + HASH_REMOVE, + NULL)) + elog(PANIC, "proclock table corrupted"); + } + else + PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); + lock->nRequested--; + lock->requested[lockmode]--; + LOCK_PRINT("LockAcquire: conditional lock failed", + lock, lockmode); + Assert((lock->nRequested > 0) && + (lock->requested[lockmode] >= 0)); + Assert(lock->nGranted <= lock->nRequested); + LWLockRelease(partitionLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); + if (locallockp) + *locallockp = NULL; + return LOCKACQUIRE_NOT_AVAIL; + } + else + { + /* + * We should have gotten the lock, but somehow that didn't + * happen. If we get here, it's a bug. + */ + PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); + LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); + LWLockRelease(partitionLock); + elog(ERROR, "LockAcquire failed"); + } } PROCLOCK_PRINT("LockAcquire: granted", proclock); LOCK_PRINT("LockAcquire: granted", lock, lockmode); @@ -1777,10 +1789,11 @@ MarkLockClear(LOCALLOCK *locallock) * Caller must have set MyProc->heldLocks to reflect locks already held * on the lockable object by this process. * - * The appropriate partition lock must be held at entry. + * The appropriate partition lock must be held at entry, and will still be + * held at exit. */ static void -WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) +WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner, bool dontWait) { LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); LockMethod lockMethodTable = LockMethods[lockmethodid]; @@ -1813,8 +1826,14 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) */ PG_TRY(); { - if (ProcSleep(locallock, lockMethodTable) != PROC_WAIT_STATUS_OK) + /* + * If dontWait = true, we handle success and failure in the same way + * here. The caller will be able to sort out what has happened. + */ + if (ProcSleep(locallock, lockMethodTable, dontWait) != PROC_WAIT_STATUS_OK + && !dontWait) { + /* * We failed as a result of a deadlock, see CheckDeadLock(). Quit * now. |