diff options
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- | src/backend/executor/execParallel.c | 98 |
1 files changed, 57 insertions, 41 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index 67300377109..30e6b3d2a72 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -49,20 +49,18 @@ #define PARALLEL_TUPLE_QUEUE_SIZE 65536 /* DSM structure for accumulating per-PlanState instrumentation. */ -typedef struct SharedPlanStateInstrumentation -{ - int plan_node_id; - slock_t mutex; - Instrumentation instr; -} SharedPlanStateInstrumentation; - -/* DSM structure for accumulating per-PlanState instrumentation. */ struct SharedExecutorInstrumentation { int instrument_options; - int ps_ninstrument; /* # of ps_instrument structures following */ - SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER]; + int instrument_offset; /* offset of first Instrumentation struct */ + int num_workers; /* # of workers */ + int num_plan_nodes; /* # of plan nodes */ + int plan_node_id[FLEXIBLE_ARRAY_MEMBER]; /* array of plan node IDs */ + /* array of num_plan_nodes * num_workers Instrumentation objects follows */ }; +#define GetInstrumentationArray(sei) \ + (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \ + (Instrumentation *) (((char *) sei) + sei->instrument_offset)) /* Context object for ExecParallelEstimate. */ typedef struct ExecParallelEstimateContext @@ -196,18 +194,10 @@ ExecParallelInitializeDSM(PlanState *planstate, if (planstate == NULL) return false; - /* If instrumentation is enabled, initialize array slot for this node. */ + /* If instrumentation is enabled, initialize slot for this node. 
*/ if (d->instrumentation != NULL) - { - SharedPlanStateInstrumentation *instrumentation; - - instrumentation = &d->instrumentation->ps_instrument[d->nnodes]; - Assert(d->nnodes < d->instrumentation->ps_ninstrument); - instrumentation->plan_node_id = planstate->plan->plan_node_id; - SpinLockInit(&instrumentation->mutex); - InstrInit(&instrumentation->instr, - d->instrumentation->instrument_options); - } + d->instrumentation->plan_node_id[d->nnodes] = + planstate->plan->plan_node_id; /* Count this node. */ d->nnodes++; @@ -307,6 +297,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) int pstmt_len; int param_len; int instrumentation_len = 0; + int instrument_offset = 0; /* Allocate object for return value. */ pei = palloc0(sizeof(ParallelExecutorInfo)); @@ -364,8 +355,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) if (estate->es_instrument) { instrumentation_len = - offsetof(SharedExecutorInstrumentation, ps_instrument) - + sizeof(SharedPlanStateInstrumentation) * e.nnodes; + offsetof(SharedExecutorInstrumentation, plan_node_id) + + sizeof(int) * e.nnodes; + instrumentation_len = MAXALIGN(instrumentation_len); + instrument_offset = instrumentation_len; + instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers; shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len); shm_toc_estimate_keys(&pcxt->estimator, 1); } @@ -407,9 +401,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers) */ if (estate->es_instrument) { + Instrumentation *instrument; + int i; + instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len); instrumentation->instrument_options = estate->es_instrument; - instrumentation->ps_ninstrument = e.nnodes; + instrumentation->instrument_offset = instrument_offset; + instrumentation->num_workers = nworkers; + instrumentation->num_plan_nodes = e.nnodes; + instrument = GetInstrumentationArray(instrumentation); + for (i = 0; i < nworkers * e.nnodes; 
++i) + InstrInit(&instrument[i], estate->es_instrument); shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION, instrumentation); pei->instrumentation = instrumentation; @@ -444,20 +446,31 @@ static bool ExecParallelRetrieveInstrumentation(PlanState *planstate, SharedExecutorInstrumentation *instrumentation) { + Instrumentation *instrument; int i; + int n; + int ibytes; int plan_node_id = planstate->plan->plan_node_id; - SharedPlanStateInstrumentation *ps_instrument; /* Find the instrumentation for this node. */ - for (i = 0; i < instrumentation->ps_ninstrument; ++i) - if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) + for (i = 0; i < instrumentation->num_plan_nodes; ++i) + if (instrumentation->plan_node_id[i] == plan_node_id) break; - if (i >= instrumentation->ps_ninstrument) + if (i >= instrumentation->num_plan_nodes) elog(ERROR, "plan node %d not found", plan_node_id); - /* No need to acquire the spinlock here; workers have exited already. */ - ps_instrument = &instrumentation->ps_instrument[i]; - InstrAggNode(planstate->instrument, &ps_instrument->instr); + /* Accumulate the statistics from all workers. */ + instrument = GetInstrumentationArray(instrumentation); + instrument += i * instrumentation->num_workers; + for (n = 0; n < instrumentation->num_workers; ++n) + InstrAggNode(planstate->instrument, &instrument[n]); + + /* Also store the per-worker detail. 
*/ + ibytes = instrumentation->num_workers * sizeof(Instrumentation); + planstate->worker_instrument = + palloc(offsetof(WorkerInstrumentation, instrument) + ibytes); + planstate->worker_instrument->num_workers = instrumentation->num_workers; + memcpy(&planstate->worker_instrument->instrument, instrument, ibytes); return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation, instrumentation); @@ -568,7 +581,9 @@ ExecParallelReportInstrumentation(PlanState *planstate, { int i; int plan_node_id = planstate->plan->plan_node_id; - SharedPlanStateInstrumentation *ps_instrument; + Instrumentation *instrument; + + InstrEndLoop(planstate->instrument); /* * If we shuffled the plan_node_id values in ps_instrument into sorted @@ -576,20 +591,21 @@ ExecParallelReportInstrumentation(PlanState *planstate, * if we're pushing down sufficiently large plan trees. For now, do it * the slow, dumb way. */ - for (i = 0; i < instrumentation->ps_ninstrument; ++i) - if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id) + for (i = 0; i < instrumentation->num_plan_nodes; ++i) + if (instrumentation->plan_node_id[i] == plan_node_id) break; - if (i >= instrumentation->ps_ninstrument) + if (i >= instrumentation->num_plan_nodes) elog(ERROR, "plan node %d not found", plan_node_id); /* - * There's one SharedPlanStateInstrumentation per plan_node_id, so we - * must use a spinlock in case multiple workers report at the same time. + * Add our statistics to the per-node, per-worker totals. It's possible + * that this could happen more than once if we relaunched workers. 
*/ - ps_instrument = &instrumentation->ps_instrument[i]; - SpinLockAcquire(&ps_instrument->mutex); - InstrAggNode(&ps_instrument->instr, planstate->instrument); - SpinLockRelease(&ps_instrument->mutex); + instrument = GetInstrumentationArray(instrumentation); + instrument += i * instrumentation->num_workers; + Assert(IsParallelWorker()); + Assert(ParallelWorkerNumber < instrumentation->num_workers); + InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument); return planstate_tree_walker(planstate, ExecParallelReportInstrumentation, instrumentation); |