aboutsummaryrefslogtreecommitdiff
path: root/contrib/postgres_fdw/postgres_fdw.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/postgres_fdw/postgres_fdw.c')
-rw-r--r--contrib/postgres_fdw/postgres_fdw.c438
1 files changed, 422 insertions, 16 deletions
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index 7ff43337a9a..c1d7f8032e5 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -210,6 +210,11 @@ typedef struct PgFdwDirectModifyState
PGresult *result; /* result for query */
int num_tuples; /* # of result tuples */
int next_tuple; /* index of next one to return */
+ Relation resultRel; /* relcache entry for the target relation */
+ AttrNumber *attnoMap; /* array of attnums of input user columns */
+ AttrNumber ctidAttno; /* attnum of input ctid column */
+ AttrNumber oidAttno; /* attnum of input oid column */
+ bool hasSystemCols; /* are there system columns of resultRel? */
/* working memory context */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
@@ -376,8 +381,17 @@ static const char **convert_prep_stmt_params(PgFdwModifyState *fmstate,
TupleTableSlot *slot);
static void store_returning_result(PgFdwModifyState *fmstate,
TupleTableSlot *slot, PGresult *res);
+static List *build_remote_returning(Index rtindex, Relation rel,
+ List *returningList);
+static void rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist);
static void execute_dml_stmt(ForeignScanState *node);
static TupleTableSlot *get_returning_data(ForeignScanState *node);
+static void init_returning_filter(PgFdwDirectModifyState *dmstate,
+ List *fdw_scan_tlist,
+ Index rtindex);
+static TupleTableSlot *apply_returning_filter(PgFdwDirectModifyState *dmstate,
+ TupleTableSlot *slot,
+ EState *estate);
static void prepare_query_params(PlanState *node,
List *fdw_exprs,
int numParams,
@@ -2144,14 +2158,15 @@ postgresPlanDirectModify(PlannerInfo *root,
if (subplan->qual != NIL)
return false;
- /*
- * We can't handle an UPDATE or DELETE on a foreign join for now.
- */
- if (fscan->scan.scanrelid == 0)
- return false;
-
/* Safe to fetch data about the target foreign rel */
- foreignrel = root->simple_rel_array[resultRelation];
+ if (fscan->scan.scanrelid == 0)
+ {
+ foreignrel = find_join_rel(root, fscan->fs_relids);
+ /* We should have a rel for this foreign join. */
+ Assert(foreignrel);
+ }
+ else
+ foreignrel = root->simple_rel_array[resultRelation];
rte = root->simple_rte_array[resultRelation];
fpinfo = (PgFdwRelationInfo *) foreignrel->fdw_private;
@@ -2212,8 +2227,23 @@ postgresPlanDirectModify(PlannerInfo *root,
* Extract the relevant RETURNING list if any.
*/
if (plan->returningLists)
+ {
returningList = (List *) list_nth(plan->returningLists, subplan_index);
+ /*
+ * When performing an UPDATE/DELETE .. RETURNING on a join directly,
+ * we fetch from the foreign server any Vars specified in RETURNING
+ * that refer not only to the target relation but to non-target
+ * relations. So we'll deparse them into the RETURNING clause of the
+ * remote query; use a targetlist consisting of them instead, which
+ * will be adjusted to be new fdw_scan_tlist of the foreign-scan plan
+ * node below.
+ */
+ if (fscan->scan.scanrelid == 0)
+ returningList = build_remote_returning(resultRelation, rel,
+ returningList);
+ }
+
/*
* Construct the SQL command string.
*/
@@ -2221,6 +2251,7 @@ postgresPlanDirectModify(PlannerInfo *root,
{
case CMD_UPDATE:
deparseDirectUpdateSql(&sql, root, resultRelation, rel,
+ foreignrel,
((Plan *) fscan)->targetlist,
targetAttrs,
remote_exprs, &params_list,
@@ -2228,6 +2259,7 @@ postgresPlanDirectModify(PlannerInfo *root,
break;
case CMD_DELETE:
deparseDirectDeleteSql(&sql, root, resultRelation, rel,
+ foreignrel,
remote_exprs, &params_list,
returningList, &retrieved_attrs);
break;
@@ -2255,6 +2287,19 @@ postgresPlanDirectModify(PlannerInfo *root,
retrieved_attrs,
makeInteger(plan->canSetTag));
+ /*
+ * Update the foreign-join-related fields.
+ */
+ if (fscan->scan.scanrelid == 0)
+ {
+ /* No need for the outer subplan. */
+ fscan->scan.plan.lefttree = NULL;
+
+ /* Build new fdw_scan_tlist if UPDATE/DELETE .. RETURNING. */
+ if (returningList)
+ rebuild_fdw_scan_tlist(fscan, returningList);
+ }
+
heap_close(rel, NoLock);
return true;
}
@@ -2269,6 +2314,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
ForeignScan *fsplan = (ForeignScan *) node->ss.ps.plan;
EState *estate = node->ss.ps.state;
PgFdwDirectModifyState *dmstate;
+ Index rtindex;
RangeTblEntry *rte;
Oid userid;
ForeignTable *table;
@@ -2291,11 +2337,15 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
* Identify which user to do the remote access as. This should match what
* ExecCheckRTEPerms() does.
*/
- rte = rt_fetch(fsplan->scan.scanrelid, estate->es_range_table);
+ rtindex = estate->es_result_relation_info->ri_RangeTableIndex;
+ rte = rt_fetch(rtindex, estate->es_range_table);
userid = rte->checkAsUser ? rte->checkAsUser : GetUserId();
/* Get info about foreign table. */
- dmstate->rel = node->ss.ss_currentRelation;
+ if (fsplan->scan.scanrelid == 0)
+ dmstate->rel = ExecOpenScanRelation(estate, rtindex, eflags);
+ else
+ dmstate->rel = node->ss.ss_currentRelation;
table = GetForeignTable(RelationGetRelid(dmstate->rel));
user = GetUserMapping(userid, table->serverid);
@@ -2305,6 +2355,21 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
*/
dmstate->conn = GetConnection(user, false);
+ /* Update the foreign-join-related fields. */
+ if (fsplan->scan.scanrelid == 0)
+ {
+ /* Save info about foreign table. */
+ dmstate->resultRel = dmstate->rel;
+
+ /*
+ * Set dmstate->rel to NULL to teach get_returning_data() and
+ * make_tuple_from_result_row() that columns fetched from the remote
+ * server are described by fdw_scan_tlist of the foreign-scan plan
+ * node, not the tuple descriptor for the target relation.
+ */
+ dmstate->rel = NULL;
+ }
+
/* Initialize state variable */
dmstate->num_tuples = -1; /* -1 means not set yet */
@@ -2325,7 +2390,24 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
/* Prepare for input conversion of RETURNING results. */
if (dmstate->has_returning)
- dmstate->attinmeta = TupleDescGetAttInMetadata(RelationGetDescr(dmstate->rel));
+ {
+ TupleDesc tupdesc;
+
+ if (fsplan->scan.scanrelid == 0)
+ tupdesc = node->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
+ else
+ tupdesc = RelationGetDescr(dmstate->rel);
+
+ dmstate->attinmeta = TupleDescGetAttInMetadata(tupdesc);
+
+ /*
+ * When performing an UPDATE/DELETE .. RETURNING on a join directly,
+ * initialize a filter to extract an updated/deleted tuple from a scan
+ * tuple.
+ */
+ if (fsplan->scan.scanrelid == 0)
+ init_returning_filter(dmstate, fsplan->fdw_scan_tlist, rtindex);
+ }
/*
* Prepare for processing of parameters used in remote query, if any.
@@ -2406,6 +2488,10 @@ postgresEndDirectModify(ForeignScanState *node)
ReleaseConnection(dmstate->conn);
dmstate->conn = NULL;
+ /* close the target relation. */
+ if (dmstate->resultRel)
+ ExecCloseScanRelation(dmstate->resultRel);
+
/* MemoryContext will be deleted automatically. */
}
@@ -3273,6 +3359,136 @@ store_returning_result(PgFdwModifyState *fmstate,
}
/*
+ * build_remote_returning
+ * Build a RETURNING targetlist of a remote query for performing an
+ * UPDATE/DELETE .. RETURNING on a join directly
+ */
+static List *
+build_remote_returning(Index rtindex, Relation rel, List *returningList)
+{
+ bool have_wholerow = false;
+ List *tlist = NIL;
+ List *vars;
+ ListCell *lc;
+
+ Assert(returningList);
+
+ vars = pull_var_clause((Node *) returningList, PVC_INCLUDE_PLACEHOLDERS);
+
+ /*
+ * If there's a whole-row reference to the target relation, then we'll
+ * need all the columns of the relation.
+ */
+ foreach(lc, vars)
+ {
+ Var *var = (Var *) lfirst(lc);
+
+ if (IsA(var, Var) &&
+ var->varno == rtindex &&
+ var->varattno == InvalidAttrNumber)
+ {
+ have_wholerow = true;
+ break;
+ }
+ }
+
+ if (have_wholerow)
+ {
+ TupleDesc tupdesc = RelationGetDescr(rel);
+ int i;
+
+ for (i = 1; i <= tupdesc->natts; i++)
+ {
+ Form_pg_attribute attr = TupleDescAttr(tupdesc, i - 1);
+ Var *var;
+
+ /* Ignore dropped attributes. */
+ if (attr->attisdropped)
+ continue;
+
+ var = makeVar(rtindex,
+ i,
+ attr->atttypid,
+ attr->atttypmod,
+ attr->attcollation,
+ 0);
+
+ tlist = lappend(tlist,
+ makeTargetEntry((Expr *) var,
+ list_length(tlist) + 1,
+ NULL,
+ false));
+ }
+ }
+
+ /* Now add any remaining columns to tlist. */
+ foreach(lc, vars)
+ {
+ Var *var = (Var *) lfirst(lc);
+
+ /*
+ * No need for whole-row references to the target relation. We don't
+ * need system columns other than ctid and oid either, since those are
+ * set locally.
+ */
+ if (IsA(var, Var) &&
+ var->varno == rtindex &&
+ var->varattno <= InvalidAttrNumber &&
+ var->varattno != SelfItemPointerAttributeNumber &&
+ var->varattno != ObjectIdAttributeNumber)
+ continue; /* don't need it */
+
+ if (tlist_member((Expr *) var, tlist))
+ continue; /* already got it */
+
+ tlist = lappend(tlist,
+ makeTargetEntry((Expr *) var,
+ list_length(tlist) + 1,
+ NULL,
+ false));
+ }
+
+ list_free(vars);
+
+ return tlist;
+}
+
+/*
+ * rebuild_fdw_scan_tlist
+ * Build new fdw_scan_tlist of given foreign-scan plan node from given
+ * tlist
+ *
+ * There might be columns that the fdw_scan_tlist of the given foreign-scan
+ * plan node contains that the given tlist doesn't. The fdw_scan_tlist would
+ * have contained resjunk columns such as 'ctid' of the target relation and
+ * 'wholerow' of non-target relations, but the tlist might not contain them,
+ * for example. So, adjust the tlist so it contains all the columns specified
+ * in the fdw_scan_tlist; else setrefs.c will get confused.
+ */
+static void
+rebuild_fdw_scan_tlist(ForeignScan *fscan, List *tlist)
+{
+ List *new_tlist = tlist;
+ List *old_tlist = fscan->fdw_scan_tlist;
+ ListCell *lc;
+
+ foreach(lc, old_tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+
+ if (tlist_member(tle->expr, new_tlist))
+ continue; /* already got it */
+
+ new_tlist = lappend(new_tlist,
+ makeTargetEntry(tle->expr,
+ list_length(new_tlist) + 1,
+ NULL,
+ false));
+ }
+ fscan->fdw_scan_tlist = new_tlist;
+}
+
+/*
* Execute a direct UPDATE/DELETE statement.
*/
static void
@@ -3332,6 +3548,7 @@ get_returning_data(ForeignScanState *node)
EState *estate = node->ss.ps.state;
ResultRelInfo *resultRelInfo = estate->es_result_relation_info;
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
+ TupleTableSlot *resultSlot;
Assert(resultRelInfo->ri_projectReturning);
@@ -3349,7 +3566,10 @@ get_returning_data(ForeignScanState *node)
* "UPDATE/DELETE .. RETURNING 1" for example.)
*/
if (!dmstate->has_returning)
+ {
ExecStoreAllNullTuple(slot);
+ resultSlot = slot;
+ }
else
{
/*
@@ -3365,7 +3585,7 @@ get_returning_data(ForeignScanState *node)
dmstate->rel,
dmstate->attinmeta,
dmstate->retrieved_attrs,
- NULL,
+ node,
dmstate->temp_cxt);
ExecStoreTuple(newtup, slot, InvalidBuffer, false);
}
@@ -3376,16 +3596,205 @@ get_returning_data(ForeignScanState *node)
PG_RE_THROW();
}
PG_END_TRY();
+
+ /* Get the updated/deleted tuple. */
+ if (dmstate->rel)
+ resultSlot = slot;
+ else
+ resultSlot = apply_returning_filter(dmstate, slot, estate);
}
dmstate->next_tuple++;
/* Make slot available for evaluation of the local query RETURNING list. */
- resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple = slot;
+ resultRelInfo->ri_projectReturning->pi_exprContext->ecxt_scantuple =
+ resultSlot;
return slot;
}
/*
+ * Initialize a filter to extract an updated/deleted tuple from a scan tuple.
+ */
+static void
+init_returning_filter(PgFdwDirectModifyState *dmstate,
+ List *fdw_scan_tlist,
+ Index rtindex)
+{
+ TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
+ ListCell *lc;
+ int i;
+
+ /*
+ * Calculate the mapping between the fdw_scan_tlist's entries and the
+ * result tuple's attributes.
+ *
+ * The "map" is an array of indexes of the result tuple's attributes in
+ * fdw_scan_tlist, i.e., one entry for every attribute of the result
+ * tuple. We store zero for any attributes that don't have the
+ * corresponding entries in that list, marking that a NULL is needed in
+ * the result tuple.
+ *
+ * Also get the indexes of the entries for ctid and oid if any.
+ */
+ dmstate->attnoMap = (AttrNumber *)
+ palloc0(resultTupType->natts * sizeof(AttrNumber));
+
+ dmstate->ctidAttno = dmstate->oidAttno = 0;
+
+ i = 1;
+ dmstate->hasSystemCols = false;
+ foreach(lc, fdw_scan_tlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+ Var *var = (Var *) tle->expr;
+
+ Assert(IsA(var, Var));
+
+ /*
+ * If the Var is a column of the target relation to be retrieved from
+ * the foreign server, get the index of the entry.
+ */
+ if (var->varno == rtindex &&
+ list_member_int(dmstate->retrieved_attrs, i))
+ {
+ int attrno = var->varattno;
+
+ if (attrno < 0)
+ {
+ /*
+ * We don't retrieve system columns other than ctid and oid.
+ */
+ if (attrno == SelfItemPointerAttributeNumber)
+ dmstate->ctidAttno = i;
+ else if (attrno == ObjectIdAttributeNumber)
+ dmstate->oidAttno = i;
+ else
+ Assert(false);
+ dmstate->hasSystemCols = true;
+ }
+ else
+ {
+ /*
+ * We don't retrieve whole-row references to the target
+ * relation either.
+ */
+ Assert(attrno > 0);
+
+ dmstate->attnoMap[attrno - 1] = i;
+ }
+ }
+ i++;
+ }
+}
+
+/*
+ * Extract and return an updated/deleted tuple from a scan tuple.
+ */
+static TupleTableSlot *
+apply_returning_filter(PgFdwDirectModifyState *dmstate,
+ TupleTableSlot *slot,
+ EState *estate)
+{
+ TupleDesc resultTupType = RelationGetDescr(dmstate->resultRel);
+ TupleTableSlot *resultSlot;
+ Datum *values;
+ bool *isnull;
+ Datum *old_values;
+ bool *old_isnull;
+ int i;
+
+ /*
+ * Use the trigger tuple slot as a place to store the result tuple.
+ */
+ resultSlot = estate->es_trig_tuple_slot;
+ if (resultSlot->tts_tupleDescriptor != resultTupType)
+ ExecSetSlotDescriptor(resultSlot, resultTupType);
+
+ /*
+ * Extract all the values of the scan tuple.
+ */
+ slot_getallattrs(slot);
+ old_values = slot->tts_values;
+ old_isnull = slot->tts_isnull;
+
+ /*
+ * Prepare to build the result tuple.
+ */
+ ExecClearTuple(resultSlot);
+ values = resultSlot->tts_values;
+ isnull = resultSlot->tts_isnull;
+
+ /*
+ * Transpose data into proper fields of the result tuple.
+ */
+ for (i = 0; i < resultTupType->natts; i++)
+ {
+ int j = dmstate->attnoMap[i];
+
+ if (j == 0)
+ {
+ values[i] = (Datum) 0;
+ isnull[i] = true;
+ }
+ else
+ {
+ values[i] = old_values[j - 1];
+ isnull[i] = old_isnull[j - 1];
+ }
+ }
+
+ /*
+ * Build the virtual tuple.
+ */
+ ExecStoreVirtualTuple(resultSlot);
+
+ /*
+ * If we have any system columns to return, install them.
+ */
+ if (dmstate->hasSystemCols)
+ {
+ HeapTuple resultTup = ExecMaterializeSlot(resultSlot);
+
+ /* ctid */
+ if (dmstate->ctidAttno)
+ {
+ ItemPointer ctid = NULL;
+
+ ctid = (ItemPointer) DatumGetPointer(old_values[dmstate->ctidAttno - 1]);
+ resultTup->t_self = *ctid;
+ }
+
+ /* oid */
+ if (dmstate->oidAttno)
+ {
+ Oid oid = InvalidOid;
+
+ oid = DatumGetObjectId(old_values[dmstate->oidAttno - 1]);
+ HeapTupleSetOid(resultTup, oid);
+ }
+
+ /*
+ * And remaining columns
+ *
+ * Note: since we currently don't allow the target relation to appear
+ * on the nullable side of an outer join, any system columns wouldn't
+ * go to NULL.
+ *
+ * Note: no need to care about tableoid here because it will be
+ * initialized in ExecProcessReturning().
+ */
+ HeapTupleHeaderSetXmin(resultTup->t_data, InvalidTransactionId);
+ HeapTupleHeaderSetXmax(resultTup->t_data, InvalidTransactionId);
+ HeapTupleHeaderSetCmin(resultTup->t_data, InvalidTransactionId);
+ }
+
+ /*
+ * And return the result tuple.
+ */
+ return resultSlot;
+}
+
+/*
* Prepare for processing of parameters used in remote query.
*/
static void
@@ -4954,11 +5363,8 @@ make_tuple_from_result_row(PGresult *res,
tupdesc = RelationGetDescr(rel);
else
{
- PgFdwScanState *fdw_sstate;
-
Assert(fsstate);
- fdw_sstate = (PgFdwScanState *) fsstate->fdw_state;
- tupdesc = fdw_sstate->tupdesc;
+ tupdesc = fsstate->ss.ss_ScanTupleSlot->tts_tupleDescriptor;
}
values = (Datum *) palloc0(tupdesc->natts * sizeof(Datum));