Diffstat (limited to 'src')
29 files changed, 939 insertions, 128 deletions
diff --git a/src/backend/executor/execParallel.c b/src/backend/executor/execParallel.c index ff5cff59b0f..558cb08b07e 100644 --- a/src/backend/executor/execParallel.c +++ b/src/backend/executor/execParallel.c @@ -26,6 +26,7 @@ #include "executor/execExpr.h" #include "executor/execParallel.h" #include "executor/executor.h" +#include "executor/nodeAppend.h" #include "executor/nodeBitmapHeapscan.h" #include "executor/nodeCustom.h" #include "executor/nodeForeignscan.h" @@ -250,6 +251,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e) ExecForeignScanEstimate((ForeignScanState *) planstate, e->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendEstimate((AppendState *) planstate, + e->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanEstimate((CustomScanState *) planstate, @@ -453,6 +459,11 @@ ExecParallelInitializeDSM(PlanState *planstate, ExecForeignScanInitializeDSM((ForeignScanState *) planstate, d->pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeDSM((AppendState *) planstate, + d->pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeDSM((CustomScanState *) planstate, @@ -884,6 +895,10 @@ ExecParallelReInitializeDSM(PlanState *planstate, ExecForeignScanReInitializeDSM((ForeignScanState *) planstate, pcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendReInitializeDSM((AppendState *) planstate, pcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanReInitializeDSM((CustomScanState *) planstate, @@ -1194,6 +1209,10 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt) ExecForeignScanInitializeWorker((ForeignScanState *) planstate, pwcxt); break; + case T_AppendState: + if (planstate->plan->parallel_aware) + ExecAppendInitializeWorker((AppendState *) planstate, pwcxt); + break; case T_CustomScanState: if (planstate->plan->parallel_aware) ExecCustomScanInitializeWorker((CustomScanState *) planstate, diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c index 1d2fb35d551..246a0b2d852 100644 --- a/src/backend/executor/nodeAppend.c +++ b/src/backend/executor/nodeAppend.c @@ -61,51 +61,27 @@ #include "executor/nodeAppend.h" #include "miscadmin.h" -static TupleTableSlot *ExecAppend(PlanState *pstate); -static bool exec_append_initialize_next(AppendState *appendstate); - - -/* ---------------------------------------------------------------- - * exec_append_initialize_next - * - * Sets up the append state node for the "next" scan. - * - * Returns t iff there is a "next" scan to process. - * ---------------------------------------------------------------- - */ -static bool -exec_append_initialize_next(AppendState *appendstate) +/* Shared state for parallel-aware Append. */ +struct ParallelAppendState { - int whichplan; + LWLock pa_lock; /* mutual exclusion to choose next subplan */ + int pa_next_plan; /* next plan to choose by any worker */ /* - * get information from the append node + * pa_finished[i] should be true if no more workers should select subplan + * i. for a non-partial plan, this should be set to true as soon as a + * worker selects the plan; for a partial plan, it remains false until + * some worker executes the plan to completion. 
*/ - whichplan = appendstate->as_whichplan; + bool pa_finished[FLEXIBLE_ARRAY_MEMBER]; +}; - if (whichplan < 0) - { - /* - * if scanning in reverse, we start at the last scan in the list and - * then proceed back to the first.. in any case we inform ExecAppend - * that we are at the end of the line by returning false - */ - appendstate->as_whichplan = 0; - return false; - } - else if (whichplan >= appendstate->as_nplans) - { - /* - * as above, end the scan if we go beyond the last scan in our list.. - */ - appendstate->as_whichplan = appendstate->as_nplans - 1; - return false; - } - else - { - return true; - } -} +#define INVALID_SUBPLAN_INDEX -1 + +static TupleTableSlot *ExecAppend(PlanState *pstate); +static bool choose_next_subplan_locally(AppendState *node); +static bool choose_next_subplan_for_leader(AppendState *node); +static bool choose_next_subplan_for_worker(AppendState *node); /* ---------------------------------------------------------------- * ExecInitAppend @@ -185,10 +161,15 @@ ExecInitAppend(Append *node, EState *estate, int eflags) appendstate->ps.ps_ProjInfo = NULL; /* - * initialize to scan first subplan + * Parallel-aware append plans must choose the first subplan to execute by + * looking at shared memory, but non-parallel-aware append plans can + * always start with the first subplan. */ - appendstate->as_whichplan = 0; - exec_append_initialize_next(appendstate); + appendstate->as_whichplan = + appendstate->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0; + + /* If parallel-aware, this will be overridden later. */ + appendstate->choose_next_subplan = choose_next_subplan_locally; return appendstate; } @@ -204,6 +185,11 @@ ExecAppend(PlanState *pstate) { AppendState *node = castNode(AppendState, pstate); + /* If no subplan has been chosen, we must choose one before proceeding. */ + if (node->as_whichplan == INVALID_SUBPLAN_INDEX && + !node->choose_next_subplan(node)) + return ExecClearTuple(node->ps.ps_ResultTupleSlot); + for (;;) { PlanState *subnode; @@ -214,6 +200,7 @@ ExecAppend(PlanState *pstate) /* * figure out which subplan we are currently processing */ + Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); subnode = node->appendplans[node->as_whichplan]; /* @@ -231,19 +218,9 @@ ExecAppend(PlanState *pstate) return result; } - /* - * Go on to the "next" subplan in the appropriate direction. If no - * more subplans, return the empty slot set up for us by - * ExecInitAppend. - */ - if (ScanDirectionIsForward(node->ps.state->es_direction)) - node->as_whichplan++; - else - node->as_whichplan--; - if (!exec_append_initialize_next(node)) + /* choose new subplan; if none, we're done */ + if (!node->choose_next_subplan(node)) return ExecClearTuple(node->ps.ps_ResultTupleSlot); - - /* Else loop back and try to get a tuple from the new subplan */ } } @@ -298,6 +275,246 @@ ExecReScanAppend(AppendState *node) if (subnode->chgParam == NULL) ExecReScan(subnode); } - node->as_whichplan = 0; - exec_append_initialize_next(node); + + node->as_whichplan = + node->ps.plan->parallel_aware ? INVALID_SUBPLAN_INDEX : 0; +} + +/* ---------------------------------------------------------------- + * Parallel Append Support + * ---------------------------------------------------------------- + */ + +/* ---------------------------------------------------------------- + * ExecAppendEstimate + * + * Compute the amount of space we'll need in the parallel + * query DSM, and inform pcxt->estimator about our needs. 
+ * ---------------------------------------------------------------- + */ +void +ExecAppendEstimate(AppendState *node, + ParallelContext *pcxt) +{ + node->pstate_len = + add_size(offsetof(ParallelAppendState, pa_finished), + sizeof(bool) * node->as_nplans); + + shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); + shm_toc_estimate_keys(&pcxt->estimator, 1); +} + + +/* ---------------------------------------------------------------- + * ExecAppendInitializeDSM + * + * Set up shared state for Parallel Append. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeDSM(AppendState *node, + ParallelContext *pcxt) +{ + ParallelAppendState *pstate; + + pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); + memset(pstate, 0, node->pstate_len); + LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); + shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); + + node->as_pstate = pstate; + node->choose_next_subplan = choose_next_subplan_for_leader; +} + +/* ---------------------------------------------------------------- + * ExecAppendReInitializeDSM + * + * Reset shared state before beginning a fresh scan. + * ---------------------------------------------------------------- + */ +void +ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) +{ + ParallelAppendState *pstate = node->as_pstate; + + pstate->pa_next_plan = 0; + memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); +} + +/* ---------------------------------------------------------------- + * ExecAppendInitializeWorker + * + * Copy relevant information from TOC into planstate, and initialize + * whatever is required to choose and execute the optimal subplan. + * ---------------------------------------------------------------- + */ +void +ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) +{ + node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); + node->choose_next_subplan = choose_next_subplan_for_worker; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_locally + * + * Choose next subplan for a non-parallel-aware Append, + * returning false if there are no more. + * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_locally(AppendState *node) +{ + int whichplan = node->as_whichplan; + + /* We should never see INVALID_SUBPLAN_INDEX in this case. */ + Assert(whichplan >= 0 && whichplan <= node->as_nplans); + + if (ScanDirectionIsForward(node->ps.state->es_direction)) + { + if (whichplan >= node->as_nplans - 1) + return false; + node->as_whichplan++; + } + else + { + if (whichplan <= 0) + return false; + node->as_whichplan--; + } + + return true; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_for_leader + * + * Try to pick a plan which doesn't commit us to doing much + * work locally, so that as much work as possible is done in + * the workers. Cheapest subplans are at the end. 
+ * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_for_leader(AppendState *node) +{ + ParallelAppendState *pstate = node->as_pstate; + Append *append = (Append *) node->ps.plan; + + /* Backward scan is not supported by parallel-aware plans */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); + + if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + { + /* Mark just-completed subplan as finished. */ + node->as_pstate->pa_finished[node->as_whichplan] = true; + } + else + { + /* Start with last subplan. */ + node->as_whichplan = node->as_nplans - 1; + } + + /* Loop until we find a subplan to execute. */ + while (pstate->pa_finished[node->as_whichplan]) + { + if (node->as_whichplan == 0) + { + pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; + node->as_whichplan = INVALID_SUBPLAN_INDEX; + LWLockRelease(&pstate->pa_lock); + return false; + } + node->as_whichplan--; + } + + /* If non-partial, immediately mark as finished. */ + if (node->as_whichplan < append->first_partial_plan) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + LWLockRelease(&pstate->pa_lock); + + return true; +} + +/* ---------------------------------------------------------------- + * choose_next_subplan_for_worker + * + * Choose next subplan for a parallel-aware Append, returning + * false if there are no more. + * + * We start from the first plan and advance through the list; + * when we get back to the end, we loop back to the first + * nonpartial plan. This assigns the non-partial plans first + * in order of descending cost and then spreads out the + * workers as evenly as possible across the remaining partial + * plans. + * ---------------------------------------------------------------- + */ +static bool +choose_next_subplan_for_worker(AppendState *node) +{ + ParallelAppendState *pstate = node->as_pstate; + Append *append = (Append *) node->ps.plan; + + /* Backward scan is not supported by parallel-aware plans */ + Assert(ScanDirectionIsForward(node->ps.state->es_direction)); + + LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); + + /* Mark just-completed subplan as finished. */ + if (node->as_whichplan != INVALID_SUBPLAN_INDEX) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + /* If all the plans are already done, we have nothing to do */ + if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX) + { + LWLockRelease(&pstate->pa_lock); + return false; + } + + /* Loop until we find a subplan to execute. */ + while (pstate->pa_finished[pstate->pa_next_plan]) + { + if (pstate->pa_next_plan < node->as_nplans - 1) + { + /* Advance to next plan. */ + pstate->pa_next_plan++; + } + else if (append->first_partial_plan < node->as_nplans) + { + /* Loop back to first partial plan. */ + pstate->pa_next_plan = append->first_partial_plan; + } + else + { + /* At last plan, no partial plans, arrange to bail out. */ + pstate->pa_next_plan = node->as_whichplan; + } + + if (pstate->pa_next_plan == node->as_whichplan) + { + /* We've tried everything! */ + pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; + LWLockRelease(&pstate->pa_lock); + return false; + } + } + + /* Pick the plan we found, and advance pa_next_plan one more time. */ + node->as_whichplan = pstate->pa_next_plan++; + if (pstate->pa_next_plan >= node->as_nplans) + { + Assert(append->first_partial_plan < node->as_nplans); + pstate->pa_next_plan = append->first_partial_plan; + } + + /* If non-partial, immediately mark as finished. 
*/ + if (node->as_whichplan < append->first_partial_plan) + node->as_pstate->pa_finished[node->as_whichplan] = true; + + LWLockRelease(&pstate->pa_lock); + + return true; } diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c index aff9a62106d..b1515dd8e10 100644 --- a/src/backend/nodes/copyfuncs.c +++ b/src/backend/nodes/copyfuncs.c @@ -242,6 +242,7 @@ _copyAppend(const Append *from) */ COPY_NODE_FIELD(partitioned_rels); COPY_NODE_FIELD(appendplans); + COPY_SCALAR_FIELD(first_partial_plan); return newnode; } diff --git a/src/backend/nodes/list.c b/src/backend/nodes/list.c index acaf4b53153..bee6244adc1 100644 --- a/src/backend/nodes/list.c +++ b/src/backend/nodes/list.c @@ -1250,6 +1250,44 @@ list_copy_tail(const List *oldlist, int nskip) } /* + * Sort a list using qsort. A sorted list is built but the cells of the + * original list are re-used. The comparator function receives arguments of + * type ListCell ** + */ +List * +list_qsort(const List *list, list_qsort_comparator cmp) +{ + ListCell *cell; + int i; + int len = list_length(list); + ListCell **list_arr; + List *new_list; + + if (len == 0) + return NIL; + + i = 0; + list_arr = palloc(sizeof(ListCell *) * len); + foreach(cell, list) + list_arr[i++] = cell; + + qsort(list_arr, len, sizeof(ListCell *), cmp); + + new_list = (List *) palloc(sizeof(List)); + new_list->type = list->type; + new_list->length = len; + new_list->head = list_arr[0]; + new_list->tail = list_arr[len - 1]; + + for (i = 0; i < len - 1; i++) + list_arr[i]->next = list_arr[i + 1]; + + list_arr[len - 1]->next = NULL; + pfree(list_arr); + return new_list; +} + +/* * Temporary compatibility functions * * In order to avoid warnings for these function definitions, we need diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c index c97ee24ade1..b59a5219a72 100644 --- a/src/backend/nodes/outfuncs.c +++ b/src/backend/nodes/outfuncs.c @@ -399,6 +399,7 @@ _outAppend(StringInfo str, const Append *node) WRITE_NODE_FIELD(partitioned_rels); WRITE_NODE_FIELD(appendplans); + WRITE_INT_FIELD(first_partial_plan); } static void diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c index 7eb67fc0407..0d17ae89b0c 100644 --- a/src/backend/nodes/readfuncs.c +++ b/src/backend/nodes/readfuncs.c @@ -1600,6 +1600,7 @@ _readAppend(void) READ_NODE_FIELD(partitioned_rels); READ_NODE_FIELD(appendplans); + READ_INT_FIELD(first_partial_plan); READ_DONE(); } diff --git a/src/backend/optimizer/path/allpaths.c b/src/backend/optimizer/path/allpaths.c index 44f6b034420..47986ba80a5 100644 --- a/src/backend/optimizer/path/allpaths.c +++ b/src/backend/optimizer/path/allpaths.c @@ -101,7 +101,8 @@ static void generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, static Path *get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, Relids required_outer); -static List *accumulate_append_subpath(List *subpaths, Path *path); +static void accumulate_append_subpath(Path *path, + List **subpaths, List **special_subpaths); static void set_subquery_pathlist(PlannerInfo *root, RelOptInfo *rel, Index rti, RangeTblEntry *rte); static void set_function_pathlist(PlannerInfo *root, RelOptInfo *rel, @@ -1331,13 +1332,17 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, List *subpaths = NIL; bool subpaths_valid = true; List *partial_subpaths = NIL; + List *pa_partial_subpaths = NIL; + List *pa_nonpartial_subpaths = NIL; bool partial_subpaths_valid = true; + bool pa_subpaths_valid = enable_parallel_append; List 
*all_child_pathkeys = NIL; List *all_child_outers = NIL; ListCell *l; List *partitioned_rels = NIL; RangeTblEntry *rte; bool build_partitioned_rels = false; + double partial_rows = -1; if (IS_SIMPLE_REL(rel)) { @@ -1388,6 +1393,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, { RelOptInfo *childrel = lfirst(l); ListCell *lcp; + Path *cheapest_partial_path = NULL; /* * If we need to build partitioned_rels, accumulate the partitioned @@ -1408,19 +1414,70 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, * If not, there's no workable unparameterized path. */ if (childrel->cheapest_total_path->param_info == NULL) - subpaths = accumulate_append_subpath(subpaths, - childrel->cheapest_total_path); + accumulate_append_subpath(childrel->cheapest_total_path, + &subpaths, NULL); else subpaths_valid = false; /* Same idea, but for a partial plan. */ if (childrel->partial_pathlist != NIL) - partial_subpaths = accumulate_append_subpath(partial_subpaths, - linitial(childrel->partial_pathlist)); + { + cheapest_partial_path = linitial(childrel->partial_pathlist); + accumulate_append_subpath(cheapest_partial_path, + &partial_subpaths, NULL); + } else partial_subpaths_valid = false; /* + * Same idea, but for a parallel append mixing partial and non-partial + * paths. + */ + if (pa_subpaths_valid) + { + Path *nppath = NULL; + + nppath = + get_cheapest_parallel_safe_total_inner(childrel->pathlist); + + if (cheapest_partial_path == NULL && nppath == NULL) + { + /* Neither a partial nor a parallel-safe path? Forget it. */ + pa_subpaths_valid = false; + } + else if (nppath == NULL || + (cheapest_partial_path != NULL && + cheapest_partial_path->total_cost < nppath->total_cost)) + { + /* Partial path is cheaper or the only option. */ + Assert(cheapest_partial_path != NULL); + accumulate_append_subpath(cheapest_partial_path, + &pa_partial_subpaths, + &pa_nonpartial_subpaths); + + } + else + { + /* + * Either we've got only a non-partial path, or we think that + * a single backend can execute the best non-partial path + * faster than all the parallel backends working together can + * execute the best partial path. + * + * It might make sense to be more aggressive here. Even if + * the best non-partial path is more expensive than the best + * partial path, it could still be better to choose the + * non-partial path if there are several such paths that can + * be given to different workers. For now, we don't try to + * figure that out. + */ + accumulate_append_subpath(nppath, + &pa_nonpartial_subpaths, + NULL); + } + } + + /* * Collect lists of all the available path orderings and * parameterizations for all the children. We use these as a * heuristic to indicate which sort orderings and parameterizations we @@ -1491,11 +1548,13 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, * if we have zero or one live subpath due to constraint exclusion.) */ if (subpaths_valid) - add_path(rel, (Path *) create_append_path(rel, subpaths, NULL, 0, - partitioned_rels)); + add_path(rel, (Path *) create_append_path(rel, subpaths, NIL, + NULL, 0, false, + partitioned_rels, -1)); /* - * Consider an append of partial unordered, unparameterized partial paths. + * Consider an append of unordered, unparameterized partial paths. Make + * it parallel-aware if possible. */ if (partial_subpaths_valid) { @@ -1503,12 +1562,7 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, ListCell *lc; int parallel_workers = 0; - /* - * Decide on the number of workers to request for this append path. 
- * For now, we just use the maximum value from among the members. It - * might be useful to use a higher number if the Append node were - * smart enough to spread out the workers, but it currently isn't. - */ + /* Find the highest number of workers requested for any subpath. */ foreach(lc, partial_subpaths) { Path *path = lfirst(lc); @@ -1517,9 +1571,78 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, } Assert(parallel_workers > 0); + /* + * If the use of parallel append is permitted, always request at least + * log2(# of children) paths. We assume it can be useful to have + * extra workers in this case because they will be spread out across + * the children. The precise formula is just a guess, but we don't + * want to end up with a radically different answer for a table with N + * partitions vs. an unpartitioned table with the same data, so the + * use of some kind of log-scaling here seems to make some sense. + */ + if (enable_parallel_append) + { + parallel_workers = Max(parallel_workers, + fls(list_length(live_childrels))); + parallel_workers = Min(parallel_workers, + max_parallel_workers_per_gather); + } + Assert(parallel_workers > 0); + /* Generate a partial append path. */ - appendpath = create_append_path(rel, partial_subpaths, NULL, - parallel_workers, partitioned_rels); + appendpath = create_append_path(rel, NIL, partial_subpaths, NULL, + parallel_workers, + enable_parallel_append, + partitioned_rels, -1); + + /* + * Make sure any subsequent partial paths use the same row count + * estimate. + */ + partial_rows = appendpath->path.rows; + + /* Add the path. */ + add_partial_path(rel, (Path *) appendpath); + } + + /* + * Consider a parallel-aware append using a mix of partial and non-partial + * paths. (This only makes sense if there's at least one child which has + * a non-partial path that is substantially cheaper than any partial path; + * otherwise, we should use the append path added in the previous step.) + */ + if (pa_subpaths_valid && pa_nonpartial_subpaths != NIL) + { + AppendPath *appendpath; + ListCell *lc; + int parallel_workers = 0; + + /* + * Find the highest number of workers requested for any partial + * subpath. + */ + foreach(lc, pa_partial_subpaths) + { + Path *path = lfirst(lc); + + parallel_workers = Max(parallel_workers, path->parallel_workers); + } + + /* + * Same formula here as above. It's even more important in this + * instance because the non-partial paths won't contribute anything to + * the planned number of parallel workers. 
+ */ + parallel_workers = Max(parallel_workers, + fls(list_length(live_childrels))); + parallel_workers = Min(parallel_workers, + max_parallel_workers_per_gather); + Assert(parallel_workers > 0); + + appendpath = create_append_path(rel, pa_nonpartial_subpaths, + pa_partial_subpaths, + NULL, parallel_workers, true, + partitioned_rels, partial_rows); add_partial_path(rel, (Path *) appendpath); } @@ -1567,13 +1690,14 @@ add_paths_to_append_rel(PlannerInfo *root, RelOptInfo *rel, subpaths_valid = false; break; } - subpaths = accumulate_append_subpath(subpaths, subpath); + accumulate_append_subpath(subpath, &subpaths, NULL); } if (subpaths_valid) add_path(rel, (Path *) - create_append_path(rel, subpaths, required_outer, 0, - partitioned_rels)); + create_append_path(rel, subpaths, NIL, + required_outer, 0, false, + partitioned_rels, -1)); } } @@ -1657,10 +1781,10 @@ generate_mergeappend_paths(PlannerInfo *root, RelOptInfo *rel, if (cheapest_startup != cheapest_total) startup_neq_total = true; - startup_subpaths = - accumulate_append_subpath(startup_subpaths, cheapest_startup); - total_subpaths = - accumulate_append_subpath(total_subpaths, cheapest_total); + accumulate_append_subpath(cheapest_startup, + &startup_subpaths, NULL); + accumulate_append_subpath(cheapest_total, + &total_subpaths, NULL); } /* ... and build the MergeAppend paths */ @@ -1756,7 +1880,7 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, /* * accumulate_append_subpath - * Add a subpath to the list being built for an Append or MergeAppend + * Add a subpath to the list being built for an Append or MergeAppend. * * It's possible that the child is itself an Append or MergeAppend path, in * which case we can "cut out the middleman" and just add its child paths to @@ -1767,26 +1891,53 @@ get_cheapest_parameterized_child_path(PlannerInfo *root, RelOptInfo *rel, * omitting a sort step, which seems fine: if the parent is to be an Append, * its result would be unsorted anyway, while if the parent is to be a * MergeAppend, there's no point in a separate sort on a child. + * its result would be unsorted anyway. + * + * Normally, either path is a partial path and subpaths is a list of partial + * paths, or else path is a non-partial plan and subpaths is a list of those. + * However, if path is a parallel-aware Append, then we add its partial path + * children to subpaths and the rest to special_subpaths. If the latter is + * NULL, we don't flatten the path at all (unless it contains only partial + * paths). 
*/ -static List * -accumulate_append_subpath(List *subpaths, Path *path) +static void +accumulate_append_subpath(Path *path, List **subpaths, List **special_subpaths) { if (IsA(path, AppendPath)) { AppendPath *apath = (AppendPath *) path; - /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(apath->subpaths)); + if (!apath->path.parallel_aware || apath->first_partial_path == 0) + { + /* list_copy is important here to avoid sharing list substructure */ + *subpaths = list_concat(*subpaths, list_copy(apath->subpaths)); + return; + } + else if (special_subpaths != NULL) + { + List *new_special_subpaths; + + /* Split Parallel Append into partial and non-partial subpaths */ + *subpaths = list_concat(*subpaths, + list_copy_tail(apath->subpaths, + apath->first_partial_path)); + new_special_subpaths = + list_truncate(list_copy(apath->subpaths), + apath->first_partial_path); + *special_subpaths = list_concat(*special_subpaths, + new_special_subpaths); + } } else if (IsA(path, MergeAppendPath)) { MergeAppendPath *mpath = (MergeAppendPath *) path; /* list_copy is important here to avoid sharing list substructure */ - return list_concat(subpaths, list_copy(mpath->subpaths)); + *subpaths = list_concat(*subpaths, list_copy(mpath->subpaths)); + return; } - else - return lappend(subpaths, path); + + *subpaths = lappend(*subpaths, path); } /* @@ -1809,7 +1960,8 @@ set_dummy_rel_pathlist(RelOptInfo *rel) rel->pathlist = NIL; rel->partial_pathlist = NIL; - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL, -1)); /* * We set the cheapest path immediately, to ensure that IS_DUMMY_REL() diff --git a/src/backend/optimizer/path/costsize.c b/src/backend/optimizer/path/costsize.c index d11bf19e30a..877827dcb52 100644 --- a/src/backend/optimizer/path/costsize.c +++ b/src/backend/optimizer/path/costsize.c @@ -128,6 +128,7 @@ bool enable_mergejoin = true; bool enable_hashjoin = true; bool enable_gathermerge = true; bool enable_partition_wise_join = false; +bool enable_parallel_append = true; typedef struct { @@ -160,6 +161,8 @@ static Selectivity get_foreign_key_join_selectivity(PlannerInfo *root, Relids inner_relids, SpecialJoinInfo *sjinfo, List **restrictlist); +static Cost append_nonpartial_cost(List *subpaths, int numpaths, + int parallel_workers); static void set_rel_width(PlannerInfo *root, RelOptInfo *rel); static double relation_byte_size(double tuples, int width); static double page_size(double tuples, int width); @@ -1742,6 +1745,167 @@ cost_sort(Path *path, PlannerInfo *root, } /* + * append_nonpartial_cost + * Estimate the cost of the non-partial paths in a Parallel Append. + * The non-partial paths are assumed to be the first "numpaths" paths + * from the subpaths list, and to be in order of decreasing cost. + */ +static Cost +append_nonpartial_cost(List *subpaths, int numpaths, int parallel_workers) +{ + Cost *costarr; + int arrlen; + ListCell *l; + ListCell *cell; + int i; + int path_index; + int min_index; + int max_index; + + if (numpaths == 0) + return 0; + + /* + * Array length is number of workers or number of relevants paths, + * whichever is less. + */ + arrlen = Min(parallel_workers, numpaths); + costarr = (Cost *) palloc(sizeof(Cost) * arrlen); + + /* The first few paths will each be claimed by a different worker. 
*/ + path_index = 0; + foreach(cell, subpaths) + { + Path *subpath = (Path *) lfirst(cell); + + if (path_index == arrlen) + break; + costarr[path_index++] = subpath->total_cost; + } + + /* + * Since subpaths are sorted by decreasing cost, the last one will have + * the minimum cost. + */ + min_index = arrlen - 1; + + /* + * For each of the remaining subpaths, add its cost to the array element + * with minimum cost. + */ + for_each_cell(l, cell) + { + Path *subpath = (Path *) lfirst(l); + int i; + + /* Consider only the non-partial paths */ + if (path_index++ == numpaths) + break; + + costarr[min_index] += subpath->total_cost; + + /* Update the new min cost array index */ + for (min_index = i = 0; i < arrlen; i++) + { + if (costarr[i] < costarr[min_index]) + min_index = i; + } + } + + /* Return the highest cost from the array */ + for (max_index = i = 0; i < arrlen; i++) + { + if (costarr[i] > costarr[max_index]) + max_index = i; + } + + return costarr[max_index]; +} + +/* + * cost_append + * Determines and returns the cost of an Append node. + * + * We charge nothing extra for the Append itself, which perhaps is too + * optimistic, but since it doesn't do any selection or projection, it is a + * pretty cheap node. + */ +void +cost_append(AppendPath *apath) +{ + ListCell *l; + + apath->path.startup_cost = 0; + apath->path.total_cost = 0; + + if (apath->subpaths == NIL) + return; + + if (!apath->path.parallel_aware) + { + Path *subpath = (Path *) linitial(apath->subpaths); + + /* + * Startup cost of non-parallel-aware Append is the startup cost of + * first subpath. + */ + apath->path.startup_cost = subpath->startup_cost; + + /* Compute rows and costs as sums of subplan rows and costs. */ + foreach(l, apath->subpaths) + { + Path *subpath = (Path *) lfirst(l); + + apath->path.rows += subpath->rows; + apath->path.total_cost += subpath->total_cost; + } + } + else /* parallel-aware */ + { + int i = 0; + double parallel_divisor = get_parallel_divisor(&apath->path); + + /* Calculate startup cost. */ + foreach(l, apath->subpaths) + { + Path *subpath = (Path *) lfirst(l); + + /* + * Append will start returning tuples when the child node having + * lowest startup cost is done setting up. We consider only the + * first few subplans that immediately get a worker assigned. + */ + if (i == 0) + apath->path.startup_cost = subpath->startup_cost; + else if (i < apath->path.parallel_workers) + apath->path.startup_cost = Min(apath->path.startup_cost, + subpath->startup_cost); + + /* + * Apply parallel divisor to non-partial subpaths. Also add the + * cost of partial paths to the total cost, but ignore non-partial + * paths for now. + */ + if (i < apath->first_partial_path) + apath->path.rows += subpath->rows / parallel_divisor; + else + { + apath->path.rows += subpath->rows; + apath->path.total_cost += subpath->total_cost; + } + + i++; + } + + /* Add cost for non-partial subpaths. */ + apath->path.total_cost += + append_nonpartial_cost(apath->subpaths, + apath->first_partial_path, + apath->path.parallel_workers); + } +} + +/* * cost_merge_append * Determines and returns the cost of a MergeAppend node. 
* diff --git a/src/backend/optimizer/path/joinrels.c b/src/backend/optimizer/path/joinrels.c index 453f25964ac..5e03f8bc213 100644 --- a/src/backend/optimizer/path/joinrels.c +++ b/src/backend/optimizer/path/joinrels.c @@ -1232,7 +1232,8 @@ mark_dummy_rel(RelOptInfo *rel) rel->partial_pathlist = NIL; /* Set up the dummy path */ - add_path(rel, (Path *) create_append_path(rel, NIL, NULL, 0, NIL)); + add_path(rel, (Path *) create_append_path(rel, NIL, NIL, NULL, + 0, false, NIL, -1)); /* Set or update cheapest_total_path and related fields */ set_cheapest(rel); diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c index d4454779ee9..f6c83d0477c 100644 --- a/src/backend/optimizer/plan/createplan.c +++ b/src/backend/optimizer/plan/createplan.c @@ -203,7 +203,8 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual Index scanrelid, char *enrname); static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual, Index scanrelid, int wtParam); -static Append *make_append(List *appendplans, List *tlist, List *partitioned_rels); +static Append *make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels); static RecursiveUnion *make_recursive_union(List *tlist, Plan *lefttree, Plan *righttree, @@ -1059,7 +1060,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path) * parent-rel Vars it'll be asked to emit. */ - plan = make_append(subplans, tlist, best_path->partitioned_rels); + plan = make_append(subplans, best_path->first_partial_path, + tlist, best_path->partitioned_rels); copy_generic_path_info(&plan->plan, (Path *) best_path); @@ -5294,7 +5296,8 @@ make_foreignscan(List *qptlist, } static Append * -make_append(List *appendplans, List *tlist, List *partitioned_rels) +make_append(List *appendplans, int first_partial_plan, + List *tlist, List *partitioned_rels) { Append *node = makeNode(Append); Plan *plan = &node->plan; @@ -5305,6 +5308,7 @@ make_append(List *appendplans, List *tlist, List *partitioned_rels) plan->righttree = NULL; node->partitioned_rels = partitioned_rels; node->appendplans = appendplans; + node->first_partial_plan = first_partial_plan; return node; } diff --git a/src/backend/optimizer/plan/planner.c b/src/backend/optimizer/plan/planner.c index ef2eaeac0a4..e8bc15c35d2 100644 --- a/src/backend/optimizer/plan/planner.c +++ b/src/backend/optimizer/plan/planner.c @@ -3680,9 +3680,12 @@ create_grouping_paths(PlannerInfo *root, path = (Path *) create_append_path(grouped_rel, paths, + NIL, NULL, 0, - NIL); + false, + NIL, + -1); path->pathtarget = target; } else diff --git a/src/backend/optimizer/prep/prepunion.c b/src/backend/optimizer/prep/prepunion.c index f620243ab44..a24e8acfa6c 100644 --- a/src/backend/optimizer/prep/prepunion.c +++ b/src/backend/optimizer/prep/prepunion.c @@ -590,8 +590,8 @@ generate_union_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. */ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); - + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL, -1); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); @@ -702,7 +702,8 @@ generate_nonunion_path(SetOperationStmt *op, PlannerInfo *root, /* * Append the child results together. 
*/ - path = (Path *) create_append_path(result_rel, pathlist, NULL, 0, NIL); + path = (Path *) create_append_path(result_rel, pathlist, NIL, + NULL, 0, false, NIL, -1); /* We have to manually jam the right tlist into the path; ick */ path->pathtarget = create_pathtarget(root, tlist); diff --git a/src/backend/optimizer/util/pathnode.c b/src/backend/optimizer/util/pathnode.c index bc0841bf0b8..54126fbb6a5 100644 --- a/src/backend/optimizer/util/pathnode.c +++ b/src/backend/optimizer/util/pathnode.c @@ -51,6 +51,8 @@ typedef enum #define STD_FUZZ_FACTOR 1.01 static List *translate_sub_tlist(List *tlist, int relid); +static int append_total_cost_compare(const void *a, const void *b); +static int append_startup_cost_compare(const void *a, const void *b); static List *reparameterize_pathlist_by_child(PlannerInfo *root, List *pathlist, RelOptInfo *child_rel); @@ -1208,44 +1210,50 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, * Note that we must handle subpaths = NIL, representing a dummy access path. */ AppendPath * -create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, - int parallel_workers, List *partitioned_rels) +create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels, double rows) { AppendPath *pathnode = makeNode(AppendPath); ListCell *l; + Assert(!parallel_aware || parallel_workers > 0); + pathnode->path.pathtype = T_Append; pathnode->path.parent = rel; pathnode->path.pathtarget = rel->reltarget; pathnode->path.param_info = get_appendrel_parampathinfo(rel, required_outer); - pathnode->path.parallel_aware = false; + pathnode->path.parallel_aware = parallel_aware; pathnode->path.parallel_safe = rel->consider_parallel; pathnode->path.parallel_workers = parallel_workers; pathnode->path.pathkeys = NIL; /* result is always considered unsorted */ pathnode->partitioned_rels = list_copy(partitioned_rels); - pathnode->subpaths = subpaths; /* - * We don't bother with inventing a cost_append(), but just do it here. - * - * Compute rows and costs as sums of subplan rows and costs. We charge - * nothing extra for the Append itself, which perhaps is too optimistic, - * but since it doesn't do any selection or projection, it is a pretty - * cheap node. + * For parallel append, non-partial paths are sorted by descending total + * costs. That way, the total time to finish all non-partial paths is + * minimized. Also, the partial paths are sorted by descending startup + * costs. There may be some paths that require to do startup work by a + * single worker. In such case, it's better for workers to choose the + * expensive ones first, whereas the leader should choose the cheapest + * startup plan. */ - pathnode->path.rows = 0; - pathnode->path.startup_cost = 0; - pathnode->path.total_cost = 0; + if (pathnode->path.parallel_aware) + { + subpaths = list_qsort(subpaths, append_total_cost_compare); + partial_subpaths = list_qsort(partial_subpaths, + append_startup_cost_compare); + } + pathnode->first_partial_path = list_length(subpaths); + pathnode->subpaths = list_concat(subpaths, partial_subpaths); + foreach(l, subpaths) { Path *subpath = (Path *) lfirst(l); - pathnode->path.rows += subpath->rows; - - if (l == list_head(subpaths)) /* first node? 
*/ - pathnode->path.startup_cost = subpath->startup_cost; - pathnode->path.total_cost += subpath->total_cost; pathnode->path.parallel_safe = pathnode->path.parallel_safe && subpath->parallel_safe; @@ -1253,10 +1261,54 @@ create_append_path(RelOptInfo *rel, List *subpaths, Relids required_outer, Assert(bms_equal(PATH_REQ_OUTER(subpath), required_outer)); } + Assert(!parallel_aware || pathnode->path.parallel_safe); + + cost_append(pathnode); + + /* If the caller provided a row estimate, override the computed value. */ + if (rows >= 0) + pathnode->path.rows = rows; + return pathnode; } /* + * append_total_cost_compare + * list_qsort comparator for sorting append child paths by total_cost + */ +static int +append_total_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->total_cost > path2->total_cost) + return -1; + if (path1->total_cost < path2->total_cost) + return 1; + + return 0; +} + +/* + * append_startup_cost_compare + * list_qsort comparator for sorting append child paths by startup_cost + */ +static int +append_startup_cost_compare(const void *a, const void *b) +{ + Path *path1 = (Path *) lfirst(*(ListCell **) a); + Path *path2 = (Path *) lfirst(*(ListCell **) b); + + if (path1->startup_cost > path2->startup_cost) + return -1; + if (path1->startup_cost < path2->startup_cost) + return 1; + + return 0; +} + +/* * create_merge_append_path * Creates a path corresponding to a MergeAppend plan, returning the * pathnode. diff --git a/src/backend/storage/lmgr/lwlock.c b/src/backend/storage/lmgr/lwlock.c index e5c3e867099..46f5c4277d4 100644 --- a/src/backend/storage/lmgr/lwlock.c +++ b/src/backend/storage/lmgr/lwlock.c @@ -517,6 +517,7 @@ RegisterLWLockTranches(void) LWLockRegisterTranche(LWTRANCHE_SESSION_TYPMOD_TABLE, "session_typmod_table"); LWLockRegisterTranche(LWTRANCHE_TBM, "tbm"); + LWLockRegisterTranche(LWTRANCHE_PARALLEL_APPEND, "parallel_append"); /* Register named tranches. 
*/ for (i = 0; i < NamedLWLockTrancheRequests; i++) diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 6dcd738be64..0f7a96d85c3 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -920,6 +920,15 @@ static struct config_bool ConfigureNamesBool[] = false, NULL, NULL, NULL }, + { + {"enable_parallel_append", PGC_USERSET, QUERY_TUNING_METHOD, + gettext_noop("Enables the planner's use of parallel append plans."), + NULL + }, + &enable_parallel_append, + true, + NULL, NULL, NULL + }, { {"geqo", PGC_USERSET, QUERY_TUNING_GEQO, diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 16ffbbeea81..842cf3601aa 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -296,6 +296,7 @@ #enable_material = on #enable_mergejoin = on #enable_nestloop = on +#enable_parallel_append = on #enable_seqscan = on #enable_sort = on #enable_tidscan = on diff --git a/src/include/executor/nodeAppend.h b/src/include/executor/nodeAppend.h index 4e38a1380e2..d42d50614c8 100644 --- a/src/include/executor/nodeAppend.h +++ b/src/include/executor/nodeAppend.h @@ -14,10 +14,15 @@ #ifndef NODEAPPEND_H #define NODEAPPEND_H +#include "access/parallel.h" #include "nodes/execnodes.h" extern AppendState *ExecInitAppend(Append *node, EState *estate, int eflags); extern void ExecEndAppend(AppendState *node); extern void ExecReScanAppend(AppendState *node); +extern void ExecAppendEstimate(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt); +extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt); #endif /* NODEAPPEND_H */ diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h index 084d59ef834..1a35c5c9ada 100644 --- a/src/include/nodes/execnodes.h +++ b/src/include/nodes/execnodes.h @@ -21,6 +21,7 @@ #include "lib/pairingheap.h" #include "nodes/params.h" #include "nodes/plannodes.h" +#include "storage/spin.h" #include "utils/hsearch.h" #include "utils/queryenvironment.h" #include "utils/reltrigger.h" @@ -1000,13 +1001,22 @@ typedef struct ModifyTableState * whichplan which plan is being executed (0 .. 
n-1) * ---------------- */ -typedef struct AppendState + +struct AppendState; +typedef struct AppendState AppendState; +struct ParallelAppendState; +typedef struct ParallelAppendState ParallelAppendState; + +struct AppendState { PlanState ps; /* its first field is NodeTag */ PlanState **appendplans; /* array of PlanStates for my inputs */ int as_nplans; int as_whichplan; -} AppendState; + ParallelAppendState *as_pstate; /* parallel coordination info */ + Size pstate_len; /* size of parallel coordination info */ + bool (*choose_next_subplan) (AppendState *); +}; /* ---------------- * MergeAppendState information diff --git a/src/include/nodes/pg_list.h b/src/include/nodes/pg_list.h index 667d5e269cc..711db925769 100644 --- a/src/include/nodes/pg_list.h +++ b/src/include/nodes/pg_list.h @@ -269,6 +269,9 @@ extern void list_free_deep(List *list); extern List *list_copy(const List *list); extern List *list_copy_tail(const List *list, int nskip); +typedef int (*list_qsort_comparator) (const void *a, const void *b); +extern List *list_qsort(const List *list, list_qsort_comparator cmp); + /* * To ease migration to the new list API, a set of compatibility * macros are provided that reduce the impact of the list API changes diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h index 9b38d44ba02..02fb366680f 100644 --- a/src/include/nodes/plannodes.h +++ b/src/include/nodes/plannodes.h @@ -248,6 +248,7 @@ typedef struct Append /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *appendplans; + int first_partial_plan; } Append; /* ---------------- diff --git a/src/include/nodes/relation.h b/src/include/nodes/relation.h index 51df8e97415..1108b6a0ea0 100644 --- a/src/include/nodes/relation.h +++ b/src/include/nodes/relation.h @@ -1255,6 +1255,9 @@ typedef struct CustomPath * AppendPath represents an Append plan, ie, successive execution of * several member plans. * + * For partial Append, 'subpaths' contains non-partial subpaths followed by + * partial subpaths. + * * Note: it is possible for "subpaths" to contain only one, or even no, * elements. These cases are optimized during create_append_plan. 
* In particular, an AppendPath with no subpaths is a "dummy" path that @@ -1266,6 +1269,9 @@ typedef struct AppendPath /* RT indexes of non-leaf tables in a partition tree */ List *partitioned_rels; List *subpaths; /* list of component Paths */ + + /* Index of first partial path in subpaths */ + int first_partial_path; } AppendPath; #define IS_DUMMY_PATH(p) \ diff --git a/src/include/optimizer/cost.h b/src/include/optimizer/cost.h index 6c2317df397..5a1fbf97c38 100644 --- a/src/include/optimizer/cost.h +++ b/src/include/optimizer/cost.h @@ -68,6 +68,7 @@ extern bool enable_mergejoin; extern bool enable_hashjoin; extern bool enable_gathermerge; extern bool enable_partition_wise_join; +extern bool enable_parallel_append; extern int constraint_exclusion; extern double clamp_row_est(double nrows); @@ -106,6 +107,7 @@ extern void cost_sort(Path *path, PlannerInfo *root, List *pathkeys, Cost input_cost, double tuples, int width, Cost comparison_cost, int sort_mem, double limit_tuples); +extern void cost_append(AppendPath *path); extern void cost_merge_append(Path *path, PlannerInfo *root, List *pathkeys, int n_streams, Cost input_startup_cost, Cost input_total_cost, diff --git a/src/include/optimizer/pathnode.h b/src/include/optimizer/pathnode.h index e9ed16ad321..99f65b44f22 100644 --- a/src/include/optimizer/pathnode.h +++ b/src/include/optimizer/pathnode.h @@ -14,6 +14,7 @@ #ifndef PATHNODE_H #define PATHNODE_H +#include "nodes/bitmapset.h" #include "nodes/relation.h" @@ -63,9 +64,11 @@ extern BitmapOrPath *create_bitmap_or_path(PlannerInfo *root, List *bitmapquals); extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals, Relids required_outer); -extern AppendPath *create_append_path(RelOptInfo *rel, List *subpaths, - Relids required_outer, int parallel_workers, - List *partitioned_rels); +extern AppendPath *create_append_path(RelOptInfo *rel, + List *subpaths, List *partial_subpaths, + Relids required_outer, + int parallel_workers, bool parallel_aware, + List *partitioned_rels, double rows); extern MergeAppendPath *create_merge_append_path(PlannerInfo *root, RelOptInfo *rel, List *subpaths, diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 596fdadc632..460843d73e2 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -216,6 +216,7 @@ typedef enum BuiltinTrancheIds LWTRANCHE_SESSION_RECORD_TABLE, LWTRANCHE_SESSION_TYPMOD_TABLE, LWTRANCHE_TBM, + LWTRANCHE_PARALLEL_APPEND, LWTRANCHE_FIRST_USER_DEFINED } BuiltinTrancheIds; diff --git a/src/test/regress/expected/inherit.out b/src/test/regress/expected/inherit.out index fac7b62f9c9..a79f891da72 100644 --- a/src/test/regress/expected/inherit.out +++ b/src/test/regress/expected/inherit.out @@ -1404,6 +1404,7 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; QUERY PLAN ------------------------------------------------------------------ @@ -1470,6 +1471,7 @@ select min(1-id) from matest0; (1 row) reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; NOTICE: drop cascades to 3 other objects DETAIL: drop cascades to table matest1 diff --git a/src/test/regress/expected/select_parallel.out b/src/test/regress/expected/select_parallel.out index b748c98c915..62ed719cccb 100644 --- a/src/test/regress/expected/select_parallel.out +++ 
b/src/test/regress/expected/select_parallel.out @@ -11,8 +11,88 @@ set parallel_setup_cost=0; set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- Parallel Append with partial-subplans explain (costs off) - select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on c_star + -> Parallel Seq Scan on d_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +-- Parallel Append with both partial and non-partial subplans +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +----------------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on c_star + -> Parallel Seq Scan on a_star + -> Parallel Seq Scan on b_star + -> Parallel Seq Scan on e_star + -> Parallel Seq Scan on f_star +(11 rows) + +-- Parallel Append with only non-partial subplans +alter table a_star set (parallel_workers = 0); +alter table b_star set (parallel_workers = 0); +alter table e_star set (parallel_workers = 0); +alter table f_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + QUERY PLAN +-------------------------------------------- + Finalize Aggregate + -> Gather + Workers Planned: 3 + -> Partial Aggregate + -> Parallel Append + -> Seq Scan on d_star + -> Seq Scan on f_star + -> Seq Scan on e_star + -> Seq Scan on b_star + -> Seq Scan on c_star + -> Seq Scan on a_star +(11 rows) + +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 +(1 row) + +-- Disable Parallel Append +alter table a_star reset (parallel_workers); +alter table b_star reset (parallel_workers); +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); +alter table e_star reset (parallel_workers); +alter table f_star reset (parallel_workers); +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; QUERY PLAN ----------------------------------------------------- Finalize Aggregate @@ -28,12 +108,13 @@ explain (costs off) -> Parallel Seq Scan on f_star (11 rows) -select count(*) from a_star; - count -------- - 50 +select round(avg(aa)), sum(aa) from a_star; + round | sum +-------+----- + 14 | 355 (1 row) +reset enable_parallel_append; -- test with leader participation disabled set parallel_leader_participation = off; explain (costs off) diff --git a/src/test/regress/expected/sysviews.out b/src/test/regress/expected/sysviews.out index cd1f7f301d4..2b738aae7c5 100644 --- a/src/test/regress/expected/sysviews.out +++ b/src/test/regress/expected/sysviews.out @@ -81,11 +81,12 @@ select name, setting from pg_settings where name like 'enable%'; enable_material | on enable_mergejoin | on enable_nestloop | on + enable_parallel_append | on enable_partition_wise_join | off enable_seqscan | on enable_sort | on enable_tidscan | on -(13 rows) +(14 rows) -- Test that the pg_timezone_names and pg_timezone_abbrevs views are -- 
more-or-less working. We can't test their contents in any great detail diff --git a/src/test/regress/sql/inherit.sql b/src/test/regress/sql/inherit.sql index c71febffc2e..2e42ae115d3 100644 --- a/src/test/regress/sql/inherit.sql +++ b/src/test/regress/sql/inherit.sql @@ -508,11 +508,13 @@ select min(1-id) from matest0; reset enable_indexscan; set enable_seqscan = off; -- plan with fewest seqscans should be merge +set enable_parallel_append = off; -- Don't let parallel-append interfere explain (verbose, costs off) select * from matest0 order by 1-id; select * from matest0 order by 1-id; explain (verbose, costs off) select min(1-id) from matest0; select min(1-id) from matest0; reset enable_seqscan; +reset enable_parallel_append; drop table matest0 cascade; diff --git a/src/test/regress/sql/select_parallel.sql b/src/test/regress/sql/select_parallel.sql index 00df92779c6..d3f2028468d 100644 --- a/src/test/regress/sql/select_parallel.sql +++ b/src/test/regress/sql/select_parallel.sql @@ -15,9 +15,38 @@ set parallel_tuple_cost=0; set min_parallel_table_scan_size=0; set max_parallel_workers_per_gather=4; +-- Parallel Append with partial-subplans explain (costs off) - select count(*) from a_star; -select count(*) from a_star; + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; + +-- Parallel Append with both partial and non-partial subplans +alter table c_star set (parallel_workers = 0); +alter table d_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; + +-- Parallel Append with only non-partial subplans +alter table a_star set (parallel_workers = 0); +alter table b_star set (parallel_workers = 0); +alter table e_star set (parallel_workers = 0); +alter table f_star set (parallel_workers = 0); +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; + +-- Disable Parallel Append +alter table a_star reset (parallel_workers); +alter table b_star reset (parallel_workers); +alter table c_star reset (parallel_workers); +alter table d_star reset (parallel_workers); +alter table e_star reset (parallel_workers); +alter table f_star reset (parallel_workers); +set enable_parallel_append to off; +explain (costs off) + select round(avg(aa)), sum(aa) from a_star; +select round(avg(aa)), sum(aa) from a_star; +reset enable_parallel_append; -- test with leader participation disabled set parallel_leader_participation = off; |
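
To make the coordination protocol of choose_next_subplan_for_worker() in nodeAppend.c easier to follow, here is a minimal standalone model of the same round-robin selection over shared state. It is not PostgreSQL code: a pthread mutex stands in for the LWLock tranche added above, the executor state is reduced to the two shared fields the algorithm needs (pa_next_plan and pa_finished[]), and all constants and names are invented for illustration.

/*
 * Simplified stand-alone model of worker-side subplan selection for a
 * Parallel Append.  Plans 0..FIRST_PARTIAL-1 are non-partial (claimed by
 * exactly one worker); the rest are partial and are revisited round-robin.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NPLANS 5
#define FIRST_PARTIAL 2              /* invented layout: 2 non-partial, 3 partial */
#define INVALID_SUBPLAN (-1)

static pthread_mutex_t pa_lock = PTHREAD_MUTEX_INITIALIZER;
static int  pa_next_plan = 0;
static bool pa_finished[NPLANS];

/* Returns the next subplan index for this worker, or -1 if none remain. */
static int
choose_next_subplan(int just_finished)
{
    int chosen;

    pthread_mutex_lock(&pa_lock);

    /* Mark the subplan this worker just completed, if any. */
    if (just_finished != INVALID_SUBPLAN)
        pa_finished[just_finished] = true;

    /* If every plan is already done, bail out immediately. */
    if (pa_next_plan == INVALID_SUBPLAN)
    {
        pthread_mutex_unlock(&pa_lock);
        return INVALID_SUBPLAN;
    }

    /* Advance past finished plans, wrapping back to the first partial plan. */
    while (pa_finished[pa_next_plan])
    {
        if (pa_next_plan < NPLANS - 1)
            pa_next_plan++;                      /* advance to next plan */
        else if (FIRST_PARTIAL < NPLANS)
            pa_next_plan = FIRST_PARTIAL;        /* loop back to partial plans */
        else
            pa_next_plan = just_finished;        /* no partial plans: bail out */

        if (pa_next_plan == just_finished)
        {
            /* We've tried everything. */
            pa_next_plan = INVALID_SUBPLAN;
            pthread_mutex_unlock(&pa_lock);
            return INVALID_SUBPLAN;
        }
    }

    /* Claim the plan and advance pa_next_plan once more, wrapping if needed. */
    chosen = pa_next_plan++;
    if (pa_next_plan >= NPLANS)
        pa_next_plan = FIRST_PARTIAL;

    /* A non-partial plan is executed by only one worker, so mark it finished. */
    if (chosen < FIRST_PARTIAL)
        pa_finished[chosen] = true;

    pthread_mutex_unlock(&pa_lock);
    return chosen;
}

int
main(void)
{
    int plan = INVALID_SUBPLAN;

    /* A single "worker" draining all subplans prints 0,1,2,3,4 and stops. */
    while ((plan = choose_next_subplan(plan)) != INVALID_SUBPLAN)
        printf("executing subplan %d\n", plan);
    return 0;
}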
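
The costing logic in append_nonpartial_cost() in costsize.c can also be seen in isolation. The sketch below re-implements the same greedy estimate over a plain array of doubles instead of Path nodes: non-partial subpaths, already sorted by descending cost, are dealt out one per worker, each later path is stacked on whichever worker currently has the smallest total, and the estimate is the largest per-worker total. The cost values in main() are invented purely for illustration.

/*
 * Stand-alone sketch of the greedy per-worker cost estimate for the
 * non-partial subpaths of a Parallel Append.  Assumes at most 64 workers.
 */
#include <stdio.h>

static double
nonpartial_cost(const double *costs, int numpaths, int parallel_workers)
{
    double  per_worker[64];
    int     arrlen = numpaths < parallel_workers ? numpaths : parallel_workers;
    int     i, max_idx = 0;

    if (numpaths == 0)
        return 0.0;

    /* The first few paths are each claimed by a different worker. */
    for (i = 0; i < arrlen; i++)
        per_worker[i] = costs[i];

    /* Each remaining path goes to whichever worker is currently cheapest. */
    for (i = arrlen; i < numpaths; i++)
    {
        int j, min_idx = 0;

        for (j = 1; j < arrlen; j++)
            if (per_worker[j] < per_worker[min_idx])
                min_idx = j;
        per_worker[min_idx] += costs[i];
    }

    /* The Append finishes when the most loaded worker finishes. */
    for (i = 1; i < arrlen; i++)
        if (per_worker[i] > per_worker[max_idx])
            max_idx = i;
    return per_worker[max_idx];
}

int
main(void)
{
    /* Costs already sorted descending, as create_append_path() arranges. */
    double costs[] = {20.0, 10.0, 5.0, 5.0};

    /* With 2 workers: one takes {20}, the other {10, 5, 5}; estimate is 20. */
    printf("estimated non-partial cost: %.1f\n", nonpartial_cost(costs, 4, 2));
    return 0;
}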
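
Finally, the worker-count heuristic described in add_paths_to_append_rel() in allpaths.c ("request at least log2(# of children) workers, capped by max_parallel_workers_per_gather") amounts to a couple of comparisons. The sketch below mirrors that arithmetic with a hand-rolled fls() so it compiles anywhere; the child count and GUC values in main() are invented for illustration and the function name is not a PostgreSQL API.

/* Sketch of the Parallel Append worker-count heuristic. */
#include <stdio.h>

/* "find last set": 1-based position of the most significant 1 bit; 0 for 0. */
static int
my_fls(int x)
{
    int pos = 0;

    while (x > 0)
    {
        pos++;
        x >>= 1;
    }
    return pos;
}

static int
append_parallel_workers(int max_per_subpath, int nchildren,
                        int max_parallel_workers_per_gather)
{
    int workers = max_per_subpath;

    /* Max(workers, fls(nchildren)): log-scale with the number of children. */
    if (workers < my_fls(nchildren))
        workers = my_fls(nchildren);

    /* Never exceed the per-Gather limit. */
    if (workers > max_parallel_workers_per_gather)
        workers = max_parallel_workers_per_gather;
    return workers;
}

int
main(void)
{
    /*
     * 16 partitions whose subpaths each ask for 2 workers, with a GUC limit
     * of 8: fls(16) = 5, so the Append requests 5 workers rather than 2.
     */
    printf("workers requested: %d\n", append_parallel_workers(2, 16, 8));
    return 0;
}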