diff options
author | Robert Haas <rhaas@postgresql.org> | 2016-03-18 13:48:58 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2016-03-18 13:55:52 -0400 |
commit | 0bf3ae88af330496517722e391e7c975e6bad219 (patch) | |
tree | 46220c3ebfc9616af8d683c74395b18045c59a8a /contrib/postgres_fdw/postgres_fdw.c | |
parent | 3422fecccadb021b7b4cdbc73b2c29f66f031761 (diff) | |
download | postgresql-0bf3ae88af330496517722e391e7c975e6bad219.tar.gz postgresql-0bf3ae88af330496517722e391e7c975e6bad219.zip |
Directly modify foreign tables.
postgres_fdw can now sent an UPDATE or DELETE statement directly to
the foreign server in simple cases, rather than sending a SELECT FOR
UPDATE statement and then updating or deleting rows one-by-one.
Etsuro Fujita, reviewed by Rushabh Lathia, Shigeru Hanada, Kyotaro
Horiguchi, Albe Laurenz, Thom Brown, and me.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 683 |
1 files changed, 612 insertions, 71 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index e446cc5645a..d6db8340129 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -61,6 +61,8 @@ enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ FdwScanPrivateSelectSql, + /* List of restriction clauses that can be executed remotely */ + FdwScanPrivateRemoteConds, /* Integer list of attribute numbers retrieved by the SELECT */ FdwScanPrivateRetrievedAttrs, /* Integer representing the desired fetch_size */ @@ -98,6 +100,27 @@ enum FdwModifyPrivateIndex }; /* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ForeignScan node that modifies a foreign table directly. We store: + * + * 1) UPDATE/DELETE statement text to be sent to the remote server + * 2) Boolean flag showing if the remote query has a RETURNING clause + * 3) Integer list of attribute numbers retrieved by RETURNING, if any + * 4) Boolean flag showing if we set the command es_processed + */ +enum FdwDirectModifyPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwDirectModifyPrivateUpdateSql, + /* has-returning flag (as an integer Value node) */ + FdwDirectModifyPrivateHasReturning, + /* Integer list of attribute numbers retrieved by RETURNING */ + FdwDirectModifyPrivateRetrievedAttrs, + /* set-processed flag (as an integer Value node) */ + FdwDirectModifyPrivateSetProcessed +}; + +/* * Execution state of a foreign scan using postgres_fdw. */ typedef struct PgFdwScanState @@ -164,6 +187,36 @@ typedef struct PgFdwModifyState } PgFdwModifyState; /* + * Execution state of a foreign scan that modifies a foreign table directly. + */ +typedef struct PgFdwDirectModifyState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* extracted fdw_private data */ + char *query; /* text of UPDATE/DELETE command */ + bool has_returning; /* is there a RETURNING clause? */ + List *retrieved_attrs; /* attr numbers retrieved by RETURNING */ + bool set_processed; /* do we set the command es_processed? */ + + /* for remote query execution */ + PGconn *conn; /* connection for the update */ + int numParams; /* number of parameters passed to query */ + FmgrInfo *param_flinfo; /* output conversion functions for them */ + List *param_exprs; /* executable expressions for param values */ + const char **param_values; /* textual values of query parameters */ + + /* for storing result tuples */ + PGresult *result; /* result for query */ + int num_tuples; /* # of result tuples */ + int next_tuple; /* index of next one to return */ + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} PgFdwDirectModifyState; + +/* * Workspace for analyzing a foreign table. */ typedef struct PgFdwAnalyzeState @@ -263,6 +316,13 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate, static void postgresEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo); static int postgresIsForeignRelUpdatable(Relation rel); +static bool postgresPlanDirectModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); +static void postgresBeginDirectModify(ForeignScanState *node, int eflags); +static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node); +static void postgresEndDirectModify(ForeignScanState *node); static void postgresExplainForeignScan(ForeignScanState *node, ExplainState *es); static void postgresExplainForeignModify(ModifyTableState *mtstate, @@ -270,6 +330,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate, List *fdw_private, int subplan_index, ExplainState *es); +static void postgresExplainDirectModify(ForeignScanState *node, + ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); @@ -311,6 +373,18 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); +static void execute_dml_stmt(ForeignScanState *node); +static TupleTableSlot *get_returning_data(ForeignScanState *node); +static void prepare_query_params(PlanState *node, + List *fdw_exprs, + int numParams, + FmgrInfo **param_flinfo, + List **param_exprs, + const char ***param_values); +static void process_query_params(ExprContext *econtext, + FmgrInfo *param_flinfo, + List *param_exprs, + const char **param_values); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, @@ -362,12 +436,17 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) routine->ExecForeignDelete = postgresExecForeignDelete; routine->EndForeignModify = postgresEndForeignModify; routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable; + routine->PlanDirectModify = postgresPlanDirectModify; + routine->BeginDirectModify = postgresBeginDirectModify; + routine->IterateDirectModify = postgresIterateDirectModify; + routine->EndDirectModify = postgresEndDirectModify; /* Function for EvalPlanQual rechecks */ routine->RecheckForeignScan = postgresRecheckForeignScan; /* Support functions for EXPLAIN */ routine->ExplainForeignScan = postgresExplainForeignScan; routine->ExplainForeignModify = postgresExplainForeignModify; + routine->ExplainDirectModify = postgresExplainDirectModify; /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; @@ -1122,7 +1201,8 @@ postgresGetForeignPlan(PlannerInfo *root, * Build the fdw_private list that will be available to the executor. * Items in the list must match order in enum FdwScanPrivateIndex. */ - fdw_private = list_make4(makeString(sql.data), + fdw_private = list_make5(makeString(sql.data), + remote_conds, retrieved_attrs, makeInteger(fpinfo->fetch_size), makeInteger(foreignrel->umid)); @@ -1159,8 +1239,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) PgFdwScanState *fsstate; UserMapping *user; int numParams; - int i; - ListCell *lc; /* * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. @@ -1247,42 +1325,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc); - /* Prepare for output conversion of parameters used in remote query. */ - numParams = list_length(fsplan->fdw_exprs); - fsstate->numParams = numParams; - fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); - - i = 0; - foreach(lc, fsplan->fdw_exprs) - { - Node *param_expr = (Node *) lfirst(lc); - Oid typefnoid; - bool isvarlena; - - getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); - fmgr_info(typefnoid, &fsstate->param_flinfo[i]); - i++; - } - /* - * Prepare remote-parameter expressions for evaluation. (Note: in - * practice, we expect that all these expressions will be just Params, so - * we could possibly do something more efficient than using the full - * expression-eval machinery for this. But probably there would be little - * benefit, and it'd require postgres_fdw to know more than is desirable - * about Param evaluation.) - */ - fsstate->param_exprs = (List *) - ExecInitExpr((Expr *) fsplan->fdw_exprs, - (PlanState *) node); - - /* - * Allocate buffer for text form of query parameters, if any. + * Prepare for processing of parameters used in remote query, if any. */ + numParams = list_length(fsplan->fdw_exprs); + fsstate->numParams = numParams; if (numParams > 0) - fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); - else - fsstate->param_values = NULL; + prepare_query_params((PlanState *) node, + fsplan->fdw_exprs, + numParams, + &fsstate->param_flinfo, + &fsstate->param_exprs, + &fsstate->param_values); } /* @@ -1447,13 +1501,6 @@ postgresAddForeignUpdateTargets(Query *parsetree, /* * postgresPlanForeignModify * Plan an insert/update/delete operation on a foreign table - * - * Note: currently, the plan tree generated for UPDATE/DELETE will always - * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE) - * and then the ModifyTable node will have to execute individual remote - * UPDATE/DELETE commands. If there are no local conditions or joins - * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING - * and then do nothing at ModifyTable. Room for future optimization ... */ static List * postgresPlanForeignModify(PlannerInfo *root, @@ -1992,6 +2039,314 @@ postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot) } /* + * postgresPlanDirectModify + * Consider a direct foreign table modification + * + * Decide whether it is safe to modify a foreign table directly, and if so, + * rewrite subplan accordingly. + */ +static bool +postgresPlanDirectModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index); + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Relation rel; + StringInfoData sql; + ForeignScan *fscan; + List *targetAttrs = NIL; + List *remote_conds; + List *params_list = NIL; + List *returningList = NIL; + List *retrieved_attrs = NIL; + + /* + * Decide whether it is safe to modify a foreign table directly. + */ + + /* + * The table modification must be an UPDATE or DELETE. + */ + if (operation != CMD_UPDATE && operation != CMD_DELETE) + return false; + + /* + * It's unsafe to modify a foreign table directly if there are any local + * joins needed. + */ + if (!IsA(subplan, ForeignScan)) + return false; + + /* + * It's unsafe to modify a foreign table directly if there are any quals + * that should be evaluated locally. + */ + if (subplan->qual != NIL) + return false; + + /* + * We can't handle an UPDATE or DELETE on a foreign join for now. + */ + fscan = (ForeignScan *) subplan; + if (fscan->scan.scanrelid == 0) + return false; + + /* + * It's unsafe to update a foreign table directly, if any expressions to + * assign to the target columns are unsafe to evaluate remotely. + */ + if (operation == CMD_UPDATE) + { + RelOptInfo *baserel = root->simple_rel_array[resultRelation]; + int col; + + /* + * We transmit only columns that were explicitly targets of the UPDATE, + * so as to avoid unnecessary data transmission. + */ + col = -1; + while ((col = bms_next_member(rte->updatedCols, col)) >= 0) + { + /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */ + AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber; + TargetEntry *tle; + + if (attno <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + + tle = get_tle_by_resno(subplan->targetlist, attno); + + if (!is_foreign_expr(root, baserel, (Expr *) tle->expr)) + return false; + + targetAttrs = lappend_int(targetAttrs, attno); + } + } + + /* + * Ok, rewrite subplan so as to modify the foreign table directly. + */ + initStringInfo(&sql); + + /* + * Core code already has some lock on each rel being planned, so we can + * use NoLock here. + */ + rel = heap_open(rte->relid, NoLock); + + /* + * Extract the baserestrictinfo clauses that can be evaluated remotely. + */ + remote_conds = (List *) list_nth(fscan->fdw_private, + FdwScanPrivateRemoteConds); + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * Construct the SQL command string. + */ + switch (operation) + { + case CMD_UPDATE: + deparseDirectUpdateSql(&sql, root, resultRelation, rel, + ((Plan *) fscan)->targetlist, + targetAttrs, + remote_conds, ¶ms_list, + returningList, &retrieved_attrs); + break; + case CMD_DELETE: + deparseDirectDeleteSql(&sql, root, resultRelation, rel, + remote_conds, ¶ms_list, + returningList, &retrieved_attrs); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + /* + * Update the operation info. + */ + fscan->operation = operation; + + /* + * Update the fdw_exprs list that will be available to the executor. + */ + fscan->fdw_exprs = params_list; + + /* + * Update the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwDirectModifyPrivateIndex, above. + */ + fscan->fdw_private = list_make4(makeString(sql.data), + makeInteger((retrieved_attrs != NIL)), + retrieved_attrs, + makeInteger(plan->canSetTag)); + + heap_close(rel, NoLock); + return true; +} + +/* + * postgresBeginDirectModify + * Prepare a direct foreign table modification + */ +static void +postgresBeginDirectModify(ForeignScanState *node, int eflags) +{ + ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; + EState *estate = node->ss.ps.state; + PgFdwDirectModifyState *dmstate; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + UserMapping *user; + int numParams; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* + * We'll save private state in node->fdw_state. + */ + dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState)); + node->fdw_state = (void *) dmstate; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + dmstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(dmstate->rel)); + user = GetUserMapping(userid, table->serverid); + + /* + * Get connection to the foreign server. Connection manager will + * establish new connection if necessary. + */ + dmstate->conn = GetConnection(user, false); + + /* Initialize state variable */ + dmstate->num_tuples = -1; /* -1 means not set yet */ + + /* Get private info created by planner functions. */ + dmstate->query = strVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateUpdateSql)); + dmstate->has_returning = intVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateHasReturning)); + dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateRetrievedAttrs); + dmstate->set_processed = intVal(list_nth(fsplan->fdw_private, + FdwDirectModifyPrivateSetProcessed)); + + /* Create context for per-tuple temp workspace. */ + dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + + /* Prepare for input conversion of RETURNING results. */ + if (dmstate->has_returning) + dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel)); + + /* + * Prepare for processing of parameters used in remote query, if any. + */ + numParams = list_length(fsplan->fdw_exprs); + dmstate->numParams = numParams; + if (numParams > 0) + prepare_query_params((PlanState *) node, + fsplan->fdw_exprs, + numParams, + &dmstate->param_flinfo, + &dmstate->param_exprs, + &dmstate->param_values); +} + +/* + * postgresIterateDirectModify + * Execute a direct foreign table modification + */ +static TupleTableSlot * +postgresIterateDirectModify(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + + /* + * If this is the first call after Begin, execute the statement. + */ + if (dmstate->num_tuples == -1) + execute_dml_stmt(node); + + /* + * If the local query doesn't specify RETURNING, just clear tuple slot. + */ + if (!resultRelInfo->ri_projectReturning) + { + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + Instrumentation *instr = node->ss.ps.instrument; + + Assert(!dmstate->has_returning); + + /* Increment the command es_processed count if necessary. */ + if (dmstate->set_processed) + estate->es_processed += dmstate->num_tuples; + + /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */ + if (instr) + instr->tuplecount += dmstate->num_tuples; + + return ExecClearTuple(slot); + } + + /* + * Get the next RETURNING tuple. + */ + return get_returning_data(node); +} + +/* + * postgresEndDirectModify + * Finish a direct foreign table modification + */ +static void +postgresEndDirectModify(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + + /* if dmstate is NULL, we are in EXPLAIN; nothing to do */ + if (dmstate == NULL) + return; + + /* Release PGresult */ + if (dmstate->result) + PQclear(dmstate->result); + + /* Release remote connection */ + ReleaseConnection(dmstate->conn); + dmstate->conn = NULL; + + /* MemoryContext will be deleted automatically. */ +} + +/* * postgresExplainForeignScan * Produce extra output for EXPLAIN of a ForeignScan on a foreign table */ @@ -2044,6 +2399,25 @@ postgresExplainForeignModify(ModifyTableState *mtstate, } } +/* + * postgresExplainDirectModify + * Produce extra output for EXPLAIN of a ForeignScan that modifies a + * foreign table directly + */ +static void +postgresExplainDirectModify(ForeignScanState *node, ExplainState *es) +{ + List *fdw_private; + char *sql; + + if (es->verbose) + { + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql)); + ExplainPropertyText("Remote SQL", sql, es); + } +} + /* * estimate_path_cost_size @@ -2419,38 +2793,14 @@ create_cursor(ForeignScanState *node) */ if (numParams > 0) { - int nestlevel; MemoryContext oldcontext; - int i; - ListCell *lc; oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); - nestlevel = set_transmission_modes(); - - i = 0; - foreach(lc, fsstate->param_exprs) - { - ExprState *expr_state = (ExprState *) lfirst(lc); - Datum expr_value; - bool isNull; - - /* Evaluate the parameter expression */ - expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL); - - /* - * Get string representation of each parameter value by invoking - * type-specific output function, unless the value is null. - */ - if (isNull) - values[i] = NULL; - else - values[i] = OutputFunctionCall(&fsstate->param_flinfo[i], - expr_value); - i++; - } - - reset_transmission_modes(nestlevel); + process_query_params(econtext, + fsstate->param_flinfo, + fsstate->param_exprs, + values); MemoryContextSwitchTo(oldcontext); } @@ -2771,6 +3121,197 @@ store_returning_result(PgFdwModifyState *fmstate, } /* + * Execute a direct UPDATE/DELETE statement. + */ +static void +execute_dml_stmt(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + ExprContext *econtext = node->ss.ps.ps_ExprContext; + int numParams = dmstate->numParams; + const char **values = dmstate->param_values; + + /* + * Construct array of query parameter values in text format. + */ + if (numParams > 0) + process_query_params(econtext, + dmstate->param_flinfo, + dmstate->param_exprs, + values); + + /* + * Notice that we pass NULL for paramTypes, thus forcing the remote server + * to infer types for all parameters. Since we explicitly cast every + * parameter (see deparse.c), the "inference" is trivial and will produce + * the desired result. This allows us to avoid assuming that the remote + * server has the same OIDs we do for the parameters' types. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + dmstate->result = PQexecParams(dmstate->conn, dmstate->query, + numParams, NULL, values, NULL, NULL, 0); + if (PQresultStatus(dmstate->result) != + (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true, + dmstate->query); + + /* Get the number of rows affected. */ + if (dmstate->has_returning) + dmstate->num_tuples = PQntuples(dmstate->result); + else + dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result)); +} + +/* + * Get the result of a RETURNING clause. + */ +static TupleTableSlot * +get_returning_data(ForeignScanState *node) +{ + PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state; + EState *estate = node->ss.ps.state; + ResultRelInfo *resultRelInfo = estate->es_result_relation_info; + TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + + Assert(resultRelInfo->ri_projectReturning); + + /* If we didn't get any tuples, must be end of data. */ + if (dmstate->next_tuple >= dmstate->num_tuples) + return ExecClearTuple(slot); + + /* Increment the command es_processed count if necessary. */ + if (dmstate->set_processed) + estate->es_processed += 1; + + /* + * Store a RETURNING tuple. If has_returning is false, just emit a dummy + * tuple. (has_returning is false when the local query is of the form + * "UPDATE/DELETE .. RETURNING 1" for example.) + */ + if (!dmstate->has_returning) + ExecStoreAllNullTuple(slot); + else + { + /* + * On error, be sure to release the PGresult on the way out. Callers + * do not have PG_TRY blocks to ensure this happens. + */ + PG_TRY(); + { + HeapTuple newtup; + + newtup = make_tuple_from_result_row(dmstate->result, + dmstate->next_tuple, + dmstate->rel, + dmstate->attinmeta, + dmstate->retrieved_attrs, + NULL, + dmstate->temp_cxt); + ExecStoreTuple(newtup, slot, InvalidBuffer, false); + } + PG_CATCH(); + { + if (dmstate->result) + PQclear(dmstate->result); + PG_RE_THROW(); + } + PG_END_TRY(); + } + dmstate->next_tuple++; + + /* Make slot available for evaluation of the local query RETURNING list. */ + resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot; + + return slot; +} + +/* + * Prepare for processing of parameters used in remote query. + */ +static void +prepare_query_params(PlanState *node, + List *fdw_exprs, + int numParams, + FmgrInfo **param_flinfo, + List **param_exprs, + const char ***param_values) +{ + int i; + ListCell *lc; + + Assert(numParams > 0); + + /* Prepare for output conversion of parameters used in remote query. */ + *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams); + + i = 0; + foreach(lc, fdw_exprs) + { + Node *param_expr = (Node *) lfirst(lc); + Oid typefnoid; + bool isvarlena; + + getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena); + fmgr_info(typefnoid, &(*param_flinfo)[i]); + i++; + } + + /* + * Prepare remote-parameter expressions for evaluation. (Note: in + * practice, we expect that all these expressions will be just Params, so + * we could possibly do something more efficient than using the full + * expression-eval machinery for this. But probably there would be little + * benefit, and it'd require postgres_fdw to know more than is desirable + * about Param evaluation.) + */ + *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node); + + /* Allocate buffer for text form of query parameters. */ + *param_values = (const char **) palloc0(numParams * sizeof(char *)); +} + +/* + * Construct array of query parameter values in text format. + */ +static void +process_query_params(ExprContext *econtext, + FmgrInfo *param_flinfo, + List *param_exprs, + const char **param_values) +{ + int nestlevel; + int i; + ListCell *lc; + + nestlevel = set_transmission_modes(); + + i = 0; + foreach(lc, param_exprs) + { + ExprState *expr_state = (ExprState *) lfirst(lc); + Datum expr_value; + bool isNull; + + /* Evaluate the parameter expression */ + expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL); + + /* + * Get string representation of each parameter value by invoking + * type-specific output function, unless the value is null. + */ + if (isNull) + param_values[i] = NULL; + else + param_values[i] = OutputFunctionCall(¶m_flinfo[i], expr_value); + i++; + } + + reset_transmission_modes(nestlevel); +} + +/* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ |