Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r--  src/backend/access/heap/heapam.c | 650
1 file changed, 205 insertions(+), 445 deletions(-)
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c
index dc3499349b6..3c8a5da0bc8 100644
--- a/src/backend/access/heap/heapam.c
+++ b/src/backend/access/heap/heapam.c
@@ -41,6 +41,7 @@
#include "access/parallel.h"
#include "access/relscan.h"
#include "access/sysattr.h"
+#include "access/tableam.h"
#include "access/transam.h"
#include "access/tuptoaster.h"
#include "access/valid.h"
@@ -68,22 +69,6 @@
#include "utils/snapmgr.h"
-/* GUC variable */
-bool synchronize_seqscans = true;
-
-
-static HeapScanDesc heap_beginscan_internal(Relation relation,
- Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap);
-static void heap_parallelscan_startblock_init(HeapScanDesc scan);
-static BlockNumber heap_parallelscan_nextpage(HeapScanDesc scan);
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
TransactionId xid, CommandId cid, int options);
static XLogRecPtr log_heap_update(Relation reln, Buffer oldbuf,
@@ -207,6 +192,7 @@ static const int MultiXactStatusLock[MaxMultiXactStatus + 1] =
static void
initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
{
+ ParallelBlockTableScanDesc bpscan = NULL;
bool allow_strat;
bool allow_sync;
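
The bpscan local introduced above is the block-oriented parallel scan state that this patch series splits out of the old ParallelHeapScanDesc. For orientation while reading the casts in the hunks below, here is a rough sketch of the assumed layout; the authoritative definitions live in access/relscan.h, and the field names here are best-effort reconstructions that may differ in detail.

/*
 * Reference sketch only (not part of this hunk): approximate shape of the
 * parallel scan descriptors after the split.  See access/relscan.h for the
 * real definitions.
 */
typedef struct ParallelTableScanDescData
{
	Oid			phs_relid;			/* OID of relation to scan */
	bool		phs_syncscan;		/* report location to syncscan logic? */
	bool		phs_snapshot_any;	/* SnapshotAny rather than a serialized
									 * MVCC snapshot? */
	Size		phs_snapshot_off;	/* offset of serialized snapshot data */
} ParallelTableScanDescData;

typedef struct ParallelBlockTableScanDescData
{
	ParallelTableScanDescData base;

	BlockNumber phs_nblocks;		/* # blocks in relation at start of scan */
	slock_t		phs_mutex;			/* protects phs_startblock initialization */
	BlockNumber phs_startblock;		/* starting block number */
	pg_atomic_uint64 phs_nallocated;	/* blocks handed out to workers so far */
} ParallelBlockTableScanDescData;
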
@@ -221,10 +207,13 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* results for a non-MVCC snapshot, the caller must hold some higher-level
* lock that ensures the interesting tuple(s) won't change.)
*/
- if (scan->rs_parallel != NULL)
- scan->rs_nblocks = scan->rs_parallel->phs_nblocks;
+ if (scan->rs_base.rs_parallel != NULL)
+ {
+ bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+ scan->rs_nblocks = bpscan->phs_nblocks;
+ }
else
- scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_rd);
+ scan->rs_nblocks = RelationGetNumberOfBlocks(scan->rs_base.rs_rd);
/*
* If the table is large relative to NBuffers, use a bulk-read access
@@ -238,11 +227,11 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* Note that heap_parallelscan_initialize has a very similar test; if you
* change this, consider changing that one, too.
*/
- if (!RelationUsesLocalBuffers(scan->rs_rd) &&
+ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) &&
scan->rs_nblocks > NBuffers / 4)
{
- allow_strat = scan->rs_allow_strat;
- allow_sync = scan->rs_allow_sync;
+ allow_strat = scan->rs_base.rs_allow_strat;
+ allow_sync = scan->rs_base.rs_allow_sync;
}
else
allow_strat = allow_sync = false;
@@ -260,10 +249,10 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
scan->rs_strategy = NULL;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- /* For parallel scan, believe whatever ParallelHeapScanDesc says. */
- scan->rs_syncscan = scan->rs_parallel->phs_syncscan;
+ /* For parallel scan, believe whatever ParallelTableScanDesc says. */
+ scan->rs_base.rs_syncscan = scan->rs_base.rs_parallel->phs_syncscan;
}
else if (keep_startblock)
{
@@ -272,16 +261,16 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* so that rewinding a cursor doesn't generate surprising results.
* Reset the active syncscan setting, though.
*/
- scan->rs_syncscan = (allow_sync && synchronize_seqscans);
+ scan->rs_base.rs_syncscan = (allow_sync && synchronize_seqscans);
}
else if (allow_sync && synchronize_seqscans)
{
- scan->rs_syncscan = true;
- scan->rs_startblock = ss_get_location(scan->rs_rd, scan->rs_nblocks);
+ scan->rs_base.rs_syncscan = true;
+ scan->rs_startblock = ss_get_location(scan->rs_base.rs_rd, scan->rs_nblocks);
}
else
{
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
scan->rs_startblock = 0;
}
@@ -298,15 +287,15 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* copy the scan key, if appropriate
*/
if (key != NULL)
- memcpy(scan->rs_key, key, scan->rs_nkeys * sizeof(ScanKeyData));
+ memcpy(scan->rs_base.rs_key, key, scan->rs_base.rs_nkeys * sizeof(ScanKeyData));
/*
* Currently, we don't have a stats counter for bitmap heap scans (but the
* underlying bitmap index scans will be counted) or sample scans (we only
* update stats for tuple fetches there)
*/
- if (!scan->rs_bitmapscan && !scan->rs_samplescan)
- pgstat_count_heap_scan(scan->rs_rd);
+ if (!scan->rs_base.rs_bitmapscan && !scan->rs_base.rs_samplescan)
+ pgstat_count_heap_scan(scan->rs_base.rs_rd);
}
/*
@@ -316,10 +305,12 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock)
* numBlks is number of pages to scan (InvalidBlockNumber means "all")
*/
void
-heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
+heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlks)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
Assert(!scan->rs_inited); /* else too late to change */
- Assert(!scan->rs_syncscan); /* else rs_startblock is significant */
+ Assert(!scan->rs_base.rs_syncscan); /* else rs_startblock is significant */
/* Check startBlk is valid (but allow case of zero blocks...) */
Assert(startBlk == 0 || startBlk < scan->rs_nblocks);
@@ -336,8 +327,9 @@ heap_setscanlimits(HeapScanDesc scan, BlockNumber startBlk, BlockNumber numBlks)
* which tuples on the page are visible.
*/
void
-heapgetpage(HeapScanDesc scan, BlockNumber page)
+heapgetpage(TableScanDesc sscan, BlockNumber page)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
Buffer buffer;
Snapshot snapshot;
Page dp;
@@ -364,20 +356,20 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
CHECK_FOR_INTERRUPTS();
/* read page using selected strategy */
- scan->rs_cbuf = ReadBufferExtended(scan->rs_rd, MAIN_FORKNUM, page,
+ scan->rs_cbuf = ReadBufferExtended(scan->rs_base.rs_rd, MAIN_FORKNUM, page,
RBM_NORMAL, scan->rs_strategy);
scan->rs_cblock = page;
- if (!scan->rs_pageatatime)
+ if (!scan->rs_base.rs_pageatatime)
return;
buffer = scan->rs_cbuf;
- snapshot = scan->rs_snapshot;
+ snapshot = scan->rs_base.rs_snapshot;
/*
* Prune and repair fragmentation for the whole page, if possible.
*/
- heap_page_prune_opt(scan->rs_rd, buffer);
+ heap_page_prune_opt(scan->rs_base.rs_rd, buffer);
/*
* We must hold share lock on the buffer content while examining tuple
@@ -387,7 +379,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
LockBuffer(buffer, BUFFER_LOCK_SHARE);
dp = BufferGetPage(buffer);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
ntup = 0;
@@ -422,7 +414,7 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
HeapTupleData loctup;
bool valid;
- loctup.t_tableOid = RelationGetRelid(scan->rs_rd);
+ loctup.t_tableOid = RelationGetRelid(scan->rs_base.rs_rd);
loctup.t_data = (HeapTupleHeader) PageGetItem((Page) dp, lpp);
loctup.t_len = ItemIdGetLength(lpp);
ItemPointerSet(&(loctup.t_self), page, lineoff);
@@ -432,8 +424,8 @@ heapgetpage(HeapScanDesc scan, BlockNumber page)
else
valid = HeapTupleSatisfiesVisibility(&loctup, snapshot, buffer);
- CheckForSerializableConflictOut(valid, scan->rs_rd, &loctup,
- buffer, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ &loctup, buffer, snapshot);
if (valid)
scan->rs_vistuples[ntup++] = lineoff;
@@ -476,7 +468,7 @@ heapgettup(HeapScanDesc scan,
ScanKey key)
{
HeapTuple tuple = &(scan->rs_ctup);
- Snapshot snapshot = scan->rs_snapshot;
+ Snapshot snapshot = scan->rs_base.rs_snapshot;
bool backward = ScanDirectionIsBackward(dir);
BlockNumber page;
bool finished;
@@ -502,11 +494,16 @@ heapgettup(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
- page = heap_parallelscan_nextpage(scan);
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -518,7 +515,7 @@ heapgettup(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineoff = FirstOffsetNumber; /* first offnum */
scan->rs_inited = true;
}
@@ -533,7 +530,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
/* page and lineoff now reference the physically next tid */
@@ -542,7 +539,7 @@ heapgettup(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -562,13 +559,13 @@ heapgettup(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -579,7 +576,7 @@ heapgettup(HeapScanDesc scan,
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber(dp);
if (!scan->rs_inited)
@@ -610,11 +607,11 @@ heapgettup(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -649,11 +646,12 @@ heapgettup(HeapScanDesc scan,
snapshot,
scan->rs_cbuf);
- CheckForSerializableConflictOut(valid, scan->rs_rd, tuple,
- scan->rs_cbuf, snapshot);
+ CheckForSerializableConflictOut(valid, scan->rs_base.rs_rd,
+ tuple, scan->rs_cbuf,
+ snapshot);
if (valid && key != NULL)
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
@@ -696,9 +694,13 @@ heapgettup(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -721,8 +723,8 @@ heapgettup(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -739,12 +741,12 @@ heapgettup(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
LockBuffer(scan->rs_cbuf, BUFFER_LOCK_SHARE);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(snapshot, scan->rs_base.rs_rd, dp);
lines = PageGetMaxOffsetNumber((Page) dp);
linesleft = lines;
if (backward)
@@ -806,11 +808,16 @@ heapgettup_pagemode(HeapScanDesc scan,
tuple->t_data = NULL;
return;
}
- if (scan->rs_parallel != NULL)
+ if (scan->rs_base.rs_parallel != NULL)
{
- heap_parallelscan_startblock_init(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
+ pbscan);
- page = heap_parallelscan_nextpage(scan);
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
/* Other processes might have already finished the scan. */
if (page == InvalidBlockNumber)
@@ -822,7 +829,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
else
page = scan->rs_startblock; /* first page */
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
lineindex = 0;
scan->rs_inited = true;
}
@@ -834,7 +841,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
/* page and lineindex now reference the next visible tid */
@@ -843,7 +850,7 @@ heapgettup_pagemode(HeapScanDesc scan,
else if (backward)
{
/* backward parallel scan not supported */
- Assert(scan->rs_parallel == NULL);
+ Assert(scan->rs_base.rs_parallel == NULL);
if (!scan->rs_inited)
{
@@ -863,13 +870,13 @@ heapgettup_pagemode(HeapScanDesc scan,
* time, and much more likely that we'll just bollix things for
* forward scanners.
*/
- scan->rs_syncscan = false;
+ scan->rs_base.rs_syncscan = false;
/* start from last page of the scan */
if (scan->rs_startblock > 0)
page = scan->rs_startblock - 1;
else
page = scan->rs_nblocks - 1;
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
}
else
{
@@ -878,7 +885,7 @@ heapgettup_pagemode(HeapScanDesc scan,
}
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
if (!scan->rs_inited)
@@ -908,11 +915,11 @@ heapgettup_pagemode(HeapScanDesc scan,
page = ItemPointerGetBlockNumber(&(tuple->t_self));
if (page != scan->rs_cblock)
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
/* Since the tuple was previously fetched, needn't lock page here */
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lineoff = ItemPointerGetOffsetNumber(&(tuple->t_self));
lpp = PageGetItemId(dp, lineoff);
Assert(ItemIdIsNormal(lpp));
@@ -950,7 +957,7 @@ heapgettup_pagemode(HeapScanDesc scan,
{
bool valid;
- HeapKeyTest(tuple, RelationGetDescr(scan->rs_rd),
+ HeapKeyTest(tuple, RelationGetDescr(scan->rs_base.rs_rd),
nkeys, key, valid);
if (valid)
{
@@ -986,9 +993,13 @@ heapgettup_pagemode(HeapScanDesc scan,
page = scan->rs_nblocks;
page--;
}
- else if (scan->rs_parallel != NULL)
+ else if (scan->rs_base.rs_parallel != NULL)
{
- page = heap_parallelscan_nextpage(scan);
+ ParallelBlockTableScanDesc pbscan =
+ (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+
+ page = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
+ pbscan);
finished = (page == InvalidBlockNumber);
}
else
@@ -1011,8 +1022,8 @@ heapgettup_pagemode(HeapScanDesc scan,
* a little bit backwards on every invocation, which is confusing.
* We don't guarantee any specific ordering in general, though.
*/
- if (scan->rs_syncscan)
- ss_report_location(scan->rs_rd, page);
+ if (scan->rs_base.rs_syncscan)
+ ss_report_location(scan->rs_base.rs_rd, page);
}
/*
@@ -1029,10 +1040,10 @@ heapgettup_pagemode(HeapScanDesc scan,
return;
}
- heapgetpage(scan, page);
+ heapgetpage((TableScanDesc) scan, page);
dp = BufferGetPage(scan->rs_cbuf);
- TestForOldSnapshot(scan->rs_snapshot, scan->rs_rd, dp);
+ TestForOldSnapshot(scan->rs_base.rs_snapshot, scan->rs_base.rs_rd, dp);
lines = scan->rs_ntuples;
linesleft = lines;
if (backward)
@@ -1095,86 +1106,16 @@ fastgetattr(HeapTuple tup, int attnum, TupleDesc tupleDesc,
*/
-/* ----------------
- * heap_beginscan - begin relation scan
- *
- * heap_beginscan is the "standard" case.
- *
- * heap_beginscan_catalog differs in setting up its own temporary snapshot.
- *
- * heap_beginscan_strat offers an extended API that lets the caller control
- * whether a nondefault buffer access strategy can be used, and whether
- * syncscan can be chosen (possibly resulting in the scan not starting from
- * block zero). Both of these default to true with plain heap_beginscan.
- *
- * heap_beginscan_bm is an alternative entry point for setting up a
- * HeapScanDesc for a bitmap heap scan. Although that scan technology is
- * really quite unlike a standard seqscan, there is just enough commonality
- * to make it worth using the same data structure.
- *
- * heap_beginscan_sampling is an alternative entry point for setting up a
- * HeapScanDesc for a TABLESAMPLE scan. As with bitmap scans, it's worth
- * using the same data structure although the behavior is rather different.
- * In addition to the options offered by heap_beginscan_strat, this call
- * also allows control of whether page-mode visibility checking is used.
- * ----------------
- */
-HeapScanDesc
+TableScanDesc
heap_beginscan(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_catalog(Relation relation, int nkeys, ScanKey key)
-{
- Oid relid = RelationGetRelid(relation);
- Snapshot snapshot = RegisterSnapshot(GetCatalogSnapshot(relid));
-
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- true, true, true, false, false, true);
-}
-
-HeapScanDesc
-heap_beginscan_strat(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, true,
- false, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_bm(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- false, false, true, true, false, false);
-}
-
-HeapScanDesc
-heap_beginscan_sampling(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
-{
- return heap_beginscan_internal(relation, snapshot, nkeys, key, NULL,
- allow_strat, allow_sync, allow_pagemode,
- false, true, false);
-}
-
-static HeapScanDesc
-heap_beginscan_internal(Relation relation, Snapshot snapshot,
- int nkeys, ScanKey key,
- ParallelHeapScanDesc parallel_scan,
- bool allow_strat,
- bool allow_sync,
- bool allow_pagemode,
- bool is_bitmapscan,
- bool is_samplescan,
- bool temp_snap)
+ int nkeys, ScanKey key,
+ ParallelTableScanDesc parallel_scan,
+ bool allow_strat,
+ bool allow_sync,
+ bool allow_pagemode,
+ bool is_bitmapscan,
+ bool is_samplescan,
+ bool temp_snap)
{
HeapScanDesc scan;
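
The hunk above removes the heap_beginscan_catalog/_strat/_bm/_sampling convenience entry points and turns heap_beginscan() into the single constructor that takes the parallel descriptor and behavior flags directly. Call sites are expected to go through the wrappers in access/tableam.h instead. A minimal caller-side sketch, assuming the table_beginscan_catalog()/table_endscan() wrappers introduced by this patch series (treat exact names as an assumption); note that catalog code may still fetch HeapTuples directly via heap_getnext(), as the safety check later in this diff anticipates.

#include "access/heapam.h"
#include "access/tableam.h"

/*
 * Sketch only: a catalog-style sequential scan after this change.
 * heap_beginscan_catalog() is gone; the caller gets a TableScanDesc back
 * from the table AM wrapper.
 */
static void
scan_catalog_rel(Relation rel)
{
	TableScanDesc scan;
	HeapTuple	tuple;

	scan = table_beginscan_catalog(rel, 0, NULL);

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* examine one visible catalog tuple */
	}

	table_endscan(scan);
}
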
@@ -1192,21 +1133,22 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
*/
scan = (HeapScanDesc) palloc(sizeof(HeapScanDescData));
- scan->rs_rd = relation;
- scan->rs_snapshot = snapshot;
- scan->rs_nkeys = nkeys;
- scan->rs_bitmapscan = is_bitmapscan;
- scan->rs_samplescan = is_samplescan;
+ scan->rs_base.rs_rd = relation;
+ scan->rs_base.rs_snapshot = snapshot;
+ scan->rs_base.rs_nkeys = nkeys;
+ scan->rs_base.rs_bitmapscan = is_bitmapscan;
+ scan->rs_base.rs_samplescan = is_samplescan;
scan->rs_strategy = NULL; /* set in initscan */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_temp_snap = temp_snap;
- scan->rs_parallel = parallel_scan;
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_temp_snap = temp_snap;
+ scan->rs_base.rs_parallel = parallel_scan;
/*
* we can use page-at-a-time mode if it's an MVCC-safe snapshot
*/
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(snapshot);
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && snapshot && IsMVCCSnapshot(snapshot);
/*
* For a seqscan in a serializable transaction, acquire a predicate lock
@@ -1230,23 +1172,29 @@ heap_beginscan_internal(Relation relation, Snapshot snapshot,
* initscan() and we don't want to allocate memory again
*/
if (nkeys > 0)
- scan->rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
+ scan->rs_base.rs_key = (ScanKey) palloc(sizeof(ScanKeyData) * nkeys);
else
- scan->rs_key = NULL;
+ scan->rs_base.rs_key = NULL;
initscan(scan, key, false);
- return scan;
+ return (TableScanDesc) scan;
}
-/* ----------------
- * heap_rescan - restart a relation scan
- * ----------------
- */
void
-heap_rescan(HeapScanDesc scan,
- ScanKey key)
+heap_rescan(TableScanDesc sscan, ScanKey key, bool set_params,
+ bool allow_strat, bool allow_sync, bool allow_pagemode)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ if (set_params)
+ {
+ scan->rs_base.rs_allow_strat = allow_strat;
+ scan->rs_base.rs_allow_sync = allow_sync;
+ scan->rs_base.rs_pageatatime =
+ allow_pagemode && IsMVCCSnapshot(scan->rs_base.rs_snapshot);
+ }
+
/*
* unpin scan buffers
*/
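
The rewritten heap_rescan() above absorbs the parameter-changing variant removed in the next hunk: passing set_params = false restarts the scan with its previously chosen options, while set_params = true is the equivalent of the old heap_rescan_set_params(). A hedged caller-side sketch against the new signature shown above ("sscan" and the allow_* flags stand in for hypothetical caller state):

/* Sketch only: the two old entry points collapse into one. */
static void
restart_scan_with_params(TableScanDesc sscan, ScanKey key,
						 bool allow_strat, bool allow_sync,
						 bool allow_pagemode)
{
	/* equivalent of the removed heap_rescan_set_params() */
	heap_rescan(sscan, key, true, allow_strat, allow_sync, allow_pagemode);
}

static void
rewind_scan(TableScanDesc sscan, ScanKey key)
{
	/* plain rescan: keep the previously selected strategy/syncscan/pagemode */
	heap_rescan(sscan, key, false, false, false, false);
}
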
@@ -1259,37 +1207,11 @@ heap_rescan(HeapScanDesc scan,
initscan(scan, key, true);
}
-/* ----------------
- * heap_rescan_set_params - restart a relation scan after changing params
- *
- * This call allows changing the buffer strategy, syncscan, and pagemode
- * options before starting a fresh scan. Note that although the actual use
- * of syncscan might change (effectively, enabling or disabling reporting),
- * the previously selected startblock will be kept.
- * ----------------
- */
void
-heap_rescan_set_params(HeapScanDesc scan, ScanKey key,
- bool allow_strat, bool allow_sync, bool allow_pagemode)
+heap_endscan(TableScanDesc sscan)
{
- /* adjust parameters */
- scan->rs_allow_strat = allow_strat;
- scan->rs_allow_sync = allow_sync;
- scan->rs_pageatatime = allow_pagemode && IsMVCCSnapshot(scan->rs_snapshot);
- /* ... and rescan */
- heap_rescan(scan, key);
-}
+ HeapScanDesc scan = (HeapScanDesc) sscan;
-/* ----------------
- * heap_endscan - end relation scan
- *
- * See how to integrate with index scans.
- * Check handling if reldesc caching.
- * ----------------
- */
-void
-heap_endscan(HeapScanDesc scan)
-{
/* Note: no locking manipulations needed */
/*
@@ -1301,246 +1223,20 @@ heap_endscan(HeapScanDesc scan)
/*
* decrement relation reference count and free scan descriptor storage
*/
- RelationDecrementReferenceCount(scan->rs_rd);
+ RelationDecrementReferenceCount(scan->rs_base.rs_rd);
- if (scan->rs_key)
- pfree(scan->rs_key);
+ if (scan->rs_base.rs_key)
+ pfree(scan->rs_base.rs_key);
if (scan->rs_strategy != NULL)
FreeAccessStrategy(scan->rs_strategy);
- if (scan->rs_temp_snap)
- UnregisterSnapshot(scan->rs_snapshot);
+ if (scan->rs_base.rs_temp_snap)
+ UnregisterSnapshot(scan->rs_base.rs_snapshot);
pfree(scan);
}
-/* ----------------
- * heap_parallelscan_estimate - estimate storage for ParallelHeapScanDesc
- *
- * Sadly, this doesn't reduce to a constant, because the size required
- * to serialize the snapshot can vary.
- * ----------------
- */
-Size
-heap_parallelscan_estimate(Snapshot snapshot)
-{
- Size sz = offsetof(ParallelHeapScanDescData, phs_snapshot_data);
-
- if (IsMVCCSnapshot(snapshot))
- sz = add_size(sz, EstimateSnapshotSpace(snapshot));
- else
- Assert(snapshot == SnapshotAny);
-
- return sz;
-}
-
-/* ----------------
- * heap_parallelscan_initialize - initialize ParallelHeapScanDesc
- *
- * Must allow as many bytes of shared memory as returned by
- * heap_parallelscan_estimate. Call this just once in the leader
- * process; then, individual workers attach via heap_beginscan_parallel.
- * ----------------
- */
-void
-heap_parallelscan_initialize(ParallelHeapScanDesc target, Relation relation,
- Snapshot snapshot)
-{
- target->phs_relid = RelationGetRelid(relation);
- target->phs_nblocks = RelationGetNumberOfBlocks(relation);
- /* compare phs_syncscan initialization to similar logic in initscan */
- target->phs_syncscan = synchronize_seqscans &&
- !RelationUsesLocalBuffers(relation) &&
- target->phs_nblocks > NBuffers / 4;
- SpinLockInit(&target->phs_mutex);
- target->phs_startblock = InvalidBlockNumber;
- pg_atomic_init_u64(&target->phs_nallocated, 0);
- if (IsMVCCSnapshot(snapshot))
- {
- SerializeSnapshot(snapshot, target->phs_snapshot_data);
- target->phs_snapshot_any = false;
- }
- else
- {
- Assert(snapshot == SnapshotAny);
- target->phs_snapshot_any = true;
- }
-}
-
-/* ----------------
- * heap_parallelscan_reinitialize - reset a parallel scan
- *
- * Call this in the leader process. Caller is responsible for
- * making sure that all workers have finished the scan beforehand.
- * ----------------
- */
-void
-heap_parallelscan_reinitialize(ParallelHeapScanDesc parallel_scan)
-{
- pg_atomic_write_u64(&parallel_scan->phs_nallocated, 0);
-}
-
-/* ----------------
- * heap_beginscan_parallel - join a parallel scan
- *
- * Caller must hold a suitable lock on the correct relation.
- * ----------------
- */
-HeapScanDesc
-heap_beginscan_parallel(Relation relation, ParallelHeapScanDesc parallel_scan)
-{
- Snapshot snapshot;
-
- Assert(RelationGetRelid(relation) == parallel_scan->phs_relid);
-
- if (!parallel_scan->phs_snapshot_any)
- {
- /* Snapshot was serialized -- restore it */
- snapshot = RestoreSnapshot(parallel_scan->phs_snapshot_data);
- RegisterSnapshot(snapshot);
- }
- else
- {
- /* SnapshotAny passed by caller (not serialized) */
- snapshot = SnapshotAny;
- }
-
- return heap_beginscan_internal(relation, snapshot, 0, NULL, parallel_scan,
- true, true, true, false, false,
- !parallel_scan->phs_snapshot_any);
-}
-
-/* ----------------
- * heap_parallelscan_startblock_init - find and set the scan's startblock
- *
- * Determine where the parallel seq scan should start. This function may
- * be called many times, once by each parallel worker. We must be careful
- * only to set the startblock once.
- * ----------------
- */
-static void
-heap_parallelscan_startblock_init(HeapScanDesc scan)
-{
- BlockNumber sync_startpage = InvalidBlockNumber;
- ParallelHeapScanDesc parallel_scan;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
-retry:
- /* Grab the spinlock. */
- SpinLockAcquire(&parallel_scan->phs_mutex);
-
- /*
- * If the scan's startblock has not yet been initialized, we must do so
- * now. If this is not a synchronized scan, we just start at block 0, but
- * if it is a synchronized scan, we must get the starting position from
- * the synchronized scan machinery. We can't hold the spinlock while
- * doing that, though, so release the spinlock, get the information we
- * need, and retry. If nobody else has initialized the scan in the
- * meantime, we'll fill in the value we fetched on the second time
- * through.
- */
- if (parallel_scan->phs_startblock == InvalidBlockNumber)
- {
- if (!parallel_scan->phs_syncscan)
- parallel_scan->phs_startblock = 0;
- else if (sync_startpage != InvalidBlockNumber)
- parallel_scan->phs_startblock = sync_startpage;
- else
- {
- SpinLockRelease(&parallel_scan->phs_mutex);
- sync_startpage = ss_get_location(scan->rs_rd, scan->rs_nblocks);
- goto retry;
- }
- }
- SpinLockRelease(&parallel_scan->phs_mutex);
-}
-
-/* ----------------
- * heap_parallelscan_nextpage - get the next page to scan
- *
- * Get the next page to scan. Even if there are no pages left to scan,
- * another backend could have grabbed a page to scan and not yet finished
- * looking at it, so it doesn't follow that the scan is done when the
- * first backend gets an InvalidBlockNumber return.
- * ----------------
- */
-static BlockNumber
-heap_parallelscan_nextpage(HeapScanDesc scan)
-{
- BlockNumber page;
- ParallelHeapScanDesc parallel_scan;
- uint64 nallocated;
-
- Assert(scan->rs_parallel);
- parallel_scan = scan->rs_parallel;
-
- /*
- * phs_nallocated tracks how many pages have been allocated to workers
- * already. When phs_nallocated >= rs_nblocks, all blocks have been
- * allocated.
- *
- * Because we use an atomic fetch-and-add to fetch the current value, the
- * phs_nallocated counter will exceed rs_nblocks, because workers will
- * still increment the value, when they try to allocate the next block but
- * all blocks have been allocated already. The counter must be 64 bits
- * wide because of that, to avoid wrapping around when rs_nblocks is close
- * to 2^32.
- *
- * The actual page to return is calculated by adding the counter to the
- * starting block number, modulo nblocks.
- */
- nallocated = pg_atomic_fetch_add_u64(&parallel_scan->phs_nallocated, 1);
- if (nallocated >= scan->rs_nblocks)
- page = InvalidBlockNumber; /* all blocks have been allocated */
- else
- page = (nallocated + parallel_scan->phs_startblock) % scan->rs_nblocks;
-
- /*
- * Report scan location. Normally, we report the current page number.
- * When we reach the end of the scan, though, we report the starting page,
- * not the ending page, just so the starting positions for later scans
- * doesn't slew backwards. We only report the position at the end of the
- * scan once, though: subsequent callers will report nothing.
- */
- if (scan->rs_syncscan)
- {
- if (page != InvalidBlockNumber)
- ss_report_location(scan->rs_rd, page);
- else if (nallocated == scan->rs_nblocks)
- ss_report_location(scan->rs_rd, parallel_scan->phs_startblock);
- }
-
- return page;
-}
-
-/* ----------------
- * heap_update_snapshot
- *
- * Update snapshot info in heap scan descriptor.
- * ----------------
- */
-void
-heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
-{
- Assert(IsMVCCSnapshot(snapshot));
-
- RegisterSnapshot(snapshot);
- scan->rs_snapshot = snapshot;
- scan->rs_temp_snap = true;
-}
-
-/* ----------------
- * heap_getnext - retrieve next tuple in scan
- *
- * Fix to work with index relations.
- * We don't return the buffer anymore, but you can get it from the
- * returned HeapTuple.
- * ----------------
- */
-
#ifdef HEAPDEBUGALL
#define HEAPDEBUG_1 \
elog(DEBUG2, "heap_getnext([%s,nkeys=%d],dir=%d) called", \
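
The page-allocation scheme documented in the removed heap_parallelscan_nextpage() above (an atomic fetch-and-add over phs_nallocated, wrapped modulo the relation size) moves to table_block_parallelscan_nextpage() rather than disappearing. A condensed sketch of that arithmetic, assuming the relocated code keeps the same fields as the removed version:

/*
 * Condensed sketch of the relocated allocation logic; see
 * table_block_parallelscan_nextpage() for the real thing (which also
 * handles syncscan position reporting).
 */
static BlockNumber
next_parallel_block(ParallelBlockTableScanDesc pbscan, BlockNumber nblocks)
{
	uint64		nallocated;

	/* hand out block numbers one at a time, lock-free */
	nallocated = pg_atomic_fetch_add_u64(&pbscan->phs_nallocated, 1);

	if (nallocated >= nblocks)
		return InvalidBlockNumber;	/* every block has already been claimed */

	/* wrap around the relation, starting from the (possibly synced) start */
	return (nallocated + pbscan->phs_startblock) % nblocks;
}
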
@@ -1557,17 +1253,32 @@ heap_update_snapshot(HeapScanDesc scan, Snapshot snapshot)
HeapTuple
-heap_getnext(HeapScanDesc scan, ScanDirection direction)
+heap_getnext(TableScanDesc sscan, ScanDirection direction)
{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /*
+ * This is still widely used directly, without going through table AM, so
+ * add a safety check. It's possible we should, at a later point,
+ * downgrade this to an assert. The reason for checking the AM routine,
+ * rather than the AM oid, is that this allows to write regression tests
+ * that create another AM reusing the heap handler.
+ */
+ if (unlikely(sscan->rs_rd->rd_tableam != GetHeapamTableAmRoutine()))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("only heap AM is supported")));
+
/* Note: no locking manipulations needed */
HEAPDEBUG_1; /* heap_getnext( info ) */
- if (scan->rs_pageatatime)
+ if (scan->rs_base.rs_pageatatime)
heapgettup_pagemode(scan, direction,
- scan->rs_nkeys, scan->rs_key);
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
else
- heapgettup(scan, direction, scan->rs_nkeys, scan->rs_key);
+ heapgettup(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
if (scan->rs_ctup.t_data == NULL)
{
@@ -1581,9 +1292,58 @@ heap_getnext(HeapScanDesc scan, ScanDirection direction)
*/
HEAPDEBUG_3; /* heap_getnext returning tuple */
- pgstat_count_heap_getnext(scan->rs_rd);
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ return &scan->rs_ctup;
+}
+
+#ifdef HEAPAMSLOTDEBUGALL
+#define HEAPAMSLOTDEBUG_1 \
+ elog(DEBUG2, "heapam_getnextslot([%s,nkeys=%d],dir=%d) called", \
+ RelationGetRelationName(scan->rs_base.rs_rd), scan->rs_base.rs_nkeys, (int) direction)
+#define HEAPAMSLOTDEBUG_2 \
+ elog(DEBUG2, "heapam_getnextslot returning EOS")
+#define HEAPAMSLOTDEBUG_3 \
+ elog(DEBUG2, "heapam_getnextslot returning tuple")
+#else
+#define HEAPAMSLOTDEBUG_1
+#define HEAPAMSLOTDEBUG_2
+#define HEAPAMSLOTDEBUG_3
+#endif
+
+bool
+heap_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
+{
+ HeapScanDesc scan = (HeapScanDesc) sscan;
+
+ /* Note: no locking manipulations needed */
+
+ HEAPAMSLOTDEBUG_1; /* heap_getnextslot( info ) */
+
+ if (scan->rs_base.rs_pageatatime)
+ heapgettup_pagemode(scan, direction,
+ scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
+ else
+ heapgettup(scan, direction, scan->rs_base.rs_nkeys, scan->rs_base.rs_key);
- return &(scan->rs_ctup);
+ if (scan->rs_ctup.t_data == NULL)
+ {
+ HEAPAMSLOTDEBUG_2; /* heap_getnextslot returning EOS */
+ ExecClearTuple(slot);
+ return false;
+ }
+
+ /*
+ * if we get here it means we have a new current scan tuple, so point to
+ * the proper return buffer and return the tuple.
+ */
+ HEAPAMSLOTDEBUG_3; /* heap_getnextslot returning tuple */
+
+ pgstat_count_heap_getnext(scan->rs_base.rs_rd);
+
+ ExecStoreBufferHeapTuple(&scan->rs_ctup, slot,
+ scan->rs_cbuf);
+ return true;
}
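
heap_getnextslot() is the slot-returning counterpart that the table AM layer calls for heap relations. For orientation, a usage sketch of a scan driven purely through the generic wrappers, which end up here for heap tables; table_beginscan(), table_scan_getnextslot(), table_endscan(), and table_slot_create() are the access/tableam.h wrappers as of this development cycle, so treat the exact names and signatures as assumptions.

#include "access/tableam.h"
#include "executor/tuptable.h"
#include "utils/snapmgr.h"

/* Sketch only: count the tuples visible to a fresh snapshot. */
static void
count_visible_tuples(Relation rel)
{
	Snapshot	snapshot = RegisterSnapshot(GetLatestSnapshot());
	TableScanDesc scan = table_beginscan(rel, snapshot, 0, NULL);
	TupleTableSlot *slot = table_slot_create(rel, NULL);
	uint64		ntuples = 0;

	while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
		ntuples++;

	elog(DEBUG1, "%llu visible tuples", (unsigned long long) ntuples);

	ExecDropSingleTupleTableSlot(slot);
	table_endscan(scan);
	UnregisterSnapshot(snapshot);
}
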
/*