aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c101
1 files changed, 86 insertions, 15 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 01316ff5d94..c713b851399 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
case T_SortState:
/* even when not parallel-aware */
ExecSortEstimate((SortState *) planstate, e->pcxt);
+ break;
+
default:
break;
}
@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+ break;
+
default:
break;
}
@@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
}
/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
- ReinitializeParallelDSM(pei->pcxt);
- pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
- pei->finished = false;
-}
-
-/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
ExecParallelInitializeDSM(planstate, &d);
/*
- * Make sure that the world hasn't shifted under our feat. This could
+ * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
@@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
}
/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei)
+{
+ /* Old workers must already be shut down */
+ Assert(pei->finished);
+
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
+
+ /* Traverse plan tree and let each child node reset associated state. */
+ ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt)
+{
+ if (planstate == NULL)
+ return false;
+
+ /*
+ * Call reinitializers for DSM-using plan nodes.
+ */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ if (planstate->plan->parallel_aware)
+ ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+ pcxt);
+ break;
+ case T_ForeignScanState:
+ if (planstate->plan->parallel_aware)
+ ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+ pcxt);
+ break;
+ case T_CustomScanState:
+ if (planstate->plan->parallel_aware)
+ ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+ pcxt);
+ break;
+ case T_BitmapHeapScanState:
+ if (planstate->plan->parallel_aware)
+ ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+ pcxt);
+ break;
+ case T_SortState:
+ /* even when not parallel-aware */
+ ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+ break;
+
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/
@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
- ExecBitmapHeapInitializeWorker(
- (BitmapHeapScanState *) planstate, toc);
+ ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc);
+ break;
+
default:
break;
}