aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorVadim B. Mikheev <vadim4o@yahoo.com>1999-05-07 01:23:11 +0000
committerVadim B. Mikheev <vadim4o@yahoo.com>1999-05-07 01:23:11 +0000
commit122abf3af3e0519cfddcfdfd0dfb8eb0c4aca61c (patch)
treedf5da77d26dc9f7337be57fe614030ee6a7b0330
parent86bc1da2628322c25190a15f1b6a433237aa1a45 (diff)
downloadpostgresql-122abf3af3e0519cfddcfdfd0dfb8eb0c4aca61c.tar.gz
postgresql-122abf3af3e0519cfddcfdfd0dfb8eb0c4aca61c.zip
Fix LMGR for MVCC.
Get rid of Extend lock mode.
-rw-r--r--src/backend/access/heap/hio.c15
-rw-r--r--src/backend/storage/lmgr/lmgr.c8
-rw-r--r--src/backend/storage/lmgr/lock.c251
-rw-r--r--src/backend/storage/lmgr/proc.c128
-rw-r--r--src/include/storage/lmgr.h4
-rw-r--r--src/include/storage/lock.h5
-rw-r--r--src/include/storage/proc.h11
7 files changed, 205 insertions, 217 deletions
diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c
index 3b4fa4af0bb..f991b206d44 100644
--- a/src/backend/access/heap/hio.c
+++ b/src/backend/access/heap/hio.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Id: hio.c,v 1.18 1999/05/01 15:04:46 vadim Exp $
+ * $Id: hio.c,v 1.19 1999/05/07 01:22:53 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -111,14 +111,13 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
Item item;
/*
- * Actually, we lock _relation_ here, not page, but I believe
- * that locking page is faster... Obviously, we could get rid
- * of ExtendLock mode at all and use ExclusiveLock mode on
- * page 0, as long as we use page-level locking for indices only,
- * but we are in 6.5-beta currently... - vadim 05/01/99
+ * Lock relation for extention. We can use LockPage here as long as
+ * in all other places we use page-level locking for indices only.
+ * Alternatevely, we could define pseudo-table as we do for
+ * transactions with XactLockTable.
*/
if (!relation->rd_myxactonly)
- LockPage(relation, 0, ExtendLock);
+ LockPage(relation, 0, ExclusiveLock);
/*
* XXX This does an lseek - VERY expensive - but at the moment it is
@@ -166,7 +165,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple)
}
if (!relation->rd_myxactonly)
- UnlockPage(relation, 0, ExtendLock);
+ UnlockPage(relation, 0, ExclusiveLock);
offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data,
tuple->t_len, InvalidOffsetNumber, LP_USED);
diff --git a/src/backend/storage/lmgr/lmgr.c b/src/backend/storage/lmgr/lmgr.c
index 7d34499b017..7eb51207c7e 100644
--- a/src/backend/storage/lmgr/lmgr.c
+++ b/src/backend/storage/lmgr/lmgr.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.22 1999/02/13 23:18:24 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lmgr.c,v 1.23 1999/05/07 01:23:02 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -79,9 +79,6 @@ static MASK LockConflicts[] = {
(1 << RowExclusiveLock) | (1 << RowShareLock) | (1 << AccessExclusiveLock) |
(1 << AccessShareLock),
-/* ExtendLock */
- (1 << ExtendLock)
-
};
static int LockPrios[] = {
@@ -92,8 +89,7 @@ static int LockPrios[] = {
4,
5,
6,
- 7,
- 1
+ 7
};
LOCKMETHOD LockTableId = (LOCKMETHOD) NULL;
diff --git a/src/backend/storage/lmgr/lock.c b/src/backend/storage/lmgr/lock.c
index c39138ff129..c1996fe0713 100644
--- a/src/backend/storage/lmgr/lock.c
+++ b/src/backend/storage/lmgr/lock.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.49 1999/04/30 17:03:04 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/lock.c,v 1.50 1999/05/07 01:23:03 vadim Exp $
*
* NOTES
* Outside modules can create a lock table and acquire/release
@@ -50,8 +50,7 @@
#include "utils/trace.h"
#include "utils/ps_status.h"
-static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
- TransactionId xid);
+static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode);
/*
* lockDebugRelation can be used to trace unconditionally a single relation,
@@ -143,12 +142,14 @@ static int WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
#endif /* !LOCK_MGR_DEBUG */
static char *lock_types[] = {
- "",
- "WRITE",
- "READ",
- "WRITE INTENT",
- "READ INTENT",
- "EXTEND"
+ "INVALID",
+ "AccessShareLock",
+ "RowShareLock",
+ "RowExclusiveLock",
+ "ShareLock",
+ "ShareRowExclusiveLock",
+ "ExclusiveLock",
+ "AccessExclusiveLock"
};
SPINLOCK LockMgrLock; /* in Shmem or created in
@@ -631,12 +632,11 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
/* --------------------
* If I'm the only one holding a lock, then there
- * cannot be a conflict. Need to subtract one from the
- * lock's count since we just bumped the count up by 1
- * above.
+ * cannot be a conflict. The same is true if we already
+ * hold this lock.
* --------------------
*/
- if (result->nHolding == lock->nActive)
+ if (result->nHolding == lock->nActive || result->holders[lockmode] != 0)
{
result->holders[lockmode]++;
result->nHolding++;
@@ -647,7 +647,39 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
return TRUE;
}
- status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+ /*
+ * If lock requested conflicts with locks requested by waiters...
+ */
+ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+ {
+ int i = 1;
+
+ /*
+ * If I don't hold locks or my locks don't conflict
+ * with waiters then force to sleep.
+ */
+ if (result->nHolding > 0)
+ {
+ for ( ; i <= lockMethodTable->ctl->numLockModes; i++)
+ {
+ if (result->holders[i] > 0 &&
+ lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+ break; /* conflict */
+ }
+ }
+
+ if (result->nHolding == 0 || i > lockMethodTable->ctl->numLockModes)
+ {
+ XID_PRINT("LockAcquire: higher priority proc waiting",
+ result);
+ status = STATUS_FOUND;
+ }
+ else
+ status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+ }
+ else
+ status = LockResolveConflicts(lockmethod, lock, lockmode, xid, result);
+
if (status == STATUS_OK)
GrantLock(lock, lockmode);
else if (status == STATUS_FOUND)
@@ -680,7 +712,25 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
return FALSE;
}
#endif
- status = WaitOnLock(lockmethod, lock, lockmode, xid);
+ /*
+ * Construct bitmask of locks we hold before going to sleep.
+ */
+ MyProc->holdLock = 0;
+ if (result->nHolding > 0)
+ {
+ int i,
+ tmpMask = 2;
+
+ for (i = 1; i <= lockMethodTable->ctl->numLockModes;
+ i++, tmpMask <<= 1)
+ {
+ if (result->holders[i] > 0)
+ MyProc->holdLock |= tmpMask;
+ }
+ Assert(MyProc->holdLock != 0);
+ }
+
+ status = WaitOnLock(lockmethod, lock, lockmode);
/*
* Check the xid entry status, in case something in the ipc
@@ -712,10 +762,6 @@ LockAcquire(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
* determining whether or not any new lock acquired conflicts with
* the old ones.
*
- * For example, if I am already holding a WRITE_INTENT lock,
- * there will not be a conflict with my own READ_LOCK. If I
- * don't consider the intent lock when checking for conflicts,
- * I find no conflict.
* ----------------------------
*/
int
@@ -812,32 +858,6 @@ LockResolveConflicts(LOCKMETHOD lockmethod,
}
Assert((result->nHolding >= 0) && (result->holders[lockmode] >= 0));
- /*
- * We can control runtime this option. Default is lockReadPriority=0
- */
- if (!lockReadPriority)
- {
- /* ------------------------
- * If someone with a greater priority is waiting for the lock,
- * do not continue and share the lock, even if we can.
- * Don't do this if the process already has some locks, because
- * this could hold up other people waiting on our locks, causing
- * a priority inversion. bjm
- * ------------------------
- */
- int myprio = LockMethodTable[lockmethod]->ctl->prio[lockmode];
- PROC_QUEUE *waitQueue = &(lock->waitProcs);
- PROC *topproc = (PROC *) MAKE_PTR(waitQueue->links.prev);
-
- if (SHMQueueEmpty(&MyProc->lockQueue) && waitQueue->size &&
- topproc->prio > myprio)
- {
- XID_PRINT("LockResolveConflicts: higher priority proc waiting",
- result);
- return STATUS_FOUND;
- }
- }
-
/* ----------------------------
* first check for global conflicts: If no locks conflict
* with mine, then I get the lock.
@@ -909,12 +929,10 @@ GrantLock(LOCK *lock, LOCKMODE lockmode)
}
static int
-WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
- TransactionId xid)
+WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode)
{
PROC_QUEUE *waitQueue = &(lock->waitProcs);
LOCKMETHODTABLE *lockMethodTable = LockMethodTable[lockmethod];
- int prio = lockMethodTable->ctl->prio[lockmode];
char old_status[64],
new_status[64];
@@ -934,11 +952,9 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
strcat(new_status, " waiting");
PS_SET_STATUS(new_status);
if (ProcSleep(waitQueue,
- lockMethodTable->ctl->masterLock,
+ lockMethodTable->ctl,
lockmode,
- prio,
- lock,
- xid) != NO_ERROR)
+ lock) != NO_ERROR)
{
/* -------------------
* This could have happend as a result of a deadlock,
@@ -952,12 +968,16 @@ WaitOnLock(LOCKMETHOD lockmethod, LOCK *lock, LOCKMODE lockmode,
LOCK_PRINT_AUX("WaitOnLock: aborting on lock", lock, lockmode);
Assert((lock->nHolding >= 0) && (lock->holders[lockmode] >= 0));
Assert(lock->nActive <= lock->nHolding);
+ if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+ lock->waitMask &= BITS_OFF[lockmode];
SpinRelease(lockMethodTable->ctl->masterLock);
elog(ERROR, "WaitOnLock: error on wakeup - Aborting this transaction");
/* not reached */
}
+ if (lock->activeHolders[lockmode] == lock->holders[lockmode])
+ lock->waitMask &= BITS_OFF[lockmode];
PS_SET_STATUS(old_status);
LOCK_PRINT_AUX("WaitOnLock: wakeup on lock", lock, lockmode);
return STATUS_OK;
@@ -1129,6 +1149,7 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
lock->nActive--;
lock->activeHolders[lockmode]--;
+#ifdef NOT_USED
/* --------------------------
* If there are still active locks of the type I just released, no one
* should be woken up. Whoever is asleep will still conflict
@@ -1138,6 +1159,19 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (lock->activeHolders[lockmode])
wakeupNeeded = false;
else
+#endif
+ /*
+ * Above is not valid any more (due to MVCC lock modes).
+ * Actually we should compare activeHolders[lockmode] with
+ * number of waiters holding lock of this type and try to
+ * wakeup only if these numbers are equal (and lock released
+ * conflicts with locks requested by waiters). For the moment
+ * we only check the last condition.
+ */
+ if (lockMethodTable->ctl->conflictTab[lockmode] & lock->waitMask)
+ wakeupNeeded = true;
+
+ if (!(lock->activeHolders[lockmode]))
{
/* change the conflict mask. No more of this lock type. */
lock->mask &= BITS_OFF[lockmode];
@@ -1199,12 +1233,6 @@ LockRelease(LOCKMETHOD lockmethod, LOCKTAG *locktag, LOCKMODE lockmode)
if (wakeupNeeded)
{
- /* --------------------------
- * Wake the first waiting process and grant him the lock if it
- * doesn't conflict. The woken process must record the lock
- * himself.
- * --------------------------
- */
ProcLockWakeup(&(lock->waitProcs), lockmethod, lock);
}
else
@@ -1275,6 +1303,7 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
for (;;)
{
+ bool wakeupNeeded = false;
/*
* Sometimes the queue appears to be messed up.
@@ -1380,6 +1409,12 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
&&(lock->activeHolders[i] >= 0));
if (!lock->activeHolders[i])
lock->mask &= BITS_OFF[i];
+ /*
+ * Read comments in LockRelease
+ */
+ if (!wakeupNeeded && xidLook->holders[i] > 0 &&
+ lockMethodTable->ctl->conflictTab[i] & lock->waitMask)
+ wakeupNeeded = true;
}
lock->nHolding -= xidLook->nHolding;
lock->nActive -= xidLook->nHolding;
@@ -1444,14 +1479,8 @@ LockReleaseAll(LOCKMETHOD lockmethod, SHM_QUEUE *lockQueue)
return FALSE;
}
}
- else
+ else if (wakeupNeeded)
{
- /* --------------------
- * Wake the first waiting process and grant him the lock if it
- * doesn't conflict. The woken process must record the lock
- * him/herself.
- * --------------------
- */
waitQueue = &(lock->waitProcs);
ProcLockWakeup(waitQueue, lockmethod, lock);
}
@@ -1534,46 +1563,22 @@ LockingDisabled()
bool
DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
{
- int done;
- XIDLookupEnt *xidLook = NULL;
- XIDLookupEnt *tmp = NULL;
- SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
- LOCK *lock;
+ int done;
+ XIDLookupEnt *xidLook = NULL;
+ XIDLookupEnt *tmp = NULL;
+ SHMEM_OFFSET end = MAKE_OFFSET(lockQueue);
+ LOCK *lock;
- LOCKMETHODTABLE *lockMethodTable;
- XIDLookupEnt *result,
- item;
- HTAB *xidTable;
- bool found;
-
- static PROC *checked_procs[MAXBACKENDS];
- static int nprocs;
- static bool MyNHolding;
+ static PROC *checked_procs[MAXBACKENDS];
+ static int nprocs;
/* initialize at start of recursion */
if (skip_check)
{
checked_procs[0] = MyProc;
nprocs = 1;
-
- lockMethodTable = LockMethodTable[DEFAULT_LOCKMETHOD];
- xidTable = lockMethodTable->xidHash;
-
- MemSet(&item, 0, XID_TAGSIZE);
- TransactionIdStore(MyProc->xid, &item.tag.xid);
- item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
- item.tag.pid = pid;
-#endif
-
- if (!(result = (XIDLookupEnt *)
- hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
- {
- elog(NOTICE, "LockAcquire: xid table corrupted");
- return true;
- }
- MyNHolding = result->nHolding;
}
+
if (SHMQueueEmpty(lockQueue))
return false;
@@ -1583,12 +1588,6 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
for (;;)
{
- /* ---------------------------
- * XXX Here we assume the shared memory queue is circular and
- * that we know its internal structure. Should have some sort of
- * macros to allow one to walk it. mer 20 July 1991
- * ---------------------------
- */
done = (xidLook->queue.next == end);
lock = (LOCK *) MAKE_PTR(xidLook->tag.lock);
@@ -1613,45 +1612,21 @@ DeadLockCheck(SHM_QUEUE *lockQueue, LOCK *findlock, bool skip_check)
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
for (i = 0; i < waitQueue->size; i++)
{
+ /*
+ * If I hold some locks on findlock and another proc
+ * waits on it holding locks too - check if we are
+ * waiting one another.
+ */
if (proc != MyProc &&
- lock == findlock && /* skip_check also true */
- MyNHolding) /* I already hold some lock on it */
+ lock == findlock && /* skip_check also true */
+ MyProc->holdLock)
{
-
- /*
- * For findlock's wait queue, we are interested in
- * procs who are blocked waiting for a write-lock on
- * the table we are waiting on, and already hold a
- * lock on it. We first check to see if there is an
- * escalation deadlock, where we hold a readlock and
- * want a writelock, and someone else holds readlock
- * on the same table, and wants a writelock.
- *
- * Basically, the test is, "Do we both hold some lock on
- * findlock, and we are both waiting in the lock
- * queue?" bjm
- */
+ LOCKMETHODCTL *lockctl =
+ LockMethodTable[DEFAULT_LOCKMETHOD]->ctl;
Assert(skip_check);
- Assert(MyProc->prio >= 2);
-
- lockMethodTable = LockMethodTable[1];
- xidTable = lockMethodTable->xidHash;
-
- MemSet(&item, 0, XID_TAGSIZE);
- TransactionIdStore(proc->xid, &item.tag.xid);
- item.tag.lock = MAKE_OFFSET(findlock);
-#ifdef NOT_USED
- item.tag.pid = pid;
-#endif
-
- if (!(result = (XIDLookupEnt *)
- hash_search(xidTable, (Pointer) &item, HASH_FIND, &found)) || !found)
- {
- elog(NOTICE, "LockAcquire: xid table corrupted");
- return true;
- }
- if (result->nHolding)
+ if (lockctl->conflictTab[MyProc->token] & proc->holdLock &&
+ lockctl->conflictTab[proc->token] & MyProc->holdLock)
return true;
}
diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c
index 229a78587cb..b80d32e1b44 100644
--- a/src/backend/storage/lmgr/proc.c
+++ b/src/backend/storage/lmgr/proc.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -46,7 +46,7 @@
* This is so that we can support more backends. (system-wide semaphore
* sets run out pretty fast.) -ay 4/95
*
- * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.53 1999/04/30 02:04:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/lmgr/proc.c,v 1.54 1999/05/07 01:23:04 vadim Exp $
*/
#include <sys/time.h>
#include <unistd.h>
@@ -106,6 +106,8 @@ static void ProcKill(int exitStatus, int pid);
static void ProcGetNewSemKeyAndNum(IPCKey *key, int *semNum);
static void ProcFreeSem(IpcSemaphoreKey semKey, int semNum);
+static char *DeadLockMessage = "Deadlock detected -- See the lock(l) manual page for a possible cause.";
+
/*
* InitProcGlobal -
* initializes the global process table. We put it here so that
@@ -488,68 +490,80 @@ ProcQueueInit(PROC_QUEUE *queue)
*/
int
ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
- SPINLOCK spinlock,
+ LOCKMETHODCTL *lockctl,
int token, /* lockmode */
- int prio,
- LOCK *lock,
- TransactionId xid) /* needed by user locks, see below */
+ LOCK *lock)
{
int i;
+ SPINLOCK spinlock = lockctl->masterLock;
PROC *proc;
+ int myMask = (1 << token);
+ int waitMask = lock->waitMask;
+ int aheadHolders[MAX_LOCKMODES];
+ bool selfConflict = (lockctl->conflictTab[token] & myMask),
+ prevSame = false;
bool deadlock_checked = false;
struct itimerval timeval,
dummy;
- /*
- * If the first entries in the waitQueue have a greater priority than
- * we have, we must be a reader, and they must be a writers, and we
- * must be here because the current holder is a writer or a reader but
- * we don't share shared locks if a writer is waiting. We put
- * ourselves after the writers. This way, we have a FIFO, but keep
- * the readers together to give them decent priority, and no one
- * starves. Because we group all readers together, a non-empty queue
- * only has a few possible configurations:
- *
- * [readers] [writers] [readers][writers] [writers][readers]
- * [writers][readers][writers]
- *
- * In a full queue, we would have a reader holding a lock, then a writer
- * gets the lock, then a bunch of readers, made up of readers who
- * could not share the first readlock because a writer was waiting,
- * and new readers arriving while the writer had the lock. bjm
- */
+ MyProc->token = token;
+ MyProc->waitLock = lock;
+
proc = (PROC *) MAKE_PTR(waitQueue->links.prev);
- /* If we are a reader, and they are writers, skip past them */
- for (i = 0; i < waitQueue->size && proc->prio > prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ /* if we don't conflict with any waiter - be first in queue */
+ if (!(lockctl->conflictTab[token] & waitMask))
+ goto ins;
- /* The rest of the queue is FIFO, with readers first, writers last */
- for (; i < waitQueue->size && proc->prio <= prio; i++)
- proc = (PROC *) MAKE_PTR(proc->links.prev);
+ for (i = 1; i < MAX_LOCKMODES; i++)
+ aheadHolders[i] = lock->activeHolders[i];
+ (aheadHolders[token])++;
- MyProc->prio = prio;
- MyProc->token = token;
- MyProc->waitLock = lock;
+ for (i = 0; i < waitQueue->size; i++)
+ {
+ /* am I waiting for him ? */
+ if (lockctl->conflictTab[token] & proc->holdLock)
+ {
+ /* is he waiting for me ? */
+ if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ {
+ MyProc->errType = STATUS_ERROR;
+ elog(NOTICE, DeadLockMessage);
+ goto rt;
+ }
+ /* being waiting for him - go past */
+ }
+ /* if he waits for me */
+ else if (lockctl->conflictTab[proc->token] & MyProc->holdLock)
+ {
+ break;
+ }
+ /* if conflicting locks requested */
+ else if (lockctl->conflictTab[proc->token] & myMask)
+ {
+ /*
+ * If I request non self-conflicting lock and there
+ * are others requesting the same lock just before me -
+ * stay here.
+ */
+ if (!selfConflict && prevSame)
+ break;
+ }
+ /*
+ * Last attempt to don't move any more: if we don't conflict
+ * with rest waiters in queue.
+ */
+ else if (!(lockctl->conflictTab[token] & waitMask))
+ break;
-#ifdef USER_LOCKS
- /* -------------------
- * Currently, we only need this for the ProcWakeup routines.
- * This must be 0 for user lock, so we can't just use the value
- * from GetCurrentTransactionId().
- * -------------------
- */
- TransactionIdStore(xid, &MyProc->xid);
-#else
-#ifndef LowLevelLocking
- /* -------------------
- * currently, we only need this for the ProcWakeup routines
- * -------------------
- */
- TransactionIdStore((TransactionId) GetCurrentTransactionId(), &MyProc->xid);
-#endif
-#endif
+ prevSame = (proc->token == token);
+ (aheadHolders[proc->token])++;
+ if (aheadHolders[proc->token] == lock->holders[proc->token])
+ waitMask &= ~ (1 << proc->token);
+ proc = (PROC *) MAKE_PTR(proc->links.prev);
+ }
+ins:;
/* -------------------
* assume that these two operations are atomic (because
* of the spinlock).
@@ -558,6 +572,7 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
SHMQueueInsertTL(&(proc->links), &(MyProc->links));
waitQueue->size++;
+ lock->waitMask |= myMask;
SpinRelease(spinlock);
/* --------------
@@ -608,6 +623,8 @@ ProcSleep(PROC_QUEUE *waitQueue,/* lock->waitProcs */
*/
SpinAcquire(spinlock);
+rt:;
+
#ifdef LOCK_MGR_DEBUG
/* Just to get meaningful debug messages from DumpLocks() */
MyProc->waitLock = (LOCK *) NULL;
@@ -655,9 +672,9 @@ int
ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
{
PROC *proc;
- int count;
+ int count = 0;
int trace_flag;
- int last_locktype = -1;
+ int last_locktype = 0;
int queue_size = queue->size;
Assert(queue->size >= 0);
@@ -666,7 +683,6 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
return STATUS_NOT_FOUND;
proc = (PROC *) MAKE_PTR(queue->links.prev);
- count = 0;
while ((queue_size--) && (proc))
{
@@ -678,7 +694,7 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
continue;
/*
- * This proc conflicts with locks held by others, ignored.
+ * Does this proc conflict with locks held by others ?
*/
if (LockResolveConflicts(lockmethod,
lock,
@@ -686,6 +702,8 @@ ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod, LOCK *lock)
proc->xid,
(XIDLookupEnt *) NULL) != STATUS_OK)
{
+ if (count != 0)
+ break;
last_locktype = proc->token;
continue;
}
@@ -828,7 +846,7 @@ HandleDeadLock(int sig)
*/
UnlockLockTable();
- elog(NOTICE, "Deadlock detected -- See the lock(l) manual page for a possible cause.");
+ elog(NOTICE, DeadLockMessage);
return;
}
diff --git a/src/include/storage/lmgr.h b/src/include/storage/lmgr.h
index 0e78c33de9b..3af0071d468 100644
--- a/src/include/storage/lmgr.h
+++ b/src/include/storage/lmgr.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: lmgr.h,v 1.18 1999/02/19 06:06:34 tgl Exp $
+ * $Id: lmgr.h,v 1.19 1999/05/07 01:23:05 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -25,8 +25,6 @@
#define ExclusiveLock 6
#define AccessExclusiveLock 7
-#define ExtendLock 8
-
extern LOCKMETHOD LockTableId;
diff --git a/src/include/storage/lock.h b/src/include/storage/lock.h
index da77f1d523c..387f164247c 100644
--- a/src/include/storage/lock.h
+++ b/src/include/storage/lock.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: lock.h,v 1.24 1999/03/06 21:17:43 tgl Exp $
+ * $Id: lock.h,v 1.25 1999/05/07 01:23:07 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,7 +41,7 @@ typedef int LOCKMODE;
typedef int LOCKMETHOD;
/* MAX_LOCKMODES cannot be larger than the bits in MASK */
-#define MAX_LOCKMODES 9
+#define MAX_LOCKMODES 8
/*
* MAX_LOCK_METHODS corresponds to the number of spin locks allocated in
@@ -204,6 +204,7 @@ typedef struct LOCK
/* data */
int mask;
+ int waitMask;
PROC_QUEUE waitProcs;
int holders[MAX_LOCKMODES];
int nHolding;
diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h
index 952f50553ca..53b677858fd 100644
--- a/src/include/storage/proc.h
+++ b/src/include/storage/proc.h
@@ -6,7 +6,7 @@
*
* Copyright (c) 1994, Regents of the University of California
*
- * $Id: proc.h,v 1.20 1999/02/19 07:10:47 tgl Exp $
+ * $Id: proc.h,v 1.21 1999/05/07 01:23:07 vadim Exp $
*
*-------------------------------------------------------------------------
*/
@@ -48,8 +48,9 @@ typedef struct proc
* were starting our xact: vacuum must not
* remove tuples deleted by xid >= xmin ! */
- LOCK *waitLock; /* Lock we're sleeping on */
- int token; /* info for proc wakeup routines */
+ LOCK *waitLock; /* Lock we're sleeping on ... */
+ int token; /* type of lock we sleeping for */
+ int holdLock; /* while holding these locks */
int pid; /* This procs process id */
short sLocks[MAX_SPINS]; /* Spin lock stats */
SHM_QUEUE lockQueue; /* locks associated with current
@@ -116,8 +117,8 @@ extern bool ProcRemove(int pid);
/* make static in storage/lmgr/proc.c -- jolly */
extern void ProcQueueInit(PROC_QUEUE *queue);
-extern int ProcSleep(PROC_QUEUE *queue, SPINLOCK spinlock, int token,
- int prio, LOCK *lock, TransactionId xid);
+extern int ProcSleep(PROC_QUEUE *queue, LOCKMETHODCTL *lockctl, int token,
+ LOCK *lock);
extern int ProcLockWakeup(PROC_QUEUE *queue, LOCKMETHOD lockmethod,
LOCK *lock);
extern void ProcAddLock(SHM_QUEUE *elem);