Diffstat (limited to 'src')
-rw-r--r--  src/backend/access/transam/xact.c |  4
-rw-r--r--  src/backend/storage/lmgr/README   |  2
-rw-r--r--  src/backend/storage/lmgr/lock.c   | 91
-rw-r--r--  src/backend/storage/lmgr/proc.c   | 19
-rw-r--r--  src/backend/tcop/postgres.c       |  6
-rw-r--r--  src/include/storage/lock.h        |  1
-rw-r--r--  src/include/storage/proc.h        |  2
7 files changed, 94 insertions(+), 31 deletions(-)
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index d8523f37b2f..7f412b10d75 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -2259,7 +2259,7 @@ AbortTransaction(void)
* Also clean up any open wait for lock, since the lock manager will choke
* if we try to wait for another lock before doing this.
*/
- LockWaitCancel();
+ LockErrorCleanup();
/*
* check the current transaction state
@@ -4144,7 +4144,7 @@ AbortSubTransaction(void)
AbortBufferIO();
UnlockBuffers();
- LockWaitCancel();
+ LockErrorCleanup();
/*
* check the current transaction state
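[Annotation] The ordering matters in both abort paths: LockErrorCleanup must run before anything later in the abort sequence can try to take a new lock. A toy model of the "will choke" failure mode the comment warns about (all names are invented for this sketch; only the single-pending-wait rule mirrors proc.c's lockAwaited):

    #include <assert.h>
    #include <stdio.h>

    /* The lock manager tracks at most one pending lock wait per backend;
     * starting a second wait before the first is cleaned up is the
     * "choke" the comment above refers to. */
    static void *lock_awaited;          /* NULL when not waiting */

    static void start_wait(void *locallock)
    {
        assert(lock_awaited == NULL);   /* trips if cleanup was skipped */
        lock_awaited = locallock;
    }

    static void lock_error_cleanup(void)
    {
        lock_awaited = NULL;            /* get off the (virtual) wait queue */
    }

    int main(void)
    {
        int a, b;

        start_wait(&a);                 /* wait aborted by an error ... */
        lock_error_cleanup();           /* ... so the abort path cleans up */
        start_wait(&b);                 /* a later wait is legal again */
        lock_error_cleanup();
        printf("ok\n");
        return 0;
    }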
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index e3bb1163442..ee94725bb86 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -560,7 +560,7 @@ examines every member of the wait queue (this was not true in the 7.0
implementation, BTW). Therefore, if ProcLockWakeup is always invoked
after a lock is released or a wait queue is rearranged, there can be no
failure to wake a wakable process. One should also note that
-LockWaitCancel (abort a waiter due to outside factors) must run
+LockErrorCleanup (abort a waiter due to outside factors) must run
ProcLockWakeup, in case the canceled waiter was soft-blocking other
waiters.
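[Annotation] The soft-blocking case this README paragraph describes can be seen in a self-contained toy (invented names throughout; the queue scan is only loosely modeled on ProcLockWakeup):

    #include <stdbool.h>
    #include <stdio.h>

    enum mode { SHARED, EXCLUSIVE };

    struct waiter
    {
        enum mode   mode;
        bool        waiting;    /* still in the queue? */
        bool        granted;
    };

    /* SHARED is compatible with SHARED; EXCLUSIVE conflicts with all. */
    static bool compatible(enum mode a, enum mode b)
    {
        return a == SHARED && b == SHARED;
    }

    /* Scan the queue front to back, granting what we can.  A waiter that
     * cannot be granted blocks everyone queued behind it, whether or not
     * they would be compatible with the held lock: soft blocking. */
    static void wakeup_scan(struct waiter *queue, int n, enum mode held)
    {
        for (int i = 0; i < n; i++)
        {
            if (!queue[i].waiting)
                continue;
            if (compatible(queue[i].mode, held))
            {
                queue[i].granted = true;
                queue[i].waiting = false;
            }
            else
                break;          /* soft-blocks the rest of the queue */
        }
    }

    int main(void)
    {
        /* One SHARED lock is held; an EXCLUSIVE waiter queued ahead of a
         * SHARED waiter soft-blocks it. */
        struct waiter queue[] = {
            { EXCLUSIVE, true, false },
            { SHARED,    true, false },
        };

        wakeup_scan(queue, 2, SHARED);
        printf("before cancel: shared waiter granted = %d\n",
               queue[1].granted);

        /* Cancel the exclusive waiter (the LockErrorCleanup case) and
         * rescan -- without the rescan, the shared waiter would sleep
         * forever even though nothing conflicts with it any more. */
        queue[0].waiting = false;
        wakeup_scan(queue, 2, SHARED);
        printf("after cancel:  shared waiter granted = %d\n",
               queue[1].granted);
        return 0;
    }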
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index a98dfcab7f0..568de68beba 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -250,7 +250,8 @@ static HTAB *LockMethodProcLockHash;
static HTAB *LockMethodLocalHash;
-/* private state for GrantAwaitedLock */
+/* private state for error cleanup */
+static LOCALLOCK *StrongLockInProgress;
static LOCALLOCK *awaitedLock;
static ResourceOwner awaitedOwner;
@@ -338,6 +339,8 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
static PROCLOCK *SetupLockInTable(LockMethod lockMethodTable, PGPROC *proc,
const LOCKTAG *locktag, uint32 hashcode, LOCKMODE lockmode);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
+static void BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode);
+static void FinishStrongLockAcquire(void);
static void WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner);
static void ReleaseLockForOwner(LOCALLOCK *locallock, ResourceOwner owner);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
@@ -738,22 +741,11 @@ LockAcquireExtended(const LOCKTAG *locktag,
}
else if (FastPathStrongMode(lockmode))
{
- /*
- * Adding to a memory location is not atomic, so we take a
- * spinlock to ensure we don't collide with someone else trying
- * to bump the count at the same time.
- *
- * XXX: It might be worth considering using an atomic fetch-and-add
- * instruction here, on architectures where that is supported.
- */
- Assert(locallock->holdsStrongLockCount == FALSE);
- SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
- FastPathStrongRelationLocks->count[fasthashcode]++;
- locallock->holdsStrongLockCount = TRUE;
- SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+ BeginStrongLockAcquire(locallock, fasthashcode);
if (!FastPathTransferRelationLocks(lockMethodTable, locktag,
hashcode))
{
+ AbortStrongLockAcquire();
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -779,6 +771,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode, lockmode);
if (!proclock)
{
+ AbortStrongLockAcquire();
LWLockRelease(partitionLock);
if (reportMemoryError)
ereport(ERROR,
@@ -820,6 +813,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
*/
if (dontWait)
{
+ AbortStrongLockAcquire();
if (proclock->holdMask == 0)
{
uint32 proclock_hashcode;
@@ -884,6 +878,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
*/
if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
{
+ AbortStrongLockAcquire();
PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock);
LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode);
/* Should we retry ? */
@@ -894,6 +889,12 @@ LockAcquireExtended(const LOCKTAG *locktag,
LOCK_PRINT("LockAcquire: granted", lock, lockmode);
}
+ /*
+ * Lock state is fully up-to-date now; if we error out after this, no
+ * special error cleanup is required.
+ */
+ FinishStrongLockAcquire();
+
LWLockRelease(partitionLock);
/*
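[Annotation] The calls threaded through LockAcquireExtended above -- BeginStrongLockAcquire on entry to the strong-lock path, AbortStrongLockAcquire on every early error exit, FinishStrongLockAcquire once shared state is consistent -- amount to a begin/commit/abort protocol around the strong-lock counter. The functions themselves appear in the next hunk; here is a standalone model of the protocol, with invented names standing in for FastPathStrongRelationLocks->count[] and friends:

    #include <assert.h>
    #include <stdbool.h>
    #include <stdio.h>

    static int   strong_count;      /* the shared counter being protected */
    static bool *in_progress;       /* plays the role of StrongLockInProgress */

    static void begin_strong(bool *holds)
    {
        assert(in_progress == NULL);
        strong_count++;
        *holds = true;
        in_progress = holds;        /* arm error cleanup */
    }

    static void finish_strong(void)
    {
        in_progress = NULL;         /* state is consistent; disarm cleanup */
    }

    /* Safe to call whether or not an acquisition is pending, which is why
     * the abort path (LockErrorCleanup) can invoke it unconditionally. */
    static void abort_strong(void)
    {
        if (in_progress == NULL)
            return;
        strong_count--;
        *in_progress = false;
        in_progress = NULL;
    }

    int main(void)
    {
        bool holds = false;

        begin_strong(&holds);
        /* ... suppose an allocation fails here; the error path runs: */
        abort_strong();
        printf("count=%d holds=%d\n", strong_count, holds);    /* 0 0 */

        begin_strong(&holds);
        finish_strong();            /* success path */
        printf("count=%d holds=%d\n", strong_count, holds);    /* 1 1 */
        return 0;
    }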
@@ -1350,6 +1351,64 @@ GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner)
}
/*
+ * BeginStrongLockAcquire - inhibit use of fastpath for a given LOCALLOCK,
+ * and arrange for error cleanup if it fails
+ */
+static void
+BeginStrongLockAcquire(LOCALLOCK *locallock, uint32 fasthashcode)
+{
+ Assert(StrongLockInProgress == NULL);
+ Assert(locallock->holdsStrongLockCount == FALSE);
+
+ /*
+ * Adding to a memory location is not atomic, so we take a
+ * spinlock to ensure we don't collide with someone else trying
+ * to bump the count at the same time.
+ *
+ * XXX: It might be worth considering using an atomic fetch-and-add
+ * instruction here, on architectures where that is supported.
+ */
+
+ SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
+ FastPathStrongRelationLocks->count[fasthashcode]++;
+ locallock->holdsStrongLockCount = TRUE;
+ StrongLockInProgress = locallock;
+ SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+}
+
+/*
+ * FinishStrongLockAcquire - cancel pending cleanup for a strong lock
+ * acquisition once it's no longer needed
+ */
+static void
+FinishStrongLockAcquire(void)
+{
+ StrongLockInProgress = NULL;
+}
+
+/*
+ * AbortStrongLockAcquire - undo strong lock state changes performed by
+ * BeginStrongLockAcquire.
+ */
+void
+AbortStrongLockAcquire(void)
+{
+ uint32 fasthashcode;
+ LOCALLOCK *locallock = StrongLockInProgress;
+
+ if (locallock == NULL)
+ return;
+
+ fasthashcode = FastPathStrongLockHashPartition(locallock->hashcode);
+ Assert(locallock->holdsStrongLockCount == TRUE);
+ SpinLockAcquire(&FastPathStrongRelationLocks->mutex);
+ FastPathStrongRelationLocks->count[fasthashcode]--;
+ locallock->holdsStrongLockCount = FALSE;
+ StrongLockInProgress = NULL;
+ SpinLockRelease(&FastPathStrongRelationLocks->mutex);
+}
+
+/*
* GrantAwaitedLock -- call GrantLockLocal for the lock we are doing
* WaitOnLock on.
*
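[Annotation] The XXX comment carried over into BeginStrongLockAcquire suggests an atomic fetch-and-add. Purely as a sketch of what that might look like with C11 atomics -- not proposed patch code; the backend of this era has no such portability layer, and holdsStrongLockCount/StrongLockInProgress would still need to change in step with the counter, which is what the spinlock buys:

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Stand-in for FastPathStrongRelationLocks->count[]; 16 partitions
     * is an arbitrary choice for the sketch. */
    static _Atomic uint32_t strong_count[16];

    int main(void)
    {
        uint32_t fasthashcode = 3;

        /* The increment itself becomes lock-free ... */
        atomic_fetch_add(&strong_count[fasthashcode], 1);

        /* ... but the other two fields updated under the spinlock above
         * would still need some form of synchronization with it. */
        printf("count[%u] = %u\n", fasthashcode,
               (unsigned) atomic_load(&strong_count[fasthashcode]));
        return 0;
    }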
@@ -1414,7 +1473,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
* We can and do use a PG_TRY block to try to clean up after failure, but
* this still has a major limitation: elog(FATAL) can occur while waiting
* (eg, a "die" interrupt), and then control won't come back here. So all
- * cleanup of essential state should happen in LockWaitCancel, not here.
+ * cleanup of essential state should happen in LockErrorCleanup, not here.
* We can use PG_TRY to clear the "waiting" status flags, since doing that
* is unimportant if the process exits.
*/
@@ -1441,7 +1500,7 @@ WaitOnLock(LOCALLOCK *locallock, ResourceOwner owner)
}
PG_CATCH();
{
- /* In this path, awaitedLock remains set until LockWaitCancel */
+ /* In this path, awaitedLock remains set until LockErrorCleanup */
/* Report change to non-waiting status */
pgstat_report_waiting(false);
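[Annotation] The division of labor the WaitOnLock comment describes -- cosmetic state fixed in PG_CATCH, essential state fixed in LockErrorCleanup because elog(FATAL) never returns to the catch block -- can be modeled with plain setjmp/longjmp (all names invented; longjmp stands in for ereport and PG_RE_THROW):

    #include <setjmp.h>
    #include <stdbool.h>
    #include <stdio.h>

    static jmp_buf top_level;       /* outer error handler (abort path) */
    static jmp_buf catcher;         /* the PG_TRY in WaitOnLock */
    static bool waiting;            /* cosmetic, e.g. the pgstat flag */
    static bool on_wait_queue;      /* essential shared lock-table state */

    static void lock_error_cleanup(void)
    {
        on_wait_queue = false;      /* essential cleanup lives here, where
                                     * the FATAL path also reaches it */
    }

    static void wait_on_lock(void)
    {
        waiting = true;
        on_wait_queue = true;
        if (setjmp(catcher) == 0)
        {
            /* ProcSleep() would block here; simulate an ereport(ERROR) */
            longjmp(catcher, 1);
        }
        else
        {
            waiting = false;        /* PG_CATCH: cosmetic state only */
            longjmp(top_level, 1);  /* PG_RE_THROW */
        }
    }

    int main(void)
    {
        if (setjmp(top_level) == 0)
            wait_on_lock();
        else
            lock_error_cleanup();   /* abort path -> LockErrorCleanup */
        printf("waiting=%d on_wait_queue=%d\n", waiting, on_wait_queue);
        return 0;
    }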
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 2926c159bdc..d1bf264b131 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -635,17 +635,20 @@ IsWaitingForLock(void)
}
/*
- * Cancel any pending wait for lock, when aborting a transaction.
+ * Cancel any pending wait for lock, when aborting a transaction, and revert
+ * any strong lock count acquisition for a lock being acquired.
*
* (Normally, this would only happen if we accept a cancel/die
- * interrupt while waiting; but an ereport(ERROR) while waiting is
- * within the realm of possibility, too.)
+ * interrupt while waiting; but an ereport(ERROR) before or during the lock
+ * wait is within the realm of possibility, too.)
*/
void
-LockWaitCancel(void)
+LockErrorCleanup(void)
{
LWLockId partitionLock;
+ AbortStrongLockAcquire();
+
/* Nothing to do if we weren't waiting for a lock */
if (lockAwaited == NULL)
return;
@@ -709,7 +712,7 @@ ProcReleaseLocks(bool isCommit)
if (!MyProc)
return;
/* If waiting, get off wait queue (should only be needed after error) */
- LockWaitCancel();
+ LockErrorCleanup();
/* Release locks */
LockReleaseAll(DEFAULT_LOCKMETHOD, !isCommit);
@@ -1019,7 +1022,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* NOTE: this may also cause us to exit critical-section state, possibly
* allowing a cancel/die interrupt to be accepted. This is OK because we
* have recorded the fact that we are waiting for a lock, and so
- * LockWaitCancel will clean up if cancel/die happens.
+ * LockErrorCleanup will clean up if cancel/die happens.
*/
LWLockRelease(partitionLock);
@@ -1062,7 +1065,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
* don't, because we have no shared-state-change work to do after being
* granted the lock (the grantor did it all). We do have to worry about
* updating the locallock table, but if we lose control to an error,
- * LockWaitCancel will fix that up.
+ * LockErrorCleanup will fix that up.
*/
do
{
@@ -1207,7 +1210,7 @@ ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable)
LWLockAcquire(partitionLock, LW_EXCLUSIVE);
/*
- * We no longer want LockWaitCancel to do anything.
+ * We no longer want LockErrorCleanup to do anything.
*/
lockAwaited = NULL;
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index d5369f412e2..51d623ff3e3 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -2575,7 +2575,7 @@ die(SIGNAL_ARGS)
/* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */
InterruptHoldoffCount++;
- LockWaitCancel(); /* prevent CheckDeadLock from running */
+ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt();
DisableCatchupInterrupt();
InterruptHoldoffCount--;
@@ -2617,7 +2617,7 @@ StatementCancelHandler(SIGNAL_ARGS)
/* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */
InterruptHoldoffCount++;
- LockWaitCancel(); /* prevent CheckDeadLock from running */
+ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt();
DisableCatchupInterrupt();
InterruptHoldoffCount--;
@@ -2776,7 +2776,7 @@ RecoveryConflictInterrupt(ProcSignalReason reason)
/* bump holdoff count to make ProcessInterrupts() a no-op */
/* until we are done getting ready for it */
InterruptHoldoffCount++;
- LockWaitCancel(); /* prevent CheckDeadLock from running */
+ LockErrorCleanup(); /* prevent CheckDeadLock from running */
DisableNotifyInterrupt();
DisableCatchupInterrupt();
InterruptHoldoffCount--;
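[Annotation] All three handlers above share the same holdoff idiom: while the handler rearranges lock-wait state, ProcessInterrupts() must be a no-op. A minimal model of just that counter dance (invented names; the real handlers do considerably more before and after):

    #include <stdio.h>

    static volatile int interrupt_holdoff_count;
    static volatile int interrupt_pending;

    static void process_interrupts(void)
    {
        if (interrupt_holdoff_count > 0)
            return;                 /* deferred; a later check picks the
                                     * pending flag up */
        if (interrupt_pending)
        {
            interrupt_pending = 0;
            printf("servicing interrupt\n");
        }
    }

    static void die_handler(void)
    {
        interrupt_pending = 1;
        interrupt_holdoff_count++;  /* make process_interrupts a no-op ... */
        /* lock_error_cleanup(); disable other interrupts; ... */
        interrupt_holdoff_count--;  /* ... until the cleanup calls finish */
        process_interrupts();
    }

    int main(void)
    {
        die_handler();
        return 0;
    }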
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 6e7a6e9b689..92b6d9d1b42 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -489,6 +489,7 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait,
bool report_memory_error);
+extern void AbortStrongLockAcquire(void);
extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseSession(LOCKMETHODID lockmethodid);
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 415c0935ad6..a66c4849c12 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -244,7 +244,7 @@ extern int ProcSleep(LOCALLOCK *locallock, LockMethod lockMethodTable);
extern PGPROC *ProcWakeup(PGPROC *proc, int waitStatus);
extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
extern bool IsWaitingForLock(void);
-extern void LockWaitCancel(void);
+extern void LockErrorCleanup(void);
extern void ProcWaitForSignal(void);
extern void ProcSendSignal(int pid);