-rw-r--r--   src/backend/storage/ipc/standby.c   |  8
-rw-r--r--   src/backend/storage/lmgr/lmgr.c     | 38
-rw-r--r--   src/backend/storage/lmgr/lock.c     | 55
-rw-r--r--   src/include/storage/lock.h          |  8
4 files changed, 92 insertions(+), 17 deletions(-)
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 4558d6fd325..73b4e691c84 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -395,8 +395,8 @@ ResolveRecoveryConflictWithLock(Oid dbOid, Oid relOid)
ResolveRecoveryConflictWithVirtualXIDs(backends,
PROCSIG_RECOVERY_CONFLICT_LOCK);
- if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
- != LOCKACQUIRE_NOT_AVAIL)
+ if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+ false, NULL) != LOCKACQUIRE_NOT_AVAIL)
lock_acquired = true;
}
}
@@ -631,8 +631,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
*/
SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
- if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true, false)
- == LOCKACQUIRE_NOT_AVAIL)
+ if (LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
+ false, NULL) == LOCKACQUIRE_NOT_AVAIL)
ResolveRecoveryConflictWithLock(newlock->dbOid, newlock->relOid);
}
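The two standby.c call sites change only mechanically: LockAcquireExtended() grows a trailing LOCALLOCK ** parameter, and recovery code passes NULL because it never consults the local lock entry. A minimal sketch of the resulting call shape, using a hypothetical helper name (acquire_recovery_lock) and assuming the usual backend headers:

#include "postgres.h"
#include "storage/lock.h"

/* Hypothetical wrapper illustrating the updated call pattern; not in the patch. */
static bool
acquire_recovery_lock(Oid dbOid, Oid relOid)
{
	LOCKTAG		locktag;

	SET_LOCKTAG_RELATION(locktag, dbOid, relOid);

	/* sessionLock = true, dontWait = true, reportMemoryError = false, locallockp = NULL */
	return LockAcquireExtended(&locktag, AccessExclusiveLock, true, true,
							   false, NULL) != LOCKACQUIRE_NOT_AVAIL;
}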
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 6e6e534e2c3..1e9093b941f 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -105,11 +105,12 @@ void
LockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
- res = LockAcquire(&tag, lockmode, false, false);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages, so that we
@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
* relcache entry in an undesirable way. (In the case where our own xact
* modifies the rel, the relcache update happens via
* CommandCounterIncrement, not here.)
+ *
+ * However, in corner cases where code acts on tables (usually catalogs)
+ * recursively, we might get here while still processing invalidation
+ * messages in some outer execution of this function or a sibling. The
+ * "cleared" status of the lock tells us whether we really are done
+ * absorbing relevant inval messages.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
}
/*
@@ -138,11 +148,12 @@ bool
ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
- res = LockAcquire(&tag, lockmode, false, true);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
return true;
}
@@ -199,20 +213,24 @@ void
LockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, lockmode, false, false);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
}
/*
@@ -226,13 +244,14 @@ bool
ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, lockmode, false, true);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
return true;
}
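All four lmgr.c call sites above follow the same new pattern: capture the LOCALLOCK pointer from LockAcquireExtended(), and unless the result is LOCKACQUIRE_ALREADY_CLEAR, absorb pending invalidation messages and then mark the lock clear. A minimal sketch distilling that shared pattern into one hypothetical helper (lock_rel_and_absorb_invals, not part of the patch):

#include "postgres.h"
#include "storage/lock.h"
#include "utils/inval.h"

/* Hypothetical helper; mirrors the repeated pattern in the lmgr.c changes. */
static bool
lock_rel_and_absorb_invals(const LOCKTAG *tag, LOCKMODE lockmode, bool dontWait)
{
	LOCALLOCK  *locallock;
	LockAcquireResult res;

	res = LockAcquireExtended(tag, lockmode, false, dontWait, true, &locallock);

	if (res == LOCKACQUIRE_NOT_AVAIL)
		return false;			/* conditional acquisition failed */

	/*
	 * Unless the lock was already marked "clear", absorb any sinval messages
	 * generated before we got the lock, then record that fact so recursive
	 * re-acquisitions can skip the (possibly still in-progress) processing.
	 */
	if (res != LOCKACQUIRE_ALREADY_CLEAR)
	{
		AcceptInvalidationMessages();
		MarkLockClear(locallock);
	}

	return true;
}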
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 06d8e9cf26f..95093c47e3c 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -653,6 +653,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
+ * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear
*
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
@@ -669,7 +670,8 @@ LockAcquire(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait)
{
- return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+ return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+ true, NULL);
}
/*
@@ -680,13 +682,17 @@ LockAcquire(const LOCKTAG *locktag,
* caller to note that the lock table is full and then begin taking
* extreme action to reduce the number of other lock holders before
* retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
- bool reportMemoryError)
+ bool reportMemoryError,
+ LOCALLOCK **locallockp)
{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
@@ -753,6 +759,7 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
locallock->holdsStrongLockCount = FALSE;
+ locallock->lockCleared = false;
locallock->lockOwners = NULL; /* in case next line fails */
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext,
@@ -773,13 +780,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
}
hashcode = locallock->hashcode;
+ if (locallockp)
+ *locallockp = locallock;
+
/*
* If we already hold the lock, we can just increase the count locally.
+ *
+ * If lockCleared is already set, caller need not worry about absorbing
+ * sinval messages related to the lock's object.
*/
if (locallock->nLocks > 0)
{
GrantLockLocal(locallock, owner);
- return LOCKACQUIRE_ALREADY_HELD;
+ if (locallock->lockCleared)
+ return LOCKACQUIRE_ALREADY_CLEAR;
+ else
+ return LOCKACQUIRE_ALREADY_HELD;
}
/*
@@ -861,6 +877,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode))
{
AbortStrongLockAcquire();
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -895,6 +915,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
{
AbortStrongLockAcquire();
LWLockRelease(partitionLock);
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -960,6 +984,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
return LOCKACQUIRE_NOT_AVAIL;
}
@@ -1562,6 +1588,20 @@ GrantAwaitedLock(void)
}
/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+ Assert(locallock->nLocks > 0);
+ locallock->lockCleared = true;
+}
+
+/*
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
@@ -1828,6 +1868,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return TRUE;
+ /*
+ * At this point we can no longer suppose we are clear of invalidation
+ * messages related to this lock. Although we'll delete the LOCALLOCK
+ * object before any intentional return from this routine, it seems worth
+ * the trouble to explicitly reset lockCleared right now, just in case
+ * some error prevents us from deleting the LOCALLOCK.
+ */
+ locallock->lockCleared = false;
+
/* Attempt fast release of any lock eligible for the fast path. */
if (EligibleForRelationFastPath(locktag, lockmode) &&
FastPathLocalUseCount > 0)
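The lock.c changes give lockCleared a simple lifecycle: it starts false, LockAcquireExtended() reports LOCKACQUIRE_ALREADY_CLEAR only when a still-held lock has it set, MarkLockClear() sets it once a caller has finished absorbing sinval messages, and LockRelease() resets it when the lock is fully released. A hypothetical walkthrough (example_recursive_open is illustrative only) of why that matters when catalog access re-enters the lock code:

#include "postgres.h"
#include "storage/lmgr.h"

/* Illustrative sequence, not real backend code. */
static void
example_recursive_open(Oid relid)
{
	/*
	 * Outer acquisition: returns LOCKACQUIRE_OK, so LockRelationOid() starts
	 * absorbing sinval messages, which can itself open catalogs...
	 */
	LockRelationOid(relid, AccessShareLock);

	/*
	 * ...and re-enter here.  nLocks > 0 but lockCleared is still false, so
	 * the result is LOCKACQUIRE_ALREADY_HELD and the inner call absorbs
	 * invalidations itself instead of assuming the unfinished outer pass
	 * already did.  Only after AcceptInvalidationMessages() returns does
	 * MarkLockClear() set lockCleared; later re-acquisitions then see
	 * LOCKACQUIRE_ALREADY_CLEAR and skip the work.
	 */
	LockRelationOid(relid, AccessShareLock);

	UnlockRelationOid(relid, AccessShareLock);
	UnlockRelationOid(relid, AccessShareLock);
}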
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 96fe3a66ab0..c19a934b09a 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -435,6 +435,7 @@ typedef struct LOCALLOCK
int numLockOwners; /* # of relevant ResourceOwners */
int maxLockOwners; /* allocated size of array */
bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
+ bool lockCleared; /* we read all sinval msgs for lock */
LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
} LOCALLOCK;
@@ -469,7 +470,8 @@ typedef enum
{
LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */
LOCKACQUIRE_OK, /* lock successfully acquired */
- LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */
+ LOCKACQUIRE_ALREADY_HELD, /* incremented count for lock already held */
+ LOCKACQUIRE_ALREADY_CLEAR /* incremented count for lock already clear */
} LockAcquireResult;
/* Deadlock states identified by DeadLockCheck() */
@@ -513,8 +515,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
- bool report_memory_error);
+ bool reportMemoryError,
+ LOCALLOCK **locallockp);
extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);
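For code outside the backend core that calls LockAcquireExtended() or inspects its result, the header changes amount to one extra argument and one extra enum value. A small sketch (describe_result is a hypothetical function) of how the four result codes are now distinguished:

#include "postgres.h"
#include "storage/lock.h"

/* Hypothetical helper mapping the extended result codes to descriptions. */
static const char *
describe_result(LockAcquireResult res)
{
	switch (res)
	{
		case LOCKACQUIRE_NOT_AVAIL:
			return "not acquired (dontWait was true)";
		case LOCKACQUIRE_OK:
			return "newly acquired";
		case LOCKACQUIRE_ALREADY_HELD:
			return "already held; pending sinval messages may not be absorbed yet";
		case LOCKACQUIRE_ALREADY_CLEAR:
			return "already held and known clear of relevant sinval messages";
	}
	return "unrecognized result";
}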