author		Tom Lane <tgl@sss.pgh.pa.us>	2017-08-30 13:18:16 -0400
committer	Tom Lane <tgl@sss.pgh.pa.us>	2017-08-30 13:18:16 -0400
commit		41b0dd987d44089dc48e9c70024277e253b396b7 (patch)
tree		c42eeeb2f175764a9b1ad9c095f8a46057078eb3 /src/backend/executor/execParallel.c
parent		6c2c5bea3cec4c874d1ee225bb6e222055c03d75 (diff)
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared
state within the ExecReScan code for parallel-aware scan nodes. This is
problematic, because it means that the ExecReScan call has to occur
synchronously (ie, during the parent Gather node's ReScan call). That is
swimming very much against the tide so far as the ExecReScan machinery is
concerned; the fact that it works at all today depends on a lot of fragile
assumptions, such as that no plan node between Gather and a parallel-aware
scan node is parameterized. Another objection is that because ExecReScan
might be called in workers as well as the leader, hacky extra tests are
needed in some places to prevent unwanted shared-state resets.
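
To make the problem concrete, here is an illustrative sketch of the pattern being removed. The node type, fields, and reinitialize helper below are hypothetical, invented for illustration (only IsParallelWorker() is the real guard macro); the point is a ReScan callback that mutates shared DSM state inline and therefore needs a guard so that workers, which also run ExecReScan, do not clobber it:

	/*
	 * Hypothetical sketch of the old pattern: a shared-state reset buried
	 * in ReScan.  SomeScanState, pscan, and
	 * some_parallelscan_reinitialize() are invented names.
	 */
	static void
	ExecReScanSomeScan(SomeScanState *node)
	{
		/* hacky extra test: workers must not reset shared state */
		if (node->pscan != NULL && !IsParallelWorker())
			some_parallelscan_reinitialize(node->pscan);

		/* resetting local scan position is all ReScan should really do */
		node->local_pos = 0;
	}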
Hence, let's separate this code into two functions, a ReInitializeDSM
call and the ReScan call proper. ReInitializeDSM is called only in
the leader and is guaranteed to run before we start new workers.
ReScan is returned to its traditional function of resetting only local
state, which means that ExecReScan's usual habits of delaying or
eliminating child rescan calls are safe again.
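
Concretely, the leader-side flow after this commit looks roughly like the following. This is a simplified sketch of the Gather logic, not the verbatim nodeGather.c code; variable names are abbreviated:

	/*
	 * Sketch: on first execution build the parallel state; on a rescan,
	 * re-initialize it -- in both cases before LaunchParallelWorkers().
	 */
	if (!node->pei)
		node->pei = ExecInitParallelPlan(outerPlanState(node),
										 estate, gather->num_workers);
	else
		ExecParallelReinitialize(outerPlanState(node), node->pei);

	LaunchParallelWorkers(node->pei->pcxt);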
As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary
to make these changes in 9.6, which is a good thing because the FDW and
CustomScan APIs are impacted.
Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
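
For FDW authors, the API impact is a new ReInitializeDSM-style callback alongside the existing parallel-scan callbacks. A minimal sketch of what an implementation might do is shown below; the shared-state struct and its field are hypothetical:

	/*
	 * Hypothetical ReInitializeDSMForeignScan implementation: reset the
	 * FDW's shared scan cursor before a fresh batch of workers starts.
	 * MyFdwSharedState and next_row are invented for illustration.
	 */
	static void
	myReInitializeDSMForeignScan(ForeignScanState *node,
								 ParallelContext *pcxt,
								 void *coordinate)
	{
		MyFdwSharedState *shared = (MyFdwSharedState *) coordinate;

		pg_atomic_write_u64(&shared->next_row, 0);
	}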
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--	src/backend/executor/execParallel.c | 101 ++++++++++++++++++++++++----
1 file changed, 86 insertions(+), 15 deletions(-)
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 01316ff5d94..c713b851399 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
 					  ExecParallelInitializeDSMContext *d);
 static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
 						 bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+							ParallelContext *pcxt);
 static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
 						 SharedExecutorInstrumentation *instrumentation);
 
@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortEstimate((SortState *) planstate, e->pcxt);
+			break;
+
 		default:
 			break;
 	}
@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+			break;
+
 		default:
 			break;
 	}
@@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
 }
 
 /*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
-	ReinitializeParallelDSM(pei->pcxt);
-	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
-	pei->finished = false;
-}
-
-/*
  * Sets up the required infrastructure for backend workers to perform
  * execution and return results to the main backend.
  */
@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
 	ExecParallelInitializeDSM(planstate, &d);
 
 	/*
-	 * Make sure that the world hasn't shifted under our feat. This could
+	 * Make sure that the world hasn't shifted under our feet. This could
	 * probably just be an Assert(), but let's be conservative for now.
 	 */
 	if (e.nnodes != d.nnodes)
@@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
 }
 
 /*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+						 ParallelExecutorInfo *pei)
+{
+	/* Old workers must already be shut down */
+	Assert(pei->finished);
+
+	ReinitializeParallelDSM(pei->pcxt);
+	pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+	pei->finished = false;
+
+	/* Traverse plan tree and let each child node reset associated state. */
+	ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+							ParallelContext *pcxt)
+{
+	if (planstate == NULL)
+		return false;
+
+	/*
+	 * Call reinitializers for DSM-using plan nodes.
+	 */
+	switch (nodeTag(planstate))
+	{
+		case T_SeqScanState:
+			if (planstate->plan->parallel_aware)
+				ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+										   pcxt);
+			break;
+		case T_IndexScanState:
+			if (planstate->plan->parallel_aware)
+				ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+											 pcxt);
+			break;
+		case T_IndexOnlyScanState:
+			if (planstate->plan->parallel_aware)
+				ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+												 pcxt);
+			break;
+		case T_ForeignScanState:
+			if (planstate->plan->parallel_aware)
+				ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+											   pcxt);
+			break;
+		case T_CustomScanState:
+			if (planstate->plan->parallel_aware)
+				ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+											  pcxt);
+			break;
+		case T_BitmapHeapScanState:
+			if (planstate->plan->parallel_aware)
+				ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+											  pcxt);
+			break;
+		case T_SortState:
+			/* even when not parallel-aware */
+			ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+			break;
+
+		default:
+			break;
+	}
+
+	return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
  * Copy instrumentation information about this node and its descendants from
  * dynamic shared memory.
  */
@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
 			break;
 		case T_BitmapHeapScanState:
 			if (planstate->plan->parallel_aware)
-				ExecBitmapHeapInitializeWorker(
-											   (BitmapHeapScanState *) planstate, toc);
+				ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
 			break;
 		case T_SortState:
 			/* even when not parallel-aware */
 			ExecSortInitializeWorker((SortState *) planstate, toc);
+			break;
+
 		default:
 			break;
 	}
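
The per-node reinitializers invoked by the switch above are typically tiny. For instance, the sequential-scan one added in this commit series reduces to resetting the shared parallel heap-scan descriptor; the body below is reproduced from memory, so treat it as approximate rather than authoritative:

	void
	ExecSeqScanReInitializeDSM(SeqScanState *node, ParallelContext *pcxt)
	{
		HeapScanDesc scan = node->ss.ss_currentScanDesc;

		/* reset shared scan state so the next worker batch starts over */
		heap_parallelscan_reinitialize(scan->rs_parallel);
	}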