aboutsummaryrefslogtreecommitdiff
path: root/src/backend/rewrite/viewUpdate.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/rewrite/viewUpdate.c')
-rw-r--r--src/backend/rewrite/viewUpdate.c1401
1 files changed, 1401 insertions, 0 deletions
diff --git a/src/backend/rewrite/viewUpdate.c b/src/backend/rewrite/viewUpdate.c
new file mode 100644
index 00000000000..3687f19429d
--- /dev/null
+++ b/src/backend/rewrite/viewUpdate.c
@@ -0,0 +1,1401 @@
+/*-------------------------------------------------------------------------
+ *
+ * viewUpdate.c
+ * routines for translating a view definition into
+ * INSERT/UPDATE/DELETE rules (i.e. updatable views).
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * ORIGINAL AUTHORS
+ * Bernd Helmle, Jaime Casanova
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/rewrite/viewUpdate.c,v 1.1 2009/01/22 17:27:54 petere Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "access/xact.h"
+#include "catalog/pg_operator.h"
+#include "catalog/pg_rewrite.h"
+#include "nodes/nodeFuncs.h"
+#include "parser/parse_oper.h"
+#include "parser/parsetree.h"
+#include "rewrite/rewriteDefine.h"
+#include "rewrite/viewUpdate.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/syscache.h"
+#include "utils/rel.h"
+
+typedef TargetEntry** ViewDefColumnList;
+
+typedef struct ViewBaseRelation
+{
+ List *defs; /* List of all base relations (root starts
+ * with only one relation because we
+ * implement only simple updatability) */
+ Oid parentRelation; /* Oid of parent relation, 0 indicates root */
+} ViewBaseRelation;
+
+typedef struct ViewBaseRelationItem
+{
+ Relation rel; /* the Relation itself */
+ Query *rule; /* _RETURN rule of a view relation */
+ TargetEntry **tentries; /* saves order of column target list */
+} ViewBaseRelationItem;
+
+typedef struct ViewExprContext
+{
+ Index newRTE;
+ Index oldRTE;
+ Index baseRTE;
+ Index subQueryLevel;
+ ViewDefColumnList tentries;
+} ViewExprContext;
+
+static const char *get_auto_rule_name(CmdType type);
+static Query *get_return_rule(Relation rel);
+static void read_rearranged_cols(ViewBaseRelation *tree);
+static bool is_select_query_updatable(const Query *query);
+static Oid get_reloid_from_select(const Query *select,
+ int *rti, RangeTblEntry **rel_entry);
+static void create_update_rule(Oid viewOid,
+ const Query *select,
+ const Relation baserel,
+ TargetEntry **tentries,
+ CmdType ruletype);
+static void get_base_base_relations(const Query *view, Oid baserelid, List **list);
+static void copyReversedTargetEntryPtr(List *targetList,
+ ViewDefColumnList targets);
+static bool check_reltree(ViewBaseRelation *node);
+static Query *form_update_query(const Query *select, ViewDefColumnList tentries, CmdType type);
+static RangeTblEntry *get_relation_RTE(const Query *select,
+ unsigned int *offset);
+static Index get_rtindex_for_rel(List *rte_list,
+ const char *relname);
+static bool replace_tlist_varno_walker(Node *node,
+ ViewExprContext *ctxt);
+static OpExpr *create_opexpr(Var *var_left, Var *var_right);
+static void form_where_for_updrule(const Query *select, FromExpr **from,
+ const Relation baserel, Index baserti,
+ Index oldrti);
+static void build_update_target_list(const Query *update, const Query *select,
+ const Relation baserel);
+
+/*------------------------------------------------------------------------------
+ * Private functions
+ * -----------------------------------------------------------------------------
+ */
+
+static const char *
+get_auto_rule_name(CmdType type)
+{
+ if (type == CMD_INSERT)
+ return "_INSERT";
+ if (type == CMD_UPDATE)
+ return "_UPDATE";
+ if (type == CMD_DELETE)
+ return "_DELETE";
+ return NULL;
+}
+
+/*
+ * Returns the range table index for the specified relname.
+ *
+ * XXX This seems pretty grotty ... can't we do this in some other way?
+ */
+static Index
+get_rtindex_for_rel(List *rte_list, const char *relname)
+{
+ ListCell *cell;
+ int index = 0;
+
+ AssertArg(relname);
+
+ foreach(cell, rte_list)
+ {
+ RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell);
+
+ index++;
+
+ if (rte && strncmp(rte->eref->aliasname, relname, NAMEDATALEN) == 0)
+ break;
+ }
+
+ Assert(index > 0);
+
+ return (Index) index;
+}
+
+/*
+ * Returns the RangeTblEntry starting at the specified offset. The
+ * function can be used to iterate over the rtable list of the
+ * specified select query tree. Returns NULL if nothing is found.
+ *
+ * NOTE: The function only returns those RangeTblEntry that do not
+ * match a *NEW* or *OLD* RangeTblEntry.
+ *
+ * The offset is incremented as a side effect.
+ */
+static RangeTblEntry *
+get_relation_RTE(const Query *select, unsigned int *offset)
+{
+ AssertArg(offset);
+ AssertArg(select);
+
+ while (*offset <= list_length(select->rtable))
+ {
+ RangeTblEntry *rte = rt_fetch(*offset, select->rtable);
+ (*offset)++;
+
+ /* skip non-table RTEs */
+ if (rte->rtekind != RTE_RELATION)
+ continue;
+
+ /*
+ * Skip RTEs named *NEW* and *OLD*.
+ *
+ * XXX It would be nice to be able to use something else than just
+ * the names here ... However, rtekind does not work as expected :-(
+ */
+ if (strncmp(rte->eref->aliasname, "*NEW*", 6) == 0
+ || strncmp(rte->eref->aliasname, "*OLD*", 6) == 0)
+ continue;
+
+ return rte;
+ }
+
+ return NULL;
+}
+
+/*
+ * Rewrite varno's and varattno for the specified Var node if it is in
+ * a reversed order regarding to the underlying relation. The lookup
+ * table tentries holds all TargetEntries which are on a different
+ * location in the view definition. If var isn't on a different
+ * position in the current view than on its original relation, nothing
+ * is done.
+ *
+ * Note: This function assumes that the caller has already checked all
+ * parameters for NULL.
+ */
+static void
+adjustVarnoIfReversedCol(Var *var,
+ Index newRTE,
+ ViewDefColumnList tentries)
+{
+ TargetEntry *entry = tentries[var->varattno - 1];
+
+ /*
+ * tentries holds NULL if given var isn't on a different location
+ * in the view Only replace if column order is reversed.
+ */
+ if (entry && entry->resno != var->varattno)
+ {
+ var->varattno = entry->resno;
+ var->varoattno = entry->resno;
+ }
+
+ /* Finally, make varno point to the *NEW* range table entry. */
+ var->varno = newRTE;
+ var->varnoold = newRTE;
+}
+
+/*
+ * Creates an equal operator expression for the specified Vars. They
+ * are assumed to be of the same type.
+ */
+static OpExpr *
+create_opexpr(Var *var_left, Var *var_right)
+{
+ OpExpr *result;
+ HeapTuple tuple;
+ Form_pg_operator operator;
+ Oid eqOid;
+
+ AssertArg(var_left);
+ AssertArg(var_right);
+ Assert(var_left->vartype == var_right->vartype);
+
+ get_sort_group_operators(var_left->vartype, false, true, false,
+ NULL, &eqOid, NULL);
+
+ tuple = SearchSysCache(OPEROID, ObjectIdGetDatum(eqOid), 0, 0, 0);
+
+ operator = (Form_pg_operator) GETSTRUCT(tuple);
+ result = makeNode(OpExpr);
+
+ result->opno = HeapTupleGetOid(tuple);
+ result->opfuncid = operator->oprcode;
+ result->opresulttype = operator->oprresult;
+ result->opretset = false;
+
+ result->args = lappend(result->args, var_left);
+ result->args = lappend(result->args, var_right);
+
+ ReleaseSysCache(tuple);
+
+ return result;
+}
+
+/*
+ * Creates an expression tree for a WHERE clause.
+ *
+ * If from is not NULL, assigns the root node to the specified
+ * FromExpr of the target query tree.
+ *
+ * Note that the function appends the specified opExpr op to the
+ * specified anchor (if anchor != NULL) and returns that immediately.
+ * That way this function could be used to add operator nodes to an
+ * existing BoolExpr tree or (if from is given), to create a new Query
+ * qualification list.
+ */
+static Node *
+build_expression_tree(FromExpr *from, Node **anchor, BoolExpr *expr, OpExpr *op)
+{
+ /* Already some nodes there? */
+ if (*anchor)
+ {
+ expr->args = lappend(expr->args, op);
+ ((BoolExpr *)(*anchor))->args = lappend(((BoolExpr *)(*anchor))->args,
+ expr);
+ *anchor = (Node *)expr;
+ }
+ else
+ {
+ /* Currently no nodes ... */
+ BoolExpr *boolexpr = makeNode(BoolExpr);
+ expr->args = lappend(expr->args, op);
+ boolexpr->args = lappend(boolexpr->args, expr);
+
+ *anchor = (Node *) boolexpr;
+
+ if (from)
+ from->quals = *anchor;
+ }
+
+ return *anchor;
+}
+
+/*
+ * Forms the WHERE clause for DELETE/UPDATE rules targeted to the
+ * specified view.
+ */
+static void
+form_where_for_updrule(const Query *select, /* View retrieve rule */
+ FromExpr **from, /* FromExpr for stmt */
+ const Relation baserel, /* base relation of view */
+ Index baserti, /* Index of base relation RTE */
+ Index oldrti) /* Index of *OLD* RTE */
+{
+ BoolExpr *expr = NULL;
+ Node *anchor = NULL;
+ Form_pg_attribute *attrs;
+ ListCell *cell;
+
+ AssertArg(baserti > 0);
+ AssertArg(oldrti > 0);
+ AssertArg(*from);
+ AssertArg(baserel);
+
+ attrs = baserel->rd_att->attrs;
+
+ foreach(cell, select->targetList)
+ {
+ TargetEntry *te = (TargetEntry *) lfirst(cell);
+ Var *var1;
+ Var *var2;
+ OpExpr *op;
+ BoolExpr *null_condition;
+ NullTest *nulltest1;
+ NullTest *nulltest2;
+
+ /* If te->expr holds no Var pointer, continue. */
+ if (!IsA(te->expr, Var))
+ continue;
+
+ null_condition = makeNode(BoolExpr);
+ nulltest1 = makeNode(NullTest);
+ nulltest2 = makeNode(NullTest);
+
+ /*
+ * These are the new operands we had to check for equality.
+ *
+ * For DELETE/UPDATE rules, var1 points to the *OLD* RTE, var2
+ * references the base relation.
+ */
+ var1 = copyObject((Var *) (te->expr));
+
+ /*
+ * Look at varoattno to determine whether this attribute has a different
+ * location in the underlying base table. If that case, retrieve the
+ * attribute from the base table and assign it to var2; otherwise
+ * simply copy it to var1.
+ */
+ if (var1->varoattno > 0)
+ {
+ var2 = makeNode(Var);
+
+ var2->varno = baserti;
+ var2->varnoold = baserti;
+ var2->varattno = attrs[var1->varoattno - 1]->attnum;
+ var2->vartype = attrs[var1->varoattno - 1]->atttypid;
+ var2->vartypmod = attrs[var1->varoattno - 1]->atttypmod;
+ var2->varlevelsup = var1->varlevelsup;
+ var2->varnoold = var2->varno;
+ var2->varoattno = var2->varattno;
+ }
+ else
+ {
+ var2 = copyObject(var1);
+ var2->varno = baserti;
+ var2->varnoold = baserti;
+ }
+
+ var1->varno = oldrti;
+ var1->varnoold = oldrti;
+
+ /*
+ * rewrite varattno of var2 to point to the right column in relation
+ * *OLD* or *NEW*
+ */
+ var2->varattno = te->resorigcol;
+ var2->varoattno = te->resorigcol;
+
+ /*
+ * rewrite varattno of var1 to point to the right column in base
+ * relation
+ */
+ var1->varattno = te->resno;
+ var1->varoattno = te->resno;
+
+ op = create_opexpr(var1, var2);
+ expr = makeNode(BoolExpr);
+ expr->boolop = OR_EXPR;
+ null_condition->boolop = AND_EXPR;
+
+ nulltest1->arg = (Expr *)var1;
+ nulltest1->nulltesttype = IS_NULL;
+
+ nulltest2->arg = (Expr *)var2;
+ nulltest2->nulltesttype = IS_NULL;
+
+ null_condition->args = lappend(null_condition->args, nulltest1);
+ null_condition->args = lappend(null_condition->args, nulltest2);
+ expr->args = lappend(expr->args, null_condition);
+
+ anchor = build_expression_tree(*from, (Node **) &anchor, expr, op);
+ }
+}
+
+/*
+ * Replaces the varnos for the specified targetlist to rtIndex
+ */
+static bool
+replace_tlist_varno_walker(Node *node,
+ ViewExprContext *ctxt)
+{
+ AssertArg(ctxt);
+
+ if (!node)
+ return false;
+
+ switch(node->type)
+ {
+ case T_Var:
+ elog(DEBUG1, "adjusting varno old %d to new %d",
+ ((Var *)(node))->varno,
+ ctxt->newRTE);
+
+ ((Var *)(node))->varno = ctxt->newRTE;
+ adjustVarnoIfReversedCol((Var *)node,
+ ctxt->newRTE,
+ ctxt->tentries);
+ /* nothing more to do */
+ break;
+
+ case T_ArrayRef:
+ {
+ ArrayRef *array = (ArrayRef *) node;
+
+ /*
+ * Things are getting complicated here. We have found an
+ * array subscripting operation. It's necessary to
+ * examine all varno's found in this operation to make
+ * sure, we're getting right. This covers cases where a
+ * view selects a single index or complete array from a
+ * base table or view.
+ */
+
+ /*
+ * Look at expressions that evaluate upper array
+ * indexes. Make sure all varno's are modified. This is
+ * done by walking the expression tree recursively.
+ */
+ expression_tree_walker((Node *) array->refupperindexpr,
+ replace_tlist_varno_walker,
+ (void *)ctxt);
+
+ expression_tree_walker((Node *) array->reflowerindexpr,
+ replace_tlist_varno_walker,
+ (void *)ctxt);
+
+ expression_tree_walker((Node *) array->refexpr,
+ replace_tlist_varno_walker,
+ (void *)ctxt);
+
+ expression_tree_walker((Node *) array->refassgnexpr,
+ replace_tlist_varno_walker,
+ (void *)ctxt);
+ }
+
+ default:
+ break;
+ }
+
+ return expression_tree_walker(node, replace_tlist_varno_walker, ctxt);
+}
+
+/*
+ * Adds RTEs to form a query tree.
+ *
+ * select has to be a valid initialized view definition query tree
+ * (the function assumes that this query has passed the
+ * is_select_query_updatable() function).
+ */
+static Query *
+form_update_query(const Query *select, ViewDefColumnList tentries, CmdType type)
+{
+ RangeTblEntry *rte;
+ Oid reloid;
+ Query *newquery;
+
+ AssertArg(select);
+ AssertArg(tentries);
+
+ newquery = makeNode(Query);
+ newquery->commandType = type;
+
+ /* copy the range table entries */
+ newquery->rtable = copyObject(select->rtable);
+
+ /* prepare other stuff */
+ newquery->canSetTag = true;
+ newquery->jointree = makeNode(FromExpr);
+
+ /*
+ * Set result relation to the base relation.
+ *
+ * Since we currently only support updatable views with one
+ * underlying table, we simply extract the one relation which
+ * isn't labeled as *OLD* or *NEW*.
+ */
+ reloid = get_reloid_from_select(select, &(newquery->resultRelation), &rte);
+ if (!OidIsValid(reloid))
+ elog(ERROR, "could not retrieve base relation OID");
+
+ Assert(newquery->resultRelation > 0);
+
+ /* adjust inFromCl of result range table entry */
+ rte->inFromCl = false;
+
+ /* We don't need a target list for DELETE. */
+ if (type != CMD_DELETE)
+ {
+ ViewExprContext ctxt;
+ ListCell *cell;
+
+ /* Copy all target entries. */
+ newquery->targetList = copyObject(select->targetList);
+
+ /*
+ * Replace all varnos to point to the *NEW* node in all targetentry
+ * expressions.
+ */
+
+ ctxt.newRTE = PRS2_NEW_VARNO;
+ ctxt.tentries = tentries;
+
+ foreach(cell, newquery->targetList)
+ {
+ Node *node = (Node *) lfirst(cell);
+ expression_tree_walker(node,
+ replace_tlist_varno_walker,
+ (void *) &ctxt);
+ }
+ }
+
+ return newquery;
+}
+
+/*
+ * Rewrite a TargetEntry, based on the given arguments to match
+ * the new Query tree of the new DELETE/UPDATE/INSERT rule and/or
+ * its underlying base relation.
+ *
+ * form_te_for_update() needs to carefully reassign Varno's of
+ * all Var expressions assigned to the given TargetEntry and to
+ * adjust all type info values and attribute index locations so
+ * that the rewritten TargetEntry corresponds to the correct
+ * column in the underlying base relation.
+ *
+ * Someone should consider that columns could be in reversed
+ * order in a view definition, so we need to take care to
+ * "restore" the correct order of all columns in the target list
+ * of the new view update rules.
+ *
+ * There's also some additional overhead if we have an array field
+ * involved. In this case we have to loop recursively through the
+ * array expressions to get all target entries right.
+ */
+static void
+form_te_for_update(int2 attnum, Form_pg_attribute attrs, Oid baserelid,
+ Expr *expr, TargetEntry *te_update)
+{
+ /*
+ * First, try if this is an array subscripting operation. If true, dive
+ * recursively into the subscripting tree examining all varnos.
+ */
+
+ if (IsA(expr, ArrayRef))
+ {
+ ArrayRef *array = (ArrayRef *) expr;
+
+ if (array->refassgnexpr != NULL)
+ form_te_for_update(attnum, attrs, baserelid, array->refassgnexpr,
+ te_update);
+
+ if (array->refupperindexpr != NIL)
+ {
+ ListCell *cell;
+
+ foreach(cell, array->refupperindexpr)
+ form_te_for_update(attnum, attrs, baserelid, (Expr *) lfirst(cell), te_update);
+ }
+
+ if (array->reflowerindexpr != NIL)
+ {
+ ListCell *cell;
+
+ foreach(cell, array->reflowerindexpr)
+ form_te_for_update(attnum, attrs, baserelid, (Expr *) lfirst(cell), te_update);
+ }
+
+ if (array->refexpr != NULL)
+ form_te_for_update(attnum, attrs, baserelid, array->refexpr,
+ te_update);
+ }
+ else if (IsA(expr, Var))
+ {
+ /*
+ * Base case of recursion: actually build the TargetEntry.
+ */
+ Var *upd_var = (Var *) (te_update->expr);
+
+ upd_var->varattno = te_update->resno;
+ upd_var->varoattno = te_update->resno;
+
+ upd_var->vartype = attrs->atttypid;
+ upd_var->vartypmod = attrs->atttypmod;
+
+ upd_var->varnoold = upd_var->varno;
+
+ te_update->resno = attnum;
+ te_update->resname = pstrdup(get_attname(baserelid, attnum));
+ te_update->ressortgroupref = 0;
+ te_update->resorigcol = 0;
+ te_update->resorigtbl = 0;
+ te_update->resjunk = false;
+ }
+}
+
+/*
+ * Create the returning list for the given query tree. This allows
+ * using RETURING in view update actions. Note that the function
+ * creates the returning list from the target list of the given query
+ * tree if src is set to NULL. This requires to call
+ * build_update_target_list() on that query tree before. If src !=
+ * NULL, the target list is created from this query tree instead.
+ */
+static void
+create_rule_returning_list(Query *query, const Query *src, Index newRTE,
+ ViewDefColumnList tentries)
+{
+ ViewExprContext ctxt;
+ ListCell *cell;
+
+ ctxt.newRTE = newRTE;
+ ctxt.tentries = tentries;
+
+ /* determine target list source */
+ if (src)
+ query->returningList = copyObject(src->targetList);
+ else
+ query->returningList = copyObject(query->targetList);
+
+ foreach(cell, query->returningList)
+ expression_tree_walker((Node *) lfirst(cell),
+ replace_tlist_varno_walker,
+ (void *) &ctxt);
+}
+
+/*
+ * Build the target list for a view UPDATE rule.
+ *
+ * Note: The function assumes a Query tree specified by update, which
+ * was copied by form_update_query(). We need the original Query tree
+ * to adjust the properties of each member of the TargetList of the
+ * new query tree.
+ */
+static void
+build_update_target_list(const Query *update, const Query *select,
+ Relation baserel)
+{
+ ListCell *cell1;
+ ListCell *cell2;
+
+ /*
+ * This Assert() is appropriate, since we rely on a query tree
+ * created by from_query(), which copies the target list from the
+ * original query tree specified by the argument select, which
+ * holds the current view definition. So both target lists have
+ * to be equal in length.
+ */
+ Assert(list_length(update->targetList) == list_length(select->targetList));
+
+ /*
+ * Copy the target list of the view definition to the
+ * returningList. This is required to support RETURNING clauses
+ * in view update actions.
+ */
+ forboth(cell1, select->targetList, cell2, update->targetList)
+ {
+ TargetEntry *entry = (TargetEntry *) lfirst(cell1);
+ TargetEntry *upd_entry = (TargetEntry *) lfirst(cell2);
+ int attindex;
+ Form_pg_attribute attr;
+
+ if (entry->resorigcol > 0)
+ /*
+ * This column seems to have a different order than in the base
+ * table. We get the attribute from the base relation referenced
+ * by rel and create a new resdom. This new result domain is then
+ * assigned instead of the old one.
+ */
+ attindex = entry->resorigcol;
+ else
+ attindex = entry->resno;
+
+ attr = baserel->rd_att->attrs[attindex - 1];
+
+ form_te_for_update(attindex, attr, baserel->rd_id, upd_entry->expr,
+ upd_entry);
+ }
+}
+
+/*
+ * Examines the columns by the current view and initializes the lookup
+ * table for all rearranged columns in base relations. The function
+ * requires a relation tree initialized by get_base_base_relations().
+ */
+static void
+read_rearranged_cols(ViewBaseRelation *tree)
+{
+ AssertArg(tree);
+
+ if (tree->defs)
+ {
+ int num_items = list_length(tree->defs);
+ int i;
+
+ /*
+ * Traverse the relation tree and look on all base relations
+ * for reversed column order in their target lists. We have
+ * to perform a look-ahead-read on the tree, because we need
+ * to know how much columns the next base relation has, to
+ * allocate enough memory in tentries.
+ *
+ * Note that if we only have one base relation (a "real"
+ * table, not a view) exists, we have nothing to do, because
+ * this base relation cannot have a reversed column order
+ * caused by a view definition query.
+ */
+ for (i = num_items - 1; i > 0; i--)
+ {
+ ViewBaseRelationItem *item_current;
+ ViewBaseRelationItem *item_next;
+ ViewBaseRelation *current;
+ ViewBaseRelation *next;
+
+ current = (ViewBaseRelation *) list_nth(tree->defs, i);
+
+ /*
+ * We look ahead for the next base relation. We can do
+ * this here safely, because the loop condition terminates
+ * before reaching the list head.
+ */
+ next = (ViewBaseRelation *) list_nth(tree->defs, i - 1);
+
+ /*
+ * Note that the code currently requires a simply updatable
+ * relation tree. This means we handle one base relation
+ * per loop, only.
+ */
+ Assert(next);
+ Assert(current);
+ Assert(list_length(next->defs) == 1);
+ Assert(list_length(current->defs) == 1);
+
+ item_current = (ViewBaseRelationItem *) list_nth(current->defs, 0);
+ item_next = (ViewBaseRelationItem *) list_nth(next->defs, 0);
+
+ /* allocate tentries buffer */
+ item_current->tentries = (ViewDefColumnList) palloc(sizeof(TargetEntry *) * RelationGetNumberOfAttributes(item_next->rel));
+
+ copyReversedTargetEntryPtr(item_current->rule->targetList,
+ item_current->tentries);
+ }
+ }
+}
+
+/*------------------------------------------------------------------------------
+ * Retrieves all relations from the view that can be considered a "base
+ * relation". The function returns a list that holds lists of all relation
+ * OIDs found for the view. The list is filled top down, that means the head of
+ * the list holds the relations for the "highest" view in the tree.
+ *
+ * Consider this view definition tree where each node is a relation the above
+ * node is based on:
+ *
+ * 1
+ * / \
+ * 2 3
+ * / \ \
+ * 4 5 6
+ * /
+ * 7
+ *
+ * The function will then return a list with the following layout:
+ *
+ * Listindex Node(s)
+ * --------------------------
+ * 1 7
+ * 2 4 5 6
+ * 3 2 3
+ *
+ * As you can see in the table, all relations that are "children" of the
+ * given root relation (the view relation itself) are saved in the
+ * tree, except the root node itself.
+ *------------------------------------------------------------------------------
+ */
+static void
+get_base_base_relations(const Query *view, Oid baserelid, List **list)
+{
+ RangeTblEntry *entry;
+ unsigned int offset = 1;
+ ViewBaseRelation *childRel;
+
+ if (!view)
+ return;
+
+ childRel = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation));
+ childRel->defs = NIL;
+ childRel->parentRelation = baserelid;
+
+ /* Get all OIDs from the RTE list of view. */
+ while ((entry = get_relation_RTE(view, &offset)) != NULL)
+ {
+ Relation rel;
+ ViewBaseRelationItem *item;
+
+ /*
+ * Is this really a view or relation?
+ */
+ rel = relation_open(entry->relid, AccessShareLock);
+
+ if (rel->rd_rel->relkind != RELKIND_RELATION &&
+ rel->rd_rel->relkind != RELKIND_VIEW)
+ {
+ /* don't need this one */
+ relation_close(rel, AccessShareLock);
+ continue;
+ }
+
+ item = (ViewBaseRelationItem *) palloc0(sizeof(ViewBaseRelationItem));
+ item->rel = rel;
+ item->rule = NULL;
+
+ if (rel->rd_rel->relkind == RELKIND_VIEW)
+ /*
+ * Get the rule _RETURN expression tree for the specified
+ * relation OID. We need this to recurse into the view
+ * base relation tree.
+ */
+ item->rule = get_return_rule(rel);
+
+ elog(DEBUG1, "extracted relation %s for relation tree",
+ RelationGetRelationName(rel));
+ childRel->defs = lappend(childRel->defs, item);
+
+ /* recurse to any other child relations */
+ if (item->rule)
+ get_base_base_relations(item->rule, RelationGetRelid(rel), list);
+
+ }
+
+ if (childRel->defs)
+ *list = lappend(*list, childRel);
+}
+
+static void
+copyReversedTargetEntryPtr(List *targetList, ViewDefColumnList targets)
+{
+ ListCell *cell;
+
+ AssertArg(targets);
+ AssertArg(targetList);
+
+ /* NOTE: We only reassign pointers. */
+ foreach(cell, targetList)
+ {
+ Node *node = (Node *) lfirst(cell);
+
+ if (IsA(node, TargetEntry))
+ {
+ /*
+ * Look at the resdom's resorigcol to determine whether
+ * this is a reversed column (meaning, it has a different
+ * column number than the underlying base table).
+ */
+ TargetEntry *entry = (TargetEntry *) node;
+
+ if (!IsA(entry->expr, Var))
+ /* nothing to do here */
+ continue;
+
+ if (entry->resorigcol > 0 && entry->resno != entry->resorigcol)
+ {
+ /*
+ * Save this TargetEntry to the appropiate place in
+ * the lookup table. Do it only if not already
+ * occupied (this could happen if the column is
+ * specified more than one time in the view
+ * definition).
+ */
+ if (targets[entry->resorigcol - 1] == NULL)
+ targets[entry->resorigcol - 1] = entry;
+ }
+ }
+ }
+}
+
+/*
+ * Transforms the specified view definition into an INSERT, UPDATE, or
+ * DELETE rule.
+ *
+ * Note: The function assumes that the specified query tree has passed the
+ * is_select_query_updatable() function.
+ */
+static void
+create_update_rule(Oid viewOid, const Query *select, const Relation baserel,
+ ViewDefColumnList tentries,
+ CmdType ruletype)
+{
+ Query *newquery;
+ Oid baserelid;
+ Index baserti;
+ RangeTblEntry *rte;
+
+ AssertArg(tentries);
+ AssertArg(baserel);
+ AssertArg(select);
+ AssertArg(ruletype == CMD_INSERT || ruletype == CMD_UPDATE || ruletype == CMD_DELETE);
+
+ newquery = form_update_query(select, tentries, ruletype);
+
+ /*
+ * form_update_query() has prepared the jointree of the new UPDATE/DELETE rule.
+ *
+ * Now, our UPDATE rule needs range table references for the *NEW*
+ * and base relation RTEs. A DELETE rule needs range table
+ * references for the *OLD* and base relation RTEs.
+ */
+
+ baserelid = get_reloid_from_select(select, NULL, &rte);
+ if (!OidIsValid(baserelid))
+ elog(ERROR, "could not get the base relation from the view definition");
+
+ baserti = get_rtindex_for_rel(newquery->rtable,
+ rte->eref->aliasname);
+ Assert(baserti > 0);
+
+ rte = rt_fetch(baserti, newquery->rtable);
+
+ if (ruletype != CMD_INSERT)
+ {
+ RangeTblRef *oldref;
+ RangeTblRef *baseref;
+
+ oldref = makeNode(RangeTblRef);
+ oldref->rtindex = PRS2_OLD_VARNO;
+
+ baseref = makeNode(RangeTblRef);
+ baseref->rtindex = baserti;
+
+ newquery->jointree->fromlist = list_make2(baseref, oldref);
+
+ /* Create the WHERE condition qualification for the rule action. */
+ form_where_for_updrule(select, &(newquery->jointree),
+ baserel, baserti, PRS2_OLD_VARNO);
+ }
+
+ if (ruletype != CMD_DELETE)
+ /*
+ * We must reorder the columns in the targetlist to match the
+ * underlying table. We do this after calling
+ * form_where_for_updrule() because build_update_target_list()
+ * relies on the original resdoms in the update tree.
+ */
+ build_update_target_list(newquery, select, baserel);
+
+ /*
+ * Create the returning list now that build_update_target_list()
+ * has done the leg work.
+ */
+ if (ruletype == CMD_DELETE)
+ create_rule_returning_list(newquery, select, PRS2_OLD_VARNO, tentries);
+ else
+ create_rule_returning_list(newquery, NULL, PRS2_NEW_VARNO, tentries);
+
+ /* Set ACL bit */
+ if (ruletype == CMD_INSERT)
+ rte->requiredPerms |= ACL_INSERT;
+ else if (ruletype == CMD_UPDATE)
+ rte->requiredPerms |= ACL_UPDATE;
+ else if (ruletype == CMD_DELETE)
+ rte->requiredPerms |= ACL_DELETE;
+
+ /* Create system rule */
+ DefineQueryRewrite(pstrdup(get_auto_rule_name(ruletype)),
+ viewOid, /* event_relid */
+ NULL, /* WHERE clause */
+ ruletype,
+ true, /* is_instead */
+ true, /* is_auto */
+ false, /* replace */
+ list_make1(newquery) /* action */);
+}
+
+/*
+ * Checks the specified Query for updatability. Currently, "simply
+ * updatable" rules are implemented.
+ */
+static bool
+is_select_query_updatable(const Query *query)
+{
+ ListCell *cell;
+ List *seen_attnos;
+
+ AssertArg(query);
+ AssertArg(query->commandType == CMD_SELECT);
+
+ /*
+ * check for unsupported clauses in the view definition
+ */
+
+ if (query->hasAggs)
+ {
+ elog(DEBUG1, "view is not updatable because it uses an aggregate function");
+ return false;
+ }
+
+ if (query->hasWindowFuncs)
+ {
+ elog(DEBUG1, "view is not updatable because it uses a window function");
+ return false;
+ }
+
+ if (query->hasRecursive)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a WITH RECURSIVE clause");
+ return false;
+ }
+
+ if (query->cteList)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a WITH clause");
+ return false;
+ }
+
+ if (list_length(query->groupClause) >= 1)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a GROUP BY clause");
+ return false;
+ }
+
+ if (query->havingQual)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a HAVING clause");
+ return false;
+ }
+
+ if (list_length(query->distinctClause) >= 1)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a DISTINCT clause");
+ return false;
+ }
+
+ if (query->limitOffset)
+ {
+ elog(DEBUG1, "view is not updatable because it contains an OFFSET clause");
+ return false;
+ }
+
+ if (query->limitCount)
+ {
+ elog(DEBUG1, "view is not updatable because it contains a LIMIT clause");
+ return false;
+ }
+
+ if (query->setOperations)
+ {
+ elog(DEBUG1, "view is not updatable because it contains UNION or INTERSECT or EXCEPT");
+ return false;
+ }
+
+ /*
+ * Test for number of involved relations. Since we assume to
+ * operate on a view definition SELECT query tree, we must count 3
+ * rtable entries. Otherwise this seems not to be a view based on
+ * a single relation.
+ */
+ if (list_length(query->rtable) > 3)
+ {
+ elog(DEBUG1, "view is not updatable because it has more than one underlying table");
+ return false;
+ }
+
+ /* Any rtable entries involved? */
+ if (list_length(query->rtable) < 3)
+ {
+ elog(DEBUG1, "view is not updatable because it has no underlying tables");
+ return false;
+ }
+
+ /*
+ * Walk down the target list and look for nodes that aren't Vars.
+ * "Simply updatable" doesn't allow functions, host variables, or
+ * constant expressions in the target list.
+ *
+ * Also, check if any of the target list entries are indexed array
+ * expressions, which aren't supported.
+ */
+ seen_attnos = NIL;
+
+ foreach(cell, query->targetList)
+ {
+ Node *node = (Node *) lfirst(cell);
+
+ if (IsA(node, TargetEntry))
+ {
+ TargetEntry *te = (TargetEntry *) node;
+
+ /*
+ * TODO -- it would be nice to support Const nodes here as well
+ * (but apparently it isn't in the standard)
+ */
+ if (!IsA(te->expr, Var) && !IsA(te->expr, ArrayRef))
+ {
+ elog(DEBUG1, "view is not updatable because select list contains a derived column");
+ return false;
+ }
+
+ /* This is currently only partially implemented, but can be fixed. */
+ if (IsA(te->expr, ArrayRef))
+ {
+ elog(DEBUG1, "view is not updatable because select list contains an array element reference");
+ return false;
+ }
+
+ if (IsA(te->expr, Var))
+ {
+ Var *var = (Var *) te->expr;
+
+ /* System columns aren't updatable. */
+ if (var->varattno < 0)
+ {
+ elog(DEBUG1, "view is not updatable because select list references a system column");
+ return false;
+ }
+
+ if (list_member_int(seen_attnos, var->varattno))
+ {
+ elog(DEBUG1, "view is not updatable because select list references the same column more than once");
+ return false;
+ }
+ else
+ seen_attnos = lappend_int(seen_attnos, var->varattno);
+ }
+ }
+ }
+
+ /*
+ * Finally, check that all RTEs are acceptable. This rejects
+ * table functions, which cannot ever be updatable, and also WITH
+ * clauses.
+ */
+ foreach(cell, query->rtable)
+ {
+ RangeTblEntry *entry = (RangeTblEntry *) lfirst(cell);
+
+ if (entry->rtekind != RTE_RELATION)
+ {
+ elog(DEBUG1, "view is not updatable because correlation \"%s\" is not a table",
+ entry->eref->aliasname);
+ return false;
+ }
+ }
+
+ return true;
+}
+
+/*
+ * Traverse the specified relation tree. The function stops at the
+ * base relations at the leafs of the tree. If any of the relations
+ * has more than one base relation, it is considered a not simply
+ * updatable view and false is returned.
+ */
+static bool
+check_reltree(ViewBaseRelation *node)
+{
+ ListCell *cell;
+
+ AssertArg(node);
+
+ foreach(cell, node->defs)
+ {
+ /* Walk down the tree */
+ ViewBaseRelation *relations = (ViewBaseRelation *) lfirst(cell);
+
+ if (list_length(relations->defs) > 1)
+ {
+ elog(DEBUG1, "possible JOIN/UNION in view definition: %d", list_length(relations->defs));
+ return false;
+ }
+ else if (list_length(relations->defs) == 1) {
+ ViewBaseRelationItem *item = (ViewBaseRelationItem *) linitial(relations->defs);
+
+ /* if the relation found is a view, check its updatability */
+ if (item->rel->rd_rel->relkind == RELKIND_VIEW && !is_select_query_updatable(item->rule))
+ {
+ elog(DEBUG1, "base view \"%s\" is not updatable",
+ RelationGetRelationName(item->rel));
+ return false;
+ }
+ }
+ }
+
+ return true;
+}
+
+/*
+ * Given a SELECT query tree, return the OID of the first RTE_RELATION range
+ * table entry found that is not *NEW* nor *OLD*.
+ *
+ * Also sets the RangeTblEntry pointer into rel_entry, and the range
+ * table index into rti, unless they are NULL.
+ *
+ * This function assumes that the specified query tree was checked by a
+ * previous call to the is_select_query_updatable() function.
+ */
+static Oid
+get_reloid_from_select(const Query *select, int *rti, RangeTblEntry **rel_entry)
+{
+ ListCell *cell;
+ Oid result = InvalidOid;
+ int index;
+
+ /* Check specified query tree. Return immediately on error. */
+ if (select == NULL || select->commandType != CMD_SELECT)
+ return InvalidOid;
+
+ /*
+ * We loop through the RTEs to get information about all involved
+ * relations. We return the first OID we find in the list that is not
+ * *NEW* nor *OLD*.
+ */
+ index = 0;
+ foreach(cell, select->rtable)
+ {
+ RangeTblEntry *entry = (RangeTblEntry *) lfirst(cell);
+
+ index++;
+
+ if (entry == NULL)
+ elog(ERROR, "null RTE pointer in get_reloid_from_select");
+
+ elog(DEBUG1, "extracted range table entry for %u", entry->relid);
+
+ /* Return the first RELATION rte we find */
+ if (entry->rtekind == RTE_RELATION)
+ {
+ /*
+ * XXX This is ugly. The parser prepends two RTEs with rtekind
+ * RTE_RELATION named *NEW* and *OLD*. We have to exclude them by
+ * name! It would be much better if it used RTE_SPECIAL
+ * instead, but other parts of the system stop working if one
+ * just changes it naively.
+ */
+ if (strncmp(entry->eref->aliasname, "*NEW*", 6) == 0
+ || strncmp(entry->eref->aliasname, "*OLD*", 6) == 0)
+ continue;
+
+ result = entry->relid;
+ if (rti != NULL)
+ *rti = index;
+ if (rel_entry != NULL)
+ *rel_entry = entry;
+ break;
+ }
+ }
+
+ return result;
+}
+
+/*
+ * get_return_rule: returns the _RETURN rule of a view as a Query node.
+ */
+static Query *
+get_return_rule(Relation rel)
+{
+ Query *query = NULL;
+ int i;
+
+ AssertArg(rel->rd_rel->relkind == RELKIND_VIEW);
+
+ for (i = 0; i < rel->rd_rules->numLocks; i++)
+ {
+ RewriteRule *rule = rel->rd_rules->rules[i];
+
+ if (rule->event == CMD_SELECT)
+ {
+ /* A _RETURN rule should have only one action */
+ if (list_length(rule->actions) != 1)
+ elog(ERROR, "invalid _RETURN rule action specification");
+
+ query = linitial(rule->actions);
+ break;
+ }
+ }
+
+ return query;
+}
+
+/*------------------------------------------------------------------------------
+ * Public functions
+ *------------------------------------------------------------------------------
+ */
+
+/*
+ * CreateViewUpdateRules
+ *
+ * This is the main entry point to creating an updatable view's rules. Given a
+ * rule definition, examine it, and create the rules if appropiate, or return
+ * doing nothing if not.
+ */
+void
+CreateViewUpdateRules(Oid viewOid, const Query *viewDef)
+{
+ Relation baserel;
+ Form_pg_attribute *attrs;
+ ViewDefColumnList tentries;
+ Oid baserelid;
+ MemoryContext cxt;
+ MemoryContext oldcxt;
+ ViewBaseRelation *tree;
+ ListCell *cell;
+
+ /*
+ * The routines in this file leak memory like crazy, so make sure we
+ * allocate it all in an appropiate context.
+ */
+ cxt = AllocSetContextCreate(TopTransactionContext,
+ "UpdateRulesContext",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ oldcxt = MemoryContextSwitchTo(cxt);
+
+ /*
+ * Create the lookup table for the view definition target columns. We save
+ * the RESDOMS in that manner to look quickly for reversed column orders.
+ */
+
+ baserelid = get_reloid_from_select(viewDef, NULL, NULL);
+
+ /* Get relation tree */
+ tree = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation));
+
+ tree->parentRelation = InvalidOid;
+ tree->defs = NIL;
+ get_base_base_relations(viewDef, baserelid, &(tree->defs));
+
+ /* Check the query tree for updatability */
+ if (!check_reltree(tree) || !is_select_query_updatable(viewDef))
+ {
+ elog(DEBUG1, "view is not updatable");
+ goto finish;
+ }
+
+ baserel = heap_open(baserelid, AccessShareLock);
+ attrs = baserel->rd_att->attrs;
+
+ /*
+ * Copy TargetEntries to match the slot numbers in the target list with
+ * their original column attribute number. Note that only pointers are
+ * copied and they are valid only as long as the specified SELECT query
+ * stays valid!
+ */
+ tentries = (ViewDefColumnList)
+ palloc0(baserel->rd_rel->relnatts * sizeof(TargetEntry *));
+
+ copyReversedTargetEntryPtr(viewDef->targetList, tentries);
+
+ /*
+ * Now do the same for the base relation tree. read_rearranged_cols
+ * traverses the relation tree and performs a copyReversedTargetEntry()
+ * call to each base relation.
+ */
+ read_rearranged_cols(tree);
+
+ create_update_rule(viewOid, viewDef, baserel, tentries, CMD_INSERT);
+ create_update_rule(viewOid, viewDef, baserel, tentries, CMD_DELETE);
+ create_update_rule(viewOid, viewDef, baserel, tentries, CMD_UPDATE);
+
+ ereport(NOTICE, (errmsg("CREATE VIEW has created automatic view update rules")));
+
+ /* free remaining stuff */
+ heap_close(baserel, NoLock);
+
+finish:
+ /* get_base_base_relations leaves some open relations */
+ foreach(cell, tree->defs)
+ {
+ ListCell *cell2;
+ ViewBaseRelation *vbr = (ViewBaseRelation *) lfirst(cell);
+
+ foreach(cell2, vbr->defs)
+ {
+ ViewBaseRelationItem *vbri = (ViewBaseRelationItem *) lfirst(cell2);
+
+ relation_close(vbri->rel, NoLock);
+ }
+ }
+
+ MemoryContextSwitchTo(oldcxt);
+ MemoryContextDelete(cxt);
+}