diff options
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 1664 |
1 files changed, 418 insertions, 1246 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index de0d911b468..79c9c031833 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -151,7 +151,6 @@ typedef enum AlterTablePass AT_PASS_ALTER_TYPE, /* ALTER COLUMN TYPE */ AT_PASS_ADD_COL, /* ADD COLUMN */ AT_PASS_SET_EXPRESSION, /* ALTER SET EXPRESSION */ - AT_PASS_OLD_COL_ATTRS, /* re-install attnotnull */ AT_PASS_OLD_INDEX, /* re-add existing indexes */ AT_PASS_OLD_CONSTR, /* re-add existing constraints */ /* We could support a RENAME COLUMN pass here, but not currently used */ @@ -362,8 +361,7 @@ static void truncate_check_activity(Relation rel); static void RangeVarCallbackForTruncate(const RangeVar *relation, Oid relId, Oid oldRelId, void *arg); static List *MergeAttributes(List *columns, const List *supers, char relpersistence, - bool is_partition, List **supconstr, - List **supnotnulls); + bool is_partition, List **supconstr); static List *MergeCheckConstraint(List *constraints, const char *name, Node *expr); static void MergeChildAttribute(List *inh_columns, int exist_attno, int newcol_attno, const ColumnDef *newdef); static ColumnDef *MergeInheritedAttribute(List *inh_columns, int exist_attno, const ColumnDef *newdef); @@ -447,16 +445,16 @@ static bool check_for_column_name_collision(Relation rel, const char *colname, bool if_not_exists); static void add_column_datatype_dependency(Oid relid, int32 attnum, Oid typid); static void add_column_collation_dependency(Oid relid, int32 attnum, Oid collid); -static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode); -static bool set_attnotnull(List **wqueue, Relation rel, - AttrNumber attnum, bool recurse, LOCKMODE lockmode); -static ObjectAddress ATExecSetNotNull(List **wqueue, Relation rel, - char *constrname, char *colName, - bool recurse, bool recursing, - List **readyRels, LOCKMODE lockmode); -static ObjectAddress ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode); +static void ATPrepDropNotNull(Relation rel, bool recurse, bool recursing); +static ObjectAddress ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode); +static void ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, + AlterTableUtilityContext *context); +static ObjectAddress ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode); +static void ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode); static bool NotNullImpliedByRelConstraints(Relation rel, Form_pg_attribute attr); static bool ConstraintImpliedByRelConstraint(Relation scanrel, List *testConstraint, List *provenConstraint); @@ -488,8 +486,6 @@ static ObjectAddress ATExecDropColumn(List **wqueue, Relation rel, const char *c bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode, ObjectAddresses *addrs); -static void ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, - LOCKMODE lockmode, AlterTableUtilityContext *context); static ObjectAddress ATExecAddIndex(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, bool is_rebuild, LOCKMODE lockmode); static ObjectAddress ATExecAddStatistics(AlteredTableInfo *tab, Relation rel, @@ -501,11 +497,11 @@ static ObjectAddress ATExecAddConstraint(List **wqueue, static char *ChooseForeignKeyConstraintNameAddition(List *colnames); static ObjectAddress ATExecAddIndexConstraint(AlteredTableInfo *tab, Relation rel, IndexStmt *stmt, LOCKMODE lockmode); -static ObjectAddress ATAddCheckNNConstraint(List **wqueue, - AlteredTableInfo *tab, Relation rel, - Constraint *constr, - bool recurse, bool recursing, bool is_readd, - LOCKMODE lockmode); +static ObjectAddress ATAddCheckConstraint(List **wqueue, + AlteredTableInfo *tab, Relation rel, + Constraint *constr, + bool recurse, bool recursing, bool is_readd, + LOCKMODE lockmode); static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Constraint *fkconstraint, bool recurse, bool recursing, @@ -562,13 +558,9 @@ static void GetForeignKeyCheckTriggers(Relation trigrel, Oid *insertTriggerOid, Oid *updateTriggerOid); static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool recurse, + DropBehavior behavior, + bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode); -static ObjectAddress dropconstraint_internal(Relation rel, - HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, - bool missing_ok, List **readyRels, - LOCKMODE lockmode); static void ATPrepAlterColumnType(List **wqueue, AlteredTableInfo *tab, Relation rel, bool recurse, bool recursing, @@ -644,8 +636,6 @@ static void ComputePartitionAttrs(ParseState *pstate, Relation rel, List *partPa static void CreateInheritance(Relation child_rel, Relation parent_rel, bool ispartition); static void RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached); -static void ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel, - int inhcount); static ObjectAddress ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd, AlterTableUtilityContext *context); @@ -667,7 +657,6 @@ static ObjectAddress ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, static void validatePartitionedIndex(Relation partedIdx, Relation partedTbl); static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx, Relation partitionTbl); -static void verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partIdx); static List *GetParentedForeignKeyRefs(Relation partition); static void ATDetachCheckNoForeignKeyRefs(Relation partition); static char GetAttributeCompression(Oid atttypid, const char *compression); @@ -710,10 +699,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, TupleDesc descriptor; List *inheritOids; List *old_constraints; - List *old_notnulls; List *rawDefaults; List *cookedDefaults; - List *nncols; Datum reloptions; ListCell *listptr; AttrNumber attnum; @@ -898,13 +885,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, MergeAttributes(stmt->tableElts, inheritOids, stmt->relation->relpersistence, stmt->partbound != NULL, - &old_constraints, &old_notnulls); + &old_constraints); /* * Create a tuple descriptor from the relation schema. Note that this - * deals with column names, types, and in-descriptor NOT NULL flags, but - * not default values, NOT NULL or CHECK constraints; we handle those - * below. + * deals with column names, types, and not-null constraints, but not + * default values or CHECK constraints; we handle those below. */ descriptor = BuildDescForRelation(stmt->tableElts); @@ -1276,17 +1262,6 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, AddRelationNewConstraints(rel, NIL, stmt->constraints, true, true, false, queryString); - /* - * Finally, merge the not-null constraints that are declared directly with - * those that come from parent relations (making sure to count inheritance - * appropriately for each), create them, and set the attnotnull flag on - * columns that don't yet have it. - */ - nncols = AddRelationNotNullConstraints(rel, stmt->nnconstraints, - old_notnulls); - foreach(listptr, nncols) - set_attnotnull(NULL, rel, lfirst_int(listptr), false, NoLock); - ObjectAddressSet(address, RelationRelationId, relationId); /* @@ -2437,8 +2412,6 @@ storage_name(char c) * Output arguments: * 'supconstr' receives a list of constraints belonging to the parents, * updated as necessary to be valid for the child. - * 'supnotnulls' receives a list of CookedConstraints that corresponds to - * constraints coming from inheritance parents. * * Return value: * Completed schema list. @@ -2469,10 +2442,7 @@ storage_name(char c) * * Constraints (including not-null constraints) for the child table * are the union of all relevant constraints, from both the child schema - * and parent tables. In addition, in legacy inheritance, each column that - * appears in a primary key in any of the parents also gets a NOT NULL - * constraint (partitioning doesn't need this, because the PK itself gets - * inherited.) + * and parent tables. * * The default value for a child column is defined as: * (1) If the child schema specifies a default, that value is used. @@ -2491,11 +2461,10 @@ storage_name(char c) */ static List * MergeAttributes(List *columns, const List *supers, char relpersistence, - bool is_partition, List **supconstr, List **supnotnulls) + bool is_partition, List **supconstr) { List *inh_columns = NIL; List *constraints = NIL; - List *nnconstraints = NIL; bool have_bogus_defaults = false; int child_attno; static Node bogus_marker = {0}; /* marks conflicting defaults */ @@ -2606,11 +2575,8 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, AttrMap *newattmap; List *inherited_defaults; List *cols_with_defaults; - List *nnconstrs; ListCell *lc1; ListCell *lc2; - Bitmapset *pkattrs; - Bitmapset *nncols = NULL; /* caller already got lock */ relation = table_open(parent, NoLock); @@ -2698,20 +2664,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, /* We can't process inherited defaults until newattmap is complete. */ inherited_defaults = cols_with_defaults = NIL; - /* - * All columns that are part of the parent's primary key need to be - * NOT NULL; if partition just the attnotnull bit, otherwise a full - * constraint (if they don't have one already). Also, we request - * attnotnull on columns that have a not-null constraint that's not - * marked NO INHERIT. - */ - pkattrs = RelationGetIndexAttrBitmap(relation, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - nnconstrs = RelationGetNotNullConstraints(RelationGetRelid(relation), true); - foreach(lc1, nnconstrs) - nncols = bms_add_member(nncols, - ((CookedConstraint *) lfirst(lc1))->attnum); - for (AttrNumber parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) { @@ -2733,6 +2685,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, */ newdef = makeColumnDef(attributeName, attribute->atttypid, attribute->atttypmod, attribute->attcollation); + newdef->is_not_null = attribute->attnotnull; newdef->storage = attribute->attstorage; newdef->generated = attribute->attgenerated; if (CompressionMethodIsValid(attribute->attcompression)) @@ -2777,45 +2730,10 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, inh_columns = lappend(inh_columns, newdef); newattmap->attnums[parent_attno - 1] = ++child_attno; - mergeddef = newdef; } /* - * mark attnotnull if parent has it and it's not NO INHERIT - */ - if (bms_is_member(parent_attno, nncols) || - bms_is_member(parent_attno - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - mergeddef->is_not_null = true; - - /* - * In regular inheritance, columns in the parent's primary key get - * an extra not-null constraint. Partitioning doesn't need this, - * because the PK itself is going to be cloned to the partition. - */ - if (!is_partition && - bms_is_member(parent_attno - - FirstLowInvalidHeapAttributeNumber, - pkattrs)) - { - CookedConstraint *nn; - - nn = palloc(sizeof(CookedConstraint)); - nn->contype = CONSTR_NOTNULL; - nn->conoid = InvalidOid; - nn->name = NULL; - nn->attnum = newattmap->attnums[parent_attno - 1]; - nn->expr = NULL; - nn->skip_validation = false; - nn->is_local = false; - nn->inhcount = 1; - nn->is_no_inherit = false; - - nnconstraints = lappend(nnconstraints, nn); - } - - /* * Locate default/generation expression if any */ if (attribute->atthasdef) @@ -2926,23 +2844,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, } } - /* - * Also copy the not-null constraints from this parent. The - * attnotnull markings were already installed above. - */ - foreach(lc1, nnconstrs) - { - CookedConstraint *nn = lfirst(lc1); - - Assert(nn->contype == CONSTR_NOTNULL); - - nn->attnum = newattmap->attnums[nn->attnum - 1]; - nn->is_local = false; - nn->inhcount = 1; - - nnconstraints = lappend(nnconstraints, nn); - } - free_attrmap(newattmap); /* @@ -3013,7 +2914,8 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, /* * Now that we have the column definition list for a partition, we can * check whether the columns referenced in the column constraint specs - * actually exist. Also, merge column defaults. + * actually exist. Also, we merge parent's not-null constraints and + * defaults into each corresponding column definition. */ if (is_partition) { @@ -3030,6 +2932,7 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, if (strcmp(coldef->colname, restdef->colname) == 0) { found = true; + coldef->is_not_null |= restdef->is_not_null; /* * Check for conflicts related to generated columns. @@ -3118,7 +3021,6 @@ MergeAttributes(List *columns, const List *supers, char relpersistence, } *supconstr = constraints; - *supnotnulls = nnconstraints; return columns; } @@ -3400,6 +3302,11 @@ MergeInheritedAttribute(List *inh_columns, format_type_with_typemod(newtypeid, newtypmod)))); /* + * Merge of not-null constraints = OR 'em together + */ + prevdef->is_not_null |= newdef->is_not_null; + + /* * Must have the same collation */ prevcollid = GetColumnDefCollation(NULL, prevdef, prevtypeid); @@ -4022,10 +3929,7 @@ rename_constraint_internal(Oid myrelid, constraintOid); con = (Form_pg_constraint) GETSTRUCT(tuple); - if (myrelid && - (con->contype == CONSTRAINT_CHECK || - con->contype == CONSTRAINT_NOTNULL) && - !con->connoinherit) + if (myrelid && con->contype == CONSTRAINT_CHECK && !con->connoinherit) { if (recurse) { @@ -4610,7 +4514,6 @@ AlterTableGetLockLevel(List *cmds) case AT_AddIndexConstraint: case AT_ReplicaIdentity: case AT_SetNotNull: - case AT_SetAttNotNull: case AT_EnableRowSecurity: case AT_DisableRowSecurity: case AT_ForceRowSecurity: @@ -4758,6 +4661,15 @@ AlterTableGetLockLevel(List *cmds) cmd_lockmode = AccessExclusiveLock; break; + case AT_CheckNotNull: + + /* + * This only examines the table's schema; but lock must be + * strong enough to prevent concurrent DROP NOT NULL. + */ + cmd_lockmode = AccessShareLock; + break; + default: /* oops */ elog(ERROR, "unrecognized alter table type: %d", (int) cmd->subtype); @@ -4915,23 +4827,21 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Set up recursion for phase 2; no other prep needed */ - if (recurse) - cmd->recurse = true; + ATPrepDropNotNull(rel, recurse, recursing); + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); pass = AT_PASS_DROP; break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Set up recursion for phase 2; no other prep needed */ - if (recurse) - cmd->recurse = true; + /* Need command-specific recursion decision */ + ATPrepSetNotNull(wqueue, rel, cmd, recurse, recursing, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull without adding - * a constraint */ + case AT_CheckNotNull: /* check column is already marked NOT NULL */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - /* Need command-specific recursion decision */ ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); + /* No command-specific prep needed */ pass = AT_PASS_COL_ATTRS; break; case AT_SetExpression: /* ALTER COLUMN SET EXPRESSION */ @@ -4985,12 +4895,10 @@ ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd, break; case AT_AddConstraint: /* ADD CONSTRAINT */ ATSimplePermissions(cmd->subtype, rel, ATT_TABLE | ATT_FOREIGN_TABLE); + /* Recursion occurs during execution phase */ + /* No command-specific prep needed except saving recurse flag */ if (recurse) - { - /* recurses at exec time; lock descendants and set flag */ - (void) find_all_inheritors(RelationGetRelid(rel), lockmode, NULL); cmd->recurse = true; - } pass = AT_PASS_ADD_CONSTR; break; case AT_AddIndexConstraint: /* ADD CONSTRAINT USING INDEX */ @@ -5320,14 +5228,13 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, address = ATExecDropIdentity(rel, cmd->name, cmd->missing_ok, lockmode, cmd->recurse, false); break; case AT_DropNotNull: /* ALTER COLUMN DROP NOT NULL */ - address = ATExecDropNotNull(rel, cmd->name, cmd->recurse, lockmode); + address = ATExecDropNotNull(rel, cmd->name, lockmode); break; case AT_SetNotNull: /* ALTER COLUMN SET NOT NULL */ - address = ATExecSetNotNull(wqueue, rel, NULL, cmd->name, - cmd->recurse, false, NULL, lockmode); + address = ATExecSetNotNull(tab, rel, cmd->name, lockmode); break; - case AT_SetAttNotNull: /* set pg_attribute.attnotnull */ - address = ATExecSetAttNotNull(wqueue, rel, cmd->name, lockmode); + case AT_CheckNotNull: /* check column is already marked NOT NULL */ + ATExecCheckNotNull(tab, rel, cmd->name, lockmode); break; case AT_SetExpression: address = ATExecSetExpression(tab, rel, cmd->name, cmd->def, lockmode); @@ -5410,7 +5317,7 @@ ATExecCmd(List **wqueue, AlteredTableInfo *tab, break; case AT_DropConstraint: /* DROP CONSTRAINT */ ATExecDropConstraint(rel, cmd->name, cmd->behavior, - cmd->recurse, + cmd->recurse, false, cmd->missing_ok, lockmode); break; case AT_AlterColumnType: /* ALTER COLUMN TYPE */ @@ -5689,23 +5596,21 @@ ATParseTransformCmd(List **wqueue, AlteredTableInfo *tab, Relation rel, */ switch (cmd2->subtype) { - case AT_SetAttNotNull: - ATSimpleRecursion(wqueue, rel, cmd2, recurse, lockmode, context); + case AT_SetNotNull: + /* Need command-specific recursion decision */ + ATPrepSetNotNull(wqueue, rel, cmd2, + recurse, false, + lockmode, context); pass = AT_PASS_COL_ATTRS; break; case AT_AddIndex: - - /* - * A primary key on an inheritance parent needs supporting NOT - * NULL constraint on its children; enqueue commands to create - * those or mark them inherited if they already exist. - */ - ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context); + /* This command never recurses */ + /* No command-specific prep needed */ pass = AT_PASS_ADD_INDEX; break; case AT_AddIndexConstraint: - /* as above */ - ATPrepAddPrimaryKey(wqueue, rel, cmd2, lockmode, context); + /* This command never recurses */ + /* No command-specific prep needed */ pass = AT_PASS_ADD_INDEXCONSTR; break; case AT_AddConstraint: @@ -6372,7 +6277,6 @@ ATRewriteTable(AlteredTableInfo *tab, Oid OIDNewHeap, LOCKMODE lockmode) RelationGetRelationName(oldrel)), errtableconstraint(oldrel, con->name))); break; - case CONSTR_NOTNULL: case CONSTR_FOREIGN: /* Nothing to do here */ break; @@ -6482,12 +6386,12 @@ alter_table_type_to_string(AlterTableType cmdtype) return "ALTER COLUMN ... DROP NOT NULL"; case AT_SetNotNull: return "ALTER COLUMN ... SET NOT NULL"; - case AT_SetAttNotNull: - return NULL; /* not real grammar */ case AT_SetExpression: return "ALTER COLUMN ... SET EXPRESSION"; case AT_DropExpression: return "ALTER COLUMN ... DROP EXPRESSION"; + case AT_CheckNotNull: + return NULL; /* not real grammar */ case AT_SetStatistics: return "ALTER COLUMN ... SET STATISTICS"; case AT_SetOptions: @@ -7570,21 +7474,41 @@ add_column_collation_dependency(Oid relid, int32 attnum, Oid collid) /* * ALTER TABLE ALTER COLUMN DROP NOT NULL - * + */ + +static void +ATPrepDropNotNull(Relation rel, bool recurse, bool recursing) +{ + /* + * If the parent is a partitioned table, like check constraints, we do not + * support removing the NOT NULL while partitions exist. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) + { + PartitionDesc partdesc = RelationGetPartitionDesc(rel, true); + + Assert(partdesc != NULL); + if (partdesc->nparts > 0 && !recurse && !recursing) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot remove constraint from only the partitioned table when partitions exist"), + errhint("Do not specify the ONLY keyword."))); + } +} + +/* * Return the address of the modified column. If the column was already * nullable, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecDropNotNull(Relation rel, const char *colName, bool recurse, - LOCKMODE lockmode) +ATExecDropNotNull(Relation rel, const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - HeapTuple conTup; Form_pg_attribute attTup; AttrNumber attnum; Relation attr_rel; + List *indexoidlist; ObjectAddress address; - List *readyRels; /* * lookup the attribute @@ -7599,15 +7523,6 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); attTup = (Form_pg_attribute) GETSTRUCT(tuple); attnum = attTup->attnum; - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); - - /* If the column is already nullable there's nothing to do. */ - if (!attTup->attnotnull) - { - table_close(attr_rel, RowExclusiveLock); - return InvalidObjectAddress; - } /* Prevent them from altering a system attribute */ if (attnum <= 0) @@ -7623,37 +7538,60 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, colName, RelationGetRelationName(rel)))); /* - * It's not OK to remove a constraint only for the parent and leave it in - * the children, so disallow that. + * Check that the attribute is not in a primary key or in an index used as + * a replica identity. + * + * Note: we'll throw error even if the pkey index is not valid. */ - if (!recurse) + + /* Loop over all indexes on the relation */ + indexoidlist = RelationGetIndexList(rel); + + foreach_oid(indexoid, indexoidlist) { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - { - PartitionDesc partdesc; + HeapTuple indexTuple; + Form_pg_index indexStruct; - partdesc = RelationGetPartitionDesc(rel, true); + indexTuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(indexoid)); + if (!HeapTupleIsValid(indexTuple)) + elog(ERROR, "cache lookup failed for index %u", indexoid); + indexStruct = (Form_pg_index) GETSTRUCT(indexTuple); - if (partdesc->nparts > 0) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot remove constraint from only the partitioned table when partitions exist"), - errhint("Do not specify the ONLY keyword.")); - } - else if (rel->rd_rel->relhassubclass && - find_inheritance_children(RelationGetRelid(rel), NoLock) != NIL) + /* + * If the index is not a primary key or an index used as replica + * identity, skip the check. + */ + if (indexStruct->indisprimary || indexStruct->indisreplident) { - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("not-null constraint on column \"%s\" must be removed in child tables too", - colName), - errhint("Do not specify the ONLY keyword.")); + /* + * Loop over each attribute in the primary key or the index used + * as replica identity and see if it matches the to-be-altered + * attribute. + */ + for (int i = 0; i < indexStruct->indnkeyatts; i++) + { + if (indexStruct->indkey.values[i] == attnum) + { + if (indexStruct->indisprimary) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in a primary key", + colName))); + else + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("column \"%s\" is in index used as replica identity", + colName))); + } + } } + + ReleaseSysCache(indexTuple); } - /* - * If rel is partition, shouldn't drop NOT NULL if parent has the same. - */ + list_free(indexoidlist); + + /* If rel is partition, shouldn't drop NOT NULL if parent has the same */ if (rel->rd_rel->relispartition) { Oid parentId = get_partition_parent(RelationGetRelid(rel), false); @@ -7671,52 +7609,19 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Find the constraint that makes this column NOT NULL, and drop it if we - * see one. dropconstraint_internal() will do necessary consistency - * checking. If there isn't one, there are two possibilities: either the - * column is marked attnotnull because it's part of the primary key, and - * then we just throw an appropriate error; or it's a leftover marking - * that we can remove. However, before doing the latter, to avoid - * breaking consistency any further, prevent this if the column is part of - * the replica identity. + * Okay, actually perform the catalog change ... if needed */ - conTup = findNotNullConstraint(RelationGetRelid(rel), colName); - if (conTup == NULL) + if (attTup->attnotnull) { - Bitmapset *pkcols; - Bitmapset *ircols; - - /* - * If the column is in a primary key, throw a specific error message. - */ - pkcols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_PRIMARY_KEY); - if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in a primary key", colName)); - - /* Also throw an error if the column is in the replica identity */ - ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); - if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, ircols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - get_attname(RelationGetRelid(rel), attnum, false))); - - /* Otherwise, just remove the attnotnull marking and do nothing else. */ attTup->attnotnull = false; + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - } - else - { - /* The normal case: we have a pg_constraint row, remove it */ - readyRels = NIL; - dropconstraint_internal(rel, conTup, DROP_RESTRICT, recurse, false, - false, &readyRels, lockmode); - heap_freetuple(conTup); + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } + else + address = InvalidObjectAddress; InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); @@ -7727,147 +7632,102 @@ ATExecDropNotNull(Relation rel, const char *colName, bool recurse, } /* - * Helper to set pg_attribute.attnotnull if it isn't set, and to tell phase 3 - * to verify it; recurses to apply the same to children. - * - * When called to alter an existing table, 'wqueue' must be given so that we can - * queue a check that existing tuples pass the constraint. When called from - * table creation, 'wqueue' should be passed as NULL. - * - * Returns true if the flag was set in any table, otherwise false. + * ALTER TABLE ALTER COLUMN SET NOT NULL */ -static bool -set_attnotnull(List **wqueue, Relation rel, AttrNumber attnum, bool recurse, - LOCKMODE lockmode) -{ - HeapTuple tuple; - Form_pg_attribute attForm; - bool retval = false; - /* Guard against stack overflow due to overly deep inheritance tree. */ - check_stack_depth(); +static void +ATPrepSetNotNull(List **wqueue, Relation rel, + AlterTableCmd *cmd, bool recurse, bool recursing, + LOCKMODE lockmode, AlterTableUtilityContext *context) +{ + /* + * If we're already recursing, there's nothing to do; the topmost + * invocation of ATSimpleRecursion already visited all children. + */ + if (recursing) + return; - tuple = SearchSysCacheCopyAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - attnum, RelationGetRelid(rel)); - attForm = (Form_pg_attribute) GETSTRUCT(tuple); - if (!attForm->attnotnull) + /* + * If the target column is already marked NOT NULL, we can skip recursing + * to children, because their columns should already be marked NOT NULL as + * well. But there's no point in checking here unless the relation has + * some children; else we can just wait till execution to check. (If it + * does have children, however, this can save taking per-child locks + * unnecessarily. This greatly improves concurrency in some parallel + * restore scenarios.) + * + * Unfortunately, we can only apply this optimization to partitioned + * tables, because traditional inheritance doesn't enforce that child + * columns be NOT NULL when their parent is. (That's a bug that should + * get fixed someday.) + */ + if (rel->rd_rel->relhassubclass && + rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { - Relation attr_rel; - - attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - - attForm->attnotnull = true; - CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); - - table_close(attr_rel, RowExclusiveLock); + HeapTuple tuple; + bool attnotnull; - /* - * And set up for existing values to be checked, unless another - * constraint already proves this. - */ - if (wqueue && !NotNullImpliedByRelConstraints(rel, attForm)) - { - AlteredTableInfo *tab; + tuple = SearchSysCacheAttName(RelationGetRelid(rel), cmd->name); - tab = ATGetQueueEntry(wqueue, rel); - tab->verify_new_notnull = true; - } + /* Might as well throw the error now, if name is bad */ + if (!HeapTupleIsValid(tuple)) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_COLUMN), + errmsg("column \"%s\" of relation \"%s\" does not exist", + cmd->name, RelationGetRelationName(rel)))); - retval = true; + attnotnull = ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull; + ReleaseSysCache(tuple); + if (attnotnull) + return; } - if (recurse) + /* + * If we have ALTER TABLE ONLY ... SET NOT NULL on a partitioned table, + * apply ALTER TABLE ... CHECK NOT NULL to every child. Otherwise, use + * normal recursion logic. + */ + if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE && + !recurse) { - List *children; - ListCell *lc; - - /* Make above update visible, for multiple inheritance cases */ - if (retval) - CommandCounterIncrement(); - - children = find_inheritance_children(RelationGetRelid(rel), lockmode); - foreach(lc, children) - { - Oid childrelid = lfirst_oid(lc); - Relation childrel; - AttrNumber childattno; - - /* find_inheritance_children already got lock */ - childrel = table_open(childrelid, NoLock); - CheckTableNotInUse(childrel, "ALTER TABLE"); + AlterTableCmd *newcmd = makeNode(AlterTableCmd); - childattno = get_attnum(RelationGetRelid(childrel), - get_attname(RelationGetRelid(rel), attnum, - false)); - retval |= set_attnotnull(wqueue, childrel, childattno, - recurse, lockmode); - table_close(childrel, NoLock); - } + newcmd->subtype = AT_CheckNotNull; + newcmd->name = pstrdup(cmd->name); + ATSimpleRecursion(wqueue, rel, newcmd, true, lockmode, context); } - - return retval; + else + ATSimpleRecursion(wqueue, rel, cmd, recurse, lockmode, context); } /* - * ALTER TABLE ALTER COLUMN SET NOT NULL - * - * Add a not-null constraint to a single table and its children. Returns - * the address of the constraint added to the parent relation, if one gets - * added, or InvalidObjectAddress otherwise. - * - * We must recurse to child tables during execution, rather than using - * ALTER TABLE's normal prep-time recursion. + * Return the address of the modified column. If the column was already NOT + * NULL, InvalidObjectAddress is returned. */ static ObjectAddress -ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, - bool recurse, bool recursing, List **readyRels, - LOCKMODE lockmode) +ATExecSetNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode) { HeapTuple tuple; - Relation constr_rel; - ScanKeyData skey; - SysScanDesc conscan; AttrNumber attnum; + Relation attr_rel; ObjectAddress address; - Constraint *constraint; - CookedConstraint *ccon; - List *cooked; - bool is_no_inherit = false; - List *ready = NIL; - - /* Guard against stack overflow due to overly deep inheritance tree. */ - check_stack_depth(); /* - * In cases of multiple inheritance, we might visit the same child more - * than once. In the topmost call, set up a list that we fill with all - * visited relations, to skip those. + * lookup the attribute */ - if (readyRels == NULL) - { - Assert(!recursing); - readyRels = &ready; - } - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); + attr_rel = table_open(AttributeRelationId, RowExclusiveLock); - /* At top level, permission check was done in ATPrepCmd, else do it */ - if (recursing) - { - ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - Assert(conName != NULL); - } + tuple = SearchSysCacheCopyAttName(RelationGetRelid(rel), colName); - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) + if (!HeapTupleIsValid(tuple)) ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel)))); + attnum = ((Form_pg_attribute) GETSTRUCT(tuple))->attnum; + /* Prevent them from altering a system attribute */ if (attnum <= 0) ereport(ERROR, @@ -7875,188 +7735,80 @@ ATExecSetNotNull(List **wqueue, Relation rel, char *conName, char *colName, errmsg("cannot alter system column \"%s\"", colName))); - /* See if there's already a constraint */ - constr_rel = table_open(ConstraintRelationId, RowExclusiveLock); - ScanKeyInit(&skey, - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(RelationGetRelid(rel))); - conscan = systable_beginscan(constr_rel, ConstraintRelidTypidNameIndexId, true, - NULL, 1, &skey); - - while (HeapTupleIsValid(tuple = systable_getnext(conscan))) + /* + * Okay, actually perform the catalog change ... if needed + */ + if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) { - Form_pg_constraint conForm = (Form_pg_constraint) GETSTRUCT(tuple); - bool changed = false; - HeapTuple copytup; - - if (conForm->contype != CONSTRAINT_NOTNULL) - continue; + ((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull = true; - if (extractNotNullColumn(tuple) != attnum) - continue; - - copytup = heap_copytuple(tuple); - conForm = (Form_pg_constraint) GETSTRUCT(copytup); - - /* - * Don't let a NO INHERIT constraint be changed into inherit. - */ - if (conForm->connoinherit && recurse) - ereport(ERROR, - errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot change NO INHERIT status of NOT NULL constraint \"%s\" on relation \"%s\"", - NameStr(conForm->conname), - RelationGetRelationName(rel))); + CatalogTupleUpdate(attr_rel, &tuple->t_self, tuple); /* - * If we find an appropriate constraint, we're almost done, but just - * need to change some properties on it: if we're recursing, increment - * coninhcount; if not, set conislocal if not already set. + * Ordinarily phase 3 must ensure that no NULLs exist in columns that + * are set NOT NULL; however, if we can find a constraint which proves + * this then we can skip that. We needn't bother looking if we've + * already found that we must verify some other not-null constraint. */ - if (recursing) - { - conForm->coninhcount++; - changed = true; - } - else if (!conForm->conislocal) + if (!tab->verify_new_notnull && + !NotNullImpliedByRelConstraints(rel, (Form_pg_attribute) GETSTRUCT(tuple))) { - conForm->conislocal = true; - changed = true; - } - - if (changed) - { - CatalogTupleUpdate(constr_rel, ©tup->t_self, copytup); - ObjectAddressSet(address, ConstraintRelationId, conForm->oid); + /* Tell Phase 3 it needs to test the constraint */ + tab->verify_new_notnull = true; } - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - if (changed) - return address; - else - return InvalidObjectAddress; - } - - systable_endscan(conscan); - table_close(constr_rel, RowExclusiveLock); - - /* - * If we're asked not to recurse, and children exist, raise an error for - * partitioned tables. For inheritance, we act as if NO INHERIT had been - * specified. - */ - if (!recurse && - find_inheritance_children(RelationGetRelid(rel), - NoLock) != NIL) - { - if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("constraint must be added to child tables too"), - errhint("Do not specify the ONLY keyword.")); - else - is_no_inherit = true; + ObjectAddressSubSet(address, RelationRelationId, + RelationGetRelid(rel), attnum); } - - /* - * No constraint exists; we must add one. First determine a name to use, - * if we haven't already. - */ - if (!recursing) - { - Assert(conName == NULL); - conName = ChooseConstraintName(RelationGetRelationName(rel), - colName, "not_null", - RelationGetNamespace(rel), - NIL); - } - constraint = makeNode(Constraint); - constraint->contype = CONSTR_NOTNULL; - constraint->conname = conName; - constraint->deferrable = false; - constraint->initdeferred = false; - constraint->location = -1; - constraint->keys = list_make1(makeString(colName)); - constraint->is_no_inherit = is_no_inherit; - constraint->inhcount = recursing ? 1 : 0; - constraint->skip_validation = false; - constraint->initially_valid = true; - - /* and do it */ - cooked = AddRelationNewConstraints(rel, NIL, list_make1(constraint), - false, !recursing, false, NULL); - ccon = linitial(cooked); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); + else + address = InvalidObjectAddress; InvokeObjectPostAlterHook(RelationRelationId, RelationGetRelid(rel), attnum); - /* - * Mark pg_attribute.attnotnull for the column. Tell that function not to - * recurse, because we're going to do it here. - */ - set_attnotnull(wqueue, rel, attnum, false, lockmode); - - /* - * Recurse to propagate the constraint to children that don't have one. - */ - if (recurse) - { - List *children; - ListCell *lc; - - children = find_inheritance_children(RelationGetRelid(rel), - lockmode); - - foreach(lc, children) - { - Relation childrel; - - childrel = table_open(lfirst_oid(lc), NoLock); - - ATExecSetNotNull(wqueue, childrel, - conName, colName, recurse, true, - readyRels, lockmode); - - table_close(childrel, NoLock); - } - } + table_close(attr_rel, RowExclusiveLock); return address; } /* - * ALTER TABLE ALTER COLUMN SET ATTNOTNULL + * ALTER TABLE ALTER COLUMN CHECK NOT NULL * - * This doesn't exist in the grammar; it's used when creating a - * primary key and the column is not already marked attnotnull. + * This doesn't exist in the grammar, but we generate AT_CheckNotNull + * commands against the partitions of a partitioned table if the user + * writes ALTER TABLE ONLY ... SET NOT NULL on the partitioned table, + * or tries to create a primary key on it (which internally creates + * AT_SetNotNull on the partitioned table). Such a command doesn't + * allow us to actually modify any partition, but we want to let it + * go through if the partitions are already properly marked. + * + * In future, this might need to adjust the child table's state, likely + * by incrementing an inheritance count for the attnotnull constraint. + * For now we need only check for the presence of the flag. */ -static ObjectAddress -ATExecSetAttNotNull(List **wqueue, Relation rel, - const char *colName, LOCKMODE lockmode) +static void +ATExecCheckNotNull(AlteredTableInfo *tab, Relation rel, + const char *colName, LOCKMODE lockmode) { - AttrNumber attnum; - ObjectAddress address = InvalidObjectAddress; + HeapTuple tuple; - attnum = get_attnum(RelationGetRelid(rel), colName); - if (attnum == InvalidAttrNumber) + tuple = SearchSysCacheAttName(RelationGetRelid(rel), colName); + + if (!HeapTupleIsValid(tuple)) ereport(ERROR, errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" of relation \"%s\" does not exist", colName, RelationGetRelationName(rel))); - /* - * Make the change, if necessary, and only if so report the column as - * changed - */ - if (set_attnotnull(wqueue, rel, attnum, false, lockmode)) - ObjectAddressSubSet(address, RelationRelationId, - RelationGetRelid(rel), attnum); + if (!((Form_pg_attribute) GETSTRUCT(tuple))->attnotnull) + ereport(ERROR, + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("constraint must be added to child tables too"), + errdetail("Column \"%s\" of relation \"%s\" is not already NOT NULL.", + colName, RelationGetRelationName(rel)), + errhint("Do not specify the ONLY keyword."))); - return address; + ReleaseSysCache(tuple); } /* @@ -9363,85 +9115,6 @@ ATExecDropColumn(List **wqueue, Relation rel, const char *colName, } /* - * Prepare to add a primary key on an inheritance parent, by adding NOT NULL - * constraint on its children. - */ -static void -ATPrepAddPrimaryKey(List **wqueue, Relation rel, AlterTableCmd *cmd, - LOCKMODE lockmode, AlterTableUtilityContext *context) -{ - List *children; - List *newconstrs = NIL; - IndexStmt *indexstmt; - - /* No work if not creating a primary key */ - if (!IsA(cmd->def, IndexStmt)) - return; - indexstmt = castNode(IndexStmt, cmd->def); - if (!indexstmt->primary) - return; - - /* No work if no legacy inheritance children are present */ - if (rel->rd_rel->relkind != RELKIND_RELATION || - !rel->rd_rel->relhassubclass) - return; - - /* - * Acquire locks all the way down the hierarchy. The recursion to lower - * levels occurs at execution time as necessary, so we don't need to do it - * here, and we don't need the returned list either. - */ - (void) find_all_inheritors(RelationGetRelid(rel), lockmode, NULL); - - /* - * Construct the list of constraints that we need to add to each child - * relation. - */ - foreach_node(IndexElem, elem, indexstmt->indexParams) - { - Constraint *nnconstr; - - Assert(elem->expr == NULL); - - nnconstr = makeNode(Constraint); - nnconstr->contype = CONSTR_NOTNULL; - nnconstr->conname = NULL; /* XXX use PK name? */ - nnconstr->inhcount = 1; - nnconstr->deferrable = false; - nnconstr->initdeferred = false; - nnconstr->location = -1; - nnconstr->keys = list_make1(makeString(elem->name)); - nnconstr->skip_validation = false; - nnconstr->initially_valid = true; - - newconstrs = lappend(newconstrs, nnconstr); - } - - /* Finally, add AT subcommands to add each constraint to each child. */ - children = find_inheritance_children(RelationGetRelid(rel), NoLock); - foreach_oid(childrelid, children) - { - Relation childrel = table_open(childrelid, NoLock); - AlterTableCmd *newcmd = makeNode(AlterTableCmd); - ListCell *lc2; - - newcmd->subtype = AT_AddConstraint; - newcmd->recurse = true; - - foreach(lc2, newconstrs) - { - /* ATPrepCmd copies newcmd, so we can scribble on it here */ - newcmd->def = lfirst(lc2); - - ATPrepCmd(wqueue, childrel, newcmd, - true, false, lockmode, context); - } - - table_close(childrel, NoLock); - } -} - -/* * ALTER TABLE ADD INDEX * * There is no such command in the grammar, but parse_utilcmd.c converts @@ -9636,18 +9309,17 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, Assert(IsA(newConstraint, Constraint)); /* - * Currently, we only expect to see CONSTR_CHECK, CONSTR_NOTNULL and - * CONSTR_FOREIGN nodes arriving here (see the preprocessing done in - * parse_utilcmd.c). + * Currently, we only expect to see CONSTR_CHECK and CONSTR_FOREIGN nodes + * arriving here (see the preprocessing done in parse_utilcmd.c). Use a + * switch anyway to make it easier to add more code later. */ switch (newConstraint->contype) { case CONSTR_CHECK: - case CONSTR_NOTNULL: address = - ATAddCheckNNConstraint(wqueue, tab, rel, - newConstraint, recurse, false, is_readd, - lockmode); + ATAddCheckConstraint(wqueue, tab, rel, + newConstraint, recurse, false, is_readd, + lockmode); break; case CONSTR_FOREIGN: @@ -9728,9 +9400,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) } /* - * Add a check or not-null constraint to a single table and its children. - * Returns the address of the constraint added to the parent relation, - * if one gets added, or InvalidObjectAddress otherwise. + * Add a check constraint to a single table and its children. Returns the + * address of the constraint added to the parent relation, if one gets added, + * or InvalidObjectAddress otherwise. * * Subroutine for ATExecAddConstraint. * @@ -9743,9 +9415,9 @@ ChooseForeignKeyConstraintNameAddition(List *colnames) * the parent table and pass that down. */ static ObjectAddress -ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, - Constraint *constr, bool recurse, bool recursing, - bool is_readd, LOCKMODE lockmode) +ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, + Constraint *constr, bool recurse, bool recursing, + bool is_readd, LOCKMODE lockmode) { List *newcons; ListCell *lcon; @@ -9753,9 +9425,6 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, ListCell *child; ObjectAddress address = InvalidObjectAddress; - /* Guard against stack overflow due to overly deep inheritance tree. */ - check_stack_depth(); - /* At top level, permission check was done in ATPrepCmd, else do it */ if (recursing) ATSimplePermissions(AT_AddConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); @@ -9786,7 +9455,7 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, { CookedConstraint *ccon = (CookedConstraint *) lfirst(lcon); - if (!ccon->skip_validation && ccon->contype != CONSTR_NOTNULL) + if (!ccon->skip_validation) { NewConstraint *newcon; @@ -9802,19 +9471,11 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, if (constr->conname == NULL) constr->conname = ccon->name; - /* - * If adding a not-null constraint, set the pg_attribute flag and tell - * phase 3 to verify existing rows, if needed. - */ - if (constr->contype == CONSTR_NOTNULL) - set_attnotnull(wqueue, rel, ccon->attnum, - !ccon->is_no_inherit, lockmode); - ObjectAddressSet(address, ConstraintRelationId, ccon->conoid); } /* At this point we must have a locked-down name to use */ - Assert(newcons == NIL || constr->conname != NULL); + Assert(constr->conname != NULL); /* Advance command counter in case same table is visited multiple times */ CommandCounterIncrement(); @@ -9852,12 +9513,6 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("constraint must be added to child tables too"))); - /* - * The constraint must appear as inherited in children, so create a - * modified constraint object to use. - */ - constr = copyObject(constr); - constr->inhcount = 1; foreach(child, children) { Oid childrelid = lfirst_oid(child); @@ -9871,13 +9526,9 @@ ATAddCheckNNConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel, /* Find or create work queue entry for this table */ childtab = ATGetQueueEntry(wqueue, childrel); - /* - * Recurse to child. XXX if we didn't create a constraint on the - * parent because it already existed, and we do create one on a child, - * should we return that child's constraint ObjectAddress here? - */ - ATAddCheckNNConstraint(wqueue, childtab, childrel, - constr, recurse, true, is_readd, lockmode); + /* Recurse to child */ + ATAddCheckConstraint(wqueue, childtab, childrel, + constr, recurse, true, is_readd, lockmode); table_close(childrel, NoLock); } @@ -12889,14 +12540,23 @@ createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid, */ static void ATExecDropConstraint(Relation rel, const char *constrName, - DropBehavior behavior, bool recurse, + DropBehavior behavior, + bool recurse, bool recursing, bool missing_ok, LOCKMODE lockmode) { + List *children; Relation conrel; + Form_pg_constraint con; SysScanDesc scan; ScanKeyData skey[3]; HeapTuple tuple; bool found = false; + bool is_no_inherit_constraint = false; + char contype; + + /* At top level, permission check was done in ATPrepCmd, else do it */ + if (recursing) + ATSimplePermissions(AT_DropConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); conrel = table_open(ConstraintRelationId, RowExclusiveLock); @@ -12921,260 +12581,80 @@ ATExecDropConstraint(Relation rel, const char *constrName, /* There can be at most one matching row */ if (HeapTupleIsValid(tuple = systable_getnext(scan))) { - List *readyRels = NIL; + ObjectAddress conobj; - dropconstraint_internal(rel, tuple, behavior, recurse, false, - missing_ok, &readyRels, lockmode); - found = true; - } + con = (Form_pg_constraint) GETSTRUCT(tuple); - systable_endscan(scan); - - if (!found) - { - if (!missing_ok) + /* Don't drop inherited constraints */ + if (con->coninhcount > 0 && !recursing) ereport(ERROR, - errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, RelationGetRelationName(rel))); - else - ereport(NOTICE, - errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", - constrName, RelationGetRelationName(rel))); - } - - table_close(conrel, RowExclusiveLock); -} - -/* - * Remove a constraint, using its pg_constraint tuple - * - * Implementation for ALTER TABLE DROP CONSTRAINT and ALTER TABLE ALTER COLUMN - * DROP NOT NULL. - * - * Returns the address of the constraint being removed. - */ -static ObjectAddress -dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior behavior, - bool recurse, bool recursing, bool missing_ok, List **readyRels, - LOCKMODE lockmode) -{ - Relation conrel; - Form_pg_constraint con; - ObjectAddress conobj; - List *children; - bool is_no_inherit_constraint = false; - char *constrName; - List *unconstrained_cols = NIL; - char *colname = NULL; - bool dropping_pk = false; - - if (list_member_oid(*readyRels, RelationGetRelid(rel))) - return InvalidObjectAddress; - *readyRels = lappend_oid(*readyRels, RelationGetRelid(rel)); - - /* Guard against stack overflow due to overly deep inheritance tree. */ - check_stack_depth(); - - /* At top level, permission check was done in ATPrepCmd, else do it */ - if (recursing) - ATSimplePermissions(AT_DropConstraint, rel, ATT_TABLE | ATT_FOREIGN_TABLE); - - conrel = table_open(ConstraintRelationId, RowExclusiveLock); - - con = (Form_pg_constraint) GETSTRUCT(constraintTup); - constrName = NameStr(con->conname); - - /* - * If we're asked to drop a constraint which is both defined locally and - * inherited, we can simply mark it as no longer having a local - * definition, and no further changes are required. - * - * XXX We do this for not-null constraints only, not CHECK, because the - * latter have historically not behaved this way and it might be confusing - * to change the behavior now. - */ - if (con->contype == CONSTRAINT_NOTNULL && - con->conislocal && con->coninhcount > 0) - { - HeapTuple copytup; - - copytup = heap_copytuple(constraintTup); - con = (Form_pg_constraint) GETSTRUCT(copytup); - con->conislocal = false; - CatalogTupleUpdate(conrel, ©tup->t_self, copytup); - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - - CommandCounterIncrement(); - table_close(conrel, RowExclusiveLock); - return conobj; - } - - /* Don't allow drop of inherited constraints */ - if (con->coninhcount > 0 && !recursing) - ereport(ERROR, - (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", - constrName, RelationGetRelationName(rel)))); - - /* - * See if we have a not-null constraint or a PRIMARY KEY. If so, we have - * more checks and actions below, so obtain the list of columns that are - * constrained by the constraint being dropped. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber colnum; - - colnum = extractNotNullColumn(constraintTup); - unconstrained_cols = list_make1_int(colnum); - colname = NameStr(TupleDescAttr(RelationGetDescr(rel), - colnum - 1)->attname); - } - else if (con->contype == CONSTRAINT_PRIMARY) - { - Datum adatum; - ArrayType *arr; - int numkeys; - bool isNull; - int16 *attnums; + (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot drop inherited constraint \"%s\" of relation \"%s\"", + constrName, RelationGetRelationName(rel)))); - dropping_pk = true; + is_no_inherit_constraint = con->connoinherit; + contype = con->contype; - adatum = heap_getattr(constraintTup, Anum_pg_constraint_conkey, - RelationGetDescr(conrel), &isNull); - if (isNull) - elog(ERROR, "null conkey for constraint %u", con->oid); - arr = DatumGetArrayTypeP(adatum); /* ensure not toasted */ - numkeys = ARR_DIMS(arr)[0]; - if (ARR_NDIM(arr) != 1 || - numkeys < 0 || - ARR_HASNULL(arr) || - ARR_ELEMTYPE(arr) != INT2OID) - elog(ERROR, "conkey is not a 1-D smallint array"); - attnums = (int16 *) ARR_DATA_PTR(arr); + /* + * If it's a foreign-key constraint, we'd better lock the referenced + * table and check that that's not in use, just as we've already done + * for the constrained table (else we might, eg, be dropping a trigger + * that has unfired events). But we can/must skip that in the + * self-referential case. + */ + if (contype == CONSTRAINT_FOREIGN && + con->confrelid != RelationGetRelid(rel)) + { + Relation frel; - for (int i = 0; i < numkeys; i++) - unconstrained_cols = lappend_int(unconstrained_cols, attnums[i]); - } + /* Must match lock taken by RemoveTriggerById: */ + frel = table_open(con->confrelid, AccessExclusiveLock); + CheckTableNotInUse(frel, "ALTER TABLE"); + table_close(frel, NoLock); + } - is_no_inherit_constraint = con->connoinherit; + /* + * Perform the actual constraint deletion + */ + conobj.classId = ConstraintRelationId; + conobj.objectId = con->oid; + conobj.objectSubId = 0; - /* - * If it's a foreign-key constraint, we'd better lock the referenced table - * and check that that's not in use, just as we've already done for the - * constrained table (else we might, eg, be dropping a trigger that has - * unfired events). But we can/must skip that in the self-referential - * case. - */ - if (con->contype == CONSTRAINT_FOREIGN && - con->confrelid != RelationGetRelid(rel)) - { - Relation frel; + performDeletion(&conobj, behavior, 0); - /* Must match lock taken by RemoveTriggerById: */ - frel = table_open(con->confrelid, AccessExclusiveLock); - CheckTableNotInUse(frel, "ALTER TABLE"); - table_close(frel, NoLock); + found = true; } - /* - * Perform the actual constraint deletion - */ - ObjectAddressSet(conobj, ConstraintRelationId, con->oid); - performDeletion(&conobj, behavior, 0); + systable_endscan(scan); - /* - * If this was a NOT NULL or the primary key, verify that we still have - * constraints to support GENERATED AS IDENTITY or the replica identity. - */ - if (unconstrained_cols != NIL) + if (!found) { - Relation attrel; - Bitmapset *pkcols; - Bitmapset *ircols; - - /* Make implicit attnotnull changes visible */ - CommandCounterIncrement(); - - attrel = table_open(AttributeRelationId, RowExclusiveLock); - - /* - * We want to test columns for their presence in the primary key, but - * only if we're not dropping it. - */ - pkcols = dropping_pk ? NULL : - RelationGetIndexAttrBitmap(rel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - ircols = RelationGetIndexAttrBitmap(rel, INDEX_ATTR_BITMAP_IDENTITY_KEY); - - foreach_int(attnum, unconstrained_cols) + if (!missing_ok) { - HeapTuple atttup; - HeapTuple contup; - Form_pg_attribute attForm; - char attidentity; - - /* - * Obtain pg_attribute tuple and verify conditions on it. - */ - atttup = SearchSysCacheAttNum(RelationGetRelid(rel), attnum); - if (!HeapTupleIsValid(atttup)) - elog(ERROR, "cache lookup failed for attribute %d of relation %u", - attnum, RelationGetRelid(rel)); - attForm = (Form_pg_attribute) GETSTRUCT(atttup); - attidentity = attForm->attidentity; - ReleaseSysCache(atttup); - - /* - * Since the above deletion has been made visible, we can now - * search for any remaining constraints on this column (or these - * columns, in the case we're dropping a multicol primary key.) - * Then, verify whether any further NOT NULL or primary key - * exists: if none and this is a generated identity column or the - * replica identity, abort the whole thing. - */ - contup = findNotNullConstraintAttnum(RelationGetRelid(rel), attnum); - if (contup || - bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, - pkcols)) - continue; - - /* - * It's not valid to drop the not-null constraint for a GENERATED - * AS IDENTITY column. - */ - if (attidentity != '\0') - ereport(ERROR, - errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("column \"%s\" of relation \"%s\" is an identity column", - get_attname(RelationGetRelid(rel), attnum, - false), - RelationGetRelationName(rel))); - - /* - * It's not valid to drop the not-null constraint for a column in - * the replica identity index, either. (FULL is not affected.) - */ - if (bms_is_member(attnum - FirstLowInvalidHeapAttributeNumber, ircols)) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("column \"%s\" is in index used as replica identity", - get_attname(RelationGetRelid(rel), attnum, false))); + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, RelationGetRelationName(rel)))); + } + else + { + ereport(NOTICE, + (errmsg("constraint \"%s\" of relation \"%s\" does not exist, skipping", + constrName, RelationGetRelationName(rel)))); + table_close(conrel, RowExclusiveLock); + return; } - table_close(attrel, RowExclusiveLock); } /* - * For partitioned tables, non-CHECK, non-NOT-NULL inherited constraints - * are dropped via the dependency mechanism, so we're done here. + * For partitioned tables, non-CHECK inherited constraints are dropped via + * the dependency mechanism, so we're done here. */ - if (con->contype != CONSTRAINT_CHECK && - con->contype != CONSTRAINT_NOTNULL && + if (contype != CONSTRAINT_CHECK && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE) { table_close(conrel, RowExclusiveLock); - return conobj; + return; } /* @@ -13202,68 +12682,48 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha foreach_oid(childrelid, children) { Relation childrel; - HeapTuple tuple; - Form_pg_constraint childcon; - - if (list_member_oid(*readyRels, childrelid)) - continue; /* child already processed */ + HeapTuple copy_tuple; /* find_inheritance_children already got lock */ childrel = table_open(childrelid, NoLock); CheckTableNotInUse(childrel, "ALTER TABLE"); - /* - * We search for not-null constraints by column name, and others by - * constraint name. - */ - if (con->contype == CONSTRAINT_NOTNULL) - { - tuple = findNotNullConstraint(childrelid, colname); - if (!HeapTupleIsValid(tuple)) - elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\" of relation %u", - colname, RelationGetRelid(childrel)); - } - else - { - SysScanDesc scan; - ScanKeyData skey[3]; - - ScanKeyInit(&skey[0], - Anum_pg_constraint_conrelid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(childrelid)); - ScanKeyInit(&skey[1], - Anum_pg_constraint_contypid, - BTEqualStrategyNumber, F_OIDEQ, - ObjectIdGetDatum(InvalidOid)); - ScanKeyInit(&skey[2], - Anum_pg_constraint_conname, - BTEqualStrategyNumber, F_NAMEEQ, - CStringGetDatum(constrName)); - scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, - true, NULL, 3, skey); - /* There can only be one, so no need to loop */ - tuple = systable_getnext(scan); - if (!HeapTupleIsValid(tuple)) - ereport(ERROR, - (errcode(ERRCODE_UNDEFINED_OBJECT), - errmsg("constraint \"%s\" of relation \"%s\" does not exist", - constrName, - RelationGetRelationName(childrel)))); - tuple = heap_copytuple(tuple); - systable_endscan(scan); - } + ScanKeyInit(&skey[0], + Anum_pg_constraint_conrelid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(childrelid)); + ScanKeyInit(&skey[1], + Anum_pg_constraint_contypid, + BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(InvalidOid)); + ScanKeyInit(&skey[2], + Anum_pg_constraint_conname, + BTEqualStrategyNumber, F_NAMEEQ, + CStringGetDatum(constrName)); + scan = systable_beginscan(conrel, ConstraintRelidTypidNameIndexId, + true, NULL, 3, skey); + + /* There can be at most one matching row */ + if (!HeapTupleIsValid(tuple = systable_getnext(scan))) + ereport(ERROR, + (errcode(ERRCODE_UNDEFINED_OBJECT), + errmsg("constraint \"%s\" of relation \"%s\" does not exist", + constrName, + RelationGetRelationName(childrel)))); + + copy_tuple = heap_copytuple(tuple); + + systable_endscan(scan); - childcon = (Form_pg_constraint) GETSTRUCT(tuple); + con = (Form_pg_constraint) GETSTRUCT(copy_tuple); - /* Right now only CHECK and not-null constraints can be inherited */ - if (childcon->contype != CONSTRAINT_CHECK && - childcon->contype != CONSTRAINT_NOTNULL) - elog(ERROR, "inherited constraint is not a CHECK or not-null constraint"); + /* Right now only CHECK constraints can be inherited */ + if (con->contype != CONSTRAINT_CHECK) + elog(ERROR, "inherited constraint is not a CHECK constraint"); - if (childcon->coninhcount <= 0) /* shouldn't happen */ + if (con->coninhcount <= 0) /* shouldn't happen */ elog(ERROR, "relation %u has non-inherited constraint \"%s\"", - childrelid, NameStr(childcon->conname)); + childrelid, constrName); if (recurse) { @@ -13271,18 +12731,18 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha * If the child constraint has other definition sources, just * decrement its inheritance count; if not, recurse to delete it. */ - if (childcon->coninhcount == 1 && !childcon->conislocal) + if (con->coninhcount == 1 && !con->conislocal) { /* Time to delete this child constraint, too */ - dropconstraint_internal(childrel, tuple, behavior, - recurse, true, missing_ok, readyRels, - lockmode); + ATExecDropConstraint(childrel, constrName, behavior, + true, true, + false, lockmode); } else { /* Child constraint must survive my deletion */ - childcon->coninhcount--; - CatalogTupleUpdate(conrel, &tuple->t_self, tuple); + con->coninhcount--; + CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); /* Make update visible */ CommandCounterIncrement(); @@ -13291,91 +12751,25 @@ dropconstraint_internal(Relation rel, HeapTuple constraintTup, DropBehavior beha else { /* - * If we were told to drop ONLY in this table (no recursion) and - * there are no further parents for this constraint, we need to - * mark the inheritors' constraints as locally defined rather than - * inherited. + * If we were told to drop ONLY in this table (no recursion), we + * need to mark the inheritors' constraints as locally defined + * rather than inherited. */ - childcon->coninhcount--; - if (childcon->coninhcount == 0) - childcon->conislocal = true; + con->coninhcount--; + con->conislocal = true; - CatalogTupleUpdate(conrel, &tuple->t_self, tuple); + CatalogTupleUpdate(conrel, ©_tuple->t_self, copy_tuple); /* Make update visible */ CommandCounterIncrement(); } - heap_freetuple(tuple); + heap_freetuple(copy_tuple); table_close(childrel, NoLock); } - /* - * In addition, when dropping a primary key from a legacy-inheritance - * parent table, we must recurse to children to mark the corresponding NOT - * NULL constraint as no longer inherited, or drop it if this its last - * reference. - */ - if (con->contype == CONSTRAINT_PRIMARY && - rel->rd_rel->relkind == RELKIND_RELATION && - rel->rd_rel->relhassubclass) - { - List *colnames = NIL; - List *pkready = NIL; - - /* - * Because primary keys are always marked as NO INHERIT, we don't have - * a list of children yet, so obtain one now. - */ - children = find_inheritance_children(RelationGetRelid(rel), lockmode); - - /* - * Find out the list of column names to process. Fortunately, we - * already have the list of column numbers. - */ - foreach_int(attnum, unconstrained_cols) - { - colnames = lappend(colnames, get_attname(RelationGetRelid(rel), - attnum, false)); - } - - foreach_oid(childrelid, children) - { - Relation childrel; - - if (list_member_oid(pkready, childrelid)) - continue; /* child already processed */ - - /* find_inheritance_children already got lock */ - childrel = table_open(childrelid, NoLock); - CheckTableNotInUse(childrel, "ALTER TABLE"); - - foreach_ptr(char, colName, colnames) - { - HeapTuple contup; - - contup = findNotNullConstraint(childrelid, colName); - if (contup == NULL) - elog(ERROR, "cache lookup failed for not-null constraint on column \"%s\", relation \"%s\"", - colName, RelationGetRelationName(childrel)); - - dropconstraint_internal(childrel, contup, - DROP_RESTRICT, true, true, - false, &pkready, - lockmode); - pkready = NIL; - } - - table_close(childrel, NoLock); - - pkready = lappend_oid(pkready, childrelid); - } - } - table_close(conrel, RowExclusiveLock); - - return conobj; } /* @@ -14479,10 +13873,9 @@ ATPostAlterTypeCleanup(List **wqueue, AlteredTableInfo *tab, LOCKMODE lockmode) /* * If the constraint is inherited (only), we don't want to inject a - * new definition here; it'll get recreated when - * ATAddCheckNNConstraint recurses from adding the parent table's - * constraint. But we had to carry the info this far so that we can - * drop the constraint below. + * new definition here; it'll get recreated when ATAddCheckConstraint + * recurses from adding the parent table's constraint. But we had to + * carry the info this far so that we can drop the constraint below. */ if (!conislocal) continue; @@ -14729,19 +14122,15 @@ ATPostAlterTypeParse(Oid oldId, Oid oldRelId, Oid refRelId, char *cmd, NIL, con->conname); } - else if (cmd->subtype == AT_SetAttNotNull) + else if (cmd->subtype == AT_SetNotNull) { /* - * We see this subtype when a primary key is created - * internally, for example when it is replaced with a new - * constraint (say because one of the columns changes - * type); in this case we need to reinstate attnotnull, - * because it was removed because of the drop of the old - * PK. Schedule this subcommand to an upcoming AT pass. + * The parser will create AT_SetNotNull subcommands for + * columns of PRIMARY KEY indexes/constraints, but we need + * not do anything with them here, because the columns' + * NOT NULL marks will already have been propagated into + * the new table definition. */ - cmd->subtype = AT_SetAttNotNull; - tab->subcmds[AT_PASS_OLD_COL_ATTRS] = - lappend(tab->subcmds[AT_PASS_OLD_COL_ATTRS], cmd); } else elog(ERROR, "unexpected statement subtype: %d", @@ -16316,13 +15705,6 @@ ATExecAddInherit(Relation child_rel, RangeVar *parent, LOCKMODE lockmode) /* OK to create inheritance */ CreateInheritance(child_rel, parent_rel, false); - /* - * If parent_rel has a primary key, then child_rel has not-null - * constraints that make these columns as non nullable. Make those - * constraints as inherited. - */ - ATInheritAdjustNotNulls(parent_rel, child_rel, 1); - ObjectAddressSet(address, RelationRelationId, RelationGetRelid(parent_rel)); @@ -16501,24 +15883,14 @@ MergeAttributesIntoExisting(Relation child_rel, Relation parent_rel, bool ispart RelationGetRelationName(child_rel), parent_attname))); /* - * If the parent has a not-null constraint that's not NO INHERIT, - * make sure the child has one too. - * - * Other constraints are checked elsewhere. + * Check child doesn't discard NOT NULL property. (Other + * constraints are checked elsewhere.) */ if (parent_att->attnotnull && !child_att->attnotnull) - { - HeapTuple contup; - - contup = findNotNullConstraintAttnum(RelationGetRelid(parent_rel), - parent_att->attnum); - if (HeapTupleIsValid(contup) && - !((Form_pg_constraint) GETSTRUCT(contup))->connoinherit) - ereport(ERROR, - errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" in child table must be marked NOT NULL", - parent_attname)); - } + ereport(ERROR, + (errcode(ERRCODE_DATATYPE_MISMATCH), + errmsg("column \"%s\" in child table must be marked NOT NULL", + parent_attname))); /* * Child column must be generated if and only if parent column is. @@ -16619,8 +15991,7 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) HeapTuple child_tuple; bool found = false; - if (parent_con->contype != CONSTRAINT_CHECK && - parent_con->contype != CONSTRAINT_NOTNULL) + if (parent_con->contype != CONSTRAINT_CHECK) continue; /* if the parent's constraint is marked NO INHERIT, it's not inherited */ @@ -16640,50 +16011,21 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) Form_pg_constraint child_con = (Form_pg_constraint) GETSTRUCT(child_tuple); HeapTuple child_copy; - if (child_con->contype != parent_con->contype) + if (child_con->contype != CONSTRAINT_CHECK) continue; - /* - * CHECK constraint are matched by name, NOT NULL ones by - * attribute number - */ - if (child_con->contype == CONSTRAINT_CHECK) - { - if (strcmp(NameStr(parent_con->conname), - NameStr(child_con->conname)) != 0) - continue; - } - else if (child_con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber parent_attno = extractNotNullColumn(parent_tuple); - AttrNumber child_attno = extractNotNullColumn(child_tuple); - - if (strcmp(get_attname(parent_relid, parent_attno, false), - get_attname(RelationGetRelid(child_rel), child_attno, - false)) != 0) - continue; - } + if (strcmp(NameStr(parent_con->conname), + NameStr(child_con->conname)) != 0) + continue; - if (child_con->contype == CONSTRAINT_CHECK && - !constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel))) + if (!constraints_equivalent(parent_tuple, child_tuple, RelationGetDescr(constraintrel))) ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table \"%s\" has different definition for check constraint \"%s\"", RelationGetRelationName(child_rel), NameStr(parent_con->conname)))); - /* - * If the CHECK child constraint is "no inherit" then cannot - * merge. - * - * This is not desirable for not-null constraints, mostly because - * it breaks our pg_upgrade strategy, but it also makes sense on - * its own: if a child has its own not-null constraint and then - * acquires a parent with the same constraint, then we start to - * enforce that constraint for all the descendants of that child - * too, if any. - */ - if (child_con->contype == CONSTRAINT_CHECK && - child_con->connoinherit) + /* If the child constraint is "no inherit" then cannot merge */ + if (child_con->connoinherit) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("constraint \"%s\" conflicts with non-inherited constraint on child table \"%s\"", @@ -16710,27 +16052,6 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) ereport(ERROR, errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), errmsg("too many inheritance parents")); - if (child_con->contype == CONSTRAINT_NOTNULL && - child_con->connoinherit) - { - /* - * If the child has children, it's not possible to turn a NO - * INHERIT constraint into an inheritable one: we would need - * to recurse to create constraints in those children, but - * this is not a good place to do that. - */ - if (child_rel->rd_rel->relhassubclass) - ereport(ERROR, - errmsg("cannot add NOT NULL constraint to column \"%s\" of relation \"%s\" with inheritance children", - get_attname(RelationGetRelid(child_rel), - extractNotNullColumn(child_tuple), - false), - RelationGetRelationName(child_rel)), - errdetail("Existing constraint \"%s\" is marked NO INHERIT.", - NameStr(child_con->conname))); - - child_con->connoinherit = false; - } /* * In case of partitions, an inherited constraint must be @@ -16753,20 +16074,10 @@ MergeConstraintsIntoExisting(Relation child_rel, Relation parent_rel) systable_endscan(child_scan); if (!found) - { - if (parent_con->contype == CONSTRAINT_NOTNULL) - ereport(ERROR, - errcode(ERRCODE_DATATYPE_MISMATCH), - errmsg("column \"%s\" in child table must be marked NOT NULL", - get_attname(parent_relid, - extractNotNullColumn(parent_tuple), - false))); - ereport(ERROR, (errcode(ERRCODE_DATATYPE_MISMATCH), errmsg("child table is missing constraint \"%s\"", NameStr(parent_con->conname)))); - } } systable_endscan(parent_scan); @@ -16804,18 +16115,6 @@ ATExecDropInherit(Relation rel, RangeVar *parent, LOCKMODE lockmode) /* Off to RemoveInheritance() where most of the work happens */ RemoveInheritance(rel, parent_rel, false); - /* - * If parent_rel has a primary key, then child_rel has not-null - * constraints that make these columns as non nullable. Mark those - * constraints as no longer inherited by this parent. - */ - ATInheritAdjustNotNulls(parent_rel, rel, -1); - - /* - * If the parent has a primary key, then we decrement counts for all NOT - * NULL constraints - */ - ObjectAddressSet(address, RelationRelationId, RelationGetRelid(parent_rel)); @@ -16924,7 +16223,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) HeapTuple attributeTuple, constraintTuple; List *connames; - List *nncolumns; bool found; bool is_partitioning; @@ -16993,8 +16291,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) * this, we first need a list of the names of the parent's check * constraints. (We cheat a bit by only checking for name matches, * assuming that the expressions will match.) - * - * For NOT NULL columns, we store column numbers to match. */ catalogRelation = table_open(ConstraintRelationId, RowExclusiveLock); ScanKeyInit(&key[0], @@ -17005,7 +16301,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) true, NULL, 1, key); connames = NIL; - nncolumns = NIL; while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { @@ -17013,8 +16308,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) if (con->contype == CONSTRAINT_CHECK) connames = lappend(connames, pstrdup(NameStr(con->conname))); - if (con->contype == CONSTRAINT_NOTNULL) - nncolumns = lappend_int(nncolumns, extractNotNullColumn(constraintTuple)); } systable_endscan(scan); @@ -17030,40 +16323,20 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) while (HeapTupleIsValid(constraintTuple = systable_getnext(scan))) { Form_pg_constraint con = (Form_pg_constraint) GETSTRUCT(constraintTuple); - bool match = false; - ListCell *lc; + bool match; - /* - * Match CHECK constraints by name, not-null constraints by column - * number, and ignore all others. - */ - if (con->contype == CONSTRAINT_CHECK) - { - foreach(lc, connames) - { - if (con->contype == CONSTRAINT_CHECK && - strcmp(NameStr(con->conname), (char *) lfirst(lc)) == 0) - { - match = true; - break; - } - } - } - else if (con->contype == CONSTRAINT_NOTNULL) - { - AttrNumber child_attno = extractNotNullColumn(constraintTuple); + if (con->contype != CONSTRAINT_CHECK) + continue; - foreach(lc, nncolumns) + match = false; + foreach_ptr(char, chkname, connames) + { + if (strcmp(NameStr(con->conname), chkname) == 0) { - if (lfirst_int(lc) == child_attno) - { - match = true; - break; - } + match = true; + break; } } - else - continue; if (match) { @@ -17103,54 +16376,6 @@ RemoveInheritance(Relation child_rel, Relation parent_rel, bool expect_detached) } /* - * Adjust coninhcount of not-null constraints upwards or downwards when a - * table is marked as inheriting or no longer doing so a table with a primary - * key. - * - * Note: these constraints are not dropped, even if their inhcount goes to zero - * and conislocal is false. Instead we mark the constraints as locally defined. - * This is seen as more useful behavior, with no downsides. The user can always - * drop them afterwards. - */ -static void -ATInheritAdjustNotNulls(Relation parent_rel, Relation child_rel, int inhcount) -{ - Bitmapset *pkattnos; - - /* Quick exit when parent has no PK */ - if (!parent_rel->rd_rel->relhasindex) - return; - - pkattnos = RelationGetIndexAttrBitmap(parent_rel, - INDEX_ATTR_BITMAP_PRIMARY_KEY); - if (pkattnos != NULL) - { - Bitmapset *childattnums = NULL; - AttrMap *attmap; - int i; - - attmap = build_attrmap_by_name(RelationGetDescr(parent_rel), - RelationGetDescr(child_rel), true); - - i = -1; - while ((i = bms_next_member(pkattnos, i)) >= 0) - { - childattnums = bms_add_member(childattnums, - attmap->attnums[i + FirstLowInvalidHeapAttributeNumber - 1]); - } - - /* - * CCI is needed in case there's a NOT NULL PRIMARY KEY column in the - * parent: the relevant not-null constraint in the child already had - * its inhcount modified earlier. - */ - CommandCounterIncrement(); - AdjustNotNullInheritance(RelationGetRelid(child_rel), childattnums, - inhcount); - } -} - -/* * Drop the dependency created by StoreCatalogInheritance1 (CREATE TABLE * INHERITS/ALTER TABLE INHERIT -- refclassid will be RelationRelationId) or * heap_create_with_catalog (CREATE TABLE OF/ALTER TABLE OF -- refclassid will @@ -19557,10 +18782,9 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) attachInfos = palloc(sizeof(IndexInfo *) * list_length(attachRelIdxs)); /* Build arrays of all existing indexes and their IndexInfos */ - foreach(cell, attachRelIdxs) + foreach_oid(cldIdxId, attachRelIdxs) { - Oid cldIdxId = lfirst_oid(cell); - int i = foreach_current_index(cell); + int i = foreach_current_index(cldIdxId); attachrelIdxRels[i] = index_open(cldIdxId, AccessShareLock); attachInfos[i] = BuildIndexInfo(attachrelIdxRels[i]); @@ -19694,28 +18918,6 @@ AttachPartitionEnsureIndexes(List **wqueue, Relation rel, Relation attachrel) stmt = generateClonedIndexStmt(NULL, idxRel, attmap, &conOid); - - /* - * If the index is a primary key, mark all columns as NOT NULL if - * they aren't already. - */ - if (stmt->primary) - { - MemoryContextSwitchTo(oldcxt); - for (int j = 0; j < info->ii_NumIndexKeyAttrs; j++) - { - AttrNumber childattno; - - childattno = get_attnum(RelationGetRelid(attachrel), - get_attname(RelationGetRelid(rel), - info->ii_IndexAttrNumbers[j], - false)); - set_attnotnull(wqueue, attachrel, childattno, - true, AccessExclusiveLock); - } - MemoryContextSwitchTo(cxt); - } - DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, RelationGetRelid(idxRel), conOid, @@ -20338,7 +19540,7 @@ ATExecDetachPartitionFinalize(Relation rel, RangeVar *name) * DetachAddConstraintIfNeeded * Subroutine for ATExecDetachPartition. Create a constraint that * takes the place of the partition constraint, but avoid creating - * a dupe if a constraint already exists which implies the needed + * a dupe if an constraint already exists which implies the needed * constraint. */ static void @@ -20371,8 +19573,8 @@ DetachAddConstraintIfNeeded(List **wqueue, Relation partRel) n->initially_valid = true; n->skip_validation = true; /* It's a re-add, since it nominally already exists */ - ATAddCheckNNConstraint(wqueue, tab, partRel, n, - true, false, true, ShareUpdateExclusiveLock); + ATAddCheckConstraint(wqueue, tab, partRel, n, + true, false, true, ShareUpdateExclusiveLock); } } @@ -20641,13 +19843,6 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) RelationGetRelationName(partIdx)))); } - /* - * If it's a primary key, make sure the columns in the partition are - * NOT NULL. - */ - if (parentIdx->rd_index->indisprimary) - verifyPartitionIndexNotNull(childInfo, partTbl); - /* All good -- do it */ IndexSetParentIndex(partIdx, RelationGetRelid(parentIdx)); if (OidIsValid(constraintOid)) @@ -20792,29 +19987,6 @@ validatePartitionedIndex(Relation partedIdx, Relation partedTbl) } /* - * When attaching an index as a partition of a partitioned index which is a - * primary key, verify that all the columns in the partition are marked NOT - * NULL. - */ -static void -verifyPartitionIndexNotNull(IndexInfo *iinfo, Relation partition) -{ - for (int i = 0; i < iinfo->ii_NumIndexKeyAttrs; i++) - { - Form_pg_attribute att = TupleDescAttr(RelationGetDescr(partition), - iinfo->ii_IndexAttrNumbers[i] - 1); - - if (!att->attnotnull) - ereport(ERROR, - errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("invalid primary key definition"), - errdetail("Column \"%s\" of relation \"%s\" is not marked NOT NULL.", - NameStr(att->attname), - RelationGetRelationName(partition))); - } -} - -/* * Return an OID list of constraints that reference the given relation * that are marked as having a parent constraints. */ |