diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 575 |
1 files changed, 431 insertions, 144 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 8505e11437b..0b750e72a7d 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.203 2005/11/22 18:17:06 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/heap/heapam.c,v 1.204 2005/11/26 03:03:07 tgl Exp $ * * * INTERFACE ROUTINES @@ -78,12 +78,17 @@ initscan(HeapScanDesc scan, ScanKey key) */ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + scan->rs_inited = false; scan->rs_ctup.t_data = NULL; + ItemPointerSetInvalid(&scan->rs_ctup.t_self); scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; /* we don't have a marked position... */ ItemPointerSetInvalid(&(scan->rs_mctid)); + /* page-at-a-time fields are always invalid when not rs_inited */ + /* * copy the scan key, if appropriate */ @@ -93,79 +98,128 @@ initscan(HeapScanDesc scan, ScanKey key) pgstat_count_heap_scan(&scan->rs_pgstat_info); } -/* ---------------- - * heapgettup - fetch next heap tuple - * - * routine used by heap_getnext() which does most of the - * real work in scanning tuples. +/* + * heapgetpage - subroutine for heapgettup() * - * The passed-in *buffer must be either InvalidBuffer or the pinned - * current page of the scan. If we have to move to another page, - * we will unpin this buffer (if valid). On return, *buffer is either - * InvalidBuffer or the ID of a pinned buffer. - * ---------------- + * This routine reads and pins the specified page of the relation. + * In page-at-a-time mode it performs additional work, namely determining + * which tuples on the page are visible. */ static void -heapgettup(Relation relation, - int dir, - HeapTuple tuple, - Buffer *buffer, - Snapshot snapshot, - int nkeys, - ScanKey key, - BlockNumber pages) +heapgetpage(HeapScanDesc scan, BlockNumber page) { - ItemId lpp; + Buffer buffer; + Snapshot snapshot; Page dp; - BlockNumber page; int lines; + int ntup; OffsetNumber lineoff; - int linesleft; - ItemPointer tid; + ItemId lpp; + + Assert(page < scan->rs_nblocks); - tid = (tuple->t_data == NULL) ? NULL : &(tuple->t_self); + scan->rs_cbuf = ReleaseAndReadBuffer(scan->rs_cbuf, + scan->rs_rd, + page); + scan->rs_cblock = page; + + if (!scan->rs_pageatatime) + return; + + buffer = scan->rs_cbuf; + snapshot = scan->rs_snapshot; /* - * debugging stuff - * - * check validity of arguments, here and for other functions too - * - * Note: no locking manipulations needed--this is a local function + * We must hold share lock on the buffer content while examining + * tuple visibility. Afterwards, however, the tuples we have found + * to be visible are guaranteed good as long as we hold the buffer pin. */ -#ifdef HEAPDEBUGALL - if (ItemPointerIsValid(tid)) - elog(DEBUG2, "heapgettup(%s, tid=0x%x[%d,%d], dir=%d, ...)", - RelationGetRelationName(relation), tid, tid->ip_blkid, - tid->ip_posid, dir); - else - elog(DEBUG2, "heapgettup(%s, tid=0x%x, dir=%d, ...)", - RelationGetRelationName(relation), tid, dir); - - elog(DEBUG2, "heapgettup(..., b=0x%x, nkeys=%d, key=0x%x", buffer, nkeys, key); + LockBuffer(buffer, BUFFER_LOCK_SHARE); - elog(DEBUG2, "heapgettup: relation(%c)=`%s', %p", - relation->rd_rel->relkind, RelationGetRelationName(relation), - snapshot); -#endif /* HEAPDEBUGALL */ + dp = (Page) BufferGetPage(buffer); + lines = PageGetMaxOffsetNumber(dp); + ntup = 0; - if (!ItemPointerIsValid(tid)) + for (lineoff = FirstOffsetNumber, lpp = PageGetItemId(dp, lineoff); + lineoff <= lines; + lineoff++, lpp++) { - Assert(!PointerIsValid(tid)); - tid = NULL; + if (ItemIdIsUsed(lpp)) + { + HeapTupleData loctup; + bool valid; + + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + loctup.t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(loctup.t_self), page, lineoff); + + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + if (valid) + scan->rs_vistuples[ntup++] = lineoff; + } } - tuple->t_tableOid = RelationGetRelid(relation); + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - /* - * return null immediately if relation is empty - */ - if (pages == 0) + Assert(ntup <= MaxHeapTuplesPerPage); + scan->rs_ntuples = ntup; +} + +/* ---------------- + * heapgettup - fetch next heap tuple + * + * Initialize the scan if not already done; then advance to the next + * tuple as indicated by "dir"; return the next tuple in scan->rs_ctup, + * or set scan->rs_ctup.t_data = NULL if no more tuples. + * + * dir == 0 means "re-fetch the tuple indicated by scan->rs_ctup". + * + * Note: the reason nkeys/key are passed separately, even though they are + * kept in the scan descriptor, is that the caller may not want us to check + * the scankeys. + * + * Note: when we fall off the end of the scan in either direction, we + * reset rs_inited. This means that a further request with the same + * scan direction will restart the scan, which is a bit odd, but a + * request with the opposite scan direction will start a fresh scan + * in the proper direction. The latter is required behavior for cursors, + * while the former case is generally undefined behavior in Postgres + * so we don't care too much. + * ---------------- + */ +static void +heapgettup(HeapScanDesc scan, + int dir, + int nkeys, + ScanKey key) +{ + HeapTuple tuple = &(scan->rs_ctup); + ItemPointer tid = &(tuple->t_self); + Snapshot snapshot = scan->rs_snapshot; + BlockNumber pages = scan->rs_nblocks; + BlockNumber page; + Page dp; + int lines; + OffsetNumber lineoff; + int linesleft; + ItemId lpp; + + if (!scan->rs_inited) { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; - tuple->t_data = NULL; - return; + /* + * return null immediately if relation is empty + */ + if (pages == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + { + /* resuming scan from tuple indicated by scan->rs_ctup.t_self */ + Assert(ItemPointerIsValid(tid)); } /* @@ -174,30 +228,26 @@ heapgettup(Relation relation, if (dir == 0) { /* - * ``no movement'' scan direction: refetch same tuple + * ``no movement'' scan direction: refetch prior tuple */ - if (tid == NULL) + if (!scan->rs_inited) { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; + Assert(!BufferIsValid(scan->rs_cbuf)); tuple->t_data = NULL; return; } - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - ItemPointerGetBlockNumber(tid)); - - LockBuffer(*buffer, BUFFER_LOCK_SHARE); + page = ItemPointerGetBlockNumber(tid); + if (page != scan->rs_cblock) + heapgetpage(scan, page); - dp = (Page) BufferGetPage(*buffer); + /* Since the tuple was previously fetched, needn't lock page here */ + dp = (Page) BufferGetPage(scan->rs_cbuf); lineoff = ItemPointerGetOffsetNumber(tid); lpp = PageGetItemId(dp, lineoff); tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); tuple->t_len = ItemIdGetLength(lpp); - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); return; } @@ -206,28 +256,23 @@ heapgettup(Relation relation, /* * reverse scan direction */ - if (tid == NULL) - { + if (!scan->rs_inited) page = pages - 1; /* final page */ - } else - { page = ItemPointerGetBlockNumber(tid); /* current page */ - } - - Assert(page < pages); - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); + if (page != scan->rs_cblock) + heapgetpage(scan, page); - LockBuffer(*buffer, BUFFER_LOCK_SHARE); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = (Page) BufferGetPage(*buffer); + dp = (Page) BufferGetPage(scan->rs_cbuf); lines = PageGetMaxOffsetNumber(dp); - if (tid == NULL) + + if (!scan->rs_inited) { lineoff = lines; /* final offnum */ + scan->rs_inited = true; } else { @@ -241,10 +286,11 @@ heapgettup(Relation relation, /* * forward scan direction */ - if (tid == NULL) + if (!scan->rs_inited) { page = 0; /* first page */ lineoff = FirstOffsetNumber; /* first offnum */ + scan->rs_inited = true; } else { @@ -253,15 +299,12 @@ heapgettup(Relation relation, OffsetNumberNext(ItemPointerGetOffsetNumber(tid)); } - Assert(page < pages); - - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); + if (page != scan->rs_cblock) + heapgetpage(scan, page); - LockBuffer(*buffer, BUFFER_LOCK_SHARE); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - dp = (Page) BufferGetPage(*buffer); + dp = (Page) BufferGetPage(scan->rs_cbuf); lines = PageGetMaxOffsetNumber(dp); /* page and lineoff now reference the physically next tid */ } @@ -269,22 +312,21 @@ heapgettup(Relation relation, /* 'dir' is now non-zero */ /* - * calculate line pointer and number of remaining items to check on this - * page. + * calculate number of remaining items to check on this page */ - lpp = PageGetItemId(dp, lineoff); if (dir < 0) - linesleft = lineoff - 1; + linesleft = lineoff; else - linesleft = lines - lineoff; + linesleft = lines - lineoff + 1; /* * advance the scan until we find a qualifying tuple or run out of stuff * to scan */ + lpp = PageGetItemId(dp, lineoff); for (;;) { - while (linesleft >= 0) + while (linesleft > 0) { if (ItemIdIsUsed(lpp)) { @@ -297,11 +339,17 @@ heapgettup(Relation relation, /* * if current tuple qualifies, return it. */ - HeapTupleSatisfies(tuple, relation, *buffer, (PageHeader) dp, - snapshot, nkeys, key, valid); + valid = HeapTupleSatisfiesVisibility(tuple, + snapshot, + scan->rs_cbuf); + + if (valid && key != NULL) + HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + nkeys, key, valid); + if (valid) { - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); return; } } @@ -326,32 +374,31 @@ heapgettup(Relation relation, * if we get here, it means we've exhausted the items on this page and * it's time to move to the next. */ - LockBuffer(*buffer, BUFFER_LOCK_UNLOCK); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_UNLOCK); /* * return NULL if we've exhausted all the pages */ if ((dir < 0) ? (page == 0) : (page + 1 >= pages)) { - if (BufferIsValid(*buffer)) - ReleaseBuffer(*buffer); - *buffer = InvalidBuffer; + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; tuple->t_data = NULL; + scan->rs_inited = false; return; } page = (dir < 0) ? (page - 1) : (page + 1); - Assert(page < pages); + heapgetpage(scan, page); - *buffer = ReleaseAndReadBuffer(*buffer, - relation, - page); + LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); - LockBuffer(*buffer, BUFFER_LOCK_SHARE); - dp = (Page) BufferGetPage(*buffer); + dp = (Page) BufferGetPage(scan->rs_cbuf); lines = PageGetMaxOffsetNumber((Page) dp); - linesleft = lines - 1; + linesleft = lines; if (dir < 0) { lineoff = lines; @@ -365,6 +412,233 @@ heapgettup(Relation relation, } } +/* ---------------- + * heapgettup_pagemode - fetch next heap tuple in page-at-a-time mode + * + * Same API as heapgettup, but used in page-at-a-time mode + * + * The internal logic is much the same as heapgettup's too, but there are some + * differences: we do not take the buffer content lock (that only needs to + * happen inside heapgetpage), and we iterate through just the tuples listed + * in rs_vistuples[] rather than all tuples on the page. Notice that + * lineindex is 0-based, where the corresponding loop variable lineoff in + * heapgettup is 1-based. + * ---------------- + */ +static void +heapgettup_pagemode(HeapScanDesc scan, + int dir, + int nkeys, + ScanKey key) +{ + HeapTuple tuple = &(scan->rs_ctup); + ItemPointer tid = &(tuple->t_self); + BlockNumber pages = scan->rs_nblocks; + BlockNumber page; + Page dp; + int lines; + int lineindex; + OffsetNumber lineoff; + int linesleft; + ItemId lpp; + + if (!scan->rs_inited) + { + /* + * return null immediately if relation is empty + */ + if (pages == 0) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + } + else + { + /* resuming scan from tuple indicated by scan->rs_ctup.t_self */ + Assert(ItemPointerIsValid(tid)); + } + + /* + * calculate next starting lineindex, given scan direction + */ + if (dir == 0) + { + /* + * ``no movement'' scan direction: refetch prior tuple + */ + if (!scan->rs_inited) + { + Assert(!BufferIsValid(scan->rs_cbuf)); + tuple->t_data = NULL; + return; + } + + page = ItemPointerGetBlockNumber(tid); + if (page != scan->rs_cblock) + heapgetpage(scan, page); + + /* Since the tuple was previously fetched, needn't lock page here */ + dp = (Page) BufferGetPage(scan->rs_cbuf); + lineoff = ItemPointerGetOffsetNumber(tid); + lpp = PageGetItemId(dp, lineoff); + + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + + /* check that rs_cindex is in sync */ + Assert(scan->rs_cindex < scan->rs_ntuples); + Assert(lineoff == scan->rs_vistuples[scan->rs_cindex]); + + return; + } + else if (dir < 0) + { + /* + * reverse scan direction + */ + if (!scan->rs_inited) + page = pages - 1; /* final page */ + else + page = ItemPointerGetBlockNumber(tid); /* current page */ + + if (page != scan->rs_cblock) + heapgetpage(scan, page); + + dp = (Page) BufferGetPage(scan->rs_cbuf); + lines = scan->rs_ntuples; + + if (!scan->rs_inited) + { + lineindex = lines - 1; + scan->rs_inited = true; + } + else + { + lineindex = scan->rs_cindex - 1; + } + /* page and lineindex now reference the previous visible tid */ + } + else + { + /* + * forward scan direction + */ + if (!scan->rs_inited) + { + page = 0; /* first page */ + lineindex = 0; + scan->rs_inited = true; + } + else + { + page = ItemPointerGetBlockNumber(tid); /* current page */ + lineindex = scan->rs_cindex + 1; + } + + if (page != scan->rs_cblock) + heapgetpage(scan, page); + + dp = (Page) BufferGetPage(scan->rs_cbuf); + lines = scan->rs_ntuples; + /* page and lineindex now reference the next visible tid */ + } + + /* 'dir' is now non-zero */ + + /* + * calculate number of remaining items to check on this page + */ + if (dir < 0) + linesleft = lineindex + 1; + else + linesleft = lines - lineindex; + + /* + * advance the scan until we find a qualifying tuple or run out of stuff + * to scan + */ + for (;;) + { + while (linesleft > 0) + { + lineoff = scan->rs_vistuples[lineindex]; + lpp = PageGetItemId(dp, lineoff); + Assert(ItemIdIsUsed(lpp)); + + tuple->t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); + tuple->t_len = ItemIdGetLength(lpp); + ItemPointerSet(&(tuple->t_self), page, lineoff); + + /* + * if current tuple qualifies, return it. + */ + if (key != NULL) + { + bool valid; + + HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + nkeys, key, valid); + if (valid) + { + scan->rs_cindex = lineindex; + return; + } + } + else + { + scan->rs_cindex = lineindex; + return; + } + + /* + * otherwise move to the next item on the page + */ + --linesleft; + if (dir < 0) + { + --lineindex; + } + else + { + ++lineindex; + } + } + + /* + * if we get here, it means we've exhausted the items on this page and + * it's time to move to the next. + */ + + /* + * return NULL if we've exhausted all the pages + */ + if ((dir < 0) ? (page == 0) : (page + 1 >= pages)) + { + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; + tuple->t_data = NULL; + scan->rs_inited = false; + return; + } + + page = (dir < 0) ? (page - 1) : (page + 1); + + heapgetpage(scan, page); + + dp = (Page) BufferGetPage(scan->rs_cbuf); + lines = scan->rs_ntuples; + linesleft = lines; + if (dir < 0) + lineindex = lines - 1; + else + lineindex = 0; + } +} + #if defined(DISABLE_COMPLEX_MACRO) /* @@ -643,6 +917,14 @@ heap_beginscan(Relation relation, Snapshot snapshot, scan->rs_nkeys = nkeys; /* + * we can use page-at-a-time mode if it's an MVCC-safe snapshot + */ + scan->rs_pageatatime = IsMVCCSnapshot(snapshot); + + /* we only need to set this up once */ + scan->rs_ctup.t_tableOid = RelationGetRelid(relation); + + /* * we do this here instead of in initscan() because heap_rescan also calls * initscan() and we don't want to allocate memory again */ @@ -741,16 +1023,14 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) /* * Note: we depend here on the -1/0/1 encoding of ScanDirection. */ - heapgettup(scan->rs_rd, - (int) direction, - &(scan->rs_ctup), - &(scan->rs_cbuf), - scan->rs_snapshot, - scan->rs_nkeys, - scan->rs_key, - scan->rs_nblocks); + if (scan->rs_pageatatime) + heapgettup_pagemode(scan, (int) direction, + scan->rs_nkeys, scan->rs_key); + else + heapgettup(scan, (int) direction, + scan->rs_nkeys, scan->rs_key); - if (scan->rs_ctup.t_data == NULL && !BufferIsValid(scan->rs_cbuf)) + if (scan->rs_ctup.t_data == NULL) { HEAPDEBUG_2; /* heap_getnext returning EOS */ return NULL; @@ -760,13 +1040,11 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) * if we get here it means we have a new current scan tuple, so point to * the proper return buffer and return the tuple. */ - HEAPDEBUG_3; /* heap_getnext returning tuple */ - if (scan->rs_ctup.t_data != NULL) - pgstat_count_heap_getnext(&scan->rs_pgstat_info); + pgstat_count_heap_getnext(&scan->rs_pgstat_info); - return ((scan->rs_ctup.t_data == NULL) ? NULL : &(scan->rs_ctup)); + return &(scan->rs_ctup); } /* @@ -903,8 +1181,7 @@ heap_release_fetch(Relation relation, /* * check time qualification of tuple, then release lock */ - HeapTupleSatisfies(tuple, relation, buffer, dp, - snapshot, 0, NULL, valid); + valid = HeapTupleSatisfiesVisibility(tuple, snapshot, buffer); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); @@ -1038,8 +1315,7 @@ heap_get_latest_tid(Relation relation, * Check time qualification of tuple; if visible, set it as the new * result candidate. */ - HeapTupleSatisfies(&tp, relation, buffer, dp, - snapshot, 0, NULL, valid); + valid = HeapTupleSatisfiesVisibility(&tp, snapshot, buffer); if (valid) *tid = ctid; @@ -2439,7 +2715,11 @@ heap_markpos(HeapScanDesc scan) /* Note: no locking manipulations needed */ if (scan->rs_ctup.t_data != NULL) + { scan->rs_mctid = scan->rs_ctup.t_self; + if (scan->rs_pageatatime) + scan->rs_mindex = scan->rs_cindex; + } else ItemPointerSetInvalid(&scan->rs_mctid); } @@ -2453,31 +2733,38 @@ heap_restrpos(HeapScanDesc scan) { /* XXX no amrestrpos checking that ammarkpos called */ - /* Note: no locking manipulations needed */ - - /* - * unpin scan buffers - */ - if (BufferIsValid(scan->rs_cbuf)) - ReleaseBuffer(scan->rs_cbuf); - scan->rs_cbuf = InvalidBuffer; - if (!ItemPointerIsValid(&scan->rs_mctid)) { scan->rs_ctup.t_data = NULL; + /* + * unpin scan buffers + */ + if (BufferIsValid(scan->rs_cbuf)) + ReleaseBuffer(scan->rs_cbuf); + scan->rs_cbuf = InvalidBuffer; + scan->rs_cblock = InvalidBlockNumber; } else { + /* + * If we reached end of scan, rs_inited will now be false. We must + * reset it to true to keep heapgettup from doing the wrong thing. + */ + scan->rs_inited = true; scan->rs_ctup.t_self = scan->rs_mctid; - scan->rs_ctup.t_data = (HeapTupleHeader) 0x1; /* for heapgettup */ - heapgettup(scan->rs_rd, - 0, - &(scan->rs_ctup), - &(scan->rs_cbuf), - scan->rs_snapshot, - 0, - NULL, - scan->rs_nblocks); + if (scan->rs_pageatatime) + { + scan->rs_cindex = scan->rs_mindex; + heapgettup_pagemode(scan, + 0, /* "no movement" */ + 0, /* needn't recheck scan keys */ + NULL); + } + else + heapgettup(scan, + 0, /* "no movement" */ + 0, /* needn't recheck scan keys */ + NULL); } } |