diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-10-21 09:54:29 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-10-21 09:54:29 -0400 |
commit | 7012b132d07c2b4ea15b0b3cb1ea9f3278801d98 (patch) | |
tree | d0a15aedea339d5b74ec63768ff5b2db0abd0c2d /contrib/postgres_fdw/postgres_fdw.c | |
parent | 709e461befa8a4999c4ccdbfc7260ef8092e805c (diff) | |
download | postgresql-7012b132d07c2b4ea15b0b3cb1ea9f3278801d98.tar.gz postgresql-7012b132d07c2b4ea15b0b3cb1ea9f3278801d98.zip |
postgres_fdw: Push down aggregates to remote servers.
Now that the upper planner uses paths, and now that we have proper hooks
to inject paths into the upper planning process, it's possible for
foreign data wrappers to arrange to push aggregates to the remote side
instead of fetching all of the rows and aggregating them locally. This
figures to be a massive win for performance, so teach postgres_fdw to
do it.
Jeevan Chalke and Ashutosh Bapat. Reviewed by Ashutosh Bapat with
additional testing by Prabhat Sahu. Various mostly cosmetic changes
by me.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 494 |
1 files changed, 458 insertions, 36 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index daf04385321..906d6e6abd4 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -25,6 +25,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" +#include "optimizer/clauses.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" @@ -38,6 +39,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/sampling.h" +#include "utils/selfuncs.h" PG_MODULE_MAGIC; @@ -343,6 +345,10 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root, JoinPathExtraData *extra); static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot); +static void postgresGetForeignUpperPaths(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel); /* * Helper functions @@ -400,11 +406,15 @@ static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra); +static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel); static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel); static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path); +static void add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel); /* @@ -455,6 +465,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + /* Support functions for upper relation push-down */ + routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + PG_RETURN_POINTER(routine); } @@ -1120,7 +1133,7 @@ postgresGetForeignPlan(PlannerInfo *root, * rel->baserestrictinfo + parameterization clauses through * scan_clauses. For a join rel->baserestrictinfo is NIL and we are * not considering parameterization right now, so there should be no - * scan_clauses for a joinrel. + * scan_clauses for a joinrel and upper rel either. */ Assert(!scan_clauses); } @@ -1170,7 +1183,8 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs = lappend(local_exprs, rinfo->clause); } - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) { /* For a join relation, get the conditions from fdw_private structure */ remote_conds = fpinfo->remote_conds; @@ -1191,6 +1205,13 @@ postgresGetForeignPlan(PlannerInfo *root, { ListCell *lc; + /* + * Right now, we only consider grouping and aggregation beyond + * joins. Queries involving aggregates or grouping do not require + * EPQ mechanism, hence should not have an outer plan here. + */ + Assert(foreignrel->reloptkind != RELOPT_UPPER_REL); + outer_plan->targetlist = fdw_scan_tlist; foreach(lc, local_exprs) @@ -1228,7 +1249,8 @@ postgresGetForeignPlan(PlannerInfo *root, remote_conds, retrieved_attrs, makeInteger(fpinfo->fetch_size)); - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data)); @@ -1280,8 +1302,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. In case of a join, use the lowest-numbered - * member RTE as a representative; we would get the same result from any. + * ExecCheckRTEPerms() does. In case of a join or aggregate, use the + * lowest-numbered member RTE as a representative; we would get the same + * result from any. */ if (fsplan->scan.scanrelid > 0) rtindex = fsplan->scan.scanrelid; @@ -2452,7 +2475,8 @@ postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) /* * estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation - * either a base relation or a join between foreign relations. + * either a base relation or a join between foreign relations or an upper + * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. @@ -2505,7 +2529,8 @@ estimate_path_cost_size(PlannerInfo *root, &remote_param_join_conds, &local_param_join_conds); /* Build the list of columns to be fetched from the foreign server. */ - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_scan_tlist = build_tlist_to_deparse(foreignrel); else fdw_scan_tlist = NIL; @@ -2586,25 +2611,7 @@ estimate_path_cost_size(PlannerInfo *root, startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; } - else if (foreignrel->reloptkind != RELOPT_JOINREL) - { - /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ - retrieved_rows = Min(retrieved_rows, foreignrel->tuples); - - /* - * Cost as though this were a seqscan, which is pessimistic. We - * effectively imagine the local_conds are being evaluated - * remotely, too. - */ - startup_cost = 0; - run_cost = 0; - run_cost += seq_page_cost * foreignrel->pages; - - startup_cost += foreignrel->baserestrictcost.startup; - cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * foreignrel->tuples; - } - else + else if (foreignrel->reloptkind == RELOPT_JOINREL) { PgFdwRelationInfo *fpinfo_i; PgFdwRelationInfo *fpinfo_o; @@ -2670,6 +2677,99 @@ estimate_path_cost_size(PlannerInfo *root, run_cost += nrows * remote_conds_cost.per_tuple; run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } + else if (foreignrel->reloptkind == RELOPT_UPPER_REL) + { + PgFdwRelationInfo *ofpinfo; + PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG]; + AggClauseCosts aggcosts; + double input_rows; + int numGroupCols; + double numGroups = 1; + + /* + * This cost model is mixture of costing done for sorted and + * hashed aggregates in cost_agg(). We are not sure which + * strategy will be considered at remote side, thus for + * simplicity, we put all startup related costs in startup_cost + * and all finalization and run cost are added in total_cost. + * + * Also, core does not care about costing HAVING expressions and + * adding that to the costs. So similarly, here too we are not + * considering remote and local conditions for costing. + */ + + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Get rows and width from input rel */ + input_rows = ofpinfo->rows; + width = ofpinfo->width; + + /* Collect statistics about aggregates for estimating costs. */ + MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); + if (root->parse->hasAggs) + { + get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist, + AGGSPLIT_SIMPLE, &aggcosts); + get_agg_clause_costs(root, (Node *) root->parse->havingQual, + AGGSPLIT_SIMPLE, &aggcosts); + } + + /* Get number of grouping columns and possible number of groups */ + numGroupCols = list_length(root->parse->groupClause); + numGroups = estimate_num_groups(root, + get_sortgrouplist_exprs(root->parse->groupClause, + fpinfo->grouped_tlist), + input_rows, NULL); + + /* + * Number of rows expected from foreign server will be same as + * that of number of groups. + */ + rows = retrieved_rows = numGroups; + + /*----- + * Startup cost includes: + * 1. Startup cost for underneath input * relation + * 2. Cost of performing aggregation, per cost_agg() + * 3. Startup cost for PathTarget eval + *----- + */ + startup_cost = ofpinfo->rel_startup_cost; + startup_cost += aggcosts.transCost.startup; + startup_cost += aggcosts.transCost.per_tuple * input_rows; + startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; + startup_cost += ptarget->cost.startup; + + /*----- + * Run time cost includes: + * 1. Run time cost of underneath input relation + * 2. Run time cost of performing aggregation, per cost_agg() + * 3. PathTarget eval cost for each output row + *----- + */ + run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; + run_cost += aggcosts.finalCost * numGroups; + run_cost += cpu_tuple_cost * numGroups; + run_cost += ptarget->cost.per_tuple * numGroups; + } + else + { + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; + + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + } /* * Without remote estimates, we have no real way to estimate the cost @@ -4343,6 +4443,318 @@ postgresGetForeignJoinPaths(PlannerInfo *root, } /* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to PgFdwRelationInfo of the input relation. + */ +static bool +foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel) +{ + Query *query = root->parse; + PathTarget *grouping_target; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; + PgFdwRelationInfo *ofpinfo; + List *aggvars; + ListCell *lc; + int i; + List *tlist = NIL; + + /* Grouping Sets are not pushable */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* + * If underneath input relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * The targetlist expected from this node and the targetlist pushed down + * to the foreign server may be different. The latter requires + * sortgrouprefs to be set to push down GROUP BY clause, but should not + * have those arising from ORDER BY clause. These sortgrouprefs may be + * different from those in the plan's targetlist. Use a copy of path + * target to record the new sortgrouprefs. + */ + grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]); + + /* + * Evaluate grouping targets and check whether they are safe to push down + * to the foreign side. All GROUP BY expressions will be part of the + * grouping target and thus there is no need to evaluate it separately. + * While doing so, add required expressions into target list which can + * then be used to pass to foreign server. + */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + /* + * If any of the GROUP BY expression is not shippable we can not + * push down aggregation to the foreign server. + */ + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Check entire expression whether it is pushable or not */ + if (is_foreign_expr(root, grouped_rel, expr)) + { + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* + * If we have sortgroupref set, then it means that we have an + * ORDER BY entry pointing to this expression. Since we are + * not pushing ORDER BY with GROUP BY, clear it. + */ + if (sgref) + grouping_target->sortgrouprefs[i] = 0; + + /* Not matched exactly, pull the var with aggregates then */ + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain var + * nodes should be either same as some GROUP BY expression or + * part of some GROUP BY expression. In later case, the query + * cannot refer plain var nodes without the surrounding + * expression. In both the cases, they are already part of + * the targetlist and thus no need to add them again. In fact + * adding pulled plain var nodes in SELECT clause will cause + * an error on the foreign server if they are not same as some + * GROUP BY expression. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * Classify the pushable and non-pushable having clauses and save them in + * remote_conds and local_conds of the grouped rel's fpinfo. + */ + if (root->hasHavingQual && query->havingQual) + { + ListCell *lc; + + foreach(lc, (List *) query->havingQual) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, grouped_rel, expr)) + fpinfo->local_conds = lappend(fpinfo->local_conds, expr); + else + fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr); + } + } + + /* + * If there are any local conditions, pull Vars and aggregates from it and + * check whether they are safe to pushdown or not. + */ + if (fpinfo->local_conds) + { + ListCell *lc; + List *aggvars = pull_var_clause((Node *) fpinfo->local_conds, + PVC_INCLUDE_AGGREGATES); + + foreach(lc, aggvars) + { + Expr *expr = (Expr *) lfirst(lc); + + /* + * If aggregates within local conditions are not safe to push + * down, then we cannot push down the query. Vars are already + * part of GROUP BY clause which are checked above, so no need to + * access them again here. + */ + if (IsA(expr, Aggref)) + { + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + tlist = add_to_flat_tlist(tlist, aggvars); + } + } + } + + /* Transfer any sortgroupref data to the replacement tlist */ + apply_pathtarget_labeling_to_tlist(tlist, grouping_target); + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * If user is willing to estimate cost for a scan using EXPLAIN, he + * intends to estimate scans on that relation more accurately. Then, it + * makes sense to estimate the cost of the grouping on that relation more + * accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate; + + /* Copy startup and tuple cost as is from underneath input rel's fpinfo */ + fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost; + fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost; + + /* + * Set cached relation costs to some negative value, so that we can detect + * when they are set to some sensible costs, during one (usually the + * first) of the calls to estimate_path_cost_size(). + */ + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* Set fetch size same as that of underneath input rel's fpinfo */ + fpinfo->fetch_size = ofpinfo->fetch_size; + + /* + * Set the string describing this grouped relation to be used in EXPLAIN + * output of corresponding ForeignScan. + */ + fpinfo->relation_name = makeStringInfo(); + appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)", + ofpinfo->relation_name->data); + + return true; +} + +/* + * postgresGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + * + * Right now, we only support aggregate, grouping and having clause pushdown. + */ +static void +postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel) +{ + PgFdwRelationInfo *fpinfo; + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. + */ + if (!input_rel->fdw_private || + !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) + return; + + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + output_rel->fdw_private = fpinfo; + + add_foreign_grouping_paths(root, input_rel, output_rel); +} + +/* + * add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + PathTarget *grouping_target; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + grouping_target = root->upper_targets[UPPERREL_GROUP_AGG]; + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, shippable extensions + * etc. details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + fpinfo->shippable_extensions = ifpinfo->shippable_extensions; + + /* Assess if it is safe to push down aggregation and grouping. */ + if (!foreign_grouping_ok(root, grouped_rel)) + return; + + /* Estimate the cost of push down */ + estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows, + &width, &startup_cost, &total_cost); + + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreignscan_path(root, + grouped_rel, + grouping_target, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, /* no required_outer */ + NULL, + NULL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). */ + add_path(grouped_rel, (Path *) grouppath); +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is @@ -4549,24 +4961,34 @@ conversion_error_callback(void *arg) ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan; EState *estate = fsstate->ss.ps.state; TargetEntry *tle; - Var *var; - RangeTblEntry *rte; Assert(IsA(fsplan, ForeignScan)); tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist, errpos->cur_attno - 1); Assert(IsA(tle, TargetEntry)); - var = (Var *) tle->expr; - Assert(IsA(var, Var)); - rte = rt_fetch(var->varno, estate->es_range_table); + /* + * Target list can have Vars and expressions. For Vars, we can get + * it's relation, however for expressions we can't. Thus for + * expressions, just show generic context message. + */ + if (IsA(tle->expr, Var)) + { + RangeTblEntry *rte; + Var *var = (Var *) tle->expr; - if (var->varattno == 0) - is_wholerow = true; - else - attname = get_relid_attribute_name(rte->relid, var->varattno); + rte = rt_fetch(var->varno, estate->es_range_table); + + if (var->varattno == 0) + is_wholerow = true; + else + attname = get_relid_attribute_name(rte->relid, var->varattno); - relname = get_rel_name(rte->relid); + relname = get_rel_name(rte->relid); + } + else + errcontext("processing expression at position %d in select list", + errpos->cur_attno); } if (relname) |