aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c1066
1 files changed, 931 insertions, 135 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index a46597f02ea..a6db061d603 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -15,16 +15,21 @@
#include "postgres_fdw.h"
#include "access/htup_details.h"
+#include "access/sysattr.h"
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
+#include "nodes/makefuncs.h"
#include "optimizer/cost.h"
#include "optimizer/pathnode.h"
#include "optimizer/planmain.h"
+#include "optimizer/prep.h"
+#include "optimizer/var.h"
#include "parser/parsetree.h"
+#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
@@ -58,7 +63,7 @@ typedef struct PgFdwRelationInfo
} PgFdwRelationInfo;
/*
- * Indexes of FDW-private information stored in fdw_private list.
+ * Indexes of FDW-private information stored in fdw_private lists.
*
* We store various information in ForeignScan.fdw_private to pass it from
* planner to executor. Specifically there is:
@@ -66,26 +71,41 @@ typedef struct PgFdwRelationInfo
* 1) SELECT statement text to be sent to the remote server
* 2) IDs of PARAM_EXEC Params used in the SELECT statement
*
- * These items are indexed with the enum FdwPrivateIndex, so an item can be
- * fetched with list_nth(). For example, to get the SELECT statement:
- * sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql));
+ * These items are indexed with the enum FdwScanPrivateIndex, so an item
+ * can be fetched with list_nth(). For example, to get the SELECT statement:
+ * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
*/
-enum FdwPrivateIndex
+enum FdwScanPrivateIndex
{
/* SQL statement to execute remotely (as a String node) */
- FdwPrivateSelectSql,
-
+ FdwScanPrivateSelectSql,
/* Integer list of param IDs of PARAM_EXEC Params used in SQL stmt */
- FdwPrivateExternParamIds,
+ FdwScanPrivateExternParamIds
+};
- /* # of elements stored in the list fdw_private */
- FdwPrivateNum
+/*
+ * Similarly, this enum describes what's kept in the fdw_private list for
+ * a ModifyTable node referencing a postgres_fdw foreign table. We store:
+ *
+ * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server
+ * 2) Integer list of target attribute numbers for INSERT/UPDATE
+ * (NIL for a DELETE)
+ * 3) Boolean flag showing if there's a RETURNING clause
+ */
+enum FdwModifyPrivateIndex
+{
+ /* SQL statement to execute remotely (as a String node) */
+ FdwModifyPrivateUpdateSql,
+ /* Integer list of target attribute numbers for INSERT/UPDATE */
+ FdwModifyPrivateTargetAttnums,
+ /* has-returning flag (as an integer Value node) */
+ FdwModifyPrivateHasReturning
};
/*
* Execution state of a foreign scan using postgres_fdw.
*/
-typedef struct PgFdwExecutionState
+typedef struct PgFdwScanState
{
Relation rel; /* relcache entry for the foreign table */
AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
@@ -113,7 +133,33 @@ typedef struct PgFdwExecutionState
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
-} PgFdwExecutionState;
+} PgFdwScanState;
+
+/*
+ * Execution state of a foreign insert/update/delete operation.
+ */
+typedef struct PgFdwModifyState
+{
+ Relation rel; /* relcache entry for the foreign table */
+ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
+
+ /* for remote query execution */
+ PGconn *conn; /* connection for the scan */
+ char *p_name; /* name of prepared statement, if created */
+
+ /* extracted fdw_private data */
+ char *query; /* text of INSERT/UPDATE/DELETE command */
+ List *target_attrs; /* list of target attribute numbers */
+ bool has_returning; /* is there a RETURNING clause? */
+
+ /* info about parameters for prepared statement */
+ AttrNumber ctidAttno; /* attnum of input resjunk ctid column */
+ int p_nums; /* number of parameters to transmit */
+ FmgrInfo *p_flinfo; /* output conversion functions for them */
+
+ /* working memory context */
+ MemoryContext temp_cxt; /* context for per-tuple temporary data */
+} PgFdwModifyState;
/*
* Workspace for analyzing a foreign table.
@@ -169,12 +215,43 @@ static ForeignScan *postgresGetForeignPlan(PlannerInfo *root,
ForeignPath *best_path,
List *tlist,
List *scan_clauses);
-static void postgresExplainForeignScan(ForeignScanState *node,
- ExplainState *es);
static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
static void postgresReScanForeignScan(ForeignScanState *node);
static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresAddForeignUpdateTargets(Query *parsetree,
+ RangeTblEntry *target_rte,
+ Relation target_relation);
+static List *postgresPlanForeignModify(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index);
+static void postgresBeginForeignModify(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ List *fdw_private,
+ int subplan_index,
+ int eflags);
+static TupleTableSlot *postgresExecForeignInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot);
+static TupleTableSlot *postgresExecForeignUpdate(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot);
+static TupleTableSlot *postgresExecForeignDelete(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot);
+static void postgresEndForeignModify(EState *estate,
+ ResultRelInfo *resultRelInfo);
+static void postgresExplainForeignScan(ForeignScanState *node,
+ ExplainState *es);
+static void postgresExplainForeignModify(ModifyTableState *mtstate,
+ ResultRelInfo *rinfo,
+ List *fdw_private,
+ int subplan_index,
+ ExplainState *es);
static bool postgresAnalyzeForeignTable(Relation relation,
AcquireSampleRowsFunc *func,
BlockNumber *totalpages);
@@ -191,6 +268,12 @@ static void get_remote_estimate(const char *sql,
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void prepare_foreign_modify(PgFdwModifyState *fmstate);
+static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ ItemPointer tupleid,
+ TupleTableSlot *slot);
+static void store_returning_result(PgFdwModifyState *fmstate,
+ TupleTableSlot *slot, PGresult *res);
static int postgresAcquireSampleRowsFunc(Relation relation, int elevel,
HeapTuple *rows, int targrows,
double *totalrows,
@@ -214,17 +297,29 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
{
FdwRoutine *routine = makeNode(FdwRoutine);
- /* Required handler functions. */
+ /* Functions for scanning foreign tables */
routine->GetForeignRelSize = postgresGetForeignRelSize;
routine->GetForeignPaths = postgresGetForeignPaths;
routine->GetForeignPlan = postgresGetForeignPlan;
- routine->ExplainForeignScan = postgresExplainForeignScan;
routine->BeginForeignScan = postgresBeginForeignScan;
routine->IterateForeignScan = postgresIterateForeignScan;
routine->ReScanForeignScan = postgresReScanForeignScan;
routine->EndForeignScan = postgresEndForeignScan;
- /* Optional handler functions. */
+ /* Functions for updating foreign tables */
+ routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
+ routine->PlanForeignModify = postgresPlanForeignModify;
+ routine->BeginForeignModify = postgresBeginForeignModify;
+ routine->ExecForeignInsert = postgresExecForeignInsert;
+ routine->ExecForeignUpdate = postgresExecForeignUpdate;
+ routine->ExecForeignDelete = postgresExecForeignDelete;
+ routine->EndForeignModify = postgresEndForeignModify;
+
+ /* Support functions for EXPLAIN */
+ routine->ExplainForeignScan = postgresExplainForeignScan;
+ routine->ExplainForeignModify = postgresExplainForeignModify;
+
+ /* Support functions for ANALYZE */
routine->AnalyzeForeignTable = postgresAnalyzeForeignTable;
PG_RETURN_POINTER(routine);
@@ -249,7 +344,6 @@ postgresGetForeignRelSize(PlannerInfo *root,
Oid foreigntableid)
{
bool use_remote_estimate = false;
- ListCell *lc;
PgFdwRelationInfo *fpinfo;
StringInfo sql;
ForeignTable *table;
@@ -266,12 +360,14 @@ postgresGetForeignRelSize(PlannerInfo *root,
List *param_conds;
List *local_conds;
List *param_numbers;
+ Bitmapset *attrs_used;
+ ListCell *lc;
/*
* We use PgFdwRelationInfo to pass various information to subsequent
* functions.
*/
- fpinfo = palloc0(sizeof(PgFdwRelationInfo));
+ fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo));
initStringInfo(&fpinfo->sql);
sql = &fpinfo->sql;
@@ -303,16 +399,37 @@ postgresGetForeignRelSize(PlannerInfo *root,
}
/*
- * Construct remote query which consists of SELECT, FROM, and WHERE
- * clauses. Conditions which contain any Param node are excluded because
- * placeholder can't be used in EXPLAIN statement. Such conditions are
- * appended later.
+ * Identify which restriction clauses can be sent to the remote server and
+ * which can't. Conditions that are remotely executable but contain
+ * PARAM_EXTERN Params have to be treated separately because we can't use
+ * placeholders in remote EXPLAIN.
*/
classifyConditions(root, baserel, &remote_conds, &param_conds,
&local_conds, &param_numbers);
- deparseSimpleSql(sql, root, baserel, local_conds);
- if (list_length(remote_conds) > 0)
- appendWhereClause(sql, true, remote_conds, root);
+
+ /*
+ * Identify which attributes will need to be retrieved from the remote
+ * server. These include all attrs needed for joins or final output, plus
+ * all attrs used in the local_conds.
+ */
+ attrs_used = NULL;
+ pull_varattnos((Node *) baserel->reltargetlist, baserel->relid,
+ &attrs_used);
+ foreach(lc, local_conds)
+ {
+ RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc);
+
+ pull_varattnos((Node *) rinfo->clause, baserel->relid,
+ &attrs_used);
+ }
+
+ /*
+ * Construct remote query which consists of SELECT, FROM, and WHERE
+ * clauses. For now, leave out the param_conds.
+ */
+ deparseSelectSql(sql, root, baserel, attrs_used);
+ if (remote_conds)
+ appendWhereClause(sql, root, remote_conds, true);
/*
* If the table or the server is configured to use remote estimates,
@@ -336,7 +453,7 @@ postgresGetForeignRelSize(PlannerInfo *root,
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
user = GetUserMapping(userid, server->serverid);
- conn = GetConnection(server, user);
+ conn = GetConnection(server, user, false);
get_remote_estimate(sql->data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
@@ -403,11 +520,53 @@ postgresGetForeignRelSize(PlannerInfo *root,
/*
* Finish deparsing remote query by adding conditions which were unusable
- * in remote EXPLAIN since they contain Param nodes.
+ * in remote EXPLAIN because they contain Param nodes.
*/
- if (list_length(param_conds) > 0)
- appendWhereClause(sql, !(list_length(remote_conds) > 0), param_conds,
- root);
+ if (param_conds)
+ appendWhereClause(sql, root, param_conds, (remote_conds == NIL));
+
+ /*
+ * Add FOR UPDATE/SHARE if appropriate. We apply locking during the
+ * initial row fetch, rather than later on as is done for local tables.
+ * The extra roundtrips involved in trying to duplicate the local
+ * semantics exactly don't seem worthwhile (see also comments for
+ * RowMarkType).
+ */
+ if (baserel->relid == root->parse->resultRelation &&
+ (root->parse->commandType == CMD_UPDATE ||
+ root->parse->commandType == CMD_DELETE))
+ {
+ /* Relation is UPDATE/DELETE target, so use FOR UPDATE */
+ appendStringInfo(sql, " FOR UPDATE");
+ }
+ else
+ {
+ RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid);
+
+ if (rc)
+ {
+ /*
+ * Relation is specified as a FOR UPDATE/SHARE target, so handle
+ * that.
+ *
+ * For now, just ignore any [NO] KEY specification, since (a) it's
+ * not clear what that means for a remote table that we don't have
+ * complete information about, and (b) it wouldn't work anyway on
+ * older remote servers. Likewise, we don't worry about NOWAIT.
+ */
+ switch (rc->strength)
+ {
+ case LCS_FORKEYSHARE:
+ case LCS_FORSHARE:
+ appendStringInfo(sql, " FOR SHARE");
+ break;
+ case LCS_FORNOKEYUPDATE:
+ case LCS_FORUPDATE:
+ appendStringInfo(sql, " FOR UPDATE");
+ break;
+ }
+ }
+ }
/*
* Store obtained information into FDW-private area of RelOptInfo so it's
@@ -477,7 +636,7 @@ postgresGetForeignPaths(PlannerInfo *root,
/*
* Build the fdw_private list that will be available to the executor.
- * Items in the list must match enum FdwPrivateIndex, above.
+ * Items in the list must match enum FdwScanPrivateIndex, above.
*/
fdw_private = list_make2(makeString(fpinfo->sql.data),
fpinfo->param_numbers);
@@ -574,24 +733,6 @@ postgresGetForeignPlan(PlannerInfo *root,
}
/*
- * postgresExplainForeignScan
- * Produce extra output for EXPLAIN
- */
-static void
-postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
-{
- List *fdw_private;
- char *sql;
-
- if (es->verbose)
- {
- fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
- sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql));
- ExplainPropertyText("Remote SQL", sql, es);
- }
-}
-
-/*
* postgresBeginForeignScan
* Initiate an executor scan of a foreign PostgreSQL table.
*/
@@ -600,7 +741,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
{
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
- PgFdwExecutionState *festate;
+ PgFdwScanState *fsstate;
RangeTblEntry *rte;
Oid userid;
ForeignTable *table;
@@ -619,8 +760,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
/*
* We'll save private state in node->fdw_state.
*/
- festate = (PgFdwExecutionState *) palloc0(sizeof(PgFdwExecutionState));
- node->fdw_state = (void *) festate;
+ fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState));
+ node->fdw_state = (void *) fsstate;
/*
* Identify which user to do the remote access as. This should match what
@@ -630,8 +771,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/* Get info about foreign table. */
- festate->rel = node->ss.ss_currentRelation;
- table = GetForeignTable(RelationGetRelid(festate->rel));
+ fsstate->rel = node->ss.ss_currentRelation;
+ table = GetForeignTable(RelationGetRelid(fsstate->rel));
server = GetForeignServer(table->serverid);
user = GetUserMapping(userid, server->serverid);
@@ -639,29 +780,29 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- festate->conn = GetConnection(server, user);
+ fsstate->conn = GetConnection(server, user, false);
/* Assign a unique ID for my cursor */
- festate->cursor_number = GetCursorNumber(festate->conn);
- festate->cursor_exists = false;
+ fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_exists = false;
/* Get private info created by planner functions. */
- festate->fdw_private = fsplan->fdw_private;
+ fsstate->fdw_private = fsplan->fdw_private;
/* Create contexts for batches of tuples and per-tuple temp workspace. */
- festate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw tuple data",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
- festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
"postgres_fdw temporary data",
ALLOCSET_SMALL_MINSIZE,
ALLOCSET_SMALL_INITSIZE,
ALLOCSET_SMALL_MAXSIZE);
/* Get info we'll need for data conversion. */
- festate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(festate->rel));
+ fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel));
/*
* Allocate buffer for query parameters, if the remote conditions use any.
@@ -673,7 +814,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
* null values that are arbitrarily marked as being of type int4.
*/
param_numbers = (List *)
- list_nth(festate->fdw_private, FdwPrivateExternParamIds);
+ list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds);
if (param_numbers != NIL)
{
ParamListInfo params = estate->es_param_list_info;
@@ -682,21 +823,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
}
else
numParams = 0;
- festate->numParams = numParams;
+ fsstate->numParams = numParams;
if (numParams > 0)
{
/* we initially fill all slots with value = NULL, type = int4 */
- festate->param_types = (Oid *) palloc(numParams * sizeof(Oid));
- festate->param_values = (const char **) palloc0(numParams * sizeof(char *));
+ fsstate->param_types = (Oid *) palloc(numParams * sizeof(Oid));
+ fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *));
for (i = 0; i < numParams; i++)
- festate->param_types[i] = INT4OID;
+ fsstate->param_types[i] = INT4OID;
}
else
{
- festate->param_types = NULL;
- festate->param_values = NULL;
+ fsstate->param_types = NULL;
+ fsstate->param_values = NULL;
}
- festate->extparams_done = false;
+ fsstate->extparams_done = false;
}
/*
@@ -707,33 +848,33 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
static TupleTableSlot *
postgresIterateForeignScan(ForeignScanState *node)
{
- PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
* If this is the first call after Begin or ReScan, we need to create the
* cursor on the remote side.
*/
- if (!festate->cursor_exists)
+ if (!fsstate->cursor_exists)
create_cursor(node);
/*
* Get some more tuples, if we've run out.
*/
- if (festate->next_tuple >= festate->num_tuples)
+ if (fsstate->next_tuple >= fsstate->num_tuples)
{
/* No point in another fetch if we already detected EOF, though. */
- if (!festate->eof_reached)
+ if (!fsstate->eof_reached)
fetch_more_data(node);
/* If we didn't get any tuples, must be end of data. */
- if (festate->next_tuple >= festate->num_tuples)
+ if (fsstate->next_tuple >= fsstate->num_tuples)
return ExecClearTuple(slot);
}
/*
* Return the next tuple.
*/
- ExecStoreTuple(festate->tuples[festate->next_tuple++],
+ ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++],
slot,
InvalidBuffer,
false);
@@ -748,7 +889,7 @@ postgresIterateForeignScan(ForeignScanState *node)
static void
postgresReScanForeignScan(ForeignScanState *node)
{
- PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
char sql[64];
PGresult *res;
@@ -758,7 +899,7 @@ postgresReScanForeignScan(ForeignScanState *node)
*/
/* If we haven't created the cursor yet, nothing to do. */
- if (!festate->cursor_exists)
+ if (!fsstate->cursor_exists)
return;
/*
@@ -769,19 +910,19 @@ postgresReScanForeignScan(ForeignScanState *node)
*/
if (node->ss.ps.chgParam != NULL)
{
- festate->cursor_exists = false;
+ fsstate->cursor_exists = false;
snprintf(sql, sizeof(sql), "CLOSE c%u",
- festate->cursor_number);
+ fsstate->cursor_number);
}
- else if (festate->fetch_ct_2 > 1)
+ else if (fsstate->fetch_ct_2 > 1)
{
snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u",
- festate->cursor_number);
+ fsstate->cursor_number);
}
else
{
/* Easy: just rescan what we already have in memory, if anything */
- festate->next_tuple = 0;
+ fsstate->next_tuple = 0;
return;
}
@@ -789,17 +930,17 @@ postgresReScanForeignScan(ForeignScanState *node)
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = PQexec(festate->conn, sql);
+ res = PQexec(fsstate->conn, sql);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, true, sql);
PQclear(res);
/* Now force a fresh FETCH. */
- festate->tuples = NULL;
- festate->num_tuples = 0;
- festate->next_tuple = 0;
- festate->fetch_ct_2 = 0;
- festate->eof_reached = false;
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
}
/*
@@ -809,24 +950,530 @@ postgresReScanForeignScan(ForeignScanState *node)
static void
postgresEndForeignScan(ForeignScanState *node)
{
- PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
- /* if festate is NULL, we are in EXPLAIN; nothing to do */
- if (festate == NULL)
+ /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
+ if (fsstate == NULL)
return;
/* Close the cursor if open, to prevent accumulation of cursors */
- if (festate->cursor_exists)
- close_cursor(festate->conn, festate->cursor_number);
+ if (fsstate->cursor_exists)
+ close_cursor(fsstate->conn, fsstate->cursor_number);
/* Release remote connection */
- ReleaseConnection(festate->conn);
- festate->conn = NULL;
+ ReleaseConnection(fsstate->conn);
+ fsstate->conn = NULL;
/* MemoryContexts will be deleted automatically. */
}
/*
+ * postgresAddForeignUpdateTargets
+ * Add resjunk column(s) needed for update/delete on a foreign table
+ */
+static void
+postgresAddForeignUpdateTargets(Query *parsetree,
+ RangeTblEntry *target_rte,
+ Relation target_relation)
+{
+ Var *var;
+ const char *attrname;
+ TargetEntry *tle;
+
+ /*
+ * In postgres_fdw, what we need is the ctid, same as for a regular table.
+ */
+
+ /* Make a Var representing the desired value */
+ var = makeVar(parsetree->resultRelation,
+ SelfItemPointerAttributeNumber,
+ TIDOID,
+ -1,
+ InvalidOid,
+ 0);
+
+ /* Wrap it in a resjunk TLE with the right name ... */
+ attrname = "ctid";
+
+ tle = makeTargetEntry((Expr *) var,
+ list_length(parsetree->targetList) + 1,
+ pstrdup(attrname),
+ true);
+
+ /* ... and add it to the query's targetlist */
+ parsetree->targetList = lappend(parsetree->targetList, tle);
+}
+
+/*
+ * postgresPlanForeignModify
+ * Plan an insert/update/delete operation on a foreign table
+ *
+ * Note: currently, the plan tree generated for UPDATE/DELETE will always
+ * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE)
+ * and then the ModifyTable node will have to execute individual remote
+ * UPDATE/DELETE commands. If there are no local conditions or joins
+ * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING
+ * and then do nothing at ModifyTable. Room for future optimization ...
+ */
+static List *
+postgresPlanForeignModify(PlannerInfo *root,
+ ModifyTable *plan,
+ Index resultRelation,
+ int subplan_index)
+{
+ CmdType operation = plan->operation;
+ StringInfoData sql;
+ List *targetAttrs = NIL;
+ List *returningList = NIL;
+
+ initStringInfo(&sql);
+
+ /*
+ * Construct a list of the columns that are to be assigned during INSERT
+ * or UPDATE. We should transmit only these columns, for performance and
+ * to respect any DEFAULT values the remote side may have for other
+ * columns. (XXX this will need some re-thinking when we support default
+ * expressions for foreign tables.)
+ */
+ if (operation == CMD_INSERT || operation == CMD_UPDATE)
+ {
+ RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
+ Bitmapset *tmpset = bms_copy(rte->modifiedCols);
+ AttrNumber col;
+
+ while ((col = bms_first_member(tmpset)) >= 0)
+ {
+ col += FirstLowInvalidHeapAttributeNumber;
+ if (col <= InvalidAttrNumber) /* shouldn't happen */
+ elog(ERROR, "system-column update is not supported");
+ targetAttrs = lappend_int(targetAttrs, col);
+ }
+ }
+
+ /*
+ * Extract the relevant RETURNING list if any.
+ */
+ if (plan->returningLists)
+ returningList = (List *) list_nth(plan->returningLists, subplan_index);
+
+ /*
+ * Construct the SQL command string.
+ */
+ switch (operation)
+ {
+ case CMD_INSERT:
+ deparseInsertSql(&sql, root, resultRelation,
+ targetAttrs, returningList);
+ break;
+ case CMD_UPDATE:
+ deparseUpdateSql(&sql, root, resultRelation,
+ targetAttrs, returningList);
+ break;
+ case CMD_DELETE:
+ deparseDeleteSql(&sql, root, resultRelation, returningList);
+ break;
+ default:
+ elog(ERROR, "unexpected operation: %d", (int) operation);
+ break;
+ }
+
+ /*
+ * Build the fdw_private list that will be available to the executor.
+ * Items in the list must match enum FdwModifyPrivateIndex, above.
+ */
+ return list_make3(makeString(sql.data),
+ targetAttrs,
+ makeInteger((returningList != NIL)));
+}
+
+/*
+ * postgresBeginForeignModify
+ * Begin an insert/update/delete operation on a foreign table
+ */
+static void
+postgresBeginForeignModify(ModifyTableState *mtstate,
+ ResultRelInfo *resultRelInfo,
+ List *fdw_private,
+ int subplan_index,
+ int eflags)
+{
+ PgFdwModifyState *fmstate;
+ EState *estate = mtstate->ps.state;
+ CmdType operation = mtstate->operation;
+ Relation rel = resultRelInfo->ri_RelationDesc;
+ RangeTblEntry *rte;
+ Oid userid;
+ ForeignTable *table;
+ ForeignServer *server;
+ UserMapping *user;
+ AttrNumber n_params;
+ Oid typefnoid;
+ bool isvarlena;
+ ListCell *lc;
+
+ /*
+ * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState
+ * stays NULL.
+ */
+ if (eflags & EXEC_FLAG_EXPLAIN_ONLY)
+ return;
+
+ /* Begin constructing PgFdwModifyState. */
+ fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState));
+ fmstate->rel = rel;
+
+ /*
+ * Identify which user to do the remote access as. This should match what
+ * ExecCheckRTEPerms() does.
+ */
+ rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table);
+ userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
+
+ /* Get info about foreign table. */
+ table = GetForeignTable(RelationGetRelid(rel));
+ server = GetForeignServer(table->serverid);
+ user = GetUserMapping(userid, server->serverid);
+
+ /* Open connection; report that we'll create a prepared statement. */
+ fmstate->conn = GetConnection(server, user, true);
+ fmstate->p_name = NULL; /* prepared statement not made yet */
+
+ /* Deconstruct fdw_private data. */
+ fmstate->query = strVal(list_nth(fdw_private,
+ FdwModifyPrivateUpdateSql));
+ fmstate->target_attrs = (List *) list_nth(fdw_private,
+ FdwModifyPrivateTargetAttnums);
+ fmstate->has_returning = intVal(list_nth(fdw_private,
+ FdwModifyPrivateHasReturning));
+
+ /* Create context for per-tuple temp workspace. */
+ fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt,
+ "postgres_fdw temporary data",
+ ALLOCSET_SMALL_MINSIZE,
+ ALLOCSET_SMALL_INITSIZE,
+ ALLOCSET_SMALL_MAXSIZE);
+
+ /* Prepare for input conversion of RETURNING results. */
+ if (fmstate->has_returning)
+ fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel));
+
+ /* Prepare for output conversion of parameters used in prepared stmt. */
+ n_params = list_length(fmstate->target_attrs) + 1;
+ fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params);
+ fmstate->p_nums = 0;
+
+ if (operation == CMD_UPDATE || operation == CMD_DELETE)
+ {
+ /* Find the ctid resjunk column in the subplan's result */
+ Plan *subplan = mtstate->mt_plans[subplan_index]->plan;
+
+ fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist,
+ "ctid");
+ if (!AttributeNumberIsValid(fmstate->ctidAttno))
+ elog(ERROR, "could not find junk ctid column");
+
+ /* First transmittable parameter will be ctid */
+ getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+ }
+
+ if (operation == CMD_INSERT || operation == CMD_UPDATE)
+ {
+ /* Set up for remaining transmittable parameters */
+ foreach(lc, fmstate->target_attrs)
+ {
+ int attnum = lfirst_int(lc);
+ Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1];
+
+ Assert(!attr->attisdropped);
+
+ getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena);
+ fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]);
+ fmstate->p_nums++;
+ }
+ }
+
+ Assert(fmstate->p_nums <= n_params);
+
+ resultRelInfo->ri_FdwState = fmstate;
+}
+
+/*
+ * postgresExecForeignInsert
+ * Insert one row into a foreign table
+ */
+static TupleTableSlot *
+postgresExecForeignInsert(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ const char **p_values;
+ PGresult *res;
+ int n_rows;
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!fmstate->p_name)
+ prepare_foreign_modify(fmstate);
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(fmstate, NULL, slot);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) !=
+ (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, res, true, fmstate->query);
+
+ /* Check number of rows affected, and fetch RETURNING tuple if any */
+ if (fmstate->has_returning)
+ {
+ n_rows = PQntuples(res);
+ if (n_rows > 0)
+ store_returning_result(fmstate, slot, res);
+ }
+ else
+ n_rows = atoi(PQcmdTuples(res));
+
+ /* And clean up */
+ PQclear(res);
+
+ MemoryContextReset(fmstate->temp_cxt);
+
+ /* Return NULL if nothing was inserted on the remote end */
+ return (n_rows > 0) ? slot : NULL;
+}
+
+/*
+ * postgresExecForeignUpdate
+ * Update one row in a foreign table
+ */
+static TupleTableSlot *
+postgresExecForeignUpdate(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ Datum datum;
+ bool isNull;
+ const char **p_values;
+ PGresult *res;
+ int n_rows;
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!fmstate->p_name)
+ prepare_foreign_modify(fmstate);
+
+ /* Get the ctid that was passed up as a resjunk column */
+ datum = ExecGetJunkAttribute(planSlot,
+ fmstate->ctidAttno,
+ &isNull);
+ /* shouldn't ever get a null result... */
+ if (isNull)
+ elog(ERROR, "ctid is NULL");
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(fmstate,
+ (ItemPointer) DatumGetPointer(datum),
+ slot);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) !=
+ (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, res, true, fmstate->query);
+
+ /* Check number of rows affected, and fetch RETURNING tuple if any */
+ if (fmstate->has_returning)
+ {
+ n_rows = PQntuples(res);
+ if (n_rows > 0)
+ store_returning_result(fmstate, slot, res);
+ }
+ else
+ n_rows = atoi(PQcmdTuples(res));
+
+ /* And clean up */
+ PQclear(res);
+
+ MemoryContextReset(fmstate->temp_cxt);
+
+ /* Return NULL if nothing was updated on the remote end */
+ return (n_rows > 0) ? slot : NULL;
+}
+
+/*
+ * postgresExecForeignDelete
+ * Delete one row from a foreign table
+ */
+static TupleTableSlot *
+postgresExecForeignDelete(EState *estate,
+ ResultRelInfo *resultRelInfo,
+ TupleTableSlot *slot,
+ TupleTableSlot *planSlot)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+ Datum datum;
+ bool isNull;
+ const char **p_values;
+ PGresult *res;
+ int n_rows;
+
+ /* Set up the prepared statement on the remote server, if we didn't yet */
+ if (!fmstate->p_name)
+ prepare_foreign_modify(fmstate);
+
+ /* Get the ctid that was passed up as a resjunk column */
+ datum = ExecGetJunkAttribute(planSlot,
+ fmstate->ctidAttno,
+ &isNull);
+ /* shouldn't ever get a null result... */
+ if (isNull)
+ elog(ERROR, "ctid is NULL");
+
+ /* Convert parameters needed by prepared statement to text form */
+ p_values = convert_prep_stmt_params(fmstate,
+ (ItemPointer) DatumGetPointer(datum),
+ NULL);
+
+ /*
+ * Execute the prepared statement, and check for success.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexecPrepared(fmstate->conn,
+ fmstate->p_name,
+ fmstate->p_nums,
+ p_values,
+ NULL,
+ NULL,
+ 0);
+ if (PQresultStatus(res) !=
+ (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
+ pgfdw_report_error(ERROR, res, true, fmstate->query);
+
+ /* Check number of rows affected, and fetch RETURNING tuple if any */
+ if (fmstate->has_returning)
+ {
+ n_rows = PQntuples(res);
+ if (n_rows > 0)
+ store_returning_result(fmstate, slot, res);
+ }
+ else
+ n_rows = atoi(PQcmdTuples(res));
+
+ /* And clean up */
+ PQclear(res);
+
+ MemoryContextReset(fmstate->temp_cxt);
+
+ /* Return NULL if nothing was deleted on the remote end */
+ return (n_rows > 0) ? slot : NULL;
+}
+
+/*
+ * postgresEndForeignModify
+ * Finish an insert/update/delete operation on a foreign table
+ */
+static void
+postgresEndForeignModify(EState *estate,
+ ResultRelInfo *resultRelInfo)
+{
+ PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState;
+
+ /* If fmstate is NULL, we are in EXPLAIN; nothing to do */
+ if (fmstate == NULL)
+ return;
+
+ /* If we created a prepared statement, destroy it */
+ if (fmstate->p_name)
+ {
+ char sql[64];
+ PGresult *res;
+
+ snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name);
+
+ /*
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQexec(fmstate->conn, sql);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, sql);
+ PQclear(res);
+ fmstate->p_name = NULL;
+ }
+
+ /* Release remote connection */
+ ReleaseConnection(fmstate->conn);
+ fmstate->conn = NULL;
+}
+
+/*
+ * postgresExplainForeignScan
+ * Produce extra output for EXPLAIN of a ForeignScan on a foreign table
+ */
+static void
+postgresExplainForeignScan(ForeignScanState *node, ExplainState *es)
+{
+ List *fdw_private;
+ char *sql;
+
+ if (es->verbose)
+ {
+ fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private;
+ sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql));
+ ExplainPropertyText("Remote SQL", sql, es);
+ }
+}
+
+/*
+ * postgresExplainForeignModify
+ * Produce extra output for EXPLAIN of a ModifyTable on a foreign table
+ */
+static void
+postgresExplainForeignModify(ModifyTableState *mtstate,
+ ResultRelInfo *rinfo,
+ List *fdw_private,
+ int subplan_index,
+ ExplainState *es)
+{
+ if (es->verbose)
+ {
+ char *sql = strVal(list_nth(fdw_private,
+ FdwModifyPrivateUpdateSql));
+
+ ExplainPropertyText("Remote SQL", sql, es);
+ }
+}
+
+/*
* Estimate costs of executing given SQL statement.
*/
static void
@@ -885,11 +1532,11 @@ get_remote_estimate(const char *sql, PGconn *conn,
static void
create_cursor(ForeignScanState *node)
{
- PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
- int numParams = festate->numParams;
- Oid *types = festate->param_types;
- const char **values = festate->param_values;
- PGconn *conn = festate->conn;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ int numParams = fsstate->numParams;
+ Oid *types = fsstate->param_types;
+ const char **values = fsstate->param_values;
+ PGconn *conn = fsstate->conn;
char *sql;
StringInfoData buf;
PGresult *res;
@@ -904,14 +1551,14 @@ create_cursor(ForeignScanState *node)
* recreate the cursor after a rescan, so we could need to re-use the
* values anyway.
*/
- if (numParams > 0 && !festate->extparams_done)
+ if (numParams > 0 && !fsstate->extparams_done)
{
ParamListInfo params = node->ss.ps.state->es_param_list_info;
List *param_numbers;
ListCell *lc;
param_numbers = (List *)
- list_nth(festate->fdw_private, FdwPrivateExternParamIds);
+ list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds);
foreach(lc, param_numbers)
{
int paramno = lfirst_int(lc);
@@ -929,8 +1576,8 @@ create_cursor(ForeignScanState *node)
* same OIDs we do for the parameters' types.
*
* We'd not need to pass a type array to PQexecParams at all,
- * except that there may be unused holes in the array, which
- * will have to be filled with something or the remote server will
+ * except that there may be unused holes in the array, which will
+ * have to be filled with something or the remote server will
* complain. We arbitrarily set them to INT4OID earlier.
*/
types[paramno - 1] = InvalidOid;
@@ -951,14 +1598,14 @@ create_cursor(ForeignScanState *node)
prm->value);
}
}
- festate->extparams_done = true;
+ fsstate->extparams_done = true;
}
/* Construct the DECLARE CURSOR command */
- sql = strVal(list_nth(festate->fdw_private, FdwPrivateSelectSql));
+ sql = strVal(list_nth(fsstate->fdw_private, FdwScanPrivateSelectSql));
initStringInfo(&buf);
appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s",
- festate->cursor_number, sql);
+ fsstate->cursor_number, sql);
/*
* We don't use a PG_TRY block here, so be careful not to throw error
@@ -971,12 +1618,12 @@ create_cursor(ForeignScanState *node)
PQclear(res);
/* Mark the cursor as created, and show no tuples have been retrieved */
- festate->cursor_exists = true;
- festate->tuples = NULL;
- festate->num_tuples = 0;
- festate->next_tuple = 0;
- festate->fetch_ct_2 = 0;
- festate->eof_reached = false;
+ fsstate->cursor_exists = true;
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ fsstate->next_tuple = 0;
+ fsstate->fetch_ct_2 = 0;
+ fsstate->eof_reached = false;
/* Clean up */
pfree(buf.data);
@@ -988,7 +1635,7 @@ create_cursor(ForeignScanState *node)
static void
fetch_more_data(ForeignScanState *node)
{
- PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
PGresult *volatile res = NULL;
MemoryContext oldcontext;
@@ -996,14 +1643,14 @@ fetch_more_data(ForeignScanState *node)
* We'll store the tuples in the batch_cxt. First, flush the previous
* batch.
*/
- festate->tuples = NULL;
- MemoryContextReset(festate->batch_cxt);
- oldcontext = MemoryContextSwitchTo(festate->batch_cxt);
+ fsstate->tuples = NULL;
+ MemoryContextReset(fsstate->batch_cxt);
+ oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
/* PGresult must be released before leaving this function. */
PG_TRY();
{
- PGconn *conn = festate->conn;
+ PGconn *conn = fsstate->conn;
char sql[64];
int fetch_size;
int numrows;
@@ -1013,36 +1660,36 @@ fetch_more_data(ForeignScanState *node)
fetch_size = 100;
snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fetch_size, festate->cursor_number);
+ fetch_size, fsstate->cursor_number);
res = PQexec(conn, sql);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, false,
- strVal(list_nth(festate->fdw_private,
- FdwPrivateSelectSql)));
+ strVal(list_nth(fsstate->fdw_private,
+ FdwScanPrivateSelectSql)));
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
- festate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- festate->num_tuples = numrows;
- festate->next_tuple = 0;
+ fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
+ fsstate->num_tuples = numrows;
+ fsstate->next_tuple = 0;
for (i = 0; i < numrows; i++)
{
- festate->tuples[i] =
+ fsstate->tuples[i] =
make_tuple_from_result_row(res, i,
- festate->rel,
- festate->attinmeta,
- festate->temp_cxt);
+ fsstate->rel,
+ fsstate->attinmeta,
+ fsstate->temp_cxt);
}
/* Update fetch_ct_2 */
- if (festate->fetch_ct_2 < 2)
- festate->fetch_ct_2++;
+ if (fsstate->fetch_ct_2 < 2)
+ fsstate->fetch_ct_2++;
/* Must be EOF if we didn't get as many tuples as we asked for. */
- festate->eof_reached = (numrows < fetch_size);
+ fsstate->eof_reached = (numrows < fetch_size);
PQclear(res);
res = NULL;
@@ -1080,6 +1727,136 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
}
/*
+ * prepare_foreign_modify
+ * Establish a prepared statement for execution of INSERT/UPDATE/DELETE
+ */
+static void
+prepare_foreign_modify(PgFdwModifyState *fmstate)
+{
+ char prep_name[NAMEDATALEN];
+ char *p_name;
+ PGresult *res;
+
+ /* Construct name we'll use for the prepared statement. */
+ snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
+ GetPrepStmtNumber(fmstate->conn));
+ p_name = pstrdup(prep_name);
+
+ /*
+ * We intentionally do not specify parameter types here, but leave the
+ * remote server to derive them by default. This avoids possible problems
+ * with the remote server using different type OIDs than we do. All of
+ * the prepared statements we use in this module are simple enough that
+ * the remote server will make the right choices.
+ *
+ * We don't use a PG_TRY block here, so be careful not to throw error
+ * without releasing the PGresult.
+ */
+ res = PQprepare(fmstate->conn,
+ p_name,
+ fmstate->query,
+ 0,
+ NULL);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pgfdw_report_error(ERROR, res, true, fmstate->query);
+ PQclear(res);
+
+ /* This action shows that the prepare has been done. */
+ fmstate->p_name = p_name;
+}
+
+/*
+ * convert_prep_stmt_params
+ * Create array of text strings representing parameter values
+ *
+ * tupleid is ctid to send, or NULL if none
+ * slot is slot to get remaining parameters from, or NULL if none
+ *
+ * Data is constructed in temp_cxt; caller should reset that after use.
+ */
+static const char **
+convert_prep_stmt_params(PgFdwModifyState *fmstate,
+ ItemPointer tupleid,
+ TupleTableSlot *slot)
+{
+ const char **p_values;
+ int pindex = 0;
+ MemoryContext oldcontext;
+
+ oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt);
+
+ p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums);
+
+ /* 1st parameter should be ctid, if it's in use */
+ if (tupleid != NULL)
+ {
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+ PointerGetDatum(tupleid));
+ pindex++;
+ }
+
+ /* get following parameters from slot */
+ if (slot != NULL)
+ {
+ ListCell *lc;
+
+ foreach(lc, fmstate->target_attrs)
+ {
+ int attnum = lfirst_int(lc);
+ Datum value;
+ bool isnull;
+
+ value = slot_getattr(slot, attnum, &isnull);
+ if (isnull)
+ p_values[pindex] = NULL;
+ else
+ p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex],
+ value);
+ pindex++;
+ }
+ }
+
+ Assert(pindex == fmstate->p_nums);
+
+ MemoryContextSwitchTo(oldcontext);
+
+ return p_values;
+}
+
+/*
+ * store_returning_result
+ * Store the result of a RETURNING clause
+ *
+ * On error, be sure to release the PGresult on the way out. Callers do not
+ * have PG_TRY blocks to ensure this happens.
+ */
+static void
+store_returning_result(PgFdwModifyState *fmstate,
+ TupleTableSlot *slot, PGresult *res)
+{
+ /* PGresult must be released before leaving this function. */
+ PG_TRY();
+ {
+ HeapTuple newtup;
+
+ newtup = make_tuple_from_result_row(res, 0,
+ fmstate->rel,
+ fmstate->attinmeta,
+ fmstate->temp_cxt);
+ /* tuple will be deleted when it is cleared from the slot */
+ ExecStoreTuple(newtup, slot, InvalidBuffer, true);
+ }
+ PG_CATCH();
+ {
+ if (res)
+ PQclear(res);
+ PG_RE_THROW();
+ }
+ PG_END_TRY();
+}
+
+/*
* postgresAnalyzeForeignTable
* Test whether analyzing this foreign table is supported
*/
@@ -1099,7 +1876,7 @@ postgresAnalyzeForeignTable(Relation relation,
*func = postgresAcquireSampleRowsFunc;
/*
- * Now we have to get the number of pages. It's annoying that the ANALYZE
+ * Now we have to get the number of pages. It's annoying that the ANALYZE
* API requires us to return that now, because it forces some duplication
* of effort between this routine and postgresAcquireSampleRowsFunc. But
* it's probably not worth redefining that API at this point.
@@ -1112,7 +1889,7 @@ postgresAnalyzeForeignTable(Relation relation,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
- conn = GetConnection(server, user);
+ conn = GetConnection(server, user, false);
/*
* Construct command to get page count for relation.
@@ -1204,7 +1981,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, server->serverid);
- conn = GetConnection(server, user);
+ conn = GetConnection(server, user, false);
/*
* Construct cursor that retrieves whole rows from remote.
@@ -1382,6 +2159,7 @@ make_tuple_from_result_row(PGresult *res,
Form_pg_attribute *attrs = tupdesc->attrs;
Datum *values;
bool *nulls;
+ ItemPointer ctid = NULL;
ConversionLocation errpos;
ErrorContextCallback errcallback;
MemoryContext oldcontext;
@@ -1449,6 +2227,21 @@ make_tuple_from_result_row(PGresult *res,
j++;
}
+ /*
+ * Convert ctid if present. XXX we could stand to have a cleaner way of
+ * detecting whether ctid is included in the result.
+ */
+ if (j < PQnfields(res))
+ {
+ char *valstr;
+ Datum datum;
+
+ valstr = PQgetvalue(res, row, j);
+ datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr));
+ ctid = (ItemPointer) DatumGetPointer(datum);
+ j++;
+ }
+
/* Uninstall error context callback. */
error_context_stack = errcallback.previous;
@@ -1463,6 +2256,9 @@ make_tuple_from_result_row(PGresult *res,
tuple = heap_form_tuple(tupdesc, values, nulls);
+ if (ctid)
+ tuple->t_self = *ctid;
+
/* Clean up */
MemoryContextReset(temp_context);