aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c134
1 files changed, 85 insertions, 49 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index adf278bb1fa..ae62c698499 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -73,6 +73,7 @@
#include "storage/lock.h"
#include "storage/predicate.h"
#include "storage/smgr.h"
+#include "tcop/utility.h"
#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -272,7 +273,8 @@ static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
-static void createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
+static void createForeignKeyTriggers(Relation rel, Oid refRelOid,
+ Constraint *fkconstraint,
Oid constraintOid, Oid indexOid);
static void ATController(Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
@@ -346,7 +348,8 @@ static bool ATColumnChangeRequiresRewrite(Node *expr, AttrNumber varattno);
static void ATExecAlterColumnType(AlteredTableInfo *tab, Relation rel,
AlterTableCmd *cmd, LOCKMODE lockmode);
static void ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode);
-static void ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode);
+static void ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd,
+ List **wqueue, LOCKMODE lockmode);
static void change_owner_fix_column_acls(Oid relationOid,
Oid oldOwnerId, Oid newOwnerId);
static void change_owner_recurse_to_sequences(Oid relationOid,
@@ -2378,15 +2381,13 @@ CheckTableNotInUse(Relation rel, const char *stmt)
* rather than reassess it at lower levels.
*/
void
-AlterTable(AlterTableStmt *stmt)
+AlterTable(Oid relid, AlterTableStmt *stmt)
{
Relation rel;
LOCKMODE lockmode = AlterTableGetLockLevel(stmt->cmds);
- /*
- * Acquire same level of lock as already acquired during parsing.
- */
- rel = relation_openrv(stmt->relation, lockmode);
+ /* Caller is required to provide an adequate lock. */
+ rel = relation_open(relid, NoLock);
CheckTableNotInUse(rel, "ALTER TABLE");
@@ -5153,7 +5154,7 @@ ATExecAddIndex(AlteredTableInfo *tab, Relation rel,
/* The IndexStmt has already been through transformIndexStmt */
- DefineIndex(stmt->relation, /* relation */
+ DefineIndex(RelationGetRelid(rel), /* relation */
stmt->idxname, /* index name */
InvalidOid, /* no predefined OID */
stmt->accessMethod, /* am name */
@@ -5465,7 +5466,10 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
* table; trying to start with a lesser lock will just create a risk of
* deadlock.)
*/
- pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
+ if (OidIsValid(fkconstraint->old_pktable_oid))
+ pkrel = heap_open(fkconstraint->old_pktable_oid, AccessExclusiveLock);
+ else
+ pkrel = heap_openrv(fkconstraint->pktable, AccessExclusiveLock);
/*
* Validity checks (permission checks wait till we have the column
@@ -5713,7 +5717,8 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
/*
* Create the triggers that will enforce the constraint.
*/
- createForeignKeyTriggers(rel, fkconstraint, constrOid, indexOid);
+ createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
+ constrOid, indexOid);
/*
* Tell Phase 3 to check that the constraint is satisfied by existing rows.
@@ -6202,14 +6207,14 @@ validateForeignKeyConstraint(char *conname,
}
static void
-CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint,
+CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid, bool on_insert)
{
CreateTrigStmt *fk_trigger;
fk_trigger = makeNode(CreateTrigStmt);
fk_trigger->trigname = "RI_ConstraintTrigger";
- fk_trigger->relation = myRel;
+ fk_trigger->relation = NULL;
fk_trigger->row = true;
fk_trigger->timing = TRIGGER_TYPE_AFTER;
@@ -6230,10 +6235,11 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint,
fk_trigger->isconstraint = true;
fk_trigger->deferrable = fkconstraint->deferrable;
fk_trigger->initdeferred = fkconstraint->initdeferred;
- fk_trigger->constrrel = fkconstraint->pktable;
+ fk_trigger->constrrel = NULL;
fk_trigger->args = NIL;
- (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true);
+ (void) CreateTrigger(fk_trigger, NULL, myRelOid, refRelOid, constraintOid,
+ indexOid, true);
/* Make changes-so-far visible */
CommandCounterIncrement();
@@ -6243,18 +6249,13 @@ CreateFKCheckTrigger(RangeVar *myRel, Constraint *fkconstraint,
* Create the triggers that implement an FK constraint.
*/
static void
-createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
+createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
Oid constraintOid, Oid indexOid)
{
- RangeVar *myRel;
+ Oid myRelOid;
CreateTrigStmt *fk_trigger;
- /*
- * Reconstruct a RangeVar for my relation (not passed in, unfortunately).
- */
- myRel = makeRangeVar(get_namespace_name(RelationGetNamespace(rel)),
- pstrdup(RelationGetRelationName(rel)),
- -1);
+ myRelOid = RelationGetRelid(rel);
/* Make changes-so-far visible */
CommandCounterIncrement();
@@ -6265,14 +6266,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
*/
fk_trigger = makeNode(CreateTrigStmt);
fk_trigger->trigname = "RI_ConstraintTrigger";
- fk_trigger->relation = fkconstraint->pktable;
+ fk_trigger->relation = NULL;
fk_trigger->row = true;
fk_trigger->timing = TRIGGER_TYPE_AFTER;
fk_trigger->events = TRIGGER_TYPE_DELETE;
fk_trigger->columns = NIL;
fk_trigger->whenClause = NULL;
fk_trigger->isconstraint = true;
- fk_trigger->constrrel = myRel;
+ fk_trigger->constrrel = NULL;
switch (fkconstraint->fk_del_action)
{
case FKCONSTR_ACTION_NOACTION:
@@ -6307,7 +6308,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
- (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true);
+ (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+ indexOid, true);
/* Make changes-so-far visible */
CommandCounterIncrement();
@@ -6318,14 +6320,14 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
*/
fk_trigger = makeNode(CreateTrigStmt);
fk_trigger->trigname = "RI_ConstraintTrigger";
- fk_trigger->relation = fkconstraint->pktable;
+ fk_trigger->relation = NULL;
fk_trigger->row = true;
fk_trigger->timing = TRIGGER_TYPE_AFTER;
fk_trigger->events = TRIGGER_TYPE_UPDATE;
fk_trigger->columns = NIL;
fk_trigger->whenClause = NULL;
fk_trigger->isconstraint = true;
- fk_trigger->constrrel = myRel;
+ fk_trigger->constrrel = NULL;
switch (fkconstraint->fk_upd_action)
{
case FKCONSTR_ACTION_NOACTION:
@@ -6360,7 +6362,8 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
- (void) CreateTrigger(fk_trigger, NULL, constraintOid, indexOid, true);
+ (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+ indexOid, true);
/* Make changes-so-far visible */
CommandCounterIncrement();
@@ -6380,8 +6383,10 @@ createForeignKeyTriggers(Relation rel, Constraint *fkconstraint,
* and the use of self-referential FKs is rare enough, that we live with
* it for now. There will be a real fix in PG 9.2.
*/
- CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, true);
- CreateFKCheckTrigger(myRel, fkconstraint, constraintOid, indexOid, false);
+ CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
+ indexOid, true);
+ CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
+ indexOid, false);
}
/*
@@ -7179,7 +7184,8 @@ static void
ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
{
ObjectAddress obj;
- ListCell *l;
+ ListCell *def_item;
+ ListCell *oid_item;
/*
* Re-parse the index and constraint definitions, and attach them to the
@@ -7188,11 +7194,36 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
* lock on the table the constraint is attached to, and we need to get
* that before dropping. It's safe because the parser won't actually look
* at the catalogs to detect the existing entry.
+ *
+ * We can't rely on the output of deparsing to tell us which relation
+ * to operate on, because concurrent activity might have made the name
+ * resolve differently. Instead, we've got to use the OID of the
+ * constraint or index we're processing to figure out which relation
+ * to operate on.
*/
- foreach(l, tab->changedIndexDefs)
- ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
- foreach(l, tab->changedConstraintDefs)
- ATPostAlterTypeParse((char *) lfirst(l), wqueue, lockmode);
+ forboth(oid_item, tab->changedConstraintOids,
+ def_item, tab->changedConstraintDefs)
+ {
+ Oid oldId = lfirst_oid(oid_item);
+ Oid relid;
+ Oid confrelid;
+
+ get_constraint_relation_oids(oldId, &relid, &confrelid);
+ ATPostAlterTypeParse(relid, confrelid,
+ (char *) lfirst(def_item),
+ wqueue, lockmode);
+ }
+ forboth(oid_item, tab->changedIndexOids,
+ def_item, tab->changedIndexDefs)
+ {
+ Oid oldId = lfirst_oid(oid_item);
+ Oid relid;
+
+ relid = IndexGetRelation(oldId);
+ ATPostAlterTypeParse(relid, InvalidOid,
+ (char *) lfirst(def_item),
+ wqueue, lockmode);
+ }
/*
* Now we can drop the existing constraints and indexes --- constraints
@@ -7202,18 +7233,18 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
* should be okay to use DROP_RESTRICT here, since nothing else should be
* depending on these objects.
*/
- foreach(l, tab->changedConstraintOids)
+ foreach(oid_item, tab->changedConstraintOids)
{
obj.classId = ConstraintRelationId;
- obj.objectId = lfirst_oid(l);
+ obj.objectId = lfirst_oid(oid_item);
obj.objectSubId = 0;
performDeletion(&obj, DROP_RESTRICT);
}
- foreach(l, tab->changedIndexOids)
+ foreach(oid_item, tab->changedIndexOids)
{
obj.classId = RelationRelationId;
- obj.objectId = lfirst_oid(l);
+ obj.objectId = lfirst_oid(oid_item);
obj.objectSubId = 0;
performDeletion(&obj, DROP_RESTRICT);
}
@@ -7225,11 +7256,13 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode)
}
static void
-ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
+ATPostAlterTypeParse(Oid oldRelId, Oid refRelId, char *cmd, List **wqueue,
+ LOCKMODE lockmode)
{
List *raw_parsetree_list;
List *querytree_list;
ListCell *list_item;
+ Relation rel;
/*
* We expect that we will get only ALTER TABLE and CREATE INDEX
@@ -7245,16 +7278,21 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
if (IsA(stmt, IndexStmt))
querytree_list = lappend(querytree_list,
- transformIndexStmt((IndexStmt *) stmt,
+ transformIndexStmt(oldRelId,
+ (IndexStmt *) stmt,
cmd));
else if (IsA(stmt, AlterTableStmt))
querytree_list = list_concat(querytree_list,
- transformAlterTableStmt((AlterTableStmt *) stmt,
+ transformAlterTableStmt(oldRelId,
+ (AlterTableStmt *) stmt,
cmd));
else
querytree_list = lappend(querytree_list, stmt);
}
+ /* Caller should already have acquired whatever lock we need. */
+ rel = relation_open(oldRelId, NoLock);
+
/*
* Attach each generated command to the proper place in the work queue.
* Note this could result in creation of entirely new work-queue entries.
@@ -7266,7 +7304,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
foreach(list_item, querytree_list)
{
Node *stm = (Node *) lfirst(list_item);
- Relation rel;
AlteredTableInfo *tab;
switch (nodeTag(stm))
@@ -7276,14 +7313,12 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
IndexStmt *stmt = (IndexStmt *) stm;
AlterTableCmd *newcmd;
- rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel);
newcmd = makeNode(AlterTableCmd);
newcmd->subtype = AT_ReAddIndex;
newcmd->def = (Node *) stmt;
tab->subcmds[AT_PASS_OLD_INDEX] =
lappend(tab->subcmds[AT_PASS_OLD_INDEX], newcmd);
- relation_close(rel, NoLock);
break;
}
case T_AlterTableStmt:
@@ -7291,7 +7326,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
AlterTableStmt *stmt = (AlterTableStmt *) stm;
ListCell *lcmd;
- rel = relation_openrv(stmt->relation, lockmode);
tab = ATGetQueueEntry(wqueue, rel);
foreach(lcmd, stmt->cmds)
{
@@ -7314,7 +7348,6 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
(int) cmd->subtype);
}
}
- relation_close(rel, NoLock);
break;
}
default:
@@ -7322,8 +7355,9 @@ ATPostAlterTypeParse(char *cmd, List **wqueue, LOCKMODE lockmode)
(int) nodeTag(stm));
}
}
-}
+ relation_close(rel, NoLock);
+}
/*
* ALTER TABLE OWNER
@@ -9002,7 +9036,8 @@ ATExecGenericOptions(Relation rel, List *options)
/*
* Execute ALTER TABLE SET SCHEMA
*
- * Note: caller must have checked ownership of the relation already
+ * WARNING WARNING WARNING: In previous *minor* releases the caller was
+ * responsible for checking ownership of the relation, but now we do it here.
*/
void
AlterTableNamespace(RangeVar *relation, const char *newschema,
@@ -9017,6 +9052,7 @@ AlterTableNamespace(RangeVar *relation, const char *newschema,
rel = relation_openrv(relation, lockmode);
relid = RelationGetRelid(rel);
+ CheckRelationOwnership(relid, true);
oldNspOid = RelationGetNamespace(rel);
/* Check relation type against type specified in the ALTER command */