diff options
author | Tom Lane <tgl@sss.pgh.pa.us> | 2013-03-10 14:14:53 -0400 |
---|---|---|
committer | Tom Lane <tgl@sss.pgh.pa.us> | 2013-03-10 14:16:02 -0400 |
commit | 21734d2fb896e0ecdddd3251caa72a3576e2d415 (patch) | |
tree | aed4ee5509e618c0fd9746c8be17c5bf23a08a3f /contrib/postgres_fdw/postgres_fdw.c | |
parent | 7f49a67f954db3e92fd96963169fb8302959576e (diff) | |
download | postgresql-21734d2fb896e0ecdddd3251caa72a3576e2d415.tar.gz postgresql-21734d2fb896e0ecdddd3251caa72a3576e2d415.zip |
Support writable foreign tables.
This patch adds the core-system infrastructure needed to support updates
on foreign tables, and extends contrib/postgres_fdw to allow updates
against remote Postgres servers. There's still a great deal of room for
improvement in optimization of remote updates, but at least there's basic
functionality there now.
KaiGai Kohei, reviewed by Alexander Korotkov and Laurenz Albe, and rather
heavily revised by Tom Lane.
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 1066 |
1 files changed, 931 insertions, 135 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index a46597f02ea..a6db061d603 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -15,16 +15,21 @@ #include "postgres_fdw.h" #include "access/htup_details.h" +#include "access/sysattr.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "optimizer/cost.h" #include "optimizer/pathnode.h" #include "optimizer/planmain.h" +#include "optimizer/prep.h" +#include "optimizer/var.h" #include "parser/parsetree.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -58,7 +63,7 @@ typedef struct PgFdwRelationInfo } PgFdwRelationInfo; /* - * Indexes of FDW-private information stored in fdw_private list. + * Indexes of FDW-private information stored in fdw_private lists. * * We store various information in ForeignScan.fdw_private to pass it from * planner to executor. Specifically there is: @@ -66,26 +71,41 @@ typedef struct PgFdwRelationInfo * 1) SELECT statement text to be sent to the remote server * 2) IDs of PARAM_EXEC Params used in the SELECT statement * - * These items are indexed with the enum FdwPrivateIndex, so an item can be - * fetched with list_nth(). For example, to get the SELECT statement: - * sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql)); + * These items are indexed with the enum FdwScanPrivateIndex, so an item + * can be fetched with list_nth(). For example, to get the SELECT statement: + * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); */ -enum FdwPrivateIndex +enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ - FdwPrivateSelectSql, - + FdwScanPrivateSelectSql, /* Integer list of param IDs of PARAM_EXEC Params used in SQL stmt */ - FdwPrivateExternParamIds, + FdwScanPrivateExternParamIds +}; - /* # of elements stored in the list fdw_private */ - FdwPrivateNum +/* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ModifyTable node referencing a postgres_fdw foreign table. We store: + * + * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server + * 2) Integer list of target attribute numbers for INSERT/UPDATE + * (NIL for a DELETE) + * 3) Boolean flag showing if there's a RETURNING clause + */ +enum FdwModifyPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateUpdateSql, + /* Integer list of target attribute numbers for INSERT/UPDATE */ + FdwModifyPrivateTargetAttnums, + /* has-returning flag (as an integer Value node) */ + FdwModifyPrivateHasReturning }; /* * Execution state of a foreign scan using postgres_fdw. */ -typedef struct PgFdwExecutionState +typedef struct PgFdwScanState { Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -113,7 +133,33 @@ typedef struct PgFdwExecutionState /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ -} PgFdwExecutionState; +} PgFdwScanState; + +/* + * Execution state of a foreign insert/update/delete operation. + */ +typedef struct PgFdwModifyState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + char *p_name; /* name of prepared statement, if created */ + + /* extracted fdw_private data */ + char *query; /* text of INSERT/UPDATE/DELETE command */ + List *target_attrs; /* list of target attribute numbers */ + bool has_returning; /* is there a RETURNING clause? */ + + /* info about parameters for prepared statement */ + AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} PgFdwModifyState; /* * Workspace for analyzing a foreign table. @@ -169,12 +215,43 @@ static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, ForeignPath *best_path, List *tlist, List *scan_clauses); -static void postgresExplainForeignScan(ForeignScanState *node, - ExplainState *es); static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation); +static List *postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); +static void postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags); +static TupleTableSlot *postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static void postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo); +static void postgresExplainForeignScan(ForeignScanState *node, + ExplainState *es); +static void postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); @@ -191,6 +268,12 @@ static void get_remote_estimate(const char *sql, static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void prepare_foreign_modify(PgFdwModifyState *fmstate); +static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot *slot); +static void store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, @@ -214,17 +297,29 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine *routine = makeNode(FdwRoutine); - /* Required handler functions. */ + /* Functions for scanning foreign tables */ routine->GetForeignRelSize = postgresGetForeignRelSize; routine->GetForeignPaths = postgresGetForeignPaths; routine->GetForeignPlan = postgresGetForeignPlan; - routine->ExplainForeignScan = postgresExplainForeignScan; routine->BeginForeignScan = postgresBeginForeignScan; routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; - /* Optional handler functions. */ + /* Functions for updating foreign tables */ + routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; + routine->PlanForeignModify = postgresPlanForeignModify; + routine->BeginForeignModify = postgresBeginForeignModify; + routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignUpdate = postgresExecForeignUpdate; + routine->ExecForeignDelete = postgresExecForeignDelete; + routine->EndForeignModify = postgresEndForeignModify; + + /* Support functions for EXPLAIN */ + routine->ExplainForeignScan = postgresExplainForeignScan; + routine->ExplainForeignModify = postgresExplainForeignModify; + + /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; PG_RETURN_POINTER(routine); @@ -249,7 +344,6 @@ postgresGetForeignRelSize(PlannerInfo *root, Oid foreigntableid) { bool use_remote_estimate = false; - ListCell *lc; PgFdwRelationInfo *fpinfo; StringInfo sql; ForeignTable *table; @@ -266,12 +360,14 @@ postgresGetForeignRelSize(PlannerInfo *root, List *param_conds; List *local_conds; List *param_numbers; + Bitmapset *attrs_used; + ListCell *lc; /* * We use PgFdwRelationInfo to pass various information to subsequent * functions. */ - fpinfo = palloc0(sizeof(PgFdwRelationInfo)); + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); initStringInfo(&fpinfo->sql); sql = &fpinfo->sql; @@ -303,16 +399,37 @@ postgresGetForeignRelSize(PlannerInfo *root, } /* - * Construct remote query which consists of SELECT, FROM, and WHERE - * clauses. Conditions which contain any Param node are excluded because - * placeholder can't be used in EXPLAIN statement. Such conditions are - * appended later. + * Identify which restriction clauses can be sent to the remote server and + * which can't. Conditions that are remotely executable but contain + * PARAM_EXTERN Params have to be treated separately because we can't use + * placeholders in remote EXPLAIN. */ classifyConditions(root, baserel, &remote_conds, ¶m_conds, &local_conds, ¶m_numbers); - deparseSimpleSql(sql, root, baserel, local_conds); - if (list_length(remote_conds) > 0) - appendWhereClause(sql, true, remote_conds, root); + + /* + * Identify which attributes will need to be retrieved from the remote + * server. These include all attrs needed for joins or final output, plus + * all attrs used in the local_conds. + */ + attrs_used = NULL; + pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, + &attrs_used); + foreach(lc, local_conds) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + pull_varattnos((Node *) rinfo->clause, baserel->relid, + &attrs_used); + } + + /* + * Construct remote query which consists of SELECT, FROM, and WHERE + * clauses. For now, leave out the param_conds. + */ + deparseSelectSql(sql, root, baserel, attrs_used); + if (remote_conds) + appendWhereClause(sql, root, remote_conds, true); /* * If the table or the server is configured to use remote estimates, @@ -336,7 +453,7 @@ postgresGetForeignRelSize(PlannerInfo *root, userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); user = GetUserMapping(userid, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); get_remote_estimate(sql->data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -403,11 +520,53 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Finish deparsing remote query by adding conditions which were unusable - * in remote EXPLAIN since they contain Param nodes. + * in remote EXPLAIN because they contain Param nodes. */ - if (list_length(param_conds) > 0) - appendWhereClause(sql, !(list_length(remote_conds) > 0), param_conds, - root); + if (param_conds) + appendWhereClause(sql, root, param_conds, (remote_conds == NIL)); + + /* + * Add FOR UPDATE/SHARE if appropriate. We apply locking during the + * initial row fetch, rather than later on as is done for local tables. + * The extra roundtrips involved in trying to duplicate the local + * semantics exactly don't seem worthwhile (see also comments for + * RowMarkType). + */ + if (baserel->relid == root->parse->resultRelation && + (root->parse->commandType == CMD_UPDATE || + root->parse->commandType == CMD_DELETE)) + { + /* Relation is UPDATE/DELETE target, so use FOR UPDATE */ + appendStringInfo(sql, " FOR UPDATE"); + } + else + { + RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid); + + if (rc) + { + /* + * Relation is specified as a FOR UPDATE/SHARE target, so handle + * that. + * + * For now, just ignore any [NO] KEY specification, since (a) it's + * not clear what that means for a remote table that we don't have + * complete information about, and (b) it wouldn't work anyway on + * older remote servers. Likewise, we don't worry about NOWAIT. + */ + switch (rc->strength) + { + case LCS_FORKEYSHARE: + case LCS_FORSHARE: + appendStringInfo(sql, " FOR SHARE"); + break; + case LCS_FORNOKEYUPDATE: + case LCS_FORUPDATE: + appendStringInfo(sql, " FOR UPDATE"); + break; + } + } + } /* * Store obtained information into FDW-private area of RelOptInfo so it's @@ -477,7 +636,7 @@ postgresGetForeignPaths(PlannerInfo *root, /* * Build the fdw_private list that will be available to the executor. - * Items in the list must match enum FdwPrivateIndex, above. + * Items in the list must match enum FdwScanPrivateIndex, above. */ fdw_private = list_make2(makeString(fpinfo->sql.data), fpinfo->param_numbers); @@ -574,24 +733,6 @@ postgresGetForeignPlan(PlannerInfo *root, } /* - * postgresExplainForeignScan - * Produce extra output for EXPLAIN - */ -static void -postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) -{ - List *fdw_private; - char *sql; - - if (es->verbose) - { - fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; - sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql)); - ExplainPropertyText("Remote SQL", sql, es); - } -} - -/* * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. */ @@ -600,7 +741,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; - PgFdwExecutionState *festate; + PgFdwScanState *fsstate; RangeTblEntry *rte; Oid userid; ForeignTable *table; @@ -619,8 +760,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * We'll save private state in node->fdw_state. */ - festate = (PgFdwExecutionState *) palloc0(sizeof(PgFdwExecutionState)); - node->fdw_state = (void *) festate; + fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); + node->fdw_state = (void *) fsstate; /* * Identify which user to do the remote access as. This should match what @@ -630,8 +771,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ - festate->rel = node->ss.ss_currentRelation; - table = GetForeignTable(RelationGetRelid(festate->rel)); + fsstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(fsstate->rel)); server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); @@ -639,29 +780,29 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - festate->conn = GetConnection(server, user); + fsstate->conn = GetConnection(server, user, false); /* Assign a unique ID for my cursor */ - festate->cursor_number = GetCursorNumber(festate->conn); - festate->cursor_exists = false; + fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_exists = false; /* Get private info created by planner functions. */ - festate->fdw_private = fsplan->fdw_private; + fsstate->fdw_private = fsplan->fdw_private; /* Create contexts for batches of tuples and per-tuple temp workspace. */ - festate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Get info we'll need for data conversion. */ - festate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(festate->rel)); + fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); /* * Allocate buffer for query parameters, if the remote conditions use any. @@ -673,7 +814,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * null values that are arbitrarily marked as being of type int4. */ param_numbers = (List *) - list_nth(festate->fdw_private, FdwPrivateExternParamIds); + list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds); if (param_numbers != NIL) { ParamListInfo params = estate->es_param_list_info; @@ -682,21 +823,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) } else numParams = 0; - festate->numParams = numParams; + fsstate->numParams = numParams; if (numParams > 0) { /* we initially fill all slots with value = NULL, type = int4 */ - festate->param_types = (Oid *) palloc(numParams * sizeof(Oid)); - festate->param_values = (const char **) palloc0(numParams * sizeof(char *)); + fsstate->param_types = (Oid *) palloc(numParams * sizeof(Oid)); + fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); for (i = 0; i < numParams; i++) - festate->param_types[i] = INT4OID; + fsstate->param_types[i] = INT4OID; } else { - festate->param_types = NULL; - festate->param_values = NULL; + fsstate->param_types = NULL; + fsstate->param_values = NULL; } - festate->extparams_done = false; + fsstate->extparams_done = false; } /* @@ -707,33 +848,33 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ - if (!festate->cursor_exists) + if (!fsstate->cursor_exists) create_cursor(node); /* * Get some more tuples, if we've run out. */ - if (festate->next_tuple >= festate->num_tuples) + if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ - if (!festate->eof_reached) + if (!fsstate->eof_reached) fetch_more_data(node); /* If we didn't get any tuples, must be end of data. */ - if (festate->next_tuple >= festate->num_tuples) + if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); } /* * Return the next tuple. */ - ExecStoreTuple(festate->tuples[festate->next_tuple++], + ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, false); @@ -748,7 +889,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; char sql[64]; PGresult *res; @@ -758,7 +899,7 @@ postgresReScanForeignScan(ForeignScanState *node) */ /* If we haven't created the cursor yet, nothing to do. */ - if (!festate->cursor_exists) + if (!fsstate->cursor_exists) return; /* @@ -769,19 +910,19 @@ postgresReScanForeignScan(ForeignScanState *node) */ if (node->ss.ps.chgParam != NULL) { - festate->cursor_exists = false; + fsstate->cursor_exists = false; snprintf(sql, sizeof(sql), "CLOSE c%u", - festate->cursor_number); + fsstate->cursor_number); } - else if (festate->fetch_ct_2 > 1) + else if (fsstate->fetch_ct_2 > 1) { snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", - festate->cursor_number); + fsstate->cursor_number); } else { /* Easy: just rescan what we already have in memory, if anything */ - festate->next_tuple = 0; + fsstate->next_tuple = 0; return; } @@ -789,17 +930,17 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(festate->conn, sql); + res = PQexec(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, sql); PQclear(res); /* Now force a fresh FETCH. */ - festate->tuples = NULL; - festate->num_tuples = 0; - festate->next_tuple = 0; - festate->fetch_ct_2 = 0; - festate->eof_reached = false; + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; } /* @@ -809,24 +950,530 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - /* if festate is NULL, we are in EXPLAIN; nothing to do */ - if (festate == NULL) + /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ + if (fsstate == NULL) return; /* Close the cursor if open, to prevent accumulation of cursors */ - if (festate->cursor_exists) - close_cursor(festate->conn, festate->cursor_number); + if (fsstate->cursor_exists) + close_cursor(fsstate->conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(festate->conn); - festate->conn = NULL; + ReleaseConnection(fsstate->conn); + fsstate->conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* + * postgresAddForeignUpdateTargets + * Add resjunk column(s) needed for update/delete on a foreign table + */ +static void +postgresAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation) +{ + Var *var; + const char *attrname; + TargetEntry *tle; + + /* + * In postgres_fdw, what we need is the ctid, same as for a regular table. + */ + + /* Make a Var representing the desired value */ + var = makeVar(parsetree->resultRelation, + SelfItemPointerAttributeNumber, + TIDOID, + -1, + InvalidOid, + 0); + + /* Wrap it in a resjunk TLE with the right name ... */ + attrname = "ctid"; + + tle = makeTargetEntry((Expr *) var, + list_length(parsetree->targetList) + 1, + pstrdup(attrname), + true); + + /* ... and add it to the query's targetlist */ + parsetree->targetList = lappend(parsetree->targetList, tle); +} + +/* + * postgresPlanForeignModify + * Plan an insert/update/delete operation on a foreign table + * + * Note: currently, the plan tree generated for UPDATE/DELETE will always + * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE) + * and then the ModifyTable node will have to execute individual remote + * UPDATE/DELETE commands. If there are no local conditions or joins + * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING + * and then do nothing at ModifyTable. Room for future optimization ... + */ +static List * +postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + StringInfoData sql; + List *targetAttrs = NIL; + List *returningList = NIL; + + initStringInfo(&sql); + + /* + * Construct a list of the columns that are to be assigned during INSERT + * or UPDATE. We should transmit only these columns, for performance and + * to respect any DEFAULT values the remote side may have for other + * columns. (XXX this will need some re-thinking when we support default + * expressions for foreign tables.) + */ + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Bitmapset *tmpset = bms_copy(rte->modifiedCols); + AttrNumber col; + + while ((col = bms_first_member(tmpset)) >= 0) + { + col += FirstLowInvalidHeapAttributeNumber; + if (col <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + targetAttrs = lappend_int(targetAttrs, col); + } + } + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * Construct the SQL command string. + */ + switch (operation) + { + case CMD_INSERT: + deparseInsertSql(&sql, root, resultRelation, + targetAttrs, returningList); + break; + case CMD_UPDATE: + deparseUpdateSql(&sql, root, resultRelation, + targetAttrs, returningList); + break; + case CMD_DELETE: + deparseDeleteSql(&sql, root, resultRelation, returningList); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwModifyPrivateIndex, above. + */ + return list_make3(makeString(sql.data), + targetAttrs, + makeInteger((returningList != NIL))); +} + +/* + * postgresBeginForeignModify + * Begin an insert/update/delete operation on a foreign table + */ +static void +postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags) +{ + PgFdwModifyState *fmstate; + EState *estate = mtstate->ps.state; + CmdType operation = mtstate->operation; + Relation rel = resultRelInfo->ri_RelationDesc; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + AttrNumber n_params; + Oid typefnoid; + bool isvarlena; + ListCell *lc; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState + * stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* Begin constructing PgFdwModifyState. */ + fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); + fmstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + fmstate->conn = GetConnection(server, user, true); + fmstate->p_name = NULL; /* prepared statement not made yet */ + + /* Deconstruct fdw_private data. */ + fmstate->query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + fmstate->target_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateTargetAttnums); + fmstate->has_returning = intVal(list_nth(fdw_private, + FdwModifyPrivateHasReturning)); + + /* Create context for per-tuple temp workspace. */ + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + + /* Prepare for input conversion of RETURNING results. */ + if (fmstate->has_returning) + fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel)); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + /* Find the ctid resjunk column in the subplan's result */ + Plan *subplan = mtstate->mt_plans[subplan_index]->plan; + + fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, + "ctid"); + if (!AttributeNumberIsValid(fmstate->ctidAttno)) + elog(ERROR, "could not find junk ctid column"); + + /* First transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1]; + + Assert(!attr->attisdropped); + + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + } + + Assert(fmstate->p_nums <= n_params); + + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresExecForeignInsert + * Insert one row into a foreign table + */ +static TupleTableSlot * +postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, NULL, slot); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was inserted on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresExecForeignUpdate + * Update one row in a foreign table + */ +static TupleTableSlot * +postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + Datum datum; + bool isNull; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Get the ctid that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, + (ItemPointer) DatumGetPointer(datum), + slot); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was updated on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresExecForeignDelete + * Delete one row from a foreign table + */ +static TupleTableSlot * +postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + Datum datum; + bool isNull; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Get the ctid that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, + (ItemPointer) DatumGetPointer(datum), + NULL); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was deleted on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresEndForeignModify + * Finish an insert/update/delete operation on a foreign table + */ +static void +postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ + if (fmstate == NULL) + return; + + /* If we created a prepared statement, destroy it */ + if (fmstate->p_name) + { + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexec(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, sql); + PQclear(res); + fmstate->p_name = NULL; + } + + /* Release remote connection */ + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; +} + +/* + * postgresExplainForeignScan + * Produce extra output for EXPLAIN of a ForeignScan on a foreign table + */ +static void +postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) +{ + List *fdw_private; + char *sql; + + if (es->verbose) + { + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* + * postgresExplainForeignModify + * Produce extra output for EXPLAIN of a ModifyTable on a foreign table + */ +static void +postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es) +{ + if (es->verbose) + { + char *sql = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* * Estimate costs of executing given SQL statement. */ static void @@ -885,11 +1532,11 @@ get_remote_estimate(const char *sql, PGconn *conn, static void create_cursor(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; - int numParams = festate->numParams; - Oid *types = festate->param_types; - const char **values = festate->param_values; - PGconn *conn = festate->conn; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + int numParams = fsstate->numParams; + Oid *types = fsstate->param_types; + const char **values = fsstate->param_values; + PGconn *conn = fsstate->conn; char *sql; StringInfoData buf; PGresult *res; @@ -904,14 +1551,14 @@ create_cursor(ForeignScanState *node) * recreate the cursor after a rescan, so we could need to re-use the * values anyway. */ - if (numParams > 0 && !festate->extparams_done) + if (numParams > 0 && !fsstate->extparams_done) { ParamListInfo params = node->ss.ps.state->es_param_list_info; List *param_numbers; ListCell *lc; param_numbers = (List *) - list_nth(festate->fdw_private, FdwPrivateExternParamIds); + list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds); foreach(lc, param_numbers) { int paramno = lfirst_int(lc); @@ -929,8 +1576,8 @@ create_cursor(ForeignScanState *node) * same OIDs we do for the parameters' types. * * We'd not need to pass a type array to PQexecParams at all, - * except that there may be unused holes in the array, which - * will have to be filled with something or the remote server will + * except that there may be unused holes in the array, which will + * have to be filled with something or the remote server will * complain. We arbitrarily set them to INT4OID earlier. */ types[paramno - 1] = InvalidOid; @@ -951,14 +1598,14 @@ create_cursor(ForeignScanState *node) prm->value); } } - festate->extparams_done = true; + fsstate->extparams_done = true; } /* Construct the DECLARE CURSOR command */ - sql = strVal(list_nth(festate->fdw_private, FdwPrivateSelectSql)); + sql = strVal(list_nth(fsstate->fdw_private, FdwScanPrivateSelectSql)); initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", - festate->cursor_number, sql); + fsstate->cursor_number, sql); /* * We don't use a PG_TRY block here, so be careful not to throw error @@ -971,12 +1618,12 @@ create_cursor(ForeignScanState *node) PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ - festate->cursor_exists = true; - festate->tuples = NULL; - festate->num_tuples = 0; - festate->next_tuple = 0; - festate->fetch_ct_2 = 0; - festate->eof_reached = false; + fsstate->cursor_exists = true; + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; /* Clean up */ pfree(buf.data); @@ -988,7 +1635,7 @@ create_cursor(ForeignScanState *node) static void fetch_more_data(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; @@ -996,14 +1643,14 @@ fetch_more_data(ForeignScanState *node) * We'll store the tuples in the batch_cxt. First, flush the previous * batch. */ - festate->tuples = NULL; - MemoryContextReset(festate->batch_cxt); - oldcontext = MemoryContextSwitchTo(festate->batch_cxt); + fsstate->tuples = NULL; + MemoryContextReset(fsstate->batch_cxt); + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = festate->conn; + PGconn *conn = fsstate->conn; char sql[64]; int fetch_size; int numrows; @@ -1013,36 +1660,36 @@ fetch_more_data(ForeignScanState *node) fetch_size = 100; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fetch_size, festate->cursor_number); + fetch_size, fsstate->cursor_number); res = PQexec(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, - strVal(list_nth(festate->fdw_private, - FdwPrivateSelectSql))); + strVal(list_nth(fsstate->fdw_private, + FdwScanPrivateSelectSql))); /* Convert the data into HeapTuples */ numrows = PQntuples(res); - festate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - festate->num_tuples = numrows; - festate->next_tuple = 0; + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; for (i = 0; i < numrows; i++) { - festate->tuples[i] = + fsstate->tuples[i] = make_tuple_from_result_row(res, i, - festate->rel, - festate->attinmeta, - festate->temp_cxt); + fsstate->rel, + fsstate->attinmeta, + fsstate->temp_cxt); } /* Update fetch_ct_2 */ - if (festate->fetch_ct_2 < 2) - festate->fetch_ct_2++; + if (fsstate->fetch_ct_2 < 2) + fsstate->fetch_ct_2++; /* Must be EOF if we didn't get as many tuples as we asked for. */ - festate->eof_reached = (numrows < fetch_size); + fsstate->eof_reached = (numrows < fetch_size); PQclear(res); res = NULL; @@ -1080,6 +1727,136 @@ close_cursor(PGconn *conn, unsigned int cursor_number) } /* + * prepare_foreign_modify + * Establish a prepared statement for execution of INSERT/UPDATE/DELETE + */ +static void +prepare_foreign_modify(PgFdwModifyState *fmstate) +{ + char prep_name[NAMEDATALEN]; + char *p_name; + PGresult *res; + + /* Construct name we'll use for the prepared statement. */ + snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name = pstrdup(prep_name); + + /* + * We intentionally do not specify parameter types here, but leave the + * remote server to derive them by default. This avoids possible problems + * with the remote server using different type OIDs than we do. All of + * the prepared statements we use in this module are simple enough that + * the remote server will make the right choices. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQprepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, fmstate->query); + PQclear(res); + + /* This action shows that the prepare has been done. */ + fmstate->p_name = p_name; +} + +/* + * convert_prep_stmt_params + * Create array of text strings representing parameter values + * + * tupleid is ctid to send, or NULL if none + * slot is slot to get remaining parameters from, or NULL if none + * + * Data is constructed in temp_cxt; caller should reset that after use. + */ +static const char ** +convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot *slot) +{ + const char **p_values; + int pindex = 0; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); + + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + + /* 1st parameter should be ctid, if it's in use */ + if (tupleid != NULL) + { + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + PointerGetDatum(tupleid)); + pindex++; + } + + /* get following parameters from slot */ + if (slot != NULL) + { + ListCell *lc; + + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; + + value = slot_getattr(slot, attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + value); + pindex++; + } + } + + Assert(pindex == fmstate->p_nums); + + MemoryContextSwitchTo(oldcontext); + + return p_values; +} + +/* + * store_returning_result + * Store the result of a RETURNING clause + * + * On error, be sure to release the PGresult on the way out. Callers do not + * have PG_TRY blocks to ensure this happens. + */ +static void +store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res) +{ + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + HeapTuple newtup; + + newtup = make_tuple_from_result_row(res, 0, + fmstate->rel, + fmstate->attinmeta, + fmstate->temp_cxt); + /* tuple will be deleted when it is cleared from the slot */ + ExecStoreTuple(newtup, slot, InvalidBuffer, true); + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ @@ -1099,7 +1876,7 @@ postgresAnalyzeForeignTable(Relation relation, *func = postgresAcquireSampleRowsFunc; /* - * Now we have to get the number of pages. It's annoying that the ANALYZE + * Now we have to get the number of pages. It's annoying that the ANALYZE * API requires us to return that now, because it forces some duplication * of effort between this routine and postgresAcquireSampleRowsFunc. But * it's probably not worth redefining that API at this point. @@ -1112,7 +1889,7 @@ postgresAnalyzeForeignTable(Relation relation, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); /* * Construct command to get page count for relation. @@ -1204,7 +1981,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); /* * Construct cursor that retrieves whole rows from remote. @@ -1382,6 +2159,7 @@ make_tuple_from_result_row(PGresult *res, Form_pg_attribute *attrs = tupdesc->attrs; Datum *values; bool *nulls; + ItemPointer ctid = NULL; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; @@ -1449,6 +2227,21 @@ make_tuple_from_result_row(PGresult *res, j++; } + /* + * Convert ctid if present. XXX we could stand to have a cleaner way of + * detecting whether ctid is included in the result. + */ + if (j < PQnfields(res)) + { + char *valstr; + Datum datum; + + valstr = PQgetvalue(res, row, j); + datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); + ctid = (ItemPointer) DatumGetPointer(datum); + j++; + } + /* Uninstall error context callback. */ error_context_stack = errcallback.previous; @@ -1463,6 +2256,9 @@ make_tuple_from_result_row(PGresult *res, tuple = heap_form_tuple(tupdesc, values, nulls); + if (ctid) + tuple->t_self = *ctid; + /* Clean up */ MemoryContextReset(temp_context); |