aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
authorRobert Haas <rhaas@postgresql.org>2016-03-18 13:48:58 -0400
committerRobert Haas <rhaas@postgresql.org>2016-03-18 13:55:52 -0400
commit0bf3ae88af330496517722e391e7c975e6bad219 (patch)
tree46220c3ebfc9616af8d683c74395b18045c59a8a /contrib/postgres_fdw/postgres_fdw.c
parent3422fecccadb021b7b4cdbc73b2c29f66f031761 (diff)
downloadpostgresql-0bf3ae88af330496517722e391e7c975e6bad219.tar.gz
postgresql-0bf3ae88af330496517722e391e7c975e6bad219.zip
Directly modify foreign tables.
postgres_fdw can now sent an UPDATE or DELETE statement directly to the foreign server in simple cases, rather than sending a SELECT FOR UPDATE statement and then updating or deleting rows one-by-one. Etsuro Fujita, reviewed by Rushabh Lathia, Shigeru Hanada, Kyotaro Horiguchi, Albe Laurenz, Thom Brown, and me.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c683
1 files changed, 612 insertions, 71 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index e446cc5645a..d6db8340129 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -61,6 +61,8 @@ enum FdwScanPrivateIndex
{
/* SQL statement to execute remotely (as a String node) */
FdwScanPrivateSelectSql,
+ /* List of restriction clauses that can be executed remotely */
+ FdwScanPrivateRemoteConds,
/* Integer list of attribute numbers retrieved by the SELECT */
FdwScanPrivateRetrievedAttrs,
/* Integer representing the desired fetch_size */
@@ -98,6 +100,27 @@ enum FdwModifyPrivateIndex
};
/*
+ * Similarly, this enum describes what's kept in the fdw_private list for
+ * a ForeignScan node that modifies a foreign table directly. We store:
+ *
+ * 1) UPDATE/DELETE statement text to be sent to the remote server
+ * 2) Boolean flag showing if the remote query has a RETURNING clause
+ * 3) Integer list of attribute numbers retrieved by RETURNING, if any
+ * 4) Boolean flag showing if we set the command es_processed
+ */
+enum FdwDirectModifyPrivateIndex
+{
+ /* SQL statement to execute remotely (as a String node) */
+ FdwDirectModifyPrivateUpdateSql,
+ /* has-returning flag (as an integer Value node) */
+ FdwDirectModifyPrivateHasReturning,
+ /* Integer list of attribute numbers retrieved by RETURNING */
+ FdwDirectModifyPrivateRetrievedAttrs,
+ /* set-processed flag (as an integer Value node) */
+ FdwDirectModifyPrivateSetProcessed
+};
+
+/*
* Execution state of a foreign scan using postgres_fdw.
*/
typedef struct PgFdwScanState
@@ -164,6 +187,36 @@ typedef struct PgFdwModifyState
} PgFdwModifyState;
/*
+ * Execution state of a foreign scan that modifies a foreign table directly.
+ */
+typedef struct PgFdwDirectModifyState
+{
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* extracted fdw_private data */
+ char *query; /* text of UPDATE/DELETE command */
+ bool has_returning; /* is there a RETURNING clause? */
+ List *retrieved_attrs; /* attr numbers retrieved by RETURNING */
+ bool set_processed; /* do we set the command es_processed? */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the update */
+ int numParams; /* number of parameters passed to query */
+ FmgrInfo *param_flinfo; /* output conversion functions for them */
+ List *param_exprs; /* executable expressions for param values */
+ const char **param_values; /* textual values of query parameters */
+
+ /* for storing result tuples */
+ PGresult *result; /* result for query */
+ int num_tuples; /* # of result tuples */
+ int next_tuple; /* index of next one to return */
+
+ /* working memory context */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+} PgFdwDirectModifyState;
+
+/*
* Workspace for analyzing a foreign table.
*/
typedef struct PgFdwAnalyzeState
@@ -263,6 +316,13 @@ static TupleTableSlot *postgresExecForeignDelete(EState *estate,
static void postgresEndForeignModify(EState *estate,
ResultRelInfo *resultRelInfo);
static int postgresIsForeignRelUpdatable(Relation rel);
+static bool postgresPlanDirectModify(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index);
+static void postgresBeginDirectModify(ForeignScanState *node, int eflags);
+static TupleTableSlot *postgresIterateDirectModify(ForeignScanState *node);
+static void postgresEndDirectModify(ForeignScanState *node);
static void postgresExplainForeignScan(ForeignScanState *node,
ExplainState *es);
static void postgresExplainForeignModify(ModifyTableState *mtstate,
@@ -270,6 +330,8 @@ static void postgresExplainForeignModify(ModifyTableState *mtstate,
List *fdw_private,
int subplan_index,
ExplainState *es);
+static void postgresExplainDirectModify(ForeignScanState *node,
+ ExplainState *es);
static bool postgresAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
@@ -311,6 +373,18 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
+static void execute_dml_stmt(ForeignScanState *node);
+static TupleTableSlot *get_returning_data(ForeignScanState *node);
+static void prepare_query_params(PlanState *node,
+ List *fdw_exprs,
+ int numParams,
+ FmgrInfo **param_flinfo,
+ List **param_exprs,
+ const char ***param_values);
+static void process_query_params(ExprContext *econtext,
+ FmgrInfo *param_flinfo,
+ List *param_exprs,
+ const char **param_values);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
@@ -362,12 +436,17 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
routine->ExecForeignDelete = postgresExecForeignDelete;
routine->EndForeignModify = postgresEndForeignModify;
routine->IsForeignRelUpdatable = postgresIsForeignRelUpdatable;
+ routine->PlanDirectModify = postgresPlanDirectModify;
+ routine->BeginDirectModify = postgresBeginDirectModify;
+ routine->IterateDirectModify = postgresIterateDirectModify;
+ routine->EndDirectModify = postgresEndDirectModify;
/* Function for EvalPlanQual rechecks */
routine->RecheckForeignScan = postgresRecheckForeignScan;
/* Support functions for EXPLAIN */
routine->ExplainForeignScan = postgresExplainForeignScan;
routine->ExplainForeignModify = postgresExplainForeignModify;
+ routine->ExplainDirectModify = postgresExplainDirectModify;
/* Support functions for ANALYZE */
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
@@ -1122,7 +1201,8 @@ postgresGetForeignPlan(PlannerInfo *root,
* Build the fdw_private list that will be available to the executor.
* Items in the list must match order in enum FdwScanPrivateIndex.
*/
- fdw_private = list_make4(makeString(sql.data),
+ fdw_private = list_make5(makeString(sql.data),
+ remote_conds,
retrieved_attrs,
makeInteger(fpinfo->fetch_size),
makeInteger(foreignrel->umid));
@@ -1159,8 +1239,6 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
PgFdwScanState *fsstate;
UserMapping *user;
int numParams;
- int i;
- ListCell *lc;
/*
* Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
@@ -1247,42 +1325,18 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
fsstate->attinmeta = TupleDescGetAttInMetadata(fsstate->tupdesc);
- /* Prepare for output conversion of parameters used in remote query. */
- numParams = list_length(fsplan->fdw_exprs);
- fsstate->numParams = numParams;
- fsstate->param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
-
- i = 0;
- foreach(lc, fsplan->fdw_exprs)
- {
- Node *param_expr = (Node *) lfirst(lc);
- Oid typefnoid;
- bool isvarlena;
-
- getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
- fmgr_info(typefnoid, &fsstate->param_flinfo[i]);
- i++;
- }
-
/*
- * Prepare remote-parameter expressions for evaluation. (Note: in
- * practice, we expect that all these expressions will be just Params, so
- * we could possibly do something more efficient than using the full
- * expression-eval machinery for this. But probably there would be little
- * benefit, and it'd require postgres_fdw to know more than is desirable
- * about Param evaluation.)
- */
- fsstate->param_exprs = (List *)
- ExecInitExpr((Expr *) fsplan->fdw_exprs,
- (PlanState *) node);
-
- /*
- * Allocate buffer for text form of query parameters, if any.
+ * Prepare for processing of parameters used in remote query, if any.
*/
+ numParams = list_length(fsplan->fdw_exprs);
+ fsstate->numParams = numParams;
if (numParams > 0)
- fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
- else
- fsstate->param_values = NULL;
+ prepare_query_params((PlanState *) node,
+ fsplan->fdw_exprs,
+ numParams,
+ &fsstate->param_flinfo,
+ &fsstate->param_exprs,
+ &fsstate->param_values);
}
/*
@@ -1447,13 +1501,6 @@ postgresAddForeignUpdateTargets(Query *parsetree,
/*
* postgresPlanForeignModify
* Plan an insert/update/delete operation on a foreign table
- *
- * Note: currently, the plan tree generated for UPDATE/DELETE will always
- * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
- * and then the ModifyTable node will have to execute individual remote
- * UPDATE/DELETE commands. If there are no local conditions or joins
- * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
- * and then do nothing at ModifyTable. Room for future optimization ...
*/
static List *
postgresPlanForeignModify(PlannerInfo *root,
@@ -1992,6 +2039,314 @@ postgresRecheckForeignScan(ForeignScanState *node, TupleTableSlot *slot)
}
/*
+ * postgresPlanDirectModify
+ * Consider a direct foreign table modification
+ *
+ * Decide whether it is safe to modify a foreign table directly, and if so,
+ * rewrite subplan accordingly.
+ */
+static bool
+postgresPlanDirectModify(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index)
+{
+ CmdType operation = plan->operation;
+ Plan *subplan = (Plan *) list_nth(plan->plans, subplan_index);
+ RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
+ Relation rel;
+ StringInfoData sql;
+ ForeignScan *fscan;
+ List *targetAttrs = NIL;
+ List *remote_conds;
+ List *params_list = NIL;
+ List *returningList = NIL;
+ List *retrieved_attrs = NIL;
+
+ /*
+ * Decide whether it is safe to modify a foreign table directly.
+ */
+
+ /*
+ * The table modification must be an UPDATE or DELETE.
+ */
+ if (operation != CMD_UPDATE && operation != CMD_DELETE)
+ return false;
+
+ /*
+ * It's unsafe to modify a foreign table directly if there are any local
+ * joins needed.
+ */
+ if (!IsA(subplan, ForeignScan))
+ return false;
+
+ /*
+ * It's unsafe to modify a foreign table directly if there are any quals
+ * that should be evaluated locally.
+ */
+ if (subplan->qual != NIL)
+ return false;
+
+ /*
+ * We can't handle an UPDATE or DELETE on a foreign join for now.
+ */
+ fscan = (ForeignScan *) subplan;
+ if (fscan->scan.scanrelid == 0)
+ return false;
+
+ /*
+ * It's unsafe to update a foreign table directly, if any expressions to
+ * assign to the target columns are unsafe to evaluate remotely.
+ */
+ if (operation == CMD_UPDATE)
+ {
+ RelOptInfo *baserel = root->simple_rel_array[resultRelation];
+ int col;
+
+ /*
+ * We transmit only columns that were explicitly targets of the UPDATE,
+ * so as to avoid unnecessary data transmission.
+ */
+ col = -1;
+ while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
+ {
+ /* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
+ AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;
+ TargetEntry *tle;
+
+ if (attno <= InvalidAttrNumber) /* shouldn't happen */
+ elog(ERROR, "system-column update is not supported");
+
+ tle = get_tle_by_resno(subplan->targetlist, attno);
+
+ if (!is_foreign_expr(root, baserel, (Expr *) tle->expr))
+ return false;
+
+ targetAttrs = lappend_int(targetAttrs, attno);
+ }
+ }
+
+ /*
+ * Ok, rewrite subplan so as to modify the foreign table directly.
+ */
+ initStringInfo(&sql);
+
+ /*
+ * Core code already has some lock on each rel being planned, so we can
+ * use NoLock here.
+ */
+ rel = heap_open(rte->relid, NoLock);
+
+ /*
+ * Extract the baserestrictinfo clauses that can be evaluated remotely.
+ */
+ remote_conds = (List *) list_nth(fscan->fdw_private,
+ FdwScanPrivateRemoteConds);
+
+ /*
+ * Extract the relevant RETURNING list if any.
+ */
+ if (plan->returningLists)
+ returningList = (List *) list_nth(plan->returningLists, subplan_index);
+
+ /*
+ * Construct the SQL command string.
+ */
+ switch (operation)
+ {
+ case CMD_UPDATE:
+ deparseDirectUpdateSql(&sql, root, resultRelation, rel,
+ ((Plan *) fscan)->targetlist,
+ targetAttrs,
+ remote_conds, &params_list,
+ returningList, &retrieved_attrs);
+ break;
+ case CMD_DELETE:
+ deparseDirectDeleteSql(&sql, root, resultRelation, rel,
+ remote_conds, &params_list,
+ returningList, &retrieved_attrs);
+ break;
+ default:
+ elog(ERROR, "unexpected operation: %d", (int) operation);
+ break;
+ }
+
+ /*
+ * Update the operation info.
+ */
+ fscan->operation = operation;
+
+ /*
+ * Update the fdw_exprs list that will be available to the executor.
+ */
+ fscan->fdw_exprs = params_list;
+
+ /*
+ * Update the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwDirectModifyPrivateIndex, above.
+ */
+ fscan->fdw_private = list_make4(makeString(sql.data),
+ makeInteger((retrieved_attrs != NIL)),
+ retrieved_attrs,
+ makeInteger(plan->canSetTag));
+
+ heap_close(rel, NoLock);
+ return true;
+}
+
+/*
+ * postgresBeginDirectModify
+ * Prepare a direct foreign table modification
+ */
+static void
+postgresBeginDirectModify(ForeignScanState *node, int eflags)
+{
+ ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
+ EState *estate = node->ss.ps.state;
+ PgFdwDirectModifyState *dmstate;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ UserMapping *user;
+ int numParams;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. node->fdw_state stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ /*
+ * We'll save private state in node->fdw_state.
+ */
+ dmstate = (PgFdwDirectModifyState *) palloc0(sizeof(PgFdwDirectModifyState));
+ node->fdw_state = (void *) dmstate;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ dmstate->rel = node->ss.ss_currentRelation;
+ table = GetForeignTable(RelationGetRelid(dmstate->rel));
+ user = GetUserMapping(userid, table->serverid);
+
+ /*
+ * Get connection to the foreign server. Connection manager will
+ * establish new connection if necessary.
+ */
+ dmstate->conn = GetConnection(user, false);
+
+ /* Initialize state variable */
+ dmstate->num_tuples = -1; /* -1 means not set yet */
+
+ /* Get private info created by planner functions. */
+ dmstate->query = strVal(list_nth(fsplan->fdw_private,
+ FdwDirectModifyPrivateUpdateSql));
+ dmstate->has_returning = intVal(list_nth(fsplan->fdw_private,
+ FdwDirectModifyPrivateHasReturning));
+ dmstate->retrieved_attrs = (List *) list_nth(fsplan->fdw_private,
+ FdwDirectModifyPrivateRetrievedAttrs);
+ dmstate->set_processed = intVal(list_nth(fsplan->fdw_private,
+ FdwDirectModifyPrivateSetProcessed));
+
+ /* Create context for per-tuple temp workspace. */
+ dmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /* Prepare for input conversion of RETURNING results. */
+ if (dmstate->has_returning)
+ dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
+
+ /*
+ * Prepare for processing of parameters used in remote query, if any.
+ */
+ numParams = list_length(fsplan->fdw_exprs);
+ dmstate->numParams = numParams;
+ if (numParams > 0)
+ prepare_query_params((PlanState *) node,
+ fsplan->fdw_exprs,
+ numParams,
+ &dmstate->param_flinfo,
+ &dmstate->param_exprs,
+ &dmstate->param_values);
+}
+
+/*
+ * postgresIterateDirectModify
+ * Execute a direct foreign table modification
+ */
+static TupleTableSlot *
+postgresIterateDirectModify(ForeignScanState *node)
+{
+ PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
+
+ /*
+ * If this is the first call after Begin, execute the statement.
+ */
+ if (dmstate->num_tuples == -1)
+ execute_dml_stmt(node);
+
+ /*
+ * If the local query doesn't specify RETURNING, just clear tuple slot.
+ */
+ if (!resultRelInfo->ri_projectReturning)
+ {
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ Instrumentation *instr = node->ss.ps.instrument;
+
+ Assert(!dmstate->has_returning);
+
+ /* Increment the command es_processed count if necessary. */
+ if (dmstate->set_processed)
+ estate->es_processed += dmstate->num_tuples;
+
+ /* Increment the tuple count for EXPLAIN ANALYZE if necessary. */
+ if (instr)
+ instr->tuplecount += dmstate->num_tuples;
+
+ return ExecClearTuple(slot);
+ }
+
+ /*
+ * Get the next RETURNING tuple.
+ */
+ return get_returning_data(node);
+}
+
+/*
+ * postgresEndDirectModify
+ * Finish a direct foreign table modification
+ */
+static void
+postgresEndDirectModify(ForeignScanState *node)
+{
+ PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
+
+ /* if dmstate is NULL, we are in EXPLAIN; nothing to do */
+ if (dmstate == NULL)
+ return;
+
+ /* Release PGresult */
+ if (dmstate->result)
+ PQclear(dmstate->result);
+
+ /* Release remote connection */
+ ReleaseConnection(dmstate->conn);
+ dmstate->conn = NULL;
+
+ /* MemoryContext will be deleted automatically. */
+}
+
+/*
* postgresExplainForeignScan
* Produce extra output for EXPLAIN of a ForeignScan on a foreign table
*/
@@ -2044,6 +2399,25 @@ postgresExplainForeignModify(ModifyTableState *mtstate,
}
}
+/*
+ * postgresExplainDirectModify
+ * Produce extra output for EXPLAIN of a ForeignScan that modifies a
+ * foreign table directly
+ */
+static void
+postgresExplainDirectModify(ForeignScanState *node, ExplainState *es)
+{
+ List *fdw_private;
+ char *sql;
+
+ if (es->verbose)
+ {
+ fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
+ sql = strVal(list_nth(fdw_private, FdwDirectModifyPrivateUpdateSql));
+ ExplainPropertyText("Remote SQL", sql, es);
+ }
+}
+
/*
* estimate_path_cost_size
@@ -2419,38 +2793,14 @@ create_cursor(ForeignScanState *node)
*/
if (numParams > 0)
{
- int nestlevel;
MemoryContext oldcontext;
- int i;
- ListCell *lc;
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
- nestlevel = set_transmission_modes();
-
- i = 0;
- foreach(lc, fsstate->param_exprs)
- {
- ExprState *expr_state = (ExprState *) lfirst(lc);
- Datum expr_value;
- bool isNull;
-
- /* Evaluate the parameter expression */
- expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
-
- /*
- * Get string representation of each parameter value by invoking
- * type-specific output function, unless the value is null.
- */
- if (isNull)
- values[i] = NULL;
- else
- values[i] = OutputFunctionCall(&fsstate->param_flinfo[i],
- expr_value);
- i++;
- }
-
- reset_transmission_modes(nestlevel);
+ process_query_params(econtext,
+ fsstate->param_flinfo,
+ fsstate->param_exprs,
+ values);
MemoryContextSwitchTo(oldcontext);
}
@@ -2771,6 +3121,197 @@ store_returning_result(PgFdwModifyState *fmstate,
}
/*
+ * Execute a direct UPDATE/DELETE statement.
+ */
+static void
+execute_dml_stmt(ForeignScanState *node)
+{
+ PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
+ ExprContext *econtext = node->ss.ps.ps_ExprContext;
+ int numParams = dmstate->numParams;
+ const char **values = dmstate->param_values;
+
+ /*
+ * Construct array of query parameter values in text format.
+ */
+ if (numParams > 0)
+ process_query_params(econtext,
+ dmstate->param_flinfo,
+ dmstate->param_exprs,
+ values);
+
+ /*
+ * Notice that we pass NULL for paramTypes, thus forcing the remote server
+ * to infer types for all parameters. Since we explicitly cast every
+ * parameter (see deparse.c), the "inference" is trivial and will produce
+ * the desired result. This allows us to avoid assuming that the remote
+ * server has the same OIDs we do for the parameters' types.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ dmstate->result = PQexecParams(dmstate->conn, dmstate->query,
+ numParams, NULL, values, NULL, NULL, 0);
+ if (PQresultStatus(dmstate->result) !=
+ (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ dmstate->query);
+
+ /* Get the number of rows affected. */
+ if (dmstate->has_returning)
+ dmstate->num_tuples = PQntuples(dmstate->result);
+ else
+ dmstate->num_tuples = atoi(PQcmdTuples(dmstate->result));
+}
+
+/*
+ * Get the result of a RETURNING clause.
+ */
+static TupleTableSlot *
+get_returning_data(ForeignScanState *node)
+{
+ PgFdwDirectModifyState *dmstate = (PgFdwDirectModifyState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
+ TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+
+ Assert(resultRelInfo->ri_projectReturning);
+
+ /* If we didn't get any tuples, must be end of data. */
+ if (dmstate->next_tuple >= dmstate->num_tuples)
+ return ExecClearTuple(slot);
+
+ /* Increment the command es_processed count if necessary. */
+ if (dmstate->set_processed)
+ estate->es_processed += 1;
+
+ /*
+ * Store a RETURNING tuple. If has_returning is false, just emit a dummy
+ * tuple. (has_returning is false when the local query is of the form
+ * "UPDATE/DELETE .. RETURNING 1" for example.)
+ */
+ if (!dmstate->has_returning)
+ ExecStoreAllNullTuple(slot);
+ else
+ {
+ /*
+ * On error, be sure to release the PGresult on the way out. Callers
+ * do not have PG_TRY blocks to ensure this happens.
+ */
+ PG_TRY();
+ {
+ HeapTuple newtup;
+
+ newtup = make_tuple_from_result_row(dmstate->result,
+ dmstate->next_tuple,
+ dmstate->rel,
+ dmstate->attinmeta,
+ dmstate->retrieved_attrs,
+ NULL,
+ dmstate->temp_cxt);
+ ExecStoreTuple(newtup, slot, InvalidBuffer, false);
+ }
+ PG_CATCH();
+ {
+ if (dmstate->result)
+ PQclear(dmstate->result);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+ }
+ dmstate->next_tuple++;
+
+ /* Make slot available for evaluation of the local query RETURNING list. */
+ resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
+
+ return slot;
+}
+
+/*
+ * Prepare for processing of parameters used in remote query.
+ */
+static void
+prepare_query_params(PlanState *node,
+ List *fdw_exprs,
+ int numParams,
+ FmgrInfo **param_flinfo,
+ List **param_exprs,
+ const char ***param_values)
+{
+ int i;
+ ListCell *lc;
+
+ Assert(numParams > 0);
+
+ /* Prepare for output conversion of parameters used in remote query. */
+ *param_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * numParams);
+
+ i = 0;
+ foreach(lc, fdw_exprs)
+ {
+ Node *param_expr = (Node *) lfirst(lc);
+ Oid typefnoid;
+ bool isvarlena;
+
+ getTypeOutputInfo(exprType(param_expr), &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &(*param_flinfo)[i]);
+ i++;
+ }
+
+ /*
+ * Prepare remote-parameter expressions for evaluation. (Note: in
+ * practice, we expect that all these expressions will be just Params, so
+ * we could possibly do something more efficient than using the full
+ * expression-eval machinery for this. But probably there would be little
+ * benefit, and it'd require postgres_fdw to know more than is desirable
+ * about Param evaluation.)
+ */
+ *param_exprs = (List *) ExecInitExpr((Expr *) fdw_exprs, node);
+
+ /* Allocate buffer for text form of query parameters. */
+ *param_values = (const char **) palloc0(numParams * sizeof(char *));
+}
+
+/*
+ * Construct array of query parameter values in text format.
+ */
+static void
+process_query_params(ExprContext *econtext,
+ FmgrInfo *param_flinfo,
+ List *param_exprs,
+ const char **param_values)
+{
+ int nestlevel;
+ int i;
+ ListCell *lc;
+
+ nestlevel = set_transmission_modes();
+
+ i = 0;
+ foreach(lc, param_exprs)
+ {
+ ExprState *expr_state = (ExprState *) lfirst(lc);
+ Datum expr_value;
+ bool isNull;
+
+ /* Evaluate the parameter expression */
+ expr_value = ExecEvalExpr(expr_state, econtext, &isNull, NULL);
+
+ /*
+ * Get string representation of each parameter value by invoking
+ * type-specific output function, unless the value is null.
+ */
+ if (isNull)
+ param_values[i] = NULL;
+ else
+ param_values[i] = OutputFunctionCall(&param_flinfo[i], expr_value);
+ i++;
+ }
+
+ reset_transmission_modes(nestlevel);
+}
+
+/*
* postgresAnalyzeForeignTable
* Test whether analyzing this foreign table is supported
*/