aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--contrib/bloom/blutils.c3
-rw-r--r--doc/src/sgml/indexam.sgml67
-rw-r--r--src/backend/access/brin/brin.c3
-rw-r--r--src/backend/access/gin/ginutil.c3
-rw-r--r--src/backend/access/gist/gist.c3
-rw-r--r--src/backend/access/hash/hash.c3
-rw-r--r--src/backend/access/index/indexam.c135
-rw-r--r--src/backend/access/nbtree/nbtree.c3
-rw-r--r--src/backend/access/spgist/spgutils.c3
-rw-r--r--src/include/access/amapi.h17
-rw-r--r--src/include/access/genam.h9
-rw-r--r--src/include/access/relscan.h13
-rw-r--r--src/include/c.h3
-rw-r--r--src/tools/pgindent/typedefs.list2
14 files changed, 262 insertions, 5 deletions
diff --git a/contrib/bloom/blutils.c b/contrib/bloom/blutils.c
index 06077afed69..858798db85c 100644
--- a/contrib/bloom/blutils.c
+++ b/contrib/bloom/blutils.c
@@ -138,6 +138,9 @@ blhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = blendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/doc/src/sgml/indexam.sgml b/doc/src/sgml/indexam.sgml
index 40f201b11be..5d8e5574608 100644
--- a/doc/src/sgml/indexam.sgml
+++ b/doc/src/sgml/indexam.sgml
@@ -131,6 +131,11 @@ typedef struct IndexAmRoutine
amendscan_function amendscan;
ammarkpos_function ammarkpos; /* can be NULL */
amrestrpos_function amrestrpos; /* can be NULL */
+
+ /* interface functions to support parallel index scans */
+ amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
+ aminitparallelscan_function aminitparallelscan; /* can be NULL */
+ amparallelrescan_function amparallelrescan; /* can be NULL */
} IndexAmRoutine;
</programlisting>
</para>
@@ -624,6 +629,68 @@ amrestrpos (IndexScanDesc scan);
the <structfield>amrestrpos</> field in its <structname>IndexAmRoutine</>
struct may be set to NULL.
</para>
+
+ <para>
+ In addition to supporting ordinary index scans, some types of index
+ may wish to support <firstterm>parallel index scans</>, which allow
+ multiple backends to cooperate in performing an index scan. The
+ index access method should arrange things so that each cooperating
+ process returns a subset of the tuples that would be returned by
+ an ordinary, non-parallel index scan, but in such a way that the
+ union of those subsets is equal to the set of tuples that would be
+ returned by an ordinary, non-parallel index scan. Furthermore, while
+ there need not be any global ordering of tuples returned by a parallel
+ scan, the ordering of that subset of tuples returned within each
+ cooperating backend must match the requested ordering. The following
+ functions may be implemented to support parallel index scans:
+ </para>
+
+ <para>
+<programlisting>
+Size
+amestimateparallelscan (void);
+</programlisting>
+ Estimate and return the number of bytes of dynamic shared memory which
+ the access method will need to perform a parallel scan. (This number
+ is in addition to, not in lieu of, the amount of space needed for
+ AM-independent data in <structname>ParallelIndexScanDescData</>.)
+ </para>
+
+ <para>
+ It is not necessary to implement this function for access methods which
+ do not support parallel scans or for which the number of additional bytes
+ of storage required is zero.
+ </para>
+
+ <para>
+<programlisting>
+void
+aminitparallelscan (void *target);
+</programlisting>
+ This function will be called to initialize dynamic shared memory at the
+ beginning of a parallel scan. <parameter>target</> will point to at least
+ the number of bytes previously returned by
+ <function>amestimateparallelscan</>, and this function may use that
+ amount of space to store whatever data it wishes.
+ </para>
+
+ <para>
+ It is not necessary to implement this function for access methods which
+ do not support parallel scans or in cases where the shared memory space
+ required needs no initialization.
+ </para>
+
+ <para>
+<programlisting>
+void
+amparallelrescan (IndexScanDesc scan);
+</programlisting>
+ This function, if implemented, will be called when a parallel index scan
+ must be restarted. It should reset any shared state set up by
+ <function>aminitparallelscan</> such that the scan will be restarted from
+ the beginning.
+ </para>
+
</sect1>
<sect1 id="index-scanning">
diff --git a/src/backend/access/brin/brin.c b/src/backend/access/brin/brin.c
index d60ddd242cb..b2afdb7bedb 100644
--- a/src/backend/access/brin/brin.c
+++ b/src/backend/access/brin/brin.c
@@ -112,6 +112,9 @@ brinhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = brinendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/backend/access/gin/ginutil.c b/src/backend/access/gin/ginutil.c
index 3909638906b..02d920bb9db 100644
--- a/src/backend/access/gin/ginutil.c
+++ b/src/backend/access/gin/ginutil.c
@@ -68,6 +68,9 @@ ginhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = ginendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/backend/access/gist/gist.c b/src/backend/access/gist/gist.c
index 597056ae440..c2247ad2f78 100644
--- a/src/backend/access/gist/gist.c
+++ b/src/backend/access/gist/gist.c
@@ -89,6 +89,9 @@ gisthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = gistendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c
index a64a9b9696a..ec8ed33c708 100644
--- a/src/backend/access/hash/hash.c
+++ b/src/backend/access/hash/hash.c
@@ -86,6 +86,9 @@ hashhandler(PG_FUNCTION_ARGS)
amroutine->amendscan = hashendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/backend/access/index/indexam.c b/src/backend/access/index/indexam.c
index 4822af95a32..ba27c1e86d9 100644
--- a/src/backend/access/index/indexam.c
+++ b/src/backend/access/index/indexam.c
@@ -20,6 +20,10 @@
* index_insert - insert an index tuple into a relation
* index_markpos - mark a scan position
* index_restrpos - restore a scan position
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
+ * index_parallelscan_initialize - initialize parallel scan
+ * index_parallelrescan - (re)start a parallel scan of an index
+ * index_beginscan_parallel - join parallel index scan
* index_getnext_tid - get the next TID from a scan
* index_fetch_heap - get the scan's next heap tuple
* index_getnext - get the next heap tuple from a scan
@@ -120,7 +124,8 @@ do { \
} while(0)
static IndexScanDesc index_beginscan_internal(Relation indexRelation,
- int nkeys, int norderbys, Snapshot snapshot);
+ int nkeys, int norderbys, Snapshot snapshot,
+ ParallelIndexScanDesc pscan, bool temp_snap);
/* ----------------------------------------------------------------
@@ -219,7 +224,7 @@ index_beginscan(Relation heapRelation,
{
IndexScanDesc scan;
- scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot);
+ scan = index_beginscan_internal(indexRelation, nkeys, norderbys, snapshot, NULL, false);
/*
* Save additional parameters into the scandesc. Everything else was set
@@ -244,7 +249,7 @@ index_beginscan_bitmap(Relation indexRelation,
{
IndexScanDesc scan;
- scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot);
+ scan = index_beginscan_internal(indexRelation, nkeys, 0, snapshot, NULL, false);
/*
* Save additional parameters into the scandesc. Everything else was set
@@ -260,8 +265,11 @@ index_beginscan_bitmap(Relation indexRelation,
*/
static IndexScanDesc
index_beginscan_internal(Relation indexRelation,
- int nkeys, int norderbys, Snapshot snapshot)
+ int nkeys, int norderbys, Snapshot snapshot,
+ ParallelIndexScanDesc pscan, bool temp_snap)
{
+ IndexScanDesc scan;
+
RELATION_CHECKS;
CHECK_REL_PROCEDURE(ambeginscan);
@@ -276,8 +284,13 @@ index_beginscan_internal(Relation indexRelation,
/*
* Tell the AM to open a scan.
*/
- return indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
+ scan = indexRelation->rd_amroutine->ambeginscan(indexRelation, nkeys,
norderbys);
+ /* Initialize information for parallel scan. */
+ scan->parallel_scan = pscan;
+ scan->xs_temp_snap = temp_snap;
+
+ return scan;
}
/* ----------------
@@ -341,6 +354,9 @@ index_endscan(IndexScanDesc scan)
/* Release index refcount acquired by index_beginscan */
RelationDecrementReferenceCount(scan->indexRelation);
+ if (scan->xs_temp_snap)
+ UnregisterSnapshot(scan->xs_snapshot);
+
/* Release the scan data structure itself */
IndexScanEnd(scan);
}
@@ -389,6 +405,115 @@ index_restrpos(IndexScanDesc scan)
scan->indexRelation->rd_amroutine->amrestrpos(scan);
}
+/*
+ * index_parallelscan_estimate - estimate shared memory for parallel scan
+ *
+ * The result covers the AM-independent header fields plus the serialized
+ * snapshot (rounded up with MAXALIGN), plus whatever the AM asks for.
+ *
+ * The AM-specific estimator currently takes no arguments, so it can
+ * probably only return a constant. In the future, we might need to pass
+ * it more information.
+ */
+Size
+index_parallelscan_estimate(Relation indexRelation, Snapshot snapshot)
+{
+ Size total;
+
+ RELATION_CHECKS;
+
+ /* Fixed part: header fields and snapshot, padded out to alignment. */
+ total = add_size(offsetof(ParallelIndexScanDescData, ps_snapshot_data),
+ EstimateSnapshotSpace(snapshot));
+ total = MAXALIGN(total);
+
+ /* Without an AM estimator, assume no AM-specific space is needed. */
+ if (indexRelation->rd_amroutine->amestimateparallelscan == NULL)
+ return total;
+
+ return add_size(total,
+ indexRelation->rd_amroutine->amestimateparallelscan());
+}
+
+/*
+ * index_parallelscan_initialize - initialize parallel scan
+ *
+ * Fills in the shared ParallelIndexScanDesc at "target": the heap and index
+ * OIDs, the serialized snapshot, and the offset at which the AM-specific
+ * area begins. The AM's aminitparallelscan callback, if it has one, is
+ * then invoked on that area.
+ *
+ * Call this just once in the leader process; individual workers then
+ * attach via index_beginscan_parallel.
+ */
+void
+index_parallelscan_initialize(Relation heapRelation, Relation indexRelation,
+ Snapshot snapshot, ParallelIndexScanDesc target)
+{
+ Size am_offset;
+
+ RELATION_CHECKS;
+
+ /* The AM-specific area begins at the MAXALIGN'd end of snapshot space. */
+ am_offset = offsetof(ParallelIndexScanDescData, ps_snapshot_data);
+ am_offset = add_size(am_offset, EstimateSnapshotSpace(snapshot));
+ am_offset = MAXALIGN(am_offset);
+
+ target->ps_relid = RelationGetRelid(heapRelation);
+ target->ps_indexid = RelationGetRelid(indexRelation);
+ target->ps_offset = am_offset;
+ SerializeSnapshot(snapshot, target->ps_snapshot_data);
+
+ /* aminitparallelscan is optional; nothing more to do without it. */
+ if (indexRelation->rd_amroutine->aminitparallelscan == NULL)
+ return;
+
+ indexRelation->rd_amroutine->aminitparallelscan(OffsetToPointer(target,
+ am_offset));
+}
+
+/*
+ * index_parallelrescan - (re)start a parallel scan of an index
+ */
+void
+index_parallelrescan(IndexScanDesc scan)
+{
+ SCAN_CHECKS;
+
+ /* amparallelrescan is optional; an AM that omits it gets a no-op */
+ if (scan->indexRelation->rd_amroutine->amparallelrescan == NULL)
+ return;
+ scan->indexRelation->rd_amroutine->amparallelrescan(scan);
+}
+
+/*
+ * index_beginscan_parallel - join parallel index scan
+ *
+ * Restores and registers the snapshot serialized in pscan; index_endscan
+ * unregisters it. Caller must hold suitable locks on the heap and index.
+ */
+IndexScanDesc
+index_beginscan_parallel(Relation heaprel, Relation indexrel, int nkeys,
+ int norderbys, ParallelIndexScanDesc pscan)
+{
+ Snapshot snapshot;
+ IndexScanDesc scan;
+
+ /* pscan must describe the very relations we were handed */
+ Assert(RelationGetRelid(heaprel) == pscan->ps_relid);
+ Assert(RelationGetRelid(indexrel) == pscan->ps_indexid);
+ snapshot = RestoreSnapshot(pscan->ps_snapshot_data);
+ RegisterSnapshot(snapshot);
+ scan = index_beginscan_internal(indexrel, nkeys, norderbys, snapshot,
+ pscan, true);
+
+ /* Remaining fields; the rest was set up by index_beginscan_internal. */
+ scan->heapRelation = heaprel;
+ scan->xs_snapshot = snapshot;
+
+ return scan;
+}
+
/* ----------------
* index_getnext_tid - get the next TID from a scan
*
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c
index 1bb1acfea6a..469e7abe4df 100644
--- a/src/backend/access/nbtree/nbtree.c
+++ b/src/backend/access/nbtree/nbtree.c
@@ -118,6 +118,9 @@ bthandler(PG_FUNCTION_ARGS)
amroutine->amendscan = btendscan;
amroutine->ammarkpos = btmarkpos;
amroutine->amrestrpos = btrestrpos;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index ca4b0bdbe4f..78846bec666 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -68,6 +68,9 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amendscan = spgendscan;
amroutine->ammarkpos = NULL;
amroutine->amrestrpos = NULL;
+ amroutine->amestimateparallelscan = NULL;
+ amroutine->aminitparallelscan = NULL;
+ amroutine->amparallelrescan = NULL;
PG_RETURN_POINTER(amroutine);
}
diff --git a/src/include/access/amapi.h b/src/include/access/amapi.h
index 6a5f279e7f9..e91e41dc0f4 100644
--- a/src/include/access/amapi.h
+++ b/src/include/access/amapi.h
@@ -137,6 +137,18 @@ typedef void (*ammarkpos_function) (IndexScanDesc scan);
/* restore marked scan position */
typedef void (*amrestrpos_function) (IndexScanDesc scan);
+/*
+ * Callback function signatures - for parallel index scans (each optional).
+ */
+
+/* estimate bytes of AM-specific shared memory a parallel scan needs */
+typedef Size (*amestimateparallelscan_function) (void);
+
+/* initialize the AM-specific shared memory that "target" points to */
+typedef void (*aminitparallelscan_function) (void *target);
+
+/* reset shared state so that a parallel scan restarts from the start */
+typedef void (*amparallelrescan_function) (IndexScanDesc scan);
/*
* API struct for an index AM. Note this must be stored in a single palloc'd
@@ -196,6 +208,11 @@ typedef struct IndexAmRoutine
amendscan_function amendscan;
ammarkpos_function ammarkpos; /* can be NULL */
amrestrpos_function amrestrpos; /* can be NULL */
+
+ /* interface functions to support parallel index scans */
+ amestimateparallelscan_function amestimateparallelscan; /* can be NULL */
+ aminitparallelscan_function aminitparallelscan; /* can be NULL */
+ amparallelrescan_function amparallelrescan; /* can be NULL */
} IndexAmRoutine;
diff --git a/src/include/access/genam.h b/src/include/access/genam.h
index b2e078aed2e..51466b96e8b 100644
--- a/src/include/access/genam.h
+++ b/src/include/access/genam.h
@@ -83,6 +83,8 @@ typedef bool (*IndexBulkDeleteCallback) (ItemPointer itemptr, void *state);
typedef struct IndexScanDescData *IndexScanDesc;
typedef struct SysScanDescData *SysScanDesc;
+typedef struct ParallelIndexScanDescData *ParallelIndexScanDesc;
+
/*
* Enumeration specifying the type of uniqueness check to perform in
* index_insert().
@@ -144,6 +146,13 @@ extern void index_rescan(IndexScanDesc scan,
extern void index_endscan(IndexScanDesc scan);
extern void index_markpos(IndexScanDesc scan);
extern void index_restrpos(IndexScanDesc scan);
+extern Size index_parallelscan_estimate(Relation indexrel, Snapshot snapshot);
+extern void index_parallelscan_initialize(Relation heaprel, Relation indexrel,
+ Snapshot snapshot, ParallelIndexScanDesc target);
+extern void index_parallelrescan(IndexScanDesc scan);
+extern IndexScanDesc index_beginscan_parallel(Relation heaprel,
+ Relation indexrel, int nkeys, int norderbys,
+ ParallelIndexScanDesc pscan);
extern ItemPointer index_getnext_tid(IndexScanDesc scan,
ScanDirection direction);
extern HeapTuple index_fetch_heap(IndexScanDesc scan);
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index 8746045d8d8..ce3ca8d4ac2 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -93,6 +93,7 @@ typedef struct IndexScanDescData
ScanKey keyData; /* array of index qualifier descriptors */
ScanKey orderByData; /* array of ordering op descriptors */
bool xs_want_itup; /* caller requests index tuples */
+ bool xs_temp_snap; /* unregister snapshot at scan end? */
/* signaling to index AM about killing index tuples */
bool kill_prior_tuple; /* last-returned tuple is dead */
@@ -126,8 +127,20 @@ typedef struct IndexScanDescData
/* state data for traversing HOT chains in index_getnext */
bool xs_continue_hot; /* T if must keep walking HOT chain */
+
+ /* parallel index scan information, in shared memory */
+ ParallelIndexScanDesc parallel_scan;
} IndexScanDescData;
+/* Generic (AM-independent) shared state for a parallel index scan */
+typedef struct ParallelIndexScanDescData
+{
+ Oid ps_relid; /* OID of the heap relation */
+ Oid ps_indexid; /* OID of the index relation */
+ Size ps_offset; /* Offset in bytes, from the start of this struct, of am specific structure */
+ char ps_snapshot_data[FLEXIBLE_ARRAY_MEMBER]; /* serialized snapshot */
+} ParallelIndexScanDescData;
+
/* Struct for heap-or-index scans of system tables */
typedef struct SysScanDescData
{
diff --git a/src/include/c.h b/src/include/c.h
index efbb77f540a..a2c043adfbf 100644
--- a/src/include/c.h
+++ b/src/include/c.h
@@ -527,6 +527,9 @@ typedef NameData *Name;
#define PointerIsAligned(pointer, type) \
(((uintptr_t)(pointer) % (sizeof (type))) == 0)
+/* Address "offset" bytes past "base"; arguments may be any expressions. */
+#define OffsetToPointer(base, offset) ((void *) ((char *) (base) + (offset)))
+
#define OidIsValid(objectId) ((bool) ((objectId) != InvalidOid))
#define RegProcedureIsValid(p) OidIsValid(p)
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 993880da43e..c4235ae63a4 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -1264,6 +1264,8 @@ OverrideSearchPath
OverrideStackEntry
PACE_HEADER
PACL
+ParallelIndexScanDesc
+ParallelIndexScanDescData
PATH
PBOOL
PCtxtHandle