diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 835 |
1 files changed, 756 insertions, 79 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index d5a2af9428c..14a3f9891a9 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -28,9 +28,9 @@ #include "optimizer/pathnode.h" #include "optimizer/paths.h" #include "optimizer/planmain.h" -#include "optimizer/prep.h" #include "optimizer/restrictinfo.h" #include "optimizer/var.h" +#include "optimizer/tlist.h" #include "parser/parsetree.h" #include "utils/builtins.h" #include "utils/guc.h" @@ -64,7 +64,15 @@ enum FdwScanPrivateIndex /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs, /* Integer representing the desired fetch_size */ - FdwScanPrivateFetchSize + FdwScanPrivateFetchSize, + /* Oid of user mapping to be used while connecting to the foreign server */ + FdwScanPrivateUserMappingOid, + + /* + * String describing join i.e. names of relations being joined and types + * of join, added when the scan is join + */ + FdwScanPrivateRelations }; /* @@ -94,7 +102,9 @@ enum FdwModifyPrivateIndex */ typedef struct PgFdwScanState { - Relation rel; /* relcache entry for the foreign table */ + Relation rel; /* relcache entry for the foreign table. NULL + * for a foreign join scan. */ + TupleDesc tupdesc; /* tuple descriptor of scan */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ /* extracted fdw_private data */ @@ -182,8 +192,16 @@ typedef struct PgFdwAnalyzeState */ typedef struct ConversionLocation { - Relation rel; /* foreign table's relcache entry */ + Relation rel; /* foreign table's relcache entry. */ AttrNumber cur_attno; /* attribute number being processed, or 0 */ + + /* + * In case of foreign join push down, fdw_scan_tlist is used to identify + * the Var node corresponding to the error location and + * fsstate->ss.ps.state gives access to the RTEs of corresponding relation + * to get the relation name and attribute name. 
+ */ + ForeignScanState *fsstate; } ConversionLocation; /* Callback argument for ec_member_matches_foreign */ @@ -257,6 +275,14 @@ static bool postgresAnalyzeForeignTable(Relation relation, BlockNumber *totalpages); static List *postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid); +static void postgresGetForeignJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra); +static bool postgresRecheckForeignScan(ForeignScanState *node, + TupleTableSlot *slot); static List *get_useful_pathkeys_for_relation(PlannerInfo *root, RelOptInfo *rel); static List *get_useful_ecs_for_relation(PlannerInfo *root, RelOptInfo *rel); @@ -299,8 +325,12 @@ static HeapTuple make_tuple_from_result_row(PGresult *res, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, + ForeignScanState *fsstate, MemoryContext temp_context); static void conversion_error_callback(void *arg); +static bool foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, + JoinType jointype, RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra); /* @@ -331,6 +361,8 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->EndForeignModify = postgresEndForeignModify; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; + /* Function for EvalPlanQual rechecks */ + routine->RecheckForeignScan = postgresRecheckForeignScan; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; @@ -341,6 +373,9 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) /* Support functions for IMPORT FOREIGN SCHEMA */ routine->ImportForeignSchema = postgresImportForeignSchema; + /* Support functions for join push-down */ + routine->GetForeignJoinPaths = postgresGetForeignJoinPaths; + PG_RETURN_POINTER(routine); } @@ -358,6 +393,10 @@ postgresGetForeignRelSize(PlannerInfo *root, { PgFdwRelationInfo *fpinfo; ListCell 
*lc; + RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); + const char *namespace; + const char *relname; + const char *refname; /* * We use PgFdwRelationInfo to pass various information to subsequent @@ -366,6 +405,9 @@ postgresGetForeignRelSize(PlannerInfo *root, fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); baserel->fdw_private = (void *) fpinfo; + /* Base foreign tables need to be pushed down always. */ + fpinfo->pushdown_safe = true; + /* Look up foreign-table catalog info. */ fpinfo->table = GetForeignTable(foreigntableid); fpinfo->server = GetForeignServer(fpinfo->table->serverid); @@ -414,7 +456,6 @@ postgresGetForeignRelSize(PlannerInfo *root, */ if (fpinfo->use_remote_estimate) { - RangeTblEntry *rte = planner_rt_fetch(baserel->relid, root); Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); fpinfo->user = GetUserMapping(userid, fpinfo->server->serverid); @@ -509,6 +550,23 @@ postgresGetForeignRelSize(PlannerInfo *root, &fpinfo->rows, &fpinfo->width, &fpinfo->startup_cost, &fpinfo->total_cost); } + + /* + * Set the name of relation in fpinfo, while we are constructing it here. + * It will be used to build the string describing the join relation in + * EXPLAIN output. We can't know whether VERBOSE option is specified or + * not, so always schema-qualify the foreign table name. 
+ */ + fpinfo->relation_name = makeStringInfo(); + namespace = get_namespace_name(get_rel_namespace(foreigntableid)); + relname = get_rel_name(foreigntableid); + refname = rte->eref->aliasname; + appendStringInfo(fpinfo->relation_name, "%s.%s", + quote_identifier(namespace), + quote_identifier(relname)); + if (*refname && strcmp(refname, relname) != 0) + appendStringInfo(fpinfo->relation_name, " %s", + quote_identifier(rte->eref->aliasname)); } /* @@ -935,15 +993,15 @@ postgresGetForeignPaths(PlannerInfo *root, */ static ForeignScan * postgresGetForeignPlan(PlannerInfo *root, - RelOptInfo *baserel, + RelOptInfo *foreignrel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan) { - PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; - Index scan_relid = baserel->relid; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; + Index scan_relid; List *fdw_private; List *remote_conds = NIL; List *remote_exprs = NIL; @@ -952,6 +1010,28 @@ postgresGetForeignPlan(PlannerInfo *root, List *retrieved_attrs; StringInfoData sql; ListCell *lc; + List *fdw_scan_tlist = NIL; + + /* + * For base relations, set scan_relid as the relid of the relation. For + * other kinds of relations set it to 0. + */ + if (foreignrel->reloptkind == RELOPT_BASEREL || + foreignrel->reloptkind == RELOPT_OTHER_MEMBER_REL) + scan_relid = foreignrel->relid; + else + { + scan_relid = 0; + + /* + * create_scan_plan() and create_foreignscan_plan() pass + * rel->baserestrictinfo + parameterization clauses through + * scan_clauses. For a join rel->baserestrictinfo is NIL and we are + * not considering parameterization right now, so there should be no + * scan_clauses for a joinrel. 
+ */ + Assert(!scan_clauses); + } /* * Separate the scan_clauses into those that can be executed remotely and @@ -989,7 +1069,7 @@ postgresGetForeignPlan(PlannerInfo *root, } else if (list_member_ptr(fpinfo->local_conds, rinfo)) local_exprs = lappend(local_exprs, rinfo->clause); - else if (is_foreign_expr(root, baserel, rinfo->clause)) + else if (is_foreign_expr(root, foreignrel, rinfo->clause)) { remote_conds = lappend(remote_conds, rinfo); remote_exprs = lappend(remote_exprs, rinfo->clause); @@ -998,26 +1078,70 @@ postgresGetForeignPlan(PlannerInfo *root, local_exprs = lappend(local_exprs, rinfo->clause); } + if (foreignrel->reloptkind == RELOPT_JOINREL) + { + /* For a join relation, get the conditions from fdw_private structure */ + remote_conds = fpinfo->remote_conds; + local_exprs = fpinfo->local_conds; + + /* Build the list of columns to be fetched from the foreign server. */ + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + + /* + * Ensure that the outer plan produces a tuple whose descriptor + * matches our scan tuple slot. This is safe because all scans and + * joins support projection, so we never need to insert a Result node. + * Also, remove the local conditions from outer plan's quals, lest + * they will be evaluated twice, once by the local plan and once by + * the scan. + */ + if (outer_plan) + { + ListCell *lc; + + outer_plan->targetlist = fdw_scan_tlist; + + foreach(lc, local_exprs) + { + Join *join_plan = (Join *) outer_plan; + Node *qual = lfirst(lc); + + outer_plan->qual = list_delete(outer_plan->qual, qual); + + /* + * For an inner join the local conditions of foreign scan plan + * can be part of the joinquals as well. + */ + if (join_plan->jointype == JOIN_INNER) + join_plan->joinqual = list_delete(join_plan->joinqual, + qual); + } + } + } + /* * Build the query string to be sent for execution, and identify * expressions to be sent as parameters. 
*/ initStringInfo(&sql); - deparseSelectStmtForRel(&sql, root, baserel, remote_conds, - best_path->path.pathkeys, &retrieved_attrs, - &params_list); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_conds, best_path->path.pathkeys, + &retrieved_attrs, &params_list); /* * Build the fdw_private list that will be available to the executor. - * Items in the list must match enum FdwScanPrivateIndex, above. + * Items in the list must match order in enum FdwScanPrivateIndex. */ - fdw_private = list_make3(makeString(sql.data), + fdw_private = list_make4(makeString(sql.data), retrieved_attrs, - makeInteger(fpinfo->fetch_size)); + makeInteger(fpinfo->fetch_size), + makeInteger(foreignrel->umid)); + if (foreignrel->reloptkind == RELOPT_JOINREL) + fdw_private = lappend(fdw_private, + makeString(fpinfo->relation_name->data)); /* - * Create the ForeignScan node from target list, filtering expressions, - * remote parameter expressions, and FDW private information. + * Create the ForeignScan node for the given relation. * * Note that the remote parameter expressions are stored in the fdw_exprs * field of the finished plan node; we can't keep them in private state @@ -1028,7 +1152,7 @@ postgresGetForeignPlan(PlannerInfo *root, scan_relid, params_list, fdw_private, - NIL, /* no custom tlist */ + fdw_scan_tlist, remote_exprs, outer_plan); } @@ -1043,9 +1167,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwScanState *fsstate; - RangeTblEntry *rte; - Oid userid; - ForeignTable *table; UserMapping *user; int numParams; int i; @@ -1064,16 +1185,36 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) node->fdw_state = (void *) fsstate; /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. + * Obtain the foreign server where to connect and user mapping to use for + * connection. 
For base relations we obtain this information from + * catalogs. For join relations, this information is frozen at the time of + * planning to ensure that the join is safe to pushdown. In case the + * information goes stale between planning and execution, plan will be + * invalidated and replanned. */ - rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + if (fsplan->scan.scanrelid > 0) + { + ForeignTable *table; - /* Get info about foreign table. */ - fsstate->rel = node->ss.ss_currentRelation; - table = GetForeignTable(RelationGetRelid(fsstate->rel)); - user = GetUserMapping(userid, table->serverid); + /* + * Identify which user to do the remote access as. This should match + * what ExecCheckRTEPerms() does. + */ + RangeTblEntry *rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + Oid userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + fsstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(fsstate->rel)); + + user = GetUserMapping(userid, table->serverid); + } + else + { + Oid umid = intVal(list_nth(fsplan->fdw_private, FdwScanPrivateUserMappingOid)); + + user = GetUserMappingById(umid); + Assert(fsplan->fs_server == user->serverid); + } /* * Get connection to the foreign server. Connection manager will @@ -1105,8 +1246,16 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); - /* Get info we'll need for input data conversion. */ - fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); + /* + * Get info we'll need for converting data fetched from the foreign server + * into local representation and error reporting during that process. 
+ */ + if (fsplan->scan.scanrelid > 0) + fsstate->tupdesc = RelationGetDescr(fsstate->rel); + else + fsstate->tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + + fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); /* Prepare for output conversion of parameters used in remote query. */ numParams = list_length(fsplan->fdw_exprs); @@ -1825,6 +1974,34 @@ postgresIsForeignRelUpdatable(Relation rel) } /* + * postgresRecheckForeignScan + * Execute a local join execution plan for a foreign join + */ +static bool +postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) +{ + Index scanrelid = ((Scan *) node->ss.ps.plan)->scanrelid; + PlanState *outerPlan = outerPlanState(node); + TupleTableSlot *result; + + /* For base foreign relations, it suffices to set fdw_recheck_quals */ + if (scanrelid > 0) + return true; + + Assert(outerPlan != NULL); + + /* Execute a local join execution plan */ + result = ExecProcNode(outerPlan); + if (TupIsNull(result)) + return false; + + /* Store result in the given slot */ + ExecCopySlot(slot, result); + + return true; +} + +/* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ @@ -1833,10 +2010,25 @@ postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) { List *fdw_private; char *sql; + char *relations; + + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + + /* + * Add names of relation handled by the foreign scan when the scan is a + * join + */ + if (list_length(fdw_private) > FdwScanPrivateRelations) + { + relations = strVal(list_nth(fdw_private, FdwScanPrivateRelations)); + ExplainPropertyText("Relations", relations, es); + } + /* + * Add remote query, when VERBOSE option is specified. 
+ */ if (es->verbose) { - fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); ExplainPropertyText("Remote SQL", sql, es); } @@ -1865,26 +2057,29 @@ postgresExplainForeignModify(ModifyTableState *mtstate, /* * estimate_path_cost_size - * Get cost and size estimates for a foreign scan + * Get cost and size estimates for a foreign scan on given foreign relation + * either a base relation or a join between foreign relations. * - * We assume that all the baserestrictinfo clauses will be applied, plus - * any join clauses listed in join_conds. + * param_join_conds are the parameterization clauses with outer relations. + * pathkeys specify the expected sort order if any for given path being costed. + * + * The function returns the cost and size estimates in p_row, p_width, + * p_startup_cost and p_total_cost variables. */ static void estimate_path_cost_size(PlannerInfo *root, - RelOptInfo *baserel, - List *join_conds, + RelOptInfo *foreignrel, + List *param_join_conds, List *pathkeys, double *p_rows, int *p_width, Cost *p_startup_cost, Cost *p_total_cost) { - PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) baserel->fdw_private; + PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; double rows; double retrieved_rows; int width; Cost startup_cost; Cost total_cost; - Cost run_cost; Cost cpu_per_tuple; /* @@ -1896,39 +2091,49 @@ estimate_path_cost_size(PlannerInfo *root, */ if (fpinfo->use_remote_estimate) { - List *remote_join_conds; - List *local_join_conds; + List *remote_param_join_conds; + List *local_param_join_conds; StringInfoData sql; - List *retrieved_attrs; PGconn *conn; Selectivity local_sel; QualCost local_cost; + List *fdw_scan_tlist = NIL; List *remote_conds; + /* Required only to be passed to deparseSelectStmtForRel */ + List *retrieved_attrs; + /* - * join_conds might contain both clauses that are safe to send across, - * and clauses that aren't. 
+ * param_join_conds might contain both clauses that are safe to send + * across, and clauses that aren't. */ - classifyConditions(root, baserel, join_conds, - &remote_join_conds, &local_join_conds); + classifyConditions(root, foreignrel, param_join_conds, + &remote_param_join_conds, &local_param_join_conds); + + /* Build the list of columns to be fetched from the foreign server. */ + if (foreignrel->reloptkind == RELOPT_JOINREL) + fdw_scan_tlist = build_tlist_to_deparse(foreignrel); + else + fdw_scan_tlist = NIL; /* * The complete list of remote conditions includes everything from * baserestrictinfo plus any extra join_conds relevant to this * particular path. */ - remote_conds = list_concat(list_copy(remote_join_conds), + remote_conds = list_concat(list_copy(remote_param_join_conds), fpinfo->remote_conds); /* * Construct EXPLAIN query including the desired SELECT, FROM, and - * WHERE clauses. Params and other-relation Vars are replaced by - * dummy values. + * WHERE clauses. Params and other-relation Vars are replaced by dummy + * values, so don't request params_list. 
*/ initStringInfo(&sql); appendStringInfoString(&sql, "EXPLAIN "); - deparseSelectStmtForRel(&sql, root, baserel, remote_conds, pathkeys, - &retrieved_attrs, NULL); + deparseSelectStmtForRel(&sql, root, foreignrel, fdw_scan_tlist, + remote_conds, pathkeys, &retrieved_attrs, + NULL); /* Get the remote estimate */ conn = GetConnection(fpinfo->user, false); @@ -1940,8 +2145,8 @@ estimate_path_cost_size(PlannerInfo *root, /* Factor in the selectivity of the locally-checked quals */ local_sel = clauselist_selectivity(root, - local_join_conds, - baserel->relid, + local_param_join_conds, + foreignrel->relid, JOIN_INNER, NULL); local_sel *= fpinfo->local_conds_sel; @@ -1951,41 +2156,113 @@ estimate_path_cost_size(PlannerInfo *root, /* Add in the eval cost of the locally-checked quals */ startup_cost += fpinfo->local_conds_cost.startup; total_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; - cost_qual_eval(&local_cost, local_join_conds, root); + cost_qual_eval(&local_cost, local_param_join_conds, root); startup_cost += local_cost.startup; total_cost += local_cost.per_tuple * retrieved_rows; } else { + Cost run_cost = 0; + /* * We don't support join conditions in this mode (hence, no * parameterized paths can be made). */ - Assert(join_conds == NIL); - - /* Use rows/width estimates made by set_baserel_size_estimates. */ - rows = baserel->rows; - width = baserel->width; + Assert(param_join_conds == NIL); /* - * Back into an estimate of the number of retrieved rows. Just in - * case this is nuts, clamp to at most baserel->tuples. + * Use rows/width estimates made by set_baserel_size_estimates() for + * base foreign relations and set_joinrel_size_estimates() for join + * between foreign relations. */ + rows = foreignrel->rows; + width = foreignrel->width; + + /* Back into an estimate of the number of retrieved rows. 
*/ retrieved_rows = clamp_row_est(rows / fpinfo->local_conds_sel); - retrieved_rows = Min(retrieved_rows, baserel->tuples); - /* - * Cost as though this were a seqscan, which is pessimistic. We - * effectively imagine the local_conds are being evaluated remotely, - * too. - */ - startup_cost = 0; - run_cost = 0; - run_cost += seq_page_cost * baserel->pages; + if (foreignrel->reloptkind != RELOPT_JOINREL) + { + /* Clamp retrieved rows estimates to at most foreignrel->tuples. */ + retrieved_rows = Min(retrieved_rows, foreignrel->tuples); + + /* + * Cost as though this were a seqscan, which is pessimistic. We + * effectively imagine the local_conds are being evaluated + * remotely, too. + */ + startup_cost = 0; + run_cost = 0; + run_cost += seq_page_cost * foreignrel->pages; - startup_cost += baserel->baserestrictcost.startup; - cpu_per_tuple = cpu_tuple_cost + baserel->baserestrictcost.per_tuple; - run_cost += cpu_per_tuple * baserel->tuples; + startup_cost += foreignrel->baserestrictcost.startup; + cpu_per_tuple = cpu_tuple_cost + foreignrel->baserestrictcost.per_tuple; + run_cost += cpu_per_tuple * foreignrel->tuples; + } + else + { + PgFdwRelationInfo *fpinfo_i; + PgFdwRelationInfo *fpinfo_o; + QualCost join_cost; + QualCost remote_conds_cost; + double nrows; + + /* For join we expect inner and outer relations set */ + Assert(fpinfo->innerrel && fpinfo->outerrel); + + fpinfo_i = (PgFdwRelationInfo *) fpinfo->innerrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) fpinfo->outerrel->fdw_private; + + /* Estimate of number of rows in cross product */ + nrows = fpinfo_i->rows * fpinfo_o->rows; + /* Clamp retrieved rows estimate to at most size of cross product */ + retrieved_rows = Min(retrieved_rows, nrows); + + /* + * The cost of foreign join is estimated as cost of generating + * rows for the joining relations + cost for applying quals on the + * rows. 
+ */ + + /* Calculate the cost of clauses pushed down the foreign server */ + cost_qual_eval(&remote_conds_cost, fpinfo->remote_conds, root); + /* Calculate the cost of applying join clauses */ + cost_qual_eval(&join_cost, fpinfo->joinclauses, root); + + /* + * Startup cost includes startup cost of joining relations and the + * startup cost for join and other clauses. We do not include the + * startup cost specific to join strategy (e.g. setting up hash + * tables) since we do not know what strategy the foreign server + * is going to use. + */ + startup_cost = fpinfo_i->rel_startup_cost + fpinfo_o->rel_startup_cost; + startup_cost += join_cost.startup; + startup_cost += remote_conds_cost.startup; + startup_cost += fpinfo->local_conds_cost.startup; + + /* + * Run time cost includes: + * + * 1. Run time cost (total_cost - startup_cost) of relations being + * joined + * + * 2. Run time cost of applying join clauses on the cross product + * of the joining relations. + * + * 3. Run time cost of applying pushed down other clauses on the + * result of join + * + * 4. Run time cost of applying nonpushable other clauses locally + * on the result fetched from the foreign server. + */ + run_cost = fpinfo_i->rel_total_cost - fpinfo_i->rel_startup_cost; + run_cost += fpinfo_o->rel_total_cost - fpinfo_o->rel_startup_cost; + run_cost += nrows * join_cost.per_tuple; + nrows = clamp_row_est(nrows * fpinfo->joinclause_sel); + run_cost += nrows * remote_conds_cost.per_tuple; + run_cost += fpinfo->local_conds_cost.per_tuple * retrieved_rows; + } /* * Without remote estimates, we have no real way to estimate the cost @@ -2006,6 +2283,15 @@ estimate_path_cost_size(PlannerInfo *root, } /* + * Cache the costs prior to adding the costs for transferring data from + * the foreign server. These costs are useful for costing the join between + * this relation and another foreign relation, when the cost of join can + * not be obtained from the foreign server. 
+ */ + fpinfo->rel_startup_cost = startup_cost; + fpinfo->rel_total_cost = total_cost; + + /* * Add some additional cost factors to account for connection overhead * (fdw_startup_cost), transferring data across the network * (fdw_tuple_cost per retrieved row), and local manipulation of the data @@ -2237,11 +2523,15 @@ fetch_more_data(ForeignScanState *node) for (i = 0; i < numrows; i++) { + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + + Assert(IsA(fsplan, ForeignScan)); fsstate->tuples[i] = make_tuple_from_result_row(res, i, fsstate->rel, fsstate->attinmeta, fsstate->retrieved_attrs, + node, fsstate->temp_cxt); } @@ -2460,6 +2750,7 @@ store_returning_result(PgFdwModifyState *fmstate, fmstate->rel, fmstate->attinmeta, fmstate->retrieved_attrs, + NULL, fmstate->temp_cxt); /* tuple will be deleted when it is cleared from the slot */ ExecStoreTuple(newtup, slot, InvalidBuffer, true); @@ -2770,6 +3061,7 @@ analyze_row_processor(PGresult *res, int row, PgFdwAnalyzeState *astate) astate->rel, astate->attinmeta, astate->retrieved_attrs, + NULL, astate->temp_cxt); MemoryContextSwitchTo(oldcontext); @@ -3045,6 +3337,345 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) } /* + * Assess whether the join between inner and outer relations can be pushed down + * to the foreign server. As a side effect, save information we obtain in this + * function to PgFdwRelationInfo passed in. + * + * Joins that satisfy conditions below are safe to push down. + * + * 1) Join type is INNER or OUTER (one of LEFT/RIGHT/FULL) + * 2) Both outer and inner portions are safe to push-down + * 3) All foreign tables in the join belong to the same foreign server and use + * the same user mapping. + * 4) All join conditions are safe to push down + * 5) No relation has local filter (this can be relaxed for INNER JOIN, if we + * can move unpushable clauses upwards in the join tree). 
+ */ +static bool +foreign_join_ok(PlannerInfo *root, RelOptInfo *joinrel, JoinType jointype, + RelOptInfo *outerrel, RelOptInfo *innerrel, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo; + PgFdwRelationInfo *fpinfo_o; + PgFdwRelationInfo *fpinfo_i; + ListCell *lc; + List *joinclauses; + List *otherclauses; + + /* + * We support pushing down INNER, LEFT, RIGHT and FULL OUTER joins. + * Constructing queries representing SEMI and ANTI joins is hard, hence + * not considered right now. + */ + if (jointype != JOIN_INNER && jointype != JOIN_LEFT && + jointype != JOIN_RIGHT && jointype != JOIN_FULL) + return false; + + /* + * If either of the joining relations is marked as unsafe to pushdown, the + * join can not be pushed down. + */ + fpinfo = (PgFdwRelationInfo *) joinrel->fdw_private; + fpinfo_o = (PgFdwRelationInfo *) outerrel->fdw_private; + fpinfo_i = (PgFdwRelationInfo *) innerrel->fdw_private; + if (!fpinfo_o || !fpinfo_o->pushdown_safe || + !fpinfo_i || !fpinfo_i->pushdown_safe) + return false; + + /* + * If joining relations have local conditions, those conditions are + * required to be applied before joining the relations. Hence the join can + * not be pushed down. + */ + if (fpinfo_o->local_conds || fpinfo_i->local_conds) + return false; + + /* Separate restrict list into join quals and quals on join relation */ + if (IS_OUTER_JOIN(jointype)) + extract_actual_join_clauses(extra->restrictlist, &joinclauses, &otherclauses); + else + { + /* + * Unlike an outer join, for inner join, the join result contains only + * the rows which satisfy join clauses, similar to the other clause. + * Hence all clauses can be treated as other quals. This helps to push + * a join down to the foreign server even if some of its join quals + * are not safe to pushdown. + */ + otherclauses = extract_actual_clauses(extra->restrictlist, false); + joinclauses = NIL; + } + + /* Join quals must be safe to push down. 
*/ + foreach(lc, joinclauses) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, joinrel, expr)) + return false; + } + + /* Save the join clauses, for later use. */ + fpinfo->joinclauses = joinclauses; + + /* + * Other clauses are applied after the join has been performed and thus + * need not be all pushable. We will push those which can be pushed to + * reduce the number of rows fetched from the foreign server. Rest of them + * will be applied locally after fetching join result. Add them to fpinfo + * so that other joins involving this joinrel will know that this joinrel + * has local clauses. + */ + foreach(lc, otherclauses) + { + Expr *expr = (Expr *) lfirst(lc); + + if (!is_foreign_expr(root, joinrel, expr)) + fpinfo->local_conds = lappend(fpinfo->local_conds, expr); + else + fpinfo->remote_conds = lappend(fpinfo->remote_conds, expr); + } + + fpinfo->outerrel = outerrel; + fpinfo->innerrel = innerrel; + fpinfo->jointype = jointype; + + /* + * If user is willing to estimate cost for a scan of either of the joining + * relations using EXPLAIN, he intends to estimate scans on that relation + * more accurately. Then, it makes sense to estimate the cost of the join + * with that relation more accurately using EXPLAIN. + */ + fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate || + fpinfo_i->use_remote_estimate; + + /* + * Since both the joining relations come from the same server, the server + * level options should have same value for both the relations. Pick from + * any side. + */ + fpinfo->fdw_startup_cost = fpinfo_o->fdw_startup_cost; + fpinfo->fdw_tuple_cost = fpinfo_o->fdw_tuple_cost; + + /* Mark that this join can be pushed down safely */ + fpinfo->pushdown_safe = true; + + /* + * Set fetch size to maximum of the joining sides, since we are expecting + * the rows returned by the join to be proportional to the relation sizes. 
+ */ + if (fpinfo_o->fetch_size > fpinfo_i->fetch_size) + fpinfo->fetch_size = fpinfo_o->fetch_size; + else + fpinfo->fetch_size = fpinfo_i->fetch_size; + + /* + * Pull the other remote conditions from the joining relations into join + * clauses or other remote clauses (remote_conds) of this relation. This + * avoids building subqueries at every join step. + * + * For an inner join, clauses from both the relations are added to the + * other remote clauses. For an OUTER join, the clauses from the outer + * side are added to remote_conds since those can be evaluated after the + * join is evaluated. The clauses from inner side are added to the + * joinclauses, since they need to be evaluated while constructing the join. + * + * The joining sides can not have local conditions, thus no need to test + * shippability of the clauses being pulled up. + */ + switch (jointype) + { + case JOIN_INNER: + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_LEFT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_i->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_o->remote_conds); + break; + + case JOIN_RIGHT: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_o->remote_conds); + fpinfo->remote_conds = list_concat(fpinfo->remote_conds, + fpinfo_i->remote_conds); + break; + + case JOIN_FULL: + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_i->remote_conds); + fpinfo->joinclauses = list_concat(fpinfo->joinclauses, + fpinfo_o->remote_conds); + break; + + default: + /* Should not happen, we have just checked this above */ + elog(ERROR, "unsupported join type %d", jointype); + } + + /* + * Set the string describing this join relation to be used in EXPLAIN + * output of corresponding ForeignScan. 
+ */ + fpinfo->relation_name = makeStringInfo(); + appendStringInfo(fpinfo->relation_name, "(%s) %s JOIN (%s)", + fpinfo_o->relation_name->data, + get_jointype_name(fpinfo->jointype), + fpinfo_i->relation_name->data); + + return true; +} + +/* + * postgresGetForeignJoinPaths + * Add possible ForeignPath to joinrel, if join is safe to push down. + */ +static void +postgresGetForeignJoinPaths(PlannerInfo *root, + RelOptInfo *joinrel, + RelOptInfo *outerrel, + RelOptInfo *innerrel, + JoinType jointype, + JoinPathExtraData *extra) +{ + PgFdwRelationInfo *fpinfo; + ForeignPath *joinpath; + double rows; + int width; + Cost startup_cost; + Cost total_cost; + Path *epq_path; /* Path to create plan to be executed when + * EvalPlanQual gets triggered. */ + + /* + * Skip if this join combination has been considered already. + */ + if (joinrel->fdw_private) + return; + + /* + * Create unfinished PgFdwRelationInfo entry which is used to indicate + * that the join relation is already considered, so that we won't waste + * time in judging safety of join pushdown and adding the same paths again + * if found safe. Once we know that this join can be pushed down, we fill + * the entry. + */ + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); + fpinfo->pushdown_safe = false; + joinrel->fdw_private = fpinfo; + /* attrs_used is only for base relations. */ + fpinfo->attrs_used = NULL; + + /* + * In case there is a possibility that EvalPlanQual will be executed, we + * should be able to reconstruct the row, from base relations applying all + * the conditions. We create a local plan from a suitable local path + * available in the path list. In case such a path doesn't exist, we can + * not push the join to the foreign server since we won't be able to + * reconstruct the row for EvalPlanQual(). Find an alternative local path + * before we add ForeignPath, lest the new path would kick possibly the + * only local path. 
Do this before calling foreign_join_ok(), since that + * function updates fpinfo and marks it as pushable if the join is found + * to be pushable. + */ + if (root->parse->commandType == CMD_DELETE || + root->parse->commandType == CMD_UPDATE || + root->rowMarks) + { + epq_path = GetExistingLocalJoinPath(joinrel); + if (!epq_path) + { + elog(DEBUG3, "could not push down foreign join because a local path suitable for EPQ checks was not found"); + return; + } + } + else + epq_path = NULL; + + if (!foreign_join_ok(root, joinrel, jointype, outerrel, innerrel, extra)) + { + /* Free path required for EPQ if we copied one; we don't need it now */ + if (epq_path) + pfree(epq_path); + return; + } + + /* + * Compute the selectivity and cost of the local_conds, so we don't have + * to do it over again for each path. The best we can do for these + * conditions is to estimate selectivity on the basis of local statistics. + * The local conditions are applied after the join has been computed on + * the remote side like quals in WHERE clause, so pass jointype as + * JOIN_INNER. + */ + fpinfo->local_conds_sel = clauselist_selectivity(root, + fpinfo->local_conds, + 0, + JOIN_INNER, + NULL); + cost_qual_eval(&fpinfo->local_conds_cost, fpinfo->local_conds, root); + + /* + * If we are going to estimate the costs using EXPLAIN, we will need + * connection information. Fill it here. + */ + if (fpinfo->use_remote_estimate) + fpinfo->user = GetUserMappingById(joinrel->umid); + else + { + fpinfo->user = NULL; + + /* + * If we are going to estimate costs locally, estimate the join clause + * selectivity here while we have special join info. 
+ */ + fpinfo->joinclause_sel = clauselist_selectivity(root, fpinfo->joinclauses, + 0, fpinfo->jointype, + extra->sjinfo); + + } + fpinfo->server = GetForeignServer(joinrel->serverid); + + /* Estimate costs for bare join relation */ + estimate_path_cost_size(root, joinrel, NIL, NIL, &rows, + &width, &startup_cost, &total_cost); + /* Now update this information in the joinrel */ + joinrel->rows = rows; + joinrel->width = width; + fpinfo->rows = rows; + fpinfo->width = width; + fpinfo->startup_cost = startup_cost; + fpinfo->total_cost = total_cost; + + /* + * Create a new join path and add it to the joinrel which represents a + * join between foreign tables. + */ + joinpath = create_foreignscan_path(root, + joinrel, + rows, + startup_cost, + total_cost, + NIL, /* no pathkeys */ + NULL, /* no required_outer */ + epq_path, + NULL); /* no fdw_private */ + + /* Add generated path into joinrel by add_path(). */ + add_path(joinrel, (Path *) joinpath); + + /* XXX Consider pathkeys for the join relation */ + + /* XXX Consider parameterized paths for the join relation */ +} + +/* * Create a tuple from the specified row of the PGresult. 
* * rel is the local representation of the foreign table, attinmeta is @@ -3058,10 +3689,11 @@ make_tuple_from_result_row(PGresult *res, Relation rel, AttInMetadata *attinmeta, List *retrieved_attrs, + ForeignScanState *fsstate, MemoryContext temp_context) { HeapTuple tuple; - TupleDesc tupdesc = RelationGetDescr(rel); + TupleDesc tupdesc; Datum *values; bool *nulls; ItemPointer ctid = NULL; @@ -3080,6 +3712,17 @@ make_tuple_from_result_row(PGresult *res, */ oldcontext = MemoryContextSwitchTo(temp_context); + if (rel) + tupdesc = RelationGetDescr(rel); + else + { + PgFdwScanState *fdw_sstate; + + Assert(fsstate); + fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; + tupdesc = fdw_sstate->tupdesc; + } + values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); nulls = (bool *) palloc(tupdesc->natts * sizeof(bool)); /* Initialize to nulls for any columns not present in result */ @@ -3090,6 +3733,7 @@ make_tuple_from_result_row(PGresult *res, */ errpos.rel = rel; errpos.cur_attno = 0; + errpos.fsstate = fsstate; errcallback.callback = conversion_error_callback; errcallback.arg = (void *) &errpos; errcallback.previous = error_context_stack; @@ -3178,13 +3822,46 @@ make_tuple_from_result_row(PGresult *res, static void conversion_error_callback(void *arg) { + const char *attname = NULL; + const char *relname = NULL; ConversionLocation *errpos = (ConversionLocation *) arg; - TupleDesc tupdesc = RelationGetDescr(errpos->rel); - if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) - errcontext("column \"%s\" of foreign table \"%s\"", - NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname), - RelationGetRelationName(errpos->rel)); + if (errpos->rel) + { + /* error occurred in a scan against a foreign table */ + TupleDesc tupdesc = RelationGetDescr(errpos->rel); + + if (errpos->cur_attno > 0 && errpos->cur_attno <= tupdesc->natts) + attname = NameStr(tupdesc->attrs[errpos->cur_attno - 1]->attname); + else if (errpos->cur_attno == 
SelfItemPointerAttributeNumber) + attname = "ctid"; + + relname = RelationGetRelationName(errpos->rel); + } + else + { + /* error occurred in a scan against a foreign join */ + ForeignScanState *fsstate = errpos->fsstate; + ForeignScan *fsplan = (ForeignScan *) fsstate->ss.ps.plan; + EState *estate = fsstate->ss.ps.state; + TargetEntry *tle; + Var *var; + RangeTblEntry *rte; + + Assert(IsA(fsplan, ForeignScan)); + tle = (TargetEntry *) list_nth(fsplan->fdw_scan_tlist, + errpos->cur_attno - 1); + Assert(IsA(tle, TargetEntry)); + var = (Var *) tle->expr; + Assert(IsA(var, Var)); + + rte = rt_fetch(var->varno, estate->es_range_table); + relname = get_rel_name(rte->relid); + attname = get_relid_attribute_name(rte->relid, var->varattno); + } + + if (attname && relname) + errcontext("column \"%s\" of foreign table \"%s\"", attname, relname); } /* |