aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2015-12-09 13:18:09 -0500
committerRobert Haas <rhaas@postgresql.org>2015-12-09 13:21:19 -0500
commitb287df70e4080350aa471ecca428be145581dd4d (patch)
tree10f57a8c7d1029fa1c1b7caabf24ce0925f23b07 /src/backend/executor/execParallel.c
parent25c539233044c235e97fd7c9dc600fb5f08fe065 (diff)
downloadpostgresql-b287df70e4080350aa471ecca428be145581dd4d.tar.gz
postgresql-b287df70e4080350aa471ecca428be145581dd4d.zip
Allow EXPLAIN (ANALYZE, VERBOSE) to display per-worker statistics.
The original parallel sequential scan commit included only very limited changes to the EXPLAIN output. Aggregated totals from all workers were displayed, but there was no way to see what each individual worker did or to distinguish the effort made by the workers from the effort made by the leader. Per a gripe by Thom Brown (and maybe others). Patch by me, reviewed by Amit Kapila.
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r--src/backend/executor/execParallel.c98
1 file changed, 57 insertions, 41 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 67300377109..30e6b3d2a72 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -49,20 +49,18 @@
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
- int plan_node_id;
- slock_t mutex;
- Instrumentation instr;
-} SharedPlanStateInstrumentation;
-
-/* DSM structure for accumulating per-PlanState instrumentation. */
struct SharedExecutorInstrumentation
{
int instrument_options;
- int ps_ninstrument; /* # of ps_instrument structures following */
- SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+ int instrument_offset; /* offset of first Instrumentation struct */
+ int num_workers; /* # of workers */
+ int num_plan_nodes; /* # of plan nodes */
+ int plan_node_id[FLEXIBLE_ARRAY_MEMBER]; /* array of plan node IDs */
+ /* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
+#define GetInstrumentationArray(sei) \
+ (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
+ (Instrumentation *) (((char *) sei) + sei->instrument_offset))
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
@@ -196,18 +194,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
if (planstate == NULL)
return false;
- /* If instrumentation is enabled, initialize array slot for this node. */
+ /* If instrumentation is enabled, initialize slot for this node. */
if (d->instrumentation != NULL)
- {
- SharedPlanStateInstrumentation *instrumentation;
-
- instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
- Assert(d->nnodes < d->instrumentation->ps_ninstrument);
- instrumentation->plan_node_id = planstate->plan->plan_node_id;
- SpinLockInit(&instrumentation->mutex);
- InstrInit(&instrumentation->instr,
- d->instrumentation->instrument_options);
- }
+ d->instrumentation->plan_node_id[d->nnodes] =
+ planstate->plan->plan_node_id;
/* Count this node. */
d->nnodes++;
@@ -307,6 +297,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
int pstmt_len;
int param_len;
int instrumentation_len = 0;
+ int instrument_offset = 0;
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -364,8 +355,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
if (estate->es_instrument)
{
instrumentation_len =
- offsetof(SharedExecutorInstrumentation, ps_instrument)
- + sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+ offsetof(SharedExecutorInstrumentation, plan_node_id)
+ + sizeof(int) * e.nnodes;
+ instrumentation_len = MAXALIGN(instrumentation_len);
+ instrument_offset = instrumentation_len;
+ instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
@@ -407,9 +401,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
*/
if (estate->es_instrument)
{
+ Instrumentation *instrument;
+ int i;
+
instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
instrumentation->instrument_options = estate->es_instrument;
- instrumentation->ps_ninstrument = e.nnodes;
+ instrumentation->instrument_offset = instrument_offset;
+ instrumentation->num_workers = nworkers;
+ instrumentation->num_plan_nodes = e.nnodes;
+ instrument = GetInstrumentationArray(instrumentation);
+ for (i = 0; i < nworkers * e.nnodes; ++i)
+ InstrInit(&instrument[i], estate->es_instrument);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
@@ -444,20 +446,31 @@ static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation)
{
+ Instrumentation *instrument;
int i;
+ int n;
+ int ibytes;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
/* Find the instumentation for this node. */
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
- /* No need to acquire the spinlock here; workers have exited already. */
- ps_instrument = &instrumentation->ps_instrument[i];
- InstrAggNode(planstate->instrument, &ps_instrument->instr);
+ /* Accumulate the statistics from all workers. */
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ for (n = 0; n < instrumentation->num_workers; ++n)
+ InstrAggNode(planstate->instrument, &instrument[n]);
+
+ /* Also store the per-worker detail. */
+ ibytes = instrumentation->num_workers * sizeof(Instrumentation);
+ planstate->worker_instrument =
+ palloc(offsetof(WorkerInstrumentation, instrument) + ibytes);
+ planstate->worker_instrument->num_workers = instrumentation->num_workers;
+ memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
@@ -568,7 +581,9 @@ ExecParallelReportInstrumentation(PlanState *planstate,
{
int i;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
+ Instrumentation *instrument;
+
+ InstrEndLoop(planstate->instrument);
/*
* If we shuffled the plan_node_id values in ps_instrument into sorted
@@ -576,20 +591,21 @@ ExecParallelReportInstrumentation(PlanState *planstate,
* if we're pushing down sufficiently large plan trees. For now, do it
* the slow, dumb way.
*/
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
/*
- * There's one SharedPlanStateInstrumentation per plan_node_id, so we
- * must use a spinlock in case multiple workers report at the same time.
+ * Add our statistics to the per-node, per-worker totals. It's possible
+ * that this could happen more than once if we relaunched workers.
*/
- ps_instrument = &instrumentation->ps_instrument[i];
- SpinLockAcquire(&ps_instrument->mutex);
- InstrAggNode(&ps_instrument->instr, planstate->instrument);
- SpinLockRelease(&ps_instrument->mutex);
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ Assert(IsParallelWorker());
+ Assert(ParallelWorkerNumber < instrumentation->num_workers);
+ InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
instrumentation);