diff options
author | Robert Haas <rhaas@postgresql.org> | 2018-02-07 15:34:30 -0500 |
---|---|---|
committer | Robert Haas <rhaas@postgresql.org> | 2018-02-07 15:34:30 -0500 |
commit | 1bc0100d270e5bcc980a0629b8726a32a497e788 (patch) | |
tree | 4c0dfc658b7c837705a55d105fed356a51bfc64d /contrib/postgres_fdw/postgres_fdw.c | |
parent | 7c44b75a2a0705bf17d0e7ef02b1a0a769306fa5 (diff) | |
download | postgresql-1bc0100d270e5bcc980a0629b8726a32a497e788.tar.gz postgresql-1bc0100d270e5bcc980a0629b8726a32a497e788.zip |
postgres_fdw: Push down UPDATE/DELETE joins to remote servers.
Commit 0bf3ae88af330496517722e391e7c975e6bad219 allowed direct
foreign table modification; instead of fetching each row, updating
it locally, and then pushing the modification back to the remote
side, we would instead do all the work on the remote server via a
single remote UPDATE or DELETE command. However, that commit only
enabled this optimization when join tree consisted only of the
target table.
This change allows the same optimization when an UPDATE statement
has a FROM clause or a DELETE statement has a USING clause. This
works much like ordinary foreign join pushdown, in that the tables
must be on the same remote server, relevant parts of the query
must be pushdown-safe, and so forth.
Etsuro Fujita, reviewed by Ashutosh Bapat, Rushabh Lathia, and me.
Some formatting corrections by me.
Discussion: http://postgr.es/m/5A57193A.2080003@lab.ntt.co.jp
Discussion: http://postgr.es/m/b9cee735-62f8-6c07-7528-6364ce9347d0@lab.ntt.co.jp
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 438 |
1 files changed, 422 insertions, 16 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 7ff43337a9a..c1d7f8032e5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -210,6 +210,11 @@ typedef struct PgFdwDirectModifyState PGresult *result; /* result for query */ int num_tuples; /* # of result tuples */ int next_tuple; /* index of next one to return */ + Relation resultRel; /* relcache entry for the target relation */ + AttrNumber *attnoMap; /* array of attnums of input user columns */ + AttrNumber ctidAttno; /* attnum of input ctid column */ + AttrNumber oidAttno; /* attnum of input oid column */ + bool hasSystemCols; /* are there system columns of resultRel? */ /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -376,8 +381,17 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); +static List *build_remote_returning(Index rtindex, Relation rel, + List *returningList); +static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); static void execute_dml_stmt(ForeignScanState *node); static TupleTableSlot *get_returning_data(ForeignScanState *node); +static void init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex); +static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate, + TupleTableSlot *slot, + EState *estate); static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, @@ -2144,14 +2158,15 @@ postgresPlanDirectModify(PlannerInfo *root, if (subplan->qual != NIL) return false; - /* - * We can't handle an UPDATE or DELETE on a foreign join for now. - */ - if (fscan->scan.scanrelid == 0) - return false; - /* Safe to fetch data about the target foreign rel */ - foreignrel = root->simple_rel_array[resultRelation]; + if (fscan->scan.scanrelid == 0) + { + foreignrel = find_join_rel(root, fscan->fs_relids); + /* We should have a rel for this foreign join. */ + Assert(foreignrel); + } + else + foreignrel = root->simple_rel_array[resultRelation]; rte = root->simple_rte_array[resultRelation]; fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; @@ -2212,8 +2227,23 @@ postgresPlanDirectModify(PlannerInfo *root, * Extract the relevant RETURNING list if any. */ if (plan->returningLists) + { returningList = (List *) list_nth(plan->returningLists, subplan_index); + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * we fetch from the foreign server any Vars specified in RETURNING + * that refer not only to the target relation but to non-target + * relations. So we'll deparse them into the RETURNING clause of the + * remote query; use a targetlist consisting of them instead, which + * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan + * node below. + */ + if (fscan->scan.scanrelid == 0) + returningList = build_remote_returning(resultRelation, rel, + returningList); + } + /* * Construct the SQL command string. */ @@ -2221,6 +2251,7 @@ postgresPlanDirectModify(PlannerInfo *root, { case CMD_UPDATE: deparseDirectUpdateSql(&sql, root, resultRelation, rel, + foreignrel, ((Plan *) fscan)->targetlist, targetAttrs, remote_exprs, ¶ms_list, @@ -2228,6 +2259,7 @@ postgresPlanDirectModify(PlannerInfo *root, break; case CMD_DELETE: deparseDirectDeleteSql(&sql, root, resultRelation, rel, + foreignrel, remote_exprs, ¶ms_list, returningList, &retrieved_attrs); break; @@ -2255,6 +2287,19 @@ postgresPlanDirectModify(PlannerInfo *root, retrieved_attrs, makeInteger(plan->canSetTag)); + /* + * Update the foreign-join-related fields. + */ + if (fscan->scan.scanrelid == 0) + { + /* No need for the outer subplan. */ + fscan->scan.plan.lefttree = NULL; + + /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */ + if (returningList) + rebuild_fdw_scan_tlist(fscan, returningList); + } + heap_close(rel, NoLock); return true; } @@ -2269,6 +2314,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwDirectModifyState *dmstate; + Index rtindex; RangeTblEntry *rte; Oid userid; ForeignTable *table; @@ -2291,11 +2337,15 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ - rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + rtindex = estate->es_result_relation_info->ri_RangeTableIndex; + rte = rt_fetch(rtindex, estate->es_range_table); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ - dmstate->rel = node->ss.ss_currentRelation; + if (fsplan->scan.scanrelid == 0) + dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); + else + dmstate->rel = node->ss.ss_currentRelation; table = GetForeignTable(RelationGetRelid(dmstate->rel)); user = GetUserMapping(userid, table->serverid); @@ -2305,6 +2355,21 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) */ dmstate->conn = GetConnection(user, false); + /* Update the foreign-join-related fields. */ + if (fsplan->scan.scanrelid == 0) + { + /* Save info about foreign table. */ + dmstate->resultRel = dmstate->rel; + + /* + * Set dmstate->rel to NULL to teach get_returning_data() and + * make_tuple_from_result_row() that columns fetched from the remote + * server are described by fdw_scan_tlist of the foreign-scan plan + * node, not the tuple descriptor for the target relation. + */ + dmstate->rel = NULL; + } + /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ @@ -2325,7 +2390,24 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) /* Prepare for input conversion of RETURNING results. */ if (dmstate->has_returning) - dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel)); + { + TupleDesc tupdesc; + + if (fsplan->scan.scanrelid == 0) + tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + else + tupdesc = RelationGetDescr(dmstate->rel); + + dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * initialize a filter to extract an updated/deleted tuple from a scan + * tuple. + */ + if (fsplan->scan.scanrelid == 0) + init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); + } /* * Prepare for processing of parameters used in remote query, if any. @@ -2406,6 +2488,10 @@ postgresEndDirectModify(ForeignScanState *node) ReleaseConnection(dmstate->conn); dmstate->conn = NULL; + /* close the target relation. */ + if (dmstate->resultRel) + ExecCloseScanRelation(dmstate->resultRel); + /* MemoryContext will be deleted automatically. */ } @@ -3273,6 +3359,136 @@ store_returning_result(PgFdwModifyState *fmstate, } /* + * build_remote_returning + * Build a RETURNING targetlist of a remote query for performing an + * UPDATE/DELETE .. RETURNING on a join directly + */ +static List * +build_remote_returning(Index rtindex, Relation rel, List *returningList) +{ + bool have_wholerow = false; + List *tlist = NIL; + List *vars; + ListCell *lc; + + Assert(returningList); + + vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS); + + /* + * If there's a whole-row reference to the target relation, then we'll + * need all the columns of the relation. + */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno == InvalidAttrNumber) + { + have_wholerow = true; + break; + } + } + + if (have_wholerow) + { + TupleDesc tupdesc = RelationGetDescr(rel); + int i; + + for (i = 1; i <= tupdesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1); + Var *var; + + /* Ignore dropped attributes. */ + if (attr->attisdropped) + continue; + + var = makeVar(rtindex, + i, + attr->atttypid, + attr->atttypmod, + attr->attcollation, + 0); + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + } + + /* Now add any remaining columns to tlist. */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + /* + * No need for whole-row references to the target relation. We don't + * need system columns other than ctid and oid either, since those are + * set locally. + */ + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno <= InvalidAttrNumber && + var->varattno != SelfItemPointerAttributeNumber && + var->varattno != ObjectIdAttributeNumber) + continue; /* don't need it */ + + if (tlist_member((Expr *) var, tlist)) + continue; /* already got it */ + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + + list_free(vars); + + return tlist; +} + +/* + * rebuild_fdw_scan_tlist + * Build new fdw_scan_tlist of given foreign-scan plan node from given + * tlist + * + * There might be columns that the fdw_scan_tlist of the given foreign-scan + * plan node contains that the given tlist doesn't. The fdw_scan_tlist would + * have contained resjunk columns such as 'ctid' of the target relation and + * 'wholerow' of non-target relations, but the tlist might not contain them, + * for example. So, adjust the tlist so it contains all the columns specified + * in the fdw_scan_tlist; else setrefs.c will get confused. + */ +static void +rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) +{ + List *new_tlist = tlist; + List *old_tlist = fscan->fdw_scan_tlist; + ListCell *lc; + + foreach(lc, old_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (tlist_member(tle->expr, new_tlist)) + continue; /* already got it */ + + new_tlist = lappend(new_tlist, + makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false)); + } + fscan->fdw_scan_tlist = new_tlist; +} + +/* * Execute a direct UPDATE/DELETE statement. */ static void @@ -3332,6 +3548,7 @@ get_returning_data(ForeignScanState *node) EState *estate = node->ss.ps.state; ResultRelInfo *resultRelInfo = estate->es_result_relation_info; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + TupleTableSlot *resultSlot; Assert(resultRelInfo->ri_projectReturning); @@ -3349,7 +3566,10 @@ get_returning_data(ForeignScanState *node) * "UPDATE/DELETE .. RETURNING 1" for example.) */ if (!dmstate->has_returning) + { ExecStoreAllNullTuple(slot); + resultSlot = slot; + } else { /* @@ -3365,7 +3585,7 @@ get_returning_data(ForeignScanState *node) dmstate->rel, dmstate->attinmeta, dmstate->retrieved_attrs, - NULL, + node, dmstate->temp_cxt); ExecStoreTuple(newtup, slot, InvalidBuffer, false); } @@ -3376,16 +3596,205 @@ get_returning_data(ForeignScanState *node) PG_RE_THROW(); } PG_END_TRY(); + + /* Get the updated/deleted tuple. */ + if (dmstate->rel) + resultSlot = slot; + else + resultSlot = apply_returning_filter(dmstate, slot, estate); } dmstate->next_tuple++; /* Make slot available for evaluation of the local query RETURNING list. */ - resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot; + resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = + resultSlot; return slot; } /* + * Initialize a filter to extract an updated/deleted tuple from a scan tuple. + */ +static void +init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + ListCell *lc; + int i; + + /* + * Calculate the mapping between the fdw_scan_tlist's entries and the + * result tuple's attributes. + * + * The "map" is an array of indexes of the result tuple's attributes in + * fdw_scan_tlist, i.e., one entry for every attribute of the result + * tuple. We store zero for any attributes that don't have the + * corresponding entries in that list, marking that a NULL is needed in + * the result tuple. + * + * Also get the indexes of the entries for ctid and oid if any. + */ + dmstate->attnoMap = (AttrNumber *) + palloc0(resultTupType->natts * sizeof(AttrNumber)); + + dmstate->ctidAttno = dmstate->oidAttno = 0; + + i = 1; + dmstate->hasSystemCols = false; + foreach(lc, fdw_scan_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Var *var = (Var *) tle->expr; + + Assert(IsA(var, Var)); + + /* + * If the Var is a column of the target relation to be retrieved from + * the foreign server, get the index of the entry. + */ + if (var->varno == rtindex && + list_member_int(dmstate->retrieved_attrs, i)) + { + int attrno = var->varattno; + + if (attrno < 0) + { + /* + * We don't retrieve system columns other than ctid and oid. + */ + if (attrno == SelfItemPointerAttributeNumber) + dmstate->ctidAttno = i; + else if (attrno == ObjectIdAttributeNumber) + dmstate->oidAttno = i; + else + Assert(false); + dmstate->hasSystemCols = true; + } + else + { + /* + * We don't retrieve whole-row references to the target + * relation either. + */ + Assert(attrno > 0); + + dmstate->attnoMap[attrno - 1] = i; + } + } + i++; + } +} + +/* + * Extract and return an updated/deleted tuple from a scan tuple. + */ +static TupleTableSlot * +apply_returning_filter(PgFdwDirectModifyState *dmstate, + TupleTableSlot *slot, + EState *estate) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + TupleTableSlot *resultSlot; + Datum *values; + bool *isnull; + Datum *old_values; + bool *old_isnull; + int i; + + /* + * Use the trigger tuple slot as a place to store the result tuple. + */ + resultSlot = estate->es_trig_tuple_slot; + if (resultSlot->tts_tupleDescriptor != resultTupType) + ExecSetSlotDescriptor(resultSlot, resultTupType); + + /* + * Extract all the values of the scan tuple. + */ + slot_getallattrs(slot); + old_values = slot->tts_values; + old_isnull = slot->tts_isnull; + + /* + * Prepare to build the result tuple. + */ + ExecClearTuple(resultSlot); + values = resultSlot->tts_values; + isnull = resultSlot->tts_isnull; + + /* + * Transpose data into proper fields of the result tuple. + */ + for (i = 0; i < resultTupType->natts; i++) + { + int j = dmstate->attnoMap[i]; + + if (j == 0) + { + values[i] = (Datum) 0; + isnull[i] = true; + } + else + { + values[i] = old_values[j - 1]; + isnull[i] = old_isnull[j - 1]; + } + } + + /* + * Build the virtual tuple. + */ + ExecStoreVirtualTuple(resultSlot); + + /* + * If we have any system columns to return, install them. + */ + if (dmstate->hasSystemCols) + { + HeapTuple resultTup = ExecMaterializeSlot(resultSlot); + + /* ctid */ + if (dmstate->ctidAttno) + { + ItemPointer ctid = NULL; + + ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]); + resultTup->t_self = *ctid; + } + + /* oid */ + if (dmstate->oidAttno) + { + Oid oid = InvalidOid; + + oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]); + HeapTupleSetOid(resultTup, oid); + } + + /* + * And remaining columns + * + * Note: since we currently don't allow the target relation to appear + * on the nullable side of an outer join, any system columns wouldn't + * go to NULL. + * + * Note: no need to care about tableoid here because it will be + * initialized in ExecProcessReturning(). + */ + HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); + } + + /* + * And return the result tuple. + */ + return resultSlot; +} + +/* * Prepare for processing of parameters used in remote query. */ static void @@ -4954,11 +5363,8 @@ make_tuple_from_result_row(PGresult *res, tupdesc = RelationGetDescr(rel); else { - PgFdwScanState *fdw_sstate; - Assert(fsstate); - fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; - tupdesc = fdw_sstate->tupdesc; + tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; } values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); |