diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 1066 |
1 files changed, 931 insertions, 135 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index a46597f02ea..a6db061d603 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -15,16 +15,21 @@ #include "postgres_fdw.h" #include "access/htup_details.h" +#include "access/sysattr.h" #include "commands/defrem.h" #include "commands/explain.h" #include "commands/vacuum.h" #include "foreign/fdwapi.h" #include "funcapi.h" #include "miscadmin.h" +#include "nodes/makefuncs.h" #include "optimizer/cost.h" #include "optimizer/pathnode.h" #include "optimizer/planmain.h" +#include "optimizer/prep.h" +#include "optimizer/var.h" #include "parser/parsetree.h" +#include "utils/builtins.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -58,7 +63,7 @@ typedef struct PgFdwRelationInfo } PgFdwRelationInfo; /* - * Indexes of FDW-private information stored in fdw_private list. + * Indexes of FDW-private information stored in fdw_private lists. * * We store various information in ForeignScan.fdw_private to pass it from * planner to executor. Specifically there is: @@ -66,26 +71,41 @@ typedef struct PgFdwRelationInfo * 1) SELECT statement text to be sent to the remote server * 2) IDs of PARAM_EXEC Params used in the SELECT statement * - * These items are indexed with the enum FdwPrivateIndex, so an item can be - * fetched with list_nth(). For example, to get the SELECT statement: - * sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql)); + * These items are indexed with the enum FdwScanPrivateIndex, so an item + * can be fetched with list_nth(). For example, to get the SELECT statement: + * sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); */ -enum FdwPrivateIndex +enum FdwScanPrivateIndex { /* SQL statement to execute remotely (as a String node) */ - FdwPrivateSelectSql, - + FdwScanPrivateSelectSql, /* Integer list of param IDs of PARAM_EXEC Params used in SQL stmt */ - FdwPrivateExternParamIds, + FdwScanPrivateExternParamIds +}; - /* # of elements stored in the list fdw_private */ - FdwPrivateNum +/* + * Similarly, this enum describes what's kept in the fdw_private list for + * a ModifyTable node referencing a postgres_fdw foreign table. We store: + * + * 1) INSERT/UPDATE/DELETE statement text to be sent to the remote server + * 2) Integer list of target attribute numbers for INSERT/UPDATE + * (NIL for a DELETE) + * 3) Boolean flag showing if there's a RETURNING clause + */ +enum FdwModifyPrivateIndex +{ + /* SQL statement to execute remotely (as a String node) */ + FdwModifyPrivateUpdateSql, + /* Integer list of target attribute numbers for INSERT/UPDATE */ + FdwModifyPrivateTargetAttnums, + /* has-returning flag (as an integer Value node) */ + FdwModifyPrivateHasReturning }; /* * Execution state of a foreign scan using postgres_fdw. */ -typedef struct PgFdwExecutionState +typedef struct PgFdwScanState { Relation rel; /* relcache entry for the foreign table */ AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ @@ -113,7 +133,33 @@ typedef struct PgFdwExecutionState /* working memory contexts */ MemoryContext batch_cxt; /* context holding current batch of tuples */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ -} PgFdwExecutionState; +} PgFdwScanState; + +/* + * Execution state of a foreign insert/update/delete operation. + */ +typedef struct PgFdwModifyState +{ + Relation rel; /* relcache entry for the foreign table */ + AttInMetadata *attinmeta; /* attribute datatype conversion metadata */ + + /* for remote query execution */ + PGconn *conn; /* connection for the scan */ + char *p_name; /* name of prepared statement, if created */ + + /* extracted fdw_private data */ + char *query; /* text of INSERT/UPDATE/DELETE command */ + List *target_attrs; /* list of target attribute numbers */ + bool has_returning; /* is there a RETURNING clause? */ + + /* info about parameters for prepared statement */ + AttrNumber ctidAttno; /* attnum of input resjunk ctid column */ + int p_nums; /* number of parameters to transmit */ + FmgrInfo *p_flinfo; /* output conversion functions for them */ + + /* working memory context */ + MemoryContext temp_cxt; /* context for per-tuple temporary data */ +} PgFdwModifyState; /* * Workspace for analyzing a foreign table. @@ -169,12 +215,43 @@ static ForeignScan *postgresGetForeignPlan(PlannerInfo *root, ForeignPath *best_path, List *tlist, List *scan_clauses); -static void postgresExplainForeignScan(ForeignScanState *node, - ExplainState *es); static void postgresBeginForeignScan(ForeignScanState *node, int eflags); static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node); static void postgresReScanForeignScan(ForeignScanState *node); static void postgresEndForeignScan(ForeignScanState *node); +static void postgresAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation); +static List *postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index); +static void postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags); +static TupleTableSlot *postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static TupleTableSlot *postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot); +static void postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo); +static void postgresExplainForeignScan(ForeignScanState *node, + ExplainState *es); +static void postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es); static bool postgresAnalyzeForeignTable(Relation relation, AcquireSampleRowsFunc *func, BlockNumber *totalpages); @@ -191,6 +268,12 @@ static void get_remote_estimate(const char *sql, static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); +static void prepare_foreign_modify(PgFdwModifyState *fmstate); +static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot *slot); +static void store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res); static int postgresAcquireSampleRowsFunc(Relation relation, int elevel, HeapTuple *rows, int targrows, double *totalrows, @@ -214,17 +297,29 @@ postgres_fdw_handler(PG_FUNCTION_ARGS) { FdwRoutine *routine = makeNode(FdwRoutine); - /* Required handler functions. */ + /* Functions for scanning foreign tables */ routine->GetForeignRelSize = postgresGetForeignRelSize; routine->GetForeignPaths = postgresGetForeignPaths; routine->GetForeignPlan = postgresGetForeignPlan; - routine->ExplainForeignScan = postgresExplainForeignScan; routine->BeginForeignScan = postgresBeginForeignScan; routine->IterateForeignScan = postgresIterateForeignScan; routine->ReScanForeignScan = postgresReScanForeignScan; routine->EndForeignScan = postgresEndForeignScan; - /* Optional handler functions. */ + /* Functions for updating foreign tables */ + routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets; + routine->PlanForeignModify = postgresPlanForeignModify; + routine->BeginForeignModify = postgresBeginForeignModify; + routine->ExecForeignInsert = postgresExecForeignInsert; + routine->ExecForeignUpdate = postgresExecForeignUpdate; + routine->ExecForeignDelete = postgresExecForeignDelete; + routine->EndForeignModify = postgresEndForeignModify; + + /* Support functions for EXPLAIN */ + routine->ExplainForeignScan = postgresExplainForeignScan; + routine->ExplainForeignModify = postgresExplainForeignModify; + + /* Support functions for ANALYZE */ routine->AnalyzeForeignTable = postgresAnalyzeForeignTable; PG_RETURN_POINTER(routine); @@ -249,7 +344,6 @@ postgresGetForeignRelSize(PlannerInfo *root, Oid foreigntableid) { bool use_remote_estimate = false; - ListCell *lc; PgFdwRelationInfo *fpinfo; StringInfo sql; ForeignTable *table; @@ -266,12 +360,14 @@ postgresGetForeignRelSize(PlannerInfo *root, List *param_conds; List *local_conds; List *param_numbers; + Bitmapset *attrs_used; + ListCell *lc; /* * We use PgFdwRelationInfo to pass various information to subsequent * functions. */ - fpinfo = palloc0(sizeof(PgFdwRelationInfo)); + fpinfo = (PgFdwRelationInfo *) palloc0(sizeof(PgFdwRelationInfo)); initStringInfo(&fpinfo->sql); sql = &fpinfo->sql; @@ -303,16 +399,37 @@ postgresGetForeignRelSize(PlannerInfo *root, } /* - * Construct remote query which consists of SELECT, FROM, and WHERE - * clauses. Conditions which contain any Param node are excluded because - * placeholder can't be used in EXPLAIN statement. Such conditions are - * appended later. + * Identify which restriction clauses can be sent to the remote server and + * which can't. Conditions that are remotely executable but contain + * PARAM_EXTERN Params have to be treated separately because we can't use + * placeholders in remote EXPLAIN. */ classifyConditions(root, baserel, &remote_conds, ¶m_conds, &local_conds, ¶m_numbers); - deparseSimpleSql(sql, root, baserel, local_conds); - if (list_length(remote_conds) > 0) - appendWhereClause(sql, true, remote_conds, root); + + /* + * Identify which attributes will need to be retrieved from the remote + * server. These include all attrs needed for joins or final output, plus + * all attrs used in the local_conds. + */ + attrs_used = NULL; + pull_varattnos((Node *) baserel->reltargetlist, baserel->relid, + &attrs_used); + foreach(lc, local_conds) + { + RestrictInfo *rinfo = (RestrictInfo *) lfirst(lc); + + pull_varattnos((Node *) rinfo->clause, baserel->relid, + &attrs_used); + } + + /* + * Construct remote query which consists of SELECT, FROM, and WHERE + * clauses. For now, leave out the param_conds. + */ + deparseSelectSql(sql, root, baserel, attrs_used); + if (remote_conds) + appendWhereClause(sql, root, remote_conds, true); /* * If the table or the server is configured to use remote estimates, @@ -336,7 +453,7 @@ postgresGetForeignRelSize(PlannerInfo *root, userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); user = GetUserMapping(userid, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); get_remote_estimate(sql->data, conn, &rows, &width, &startup_cost, &total_cost); ReleaseConnection(conn); @@ -403,11 +520,53 @@ postgresGetForeignRelSize(PlannerInfo *root, /* * Finish deparsing remote query by adding conditions which were unusable - * in remote EXPLAIN since they contain Param nodes. + * in remote EXPLAIN because they contain Param nodes. */ - if (list_length(param_conds) > 0) - appendWhereClause(sql, !(list_length(remote_conds) > 0), param_conds, - root); + if (param_conds) + appendWhereClause(sql, root, param_conds, (remote_conds == NIL)); + + /* + * Add FOR UPDATE/SHARE if appropriate. We apply locking during the + * initial row fetch, rather than later on as is done for local tables. + * The extra roundtrips involved in trying to duplicate the local + * semantics exactly don't seem worthwhile (see also comments for + * RowMarkType). + */ + if (baserel->relid == root->parse->resultRelation && + (root->parse->commandType == CMD_UPDATE || + root->parse->commandType == CMD_DELETE)) + { + /* Relation is UPDATE/DELETE target, so use FOR UPDATE */ + appendStringInfo(sql, " FOR UPDATE"); + } + else + { + RowMarkClause *rc = get_parse_rowmark(root->parse, baserel->relid); + + if (rc) + { + /* + * Relation is specified as a FOR UPDATE/SHARE target, so handle + * that. + * + * For now, just ignore any [NO] KEY specification, since (a) it's + * not clear what that means for a remote table that we don't have + * complete information about, and (b) it wouldn't work anyway on + * older remote servers. Likewise, we don't worry about NOWAIT. + */ + switch (rc->strength) + { + case LCS_FORKEYSHARE: + case LCS_FORSHARE: + appendStringInfo(sql, " FOR SHARE"); + break; + case LCS_FORNOKEYUPDATE: + case LCS_FORUPDATE: + appendStringInfo(sql, " FOR UPDATE"); + break; + } + } + } /* * Store obtained information into FDW-private area of RelOptInfo so it's @@ -477,7 +636,7 @@ postgresGetForeignPaths(PlannerInfo *root, /* * Build the fdw_private list that will be available to the executor. - * Items in the list must match enum FdwPrivateIndex, above. + * Items in the list must match enum FdwScanPrivateIndex, above. */ fdw_private = list_make2(makeString(fpinfo->sql.data), fpinfo->param_numbers); @@ -574,24 +733,6 @@ postgresGetForeignPlan(PlannerInfo *root, } /* - * postgresExplainForeignScan - * Produce extra output for EXPLAIN - */ -static void -postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) -{ - List *fdw_private; - char *sql; - - if (es->verbose) - { - fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; - sql = strVal(list_nth(fdw_private, FdwPrivateSelectSql)); - ExplainPropertyText("Remote SQL", sql, es); - } -} - -/* * postgresBeginForeignScan * Initiate an executor scan of a foreign PostgreSQL table. */ @@ -600,7 +741,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; - PgFdwExecutionState *festate; + PgFdwScanState *fsstate; RangeTblEntry *rte; Oid userid; ForeignTable *table; @@ -619,8 +760,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) /* * We'll save private state in node->fdw_state. */ - festate = (PgFdwExecutionState *) palloc0(sizeof(PgFdwExecutionState)); - node->fdw_state = (void *) festate; + fsstate = (PgFdwScanState *) palloc0(sizeof(PgFdwScanState)); + node->fdw_state = (void *) fsstate; /* * Identify which user to do the remote access as. This should match what @@ -630,8 +771,8 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ - festate->rel = node->ss.ss_currentRelation; - table = GetForeignTable(RelationGetRelid(festate->rel)); + fsstate->rel = node->ss.ss_currentRelation; + table = GetForeignTable(RelationGetRelid(fsstate->rel)); server = GetForeignServer(table->serverid); user = GetUserMapping(userid, server->serverid); @@ -639,29 +780,29 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * Get connection to the foreign server. Connection manager will * establish new connection if necessary. */ - festate->conn = GetConnection(server, user); + fsstate->conn = GetConnection(server, user, false); /* Assign a unique ID for my cursor */ - festate->cursor_number = GetCursorNumber(festate->conn); - festate->cursor_exists = false; + fsstate->cursor_number = GetCursorNumber(fsstate->conn); + fsstate->cursor_exists = false; /* Get private info created by planner functions. */ - festate->fdw_private = fsplan->fdw_private; + fsstate->fdw_private = fsplan->fdw_private; /* Create contexts for batches of tuples and per-tuple temp workspace. */ - festate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->batch_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw tuple data", ALLOCSET_DEFAULT_MINSIZE, ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE); - festate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + fsstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, "postgres_fdw temporary data", ALLOCSET_SMALL_MINSIZE, ALLOCSET_SMALL_INITSIZE, ALLOCSET_SMALL_MAXSIZE); /* Get info we'll need for data conversion. */ - festate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(festate->rel)); + fsstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(fsstate->rel)); /* * Allocate buffer for query parameters, if the remote conditions use any. @@ -673,7 +814,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) * null values that are arbitrarily marked as being of type int4. */ param_numbers = (List *) - list_nth(festate->fdw_private, FdwPrivateExternParamIds); + list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds); if (param_numbers != NIL) { ParamListInfo params = estate->es_param_list_info; @@ -682,21 +823,21 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) } else numParams = 0; - festate->numParams = numParams; + fsstate->numParams = numParams; if (numParams > 0) { /* we initially fill all slots with value = NULL, type = int4 */ - festate->param_types = (Oid *) palloc(numParams * sizeof(Oid)); - festate->param_values = (const char **) palloc0(numParams * sizeof(char *)); + fsstate->param_types = (Oid *) palloc(numParams * sizeof(Oid)); + fsstate->param_values = (const char **) palloc0(numParams * sizeof(char *)); for (i = 0; i < numParams; i++) - festate->param_types[i] = INT4OID; + fsstate->param_types[i] = INT4OID; } else { - festate->param_types = NULL; - festate->param_values = NULL; + fsstate->param_types = NULL; + fsstate->param_values = NULL; } - festate->extparams_done = false; + fsstate->extparams_done = false; } /* @@ -707,33 +848,33 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags) static TupleTableSlot * postgresIterateForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; /* * If this is the first call after Begin or ReScan, we need to create the * cursor on the remote side. */ - if (!festate->cursor_exists) + if (!fsstate->cursor_exists) create_cursor(node); /* * Get some more tuples, if we've run out. */ - if (festate->next_tuple >= festate->num_tuples) + if (fsstate->next_tuple >= fsstate->num_tuples) { /* No point in another fetch if we already detected EOF, though. */ - if (!festate->eof_reached) + if (!fsstate->eof_reached) fetch_more_data(node); /* If we didn't get any tuples, must be end of data. */ - if (festate->next_tuple >= festate->num_tuples) + if (fsstate->next_tuple >= fsstate->num_tuples) return ExecClearTuple(slot); } /* * Return the next tuple. */ - ExecStoreTuple(festate->tuples[festate->next_tuple++], + ExecStoreTuple(fsstate->tuples[fsstate->next_tuple++], slot, InvalidBuffer, false); @@ -748,7 +889,7 @@ postgresIterateForeignScan(ForeignScanState *node) static void postgresReScanForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; char sql[64]; PGresult *res; @@ -758,7 +899,7 @@ postgresReScanForeignScan(ForeignScanState *node) */ /* If we haven't created the cursor yet, nothing to do. */ - if (!festate->cursor_exists) + if (!fsstate->cursor_exists) return; /* @@ -769,19 +910,19 @@ postgresReScanForeignScan(ForeignScanState *node) */ if (node->ss.ps.chgParam != NULL) { - festate->cursor_exists = false; + fsstate->cursor_exists = false; snprintf(sql, sizeof(sql), "CLOSE c%u", - festate->cursor_number); + fsstate->cursor_number); } - else if (festate->fetch_ct_2 > 1) + else if (fsstate->fetch_ct_2 > 1) { snprintf(sql, sizeof(sql), "MOVE BACKWARD ALL IN c%u", - festate->cursor_number); + fsstate->cursor_number); } else { /* Easy: just rescan what we already have in memory, if anything */ - festate->next_tuple = 0; + fsstate->next_tuple = 0; return; } @@ -789,17 +930,17 @@ postgresReScanForeignScan(ForeignScanState *node) * We don't use a PG_TRY block here, so be careful not to throw error * without releasing the PGresult. */ - res = PQexec(festate->conn, sql); + res = PQexec(fsstate->conn, sql); if (PQresultStatus(res) != PGRES_COMMAND_OK) pgfdw_report_error(ERROR, res, true, sql); PQclear(res); /* Now force a fresh FETCH. */ - festate->tuples = NULL; - festate->num_tuples = 0; - festate->next_tuple = 0; - festate->fetch_ct_2 = 0; - festate->eof_reached = false; + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; } /* @@ -809,24 +950,530 @@ postgresReScanForeignScan(ForeignScanState *node) static void postgresEndForeignScan(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; - /* if festate is NULL, we are in EXPLAIN; nothing to do */ - if (festate == NULL) + /* if fsstate is NULL, we are in EXPLAIN; nothing to do */ + if (fsstate == NULL) return; /* Close the cursor if open, to prevent accumulation of cursors */ - if (festate->cursor_exists) - close_cursor(festate->conn, festate->cursor_number); + if (fsstate->cursor_exists) + close_cursor(fsstate->conn, fsstate->cursor_number); /* Release remote connection */ - ReleaseConnection(festate->conn); - festate->conn = NULL; + ReleaseConnection(fsstate->conn); + fsstate->conn = NULL; /* MemoryContexts will be deleted automatically. */ } /* + * postgresAddForeignUpdateTargets + * Add resjunk column(s) needed for update/delete on a foreign table + */ +static void +postgresAddForeignUpdateTargets(Query *parsetree, + RangeTblEntry *target_rte, + Relation target_relation) +{ + Var *var; + const char *attrname; + TargetEntry *tle; + + /* + * In postgres_fdw, what we need is the ctid, same as for a regular table. + */ + + /* Make a Var representing the desired value */ + var = makeVar(parsetree->resultRelation, + SelfItemPointerAttributeNumber, + TIDOID, + -1, + InvalidOid, + 0); + + /* Wrap it in a resjunk TLE with the right name ... */ + attrname = "ctid"; + + tle = makeTargetEntry((Expr *) var, + list_length(parsetree->targetList) + 1, + pstrdup(attrname), + true); + + /* ... and add it to the query's targetlist */ + parsetree->targetList = lappend(parsetree->targetList, tle); +} + +/* + * postgresPlanForeignModify + * Plan an insert/update/delete operation on a foreign table + * + * Note: currently, the plan tree generated for UPDATE/DELETE will always + * include a ForeignScan that retrieves ctids (using SELECT FOR UPDATE) + * and then the ModifyTable node will have to execute individual remote + * UPDATE/DELETE commands. If there are no local conditions or joins + * needed, it'd be better to let the scan node do UPDATE/DELETE RETURNING + * and then do nothing at ModifyTable. Room for future optimization ... + */ +static List * +postgresPlanForeignModify(PlannerInfo *root, + ModifyTable *plan, + Index resultRelation, + int subplan_index) +{ + CmdType operation = plan->operation; + StringInfoData sql; + List *targetAttrs = NIL; + List *returningList = NIL; + + initStringInfo(&sql); + + /* + * Construct a list of the columns that are to be assigned during INSERT + * or UPDATE. We should transmit only these columns, for performance and + * to respect any DEFAULT values the remote side may have for other + * columns. (XXX this will need some re-thinking when we support default + * expressions for foreign tables.) + */ + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + RangeTblEntry *rte = planner_rt_fetch(resultRelation, root); + Bitmapset *tmpset = bms_copy(rte->modifiedCols); + AttrNumber col; + + while ((col = bms_first_member(tmpset)) >= 0) + { + col += FirstLowInvalidHeapAttributeNumber; + if (col <= InvalidAttrNumber) /* shouldn't happen */ + elog(ERROR, "system-column update is not supported"); + targetAttrs = lappend_int(targetAttrs, col); + } + } + + /* + * Extract the relevant RETURNING list if any. + */ + if (plan->returningLists) + returningList = (List *) list_nth(plan->returningLists, subplan_index); + + /* + * Construct the SQL command string. + */ + switch (operation) + { + case CMD_INSERT: + deparseInsertSql(&sql, root, resultRelation, + targetAttrs, returningList); + break; + case CMD_UPDATE: + deparseUpdateSql(&sql, root, resultRelation, + targetAttrs, returningList); + break; + case CMD_DELETE: + deparseDeleteSql(&sql, root, resultRelation, returningList); + break; + default: + elog(ERROR, "unexpected operation: %d", (int) operation); + break; + } + + /* + * Build the fdw_private list that will be available to the executor. + * Items in the list must match enum FdwModifyPrivateIndex, above. + */ + return list_make3(makeString(sql.data), + targetAttrs, + makeInteger((returningList != NIL))); +} + +/* + * postgresBeginForeignModify + * Begin an insert/update/delete operation on a foreign table + */ +static void +postgresBeginForeignModify(ModifyTableState *mtstate, + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags) +{ + PgFdwModifyState *fmstate; + EState *estate = mtstate->ps.state; + CmdType operation = mtstate->operation; + Relation rel = resultRelInfo->ri_RelationDesc; + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + ForeignServer *server; + UserMapping *user; + AttrNumber n_params; + Oid typefnoid; + bool isvarlena; + ListCell *lc; + + /* + * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState + * stays NULL. + */ + if (eflags & EXEC_FLAG_EXPLAIN_ONLY) + return; + + /* Begin constructing PgFdwModifyState. */ + fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); + fmstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + fmstate->conn = GetConnection(server, user, true); + fmstate->p_name = NULL; /* prepared statement not made yet */ + + /* Deconstruct fdw_private data. */ + fmstate->query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + fmstate->target_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateTargetAttnums); + fmstate->has_returning = intVal(list_nth(fdw_private, + FdwModifyPrivateHasReturning)); + + /* Create context for per-tuple temp workspace. */ + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + + /* Prepare for input conversion of RETURNING results. */ + if (fmstate->has_returning) + fmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(rel)); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + /* Find the ctid resjunk column in the subplan's result */ + Plan *subplan = mtstate->mt_plans[subplan_index]->plan; + + fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, + "ctid"); + if (!AttributeNumberIsValid(fmstate->ctidAttno)) + elog(ERROR, "could not find junk ctid column"); + + /* First transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = RelationGetDescr(rel)->attrs[attnum - 1]; + + Assert(!attr->attisdropped); + + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + } + + Assert(fmstate->p_nums <= n_params); + + resultRelInfo->ri_FdwState = fmstate; +} + +/* + * postgresExecForeignInsert + * Insert one row into a foreign table + */ +static TupleTableSlot * +postgresExecForeignInsert(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, NULL, slot); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was inserted on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresExecForeignUpdate + * Update one row in a foreign table + */ +static TupleTableSlot * +postgresExecForeignUpdate(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + Datum datum; + bool isNull; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Get the ctid that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, + (ItemPointer) DatumGetPointer(datum), + slot); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was updated on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresExecForeignDelete + * Delete one row from a foreign table + */ +static TupleTableSlot * +postgresExecForeignDelete(EState *estate, + ResultRelInfo *resultRelInfo, + TupleTableSlot *slot, + TupleTableSlot *planSlot) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + Datum datum; + bool isNull; + const char **p_values; + PGresult *res; + int n_rows; + + /* Set up the prepared statement on the remote server, if we didn't yet */ + if (!fmstate->p_name) + prepare_foreign_modify(fmstate); + + /* Get the ctid that was passed up as a resjunk column */ + datum = ExecGetJunkAttribute(planSlot, + fmstate->ctidAttno, + &isNull); + /* shouldn't ever get a null result... */ + if (isNull) + elog(ERROR, "ctid is NULL"); + + /* Convert parameters needed by prepared statement to text form */ + p_values = convert_prep_stmt_params(fmstate, + (ItemPointer) DatumGetPointer(datum), + NULL); + + /* + * Execute the prepared statement, and check for success. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexecPrepared(fmstate->conn, + fmstate->p_name, + fmstate->p_nums, + p_values, + NULL, + NULL, + 0); + if (PQresultStatus(res) != + (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK)) + pgfdw_report_error(ERROR, res, true, fmstate->query); + + /* Check number of rows affected, and fetch RETURNING tuple if any */ + if (fmstate->has_returning) + { + n_rows = PQntuples(res); + if (n_rows > 0) + store_returning_result(fmstate, slot, res); + } + else + n_rows = atoi(PQcmdTuples(res)); + + /* And clean up */ + PQclear(res); + + MemoryContextReset(fmstate->temp_cxt); + + /* Return NULL if nothing was deleted on the remote end */ + return (n_rows > 0) ? slot : NULL; +} + +/* + * postgresEndForeignModify + * Finish an insert/update/delete operation on a foreign table + */ +static void +postgresEndForeignModify(EState *estate, + ResultRelInfo *resultRelInfo) +{ + PgFdwModifyState *fmstate = (PgFdwModifyState *) resultRelInfo->ri_FdwState; + + /* If fmstate is NULL, we are in EXPLAIN; nothing to do */ + if (fmstate == NULL) + return; + + /* If we created a prepared statement, destroy it */ + if (fmstate->p_name) + { + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQexec(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, sql); + PQclear(res); + fmstate->p_name = NULL; + } + + /* Release remote connection */ + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; +} + +/* + * postgresExplainForeignScan + * Produce extra output for EXPLAIN of a ForeignScan on a foreign table + */ +static void +postgresExplainForeignScan(ForeignScanState *node, ExplainState *es) +{ + List *fdw_private; + char *sql; + + if (es->verbose) + { + fdw_private = ((ForeignScan *) node->ss.ps.plan)->fdw_private; + sql = strVal(list_nth(fdw_private, FdwScanPrivateSelectSql)); + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* + * postgresExplainForeignModify + * Produce extra output for EXPLAIN of a ModifyTable on a foreign table + */ +static void +postgresExplainForeignModify(ModifyTableState *mtstate, + ResultRelInfo *rinfo, + List *fdw_private, + int subplan_index, + ExplainState *es) +{ + if (es->verbose) + { + char *sql = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + + ExplainPropertyText("Remote SQL", sql, es); + } +} + +/* * Estimate costs of executing given SQL statement. */ static void @@ -885,11 +1532,11 @@ get_remote_estimate(const char *sql, PGconn *conn, static void create_cursor(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; - int numParams = festate->numParams; - Oid *types = festate->param_types; - const char **values = festate->param_values; - PGconn *conn = festate->conn; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; + int numParams = fsstate->numParams; + Oid *types = fsstate->param_types; + const char **values = fsstate->param_values; + PGconn *conn = fsstate->conn; char *sql; StringInfoData buf; PGresult *res; @@ -904,14 +1551,14 @@ create_cursor(ForeignScanState *node) * recreate the cursor after a rescan, so we could need to re-use the * values anyway. */ - if (numParams > 0 && !festate->extparams_done) + if (numParams > 0 && !fsstate->extparams_done) { ParamListInfo params = node->ss.ps.state->es_param_list_info; List *param_numbers; ListCell *lc; param_numbers = (List *) - list_nth(festate->fdw_private, FdwPrivateExternParamIds); + list_nth(fsstate->fdw_private, FdwScanPrivateExternParamIds); foreach(lc, param_numbers) { int paramno = lfirst_int(lc); @@ -929,8 +1576,8 @@ create_cursor(ForeignScanState *node) * same OIDs we do for the parameters' types. * * We'd not need to pass a type array to PQexecParams at all, - * except that there may be unused holes in the array, which - * will have to be filled with something or the remote server will + * except that there may be unused holes in the array, which will + * have to be filled with something or the remote server will * complain. We arbitrarily set them to INT4OID earlier. */ types[paramno - 1] = InvalidOid; @@ -951,14 +1598,14 @@ create_cursor(ForeignScanState *node) prm->value); } } - festate->extparams_done = true; + fsstate->extparams_done = true; } /* Construct the DECLARE CURSOR command */ - sql = strVal(list_nth(festate->fdw_private, FdwPrivateSelectSql)); + sql = strVal(list_nth(fsstate->fdw_private, FdwScanPrivateSelectSql)); initStringInfo(&buf); appendStringInfo(&buf, "DECLARE c%u CURSOR FOR\n%s", - festate->cursor_number, sql); + fsstate->cursor_number, sql); /* * We don't use a PG_TRY block here, so be careful not to throw error @@ -971,12 +1618,12 @@ create_cursor(ForeignScanState *node) PQclear(res); /* Mark the cursor as created, and show no tuples have been retrieved */ - festate->cursor_exists = true; - festate->tuples = NULL; - festate->num_tuples = 0; - festate->next_tuple = 0; - festate->fetch_ct_2 = 0; - festate->eof_reached = false; + fsstate->cursor_exists = true; + fsstate->tuples = NULL; + fsstate->num_tuples = 0; + fsstate->next_tuple = 0; + fsstate->fetch_ct_2 = 0; + fsstate->eof_reached = false; /* Clean up */ pfree(buf.data); @@ -988,7 +1635,7 @@ create_cursor(ForeignScanState *node) static void fetch_more_data(ForeignScanState *node) { - PgFdwExecutionState *festate = (PgFdwExecutionState *) node->fdw_state; + PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state; PGresult *volatile res = NULL; MemoryContext oldcontext; @@ -996,14 +1643,14 @@ fetch_more_data(ForeignScanState *node) * We'll store the tuples in the batch_cxt. First, flush the previous * batch. */ - festate->tuples = NULL; - MemoryContextReset(festate->batch_cxt); - oldcontext = MemoryContextSwitchTo(festate->batch_cxt); + fsstate->tuples = NULL; + MemoryContextReset(fsstate->batch_cxt); + oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt); /* PGresult must be released before leaving this function. */ PG_TRY(); { - PGconn *conn = festate->conn; + PGconn *conn = fsstate->conn; char sql[64]; int fetch_size; int numrows; @@ -1013,36 +1660,36 @@ fetch_more_data(ForeignScanState *node) fetch_size = 100; snprintf(sql, sizeof(sql), "FETCH %d FROM c%u", - fetch_size, festate->cursor_number); + fetch_size, fsstate->cursor_number); res = PQexec(conn, sql); /* On error, report the original query, not the FETCH. */ if (PQresultStatus(res) != PGRES_TUPLES_OK) pgfdw_report_error(ERROR, res, false, - strVal(list_nth(festate->fdw_private, - FdwPrivateSelectSql))); + strVal(list_nth(fsstate->fdw_private, + FdwScanPrivateSelectSql))); /* Convert the data into HeapTuples */ numrows = PQntuples(res); - festate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); - festate->num_tuples = numrows; - festate->next_tuple = 0; + fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple)); + fsstate->num_tuples = numrows; + fsstate->next_tuple = 0; for (i = 0; i < numrows; i++) { - festate->tuples[i] = + fsstate->tuples[i] = make_tuple_from_result_row(res, i, - festate->rel, - festate->attinmeta, - festate->temp_cxt); + fsstate->rel, + fsstate->attinmeta, + fsstate->temp_cxt); } /* Update fetch_ct_2 */ - if (festate->fetch_ct_2 < 2) - festate->fetch_ct_2++; + if (fsstate->fetch_ct_2 < 2) + fsstate->fetch_ct_2++; /* Must be EOF if we didn't get as many tuples as we asked for. */ - festate->eof_reached = (numrows < fetch_size); + fsstate->eof_reached = (numrows < fetch_size); PQclear(res); res = NULL; @@ -1080,6 +1727,136 @@ close_cursor(PGconn *conn, unsigned int cursor_number) } /* + * prepare_foreign_modify + * Establish a prepared statement for execution of INSERT/UPDATE/DELETE + */ +static void +prepare_foreign_modify(PgFdwModifyState *fmstate) +{ + char prep_name[NAMEDATALEN]; + char *p_name; + PGresult *res; + + /* Construct name we'll use for the prepared statement. */ + snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u", + GetPrepStmtNumber(fmstate->conn)); + p_name = pstrdup(prep_name); + + /* + * We intentionally do not specify parameter types here, but leave the + * remote server to derive them by default. This avoids possible problems + * with the remote server using different type OIDs than we do. All of + * the prepared statements we use in this module are simple enough that + * the remote server will make the right choices. + * + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = PQprepare(fmstate->conn, + p_name, + fmstate->query, + 0, + NULL); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, true, fmstate->query); + PQclear(res); + + /* This action shows that the prepare has been done. */ + fmstate->p_name = p_name; +} + +/* + * convert_prep_stmt_params + * Create array of text strings representing parameter values + * + * tupleid is ctid to send, or NULL if none + * slot is slot to get remaining parameters from, or NULL if none + * + * Data is constructed in temp_cxt; caller should reset that after use. + */ +static const char ** +convert_prep_stmt_params(PgFdwModifyState *fmstate, + ItemPointer tupleid, + TupleTableSlot *slot) +{ + const char **p_values; + int pindex = 0; + MemoryContext oldcontext; + + oldcontext = MemoryContextSwitchTo(fmstate->temp_cxt); + + p_values = (const char **) palloc(sizeof(char *) * fmstate->p_nums); + + /* 1st parameter should be ctid, if it's in use */ + if (tupleid != NULL) + { + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + PointerGetDatum(tupleid)); + pindex++; + } + + /* get following parameters from slot */ + if (slot != NULL) + { + ListCell *lc; + + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Datum value; + bool isnull; + + value = slot_getattr(slot, attnum, &isnull); + if (isnull) + p_values[pindex] = NULL; + else + p_values[pindex] = OutputFunctionCall(&fmstate->p_flinfo[pindex], + value); + pindex++; + } + } + + Assert(pindex == fmstate->p_nums); + + MemoryContextSwitchTo(oldcontext); + + return p_values; +} + +/* + * store_returning_result + * Store the result of a RETURNING clause + * + * On error, be sure to release the PGresult on the way out. Callers do not + * have PG_TRY blocks to ensure this happens. + */ +static void +store_returning_result(PgFdwModifyState *fmstate, + TupleTableSlot *slot, PGresult *res) +{ + /* PGresult must be released before leaving this function. */ + PG_TRY(); + { + HeapTuple newtup; + + newtup = make_tuple_from_result_row(res, 0, + fmstate->rel, + fmstate->attinmeta, + fmstate->temp_cxt); + /* tuple will be deleted when it is cleared from the slot */ + ExecStoreTuple(newtup, slot, InvalidBuffer, true); + } + PG_CATCH(); + { + if (res) + PQclear(res); + PG_RE_THROW(); + } + PG_END_TRY(); +} + +/* * postgresAnalyzeForeignTable * Test whether analyzing this foreign table is supported */ @@ -1099,7 +1876,7 @@ postgresAnalyzeForeignTable(Relation relation, *func = postgresAcquireSampleRowsFunc; /* - * Now we have to get the number of pages. It's annoying that the ANALYZE + * Now we have to get the number of pages. It's annoying that the ANALYZE * API requires us to return that now, because it forces some duplication * of effort between this routine and postgresAcquireSampleRowsFunc. But * it's probably not worth redefining that API at this point. @@ -1112,7 +1889,7 @@ postgresAnalyzeForeignTable(Relation relation, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); /* * Construct command to get page count for relation. @@ -1204,7 +1981,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel, table = GetForeignTable(RelationGetRelid(relation)); server = GetForeignServer(table->serverid); user = GetUserMapping(relation->rd_rel->relowner, server->serverid); - conn = GetConnection(server, user); + conn = GetConnection(server, user, false); /* * Construct cursor that retrieves whole rows from remote. @@ -1382,6 +2159,7 @@ make_tuple_from_result_row(PGresult *res, Form_pg_attribute *attrs = tupdesc->attrs; Datum *values; bool *nulls; + ItemPointer ctid = NULL; ConversionLocation errpos; ErrorContextCallback errcallback; MemoryContext oldcontext; @@ -1449,6 +2227,21 @@ make_tuple_from_result_row(PGresult *res, j++; } + /* + * Convert ctid if present. XXX we could stand to have a cleaner way of + * detecting whether ctid is included in the result. + */ + if (j < PQnfields(res)) + { + char *valstr; + Datum datum; + + valstr = PQgetvalue(res, row, j); + datum = DirectFunctionCall1(tidin, CStringGetDatum(valstr)); + ctid = (ItemPointer) DatumGetPointer(datum); + j++; + } + /* Uninstall error context callback. */ error_context_stack = errcallback.previous; @@ -1463,6 +2256,9 @@ make_tuple_from_result_row(PGresult *res, tuple = heap_form_tuple(tupdesc, values, nulls); + if (ctid) + tuple->t_self = *ctid; + /* Clean up */ MemoryContextReset(temp_context); |