aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c155
1 files changed, 51 insertions, 104 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index 3b8a2199e59..e3e6345b509 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -325,7 +325,8 @@ static void AlterSeqNamespaces(Relation classRel, Relation rel,
LOCKMODE lockmode);
static ObjectAddress ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
bool recurse, bool recursing, LOCKMODE lockmode);
-static ObjectAddress ATExecValidateConstraint(Relation rel, char *constrName,
+static ObjectAddress ATExecValidateConstraint(List **wqueue,
+ Relation rel, char *constrName,
bool recurse, bool recursing, LOCKMODE lockmode);
static int transformColumnNameList(Oid relId, List *colList,
int16 *attnums, Oid *atttypids);
@@ -339,7 +340,6 @@ static Oid transformFkeyCheckAttrs(Relation pkrel,
static void checkFkeyPermissions(Relation rel, int16 *attnums, int natts);
static CoercionPathType findFkeyCast(Oid targetTypeId, Oid sourceTypeId,
Oid *funcid);
-static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
@@ -4399,13 +4399,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, Relation rel,
address = ATExecAlterConstraint(rel, cmd, false, false, lockmode);
break;
case AT_ValidateConstraint: /* VALIDATE CONSTRAINT */
- address = ATExecValidateConstraint(rel, cmd->name, false, false,
- lockmode);
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, false,
+ false, lockmode);
break;
case AT_ValidateConstraintRecurse: /* VALIDATE CONSTRAINT with
* recursion */
- address = ATExecValidateConstraint(rel, cmd->name, true, false,
- lockmode);
+ address = ATExecValidateConstraint(wqueue, rel, cmd->name, true,
+ false, lockmode);
break;
case AT_DropConstraint: /* DROP CONSTRAINT */
ATExecDropConstraint(rel, cmd->name, cmd->behavior,
@@ -9314,8 +9314,8 @@ ATExecAlterConstraint(Relation rel, AlterTableCmd *cmd,
* was already validated, InvalidObjectAddress is returned.
*/
static ObjectAddress
-ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
- bool recursing, LOCKMODE lockmode)
+ATExecValidateConstraint(List **wqueue, Relation rel, char *constrName,
+ bool recurse, bool recursing, LOCKMODE lockmode)
{
Relation conrel;
SysScanDesc scan;
@@ -9361,27 +9361,31 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
if (!con->convalidated)
{
+ AlteredTableInfo *tab;
HeapTuple copyTuple;
Form_pg_constraint copy_con;
if (con->contype == CONSTRAINT_FOREIGN)
{
- Relation refrel;
+ NewConstraint *newcon;
+ Constraint *fkconstraint;
- /*
- * Triggers are already in place on both tables, so a concurrent
- * write that alters the result here is not possible. Normally we
- * can run a query here to do the validation, which would only
- * require AccessShareLock. In some cases, it is possible that we
- * might need to fire triggers to perform the check, so we take a
- * lock at RowShareLock level just in case.
- */
- refrel = table_open(con->confrelid, RowShareLock);
+ /* Queue validation for phase 3 */
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->conname = constrName;
- validateForeignKeyConstraint(constrName, rel, refrel,
- con->conindid,
- con->oid);
- table_close(refrel, NoLock);
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = constrName;
+ newcon->contype = CONSTR_FOREIGN;
+ newcon->refrelid = con->confrelid;
+ newcon->refindid = con->conindid;
+ newcon->conid = con->oid;
+ newcon->qual = (Node *) fkconstraint;
+
+ /* Find or create work queue entry for this table */
+ tab = ATGetQueueEntry(wqueue, rel);
+ tab->constraints = lappend(tab->constraints, newcon);
/*
* We disallow creating invalid foreign keys to or from
@@ -9392,6 +9396,11 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
{
List *children = NIL;
ListCell *child;
+ NewConstraint *newcon;
+ bool isnull;
+ Datum val;
+ char *conbin;
+
/*
* If we're recursing, the parent has already done this, so skip
@@ -9431,12 +9440,30 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
/* find_all_inheritors already got lock */
childrel = table_open(childoid, NoLock);
- ATExecValidateConstraint(childrel, constrName, false,
+ ATExecValidateConstraint(wqueue, childrel, constrName, false,
true, lockmode);
table_close(childrel, NoLock);
}
- validateCheckConstraint(rel, tuple);
+ /* Queue validation for phase 3 */
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = constrName;
+ newcon->contype = CONSTR_CHECK;
+ newcon->refrelid = InvalidOid;
+ newcon->refindid = InvalidOid;
+ newcon->conid = con->oid;
+
+ val = SysCacheGetAttr(CONSTROID, tuple,
+ Anum_pg_constraint_conbin, &isnull);
+ if (isnull)
+ elog(ERROR, "null conbin for constraint %u", con->oid);
+
+ conbin = TextDatumGetCString(val);
+ newcon->qual = (Node *) stringToNode(conbin);
+
+ /* Find or create work queue entry for this table */
+ tab = ATGetQueueEntry(wqueue, rel);
+ tab->constraints = lappend(tab->constraints, newcon);
/*
* Invalidate relcache so that others see the new validated
@@ -9811,86 +9838,6 @@ checkFkeyPermissions(Relation rel, int16 *attnums, int natts)
}
/*
- * Scan the existing rows in a table to verify they meet a proposed
- * CHECK constraint.
- *
- * The caller must have opened and locked the relation appropriately.
- */
-static void
-validateCheckConstraint(Relation rel, HeapTuple constrtup)
-{
- EState *estate;
- Datum val;
- char *conbin;
- Expr *origexpr;
- ExprState *exprstate;
- TableScanDesc scan;
- ExprContext *econtext;
- MemoryContext oldcxt;
- TupleTableSlot *slot;
- Form_pg_constraint constrForm;
- bool isnull;
- Snapshot snapshot;
-
- /*
- * VALIDATE CONSTRAINT is a no-op for foreign tables and partitioned
- * tables.
- */
- if (rel->rd_rel->relkind == RELKIND_FOREIGN_TABLE ||
- rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- return;
-
- constrForm = (Form_pg_constraint) GETSTRUCT(constrtup);
-
- estate = CreateExecutorState();
-
- /*
- * XXX this tuple doesn't really come from a syscache, but this doesn't
- * matter to SysCacheGetAttr, because it only wants to be able to fetch
- * the tupdesc
- */
- val = SysCacheGetAttr(CONSTROID, constrtup, Anum_pg_constraint_conbin,
- &isnull);
- if (isnull)
- elog(ERROR, "null conbin for constraint %u",
- constrForm->oid);
- conbin = TextDatumGetCString(val);
- origexpr = (Expr *) stringToNode(conbin);
- exprstate = ExecPrepareExpr(origexpr, estate);
-
- econtext = GetPerTupleExprContext(estate);
- slot = table_slot_create(rel, NULL);
- econtext->ecxt_scantuple = slot;
-
- snapshot = RegisterSnapshot(GetLatestSnapshot());
- scan = table_beginscan(rel, snapshot, 0, NULL);
-
- /*
- * Switch to per-tuple memory context and reset it for each tuple
- * produced, so we don't leak memory.
- */
- oldcxt = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
-
- while (table_scan_getnextslot(scan, ForwardScanDirection, slot))
- {
- if (!ExecCheck(exprstate, econtext))
- ereport(ERROR,
- (errcode(ERRCODE_CHECK_VIOLATION),
- errmsg("check constraint \"%s\" is violated by some row",
- NameStr(constrForm->conname)),
- errtableconstraint(rel, NameStr(constrForm->conname))));
-
- ResetExprContext(econtext);
- }
-
- MemoryContextSwitchTo(oldcxt);
- table_endscan(scan);
- UnregisterSnapshot(snapshot);
- ExecDropSingleTupleTableSlot(slot);
- FreeExecutorState(estate);
-}
-
-/*
* Scan the existing rows in a table to verify they meet a proposed FK
* constraint.
*