diff options
author | Robert Haas <rhaas@postgresql.org> | 2018-04-06 11:29:43 -0400 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2018-04-06 11:29:43 -0400 |
commit | 870d89608e5f891275d0b752560c827dbce3d7b4 (patch) | |
tree | bacd5f6badb957f2f81185cf886779da3354a32b /contrib/postgres_fdw/postgres_fdw.c | |
parent | bcf79b5bb648d30696406034a61ce0ca3dcb0dea (diff) | |
download | postgresql-870d89608e5f891275d0b752560c827dbce3d7b4.tar.gz postgresql-870d89608e5f891275d0b752560c827dbce3d7b4.zip |
Refactor PgFdwModifyState creation/destruction into separate functions.
Etsuro Fujita. The larger patch series of which this is a part has
been reviewed by Amit Langote, David Fetter, Maksim Milyutin,
Álvaro Herrera, Stephen Frost, and me.
Discussion: http://postgr.es/m/5A95487E.9050808@lab.ntt.co.jp
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 278 |
1 files changed, 169 insertions, 109 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index a15ce28a48b..e7441c759ba 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -376,12 +376,21 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel, static void create_cursor(ForeignScanState *node); static void fetch_more_data(ForeignScanState *node); static void close_cursor(PGconn *conn, unsigned int cursor_number); +static PgFdwModifyState *create_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + bool has_returning, + List *retrieved_attrs); static void prepare_foreign_modify(PgFdwModifyState *fmstate); static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, ItemPointer tupleid, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); +static void finish_foreign_modify(PgFdwModifyState *fmstate); static List *build_remote_returning(Index rtindex, Relation rel, List *returningList); static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); @@ -1681,18 +1690,10 @@ postgresBeginForeignModify(ModifyTableState *mtstate, int eflags) { PgFdwModifyState *fmstate; - EState *estate = mtstate->ps.state; - CmdType operation = mtstate->operation; - Relation rel = resultRelInfo->ri_RelationDesc; - RangeTblEntry *rte; - Oid userid; - ForeignTable *table; - UserMapping *user; - AttrNumber n_params; - Oid typefnoid; - bool isvarlena; - ListCell *lc; - TupleDesc tupdesc = RelationGetDescr(rel); + char *query; + List *target_attrs; + bool has_returning; + List *retrieved_attrs; /* * Do nothing in EXPLAIN (no ANALYZE) case. resultRelInfo->ri_FdwState @@ -1701,82 +1702,25 @@ postgresBeginForeignModify(ModifyTableState *mtstate, if (eflags & EXEC_FLAG_EXPLAIN_ONLY) return; - /* Begin constructing PgFdwModifyState. */ - fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); - fmstate->rel = rel; - - /* - * Identify which user to do the remote access as. This should match what - * ExecCheckRTEPerms() does. - */ - rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); - userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); - - /* Get info about foreign table. */ - table = GetForeignTable(RelationGetRelid(rel)); - user = GetUserMapping(userid, table->serverid); - - /* Open connection; report that we'll create a prepared statement. */ - fmstate->conn = GetConnection(user, true); - fmstate->p_name = NULL; /* prepared statement not made yet */ - /* Deconstruct fdw_private data. */ - fmstate->query = strVal(list_nth(fdw_private, - FdwModifyPrivateUpdateSql)); - fmstate->target_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateTargetAttnums); - fmstate->has_returning = intVal(list_nth(fdw_private, - FdwModifyPrivateHasReturning)); - fmstate->retrieved_attrs = (List *) list_nth(fdw_private, - FdwModifyPrivateRetrievedAttrs); - - /* Create context for per-tuple temp workspace. */ - fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, - "postgres_fdw temporary data", - ALLOCSET_SMALL_SIZES); - - /* Prepare for input conversion of RETURNING results. */ - if (fmstate->has_returning) - fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); - - /* Prepare for output conversion of parameters used in prepared stmt. */ - n_params = list_length(fmstate->target_attrs) + 1; - fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); - fmstate->p_nums = 0; - - if (operation == CMD_UPDATE || operation == CMD_DELETE) - { - /* Find the ctid resjunk column in the subplan's result */ - Plan *subplan = mtstate->mt_plans[subplan_index]->plan; - - fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, - "ctid"); - if (!AttributeNumberIsValid(fmstate->ctidAttno)) - elog(ERROR, "could not find junk ctid column"); - - /* First transmittable parameter will be ctid */ - getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); - fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); - fmstate->p_nums++; - } - - if (operation == CMD_INSERT || operation == CMD_UPDATE) - { - /* Set up for remaining transmittable parameters */ - foreach(lc, fmstate->target_attrs) - { - int attnum = lfirst_int(lc); - Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); - - Assert(!attr->attisdropped); - - getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); - fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); - fmstate->p_nums++; - } - } - - Assert(fmstate->p_nums <= n_params); + query = strVal(list_nth(fdw_private, + FdwModifyPrivateUpdateSql)); + target_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateTargetAttnums); + has_returning = intVal(list_nth(fdw_private, + FdwModifyPrivateHasReturning)); + retrieved_attrs = (List *) list_nth(fdw_private, + FdwModifyPrivateRetrievedAttrs); + + /* Construct an execution state. */ + fmstate = create_foreign_modify(mtstate->ps.state, + resultRelInfo, + mtstate->operation, + mtstate->mt_plans[subplan_index]->plan, + query, + target_attrs, + has_returning, + retrieved_attrs); resultRelInfo->ri_FdwState = fmstate; } @@ -2011,28 +1955,8 @@ postgresEndForeignModify(EState *estate, if (fmstate == NULL) return; - /* If we created a prepared statement, destroy it */ - if (fmstate->p_name) - { - char sql[64]; - PGresult *res; - - snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); - - /* - * We don't use a PG_TRY block here, so be careful not to throw error - * without releasing the PGresult. - */ - res = pgfdw_exec_query(fmstate->conn, sql); - if (PQresultStatus(res) != PGRES_COMMAND_OK) - pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); - PQclear(res); - fmstate->p_name = NULL; - } - - /* Release remote connection */ - ReleaseConnection(fmstate->conn); - fmstate->conn = NULL; + /* Destroy the execution state */ + finish_foreign_modify(fmstate); } /* @@ -3229,6 +3153,109 @@ close_cursor(PGconn *conn, unsigned int cursor_number) } /* + * create_foreign_modify + * Construct an execution state of a foreign insert/update/delete + * operation + */ +static PgFdwModifyState * +create_foreign_modify(EState *estate, + ResultRelInfo *resultRelInfo, + CmdType operation, + Plan *subplan, + char *query, + List *target_attrs, + bool has_returning, + List *retrieved_attrs) +{ + PgFdwModifyState *fmstate; + Relation rel = resultRelInfo->ri_RelationDesc; + TupleDesc tupdesc = RelationGetDescr(rel); + RangeTblEntry *rte; + Oid userid; + ForeignTable *table; + UserMapping *user; + AttrNumber n_params; + Oid typefnoid; + bool isvarlena; + ListCell *lc; + + /* Begin constructing PgFdwModifyState. */ + fmstate = (PgFdwModifyState *) palloc0(sizeof(PgFdwModifyState)); + fmstate->rel = rel; + + /* + * Identify which user to do the remote access as. This should match what + * ExecCheckRTEPerms() does. + */ + rte = rt_fetch(resultRelInfo->ri_RangeTableIndex, estate->es_range_table); + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + + /* Get info about foreign table. */ + table = GetForeignTable(RelationGetRelid(rel)); + user = GetUserMapping(userid, table->serverid); + + /* Open connection; report that we'll create a prepared statement. */ + fmstate->conn = GetConnection(user, true); + fmstate->p_name = NULL; /* prepared statement not made yet */ + + /* Set up remote query information. */ + fmstate->query = query; + fmstate->target_attrs = target_attrs; + fmstate->has_returning = has_returning; + fmstate->retrieved_attrs = retrieved_attrs; + + /* Create context for per-tuple temp workspace. */ + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "postgres_fdw temporary data", + ALLOCSET_SMALL_SIZES); + + /* Prepare for input conversion of RETURNING results. */ + if (fmstate->has_returning) + fmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* Prepare for output conversion of parameters used in prepared stmt. */ + n_params = list_length(fmstate->target_attrs) + 1; + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + + if (operation == CMD_UPDATE || operation == CMD_DELETE) + { + Assert(subplan != NULL); + + /* Find the ctid resjunk column in the subplan's result */ + fmstate->ctidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, + "ctid"); + if (!AttributeNumberIsValid(fmstate->ctidAttno)) + elog(ERROR, "could not find junk ctid column"); + + /* First transmittable parameter will be ctid */ + getTypeOutputInfo(TIDOID, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + + if (operation == CMD_INSERT || operation == CMD_UPDATE) + { + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->target_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + Assert(!attr->attisdropped); + + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + } + + Assert(fmstate->p_nums <= n_params); + + return fmstate; +} + +/* * prepare_foreign_modify * Establish a prepared statement for execution of INSERT/UPDATE/DELETE */ @@ -3371,6 +3398,39 @@ store_returning_result(PgFdwModifyState *fmstate, } /* + * finish_foreign_modify + * Release resources for a foreign insert/update/delete operation + */ +static void +finish_foreign_modify(PgFdwModifyState *fmstate) +{ + Assert(fmstate != NULL); + + /* If we created a prepared statement, destroy it */ + if (fmstate->p_name) + { + char sql[64]; + PGresult *res; + + snprintf(sql, sizeof(sql), "DEALLOCATE %s", fmstate->p_name); + + /* + * We don't use a PG_TRY block here, so be careful not to throw error + * without releasing the PGresult. + */ + res = pgfdw_exec_query(fmstate->conn, sql); + if (PQresultStatus(res) != PGRES_COMMAND_OK) + pgfdw_report_error(ERROR, res, fmstate->conn, true, sql); + PQclear(res); + fmstate->p_name = NULL; + } + + /* Release remote connection */ + ReleaseConnection(fmstate->conn); + fmstate->conn = NULL; +} + +/* * build_remote_returning * Build a RETURNING targetlist of a remote query for performing an * UPDATE/DELETE .. RETURNING on a join directly |