diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 85 |
1 files changed, 72 insertions, 13 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ce47f1d4a8b..2313b4c45cb 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -109,6 +109,8 @@ static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d); static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize); +static bool ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); @@ -365,18 +367,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) } /* - * Re-initialize the parallel executor info such that it can be reused by - * workers. - */ -void -ExecParallelReinitialize(ParallelExecutorInfo *pei) -{ - ReinitializeParallelDSM(pei->pcxt); - pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); - pei->finished = false; -} - -/* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ @@ -567,7 +557,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) ExecParallelInitializeDSM(planstate, &d); /* - * Make sure that the world hasn't shifted under our feat. This could + * Make sure that the world hasn't shifted under our feet. This could * probably just be an Assert(), but let's be conservative for now. */ if (e.nnodes != d.nnodes) @@ -578,6 +568,75 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) } /* + * Re-initialize the parallel executor shared memory state before launching + * a fresh batch of workers. + */ +void +ExecParallelReinitialize(PlanState *planstate, + ParallelExecutorInfo *pei) +{ + /* Old workers must already be shut down */ + Assert(pei->finished); + + ReinitializeParallelDSM(pei->pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->finished = false; + + /* Traverse plan tree and let each child node reset associated state. */ + ExecParallelReInitializeDSM(planstate, pei->pcxt); +} + +/* + * Traverse plan tree to reinitialize per-node dynamic shared memory state + */ +static bool +ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt) +{ + if (planstate == NULL) + return false; + + /* + * Call reinitializers for DSM-using plan nodes. + */ + if (planstate->plan->parallel_aware) + { + switch (nodeTag(planstate)) + { + case T_SeqScanState: + ExecSeqScanReInitializeDSM((SeqScanState *) planstate, + pcxt); + break; + case T_IndexScanState: + ExecIndexScanReInitializeDSM((IndexScanState *) planstate, + pcxt); + break; + case T_IndexOnlyScanState: + ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate, + pcxt); + break; + case T_ForeignScanState: + ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, + pcxt); + break; + case T_CustomScanState: + ExecCustomScanReInitializeDSM((CustomScanState *) planstate, + pcxt); + break; + case T_BitmapHeapScanState: + ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, + pcxt); + break; + + default: + break; + } + } + + return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt); +} + +/* * Copy instrumentation information about this node and its descendants from * dynamic shared memory. */ |