aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/ipc/standby.c3
-rw-r--r--src/backend/storage/lmgr/lmgr.c38
-rw-r--r--src/backend/storage/lmgr/lock.c57
-rw-r--r--src/include/storage/lock.h10
4 files changed, 92 insertions, 16 deletions
diff --git a/src/backend/storage/ipc/standby.c b/src/backend/storage/ipc/standby.c
index 2e077028951..f9c12e603b1 100644
--- a/src/backend/storage/ipc/standby.c
+++ b/src/backend/storage/ipc/standby.c
@@ -661,7 +661,8 @@ StandbyAcquireAccessExclusiveLock(TransactionId xid, Oid dbOid, Oid relOid)
SET_LOCKTAG_RELATION(locktag, newlock->dbOid, newlock->relOid);
- LockAcquireExtended(&locktag, AccessExclusiveLock, true, false, false);
+ (void) LockAcquireExtended(&locktag, AccessExclusiveLock, true, false,
+ false, NULL);
}
static void
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 7b2dcb6c600..dc0a4396388 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -105,11 +105,12 @@ void
LockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
- res = LockAcquire(&tag, lockmode, false, false);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages, so that we
@@ -120,9 +121,18 @@ LockRelationOid(Oid relid, LOCKMODE lockmode)
* relcache entry in an undesirable way. (In the case where our own xact
* modifies the rel, the relcache update happens via
* CommandCounterIncrement, not here.)
+ *
+ * However, in corner cases where code acts on tables (usually catalogs)
+ * recursively, we might get here while still processing invalidation
+ * messages in some outer execution of this function or a sibling. The
+ * "cleared" status of the lock tells us whether we really are done
+ * absorbing relevant inval messages.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
}
/*
@@ -138,11 +148,12 @@ bool
ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SetLocktagRelationOid(&tag, relid);
- res = LockAcquire(&tag, lockmode, false, true);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
@@ -151,8 +162,11 @@ ConditionalLockRelationOid(Oid relid, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
return true;
}
@@ -199,20 +213,24 @@ void
LockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, lockmode, false, false);
+ res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
/*
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
}
/*
@@ -226,13 +244,14 @@ bool
ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
{
LOCKTAG tag;
+ LOCALLOCK *locallock;
LockAcquireResult res;
SET_LOCKTAG_RELATION(tag,
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(&tag, lockmode, false, true);
+ res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
@@ -241,8 +260,11 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
* Now that we have the lock, check for invalidation messages; see notes
* in LockRelationOid.
*/
- if (res != LOCKACQUIRE_ALREADY_HELD)
+ if (res != LOCKACQUIRE_ALREADY_CLEAR)
+ {
AcceptInvalidationMessages();
+ MarkLockClear(locallock);
+ }
return true;
}
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index dc3d8d98179..58d1f32cf36 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -669,6 +669,7 @@ LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
* LOCKACQUIRE_ALREADY_HELD incremented count for lock already held
+ * LOCKACQUIRE_ALREADY_CLEAR incremented count for lock already clear
*
* In the normal case where dontWait=false and the caller doesn't need to
* distinguish a freshly acquired lock from one already taken earlier in
@@ -685,7 +686,8 @@ LockAcquire(const LOCKTAG *locktag,
bool sessionLock,
bool dontWait)
{
- return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait, true);
+ return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
+ true, NULL);
}
/*
@@ -696,13 +698,17 @@ LockAcquire(const LOCKTAG *locktag,
* caller to note that the lock table is full and then begin taking
* extreme action to reduce the number of other lock holders before
* retrying the action.
+ *
+ * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
+ * table entry if a lock is successfully acquired, or NULL if not.
*/
LockAcquireResult
LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
- bool reportMemoryError)
+ bool reportMemoryError,
+ LOCALLOCK **locallockp)
{
LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
LockMethod lockMethodTable;
@@ -766,9 +772,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
locallock->proclock = NULL;
locallock->hashcode = LockTagHashCode(&(localtag.lock));
locallock->nLocks = 0;
+ locallock->holdsStrongLockCount = false;
+ locallock->lockCleared = false;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
- locallock->holdsStrongLockCount = false;
locallock->lockOwners = NULL; /* in case next line fails */
locallock->lockOwners = (LOCALLOCKOWNER *)
MemoryContextAlloc(TopMemoryContext,
@@ -789,13 +796,22 @@ LockAcquireExtended(const LOCKTAG *locktag,
}
hashcode = locallock->hashcode;
+ if (locallockp)
+ *locallockp = locallock;
+
/*
* If we already hold the lock, we can just increase the count locally.
+ *
+ * If lockCleared is already set, caller need not worry about absorbing
+ * sinval messages related to the lock's object.
*/
if (locallock->nLocks > 0)
{
GrantLockLocal(locallock, owner);
- return LOCKACQUIRE_ALREADY_HELD;
+ if (locallock->lockCleared)
+ return LOCKACQUIRE_ALREADY_CLEAR;
+ else
+ return LOCKACQUIRE_ALREADY_HELD;
}
/*
@@ -877,6 +893,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
hashcode))
{
AbortStrongLockAcquire();
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -911,6 +931,10 @@ LockAcquireExtended(const LOCKTAG *locktag,
{
AbortStrongLockAcquire();
LWLockRelease(partitionLock);
+ if (locallock->nLocks == 0)
+ RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
if (reportMemoryError)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
@@ -976,6 +1000,8 @@ LockAcquireExtended(const LOCKTAG *locktag,
LWLockRelease(partitionLock);
if (locallock->nLocks == 0)
RemoveLocalLock(locallock);
+ if (locallockp)
+ *locallockp = NULL;
return LOCKACQUIRE_NOT_AVAIL;
}
@@ -1646,6 +1672,20 @@ GrantAwaitedLock(void)
}
/*
+ * MarkLockClear -- mark an acquired lock as "clear"
+ *
+ * This means that we know we have absorbed all sinval messages that other
+ * sessions generated before we acquired this lock, and so we can confidently
+ * assume we know about any catalog changes protected by this lock.
+ */
+void
+MarkLockClear(LOCALLOCK *locallock)
+{
+ Assert(locallock->nLocks > 0);
+ locallock->lockCleared = true;
+}
+
+/*
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
@@ -1909,6 +1949,15 @@ LockRelease(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
if (locallock->nLocks > 0)
return true;
+ /*
+ * At this point we can no longer suppose we are clear of invalidation
+ * messages related to this lock. Although we'll delete the LOCALLOCK
+ * object before any intentional return from this routine, it seems worth
+ * the trouble to explicitly reset lockCleared right now, just in case
+ * some error prevents us from deleting the LOCALLOCK.
+ */
+ locallock->lockCleared = false;
+
/* Attempt fast release of any lock eligible for the fast path. */
if (EligibleForRelationFastPath(locktag, lockmode) &&
FastPathLocalUseCount > 0)
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index 777da716790..ff4df7fec51 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -408,9 +408,10 @@ typedef struct LOCALLOCK
PROCLOCK *proclock; /* associated PROCLOCK object, if any */
uint32 hashcode; /* copy of LOCKTAG's hash value */
int64 nLocks; /* total number of times lock is held */
+ bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
+ bool lockCleared; /* we read all sinval msgs for lock */
int numLockOwners; /* # of relevant ResourceOwners */
int maxLockOwners; /* allocated size of array */
- bool holdsStrongLockCount; /* bumped FastPathStrongRelationLocks */
LOCALLOCKOWNER *lockOwners; /* dynamically resizable array */
} LOCALLOCK;
@@ -472,7 +473,8 @@ typedef enum
{
LOCKACQUIRE_NOT_AVAIL, /* lock not available, and dontWait=true */
LOCKACQUIRE_OK, /* lock successfully acquired */
- LOCKACQUIRE_ALREADY_HELD /* incremented count for lock already held */
+ LOCKACQUIRE_ALREADY_HELD, /* incremented count for lock already held */
+ LOCKACQUIRE_ALREADY_CLEAR /* incremented count for lock already clear */
} LockAcquireResult;
/* Deadlock states identified by DeadLockCheck() */
@@ -528,8 +530,10 @@ extern LockAcquireResult LockAcquireExtended(const LOCKTAG *locktag,
LOCKMODE lockmode,
bool sessionLock,
bool dontWait,
- bool report_memory_error);
+ bool reportMemoryError,
+ LOCALLOCK **locallockp);
extern void AbortStrongLockAcquire(void);
+extern void MarkLockClear(LOCALLOCK *locallock);
extern bool LockRelease(const LOCKTAG *locktag,
LOCKMODE lockmode, bool sessionLock);
extern void LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks);