diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 101 |
1 files changed, 86 insertions, 15 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 01316ff5d94..c713b851399 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node, ExecParallelInitializeDSMContext *d); static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize); +static bool ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt); static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation); @@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) case T_SortState: /* even when not parallel-aware */ ExecSortEstimate((SortState *) planstate, e->pcxt); + break; + default: break; } @@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate, case T_SortState: /* even when not parallel-aware */ ExecSortInitializeDSM((SortState *) planstate, d->pcxt); + break; + default: break; } @@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize) } /* - * Re-initialize the parallel executor info such that it can be reused by - * workers. - */ -void -ExecParallelReinitialize(ParallelExecutorInfo *pei) -{ - ReinitializeParallelDSM(pei->pcxt); - pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); - pei->finished = false; -} - -/* * Sets up the required infrastructure for backend workers to perform * execution and return results to the main backend. */ @@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, ExecParallelInitializeDSM(planstate, &d); /* - * Make sure that the world hasn't shifted under our feat. This could + * Make sure that the world hasn't shifted under our feet. This could * probably just be an Assert(), but let's be conservative for now. */ if (e.nnodes != d.nnodes) @@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers, } /* + * Re-initialize the parallel executor shared memory state before launching + * a fresh batch of workers. + */ +void +ExecParallelReinitialize(PlanState *planstate, + ParallelExecutorInfo *pei) +{ + /* Old workers must already be shut down */ + Assert(pei->finished); + + ReinitializeParallelDSM(pei->pcxt); + pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true); + pei->finished = false; + + /* Traverse plan tree and let each child node reset associated state. */ + ExecParallelReInitializeDSM(planstate, pei->pcxt); +} + +/* + * Traverse plan tree to reinitialize per-node dynamic shared memory state + */ +static bool +ExecParallelReInitializeDSM(PlanState *planstate, + ParallelContext *pcxt) +{ + if (planstate == NULL) + return false; + + /* + * Call reinitializers for DSM-using plan nodes. + */ + switch (nodeTag(planstate)) + { + case T_SeqScanState: + if (planstate->plan->parallel_aware) + ExecSeqScanReInitializeDSM((SeqScanState *) planstate, + pcxt); + break; + case T_IndexScanState: + if (planstate->plan->parallel_aware) + ExecIndexScanReInitializeDSM((IndexScanState *) planstate, + pcxt); + break; + case T_IndexOnlyScanState: + if (planstate->plan->parallel_aware) + ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate, + pcxt); + break; + case T_ForeignScanState: + if (planstate->plan->parallel_aware) + ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, + pcxt); + break; + case T_CustomScanState: + if (planstate->plan->parallel_aware) + ExecCustomScanReInitializeDSM((CustomScanState *) planstate, + pcxt); + break; + case T_BitmapHeapScanState: + if (planstate->plan->parallel_aware) + ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate, + pcxt); + break; + case T_SortState: + /* even when not parallel-aware */ + ExecSortReInitializeDSM((SortState *) planstate, pcxt); + break; + + default: + break; + } + + return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt); +} + +/* * Copy instrumentation information about this node and its descendants from * dynamic shared memory. */ @@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc) break; case T_BitmapHeapScanState: if (planstate->plan->parallel_aware) - ExecBitmapHeapInitializeWorker( - (BitmapHeapScanState *) planstate, toc); + ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc); break; case T_SortState: /* even when not parallel-aware */ ExecSortInitializeWorker((SortState *) planstate, toc); + break; + default: break; } |