aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/execParallel.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/execParallel.c')
-rw-r--r-- src/backend/executor/execParallel.c 98
1 files changed, 57 insertions, 41 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index 67300377109..30e6b3d2a72 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -49,20 +49,18 @@
#define PARALLEL_TUPLE_QUEUE_SIZE 65536
/* DSM structure for accumulating per-PlanState instrumentation. */
-typedef struct SharedPlanStateInstrumentation
-{
- int plan_node_id;
- slock_t mutex;
- Instrumentation instr;
-} SharedPlanStateInstrumentation;
-
-/* DSM structure for accumulating per-PlanState instrumentation. */
struct SharedExecutorInstrumentation
{
int instrument_options;
- int ps_ninstrument; /* # of ps_instrument structures following */
- SharedPlanStateInstrumentation ps_instrument[FLEXIBLE_ARRAY_MEMBER];
+ int instrument_offset; /* offset of first Instrumentation struct */
+ int num_workers; /* # of workers */
+ int num_plan_nodes; /* # of plan nodes */
+ int plan_node_id[FLEXIBLE_ARRAY_MEMBER]; /* array of plan node IDs */
+ /* array of num_plan_nodes * num_workers Instrumentation objects follows */
};
+#define GetInstrumentationArray(sei) \
+ (AssertVariableIsOfTypeMacro(sei, SharedExecutorInstrumentation *), \
+ (Instrumentation *) (((char *) sei) + sei->instrument_offset))
/* Context object for ExecParallelEstimate. */
typedef struct ExecParallelEstimateContext
@@ -196,18 +194,10 @@ ExecParallelInitializeDSM(PlanState *planstate,
if (planstate == NULL)
return false;
- /* If instrumentation is enabled, initialize array slot for this node. */
+ /* If instrumentation is enabled, initialize slot for this node. */
if (d->instrumentation != NULL)
- {
- SharedPlanStateInstrumentation *instrumentation;
-
- instrumentation = &d->instrumentation->ps_instrument[d->nnodes];
- Assert(d->nnodes < d->instrumentation->ps_ninstrument);
- instrumentation->plan_node_id = planstate->plan->plan_node_id;
- SpinLockInit(&instrumentation->mutex);
- InstrInit(&instrumentation->instr,
- d->instrumentation->instrument_options);
- }
+ d->instrumentation->plan_node_id[d->nnodes] =
+ planstate->plan->plan_node_id;
/* Count this node. */
d->nnodes++;
@@ -307,6 +297,7 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
int pstmt_len;
int param_len;
int instrumentation_len = 0;
+ int instrument_offset = 0;
/* Allocate object for return value. */
pei = palloc0(sizeof(ParallelExecutorInfo));
@@ -364,8 +355,11 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
if (estate->es_instrument)
{
instrumentation_len =
- offsetof(SharedExecutorInstrumentation, ps_instrument)
- + sizeof(SharedPlanStateInstrumentation) * e.nnodes;
+ offsetof(SharedExecutorInstrumentation, plan_node_id)
+ + sizeof(int) * e.nnodes;
+ instrumentation_len = MAXALIGN(instrumentation_len);
+ instrument_offset = instrumentation_len;
+ instrumentation_len += sizeof(Instrumentation) * e.nnodes * nworkers;
shm_toc_estimate_chunk(&pcxt->estimator, instrumentation_len);
shm_toc_estimate_keys(&pcxt->estimator, 1);
}
@@ -407,9 +401,17 @@ ExecInitParallelPlan(PlanState *planstate, EState *estate, int nworkers)
*/
if (estate->es_instrument)
{
+ Instrumentation *instrument;
+ int i;
+
instrumentation = shm_toc_allocate(pcxt->toc, instrumentation_len);
instrumentation->instrument_options = estate->es_instrument;
- instrumentation->ps_ninstrument = e.nnodes;
+ instrumentation->instrument_offset = instrument_offset;
+ instrumentation->num_workers = nworkers;
+ instrumentation->num_plan_nodes = e.nnodes;
+ instrument = GetInstrumentationArray(instrumentation);
+ for (i = 0; i < nworkers * e.nnodes; ++i)
+ InstrInit(&instrument[i], estate->es_instrument);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INSTRUMENTATION,
instrumentation);
pei->instrumentation = instrumentation;
@@ -444,20 +446,31 @@ static bool
ExecParallelRetrieveInstrumentation(PlanState *planstate,
SharedExecutorInstrumentation *instrumentation)
{
+ Instrumentation *instrument;
int i;
+ int n;
+ int ibytes;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
/* Find the instrumentation for this node. */
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
- /* No need to acquire the spinlock here; workers have exited already. */
- ps_instrument = &instrumentation->ps_instrument[i];
- InstrAggNode(planstate->instrument, &ps_instrument->instr);
+ /* Accumulate the statistics from all workers. */
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ for (n = 0; n < instrumentation->num_workers; ++n)
+ InstrAggNode(planstate->instrument, &instrument[n]);
+
+ /* Also store the per-worker detail. */
+ ibytes = instrumentation->num_workers * sizeof(Instrumentation);
+ planstate->worker_instrument =
+ palloc(offsetof(WorkerInstrumentation, instrument) + ibytes);
+ planstate->worker_instrument->num_workers = instrumentation->num_workers;
+ memcpy(&planstate->worker_instrument->instrument, instrument, ibytes);
return planstate_tree_walker(planstate, ExecParallelRetrieveInstrumentation,
instrumentation);
@@ -568,7 +581,9 @@ ExecParallelReportInstrumentation(PlanState *planstate,
{
int i;
int plan_node_id = planstate->plan->plan_node_id;
- SharedPlanStateInstrumentation *ps_instrument;
+ Instrumentation *instrument;
+
+ InstrEndLoop(planstate->instrument);
/*
* If we shuffled the plan_node_id values in ps_instrument into sorted
@@ -576,20 +591,21 @@ ExecParallelReportInstrumentation(PlanState *planstate,
* if we're pushing down sufficiently large plan trees. For now, do it
* the slow, dumb way.
*/
- for (i = 0; i < instrumentation->ps_ninstrument; ++i)
- if (instrumentation->ps_instrument[i].plan_node_id == plan_node_id)
+ for (i = 0; i < instrumentation->num_plan_nodes; ++i)
+ if (instrumentation->plan_node_id[i] == plan_node_id)
break;
- if (i >= instrumentation->ps_ninstrument)
+ if (i >= instrumentation->num_plan_nodes)
elog(ERROR, "plan node %d not found", plan_node_id);
/*
- * There's one SharedPlanStateInstrumentation per plan_node_id, so we
- * must use a spinlock in case multiple workers report at the same time.
+ * Add our statistics to the per-node, per-worker totals. It's possible
+ * that this could happen more than once if we relaunched workers.
*/
- ps_instrument = &instrumentation->ps_instrument[i];
- SpinLockAcquire(&ps_instrument->mutex);
- InstrAggNode(&ps_instrument->instr, planstate->instrument);
- SpinLockRelease(&ps_instrument->mutex);
+ instrument = GetInstrumentationArray(instrumentation);
+ instrument += i * instrumentation->num_workers;
+ Assert(IsParallelWorker());
+ Assert(ParallelWorkerNumber < instrumentation->num_workers);
+ InstrAggNode(&instrument[ParallelWorkerNumber], planstate->instrument);
return planstate_tree_walker(planstate, ExecParallelReportInstrumentation,
instrumentation);