aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/executor/execParallel.c54
-rw-r--r--src/backend/executor/execProcnode.c121
-rw-r--r--src/backend/executor/nodeGather.c4
-rw-r--r--src/backend/executor/nodeGatherMerge.c4
-rw-r--r--src/backend/executor/nodeLimit.c98
-rw-r--r--src/include/executor/execParallel.h2
-rw-r--r--src/include/executor/executor.h1
-rw-r--r--src/include/nodes/execnodes.h2
-rw-r--r--src/test/regress/expected/select_parallel.out24
-rw-r--r--src/test/regress/sql/select_parallel.sql9
10 files changed, 222 insertions, 97 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8b..ad9eba63dd3 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -47,17 +47,26 @@
* greater than any 32-bit integer here so that values < 2^32 can be used
* by individual parallel nodes to store their own state.
*/
-#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000001)
-#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000002)
-#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000003)
-#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000004)
-#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000005)
-#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000006)
-#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_EXECUTOR_FIXED UINT64CONST(0xE000000000000001)
+#define PARALLEL_KEY_PLANNEDSTMT UINT64CONST(0xE000000000000002)
+#define PARALLEL_KEY_PARAMS UINT64CONST(0xE000000000000003)
+#define PARALLEL_KEY_BUFFER_USAGE UINT64CONST(0xE000000000000004)
+#define PARALLEL_KEY_TUPLE_QUEUE UINT64CONST(0xE000000000000005)
+#define PARALLEL_KEY_INSTRUMENTATION UINT64CONST(0xE000000000000006)
+#define PARALLEL_KEY_DSA UINT64CONST(0xE000000000000007)
+#define PARALLEL_KEY_QUERY_TEXT UINT64CONST(0xE000000000000008)
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/*
+ * Fixed-size random stuff that we need to pass to parallel workers.
+ */
+typedef struct FixedParallelExecutorState
+{
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
+} FixedParallelExecutorState;
+
+/*
* DSM structure for accumulating per-PlanState instrumentation.
*
* instrument_options: Same meaning here as in instrument.c.
@@ -381,12 +390,14 @@ ExecParallelReinitialize(ParallelExecutorInfo *pei)
* execution and return results to the main backend.
*/
ParallelExecutorInfo *
-ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
+ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
+ int64 tuples_needed)
{
ParallelExecutorInfo *pei;
ParallelContext *pcxt;
ExecParallelEstimateContext e;
ExecParallelInitializeDSMContext d;
+ FixedParallelExecutorState *fpes;
char *pstmt_data;
char *pstmt_space;
char *param_space;
@@ -418,6 +429,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* for the various things we need to store.
*/
+ /* Estimate space for fixed-size state. */
+ shm_toc_estimate_chunk(&pcxt->estimator,
+ sizeof(FixedParallelExecutorState));
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+
/* Estimate space for query text. */
query_len = strlen(estate->es_sourceText);
shm_toc_estimate_chunk(&pcxt->estimator, query_len);
@@ -487,6 +503,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
* asked for has been allocated or initialized yet, though, so do that.
*/
+ /* Store fixed-size state. */
+ fpes = shm_toc_allocate(pcxt->toc, sizeof(FixedParallelExecutorState));
+ fpes->tuples_needed = tuples_needed;
+ shm_toc_insert(pcxt->toc, PARALLEL_KEY_EXECUTOR_FIXED, fpes);
+
/* Store query string */
query_string = shm_toc_allocate(pcxt->toc, query_len);
memcpy(query_string, estate->es_sourceText, query_len);
@@ -833,6 +854,7 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
void
ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
{
+ FixedParallelExecutorState *fpes;
BufferUsage *buffer_usage;
DestReceiver *receiver;
QueryDesc *queryDesc;
@@ -841,6 +863,9 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
void *area_space;
dsa_area *area;
+ /* Get fixed-size state. */
+ fpes = shm_toc_lookup(toc, PARALLEL_KEY_EXECUTOR_FIXED, false);
+
/* Set up DestReceiver, SharedExecutorInstrumentation, and QueryDesc. */
receiver = ExecParallelGetReceiver(seg, toc);
instrumentation = shm_toc_lookup(toc, PARALLEL_KEY_INSTRUMENTATION, true);
@@ -868,8 +893,17 @@ ParallelQueryMain(dsm_segment *seg, shm_toc *toc)
queryDesc->planstate->state->es_query_dsa = area;
ExecParallelInitializeWorker(queryDesc->planstate, toc);
- /* Run the plan */
- ExecutorRun(queryDesc, ForwardScanDirection, 0L, true);
+ /* Pass down any tuple bound */
+ ExecSetTupleBound(fpes->tuples_needed, queryDesc->planstate);
+
+ /*
+ * Run the plan. If we specified a tuple bound, be careful not to demand
+ * more tuples than that.
+ */
+ ExecutorRun(queryDesc,
+ ForwardScanDirection,
+ fpes->tuples_needed < 0 ? (int64) 0 : fpes->tuples_needed,
+ true);
/* Shut down the executor */
ExecutorFinish(queryDesc);
diff --git a/src/backend/executor/execProcnode.c b/src/backend/executor/execProcnode.c
index 36d2914249c..c1aa5064c90 100644
--- a/src/backend/executor/execProcnode.c
+++ b/src/backend/executor/execProcnode.c
@@ -757,3 +757,124 @@ ExecShutdownNode(PlanState *node)
return false;
}
+
+/*
+ * ExecSetTupleBound
+ *
+ * Set a tuple bound for a planstate node. This lets child plan nodes
+ * optimize based on the knowledge that the maximum number of tuples that
+ * their parent will demand is limited. The tuple bound for a node may
+ * only be changed between scans (i.e., after node initialization or just
+ * before an ExecReScan call).
+ *
+ * Any negative tuples_needed value means "no limit", which should be the
+ * default assumption when this is not called at all for a particular node.
+ *
+ * Note: if this is called repeatedly on a plan tree, the exact same set
+ * of nodes must be updated with the new limit each time; be careful that
+ * only unchanging conditions are tested here.
+ */
+void
+ExecSetTupleBound(int64 tuples_needed, PlanState *child_node)
+{
+ /*
+ * Since this function recurses, in principle we should check stack depth
+ * here. In practice, it's probably pointless since the earlier node
+ * initialization tree traversal would surely have consumed more stack.
+ */
+
+ if (IsA(child_node, SortState))
+ {
+ /*
+ * If it is a Sort node, notify it that it can use bounded sort.
+ *
+ * Note: it is the responsibility of nodeSort.c to react properly to
+ * changes of these parameters. If we ever redesign this, it'd be a
+ * good idea to integrate this signaling with the parameter-change
+ * mechanism.
+ */
+ SortState *sortState = (SortState *) child_node;
+
+ if (tuples_needed < 0)
+ {
+ /* make sure flag gets reset if needed upon rescan */
+ sortState->bounded = false;
+ }
+ else
+ {
+ sortState->bounded = true;
+ sortState->bound = tuples_needed;
+ }
+ }
+ else if (IsA(child_node, MergeAppendState))
+ {
+ /*
+ * If it is a MergeAppend, we can apply the bound to any nodes that
+ * are children of the MergeAppend, since the MergeAppend surely need
+ * read no more than that many tuples from any one input.
+ */
+ MergeAppendState *maState = (MergeAppendState *) child_node;
+ int i;
+
+ for (i = 0; i < maState->ms_nplans; i++)
+ ExecSetTupleBound(tuples_needed, maState->mergeplans[i]);
+ }
+ else if (IsA(child_node, ResultState))
+ {
+ /*
+ * Similarly, for a projecting Result, we can apply the bound to its
+ * child node.
+ *
+ * If Result supported qual checking, we'd have to punt on seeing a
+ * qual. Note that having a resconstantqual is not a showstopper: if
+ * that condition succeeds it affects nothing, while if it fails, no
+ * rows will be demanded from the Result child anyway.
+ */
+ if (outerPlanState(child_node))
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+ else if (IsA(child_node, SubqueryScanState))
+ {
+ /*
+ * We can also descend through SubqueryScan, but only if it has no
+ * qual (otherwise it might discard rows).
+ */
+ SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
+
+ if (subqueryState->ss.ps.qual == NULL)
+ ExecSetTupleBound(tuples_needed, subqueryState->subplan);
+ }
+ else if (IsA(child_node, GatherState))
+ {
+ /*
+ * A Gather node can propagate the bound to its workers. As with
+ * MergeAppend, no one worker could possibly need to return more
+ * tuples than the Gather itself needs to.
+ *
+ * Note: As with Sort, the Gather node is responsible for reacting
+ * properly to changes to this parameter.
+ */
+ GatherState *gstate = (GatherState *) child_node;
+
+ gstate->tuples_needed = tuples_needed;
+
+ /* Also pass down the bound to our own copy of the child plan */
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+ else if (IsA(child_node, GatherMergeState))
+ {
+ /* Same comments as for Gather */
+ GatherMergeState *gstate = (GatherMergeState *) child_node;
+
+ gstate->tuples_needed = tuples_needed;
+
+ ExecSetTupleBound(tuples_needed, outerPlanState(child_node));
+ }
+
+ /*
+ * In principle we could descend through any plan node type that is
+ * certain not to discard or combine input rows; but on seeing a node that
+ * can do that, we can't propagate the bound any further. For the moment
+ * it's unclear that any other cases are worth checking here.
+ */
+}
diff --git a/src/backend/executor/nodeGather.c b/src/backend/executor/nodeGather.c
index e8d94ee6f38..a0f5a60d932 100644
--- a/src/backend/executor/nodeGather.c
+++ b/src/backend/executor/nodeGather.c
@@ -72,6 +72,7 @@ ExecInitGather(Gather *node, EState *estate, int eflags)
gatherstate->ps.state = estate;
gatherstate->ps.ExecProcNode = ExecGather;
gatherstate->need_to_scan_locally = !node->single_copy;
+ gatherstate->tuples_needed = -1;
/*
* Miscellaneous initialization
@@ -156,7 +157,8 @@ ExecGather(PlanState *pstate)
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
- gather->num_workers);
+ gather->num_workers,
+ node->tuples_needed);
/*
* Register backend workers. We might not get as many as we
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 64c62398bbe..2526c584fd0 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -77,6 +77,7 @@ ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
gm_state->ps.plan = (Plan *) node;
gm_state->ps.state = estate;
gm_state->ps.ExecProcNode = ExecGatherMerge;
+ gm_state->tuples_needed = -1;
/*
* Miscellaneous initialization
@@ -190,7 +191,8 @@ ExecGatherMerge(PlanState *pstate)
if (!node->pei)
node->pei = ExecInitParallelPlan(node->ps.lefttree,
estate,
- gm->num_workers);
+ gm->num_workers,
+ node->tuples_needed);
/* Try to launch workers. */
pcxt = node->pei->pcxt;
diff --git a/src/backend/executor/nodeLimit.c b/src/backend/executor/nodeLimit.c
index ceb6854b597..883f46ce7c9 100644
--- a/src/backend/executor/nodeLimit.c
+++ b/src/backend/executor/nodeLimit.c
@@ -27,7 +27,7 @@
#include "nodes/nodeFuncs.h"
static void recompute_limits(LimitState *node);
-static void pass_down_bound(LimitState *node, PlanState *child_node);
+static int64 compute_tuples_needed(LimitState *node);
/* ----------------------------------------------------------------
@@ -297,92 +297,26 @@ recompute_limits(LimitState *node)
/* Set state-machine state */
node->lstate = LIMIT_RESCAN;
- /* Notify child node about limit, if useful */
- pass_down_bound(node, outerPlanState(node));
+ /*
+ * Notify child node about limit. Note: think not to "optimize" by
+ * skipping ExecSetTupleBound if compute_tuples_needed returns < 0. We
+ * must update the child node anyway, in case this is a rescan and the
+ * previous time we got a different result.
+ */
+ ExecSetTupleBound(compute_tuples_needed(node), outerPlanState(node));
}
/*
- * If we have a COUNT, and our input is a Sort node, notify it that it can
- * use bounded sort. We can also pass down the bound through plan nodes
- * that cannot remove or combine input rows; for example, if our input is a
- * MergeAppend, we can apply the same bound to any Sorts that are direct
- * children of the MergeAppend, since the MergeAppend surely need not read
- * more than that many tuples from any one input.
- *
- * This is a bit of a kluge, but we don't have any more-abstract way of
- * communicating between the two nodes; and it doesn't seem worth trying
- * to invent one without some more examples of special communication needs.
- *
- * Note: it is the responsibility of nodeSort.c to react properly to
- * changes of these parameters. If we ever do redesign this, it'd be a
- * good idea to integrate this signaling with the parameter-change mechanism.
+ * Compute the maximum number of tuples needed to satisfy this Limit node.
+ * Return a negative value if there is not a determinable limit.
*/
-static void
-pass_down_bound(LimitState *node, PlanState *child_node)
+static int64
+compute_tuples_needed(LimitState *node)
{
- /*
- * Since this function recurses, in principle we should check stack depth
- * here. In practice, it's probably pointless since the earlier node
- * initialization tree traversal would surely have consumed more stack.
- */
-
- if (IsA(child_node, SortState))
- {
- SortState *sortState = (SortState *) child_node;
- int64 tuples_needed = node->count + node->offset;
-
- /* negative test checks for overflow in sum */
- if (node->noCount || tuples_needed < 0)
- {
- /* make sure flag gets reset if needed upon rescan */
- sortState->bounded = false;
- }
- else
- {
- sortState->bounded = true;
- sortState->bound = tuples_needed;
- }
- }
- else if (IsA(child_node, MergeAppendState))
- {
- /* Pass down the bound through MergeAppend */
- MergeAppendState *maState = (MergeAppendState *) child_node;
- int i;
-
- for (i = 0; i < maState->ms_nplans; i++)
- pass_down_bound(node, maState->mergeplans[i]);
- }
- else if (IsA(child_node, ResultState))
- {
- /*
- * We also have to be prepared to look through a Result, since the
- * planner might stick one atop MergeAppend for projection purposes.
- *
- * If Result supported qual checking, we'd have to punt on seeing a
- * qual. Note that having a resconstantqual is not a showstopper: if
- * that fails we're not getting any rows at all.
- */
- if (outerPlanState(child_node))
- pass_down_bound(node, outerPlanState(child_node));
- }
- else if (IsA(child_node, SubqueryScanState))
- {
- /*
- * We can also look through SubqueryScan, but only if it has no qual
- * (otherwise it might discard rows).
- */
- SubqueryScanState *subqueryState = (SubqueryScanState *) child_node;
-
- if (subqueryState->ss.ps.qual == NULL)
- pass_down_bound(node, subqueryState->subplan);
- }
-
- /*
- * In principle we could look through any plan node type that is certain
- * not to discard or combine input rows. In practice, there are not many
- * node types that the planner might put between Sort and Limit, so trying
- * to be very general is not worth the trouble.
- */
+ if (node->noCount)
+ return -1;
+ /* Note: if this overflows, we'll return a negative value, which is OK */
+ return node->count + node->offset;
}
/* ----------------------------------------------------------------
diff --git a/src/include/executor/execParallel.h b/src/include/executor/execParallel.h
index bd0a87fa041..79b886706f7 100644
--- a/src/include/executor/execParallel.h
+++ b/src/include/executor/execParallel.h
@@ -33,7 +33,7 @@ typedef struct ParallelExecutorInfo
} ParallelExecutorInfo;
extern ParallelExecutorInfo *ExecInitParallelPlan(PlanState *planstate,
- EState *estate, int nworkers);
+ EState *estate, int nworkers, int64 tuples_needed);
extern void ExecParallelFinish(ParallelExecutorInfo *pei);
extern void ExecParallelCleanup(ParallelExecutorInfo *pei);
extern void ExecParallelReinitialize(ParallelExecutorInfo *pei);
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index eacbea3c365..f48a603daeb 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -232,6 +232,7 @@ extern PlanState *ExecInitNode(Plan *node, EState *estate, int eflags);
extern Node *MultiExecProcNode(PlanState *node);
extern void ExecEndNode(PlanState *node);
extern bool ExecShutdownNode(PlanState *node);
+extern void ExecSetTupleBound(int64 tuples_needed, PlanState *child_node);
/* ----------------------------------------------------------------
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 3272c4b3155..15a84269ec9 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -1919,6 +1919,7 @@ typedef struct GatherState
struct TupleQueueReader **reader;
TupleTableSlot *funnel_slot;
bool need_to_scan_locally;
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
} GatherState;
/* ----------------
@@ -1944,6 +1945,7 @@ typedef struct GatherMergeState
struct binaryheap *gm_heap; /* binary heap of slot indices */
bool gm_initialized; /* gather merge initilized ? */
bool need_to_scan_locally;
+ int64 tuples_needed; /* tuple bound, see ExecSetTupleBound */
int gm_nkeys;
SortSupport gm_sortkeys; /* array of length ms_nkeys */
struct GMReaderTupleBuffer *gm_tuple_buffers; /* tuple buffer per reader */
diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out
index 084f0f0c8e1..ccad18e978f 100644
--- a/src/test/regress/expected/select_parallel.out
+++ b/src/test/regress/expected/select_parallel.out
@@ -300,6 +300,29 @@ select count(*) from tenk1 group by twenty;
500
(20 rows)
+reset enable_hashagg;
+-- gather merge test with a LIMIT
+explain (costs off)
+ select fivethous from tenk1 order by fivethous limit 4;
+ QUERY PLAN
+----------------------------------------------
+ Limit
+ -> Gather Merge
+ Workers Planned: 4
+ -> Sort
+ Sort Key: fivethous
+ -> Parallel Seq Scan on tenk1
+(6 rows)
+
+select fivethous from tenk1 order by fivethous limit 4;
+ fivethous
+-----------
+ 0
+ 0
+ 1
+ 1
+(4 rows)
+
-- gather merge test with 0 worker
set max_parallel_workers = 0;
explain (costs off)
@@ -325,7 +348,6 @@ select string4 from tenk1 order by string4 limit 5;
(5 rows)
reset max_parallel_workers;
-reset enable_hashagg;
SAVEPOINT settings;
SET LOCAL force_parallel_mode = 1;
explain (costs off)
diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql
index 58c3f598905..c0debddbcd1 100644
--- a/src/test/regress/sql/select_parallel.sql
+++ b/src/test/regress/sql/select_parallel.sql
@@ -118,13 +118,20 @@ explain (costs off)
select count(*) from tenk1 group by twenty;
+reset enable_hashagg;
+
+-- gather merge test with a LIMIT
+explain (costs off)
+ select fivethous from tenk1 order by fivethous limit 4;
+
+select fivethous from tenk1 order by fivethous limit 4;
+
-- gather merge test with 0 worker
set max_parallel_workers = 0;
explain (costs off)
select string4 from tenk1 order by string4 limit 5;
select string4 from tenk1 order by string4 limit 5;
reset max_parallel_workers;
-reset enable_hashagg;
SAVEPOINT settings;
SET LOCAL force_parallel_mode = 1;