Diffstat (limited to 'src/backend/executor')
-rw-r--r--   src/backend/executor/execParallel.c |  19
-rw-r--r--   src/backend/executor/nodeAppend.c   | 331
2 files changed, 293 insertions, 57 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c
index ff5cff59b0f..558cb08b07e 100644
--- a/src/backend/executor/execParallel.c
+++ b/src/backend/executor/execParallel.c
@@ -26,6 +26,7 @@
 #include "executor/execExpr.h"
 #include "executor/execParallel.h"
 #include "executor/executor.h"
+#include "executor/nodeAppend.h"
 #include "executor/nodeBitmapHeapscan.h"
 #include "executor/nodeCustom.h"
 #include "executor/nodeForeignscan.h"
@@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
             ExecForeignScanEstimate((ForeignScanState *) planstate,
                                     e->pcxt);
             break;
+        case T_AppendState:
+            if (planstate->plan->parallel_aware)
+                ExecAppendEstimate((AppendState *) planstate,
+                                   e->pcxt);
+            break;
         case T_CustomScanState:
             if (planstate->plan->parallel_aware)
                 ExecCustomScanEstimate((CustomScanState *) planstate,
@@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
             ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                          d->pcxt);
             break;
+        case T_AppendState:
+            if (planstate->plan->parallel_aware)
+                ExecAppendInitializeDSM((AppendState *) planstate,
+                                        d->pcxt);
+            break;
         case T_CustomScanState:
             if (planstate->plan->parallel_aware)
                 ExecCustomScanInitializeDSM((CustomScanState *) planstate,
@@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate,
             ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
                                            pcxt);
             break;
+        case T_AppendState:
+            if (planstate->plan->parallel_aware)
+                ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
+            break;
         case T_CustomScanState:
             if (planstate->plan->parallel_aware)
                 ExecCustomScanReInitializeDSM((CustomScanState *) planstate,
@@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
             ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                             pwcxt);
             break;
+        case T_AppendState:
+            if (planstate->plan->parallel_aware)
+                ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
+            break;
         case T_CustomScanState:
             if (planstate->plan->parallel_aware)
                 ExecCustomScanInitializeWorker((CustomScanState *) planstate,
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 1d2fb35d551..246a0b2d852 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -61,51 +61,27 @@
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
 
-static TupleTableSlot *ExecAppend(PlanState *pstate);
-static bool exec_append_initialize_next(AppendState *appendstate);
-
-
-/* ----------------------------------------------------------------
- *      exec_append_initialize_next
- *
- *      Sets up the append state node for the "next" scan.
- *
- *      Returns t iff there is a "next" scan to process.
- * ----------------------------------------------------------------
- */
-static bool
-exec_append_initialize_next(AppendState *appendstate)
+/* Shared state for parallel-aware Append. */
+struct ParallelAppendState
 {
-    int         whichplan;
+    LWLock      pa_lock;        /* mutual exclusion to choose next subplan */
+    int         pa_next_plan;   /* next plan to choose by any worker */
 
     /*
-     * get information from the append node
+     * pa_finished[i] should be true if no more workers should select subplan
+     * i. For a non-partial plan, this should be set to true as soon as a
+     * worker selects the plan; for a partial plan, it remains false until
+     * some worker executes the plan to completion.
     */
-    whichplan = appendstate->as_whichplan;
+    bool        pa_finished[FLEXIBLE_ARRAY_MEMBER];
+};
 
-    if (whichplan < 0)
-    {
-        /*
-         * if scanning in reverse, we start at the last scan in the list and
-         * then proceed back to the first.. in any case we inform ExecAppend
-         * that we are at the end of the line by returning false
-         */
-        appendstate->as_whichplan = 0;
-        return false;
-    }
-    else if (whichplan >= appendstate->as_nplans)
-    {
-        /*
-         * as above, end the scan if we go beyond the last scan in our list..
-         */
-        appendstate->as_whichplan = appendstate->as_nplans - 1;
-        return false;
-    }
-    else
-    {
-        return true;
-    }
-}
+#define INVALID_SUBPLAN_INDEX       -1
+
+static TupleTableSlot *ExecAppend(PlanState *pstate);
+static bool choose_next_subplan_locally(AppendState *node);
+static bool choose_next_subplan_for_leader(AppendState *node);
+static bool choose_next_subplan_for_worker(AppendState *node);
 
 /* ----------------------------------------------------------------
  *      ExecInitAppend
@@ -185,10 +161,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     appendstate->ps.ps_ProjInfo = NULL;
 
     /*
-     * initialize to scan first subplan
+     * Parallel-aware append plans must choose the first subplan to execute by
+     * looking at shared memory, but non-parallel-aware append plans can
+     * always start with the first subplan.
     */
-    appendstate->as_whichplan = 0;
-    exec_append_initialize_next(appendstate);
+    appendstate->as_whichplan =
+        appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+
+    /* If parallel-aware, this will be overridden later. */
+    appendstate->choose_next_subplan = choose_next_subplan_locally;
 
     return appendstate;
 }
@@ -204,6 +185,11 @@ ExecAppend(PlanState *pstate)
 {
     AppendState *node = castNode(AppendState, pstate);
 
+    /* If no subplan has been chosen, we must choose one before proceeding. */
+    if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+        !node->choose_next_subplan(node))
+        return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
     for (;;)
     {
         PlanState  *subnode;
@@ -214,6 +200,7 @@ ExecAppend(PlanState *pstate)
         /*
         * figure out which subplan we are currently processing
         */
+        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
         subnode = node->appendplans[node->as_whichplan];
 
         /*
@@ -231,19 +218,9 @@ ExecAppend(PlanState *pstate)
             return result;
         }
 
-        /*
-         * Go on to the "next" subplan in the appropriate direction. If no
-         * more subplans, return the empty slot set up for us by
-         * ExecInitAppend.
-         */
-        if (ScanDirectionIsForward(node->ps.state->es_direction))
-            node->as_whichplan++;
-        else
-            node->as_whichplan--;
-        if (!exec_append_initialize_next(node))
+        /* choose new subplan; if none, we're done */
+        if (!node->choose_next_subplan(node))
             return ExecClearTuple(node->ps.ps_ResultTupleSlot);
-
-        /* Else loop back and try to get a tuple from the new subplan */
     }
 }
@@ -298,6 +275,246 @@ ExecReScanAppend(AppendState *node)
         if (subnode->chgParam == NULL)
             ExecReScan(subnode);
     }
-    node->as_whichplan = 0;
-    exec_append_initialize_next(node);
+
+    node->as_whichplan =
+        node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0;
+}
+
+/* ----------------------------------------------------------------
+ *                      Parallel Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *      ExecAppendEstimate
+ *
+ *      Compute the amount of space we'll need in the parallel
+ *      query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendEstimate(AppendState *node,
+                   ParallelContext *pcxt)
+{
+    node->pstate_len =
+        add_size(offsetof(ParallelAppendState, pa_finished),
+                 sizeof(bool) * node->as_nplans);
+
+    shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+
+/* ----------------------------------------------------------------
+ *      ExecAppendInitializeDSM
+ *
+ *      Set up shared state for Parallel Append.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeDSM(AppendState *node,
+                        ParallelContext *pcxt)
+{
+    ParallelAppendState *pstate;
+
+    pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
+    memset(pstate, 0, node->pstate_len);
+    LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
+    shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
+
+    node->as_pstate = pstate;
+    node->choose_next_subplan = choose_next_subplan_for_leader;
+}
+
+/* ----------------------------------------------------------------
+ *      ExecAppendReInitializeDSM
+ *
+ *      Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
+{
+    ParallelAppendState *pstate = node->as_pstate;
+
+    pstate->pa_next_plan = 0;
+    memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecAppendInitializeWorker
+ *
+ *      Copy relevant information from TOC into planstate, and initialize
+ *      whatever is required to choose and execute the optimal subplan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
+{
+    node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
+    node->choose_next_subplan = choose_next_subplan_for_worker;
+}
+
+/* ----------------------------------------------------------------
+ *      choose_next_subplan_locally
+ *
+ *      Choose next subplan for a non-parallel-aware Append,
+ *      returning false if there are no more.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_locally(AppendState *node)
+{
+    int         whichplan = node->as_whichplan;
+
+    /* We should never see INVALID_SUBPLAN_INDEX in this case. */
+    Assert(whichplan >= 0 && whichplan <= node->as_nplans);
+
+    if (ScanDirectionIsForward(node->ps.state->es_direction))
+    {
+        if (whichplan >= node->as_nplans - 1)
+            return false;
+        node->as_whichplan++;
+    }
+    else
+    {
+        if (whichplan <= 0)
+            return false;
+        node->as_whichplan--;
+    }
+
+    return true;
+}
+
+/* ----------------------------------------------------------------
+ *      choose_next_subplan_for_leader
+ *
+ *      Try to pick a plan which doesn't commit us to doing much
+ *      work locally, so that as much work as possible is done in
+ *      the workers. Cheapest subplans are at the end.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_leader(AppendState *node)
+{
+    ParallelAppendState *pstate = node->as_pstate;
+    Append     *append = (Append *) node->ps.plan;
+
+    /* Backward scan is not supported by parallel-aware plans */
+    Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+    {
+        /* Mark just-completed subplan as finished. */
+        node->as_pstate->pa_finished[node->as_whichplan] = true;
+    }
+    else
+    {
+        /* Start with last subplan. */
+        node->as_whichplan = node->as_nplans - 1;
+    }
+
+    /* Loop until we find a subplan to execute. */
+    while (pstate->pa_finished[node->as_whichplan])
+    {
+        if (node->as_whichplan == 0)
+        {
+            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+            node->as_whichplan = INVALID_SUBPLAN_INDEX;
+            LWLockRelease(&pstate->pa_lock);
+            return false;
+        }
+        node->as_whichplan--;
+    }
+
+    /* If non-partial, immediately mark as finished. */
+    if (node->as_whichplan < append->first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+    LWLockRelease(&pstate->pa_lock);
+
+    return true;
+}
+
+/* ----------------------------------------------------------------
+ *      choose_next_subplan_for_worker
+ *
+ *      Choose next subplan for a parallel-aware Append, returning
+ *      false if there are no more.
+ *
+ *      We start from the first plan and advance through the list;
+ *      when we get back to the end, we loop back to the first
+ *      nonpartial plan. This assigns the non-partial plans first
+ *      in order of descending cost and then spreads out the
+ *      workers as evenly as possible across the remaining partial
+ *      plans.
+ * ----------------------------------------------------------------
+ */
+static bool
+choose_next_subplan_for_worker(AppendState *node)
+{
+    ParallelAppendState *pstate = node->as_pstate;
+    Append     *append = (Append *) node->ps.plan;
+
+    /* Backward scan is not supported by parallel-aware plans */
+    Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+    LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
+
+    /* Mark just-completed subplan as finished. */
+    if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+        node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+    /* If all the plans are already done, we have nothing to do */
+    if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
+    {
+        LWLockRelease(&pstate->pa_lock);
+        return false;
+    }
+
+    /* Loop until we find a subplan to execute. */
+    while (pstate->pa_finished[pstate->pa_next_plan])
+    {
+        if (pstate->pa_next_plan < node->as_nplans - 1)
+        {
+            /* Advance to next plan. */
+            pstate->pa_next_plan++;
+        }
+        else if (append->first_partial_plan < node->as_nplans)
+        {
+            /* Loop back to first partial plan. */
+            pstate->pa_next_plan = append->first_partial_plan;
+        }
+        else
+        {
+            /* At last plan, no partial plans, arrange to bail out. */
+            pstate->pa_next_plan = node->as_whichplan;
+        }
+
+        if (pstate->pa_next_plan == node->as_whichplan)
+        {
+            /* We've tried everything! */
+            pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
+            LWLockRelease(&pstate->pa_lock);
+            return false;
+        }
+    }
+
+    /* Pick the plan we found, and advance pa_next_plan one more time. */
+    node->as_whichplan = pstate->pa_next_plan++;
+    if (pstate->pa_next_plan >= node->as_nplans)
+    {
+        Assert(append->first_partial_plan < node->as_nplans);
+        pstate->pa_next_plan = append->first_partial_plan;
+    }
+
+    /* If non-partial, immediately mark as finished. */
+    if (node->as_whichplan < append->first_partial_plan)
+        node->as_pstate->pa_finished[node->as_whichplan] = true;
+
+    LWLockRelease(&pstate->pa_lock);
+
+    return true;
 }
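
The short program below is not part of the patch; it is a minimal standalone sketch of the sizing arithmetic ExecAppendEstimate performs above: the fixed header of the shared struct up to the flexible array member, plus one bool per subplan. The DemoParallelAppendState and DemoLock names and the main() driver are invented for the illustration; add_size() in the real code is PostgreSQL's overflow-checked addition and is replaced here by a plain '+'.

    #include <stdbool.h>
    #include <stddef.h>
    #include <stdio.h>

    typedef struct DemoLock { int dummy; } DemoLock;    /* stand-in for LWLock */

    typedef struct DemoParallelAppendState
    {
        DemoLock    pa_lock;
        int         pa_next_plan;
        bool        pa_finished[];      /* flexible array member, one flag per subplan */
    } DemoParallelAppendState;

    int
    main(void)
    {
        int         nplans = 8;
        size_t      len;

        /* Same shape as add_size(offsetof(ParallelAppendState, pa_finished),
         * sizeof(bool) * node->as_nplans) in the patch above. */
        len = offsetof(DemoParallelAppendState, pa_finished)
            + sizeof(bool) * (size_t) nplans;

        printf("shared Append state for %d subplans needs %zu bytes\n", nplans, len);
        return 0;
    }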
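The following standalone program (again, not PostgreSQL code) simulates the subplan-selection loop of choose_next_subplan_for_worker with the LWLock and executor state stripped out, to show how non-partial subplans are handed out exactly once while the cursor wraps back to the first partial subplan. All Sim* names are invented for the illustration.

    #include <stdbool.h>
    #include <stdio.h>

    #define INVALID_SUBPLAN_INDEX -1

    typedef struct SimAppendState
    {
        int     nplans;             /* total number of subplans */
        int     first_partial_plan; /* index of first partial subplan */
        int     next_plan;          /* shared "next plan" cursor (pa_next_plan) */
        bool    finished[8];        /* pa_finished[], fixed size for the demo */
    } SimAppendState;

    /* Returns the chosen subplan index, or INVALID_SUBPLAN_INDEX if none left. */
    static int
    sim_choose_next_subplan(SimAppendState *s, int whichplan)
    {
        /* Mark the subplan we just finished, as the real code does. */
        if (whichplan != INVALID_SUBPLAN_INDEX)
            s->finished[whichplan] = true;

        if (s->next_plan == INVALID_SUBPLAN_INDEX)
            return INVALID_SUBPLAN_INDEX;

        /* Advance, wrapping back to the first partial plan, until we find work. */
        while (s->finished[s->next_plan])
        {
            if (s->next_plan < s->nplans - 1)
                s->next_plan++;                          /* try the next plan */
            else if (s->first_partial_plan < s->nplans)
                s->next_plan = s->first_partial_plan;    /* wrap to partial plans */
            else
                s->next_plan = whichplan;                /* nothing to wrap to */

            if (s->next_plan == whichplan)
            {
                /* We've tried everything. */
                s->next_plan = INVALID_SUBPLAN_INDEX;
                return INVALID_SUBPLAN_INDEX;
            }
        }

        /* Pick the plan we found and advance the cursor once more. */
        whichplan = s->next_plan++;
        if (s->next_plan >= s->nplans)
            s->next_plan = s->first_partial_plan;

        /* A non-partial plan is claimed by exactly one participant. */
        if (whichplan < s->first_partial_plan)
            s->finished[whichplan] = true;

        return whichplan;
    }

    int
    main(void)
    {
        /* Two non-partial subplans (0,1) followed by two partial ones (2,3). */
        SimAppendState s = {.nplans = 4, .first_partial_plan = 2, .next_plan = 0};
        int         which = INVALID_SUBPLAN_INDEX;

        while ((which = sim_choose_next_subplan(&s, which)) != INVALID_SUBPLAN_INDEX)
            printf("worker would run subplan %d\n", which);
        return 0;
    }

Run for a single participant, this prints subplans 0, 1, 2, 3 and then reports nothing left; with several real workers the same loop lets each non-partial plan be taken once while the remaining workers spread across the partial plans.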