Diffstat (limited to 'src/backend/executor')
-rw-r--r--  src/backend/executor/execParallel.c    19
-rw-r--r--  src/backend/executor/nodeAppend.c     331
2 files changed, 293 insertions(+), 57 deletions(-)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ff5cff59b0f..558cb08b07e 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -26,6 +26,7 @@
#include "executor/execExpr.h"
#include "executor/execParallel.h"
#include "executor/executor.h"
+#include "executor/nodeAppend.h"
#include "executor/nodeBitmapHeapscan.h"
#include "executor/nodeCustom.h"
#include "executor/nodeForeignscan.h"
@@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
ExecForeignScanEstimate((ForeignScanState *) planstate,
e->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendEstimate((AppendState *) planstate,
+ e->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
d->pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeDSM((AppendState *) planstate,
+ d->pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
pcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
@@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
pwcxt);
break;
+ case T_AppendState:
+ if (planstate->plan->parallel_aware)
+ ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
+ break;
case T_CustomScanState:
if (planstate->plan->parallel_aware)
ExecCustomScanInitializeWorker((CustomScanState *) planstate,
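
The execParallel.c hunks above all follow one pattern: a parallel-aware node type gets a case in each of the four parallel phases (estimate, initialize DSM, reinitialize DSM, initialize worker), each guarded by plan->parallel_aware. The standalone C sketch below only models the shape of that dispatch; the enum and function names are illustrative, not PostgreSQL APIs.

/*
 * Standalone sketch of the four-phase dispatch shape used in execParallel.c.
 * Names like ParallelPhase and dispatch_fake_append are hypothetical; the
 * real code dispatches on the PlanState node tag.
 */
#include <stdio.h>

typedef enum
{
	PHASE_ESTIMATE,				/* size the per-node shared memory */
	PHASE_INITIALIZE_DSM,		/* leader: allocate and publish shared state */
	PHASE_REINITIALIZE_DSM,		/* leader: reset shared state for a rescan */
	PHASE_INITIALIZE_WORKER		/* worker: attach to the published state */
} ParallelPhase;

static void
dispatch_fake_append(ParallelPhase phase)
{
	switch (phase)
	{
		case PHASE_ESTIMATE:
			printf("estimate shared-state size\n");
			break;
		case PHASE_INITIALIZE_DSM:
			printf("allocate and publish shared state\n");
			break;
		case PHASE_REINITIALIZE_DSM:
			printf("reset shared state for rescan\n");
			break;
		case PHASE_INITIALIZE_WORKER:
			printf("look up shared state in worker\n");
			break;
	}
}

int
main(void)
{
	for (int p = PHASE_ESTIMATE; p <= PHASE_INITIALIZE_WORKER; p++)
		dispatch_fake_append((ParallelPhase) p);
	return 0;
}
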
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 1d2fb35d551..246a0b2d852 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -61,51 +61,27 @@
#include "executor/nodeAppend.h"
#include "miscadmin.h"
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
-
-
-/* ----------------------------------------------------------------
- * exec_append_initialize_next
- *
- * Sets up the append state node for the "next" scan.
- *
- * Returns t iff there is a "next" scan to process.
- * ----------------------------------------------------------------
- */
-static bool
-exec_append_initialize_next(AppendState *appendstate)
+/* Shared state for parallel-aware Append. */
+struct ParallelAppendState
{
- int whichplan;
+ LWLock pa_lock; /* mutual exclusion to choose next subplan */
+ int pa_next_plan; /* next plan to choose by any worker */
/*
- * get information from the append node
+ * pa_finished[i] should be true if no more workers should select subplan
+ * i. for a non-partial plan, this should be set to true as soon as a
+ * worker selects the plan; for a partial plan, it remains false until
+ * some worker executes the plan to completion.
*/
- whichplan = appendstate->as_whichplan;
+ bool pa_finished[FLEXIBLE_ARRAY_MEMBER];
+};
- if (whichplan < 0)
- {
- /*
- * if scanning in reverse, we start at the last scan in the list and
- * then proceed back to the first.. in any case we inform ExecAppend
- * that we are at the end of the line by returning false
- */
- appendstate->as_whichplan = 0;
- return false;
- }
- else if (whichplan >= appendstate->as_nplans)
- {
- /*
- * as above, end the scan if we go beyond the last scan in our list..
- */
- appendstate->as_whichplan = appendstate->as_nplans - 1;
- return false;
- }
- else
- {
- return true;
- }
-}
+#define INVALID_SUBPLAN_INDEX -1
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool choose_next_subplan_locally(AppendState *node);
+static bool choose_next_subplan_for_leader(AppendState *node);
+static bool choose_next_subplan_for_worker(AppendState *node);
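
The comment on pa_finished above draws a distinction worth spelling out: a non-partial subplan may be run by only one process, so it is marked finished as soon as any participant claims it, while a partial subplan stays unfinished until some participant has executed it to completion. The following standalone C sketch models just that bookkeeping; NPLANS, FIRST_PARTIAL, claim() and complete() are toy names, not executor code.

/*
 * Toy model of the pa_finished semantics described above.  Subplans below
 * FIRST_PARTIAL are non-partial: they are finished the moment one
 * participant claims them.  Partial subplans are only finished once some
 * participant has run them to the end.
 */
#include <stdbool.h>
#include <stdio.h>

#define NPLANS			4
#define FIRST_PARTIAL	2		/* plans 0,1 non-partial; 2,3 partial */

static bool finished[NPLANS];

/* A participant claims subplan "plan". */
static void
claim(int plan)
{
	if (plan < FIRST_PARTIAL)
		finished[plan] = true;	/* nobody else may run a non-partial plan */
}

/* A participant has executed subplan "plan" to completion. */
static void
complete(int plan)
{
	finished[plan] = true;
}

int
main(void)
{
	claim(0);					/* non-partial: finished immediately */
	claim(2);					/* partial: still open to other workers */
	printf("plan 0 finished: %d, plan 2 finished: %d\n",
		   finished[0], finished[2]);
	complete(2);				/* last worker done with plan 2 */
	printf("plan 2 finished: %d\n", finished[2]);
	return 0;
}
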
/* ----------------------------------------------------------------
* ExecInitAppend
@@ -185,10 +161,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
appendstate->ps.ps_ProjInfo = NULL;
/*
- * initialize to scan first subplan
+ * Parallel-aware append plans must choose the first subplan to execute by
+ * looking at shared memory, but non-parallel-aware append plans can
+ * always start with the first subplan.
*/
- appendstate->as_whichplan = 0;
- exec_append_initialize_next(appendstate);
+ appendstate->as_whichplan =
+ appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+
+ /* If parallel-aware, this will be overridden later. */
+ appendstate->choose_next_subplan = choose_next_subplan_locally;
return appendstate;
}
@@ -204,6 +185,11 @@ ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ /* If no subplan has been chosen, we must choose one before proceeding. */
+ if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+ !node->choose_next_subplan(node))
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
for (;;)
{
PlanState *subnode;
@@ -214,6 +200,7 @@ ExecAppend(PlanState *pstate)
/*
* figure out which subplan we are currently processing
*/
+ Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
/*
@@ -231,19 +218,9 @@ ExecAppend(PlanState *pstate)
return result;
}
- /*
- * Go on to the "next" subplan in the appropriate direction. If no
- * more subplans, return the empty slot set up for us by
- * ExecInitAppend.
- */
- if (ScanDirectionIsForward(node->ps.state->es_direction))
- node->as_whichplan++;
- else
- node->as_whichplan--;
- if (!exec_append_initialize_next(node))
+ /* choose new subplan; if none, we're done */
+ if (!node->choose_next_subplan(node))
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
- /* Else loop back and try to get a tuple from the new subplan */
}
}
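
The reworked ExecAppend loop above delegates all subplan switching to the choose_next_subplan callback: it pulls tuples from the current subplan until it is exhausted, then asks the chooser for another, and returns an empty slot once the chooser reports there is nothing left. Below is a standalone toy model of that driver loop in plain C, with no executor types.

/*
 * Toy model of the ExecAppend driver loop: pull from the current subplan
 * until it is exhausted, then ask the chooser for another one; when the
 * chooser reports false, the scan is done.
 */
#include <stdbool.h>
#include <stdio.h>

#define NPLANS 3

static int	remaining[NPLANS] = {2, 0, 1};	/* tuples left in each subplan */
static int	whichplan = 0;

/* Toy "get next tuple": returns false when the subplan is exhausted. */
static bool
pull_tuple(int plan)
{
	if (remaining[plan] == 0)
		return false;
	remaining[plan]--;
	return true;
}

/* Toy chooser, equivalent to a forward choose_next_subplan_locally(). */
static bool
choose_next_subplan(void)
{
	if (whichplan >= NPLANS - 1)
		return false;
	whichplan++;
	return true;
}

int
main(void)
{
	for (;;)
	{
		if (pull_tuple(whichplan))
		{
			printf("tuple from subplan %d\n", whichplan);
			continue;
		}
		if (!choose_next_subplan())
			break;				/* no subplans left: scan is done */
	}
	return 0;
}
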
@@ -298,6 +275,246 @@ ExecReScanAppend(AppendState *node)
if (subnode->chgParam == NULL)
ExecReScan(subnode);
}
- node->as_whichplan = 0;
- exec_append_initialize_next(node);
+
+ node->as_whichplan =
+ node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+}
+
+/* ----------------------------------------------------------------
+ * Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendEstimate
+ *
+ * Compute the amount of space we'll need in the parallel
+ * query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+ ParallelContext *pcxt)
+{
+ node->pstate_len =
+ add_size(offsetof(ParallelAppendState, pa_finished),
+ sizeof(bool) * node->as_nplans);
+
+ shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
+ shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
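
ExecAppendEstimate sizes the shared state as the offset of the flexible array member plus one bool per subplan; add_size() is just overflow-checked addition. A standalone C illustration of that sizing formula, with the LWLock field omitted for brevity:

/*
 * Standalone illustration of the sizing used in ExecAppendEstimate: a struct
 * ending in a flexible array member needs the offset of that member plus one
 * element per subplan.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

struct shared_state
{
	int			next_plan;
	bool		finished[];		/* C99 flexible array member */
};

int
main(void)
{
	int			nplans = 8;
	size_t		len = offsetof(struct shared_state, finished)
					  + sizeof(bool) * nplans;

	printf("need %zu bytes for %d subplans\n", len, nplans);
	return 0;
}
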
+
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeDSM
+ *
+ * Set up shared state for Parallel Append.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+ ParallelContext *pcxt)
+{
+ ParallelAppendState *pstate;
+
+ pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
+ memset(pstate, 0, node->pstate_len);
+ LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+ shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+
+ node->as_pstate = pstate;
+ node->choose_next_subplan = choose_next_subplan_for_leader;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendReInitializeDSM
+ *
+ * Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+ ParallelAppendState *pstate = node->as_pstate;
+
+ pstate->pa_next_plan = 0;
+ memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendInitializeWorker
+ *
+ * Copy relevant information from TOC into planstate, and initialize
+ * whatever is required to choose and execute the optimal subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
+{
+ node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+ node->choose_next_subplan = choose_next_subplan_for_worker;
+}
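
The three routines above form a simple handshake: the leader allocates the shared state, initializes the lock, and publishes it in the DSM table of contents under the plan node's id; a rescan resets pa_next_plan and pa_finished in place; and each worker attaches by looking up the same key. The standalone sketch below models only the keyed publish/lookup step with a toy in-process table, not the real shm_toc API or dynamic shared memory.

/*
 * Toy model of the leader/worker handshake: the leader stores a pointer
 * under the plan node's id, and each worker attaches by looking up the same
 * key.  In PostgreSQL both the table and the state live in DSM.
 */
#include <stdio.h>

#define MAX_ENTRIES 8

struct toc_entry
{
	int			key;
	void	   *addr;
};

static struct toc_entry toc[MAX_ENTRIES];
static int	ntoc = 0;

static void
toc_insert(int key, void *addr)
{
	toc[ntoc].key = key;
	toc[ntoc].addr = addr;
	ntoc++;
}

static void *
toc_lookup(int key)
{
	for (int i = 0; i < ntoc; i++)
		if (toc[i].key == key)
			return toc[i].addr;
	return NULL;
}

int
main(void)
{
	int			plan_node_id = 3;
	int			shared_state = 42;	/* stands in for ParallelAppendState */

	toc_insert(plan_node_id, &shared_state);		/* leader side */
	int		   *found = toc_lookup(plan_node_id);	/* worker side */

	printf("worker sees %d\n", *found);
	return 0;
}
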
+
+/* ----------------------------------------------------------------
+ * choose_next_subplan_locally
+ *
+ * Choose next subplan for a non-parallel-aware Append,
+ * returning false if there are no more.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_locally(AppendState *node)
+{
+ int whichplan = node->as_whichplan;
+
+ /* We should never see INVALID_SUBPLAN_INDEX in this case. */
+ Assert(whichplan >= 0 && whichplan <= node->as_nplans);
+
+ if (ScanDirectionIsForward(node->ps.state->es_direction))
+ {
+ if (whichplan >= node->as_nplans - 1)
+ return false;
+ node->as_whichplan++;
+ }
+ else
+ {
+ if (whichplan <= 0)
+ return false;
+ node->as_whichplan--;
+ }
+
+ return true;
+}
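
choose_next_subplan_locally simply steps forward or backward through the subplan list and reports false at either end. A standalone check of that boundary logic, with the ScanDirection machinery replaced by a plain boolean:

/*
 * Toy check of the non-parallel chooser's boundaries: stepping forward stops
 * after the last subplan, stepping backward stops before the first.
 */
#include <stdbool.h>
#include <stdio.h>

#define NPLANS 3

static bool
choose_next(int *whichplan, bool forward)
{
	if (forward)
	{
		if (*whichplan >= NPLANS - 1)
			return false;
		(*whichplan)++;
	}
	else
	{
		if (*whichplan <= 0)
			return false;
		(*whichplan)--;
	}
	return true;
}

int
main(void)
{
	int			plan = 0;

	while (choose_next(&plan, true))
		printf("forward -> %d\n", plan);

	plan = NPLANS - 1;
	while (choose_next(&plan, false))
		printf("backward -> %d\n", plan);
	return 0;
}
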
+
+/* ----------------------------------------------------------------
+ * choose_next_subplan_for_leader
+ *
+ * Try to pick a plan which doesn't commit us to doing much
+ * work locally, so that as much work as possible is done in
+ * the workers. Cheapest subplans are at the end.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_leader(AppendState *node)
+{
+ ParallelAppendState *pstate = node->as_pstate;
+ Append *append = (Append *) node->ps.plan;
+
+ /* Backward scan is not supported by parallel-aware plans */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+ if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ {
+ /* Mark just-completed subplan as finished. */
+ node->as_pstate->pa_finished[node->as_whichplan] = true;
+ }
+ else
+ {
+ /* Start with last subplan. */
+ node->as_whichplan = node->as_nplans - 1;
+ }
+
+ /* Loop until we find a subplan to execute. */
+ while (pstate->pa_finished[node->as_whichplan])
+ {
+ if (node->as_whichplan == 0)
+ {
+ pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+ node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ LWLockRelease(&pstate->pa_lock);
+ return false;
+ }
+ node->as_whichplan--;
+ }
+
+ /* If non-partial, immediately mark as finished. */
+ if (node->as_whichplan < append->first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+ LWLockRelease(&pstate->pa_lock);
+
+ return true;
+}
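
Because the planner puts the cheapest subplans last, the leader starts at the end of the list and walks toward the front, skipping subplans already marked finished and immediately retiring any non-partial subplan it claims so no worker picks it up again. Below is a standalone, lock-free model of that choice; the finished[] contents are made up for the example, and completion is marked by the caller rather than on re-entry as in the real code.

/*
 * Toy model of the leader's choice: scan from the cheapest (last) subplan
 * toward the front, skip finished ones, and immediately retire a chosen
 * non-partial plan.  Locks omitted.
 */
#include <stdbool.h>
#include <stdio.h>

#define NPLANS			4
#define FIRST_PARTIAL	2

static bool finished[NPLANS] = {false, false, false, true};

static int
leader_choose(void)
{
	int			plan = NPLANS - 1;

	while (finished[plan])
	{
		if (plan == 0)
			return -1;			/* nothing left for the leader */
		plan--;
	}
	if (plan < FIRST_PARTIAL)
		finished[plan] = true;	/* non-partial: claimed exclusively */
	return plan;
}

int
main(void)
{
	int			plan;

	while ((plan = leader_choose()) != -1)
	{
		printf("leader runs subplan %d\n", plan);
		finished[plan] = true;	/* pretend the leader finished it */
	}
	return 0;
}
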
+
+/* ----------------------------------------------------------------
+ * choose_next_subplan_for_worker
+ *
+ * Choose next subplan for a parallel-aware Append, returning
+ * false if there are no more.
+ *
+ * We start from the first plan and advance through the list;
+ * when we get back to the end, we loop back to the first
+ * nonpartial plan. This assigns the non-partial plans first
+ * in order of descending cost and then spreads out the
+ * workers as evenly as possible across the remaining partial
+ * plans.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_worker(AppendState *node)
+{
+ ParallelAppendState *pstate = node->as_pstate;
+ Append *append = (Append *) node->ps.plan;
+
+ /* Backward scan is not supported by parallel-aware plans */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+ /* Mark just-completed subplan as finished. */
+ if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+ /* If all the plans are already done, we have nothing to do */
+ if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
+ {
+ LWLockRelease(&pstate->pa_lock);
+ return false;
+ }
+
+ /* Loop until we find a subplan to execute. */
+ while (pstate->pa_finished[pstate->pa_next_plan])
+ {
+ if (pstate->pa_next_plan < node->as_nplans - 1)
+ {
+ /* Advance to next plan. */
+ pstate->pa_next_plan++;
+ }
+ else if (append->first_partial_plan < node->as_nplans)
+ {
+ /* Loop back to first partial plan. */
+ pstate->pa_next_plan = append->first_partial_plan;
+ }
+ else
+ {
+ /* At last plan, no partial plans, arrange to bail out. */
+ pstate->pa_next_plan = node->as_whichplan;
+ }
+
+ if (pstate->pa_next_plan == node->as_whichplan)
+ {
+ /* We've tried everything! */
+ pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+ LWLockRelease(&pstate->pa_lock);
+ return false;
+ }
+ }
+
+ /* Pick the plan we found, and advance pa_next_plan one more time. */
+ node->as_whichplan = pstate->pa_next_plan++;
+ if (pstate->pa_next_plan >= node->as_nplans)
+ {
+ Assert(append->first_partial_plan < node->as_nplans);
+ pstate->pa_next_plan = append->first_partial_plan;
+ }
+
+ /* If non-partial, immediately mark as finished. */
+ if (node->as_whichplan < append->first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+ LWLockRelease(&pstate->pa_lock);
+
+ return true;
}
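
choose_next_subplan_for_worker hands out the remaining non-partial plans first (they sit at the front, in descending cost order) and then spreads workers across the partial plans by advancing pa_next_plan round-robin, wrapping back to first_partial_plan, and giving up once every plan it revisits is finished. Below is a standalone, single-threaded model of that loop; the real code runs it under pa_lock, and the constants here are illustrative.

/*
 * Toy, single-threaded model of the worker's round-robin choice.  The caller
 * marks a plan finished after running it to completion, which stands in for
 * the pa_finished update done at the top of the real function.
 */
#include <stdbool.h>
#include <stdio.h>

#define NPLANS			5
#define FIRST_PARTIAL	3		/* plans 0-2 non-partial, 3-4 partial */
#define INVALID			(-1)

static bool finished[NPLANS];
static int	next_plan = 0;

static int
worker_choose(int current)
{
	if (next_plan == INVALID)
		return INVALID;			/* everything already handed out */

	while (finished[next_plan])
	{
		if (next_plan < NPLANS - 1)
			next_plan++;					/* advance to next plan */
		else if (FIRST_PARTIAL < NPLANS)
			next_plan = FIRST_PARTIAL;		/* wrap to first partial plan */
		else
			next_plan = current;			/* no partial plans: bail out */

		if (next_plan == current)
		{
			next_plan = INVALID;			/* tried everything */
			return INVALID;
		}
	}

	int			chosen = next_plan++;

	if (next_plan >= NPLANS)
		next_plan = FIRST_PARTIAL;
	if (chosen < FIRST_PARTIAL)
		finished[chosen] = true;	/* non-partial: one worker only */
	return chosen;
}

int
main(void)
{
	/* One worker repeatedly asking for work; plans finish when it runs them. */
	for (int current = INVALID;;)
	{
		current = worker_choose(current);
		if (current == INVALID)
			break;
		printf("worker runs subplan %d\n", current);
		finished[current] = true;	/* this worker ran it to completion */
	}
	return 0;
}
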