Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 650
1 file changed, 205 insertions, 445 deletions
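
The diff below folds the old standalone heap scan machinery into the new table access method scan API: HeapScanDescData now embeds the AM-independent TableScanDescData as its first member (rs_base), the scan routines accept a TableScanDesc and cast it back to a HeapScanDesc, and the parallel-scan helpers are replaced by the table_block_parallelscan_* functions. As orientation only, here is a minimal standalone C sketch of that embed-and-downcast idiom; the names used (BaseScan, HeapishScan, print_scan) are hypothetical illustrations, not the PostgreSQL definitions.

/*
 * Standalone sketch (not PostgreSQL code) of the embedding idiom this patch
 * relies on: the AM-specific descriptor places the AM-independent base
 * struct as its first member, so a pointer to the derived struct can be
 * handed around as a pointer to the base and cast back inside the AM.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct BaseScan
{
	const char *relname;		/* stands in for rs_rd */
	int			nkeys;			/* stands in for rs_nkeys */
} BaseScan;

typedef struct HeapishScan
{
	BaseScan	base;			/* must be first (cf. rs_base) */
	unsigned	nblocks;		/* AM-specific state (cf. rs_nblocks) */
} HeapishScan;

/* AM-independent entry point: only sees the base type */
static void
print_scan(BaseScan *scan)
{
	/* downcast, analogous to "HeapScanDesc scan = (HeapScanDesc) sscan" */
	HeapishScan *hscan = (HeapishScan *) scan;

	printf("%s: %u blocks, %d keys\n",
		   hscan->base.relname, hscan->nblocks, hscan->base.nkeys);
}

int
main(void)
{
	HeapishScan *hscan = malloc(sizeof(HeapishScan));

	hscan->base.relname = "pg_class";
	hscan->base.nkeys = 0;
	hscan->nblocks = 42;

	/* callers receive the base pointer, much as heap_beginscan now
	 * returns TableScanDesc rather than HeapScanDesc */
	print_scan(&hscan->base);

	free(hscan);
	return 0;
}

Because the base struct is the first member, both pointers share the same address, which is what makes the (HeapScanDesc) sscan casts in the hunks below well defined.
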
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index dc3499349b6..3c8a5da0bc8 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -41,6 +41,7 @@ #include "access/parallel.h" #include "access/relscan.h" #include "access/sysattr.h" +#include "access/tableam.h" #include "access/transam.h" #include "access/tuptoaster.h" #include "access/valid.h" @@ -68,22 +69,6 @@ #include "utils/snapmgr.h" -/* GUC variable */ -bool synchronize_seqscans = true; - - -static HeapScanDesc heap_beginscan_internal(Relation relation, - Snapshot snapshot, - int nkeys, ScanKey key, - ParallelHeapScanDesc parallel_scan, - bool allow_strat, - bool allow_sync, - bool allow_pagemode, - bool is_bitmapscan, - bool is_samplescan, - bool temp_snap); -static void heap_parallelscan_startblock_init(HeapScanDesc scan); -static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan); static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup, TransactionId xid, CommandId cid, int options); static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf, @@ -207,6 +192,7 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] = static void initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) { + ParallelBlockTableScanDesc bpscan = NULL; bool allow_strat; bool allow_sync; @@ -221,10 +207,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * results for a non-MVCC snapshot, the caller must hold some higher-level * lock that ensures the interesting tuple(s) won't change.) */ - if (scan->rs_parallel != NULL) - scan->rs_nblocks = scan->rs_parallel->phs_nblocks; + if (scan->rs_base.rs_parallel != NULL) + { + bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + scan->rs_nblocks = bpscan->phs_nblocks; + } else - scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd); + scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_base.rs_rd); /* * If the table is large relative to NBuffers, use a bulk-read access @@ -238,11 +227,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * Note that heap_parallelscan_initialize has a very similar test; if you * change this, consider changing that one, too. */ - if (!RelationUsesLocalBuffers(scan->rs_rd) && + if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && scan->rs_nblocks > NBuffers / 4) { - allow_strat = scan->rs_allow_strat; - allow_sync = scan->rs_allow_sync; + allow_strat = scan->rs_base.rs_allow_strat; + allow_sync = scan->rs_base.rs_allow_sync; } else allow_strat = allow_sync = false; @@ -260,10 +249,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) scan->rs_strategy = NULL; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - /* For parallel scan, believe whatever ParallelHeapScanDesc says. */ - scan->rs_syncscan = scan->rs_parallel->phs_syncscan; + /* For parallel scan, believe whatever ParallelTableScanDesc says. */ + scan->rs_base.rs_syncscan = scan->rs_base.rs_parallel->phs_syncscan; } else if (keep_startblock) { @@ -272,16 +261,16 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * so that rewinding a cursor doesn't generate surprising results. * Reset the active syncscan setting, though. 
*/ - scan->rs_syncscan = (allow_sync && synchronize_seqscans); + scan->rs_base.rs_syncscan = (allow_sync && synchronize_seqscans); } else if (allow_sync && synchronize_seqscans) { - scan->rs_syncscan = true; - scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks); + scan->rs_base.rs_syncscan = true; + scan->rs_startblock = ss_get_location(scan->rs_base.rs_rd, scan->rs_nblocks); } else { - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; scan->rs_startblock = 0; } @@ -298,15 +287,15 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * copy the scan key, if appropriate */ if (key != NULL) - memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData)); + memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData)); /* * Currently, we don't have a stats counter for bitmap heap scans (but the * underlying bitmap index scans will be counted) or sample scans (we only * update stats for tuple fetches there) */ - if (!scan->rs_bitmapscan && !scan->rs_samplescan) - pgstat_count_heap_scan(scan->rs_rd); + if (!scan->rs_base.rs_bitmapscan && !scan->rs_base.rs_samplescan) + pgstat_count_heap_scan(scan->rs_base.rs_rd); } /* @@ -316,10 +305,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * numBlks is number of pages to scan (InvalidBlockNumber means "all") */ void -heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks) +heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks) { + HeapScanDesc scan = (HeapScanDesc) sscan; + Assert(!scan->rs_inited); /* else too late to change */ - Assert(!scan->rs_syncscan); /* else rs_startblock is significant */ + Assert(!scan->rs_base.rs_syncscan); /* else rs_startblock is significant */ /* Check startBlk is valid (but allow case of zero blocks...) */ Assert(startBlk == 0 || startBlk < scan->rs_nblocks); @@ -336,8 +327,9 @@ heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks) * which tuples on the page are visible. */ void -heapgetpage(HeapScanDesc scan, BlockNumber page) +heapgetpage(TableScanDesc sscan, BlockNumber page) { + HeapScanDesc scan = (HeapScanDesc) sscan; Buffer buffer; Snapshot snapshot; Page dp; @@ -364,20 +356,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) CHECK_FOR_INTERRUPTS(); /* read page using selected strategy */ - scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page, + scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page, RBM_NORMAL, scan->rs_strategy); scan->rs_cblock = page; - if (!scan->rs_pageatatime) + if (!scan->rs_base.rs_pageatatime) return; buffer = scan->rs_cbuf; - snapshot = scan->rs_snapshot; + snapshot = scan->rs_base.rs_snapshot; /* * Prune and repair fragmentation for the whole page, if possible. 
*/ - heap_page_prune_opt(scan->rs_rd, buffer); + heap_page_prune_opt(scan->rs_base.rs_rd, buffer); /* * We must hold share lock on the buffer content while examining tuple @@ -387,7 +379,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) LockBuffer(buffer, BUFFER_LOCK_SHARE); dp = BufferGetPage(buffer); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); ntup = 0; @@ -422,7 +414,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) HeapTupleData loctup; bool valid; - loctup.t_tableOid = RelationGetRelid(scan->rs_rd); + loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd); loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp); loctup.t_len = ItemIdGetLength(lpp); ItemPointerSet(&(loctup.t_self), page, lineoff); @@ -432,8 +424,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page) else valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer); - CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup, - buffer, snapshot); + CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, + &loctup, buffer, snapshot); if (valid) scan->rs_vistuples[ntup++] = lineoff; @@ -476,7 +468,7 @@ heapgettup(HeapScanDesc scan, ScanKey key) { HeapTuple tuple = &(scan->rs_ctup); - Snapshot snapshot = scan->rs_snapshot; + Snapshot snapshot = scan->rs_base.rs_snapshot; bool backward = ScanDirectionIsBackward(dir); BlockNumber page; bool finished; @@ -502,11 +494,16 @@ heapgettup(HeapScanDesc scan, tuple->t_data = NULL; return; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - heap_parallelscan_startblock_init(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; - page = heap_parallelscan_nextpage(scan); + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + pbscan); + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); /* Other processes might have already finished the scan. */ if (page == InvalidBlockNumber) @@ -518,7 +515,7 @@ heapgettup(HeapScanDesc scan, } else page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); lineoff = FirstOffsetNumber; /* first offnum */ scan->rs_inited = true; } @@ -533,7 +530,7 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); /* page and lineoff now reference the physically next tid */ @@ -542,7 +539,7 @@ heapgettup(HeapScanDesc scan, else if (backward) { /* backward parallel scan not supported */ - Assert(scan->rs_parallel == NULL); + Assert(scan->rs_base.rs_parallel == NULL); if (!scan->rs_inited) { @@ -562,13 +559,13 @@ heapgettup(HeapScanDesc scan, * time, and much more likely that we'll just bollix things for * forward scanners. 
*/ - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); } else { @@ -579,7 +576,7 @@ heapgettup(HeapScanDesc scan, LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber(dp); if (!scan->rs_inited) @@ -610,11 +607,11 @@ heapgettup(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -649,11 +646,12 @@ heapgettup(HeapScanDesc scan, snapshot, scan->rs_cbuf); - CheckForSerializableConflictOut(valid, scan->rs_rd, tuple, - scan->rs_cbuf, snapshot); + CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd, + tuple, scan->rs_cbuf, + snapshot); if (valid && key != NULL) - HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), nkeys, key, valid); if (valid) @@ -696,9 +694,13 @@ heapgettup(HeapScanDesc scan, page = scan->rs_nblocks; page--; } - else if (scan->rs_parallel != NULL) + else if (scan->rs_base.rs_parallel != NULL) { - page = heap_parallelscan_nextpage(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); finished = (page == InvalidBlockNumber); } else @@ -721,8 +723,8 @@ heapgettup(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. */ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_base.rs_syncscan) + ss_report_location(scan->rs_base.rs_rd, page); } /* @@ -739,12 +741,12 @@ heapgettup(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(snapshot, scan->rs_rd, dp); + TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp); lines = PageGetMaxOffsetNumber((Page) dp); linesleft = lines; if (backward) @@ -806,11 +808,16 @@ heapgettup_pagemode(HeapScanDesc scan, tuple->t_data = NULL; return; } - if (scan->rs_parallel != NULL) + if (scan->rs_base.rs_parallel != NULL) { - heap_parallelscan_startblock_init(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + table_block_parallelscan_startblock_init(scan->rs_base.rs_rd, + pbscan); - page = heap_parallelscan_nextpage(scan); + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); /* Other processes might have already finished the scan. 
*/ if (page == InvalidBlockNumber) @@ -822,7 +829,7 @@ heapgettup_pagemode(HeapScanDesc scan, } else page = scan->rs_startblock; /* first page */ - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); lineindex = 0; scan->rs_inited = true; } @@ -834,7 +841,7 @@ heapgettup_pagemode(HeapScanDesc scan, } dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; /* page and lineindex now reference the next visible tid */ @@ -843,7 +850,7 @@ heapgettup_pagemode(HeapScanDesc scan, else if (backward) { /* backward parallel scan not supported */ - Assert(scan->rs_parallel == NULL); + Assert(scan->rs_base.rs_parallel == NULL); if (!scan->rs_inited) { @@ -863,13 +870,13 @@ heapgettup_pagemode(HeapScanDesc scan, * time, and much more likely that we'll just bollix things for * forward scanners. */ - scan->rs_syncscan = false; + scan->rs_base.rs_syncscan = false; /* start from last page of the scan */ if (scan->rs_startblock > 0) page = scan->rs_startblock - 1; else page = scan->rs_nblocks - 1; - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); } else { @@ -878,7 +885,7 @@ heapgettup_pagemode(HeapScanDesc scan, } dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; if (!scan->rs_inited) @@ -908,11 +915,11 @@ heapgettup_pagemode(HeapScanDesc scan, page = ItemPointerGetBlockNumber(&(tuple->t_self)); if (page != scan->rs_cblock) - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); /* Since the tuple was previously fetched, needn't lock page here */ dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self)); lpp = PageGetItemId(dp, lineoff); Assert(ItemIdIsNormal(lpp)); @@ -950,7 +957,7 @@ heapgettup_pagemode(HeapScanDesc scan, { bool valid; - HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd), + HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd), nkeys, key, valid); if (valid) { @@ -986,9 +993,13 @@ heapgettup_pagemode(HeapScanDesc scan, page = scan->rs_nblocks; page--; } - else if (scan->rs_parallel != NULL) + else if (scan->rs_base.rs_parallel != NULL) { - page = heap_parallelscan_nextpage(scan); + ParallelBlockTableScanDesc pbscan = + (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel; + + page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd, + pbscan); finished = (page == InvalidBlockNumber); } else @@ -1011,8 +1022,8 @@ heapgettup_pagemode(HeapScanDesc scan, * a little bit backwards on every invocation, which is confusing. * We don't guarantee any specific ordering in general, though. 
*/ - if (scan->rs_syncscan) - ss_report_location(scan->rs_rd, page); + if (scan->rs_base.rs_syncscan) + ss_report_location(scan->rs_base.rs_rd, page); } /* @@ -1029,10 +1040,10 @@ heapgettup_pagemode(HeapScanDesc scan, return; } - heapgetpage(scan, page); + heapgetpage((TableScanDesc) scan, page); dp = BufferGetPage(scan->rs_cbuf); - TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp); + TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp); lines = scan->rs_ntuples; linesleft = lines; if (backward) @@ -1095,86 +1106,16 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc, */ -/* ---------------- - * heap_beginscan - begin relation scan - * - * heap_beginscan is the "standard" case. - * - * heap_beginscan_catalog differs in setting up its own temporary snapshot. - * - * heap_beginscan_strat offers an extended API that lets the caller control - * whether a nondefault buffer access strategy can be used, and whether - * syncscan can be chosen (possibly resulting in the scan not starting from - * block zero). Both of these default to true with plain heap_beginscan. - * - * heap_beginscan_bm is an alternative entry point for setting up a - * HeapScanDesc for a bitmap heap scan. Although that scan technology is - * really quite unlike a standard seqscan, there is just enough commonality - * to make it worth using the same data structure. - * - * heap_beginscan_sampling is an alternative entry point for setting up a - * HeapScanDesc for a TABLESAMPLE scan. As with bitmap scans, it's worth - * using the same data structure although the behavior is rather different. - * In addition to the options offered by heap_beginscan_strat, this call - * also allows control of whether page-mode visibility checking is used. - * ---------------- - */ -HeapScanDesc +TableScanDesc heap_beginscan(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - true, true, true, false, false, false); -} - -HeapScanDesc -heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key) -{ - Oid relid = RelationGetRelid(relation); - Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid)); - - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - true, true, true, false, false, true); -} - -HeapScanDesc -heap_beginscan_strat(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - bool allow_strat, bool allow_sync) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - allow_strat, allow_sync, true, - false, false, false); -} - -HeapScanDesc -heap_beginscan_bm(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - false, false, true, true, false, false); -} - -HeapScanDesc -heap_beginscan_sampling(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode) -{ - return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL, - allow_strat, allow_sync, allow_pagemode, - false, true, false); -} - -static HeapScanDesc -heap_beginscan_internal(Relation relation, Snapshot snapshot, - int nkeys, ScanKey key, - ParallelHeapScanDesc parallel_scan, - bool allow_strat, - bool allow_sync, - bool allow_pagemode, - bool is_bitmapscan, - bool is_samplescan, - bool temp_snap) + int nkeys, ScanKey key, + ParallelTableScanDesc parallel_scan, + bool allow_strat, + bool allow_sync, + bool allow_pagemode, + bool is_bitmapscan, + bool 
is_samplescan, + bool temp_snap) { HeapScanDesc scan; @@ -1192,21 +1133,22 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, */ scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData)); - scan->rs_rd = relation; - scan->rs_snapshot = snapshot; - scan->rs_nkeys = nkeys; - scan->rs_bitmapscan = is_bitmapscan; - scan->rs_samplescan = is_samplescan; + scan->rs_base.rs_rd = relation; + scan->rs_base.rs_snapshot = snapshot; + scan->rs_base.rs_nkeys = nkeys; + scan->rs_base.rs_bitmapscan = is_bitmapscan; + scan->rs_base.rs_samplescan = is_samplescan; scan->rs_strategy = NULL; /* set in initscan */ - scan->rs_allow_strat = allow_strat; - scan->rs_allow_sync = allow_sync; - scan->rs_temp_snap = temp_snap; - scan->rs_parallel = parallel_scan; + scan->rs_base.rs_allow_strat = allow_strat; + scan->rs_base.rs_allow_sync = allow_sync; + scan->rs_base.rs_temp_snap = temp_snap; + scan->rs_base.rs_parallel = parallel_scan; /* * we can use page-at-a-time mode if it's an MVCC-safe snapshot */ - scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot); + scan->rs_base.rs_pageatatime = + allow_pagemode && snapshot && IsMVCCSnapshot(snapshot); /* * For a seqscan in a serializable transaction, acquire a predicate lock @@ -1230,23 +1172,29 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot, * initscan() and we don't want to allocate memory again */ if (nkeys > 0) - scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys); + scan->rs_base.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys); else - scan->rs_key = NULL; + scan->rs_base.rs_key = NULL; initscan(scan, key, false); - return scan; + return (TableScanDesc) scan; } -/* ---------------- - * heap_rescan - restart a relation scan - * ---------------- - */ void -heap_rescan(HeapScanDesc scan, - ScanKey key) +heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params, + bool allow_strat, bool allow_sync, bool allow_pagemode) { + HeapScanDesc scan = (HeapScanDesc) sscan; + + if (set_params) + { + scan->rs_base.rs_allow_strat = allow_strat; + scan->rs_base.rs_allow_sync = allow_sync; + scan->rs_base.rs_pageatatime = + allow_pagemode && IsMVCCSnapshot(scan->rs_base.rs_snapshot); + } + /* * unpin scan buffers */ @@ -1259,37 +1207,11 @@ heap_rescan(HeapScanDesc scan, initscan(scan, key, true); } -/* ---------------- - * heap_rescan_set_params - restart a relation scan after changing params - * - * This call allows changing the buffer strategy, syncscan, and pagemode - * options before starting a fresh scan. Note that although the actual use - * of syncscan might change (effectively, enabling or disabling reporting), - * the previously selected startblock will be kept. - * ---------------- - */ void -heap_rescan_set_params(HeapScanDesc scan, ScanKey key, - bool allow_strat, bool allow_sync, bool allow_pagemode) +heap_endscan(TableScanDesc sscan) { - /* adjust parameters */ - scan->rs_allow_strat = allow_strat; - scan->rs_allow_sync = allow_sync; - scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot); - /* ... and rescan */ - heap_rescan(scan, key); -} + HeapScanDesc scan = (HeapScanDesc) sscan; -/* ---------------- - * heap_endscan - end relation scan - * - * See how to integrate with index scans. - * Check handling if reldesc caching. 
- * ---------------- - */ -void -heap_endscan(HeapScanDesc scan) -{ /* Note: no locking manipulations needed */ /* @@ -1301,246 +1223,20 @@ heap_endscan(HeapScanDesc scan) /* * decrement relation reference count and free scan descriptor storage */ - RelationDecrementReferenceCount(scan->rs_rd); + RelationDecrementReferenceCount(scan->rs_base.rs_rd); - if (scan->rs_key) - pfree(scan->rs_key); + if (scan->rs_base.rs_key) + pfree(scan->rs_base.rs_key); if (scan->rs_strategy != NULL) FreeAccessStrategy(scan->rs_strategy); - if (scan->rs_temp_snap) - UnregisterSnapshot(scan->rs_snapshot); + if (scan->rs_base.rs_temp_snap) + UnregisterSnapshot(scan->rs_base.rs_snapshot); pfree(scan); } -/* ---------------- - * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc - * - * Sadly, this doesn't reduce to a constant, because the size required - * to serialize the snapshot can vary. - * ---------------- - */ -Size -heap_parallelscan_estimate(Snapshot snapshot) -{ - Size sz = offsetof(ParallelHeapScanDescData, phs_snapshot_data); - - if (IsMVCCSnapshot(snapshot)) - sz = add_size(sz, EstimateSnapshotSpace(snapshot)); - else - Assert(snapshot == SnapshotAny); - - return sz; -} - -/* ---------------- - * heap_parallelscan_initialize - initialize ParallelHeapScanDesc - * - * Must allow as many bytes of shared memory as returned by - * heap_parallelscan_estimate. Call this just once in the leader - * process; then, individual workers attach via heap_beginscan_parallel. - * ---------------- - */ -void -heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation, - Snapshot snapshot) -{ - target->phs_relid = RelationGetRelid(relation); - target->phs_nblocks = RelationGetNumberOfBlocks(relation); - /* compare phs_syncscan initialization to similar logic in initscan */ - target->phs_syncscan = synchronize_seqscans && - !RelationUsesLocalBuffers(relation) && - target->phs_nblocks > NBuffers / 4; - SpinLockInit(&target->phs_mutex); - target->phs_startblock = InvalidBlockNumber; - pg_atomic_init_u64(&target->phs_nallocated, 0); - if (IsMVCCSnapshot(snapshot)) - { - SerializeSnapshot(snapshot, target->phs_snapshot_data); - target->phs_snapshot_any = false; - } - else - { - Assert(snapshot == SnapshotAny); - target->phs_snapshot_any = true; - } -} - -/* ---------------- - * heap_parallelscan_reinitialize - reset a parallel scan - * - * Call this in the leader process. Caller is responsible for - * making sure that all workers have finished the scan beforehand. - * ---------------- - */ -void -heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan) -{ - pg_atomic_write_u64(¶llel_scan->phs_nallocated, 0); -} - -/* ---------------- - * heap_beginscan_parallel - join a parallel scan - * - * Caller must hold a suitable lock on the correct relation. 
- * ---------------- - */ -HeapScanDesc -heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan) -{ - Snapshot snapshot; - - Assert(RelationGetRelid(relation) == parallel_scan->phs_relid); - - if (!parallel_scan->phs_snapshot_any) - { - /* Snapshot was serialized -- restore it */ - snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data); - RegisterSnapshot(snapshot); - } - else - { - /* SnapshotAny passed by caller (not serialized) */ - snapshot = SnapshotAny; - } - - return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan, - true, true, true, false, false, - !parallel_scan->phs_snapshot_any); -} - -/* ---------------- - * heap_parallelscan_startblock_init - find and set the scan's startblock - * - * Determine where the parallel seq scan should start. This function may - * be called many times, once by each parallel worker. We must be careful - * only to set the startblock once. - * ---------------- - */ -static void -heap_parallelscan_startblock_init(HeapScanDesc scan) -{ - BlockNumber sync_startpage = InvalidBlockNumber; - ParallelHeapScanDesc parallel_scan; - - Assert(scan->rs_parallel); - parallel_scan = scan->rs_parallel; - -retry: - /* Grab the spinlock. */ - SpinLockAcquire(¶llel_scan->phs_mutex); - - /* - * If the scan's startblock has not yet been initialized, we must do so - * now. If this is not a synchronized scan, we just start at block 0, but - * if it is a synchronized scan, we must get the starting position from - * the synchronized scan machinery. We can't hold the spinlock while - * doing that, though, so release the spinlock, get the information we - * need, and retry. If nobody else has initialized the scan in the - * meantime, we'll fill in the value we fetched on the second time - * through. - */ - if (parallel_scan->phs_startblock == InvalidBlockNumber) - { - if (!parallel_scan->phs_syncscan) - parallel_scan->phs_startblock = 0; - else if (sync_startpage != InvalidBlockNumber) - parallel_scan->phs_startblock = sync_startpage; - else - { - SpinLockRelease(¶llel_scan->phs_mutex); - sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks); - goto retry; - } - } - SpinLockRelease(¶llel_scan->phs_mutex); -} - -/* ---------------- - * heap_parallelscan_nextpage - get the next page to scan - * - * Get the next page to scan. Even if there are no pages left to scan, - * another backend could have grabbed a page to scan and not yet finished - * looking at it, so it doesn't follow that the scan is done when the - * first backend gets an InvalidBlockNumber return. - * ---------------- - */ -static BlockNumber -heap_parallelscan_nextpage(HeapScanDesc scan) -{ - BlockNumber page; - ParallelHeapScanDesc parallel_scan; - uint64 nallocated; - - Assert(scan->rs_parallel); - parallel_scan = scan->rs_parallel; - - /* - * phs_nallocated tracks how many pages have been allocated to workers - * already. When phs_nallocated >= rs_nblocks, all blocks have been - * allocated. - * - * Because we use an atomic fetch-and-add to fetch the current value, the - * phs_nallocated counter will exceed rs_nblocks, because workers will - * still increment the value, when they try to allocate the next block but - * all blocks have been allocated already. The counter must be 64 bits - * wide because of that, to avoid wrapping around when rs_nblocks is close - * to 2^32. - * - * The actual page to return is calculated by adding the counter to the - * starting block number, modulo nblocks. 
- */ - nallocated = pg_atomic_fetch_add_u64(¶llel_scan->phs_nallocated, 1); - if (nallocated >= scan->rs_nblocks) - page = InvalidBlockNumber; /* all blocks have been allocated */ - else - page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks; - - /* - * Report scan location. Normally, we report the current page number. - * When we reach the end of the scan, though, we report the starting page, - * not the ending page, just so the starting positions for later scans - * doesn't slew backwards. We only report the position at the end of the - * scan once, though: subsequent callers will report nothing. - */ - if (scan->rs_syncscan) - { - if (page != InvalidBlockNumber) - ss_report_location(scan->rs_rd, page); - else if (nallocated == scan->rs_nblocks) - ss_report_location(scan->rs_rd, parallel_scan->phs_startblock); - } - - return page; -} - -/* ---------------- - * heap_update_snapshot - * - * Update snapshot info in heap scan descriptor. - * ---------------- - */ -void -heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) -{ - Assert(IsMVCCSnapshot(snapshot)); - - RegisterSnapshot(snapshot); - scan->rs_snapshot = snapshot; - scan->rs_temp_snap = true; -} - -/* ---------------- - * heap_getnext - retrieve next tuple in scan - * - * Fix to work with index relations. - * We don't return the buffer anymore, but you can get it from the - * returned HeapTuple. - * ---------------- - */ - #ifdef HEAPDEBUGALL #define HEAPDEBUG_1 \ elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \ @@ -1557,17 +1253,32 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot) HeapTuple -heap_getnext(HeapScanDesc scan, ScanDirection direction) +heap_getnext(TableScanDesc sscan, ScanDirection direction) { + HeapScanDesc scan = (HeapScanDesc) sscan; + + /* + * This is still widely used directly, without going through table AM, so + * add a safety check. It's possible we should, at a later point, + * downgrade this to an assert. The reason for checking the AM routine, + * rather than the AM oid, is that this allows to write regression tests + * that create another AM reusing the heap handler. 
+ */ + if (unlikely(sscan->rs_rd->rd_tableam != GetHeapamTableAmRoutine())) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("only heap AM is supported"))); + /* Note: no locking manipulations needed */ HEAPDEBUG_1; /* heap_getnext( info ) */ - if (scan->rs_pageatatime) + if (scan->rs_base.rs_pageatatime) heapgettup_pagemode(scan, direction, - scan->rs_nkeys, scan->rs_key); + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); else - heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key); + heapgettup(scan, direction, + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); if (scan->rs_ctup.t_data == NULL) { @@ -1581,9 +1292,58 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction) */ HEAPDEBUG_3; /* heap_getnext returning tuple */ - pgstat_count_heap_getnext(scan->rs_rd); + pgstat_count_heap_getnext(scan->rs_base.rs_rd); + + return &scan->rs_ctup; +} + +#ifdef HEAPAMSLOTDEBUGALL +#define HEAPAMSLOTDEBUG_1 \ + elog(DEBUG2, "heapam_getnextslot([%s,nkeys=%d],dir=%d) called", \ + RelationGetRelationName(scan->rs_base.rs_rd), scan->rs_base.rs_nkeys, (int) direction) +#define HEAPAMSLOTDEBUG_2 \ + elog(DEBUG2, "heapam_getnextslot returning EOS") +#define HEAPAMSLOTDEBUG_3 \ + elog(DEBUG2, "heapam_getnextslot returning tuple") +#else +#define HEAPAMSLOTDEBUG_1 +#define HEAPAMSLOTDEBUG_2 +#define HEAPAMSLOTDEBUG_3 +#endif + +bool +heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot) +{ + HeapScanDesc scan = (HeapScanDesc) sscan; + + /* Note: no locking manipulations needed */ + + HEAPAMSLOTDEBUG_1; /* heap_getnextslot( info ) */ + + if (scan->rs_base.rs_pageatatime) + heapgettup_pagemode(scan, direction, + scan->rs_base.rs_nkeys, scan->rs_base.rs_key); + else + heapgettup(scan, direction, scan->rs_base.rs_nkeys, scan->rs_base.rs_key); - return &(scan->rs_ctup); + if (scan->rs_ctup.t_data == NULL) + { + HEAPAMSLOTDEBUG_2; /* heap_getnextslot returning EOS */ + ExecClearTuple(slot); + return false; + } + + /* + * if we get here it means we have a new current scan tuple, so point to + * the proper return buffer and return the tuple. + */ + HEAPAMSLOTDEBUG_3; /* heap_getnextslot returning tuple */ + + pgstat_count_heap_getnext(scan->rs_base.rs_rd); + + ExecStoreBufferHeapTuple(&scan->rs_ctup, slot, + scan->rs_cbuf); + return true; } /* |
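
For reference, a hedged sketch of how a caller might drive the signatures introduced above: heap_beginscan() now returns a TableScanDesc and takes a ParallelTableScanDesc plus the flags that replace the removed heap_beginscan_* variants, heap_getnextslot() fills a TupleTableSlot, and heap_endscan() takes the generic descriptor. This sketch is not part of the commit; it assumes "rel" is an already opened and suitably locked heap relation, and external code would normally reach these routines through the tableam wrappers (note the GetHeapamTableAmRoutine() check the patch adds to heap_getnext()).

/*
 * Illustrative only: a plain serial scan using the new descriptor types.
 * count_heap_tuples() is a hypothetical helper, not a function in the tree.
 */
#include "postgres.h"

#include "access/heapam.h"
#include "access/tableam.h"
#include "executor/tuptable.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"

static void
count_heap_tuples(Relation rel)
{
	Snapshot	snapshot = GetTransactionSnapshot();
	TableScanDesc scan;
	TupleTableSlot *slot;
	uint64		ntuples = 0;

	/* buffer-capable slot, since heap_getnextslot stores buffer tuples */
	slot = MakeSingleTupleTableSlot(RelationGetDescr(rel),
									&TTSOpsBufferHeapTuple);

	/*
	 * No parallel descriptor; allow strategy, syncscan and page-at-a-time
	 * mode; not a bitmap or sample scan; snapshot is not a temporary copy.
	 */
	scan = heap_beginscan(rel, snapshot, 0, NULL, NULL,
						  true, true, true,
						  false, false, false);

	while (heap_getnextslot(scan, ForwardScanDirection, slot))
		ntuples++;

	heap_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);

	elog(DEBUG1, "scanned " UINT64_FORMAT " tuples", ntuples);
}
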