diff options
Diffstat (limited to 'src/backend/storage/lmgr/lock.c')
-rw-r--r-- | src/backend/storage/lmgr/lock.c | 766 |
1 files changed, 553 insertions, 213 deletions
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c index e61eabb70e9..2849f877678 100644 --- a/src/backend/storage/lmgr/lock.c +++ b/src/backend/storage/lmgr/lock.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.136 2004/08/26 17:23:30 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/lock.c,v 1.137 2004/08/27 17:07:41 tgl Exp $ * * NOTES * Outside modules can create a lock table and acquire/release @@ -47,12 +47,24 @@ int max_locks_per_xact; /* set by guc.c */ #define NLOCKENTS(maxBackends) (max_locks_per_xact * (maxBackends)) -static int WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *proclock); -static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, - int *myHolding); +/* + * map from lock method id to the lock table data structures + */ +static LockMethod LockMethods[MAX_LOCK_METHODS]; +static HTAB *LockMethodLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodProcLockHash[MAX_LOCK_METHODS]; +static HTAB *LockMethodLocalHash[MAX_LOCK_METHODS]; + +/* exported so lmgr.c can initialize it */ +int NumLockMethods; + -static char *lock_mode_names[] = +/* private state for GrantAwaitedLock */ +static LOCALLOCK *awaitedLock; +static ResourceOwner awaitedOwner; + + +static const char * const lock_mode_names[] = { "INVALID", "AccessShareLock", @@ -134,31 +146,27 @@ PROCLOCK_PRINT(const char *where, const PROCLOCK *proclockP) || (Trace_lock_table && (((LOCK *) MAKE_PTR(proclockP->tag.lock))->tag.relId == Trace_lock_table)) ) elog(LOG, - "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d", + "%s: proclock(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%x)", where, MAKE_OFFSET(proclockP), proclockP->tag.lock, PROCLOCK_LOCKMETHOD(*(proclockP)), proclockP->tag.proc, proclockP->tag.xid, - proclockP->holding[1], proclockP->holding[2], proclockP->holding[3], - proclockP->holding[4], proclockP->holding[5], proclockP->holding[6], - proclockP->holding[7], proclockP->nHolding); + (int) proclockP->holdMask); } #else /* not LOCK_DEBUG */ #define LOCK_PRINT(where, lock, type) #define PROCLOCK_PRINT(where, proclockP) + #endif /* not LOCK_DEBUG */ -/* - * map from lock method id to the lock table structure - */ -static LockMethod LockMethods[MAX_LOCK_METHODS]; -static HTAB* LockMethodLockHash[MAX_LOCK_METHODS]; -static HTAB* LockMethodProcLockHash[MAX_LOCK_METHODS]; - -/* exported so lmgr.c can initialize it */ -int NumLockMethods; +static void RemoveLocalLock(LOCALLOCK *locallock); +static void GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner); +static int WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner); +static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, + int *myHolding); /* @@ -220,6 +228,7 @@ LockMethodTableInit(const char *tabName, int maxBackends) { LockMethod newLockMethod; + LOCKMETHODID lockmethodid; char *shmemName; HASHCTL info; int hash_flags; @@ -254,40 +263,39 @@ LockMethodTableInit(const char *tabName, { MemSet(newLockMethod, 0, sizeof(LockMethodData)); newLockMethod->masterLock = LockMgrLock; - newLockMethod->lockmethodid = NumLockMethods; LockMethodInit(newLockMethod, conflictsP, numModes); } /* * other modules refer to the lock table by a lockmethod ID */ - LockMethods[NumLockMethods] = newLockMethod; - NumLockMethods++; - Assert(NumLockMethods <= MAX_LOCK_METHODS); + Assert(NumLockMethods < MAX_LOCK_METHODS); + lockmethodid = NumLockMethods++; + LockMethods[lockmethodid] = newLockMethod; /* * allocate a hash table for LOCK structs. This is used to store * per-locked-object information. */ + MemSet(&info, 0, sizeof(info)); info.keysize = sizeof(LOCKTAG); info.entrysize = sizeof(LOCK); info.hash = tag_hash; hash_flags = (HASH_ELEM | HASH_FUNCTION); sprintf(shmemName, "%s (lock hash)", tabName); - LockMethodLockHash[NumLockMethods-1] = ShmemInitHash(shmemName, + LockMethodLockHash[lockmethodid] = ShmemInitHash(shmemName, init_table_size, max_table_size, &info, hash_flags); - if (!LockMethodLockHash[NumLockMethods-1]) + if (!LockMethodLockHash[lockmethodid]) elog(FATAL, "could not initialize lock table \"%s\"", tabName); - Assert(LockMethodLockHash[NumLockMethods-1]->hash == tag_hash); /* * allocate a hash table for PROCLOCK structs. This is used to store - * per-lock-proclock information. + * per-lock-holder information. */ info.keysize = sizeof(PROCLOCKTAG); info.entrysize = sizeof(PROCLOCK); @@ -295,18 +303,44 @@ LockMethodTableInit(const char *tabName, hash_flags = (HASH_ELEM | HASH_FUNCTION); sprintf(shmemName, "%s (proclock hash)", tabName); - LockMethodProcLockHash[NumLockMethods-1] = ShmemInitHash(shmemName, + LockMethodProcLockHash[lockmethodid] = ShmemInitHash(shmemName, init_table_size, max_table_size, &info, hash_flags); - if (!LockMethodProcLockHash[NumLockMethods-1]) + if (!LockMethodProcLockHash[lockmethodid]) + elog(FATAL, "could not initialize lock table \"%s\"", tabName); + + /* + * allocate a non-shared hash table for LOCALLOCK structs. This is used + * to store lock counts and resource owner information. + * + * The non-shared table could already exist in this process (this occurs + * when the postmaster is recreating shared memory after a backend crash). + * If so, delete and recreate it. (We could simply leave it, since it + * ought to be empty in the postmaster, but for safety let's zap it.) + */ + if (LockMethodLocalHash[lockmethodid]) + hash_destroy(LockMethodLocalHash[lockmethodid]); + + info.keysize = sizeof(LOCALLOCKTAG); + info.entrysize = sizeof(LOCALLOCK); + info.hash = tag_hash; + hash_flags = (HASH_ELEM | HASH_FUNCTION); + + sprintf(shmemName, "%s (locallock hash)", tabName); + LockMethodLocalHash[lockmethodid] = hash_create(shmemName, + init_table_size, + &info, + hash_flags); + + if (!LockMethodLocalHash[lockmethodid]) elog(FATAL, "could not initialize lock table \"%s\"", tabName); pfree(shmemName); - return newLockMethod->lockmethodid; + return lockmethodid; } /* @@ -339,6 +373,7 @@ LockMethodTableRename(LOCKMETHODID lockmethodid) LockMethods[newLockMethodId] = LockMethods[lockmethodid]; LockMethodLockHash[newLockMethodId] = LockMethodLockHash[lockmethodid]; LockMethodProcLockHash[newLockMethodId] = LockMethodProcLockHash[lockmethodid]; + LockMethodLocalHash[newLockMethodId] = LockMethodLocalHash[lockmethodid]; return newLockMethodId; } @@ -408,11 +443,13 @@ bool LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode, bool dontWait) { + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; + LOCK *lock; PROCLOCK *proclock; PROCLOCKTAG proclocktag; - HTAB *proclockTable; bool found; - LOCK *lock; + ResourceOwner owner; LWLockId masterLock; LockMethod lockMethodTable; int status; @@ -428,9 +465,6 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* ???????? This must be changed when short term locks will be used */ locktag->lockmethodid = lockmethodid; - /* Prepare to record the lock in the current resource owner */ - ResourceOwnerEnlargeLocks(CurrentResourceOwner); - Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) @@ -439,14 +473,82 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, return FALSE; } + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; + else + owner = NULL; + + /* + * Find or create a LOCALLOCK entry for this lock and lockmode + */ + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_ENTER, &found); + if (!locallock) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * if it's a new locallock object, initialize it + */ + if (!found) + { + locallock->lock = NULL; + locallock->proclock = NULL; + locallock->nLocks = 0; + locallock->numLockOwners = 0; + locallock->maxLockOwners = 8; + locallock->lockOwners = NULL; + locallock->lockOwners = (LOCALLOCKOWNER *) + MemoryContextAlloc(TopMemoryContext, + locallock->maxLockOwners * sizeof(LOCALLOCKOWNER)); + } + else + { + /* Make sure there will be room to remember the lock */ + if (locallock->numLockOwners >= locallock->maxLockOwners) + { + int newsize = locallock->maxLockOwners * 2; + + locallock->lockOwners = (LOCALLOCKOWNER *) + repalloc(locallock->lockOwners, + newsize * sizeof(LOCALLOCKOWNER)); + locallock->maxLockOwners = newsize; + } + } + + /* + * If we already hold the lock, we can just increase the count locally. + */ + if (locallock->nLocks > 0) + { + GrantLockLocal(locallock, owner); + return TRUE; + } + + /* + * Otherwise we've got to mess with the shared lock table. + */ masterLock = lockMethodTable->masterLock; LWLockAcquire(masterLock, LW_EXCLUSIVE); /* - * Find or create a lock with this tag + * Find or create a lock with this tag. + * + * Note: if the locallock object already existed, it might have a pointer + * to the lock already ... but we probably should not assume that that + * pointer is valid, since a lock object with no locks can go away + * anytime. */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], (void *) locktag, HASH_ENTER, &found); @@ -458,6 +560,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, errmsg("out of shared memory"), errhint("You may need to increase max_locks_per_transaction."))); } + locallock->lock = lock; /* * if it's a new lock object, initialize it @@ -466,7 +569,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, { lock->grantMask = 0; lock->waitMask = 0; - SHMQueueInit(&(lock->lockHolders)); + SHMQueueInit(&(lock->procLocks)); ProcQueueInit(&(lock->waitProcs)); lock->nRequested = 0; lock->nGranted = 0; @@ -485,8 +588,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Create the hash key for the proclock table. */ - MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ + MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding */ proclocktag.lock = MAKE_OFFSET(lock); proclocktag.proc = MAKE_OFFSET(MyProc); TransactionIdStore(xid, &proclocktag.xid); @@ -494,8 +596,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Find or create a proclock entry with this tag */ - proclockTable = LockMethodProcLockHash[lockmethodid]; - proclock = (PROCLOCK *) hash_search(proclockTable, + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], (void *) &proclocktag, HASH_ENTER, &found); if (!proclock) @@ -506,24 +607,23 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, errmsg("out of shared memory"), errhint("You may need to increase max_locks_per_transaction."))); } + locallock->proclock = proclock; /* * If new, initialize the new entry */ if (!found) { - proclock->nHolding = 0; - MemSet((char *) proclock->holding, 0, sizeof(int) * MAX_LOCKMODES); + proclock->holdMask = 0; /* Add proclock to appropriate lists */ - SHMQueueInsertBefore(&lock->lockHolders, &proclock->lockLink); - SHMQueueInsertBefore(&MyProc->procHolders, &proclock->procLink); + SHMQueueInsertBefore(&lock->procLocks, &proclock->lockLink); + SHMQueueInsertBefore(&MyProc->procLocks, &proclock->procLink); PROCLOCK_PRINT("LockAcquire: new", proclock); } else { PROCLOCK_PRINT("LockAcquire: found", proclock); - Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0)); - Assert(proclock->nHolding <= lock->nGranted); + Assert((proclock->holdMask & ~lock->grantMask) == 0); #ifdef CHECK_DEADLOCK_RISK @@ -544,7 +644,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, */ for (i = lockMethodTable->numLockModes; i > 0; i--) { - if (proclock->holding[i] > 0) + if (proclock->holdMask & LOCKBIT_ON(i)) { if (i >= (int) lockmode) break; /* safe: we have a lock >= req level */ @@ -568,29 +668,14 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); /* - * If I already hold one or more locks of the requested type, just - * grant myself another one without blocking. - */ - if (proclock->holding[lockmode] > 0) - { - GrantLock(lock, proclock, lockmode); - ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid, - lockmode); - PROCLOCK_PRINT("LockAcquire: owning", proclock); - LWLockRelease(masterLock); - return TRUE; - } - - /* - * If this process (under any XID) is a proclock of the lock, also + * If this process (under any XID) is a holder of the lock, just * grant myself another one without blocking. */ LockCountMyLocks(proclock->tag.lock, MyProc, myHolding); if (myHolding[lockmode] > 0) { GrantLock(lock, proclock, lockmode); - ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid, - lockmode); + GrantLockLocal(locallock, owner); PROCLOCK_PRINT("LockAcquire: my other XID owning", proclock); LWLockRelease(masterLock); return TRUE; @@ -612,8 +697,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, { /* No conflict with held or previously requested locks */ GrantLock(lock, proclock, lockmode); - ResourceOwnerRememberLock(CurrentResourceOwner, locktag, xid, - lockmode); + GrantLockLocal(locallock, owner); } else { @@ -621,29 +705,31 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * We can't acquire the lock immediately. If caller specified no - * blocking, remove the proclock entry and return FALSE without + * blocking, remove useless table entries and return FALSE without * waiting. */ if (dontWait) { - if (proclock->nHolding == 0) + if (proclock->holdMask == 0) { SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) proclock, + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), HASH_REMOVE, NULL); if (!proclock) elog(WARNING, "proclock table corrupted"); } else - PROCLOCK_PRINT("LockAcquire: NHOLDING", proclock); + PROCLOCK_PRINT("LockAcquire: NOWAIT", proclock); lock->nRequested--; lock->requested[lockmode]--; LOCK_PRINT("LockAcquire: conditional lock failed", lock, lockmode); Assert((lock->nRequested > 0) && (lock->requested[lockmode] >= 0)); Assert(lock->nGranted <= lock->nRequested); LWLockRelease(masterLock); + if (locallock->nLocks == 0) + RemoveLocalLock(locallock); return FALSE; } @@ -664,7 +750,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* * Sleep till someone wakes me up. */ - status = WaitOnLock(lockmethodid, lockmode, lock, proclock); + status = WaitOnLock(lockmethodid, locallock, owner); /* * NOTE: do not do any material change of state between here and @@ -677,7 +763,7 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * Check the proclock entry status, in case something in the ipc * communication doesn't work correctly. */ - if (!((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0))) + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockAcquire: INCONSISTENT", proclock); LOCK_PRINT("LockAcquire: INCONSISTENT", lock, lockmode); @@ -695,6 +781,23 @@ LockAcquire(LOCKMETHODID lockmethodid, LOCKTAG *locktag, } /* + * Subroutine to free a locallock entry + */ +static void +RemoveLocalLock(LOCALLOCK *locallock) +{ + LOCKMETHODID lockmethodid = LOCALLOCK_LOCKMETHOD(*locallock); + + pfree(locallock->lockOwners); + locallock->lockOwners = NULL; + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &(locallock->tag), + HASH_REMOVE, NULL); + if (!locallock) + elog(WARNING, "locallock table corrupted"); +} + +/* * LockCheckConflicts -- test whether requested lock conflicts * with those already granted * @@ -788,24 +891,29 @@ LockCheckConflicts(LockMethod lockMethodTable, static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) { - SHM_QUEUE *procHolders = &(proc->procHolders); + SHM_QUEUE *procLocks = &(proc->procLocks); PROCLOCK *proclock; - int i; MemSet(myHolding, 0, MAX_LOCKMODES * sizeof(int)); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) { if (lockOffset == proclock->tag.lock) { + LOCKMASK holdMask = proclock->holdMask; + int i; + for (i = 1; i < MAX_LOCKMODES; i++) - myHolding[i] += proclock->holding[i]; + { + if (holdMask & LOCKBIT_ON(i)) + myHolding[i]++; + } } - proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); } } @@ -815,10 +923,11 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PGPROC *proc, int *myHolding) * the lock request has been granted. * * NOTE: if proc was blocked, it also needs to be removed from the wait list - * and have its waitLock/waitHolder fields cleared. That's not done here. + * and have its waitLock/waitProcLock fields cleared. That's not done here. * - * NOTE: the lock also has to be recorded in the current ResourceOwner; - * but since we may be awaking some other process, we can't do that here. + * NOTE: the lock grant also has to be recorded in the associated LOCALLOCK + * table entry; but since we may be awaking some other process, we can't do + * that here; it's done by GrantLockLocal, instead. */ void GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) @@ -828,12 +937,56 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) lock->grantMask |= LOCKBIT_ON(lockmode); if (lock->granted[lockmode] == lock->requested[lockmode]) lock->waitMask &= LOCKBIT_OFF(lockmode); + proclock->holdMask |= LOCKBIT_ON(lockmode); LOCK_PRINT("GrantLock", lock, lockmode); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); - proclock->holding[lockmode]++; - proclock->nHolding++; - Assert((proclock->nHolding > 0) && (proclock->holding[lockmode] > 0)); +} + +/* + * GrantLockLocal -- update the locallock data structures to show + * the lock request has been granted. + * + * We expect that LockAcquire made sure there is room to add a new + * ResourceOwner entry. + */ +static void +GrantLockLocal(LOCALLOCK *locallock, ResourceOwner owner) +{ + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + int i; + + Assert(locallock->numLockOwners < locallock->maxLockOwners); + /* Count the total */ + locallock->nLocks++; + /* Count the per-owner lock */ + for (i = 0; i < locallock->numLockOwners; i++) + { + if (lockOwners[i].owner == owner) + { + lockOwners[i].nLocks++; + return; + } + } + lockOwners[i].owner = owner; + lockOwners[i].nLocks = 1; + locallock->numLockOwners++; +} + +/* + * GrantAwaitedLock -- call GrantLockLocal for the lock we are doing + * WaitOnLock on. + * + * proc.c needs this for the case where we are booted off the lock by + * timeout, but discover that someone granted us the lock anyway. + * + * We could just export GrantLockLocal, but that would require including + * resowner.h in lock.h, which creates circularity. + */ +void +GrantAwaitedLock(void) +{ + GrantLockLocal(awaitedLock, awaitedOwner); } /* @@ -845,8 +998,8 @@ GrantLock(LOCK *lock, PROCLOCK *proclock, LOCKMODE lockmode) * The locktable's masterLock must be held at entry. */ static int -WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, - LOCK *lock, PROCLOCK *proclock) +WaitOnLock(LOCKMETHODID lockmethodid, LOCALLOCK *locallock, + ResourceOwner owner) { LockMethod lockMethodTable = LockMethods[lockmethodid]; char *new_status, @@ -854,7 +1007,8 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, Assert(lockmethodid < NumLockMethods); - LOCK_PRINT("WaitOnLock: sleeping on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: sleeping on lock", + locallock->lock, locallock->tag.mode); old_status = pstrdup(get_ps_display()); new_status = (char *) palloc(strlen(old_status) + 10); @@ -862,6 +1016,9 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, strcat(new_status, " waiting"); set_ps_display(new_status); + awaitedLock = locallock; + awaitedOwner = owner; + /* * NOTE: Think not to put any shared-state cleanup after the call to * ProcSleep, in either the normal or failure path. The lock state @@ -877,16 +1034,18 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, */ if (ProcSleep(lockMethodTable, - lockmode, - lock, - proclock) != STATUS_OK) + locallock->tag.mode, + locallock->lock, + locallock->proclock) != STATUS_OK) { /* * We failed as a result of a deadlock, see CheckDeadLock(). Quit * now. Removal of the proclock and lock objects, if no longer * needed, will happen in xact cleanup (see above for motivation). */ - LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode); + awaitedLock = NULL; + LOCK_PRINT("WaitOnLock: aborting on lock", + locallock->lock, locallock->tag.mode); LWLockRelease(lockMethodTable->masterLock); /* @@ -897,11 +1056,14 @@ WaitOnLock(LOCKMETHODID lockmethodid, LOCKMODE lockmode, /* not reached */ } + awaitedLock = NULL; + set_ps_display(old_status); pfree(old_status); pfree(new_status); - LOCK_PRINT("WaitOnLock: wakeup on lock", lock, lockmode); + LOCK_PRINT("WaitOnLock: wakeup on lock", + locallock->lock, locallock->tag.mode); return STATUS_OK; } @@ -944,7 +1106,7 @@ RemoveFromWaitQueue(PGPROC *proc) /* Clean up the proc's own state */ proc->waitLock = NULL; - proc->waitHolder = NULL; + proc->waitProcLock = NULL; /* See if any other waiters for the lock can be woken up now */ ProcLockWakeup(GetLocksMethodTable(waitLock), waitLock); @@ -964,12 +1126,12 @@ bool LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, TransactionId xid, LOCKMODE lockmode) { + LOCALLOCKTAG localtag; + LOCALLOCK *locallock; LOCK *lock; + PROCLOCK *proclock; LWLockId masterLock; LockMethod lockMethodTable; - PROCLOCK *proclock; - PROCLOCKTAG proclocktag; - HTAB *proclockTable; bool wakeupNeeded = false; #ifdef LOCK_DEBUG @@ -980,9 +1142,6 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, /* ???????? This must be changed when short term locks will be used */ locktag->lockmethodid = lockmethodid; - /* Record release of the lock in the current resource owner */ - ResourceOwnerForgetLock(CurrentResourceOwner, locktag, xid, lockmode); - Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; if (!lockMethodTable) @@ -991,69 +1150,107 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, return FALSE; } - masterLock = lockMethodTable->masterLock; - LWLockAcquire(masterLock, LW_EXCLUSIVE); - /* - * Find a lock with this tag + * Find the LOCALLOCK entry for this lock and lockmode */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); - lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], - (void *) locktag, - HASH_FIND, NULL); + MemSet(&localtag, 0, sizeof(localtag)); /* must clear padding */ + localtag.lock = *locktag; + localtag.xid = xid; + localtag.mode = lockmode; + + locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash[lockmethodid], + (void *) &localtag, + HASH_FIND, NULL); /* * let the caller print its own error message, too. Do not * ereport(ERROR). */ - if (!lock) + if (!locallock || locallock->nLocks <= 0) { - LWLockRelease(masterLock); - elog(WARNING, "no such lock"); + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); return FALSE; } - LOCK_PRINT("LockRelease: found", lock, lockmode); /* - * Find the proclock entry for this proclock. + * Decrease the count for the resource owner. */ - MemSet(&proclocktag, 0, sizeof(PROCLOCKTAG)); /* must clear padding, - * needed */ - proclocktag.lock = MAKE_OFFSET(lock); - proclocktag.proc = MAKE_OFFSET(MyProc); - TransactionIdStore(xid, &proclocktag.xid); - - proclockTable = LockMethodProcLockHash[lockmethodid]; - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) &proclocktag, - HASH_FIND_SAVE, NULL); - if (!proclock) { - LWLockRelease(masterLock); -#ifdef USER_LOCKS - if (lockmethodid == USER_LOCKMETHOD) - elog(WARNING, "no lock with this tag"); + LOCALLOCKOWNER *lockOwners = locallock->lockOwners; + ResourceOwner owner; + int i; + + /* Session locks and user locks are not transactional */ + if (xid != InvalidTransactionId && + lockmethodid == DEFAULT_LOCKMETHOD) + owner = CurrentResourceOwner; else -#endif - elog(WARNING, "proclock table corrupted"); - return FALSE; + owner = NULL; + + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == owner) + { + Assert(lockOwners[i].nLocks > 0); + if (--lockOwners[i].nLocks == 0) + { + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + break; + } + } + if (i < 0) + { + /* don't release a lock belonging to another owner */ + elog(WARNING, "you don't own a lock of type %s", + lock_mode_names[lockmode]); + return FALSE; + } } + + /* + * Decrease the total local count. If we're still holding the lock, + * we're done. + */ + locallock->nLocks--; + + if (locallock->nLocks > 0) + return TRUE; + + /* + * Otherwise we've got to mess with the shared lock table. + */ + masterLock = lockMethodTable->masterLock; + + LWLockAcquire(masterLock, LW_EXCLUSIVE); + + /* + * We don't need to re-find the lock or proclock, since we kept their + * addresses in the locallock table, and they couldn't have been + * removed while we were holding a lock on them. + */ + lock = locallock->lock; + LOCK_PRINT("LockRelease: found", lock, lockmode); + proclock = locallock->proclock; PROCLOCK_PRINT("LockRelease: found", proclock); /* - * Check that we are actually holding a lock of the type we want to + * Double-check that we are actually holding a lock of the type we want to * release. */ - if (!(proclock->holding[lockmode] > 0)) + if (!(proclock->holdMask & LOCKBIT_ON(lockmode))) { PROCLOCK_PRINT("LockRelease: WRONGTYPE", proclock); - Assert(proclock->holding[lockmode] >= 0); LWLockRelease(masterLock); elog(WARNING, "you don't own a lock of type %s", lock_mode_names[lockmode]); + RemoveLocalLock(locallock); return FALSE; } - Assert(proclock->nHolding > 0); Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0)); Assert((lock->nGranted > 0) && (lock->granted[lockmode] > 0)); Assert(lock->nGranted <= lock->nRequested); @@ -1089,67 +1286,69 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, if (lockMethodTable->conflictTab[lockmode] & lock->waitMask) wakeupNeeded = true; - if (lock->nRequested == 0) - { - /* - * if there's no one waiting in the queue, we just released the - * last lock on this object. Delete it from the lock table. - */ - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); - lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], - (void *) &(lock->tag), - HASH_REMOVE, - NULL); - if (!lock) - { - LWLockRelease(masterLock); - elog(WARNING, "lock table corrupted"); - return FALSE; - } - wakeupNeeded = false; /* should be false, but make sure */ - } - /* - * Now fix the per-proclock lock stats. + * Now fix the per-proclock state. */ - proclock->holding[lockmode]--; - proclock->nHolding--; + proclock->holdMask &= LOCKBIT_OFF(lockmode); PROCLOCK_PRINT("LockRelease: updated", proclock); - Assert((proclock->nHolding >= 0) && (proclock->holding[lockmode] >= 0)); /* * If this was my last hold on this lock, delete my entry in the * proclock table. */ - if (proclock->nHolding == 0) + if (proclock->holdMask == 0) { PROCLOCK_PRINT("LockRelease: deleting", proclock); SHMQueueDelete(&proclock->lockLink); SHMQueueDelete(&proclock->procLink); - proclock = (PROCLOCK *) hash_search(proclockTable, - (void *) &proclock, - HASH_REMOVE_SAVED, NULL); + proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], + (void *) &(proclock->tag), + HASH_REMOVE, NULL); if (!proclock) { LWLockRelease(masterLock); elog(WARNING, "proclock table corrupted"); + RemoveLocalLock(locallock); return FALSE; } } - /* - * Wake up waiters if needed. - */ - if (wakeupNeeded) - ProcLockWakeup(lockMethodTable, lock); + if (lock->nRequested == 0) + { + /* + * We've just released the last lock, so garbage-collect the + * lock object. + */ + Assert(SHMQueueEmpty(&(lock->procLocks))); + lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], + (void *) &(lock->tag), + HASH_REMOVE, NULL); + if (!lock) + { + LWLockRelease(masterLock); + elog(WARNING, "lock table corrupted"); + RemoveLocalLock(locallock); + return FALSE; + } + } + else + { + /* + * Wake up waiters if needed. + */ + if (wakeupNeeded) + ProcLockWakeup(lockMethodTable, lock); + } LWLockRelease(masterLock); + + RemoveLocalLock(locallock); return TRUE; } /* * LockReleaseAll -- Release all locks of the specified lock method that - * are held by the specified process. + * are held by the current process. * * Well, not necessarily *all* locks. The available behaviors are: * @@ -1160,22 +1359,21 @@ LockRelease(LOCKMETHODID lockmethodid, LOCKTAG *locktag, * (zero is the Xid used for "session" locks). */ bool -LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, - bool allxids) +LockReleaseAll(LOCKMETHODID lockmethodid, bool allxids) { - SHM_QUEUE *procHolders = &(proc->procHolders); - PROCLOCK *proclock; - PROCLOCK *nextHolder; + HASH_SEQ_STATUS status; + SHM_QUEUE *procLocks = &(MyProc->procLocks); LWLockId masterLock; LockMethod lockMethodTable; int i, numLockModes; + LOCALLOCK *locallock; + PROCLOCK *proclock; LOCK *lock; #ifdef LOCK_DEBUG if (lockmethodid == USER_LOCKMETHOD ? Trace_userlocks : Trace_locks) - elog(LOG, "LockReleaseAll: lockmethod=%d, pid=%d", - lockmethodid, proc->pid); + elog(LOG, "LockReleaseAll: lockmethod=%d", lockmethodid); #endif Assert(lockmethodid < NumLockMethods); @@ -1189,20 +1387,55 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, numLockModes = lockMethodTable->numLockModes; masterLock = lockMethodTable->masterLock; + /* + * First we run through the locallock table and get rid of unwanted + * entries, then we scan the process's proclocks and get rid of those. + * We do this separately because we may have multiple locallock entries + * pointing to the same proclock, and we daren't end up with any + * dangling pointers. + */ + hash_seq_init(&status, LockMethodLocalHash[lockmethodid]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + if (locallock->proclock == NULL || locallock->lock == NULL) + { + /* + * We must've run out of shared memory while trying to set up + * this lock. Just forget the local entry. + */ + Assert(locallock->nLocks == 0); + RemoveLocalLock(locallock); + continue; + } + + /* Ignore items that are not of the lockmethod to be removed */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != lockmethodid) + continue; + + /* Ignore locks with Xid=0 unless we are asked to release all locks */ + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId) + && !allxids) + continue; + + RemoveLocalLock(locallock); + } + LWLockAcquire(masterLock, LW_EXCLUSIVE); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) { bool wakeupNeeded = false; + PROCLOCK *nextHolder; /* Get link first, since we may unlink/delete this proclock */ - nextHolder = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + nextHolder = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); - Assert(proclock->tag.proc == MAKE_OFFSET(proc)); + Assert(proclock->tag.proc == MAKE_OFFSET(MyProc)); lock = (LOCK *) MAKE_PTR(proclock->tag.lock); @@ -1220,24 +1453,24 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, Assert(lock->nRequested >= 0); Assert(lock->nGranted >= 0); Assert(lock->nGranted <= lock->nRequested); - Assert(proclock->nHolding >= 0); - Assert(proclock->nHolding <= lock->nRequested); + Assert((proclock->holdMask & ~lock->grantMask) == 0); /* * fix the general lock stats */ - if (lock->nRequested != proclock->nHolding) + if (proclock->holdMask) { for (i = 1; i <= numLockModes; i++) { - Assert(proclock->holding[i] >= 0); - if (proclock->holding[i] > 0) + if (proclock->holdMask & LOCKBIT_ON(i)) { - lock->requested[i] -= proclock->holding[i]; - lock->granted[i] -= proclock->holding[i]; + lock->requested[i]--; + lock->granted[i]--; Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0); if (lock->granted[i] == 0) lock->grantMask &= LOCKBIT_OFF(i); + lock->nRequested--; + lock->nGranted--; /* * Read comments in LockRelease @@ -1247,26 +1480,9 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, wakeupNeeded = true; } } - lock->nRequested -= proclock->nHolding; - lock->nGranted -= proclock->nHolding; - Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); - Assert(lock->nGranted <= lock->nRequested); - } - else - { - /* - * This proclock accounts for all the requested locks on the - * object, so we can be lazy and just zero things out. - */ - lock->nRequested = 0; - lock->nGranted = 0; - /* Fix the lock status, just for next LOCK_PRINT message. */ - for (i = 1; i <= numLockModes; i++) - { - Assert(lock->requested[i] == lock->granted[i]); - lock->requested[i] = lock->granted[i] = 0; - } } + Assert((lock->nRequested >= 0) && (lock->nGranted >= 0)); + Assert(lock->nGranted <= lock->nRequested); LOCK_PRINT("LockReleaseAll: updated", lock, 0); PROCLOCK_PRINT("LockReleaseAll: deleting", proclock); @@ -1281,7 +1497,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, * remove the proclock entry from the hashtable */ proclock = (PROCLOCK *) hash_search(LockMethodProcLockHash[lockmethodid], - (void *) proclock, + (void *) &(proclock->tag), HASH_REMOVE, NULL); if (!proclock) @@ -1298,7 +1514,7 @@ LockReleaseAll(LOCKMETHODID lockmethodid, PGPROC *proc, * lock object. */ LOCK_PRINT("LockReleaseAll: deleting", lock, 0); - Assert(LockMethodLockHash[lockmethodid]->hash == tag_hash); + Assert(SHMQueueEmpty(&(lock->procLocks))); lock = (LOCK *) hash_search(LockMethodLockHash[lockmethodid], (void *) &(lock->tag), HASH_REMOVE, NULL); @@ -1326,6 +1542,130 @@ next_item: return TRUE; } +/* + * LockReleaseCurrentOwner + * Release all locks belonging to CurrentResourceOwner + * + * Only DEFAULT_LOCKMETHOD locks can belong to a resource owner. + */ +void +LockReleaseCurrentOwner(void) +{ + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + int i; + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* Scan to see if there are any locks belonging to current owner */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + { + Assert(lockOwners[i].nLocks > 0); + if (lockOwners[i].nLocks < locallock->nLocks) + { + /* + * We will still hold this lock after forgetting this + * ResourceOwner. + */ + locallock->nLocks -= lockOwners[i].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (i < locallock->numLockOwners) + lockOwners[i] = lockOwners[locallock->numLockOwners]; + } + else + { + Assert(lockOwners[i].nLocks == locallock->nLocks); + /* We want to call LockRelease just once */ + lockOwners[i].nLocks = 1; + locallock->nLocks = 1; + if (!LockRelease(DEFAULT_LOCKMETHOD, + &locallock->tag.lock, + locallock->tag.xid, + locallock->tag.mode)) + elog(WARNING, "LockReleaseCurrentOwner: failed??"); + } + break; + } + } + } +} + +/* + * LockReassignCurrentOwner + * Reassign all locks belonging to CurrentResourceOwner to belong + * to its parent resource owner + */ +void +LockReassignCurrentOwner(void) +{ + ResourceOwner parent = ResourceOwnerGetParent(CurrentResourceOwner); + HASH_SEQ_STATUS status; + LOCALLOCK *locallock; + LOCALLOCKOWNER *lockOwners; + + Assert(parent != NULL); + + hash_seq_init(&status, LockMethodLocalHash[DEFAULT_LOCKMETHOD]); + + while ((locallock = (LOCALLOCK *) hash_seq_search(&status)) != NULL) + { + int i; + int ic = -1; + int ip = -1; + + /* Ignore items that must be nontransactional */ + if (LOCALLOCK_LOCKMETHOD(*locallock) != DEFAULT_LOCKMETHOD) + continue; + if (TransactionIdEquals(locallock->tag.xid, InvalidTransactionId)) + continue; + + /* + * Scan to see if there are any locks belonging to current owner + * or its parent + */ + lockOwners = locallock->lockOwners; + for (i = locallock->numLockOwners - 1; i >= 0; i--) + { + if (lockOwners[i].owner == CurrentResourceOwner) + ic = i; + else if (lockOwners[i].owner == parent) + ip = i; + } + + if (ic < 0) + continue; /* no current locks */ + + if (ip < 0) + { + /* Parent has no slot, so just give it child's slot */ + lockOwners[ic].owner = parent; + } + else + { + /* Merge child's count with parent's */ + lockOwners[ip].nLocks += lockOwners[ic].nLocks; + /* compact out unused slot */ + locallock->numLockOwners--; + if (ic < locallock->numLockOwners) + lockOwners[ic] = lockOwners[locallock->numLockOwners]; + } + } +} + + int LockShmemSize(int maxBackends) { @@ -1420,7 +1760,7 @@ GetLockmodeName(LOCKMODE mode) #ifdef LOCK_DEBUG /* - * Dump all locks in the proc->procHolders list. + * Dump all locks in the MyProc->procLocks list. * * Must have already acquired the masterLock. */ @@ -1428,7 +1768,7 @@ void DumpLocks(void) { PGPROC *proc; - SHM_QUEUE *procHolders; + SHM_QUEUE *procLocks; PROCLOCK *proclock; LOCK *lock; int lockmethodid = DEFAULT_LOCKMETHOD; @@ -1438,7 +1778,7 @@ DumpLocks(void) if (proc == NULL) return; - procHolders = &proc->procHolders; + procLocks = &proc->procLocks; Assert(lockmethodid < NumLockMethods); lockMethodTable = LockMethods[lockmethodid]; @@ -1448,7 +1788,7 @@ DumpLocks(void) if (proc->waitLock) LOCK_PRINT("DumpLocks: waiting on", proc->waitLock, 0); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, procHolders, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, procLocks, offsetof(PROCLOCK, procLink)); while (proclock) @@ -1460,7 +1800,7 @@ DumpLocks(void) PROCLOCK_PRINT("DumpLocks", proclock); LOCK_PRINT("DumpLocks", lock, 0); - proclock = (PROCLOCK *) SHMQueueNext(procHolders, &proclock->procLink, + proclock = (PROCLOCK *) SHMQueueNext(procLocks, &proclock->procLink, offsetof(PROCLOCK, procLink)); } } |