aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c835
1 files changed, 756 insertions, 79 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index d5a2af9428c..14a3f9891a9 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -28,9 +28,9 @@
#include "optimizer/pathnode.h"
#include "optimizer/paths.h"
#include "optimizer/planmain.h"
-#include "optimizer/prep.h"
#include "optimizer/restrictinfo.h"
#include "optimizer/var.h"
+#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "utils/builtins.h"
#include "utils/guc.h"
@@ -64,7 +64,15 @@ enum FdwScanPrivateIndex
/* Integer list of attribute numbers retrieved by the SELECT */
FdwScanPrivateRetrievedAttrs,
/* Integer representing the desired fetch_size */
- FdwScanPrivateFetchSize
+ FdwScanPrivateFetchSize,
+ /* Oid of user mapping to be used while connecting to the foreign server */
+ FdwScanPrivateUserMappingOid,
+
+ /*
+ * String describing join i.e. names of relations being joined and types
+ * of join, added when the scan is join
+ */
+ FdwScanPrivateRelations
};
/*
@@ -94,7 +102,9 @@ enum FdwModifyPrivateIndex
*/
typedef struct PgFdwScanState
{
- Relation rel; /* relcache entry for the foreign table */
+ Relation rel; /* relcache entry for the foreign table. NULL
+ * for a foreign join scan. */
+ TupleDesc tupdesc; /* tuple descriptor of scan */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
/* extracted fdw_private data */
@@ -182,8 +192,16 @@ typedef struct PgFdwAnalyzeState
*/
typedef struct ConversionLocation
{
- Relation rel; /* foreign table's relcache entry */
+ Relation rel; /* foreign table's relcache entry. */
AttrNumber cur_attno; /* attribute number being processed, or 0 */
+
+ /*
+ * In case of foreign join push down, fdw_scan_tlist is used to identify
+ * the Var node corresponding to the error location and
+ * fsstate->ss.ps.state gives access to the RTEs of corresponding relation
+ * to get the relation name and attribute name.
+ */
+ ForeignScanState *fsstate;
} ConversionLocation;
/* Callback argument for ec_member_matches_foreign */
@@ -257,6 +275,14 @@ static bool postgresAnalyzeForeignTable(Relation relation,
BlockNumber *totalpages);
static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt,
Oid serverOid);
+static void postgresGetForeignJoinPaths(PlannerInfo *root,
+ RelOptInfo *joinrel,
+ RelOptInfo *outerrel,
+ RelOptInfo *innerrel,
+ JoinType jointype,
+ JoinPathExtraData *extra);
+static bool postgresRecheckForeignScan(ForeignScanState *node,
+ TupleTableSlot *slot);
static List *get_useful_pathkeys_for_relation(PlannerInfo *root,
RelOptInfo *rel);
static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel);
@@ -299,8 +325,12 @@ static HeapTuple make_tuple_from_result_row(PGresult *res,
Relation rel,
AttInMetadata *attinmeta,
List *retrieved_attrs,
+ ForeignScanState *fsstate,
MemoryContext temp_context);
static void conversion_error_callback(void *arg);
+static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel,
+ JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel,
+ JoinPathExtraData *extra);
/*
@@ -331,6 +361,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->EndForeignModify = postgresEndForeignModify;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
+ /* Function for EvalPlanQual rechecks */
+ routine->RecheckForeignScan = postgresRecheckForeignScan;
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
@@ -341,6 +373,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
/* Support functions for IMPORT FOREIGN SCHEMA */
routine->ImportForeignSchema = postgresImportForeignSchema;
+ /* Support functions for join push-down */
+ routine->GetForeignJoinPaths = postgresGetForeignJoinPaths;
+
PG_RETURN_POINTER(routine);
}
@@ -358,6 +393,10 @@ postgresGetForeignRelSize(PlannerInfo *root,
{
PgFdwRelationInfo *fpinfo;
ListCell *lc;
+ RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
+ const char *namespace;
+ const char *relname;
+ const char *refname;
/*
* We use PgFdwRelationInfo to pass various information to subsequent
@@ -366,6 +405,9 @@ postgresGetForeignRelSize(PlannerInfo *root,
fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
baserel->fdw_private = (void *) fpinfo;
+ /* Base foreign tables need to be push down always. */
+ fpinfo->pushdown_safe = true;
+
/* Look up foreign-table catalog info. */
fpinfo->table = GetForeignTable(foreigntableid);
fpinfo->server = GetForeignServer(fpinfo->table->serverid);
@@ -414,7 +456,6 @@ postgresGetForeignRelSize(PlannerInfo *root,
*/
if (fpinfo->use_remote_estimate)
{
- RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root);
Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid);
@@ -509,6 +550,23 @@ postgresGetForeignRelSize(PlannerInfo *root,
&fpinfo->rows, &fpinfo->width,
&fpinfo->startup_cost, &fpinfo->total_cost);
}
+
+ /*
+ * Set the name of relation in fpinfo, while we are constructing it here.
+ * It will be used to build the string describing the join relation in
+ * EXPLAIN output. We can't know whether VERBOSE option is specified or
+ * not, so always schema-qualify the foreign table name.
+ */
+ fpinfo->relation_name = makeStringInfo();
+ namespace = get_namespace_name(get_rel_namespace(foreigntableid));
+ relname = get_rel_name(foreigntableid);
+ refname = rte->eref->aliasname;
+ appendStringInfo(fpinfo->relation_name, "%s.%s",
+ quote_identifier(namespace),
+ quote_identifier(relname));
+ if (*refname && strcmp(refname, relname) != 0)
+ appendStringInfo(fpinfo->relation_name, " %s",
+ quote_identifier(rte->eref->aliasname));
}
/*
@@ -935,15 +993,15 @@ postgresGetForeignPaths(PlannerInfo *root,
*/
static ForeignScan *
postgresGetForeignPlan(PlannerInfo *root,
- RelOptInfo *baserel,
+ RelOptInfo *foreignrel,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses,
Plan *outer_plan)
{
- PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
- Index scan_relid = baserel->relid;
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
+ Index scan_relid;
List *fdw_private;
List *remote_conds = NIL;
List *remote_exprs = NIL;
@@ -952,6 +1010,28 @@ postgresGetForeignPlan(PlannerInfo *root,
List *retrieved_attrs;
StringInfoData sql;
ListCell *lc;
+ List *fdw_scan_tlist = NIL;
+
+ /*
+ * For base relations, set scan_relid as the relid of the relation. For
+ * other kinds of relations set it to 0.
+ */
+ if (foreignrel->reloptkind == RELOPT_BASEREL ||
+ foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL)
+ scan_relid = foreignrel->relid;
+ else
+ {
+ scan_relid = 0;
+
+ /*
+ * create_scan_plan() and create_foreignscan_plan() pass
+ * rel->baserestrictinfo + parameterization clauses through
+ * scan_clauses. For a join rel->baserestrictinfo is NIL and we are
+ * not considering parameterization right now, so there should be no
+ * scan_clauses for a joinrel.
+ */
+ Assert(!scan_clauses);
+ }
/*
* Separate the scan_clauses into those that can be executed remotely and
@@ -989,7 +1069,7 @@ postgresGetForeignPlan(PlannerInfo *root,
}
else if (list_member_ptr(fpinfo->local_conds, rinfo))
local_exprs = lappend(local_exprs, rinfo->clause);
- else if (is_foreign_expr(root, baserel, rinfo->clause))
+ else if (is_foreign_expr(root, foreignrel, rinfo->clause))
{
remote_conds = lappend(remote_conds, rinfo);
remote_exprs = lappend(remote_exprs, rinfo->clause);
@@ -998,26 +1078,70 @@ postgresGetForeignPlan(PlannerInfo *root,
local_exprs = lappend(local_exprs, rinfo->clause);
}
+ if (foreignrel->reloptkind == RELOPT_JOINREL)
+ {
+ /* For a join relation, get the conditions from fdw_private structure */
+ remote_conds = fpinfo->remote_conds;
+ local_exprs = fpinfo->local_conds;
+
+ /* Build the list of columns to be fetched from the foreign server. */
+ fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+
+ /*
+ * Ensure that the outer plan produces a tuple whose descriptor
+ * matches our scan tuple slot. This is safe because all scans and
+ * joins support projection, so we never need to insert a Result node.
+ * Also, remove the local conditions from outer plan's quals, lest
+ * they will be evaluated twice, once by the local plan and once by
+ * the scan.
+ */
+ if (outer_plan)
+ {
+ ListCell *lc;
+
+ outer_plan->targetlist = fdw_scan_tlist;
+
+ foreach(lc, local_exprs)
+ {
+ Join *join_plan = (Join *) outer_plan;
+ Node *qual = lfirst(lc);
+
+ outer_plan->qual = list_delete(outer_plan->qual, qual);
+
+ /*
+ * For an inner join the local conditions of foreign scan plan
+ * can be part of the joinquals as well.
+ */
+ if (join_plan->jointype == JOIN_INNER)
+ join_plan->joinqual = list_delete(join_plan->joinqual,
+ qual);
+ }
+ }
+ }
+
/*
* Build the query string to be sent for execution, and identify
* expressions to be sent as parameters.
*/
initStringInfo(&sql);
- deparseSelectStmtForRel(&sql, root, baserel, remote_conds,
- best_path->path.pathkeys, &retrieved_attrs,
- &params_list);
+ deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
+ remote_conds, best_path->path.pathkeys,
+ &retrieved_attrs, &params_list);
/*
* Build the fdw_private list that will be available to the executor.
- * Items in the list must match enum FdwScanPrivateIndex, above.
+ * Items in the list must match order in enum FdwScanPrivateIndex.
*/
- fdw_private = list_make3(makeString(sql.data),
+ fdw_private = list_make4(makeString(sql.data),
retrieved_attrs,
- makeInteger(fpinfo->fetch_size));
+ makeInteger(fpinfo->fetch_size),
+ makeInteger(foreignrel->umid));
+ if (foreignrel->reloptkind == RELOPT_JOINREL)
+ fdw_private = lappend(fdw_private,
+ makeString(fpinfo->relation_name->data));
/*
- * Create the ForeignScan node from target list, filtering expressions,
- * remote parameter expressions, and FDW private information.
+ * Create the ForeignScan node for the given relation.
*
* Note that the remote parameter expressions are stored in the fdw_exprs
* field of the finished plan node; we can't keep them in private state
@@ -1028,7 +1152,7 @@ postgresGetForeignPlan(PlannerInfo *root,
scan_relid,
params_list,
fdw_private,
- NIL, /* no custom tlist */
+ fdw_scan_tlist,
remote_exprs,
outer_plan);
}
@@ -1043,9 +1167,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
PgFdwScanState *fsstate;
- RangeTblEntry *rte;
- Oid userid;
- ForeignTable *table;
UserMapping *user;
int numParams;
int i;
@@ -1064,16 +1185,36 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
node->fdw_state = (void *) fsstate;
/*
- * Identify which user to do the remote access as. This should match what
- * ExecCheckRTEPerms() does.
+ * Obtain the foreign server where to connect and user mapping to use for
+ * connection. For base relations we obtain this information from
+ * catalogs. For join relations, this information is frozen at the time of
+ * planning to ensure that the join is safe to pushdown. In case the
+ * information goes stale between planning and execution, plan will be
+ * invalidated and replanned.
*/
- rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
- userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+ if (fsplan->scan.scanrelid > 0)
+ {
+ ForeignTable *table;
- /* Get info about foreign table. */
- fsstate->rel = node->ss.ss_currentRelation;
- table = GetForeignTable(RelationGetRelid(fsstate->rel));
- user = GetUserMapping(userid, table->serverid);
+ /*
+ * Identify which user to do the remote access as. This should match
+ * what ExecCheckRTEPerms() does.
+ */
+ RangeTblEntry *rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+ Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ fsstate->rel = node->ss.ss_currentRelation;
+ table = GetForeignTable(RelationGetRelid(fsstate->rel));
+
+ user = GetUserMapping(userid, table->serverid);
+ }
+ else
+ {
+ Oid umid = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateUserMappingOid));
+
+ user = GetUserMappingById(umid);
+ Assert(fsplan->fs_server == user->serverid);
+ }
/*
* Get connection to the foreign server. Connection manager will
@@ -1105,8 +1246,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
ALLOCSET_SMALL_INITSIZE,
ALLOCSET_SMALL_MAXSIZE);
- /* Get info we'll need for input data conversion. */
- fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
+ /*
+ * Get info we'll need for converting data fetched from the foreign server
+ * into local representation and error reporting during that process.
+ */
+ if (fsplan->scan.scanrelid > 0)
+ fsstate->tupdesc = RelationGetDescr(fsstate->rel);
+ else
+ fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+
+ fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
/* Prepare for output conversion of parameters used in remote query. */
numParams = list_length(fsplan->fdw_exprs);
@@ -1825,6 +1974,34 @@ postgresIsForeignRelUpdatable(Relation rel)
}
/*
+ * postgresRecheckForeignScan
+ * Execute a local join execution plan for a foreign join
+ */
+static bool
+postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
+{
+ Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid;
+ PlanState *outerPlan = outerPlanState(node);
+ TupleTableSlot *result;
+
+ /* For base foreign relations, it suffices to set fdw_recheck_quals */
+ if (scanrelid > 0)
+ return true;
+
+ Assert(outerPlan != NULL);
+
+ /* Execute a local join execution plan */
+ result = ExecProcNode(outerPlan);
+ if (TupIsNull(result))
+ return false;
+
+ /* Store result in the given slot */
+ ExecCopySlot(slot, result);
+
+ return true;
+}
+
+/*
* postgresExplainForeignScan
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
*/
@@ -1833,10 +2010,25 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
{
List *fdw_private;
char *sql;
+ char *relations;
+
+ fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
+
+ /*
+ * Add names of relation handled by the foreign scan when the scan is a
+ * join
+ */
+ if (list_length(fdw_private) > FdwScanPrivateRelations)
+ {
+ relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations));
+ ExplainPropertyText("Relations", relations, es);
+ }
+ /*
+ * Add remote query, when VERBOSE option is specified.
+ */
if (es->verbose)
{
- fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
ExplainPropertyText("Remote SQL", sql, es);
}
@@ -1865,26 +2057,29 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
/*
* estimate_path_cost_size
- * Get cost and size estimates for a foreign scan
+ * Get cost and size estimates for a foreign scan on given foreign relation
+ * either a base relation or a join between foreign relations.
*
- * We assume that all the baserestrictinfo clauses will be applied, plus
- * any join clauses listed in join_conds.
+ * param_join_conds are the parameterization clauses with outer relations.
+ * pathkeys specify the expected sort order if any for given path being costed.
+ *
+ * The function returns the cost and size estimates in p_row, p_width,
+ * p_startup_cost and p_total_cost variables.
*/
static void
estimate_path_cost_size(PlannerInfo *root,
- RelOptInfo *baserel,
- List *join_conds,
+ RelOptInfo *foreignrel,
+ List *param_join_conds,
List *pathkeys,
double *p_rows, int *p_width,
Cost *p_startup_cost, Cost *p_total_cost)
{
- PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private;
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
double rows;
double retrieved_rows;
int width;
Cost startup_cost;
Cost total_cost;
- Cost run_cost;
Cost cpu_per_tuple;
/*
@@ -1896,39 +2091,49 @@ estimate_path_cost_size(PlannerInfo *root,
*/
if (fpinfo->use_remote_estimate)
{
- List *remote_join_conds;
- List *local_join_conds;
+ List *remote_param_join_conds;
+ List *local_param_join_conds;
StringInfoData sql;
- List *retrieved_attrs;
PGconn *conn;
Selectivity local_sel;
QualCost local_cost;
+ List *fdw_scan_tlist = NIL;
List *remote_conds;
+ /* Required only to be passed to deparseSelectStmtForRel */
+ List *retrieved_attrs;
+
/*
- * join_conds might contain both clauses that are safe to send across,
- * and clauses that aren't.
+ * param_join_conds might contain both clauses that are safe to send
+ * across, and clauses that aren't.
*/
- classifyConditions(root, baserel, join_conds,
- &remote_join_conds, &local_join_conds);
+ classifyConditions(root, foreignrel, param_join_conds,
+ &remote_param_join_conds, &local_param_join_conds);
+
+ /* Build the list of columns to be fetched from the foreign server. */
+ if (foreignrel->reloptkind == RELOPT_JOINREL)
+ fdw_scan_tlist = build_tlist_to_deparse(foreignrel);
+ else
+ fdw_scan_tlist = NIL;
/*
* The complete list of remote conditions includes everything from
* baserestrictinfo plus any extra join_conds relevant to this
* particular path.
*/
- remote_conds = list_concat(list_copy(remote_join_conds),
+ remote_conds = list_concat(list_copy(remote_param_join_conds),
fpinfo->remote_conds);
/*
* Construct EXPLAIN query including the desired SELECT, FROM, and
- * WHERE clauses. Params and other-relation Vars are replaced by
- * dummy values.
+ * WHERE clauses. Params and other-relation Vars are replaced by dummy
+ * values, so don't request params_list.
*/
initStringInfo(&sql);
appendStringInfoString(&sql, "EXPLAIN ");
- deparseSelectStmtForRel(&sql, root, baserel, remote_conds, pathkeys,
- &retrieved_attrs, NULL);
+ deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist,
+ remote_conds, pathkeys, &retrieved_attrs,
+ NULL);
/* Get the remote estimate */
conn = GetConnection(fpinfo->user, false);
@@ -1940,8 +2145,8 @@ estimate_path_cost_size(PlannerInfo *root,
/* Factor in the selectivity of the locally-checked quals */
local_sel = clauselist_selectivity(root,
- local_join_conds,
- baserel->relid,
+ local_param_join_conds,
+ foreignrel->relid,
JOIN_INNER,
NULL);
local_sel *= fpinfo->local_conds_sel;
@@ -1951,41 +2156,113 @@ estimate_path_cost_size(PlannerInfo *root,
/* Add in the eval cost of the locally-checked quals */
startup_cost += fpinfo->local_conds_cost.startup;
total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
- cost_qual_eval(&local_cost, local_join_conds, root);
+ cost_qual_eval(&local_cost, local_param_join_conds, root);
startup_cost += local_cost.startup;
total_cost += local_cost.per_tuple * retrieved_rows;
}
else
{
+ Cost run_cost = 0;
+
/*
* We don't support join conditions in this mode (hence, no
* parameterized paths can be made).
*/
- Assert(join_conds == NIL);
-
- /* Use rows/width estimates made by set_baserel_size_estimates. */
- rows = baserel->rows;
- width = baserel->width;
+ Assert(param_join_conds == NIL);
/*
- * Back into an estimate of the number of retrieved rows. Just in
- * case this is nuts, clamp to at most baserel->tuples.
+ * Use rows/width estimates made by set_baserel_size_estimates() for
+ * base foreign relations and set_joinrel_size_estimates() for join
+ * between foreign relations.
*/
+ rows = foreignrel->rows;
+ width = foreignrel->width;
+
+ /* Back into an estimate of the number of retrieved rows. */
retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel);
- retrieved_rows = Min(retrieved_rows, baserel->tuples);
- /*
- * Cost as though this were a seqscan, which is pessimistic. We
- * effectively imagine the local_conds are being evaluated remotely,
- * too.
- */
- startup_cost = 0;
- run_cost = 0;
- run_cost += seq_page_cost * baserel->pages;
+ if (foreignrel->reloptkind != RELOPT_JOINREL)
+ {
+ /* Clamp retrieved rows estimates to at most foreignrel->tuples. */
+ retrieved_rows = Min(retrieved_rows, foreignrel->tuples);
+
+ /*
+ * Cost as though this were a seqscan, which is pessimistic. We
+ * effectively imagine the local_conds are being evaluated
+ * remotely, too.
+ */
+ startup_cost = 0;
+ run_cost = 0;
+ run_cost += seq_page_cost * foreignrel->pages;
- startup_cost += baserel->baserestrictcost.startup;
- cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple;
- run_cost += cpu_per_tuple * baserel->tuples;
+ startup_cost += foreignrel->baserestrictcost.startup;
+ cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple;
+ run_cost += cpu_per_tuple * foreignrel->tuples;
+ }
+ else
+ {
+ PgFdwRelationInfo *fpinfo_i;
+ PgFdwRelationInfo *fpinfo_o;
+ QualCost join_cost;
+ QualCost remote_conds_cost;
+ double nrows;
+
+ /* For join we expect inner and outer relations set */
+ Assert(fpinfo->innerrel && fpinfo->outerrel);
+
+ fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private;
+ fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private;
+
+ /* Estimate of number of rows in cross product */
+ nrows = fpinfo_i->rows * fpinfo_o->rows;
+ /* Clamp retrieved rows estimate to at most size of cross product */
+ retrieved_rows = Min(retrieved_rows, nrows);
+
+ /*
+ * The cost of foreign join is estimated as cost of generating
+ * rows for the joining relations + cost for applying quals on the
+ * rows.
+ */
+
+ /* Calculate the cost of clauses pushed down the foreign server */
+ cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root);
+ /* Calculate the cost of applying join clauses */
+ cost_qual_eval(&join_cost, fpinfo->joinclauses, root);
+
+ /*
+ * Startup cost includes startup cost of joining relations and the
+ * startup cost for join and other clauses. We do not include the
+ * startup cost specific to join strategy (e.g. setting up hash
+ * tables) since we do not know what strategy the foreign server
+ * is going to use.
+ */
+ startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost;
+ startup_cost += join_cost.startup;
+ startup_cost += remote_conds_cost.startup;
+ startup_cost += fpinfo->local_conds_cost.startup;
+
+ /*
+ * Run time cost includes:
+ *
+ * 1. Run time cost (total_cost - startup_cost) of relations being
+ * joined
+ *
+ * 2. Run time cost of applying join clauses on the cross product
+ * of the joining relations.
+ *
+ * 3. Run time cost of applying pushed down other clauses on the
+ * result of join
+ *
+ * 4. Run time cost of applying nonpushable other clauses locally
+ * on the result fetched from the foreign server.
+ */
+ run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost;
+ run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost;
+ run_cost += nrows * join_cost.per_tuple;
+ nrows = clamp_row_est(nrows * fpinfo->joinclause_sel);
+ run_cost += nrows * remote_conds_cost.per_tuple;
+ run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows;
+ }
/*
* Without remote estimates, we have no real way to estimate the cost
@@ -2006,6 +2283,15 @@ estimate_path_cost_size(PlannerInfo *root,
}
/*
+ * Cache the costs prior to adding the costs for transferring data from
+ * the foreign server. These costs are useful for costing the join between
+ * this relation and another foreign relation, when the cost of join can
+ * not be obtained from the foreign server.
+ */
+ fpinfo->rel_startup_cost = startup_cost;
+ fpinfo->rel_total_cost = total_cost;
+
+ /*
* Add some additional cost factors to account for connection overhead
* (fdw_startup_cost), transferring data across the network
* (fdw_tuple_cost per retrieved row), and local manipulation of the data
@@ -2237,11 +2523,15 @@ fetch_more_data(ForeignScanState *node)
for (i = 0; i < numrows; i++)
{
+ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+
+ Assert(IsA(fsplan, ForeignScan));
fsstate->tuples[i] =
make_tuple_from_result_row(res, i,
fsstate->rel,
fsstate->attinmeta,
fsstate->retrieved_attrs,
+ node,
fsstate->temp_cxt);
}
@@ -2460,6 +2750,7 @@ store_returning_result(PgFdwModifyState *fmstate,
fmstate->rel,
fmstate->attinmeta,
fmstate->retrieved_attrs,
+ NULL,
fmstate->temp_cxt);
/* tuple will be deleted when it is cleared from the slot */
ExecStoreTuple(newtup, slot, InvalidBuffer, true);
@@ -2770,6 +3061,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate)
astate->rel,
astate->attinmeta,
astate->retrieved_attrs,
+ NULL,
astate->temp_cxt);
MemoryContextSwitchTo(oldcontext);
@@ -3045,6 +3337,345 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
}
/*
+ * Assess whether the join between inner and outer relations can be pushed down
+ * to the foreign server. As a side effect, save information we obtain in this
+ * function to PgFdwRelationInfo passed in.
+ *
+ * Joins that satisfy conditions below are safe to push down.
+ *
+ * 1) Join type is INNER or OUTER (one of LEFT/RIGHT/FULL)
+ * 2) Both outer and inner portions are safe to push-down
+ * 3) All foreign tables in the join belong to the same foreign server and use
+ * the same user mapping.
+ * 4) All join conditions are safe to push down
+ * 5) No relation has local filter (this can be relaxed for INNER JOIN, if we
+ * can move unpushable clauses upwards in the join tree).
+ */
+static bool
+foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype,
+ RelOptInfo *outerrel, RelOptInfo *innerrel,
+ JoinPathExtraData *extra)
+{
+ PgFdwRelationInfo *fpinfo;
+ PgFdwRelationInfo *fpinfo_o;
+ PgFdwRelationInfo *fpinfo_i;
+ ListCell *lc;
+ List *joinclauses;
+ List *otherclauses;
+
+ /*
+ * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins.
+ * Constructing queries representing SEMI and ANTI joins is hard, hence
+ * not considered right now.
+ */
+ if (jointype != JOIN_INNER && jointype != JOIN_LEFT &&
+ jointype != JOIN_RIGHT && jointype != JOIN_FULL)
+ return false;
+
+ /*
+ * If either of the joining relations is marked as unsafe to pushdown, the
+ * join can not be pushed down.
+ */
+ fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private;
+ fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private;
+ fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private;
+ if (!fpinfo_o || !fpinfo_o->pushdown_safe ||
+ !fpinfo_i || !fpinfo_i->pushdown_safe)
+ return false;
+
+ /*
+ * If joining relations have local conditions, those conditions are
+ * required to be applied before joining the relations. Hence the join can
+ * not be pushed down.
+ */
+ if (fpinfo_o->local_conds || fpinfo_i->local_conds)
+ return false;
+
+ /* Separate restrict list into join quals and quals on join relation */
+ if (IS_OUTER_JOIN(jointype))
+ extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses);
+ else
+ {
+ /*
+ * Unlike an outer join, for inner join, the join result contains only
+ * the rows which satisfy join clauses, similar to the other clause.
+ * Hence all clauses can be treated as other quals. This helps to push
+ * a join down to the foreign server even if some of its join quals
+ * are not safe to pushdown.
+ */
+ otherclauses = extract_actual_clauses(extra->restrictlist, false);
+ joinclauses = NIL;
+ }
+
+ /* Join quals must be safe to push down. */
+ foreach(lc, joinclauses)
+ {
+ Expr *expr = (Expr *) lfirst(lc);
+
+ if (!is_foreign_expr(root, joinrel, expr))
+ return false;
+ }
+
+ /* Save the join clauses, for later use. */
+ fpinfo->joinclauses = joinclauses;
+
+ /*
+ * Other clauses are applied after the join has been performed and thus
+ * need not be all pushable. We will push those which can be pushed to
+ * reduce the number of rows fetched from the foreign server. Rest of them
+ * will be applied locally after fetching join result. Add them to fpinfo
+ * so that other joins involving this joinrel will know that this joinrel
+ * has local clauses.
+ */
+ foreach(lc, otherclauses)
+ {
+ Expr *expr = (Expr *) lfirst(lc);
+
+ if (!is_foreign_expr(root, joinrel, expr))
+ fpinfo->local_conds = lappend(fpinfo->local_conds, expr);
+ else
+ fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr);
+ }
+
+ fpinfo->outerrel = outerrel;
+ fpinfo->innerrel = innerrel;
+ fpinfo->jointype = jointype;
+
+ /*
+ * If user is willing to estimate cost for a scan of either of the joining
+ * relations using EXPLAIN, he intends to estimate scans on that relation
+ * more accurately. Then, it makes sense to estimate the cost the join
+ * with that relation more accurately using EXPLAIN.
+ */
+ fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate ||
+ fpinfo_i->use_remote_estimate;
+
+ /*
+ * Since both the joining relations come from the same server, the server
+ * level options should have same value for both the relations. Pick from
+ * any side.
+ */
+ fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost;
+ fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost;
+
+ /* Mark that this join can be pushed down safely */
+ fpinfo->pushdown_safe = true;
+
+ /*
+ * Set fetch size to maximum of the joining sides, since we are expecting
+ * the rows returned by the join to be proportional to the relation sizes.
+ */
+ if (fpinfo_o->fetch_size > fpinfo_i->fetch_size)
+ fpinfo->fetch_size = fpinfo_o->fetch_size;
+ else
+ fpinfo->fetch_size = fpinfo_i->fetch_size;
+
+ /*
+ * Pull the other remote conditions from the joining relations into join
+ * clauses or other remote clauses (remote_conds) of this relation. This
+ * avoids building subqueries at every join step.
+ *
+ * For an inner join, clauses from both the relations are added to the
+ * other remote clauses. For an OUTER join, the clauses from the outer
+ * side are added to remote_conds since those can be evaluated after the
+ * join is evaluated. The clauses from inner side are added to the
+ * joinclauses, since they need to evaluated while constructing the join.
+ *
+ * The joining sides can not have local conditions, thus no need to test
+ * shippability of the clauses being pulled up.
+ */
+ switch (jointype)
+ {
+ case JOIN_INNER:
+ fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
+ fpinfo_i->remote_conds);
+ fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
+ fpinfo_o->remote_conds);
+ break;
+
+ case JOIN_LEFT:
+ fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
+ fpinfo_i->remote_conds);
+ fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
+ fpinfo_o->remote_conds);
+ break;
+
+ case JOIN_RIGHT:
+ fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
+ fpinfo_o->remote_conds);
+ fpinfo->remote_conds = list_concat(fpinfo->remote_conds,
+ fpinfo_i->remote_conds);
+ break;
+
+ case JOIN_FULL:
+ fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
+ fpinfo_i->remote_conds);
+ fpinfo->joinclauses = list_concat(fpinfo->joinclauses,
+ fpinfo_o->remote_conds);
+ break;
+
+ default:
+ /* Should not happen, we have just check this above */
+ elog(ERROR, "unsupported join type %d", jointype);
+ }
+
+ /*
+ * Set the string describing this join relation to be used in EXPLAIN
+ * output of corresponding ForeignScan.
+ */
+ fpinfo->relation_name = makeStringInfo();
+ appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
+ fpinfo_o->relation_name->data,
+ get_jointype_name(fpinfo->jointype),
+ fpinfo_i->relation_name->data);
+
+ return true;
+}
+
+/*
+ * postgresGetForeignJoinPaths
+ * Add possible ForeignPath to joinrel, if join is safe to push down.
+ */
+static void
+postgresGetForeignJoinPaths(PlannerInfo *root,
+ RelOptInfo *joinrel,
+ RelOptInfo *outerrel,
+ RelOptInfo *innerrel,
+ JoinType jointype,
+ JoinPathExtraData *extra)
+{
+ PgFdwRelationInfo *fpinfo;
+ ForeignPath *joinpath;
+ double rows;
+ int width;
+ Cost startup_cost;
+ Cost total_cost;
+ Path *epq_path; /* Path to create plan to be executed when
+ * EvalPlanQual gets triggered. */
+
+ /*
+ * Skip if this join combination has been considered already.
+ */
+ if (joinrel->fdw_private)
+ return;
+
+ /*
+ * Create unfinished PgFdwRelationInfo entry which is used to indicate
+ * that the join relation is already considered, so that we won't waste
+ * time in judging safety of join pushdown and adding the same paths again
+ * if found safe. Once we know that this join can be pushed down, we fill
+ * the entry.
+ */
+ fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
+ fpinfo->pushdown_safe = false;
+ joinrel->fdw_private = fpinfo;
+ /* attrs_used is only for base relations. */
+ fpinfo->attrs_used = NULL;
+
+ /*
+ * In case there is a possibility that EvalPlanQual will be executed, we
+ * should be able to reconstruct the row, from base relations applying all
+ * the conditions. We create a local plan from a suitable local path
+ * available in the path list. In case such a path doesn't exist, we can
+ * not push the join to the foreign server since we won't be able to
+ * reconstruct the row for EvalPlanQual(). Find an alternative local path
+ * before we add ForeignPath, lest the new path would kick possibly the
+ * only local path. Do this before calling foreign_join_ok(), since that
+ * function updates fpinfo and marks it as pushable if the join is found
+ * to be pushable.
+ */
+ if (root->parse->commandType == CMD_DELETE ||
+ root->parse->commandType == CMD_UPDATE ||
+ root->rowMarks)
+ {
+ epq_path = GetExistingLocalJoinPath(joinrel);
+ if (!epq_path)
+ {
+ elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
+ return;
+ }
+ }
+ else
+ epq_path = NULL;
+
+ if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
+ {
+ /* Free path required for EPQ if we copied one; we don't need it now */
+ if (epq_path)
+ pfree(epq_path);
+ return;
+ }
+
+ /*
+ * Compute the selectivity and cost of the local_conds, so we don't have
+ * to do it over again for each path. The best we can do for these
+ * conditions is to estimate selectivity on the basis of local statistics.
+ * The local conditions are applied after the join has been computed on
+ * the remote side like quals in WHERE clause, so pass jointype as
+ * JOIN_INNER.
+ */
+ fpinfo->local_conds_sel = clauselist_selectivity(root,
+ fpinfo->local_conds,
+ 0,
+ JOIN_INNER,
+ NULL);
+ cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
+
+ /*
+ * If we are going to estimate the costs using EXPLAIN, we will need
+ * connection information. Fill it here.
+ */
+ if (fpinfo->use_remote_estimate)
+ fpinfo->user = GetUserMappingById(joinrel->umid);
+ else
+ {
+ fpinfo->user = NULL;
+
+ /*
+ * If we are going to estimate costs locally, estimate the join clause
+ * selectivity here while we have special join info.
+ */
+ fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
+ 0, fpinfo->jointype,
+ extra->sjinfo);
+
+ }
+ fpinfo->server = GetForeignServer(joinrel->serverid);
+
+ /* Estimate costs for bare join relation */
+ estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
+ &width, &startup_cost, &total_cost);
+ /* Now update this information in the joinrel */
+ joinrel->rows = rows;
+ joinrel->width = width;
+ fpinfo->rows = rows;
+ fpinfo->width = width;
+ fpinfo->startup_cost = startup_cost;
+ fpinfo->total_cost = total_cost;
+
+ /*
+ * Create a new join path and add it to the joinrel which represents a
+ * join between foreign tables.
+ */
+ joinpath = create_foreignscan_path(root,
+ joinrel,
+ rows,
+ startup_cost,
+ total_cost,
+ NIL, /* no pathkeys */
+ NULL, /* no required_outer */
+ epq_path,
+ NULL); /* no fdw_private */
+
+ /* Add generated path into joinrel by add_path(). */
+ add_path(joinrel, (Path *) joinpath);
+
+ /* XXX Consider pathkeys for the join relation */
+
+ /* XXX Consider parameterized paths for the join relation */
+}
+
+/*
* Create a tuple from the specified row of the PGresult.
*
* rel is the local representation of the foreign table, attinmeta is
@@ -3058,10 +3689,11 @@ make_tuple_from_result_row(PGresult *res,
Relation rel,
AttInMetadata *attinmeta,
List *retrieved_attrs,
+ ForeignScanState *fsstate,
MemoryContext temp_context)
{
HeapTuple tuple;
- TupleDesc tupdesc = RelationGetDescr(rel);
+ TupleDesc tupdesc;
Datum *values;
bool *nulls;
ItemPointer ctid = NULL;
@@ -3080,6 +3712,17 @@ make_tuple_from_result_row(PGresult *res,
*/
oldcontext = MemoryContextSwitchTo(temp_context);
+ if (rel)
+ tupdesc = RelationGetDescr(rel);
+ else
+ {
+ PgFdwScanState *fdw_sstate;
+
+ Assert(fsstate);
+ fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+ tupdesc = fdw_sstate->tupdesc;
+ }
+
values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
/* Initialize to nulls for any columns not present in result */
@@ -3090,6 +3733,7 @@ make_tuple_from_result_row(PGresult *res,
*/
errpos.rel = rel;
errpos.cur_attno = 0;
+ errpos.fsstate = fsstate;
errcallback.callback = conversion_error_callback;
errcallback.arg = (void *) &errpos;
errcallback.previous = error_context_stack;
@@ -3178,13 +3822,46 @@ make_tuple_from_result_row(PGresult *res,
static void
conversion_error_callback(void *arg)
{
+ const char *attname = NULL;
+ const char *relname = NULL;
ConversionLocation *errpos = (ConversionLocation *) arg;
- TupleDesc tupdesc = RelationGetDescr(errpos->rel);
- if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
- errcontext("column \"%s\" of foreign table \"%s\"",
- NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
- RelationGetRelationName(errpos->rel));
+ if (errpos->rel)
+ {
+ /* error occurred in a scan against a foreign table */
+ TupleDesc tupdesc = RelationGetDescr(errpos->rel);
+
+ if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
+ attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
+ else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
+ attname = "ctid";
+
+ relname = RelationGetRelationName(errpos->rel);
+ }
+ else
+ {
+ /* error occurred in a scan against a foreign join */
+ ForeignScanState *fsstate = errpos->fsstate;
+ ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
+ EState *estate = fsstate->ss.ps.state;
+ TargetEntry *tle;
+ Var *var;
+ RangeTblEntry *rte;
+
+ Assert(IsA(fsplan, ForeignScan));
+ tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
+ errpos->cur_attno - 1);
+ Assert(IsA(tle, TargetEntry));
+ var = (Var *) tle->expr;
+ Assert(IsA(var, Var));
+
+ rte = rt_fetch(var->varno, estate->es_range_table);
+ relname = get_rel_name(rte->relid);
+ attname = get_relid_attribute_name(rte->relid, var->varattno);
+ }
+
+ if (attname && relname)
+ errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
}
/*