aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2005-06-14 22:15:33 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2005-06-14 22:15:33 +0000
commit8563ccae2caf0119355c5c80cfa59da19ce809d6 (patch)
treec53bd9d44280939a986d44e26d7ee1f29216cb3e /src/backend/storage
parentf5835b4b8d569e4d7814886f9e7b90c36d8a4fb5 (diff)
downloadpostgresql-8563ccae2caf0119355c5c80cfa59da19ce809d6.tar.gz
postgresql-8563ccae2caf0119355c5c80cfa59da19ce809d6.zip
Simplify shared-memory lock data structures as per recent discussion:
it is sufficient to track whether a backend holds a lock or not, and store information about transaction vs. session locks only in the inside-the-backend LocalLockTable. Since there can now be but one PROCLOCK per lock per backend, LockCountMyLocks() is no longer needed, thus eliminating some O(N^2) behavior when a backend holds many locks. Also simplify the LockAcquire/LockRelease API by passing just a 'sessionLock' boolean instead of a transaction ID. The previous API was designed with the idea that per-transaction lock holding would be important for subtransactions, but now that we have subtransactions we know that this is unwanted. While at it, add an 'isTempObject' parameter to LockAcquire to indicate whether the lock is being taken on a temp table. This is not used just yet, but will be needed shortly for two-phase commit.
Diffstat (limited to 'src/backend/storage')
-rw-r--r--src/backend/storage/lmgr/README34
-rw-r--r--src/backend/storage/lmgr/lmgr.c77
-rw-r--r--src/backend/storage/lmgr/lock.c276
-rw-r--r--src/backend/storage/lmgr/proc.c8
4 files changed, 181 insertions, 214 deletions
diff --git a/src/backend/storage/lmgr/README b/src/backend/storage/lmgr/README
index 6e5c36e1373..943b2f5bc03 100644
--- a/src/backend/storage/lmgr/README
+++ b/src/backend/storage/lmgr/README
@@ -1,4 +1,4 @@
-$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.16 2005/04/29 22:28:24 tgl Exp $
+$PostgreSQL: pgsql/src/backend/storage/lmgr/README,v 1.17 2005/06/14 22:15:32 tgl Exp $
LOCKING OVERVIEW
@@ -57,16 +57,17 @@ details.
There are two fundamental lock structures in shared memory: the
per-lockable-object LOCK struct, and the per-lock-and-requestor PROCLOCK
struct. A LOCK object exists for each lockable object that currently has
-locks held or requested on it. A PROCLOCK struct exists for each transaction
+locks held or requested on it. A PROCLOCK struct exists for each backend
that is holding or requesting lock(s) on each LOCK object.
In addition to these, each backend maintains an unshared LOCALLOCK structure
for each lockable object and lock mode that it is currently holding or
requesting. The shared lock structures only allow a single lock grant to
-be made per lockable object/lock mode/transaction. Internally to a backend,
+be made per lockable object/lock mode/backend. Internally to a backend,
however, the same lock may be requested and perhaps released multiple times
-in a transaction. The internal request counts are held in LOCALLOCK so that
-the shared LockMgrLock need not be obtained to alter them.
+in a transaction, and it can also be held both transactionally and session-
+wide. The internal request counts are held in LOCALLOCK so that the shared
+LockMgrLock need not be obtained to alter them.
---------------------------------------------------------------------------
@@ -112,9 +113,8 @@ nRequested -
acquired. The count includes attempts by processes which were put
to sleep due to conflicts. It also counts the same backend twice
if, for example, a backend process first acquires a read and then
- acquires a write, or acquires the lock under two different transaction
- IDs. (But multiple acquisitions of the same lock/lock mode under the
- same transaction ID are not multiply counted here; they are recorded
+ acquires a write. (But multiple acquisitions of the same lock/lock mode
+ within a backend are not multiply counted here; they are recorded
only in the backend's LOCALLOCK structure.)
requested -
@@ -153,23 +153,17 @@ tag -
tag.proc
SHMEM offset of PROC of backend process that owns this PROCLOCK.
- tag.xid
- XID of transaction this PROCLOCK is for, or InvalidTransactionId
- if the PROCLOCK is for session-level locking.
-
- Note that this structure will support multiple transactions running
- concurrently in one backend. Currently we do not use it for that
- purpose: subtransactions acquire locks in the name of their top parent
- transaction, to simplify reassigning lock ownership at subtransaction end.
- So the XID field is really only needed to distinguish per-transaction
- locks from session locks. User locks are always session locks, and we
- also use session locks for multi-transaction operations like VACUUM.
-
holdMask -
A bitmask for the lock types successfully acquired by this PROCLOCK.
This should be a subset of the LOCK object's grantMask, and also a
subset of the PGPROC object's heldLocks mask.
+releaseMask -
+ A bitmask for the lock types due to be released during LockReleaseAll.
+ This must be a subset of the holdMask. Note that it is modified without
+ taking the LockMgrLock, and therefore it is unsafe for any backend except
+ the one owning the PROCLOCK to examine/change it.
+
lockLink -
List link for shared memory queue of all the PROCLOCK objects for the
same LOCK.
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index b7c5903c8a1..351ef27bee7 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.75 2005/05/29 22:45:02 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lmgr.c,v 1.76 2005/06/14 22:15:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -142,8 +142,8 @@ LockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, false);
/*
* Check to see if the relcache entry has been invalidated while we
@@ -179,8 +179,8 @@ ConditionalLockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- res = LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, true);
+ res = LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, true);
if (res == LOCKACQUIRE_NOT_AVAIL)
return false;
@@ -214,7 +214,7 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
/*
@@ -230,14 +230,14 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
* relcache entry is up to date.
*/
void
-LockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
+LockRelationForSession(LockRelId *relid, bool istemprel, LOCKMODE lockmode)
{
LOCKTAG tag;
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
- (void) LockAcquire(LockTableId, &tag, InvalidTransactionId,
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, istemprel,
+ lockmode, true, false);
}
/*
@@ -250,7 +250,7 @@ UnlockRelationForSession(LockRelId *relid, LOCKMODE lockmode)
SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
- LockRelease(LockTableId, &tag, InvalidTransactionId, lockmode);
+ LockRelease(LockTableId, &tag, lockmode, true);
}
/*
@@ -272,8 +272,8 @@ LockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, false);
}
/*
@@ -288,7 +288,7 @@ UnlockRelationForExtension(Relation relation, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.dbId,
relation->rd_lockInfo.lockRelId.relId);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
/*
@@ -307,8 +307,8 @@ LockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, false);
}
/*
@@ -327,8 +327,8 @@ ConditionalLockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
- return (LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, true) != LOCKACQUIRE_NOT_AVAIL);
+ return (LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
}
/*
@@ -344,7 +344,7 @@ UnlockPage(Relation relation, BlockNumber blkno, LOCKMODE lockmode)
relation->rd_lockInfo.lockRelId.relId,
blkno);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
/*
@@ -365,8 +365,8 @@ LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, relation->rd_istemp,
+ lockmode, false, false);
}
/*
@@ -383,7 +383,7 @@ UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
ItemPointerGetBlockNumber(tid),
ItemPointerGetOffsetNumber(tid));
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
/*
@@ -400,8 +400,8 @@ XactLockTableInsert(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- ExclusiveLock, false);
+ (void) LockAcquire(LockTableId, &tag, false,
+ ExclusiveLock, false, false);
}
/*
@@ -419,7 +419,7 @@ XactLockTableDelete(TransactionId xid)
SET_LOCKTAG_TRANSACTION(tag, xid);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), ExclusiveLock);
+ LockRelease(LockTableId, &tag, ExclusiveLock, false);
}
/*
@@ -438,19 +438,18 @@ void
XactLockTableWait(TransactionId xid)
{
LOCKTAG tag;
- TransactionId myxid = GetTopTransactionId();
for (;;)
{
Assert(TransactionIdIsValid(xid));
- Assert(!TransactionIdEquals(xid, myxid));
+ Assert(!TransactionIdEquals(xid, GetTopTransactionId()));
SET_LOCKTAG_TRANSACTION(tag, xid);
- (void) LockAcquire(LockTableId, &tag, myxid,
- ShareLock, false);
+ (void) LockAcquire(LockTableId, &tag, false,
+ ShareLock, false, false);
- LockRelease(LockTableId, &tag, myxid, ShareLock);
+ LockRelease(LockTableId, &tag, ShareLock, false);
if (!TransactionIdIsInProgress(xid))
break;
@@ -470,9 +469,11 @@ XactLockTableWait(TransactionId xid)
* LockDatabaseObject
*
* Obtain a lock on a general object of the current database. Don't use
- * this for shared objects (such as tablespaces). It's usually unwise to
- * apply it to entire relations, also, since a lock taken this way will
- * NOT conflict with LockRelation.
+ * this for shared objects (such as tablespaces). It's unwise to apply it
+ * to relations, also, since a lock taken this way will NOT conflict with
+ * LockRelation, and also may be wrongly marked if the relation is temp.
+ * (If we ever invent temp objects that aren't tables, we'll want to extend
+ * the API of this routine to include an isTempObject flag.)
*/
void
LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
@@ -486,8 +487,8 @@ LockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, false,
+ lockmode, false, false);
}
/*
@@ -505,7 +506,7 @@ UnlockDatabaseObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
/*
@@ -525,8 +526,8 @@ LockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- (void) LockAcquire(LockTableId, &tag, GetTopTransactionId(),
- lockmode, false);
+ (void) LockAcquire(LockTableId, &tag, false,
+ lockmode, false, false);
}
/*
@@ -544,5 +545,5 @@ UnlockSharedObject(Oid classid, Oid objid, uint16 objsubid,
objid,
objsubid);
- LockRelease(LockTableId, &tag, GetTopTransactionId(), lockmode);
+ LockRelease(LockTableId, &tag, lockmode, false);
}
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index 6f02c0720e2..08e579486e0 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.154 2005/05/29 22:45:02 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.155 2005/06/14 22:15:32 tgl Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -144,11 +144,10 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP)
{
if (LOCK_DEBUG_ENABLED((LOCK *) MAKE_PTR(proclockP->tag.lock)))
elog(LOG,
- "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) xid(%u) hold(%x)",
+ "%s: proclock(%lx) lock(%lx) method(%u) proc(%lx) hold(%x)",
where, MAKE_OFFSET(proclockP), proclockP->tag.lock,
PROCLOCK_LOCKMETHOD(*(proclockP)),
- proclockP->tag.proc, proclockP->tag.xid,
- (int) proclockP->holdMask);
+ proclockP->tag.proc, (int) proclockP->holdMask);
}
#else /* not LOCK_DEBUG */
@@ -162,8 +161,6 @@ static void RemoveLocalLock(LOCALLOCK *locallock);
static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner);
static void WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock,
ResourceOwner owner);
-static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc,
- int *myHolding);
static bool UnGrantLock(LOCK *lock, LOCKMODE lockmode,
PROCLOCK *proclock, LockMethod lockMethodTable);
static void CleanUpLock(LOCKMETHODID lockmethodid, LOCK *lock,
@@ -377,6 +374,15 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* LockAcquire -- Check for lock conflicts, sleep if conflict found,
* set lock if/when no conflicts.
*
+ * Inputs:
+ * lockmethodid: identifies which lock table to use
+ * locktag: unique identifier for the lockable object
+ * isTempObject: is the lockable object a temporary object? (Under 2PC,
+ * such locks cannot be persisted)
+ * lockmode: lock mode to acquire
+ * sessionLock: if true, acquire lock for session not current transaction
+ * dontWait: if true, don't wait to acquire lock
+ *
* Returns one of:
* LOCKACQUIRE_NOT_AVAIL lock not available, and dontWait=true
* LOCKACQUIRE_OK lock successfully acquired
@@ -420,8 +426,12 @@ LockMethodTableRename(LOCKMETHODID lockmethodid)
* DZ - 22 Nov 1997
*/
LockAcquireResult
-LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode, bool dontWait)
+LockAcquire(LOCKMETHODID lockmethodid,
+ LOCKTAG *locktag,
+ bool isTempObject,
+ LOCKMODE lockmode,
+ bool sessionLock,
+ bool dontWait)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
@@ -433,8 +443,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
LWLockId masterLock;
LockMethod lockMethodTable;
int status;
- int myHolding[MAX_LOCKMODES];
- int i;
#ifdef LOCK_DEBUG
if (Trace_userlocks && lockmethodid == USER_LOCKMETHOD)
@@ -452,8 +460,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
elog(ERROR, "unrecognized lock method: %d", lockmethodid);
/* Session locks and user locks are not transactional */
- if (xid != InvalidTransactionId &&
- lockmethodid == DEFAULT_LOCKMETHOD)
+ if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
@@ -463,7 +470,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
- localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
@@ -477,6 +483,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
{
locallock->lock = NULL;
locallock->proclock = NULL;
+ locallock->isTempObject = isTempObject;
locallock->nLocks = 0;
locallock->numLockOwners = 0;
locallock->maxLockOwners = 8;
@@ -487,6 +494,8 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
else
{
+ Assert(locallock->isTempObject == isTempObject);
+
/* Make sure there will be room to remember the lock */
if (locallock->numLockOwners >= locallock->maxLockOwners)
{
@@ -563,10 +572,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
/*
* Create the hash key for the proclock table.
*/
- MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
+ MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */
proclocktag.lock = MAKE_OFFSET(lock);
proclocktag.proc = MAKE_OFFSET(MyProc);
- TransactionIdStore(xid, &proclocktag.xid);
/*
* Find or create a proclock entry with this tag
@@ -605,6 +613,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
if (!found)
{
proclock->holdMask = 0;
+ proclock->releaseMask = 0;
/* Add proclock to appropriate lists */
SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink);
SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink);
@@ -632,17 +641,22 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* XXX Doing numeric comparison on the lockmodes is a hack; it'd be
* better to use a table. For now, though, this works.
*/
- for (i = lockMethodTable->numLockModes; i > 0; i--)
{
- if (proclock->holdMask & LOCKBIT_ON(i))
+ int i;
+
+ for (i = lockMethodTable->numLockModes; i > 0; i--)
{
- if (i >= (int) lockmode)
- break; /* safe: we have a lock >= req level */
- elog(LOG, "deadlock risk: raising lock level"
- " from %s to %s on object %u/%u/%u",
- lock_mode_names[i], lock_mode_names[lockmode],
- lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
- break;
+ if (proclock->holdMask & LOCKBIT_ON(i))
+ {
+ if (i >= (int) lockmode)
+ break; /* safe: we have a lock >= req level */
+ elog(LOG, "deadlock risk: raising lock level"
+ " from %s to %s on object %u/%u/%u",
+ lock_mode_names[i], lock_mode_names[lockmode],
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3);
+ break;
+ }
}
}
#endif /* CHECK_DEADLOCK_RISK */
@@ -658,18 +672,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
/*
- * If this process (under any XID) is a holder of the lock, just grant
- * myself another one without blocking.
+ * We shouldn't already hold the desired lock; else locallock table
+ * is broken.
*/
- LockCountMyLocks(proclock->tag.lock, MyProc, myHolding);
- if (myHolding[lockmode] > 0)
- {
- GrantLock(lock, proclock, lockmode);
- GrantLockLocal(locallock, owner);
- PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock);
- LWLockRelease(masterLock);
- return LOCKACQUIRE_ALREADY_HELD;
- }
+ if (proclock->holdMask & LOCKBIT_ON(lockmode))
+ elog(ERROR, "lock %s on object %u/%u/%u is already held",
+ lock_mode_names[lockmode],
+ lock->tag.locktag_field1, lock->tag.locktag_field2,
+ lock->tag.locktag_field3);
/*
* If lock requested conflicts with locks requested by waiters, must
@@ -680,8 +690,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
status = STATUS_FOUND;
else
status = LockCheckConflicts(lockMethodTable, lockmode,
- lock, proclock,
- MyProc, myHolding);
+ lock, proclock, MyProc);
if (status == STATUS_OK)
{
@@ -723,18 +732,9 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
}
/*
- * Construct bitmask of locks this process holds on this object.
+ * Set bitmask of locks this process already holds on this object.
*/
- {
- LOCKMASK heldLocks = 0;
-
- for (i = 1; i <= lockMethodTable->numLockModes; i++)
- {
- if (myHolding[i] > 0)
- heldLocks |= LOCKBIT_ON(i);
- }
- MyProc->heldLocks = heldLocks;
- }
+ MyProc->heldLocks = proclock->holdMask;
/*
* Sleep till someone wakes me up.
@@ -793,26 +793,22 @@ RemoveLocalLock(LOCALLOCK *locallock)
*
* NOTES:
* Here's what makes this complicated: one process's locks don't
- * conflict with one another, even if they are held under different
- * transaction IDs (eg, session and xact locks do not conflict).
+ * conflict with one another, no matter what purpose they are held for
+ * (eg, session and transaction locks do not conflict).
* So, we must subtract off our own locks when determining whether the
* requested new lock conflicts with those already held.
- *
- * The caller can optionally pass the process's total holding counts, if
- * known. If NULL is passed then these values will be computed internally.
*/
int
LockCheckConflicts(LockMethod lockMethodTable,
LOCKMODE lockmode,
LOCK *lock,
PROCLOCK *proclock,
- PGPROC *proc,
- int *myHolding) /* myHolding[] array or NULL */
+ PGPROC *proc)
{
int numLockModes = lockMethodTable->numLockModes;
- LOCKMASK bitmask;
+ LOCKMASK myLocks;
+ LOCKMASK otherLocks;
int i;
- int localHolding[MAX_LOCKMODES];
/*
* first check for global conflicts: If no locks conflict with my
@@ -830,32 +826,26 @@ LockCheckConflicts(LockMethod lockMethodTable,
}
/*
- * Rats. Something conflicts. But it could still be my own lock. We
- * have to construct a conflict mask that does not reflect our own
- * locks. Locks held by the current process under another XID also
- * count as "our own locks".
+ * Rats. Something conflicts. But it could still be my own lock.
+ * We have to construct a conflict mask that does not reflect our own
+ * locks, but only lock types held by other processes.
*/
- if (myHolding == NULL)
- {
- /* Caller didn't do calculation of total holding for me */
- LockCountMyLocks(proclock->tag.lock, proc, localHolding);
- myHolding = localHolding;
- }
-
- /* Compute mask of lock types held by other processes */
- bitmask = 0;
+ myLocks = proclock->holdMask;
+ otherLocks = 0;
for (i = 1; i <= numLockModes; i++)
{
- if (lock->granted[i] != myHolding[i])
- bitmask |= LOCKBIT_ON(i);
+ int myHolding = (myLocks & LOCKBIT_ON(i)) ? 1 : 0;
+
+ if (lock->granted[i] > myHolding)
+ otherLocks |= LOCKBIT_ON(i);
}
/*
- * now check again for conflicts. 'bitmask' describes the types of
+ * now check again for conflicts. 'otherLocks' describes the types of
* locks held by other processes. If one of these conflicts with the
* kind of lock that I want, there is a conflict and I have to sleep.
*/
- if (!(lockMethodTable->conflictTab[lockmode] & bitmask))
+ if (!(lockMethodTable->conflictTab[lockmode] & otherLocks))
{
/* no conflict. OK to get the lock */
PROCLOCK_PRINT("LockCheckConflicts: resolved", proclock);
@@ -867,46 +857,6 @@ LockCheckConflicts(LockMethod lockMethodTable,
}
/*
- * LockCountMyLocks --- Count total number of locks held on a given lockable
- * object by a given process (under any transaction ID).
- *
- * XXX This could be rather slow if the process holds a large number of locks.
- * Perhaps it could be sped up if we kept yet a third hashtable of per-
- * process lock information. However, for the normal case where a transaction
- * doesn't hold a large number of locks, keeping such a table would probably
- * be a net slowdown.
- */
-static void
-LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding)
-{
- SHM_QUEUE *procLocks = &(proc->procLocks);
- PROCLOCK *proclock;
-
- MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int));
-
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks,
- offsetof(PROCLOCK, procLink));
-
- while (proclock)
- {
- if (lockOffset == proclock->tag.lock)
- {
- LOCKMASK holdMask = proclock->holdMask;
- int i;
-
- for (i = 1; i < MAX_LOCKMODES; i++)
- {
- if (holdMask & LOCKBIT_ON(i))
- myHolding[i]++;
- }
- }
-
- proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink,
- offsetof(PROCLOCK, procLink));
- }
-}
-
-/*
* GrantLock -- update the lock and proclock data structures to show
* the lock request has been granted.
*
@@ -1086,7 +1036,7 @@ GrantAwaitedLock(void)
* WaitOnLock -- wait to acquire a lock
*
* Caller must have set MyProc->heldLocks to reflect locks already held
- * on the lockable object by this process (under all XIDs).
+ * on the lockable object by this process.
*
* The locktable's masterLock must be held at entry.
*/
@@ -1212,7 +1162,8 @@ RemoveFromWaitQueue(PGPROC *proc)
/*
* LockRelease -- look up 'locktag' in lock table 'lockmethodid' and
- * release one 'lockmode' lock on it.
+ * release one 'lockmode' lock on it. Release a session lock if
+ * 'sessionLock' is true, else release a regular transaction lock.
*
* Side Effects: find any waiting processes that are now wakable,
* grant them their requested locks and awaken them.
@@ -1222,7 +1173,7 @@ RemoveFromWaitQueue(PGPROC *proc)
*/
bool
LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
- TransactionId xid, LOCKMODE lockmode)
+ LOCKMODE lockmode, bool sessionLock)
{
LOCALLOCKTAG localtag;
LOCALLOCK *locallock;
@@ -1252,7 +1203,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
*/
MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */
localtag.lock = *locktag;
- localtag.xid = xid;
localtag.mode = lockmode;
locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid],
@@ -1279,8 +1229,7 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
int i;
/* Session locks and user locks are not transactional */
- if (xid != InvalidTransactionId &&
- lockmethodid == DEFAULT_LOCKMETHOD)
+ if (!sessionLock && lockmethodid == DEFAULT_LOCKMETHOD)
owner = CurrentResourceOwner;
else
owner = NULL;
@@ -1367,15 +1316,11 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag,
* are held by the current process.
*
* Well, not necessarily *all* locks. The available behaviors are:
- *
- * allxids == true: release all locks regardless of transaction
- * affiliation.
- *
- * allxids == false: release all locks with Xid != 0
- * (zero is the Xid used for "session" locks).
+ * allLocks == true: release all locks including session locks.
+ * allLocks == false: release all non-session locks.
*/
void
-LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
+LockReleaseAll(LOCKMETHODID lockmethodid, bool allLocks)
{
HASH_SEQ_STATUS status;
SHM_QUEUE *procLocks = &(MyProc->procLocks);
@@ -1427,13 +1372,41 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
continue;
/*
- * Ignore locks with Xid=0 unless we are asked to release all
- * locks
+ * If we are asked to release all locks, we can just zap the
+ * entry. Otherwise, must scan to see if there are session locks.
+ * We assume there is at most one lockOwners entry for session locks.
*/
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)
- && !allxids)
- continue;
+ if (!allLocks)
+ {
+ LOCALLOCKOWNER *lockOwners = locallock->lockOwners;
+
+ /* If it's above array position 0, move it down to 0 */
+ for (i = locallock->numLockOwners - 1; i > 0; i--)
+ {
+ if (lockOwners[i].owner == NULL)
+ {
+ lockOwners[0] = lockOwners[i];
+ break;
+ }
+ }
+
+ if (locallock->numLockOwners > 0 &&
+ lockOwners[0].owner == NULL &&
+ lockOwners[0].nLocks > 0)
+ {
+ /* Fix the locallock to show just the session locks */
+ locallock->nLocks = lockOwners[0].nLocks;
+ locallock->numLockOwners = 1;
+ /* We aren't deleting this locallock, so done */
+ continue;
+ }
+ }
+
+ /* Mark the proclock to show we need to release this lockmode */
+ if (locallock->nLocks > 0)
+ locallock->proclock->releaseMask |= LOCKBIT_ON(locallock->tag.mode);
+ /* And remove the locallock hashtable entry */
RemoveLocalLock(locallock);
}
@@ -1460,11 +1433,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
goto next_item;
/*
- * Ignore locks with Xid=0 unless we are asked to release all
- * locks
+ * In allLocks mode, force release of all locks even if locallock
+ * table had problems
*/
- if (TransactionIdEquals(proclock->tag.xid, InvalidTransactionId)
- && !allxids)
+ if (allLocks)
+ proclock->releaseMask = proclock->holdMask;
+ else
+ Assert((proclock->releaseMask & ~proclock->holdMask) == 0);
+
+ /*
+ * Ignore items that have nothing to be released, unless they have
+ * holdMask == 0 and are therefore recyclable
+ */
+ if (proclock->releaseMask == 0 && proclock->holdMask != 0)
goto next_item;
PROCLOCK_PRINT("LockReleaseAll", proclock);
@@ -1475,22 +1456,19 @@ LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids)
Assert((proclock->holdMask & ~lock->grantMask) == 0);
/*
- * fix the general lock stats
+ * Release the previously-marked lock modes
*/
- if (proclock->holdMask)
+ for (i = 1; i <= numLockModes; i++)
{
- for (i = 1; i <= numLockModes; i++)
- {
- if (proclock->holdMask & LOCKBIT_ON(i))
- wakeupNeeded |= UnGrantLock(lock, i, proclock,
- lockMethodTable);
- }
+ if (proclock->releaseMask & LOCKBIT_ON(i))
+ wakeupNeeded |= UnGrantLock(lock, i, proclock,
+ lockMethodTable);
}
Assert((lock->nRequested >= 0) && (lock->nGranted >= 0));
Assert(lock->nGranted <= lock->nRequested);
LOCK_PRINT("LockReleaseAll: updated", lock, 0);
- Assert(proclock->holdMask == 0);
+ proclock->releaseMask = 0;
/* CleanUpLock will wake up waiters if needed. */
CleanUpLock(lockmethodid, lock, proclock, wakeupNeeded);
@@ -1528,8 +1506,6 @@ LockReleaseCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
- continue;
/* Scan to see if there are any locks belonging to current owner */
lockOwners = locallock->lockOwners;
@@ -1558,8 +1534,8 @@ LockReleaseCurrentOwner(void)
locallock->nLocks = 1;
if (!LockRelease(DEFAULT_LOCKMETHOD,
&locallock->tag.lock,
- locallock->tag.xid,
- locallock->tag.mode))
+ locallock->tag.mode,
+ false))
elog(WARNING, "LockReleaseCurrentOwner: failed??");
}
break;
@@ -1594,8 +1570,6 @@ LockReassignCurrentOwner(void)
/* Ignore items that must be nontransactional */
if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD)
continue;
- if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId))
- continue;
/*
* Scan to see if there are any locks belonging to current owner
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 597e97a5b0e..b62283cffb4 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.158 2005/05/19 21:35:46 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.159 2005/06/14 22:15:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -644,8 +644,7 @@ ProcSleep(LockMethod lockMethodTable,
lockmode,
lock,
proclock,
- MyProc,
- NULL) == STATUS_OK)
+ MyProc) == STATUS_OK)
{
/* Skip the wait and just grant myself the lock. */
GrantLock(lock, proclock, lockmode);
@@ -846,8 +845,7 @@ ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock)
lockmode,
lock,
proc->waitProcLock,
- proc,
- NULL) == STATUS_OK)
+ proc) == STATUS_OK)
{
/* OK to waken */
GrantLock(lock, proc->waitProcLock, lockmode);