Diffstat (limited to 'src/backend/storage/lmgr')

 src/backend/storage/lmgr/deadlock.c | 188
 src/backend/storage/lmgr/lmgr.c     |   5
 src/backend/storage/lmgr/lock.c     | 203
 src/backend/storage/lmgr/proc.c     |  86

 4 files changed, 263 insertions(+), 219 deletions(-)
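This commit is a mechanical re-indentation pass (a "pgindent run"): essentially every hunk below reshuffles whitespace, brace placement, comment wrapping, and declaration alignment without changing behavior. As a quick orientation, the dominant transformation looks like the following fragment (an illustrative sketch modeled on the EDGE typedef in deadlock.c below; the _OLD/_NEW names are hypothetical, not from the tree):

    /* Before: brace on the typedef line, single-space declarations */
    typedef struct {
        PROC *waiter;               /* the waiting process */
        int pred;                   /* workspace for TopoSort */
    } EDGE_OLD;

    /* After: brace on its own line, names aligned in a declaration column */
    typedef struct
    {
        PROC       *waiter;         /* the waiting process */
        int         pred;           /* workspace for TopoSort */
    } EDGE_NEW;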
diff --git a/src/backend/storage/lmgr/deadlock.c b/src/backend/storage/lmgr/deadlock.c
index 31db44e74b0..160fc64fb24 100644
--- a/src/backend/storage/lmgr/deadlock.c
+++ b/src/backend/storage/lmgr/deadlock.c
@@ -12,7 +12,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.2 2001/01/25 03:45:50 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/deadlock.c,v 1.3 2001/03/22 03:59:46 momjian Exp $
  *
  * Interface:
  *
@@ -29,32 +29,36 @@
 
 /* One edge in the waits-for graph */
-typedef struct {
-    PROC *waiter;           /* the waiting process */
-    PROC *blocker;          /* the process it is waiting for */
-    int pred;               /* workspace for TopoSort */
-    int link;               /* workspace for TopoSort */
+typedef struct
+{
+    PROC       *waiter;         /* the waiting process */
+    PROC       *blocker;        /* the process it is waiting for */
+    int         pred;           /* workspace for TopoSort */
+    int         link;           /* workspace for TopoSort */
 } EDGE;
 
 /* One potential reordering of a lock's wait queue */
-typedef struct {
-    LOCK *lock;             /* the lock whose wait queue is described */
-    PROC **procs;           /* array of PROC *'s in new wait order */
-    int nProcs;
+typedef struct
+{
+    LOCK       *lock;           /* the lock whose wait queue is described */
+    PROC      **procs;          /* array of PROC *'s in new wait order */
+    int         nProcs;
 } WAIT_ORDER;
 
 static bool DeadLockCheckRecurse(PROC *proc);
 static bool TestConfiguration(PROC *startProc);
 static bool FindLockCycle(PROC *checkProc,
-              EDGE *softEdges, int *nSoftEdges);
+              EDGE *softEdges, int *nSoftEdges);
 static bool FindLockCycleRecurse(PROC *checkProc,
-              EDGE *softEdges, int *nSoftEdges);
+                     EDGE *softEdges, int *nSoftEdges);
 static bool ExpandConstraints(EDGE *constraints, int nConstraints);
 static bool TopoSort(LOCK *lock, EDGE *constraints, int nConstraints,
-         PROC **ordering);
+         PROC **ordering);
+
 #ifdef DEBUG_DEADLOCK
 static void PrintLockQueue(LOCK *lock, const char *info);
+
 #endif
@@ -64,30 +68,34 @@ static void PrintLockQueue(LOCK *lock, const char *info);
 
 /* Workspace for FindLockCycle */
 static PROC **visitedProcs;     /* Array of visited procs */
-static int nVisitedProcs;
+static int  nVisitedProcs;
+
 /* Workspace for TopoSort */
 static PROC **topoProcs;        /* Array of not-yet-output procs */
 static int *beforeConstraints;  /* Counts of remaining before-constraints */
 static int *afterConstraints;   /* List head for after-constraints */
+
 /* Output area for ExpandConstraints */
 static WAIT_ORDER *waitOrders;  /* Array of proposed queue rearrangements */
-static int nWaitOrders;
+static int  nWaitOrders;
 static PROC **waitOrderProcs;   /* Space for waitOrders queue contents */
+
 /* Current list of constraints being considered */
 static EDGE *curConstraints;
-static int nCurConstraints;
-static int maxCurConstraints;
+static int  nCurConstraints;
+static int  maxCurConstraints;
+
 /* Storage space for results from FindLockCycle */
 static EDGE *possibleConstraints;
-static int nPossibleConstraints;
-static int maxPossibleConstraints;
+static int  nPossibleConstraints;
+static int  maxPossibleConstraints;
 
 
 /*
  * InitDeadLockChecking -- initialize deadlock checker during backend startup
  *
  * This does per-backend initialization of the deadlock checker; primarily,
- * allocation of working memory for DeadLockCheck. We do this per-backend
+ * allocation of working memory for DeadLockCheck.  We do this per-backend
  * since there's no percentage in making the kernel do copy-on-write
  * inheritance of workspace from the postmaster.  We want to allocate the
 * space at startup because the deadlock checker might be invoked when there's
@@ -96,7 +104,7 @@
 void
 InitDeadLockChecking(void)
 {
-    MemoryContext oldcxt;
+    MemoryContext oldcxt;
 
     /* Make sure allocations are permanent */
     oldcxt = MemoryContextSwitchTo(TopMemoryContext);
@@ -116,20 +124,21 @@ InitDeadLockChecking(void)
 
     /*
     * We need to consider rearranging at most MaxBackends/2 wait queues
-     * (since it takes at least two waiters in a queue to create a soft edge),
-     * and the expanded form of the wait queues can't involve more than
-     * MaxBackends total waiters.
+     * (since it takes at least two waiters in a queue to create a soft
+     * edge), and the expanded form of the wait queues can't involve more
+     * than MaxBackends total waiters.
     */
-    waitOrders = (WAIT_ORDER *) palloc((MaxBackends/2) * sizeof(WAIT_ORDER));
+    waitOrders = (WAIT_ORDER *) palloc((MaxBackends / 2) * sizeof(WAIT_ORDER));
     waitOrderProcs = (PROC **) palloc(MaxBackends * sizeof(PROC *));
 
     /*
     * Allow at most MaxBackends distinct constraints in a configuration.
-     * (Is this enough? In practice it seems it should be, but I don't quite
-     * see how to prove it. If we run out, we might fail to find a workable
-     * wait queue rearrangement even though one exists.) NOTE that this
-     * number limits the maximum recursion depth of DeadLockCheckRecurse.
-     * Making it really big might potentially allow a stack-overflow problem.
+     * (Is this enough? In practice it seems it should be, but I don't
+     * quite see how to prove it.  If we run out, we might fail to find a
+     * workable wait queue rearrangement even though one exists.)  NOTE
+     * that this number limits the maximum recursion depth of
+     * DeadLockCheckRecurse.  Making it really big might potentially allow
+     * a stack-overflow problem.
     */
     maxCurConstraints = MaxBackends;
     curConstraints = (EDGE *) palloc(maxCurConstraints * sizeof(EDGE));
@@ -139,8 +148,8 @@ InitDeadLockChecking(void)
     * re-run TestConfiguration.  (This is probably more than enough, but
     * we can survive if we run low on space by doing excess runs of
     * TestConfiguration to re-compute constraint lists each time needed.)
-     * The last MaxBackends entries in possibleConstraints[] are reserved as
-     * output workspace for FindLockCycle.
+     * The last MaxBackends entries in possibleConstraints[] are reserved
+     * as output workspace for FindLockCycle.
     */
     maxPossibleConstraints = MaxBackends * 4;
     possibleConstraints =
@@ -185,9 +194,9 @@ DeadLockCheck(PROC *proc)
     /* Apply any needed rearrangements of wait queues */
     for (i = 0; i < nWaitOrders; i++)
     {
-        LOCK *lock = waitOrders[i].lock;
-        PROC **procs = waitOrders[i].procs;
-        int nProcs = waitOrders[i].nProcs;
+        LOCK       *lock = waitOrders[i].lock;
+        PROC      **procs = waitOrders[i].procs;
+        int         nProcs = waitOrders[i].nProcs;
         PROC_QUEUE *waitQueue = &(lock->waitProcs);
 
         Assert(nProcs == waitQueue->size);
@@ -218,10 +227,10 @@ DeadLockCheck(PROC *proc)
 * DeadLockCheckRecurse -- recursively search for valid orderings
 *
 * curConstraints[] holds the current set of constraints being considered
- * by an outer level of recursion. Add to this each possible solution
+ * by an outer level of recursion.  Add to this each possible solution
 * constraint for any cycle detected at this level.
 *
- * Returns TRUE if no solution exists. Returns FALSE if a deadlock-free
+ * Returns TRUE if no solution exists.  Returns FALSE if a deadlock-free
 * state is attainable, in which case waitOrders[] shows the required
 * rearrangements of lock wait queues (if any).
 */
@@ -252,6 +261,7 @@ DeadLockCheckRecurse(PROC *proc)
         /* Not room; will need to regenerate the edges on-the-fly */
         savedList = false;
     }
+
     /*
     * Try each available soft edge as an addition to the configuration.
     */
@@ -264,7 +274,7 @@ DeadLockCheckRecurse(PROC *proc)
                 elog(FATAL, "DeadLockCheckRecurse: inconsistent results");
         }
         curConstraints[nCurConstraints] =
-            possibleConstraints[oldPossibleConstraints+i];
+            possibleConstraints[oldPossibleConstraints + i];
         nCurConstraints++;
         if (!DeadLockCheckRecurse(proc))
             return false;       /* found a valid solution! */
@@ -293,25 +303,27 @@ DeadLockCheckRecurse(PROC *proc)
 static bool
 TestConfiguration(PROC *startProc)
 {
-    int softFound = 0;
-    EDGE *softEdges = possibleConstraints + nPossibleConstraints;
-    int nSoftEdges;
-    int i;
+    int         softFound = 0;
+    EDGE       *softEdges = possibleConstraints + nPossibleConstraints;
+    int         nSoftEdges;
+    int         i;
 
     /*
     * Make sure we have room for FindLockCycle's output.
     */
     if (nPossibleConstraints + MaxBackends > maxPossibleConstraints)
         return -1;
+
     /*
     * Expand current constraint set into wait orderings.  Fail if the
     * constraint set is not self-consistent.
     */
     if (!ExpandConstraints(curConstraints, nCurConstraints))
         return -1;
+
     /*
     * Check for cycles involving startProc or any of the procs mentioned
-     * in constraints. We check startProc last because if it has a soft
+     * in constraints.  We check startProc last because if it has a soft
     * cycle still to be dealt with, we want to deal with that first.
     */
     for (i = 0; i < nCurConstraints; i++)
@@ -350,7 +362,7 @@ TestConfiguration(PROC *startProc)
 *
 * Since we need to be able to check hypothetical configurations that would
 * exist after wait queue rearrangement, the routine pays attention to the
- * table of hypothetical queue orders in waitOrders[]. These orders will
+ * table of hypothetical queue orders in waitOrders[].  These orders will
 * be believed in preference to the actual ordering seen in the locktable.
 */
 static bool
@@ -391,9 +403,10 @@ FindLockCycleRecurse(PROC *checkProc,
             /* If we return to starting point, we have a deadlock cycle */
             if (i == 0)
                 return true;
+
             /*
-             * Otherwise, we have a cycle but it does not include the start
-             * point, so say "no deadlock".
+             * Otherwise, we have a cycle but it does not include the
+             * start point, so say "no deadlock".
             */
             return false;
         }
@@ -401,6 +414,7 @@ FindLockCycleRecurse(PROC *checkProc,
     /* Mark proc as seen */
     Assert(nVisitedProcs < MaxBackends);
     visitedProcs[nVisitedProcs++] = checkProc;
+
     /*
     * If the proc is not waiting, we have no outgoing waits-for edges.
     */
@@ -413,8 +427,9 @@ FindLockCycleRecurse(PROC *checkProc,
     lockctl = lockMethodTable->ctl;
     numLockModes = lockctl->numLockModes;
     conflictMask = lockctl->conflictTab[checkProc->waitLockMode];
+
     /*
-     * Scan for procs that already hold conflicting locks. These are
+     * Scan for procs that already hold conflicting locks.  These are
     * "hard" edges in the waits-for graph.
     */
     lockHolders = &(lock->lockHolders);
@@ -449,12 +464,13 @@ FindLockCycleRecurse(PROC *checkProc,
 
     /*
     * Scan for procs that are ahead of this one in the lock's wait queue.
-     * Those that have conflicting requests soft-block this one. This must
-     * be done after the hard-block search, since if another proc both
-     * hard- and soft-blocks this one, we want to call it a hard edge.
+     * Those that have conflicting requests soft-block this one.  This
+     * must be done after the hard-block search, since if another proc
+     * both hard- and soft-blocks this one, we want to call it a hard
+     * edge.
     *
-     * If there is a proposed re-ordering of the lock's wait order,
-     * use that rather than the current wait order.
+     * If there is a proposed re-ordering of the lock's wait order, use that
+     * rather than the current wait order.
     */
     for (i = 0; i < nWaitOrders; i++)
     {
@@ -465,7 +481,7 @@ FindLockCycleRecurse(PROC *checkProc,
     if (i < nWaitOrders)
     {
         /* Use the given hypothetical wait queue order */
-        PROC **procs = waitOrders[i].procs;
+        PROC      **procs = waitOrders[i].procs;
 
         queue_size = waitOrders[i].nProcs;
 
@@ -483,7 +499,11 @@ FindLockCycleRecurse(PROC *checkProc,
                 /* This proc soft-blocks checkProc */
                 if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
                 {
-                    /* Add this edge to the list of soft edges in the cycle */
+
+                    /*
+                     * Add this edge to the list of soft edges in the
+                     * cycle
+                     */
                     Assert(*nSoftEdges < MaxBackends);
                     softEdges[*nSoftEdges].waiter = checkProc;
                     softEdges[*nSoftEdges].blocker = proc;
@@ -513,7 +533,11 @@ FindLockCycleRecurse(PROC *checkProc,
                 /* This proc soft-blocks checkProc */
                 if (FindLockCycleRecurse(proc, softEdges, nSoftEdges))
                 {
-                    /* Add this edge to the list of soft edges in the cycle */
+
+                    /*
+                     * Add this edge to the list of soft edges in the
+                     * cycle
+                     */
                     Assert(*nSoftEdges < MaxBackends);
                     softEdges[*nSoftEdges].waiter = checkProc;
                     softEdges[*nSoftEdges].blocker = proc;
@@ -553,18 +577,19 @@ ExpandConstraints(EDGE *constraints,
                 j;
 
     nWaitOrders = 0;
+
     /*
-     * Scan constraint list backwards. This is because the last-added
+     * Scan constraint list backwards.  This is because the last-added
     * constraint is the only one that could fail, and so we want to test
     * it for inconsistency first.
     */
-    for (i = nConstraints; --i >= 0; )
+    for (i = nConstraints; --i >= 0;)
     {
-        PROC *proc = constraints[i].waiter;
-        LOCK *lock = proc->waitLock;
+        PROC       *proc = constraints[i].waiter;
+        LOCK       *lock = proc->waitLock;
 
         /* Did we already make a list for this lock? */
-        for (j = nWaitOrders; --j >= 0; )
+        for (j = nWaitOrders; --j >= 0;)
         {
             if (waitOrders[j].lock == lock)
                 break;
         }
@@ -577,11 +602,12 @@ ExpandConstraints(EDGE *constraints,
         waitOrders[nWaitOrders].nProcs = lock->waitProcs.size;
         nWaitOrderProcs += lock->waitProcs.size;
         Assert(nWaitOrderProcs <= MaxBackends);
+
         /*
         * Do the topo sort.  TopoSort need not examine constraints after
         * this one, since they must be for different locks.
         */
-        if (!TopoSort(lock, constraints, i+1,
+        if (!TopoSort(lock, constraints, i + 1,
                       waitOrders[nWaitOrders].procs))
             return false;
         nWaitOrders++;
@@ -607,7 +633,7 @@ ExpandConstraints(EDGE *constraints,
 * The initial queue ordering is taken directly from the lock's wait queue.
 * The output is an array of PROC pointers, of length equal to the lock's
 * wait queue length (the caller is responsible for providing this space).
- * The partial order is specified by an array of EDGE structs. Each EDGE
+ * The partial order is specified by an array of EDGE structs.  Each EDGE
 * is one that we need to reverse, therefore the "waiter" must appear before
 * the "blocker" in the output array.  The EDGE array may well contain
 * edges associated with other locks; these should be ignored.
@@ -638,14 +664,15 @@ TopoSort(LOCK *lock,
     }
 
     /*
-     * Scan the constraints, and for each proc in the array, generate a count
-     * of the number of constraints that say it must be before something else,
-     * plus a list of the constraints that say it must be after something else.
-     * The count for the j'th proc is stored in beforeConstraints[j], and the
-     * head of its list in afterConstraints[j].  Each constraint stores its
-     * list link in constraints[i].link (note any constraint will be in
-     * just one list). The array index for the before-proc of the i'th
-     * constraint is remembered in constraints[i].pred.
+     * Scan the constraints, and for each proc in the array, generate a
+     * count of the number of constraints that say it must be before
+     * something else, plus a list of the constraints that say it must be
+     * after something else.  The count for the j'th proc is stored in
+     * beforeConstraints[j], and the head of its list in
+     * afterConstraints[j].  Each constraint stores its list link in
+     * constraints[i].link (note any constraint will be in just one list).
+     * The array index for the before-proc of the i'th constraint is
+     * remembered in constraints[i].pred.
     */
     MemSet(beforeConstraints, 0, queue_size * sizeof(int));
     MemSet(afterConstraints, 0, queue_size * sizeof(int));
@@ -656,7 +683,7 @@ TopoSort(LOCK *lock,
         if (proc->waitLock != lock)
             continue;
         /* Find the waiter proc in the array */
-        for (j = queue_size; --j >= 0; )
+        for (j = queue_size; --j >= 0;)
         {
             if (topoProcs[j] == proc)
                 break;
@@ -664,20 +691,20 @@ TopoSort(LOCK *lock,
         Assert(j >= 0);         /* should have found a match */
         /* Find the blocker proc in the array */
         proc = constraints[i].blocker;
-        for (k = queue_size; --k >= 0; )
+        for (k = queue_size; --k >= 0;)
         {
             if (topoProcs[k] == proc)
                 break;
         }
         Assert(k >= 0);         /* should have found a match */
-        beforeConstraints[j]++; /* waiter must come before */
+        beforeConstraints[j]++; /* waiter must come before */
         /* add this constraint to list of after-constraints for blocker */
         constraints[i].pred = j;
         constraints[i].link = afterConstraints[k];
-        afterConstraints[k] = i+1;
+        afterConstraints[k] = i + 1;
     }
 
     /*--------------------
-     * Now scan the topoProcs array backwards. At each step, output the
+     * Now scan the topoProcs array backwards.  At each step, output the
     * last proc that has no remaining before-constraints, and decrease
     * the beforeConstraints count of each of the procs it was constrained
     * against.
@@ -687,8 +714,8 @@ TopoSort(LOCK *lock,
     * last = last non-null index in topoProcs (avoid redundant searches)
     *--------------------
     */
-    last = queue_size-1;
-    for (i = queue_size; --i >= 0; )
+    last = queue_size - 1;
+    for (i = queue_size; --i >= 0;)
     {
         /* Find next candidate to output */
         while (topoProcs[last] == NULL)
@@ -705,10 +732,8 @@ TopoSort(LOCK *lock,
         ordering[i] = topoProcs[j];
         topoProcs[j] = NULL;
         /* Update beforeConstraints counts of its predecessors */
-        for (k = afterConstraints[j]; k > 0; k = constraints[k-1].link)
-        {
-            beforeConstraints[constraints[k-1].pred]--;
-        }
+        for (k = afterConstraints[j]; k > 0; k = constraints[k - 1].link)
+            beforeConstraints[constraints[k - 1].pred]--;
     }
 
     /* Done */
@@ -734,4 +759,5 @@ PrintLockQueue(LOCK *lock, const char *info)
     printf("\n");
     fflush(stdout);
 }
+
 #endif
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index a042bbe3ee6..c06f76bfcdb 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.44 2001/01/24 19:43:07 momjian Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.45 2001/03/22 03:59:46 momjian Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -174,7 +174,7 @@ UnlockRelation(Relation relation, LOCKMODE lockmode)
 /*
 * LockRelationForSession
 *
- * This routine grabs a session-level lock on the target relation. The
+ * This routine grabs a session-level lock on the target relation.  The
 * session lock persists across transaction boundaries.  It will be removed
 * when UnlockRelationForSession() is called, or if an elog(ERROR) occurs,
 * or if the backend exits.
@@ -291,6 +291,7 @@ XactLockTableDelete(TransactionId xid)
 
     LockRelease(LockTableId, &tag, xid, ExclusiveLock);
 }
+
 #endif
 
 void
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index b3c630b79c0..912a25ff229 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.87 2001/03/18 20:13:13 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.88 2001/03/22 03:59:46 momjian Exp $
  *
  * NOTES
 *    Outside modules can create a lock table and acquire/release
@@ -40,10 +40,10 @@
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
 
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
-           LOCK *lock, HOLDER *holder);
+static int  WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
+           LOCK *lock, HOLDER *holder);
 static void LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc,
-                 int *myHolding);
+                 int *myHolding);
 
 static char *lock_mode_names[] =
 {
@@ -65,40 +65,40 @@ static char *DeadLockMessage = "Deadlock detected.\n\tSee the lock(l) manual pag
 /*------
 * The following configuration options are available for lock debugging:
 *
- * TRACE_LOCKS      -- give a bunch of output what's going on in this file
- * TRACE_USERLOCKS  -- same but for user locks
- * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
- *                     (use to avoid output on system tables)
- * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
- * DEBUG_DEADLOCKS  -- currently dumps locks at untimely occasions ;)
+ * TRACE_LOCKS      -- give a bunch of output what's going on in this file
+ * TRACE_USERLOCKS  -- same but for user locks
+ * TRACE_LOCK_OIDMIN-- do not trace locks for tables below this oid
+ *                     (use to avoid output on system tables)
+ * TRACE_LOCK_TABLE -- trace locks on this table (oid) unconditionally
+ * DEBUG_DEADLOCKS  -- currently dumps locks at untimely occasions ;)
 *
 * Furthermore, but in storage/ipc/spin.c:
- * TRACE_SPINLOCKS  -- trace spinlocks (pretty useless)
+ * TRACE_SPINLOCKS  -- trace spinlocks (pretty useless)
 *
 * Define LOCK_DEBUG at compile time to get all these enabled.
 * --------
 */
 
-int Trace_lock_oidmin = BootstrapObjectIdData;
-bool Trace_locks = false;
-bool Trace_userlocks = false;
-int Trace_lock_table = 0;
-bool Debug_deadlocks = false;
+int         Trace_lock_oidmin = BootstrapObjectIdData;
+bool        Trace_locks = false;
+bool        Trace_userlocks = false;
+int         Trace_lock_table = 0;
+bool        Debug_deadlocks = false;
 
 
 inline static bool
-LOCK_DEBUG_ENABLED(const LOCK * lock)
+LOCK_DEBUG_ENABLED(const LOCK *lock)
 {
     return
-    (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
-      || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
-     && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
-    || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
+        (((LOCK_LOCKMETHOD(*lock) == DEFAULT_LOCKMETHOD && Trace_locks)
+          || (LOCK_LOCKMETHOD(*lock) == USER_LOCKMETHOD && Trace_userlocks))
+         && (lock->tag.relId >= (Oid) Trace_lock_oidmin))
+        || (Trace_lock_table && (lock->tag.relId == Trace_lock_table));
 }
 
 inline static void
-LOCK_PRINT(const char * where, const LOCK * lock, LOCKMODE type)
+LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 {
     if (LOCK_DEBUG_ENABLED(lock))
         elog(DEBUG,
@@ -119,30 +119,30 @@ LOCK_PRINT(const char *where, const LOCK *lock, LOCKMODE type)
 
 
 inline static void
-HOLDER_PRINT(const char * where, const HOLDER * holderP)
+HOLDER_PRINT(const char *where, const HOLDER *holderP)
 {
     if (
-    (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
-      || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
-     && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
-    || (Trace_lock_table && (((LOCK *)MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
-    )
+        (((HOLDER_LOCKMETHOD(*holderP) == DEFAULT_LOCKMETHOD && Trace_locks)
+          || (HOLDER_LOCKMETHOD(*holderP) == USER_LOCKMETHOD && Trace_userlocks))
+         && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId >= (Oid) Trace_lock_oidmin))
+        || (Trace_lock_table && (((LOCK *) MAKE_PTR(holderP->tag.lock))->tag.relId == Trace_lock_table))
+        )
         elog(DEBUG,
             "%s: holder(%lx) lock(%lx) tbl(%d) proc(%lx) xid(%u) hold(%d,%d,%d,%d,%d,%d,%d)=%d",
             where, MAKE_OFFSET(holderP), holderP->tag.lock,
             HOLDER_LOCKMETHOD(*(holderP)), holderP->tag.proc,
             holderP->tag.xid,
-           holderP->holding[1], holderP->holding[2], holderP->holding[3],
-           holderP->holding[4], holderP->holding[5], holderP->holding[6],
+            holderP->holding[1], holderP->holding[2], holderP->holding[3],
+            holderP->holding[4], holderP->holding[5], holderP->holding[6],
             holderP->holding[7], holderP->nHolding);
 }
 
-#else  /* not LOCK_DEBUG */
+#else                           /* not LOCK_DEBUG */
 
 #define LOCK_PRINT(where, lock, type)
 #define HOLDER_PRINT(where, holderP)
-#endif /* not LOCK_DEBUG */
+#endif                          /* not LOCK_DEBUG */
@@ -218,7 +218,7 @@ LockingDisabled(void)
 LOCKMETHODTABLE *
 GetLocksMethodTable(LOCK *lock)
 {
-    LOCKMETHOD lockmethod = LOCK_LOCKMETHOD(*lock);
+    LOCKMETHOD  lockmethod = LOCK_LOCKMETHOD(*lock);
 
     Assert(lockmethod > 0 && lockmethod < NumLockMethods);
     return LockMethodTable[lockmethod];
@@ -258,7 +258,7 @@ LockMethodInit(LOCKMETHODTABLE *lockMethodTable,
 * is wasteful, in this case, but not much space is involved.
 *
 * NOTE: data structures allocated here are allocated permanently, using
- * TopMemoryContext and shared memory. We don't ever release them anyway,
+ * TopMemoryContext and shared memory.  We don't ever release them anyway,
 * and in normal multi-backend operation the lock table structures set up
 * by the postmaster are inherited by each backend, so they must be in
 * TopMemoryContext.
@@ -304,8 +304,8 @@ LockMethodTableInit(char *tabName,
     SpinAcquire(LockMgrLock);
 
     /*
-     * allocate a control structure from shared memory or attach to it
-     * if it already exists.
+     * allocate a control structure from shared memory or attach to it if
+     * it already exists.
     *
     */
     sprintf(shmemName, "%s (ctl)", tabName);
@@ -341,8 +341,8 @@ LockMethodTableInit(char *tabName,
     Assert(NumLockMethods <= MAX_LOCK_METHODS);
 
     /*
-     * allocate a hash table for LOCK structs. This is used
-     * to store per-locked-object information.
+     * allocate a hash table for LOCK structs.  This is used to store
+     * per-locked-object information.
     *
     */
     info.keysize = SHMEM_LOCKTAB_KEYSIZE;
@@ -362,8 +362,8 @@ LockMethodTableInit(char *tabName,
     Assert(lockMethodTable->lockHash->hash == tag_hash);
 
     /*
-     * allocate a hash table for HOLDER structs. This is used
-     * to store per-lock-holder information.
+     * allocate a hash table for HOLDER structs.  This is used to store
+     * per-lock-holder information.
     *
     */
     info.keysize = SHMEM_HOLDERTAB_KEYSIZE;
@@ -558,7 +558,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     * Create the hash key for the holder table.
     *
     */
-    MemSet(&holdertag, 0, sizeof(HOLDERTAG));   /* must clear padding, needed */
+    MemSet(&holdertag, 0, sizeof(HOLDERTAG));   /* must clear padding,
+                                                 * needed */
     holdertag.lock = MAKE_OFFSET(lock);
     holdertag.proc = MAKE_OFFSET(MyProc);
     TransactionIdStore(xid, &holdertag.xid);
@@ -595,6 +596,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         Assert(holder->nHolding <= lock->nGranted);
 
 #ifdef CHECK_DEADLOCK_RISK
+
         /*
         * Issue warning if we already hold a lower-level lock on this
         * object and do not hold a lock of the requested level or higher.
@@ -602,12 +604,13 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         * a deadlock if another backend were following the same code path
         * at about the same time).
         *
-         * This is not enabled by default, because it may generate log entries
-         * about user-level coding practices that are in fact safe in context.
-         * It can be enabled to help find system-level problems.
+         * This is not enabled by default, because it may generate log
+         * entries about user-level coding practices that are in fact safe
+         * in context.  It can be enabled to help find system-level
+         * problems.
         *
-         * XXX Doing numeric comparison on the lockmodes is a hack;
-         * it'd be better to use a table. For now, though, this works.
+         * XXX Doing numeric comparison on the lockmodes is a hack; it'd be
+         * better to use a table.  For now, though, this works.
         */
         for (i = lockMethodTable->ctl->numLockModes; i > 0; i--)
         {
@@ -618,17 +621,17 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
                 elog(DEBUG, "Deadlock risk: raising lock level"
                     " from %s to %s on object %u/%u/%u",
                     lock_mode_names[i], lock_mode_names[lockmode],
-                    lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
+                    lock->tag.relId, lock->tag.dbId, lock->tag.objId.blkno);
                 break;
             }
         }
-#endif /* CHECK_DEADLOCK_RISK */
+#endif                          /* CHECK_DEADLOCK_RISK */
     }
 
     /*
     * lock->nRequested and lock->requested[] count the total number of
-     * requests, whether granted or waiting, so increment those immediately.
-     * The other counts don't increment till we get the lock.
+     * requests, whether granted or waiting, so increment those
+     * immediately.  The other counts don't increment till we get the lock.
     *
     */
     lock->nRequested++;
@@ -636,8 +639,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     Assert((lock->nRequested > 0) && (lock->requested[lockmode] > 0));
 
     /*
-     * If I already hold one or more locks of the requested type,
-     * just grant myself another one without blocking.
+     * If I already hold one or more locks of the requested type, just
+     * grant myself another one without blocking.
     *
     */
     if (holder->holding[lockmode] > 0)
@@ -649,8 +652,8 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     }
 
     /*
-     * If this process (under any XID) is a holder of the lock,
-     * also grant myself another one without blocking.
+     * If this process (under any XID) is a holder of the lock, also grant
+     * myself another one without blocking.
     *
     */
     LockCountMyLocks(holder->tag.lock, MyProc, myHolding);
@@ -663,9 +666,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     }
 
     /*
-     * If lock requested conflicts with locks requested by waiters,
-     * must join wait queue. Otherwise, check for conflict with
-     * already-held locks. (That's last because most complex check.)
+     * If lock requested conflicts with locks requested by waiters, must
+     * join wait queue.  Otherwise, check for conflict with already-held
+     * locks.  (That's last because most complex check.)
     *
     */
     if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
@@ -711,7 +714,7 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
         SpinRelease(masterLock);
         return FALSE;
     }
-#endif /* USER_LOCKS */
+#endif                          /* USER_LOCKS */
 
     /*
     * Construct bitmask of locks this process holds on this object.
@@ -737,8 +740,9 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
     /*
     * NOTE: do not do any material change of state between here and
-     * return. All required changes in locktable state must have been
-     * done when the lock was granted to us --- see notes in WaitOnLock.
+     * return.  All required changes in locktable state must have been
+     * done when the lock was granted to us --- see notes in
+     * WaitOnLock.
     */
 
     /*
@@ -795,13 +799,13 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
     int         localHolding[MAX_LOCKMODES];
 
     /*
-     * first check for global conflicts: If no locks conflict
-     * with my request, then I get the lock.
+     * first check for global conflicts: If no locks conflict with my
+     * request, then I get the lock.
     *
     * Checking for conflict: lock->grantMask represents the types of
-     * currently held locks. conflictTable[lockmode] has a bit
-     * set for each type of lock that conflicts with request. Bitwise
-     * compare tells if there is a conflict.
+     * currently held locks.  conflictTable[lockmode] has a bit set for
+     * each type of lock that conflicts with request.  Bitwise compare
+     * tells if there is a conflict.
     *
     */
     if (!(lockctl->conflictTab[lockmode] & lock->grantMask))
@@ -811,10 +815,10 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
     }
 
     /*
-     * Rats. Something conflicts. But it could still be my own
-     * lock. We have to construct a conflict mask
-     * that does not reflect our own locks. Locks held by the current
-     * process under another XID also count as "our own locks".
+     * Rats.  Something conflicts.  But it could still be my own lock.  We
+     * have to construct a conflict mask that does not reflect our own
+     * locks.  Locks held by the current process under another XID also
+     * count as "our own locks".
     *
     */
     if (myHolding == NULL)
@@ -834,10 +838,9 @@ LockCheckConflicts(LOCKMETHODTABLE *lockMethodTable,
     }
 
     /*
-     * now check again for conflicts. 'bitmask' describes the types
-     * of locks held by other processes. If one of these
-     * conflicts with the kind of lock that I want, there is a
-     * conflict and I have to sleep.
+     * now check again for conflicts.  'bitmask' describes the types of
+     * locks held by other processes.  If one of these conflicts with the
+     * kind of lock that I want, there is a conflict and I have to sleep.
     *
     */
     if (!(lockctl->conflictTab[lockmode] & bitmask))
@@ -878,9 +881,7 @@ LockCountMyLocks(SHMEM_OFFSET lockOffset, PROC *proc, int *myHolding)
         if (lockOffset == holder->tag.lock)
         {
             for (i = 1; i < MAX_LOCKMODES; i++)
-            {
                 myHolding[i] += holder->holding[i];
-            }
         }
 
         holder = (HOLDER *) SHMQueueNext(procHolders, &holder->procLink,
@@ -947,8 +948,8 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
     * Hence, after granting, the locktable state must fully reflect the
     * fact that we own the lock; we can't do additional work on return.
     * Contrariwise, if we fail, any cleanup must happen in xact abort
-     * processing, not here, to ensure it will also happen in the cancel/die
-     * case.
+     * processing, not here, to ensure it will also happen in the
+     * cancel/die case.
     */
 
     if (ProcSleep(lockMethodTable,
@@ -956,9 +957,10 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
                   lock,
                   holder) != STATUS_OK)
     {
+
         /*
-         * We failed as a result of a deadlock, see HandleDeadLock().
-         * Quit now. Removal of the holder and lock objects, if no longer
+         * We failed as a result of a deadlock, see HandleDeadLock().  Quit
+         * now.  Removal of the holder and lock objects, if no longer
         * needed, will happen in xact cleanup (see above for motivation).
         */
         LOCK_PRINT("WaitOnLock: aborting on lock", lock, lockmode);
@@ -984,15 +986,15 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCKMODE lockmode,
 * NB: this does not remove the process' holder object, nor the lock object,
 * even though their counts might now have gone to zero.  That will happen
 * during a subsequent LockReleaseAll call, which we expect will happen
- * during transaction cleanup. (Removal of a proc from its wait queue by
+ * during transaction cleanup.  (Removal of a proc from its wait queue by
 * this routine can only happen if we are aborting the transaction.)
 *--------------------
 */
 void
 RemoveFromWaitQueue(PROC *proc)
 {
-    LOCK *waitLock = proc->waitLock;
-    LOCKMODE lockmode = proc->waitLockMode;
+    LOCK       *waitLock = proc->waitLock;
+    LOCKMODE    lockmode = proc->waitLockMode;
 
     /* Make sure proc is waiting */
     Assert(proc->links.next != INVALID_OFFSET);
@@ -1095,7 +1097,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     /*
     * Find the holder entry for this holder.
     */
-    MemSet(&holdertag, 0, sizeof(HOLDERTAG));   /* must clear padding, needed */
+    MemSet(&holdertag, 0, sizeof(HOLDERTAG));   /* must clear padding,
+                                                 * needed */
     holdertag.lock = MAKE_OFFSET(lock);
     holdertag.proc = MAKE_OFFSET(MyProc);
     TransactionIdStore(xid, &holdertag.xid);
@@ -1156,11 +1159,11 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     /*
     * We need only run ProcLockWakeup if the released lock conflicts with
     * at least one of the lock types requested by waiter(s).  Otherwise
-     * whatever conflict made them wait must still exist. NOTE: before MVCC,
-     * we could skip wakeup if lock->granted[lockmode] was still positive.
-     * But that's not true anymore, because the remaining granted locks might
-     * belong to some waiter, who could now be awakened because he doesn't
-     * conflict with his own locks.
+     * whatever conflict made them wait must still exist.  NOTE: before
+     * MVCC, we could skip wakeup if lock->granted[lockmode] was still
+     * positive.  But that's not true anymore, because the remaining
+     * granted locks might belong to some waiter, who could now be
+     * awakened because he doesn't conflict with his own locks.
     *
     */
     if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
@@ -1168,10 +1171,10 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
 
     if (lock->nRequested == 0)
     {
+
         /*
-         * if there's no one waiting in the queue,
-         * we just released the last lock on this object.
-         * Delete it from the lock table.
+         * if there's no one waiting in the queue, we just released the
+         * last lock on this object.  Delete it from the lock table.
         *
         */
         Assert(lockMethodTable->lockHash->hash == tag_hash);
@@ -1197,8 +1200,8 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag,
     Assert((holder->nHolding >= 0) && (holder->holding[lockmode] >= 0));
 
     /*
-     * If this was my last hold on this lock, delete my entry in the holder
-     * table.
+     * If this was my last hold on this lock, delete my entry in the
+     * holder table.
     */
     if (holder->nHolding == 0)
     {
@@ -1316,11 +1319,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
                 Assert(lock->requested[i] >= 0 && lock->granted[i] >= 0);
                 if (lock->granted[i] == 0)
                     lock->grantMask &= BITS_OFF[i];
+
                 /*
                 * Read comments in LockRelease
                 */
                 if (!wakeupNeeded &&
-                    lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+                    lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
                     wakeupNeeded = true;
             }
         }
@@ -1331,9 +1335,10 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
         }
         else
         {
+
             /*
-             * This holder accounts for all the requested locks on the object,
-             * so we can be lazy and just zero things out.
+             * This holder accounts for all the requested locks on the
+             * object, so we can be lazy and just zero things out.
             *
             */
             lock->nRequested = 0;
@@ -1371,6 +1376,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, PROC *proc,
 
         if (lock->nRequested == 0)
         {
+
             /*
             * We've just released the last lock, so garbage-collect the
             * lock object.
@@ -1412,7 +1418,8 @@ LockShmemSize(int maxBackends)
     size += MAXALIGN(sizeof(PROC_HDR)); /* ProcGlobal */
     size += maxBackends * MAXALIGN(sizeof(PROC));       /* each MyProc */
-    size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL)); /* each lockMethodTable->ctl */
+    size += MAX_LOCK_METHODS * MAXALIGN(sizeof(LOCKMETHODCTL));     /* each
+                                                                     * lockMethodTable->ctl */
 
     /* lockHash table */
     size += hash_estimate_size(NLOCKENTS(maxBackends),
@@ -1534,4 +1541,4 @@ DumpAllLocks(void)
     }
 }
 
-#endif /* LOCK_DEBUG */
+#endif                          /* LOCK_DEBUG */
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 463af1fa5e3..ee2d6751c5e 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -8,7 +8,7 @@
  *
  *
  * IDENTIFICATION
- *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.98 2001/01/26 18:23:12 tgl Exp $
+ *    $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.99 2001/03/22 03:59:46 momjian Exp $
  *
  *-------------------------------------------------------------------------
 */
@@ -76,7 +76,7 @@
 #include "storage/proc.h"
 
-int DeadlockTimeout = 1000;
+int         DeadlockTimeout = 1000;
 
 /* --------------------
 * Spin lock for manipulating the shared process data structure:
@@ -147,10 +147,10 @@ InitProcGlobal(int maxBackends)
 
         /*
         * Arrange to delete semas on exit --- set this up now so that we
-         * will clean up if pre-allocation fails. We use our own freeproc,
-         * rather than IpcSemaphoreCreate's removeOnExit option, because
-         * we don't want to fill up the on_shmem_exit list with a separate
-         * entry for each semaphore set.
+         * will clean up if pre-allocation fails.  We use our own
+         * freeproc, rather than IpcSemaphoreCreate's removeOnExit option,
+         * because we don't want to fill up the on_shmem_exit list with a
+         * separate entry for each semaphore set.
         */
         on_shmem_exit(ProcFreeAllSemaphores, 0);
 
@@ -159,9 +159,9 @@ InitProcGlobal(int maxBackends)
         */
         Assert(maxBackends > 0 && maxBackends <= MAXBACKENDS);
 
-        for (i = 0; i < ((maxBackends-1)/PROC_NSEMS_PER_SET+1); i++)
+        for (i = 0; i < ((maxBackends - 1) / PROC_NSEMS_PER_SET + 1); i++)
         {
-            IpcSemaphoreId semId;
+            IpcSemaphoreId semId;
 
             semId = IpcSemaphoreCreate(PROC_NSEMS_PER_SET,
                                        IPCProtection,
@@ -242,6 +242,7 @@ InitProcess(void)
     if (IsUnderPostmaster)
     {
         ProcGetNewSemIdAndNum(&MyProc->sem.semId, &MyProc->sem.semNum);
+
         /*
         * we might be reusing a semaphore that belongs to a dead backend.
         * So be careful and reinitialize its value here.
@@ -288,8 +289,8 @@ InitProcess(void)
     on_shmem_exit(ProcKill, 0);
 
     /*
-     * Now that we have a PROC, we could try to acquire locks,
-     * so initialize the deadlock checker.
+     * Now that we have a PROC, we could try to acquire locks, so
+     * initialize the deadlock checker.
     */
     InitDeadLockChecking();
 }
@@ -300,7 +301,7 @@ InitProcess(void)
 static void
 ZeroProcSemaphore(PROC *proc)
 {
-    union semun semun;
+    union semun semun;
 
     semun.val = 0;
     if (semctl(proc->sem.semId, proc->sem.semNum, SETVAL, semun) < 0)
@@ -333,15 +334,15 @@ LockWaitCancel(void)
 #ifndef __BEOS__
     {
         struct itimerval timeval,
-                dummy;
+                    dummy;
 
         MemSet(&timeval, 0, sizeof(struct itimerval));
         setitimer(ITIMER_REAL, &timeval, &dummy);
     }
 #else
     /* BeOS doesn't have setitimer, but has set_alarm */
-    set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM);
-#endif /* __BEOS__ */
+    set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM);
+#endif                          /* __BEOS__ */
 
     /* Unlink myself from the wait queue, if on it (might not be anymore!) */
     LockLockTable();
@@ -352,17 +353,17 @@ LockWaitCancel(void)
 
     /*
     * Reset the proc wait semaphore to zero.  This is necessary in the
     * scenario where someone else granted us the lock we wanted before we
-     * were able to remove ourselves from the wait-list. The semaphore will
-     * have been bumped to 1 by the would-be grantor, and since we are no
-     * longer going to wait on the sema, we have to force it back to zero.
-     * Otherwise, our next attempt to wait for a lock will fall through
-     * prematurely.
+     * were able to remove ourselves from the wait-list.  The semaphore
+     * will have been bumped to 1 by the would-be grantor, and since we
+     * are no longer going to wait on the sema, we have to force it back
+     * to zero.  Otherwise, our next attempt to wait for a lock will fall
+     * through prematurely.
     */
     ZeroProcSemaphore(MyProc);
 
     /*
-     * Return true even if we were kicked off the lock before we were
-     * able to remove ourselves.
+     * Return true even if we were kicked off the lock before we were able
+     * to remove ourselves.
     */
     return true;
 }
@@ -467,7 +468,7 @@ ProcQueueAlloc(char *name)
 {
     bool        found;
     PROC_QUEUE *queue = (PROC_QUEUE *)
-    ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
+    ShmemInitStruct(name, sizeof(PROC_QUEUE), &found);
 
     if (!queue)
         return NULL;
@@ -520,11 +521,14 @@ ProcSleep(LOCKMETHODTABLE *lockMethodTable,
     int         myHeldLocks = MyProc->heldLocks;
     PROC       *proc;
     int         i;
+
 #ifndef __BEOS__
     struct itimerval timeval,
                 dummy;
+
 #else
-    bigtime_t time_interval;
+    bigtime_t   time_interval;
+
 #endif
 
     /* ----------------------
@@ -582,6 +586,7 @@ ProcSleep(LOCKMETHODTABLE *lockMethodTable,
                 aheadRequests |= (1 << proc->waitLockMode);
             proc = (PROC *) MAKE_PTR(proc->links.next);
         }
+
         /*
         * If we fall out of loop normally, proc points to waitQueue head,
         * so we will insert at tail of queue as desired.
@@ -607,7 +612,7 @@ ProcSleep(LOCKMETHODTABLE *lockMethodTable,
     MyProc->waitHolder = holder;
     MyProc->waitLockMode = lockmode;
 
-    MyProc->errType = STATUS_OK; /* initialize result for success */
+    MyProc->errType = STATUS_OK;/* initialize result for success */
 
     /* mark that we are waiting for a lock */
     waitingForLock = true;
@@ -643,7 +648,7 @@ ProcSleep(LOCKMETHODTABLE *lockMethodTable,
     if (setitimer(ITIMER_REAL, &timeval, &dummy))
         elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
 #else
-    time_interval = DeadlockTimeout * 1000000; /* usecs */
+    time_interval = DeadlockTimeout * 1000000;  /* usecs */
     if (set_alarm(time_interval, B_ONE_SHOT_RELATIVE_ALARM) < 0)
         elog(FATAL, "ProcSleep: Unable to set timer for process wakeup");
 #endif
@@ -674,7 +679,7 @@ ProcSleep(LOCKMETHODTABLE *lockMethodTable,
     if (setitimer(ITIMER_REAL, &timeval, &dummy))
         elog(FATAL, "ProcSleep: Unable to disable timer for process wakeup");
 #else
-    if (set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM) < 0)
+    if (set_alarm(B_INFINITE_TIMEOUT, B_PERIODIC_ALARM) < 0)
         elog(FATAL, "ProcSleep: Unable to disable timer for process wakeup");
 #endif
 
@@ -759,7 +764,7 @@ ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
 
     while (queue_size-- > 0)
     {
-        LOCKMODE lockmode = proc->waitLockMode;
+        LOCKMODE    lockmode = proc->waitLockMode;
 
         /*
         * Waken if (a) doesn't conflict with requests of earlier waiters,
@@ -776,15 +781,20 @@ ProcLockWakeup(LOCKMETHODTABLE *lockMethodTable, LOCK *lock)
             /* OK to waken */
             GrantLock(lock, proc->waitHolder, lockmode);
             proc = ProcWakeup(proc, STATUS_OK);
+
             /*
-             * ProcWakeup removes proc from the lock's waiting process queue
-             * and returns the next proc in chain; don't use proc's next-link,
-             * because it's been cleared.
+             * ProcWakeup removes proc from the lock's waiting process
+             * queue and returns the next proc in chain; don't use proc's
+             * next-link, because it's been cleared.
             */
         }
         else
         {
-            /* Cannot wake this guy. Remember his request for later checks. */
+
+            /*
+             * Cannot wake this guy.  Remember his request for later
+             * checks.
+             */
             aheadRequests |= (1 << lockmode);
             proc = (PROC *) MAKE_PTR(proc->links.next);
         }
@@ -807,11 +817,11 @@ HandleDeadLock(SIGNAL_ARGS)
     int         save_errno = errno;
 
     /*
-     * Acquire locktable lock. Note that the SIGALRM interrupt had better
-     * not be enabled anywhere that this process itself holds the locktable
-     * lock, else this will wait forever. Also note that this calls
-     * SpinAcquire which creates a critical section, so that this routine
-     * cannot be interrupted by cancel/die interrupts.
+     * Acquire locktable lock.  Note that the SIGALRM interrupt had better
+     * not be enabled anywhere that this process itself holds the
+     * locktable lock, else this will wait forever.  Also note that this
+     * calls SpinAcquire which creates a critical section, so that this
+     * routine cannot be interrupted by cancel/die interrupts.
     */
     LockLockTable();
 
@@ -836,8 +846,8 @@ HandleDeadLock(SIGNAL_ARGS)
     }
 
 #ifdef LOCK_DEBUG
-    if (Debug_deadlocks)
-        DumpAllLocks();
+    if (Debug_deadlocks)
+        DumpAllLocks();
 #endif
 
     if (!DeadLockCheck(MyProc))
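The comments in TopoSort() above give a complete recipe for the constrained topological sort: count, for each proc, the constraints that force it before some other proc (beforeConstraints[]), thread the constraints in which it is the blocker into a per-proc list (afterConstraints[] plus link), then scan the queue backwards, at each step emitting the last proc with no remaining before-constraints and releasing the constraints it blocked. The following self-contained C sketch models that bookkeeping under simplified assumptions (procs are small integers standing in for PROC pointers, a single lock, no shared memory); names such as beforeCount and afterHead are hypothetical stand-ins for the fields above, so this is a model of the algorithm, not PostgreSQL source:

    #include <stdio.h>
    #include <string.h>

    #define NPROCS 4
    #define NCONS  2

    /* One "waiter must appear before blocker" edge, as in deadlock.c */
    typedef struct
    {
        int     waiter;
        int     blocker;
        int     pred;   /* waiter's id, remembered while tallying */
        int     link;   /* next constraint with same blocker (1-based) */
    } Edge;

    int
    main(void)
    {
        int     queue[NPROCS] = {0, 1, 2, 3};   /* initial wait-queue order */
        int     out[NPROCS];
        Edge    cons[NCONS] = {{2, 0}, {3, 1}}; /* 2 before 0, 3 before 1 */
        int     beforeCount[NPROCS], afterHead[NPROCS];
        int     i, j, k, last;

        memset(beforeCount, 0, sizeof(beforeCount));
        memset(afterHead, 0, sizeof(afterHead));

        /* Tally constraints exactly as the TopoSort comment describes */
        for (i = 0; i < NCONS; i++)
        {
            beforeCount[cons[i].waiter]++;      /* waiter must come before */
            cons[i].pred = cons[i].waiter;
            cons[i].link = afterHead[cons[i].blocker];
            afterHead[cons[i].blocker] = i + 1; /* 1-based list head */
        }

        /* Scan backwards; emit the last proc with no before-constraints */
        last = NPROCS - 1;
        for (i = NPROCS; --i >= 0;)
        {
            while (queue[last] == -1)           /* skip emptied slots */
                last--;
            for (j = last; j >= 0; j--)
                if (queue[j] != -1 && beforeCount[queue[j]] == 0)
                    break;
            if (j < 0)
                return 1;                       /* constraints inconsistent */
            out[i] = queue[j];

            /* Release the constraints this proc was blocking */
            for (k = afterHead[queue[j]]; k > 0; k = cons[k - 1].link)
                beforeCount[cons[k - 1].pred]--;
            queue[j] = -1;
        }

        for (i = 0; i < NPROCS; i++)
            printf("%d ", out[i]);              /* prints "2 0 3 1" */
        printf("\n");
        return 0;
    }

Run against the two constraints above, the sketch disturbs the original queue as little as possible and emits "2 0 3 1", satisfying both requested reversals; this is the behavior ExpandConstraints() relies on when it proposes a rearranged wait queue to break a soft deadlock cycle.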