aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c85
1 files changed, 72 insertions, 13 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ce47f1d4a8b..2313b4c45cb 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -109,6 +109,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -365,18 +367,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
}
/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
- ReinitializeParallelDSM(pei->pcxt);
- pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
- pei->finished = false;
-}
-
-/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -567,7 +557,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
ExecParallelInitializeDSM(planstate, &d);
/*
- * Make sure that the world hasn't shifted under our feat. This could
+ * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
@@ -578,6 +568,75 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
}
/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei)
+{
+ /* Old workers must already be shut down */
+ Assert(pei->finished);
+
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
+
+ /* Traverse plan tree and let each child node reset associated state. */
+ ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt)
+{
+ if (planstate == NULL)
+ return false;
+
+ /*
+ * Call reinitializers for DSM-using plan nodes.
+ */
+ if (planstate->plan->parallel_aware)
+ {
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexScanState:
+ ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexOnlyScanState:
+ ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+ pcxt);
+ break;
+ case T_ForeignScanState:
+ ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+ pcxt);
+ break;
+ case T_CustomScanState:
+ ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+ pcxt);
+ break;
+ case T_BitmapHeapScanState:
+ ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+ pcxt);
+ break;
+
+ default:
+ break;
+ }
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/