diff options
Diffstat (limited to 'src/backend/access/heap/heapam_handler.c')
-rw-r--r-- | src/backend/access/heap/heapam_handler.c | 155 |
1 files changed, 155 insertions, 0 deletions
diff --git a/src/backend/access/heap/heapam_handler.c b/src/backend/access/heap/heapam_handler.c index 5e8bf0aa5ca..ce54b16f342 100644 --- a/src/backend/access/heap/heapam_handler.c +++ b/src/backend/access/heap/heapam_handler.c @@ -1953,6 +1953,159 @@ heapam_estimate_rel_size(Relation rel, int32 *attr_widths, */ static bool +heapam_scan_bitmap_next_block(TableScanDesc scan, + TBMIterateResult *tbmres) +{ + HeapScanDesc hscan = (HeapScanDesc) scan; + BlockNumber page = tbmres->blockno; + Buffer buffer; + Snapshot snapshot; + int ntup; + + hscan->rs_cindex = 0; + hscan->rs_ntuples = 0; + + /* + * Ignore any claimed entries past what we think is the end of the + * relation. It may have been extended after the start of our scan (we + * only hold an AccessShareLock, and it could be inserts from this + * backend). + */ + if (page >= hscan->rs_nblocks) + return false; + + /* + * Acquire pin on the target heap page, trading in any pin we held before. + */ + hscan->rs_cbuf = ReleaseAndReadBuffer(hscan->rs_cbuf, + scan->rs_rd, + page); + hscan->rs_cblock = page; + buffer = hscan->rs_cbuf; + snapshot = scan->rs_snapshot; + + ntup = 0; + + /* + * Prune and repair fragmentation for the whole page, if possible. + */ + heap_page_prune_opt(scan->rs_rd, buffer); + + /* + * We must hold share lock on the buffer content while examining tuple + * visibility. Afterwards, however, the tuples we have found to be + * visible are guaranteed good as long as we hold the buffer pin. + */ + LockBuffer(buffer, BUFFER_LOCK_SHARE); + + /* + * We need two separate strategies for lossy and non-lossy cases. + */ + if (tbmres->ntuples >= 0) + { + /* + * Bitmap is non-lossy, so we just look through the offsets listed in + * tbmres; but we have to follow any HOT chain starting at each such + * offset. + */ + int curslot; + + for (curslot = 0; curslot < tbmres->ntuples; curslot++) + { + OffsetNumber offnum = tbmres->offsets[curslot]; + ItemPointerData tid; + HeapTupleData heapTuple; + + ItemPointerSet(&tid, page, offnum); + if (heap_hot_search_buffer(&tid, scan->rs_rd, buffer, snapshot, + &heapTuple, NULL, true)) + hscan->rs_vistuples[ntup++] = ItemPointerGetOffsetNumber(&tid); + } + } + else + { + /* + * Bitmap is lossy, so we must examine each item pointer on the page. + * But we can ignore HOT chains, since we'll check each tuple anyway. + */ + Page dp = (Page) BufferGetPage(buffer); + OffsetNumber maxoff = PageGetMaxOffsetNumber(dp); + OffsetNumber offnum; + + for (offnum = FirstOffsetNumber; offnum <= maxoff; offnum = OffsetNumberNext(offnum)) + { + ItemId lp; + HeapTupleData loctup; + bool valid; + + lp = PageGetItemId(dp, offnum); + if (!ItemIdIsNormal(lp)) + continue; + loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); + loctup.t_len = ItemIdGetLength(lp); + loctup.t_tableOid = scan->rs_rd->rd_id; + ItemPointerSet(&loctup.t_self, page, offnum); + valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); + if (valid) + { + hscan->rs_vistuples[ntup++] = offnum; + PredicateLockTuple(scan->rs_rd, &loctup, snapshot); + } + CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, + buffer, snapshot); + } + } + + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + + Assert(ntup <= MaxHeapTuplesPerPage); + hscan->rs_ntuples = ntup; + + return ntup > 0; +} + +static bool +heapam_scan_bitmap_next_tuple(TableScanDesc scan, + TBMIterateResult *tbmres, + TupleTableSlot *slot) +{ + HeapScanDesc hscan = (HeapScanDesc) scan; + OffsetNumber targoffset; + Page dp; + ItemId lp; + + /* + * Out of range? If so, nothing more to look at on this page + */ + if (hscan->rs_cindex < 0 || hscan->rs_cindex >= hscan->rs_ntuples) + return false; + + targoffset = hscan->rs_vistuples[hscan->rs_cindex]; + dp = (Page) BufferGetPage(hscan->rs_cbuf); + lp = PageGetItemId(dp, targoffset); + Assert(ItemIdIsNormal(lp)); + + hscan->rs_ctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lp); + hscan->rs_ctup.t_len = ItemIdGetLength(lp); + hscan->rs_ctup.t_tableOid = scan->rs_rd->rd_id; + ItemPointerSet(&hscan->rs_ctup.t_self, hscan->rs_cblock, targoffset); + + pgstat_count_heap_fetch(scan->rs_rd); + + /* + * Set up the result slot to point to this tuple. Note that the slot + * acquires a pin on the buffer. + */ + ExecStoreBufferHeapTuple(&hscan->rs_ctup, + slot, + hscan->rs_cbuf); + + hscan->rs_cindex++; + + return true; +} + +static bool heapam_scan_sample_next_block(TableScanDesc scan, SampleScanState *scanstate) { HeapScanDesc hscan = (HeapScanDesc) scan; @@ -2266,6 +2419,8 @@ static const TableAmRoutine heapam_methods = { .relation_estimate_size = heapam_estimate_rel_size, + .scan_bitmap_next_block = heapam_scan_bitmap_next_block, + .scan_bitmap_next_tuple = heapam_scan_bitmap_next_tuple, .scan_sample_next_block = heapam_scan_sample_next_block, .scan_sample_next_tuple = heapam_scan_sample_next_tuple }; |