diff options
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r-- | contrib/postgres_fdw/postgres_fdw.c | 438 |
1 files changed, 422 insertions, 16 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c index 7ff43337a9a..c1d7f8032e5 100644 --- a/contrib/postgres_fdw/postgres_fdw.c +++ b/contrib/postgres_fdw/postgres_fdw.c @@ -210,6 +210,11 @@ typedef struct PgFdwDirectModifyState PGresult *result; /* result for query */ int num_tuples; /* # of result tuples */ int next_tuple; /* index of next one to return */ + Relation resultRel; /* relcache entry for the target relation */ + AttrNumber *attnoMap; /* array of attnums of input user columns */ + AttrNumber ctidAttno; /* attnum of input ctid column */ + AttrNumber oidAttno; /* attnum of input oid column */ + bool hasSystemCols; /* are there system columns of resultRel? */ /* working memory context */ MemoryContext temp_cxt; /* context for per-tuple temporary data */ @@ -376,8 +381,17 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate, TupleTableSlot *slot); static void store_returning_result(PgFdwModifyState *fmstate, TupleTableSlot *slot, PGresult *res); +static List *build_remote_returning(Index rtindex, Relation rel, + List *returningList); +static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist); static void execute_dml_stmt(ForeignScanState *node); static TupleTableSlot *get_returning_data(ForeignScanState *node); +static void init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex); +static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate, + TupleTableSlot *slot, + EState *estate); static void prepare_query_params(PlanState *node, List *fdw_exprs, int numParams, @@ -2144,14 +2158,15 @@ postgresPlanDirectModify(PlannerInfo *root, if (subplan->qual != NIL) return false; - /* - * We can't handle an UPDATE or DELETE on a foreign join for now. - */ - if (fscan->scan.scanrelid == 0) - return false; - /* Safe to fetch data about the target foreign rel */ - foreignrel = root->simple_rel_array[resultRelation]; + if (fscan->scan.scanrelid == 0) + { + foreignrel = find_join_rel(root, fscan->fs_relids); + /* We should have a rel for this foreign join. */ + Assert(foreignrel); + } + else + foreignrel = root->simple_rel_array[resultRelation]; rte = root->simple_rte_array[resultRelation]; fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private; @@ -2212,8 +2227,23 @@ postgresPlanDirectModify(PlannerInfo *root, * Extract the relevant RETURNING list if any. */ if (plan->returningLists) + { returningList = (List *) list_nth(plan->returningLists, subplan_index); + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * we fetch from the foreign server any Vars specified in RETURNING + * that refer not only to the target relation but to non-target + * relations. So we'll deparse them into the RETURNING clause of the + * remote query; use a targetlist consisting of them instead, which + * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan + * node below. + */ + if (fscan->scan.scanrelid == 0) + returningList = build_remote_returning(resultRelation, rel, + returningList); + } + /* * Construct the SQL command string. */ @@ -2221,6 +2251,7 @@ postgresPlanDirectModify(PlannerInfo *root, { case CMD_UPDATE: deparseDirectUpdateSql(&sql, root, resultRelation, rel, + foreignrel, ((Plan *) fscan)->targetlist, targetAttrs, remote_exprs, ¶ms_list, @@ -2228,6 +2259,7 @@ postgresPlanDirectModify(PlannerInfo *root, break; case CMD_DELETE: deparseDirectDeleteSql(&sql, root, resultRelation, rel, + foreignrel, remote_exprs, ¶ms_list, returningList, &retrieved_attrs); break; @@ -2255,6 +2287,19 @@ postgresPlanDirectModify(PlannerInfo *root, retrieved_attrs, makeInteger(plan->canSetTag)); + /* + * Update the foreign-join-related fields. + */ + if (fscan->scan.scanrelid == 0) + { + /* No need for the outer subplan. */ + fscan->scan.plan.lefttree = NULL; + + /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */ + if (returningList) + rebuild_fdw_scan_tlist(fscan, returningList); + } + heap_close(rel, NoLock); return true; } @@ -2269,6 +2314,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan; EState *estate = node->ss.ps.state; PgFdwDirectModifyState *dmstate; + Index rtindex; RangeTblEntry *rte; Oid userid; ForeignTable *table; @@ -2291,11 +2337,15 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) * Identify which user to do the remote access as. This should match what * ExecCheckRTEPerms() does. */ - rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table); + rtindex = estate->es_result_relation_info->ri_RangeTableIndex; + rte = rt_fetch(rtindex, estate->es_range_table); userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); /* Get info about foreign table. */ - dmstate->rel = node->ss.ss_currentRelation; + if (fsplan->scan.scanrelid == 0) + dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags); + else + dmstate->rel = node->ss.ss_currentRelation; table = GetForeignTable(RelationGetRelid(dmstate->rel)); user = GetUserMapping(userid, table->serverid); @@ -2305,6 +2355,21 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) */ dmstate->conn = GetConnection(user, false); + /* Update the foreign-join-related fields. */ + if (fsplan->scan.scanrelid == 0) + { + /* Save info about foreign table. */ + dmstate->resultRel = dmstate->rel; + + /* + * Set dmstate->rel to NULL to teach get_returning_data() and + * make_tuple_from_result_row() that columns fetched from the remote + * server are described by fdw_scan_tlist of the foreign-scan plan + * node, not the tuple descriptor for the target relation. + */ + dmstate->rel = NULL; + } + /* Initialize state variable */ dmstate->num_tuples = -1; /* -1 means not set yet */ @@ -2325,7 +2390,24 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags) /* Prepare for input conversion of RETURNING results. */ if (dmstate->has_returning) - dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel)); + { + TupleDesc tupdesc; + + if (fsplan->scan.scanrelid == 0) + tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor; + else + tupdesc = RelationGetDescr(dmstate->rel); + + dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc); + + /* + * When performing an UPDATE/DELETE .. RETURNING on a join directly, + * initialize a filter to extract an updated/deleted tuple from a scan + * tuple. + */ + if (fsplan->scan.scanrelid == 0) + init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex); + } /* * Prepare for processing of parameters used in remote query, if any. @@ -2406,6 +2488,10 @@ postgresEndDirectModify(ForeignScanState *node) ReleaseConnection(dmstate->conn); dmstate->conn = NULL; + /* close the target relation. */ + if (dmstate->resultRel) + ExecCloseScanRelation(dmstate->resultRel); + /* MemoryContext will be deleted automatically. */ } @@ -3273,6 +3359,136 @@ store_returning_result(PgFdwModifyState *fmstate, } /* + * build_remote_returning + * Build a RETURNING targetlist of a remote query for performing an + * UPDATE/DELETE .. RETURNING on a join directly + */ +static List * +build_remote_returning(Index rtindex, Relation rel, List *returningList) +{ + bool have_wholerow = false; + List *tlist = NIL; + List *vars; + ListCell *lc; + + Assert(returningList); + + vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS); + + /* + * If there's a whole-row reference to the target relation, then we'll + * need all the columns of the relation. + */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno == InvalidAttrNumber) + { + have_wholerow = true; + break; + } + } + + if (have_wholerow) + { + TupleDesc tupdesc = RelationGetDescr(rel); + int i; + + for (i = 1; i <= tupdesc->natts; i++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1); + Var *var; + + /* Ignore dropped attributes. */ + if (attr->attisdropped) + continue; + + var = makeVar(rtindex, + i, + attr->atttypid, + attr->atttypmod, + attr->attcollation, + 0); + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + } + + /* Now add any remaining columns to tlist. */ + foreach(lc, vars) + { + Var *var = (Var *) lfirst(lc); + + /* + * No need for whole-row references to the target relation. We don't + * need system columns other than ctid and oid either, since those are + * set locally. + */ + if (IsA(var, Var) && + var->varno == rtindex && + var->varattno <= InvalidAttrNumber && + var->varattno != SelfItemPointerAttributeNumber && + var->varattno != ObjectIdAttributeNumber) + continue; /* don't need it */ + + if (tlist_member((Expr *) var, tlist)) + continue; /* already got it */ + + tlist = lappend(tlist, + makeTargetEntry((Expr *) var, + list_length(tlist) + 1, + NULL, + false)); + } + + list_free(vars); + + return tlist; +} + +/* + * rebuild_fdw_scan_tlist + * Build new fdw_scan_tlist of given foreign-scan plan node from given + * tlist + * + * There might be columns that the fdw_scan_tlist of the given foreign-scan + * plan node contains that the given tlist doesn't. The fdw_scan_tlist would + * have contained resjunk columns such as 'ctid' of the target relation and + * 'wholerow' of non-target relations, but the tlist might not contain them, + * for example. So, adjust the tlist so it contains all the columns specified + * in the fdw_scan_tlist; else setrefs.c will get confused. + */ +static void +rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist) +{ + List *new_tlist = tlist; + List *old_tlist = fscan->fdw_scan_tlist; + ListCell *lc; + + foreach(lc, old_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + + if (tlist_member(tle->expr, new_tlist)) + continue; /* already got it */ + + new_tlist = lappend(new_tlist, + makeTargetEntry(tle->expr, + list_length(new_tlist) + 1, + NULL, + false)); + } + fscan->fdw_scan_tlist = new_tlist; +} + +/* * Execute a direct UPDATE/DELETE statement. */ static void @@ -3332,6 +3548,7 @@ get_returning_data(ForeignScanState *node) EState *estate = node->ss.ps.state; ResultRelInfo *resultRelInfo = estate->es_result_relation_info; TupleTableSlot *slot = node->ss.ss_ScanTupleSlot; + TupleTableSlot *resultSlot; Assert(resultRelInfo->ri_projectReturning); @@ -3349,7 +3566,10 @@ get_returning_data(ForeignScanState *node) * "UPDATE/DELETE .. RETURNING 1" for example.) */ if (!dmstate->has_returning) + { ExecStoreAllNullTuple(slot); + resultSlot = slot; + } else { /* @@ -3365,7 +3585,7 @@ get_returning_data(ForeignScanState *node) dmstate->rel, dmstate->attinmeta, dmstate->retrieved_attrs, - NULL, + node, dmstate->temp_cxt); ExecStoreTuple(newtup, slot, InvalidBuffer, false); } @@ -3376,16 +3596,205 @@ get_returning_data(ForeignScanState *node) PG_RE_THROW(); } PG_END_TRY(); + + /* Get the updated/deleted tuple. */ + if (dmstate->rel) + resultSlot = slot; + else + resultSlot = apply_returning_filter(dmstate, slot, estate); } dmstate->next_tuple++; /* Make slot available for evaluation of the local query RETURNING list. */ - resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot; + resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = + resultSlot; return slot; } /* + * Initialize a filter to extract an updated/deleted tuple from a scan tuple. + */ +static void +init_returning_filter(PgFdwDirectModifyState *dmstate, + List *fdw_scan_tlist, + Index rtindex) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + ListCell *lc; + int i; + + /* + * Calculate the mapping between the fdw_scan_tlist's entries and the + * result tuple's attributes. + * + * The "map" is an array of indexes of the result tuple's attributes in + * fdw_scan_tlist, i.e., one entry for every attribute of the result + * tuple. We store zero for any attributes that don't have the + * corresponding entries in that list, marking that a NULL is needed in + * the result tuple. + * + * Also get the indexes of the entries for ctid and oid if any. + */ + dmstate->attnoMap = (AttrNumber *) + palloc0(resultTupType->natts * sizeof(AttrNumber)); + + dmstate->ctidAttno = dmstate->oidAttno = 0; + + i = 1; + dmstate->hasSystemCols = false; + foreach(lc, fdw_scan_tlist) + { + TargetEntry *tle = (TargetEntry *) lfirst(lc); + Var *var = (Var *) tle->expr; + + Assert(IsA(var, Var)); + + /* + * If the Var is a column of the target relation to be retrieved from + * the foreign server, get the index of the entry. + */ + if (var->varno == rtindex && + list_member_int(dmstate->retrieved_attrs, i)) + { + int attrno = var->varattno; + + if (attrno < 0) + { + /* + * We don't retrieve system columns other than ctid and oid. + */ + if (attrno == SelfItemPointerAttributeNumber) + dmstate->ctidAttno = i; + else if (attrno == ObjectIdAttributeNumber) + dmstate->oidAttno = i; + else + Assert(false); + dmstate->hasSystemCols = true; + } + else + { + /* + * We don't retrieve whole-row references to the target + * relation either. + */ + Assert(attrno > 0); + + dmstate->attnoMap[attrno - 1] = i; + } + } + i++; + } +} + +/* + * Extract and return an updated/deleted tuple from a scan tuple. + */ +static TupleTableSlot * +apply_returning_filter(PgFdwDirectModifyState *dmstate, + TupleTableSlot *slot, + EState *estate) +{ + TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel); + TupleTableSlot *resultSlot; + Datum *values; + bool *isnull; + Datum *old_values; + bool *old_isnull; + int i; + + /* + * Use the trigger tuple slot as a place to store the result tuple. + */ + resultSlot = estate->es_trig_tuple_slot; + if (resultSlot->tts_tupleDescriptor != resultTupType) + ExecSetSlotDescriptor(resultSlot, resultTupType); + + /* + * Extract all the values of the scan tuple. + */ + slot_getallattrs(slot); + old_values = slot->tts_values; + old_isnull = slot->tts_isnull; + + /* + * Prepare to build the result tuple. + */ + ExecClearTuple(resultSlot); + values = resultSlot->tts_values; + isnull = resultSlot->tts_isnull; + + /* + * Transpose data into proper fields of the result tuple. + */ + for (i = 0; i < resultTupType->natts; i++) + { + int j = dmstate->attnoMap[i]; + + if (j == 0) + { + values[i] = (Datum) 0; + isnull[i] = true; + } + else + { + values[i] = old_values[j - 1]; + isnull[i] = old_isnull[j - 1]; + } + } + + /* + * Build the virtual tuple. + */ + ExecStoreVirtualTuple(resultSlot); + + /* + * If we have any system columns to return, install them. + */ + if (dmstate->hasSystemCols) + { + HeapTuple resultTup = ExecMaterializeSlot(resultSlot); + + /* ctid */ + if (dmstate->ctidAttno) + { + ItemPointer ctid = NULL; + + ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]); + resultTup->t_self = *ctid; + } + + /* oid */ + if (dmstate->oidAttno) + { + Oid oid = InvalidOid; + + oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]); + HeapTupleSetOid(resultTup, oid); + } + + /* + * And remaining columns + * + * Note: since we currently don't allow the target relation to appear + * on the nullable side of an outer join, any system columns wouldn't + * go to NULL. + * + * Note: no need to care about tableoid here because it will be + * initialized in ExecProcessReturning(). + */ + HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId); + HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId); + } + + /* + * And return the result tuple. + */ + return resultSlot; +} + +/* * Prepare for processing of parameters used in remote query. */ static void @@ -4954,11 +5363,8 @@ make_tuple_from_result_row(PGresult *res, tupdesc = RelationGetDescr(rel); else { - PgFdwScanState *fdw_sstate; - Assert(fsstate); - fdw_sstate = (PgFdwScanState *) fsstate->fdw_state; - tupdesc = fdw_sstate->tupdesc; + tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor; } values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum)); |