diff options
Diffstat (limited to 'src/backend/utils/adt/ri_triggers.c')
-rw-r--r-- | src/backend/utils/adt/ri_triggers.c | 94 |
1 files changed, 92 insertions, 2 deletions
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c index 0d8b53d1b75..3d9985b17c2 100644 --- a/src/backend/utils/adt/ri_triggers.c +++ b/src/backend/utils/adt/ri_triggers.c @@ -129,6 +129,7 @@ typedef struct RI_ConstraintInfo Oid ff_eq_oprs[RI_MAX_NUMKEYS]; /* equality operators (FK = FK) */ Oid period_contained_by_oper; /* anyrange <@ anyrange */ Oid agged_period_contained_by_oper; /* fkattr <@ range_agg(pkattr) */ + Oid period_intersect_oper; /* anyrange * anyrange */ dlist_node valid_link; /* Link in list of valid entries */ } RI_ConstraintInfo; @@ -734,8 +735,11 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) * not do anything. However, this check should only be made in the NO * ACTION case; in RESTRICT cases we don't wish to allow another row to be * substituted. + * + * If the foreign key has PERIOD, we incorporate looking for replacement + * rows in the main SQL query below, so we needn't do it here. */ - if (is_no_action && + if (is_no_action && !riinfo->hasperiod && ri_Check_Pk_Match(pk_rel, fk_rel, oldslot, riinfo)) { table_close(fk_rel, RowShareLock); @@ -753,8 +757,10 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL) { StringInfoData querybuf; + char pkrelname[MAX_QUOTED_REL_NAME_LEN]; char fkrelname[MAX_QUOTED_REL_NAME_LEN]; char attname[MAX_QUOTED_NAME_LEN]; + char periodattname[MAX_QUOTED_NAME_LEN]; char paramname[16]; const char *querysep; Oid queryoids[RI_MAX_NUMKEYS]; @@ -790,6 +796,89 @@ ri_restrict(TriggerData *trigdata, bool is_no_action) querysep = "AND"; queryoids[i] = pk_type; } + + /*---------- + * For temporal foreign keys, a reference could still be valid if the + * referenced range didn't change too much. Also if a referencing + * range extends past the current PK row, we don't want to check that + * part: some other PK row should fulfill it. We only want to check + * the part matching the PK record we've changed. Therefore to find + * invalid records we do this: + * + * SELECT 1 FROM [ONLY] <fktable> x WHERE $1 = x.fkatt1 [AND ...] + * -- begin temporal + * AND $n && x.fkperiod + * AND NOT coalesce((x.fkperiod * $n) <@ + * (SELECT range_agg(r) + * FROM (SELECT y.pkperiod r + * FROM [ONLY] <pktable> y + * WHERE $1 = y.pkatt1 [AND ...] AND $n && y.pkperiod + * FOR KEY SHARE OF y) y2), false) + * -- end temporal + * FOR KEY SHARE OF x + * + * We need the coalesce in case the first subquery returns no rows. + * We need the second subquery because FOR KEY SHARE doesn't support + * aggregate queries. + */ + if (riinfo->hasperiod && is_no_action) + { + Oid pk_period_type = RIAttType(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1]); + Oid fk_period_type = RIAttType(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1]); + StringInfoData intersectbuf; + StringInfoData replacementsbuf; + char *pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ? + "" : "ONLY "; + + quoteOneName(attname, RIAttName(fk_rel, riinfo->fk_attnums[riinfo->nkeys - 1])); + sprintf(paramname, "$%d", riinfo->nkeys); + + appendStringInfoString(&querybuf, " AND NOT coalesce("); + + /* Intersect the fk with the old pk range */ + initStringInfo(&intersectbuf); + appendStringInfoString(&intersectbuf, "("); + ri_GenerateQual(&intersectbuf, "", + attname, fk_period_type, + riinfo->period_intersect_oper, + paramname, pk_period_type); + appendStringInfoString(&intersectbuf, ")"); + + /* Find the remaining history */ + initStringInfo(&replacementsbuf); + appendStringInfoString(&replacementsbuf, "(SELECT pg_catalog.range_agg(r) FROM "); + + quoteOneName(periodattname, RIAttName(pk_rel, riinfo->pk_attnums[riinfo->nkeys - 1])); + quoteRelationName(pkrelname, pk_rel); + appendStringInfo(&replacementsbuf, "(SELECT y.%s r FROM %s%s y", + periodattname, pk_only, pkrelname); + + /* Restrict pk rows to what matches */ + querysep = "WHERE"; + for (int i = 0; i < riinfo->nkeys; i++) + { + Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]); + + quoteOneName(attname, + RIAttName(pk_rel, riinfo->pk_attnums[i])); + sprintf(paramname, "$%d", i + 1); + ri_GenerateQual(&replacementsbuf, querysep, + paramname, pk_type, + riinfo->pp_eq_oprs[i], + attname, pk_type); + querysep = "AND"; + queryoids[i] = pk_type; + } + appendStringInfoString(&replacementsbuf, " FOR KEY SHARE OF y) y2)"); + + ri_GenerateQual(&querybuf, "", + intersectbuf.data, fk_period_type, + riinfo->agged_period_contained_by_oper, + replacementsbuf.data, ANYMULTIRANGEOID); + /* end of coalesce: */ + appendStringInfoString(&querybuf, ", false)"); + } + appendStringInfoString(&querybuf, " FOR KEY SHARE OF x"); /* Prepare and save the plan */ @@ -2251,7 +2340,8 @@ ri_LoadConstraintInfo(Oid constraintOid) FindFKPeriodOpers(opclass, &riinfo->period_contained_by_oper, - &riinfo->agged_period_contained_by_oper); + &riinfo->agged_period_contained_by_oper, + &riinfo->period_intersect_oper); } ReleaseSysCache(tup); |