diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 494 |
1 files changed, 458 insertions, 36 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index daf04385321..906d6e6abd4 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -25,6 +25,7 @@ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" #include "optimizer/cost.h" +#include "optimizer/clauses.h" #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" @@ -38,6 +39,7 @@ #include "utils/memutils.h" #include "utils/rel.h" #include "utils/sampling.h" +#include "utils/selfuncs.h" PG_MODULE_MAGIC; @@ -343,6 +345,10 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root, JoinPathExtraData *extra); static bool postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot); +static void postgresGetForeignUpperPaths(PlannerInfo *root, + UpperRelationKind stage, + RelOptInfo *input_rel, + RelOptInfo *output_rel); /* * Helper functions @@ -400,11 +406,15 @@ static void conversion_error_callback(void *arg); static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, JoinPathExtraData *extra); +static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel); static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel); static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel, Path *epq_path); +static void add_foreign_grouping_paths(PlannerInfo *root, + RelOptInfo *input_rel, + RelOptInfo *grouped_rel); /* @@ -455,6 +465,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for join push-down */ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + /* Support functions for upper relation push-down */ + routine->GetForeignUpperPaths = postgresGetForeignUpperPaths; + PG_RETURN_POINTER(routine); } @@ -1120,7 +1133,7 @@ postgresGetForeignPlan(PlannerInfo *root, * rel->baserestrictinfo + parameterization clauses through * scan_clauses. For a join rel->baserestrictinfo is NIL and we are * not considering parameterization right now, so there should be no - * scan_clauses for a joinrel. + * scan_clauses for a joinrel and upper rel either. */ Assert(!scan_clauses); } @@ -1170,7 +1183,8 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs = lappend(local_exprs, rinfo->clause); } - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) { /* For a join relation, get the conditions from fdw_private structure */ remote_conds = fpinfo->remote_conds; @@ -1191,6 +1205,13 @@ postgresGetForeignPlan(PlannerInfo *root, { ListCell *lc; + /* + * Right now, we only consider grouping and aggregation beyond + * joins. Queries involving aggregates or grouping do not require + * EPQ mechanism, hence should not have an outer plan here. + */ + Assert(foreignrel->reloptkind != RELOPT_UPPER_REL); + outer_plan->targetlist = fdw_scan_tlist; foreach(lc, local_exprs) @@ -1228,7 +1249,8 @@ postgresGetForeignPlan(PlannerInfo *root, remote_conds, retrieved_attrs, makeInteger(fpinfo->fetch_size)); - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_private = lappend(fdw_private, makeString(fpinfo->relation_name->data)); @@ -1280,8 +1302,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. In case of a join, use the lowest-numbered - * member RTE as a representative; we would get the same result from any. + * ExecCheckRTEPerms() does. In case of a join or aggregate, use the + * lowest-numbered member RTE as a representative; we would get the same + * result from any. */ if (fsplan->scan.scanrelid > 0) rtindex = fsplan->scan.scanrelid; @@ -2452,7 +2475,8 @@ postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) /* * estimate_path_cost_size * Get cost and size estimates for a foreign scan on given foreign relation - * either a base relation or a join between foreign relations. + * either a base relation or a join between foreign relations or an upper + * relation containing foreign relations. * * param_join_conds are the parameterization clauses with outer relations. * pathkeys specify the expected sort order if any for given path being costed. @@ -2505,7 +2529,8 @@ estimate_path_cost_size(PlannerInfo *root, &remote_param_join_conds, &local_param_join_conds); /* Build the list of columns to be fetched from the foreign server. */ - if (foreignrel->reloptkind == RELOPT_JOINREL) + if (foreignrel->reloptkind == RELOPT_JOINREL || + foreignrel->reloptkind == RELOPT_UPPER_REL) fdw_scan_tlist = build_tlist_to_deparse(foreignrel); else fdw_scan_tlist = NIL; @@ -2586,25 +2611,7 @@ estimate_path_cost_size(PlannerInfo *root, startup_cost = fpinfo->rel_startup_cost; run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost; } - else if (foreignrel->reloptkind != RELOPT_JOINREL) - { - /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ - retrieved_rows = Min(retrieved_rows, foreignrel->tuples); - - /* - * Cost as though this were a seqscan, which is pessimistic. We - * effectively imagine the local_conds are being evaluated - * remotely, too. - */ - startup_cost = 0; - run_cost = 0; - run_cost += seq_page_cost * foreignrel->pages; - - startup_cost += foreignrel->baserestrictcost.startup; - cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * foreignrel->tuples; - } - else + else if (foreignrel->reloptkind == RELOPT_JOINREL) { PgFdwRelationInfo *fpinfo_i; PgFdwRelationInfo *fpinfo_o; @@ -2670,6 +2677,99 @@ estimate_path_cost_size(PlannerInfo *root, run_cost += nrows * remote_conds_cost.per_tuple; run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; } + else if (foreignrel->reloptkind == RELOPT_UPPER_REL) + { + PgFdwRelationInfo *ofpinfo; + PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG]; + AggClauseCosts aggcosts; + double input_rows; + int numGroupCols; + double numGroups = 1; + + /* + * This cost model is mixture of costing done for sorted and + * hashed aggregates in cost_agg(). We are not sure which + * strategy will be considered at remote side, thus for + * simplicity, we put all startup related costs in startup_cost + * and all finalization and run cost are added in total_cost. + * + * Also, core does not care about costing HAVING expressions and + * adding that to the costs. So similarly, here too we are not + * considering remote and local conditions for costing. + */ + + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Get rows and width from input rel */ + input_rows = ofpinfo->rows; + width = ofpinfo->width; + + /* Collect statistics about aggregates for estimating costs. */ + MemSet(&aggcosts, 0, sizeof(AggClauseCosts)); + if (root->parse->hasAggs) + { + get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist, + AGGSPLIT_SIMPLE, &aggcosts); + get_agg_clause_costs(root, (Node *) root->parse->havingQual, + AGGSPLIT_SIMPLE, &aggcosts); + } + + /* Get number of grouping columns and possible number of groups */ + numGroupCols = list_length(root->parse->groupClause); + numGroups = estimate_num_groups(root, + get_sortgrouplist_exprs(root->parse->groupClause, + fpinfo->grouped_tlist), + input_rows, NULL); + + /* + * Number of rows expected from foreign server will be same as + * that of number of groups. + */ + rows = retrieved_rows = numGroups; + + /*----- + * Startup cost includes: + * 1. Startup cost for underneath input * relation + * 2. Cost of performing aggregation, per cost_agg() + * 3. Startup cost for PathTarget eval + *----- + */ + startup_cost = ofpinfo->rel_startup_cost; + startup_cost += aggcosts.transCost.startup; + startup_cost += aggcosts.transCost.per_tuple * input_rows; + startup_cost += (cpu_operator_cost * numGroupCols) * input_rows; + startup_cost += ptarget->cost.startup; + + /*----- + * Run time cost includes: + * 1. Run time cost of underneath input relation + * 2. Run time cost of performing aggregation, per cost_agg() + * 3. PathTarget eval cost for each output row + *----- + */ + run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost; + run_cost += aggcosts.finalCost * numGroups; + run_cost += cpu_tuple_cost * numGroups; + run_cost += ptarget->cost.per_tuple * numGroups; + } + else + { + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; + + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + } /* * Without remote estimates, we have no real way to estimate the cost @@ -4343,6 +4443,318 @@ postgresGetForeignJoinPaths(PlannerInfo *root, } /* + * Assess whether the aggregation, grouping and having operations can be pushed + * down to the foreign server. As a side effect, save information we obtain in + * this function to PgFdwRelationInfo of the input relation. + */ +static bool +foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel) +{ + Query *query = root->parse; + PathTarget *grouping_target; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private; + PgFdwRelationInfo *ofpinfo; + List *aggvars; + ListCell *lc; + int i; + List *tlist = NIL; + + /* Grouping Sets are not pushable */ + if (query->groupingSets) + return false; + + /* Get the fpinfo of the underlying scan relation. */ + ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* + * If underneath input relation has any local conditions, those conditions + * are required to be applied before performing aggregation. Hence the + * aggregate cannot be pushed down. + */ + if (ofpinfo->local_conds) + return false; + + /* + * The targetlist expected from this node and the targetlist pushed down + * to the foreign server may be different. The latter requires + * sortgrouprefs to be set to push down GROUP BY clause, but should not + * have those arising from ORDER BY clause. These sortgrouprefs may be + * different from those in the plan's targetlist. Use a copy of path + * target to record the new sortgrouprefs. + */ + grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]); + + /* + * Evaluate grouping targets and check whether they are safe to push down + * to the foreign side. All GROUP BY expressions will be part of the + * grouping target and thus there is no need to evaluate it separately. + * While doing so, add required expressions into target list which can + * then be used to pass to foreign server. + */ + i = 0; + foreach(lc, grouping_target->exprs) + { + Expr *expr = (Expr *) lfirst(lc); + Index sgref = get_pathtarget_sortgroupref(grouping_target, i); + ListCell *l; + + /* Check whether this expression is part of GROUP BY clause */ + if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause)) + { + /* + * If any of the GROUP BY expression is not shippable we can not + * push down aggregation to the foreign server. + */ + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* Check entire expression whether it is pushable or not */ + if (is_foreign_expr(root, grouped_rel, expr)) + { + /* Pushable, add to tlist */ + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + else + { + /* + * If we have sortgroupref set, then it means that we have an + * ORDER BY entry pointing to this expression. Since we are + * not pushing ORDER BY with GROUP BY, clear it. + */ + if (sgref) + grouping_target->sortgrouprefs[i] = 0; + + /* Not matched exactly, pull the var with aggregates then */ + aggvars = pull_var_clause((Node *) expr, + PVC_INCLUDE_AGGREGATES); + + if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars)) + return false; + + /* + * Add aggregates, if any, into the targetlist. Plain var + * nodes should be either same as some GROUP BY expression or + * part of some GROUP BY expression. In later case, the query + * cannot refer plain var nodes without the surrounding + * expression. In both the cases, they are already part of + * the targetlist and thus no need to add them again. In fact + * adding pulled plain var nodes in SELECT clause will cause + * an error on the foreign server if they are not same as some + * GROUP BY expression. + */ + foreach(l, aggvars) + { + Expr *expr = (Expr *) lfirst(l); + + if (IsA(expr, Aggref)) + tlist = add_to_flat_tlist(tlist, list_make1(expr)); + } + } + } + + i++; + } + + /* + * Classify the pushable and non-pushable having clauses and save them in + * remote_conds and local_conds of the grouped rel's fpinfo. + */ + if (root->hasHavingQual && query->havingQual) + { + ListCell *lc; + + foreach(lc, (List *) query->havingQual) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, grouped_rel, expr)) + fpinfo->local_conds = lappend(fpinfo->local_conds, expr); + else + fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr); + } + } + + /* + * If there are any local conditions, pull Vars and aggregates from it and + * check whether they are safe to pushdown or not. + */ + if (fpinfo->local_conds) + { + ListCell *lc; + List *aggvars = pull_var_clause((Node *) fpinfo->local_conds, + PVC_INCLUDE_AGGREGATES); + + foreach(lc, aggvars) + { + Expr *expr = (Expr *) lfirst(lc); + + /* + * If aggregates within local conditions are not safe to push + * down, then we cannot push down the query. Vars are already + * part of GROUP BY clause which are checked above, so no need to + * access them again here. + */ + if (IsA(expr, Aggref)) + { + if (!is_foreign_expr(root, grouped_rel, expr)) + return false; + + tlist = add_to_flat_tlist(tlist, aggvars); + } + } + } + + /* Transfer any sortgroupref data to the replacement tlist */ + apply_pathtarget_labeling_to_tlist(tlist, grouping_target); + + /* Store generated targetlist */ + fpinfo->grouped_tlist = tlist; + + /* Safe to pushdown */ + fpinfo->pushdown_safe = true; + + /* + * If user is willing to estimate cost for a scan using EXPLAIN, he + * intends to estimate scans on that relation more accurately. Then, it + * makes sense to estimate the cost of the grouping on that relation more + * accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate; + + /* Copy startup and tuple cost as is from underneath input rel's fpinfo */ + fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost; + fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost; + + /* + * Set cached relation costs to some negative value, so that we can detect + * when they are set to some sensible costs, during one (usually the + * first) of the calls to estimate_path_cost_size(). + */ + fpinfo->rel_startup_cost = -1; + fpinfo->rel_total_cost = -1; + + /* Set fetch size same as that of underneath input rel's fpinfo */ + fpinfo->fetch_size = ofpinfo->fetch_size; + + /* + * Set the string describing this grouped relation to be used in EXPLAIN + * output of corresponding ForeignScan. + */ + fpinfo->relation_name = makeStringInfo(); + appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)", + ofpinfo->relation_name->data); + + return true; +} + +/* + * postgresGetForeignUpperPaths + * Add paths for post-join operations like aggregation, grouping etc. if + * corresponding operations are safe to push down. + * + * Right now, we only support aggregate, grouping and having clause pushdown. + */ +static void +postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage, + RelOptInfo *input_rel, RelOptInfo *output_rel) +{ + PgFdwRelationInfo *fpinfo; + + /* + * If input rel is not safe to pushdown, then simply return as we cannot + * perform any post-join operations on the foreign server. + */ + if (!input_rel->fdw_private || + !((PgFdwRelationInfo *) input_rel->fdw_private)->pushdown_safe) + return; + + /* Ignore stages we don't support; and skip any duplicate calls. */ + if (stage != UPPERREL_GROUP_AGG || output_rel->fdw_private) + return; + + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + output_rel->fdw_private = fpinfo; + + add_foreign_grouping_paths(root, input_rel, output_rel); +} + +/* + * add_foreign_grouping_paths + * Add foreign path for grouping and/or aggregation. + * + * Given input_rel represents the underlying scan. The paths are added to the + * given grouped_rel. + */ +static void +add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel, + RelOptInfo *grouped_rel) +{ + Query *parse = root->parse; + PgFdwRelationInfo *ifpinfo = input_rel->fdw_private; + PgFdwRelationInfo *fpinfo = grouped_rel->fdw_private; + ForeignPath *grouppath; + PathTarget *grouping_target; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + + /* Nothing to be done, if there is no grouping or aggregation required. */ + if (!parse->groupClause && !parse->groupingSets && !parse->hasAggs && + !root->hasHavingQual) + return; + + grouping_target = root->upper_targets[UPPERREL_GROUP_AGG]; + + /* save the input_rel as outerrel in fpinfo */ + fpinfo->outerrel = input_rel; + + /* + * Copy foreign table, foreign server, user mapping, shippable extensions + * etc. details from the input relation's fpinfo. + */ + fpinfo->table = ifpinfo->table; + fpinfo->server = ifpinfo->server; + fpinfo->user = ifpinfo->user; + fpinfo->shippable_extensions = ifpinfo->shippable_extensions; + + /* Assess if it is safe to push down aggregation and grouping. */ + if (!foreign_grouping_ok(root, grouped_rel)) + return; + + /* Estimate the cost of push down */ + estimate_path_cost_size(root, grouped_rel, NIL, NIL, &rows, + &width, &startup_cost, &total_cost); + + /* Now update this information in the fpinfo */ + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* Create and add foreign path to the grouping relation. */ + grouppath = create_foreignscan_path(root, + grouped_rel, + grouping_target, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, /* no required_outer */ + NULL, + NULL); /* no fdw_private */ + + /* Add generated path into grouped_rel by add_path(). */ + add_path(grouped_rel, (Path *) grouppath); +} + +/* * Create a tuple from the specified row of the PGresult. * * rel is the local representation of the foreign table, attinmeta is @@ -4549,24 +4961,34 @@ conversion_error_callback(void *arg) ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan; EState *estate = fsstate->ss.ps.state; TargetEntry *tle; - Var *var; - RangeTblEntry *rte; Assert(IsA(fsplan, ForeignScan)); tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist, errpos->cur_attno - 1); Assert(IsA(tle, TargetEntry)); - var = (Var *) tle->expr; - Assert(IsA(var, Var)); - rte = rt_fetch(var->varno, estate->es_range_table); + /* + * Target list can have Vars and expressions. For Vars, we can get + * it's relation, however for expressions we can't. Thus for + * expressions, just show generic context message. + */ + if (IsA(tle->expr, Var)) + { + RangeTblEntry *rte; + Var *var = (Var *) tle->expr; - if (var->varattno == 0) - is_wholerow = true; - else - attname = get_relid_attribute_name(rte->relid, var->varattno); + rte = rt_fetch(var->varno, estate->es_range_table); + + if (var->varattno == 0) + is_wholerow = true; + else + attname = get_relid_attribute_name(rte->relid, var->varattno); - relname = get_rel_name(rte->relid); + relname = get_rel_name(rte->relid); + } + else + errcontext("processing expression at position %d in select list", + errpos->cur_attno); } if (relname) |