aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2017-08-30 13:18:16 -0400
committerTom Lane <tgl@sss.pgh.pa.us>2017-08-30 13:18:16 -0400
commit41b0dd987d44089dc48e9c70024277e253b396b7 (patch)
treec42eeeb2f175764a9b1ad9c095f8a46057078eb3 /src/backend/executor/execParallel.c
parent6c2c5bea3cec4c874d1ee225bb6e222055c03d75 (diff)
downloadpostgresql-41b0dd987d44089dc48e9c70024277e253b396b7.tar.gz
postgresql-41b0dd987d44089dc48e9c70024277e253b396b7.zip
Separate reinitialization of shared parallel-scan state from ExecReScan.
Previously, the parallel executor logic did reinitialization of shared state within the ExecReScan code for parallel-aware scan nodes. This is problematic, because it means that the ExecReScan call has to occur synchronously (ie, during the parent Gather node's ReScan call). That is swimming very much against the tide so far as the ExecReScan machinery is concerned; the fact that it works at all today depends on a lot of fragile assumptions, such as that no plan node between Gather and a parallel-aware scan node is parameterized. Another objection is that because ExecReScan might be called in workers as well as the leader, hacky extra tests are needed in some places to prevent unwanted shared-state resets. Hence, let's separate this code into two functions, a ReInitializeDSM call and the ReScan call proper. ReInitializeDSM is called only in the leader and is guaranteed to run before we start new workers. ReScan is returned to its traditional function of resetting only local state, which means that ExecReScan's usual habits of delaying or eliminating child rescan calls are safe again. As with the preceding commit 7df2c1f8d, it doesn't seem to be necessary to make these changes in 9.6, which is a good thing because the FDW and CustomScan APIs are impacted. Discussion: https://postgr.es/m/CAA4eK1JkByysFJNh9M349u_nNjqETuEnY_y1VUc_kJiU0bxtaQ@mail.gmail.com
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c101
1 files changed, 86 insertions, 15 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 01316ff5d94..c713b851399 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -119,6 +119,8 @@ static bool ExecParallelInitializeDSM(PlanState *node,
ExecParallelInitializeDSMContext *d);
static shm_mq_handle **ExecParallelSetupTupleQueues(ParallelContext *pcxt,
bool reinitialize);
+static bool ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt);
static bool ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation);
@@ -255,6 +257,8 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
case T_SortState:
/* even when not parallel-aware */
ExecSortEstimate((SortState *) planstate, e->pcxt);
+ break;
+
default:
break;
}
@@ -325,6 +329,8 @@ ExecParallelInitializeDSM(PlanState *planstate,
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeDSM((SortState *) planstate, d->pcxt);
+ break;
+
default:
break;
}
@@ -385,18 +391,6 @@ ExecParallelSetupTupleQueues(ParallelContext *pcxt, bool reinitialize)
}
/*
- * Re-initialize the parallel executor info such that it can be reused by
- * workers.
- */
-void
-ExecParallelReinitialize(ParallelExecutorInfo *pei)
-{
- ReinitializeParallelDSM(pei->pcxt);
- pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
- pei->finished = false;
-}
-
-/*
* Sets up the required infrastructure for backend workers to perform
* execution and return results to the main backend.
*/
@@ -599,7 +593,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
ExecParallelInitializeDSM(planstate, &d);
/*
- * Make sure that the world hasn't shifted under our feat. This could
+ * Make sure that the world hasn't shifted under our feet. This could
* probably just be an Assert(), but let's be conservative for now.
*/
if (e.nnodes != d.nnodes)
@@ -610,6 +604,82 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers,
}
/*
+ * Re-initialize the parallel executor shared memory state before launching
+ * a fresh batch of workers.
+ */
+void
+ExecParallelReinitialize(PlanState *planstate,
+ ParallelExecutorInfo *pei)
+{
+ /* Old workers must already be shut down */
+ Assert(pei->finished);
+
+ ReinitializeParallelDSM(pei->pcxt);
+ pei->tqueue = ExecParallelSetupTupleQueues(pei->pcxt, true);
+ pei->finished = false;
+
+ /* Traverse plan tree and let each child node reset associated state. */
+ ExecParallelReInitializeDSM(planstate, pei->pcxt);
+}
+
+/*
+ * Traverse plan tree to reinitialize per-node dynamic shared memory state
+ */
+static bool
+ExecParallelReInitializeDSM(PlanState *planstate,
+ ParallelContext *pcxt)
+{
+ if (planstate == NULL)
+ return false;
+
+ /*
+ * Call reinitializers for DSM-using plan nodes.
+ */
+ switch (nodeTag(planstate))
+ {
+ case T_SeqScanState:
+ if (planstate->plan->parallel_aware)
+ ExecSeqScanReInitializeDSM((SeqScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexScanReInitializeDSM((IndexScanState *) planstate,
+ pcxt);
+ break;
+ case T_IndexOnlyScanState:
+ if (planstate->plan->parallel_aware)
+ ExecIndexOnlyScanReInitializeDSM((IndexOnlyScanState *) planstate,
+ pcxt);
+ break;
+ case T_ForeignScanState:
+ if (planstate->plan->parallel_aware)
+ ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
+ pcxt);
+ break;
+ case T_CustomScanState:
+ if (planstate->plan->parallel_aware)
+ ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
+ pcxt);
+ break;
+ case T_BitmapHeapScanState:
+ if (planstate->plan->parallel_aware)
+ ExecBitmapHeapReInitializeDSM((BitmapHeapScanState *) planstate,
+ pcxt);
+ break;
+ case T_SortState:
+ /* even when not parallel-aware */
+ ExecSortReInitializeDSM((SortState *) planstate, pcxt);
+ break;
+
+ default:
+ break;
+ }
+
+ return planstate_tree_walker(planstate, ExecParallelReInitializeDSM, pcxt);
+}
+
+/*
* Copy instrumentation information about this node and its descendants from
* dynamic shared memory.
*/
@@ -845,12 +915,13 @@ ExecParallelInitializeWorker(PlanState *planstate, shm_toc *toc)
break;
case T_BitmapHeapScanState:
if (planstate->plan->parallel_aware)
- ExecBitmapHeapInitializeWorker(
- (BitmapHeapScanState *) planstate, toc);
+ ExecBitmapHeapInitializeWorker((BitmapHeapScanState *) planstate, toc);
break;
case T_SortState:
/* even when not parallel-aware */
ExecSortInitializeWorker((SortState *) planstate, toc);
+ break;
+
default:
break;
}