aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_constraint.c237
-rw-r--r--src/backend/commands/tablecmds.c193
-rw-r--r--src/backend/parser/parse_utilcmd.c12
-rw-r--r--src/backend/utils/adt/ri_triggers.c59
4 files changed, 431 insertions, 70 deletions
diff --git a/src/backend/catalog/pg_constraint.c b/src/backend/catalog/pg_constraint.c
index 4f1a27a7d34..153522782d4 100644
--- a/src/backend/catalog/pg_constraint.c
+++ b/src/backend/catalog/pg_constraint.c
@@ -26,6 +26,7 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/defrem.h"
+#include "commands/tablecmds.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
@@ -377,6 +378,242 @@ CreateConstraintEntry(const char *constraintName,
return conOid;
}
+/*
+ * CloneForeignKeyConstraints
+ * Clone foreign keys from a partitioned table to a newly acquired
+ * partition.
+ *
+ * relationId is a partition of parentId, so we can be certain that it has the
+ * same columns with the same datatypes. The columns may be in different
+ * order, though.
+ *
+ * The *cloned list is appended ClonedConstraint elements describing what was
+ * created.
+ */
+void
+CloneForeignKeyConstraints(Oid parentId, Oid relationId, List **cloned)
+{
+ Relation pg_constraint;
+ Relation parentRel;
+ Relation rel;
+ ScanKeyData key;
+ SysScanDesc scan;
+ TupleDesc tupdesc;
+ HeapTuple tuple;
+ AttrNumber *attmap;
+
+ parentRel = heap_open(parentId, NoLock); /* already got lock */
+ /* see ATAddForeignKeyConstraint about lock level */
+ rel = heap_open(relationId, AccessExclusiveLock);
+
+ pg_constraint = heap_open(ConstraintRelationId, RowShareLock);
+ tupdesc = RelationGetDescr(pg_constraint);
+
+ /*
+ * The constraint key may differ, if the columns in the partition are
+ * different. This map is used to convert them.
+ */
+ attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
+ RelationGetDescr(parentRel),
+ gettext_noop("could not convert row type"));
+
+ ScanKeyInit(&key,
+ Anum_pg_constraint_conrelid, BTEqualStrategyNumber,
+ F_OIDEQ, ObjectIdGetDatum(parentId));
+ scan = systable_beginscan(pg_constraint, ConstraintRelidIndexId, true,
+ NULL, 1, &key);
+
+ while ((tuple = systable_getnext(scan)) != NULL)
+ {
+ Form_pg_constraint constrForm = (Form_pg_constraint) GETSTRUCT(tuple);
+ AttrNumber conkey[INDEX_MAX_KEYS];
+ AttrNumber mapped_conkey[INDEX_MAX_KEYS];
+ AttrNumber confkey[INDEX_MAX_KEYS];
+ Oid conpfeqop[INDEX_MAX_KEYS];
+ Oid conppeqop[INDEX_MAX_KEYS];
+ Oid conffeqop[INDEX_MAX_KEYS];
+ Constraint *fkconstraint;
+ ClonedConstraint *newc;
+ Oid constrOid;
+ ObjectAddress parentAddr,
+ childAddr;
+ int nelem;
+ int i;
+ ArrayType *arr;
+ Datum datum;
+ bool isnull;
+
+ /* only foreign keys */
+ if (constrForm->contype != CONSTRAINT_FOREIGN)
+ continue;
+
+ ObjectAddressSet(parentAddr, ConstraintRelationId,
+ HeapTupleGetOid(tuple));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conkey,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conkey");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "conkey is not a 1-D smallint array");
+ memcpy(conkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+ for (i = 0; i < nelem; i++)
+ mapped_conkey[i] = attmap[conkey[i] - 1];
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_confkey,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null confkey");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != INT2OID)
+ elog(ERROR, "confkey is not a 1-D smallint array");
+ memcpy(confkey, ARR_DATA_PTR(arr), nelem * sizeof(AttrNumber));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conpfeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conpfeqop is not a 1-D OID array");
+ memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conpfeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conpfeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conpfeqop is not a 1-D OID array");
+ memcpy(conpfeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conppeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conppeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conppeqop is not a 1-D OID array");
+ memcpy(conppeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ datum = fastgetattr(tuple, Anum_pg_constraint_conffeqop,
+ tupdesc, &isnull);
+ if (isnull)
+ elog(ERROR, "null conffeqop");
+ arr = DatumGetArrayTypeP(datum);
+ nelem = ARR_DIMS(arr)[0];
+ if (ARR_NDIM(arr) != 1 ||
+ nelem < 1 ||
+ nelem > INDEX_MAX_KEYS ||
+ ARR_HASNULL(arr) ||
+ ARR_ELEMTYPE(arr) != OIDOID)
+ elog(ERROR, "conffeqop is not a 1-D OID array");
+ memcpy(conffeqop, ARR_DATA_PTR(arr), nelem * sizeof(Oid));
+
+ constrOid =
+ CreateConstraintEntry(NameStr(constrForm->conname),
+ constrForm->connamespace,
+ CONSTRAINT_FOREIGN,
+ constrForm->condeferrable,
+ constrForm->condeferred,
+ constrForm->convalidated,
+ HeapTupleGetOid(tuple),
+ relationId,
+ mapped_conkey,
+ nelem,
+ InvalidOid, /* not a domain constraint */
+ constrForm->conindid, /* same index */
+ constrForm->confrelid, /* same foreign rel */
+ confkey,
+ conpfeqop,
+ conppeqop,
+ conffeqop,
+ nelem,
+ constrForm->confupdtype,
+ constrForm->confdeltype,
+ constrForm->confmatchtype,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ false,
+ 1, false, true);
+
+ ObjectAddressSet(childAddr, ConstraintRelationId, constrOid);
+ recordDependencyOn(&childAddr, &parentAddr, DEPENDENCY_INTERNAL_AUTO);
+
+ fkconstraint = makeNode(Constraint);
+ /* for now this is all we need */
+ fkconstraint->fk_upd_action = constrForm->confupdtype;
+ fkconstraint->fk_del_action = constrForm->confdeltype;
+ fkconstraint->deferrable = constrForm->condeferrable;
+ fkconstraint->initdeferred = constrForm->condeferred;
+
+ createForeignKeyTriggers(rel, constrForm->confrelid, fkconstraint,
+ constrOid, constrForm->conindid, false);
+
+ if (cloned)
+ {
+ /*
+ * Feed back caller about the constraints we created, so that they can
+ * set up constraint verification.
+ */
+ newc = palloc(sizeof(ClonedConstraint));
+ newc->relid = relationId;
+ newc->refrelid = constrForm->confrelid;
+ newc->conindid = constrForm->conindid;
+ newc->conid = constrOid;
+ newc->constraint = fkconstraint;
+
+ *cloned = lappend(*cloned, newc);
+ }
+ }
+ systable_endscan(scan);
+
+ pfree(attmap);
+
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc partdesc = RelationGetPartitionDesc(rel);
+ int i;
+
+ for (i = 0; i < partdesc->nparts; i++)
+ CloneForeignKeyConstraints(RelationGetRelid(rel),
+ partdesc->oids[i],
+ cloned);
+ }
+
+ heap_close(rel, NoLock); /* keep lock till commit */
+ heap_close(parentRel, NoLock);
+ heap_close(pg_constraint, RowShareLock);
+}
/*
* Test whether given name is currently used as a constraint name
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index c0f987cc814..ec2f9616edd 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -338,9 +338,6 @@ static void validateCheckConstraint(Relation rel, HeapTuple constrtup);
static void validateForeignKeyConstraint(char *conname,
Relation rel, Relation pkrel,
Oid pkindOid, Oid constraintOid);
-static void createForeignKeyTriggers(Relation rel, Oid refRelOid,
- Constraint *fkconstraint,
- Oid constraintOid, Oid indexOid);
static void ATController(AlterTableStmt *parsetree,
Relation rel, List *cmds, bool recurse, LOCKMODE lockmode);
static void ATPrepCmd(List **wqueue, Relation rel, AlterTableCmd *cmd,
@@ -411,8 +408,10 @@ static ObjectAddress ATAddCheckConstraint(List **wqueue,
Constraint *constr,
bool recurse, bool recursing, bool is_readd,
LOCKMODE lockmode);
-static ObjectAddress ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
- Constraint *fkconstraint, LOCKMODE lockmode);
+static ObjectAddress ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab,
+ Relation rel, Constraint *fkconstraint, Oid parentConstr,
+ bool recurse, bool recursing,
+ LOCKMODE lockmode);
static void ATExecDropConstraint(Relation rel, const char *constrName,
DropBehavior behavior,
bool recurse, bool recursing,
@@ -505,6 +504,7 @@ static void refuseDupeIndexAttach(Relation parentIdx, Relation partIdx,
* relkind: relkind to assign to the new relation
* ownerId: if not InvalidOid, use this as the new relation's owner.
* typaddress: if not null, it's set to the pg_type entry's address.
+ * queryString: for error reporting
*
* Note that permissions checks are done against current user regardless of
* ownerId. A nonzero ownerId is used when someone is creating a relation
@@ -908,8 +908,8 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
}
/*
- * If we're creating a partition, create now all the indexes and triggers
- * defined in the parent.
+ * If we're creating a partition, create now all the indexes, triggers,
+ * FKs defined in the parent.
*
* We can't do it earlier, because DefineIndex wants to know the partition
* key which we just stored.
@@ -961,6 +961,12 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
if (parent->trigdesc != NULL)
CloneRowTriggersToPartition(parent, rel);
+ /*
+ * And foreign keys too. Note that because we're freshly creating the
+ * table, there is no need to verify these new constraints.
+ */
+ CloneForeignKeyConstraints(parentId, relationId, NULL);
+
heap_close(parent, NoLock);
}
@@ -7025,7 +7031,9 @@ ATExecAddConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
RelationGetNamespace(rel),
NIL);
- address = ATAddForeignKeyConstraint(tab, rel, newConstraint,
+ address = ATAddForeignKeyConstraint(wqueue, tab, rel,
+ newConstraint, InvalidOid,
+ recurse, false,
lockmode);
break;
@@ -7180,8 +7188,9 @@ ATAddCheckConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
* We do permissions checks here, however.
*/
static ObjectAddress
-ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
- Constraint *fkconstraint, LOCKMODE lockmode)
+ATAddForeignKeyConstraint(List **wqueue, AlteredTableInfo *tab, Relation rel,
+ Constraint *fkconstraint, Oid parentConstr,
+ bool recurse, bool recursing, LOCKMODE lockmode)
{
Relation pkrel;
int16 pkattnum[INDEX_MAX_KEYS];
@@ -7220,6 +7229,21 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
errmsg("cannot reference partitioned table \"%s\"",
RelationGetRelationName(pkrel))));
+ if (rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ if (!recurse)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("foreign key referencing partitioned table \"%s\" must not be ONLY",
+ RelationGetRelationName(pkrel))));
+ if (fkconstraint->skip_validation && !fkconstraint->initially_valid)
+ ereport(ERROR,
+ (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+ errmsg("cannot add NOT VALID foreign key to relation \"%s\"",
+ RelationGetRelationName(pkrel)),
+ errdetail("This feature is not yet supported on partitioned tables.")));
+ }
+
if (pkrel->rd_rel->relkind != RELKIND_RELATION)
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
@@ -7527,7 +7551,7 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
fkconstraint->deferrable,
fkconstraint->initdeferred,
fkconstraint->initially_valid,
- InvalidOid, /* no parent constraint */
+ parentConstr,
RelationGetRelid(rel),
fkattnum,
numfks,
@@ -7553,10 +7577,12 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
ObjectAddressSet(address, ConstraintRelationId, constrOid);
/*
- * Create the triggers that will enforce the constraint.
+ * Create the triggers that will enforce the constraint. We only want
+ * the action triggers to appear for the parent partitioned relation,
+ * even though the constraints also exist below.
*/
createForeignKeyTriggers(rel, RelationGetRelid(pkrel), fkconstraint,
- constrOid, indexOid);
+ constrOid, indexOid, !recursing);
/*
* Tell Phase 3 to check that the constraint is satisfied by existing
@@ -7581,6 +7607,40 @@ ATAddForeignKeyConstraint(AlteredTableInfo *tab, Relation rel,
}
/*
+ * When called on a partitioned table, recurse to create the constraint on
+ * the partitions also.
+ */
+ if (recurse && rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
+ {
+ PartitionDesc partdesc;
+
+ partdesc = RelationGetPartitionDesc(rel);
+
+ for (i = 0; i < partdesc->nparts; i++)
+ {
+ Oid partitionId = partdesc->oids[i];
+ Relation partition = heap_open(partitionId, lockmode);
+ AlteredTableInfo *childtab;
+ ObjectAddress childAddr;
+
+ CheckTableNotInUse(partition, "ALTER TABLE");
+
+ /* Find or create work queue entry for this table */
+ childtab = ATGetQueueEntry(wqueue, partition);
+
+ childAddr =
+ ATAddForeignKeyConstraint(wqueue, childtab, partition,
+ fkconstraint, constrOid,
+ recurse, true, lockmode);
+
+ /* Record this constraint as dependent on the parent one */
+ recordDependencyOn(&childAddr, &address, DEPENDENCY_INTERNAL_AUTO);
+
+ heap_close(partition, NoLock);
+ }
+ }
+
+ /*
* Close pk table, but keep lock until we've committed.
*/
heap_close(pkrel, NoLock);
@@ -7842,8 +7902,8 @@ ATExecValidateConstraint(Relation rel, char *constrName, bool recurse,
heap_close(refrel, NoLock);
/*
- * Foreign keys do not inherit, so we purposely ignore the
- * recursion bit here
+ * We disallow creating invalid foreign keys to or from
+ * partitioned tables, so ignoring the recursion bit is okay.
*/
}
else if (con->contype == CONSTRAINT_CHECK)
@@ -8489,23 +8549,16 @@ CreateFKCheckTrigger(Oid myRelOid, Oid refRelOid, Constraint *fkconstraint,
}
/*
- * Create the triggers that implement an FK constraint.
- *
- * NB: if you change any trigger properties here, see also
- * ATExecAlterConstraint.
+ * createForeignKeyActionTriggers
+ * Create the referenced-side "action" triggers that implement a foreign
+ * key.
*/
static void
-createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
- Oid constraintOid, Oid indexOid)
+createForeignKeyActionTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
+ Oid constraintOid, Oid indexOid)
{
- Oid myRelOid;
CreateTrigStmt *fk_trigger;
- myRelOid = RelationGetRelid(rel);
-
- /* Make changes-so-far visible */
- CommandCounterIncrement();
-
/*
* Build and execute a CREATE CONSTRAINT TRIGGER statement for the ON
* DELETE action on the referenced table.
@@ -8555,7 +8608,8 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
- (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+ (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
+ constraintOid,
indexOid, InvalidOid, InvalidOid, NULL, true, false);
/* Make changes-so-far visible */
@@ -8610,16 +8664,21 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
}
fk_trigger->args = NIL;
- (void) CreateTrigger(fk_trigger, NULL, refRelOid, myRelOid, constraintOid,
+ (void) CreateTrigger(fk_trigger, NULL, refRelOid, RelationGetRelid(rel),
+ constraintOid,
indexOid, InvalidOid, InvalidOid, NULL, true, false);
+}
- /* Make changes-so-far visible */
- CommandCounterIncrement();
-
- /*
- * Build and execute CREATE CONSTRAINT TRIGGER statements for the CHECK
- * action for both INSERTs and UPDATEs on the referencing table.
- */
+/*
+ * createForeignKeyCheckTriggers
+ * Create the referencing-side "check" triggers that implement a foreign
+ * key.
+ */
+static void
+createForeignKeyCheckTriggers(Oid myRelOid, Oid refRelOid,
+ Constraint *fkconstraint, Oid constraintOid,
+ Oid indexOid)
+{
CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
indexOid, true);
CreateFKCheckTrigger(myRelOid, refRelOid, fkconstraint, constraintOid,
@@ -8627,6 +8686,37 @@ createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
}
/*
+ * Create the triggers that implement an FK constraint.
+ *
+ * NB: if you change any trigger properties here, see also
+ * ATExecAlterConstraint.
+ */
+void
+createForeignKeyTriggers(Relation rel, Oid refRelOid, Constraint *fkconstraint,
+ Oid constraintOid, Oid indexOid, bool create_action)
+{
+ /*
+ * For the referenced side, create action triggers, if requested. (If the
+ * referencing side is partitioned, there is still only one trigger, which
+ * runs on the referenced side and points to the top of the referencing
+ * hierarchy.)
+ */
+ if (create_action)
+ createForeignKeyActionTriggers(rel, refRelOid, fkconstraint, constraintOid,
+ indexOid);
+
+ /*
+ * For the referencing side, create the check triggers. We only need these
+ * on the partitions.
+ */
+ if (rel->rd_rel->relkind != RELKIND_PARTITIONED_TABLE)
+ createForeignKeyCheckTriggers(RelationGetRelid(rel), refRelOid,
+ fkconstraint, constraintOid, indexOid);
+
+ CommandCounterIncrement();
+}
+
+/*
* ALTER TABLE DROP CONSTRAINT
*
* Like DROP COLUMN, we can't use the normal ALTER TABLE recursion mechanism.
@@ -13889,6 +13979,8 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
bool found_whole_row;
Oid defaultPartOid;
List *partBoundConstraint;
+ List *cloned;
+ ListCell *l;
/*
* We must lock the default partition, because attaching a new partition
@@ -14072,6 +14164,35 @@ ATExecAttachPartition(List **wqueue, Relation rel, PartitionCmd *cmd)
CloneRowTriggersToPartition(rel, attachrel);
/*
+ * Clone foreign key constraints, and setup for Phase 3 to verify them.
+ */
+ cloned = NIL;
+ CloneForeignKeyConstraints(RelationGetRelid(rel),
+ RelationGetRelid(attachrel), &cloned);
+ foreach(l, cloned)
+ {
+ ClonedConstraint *cloned = lfirst(l);
+ NewConstraint *newcon;
+ Relation clonedrel;
+ AlteredTableInfo *parttab;
+
+ clonedrel = relation_open(cloned->relid, NoLock);
+ parttab = ATGetQueueEntry(wqueue, clonedrel);
+
+ newcon = (NewConstraint *) palloc0(sizeof(NewConstraint));
+ newcon->name = cloned->constraint->conname;
+ newcon->contype = CONSTR_FOREIGN;
+ newcon->refrelid = cloned->refrelid;
+ newcon->refindid = cloned->conindid;
+ newcon->conid = cloned->conid;
+ newcon->qual = (Node *) cloned->constraint;
+
+ parttab->constraints = lappend(parttab->constraints, newcon);
+
+ relation_close(clonedrel, NoLock);
+ }
+
+ /*
* Generate partition constraint from the partition bound specification.
* If the parent itself is a partition, make sure to include its
* constraint as well.
diff --git a/src/backend/parser/parse_utilcmd.c b/src/backend/parser/parse_utilcmd.c
index 0fd14f43c6b..513a5dda262 100644
--- a/src/backend/parser/parse_utilcmd.c
+++ b/src/backend/parser/parse_utilcmd.c
@@ -749,12 +749,6 @@ transformColumnDefinition(CreateStmtContext *cxt, ColumnDef *column)
errmsg("foreign key constraints are not supported on foreign tables"),
parser_errposition(cxt->pstate,
constraint->location)));
- if (cxt->ispartitioned)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("foreign key constraints are not supported on partitioned tables"),
- parser_errposition(cxt->pstate,
- constraint->location)));
/*
* Fill in the current attribute's name and throw it into the
@@ -868,12 +862,6 @@ transformTableConstraint(CreateStmtContext *cxt, Constraint *constraint)
errmsg("foreign key constraints are not supported on foreign tables"),
parser_errposition(cxt->pstate,
constraint->location)));
- if (cxt->ispartitioned)
- ereport(ERROR,
- (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("foreign key constraints are not supported on partitioned tables"),
- parser_errposition(cxt->pstate,
- constraint->location)));
cxt->fkconstraints = lappend(cxt->fkconstraints, constraint);
break;
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index d0fe65cea9b..90ddbe55167 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -788,20 +788,23 @@ ri_restrict(TriggerData *trigdata, bool is_no_action)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
+ const char *fk_only;
int i;
/* ----------
* The query string built is
- * SELECT 1 FROM ONLY <fktable> x WHERE $1 = fkatt1 [AND ...]
+ * SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = fkatt1 [AND ...]
* FOR KEY SHARE OF x
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
* ----------
*/
initStringInfo(&querybuf);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
- appendStringInfo(&querybuf, "SELECT 1 FROM ONLY %s x",
- fkrelname);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ fk_only, fkrelname);
querysep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
{
@@ -947,17 +950,21 @@ RI_FKey_cascade_del(PG_FUNCTION_ARGS)
char paramname[16];
const char *querysep;
Oid queryoids[RI_MAX_NUMKEYS];
+ const char *fk_only;
/* ----------
* The query string built is
- * DELETE FROM ONLY <fktable> WHERE $1 = fkatt1 [AND ...]
+ * DELETE FROM [ONLY] <fktable> WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
* ----------
*/
initStringInfo(&querybuf);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
- appendStringInfo(&querybuf, "DELETE FROM ONLY %s", fkrelname);
+ appendStringInfo(&querybuf, "DELETE FROM %s%s",
+ fk_only, fkrelname);
querysep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
{
@@ -1118,10 +1125,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
const char *querysep;
const char *qualsep;
Oid queryoids[RI_MAX_NUMKEYS * 2];
+ const char *fk_only;
/* ----------
* The query string built is
- * UPDATE ONLY <fktable> SET fkatt1 = $1 [, ...]
+ * UPDATE [ONLY] <fktable> SET fkatt1 = $1 [, ...]
* WHERE $n = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes. Note that we are assuming
@@ -1131,8 +1139,11 @@ RI_FKey_cascade_upd(PG_FUNCTION_ARGS)
*/
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
- appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+ appendStringInfo(&querybuf, "UPDATE %s%s SET",
+ fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0, j = riinfo->nkeys; i < riinfo->nkeys; i++, j++)
@@ -1337,11 +1348,12 @@ ri_setnull(TriggerData *trigdata)
char paramname[16];
const char *querysep;
const char *qualsep;
+ const char *fk_only;
Oid queryoids[RI_MAX_NUMKEYS];
/* ----------
* The query string built is
- * UPDATE ONLY <fktable> SET fkatt1 = NULL [, ...]
+ * UPDATE [ONLY] <fktable> SET fkatt1 = NULL [, ...]
* WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
@@ -1349,8 +1361,11 @@ ri_setnull(TriggerData *trigdata)
*/
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
quoteRelationName(fkrelname, fk_rel);
- appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+ appendStringInfo(&querybuf, "UPDATE %s%s SET",
+ fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
@@ -1554,11 +1569,12 @@ ri_setdefault(TriggerData *trigdata)
const char *querysep;
const char *qualsep;
Oid queryoids[RI_MAX_NUMKEYS];
+ const char *fk_only;
int i;
/* ----------
* The query string built is
- * UPDATE ONLY <fktable> SET fkatt1 = DEFAULT [, ...]
+ * UPDATE [ONLY] <fktable> SET fkatt1 = DEFAULT [, ...]
* WHERE $1 = fkatt1 [AND ...]
* The type id's for the $ parameters are those of the
* corresponding PK attributes.
@@ -1567,7 +1583,10 @@ ri_setdefault(TriggerData *trigdata)
initStringInfo(&querybuf);
initStringInfo(&qualbuf);
quoteRelationName(fkrelname, fk_rel);
- appendStringInfo(&querybuf, "UPDATE ONLY %s SET", fkrelname);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
+ appendStringInfo(&querybuf, "UPDATE %s%s SET",
+ fk_only, fkrelname);
querysep = "";
qualsep = "WHERE";
for (i = 0; i < riinfo->nkeys; i++)
@@ -1838,6 +1857,7 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
RangeTblEntry *pkrte;
RangeTblEntry *fkrte;
const char *sep;
+ const char *fk_only;
int i;
int save_nestlevel;
char workmembuf[32];
@@ -1894,8 +1914,8 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
/*----------
* The query string built is:
- * SELECT fk.keycols FROM ONLY relname fk
- * LEFT OUTER JOIN ONLY pkrelname pk
+ * SELECT fk.keycols FROM [ONLY] relname fk
+ * LEFT OUTER JOIN pkrelname pk
* ON (pk.pkkeycol1=fk.keycol1 [AND ...])
* WHERE pk.pkkeycol1 IS NULL AND
* For MATCH SIMPLE:
@@ -1920,9 +1940,11 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
quoteRelationName(pkrelname, pk_rel);
quoteRelationName(fkrelname, fk_rel);
+ fk_only = fk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
appendStringInfo(&querybuf,
- " FROM ONLY %s fk LEFT OUTER JOIN ONLY %s pk ON",
- fkrelname, pkrelname);
+ " FROM %s%s fk LEFT OUTER JOIN %s pk ON",
+ fk_only, fkrelname, pkrelname);
strcpy(pkattname, "pk.");
strcpy(fkattname, "fk.");
@@ -2298,13 +2320,6 @@ ri_FetchConstraintInfo(Trigger *trigger, Relation trig_rel, bool rel_is_pk)
elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
trigger->tgname, RelationGetRelationName(trig_rel));
}
- else
- {
- if (riinfo->fk_relid != RelationGetRelid(trig_rel) ||
- riinfo->pk_relid != trigger->tgconstrrelid)
- elog(ERROR, "wrong pg_constraint entry for trigger \"%s\" on table \"%s\"",
- trigger->tgname, RelationGetRelationName(trig_rel));
- }
return riinfo;
}