diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 91 |
1 files changed, 75 insertions, 16 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index a98dfcab7f0..568de68beba 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash; static HTAB *LockMethodLocalHash; -/* private state for GrantAwaitedLock */ +/* private state for error cleanup */ +static LOCALLOCK *StrongLockInProgress; static LOCALLOCK *awaitedLock; static ResourceOwner awaitedOwner; @@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock); static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); +static void FinishStrongLockAcquire(void); static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, @@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag, } else if (FastPathStrongMode(lockmode)) { - /* - * Adding to a memory location is not atomic, so we take a - * spinlock to ensure we don't collide with someone else trying - * to bump the count at the same time. - * - * XXX: It might be worth considering using an atomic fetch-and-add - * instruction here, on architectures where that is supported. - */ - Assert(locallock->holdsStrongLockCount == FALSE); - SpinLockAcquire(&FastPathStrongRelationLocks->mutex); - FastPathStrongRelationLocks->count[fasthashcode]++; - locallock->holdsStrongLockCount = TRUE; - SpinLockRelease(&FastPathStrongRelationLocks->mutex); + BeginStrongLockAcquire(locallock, fasthashcode); if (!FastPathTransferRelationLocks(lockMethodTable, locktag, hashcode)) { + AbortStrongLockAcquire(); if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag, hashcode, lockmode); if (!proclock) { + AbortStrongLockAcquire(); LWLockRelease(partitionLock); if (reportMemoryError) ereport(ERROR, @@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag, */ if (dontWait) { + AbortStrongLockAcquire(); if (proclock->holdMask == 0) { uint32 proclock_hashcode; @@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag, */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { + AbortStrongLockAcquire(); PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ @@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag, LOCK_PRINT("LockAcquire: granted", lock, lockmode); } + /* + * Lock state is fully up-to-date now; if we error out after this, no + * special error cleanup is required. + */ + FinishStrongLockAcquire(); + LWLockRelease(partitionLock); /* @@ -1350,6 +1351,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) } /* + * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK, + * and arrange for error cleanup if it fails + */ +static void +BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode) +{ + Assert(StrongLockInProgress == NULL); + Assert(locallock->holdsStrongLockCount == FALSE); + + /* + * Adding to a memory location is not atomic, so we take a + * spinlock to ensure we don't collide with someone else trying + * to bump the count at the same time. + * + * XXX: It might be worth considering using an atomic fetch-and-add + * instruction here, on architectures where that is supported. + */ + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]++; + locallock->holdsStrongLockCount = TRUE; + StrongLockInProgress = locallock; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* + * FinishStrongLockAcquire - cancel pending cleanup for a strong lock + * acquisition once it's no longer needed + */ +static void +FinishStrongLockAcquire(void) +{ + StrongLockInProgress = NULL; +} + +/* + * AbortStrongLockAcquire - undo strong lock state changes performed by + * BeginStrongLockAcquire. + */ +void +AbortStrongLockAcquire(void) +{ + uint32 fasthashcode; + LOCALLOCK *locallock = StrongLockInProgress; + + if (locallock == NULL) + return; + + fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); + Assert(locallock->holdsStrongLockCount == TRUE); + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]--; + locallock->holdsStrongLockCount = FALSE; + StrongLockInProgress = NULL; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing * WaitOnLock on. * @@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) * We can and do use a PG_TRY block to try to clean up after failure, but * this still has a major limitation: elog(FATAL) can occur while waiting * (eg, a "die" interrupt), and then control won't come back here. So all - * cleanup of essential state should happen in LockWaitCancel, not here. + * cleanup of essential state should happen in LockErrorCleanup, not here. * We can use PG_TRY to clear the "waiting" status flags, since doing that * is unimportant if the process exits. */ @@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) } PG_CATCH(); { - /* In this path, awaitedLock remains set until LockWaitCancel */ + /* In this path, awaitedLock remains set until LockErrorCleanup */ /* Report change to non-waiting status */ pgstat_report_waiting(false); |