diff options
Diffstat (limited to 'src/backend/access/heap')
-rw-r--r-- | src/backend/access/heap/heapam.c | 496 | ||||
-rw-r--r-- | src/backend/access/heap/hio.c | 71 | ||||
-rw-r--r-- | src/backend/access/heap/stats.c | 8 |
3 files changed, 291 insertions, 284 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 76fcfaf2c82..8ffd9d41922 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.38 1998/11/27 19:51:36 vadim Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/heapam.c,v 1.39 1998/12/15 12:45:13 vadim Exp $ * * * INTERFACE ROUTINES @@ -89,15 +89,12 @@ #include <utils/inval.h> #include <utils/memutils.h> - #ifndef HAVE_MEMMOVE #include <regex/utils.h> #else #include <string.h> #endif -static void doinsert(Relation relation, HeapTuple tup); - /* ---------------------------------------------------------------- * heap support routines * ---------------------------------------------------------------- @@ -214,7 +211,7 @@ static void heapgettup(Relation relation, HeapTuple tuple, int dir, - Buffer *buf, + Buffer *buffer, Snapshot snapshot, int nkeys, ScanKey key) @@ -255,7 +252,7 @@ heapgettup(Relation relation, elog(DEBUG, "heapgettup(%s, tid=0x%x, dir=%d, ...)", RelationGetRelationName(relation), tid, dir); } - elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buf, nkeys, key); + elog(DEBUG, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key); elog(DEBUG, "heapgettup: relation(%c)=`%s', %p", relation->rd_rel->relkind, &relation->rd_rel->relname, @@ -288,25 +285,26 @@ heapgettup(Relation relation, /* assume it is a valid TID XXX */ if (ItemPointerIsValid(tid) == false) { - *buf = InvalidBuffer; + *buffer = InvalidBuffer; tuple->t_data = NULL; return; } - *buf = RelationGetBufferWithBuffer(relation, + *buffer = RelationGetBufferWithBuffer(relation, ItemPointerGetBlockNumber(tid), - *buf); + *buffer); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(*buf)) + if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); -#endif - dp = (Page) BufferGetPage(*buf); + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); lineoff = ItemPointerGetOffsetNumber(tid); lpp = PageGetItemId(dp, lineoff); tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return; } @@ -328,18 +326,18 @@ heapgettup(Relation relation, } if (page < 0) { - *buf = InvalidBuffer; + *buffer = InvalidBuffer; tuple->t_data = NULL; return; } - *buf = RelationGetBufferWithBuffer(relation, page, *buf); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(*buf)) + *buffer = RelationGetBufferWithBuffer(relation, page, *buffer); + if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); -#endif - dp = (Page) BufferGetPage(*buf); + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber(dp); if (tid == NULL) { @@ -373,19 +371,19 @@ heapgettup(Relation relation, if (page >= pages) { - *buf = InvalidBuffer; + *buffer = InvalidBuffer; tuple->t_data = NULL; return; } /* page and lineoff now reference the physically next tid */ - *buf = RelationGetBufferWithBuffer(relation, page, *buf); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(*buf)) + *buffer = RelationGetBufferWithBuffer(relation, page, *buffer); + if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); -#endif - dp = (Page) BufferGetPage(*buf); + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + + dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber(dp); } @@ -420,10 +418,13 @@ heapgettup(Relation relation, * if current tuple qualifies, return it. * ---------------- */ - HeapTupleSatisfies(tuple, relation, *buf, (PageHeader) dp, + HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp, snapshot, nkeys, key); if (tuple->t_data != NULL) + { + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return; + } } /* ---------------- @@ -448,6 +449,7 @@ heapgettup(Relation relation, * this page and it's time to move to the next.. * ---------------- */ + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); page = nextpage(page, dir); /* ---------------- @@ -456,20 +458,19 @@ heapgettup(Relation relation, */ if (page < 0 || page >= pages) { - if (BufferIsValid(*buf)) - ReleaseBuffer(*buf); - *buf = InvalidBuffer; + if (BufferIsValid(*buffer)) + ReleaseBuffer(*buffer); + *buffer = InvalidBuffer; tuple->t_data = NULL; return; } - *buf = ReleaseAndReadBuffer(*buf, relation, page); + *buffer = ReleaseAndReadBuffer(*buffer, relation, page); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(*buf)) + if (!BufferIsValid(*buffer)) elog(ERROR, "heapgettup: failed ReadBuffer"); -#endif - dp = (Page) BufferGetPage(*buf); + LockBuffer(*buffer, BUFFER_LOCK_SHARE); + dp = (Page) BufferGetPage(*buffer); lines = PageGetMaxOffsetNumber((Page) dp); linesleft = lines - 1; if (dir < 0) @@ -485,13 +486,6 @@ heapgettup(Relation relation, } } -static void -doinsert(Relation relation, HeapTuple tup) -{ - RelationPutHeapTupleAtEnd(relation, tup); - return; -} - /* ---------------------------------------------------------------- * heap access method interface @@ -599,11 +593,7 @@ heap_beginscan(Relation relation, if (RelationIsValid(relation) == false) elog(ERROR, "heap_beginscan: !RelationIsValid(relation)"); - /* ---------------- - * set relation level read lock - * ---------------- - */ - RelationSetLockForRead(relation); + LockRelation(relation, AccessShareLock); /* XXX someday assert SelfTimeQual if relkind == RELKIND_UNCATALOGED */ if (relation->rd_rel->relkind == RELKIND_UNCATALOGED) @@ -707,13 +697,7 @@ heap_endscan(HeapScanDesc scan) */ RelationDecrementReferenceCount(scan->rs_rd); - /* ---------------- - * Non 2-phase read locks on catalog relations - * ---------------- - */ - if (IsSystemRelationName(RelationGetRelationName(scan->rs_rd)->data)) - - RelationUnsetLockForRead(scan->rs_rd); + UnlockRelation(scan->rs_rd, AccessShareLock); pfree(scan); /* XXX */ } @@ -997,14 +981,6 @@ heap_fetch(Relation relation, IncrHeapAccessStat(local_fetch); IncrHeapAccessStat(global_fetch); - /* - * Note: This is collosally expensive - does two system calls per - * indexscan tuple fetch. Not good, and since we should be doing page - * level locking by the scanner anyway, it is commented out. - */ - - /* RelationSetLockForTupleRead(relation, tid); */ - /* ---------------- * get the buffer from the relation descriptor * Note that this does a buffer pin. @@ -1013,13 +989,11 @@ heap_fetch(Relation relation, buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); -#ifndef NO_BUFFERISVALID if (!BufferIsValid(buffer)) - { elog(ERROR, "heap_fetch: %s relation: ReadBuffer(%lx) failed", &relation->rd_rel->relname, (long) tid); - } -#endif + + LockBuffer(buffer, BUFFER_LOCK_SHARE); /* ---------------- * get the item line pointer corresponding to the requested tid @@ -1047,6 +1021,8 @@ heap_fetch(Relation relation, HeapTupleSatisfies(tuple, relation, buffer, dp, snapshot, 0, (ScanKey) NULL); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + if (tuple->t_data == NULL) { ReleaseBuffer(buffer); @@ -1091,14 +1067,6 @@ heap_insert(Relation relation, HeapTuple tup) IncrHeapAccessStat(global_insert); /* ---------------- - * set relation level write lock. If this is a "local" relation (not - * visible to others), we don't need to set a write lock. - * ---------------- - */ - if (!relation->rd_islocal) - RelationSetLockForWrite(relation); - - /* ---------------- * If the object id of this tuple has already been assigned, trust * the caller. There are a couple of ways this can happen. At initial * db creation, the backend program sets oids for tuples. When we @@ -1122,228 +1090,178 @@ heap_insert(Relation relation, HeapTuple tup) tup->t_data->t_infomask &= ~(HEAP_XACT_MASK); tup->t_data->t_infomask |= HEAP_XMAX_INVALID; - doinsert(relation, tup); + RelationPutHeapTupleAtEnd(relation, tup); if (IsSystemRelationName(RelationGetRelationName(relation)->data)) { - RelationUnsetLockForWrite(relation); - - /* ---------------- - * invalidate caches (only works for system relations) - * ---------------- - */ RelationInvalidateHeapTuple(relation, tup); } return tup->t_data->t_oid; } -/* ---------------- - * heap_delete - delete a tuple - * - * Must decide how to handle errors. - * ---------------- +/* + * heap_delete - delete a tuple */ int -heap_delete(Relation relation, ItemPointer tid) +heap_delete(Relation relation, ItemPointer tid, ItemPointer ctid) { ItemId lp; HeapTupleData tp; PageHeader dp; - Buffer buf; + Buffer buffer; + int result; - /* ---------------- - * increment access statistics - * ---------------- - */ + /* increment access statistics */ IncrHeapAccessStat(local_delete); IncrHeapAccessStat(global_delete); - /* ---------------- - * sanity check - * ---------------- - */ Assert(ItemPointerIsValid(tid)); - /* ---------------- - * set relation level write lock - * ---------------- - */ - RelationSetLockForWrite(relation); - - buf = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); + buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(buf)) - { /* XXX L_SH better ??? */ + if (!BufferIsValid(buffer)) elog(ERROR, "heap_delete: failed ReadBuffer"); - } -#endif /* NO_BUFFERISVALID */ - dp = (PageHeader) BufferGetPage(buf); - lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid)); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - /* - * Just like test against non-functional updates we try to catch - * non-functional delete attempts. - vadim 05/05/97 - */ + dp = (PageHeader) BufferGetPage(buffer); + lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid)); tp.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); tp.t_len = ItemIdGetLength(lp); tp.t_self = *tid; - if (TupleUpdatedByCurXactAndCmd(&tp)) +l1: + result = HeapTupleSatisfiesUpdate(&tp); + + if (result == HeapTupleInvisible) { - - /* - * Vadim says this is no longer needed 1998/6/15 elog(NOTICE, - * "Non-functional delete, tuple already deleted"); - */ - if (IsSystemRelationName(RelationGetRelationName(relation)->data)) - RelationUnsetLockForWrite(relation); - ReleaseBuffer(buf); - return 1; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + elog(ERROR, "heap_delete: (am)invalid tid"); } - /* ---------------- - * check that we're deleteing a valid item - * ---------------- - */ - HeapTupleSatisfies((&tp), relation, buf, dp, - false, 0, (ScanKey) NULL); - if (!(tp.t_data)) + else if (result == HeapTupleBeingUpdated) { - - /* XXX call something else */ - ReleaseBuffer(buf); - - elog(ERROR, "heap_delete: (am)invalid tid"); + TransactionId xwait = tp.t_data->t_xmax; + + /* sleep untill concurrent transaction ends */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + if (TransactionIdDidAbort(xwait)) + goto l1; + /* concurrent xact committed */ + Assert(tp.t_data->t_xmax == xwait); + if (!(tp.t_data->t_infomask & HEAP_XMAX_COMMITTED)) + { + tp.t_data->t_infomask |= HEAP_XMAX_COMMITTED; + SetBufferCommitInfoNeedsSave(buffer); + } + /* if tuple was marked for update but not updated... */ + if (tp.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; + } + if (result != HeapTupleMayBeUpdated) + { + Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); + if (ctid != NULL) + *ctid = tp.t_data->t_ctid; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return result; } - /* ---------------- - * get the tuple and lock tell the buffer manager we want - * exclusive access to the page - * ---------------- - */ - - /* ---------------- - * store transaction information of xact deleting the tuple - * ---------------- - */ + /* store transaction information of xact deleting the tuple */ TransactionIdStore(GetCurrentTransactionId(), &(tp.t_data->t_xmax)); tp.t_data->t_cmax = GetCurrentCommandId(); - tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); + tp.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); - /* ---------------- - * invalidate caches - * ---------------- - */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* invalidate caches */ RelationInvalidateHeapTuple(relation, &tp); - WriteBuffer(buf); - if (IsSystemRelationName(RelationGetRelationName(relation)->data)) - RelationUnsetLockForWrite(relation); + WriteBuffer(buffer); - return 0; + return HeapTupleMayBeUpdated; } -/* ---------------- - * heap_replace - replace a tuple - * - * Must decide how to handle errors. - * - * Fix arguments, work with indexes. - * - * 12/30/93 - modified the return value to be 1 when - * a non-functional update is detected. This - * prevents the calling routine from updating - * indices unnecessarily. -kw - * - * ---------------- +/* + * heap_replace - replace a tuple */ int -heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup) +heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup, + ItemPointer ctid) { ItemId lp; HeapTupleData oldtup; - Page dp; + PageHeader dp; Buffer buffer; + int result; - /* ---------------- - * increment access statistics - * ---------------- - */ + /* increment access statistics */ IncrHeapAccessStat(local_replace); IncrHeapAccessStat(global_replace); - /* ---------------- - * sanity checks - * ---------------- - */ Assert(ItemPointerIsValid(otid)); - /* ---------------- - * set relation level write lock - * ---------------- - */ - if (!relation->rd_islocal) - RelationSetLockForWrite(relation); - buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(otid)); if (!BufferIsValid(buffer)) elog(ERROR, "amreplace: failed ReadBuffer"); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - dp = (Page) BufferGetPage(buffer); + dp = (PageHeader) BufferGetPage(buffer); lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(otid)); - /* ---------------- - * logically delete old item - * ---------------- - */ - oldtup.t_data = (HeapTupleHeader) PageGetItem(dp, lp); oldtup.t_len = ItemIdGetLength(lp); oldtup.t_self = *otid; - /* ----------------- - * the following test should be able to catch all non-functional - * update attempts and shut out all ghost tuples. - * XXX In the future, Spyros may need to update the rule lock on a tuple - * more than once within the same command and same transaction. - * He will have to introduce a new flag to override the following check. - * -- Wei - * - * ----------------- - */ - - if (TupleUpdatedByCurXactAndCmd(&oldtup)) +l2: + result = HeapTupleSatisfiesUpdate(&oldtup); + + if (result == HeapTupleInvisible) { - elog(NOTICE, "Non-functional update, only first update is performed"); - if (IsSystemRelationName(RelationGetRelationName(relation)->data)) - RelationUnsetLockForWrite(relation); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); - return 1; + elog(ERROR, "heap_replace: (am)invalid tid"); } - - /* ---------------- - * check that we're replacing a valid item - - * - * NOTE that this check must follow the non-functional update test - * above as it can happen that we try to 'replace' the same tuple - * twice in a single transaction. The second time around the - * tuple will fail the NowTimeQual. We don't want to abort the - * xact, we only want to flag the 'non-functional' NOTICE. -mer - * ---------------- - */ - HeapTupleSatisfies((&oldtup), - relation, - buffer, - (PageHeader) dp, - false, - 0, - (ScanKey) NULL); - if (!(oldtup.t_data)) + else if (result == HeapTupleBeingUpdated) { + TransactionId xwait = oldtup.t_data->t_xmax; + + /* sleep untill concurrent transaction ends */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + if (TransactionIdDidAbort(xwait)) + goto l2; + /* concurrent xact committed */ + Assert(oldtup.t_data->t_xmax == xwait); + if (!(oldtup.t_data->t_infomask & HEAP_XMAX_COMMITTED)) + { + oldtup.t_data->t_infomask |= HEAP_XMAX_COMMITTED; + SetBufferCommitInfoNeedsSave(buffer); + } + /* if tuple was marked for update but not updated... */ + if (oldtup.t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; + } + if (result != HeapTupleMayBeUpdated) + { + Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); + if (ctid != NULL) + *ctid = oldtup.t_data->t_ctid; + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); ReleaseBuffer(buffer); - elog(ERROR, "heap_replace: (am)invalid otid"); + return result; } /* XXX order problems if not atomic assignment ??? */ @@ -1354,42 +1272,122 @@ heap_replace(Relation relation, ItemPointer otid, HeapTuple newtup) newtup->t_data->t_infomask &= ~(HEAP_XACT_MASK); newtup->t_data->t_infomask |= HEAP_XMAX_INVALID; - /* ---------------- - * insert new item - * ---------------- - */ + /* logically delete old item */ + TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); + oldtup.t_data->t_cmax = GetCurrentCommandId(); + oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | + HEAP_XMAX_INVALID | HEAP_MARKED_FOR_UPDATE); + + /* insert new item */ if ((unsigned) DOUBLEALIGN(newtup->t_len) <= PageGetFreeSpace((Page) dp)) - RelationPutHeapTuple(relation, BufferGetBlockNumber(buffer), newtup); + RelationPutHeapTuple(relation, buffer, newtup); else { - /* ---------------- - * new item won't fit on same page as old item, have to look - * for a new place to put it. - * ---------------- + /* + * New item won't fit on same page as old item, have to look + * for a new place to put it. Note that we have to unlock + * current buffer context - not good but RelationPutHeapTupleAtEnd + * uses extend lock. */ - doinsert(relation, newtup); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + RelationPutHeapTupleAtEnd(relation, newtup); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); } - /* ---------------- - * new item in place, now record transaction information - * ---------------- + /* + * New item in place, now record address of new tuple in + * t_ctid of old one. */ - TransactionIdStore(GetCurrentTransactionId(), &(oldtup.t_data->t_xmax)); - oldtup.t_data->t_cmax = GetCurrentCommandId(); - oldtup.t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); + oldtup.t_data->t_ctid = newtup->t_self; - /* ---------------- - * invalidate caches - * ---------------- - */ + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + /* invalidate caches */ RelationInvalidateHeapTuple(relation, &oldtup); WriteBuffer(buffer); - if (IsSystemRelationName(RelationGetRelationName(relation)->data)) - RelationUnsetLockForWrite(relation); + return HeapTupleMayBeUpdated; +} + +/* + * heap_mark4update - mark a tuple for update + */ +int +heap_mark4update(Relation relation, HeapTuple tuple, Buffer *buffer) +{ + ItemPointer tid = &(tuple->t_self); + ItemId lp; + PageHeader dp; + int result; + + /* increment access statistics */ + IncrHeapAccessStat(local_mark4update); + IncrHeapAccessStat(global_mark4update); + + *buffer = ReadBuffer(relation, ItemPointerGetBlockNumber(tid)); + + if (!BufferIsValid(*buffer)) + elog(ERROR, "heap_mark4update: failed ReadBuffer"); + + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + + dp = (PageHeader) BufferGetPage(*buffer); + lp = PageGetItemId(dp, ItemPointerGetOffsetNumber(tid)); + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); + tuple->t_len = ItemIdGetLength(lp); + +l3: + result = HeapTupleSatisfiesUpdate(tuple); + + if (result == HeapTupleInvisible) + { + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(*buffer); + elog(ERROR, "heap_mark4update: (am)invalid tid"); + } + else if (result == HeapTupleBeingUpdated) + { + TransactionId xwait = tuple->t_data->t_xmax; + + /* sleep untill concurrent transaction ends */ + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + XactLockTableWait(xwait); + + LockBuffer(*buffer, BUFFER_LOCK_EXCLUSIVE); + if (TransactionIdDidAbort(xwait)) + goto l3; + /* concurrent xact committed */ + Assert(tuple->t_data->t_xmax == xwait); + if (!(tuple->t_data->t_infomask & HEAP_XMAX_COMMITTED)) + { + tuple->t_data->t_infomask |= HEAP_XMAX_COMMITTED; + SetBufferCommitInfoNeedsSave(*buffer); + } + /* if tuple was marked for update but not updated... */ + if (tuple->t_data->t_infomask & HEAP_MARKED_FOR_UPDATE) + result = HeapTupleMayBeUpdated; + else + result = HeapTupleUpdated; + } + if (result != HeapTupleMayBeUpdated) + { + Assert(result == HeapTupleSelfUpdated || result == HeapTupleUpdated); + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + return result; + } + + /* store transaction information of xact marking the tuple */ + TransactionIdStore(GetCurrentTransactionId(), &(tuple->t_data->t_xmax)); + tuple->t_data->t_cmax = GetCurrentCommandId(); + tuple->t_data->t_infomask &= ~(HEAP_XMAX_COMMITTED | HEAP_XMAX_INVALID); + tuple->t_data->t_infomask |= HEAP_MARKED_FOR_UPDATE; + + LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + + WriteNoReleaseBuffer(*buffer); - return 0; + return HeapTupleMayBeUpdated; } /* ---------------- diff --git a/src/backend/access/heap/hio.c b/src/backend/access/heap/hio.c index 5c727145f6f..462de54ccfd 100644 --- a/src/backend/access/heap/hio.c +++ b/src/backend/access/heap/hio.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Id: hio.c,v 1.14 1998/11/27 19:51:36 vadim Exp $ + * $Id: hio.c,v 1.15 1998/12/15 12:45:14 vadim Exp $ * *------------------------------------------------------------------------- */ @@ -15,6 +15,7 @@ #include <postgres.h> #include <storage/bufpage.h> +#include <access/hio.h> #include <access/heapam.h> #include <storage/bufmgr.h> #include <utils/memutils.h> @@ -29,19 +30,20 @@ * Probably needs to have an amdelunique to allow for * internal index records to be deleted and reordered as needed. * For the heap AM, this should never be needed. + * + * Note - we assume that caller hold BUFFER_LOCK_EXCLUSIVE on the buffer. + * */ void RelationPutHeapTuple(Relation relation, - BlockNumber blockIndex, + Buffer buffer, HeapTuple tuple) { - Buffer buffer; - Page pageHeader; - BlockNumber numberOfBlocks; - OffsetNumber offnum; - unsigned int len; - ItemId itemId; - Item item; + Page pageHeader; + OffsetNumber offnum; + unsigned int len; + ItemId itemId; + Item item; /* ---------------- * increment access statistics @@ -50,21 +52,6 @@ RelationPutHeapTuple(Relation relation, IncrHeapAccessStat(local_RelationPutHeapTuple); IncrHeapAccessStat(global_RelationPutHeapTuple); - Assert(RelationIsValid(relation)); - Assert(HeapTupleIsValid(tuple)); - - numberOfBlocks = RelationGetNumberOfBlocks(relation); - Assert(blockIndex < numberOfBlocks); - - buffer = ReadBuffer(relation, blockIndex); -#ifndef NO_BUFFERISVALID - if (!BufferIsValid(buffer)) - { - elog(ERROR, "RelationPutHeapTuple: no buffer for %ld in %s", - blockIndex, &relation->rd_rel->relname); - } -#endif - pageHeader = (Page) BufferGetPage(buffer); len = (unsigned) DOUBLEALIGN(tuple->t_len); /* be conservative */ Assert((int) len <= PageGetFreeSpace(pageHeader)); @@ -75,11 +62,17 @@ RelationPutHeapTuple(Relation relation, itemId = PageGetItemId((Page) pageHeader, offnum); item = PageGetItem((Page) pageHeader, itemId); - ItemPointerSet(&((HeapTupleHeader) item)->t_ctid, blockIndex, offnum); + ItemPointerSet(&((HeapTupleHeader) item)->t_ctid, + BufferGetBlockNumber(buffer), offnum); + /* + * Let the caller do this! + * WriteBuffer(buffer); + */ + /* return an accurate tuple */ - ItemPointerSet(&tuple->t_self, blockIndex, offnum); + ItemPointerSet(&tuple->t_self, BufferGetBlockNumber(buffer), offnum); } /* @@ -99,6 +92,7 @@ RelationPutHeapTuple(Relation relation, * RelationGetNumberOfBlocks to be useful. * * NOTE: This code presumes that we have a write lock on the relation. + * Not now - we use extend locking... * * Also note that this routine probably shouldn't have to exist, and does * screw up the call graph rather badly, but we are wasting so much time and @@ -116,8 +110,8 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) ItemId itemId; Item item; - Assert(RelationIsValid(relation)); - Assert(HeapTupleIsValid(tuple)); + if (!relation->rd_islocal) + LockRelation(relation, ExtendLock); /* * XXX This does an lseek - VERY expensive - but at the moment it is @@ -132,16 +126,18 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) { buffer = ReadBuffer(relation, lastblock); pageHeader = (Page) BufferGetPage(buffer); - if (PageIsNew((PageHeader) pageHeader)) - { - buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW); - pageHeader = (Page) BufferGetPage(buffer); - PageInit(pageHeader, BufferGetPageSize(buffer), 0); - } + /* + * There was IF instead of ASSERT here ?! + */ + Assert(PageIsNew((PageHeader) pageHeader)); + buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW); + pageHeader = (Page) BufferGetPage(buffer); + PageInit(pageHeader, BufferGetPageSize(buffer), 0); } else buffer = ReadBuffer(relation, lastblock - 1); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); pageHeader = (Page) BufferGetPage(buffer); len = (unsigned) DOUBLEALIGN(tuple->t_len); /* be conservative */ @@ -152,7 +148,9 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) if (len > PageGetFreeSpace(pageHeader)) { + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); buffer = ReleaseAndReadBuffer(buffer, relation, P_NEW); + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); pageHeader = (Page) BufferGetPage(buffer); PageInit(pageHeader, BufferGetPageSize(buffer), 0); @@ -160,6 +158,9 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) elog(ERROR, "Tuple is too big: size %d", len); } + if (!relation->rd_islocal) + UnlockRelation(relation, ExtendLock); + offnum = PageAddItem((Page) pageHeader, (Item) tuple->t_data, tuple->t_len, InvalidOffsetNumber, LP_USED); @@ -173,5 +174,7 @@ RelationPutHeapTupleAtEnd(Relation relation, HeapTuple tuple) /* return an accurate tuple */ ItemPointerSet(&tuple->t_self, lastblock, offnum); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); + } diff --git a/src/backend/access/heap/stats.c b/src/backend/access/heap/stats.c index 2bebfd9b0be..d630dedf815 100644 --- a/src/backend/access/heap/stats.c +++ b/src/backend/access/heap/stats.c @@ -7,7 +7,7 @@ * * * IDENTIFICATION - * $Header: /cvsroot/pgsql/src/backend/access/heap/Attic/stats.c,v 1.13 1997/09/08 02:20:31 momjian Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/heap/Attic/stats.c,v 1.14 1998/12/15 12:45:15 vadim Exp $ * * NOTES * initam should be moved someplace else. @@ -73,6 +73,7 @@ InitHeapAccessStatistics() stats->global_insert = 0; stats->global_delete = 0; stats->global_replace = 0; + stats->global_mark4update = 0; stats->global_markpos = 0; stats->global_restrpos = 0; stats->global_BufferGetRelation = 0; @@ -94,6 +95,7 @@ InitHeapAccessStatistics() stats->local_insert = 0; stats->local_delete = 0; stats->local_replace = 0; + stats->local_mark4update = 0; stats->local_markpos = 0; stats->local_restrpos = 0; stats->local_BufferGetRelation = 0; @@ -157,6 +159,7 @@ ResetHeapAccessStatistics() stats->local_insert = 0; stats->local_delete = 0; stats->local_replace = 0; + stats->local_mark4update = 0; stats->local_markpos = 0; stats->local_restrpos = 0; stats->local_BufferGetRelation = 0; @@ -274,6 +277,9 @@ PrintHeapAccessStatistics(HeapAccessStatistics stats) printf("local/global_replace: %6d/%6d\n", stats->local_replace, stats->global_replace); + printf("local/global_mark4update: %6d/%6d\n", + stats->local_mark4update, stats->global_mark4update); + printf("local/global_markpos: %6d/%6d\n", stats->local_markpos, stats->global_markpos); |