diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-02-09 14:00:50 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-02-09 14:00:50 -0500 |
commit | e4106b2528727c4b48639c0e12bf2f70a766b910 (patch) | |
tree | d22a96438d65f4cdd6537ab7787296e6bf14b190 /contrib/postgres_fdw/postgres_fdw.c | |
parent | 7351e18286ec83461b386e23328d65fd4a538bba (diff) | |
download | postgresql-e4106b2528727c4b48639c0e12bf2f70a766b910.tar.gz postgresql-e4106b2528727c4b48639c0e12bf2f70a766b910.zip |
postgres_fdw: Push down joins to remote servers.
If we've got a relatively straightforward join between two tables,
this pushes that join down to the remote server instead of fetching
the rows for each table and performing the join locally. Some cases
are not handled yet, such as SEMI and ANTI joins. Also, we don't
yet attempt to create presorted join paths or parameterized join
paths even though these options do get tried for a base relation
scan. Nevertheless, this seems likely to be a very significant win
in many practical cases.
Shigeru Hanada and Ashutosh Bapat, reviewed by Robert Haas, with
additional review at various points by Tom Lane, Etsuro Fujita,
KaiGai Kohei, and Jeevan Chalke.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 835 |
1 files changed, 756 insertions, 79 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index d5a2af9428c..14a3f9891a9 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -28,9 +28,9 @@ #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" -#include "optimizer/prep.h" #include "optimizer/restrictinfo.h" #include "optimizer/var.h" +#include "optimizer/tlist.h" #include "parser/parsetree.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -64,7 +64,15 @@ enum FdwScanPrivateIndex /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs, /* Integer representing the desired fetch_size */ - FdwScanPrivateFetchSize + FdwScanPrivateFetchSize, + /* Oid of user mapping to be used while connecting to the foreign server */ + FdwScanPrivateUserMappingOid, + + /* + * String describing join i.e. names of relations being joined and types + * of join, added when the scan is join + */ + FdwScanPrivateRelations }; /* @@ -94,7 +102,9 @@ enum FdwModifyPrivateIndex */ typedef struct PgFdwScanState { - Relation rel; /* relcache entry for the foreign table */ + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ @@ -182,8 +192,16 @@ typedef struct PgFdwAnalyzeState */ typedef struct ConversionLocation { - Relation rel; /* foreign table's relcache entry */ + Relation rel; /* foreign table's relcache entry. */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ + + /* + * In case of foreign join push down, fdw_scan_tlist is used to identify + * the Var node corresponding to the error location and + * fsstate->ss.ps.state gives access to the RTEs of corresponding relation + * to get the relation name and attribute name. 
+ */ + ForeignScanState *fsstate; } ConversionLocation; /* Callback argument for ec_member_matches_foreign */ @@ -257,6 +275,14 @@ static bool postgresAnalyzeForeignTable(Relation relation, BlockNumber *totalpages); static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid); +static void postgresGetForeignJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra); +static bool postgresRecheckForeignScan(ForeignScanState *node, + TupleTableSlot *slot); static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel); static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); @@ -299,8 +325,12 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, + ForeignScanState *fsstate, MemoryContext temp_context); static void conversion_error_callback(void *arg); +static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, + JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra); /* @@ -331,6 +361,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->EndForeignModify = postgresEndForeignModify; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; + /* Function for EvalPlanQual rechecks */ + routine->RecheckForeignScan = postgresRecheckForeignScan; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; @@ -341,6 +373,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for IMPORT FOREIGN SCHEMA */ routine->ImportForeignSchema = postgresImportForeignSchema; + /* Support functions for join push-down */ + routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + PG_RETURN_POINTER(routine); } @@ -358,6 +393,10 @@ postgresGetForeignRelSize(PlannerInfo *root, { PgFdwRelationInfo *fpinfo; ListCell 
*lc; + RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); + const char *namespace; + const char *relname; + const char *refname; /* * We use PgFdwRelationInfo to pass various information to subsequent @@ -366,6 +405,9 @@ postgresGetForeignRelSize(PlannerInfo *root, fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); baserel->fdw_private = (void *) fpinfo; + /* Base foreign tables need to be push down always. */ + fpinfo->pushdown_safe = true; + /* Look up foreign-table catalog info. */ fpinfo->table = GetForeignTable(foreigntableid); fpinfo->server = GetForeignServer(fpinfo->table->serverid); @@ -414,7 +456,6 @@ postgresGetForeignRelSize(PlannerInfo *root, */ if (fpinfo->use_remote_estimate) { - RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); @@ -509,6 +550,23 @@ postgresGetForeignRelSize(PlannerInfo *root, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); } + + /* + * Set the name of relation in fpinfo, while we are constructing it here. + * It will be used to build the string describing the join relation in + * EXPLAIN output. We can't know whether VERBOSE option is specified or + * not, so always schema-qualify the foreign table name. 
+ */ + fpinfo->relation_name = makeStringInfo(); + namespace = get_namespace_name(get_rel_namespace(foreigntableid)); + relname = get_rel_name(foreigntableid); + refname = rte->eref->aliasname; + appendStringInfo(fpinfo->relation_name, "%s.%s", + quote_identifier(namespace), + quote_identifier(relname)); + if (*refname && strcmp(refname, relname) != 0) + appendStringInfo(fpinfo->relation_name, " %s", + quote_identifier(rte->eref->aliasname)); } /* @@ -935,15 +993,15 @@ postgresGetForeignPaths(PlannerInfo *root, */ static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, - RelOptInfo *baserel, + RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan) { - PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; - Index scan_relid = baserel->relid; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + Index scan_relid; List *fdw_private; List *remote_conds = NIL; List *remote_exprs = NIL; @@ -952,6 +1010,28 @@ postgresGetForeignPlan(PlannerInfo *root, List *retrieved_attrs; StringInfoData sql; ListCell *lc; + List *fdw_scan_tlist = NIL; + + /* + * For base relations, set scan_relid as the relid of the relation. For + * other kinds of relations set it to 0. + */ + if (foreignrel->reloptkind == RELOPT_BASEREL || + foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL) + scan_relid = foreignrel->relid; + else + { + scan_relid = 0; + + /* + * create_scan_plan() and create_foreignscan_plan() pass + * rel->baserestrictinfo + parameterization clauses through + * scan_clauses. For a join rel->baserestrictinfo is NIL and we are + * not considering parameterization right now, so there should be no + * scan_clauses for a joinrel. 
+ */ + Assert(!scan_clauses); + } /* * Separate the scan_clauses into those that can be executed remotely and @@ -989,7 +1069,7 @@ postgresGetForeignPlan(PlannerInfo *root, } else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); - else if (is_foreign_expr(root, baserel, rinfo->clause)) + else if (is_foreign_expr(root, foreignrel, rinfo->clause)) { remote_conds = lappend(remote_conds, rinfo); remote_exprs = lappend(remote_exprs, rinfo->clause); @@ -998,26 +1078,70 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs = lappend(local_exprs, rinfo->clause); } + if (foreignrel->reloptkind == RELOPT_JOINREL) + { + /* For a join relation, get the conditions from fdw_private structure */ + remote_conds = fpinfo->remote_conds; + local_exprs = fpinfo->local_conds; + + /* Build the list of columns to be fetched from the foreign server. */ + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + + /* + * Ensure that the outer plan produces a tuple whose descriptor + * matches our scan tuple slot. This is safe because all scans and + * joins support projection, so we never need to insert a Result node. + * Also, remove the local conditions from outer plan's quals, lest + * they will be evaluated twice, once by the local plan and once by + * the scan. + */ + if (outer_plan) + { + ListCell *lc; + + outer_plan->targetlist = fdw_scan_tlist; + + foreach(lc, local_exprs) + { + Join *join_plan = (Join *) outer_plan; + Node *qual = lfirst(lc); + + outer_plan->qual = list_delete(outer_plan->qual, qual); + + /* + * For an inner join the local conditions of foreign scan plan + * can be part of the joinquals as well. + */ + if (join_plan->jointype == JOIN_INNER) + join_plan->joinqual = list_delete(join_plan->joinqual, + qual); + } + } + } + /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. 
*/ initStringInfo(&sql); - deparseSelectStmtForRel(&sql, root, baserel, remote_conds, - best_path->path.pathkeys, &retrieved_attrs, - &params_list); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_conds, best_path->path.pathkeys, + &retrieved_attrs, &params_list); /* * Build the fdw_private list that will be available to the executor. - * Items in the list must match enum FdwScanPrivateIndex, above. + * Items in the list must match order in enum FdwScanPrivateIndex. */ - fdw_private = list_make3(makeString(sql.data), + fdw_private = list_make4(makeString(sql.data), retrieved_attrs, - makeInteger(fpinfo->fetch_size)); + makeInteger(fpinfo->fetch_size), + makeInteger(foreignrel->umid)); + if (foreignrel->reloptkind == RELOPT_JOINREL) + fdw_private = lappend(fdw_private, + makeString(fpinfo->relation_name->data)); /* - * Create the ForeignScan node from target list, filtering expressions, - * remote parameter expressions, and FDW private information. + * Create the ForeignScan node for the given relation. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state @@ -1028,7 +1152,7 @@ postgresGetForeignPlan(PlannerInfo *root, scan_relid, params_list, fdw_private, - NIL, /* no custom tlist */ + fdw_scan_tlist, remote_exprs, outer_plan); } @@ -1043,9 +1167,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; - RangeTblEntry *rte; - Oid userid; - ForeignTable *table; UserMapping *user; int numParams; int i; @@ -1064,16 +1185,36 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) node->fdw_state = (void *) fsstate; /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. + * Obtain the foreign server where to connect and user mapping to use for + * connection. 
For base relations we obtain this information from + * catalogs. For join relations, this information is frozen at the time of + * planning to ensure that the join is safe to pushdown. In case the + * information goes stale between planning and execution, plan will be + * invalidated and replanned. */ - rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + if (fsplan->scan.scanrelid > 0) + { + ForeignTable *table; - /* Get info about foreign table. */ - fsstate->rel = node->ss.ss_currentRelation; - table = GetForeignTable(RelationGetRelid(fsstate->rel)); - user = GetUserMapping(userid, table->serverid); + /* + * Identify which user to do the remote access as. This should match + * what ExecCheckRTEPerms() does. + */ + RangeTblEntry *rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + fsstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(fsstate->rel)); + + user = GetUserMapping(userid, table->serverid); + } + else + { + Oid umid = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateUserMappingOid)); + + user = GetUserMappingById(umid); + Assert(fsplan->fs_server == user->serverid); + } /* * Get connection to the foreign server. Connection manager will @@ -1105,8 +1246,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); - /* Get info we'll need for input data conversion. */ - fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); + /* + * Get info we'll need for converting data fetched from the foreign server + * into local representation and error reporting during that process. 
+ */ + if (fsplan->scan.scanrelid > 0) + fsstate->tupdesc = RelationGetDescr(fsstate->rel); + else + fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + + fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* Prepare for output conversion of parameters used in remote query. */ numParams = list_length(fsplan->fdw_exprs); @@ -1825,6 +1974,34 @@ postgresIsForeignRelUpdatable(Relation rel) } /* + * postgresRecheckForeignScan + * Execute a local join execution plan for a foreign join + */ +static bool +postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) +{ + Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; + PlanState *outerPlan = outerPlanState(node); + TupleTableSlot *result; + + /* For base foreign relations, it suffices to set fdw_recheck_quals */ + if (scanrelid > 0) + return true; + + Assert(outerPlan != NULL); + + /* Execute a local join execution plan */ + result = ExecProcNode(outerPlan); + if (TupIsNull(result)) + return false; + + /* Store result in the given slot */ + ExecCopySlot(slot, result); + + return true; +} + +/* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ @@ -1833,10 +2010,25 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { List *fdw_private; char *sql; + char *relations; + + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + + /* + * Add names of relation handled by the foreign scan when the scan is a + * join + */ + if (list_length(fdw_private) > FdwScanPrivateRelations) + { + relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); + ExplainPropertyText("Relations", relations, es); + } + /* + * Add remote query, when VERBOSE option is specified. 
+ */ if (es->verbose) { - fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } @@ -1865,26 +2057,29 @@ postgresExplainForeignModify(ModifyTableState *mtstate, /* * estimate_path_cost_size - * Get cost and size estimates for a foreign scan + * Get cost and size estimates for a foreign scan on given foreign relation + * either a base relation or a join between foreign relations. * - * We assume that all the baserestrictinfo clauses will be applied, plus - * any join clauses listed in join_conds. + * param_join_conds are the parameterization clauses with outer relations. + * pathkeys specify the expected sort order if any for given path being costed. + * + * The function returns the cost and size estimates in p_row, p_width, + * p_startup_cost and p_total_cost variables. */ static void estimate_path_cost_size(PlannerInfo *root, - RelOptInfo *baserel, - List *join_conds, + RelOptInfo *foreignrel, + List *param_join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost) { - PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; - Cost run_cost; Cost cpu_per_tuple; /* @@ -1896,39 +2091,49 @@ estimate_path_cost_size(PlannerInfo *root, */ if (fpinfo->use_remote_estimate) { - List *remote_join_conds; - List *local_join_conds; + List *remote_param_join_conds; + List *local_param_join_conds; StringInfoData sql; - List *retrieved_attrs; PGconn *conn; Selectivity local_sel; QualCost local_cost; + List *fdw_scan_tlist = NIL; List *remote_conds; + /* Required only to be passed to deparseSelectStmtForRel */ + List *retrieved_attrs; + /* - * join_conds might contain both clauses that are safe to send across, - * and clauses that aren't. 
+ * param_join_conds might contain both clauses that are safe to send + * across, and clauses that aren't. */ - classifyConditions(root, baserel, join_conds, - &remote_join_conds, &local_join_conds); + classifyConditions(root, foreignrel, param_join_conds, + &remote_param_join_conds, &local_param_join_conds); + + /* Build the list of columns to be fetched from the foreign server. */ + if (foreignrel->reloptkind == RELOPT_JOINREL) + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + else + fdw_scan_tlist = NIL; /* * The complete list of remote conditions includes everything from * baserestrictinfo plus any extra join_conds relevant to this * particular path. */ - remote_conds = list_concat(list_copy(remote_join_conds), + remote_conds = list_concat(list_copy(remote_param_join_conds), fpinfo->remote_conds); /* * Construct EXPLAIN query including the desired SELECT, FROM, and - * WHERE clauses. Params and other-relation Vars are replaced by - * dummy values. + * WHERE clauses. Params and other-relation Vars are replaced by dummy + * values, so don't request params_list. 
*/ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); - deparseSelectStmtForRel(&sql, root, baserel, remote_conds, pathkeys, - &retrieved_attrs, NULL); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_conds, pathkeys, &retrieved_attrs, + NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); @@ -1940,8 +2145,8 @@ estimate_path_cost_size(PlannerInfo *root, /* Factor in the selectivity of the locally-checked quals */ local_sel = clauselist_selectivity(root, - local_join_conds, - baserel->relid, + local_param_join_conds, + foreignrel->relid, JOIN_INNER, NULL); local_sel *= fpinfo->local_conds_sel; @@ -1951,41 +2156,113 @@ estimate_path_cost_size(PlannerInfo *root, /* Add in the eval cost of the locally-checked quals */ startup_cost += fpinfo->local_conds_cost.startup; total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; - cost_qual_eval(&local_cost, local_join_conds, root); + cost_qual_eval(&local_cost, local_param_join_conds, root); startup_cost += local_cost.startup; total_cost += local_cost.per_tuple * retrieved_rows; } else { + Cost run_cost = 0; + /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ - Assert(join_conds == NIL); - - /* Use rows/width estimates made by set_baserel_size_estimates. */ - rows = baserel->rows; - width = baserel->width; + Assert(param_join_conds == NIL); /* - * Back into an estimate of the number of retrieved rows. Just in - * case this is nuts, clamp to at most baserel->tuples. + * Use rows/width estimates made by set_baserel_size_estimates() for + * base foreign relations and set_joinrel_size_estimates() for join + * between foreign relations. */ + rows = foreignrel->rows; + width = foreignrel->width; + + /* Back into an estimate of the number of retrieved rows. 
*/ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); - retrieved_rows = Min(retrieved_rows, baserel->tuples); - /* - * Cost as though this were a seqscan, which is pessimistic. We - * effectively imagine the local_conds are being evaluated remotely, - * too. - */ - startup_cost = 0; - run_cost = 0; - run_cost += seq_page_cost * baserel->pages; + if (foreignrel->reloptkind != RELOPT_JOINREL) + { + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; - startup_cost += baserel->baserestrictcost.startup; - cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * baserel->tuples; + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + } + else + { + PgFdwRelationInfo *fpinfo_i; + PgFdwRelationInfo *fpinfo_o; + QualCost join_cost; + QualCost remote_conds_cost; + double nrows; + + /* For join we expect inner and outer relations set */ + Assert(fpinfo->innerrel && fpinfo->outerrel); + + fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Estimate of number of rows in cross product */ + nrows = fpinfo_i->rows * fpinfo_o->rows; + /* Clamp retrieved rows estimate to at most size of cross product */ + retrieved_rows = Min(retrieved_rows, nrows); + + /* + * The cost of foreign join is estimated as cost of generating + * rows for the joining relations + cost for applying quals on the + * rows. 
+ */ + + /* Calculate the cost of clauses pushed down the foreign server */ + cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); + /* Calculate the cost of applying join clauses */ + cost_qual_eval(&join_cost, fpinfo->joinclauses, root); + + /* + * Startup cost includes startup cost of joining relations and the + * startup cost for join and other clauses. We do not include the + * startup cost specific to join strategy (e.g. setting up hash + * tables) since we do not know what strategy the foreign server + * is going to use. + */ + startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; + startup_cost += join_cost.startup; + startup_cost += remote_conds_cost.startup; + startup_cost += fpinfo->local_conds_cost.startup; + + /* + * Run time cost includes: + * + * 1. Run time cost (total_cost - startup_cost) of relations being + * joined + * + * 2. Run time cost of applying join clauses on the cross product + * of the joining relations. + * + * 3. Run time cost of applying pushed down other clauses on the + * result of join + * + * 4. Run time cost of applying nonpushable other clauses locally + * on the result fetched from the foreign server. + */ + run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; + run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; + run_cost += nrows * join_cost.per_tuple; + nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); + run_cost += nrows * remote_conds_cost.per_tuple; + run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; + } /* * Without remote estimates, we have no real way to estimate the cost @@ -2006,6 +2283,15 @@ estimate_path_cost_size(PlannerInfo *root, } /* + * Cache the costs prior to adding the costs for transferring data from + * the foreign server. These costs are useful for costing the join between + * this relation and another foreign relation, when the cost of join can + * not be obtained from the foreign server. 
+ */ + fpinfo->rel_startup_cost = startup_cost; + fpinfo->rel_total_cost = total_cost; + + /* * Add some additional cost factors to account for connection overhead * (fdw_startup_cost), transferring data across the network * (fdw_tuple_cost per retrieved row), and local manipulation of the data @@ -2237,11 +2523,15 @@ fetch_more_data(ForeignScanState *node) for (i = 0; i < numrows; i++) { + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + + Assert(IsA(fsplan, ForeignScan)); fsstate->tuples[i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, + node, fsstate->temp_cxt); } @@ -2460,6 +2750,7 @@ store_returning_result(PgFdwModifyState *fmstate, fmstate->rel, fmstate->attinmeta, fmstate->retrieved_attrs, + NULL, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); @@ -2770,6 +3061,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) astate->rel, astate->attinmeta, astate->retrieved_attrs, + NULL, astate->temp_cxt); MemoryContextSwitchTo(oldcontext); @@ -3045,6 +3337,345 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) } /* + * Assess whether the join between inner and outer relations can be pushed down + * to the foreign server. As a side effect, save information we obtain in this + * function to PgFdwRelationInfo passed in. + * + * Joins that satisfy conditions below are safe to push down. + * + * 1) Join type is INNER or OUTER (one of LEFT/RIGHT/FULL) + * 2) Both outer and inner portions are safe to push-down + * 3) All foreign tables in the join belong to the same foreign server and use + * the same user mapping. + * 4) All join conditions are safe to push down + * 5) No relation has local filter (this can be relaxed for INNER JOIN, if we + * can move unpushable clauses upwards in the join tree). 
+ */ +static bool +foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, + RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo; + PgFdwRelationInfo *fpinfo_o; + PgFdwRelationInfo *fpinfo_i; + ListCell *lc; + List *joinclauses; + List *otherclauses; + + /* + * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. + * Constructing queries representing SEMI and ANTI joins is hard, hence + * not considered right now. + */ + if (jointype != JOIN_INNER && jointype != JOIN_LEFT && + jointype != JOIN_RIGHT && jointype != JOIN_FULL) + return false; + + /* + * If either of the joining relations is marked as unsafe to pushdown, the + * join can not be pushed down. + */ + fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; + fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; + if (!fpinfo_o || !fpinfo_o->pushdown_safe || + !fpinfo_i || !fpinfo_i->pushdown_safe) + return false; + + /* + * If joining relations have local conditions, those conditions are + * required to be applied before joining the relations. Hence the join can + * not be pushed down. + */ + if (fpinfo_o->local_conds || fpinfo_i->local_conds) + return false; + + /* Separate restrict list into join quals and quals on join relation */ + if (IS_OUTER_JOIN(jointype)) + extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses); + else + { + /* + * Unlike an outer join, for inner join, the join result contains only + * the rows which satisfy join clauses, similar to the other clause. + * Hence all clauses can be treated as other quals. This helps to push + * a join down to the foreign server even if some of its join quals + * are not safe to pushdown. + */ + otherclauses = extract_actual_clauses(extra->restrictlist, false); + joinclauses = NIL; + } + + /* Join quals must be safe to push down. 
*/ + foreach(lc, joinclauses) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, joinrel, expr)) + return false; + } + + /* Save the join clauses, for later use. */ + fpinfo->joinclauses = joinclauses; + + /* + * Other clauses are applied after the join has been performed and thus + * need not be all pushable. We will push those which can be pushed to + * reduce the number of rows fetched from the foreign server. Rest of them + * will be applied locally after fetching join result. Add them to fpinfo + * so that other joins involving this joinrel will know that this joinrel + * has local clauses. + */ + foreach(lc, otherclauses) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, joinrel, expr)) + fpinfo->local_conds = lappend(fpinfo->local_conds, expr); + else + fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr); + } + + fpinfo->outerrel = outerrel; + fpinfo->innerrel = innerrel; + fpinfo->jointype = jointype; + + /* + * If user is willing to estimate cost for a scan of either of the joining + * relations using EXPLAIN, he intends to estimate scans on that relation + * more accurately. Then, it makes sense to estimate the cost the join + * with that relation more accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || + fpinfo_i->use_remote_estimate; + + /* + * Since both the joining relations come from the same server, the server + * level options should have same value for both the relations. Pick from + * any side. + */ + fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; + fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; + + /* Mark that this join can be pushed down safely */ + fpinfo->pushdown_safe = true; + + /* + * Set fetch size to maximum of the joining sides, since we are expecting + * the rows returned by the join to be proportional to the relation sizes. 
+ */ + if (fpinfo_o->fetch_size > fpinfo_i->fetch_size) + fpinfo->fetch_size = fpinfo_o->fetch_size; + else + fpinfo->fetch_size = fpinfo_i->fetch_size; + + /* + * Pull the other remote conditions from the joining relations into join + * clauses or other remote clauses (remote_conds) of this relation. This + * avoids building subqueries at every join step. + * + * For an inner join, clauses from both the relations are added to the + * other remote clauses. For an OUTER join, the clauses from the outer + * side are added to remote_conds since those can be evaluated after the + * join is evaluated. The clauses from inner side are added to the + * joinclauses, since they need to evaluated while constructing the join. + * + * The joining sides can not have local conditions, thus no need to test + * shippability of the clauses being pulled up. + */ + switch (jointype) + { + case JOIN_INNER: + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_LEFT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_RIGHT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_o->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + break; + + case JOIN_FULL: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_i->remote_conds); + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_o->remote_conds); + break; + + default: + /* Should not happen, we have just check this above */ + elog(ERROR, "unsupported join type %d", jointype); + } + + /* + * Set the string describing this join relation to be used in EXPLAIN + * output of corresponding ForeignScan. 
+	 */
+	fpinfo->relation_name = makeStringInfo();
+	appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)",
+					 fpinfo_o->relation_name->data,
+					 get_jointype_name(fpinfo->jointype),
+					 fpinfo_i->relation_name->data);
+
+	return true;
+}
+
+/*
+ * postgresGetForeignJoinPaths
+ *		Add possible ForeignPath to joinrel, if join is safe to push down.
+ *
+ * A joinrel is examined at most once.  The first call attaches a
+ * PgFdwRelationInfo (with pushdown_safe = false) to joinrel->fdw_private and
+ * every later call returns as soon as it sees that pointer, whichever
+ * outerrel/innerrel pair it is given.  The entry is left attached on every
+ * early-exit path below, so a joinrel that was rejected is not looked at
+ * again.
+ *
+ * root, joinrel, jointype, outerrel, innerrel and extra are passed on to
+ * foreign_join_ok(), which decides whether the join can be pushed down.
+ * extra->sjinfo is additionally used here for the join clause selectivity
+ * when costs are estimated locally.
+ *
+ * When the join is pushable, joinrel->rows and joinrel->width are overwritten
+ * with the values returned by estimate_path_cost_size(), and a single
+ * ForeignPath without pathkeys and without required_outer is handed to
+ * add_path().  Nothing is returned.
+ */
+static void
+postgresGetForeignJoinPaths(PlannerInfo *root,
+							RelOptInfo *joinrel,
+							RelOptInfo *outerrel,
+							RelOptInfo *innerrel,
+							JoinType jointype,
+							JoinPathExtraData *extra)
+{
+	PgFdwRelationInfo *fpinfo;
+	ForeignPath *joinpath;
+	double		rows;
+	int			width;
+	Cost		startup_cost;
+	Cost		total_cost;
+	Path	   *epq_path;		/* Path to create plan to be executed when
+								 * EvalPlanQual gets triggered; NULL when the
+								 * query is not an UPDATE/DELETE and has no
+								 * row marks. */
+
+	/*
+	 * Skip if this join combination has been considered already.
+	 */
+	if (joinrel->fdw_private)
+		return;
+
+	/*
+	 * Create unfinished PgFdwRelationInfo entry which is used to indicate
+	 * that the join relation is already considered, so that we won't waste
+	 * time in judging safety of join pushdown and adding the same paths again
+	 * if found safe. Once we know that this join can be pushed down, we fill
+	 * the entry.
+	 *
+	 * palloc0() zeroes the whole struct; pushdown_safe and attrs_used are
+	 * still assigned explicitly below to make the initial state obvious.
+	 */
+	fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
+	fpinfo->pushdown_safe = false;
+	joinrel->fdw_private = fpinfo;
+	/* attrs_used is only for base relations. */
+	fpinfo->attrs_used = NULL;
+
+	/*
+	 * In case there is a possibility that EvalPlanQual will be executed, we
+	 * should be able to reconstruct the row from the base relations, applying
+	 * all the conditions. We create a local plan from a suitable local path
+	 * available in the path list. In case such a path doesn't exist, we can
+	 * not push the join to the foreign server since we won't be able to
+	 * reconstruct the row for EvalPlanQual(). Find an alternative local path
+	 * before we add the ForeignPath, lest the new path displace what may be
+	 * the only local path. Do this before calling foreign_join_ok(), since
+	 * that function updates fpinfo and marks it as pushable if the join is
+	 * found to be pushable.
+	 *
+	 * The possibility is assumed for UPDATE and DELETE commands and whenever
+	 * the query has row marks.
+	 */
+	if (root->parse->commandType == CMD_DELETE ||
+		root->parse->commandType == CMD_UPDATE ||
+		root->rowMarks)
+	{
+		epq_path = GetExistingLocalJoinPath(joinrel);
+		if (!epq_path)
+		{
+			elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found");
+			return;
+		}
+	}
+	else
+		epq_path = NULL;
+
+	if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra))
+	{
+		/* Free path required for EPQ if we copied one; we don't need it now */
+		if (epq_path)
+			pfree(epq_path);
+		return;
+	}
+
+	/*
+	 * Compute the selectivity and cost of the local_conds, so we don't have
+	 * to do it over again for each path. The best we can do for these
+	 * conditions is to estimate selectivity on the basis of local statistics.
+	 * The local conditions are applied after the join has been computed on
+	 * the remote side like quals in WHERE clause, so pass jointype as
+	 * JOIN_INNER.
+	 */
+	fpinfo->local_conds_sel = clauselist_selectivity(root,
+													 fpinfo->local_conds,
+													 0,
+													 JOIN_INNER,
+													 NULL);
+	cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root);
+
+	/*
+	 * If we are going to estimate the costs using EXPLAIN, we will need
+	 * connection information. Fill it here.
+	 *
+	 * The user mapping is looked up only in that case; the foreign server
+	 * entry is looked up unconditionally after this branch.
+	 */
+	if (fpinfo->use_remote_estimate)
+		fpinfo->user = GetUserMappingById(joinrel->umid);
+	else
+	{
+		fpinfo->user = NULL;
+
+		/*
+		 * If we are going to estimate costs locally, estimate the join clause
+		 * selectivity here while we have special join info.
+		 */
+		fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses,
+														0, fpinfo->jointype,
+														extra->sjinfo);
+
+	}
+	fpinfo->server = GetForeignServer(joinrel->serverid);
+
+	/* Estimate costs for bare join relation */
+	estimate_path_cost_size(root, joinrel, NIL, NIL, &rows,
+							&width, &startup_cost, &total_cost);
+	/* Now update this information in the joinrel */
+	joinrel->rows = rows;
+	joinrel->width = width;
+	fpinfo->rows = rows;
+	fpinfo->width = width;
+	fpinfo->startup_cost = startup_cost;
+	fpinfo->total_cost = total_cost;
+
+	/*
+	 * Create a new join path and add it to the joinrel which represents a
+	 * join between foreign tables.  epq_path (possibly NULL) travels with the
+	 * new path.
+	 */
+	joinpath = create_foreignscan_path(root,
+									   joinrel,
+									   rows,
+									   startup_cost,
+									   total_cost,
+									   NIL,		/* no pathkeys */
+									   NULL,	/* no required_outer */
+									   epq_path,
+									   NULL);	/* no fdw_private */
+
+	/* Add generated path into joinrel by add_path(). */
+	add_path(joinrel, (Path *) joinpath);
+
+	/* XXX Consider pathkeys for the join relation */
+
+	/* XXX Consider parameterized paths for the join relation */
+}
+
+/*
 * Create a tuple from the specified row of the PGresult.
* * rel is the local representation of the foreign table, attinmeta is
@@ -3058,10 +3689,11 @@ make_tuple_from_result_row(PGresult *res,
 						   Relation rel,
 						   AttInMetadata *attinmeta,
 						   List *retrieved_attrs,
+						   ForeignScanState *fsstate,
 						   MemoryContext temp_context)
 {
 	HeapTuple	tuple;
-	TupleDesc	tupdesc = RelationGetDescr(rel);
+	TupleDesc	tupdesc;
 	Datum	   *values;
 	bool	   *nulls;
 	ItemPointer ctid = NULL;
@@ -3080,6 +3712,17 @@ make_tuple_from_result_row(PGresult *res,
 	 */
 	oldcontext = MemoryContextSwitchTo(temp_context);
 
+	if (rel)
+		tupdesc = RelationGetDescr(rel);
+	else
+	{
+		PgFdwScanState *fdw_sstate;
+
+		Assert(fsstate);
+		fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
+		tupdesc = fdw_sstate->tupdesc;
+	}
+
 	values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));
 	nulls = (bool *) palloc(tupdesc->natts * sizeof(bool));
 	/* Initialize to nulls for any columns not present in result */
@@ -3090,6 +3733,7 @@ make_tuple_from_result_row(PGresult *res,
 	 */
 	errpos.rel = rel;
 	errpos.cur_attno = 0;
+	errpos.fsstate = fsstate;
 	errcallback.callback = conversion_error_callback;
 	errcallback.arg = (void *) &errpos;
 	errcallback.previous = error_context_stack;
@@ -3178,13 +3822,83 @@ make_tuple_from_result_row(PGresult *res,
 static void
 conversion_error_callback(void *arg)
 {
+	const char *attname = NULL;
+	const char *relname = NULL;
+	bool		is_wholerow = false;
 	ConversionLocation *errpos = (ConversionLocation *) arg;
-	TupleDesc	tupdesc = RelationGetDescr(errpos->rel);
 
-	if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
-		errcontext("column \"%s\" of foreign table \"%s\"",
-				   NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname),
-				   RelationGetRelationName(errpos->rel));
+	/*
+	 * We are called while an error report is being built, so avoid anything
+	 * that could raise a further error: every lookup below is guarded, and
+	 * when the location cannot be identified no context line is added.
+	 */
+	if (errpos->rel)
+	{
+		/* error occurred in a scan against a foreign table */
+		TupleDesc	tupdesc = RelationGetDescr(errpos->rel);
+
+		if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts)
+			attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname);
+		else if (errpos->cur_attno == SelfItemPointerAttributeNumber)
+			attname = "ctid";
+
+		relname = RelationGetRelationName(errpos->rel);
+	}
+	else
+	{
+		/* error occurred in a scan against a foreign join */
+		ForeignScanState *fsstate = errpos->fsstate;
+		ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan;
+		EState	   *estate = fsstate->ss.ps.state;
+
+		Assert(IsA(fsplan, ForeignScan));
+
+		/*
+		 * cur_attno is 0 while no column is being processed, so only index
+		 * fdw_scan_tlist when it names an existing entry.
+		 */
+		if (errpos->cur_attno > 0 &&
+			errpos->cur_attno <= list_length(fsplan->fdw_scan_tlist))
+		{
+			TargetEntry *tle;
+
+			tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist,
+										   errpos->cur_attno - 1);
+			Assert(IsA(tle, TargetEntry));
+
+			/*
+			 * Only a plain Var tells us which relation and column the value
+			 * belongs to; for any other expression stay silent.
+			 */
+			if (IsA(tle->expr, Var))
+			{
+				Var		   *var = (Var *) tle->expr;
+				RangeTblEntry *rte;
+
+				rte = rt_fetch(var->varno, estate->es_range_table);
+				relname = get_rel_name(rte->relid);
+
+				/*
+				 * varattno 0 is a whole-row reference: there is no column
+				 * name to look up, and get_relid_attribute_name() would
+				 * raise an error for it.
+				 */
+				if (var->varattno == 0)
+					is_wholerow = true;
+				else
+					attname = get_relid_attribute_name(rte->relid,
+													   var->varattno);
+			}
+		}
+	}
+
+	if (relname)
+	{
+		if (is_wholerow)
+			errcontext("whole-row reference to foreign table \"%s\"", relname);
+		else if (attname)
+			errcontext("column \"%s\" of foreign table \"%s\"", attname, relname);
+	}
 }
 
 /* |