aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c494
1 files changed, 458 insertions, 36 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index daf04385321..906d6e6abd4 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -25,6 +25,7 @@
#include "nodes/makefuncs.h"
#include "nodes/nodeFuncs.h"
#include "optimizer/cost.h"
+#include "optimizer/clauses.h"
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
@@ -38,6 +39,7 @@
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/sampling.h"
+#include "utils/selfuncs.h"
PG_MODULE_MAGIC;
@@ -343,6 +345,10 @@ static void postgresGetForeignJoinPaths(PlannerInfo *root,
JoinPathExtraData *extra);
static bool postgresRecheckForeignScan(ForeignScanState *node,
TupleTableSlot *slot);
+static void postgresGetForeignUpperPaths(PlannerInfo *root,
+ UpperRelationKind stage,
+ RelOptInfo *input_rel,
+ RelOptInfo *output_rel);
/*
* Helper functions
@@ -400,11 +406,15 @@ static void conversion_error_callback(void *arg);
static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
JoinPathExtraData *extra);
+static bool foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
static void add_paths_with_pathkeys_for_rel(PlannerInfo *root, RelOptInfo *rel,
Path *epq_path);
+static void add_foreign_grouping_paths(PlannerInfo *root,
+ RelOptInfo *input_rel,
+ RelOptInfo *grouped_rel);
/*
@@ -455,6 +465,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for join push-down */
routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
+ /* Support functions for upper relation push-down */
+ routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+
PG_RETURN_POINTER(routine);
}
@@ -1120,7 +1133,7 @@ postgresGetForeignPlan(PlannerInfo *root,
* rel->baserestrictinfo + parameterization clauses through
* scan_clauses. For a join rel->baserestrictinfo is NIL and we are
* not considering parameterization right now, so there should be no
- * scan_clauses for a joinrel.
+ * scan_clauses for a joinrel and upper rel either.
*/
Assert(!scan_clauses);
}
@@ -1170,7 +1183,8 @@ postgresGetForeignPlan(PlannerInfo *root,
local_exprs = lappend(local_exprs, rinfo->clause);
}
- if (foreignrel->reloptkind == RELOPT_JOINREL)
+ if (foreignrel->reloptkind == RELOPT_JOINREL ||
+ foreignrel->reloptkind == RELOPT_UPPER_REL)
{
/* For a join relation, get the conditions from fdw_private structure */
remote_conds = fpinfo->remote_conds;
@@ -1191,6 +1205,13 @@ postgresGetForeignPlan(PlannerInfo *root,
{
ListCell *lc;
+ /*
+ * Right now, we only consider grouping and aggregation beyond
+ * joins. Queries involving aggregates or grouping do not require
+ * EPQ mechanism, hence should not have an outer plan here.
+ */
+ Assert(foreignrel->reloptkind != RELOPT_UPPER_REL);
+
outer_plan->targetlist = fdw_scan_tlist;
foreach(lc, local_exprs)
@@ -1228,7 +1249,8 @@ postgresGetForeignPlan(PlannerInfo *root,
remote_conds,
retrieved_attrs,
makeInteger(fpinfo->fetch_size));
- if (foreignrel->reloptkind == RELOPT_JOINREL)
+ if (foreignrel->reloptkind == RELOPT_JOINREL ||
+ foreignrel->reloptkind == RELOPT_UPPER_REL)
fdw_private = lappend(fdw_private,
makeString(fpinfo->relation_name->data));
@@ -1280,8 +1302,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/*
* Identify which user to do the remote access as. This should match what
- * ExecCheckRTEPerms() does. In case of a join, use the lowest-numbered
- * member RTE as a representative; we would get the same result from any.
+ * ExecCheckRTEPerms() does. In case of a join or aggregate, use the
+ * lowest-numbered member RTE as a representative; we would get the same
+ * result from any.
*/
if (fsplan->scan.scanrelid > 0)
rtindex = fsplan->scan.scanrelid;
@@ -2452,7 +2475,8 @@ postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
/*
* estimate_path_cost_size
* Get cost and size estimates for a foreign scan on given foreign relation
- * either a base relation or a join between foreign relations.
+ * either a base relation or a join between foreign relations or an upper
+ * relation containing foreign relations.
*
* param_join_conds are the parameterization clauses with outer relations.
* pathkeys specify the expected sort order if any for given path being costed.
@@ -2505,7 +2529,8 @@ estimate_path_cost_size(PlannerInfo *root,
&remote_param_join_conds, &local_param_join_conds);
/* Build the list of columns to be fetched from the foreign server. */
- if (foreignrel->reloptkind == RELOPT_JOINREL)
+ if (foreignrel->reloptkind == RELOPT_JOINREL ||
+ foreignrel->reloptkind == RELOPT_UPPER_REL)
fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
else
fdw_scan_tlist = NIL;
@@ -2586,25 +2611,7 @@ estimate_path_cost_size(PlannerInfo *root,
startup_cost = fpinfo->rel_startup_cost;
run_cost = fpinfo->rel_total_cost - fpinfo->rel_startup_cost;
}
- else if (foreignrel->reloptkind != RELOPT_JOINREL)
- {
- /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
- retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
-
- /*
- * Cost as though this were a seqscan, which is pessimistic. We
- * effectively imagine the local_conds are being evaluated
- * remotely, too.
- */
- startup_cost = 0;
- run_cost = 0;
- run_cost += seq_page_cost * foreignrel->pages;
-
- startup_cost += foreignrel->baserestrictcost.startup;
- cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
- run_cost += cpu_per_tuple * foreignrel->tuples;
- }
- else
+ else if (foreignrel->reloptkind == RELOPT_JOINREL)
{
PgFdwRelationInfo *fpinfo_i;
PgFdwRelationInfo *fpinfo_o;
@@ -2670,6 +2677,99 @@ estimate_path_cost_size(PlannerInfo *root,
run_cost += nrows * remote_conds_cost.per_tuple;
run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
}
+ else if (foreignrel->reloptkind == RELOPT_UPPER_REL)
+ {
+ PgFdwRelationInfo *ofpinfo;
+ PathTarget *ptarget = root->upper_targets[UPPERREL_GROUP_AGG];
+ AggClauseCosts aggcosts;
+ double input_rows;
+ int numGroupCols;
+ double numGroups = 1;
+
+ /*
+ * This cost model is a mixture of costing done for sorted and
+ * hashed aggregates in cost_agg(). We are not sure which
+ * strategy will be considered at remote side, thus for
+ * simplicity, we put all startup related costs in startup_cost
+ * and all finalization and run cost are added in total_cost.
+ *
+ * Also, core does not care about costing HAVING expressions and
+ * adding that to the costs. So similarly, here too we are not
+ * considering remote and local conditions for costing.
+ */
+
+ ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
+
+ /* Get rows and width from input rel */
+ input_rows = ofpinfo->rows;
+ width = ofpinfo->width;
+
+ /* Collect statistics about aggregates for estimating costs. */
+ MemSet(&aggcosts, 0, sizeof(AggClauseCosts));
+ if (root->parse->hasAggs)
+ {
+ get_agg_clause_costs(root, (Node *) fpinfo->grouped_tlist,
+ AGGSPLIT_SIMPLE, &aggcosts);
+ get_agg_clause_costs(root, (Node *) root->parse->havingQual,
+ AGGSPLIT_SIMPLE, &aggcosts);
+ }
+
+ /* Get number of grouping columns and possible number of groups */
+ numGroupCols = list_length(root->parse->groupClause);
+ numGroups = estimate_num_groups(root,
+ get_sortgrouplist_exprs(root->parse->groupClause,
+ fpinfo->grouped_tlist),
+ input_rows, NULL);
+
+ /*
+ * Number of rows expected from foreign server will be same as
+ * that of number of groups.
+ */
+ rows = retrieved_rows = numGroups;
+
+ /*-----
+ * Startup cost includes:
+ * 1. Startup cost for underneath input relation
+ * 2. Cost of performing aggregation, per cost_agg()
+ * 3. Startup cost for PathTarget eval
+ *-----
+ */
+ startup_cost = ofpinfo->rel_startup_cost;
+ startup_cost += aggcosts.transCost.startup;
+ startup_cost += aggcosts.transCost.per_tuple * input_rows;
+ startup_cost += (cpu_operator_cost * numGroupCols) * input_rows;
+ startup_cost += ptarget->cost.startup;
+
+ /*-----
+ * Run time cost includes:
+ * 1. Run time cost of underneath input relation
+ * 2. Run time cost of performing aggregation, per cost_agg()
+ * 3. PathTarget eval cost for each output row
+ *-----
+ */
+ run_cost = ofpinfo->rel_total_cost - ofpinfo->rel_startup_cost;
+ run_cost += aggcosts.finalCost * numGroups;
+ run_cost += cpu_tuple_cost * numGroups;
+ run_cost += ptarget->cost.per_tuple * numGroups;
+ }
+ else
+ {
+ /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
+ retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
+
+ /*
+ * Cost as though this were a seqscan, which is pessimistic. We
+ * effectively imagine the local_conds are being evaluated
+ * remotely, too.
+ */
+ startup_cost = 0;
+ run_cost = 0;
+ run_cost += seq_page_cost * foreignrel->pages;
+
+ startup_cost += foreignrel->baserestrictcost.startup;
+ cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
+ run_cost += cpu_per_tuple * foreignrel->tuples;
+ }
/*
* Without remote estimates, we have no real way to estimate the cost
@@ -4343,6 +4443,318 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
}
/*
+ * Assess whether the aggregation, grouping and having operations can be pushed
+ * down to the foreign server.  As a side effect, save information we obtain in
+ * this function to the PgFdwRelationInfo of the grouped relation
+ * (grouped_rel->fdw_private): the classified HAVING clauses (remote_conds and
+ * local_conds), the targetlist to be fetched (grouped_tlist), the
+ * pushdown_safe flag, cost settings, fetch_size and relation_name.
+ *
+ * The caller must already have set fpinfo->outerrel to the input relation,
+ * since its fpinfo is consulted here.
+ *
+ * Returns true if grouping/aggregation can be pushed down, false otherwise.
+ * On a false return the grouped relation's fpinfo may have been partially
+ * updated (e.g. local_conds/remote_conds), but pushdown_safe is never set.
+ */
+static bool
+foreign_grouping_ok(PlannerInfo *root, RelOptInfo *grouped_rel)
+{
+    Query      *query = root->parse;
+    PathTarget *grouping_target;
+    PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) grouped_rel->fdw_private;
+    PgFdwRelationInfo *ofpinfo;
+    List       *aggvars;
+    ListCell   *lc;
+    int         i;
+    List       *tlist = NIL;
+
+    /* Grouping Sets are not pushable */
+    if (query->groupingSets)
+        return false;
+
+    /* Get the fpinfo of the underlying scan relation. */
+    ofpinfo = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
+
+    /*
+     * If underneath input relation has any local conditions, those conditions
+     * are required to be applied before performing aggregation.  Hence the
+     * aggregate cannot be pushed down.
+     */
+    if (ofpinfo->local_conds)
+        return false;
+
+    /*
+     * The targetlist expected from this node and the targetlist pushed down
+     * to the foreign server may be different.  The latter requires
+     * sortgrouprefs to be set to push down GROUP BY clause, but should not
+     * have those arising from ORDER BY clause.  These sortgrouprefs may be
+     * different from those in the plan's targetlist.  Use a copy of path
+     * target to record the new sortgrouprefs.
+     */
+    grouping_target = copy_pathtarget(root->upper_targets[UPPERREL_GROUP_AGG]);
+
+    /*
+     * Evaluate grouping targets and check whether they are safe to push down
+     * to the foreign side.  All GROUP BY expressions will be part of the
+     * grouping target and thus there is no need to evaluate it separately.
+     * While doing so, add required expressions into target list which can
+     * then be used to pass to foreign server.
+     */
+    i = 0;
+    foreach(lc, grouping_target->exprs)
+    {
+        Expr       *expr = (Expr *) lfirst(lc);
+        Index       sgref = get_pathtarget_sortgroupref(grouping_target, i);
+        ListCell   *l;
+
+        /* Check whether this expression is part of GROUP BY clause */
+        if (sgref && get_sortgroupref_clause_noerr(sgref, query->groupClause))
+        {
+            /*
+             * If any of the GROUP BY expression is not shippable we can not
+             * push down aggregation to the foreign server.
+             */
+            if (!is_foreign_expr(root, grouped_rel, expr))
+                return false;
+
+            /* Pushable, add to tlist */
+            tlist = add_to_flat_tlist(tlist, list_make1(expr));
+        }
+        else
+        {
+            /* Check entire expression whether it is pushable or not */
+            if (is_foreign_expr(root, grouped_rel, expr))
+            {
+                /* Pushable, add to tlist */
+                tlist = add_to_flat_tlist(tlist, list_make1(expr));
+            }
+            else
+            {
+                /*
+                 * If we have sortgroupref set, then it means that we have an
+                 * ORDER BY entry pointing to this expression.  Since we are
+                 * not pushing ORDER BY with GROUP BY, clear it.
+                 */
+                if (sgref)
+                    grouping_target->sortgrouprefs[i] = 0;
+
+                /* Not matched exactly, pull the var with aggregates then */
+                aggvars = pull_var_clause((Node *) expr,
+                                          PVC_INCLUDE_AGGREGATES);
+
+                if (!is_foreign_expr(root, grouped_rel, (Expr *) aggvars))
+                    return false;
+
+                /*
+                 * Add aggregates, if any, into the targetlist.  Plain var
+                 * nodes should be either same as some GROUP BY expression or
+                 * part of some GROUP BY expression.  In later case, the query
+                 * cannot refer plain var nodes without the surrounding
+                 * expression.  In both the cases, they are already part of
+                 * the targetlist and thus no need to add them again.  In fact
+                 * adding pulled plain var nodes in SELECT clause will cause
+                 * an error on the foreign server if they are not same as some
+                 * GROUP BY expression.
+                 */
+                foreach(l, aggvars)
+                {
+                    Expr       *aggvar = (Expr *) lfirst(l);
+
+                    if (IsA(aggvar, Aggref))
+                        tlist = add_to_flat_tlist(tlist, list_make1(aggvar));
+                }
+            }
+        }
+
+        i++;
+    }
+
+    /*
+     * Classify the pushable and non-pushable having clauses and save them in
+     * remote_conds and local_conds of the grouped rel's fpinfo.
+     */
+    if (root->hasHavingQual && query->havingQual)
+    {
+        foreach(lc, (List *) query->havingQual)
+        {
+            Expr       *expr = (Expr *) lfirst(lc);
+
+            if (!is_foreign_expr(root, grouped_rel, expr))
+                fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
+            else
+                fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
+        }
+    }
+
+    /*
+     * If there are any local conditions, pull Vars and aggregates from it and
+     * check whether they are safe to pushdown or not.
+     */
+    if (fpinfo->local_conds)
+    {
+        aggvars = pull_var_clause((Node *) fpinfo->local_conds,
+                                  PVC_INCLUDE_AGGREGATES);
+
+        foreach(lc, aggvars)
+        {
+            Expr       *expr = (Expr *) lfirst(lc);
+
+            /*
+             * If aggregates within local conditions are not safe to push
+             * down, then we cannot push down the query.  Vars are already
+             * part of GROUP BY clause which are checked above, so no need to
+             * access them again here.
+             */
+            if (IsA(expr, Aggref))
+            {
+                if (!is_foreign_expr(root, grouped_rel, expr))
+                    return false;
+
+                /*
+                 * Add only this aggregate, not the whole pulled list: the
+                 * list also holds plain Vars, and shipping a bare Var that
+                 * is merely part of a GROUP BY expression would make the
+                 * remote query invalid.
+                 */
+                tlist = add_to_flat_tlist(tlist, list_make1(expr));
+            }
+        }
+    }
+
+    /* Transfer any sortgroupref data to the replacement tlist */
+    apply_pathtarget_labeling_to_tlist(tlist, grouping_target);
+
+    /* Store generated targetlist */
+    fpinfo->grouped_tlist = tlist;
+
+    /* Safe to pushdown */
+    fpinfo->pushdown_safe = true;
+
+    /*
+     * If user is willing to estimate cost for a scan using EXPLAIN, he
+     * intends to estimate scans on that relation more accurately.  Then, it
+     * makes sense to estimate the cost of the grouping on that relation more
+     * accurately using EXPLAIN.
+     */
+    fpinfo->use_remote_estimate = ofpinfo->use_remote_estimate;
+
+    /* Copy startup and tuple cost as is from underneath input rel's fpinfo */
+    fpinfo->fdw_startup_cost = ofpinfo->fdw_startup_cost;
+    fpinfo->fdw_tuple_cost = ofpinfo->fdw_tuple_cost;
+
+    /*
+     * Set cached relation costs to some negative value, so that we can detect
+     * when they are set to some sensible costs, during one (usually the
+     * first) of the calls to estimate_path_cost_size().
+     */
+    fpinfo->rel_startup_cost = -1;
+    fpinfo->rel_total_cost = -1;
+
+    /* Set fetch size same as that of underneath input rel's fpinfo */
+    fpinfo->fetch_size = ofpinfo->fetch_size;
+
+    /*
+     * Set the string describing this grouped relation to be used in EXPLAIN
+     * output of corresponding ForeignScan.
+     */
+    fpinfo->relation_name = makeStringInfo();
+    appendStringInfo(fpinfo->relation_name, "Aggregate on (%s)",
+                     ofpinfo->relation_name->data);
+
+    return true;
+}
+
+/*
+ * postgresGetForeignUpperPaths
+ *      Upper-relation planning callback.  When the requested stage is
+ *      grouping/aggregation and the input relation is marked safe to push
+ *      down, attach a fresh PgFdwRelationInfo to the output relation and
+ *      let add_foreign_grouping_paths() try to build a foreign path for it.
+ *
+ * Only UPPERREL_GROUP_AGG is handled; all other stages are ignored.
+ */
+static void
+postgresGetForeignUpperPaths(PlannerInfo *root, UpperRelationKind stage,
+                             RelOptInfo *input_rel, RelOptInfo *output_rel)
+{
+    PgFdwRelationInfo *input_fpinfo;
+    PgFdwRelationInfo *upper_fpinfo;
+
+    /*
+     * Nothing can be shipped unless the input relation carries FDW planning
+     * info and that info says it is safe to push down.
+     */
+    input_fpinfo = (PgFdwRelationInfo *) input_rel->fdw_private;
+    if (input_fpinfo == NULL || !input_fpinfo->pushdown_safe)
+        return;
+
+    /* Grouping/aggregation is the only upper stage handled here. */
+    if (stage != UPPERREL_GROUP_AGG)
+        return;
+
+    /* An already-populated fdw_private means this is a duplicate call. */
+    if (output_rel->fdw_private != NULL)
+        return;
+
+    /* Start from a zeroed struct that is explicitly not push-down safe. */
+    upper_fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
+    upper_fpinfo->pushdown_safe = false;
+    output_rel->fdw_private = upper_fpinfo;
+
+    add_foreign_grouping_paths(root, input_rel, output_rel);
+}
+
+/*
+ * add_foreign_grouping_paths
+ *      Build a ForeignPath that performs grouping and/or aggregation on the
+ *      foreign server and add it to grouped_rel.
+ *
+ * input_rel is the underlying scan relation whose output is being grouped;
+ * grouped_rel is the upper relation that receives the new path.  No path is
+ * added when the query needs no grouping/aggregation at all, or when
+ * foreign_grouping_ok() reports that push-down is not possible.
+ */
+static void
+add_foreign_grouping_paths(PlannerInfo *root, RelOptInfo *input_rel,
+                           RelOptInfo *grouped_rel)
+{
+    Query      *query = root->parse;
+    PgFdwRelationInfo *input_fpinfo = input_rel->fdw_private;
+    PgFdwRelationInfo *grouped_fpinfo = grouped_rel->fdw_private;
+    PathTarget *target;
+    ForeignPath *path;
+    double      est_rows;
+    int         est_width;
+    Cost        est_startup_cost;
+    Cost        est_total_cost;
+
+    /* Bail out unless the query actually groups, aggregates or has HAVING. */
+    if (!(query->groupClause || query->groupingSets || query->hasAggs ||
+          root->hasHavingQual))
+        return;
+
+    target = root->upper_targets[UPPERREL_GROUP_AGG];
+
+    /* Remember which relation feeds this grouped relation. */
+    grouped_fpinfo->outerrel = input_rel;
+
+    /*
+     * The grouped relation lives on the same foreign server as its input, so
+     * inherit the table, server, user mapping and shippable extensions.
+     */
+    grouped_fpinfo->table = input_fpinfo->table;
+    grouped_fpinfo->server = input_fpinfo->server;
+    grouped_fpinfo->user = input_fpinfo->user;
+    grouped_fpinfo->shippable_extensions = input_fpinfo->shippable_extensions;
+
+    /* Give up if aggregation/grouping cannot be evaluated remotely. */
+    if (!foreign_grouping_ok(root, grouped_rel))
+        return;
+
+    /* Cost the pushed-down grouping. */
+    estimate_path_cost_size(root, grouped_rel, NIL, NIL, &est_rows,
+                            &est_width, &est_startup_cost, &est_total_cost);
+
+    /* Record the estimates in the grouped relation's fpinfo. */
+    grouped_fpinfo->rows = est_rows;
+    grouped_fpinfo->width = est_width;
+    grouped_fpinfo->startup_cost = est_startup_cost;
+    grouped_fpinfo->total_cost = est_total_cost;
+
+    /* Build the foreign path for the grouped relation ... */
+    path = create_foreignscan_path(root,
+                                   grouped_rel,
+                                   target,
+                                   est_rows,
+                                   est_startup_cost,
+                                   est_total_cost,
+                                   NIL,     /* no pathkeys */
+                                   NULL,    /* no required_outer */
+                                   NULL,
+                                   NULL);   /* no fdw_private */
+
+    /* ... and offer it to the planner via add_path(). */
+    add_path(grouped_rel, (Path *) path);
+}
+
+/*
* Create a tuple from the specified row of the PGresult.
*
* rel is the local representation of the foreign table, attinmeta is
@@ -4549,24 +4961,34 @@ conversion_error_callback(void *arg)
ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
EState *estate = fsstate->ss.ps.state;
TargetEntry *tle;
- Var *var;
- RangeTblEntry *rte;
Assert(IsA(fsplan, ForeignScan));
tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
errpos->cur_attno - 1);
Assert(IsA(tle, TargetEntry));
- var = (Var *) tle->expr;
- Assert(IsA(var, Var));
- rte = rt_fetch(var->varno, estate->es_range_table);
+ /*
+ * Target list can have Vars and expressions. For Vars, we can get
+ * its relation; however, for expressions we can't. Thus for
+ * expressions, just show generic context message.
+ */
+ if (IsA(tle->expr, Var))
+ {
+ RangeTblEntry *rte;
+ Var *var = (Var *) tle->expr;
- if (var->varattno == 0)
- is_wholerow = true;
- else
- attname = get_relid_attribute_name(rte->relid, var->varattno);
+ rte = rt_fetch(var->varno, estate->es_range_table);
+
+ if (var->varattno == 0)
+ is_wholerow = true;
+ else
+ attname = get_relid_attribute_name(rte->relid, var->varattno);
- relname = get_rel_name(rte->relid);
+ relname = get_rel_name(rte->relid);
+ }
+ else
+ errcontext("processing expression at position %d in select list",
+ errpos->cur_attno);
}
if (relname)