diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xact.c | 4 | ||||
-rw-r--r-- | src/backend/storage/lmgr/README | 2 | ||||
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 91 | ||||
-rw-r--r-- | src/backend/storage/lmgr/proc.c | 19 | ||||
-rw-r--r-- | src/backend/tcop/postgres.c | 6 | ||||
-rw-r--r-- | src/include/storage/lock.h | 1 | ||||
-rw-r--r-- | src/include/storage/proc.h | 2 |
7 files changed, 94 insertions, 31 deletions
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index d8523f37b2f..7f412b10d75 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -2259,7 +2259,7 @@ AbortTransaction(void) * Also clean up any open wait for lock, since the lock manager will choke * if we try to wait for another lock before doing this. */ - LockWaitCancel(); + LockErrorCleanup(); /* * check the current transaction state @@ -4144,7 +4144,7 @@ AbortSubTransaction(void) AbortBufferIO(); UnlockBuffers(); - LockWaitCancel(); + LockErrorCleanup(); /* * check the current transaction state diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README index e3bb1163442..ee94725bb86 100644 --- a/src/backend/storage/lmgr/README +++ b/src/backend/storage/lmgr/README @@ -560,7 +560,7 @@ examines every member of the wait queue (this was not true in the 7.0 implementation, BTW). Therefore, if ProcLockWakeup is always invoked after a lock is released or a wait queue is rearranged, there can be no failure to wake a wakable process. One should also note that -LockWaitCancel (abort a waiter due to outside factors) must run +LockErrorCleanup (abort a waiter due to outside factors) must run ProcLockWakeup, in case the canceled waiter was soft-blocking other waiters. diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index a98dfcab7f0..568de68beba 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash; static HTAB *LockMethodLocalHash; -/* private state for GrantAwaitedLock */ +/* private state for error cleanup */ +static LOCALLOCK *StrongLockInProgress; static LOCALLOCK *awaitedLock; static ResourceOwner awaitedOwner; @@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock); static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc, const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode); static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode); +static void FinishStrongLockAcquire(void); static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner); static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner); static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode, @@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag, } else if (FastPathStrongMode(lockmode)) { - /* - * Adding to a memory location is not atomic, so we take a - * spinlock to ensure we don't collide with someone else trying - * to bump the count at the same time. - * - * XXX: It might be worth considering using an atomic fetch-and-add - * instruction here, on architectures where that is supported. - */ - Assert(locallock->holdsStrongLockCount == FALSE); - SpinLockAcquire(&FastPathStrongRelationLocks->mutex); - FastPathStrongRelationLocks->count[fasthashcode]++; - locallock->holdsStrongLockCount = TRUE; - SpinLockRelease(&FastPathStrongRelationLocks->mutex); + BeginStrongLockAcquire(locallock, fasthashcode); if (!FastPathTransferRelationLocks(lockMethodTable, locktag, hashcode)) { + AbortStrongLockAcquire(); if (reportMemoryError) ereport(ERROR, (errcode(ERRCODE_OUT_OF_MEMORY), @@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag, hashcode, lockmode); if (!proclock) { + AbortStrongLockAcquire(); LWLockRelease(partitionLock); if (reportMemoryError) ereport(ERROR, @@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag, */ if (dontWait) { + AbortStrongLockAcquire(); if (proclock->holdMask == 0) { uint32 proclock_hashcode; @@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag, */ if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { + AbortStrongLockAcquire(); PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); /* Should we retry ? */ @@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag, LOCK_PRINT("LockAcquire: granted", lock, lockmode); } + /* + * Lock state is fully up-to-date now; if we error out after this, no + * special error cleanup is required. + */ + FinishStrongLockAcquire(); + LWLockRelease(partitionLock); /* @@ -1350,6 +1351,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) } /* + * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK, + * and arrange for error cleanup if it fails + */ +static void +BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode) +{ + Assert(StrongLockInProgress == NULL); + Assert(locallock->holdsStrongLockCount == FALSE); + + /* + * Adding to a memory location is not atomic, so we take a + * spinlock to ensure we don't collide with someone else trying + * to bump the count at the same time. + * + * XXX: It might be worth considering using an atomic fetch-and-add + * instruction here, on architectures where that is supported. + */ + + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]++; + locallock->holdsStrongLockCount = TRUE; + StrongLockInProgress = locallock; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* + * FinishStrongLockAcquire - cancel pending cleanup for a strong lock + * acquisition once it's no longer needed + */ +static void +FinishStrongLockAcquire(void) +{ + StrongLockInProgress = NULL; +} + +/* + * AbortStrongLockAcquire - undo strong lock state changes performed by + * BeginStrongLockAcquire. + */ +void +AbortStrongLockAcquire(void) +{ + uint32 fasthashcode; + LOCALLOCK *locallock = StrongLockInProgress; + + if (locallock == NULL) + return; + + fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode); + Assert(locallock->holdsStrongLockCount == TRUE); + SpinLockAcquire(&FastPathStrongRelationLocks->mutex); + FastPathStrongRelationLocks->count[fasthashcode]--; + locallock->holdsStrongLockCount = FALSE; + StrongLockInProgress = NULL; + SpinLockRelease(&FastPathStrongRelationLocks->mutex); +} + +/* * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing * WaitOnLock on. * @@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) * We can and do use a PG_TRY block to try to clean up after failure, but * this still has a major limitation: elog(FATAL) can occur while waiting * (eg, a "die" interrupt), and then control won't come back here. So all - * cleanup of essential state should happen in LockWaitCancel, not here. + * cleanup of essential state should happen in LockErrorCleanup, not here. * We can use PG_TRY to clear the "waiting" status flags, since doing that * is unimportant if the process exits. */ @@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner) } PG_CATCH(); { - /* In this path, awaitedLock remains set until LockWaitCancel */ + /* In this path, awaitedLock remains set until LockErrorCleanup */ /* Report change to non-waiting status */ pgstat_report_waiting(false); diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index 2926c159bdc..d1bf264b131 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -635,17 +635,20 @@ IsWaitingForLock(void) } /* - * Cancel any pending wait for lock, when aborting a transaction. + * Cancel any pending wait for lock, when aborting a transaction, and revert + * any strong lock count acquisition for a lock being acquired. * * (Normally, this would only happen if we accept a cancel/die - * interrupt while waiting; but an ereport(ERROR) while waiting is - * within the realm of possibility, too.) + * interrupt while waiting; but an ereport(ERROR) before or during the lock + * wait is within the realm of possibility, too.) */ void -LockWaitCancel(void) +LockErrorCleanup(void) { LWLockId partitionLock; + AbortStrongLockAcquire(); + /* Nothing to do if we weren't waiting for a lock */ if (lockAwaited == NULL) return; @@ -709,7 +712,7 @@ ProcReleaseLocks(bool isCommit) if (!MyProc) return; /* If waiting, get off wait queue (should only be needed after error) */ - LockWaitCancel(); + LockErrorCleanup(); /* Release locks */ LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit); @@ -1019,7 +1022,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * NOTE: this may also cause us to exit critical-section state, possibly * allowing a cancel/die interrupt to be accepted. This is OK because we * have recorded the fact that we are waiting for a lock, and so - * LockWaitCancel will clean up if cancel/die happens. + * LockErrorCleanup will clean up if cancel/die happens. */ LWLockRelease(partitionLock); @@ -1062,7 +1065,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) * don't, because we have no shared-state-change work to do after being * granted the lock (the grantor did it all). We do have to worry about * updating the locallock table, but if we lose control to an error, - * LockWaitCancel will fix that up. + * LockErrorCleanup will fix that up. */ do { @@ -1207,7 +1210,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable) LWLockAcquire(partitionLock, LW_EXCLUSIVE); /* - * We no longer want LockWaitCancel to do anything. + * We no longer want LockErrorCleanup to do anything. */ lockAwaited = NULL; diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index d5369f412e2..51d623ff3e3 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -2575,7 +2575,7 @@ die(SIGNAL_ARGS) /* bump holdoff count to make ProcessInterrupts() a no-op */ /* until we are done getting ready for it */ InterruptHoldoffCount++; - LockWaitCancel(); /* prevent CheckDeadLock from running */ + LockErrorCleanup(); /* prevent CheckDeadLock from running */ DisableNotifyInterrupt(); DisableCatchupInterrupt(); InterruptHoldoffCount--; @@ -2617,7 +2617,7 @@ StatementCancelHandler(SIGNAL_ARGS) /* bump holdoff count to make ProcessInterrupts() a no-op */ /* until we are done getting ready for it */ InterruptHoldoffCount++; - LockWaitCancel(); /* prevent CheckDeadLock from running */ + LockErrorCleanup(); /* prevent CheckDeadLock from running */ DisableNotifyInterrupt(); DisableCatchupInterrupt(); InterruptHoldoffCount--; @@ -2776,7 +2776,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason) /* bump holdoff count to make ProcessInterrupts() a no-op */ /* until we are done getting ready for it */ InterruptHoldoffCount++; - LockWaitCancel(); /* prevent CheckDeadLock from running */ + LockErrorCleanup(); /* prevent CheckDeadLock from running */ DisableNotifyInterrupt(); DisableCatchupInterrupt(); InterruptHoldoffCount--; diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h index 6e7a6e9b689..92b6d9d1b42 100644 --- a/src/include/storage/lock.h +++ b/src/include/storage/lock.h @@ -489,6 +489,7 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag, bool sessionLock, bool dontWait, bool report_memory_error); +extern void AbortStrongLockAcquire(void); extern bool LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock); extern void LockReleaseSession(LOCKMETHODID lockmethodid); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index 415c0935ad6..a66c4849c12 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -244,7 +244,7 @@ extern int ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable); extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus); extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock); extern bool IsWaitingForLock(void); -extern void LockWaitCancel(void); +extern void LockErrorCleanup(void); extern void ProcWaitForSignal(void); extern void ProcSendSignal(int pid); |