author	Andres Freund <andres@anarazel.de>	2019-03-11 12:46:41 -0700
committer	Andres Freund <andres@anarazel.de>	2019-03-11 12:46:41 -0700
commit	c2fe139c201c48f1133e9fbea2dd99b8efe2fadd (patch)
tree	ab0a6261b412b8284b6c91af158f72af97e02a35	/src/backend/access/heap/heapam.c
parent	a47841528107921f02c280e0c5f91c5a1d86adb0 (diff)
tableam: Add and use scan APIs.
To allow table accesses to not be directly dependent on heap, several new abstractions are needed. Specifically:

1) Heap scans need to be generalized into table scans. Do this by introducing TableScanDesc, which will be the "base class" for individual AMs. This contains the AM-independent fields from HeapScanDesc.

The previous heap_{beginscan,rescan,endscan} et al. have been replaced with table_ versions. There's no direct replacement for heap_getnext(), as that returned a HeapTuple, which is undesirable for other AMs. Instead there's table_scan_getnextslot() (see the sketch after this list). But note that heap_getnext() lives on; it's still widely used to access catalog tables.

This is achieved by new scan_begin, scan_end, scan_rescan, scan_getnextslot callbacks.

2) The portion of parallel scans that's shared between backends needs to be set up without the caller doing per-AM work. To achieve that, new parallelscan_{estimate, initialize, reinitialize} callbacks are introduced, which operate on a new ParallelTableScanDesc, which again can be subclassed by AMs.

As it is likely that several AMs are going to be block oriented, block-oriented callbacks that can be shared between such AMs are provided and used by heap: table_block_parallelscan_{estimate, initialize, reinitialize} as callbacks, and table_block_parallelscan_{startblock_init, nextpage} for use in AMs. These operate on a ParallelBlockTableScanDesc.

3) Index scans need to be able to access tables to return a tuple, and there needs to be state across individual accesses to the heap to store state like buffers. That's now handled by introducing a sort-of-scan IndexFetchTable, which again is intended to be subclassed by individual AMs (for heap, IndexFetchHeapData).

The relevant callbacks for an AM are index_fetch_{begin, end, reset} to create, destroy, and reset the necessary state, and index_fetch_tuple to retrieve an indexed tuple. Note that index_fetch_tuple implementations need to be smarter than just blindly fetching the tuple for AMs that have optimizations similar to heap's HOT - the currently alive tuple in the update chain needs to be fetched if appropriate.

Similar to table_scan_getnextslot(), it's undesirable to continue to return HeapTuples. Thus index_fetch_heap() (might want to rename that later) now accepts a slot as an argument. Core code doesn't have a lot of call sites performing index scans without going through the systable_* API (in contrast to loads of heap_getnext() calls working directly with HeapTuples).

Index scans now store the result of a search in IndexScanDesc->xs_heaptid, rather than xs_ctup->t_self. As the target is not generally a HeapTuple anymore, that seems cleaner.

To be able to sensibly adapt code to use the above, two further callbacks have been introduced:

a) slot_callbacks returns a TupleTableSlotOps* suitable for creating slots capable of holding a tuple of the AM's type. table_slot_callbacks() and table_slot_create() are based upon that, but have additional logic to deal with views, foreign tables, etc.

While this change could have been done separately, nearly all the call sites that needed to be adapted for the rest of this commit would also have needed to be adapted for table_slot_callbacks(), making separation not worthwhile.

b) tuple_satisfies_snapshot checks whether the tuple in a slot is currently visible according to a snapshot. That's required as a few places now don't have a buffer + HeapTuple around, but a slot (which in heap's case internally has that information).
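For illustration, the caller-facing side of 1) and a) ends up looking like this - a minimal sketch, not taken from the commit's diff, assuming the table_* wrappers named above and the usual PostgreSQL headers; count_visible_tuples() itself is a hypothetical helper:

#include "postgres.h"

#include "access/tableam.h"
#include "executor/tuptable.h"		/* ExecDropSingleTupleTableSlot() */

/*
 * Count the tuples in "rel" that are visible to "snapshot", going only
 * through the AM-independent scan API introduced by this commit.
 */
static uint64
count_visible_tuples(Relation rel, Snapshot snapshot)
{
	/* slot_callbacks picks a slot type suitable for rel's AM */
	TupleTableSlot *slot = table_slot_create(rel, NULL);
	TableScanDesc	scan = table_beginscan(rel, snapshot, 0, NULL);
	uint64			ntuples = 0;

	/* scan_getnextslot stores the next tuple into the slot, returns false at EOS */
	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
		ntuples++;

	table_endscan(scan);
	ExecDropSingleTupleTableSlot(slot);

	return ntuples;
}

The same loop works unchanged for any AM whose handler provides the scan_begin, scan_getnextslot, and scan_end callbacks.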
Additionally a few infrastructure changes were needed:

I) SysScanDesc, as used by systable_{beginscan, getnext} et al., now internally uses a slot to keep track of tuples. While systable_getnext() still returns HeapTuples, and will do so for the foreseeable future, the index API (see 1) above) now only deals with slots.

The remainder, and largest part, of this commit is then adjusting all scans in postgres to use the new APIs.

Author: Andres Freund, Haribabu Kommi, Alvaro Herrera
Discussion: https://postgr.es/m/20180703070645.wchpu5muyto5n647@alap3.anarazel.de
    https://postgr.es/m/20160812231527.GA690404@alvherre.pgsql
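On the AM-implementer side of 2), a block-oriented AM can reuse the shared parallel-scan helpers exactly the way heapgettup() does in the diff below. A hedged sketch: MyScanDescData and my_scan_next_block() are hypothetical; only the rs_base embedding and the two table_block_parallelscan_* calls come from the commit.

#include "postgres.h"

#include "access/relscan.h"
#include "access/tableam.h"

/*
 * Hypothetical per-AM scan state.  Embedding TableScanDescData as the first
 * member is what lets core code treat a pointer to this as a TableScanDesc.
 */
typedef struct MyScanDescData
{
	TableScanDescData rs_base;		/* AM-independent part, must be first */
	BlockNumber		rs_nblocks;		/* relation size at scan start */
	BlockNumber		rs_cblock;		/* current block of a serial scan */
} MyScanDescData;
typedef struct MyScanDescData *MyScanDesc;

/*
 * Return the next block this worker should read, or InvalidBlockNumber when
 * the scan is exhausted.  Syncscan/startblock handling is omitted.
 */
static BlockNumber
my_scan_next_block(MyScanDesc scan, bool first_call)
{
	if (scan->rs_base.rs_parallel != NULL)
	{
		ParallelBlockTableScanDesc pbscan =
			(ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;

		if (first_call)
			table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
													 pbscan);

		/* the shared helper hands out pages across all cooperating backends */
		return table_block_parallelscan_nextpage(scan->rs_base.rs_rd, pbscan);
	}

	/* serial scan: just walk the relation front to back */
	scan->rs_cblock = first_call ? 0 : scan->rs_cblock + 1;
	return scan->rs_cblock < scan->rs_nblocks ? scan->rs_cblock : InvalidBlockNumber;
}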
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--	src/backend/access/heap/heapam.c	650
1 file changed, 205 insertions, 445 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dc3499349b6..3c8a5da0bc8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -41,6 +41,7 @@
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/sysattr.h"
+#include "access/tableam.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
@@ -68,22 +69,6 @@
#include "utils/snapmgr.h"
-/* GUC variable */
-bool synchronize_seqscans = true;
-
-
-static HeapScanDesc heap_beginscan_internal(Relation relation,
- Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap);
-static void heap_parallelscan_startblock_init(HeapScanDesc scan);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -207,6 +192,7 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
static void
initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
{
+ ParallelBlockTableScanDesc bpscan = NULL;
bool allow_strat;
bool allow_sync;
@@ -221,10 +207,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- if (scan->rs_parallel != NULL)
- scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ scan->rs_nblocks = bpscan->phs_nblocks;
+ }
else
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_base.rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
@@ -238,11 +227,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* Note that heap_parallelscan_initialize has a very similar test; if you
* change this, consider changing that one, too.
*/
- if (!RelationUsesLocalBuffers(scan->rs_rd) &&
+ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
{
- allow_strat = scan->rs_allow_strat;
- allow_sync = scan->rs_allow_sync;
+ allow_strat = scan->rs_base.rs_allow_strat;
+ allow_sync = scan->rs_base.rs_allow_sync;
}
else
allow_strat = allow_sync = false;
@@ -260,10 +249,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_strategy = NULL;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
- scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ /* For parallel scan, believe whatever ParallelTableScanDesc says. */
+ scan->rs_base.rs_syncscan = scan->rs_base.rs_parallel->phs_syncscan;
}
else if (keep_startblock)
{
@@ -272,16 +261,16 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* so that rewinding a cursor doesn't generate surprising results.
* Reset the active syncscan setting, though.
*/
- scan->rs_syncscan = (allow_sync && synchronize_seqscans);
+ scan->rs_base.rs_syncscan = (allow_sync && synchronize_seqscans);
}
else if (allow_sync && synchronize_seqscans)
{
- scan->rs_syncscan = true;
- scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ scan->rs_base.rs_syncscan = true;
+ scan->rs_startblock = ss_get_location(scan->rs_base.rs_rd, scan->rs_nblocks);
}
else
{
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
scan->rs_startblock = 0;
}
@@ -298,15 +287,15 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* copy the scan key, if appropriate
*/
if (key != NULL)
- memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
+ memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData));
/*
* Currently, we don't have a stats counter for bitmap heap scans (but the
* underlying bitmap index scans will be counted) or sample scans (we only
* update stats for tuple fetches there)
*/
- if (!scan->rs_bitmapscan && !scan->rs_samplescan)
- pgstat_count_heap_scan(scan->rs_rd);
+ if (!scan->rs_base.rs_bitmapscan && !scan->rs_base.rs_samplescan)
+ pgstat_count_heap_scan(scan->rs_base.rs_rd);
}
/*
@@ -316,10 +305,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* numBlks is number of pages to scan (InvalidBlockNumber means "all")
*/
void
-heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
+heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
Assert(!scan->rs_inited); /* else too late to change */
- Assert(!scan->rs_syncscan); /* else rs_startblock is significant */
+ Assert(!scan->rs_base.rs_syncscan); /* else rs_startblock is significant */
/* Check startBlk is valid (but allow case of zero blocks...) */
Assert(startBlk == 0 || startBlk < scan->rs_nblocks);
@@ -336,8 +327,9 @@ heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
* which tuples on the page are visible.
*/
void
-heapgetpage(HeapScanDesc scan, BlockNumber page)
+heapgetpage(TableScanDesc sscan, BlockNumber page)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
Buffer buffer;
Snapshot snapshot;
Page dp;
@@ -364,20 +356,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
CHECK_FOR_INTERRUPTS();
/* read page using selected strategy */
- scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
+ scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
RBM_NORMAL, scan->rs_strategy);
scan->rs_cblock = page;
- if (!scan->rs_pageatatime)
+ if (!scan->rs_base.rs_pageatatime)
return;
buffer = scan->rs_cbuf;
- snapshot = scan->rs_snapshot;
+ snapshot = scan->rs_base.rs_snapshot;
/*
* Prune and repair fragmentation for the whole page, if possible.
*/
- heap_page_prune_opt(scan->rs_rd, buffer);
+ heap_page_prune_opt(scan->rs_base.rs_rd, buffer);
/*
* We must hold share lock on the buffer content while examining tuple
@@ -387,7 +379,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
LockBuffer(buffer, BUFFER_LOCK_SHARE);
dp = BufferGetPage(buffer);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
@@ -422,7 +414,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
HeapTupleData loctup;
bool valid;
- loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
+ loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
@@ -432,8 +424,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
- buffer, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ &loctup, buffer, snapshot);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
@@ -476,7 +468,7 @@ heapgettup(HeapScanDesc scan,
ScanKey key)
{
HeapTuple tuple = &(scan->rs_ctup);
- Snapshot snapshot = scan->rs_snapshot;
+ Snapshot snapshot = scan->rs_base.rs_snapshot;
bool backward = ScanDirectionIsBackward(dir);
BlockNumber page;
bool finished;
@@ -502,11 +494,16 @@ heapgettup(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
- page = heap_parallelscan_nextpage(scan);
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -518,7 +515,7 @@ heapgettup(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
@@ -533,7 +530,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
@@ -542,7 +539,7 @@ heapgettup(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -562,13 +559,13 @@ heapgettup(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -579,7 +576,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
if (!scan->rs_inited)
@@ -610,11 +607,11 @@ heapgettup(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -649,11 +646,12 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
- CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
- scan->rs_cbuf, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ tuple, scan->rs_cbuf,
+ snapshot);
if (valid && key != NULL)
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
@@ -696,9 +694,13 @@ heapgettup(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -721,8 +723,8 @@ heapgettup(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -739,12 +741,12 @@ heapgettup(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines;
if (backward)
@@ -806,11 +808,16 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
- page = heap_parallelscan_nextpage(scan);
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -822,7 +829,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineindex = 0;
scan->rs_inited = true;
}
@@ -834,7 +841,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
@@ -843,7 +850,7 @@ heapgettup_pagemode(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -863,13 +870,13 @@ heapgettup_pagemode(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -878,7 +885,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
@@ -908,11 +915,11 @@ heapgettup_pagemode(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -950,7 +957,7 @@ heapgettup_pagemode(HeapScanDesc scan,
{
bool valid;
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
{
@@ -986,9 +993,13 @@ heapgettup_pagemode(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -1011,8 +1022,8 @@ heapgettup_pagemode(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -1029,10 +1040,10 @@ heapgettup_pagemode(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
linesleft = lines;
if (backward)
@@ -1095,86 +1106,16 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
*/
-/* ----------------
- * heap_beginscan - begin relation scan
- *
- * heap_beginscan is the "standard" case.
- *
- * heap_beginscan_catalog differs in setting up its own temporary snapshot.
- *
- * heap_beginscan_strat offers an extended API that lets the caller control
- * whether a nondefault buffer access strategy can be used, and whether
- * syncscan can be chosen (possibly resulting in the scan not starting from
- * block zero). Both of these default to true with plain heap_beginscan.
- *
- * heap_beginscan_bm is an alternative entry point for setting up a
- * HeapScanDesc for a bitmap heap scan. Although that scan technology is
- * really quite unlike a standard seqscan, there is just enough commonality
- * to make it worth using the same data structure.
- *
- * heap_beginscan_sampling is an alternative entry point for setting up a
- * HeapScanDesc for a TABLESAMPLE scan. As with bitmap scans, it's worth
- * using the same data structure although the behavior is rather different.
- * In addition to the options offered by heap_beginscan_strat, this call
- * also allows control of whether page-mode visibility checking is used.
- * ----------------
- */
-HeapScanDesc
+TableScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
-{
- Oid relid = RelationGetRelid(relation);
- Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
-
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, true);
-}
-
-HeapScanDesc
-heap_beginscan_strat(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, true,
- false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_bm(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- false, false, true, true, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_sampling(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, allow_pagemode,
- false, true, false);
-}
-
-static HeapScanDesc
-heap_beginscan_internal(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap)
+ int nkeys, ScanKey key,
+ ParallelTableScanDesc parallel_scan,
+ bool allow_strat,
+ bool allow_sync,
+ bool allow_pagemode,
+ bool is_bitmapscan,
+ bool is_samplescan,
+ bool temp_snap)
{
HeapScanDesc scan;
@@ -1192,21 +1133,22 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
*/
scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
- scan->rs_rd = relation;
- scan->rs_snapshot = snapshot;
- scan->rs_nkeys = nkeys;
- scan->rs_bitmapscan = is_bitmapscan;
- scan->rs_samplescan = is_samplescan;
+ scan->rs_base.rs_rd = relation;
+ scan->rs_base.rs_snapshot = snapshot;
+ scan->rs_base.rs_nkeys = nkeys;
+ scan->rs_base.rs_bitmapscan = is_bitmapscan;
+ scan->rs_base.rs_samplescan = is_samplescan;
scan->rs_strategy = NULL; /* set in initscan */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_temp_snap = temp_snap;
- scan->rs_parallel = parallel_scan;
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_temp_snap = temp_snap;
+ scan->rs_base.rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
*/
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot);
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && snapshot && IsMVCCSnapshot(snapshot);
/*
* For a seqscan in a serializable transaction, acquire a predicate lock
@@ -1230,23 +1172,29 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
* initscan() and we don't want to allocate memory again
*/
if (nkeys > 0)
- scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
+ scan->rs_base.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
else
- scan->rs_key = NULL;
+ scan->rs_base.rs_key = NULL;
initscan(scan, key, false);
- return scan;
+ return (TableScanDesc) scan;
}
-/* ----------------
- * heap_rescan - restart a relation scan
- * ----------------
- */
void
-heap_rescan(HeapScanDesc scan,
- ScanKey key)
+heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
+ bool allow_strat, bool allow_sync, bool allow_pagemode)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ if (set_params)
+ {
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && IsMVCCSnapshot(scan->rs_base.rs_snapshot);
+ }
+
/*
* unpin scan buffers
*/
@@ -1259,37 +1207,11 @@ heap_rescan(HeapScanDesc scan,
initscan(scan, key, true);
}
-/* ----------------
- * heap_rescan_set_params - restart a relation scan after changing params
- *
- * This call allows changing the buffer strategy, syncscan, and pagemode
- * options before starting a fresh scan. Note that although the actual use
- * of syncscan might change (effectively, enabling or disabling reporting),
- * the previously selected startblock will be kept.
- * ----------------
- */
void
-heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
+heap_endscan(TableScanDesc sscan)
{
- /* adjust parameters */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot);
- /* ... and rescan */
- heap_rescan(scan, key);
-}
+ HeapScanDesc scan = (HeapScanDesc) sscan;
-/* ----------------
- * heap_endscan - end relation scan
- *
- * See how to integrate with index scans.
- * Check handling if reldesc caching.
- * ----------------
- */
-void
-heap_endscan(HeapScanDesc scan)
-{
/* Note: no locking manipulations needed */
/*
@@ -1301,246 +1223,20 @@ heap_endscan(HeapScanDesc scan)
/*
* decrement relation reference count and free scan descriptor storage
*/
- RelationDecrementReferenceCount(scan->rs_rd);
+ RelationDecrementReferenceCount(scan->rs_base.rs_rd);
- if (scan->rs_key)
- pfree(scan->rs_key);
+ if (scan->rs_base.rs_key)
+ pfree(scan->rs_base.rs_key);
if (scan->rs_strategy != NULL)
FreeAccessStrategy(scan->rs_strategy);
- if (scan->rs_temp_snap)
- UnregisterSnapshot(scan->rs_snapshot);
+ if (scan->rs_base.rs_temp_snap)
+ UnregisterSnapshot(scan->rs_base.rs_snapshot);
pfree(scan);
}
-/* ----------------
- * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
- *
- * Sadly, this doesn't reduce to a constant, because the size required
- * to serialize the snapshot can vary.
- * ----------------
- */
-Size
-heap_parallelscan_estimate(Snapshot snapshot)
-{
- Size sz = offsetof(ParallelHeapScanDescData, phs_snapshot_data);
-
- if (IsMVCCSnapshot(snapshot))
- sz = add_size(sz, EstimateSnapshotSpace(snapshot));
- else
- Assert(snapshot == SnapshotAny);
-
- return sz;
-}
-
-/* ----------------
- * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
- *
- * Must allow as many bytes of shared memory as returned by
- * heap_parallelscan_estimate. Call this just once in the leader
- * process; then, individual workers attach via heap_beginscan_parallel.
- * ----------------
- */
-void
-heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
- Snapshot snapshot)
-{
- target->phs_relid = RelationGetRelid(relation);
- target->phs_nblocks = RelationGetNumberOfBlocks(relation);
- /* compare phs_syncscan initialization to similar logic in initscan */
- target->phs_syncscan = synchronize_seqscans &&
- !RelationUsesLocalBuffers(relation) &&
- target->phs_nblocks > NBuffers / 4;
- SpinLockInit(&target->phs_mutex);
- target->phs_startblock = InvalidBlockNumber;
- pg_atomic_init_u64(&target->phs_nallocated, 0);
- if (IsMVCCSnapshot(snapshot))
- {
- SerializeSnapshot(snapshot, target->phs_snapshot_data);
- target->phs_snapshot_any = false;
- }
- else
- {
- Assert(snapshot == SnapshotAny);
- target->phs_snapshot_any = true;
- }
-}
-
-/* ----------------
- * heap_parallelscan_reinitialize - reset a parallel scan
- *
- * Call this in the leader process. Caller is responsible for
- * making sure that all workers have finished the scan beforehand.
- * ----------------
- */
-void
-heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
-{
- pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
-}
-
-/* ----------------
- * heap_beginscan_parallel - join a parallel scan
- *
- * Caller must hold a suitable lock on the correct relation.
- * ----------------
- */
-HeapScanDesc
-heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
-{
- Snapshot snapshot;
-
- Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
-
- if (!parallel_scan->phs_snapshot_any)
- {
- /* Snapshot was serialized -- restore it */
- snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
- RegisterSnapshot(snapshot);
- }
- else
- {
- /* SnapshotAny passed by caller (not serialized) */
- snapshot = SnapshotAny;
- }
-
- return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
- true, true, true, false, false,
- !parallel_scan->phs_snapshot_any);
-}
-
-/* ----------------
- * heap_parallelscan_startblock_init - find and set the scan's startblock
- *
- * Determine where the parallel seq scan should start. This function may
- * be called many times, once by each parallel worker. We must be careful
- * only to set the startblock once.
- * ----------------
- */
-static void
-heap_parallelscan_startblock_init(HeapScanDesc scan)
-{
- BlockNumber sync_startpage = InvalidBlockNumber;
- ParallelHeapScanDesc parallel_scan;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
-retry:
- /* Grab the spinlock. */
- SpinLockAcquire(&parallel_scan->phs_mutex);
-
- /*
- * If the scan's startblock has not yet been initialized, we must do so
- * now. If this is not a synchronized scan, we just start at block 0, but
- * if it is a synchronized scan, we must get the starting position from
- * the synchronized scan machinery. We can't hold the spinlock while
- * doing that, though, so release the spinlock, get the information we
- * need, and retry. If nobody else has initialized the scan in the
- * meantime, we'll fill in the value we fetched on the second time
- * through.
- */
- if (parallel_scan->phs_startblock == InvalidBlockNumber)
- {
- if (!parallel_scan->phs_syncscan)
- parallel_scan->phs_startblock = 0;
- else if (sync_startpage != InvalidBlockNumber)
- parallel_scan->phs_startblock = sync_startpage;
- else
- {
- SpinLockRelease(&parallel_scan->phs_mutex);
- sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
- goto retry;
- }
- }
- SpinLockRelease(&parallel_scan->phs_mutex);
-}
-
-/* ----------------
- * heap_parallelscan_nextpage - get the next page to scan
- *
- * Get the next page to scan. Even if there are no pages left to scan,
- * another backend could have grabbed a page to scan and not yet finished
- * looking at it, so it doesn't follow that the scan is done when the
- * first backend gets an InvalidBlockNumber return.
- * ----------------
- */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
-{
- BlockNumber page;
- ParallelHeapScanDesc parallel_scan;
- uint64 nallocated;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
- /*
- * phs_nallocated tracks how many pages have been allocated to workers
- * already. When phs_nallocated >= rs_nblocks, all blocks have been
- * allocated.
- *
- * Because we use an atomic fetch-and-add to fetch the current value, the
- * phs_nallocated counter will exceed rs_nblocks, because workers will
- * still increment the value, when they try to allocate the next block but
- * all blocks have been allocated already. The counter must be 64 bits
- * wide because of that, to avoid wrapping around when rs_nblocks is close
- * to 2^32.
- *
- * The actual page to return is calculated by adding the counter to the
- * starting block number, modulo nblocks.
- */
- nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
- if (nallocated >= scan->rs_nblocks)
- page = InvalidBlockNumber; /* all blocks have been allocated */
- else
- page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
-
- /*
- * Report scan location. Normally, we report the current page number.
- * When we reach the end of the scan, though, we report the starting page,
- * not the ending page, just so the starting positions for later scans
- * doesn't slew backwards. We only report the position at the end of the
- * scan once, though: subsequent callers will report nothing.
- */
- if (scan->rs_syncscan)
- {
- if (page != InvalidBlockNumber)
- ss_report_location(scan->rs_rd, page);
- else if (nallocated == scan->rs_nblocks)
- ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
- }
-
- return page;
-}
-
-/* ----------------
- * heap_update_snapshot
- *
- * Update snapshot info in heap scan descriptor.
- * ----------------
- */
-void
-heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
-{
- Assert(IsMVCCSnapshot(snapshot));
-
- RegisterSnapshot(snapshot);
- scan->rs_snapshot = snapshot;
- scan->rs_temp_snap = true;
-}
-
-/* ----------------
- * heap_getnext - retrieve next tuple in scan
- *
- * Fix to work with index relations.
- * We don't return the buffer anymore, but you can get it from the
- * returned HeapTuple.
- * ----------------
- */
-
#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
@@ -1557,17 +1253,32 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
HeapTuple
-heap_getnext(HeapScanDesc scan, ScanDirection direction)
+heap_getnext(TableScanDesc sscan, ScanDirection direction)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /*
+ * This is still widely used directly, without going through table AM, so
+ * add a safety check. It's possible we should, at a later point,
+ * downgrade this to an assert. The reason for checking the AM routine,
+ * rather than the AM oid, is that this allows to write regression tests
+ * that create another AM reusing the heap handler.
+ */
+ if (unlikely(sscan->rs_rd->rd_tableam != GetHeapamTableAmRoutine()))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("only heap AM is supported")));
+
/* Note: no locking manipulations needed */
HEAPDEBUG_1; /* heap_getnext( info ) */
- if (scan->rs_pageatatime)
+ if (scan->rs_base.rs_pageatatime)
heapgettup_pagemode(scan, direction,
- scan->rs_nkeys, scan->rs_key);
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
else
- heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);
+ heapgettup(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
if (scan->rs_ctup.t_data == NULL)
{
@@ -1581,9 +1292,58 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
*/
HEAPDEBUG_3; /* heap_getnext returning tuple */
- pgstat_count_heap_getnext(scan->rs_rd);
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ return &scan->rs_ctup;
+}
+
+#ifdef HEAPAMSLOTDEBUGALL
+#define HEAPAMSLOTDEBUG_1 \
+ elog(DEBUG2, "heapam_getnextslot([%s,nkeys=%d],dir=%d) called", \
+ RelationGetRelationName(scan->rs_base.rs_rd), scan->rs_base.rs_nkeys, (int) direction)
+#define HEAPAMSLOTDEBUG_2 \
+ elog(DEBUG2, "heapam_getnextslot returning EOS")
+#define HEAPAMSLOTDEBUG_3 \
+ elog(DEBUG2, "heapam_getnextslot returning tuple")
+#else
+#define HEAPAMSLOTDEBUG_1
+#define HEAPAMSLOTDEBUG_2
+#define HEAPAMSLOTDEBUG_3
+#endif
+
+bool
+heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
+{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /* Note: no locking manipulations needed */
+
+ HEAPAMSLOTDEBUG_1; /* heap_getnextslot( info ) */
+
+ if (scan->rs_base.rs_pageatatime)
+ heapgettup_pagemode(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
+ else
+ heapgettup(scan, direction, scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
- return &(scan->rs_ctup);
+ if (scan->rs_ctup.t_data == NULL)
+ {
+ HEAPAMSLOTDEBUG_2; /* heap_getnextslot returning EOS */
+ ExecClearTuple(slot);
+ return false;
+ }
+
+ /*
+ * if we get here it means we have a new current scan tuple, so point to
+ * the proper return buffer and return the tuple.
+ */
+ HEAPAMSLOTDEBUG_3; /* heap_getnextslot returning tuple */
+
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ ExecStoreBufferHeapTuple(&scan->rs_ctup, slot,
+ scan->rs_cbuf);
+ return true;
}
/*