diff options
author | Peter Eisentraut <peter_e@gmx.net> | 2009-01-22 17:27:55 +0000 |
---|---|---|
committer | Peter Eisentraut <peter_e@gmx.net> | 2009-01-22 17:27:55 +0000 |
commit | dd7e54a17ffed5767cd9af175024c60df2b1951c (patch) | |
tree | 2932885551f282e1466c6bd1f31a35c4938c329c /src/backend/rewrite/viewUpdate.c | |
parent | 5841aa86eb67dabb0a54c81d21463002dfa7e358 (diff) | |
download | postgresql-dd7e54a17ffed5767cd9af175024c60df2b1951c.tar.gz postgresql-dd7e54a17ffed5767cd9af175024c60df2b1951c.zip |
Automatic view update rules
Bernd Helmle
Diffstat (limited to 'src/backend/rewrite/viewUpdate.c')
-rw-r--r-- | src/backend/rewrite/viewUpdate.c | 1401 |
1 files changed, 1401 insertions, 0 deletions
diff --git a/src/backend/rewrite/viewUpdate.c b/src/backend/rewrite/viewUpdate.c new file mode 100644 index 00000000000..3687f19429d --- /dev/null +++ b/src/backend/rewrite/viewUpdate.c @@ -0,0 +1,1401 @@ +/*------------------------------------------------------------------------- + * + * viewUpdate.c + * routines for translating a view definition into + * INSERT/UPDATE/DELETE rules (i.e. updatable views). + * + * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * ORIGINAL AUTHORS + * Bernd Helmle, Jaime Casanova + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/rewrite/viewUpdate.c,v 1.1 2009/01/22 17:27:54 petere Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include "access/heapam.h" +#include "access/xact.h" +#include "catalog/pg_operator.h" +#include "catalog/pg_rewrite.h" +#include "nodes/nodeFuncs.h" +#include "parser/parse_oper.h" +#include "parser/parsetree.h" +#include "rewrite/rewriteDefine.h" +#include "rewrite/viewUpdate.h" +#include "utils/lsyscache.h" +#include "utils/memutils.h" +#include "utils/syscache.h" +#include "utils/rel.h" + +typedef TargetEntry** ViewDefColumnList; + +typedef struct ViewBaseRelation +{ + List *defs; /* List of all base relations (root starts + * with only one relation because we + * implement only simple updatability) */ + Oid parentRelation; /* Oid of parent relation, 0 indicates root */ +} ViewBaseRelation; + +typedef struct ViewBaseRelationItem +{ + Relation rel; /* the Relation itself */ + Query *rule; /* _RETURN rule of a view relation */ + TargetEntry **tentries; /* saves order of column target list */ +} ViewBaseRelationItem; + +typedef struct ViewExprContext +{ + Index newRTE; + Index oldRTE; + Index baseRTE; + Index subQueryLevel; + ViewDefColumnList tentries; +} ViewExprContext; + +static const char *get_auto_rule_name(CmdType type); +static Query *get_return_rule(Relation rel); +static void read_rearranged_cols(ViewBaseRelation *tree); +static bool is_select_query_updatable(const Query *query); +static Oid get_reloid_from_select(const Query *select, + int *rti, RangeTblEntry **rel_entry); +static void create_update_rule(Oid viewOid, + const Query *select, + const Relation baserel, + TargetEntry **tentries, + CmdType ruletype); +static void get_base_base_relations(const Query *view, Oid baserelid, List **list); +static void copyReversedTargetEntryPtr(List *targetList, + ViewDefColumnList targets); +static bool check_reltree(ViewBaseRelation *node); +static Query *form_update_query(const Query *select, ViewDefColumnList tentries, CmdType type); +static RangeTblEntry *get_relation_RTE(const Query *select, + unsigned int *offset); +static Index get_rtindex_for_rel(List *rte_list, + const char *relname); +static bool replace_tlist_varno_walker(Node *node, + ViewExprContext *ctxt); +static OpExpr *create_opexpr(Var *var_left, Var *var_right); +static void form_where_for_updrule(const Query *select, FromExpr **from, + const Relation baserel, Index baserti, + Index oldrti); +static void build_update_target_list(const Query *update, const Query *select, + const Relation baserel); + +/*------------------------------------------------------------------------------ + * Private functions + * ----------------------------------------------------------------------------- + */ + +static const char * +get_auto_rule_name(CmdType type) +{ + if (type == CMD_INSERT) + return "_INSERT"; + if (type == CMD_UPDATE) + return "_UPDATE"; + if (type == CMD_DELETE) + return "_DELETE"; + return NULL; +} + +/* + * Returns the range table index for the specified relname. + * + * XXX This seems pretty grotty ... can't we do this in some other way? + */ +static Index +get_rtindex_for_rel(List *rte_list, const char *relname) +{ + ListCell *cell; + int index = 0; + + AssertArg(relname); + + foreach(cell, rte_list) + { + RangeTblEntry *rte = (RangeTblEntry *) lfirst(cell); + + index++; + + if (rte && strncmp(rte->eref->aliasname, relname, NAMEDATALEN) == 0) + break; + } + + Assert(index > 0); + + return (Index) index; +} + +/* + * Returns the RangeTblEntry starting at the specified offset. The + * function can be used to iterate over the rtable list of the + * specified select query tree. Returns NULL if nothing is found. + * + * NOTE: The function only returns those RangeTblEntry that do not + * match a *NEW* or *OLD* RangeTblEntry. + * + * The offset is incremented as a side effect. + */ +static RangeTblEntry * +get_relation_RTE(const Query *select, unsigned int *offset) +{ + AssertArg(offset); + AssertArg(select); + + while (*offset <= list_length(select->rtable)) + { + RangeTblEntry *rte = rt_fetch(*offset, select->rtable); + (*offset)++; + + /* skip non-table RTEs */ + if (rte->rtekind != RTE_RELATION) + continue; + + /* + * Skip RTEs named *NEW* and *OLD*. + * + * XXX It would be nice to be able to use something else than just + * the names here ... However, rtekind does not work as expected :-( + */ + if (strncmp(rte->eref->aliasname, "*NEW*", 6) == 0 + || strncmp(rte->eref->aliasname, "*OLD*", 6) == 0) + continue; + + return rte; + } + + return NULL; +} + +/* + * Rewrite varno's and varattno for the specified Var node if it is in + * a reversed order regarding to the underlying relation. The lookup + * table tentries holds all TargetEntries which are on a different + * location in the view definition. If var isn't on a different + * position in the current view than on its original relation, nothing + * is done. + * + * Note: This function assumes that the caller has already checked all + * parameters for NULL. + */ +static void +adjustVarnoIfReversedCol(Var *var, + Index newRTE, + ViewDefColumnList tentries) +{ + TargetEntry *entry = tentries[var->varattno - 1]; + + /* + * tentries holds NULL if given var isn't on a different location + * in the view Only replace if column order is reversed. + */ + if (entry && entry->resno != var->varattno) + { + var->varattno = entry->resno; + var->varoattno = entry->resno; + } + + /* Finally, make varno point to the *NEW* range table entry. */ + var->varno = newRTE; + var->varnoold = newRTE; +} + +/* + * Creates an equal operator expression for the specified Vars. They + * are assumed to be of the same type. + */ +static OpExpr * +create_opexpr(Var *var_left, Var *var_right) +{ + OpExpr *result; + HeapTuple tuple; + Form_pg_operator operator; + Oid eqOid; + + AssertArg(var_left); + AssertArg(var_right); + Assert(var_left->vartype == var_right->vartype); + + get_sort_group_operators(var_left->vartype, false, true, false, + NULL, &eqOid, NULL); + + tuple = SearchSysCache(OPEROID, ObjectIdGetDatum(eqOid), 0, 0, 0); + + operator = (Form_pg_operator) GETSTRUCT(tuple); + result = makeNode(OpExpr); + + result->opno = HeapTupleGetOid(tuple); + result->opfuncid = operator->oprcode; + result->opresulttype = operator->oprresult; + result->opretset = false; + + result->args = lappend(result->args, var_left); + result->args = lappend(result->args, var_right); + + ReleaseSysCache(tuple); + + return result; +} + +/* + * Creates an expression tree for a WHERE clause. + * + * If from is not NULL, assigns the root node to the specified + * FromExpr of the target query tree. + * + * Note that the function appends the specified opExpr op to the + * specified anchor (if anchor != NULL) and returns that immediately. + * That way this function could be used to add operator nodes to an + * existing BoolExpr tree or (if from is given), to create a new Query + * qualification list. + */ +static Node * +build_expression_tree(FromExpr *from, Node **anchor, BoolExpr *expr, OpExpr *op) +{ + /* Already some nodes there? */ + if (*anchor) + { + expr->args = lappend(expr->args, op); + ((BoolExpr *)(*anchor))->args = lappend(((BoolExpr *)(*anchor))->args, + expr); + *anchor = (Node *)expr; + } + else + { + /* Currently no nodes ... */ + BoolExpr *boolexpr = makeNode(BoolExpr); + expr->args = lappend(expr->args, op); + boolexpr->args = lappend(boolexpr->args, expr); + + *anchor = (Node *) boolexpr; + + if (from) + from->quals = *anchor; + } + + return *anchor; +} + +/* + * Forms the WHERE clause for DELETE/UPDATE rules targeted to the + * specified view. + */ +static void +form_where_for_updrule(const Query *select, /* View retrieve rule */ + FromExpr **from, /* FromExpr for stmt */ + const Relation baserel, /* base relation of view */ + Index baserti, /* Index of base relation RTE */ + Index oldrti) /* Index of *OLD* RTE */ +{ + BoolExpr *expr = NULL; + Node *anchor = NULL; + Form_pg_attribute *attrs; + ListCell *cell; + + AssertArg(baserti > 0); + AssertArg(oldrti > 0); + AssertArg(*from); + AssertArg(baserel); + + attrs = baserel->rd_att->attrs; + + foreach(cell, select->targetList) + { + TargetEntry *te = (TargetEntry *) lfirst(cell); + Var *var1; + Var *var2; + OpExpr *op; + BoolExpr *null_condition; + NullTest *nulltest1; + NullTest *nulltest2; + + /* If te->expr holds no Var pointer, continue. */ + if (!IsA(te->expr, Var)) + continue; + + null_condition = makeNode(BoolExpr); + nulltest1 = makeNode(NullTest); + nulltest2 = makeNode(NullTest); + + /* + * These are the new operands we had to check for equality. + * + * For DELETE/UPDATE rules, var1 points to the *OLD* RTE, var2 + * references the base relation. + */ + var1 = copyObject((Var *) (te->expr)); + + /* + * Look at varoattno to determine whether this attribute has a different + * location in the underlying base table. If that case, retrieve the + * attribute from the base table and assign it to var2; otherwise + * simply copy it to var1. + */ + if (var1->varoattno > 0) + { + var2 = makeNode(Var); + + var2->varno = baserti; + var2->varnoold = baserti; + var2->varattno = attrs[var1->varoattno - 1]->attnum; + var2->vartype = attrs[var1->varoattno - 1]->atttypid; + var2->vartypmod = attrs[var1->varoattno - 1]->atttypmod; + var2->varlevelsup = var1->varlevelsup; + var2->varnoold = var2->varno; + var2->varoattno = var2->varattno; + } + else + { + var2 = copyObject(var1); + var2->varno = baserti; + var2->varnoold = baserti; + } + + var1->varno = oldrti; + var1->varnoold = oldrti; + + /* + * rewrite varattno of var2 to point to the right column in relation + * *OLD* or *NEW* + */ + var2->varattno = te->resorigcol; + var2->varoattno = te->resorigcol; + + /* + * rewrite varattno of var1 to point to the right column in base + * relation + */ + var1->varattno = te->resno; + var1->varoattno = te->resno; + + op = create_opexpr(var1, var2); + expr = makeNode(BoolExpr); + expr->boolop = OR_EXPR; + null_condition->boolop = AND_EXPR; + + nulltest1->arg = (Expr *)var1; + nulltest1->nulltesttype = IS_NULL; + + nulltest2->arg = (Expr *)var2; + nulltest2->nulltesttype = IS_NULL; + + null_condition->args = lappend(null_condition->args, nulltest1); + null_condition->args = lappend(null_condition->args, nulltest2); + expr->args = lappend(expr->args, null_condition); + + anchor = build_expression_tree(*from, (Node **) &anchor, expr, op); + } +} + +/* + * Replaces the varnos for the specified targetlist to rtIndex + */ +static bool +replace_tlist_varno_walker(Node *node, + ViewExprContext *ctxt) +{ + AssertArg(ctxt); + + if (!node) + return false; + + switch(node->type) + { + case T_Var: + elog(DEBUG1, "adjusting varno old %d to new %d", + ((Var *)(node))->varno, + ctxt->newRTE); + + ((Var *)(node))->varno = ctxt->newRTE; + adjustVarnoIfReversedCol((Var *)node, + ctxt->newRTE, + ctxt->tentries); + /* nothing more to do */ + break; + + case T_ArrayRef: + { + ArrayRef *array = (ArrayRef *) node; + + /* + * Things are getting complicated here. We have found an + * array subscripting operation. It's necessary to + * examine all varno's found in this operation to make + * sure, we're getting right. This covers cases where a + * view selects a single index or complete array from a + * base table or view. + */ + + /* + * Look at expressions that evaluate upper array + * indexes. Make sure all varno's are modified. This is + * done by walking the expression tree recursively. + */ + expression_tree_walker((Node *) array->refupperindexpr, + replace_tlist_varno_walker, + (void *)ctxt); + + expression_tree_walker((Node *) array->reflowerindexpr, + replace_tlist_varno_walker, + (void *)ctxt); + + expression_tree_walker((Node *) array->refexpr, + replace_tlist_varno_walker, + (void *)ctxt); + + expression_tree_walker((Node *) array->refassgnexpr, + replace_tlist_varno_walker, + (void *)ctxt); + } + + default: + break; + } + + return expression_tree_walker(node, replace_tlist_varno_walker, ctxt); +} + +/* + * Adds RTEs to form a query tree. + * + * select has to be a valid initialized view definition query tree + * (the function assumes that this query has passed the + * is_select_query_updatable() function). + */ +static Query * +form_update_query(const Query *select, ViewDefColumnList tentries, CmdType type) +{ + RangeTblEntry *rte; + Oid reloid; + Query *newquery; + + AssertArg(select); + AssertArg(tentries); + + newquery = makeNode(Query); + newquery->commandType = type; + + /* copy the range table entries */ + newquery->rtable = copyObject(select->rtable); + + /* prepare other stuff */ + newquery->canSetTag = true; + newquery->jointree = makeNode(FromExpr); + + /* + * Set result relation to the base relation. + * + * Since we currently only support updatable views with one + * underlying table, we simply extract the one relation which + * isn't labeled as *OLD* or *NEW*. + */ + reloid = get_reloid_from_select(select, &(newquery->resultRelation), &rte); + if (!OidIsValid(reloid)) + elog(ERROR, "could not retrieve base relation OID"); + + Assert(newquery->resultRelation > 0); + + /* adjust inFromCl of result range table entry */ + rte->inFromCl = false; + + /* We don't need a target list for DELETE. */ + if (type != CMD_DELETE) + { + ViewExprContext ctxt; + ListCell *cell; + + /* Copy all target entries. */ + newquery->targetList = copyObject(select->targetList); + + /* + * Replace all varnos to point to the *NEW* node in all targetentry + * expressions. + */ + + ctxt.newRTE = PRS2_NEW_VARNO; + ctxt.tentries = tentries; + + foreach(cell, newquery->targetList) + { + Node *node = (Node *) lfirst(cell); + expression_tree_walker(node, + replace_tlist_varno_walker, + (void *) &ctxt); + } + } + + return newquery; +} + +/* + * Rewrite a TargetEntry, based on the given arguments to match + * the new Query tree of the new DELETE/UPDATE/INSERT rule and/or + * its underlying base relation. + * + * form_te_for_update() needs to carefully reassign Varno's of + * all Var expressions assigned to the given TargetEntry and to + * adjust all type info values and attribute index locations so + * that the rewritten TargetEntry corresponds to the correct + * column in the underlying base relation. + * + * Someone should consider that columns could be in reversed + * order in a view definition, so we need to take care to + * "restore" the correct order of all columns in the target list + * of the new view update rules. + * + * There's also some additional overhead if we have an array field + * involved. In this case we have to loop recursively through the + * array expressions to get all target entries right. + */ +static void +form_te_for_update(int2 attnum, Form_pg_attribute attrs, Oid baserelid, + Expr *expr, TargetEntry *te_update) +{ + /* + * First, try if this is an array subscripting operation. If true, dive + * recursively into the subscripting tree examining all varnos. + */ + + if (IsA(expr, ArrayRef)) + { + ArrayRef *array = (ArrayRef *) expr; + + if (array->refassgnexpr != NULL) + form_te_for_update(attnum, attrs, baserelid, array->refassgnexpr, + te_update); + + if (array->refupperindexpr != NIL) + { + ListCell *cell; + + foreach(cell, array->refupperindexpr) + form_te_for_update(attnum, attrs, baserelid, (Expr *) lfirst(cell), te_update); + } + + if (array->reflowerindexpr != NIL) + { + ListCell *cell; + + foreach(cell, array->reflowerindexpr) + form_te_for_update(attnum, attrs, baserelid, (Expr *) lfirst(cell), te_update); + } + + if (array->refexpr != NULL) + form_te_for_update(attnum, attrs, baserelid, array->refexpr, + te_update); + } + else if (IsA(expr, Var)) + { + /* + * Base case of recursion: actually build the TargetEntry. + */ + Var *upd_var = (Var *) (te_update->expr); + + upd_var->varattno = te_update->resno; + upd_var->varoattno = te_update->resno; + + upd_var->vartype = attrs->atttypid; + upd_var->vartypmod = attrs->atttypmod; + + upd_var->varnoold = upd_var->varno; + + te_update->resno = attnum; + te_update->resname = pstrdup(get_attname(baserelid, attnum)); + te_update->ressortgroupref = 0; + te_update->resorigcol = 0; + te_update->resorigtbl = 0; + te_update->resjunk = false; + } +} + +/* + * Create the returning list for the given query tree. This allows + * using RETURING in view update actions. Note that the function + * creates the returning list from the target list of the given query + * tree if src is set to NULL. This requires to call + * build_update_target_list() on that query tree before. If src != + * NULL, the target list is created from this query tree instead. + */ +static void +create_rule_returning_list(Query *query, const Query *src, Index newRTE, + ViewDefColumnList tentries) +{ + ViewExprContext ctxt; + ListCell *cell; + + ctxt.newRTE = newRTE; + ctxt.tentries = tentries; + + /* determine target list source */ + if (src) + query->returningList = copyObject(src->targetList); + else + query->returningList = copyObject(query->targetList); + + foreach(cell, query->returningList) + expression_tree_walker((Node *) lfirst(cell), + replace_tlist_varno_walker, + (void *) &ctxt); +} + +/* + * Build the target list for a view UPDATE rule. + * + * Note: The function assumes a Query tree specified by update, which + * was copied by form_update_query(). We need the original Query tree + * to adjust the properties of each member of the TargetList of the + * new query tree. + */ +static void +build_update_target_list(const Query *update, const Query *select, + Relation baserel) +{ + ListCell *cell1; + ListCell *cell2; + + /* + * This Assert() is appropriate, since we rely on a query tree + * created by from_query(), which copies the target list from the + * original query tree specified by the argument select, which + * holds the current view definition. So both target lists have + * to be equal in length. + */ + Assert(list_length(update->targetList) == list_length(select->targetList)); + + /* + * Copy the target list of the view definition to the + * returningList. This is required to support RETURNING clauses + * in view update actions. + */ + forboth(cell1, select->targetList, cell2, update->targetList) + { + TargetEntry *entry = (TargetEntry *) lfirst(cell1); + TargetEntry *upd_entry = (TargetEntry *) lfirst(cell2); + int attindex; + Form_pg_attribute attr; + + if (entry->resorigcol > 0) + /* + * This column seems to have a different order than in the base + * table. We get the attribute from the base relation referenced + * by rel and create a new resdom. This new result domain is then + * assigned instead of the old one. + */ + attindex = entry->resorigcol; + else + attindex = entry->resno; + + attr = baserel->rd_att->attrs[attindex - 1]; + + form_te_for_update(attindex, attr, baserel->rd_id, upd_entry->expr, + upd_entry); + } +} + +/* + * Examines the columns by the current view and initializes the lookup + * table for all rearranged columns in base relations. The function + * requires a relation tree initialized by get_base_base_relations(). + */ +static void +read_rearranged_cols(ViewBaseRelation *tree) +{ + AssertArg(tree); + + if (tree->defs) + { + int num_items = list_length(tree->defs); + int i; + + /* + * Traverse the relation tree and look on all base relations + * for reversed column order in their target lists. We have + * to perform a look-ahead-read on the tree, because we need + * to know how much columns the next base relation has, to + * allocate enough memory in tentries. + * + * Note that if we only have one base relation (a "real" + * table, not a view) exists, we have nothing to do, because + * this base relation cannot have a reversed column order + * caused by a view definition query. + */ + for (i = num_items - 1; i > 0; i--) + { + ViewBaseRelationItem *item_current; + ViewBaseRelationItem *item_next; + ViewBaseRelation *current; + ViewBaseRelation *next; + + current = (ViewBaseRelation *) list_nth(tree->defs, i); + + /* + * We look ahead for the next base relation. We can do + * this here safely, because the loop condition terminates + * before reaching the list head. + */ + next = (ViewBaseRelation *) list_nth(tree->defs, i - 1); + + /* + * Note that the code currently requires a simply updatable + * relation tree. This means we handle one base relation + * per loop, only. + */ + Assert(next); + Assert(current); + Assert(list_length(next->defs) == 1); + Assert(list_length(current->defs) == 1); + + item_current = (ViewBaseRelationItem *) list_nth(current->defs, 0); + item_next = (ViewBaseRelationItem *) list_nth(next->defs, 0); + + /* allocate tentries buffer */ + item_current->tentries = (ViewDefColumnList) palloc(sizeof(TargetEntry *) * RelationGetNumberOfAttributes(item_next->rel)); + + copyReversedTargetEntryPtr(item_current->rule->targetList, + item_current->tentries); + } + } +} + +/*------------------------------------------------------------------------------ + * Retrieves all relations from the view that can be considered a "base + * relation". The function returns a list that holds lists of all relation + * OIDs found for the view. The list is filled top down, that means the head of + * the list holds the relations for the "highest" view in the tree. + * + * Consider this view definition tree where each node is a relation the above + * node is based on: + * + * 1 + * / \ + * 2 3 + * / \ \ + * 4 5 6 + * / + * 7 + * + * The function will then return a list with the following layout: + * + * Listindex Node(s) + * -------------------------- + * 1 7 + * 2 4 5 6 + * 3 2 3 + * + * As you can see in the table, all relations that are "children" of the + * given root relation (the view relation itself) are saved in the + * tree, except the root node itself. + *------------------------------------------------------------------------------ + */ +static void +get_base_base_relations(const Query *view, Oid baserelid, List **list) +{ + RangeTblEntry *entry; + unsigned int offset = 1; + ViewBaseRelation *childRel; + + if (!view) + return; + + childRel = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation)); + childRel->defs = NIL; + childRel->parentRelation = baserelid; + + /* Get all OIDs from the RTE list of view. */ + while ((entry = get_relation_RTE(view, &offset)) != NULL) + { + Relation rel; + ViewBaseRelationItem *item; + + /* + * Is this really a view or relation? + */ + rel = relation_open(entry->relid, AccessShareLock); + + if (rel->rd_rel->relkind != RELKIND_RELATION && + rel->rd_rel->relkind != RELKIND_VIEW) + { + /* don't need this one */ + relation_close(rel, AccessShareLock); + continue; + } + + item = (ViewBaseRelationItem *) palloc0(sizeof(ViewBaseRelationItem)); + item->rel = rel; + item->rule = NULL; + + if (rel->rd_rel->relkind == RELKIND_VIEW) + /* + * Get the rule _RETURN expression tree for the specified + * relation OID. We need this to recurse into the view + * base relation tree. + */ + item->rule = get_return_rule(rel); + + elog(DEBUG1, "extracted relation %s for relation tree", + RelationGetRelationName(rel)); + childRel->defs = lappend(childRel->defs, item); + + /* recurse to any other child relations */ + if (item->rule) + get_base_base_relations(item->rule, RelationGetRelid(rel), list); + + } + + if (childRel->defs) + *list = lappend(*list, childRel); +} + +static void +copyReversedTargetEntryPtr(List *targetList, ViewDefColumnList targets) +{ + ListCell *cell; + + AssertArg(targets); + AssertArg(targetList); + + /* NOTE: We only reassign pointers. */ + foreach(cell, targetList) + { + Node *node = (Node *) lfirst(cell); + + if (IsA(node, TargetEntry)) + { + /* + * Look at the resdom's resorigcol to determine whether + * this is a reversed column (meaning, it has a different + * column number than the underlying base table). + */ + TargetEntry *entry = (TargetEntry *) node; + + if (!IsA(entry->expr, Var)) + /* nothing to do here */ + continue; + + if (entry->resorigcol > 0 && entry->resno != entry->resorigcol) + { + /* + * Save this TargetEntry to the appropiate place in + * the lookup table. Do it only if not already + * occupied (this could happen if the column is + * specified more than one time in the view + * definition). + */ + if (targets[entry->resorigcol - 1] == NULL) + targets[entry->resorigcol - 1] = entry; + } + } + } +} + +/* + * Transforms the specified view definition into an INSERT, UPDATE, or + * DELETE rule. + * + * Note: The function assumes that the specified query tree has passed the + * is_select_query_updatable() function. + */ +static void +create_update_rule(Oid viewOid, const Query *select, const Relation baserel, + ViewDefColumnList tentries, + CmdType ruletype) +{ + Query *newquery; + Oid baserelid; + Index baserti; + RangeTblEntry *rte; + + AssertArg(tentries); + AssertArg(baserel); + AssertArg(select); + AssertArg(ruletype == CMD_INSERT || ruletype == CMD_UPDATE || ruletype == CMD_DELETE); + + newquery = form_update_query(select, tentries, ruletype); + + /* + * form_update_query() has prepared the jointree of the new UPDATE/DELETE rule. + * + * Now, our UPDATE rule needs range table references for the *NEW* + * and base relation RTEs. A DELETE rule needs range table + * references for the *OLD* and base relation RTEs. + */ + + baserelid = get_reloid_from_select(select, NULL, &rte); + if (!OidIsValid(baserelid)) + elog(ERROR, "could not get the base relation from the view definition"); + + baserti = get_rtindex_for_rel(newquery->rtable, + rte->eref->aliasname); + Assert(baserti > 0); + + rte = rt_fetch(baserti, newquery->rtable); + + if (ruletype != CMD_INSERT) + { + RangeTblRef *oldref; + RangeTblRef *baseref; + + oldref = makeNode(RangeTblRef); + oldref->rtindex = PRS2_OLD_VARNO; + + baseref = makeNode(RangeTblRef); + baseref->rtindex = baserti; + + newquery->jointree->fromlist = list_make2(baseref, oldref); + + /* Create the WHERE condition qualification for the rule action. */ + form_where_for_updrule(select, &(newquery->jointree), + baserel, baserti, PRS2_OLD_VARNO); + } + + if (ruletype != CMD_DELETE) + /* + * We must reorder the columns in the targetlist to match the + * underlying table. We do this after calling + * form_where_for_updrule() because build_update_target_list() + * relies on the original resdoms in the update tree. + */ + build_update_target_list(newquery, select, baserel); + + /* + * Create the returning list now that build_update_target_list() + * has done the leg work. + */ + if (ruletype == CMD_DELETE) + create_rule_returning_list(newquery, select, PRS2_OLD_VARNO, tentries); + else + create_rule_returning_list(newquery, NULL, PRS2_NEW_VARNO, tentries); + + /* Set ACL bit */ + if (ruletype == CMD_INSERT) + rte->requiredPerms |= ACL_INSERT; + else if (ruletype == CMD_UPDATE) + rte->requiredPerms |= ACL_UPDATE; + else if (ruletype == CMD_DELETE) + rte->requiredPerms |= ACL_DELETE; + + /* Create system rule */ + DefineQueryRewrite(pstrdup(get_auto_rule_name(ruletype)), + viewOid, /* event_relid */ + NULL, /* WHERE clause */ + ruletype, + true, /* is_instead */ + true, /* is_auto */ + false, /* replace */ + list_make1(newquery) /* action */); +} + +/* + * Checks the specified Query for updatability. Currently, "simply + * updatable" rules are implemented. + */ +static bool +is_select_query_updatable(const Query *query) +{ + ListCell *cell; + List *seen_attnos; + + AssertArg(query); + AssertArg(query->commandType == CMD_SELECT); + + /* + * check for unsupported clauses in the view definition + */ + + if (query->hasAggs) + { + elog(DEBUG1, "view is not updatable because it uses an aggregate function"); + return false; + } + + if (query->hasWindowFuncs) + { + elog(DEBUG1, "view is not updatable because it uses a window function"); + return false; + } + + if (query->hasRecursive) + { + elog(DEBUG1, "view is not updatable because it contains a WITH RECURSIVE clause"); + return false; + } + + if (query->cteList) + { + elog(DEBUG1, "view is not updatable because it contains a WITH clause"); + return false; + } + + if (list_length(query->groupClause) >= 1) + { + elog(DEBUG1, "view is not updatable because it contains a GROUP BY clause"); + return false; + } + + if (query->havingQual) + { + elog(DEBUG1, "view is not updatable because it contains a HAVING clause"); + return false; + } + + if (list_length(query->distinctClause) >= 1) + { + elog(DEBUG1, "view is not updatable because it contains a DISTINCT clause"); + return false; + } + + if (query->limitOffset) + { + elog(DEBUG1, "view is not updatable because it contains an OFFSET clause"); + return false; + } + + if (query->limitCount) + { + elog(DEBUG1, "view is not updatable because it contains a LIMIT clause"); + return false; + } + + if (query->setOperations) + { + elog(DEBUG1, "view is not updatable because it contains UNION or INTERSECT or EXCEPT"); + return false; + } + + /* + * Test for number of involved relations. Since we assume to + * operate on a view definition SELECT query tree, we must count 3 + * rtable entries. Otherwise this seems not to be a view based on + * a single relation. + */ + if (list_length(query->rtable) > 3) + { + elog(DEBUG1, "view is not updatable because it has more than one underlying table"); + return false; + } + + /* Any rtable entries involved? */ + if (list_length(query->rtable) < 3) + { + elog(DEBUG1, "view is not updatable because it has no underlying tables"); + return false; + } + + /* + * Walk down the target list and look for nodes that aren't Vars. + * "Simply updatable" doesn't allow functions, host variables, or + * constant expressions in the target list. + * + * Also, check if any of the target list entries are indexed array + * expressions, which aren't supported. + */ + seen_attnos = NIL; + + foreach(cell, query->targetList) + { + Node *node = (Node *) lfirst(cell); + + if (IsA(node, TargetEntry)) + { + TargetEntry *te = (TargetEntry *) node; + + /* + * TODO -- it would be nice to support Const nodes here as well + * (but apparently it isn't in the standard) + */ + if (!IsA(te->expr, Var) && !IsA(te->expr, ArrayRef)) + { + elog(DEBUG1, "view is not updatable because select list contains a derived column"); + return false; + } + + /* This is currently only partially implemented, but can be fixed. */ + if (IsA(te->expr, ArrayRef)) + { + elog(DEBUG1, "view is not updatable because select list contains an array element reference"); + return false; + } + + if (IsA(te->expr, Var)) + { + Var *var = (Var *) te->expr; + + /* System columns aren't updatable. */ + if (var->varattno < 0) + { + elog(DEBUG1, "view is not updatable because select list references a system column"); + return false; + } + + if (list_member_int(seen_attnos, var->varattno)) + { + elog(DEBUG1, "view is not updatable because select list references the same column more than once"); + return false; + } + else + seen_attnos = lappend_int(seen_attnos, var->varattno); + } + } + } + + /* + * Finally, check that all RTEs are acceptable. This rejects + * table functions, which cannot ever be updatable, and also WITH + * clauses. + */ + foreach(cell, query->rtable) + { + RangeTblEntry *entry = (RangeTblEntry *) lfirst(cell); + + if (entry->rtekind != RTE_RELATION) + { + elog(DEBUG1, "view is not updatable because correlation \"%s\" is not a table", + entry->eref->aliasname); + return false; + } + } + + return true; +} + +/* + * Traverse the specified relation tree. The function stops at the + * base relations at the leafs of the tree. If any of the relations + * has more than one base relation, it is considered a not simply + * updatable view and false is returned. + */ +static bool +check_reltree(ViewBaseRelation *node) +{ + ListCell *cell; + + AssertArg(node); + + foreach(cell, node->defs) + { + /* Walk down the tree */ + ViewBaseRelation *relations = (ViewBaseRelation *) lfirst(cell); + + if (list_length(relations->defs) > 1) + { + elog(DEBUG1, "possible JOIN/UNION in view definition: %d", list_length(relations->defs)); + return false; + } + else if (list_length(relations->defs) == 1) { + ViewBaseRelationItem *item = (ViewBaseRelationItem *) linitial(relations->defs); + + /* if the relation found is a view, check its updatability */ + if (item->rel->rd_rel->relkind == RELKIND_VIEW && !is_select_query_updatable(item->rule)) + { + elog(DEBUG1, "base view \"%s\" is not updatable", + RelationGetRelationName(item->rel)); + return false; + } + } + } + + return true; +} + +/* + * Given a SELECT query tree, return the OID of the first RTE_RELATION range + * table entry found that is not *NEW* nor *OLD*. + * + * Also sets the RangeTblEntry pointer into rel_entry, and the range + * table index into rti, unless they are NULL. + * + * This function assumes that the specified query tree was checked by a + * previous call to the is_select_query_updatable() function. + */ +static Oid +get_reloid_from_select(const Query *select, int *rti, RangeTblEntry **rel_entry) +{ + ListCell *cell; + Oid result = InvalidOid; + int index; + + /* Check specified query tree. Return immediately on error. */ + if (select == NULL || select->commandType != CMD_SELECT) + return InvalidOid; + + /* + * We loop through the RTEs to get information about all involved + * relations. We return the first OID we find in the list that is not + * *NEW* nor *OLD*. + */ + index = 0; + foreach(cell, select->rtable) + { + RangeTblEntry *entry = (RangeTblEntry *) lfirst(cell); + + index++; + + if (entry == NULL) + elog(ERROR, "null RTE pointer in get_reloid_from_select"); + + elog(DEBUG1, "extracted range table entry for %u", entry->relid); + + /* Return the first RELATION rte we find */ + if (entry->rtekind == RTE_RELATION) + { + /* + * XXX This is ugly. The parser prepends two RTEs with rtekind + * RTE_RELATION named *NEW* and *OLD*. We have to exclude them by + * name! It would be much better if it used RTE_SPECIAL + * instead, but other parts of the system stop working if one + * just changes it naively. + */ + if (strncmp(entry->eref->aliasname, "*NEW*", 6) == 0 + || strncmp(entry->eref->aliasname, "*OLD*", 6) == 0) + continue; + + result = entry->relid; + if (rti != NULL) + *rti = index; + if (rel_entry != NULL) + *rel_entry = entry; + break; + } + } + + return result; +} + +/* + * get_return_rule: returns the _RETURN rule of a view as a Query node. + */ +static Query * +get_return_rule(Relation rel) +{ + Query *query = NULL; + int i; + + AssertArg(rel->rd_rel->relkind == RELKIND_VIEW); + + for (i = 0; i < rel->rd_rules->numLocks; i++) + { + RewriteRule *rule = rel->rd_rules->rules[i]; + + if (rule->event == CMD_SELECT) + { + /* A _RETURN rule should have only one action */ + if (list_length(rule->actions) != 1) + elog(ERROR, "invalid _RETURN rule action specification"); + + query = linitial(rule->actions); + break; + } + } + + return query; +} + +/*------------------------------------------------------------------------------ + * Public functions + *------------------------------------------------------------------------------ + */ + +/* + * CreateViewUpdateRules + * + * This is the main entry point to creating an updatable view's rules. Given a + * rule definition, examine it, and create the rules if appropiate, or return + * doing nothing if not. + */ +void +CreateViewUpdateRules(Oid viewOid, const Query *viewDef) +{ + Relation baserel; + Form_pg_attribute *attrs; + ViewDefColumnList tentries; + Oid baserelid; + MemoryContext cxt; + MemoryContext oldcxt; + ViewBaseRelation *tree; + ListCell *cell; + + /* + * The routines in this file leak memory like crazy, so make sure we + * allocate it all in an appropiate context. + */ + cxt = AllocSetContextCreate(TopTransactionContext, + "UpdateRulesContext", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + oldcxt = MemoryContextSwitchTo(cxt); + + /* + * Create the lookup table for the view definition target columns. We save + * the RESDOMS in that manner to look quickly for reversed column orders. + */ + + baserelid = get_reloid_from_select(viewDef, NULL, NULL); + + /* Get relation tree */ + tree = (ViewBaseRelation *) palloc(sizeof(ViewBaseRelation)); + + tree->parentRelation = InvalidOid; + tree->defs = NIL; + get_base_base_relations(viewDef, baserelid, &(tree->defs)); + + /* Check the query tree for updatability */ + if (!check_reltree(tree) || !is_select_query_updatable(viewDef)) + { + elog(DEBUG1, "view is not updatable"); + goto finish; + } + + baserel = heap_open(baserelid, AccessShareLock); + attrs = baserel->rd_att->attrs; + + /* + * Copy TargetEntries to match the slot numbers in the target list with + * their original column attribute number. Note that only pointers are + * copied and they are valid only as long as the specified SELECT query + * stays valid! + */ + tentries = (ViewDefColumnList) + palloc0(baserel->rd_rel->relnatts * sizeof(TargetEntry *)); + + copyReversedTargetEntryPtr(viewDef->targetList, tentries); + + /* + * Now do the same for the base relation tree. read_rearranged_cols + * traverses the relation tree and performs a copyReversedTargetEntry() + * call to each base relation. + */ + read_rearranged_cols(tree); + + create_update_rule(viewOid, viewDef, baserel, tentries, CMD_INSERT); + create_update_rule(viewOid, viewDef, baserel, tentries, CMD_DELETE); + create_update_rule(viewOid, viewDef, baserel, tentries, CMD_UPDATE); + + ereport(NOTICE, (errmsg("CREATE VIEW has created automatic view update rules"))); + + /* free remaining stuff */ + heap_close(baserel, NoLock); + +finish: + /* get_base_base_relations leaves some open relations */ + foreach(cell, tree->defs) + { + ListCell *cell2; + ViewBaseRelation *vbr = (ViewBaseRelation *) lfirst(cell); + + foreach(cell2, vbr->defs) + { + ViewBaseRelationItem *vbri = (ViewBaseRelationItem *) lfirst(cell2); + + relation_close(vbri->rel, NoLock); + } + } + + MemoryContextSwitchTo(oldcxt); + MemoryContextDelete(cxt); +} |