Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 294
1 file changed, 206 insertions, 88 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index ee604df2cae..06b1fdb6440 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.188 2005/04/28 21:47:10 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.189 2005/04/30 19:03:32 tgl Exp $ * * * INTERFACE ROUTINES @@ -1209,12 +1209,13 @@ heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { + HTSU_Result result; TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData tp; PageHeader dp; Buffer buffer; - HTSU_Result result; + bool have_tuple_lock = false; Assert(ItemPointerIsValid(tid)); @@ -1243,20 +1244,36 @@ l1: TransactionId xwait; uint16 infomask; + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tp.t_data); + infomask = tp.t_data->t_infomask; + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple + * (see heap_lock_tuple). LockTuple will release us when we are + * next-in-line for the tuple. + * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) + { + LockTuple(relation, &(tp.t_self), ExclusiveLock); + have_tuple_lock = true; + } + /* * Sleep until concurrent transaction ends. Note that we don't care * if the locker has an exclusive or shared lock, because we need * exclusive. */ - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(tp.t_data); - infomask = tp.t_data->t_infomask; - if (infomask & HEAP_XMAX_IS_MULTI) { /* wait for multixact */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); MultiXactIdWait((MultiXactId) xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1283,7 +1300,6 @@ l1: else { /* wait for regular transaction to end */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1335,6 +1351,8 @@ l1: *ctid = tp.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + if (have_tuple_lock) + UnlockTuple(relation, &(tp.t_self), ExclusiveLock); return result; } @@ -1406,6 +1424,12 @@ l1: WriteBuffer(buffer); + /* + * Release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, &(tp.t_self), ExclusiveLock); + return HeapTupleMayBeUpdated; } @@ -1476,6 +1500,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, ItemPointer ctid, CommandId cid, Snapshot crosscheck, bool wait) { + HTSU_Result result; TransactionId xid = GetCurrentTransactionId(); ItemId lp; HeapTupleData oldtup; @@ -1486,7 +1511,7 @@ heap_update(Relation relation, ItemPointer otid, HeapTuple newtup, already_marked; Size newtupsize, pagefree; - HTSU_Result result; + bool have_tuple_lock = false; Assert(ItemPointerIsValid(otid)); @@ -1522,20 +1547,36 @@ l2: TransactionId xwait; uint16 infomask; + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(oldtup.t_data); + infomask = oldtup.t_data->t_infomask; + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple + * (see heap_lock_tuple). LockTuple will release us when we are + * next-in-line for the tuple. 
+ * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) + { + LockTuple(relation, &(oldtup.t_self), ExclusiveLock); + have_tuple_lock = true; + } + /* * Sleep until concurrent transaction ends. Note that we don't care * if the locker has an exclusive or shared lock, because we need * exclusive. */ - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(oldtup.t_data); - infomask = oldtup.t_data->t_infomask; - if (infomask & HEAP_XMAX_IS_MULTI) { /* wait for multixact */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); MultiXactIdWait((MultiXactId) xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1562,7 +1603,6 @@ l2: else { /* wait for regular transaction to end */ - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); XactLockTableWait(xwait); LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1614,6 +1654,8 @@ l2: *ctid = oldtup.t_data->t_ctid; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); + if (have_tuple_lock) + UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); return result; } @@ -1803,6 +1845,12 @@ l2: */ CacheInvalidateHeapTuple(relation, newtup); + /* + * Release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, &(oldtup.t_self), ExclusiveLock); + return HeapTupleMayBeUpdated; } @@ -1847,17 +1895,53 @@ simple_heap_update(Relation relation, ItemPointer otid, HeapTuple tup) /* * heap_lock_tuple - lock a tuple in shared or exclusive mode + * + * NOTES: because the shared-memory lock table is of finite size, but users + * could reasonably want to lock large numbers of tuples, we do not rely on + * the standard lock manager to store tuple-level locks over the long term. + * Instead, a tuple is marked as locked by setting the current transaction's + * XID as its XMAX, and setting additional infomask bits to distinguish this + * usage from the more normal case of having deleted the tuple. When + * multiple transactions concurrently share-lock a tuple, the first locker's + * XID is replaced in XMAX with a MultiTransactionId representing the set of + * XIDs currently holding share-locks. + * + * When it is necessary to wait for a tuple-level lock to be released, the + * basic delay is provided by XactLockTableWait or MultiXactIdWait on the + * contents of the tuple's XMAX. However, that mechanism will release all + * waiters concurrently, so there would be a race condition as to which + * waiter gets the tuple, potentially leading to indefinite starvation of + * some waiters. The possibility of share-locking makes the problem much + * worse --- a steady stream of share-lockers can easily block an exclusive + * locker forever. To provide more reliable semantics about who gets a + * tuple-level lock first, we use the standard lock manager. The protocol + * for waiting for a tuple-level lock is really + * LockTuple() + * XactLockTableWait() + * mark tuple as locked by me + * UnlockTuple() + * When there are multiple waiters, arbitration of who is to get the lock next + * is provided by LockTuple(). However, at most one tuple-level lock will + * be held or awaited per backend at any time, so we don't risk overflow + * of the lock table. Note that incoming share-lockers are required to + * do LockTuple as well, if there is any conflict, to ensure that they don't + * starve out waiting exclusive-lockers. 
However, if there is not any active + * conflict for a tuple, we don't incur any extra overhead. */ HTSU_Result heap_lock_tuple(Relation relation, HeapTuple tuple, Buffer *buffer, CommandId cid, LockTupleMode mode) { - TransactionId xid; + HTSU_Result result; ItemPointer tid = &(tuple->t_self); ItemId lp; PageHeader dp; - HTSU_Result result; + TransactionId xid; uint16 new_infomask; + LOCKMODE tuple_lock_type; + bool have_tuple_lock = false; + + tuple_lock_type = (mode == LockTupleShared) ? ShareLock : ExclusiveLock; *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); @@ -1879,94 +1963,121 @@ l3: } else if (result == HeapTupleBeingUpdated) { - if (mode == LockTupleShared && - (tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)) - result = HeapTupleMayBeUpdated; - else + TransactionId xwait; + uint16 infomask; + + /* must copy state data before unlocking buffer */ + xwait = HeapTupleHeaderGetXmax(tuple->t_data); + infomask = tuple->t_data->t_infomask; + + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + /* + * Acquire tuple lock to establish our priority for the tuple. + * LockTuple will release us when we are next-in-line for the + * tuple. We must do this even if we are share-locking. + * + * If we are forced to "start over" below, we keep the tuple lock; + * this arranges that we stay at the head of the line while + * rechecking tuple state. + */ + if (!have_tuple_lock) { - TransactionId xwait; - uint16 infomask; + LockTuple(relation, tid, tuple_lock_type); + have_tuple_lock = true; + } + if (mode == LockTupleShared && (infomask & HEAP_XMAX_SHARED_LOCK)) + { /* - * Sleep until concurrent transaction ends. + * Acquiring sharelock when there's at least one sharelocker + * already. We need not wait for him/them to complete. */ + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - /* must copy state data before unlocking buffer */ - xwait = HeapTupleHeaderGetXmax(tuple->t_data); - infomask = tuple->t_data->t_infomask; - - if (infomask & HEAP_XMAX_IS_MULTI) - { - /* wait for multixact */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - MultiXactIdWait((MultiXactId) xwait); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - - /* - * If xwait had just locked the tuple then some other xact - * could update this tuple before we get to this point. - * Check for xmax change, and start over if so. - */ - if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || - !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), - xwait)) - goto l3; + /* + * Make sure it's still a shared lock, else start over. (It's + * OK if the ownership of the shared lock has changed, though.) + */ + if (!(tuple->t_data->t_infomask & HEAP_XMAX_SHARED_LOCK)) + goto l3; + } + else if (infomask & HEAP_XMAX_IS_MULTI) + { + /* wait for multixact to end */ + MultiXactIdWait((MultiXactId) xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); - /* - * You might think the multixact is necessarily done here, but - * not so: it could have surviving members, namely our own xact - * or other subxacts of this backend. It is legal for us to - * lock the tuple in either case, however. We don't bother - * changing the on-disk hint bits since we are about to - * overwrite the xmax altogether. - */ - } - else - { - /* wait for regular transaction to end */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); - XactLockTableWait(xwait); - LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + /* + * If xwait had just locked the tuple then some other xact + * could update this tuple before we get to this point. 
+ * Check for xmax change, and start over if so. + */ + if (!(tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; - /* - * xwait is done, but if xwait had just locked the tuple then - * some other xact could update this tuple before we get to - * this point. Check for xmax change, and start over if so. - */ - if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || - !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), - xwait)) - goto l3; - - /* Otherwise we can mark it committed or aborted */ - if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED | - HEAP_XMAX_INVALID))) - { - if (TransactionIdDidCommit(xwait)) - tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; - else - tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; - SetBufferCommitInfoNeedsSave(*buffer); - } - } + /* + * You might think the multixact is necessarily done here, but + * not so: it could have surviving members, namely our own xact + * or other subxacts of this backend. It is legal for us to + * lock the tuple in either case, however. We don't bother + * changing the on-disk hint bits since we are about to + * overwrite the xmax altogether. + */ + } + else + { + /* wait for regular transaction to end */ + XactLockTableWait(xwait); + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); /* - * We may lock if previous xmax aborted, or if it committed - * but only locked the tuple without updating it. + * xwait is done, but if xwait had just locked the tuple then + * some other xact could update this tuple before we get to + * this point. Check for xmax change, and start over if so. */ - if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | - HEAP_IS_LOCKED)) - result = HeapTupleMayBeUpdated; - else - result = HeapTupleUpdated; + if ((tuple->t_data->t_infomask & HEAP_XMAX_IS_MULTI) || + !TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), + xwait)) + goto l3; + + /* Otherwise we can mark it committed or aborted */ + if (!(tuple->t_data->t_infomask & (HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID))) + { + if (TransactionIdDidCommit(xwait)) + tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; + else + tuple->t_data->t_infomask |= HEAP_XMAX_INVALID; + SetBufferCommitInfoNeedsSave(*buffer); + } } + + /* + * We may lock if previous xmax aborted, or if it committed + * but only locked the tuple without updating it. The case where + * we didn't wait because we are joining an existing shared lock + * is correctly handled, too. + */ + if (tuple->t_data->t_infomask & (HEAP_XMAX_INVALID | + HEAP_IS_LOCKED)) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; } if (result != HeapTupleMayBeUpdated) { + ItemPointerData newctid = tuple->t_data->t_ctid; + Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); - tuple->t_self = tuple->t_data->t_ctid; LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + if (have_tuple_lock) + UnlockTuple(relation, tid, tuple_lock_type); + /* can't overwrite t_self (== *tid) until after above Unlock */ + tuple->t_self = newctid; return result; } @@ -2142,6 +2253,13 @@ l3: WriteNoReleaseBuffer(*buffer); + /* + * Now that we have successfully marked the tuple as locked, we can + * release the lmgr tuple lock, if we had it. + */ + if (have_tuple_lock) + UnlockTuple(relation, tid, tuple_lock_type); + return HeapTupleMayBeUpdated; } |
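The NOTES block added to heap_lock_tuple above compresses the new waiting protocol into four steps (LockTuple, XactLockTableWait, mark the tuple, UnlockTuple). The C sketch below distills the common pattern this commit repeats in heap_delete, heap_update, and heap_lock_tuple. It is illustrative only, not a compilable excerpt of the patch: the wrapping function name wait_for_locked_tuple is hypothetical, and the visibility checks, shared-lock fast path, hint-bit maintenance, the exact multixact-vs-xid recheck, and error handling are all simplified or elided. The backend calls it uses (LockTuple, UnlockTuple, LockBuffer, XactLockTableWait, MultiXactIdWait, HeapTupleHeaderGetXmax) are the ones that appear in the diff.

/*
 * Sketch of the tuple-wait protocol introduced by this patch (hypothetical
 * helper, not part of the commit).  Assumes the caller holds an exclusive
 * content lock on "buffer" and has found the tuple HeapTupleBeingUpdated.
 */
static void
wait_for_locked_tuple(Relation relation, HeapTuple tuple, Buffer buffer,
                      LOCKMODE tuple_lock_type)
{
	bool			have_tuple_lock = false;
	TransactionId	xwait;
	uint16			infomask;

restart:
	/* must copy state data before unlocking buffer */
	xwait = HeapTupleHeaderGetXmax(tuple->t_data);
	infomask = tuple->t_data->t_infomask;

	LockBuffer(buffer, BUFFER_LOCK_UNLOCK);

	/*
	 * Queue up behind any earlier waiters.  The lmgr tuple lock fixes our
	 * place in line, and is deliberately kept across restarts so we stay
	 * at the head of the queue while rechecking tuple state.
	 */
	if (!have_tuple_lock)
	{
		LockTuple(relation, &(tuple->t_self), tuple_lock_type);
		have_tuple_lock = true;
	}

	/* Sleep until the current holder(s) of the tuple are done. */
	if (infomask & HEAP_XMAX_IS_MULTI)
		MultiXactIdWait((MultiXactId) xwait);
	else
		XactLockTableWait(xwait);

	LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);

	/*
	 * If xmax changed while we slept, some other transaction slipped in;
	 * go back and re-evaluate the tuple from the top.
	 */
	if (!TransactionIdEquals(HeapTupleHeaderGetXmax(tuple->t_data), xwait))
		goto restart;

	/* ... caller now marks the tuple as deleted/updated/locked by us ... */

	/* Only after the tuple is marked may the next waiter proceed. */
	if (have_tuple_lock)
		UnlockTuple(relation, &(tuple->t_self), tuple_lock_type);
}

The point of the extra lmgr lock is arbitration, not storage: it is held only while a backend waits for and then marks the tuple, never for the life of the transaction, so at most one such lock is held or awaited per backend and the shared lock table cannot overflow, while waiters acquire the tuple in a well-defined order instead of racing when the blocking transaction ends.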