aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Herrera <alvherre@alvh.no-ip.org>2022-04-07 23:42:13 +0200
committerAlvaro Herrera <alvherre@alvh.no-ip.org>2022-04-07 23:42:13 +0200
commita90641eac24dfc8889122d88eb7f482cd3db8b39 (patch)
tree2504b842babaeceb8bf9f29a700c27fe5fa6a55d
parent3e707fbb4009e9ac1d0e8b78b7af9f3f03f4cf1a (diff)
downloadpostgresql-a90641eac24dfc8889122d88eb7f482cd3db8b39.tar.gz
postgresql-a90641eac24dfc8889122d88eb7f482cd3db8b39.zip
Revert "Rewrite some RI code to avoid using SPI"
This reverts commit 99392cdd78b788295e52b9f4942fa11992fd5ba9. We'd rather rewrite ri_triggers.c as a whole rather than piecemeal. Discussion: https://postgr.es/m/E1ncXX2-000mFt-Pe@gemulon.postgresql.org
-rw-r--r--src/backend/executor/execPartition.c174
-rw-r--r--src/backend/executor/nodeLockRows.c161
-rw-r--r--src/backend/utils/adt/ri_triggers.c564
-rw-r--r--src/include/executor/execPartition.h6
-rw-r--r--src/include/executor/executor.h8
-rw-r--r--src/test/isolation/expected/fk-snapshot.out4
-rw-r--r--src/test/isolation/specs/fk-snapshot.spec5
7 files changed, 317 insertions, 605 deletions
diff --git a/src/backend/executor/execPartition.c b/src/backend/executor/execPartition.c
index c22c9ac0966..615bd809735 100644
--- a/src/backend/executor/execPartition.c
+++ b/src/backend/executor/execPartition.c
@@ -176,9 +176,8 @@ static void FormPartitionKeyDatum(PartitionDispatch pd,
EState *estate,
Datum *values,
bool *isnull);
-static int get_partition_for_tuple(PartitionKey key,
- PartitionDesc partdesc,
- Datum *values, bool *isnull);
+static int get_partition_for_tuple(PartitionDispatch pd, Datum *values,
+ bool *isnull);
static char *ExecBuildSlotPartitionKeyDescription(Relation rel,
Datum *values,
bool *isnull,
@@ -319,9 +318,7 @@ ExecFindPartition(ModifyTableState *mtstate,
* these values, error out.
*/
if (partdesc->nparts == 0 ||
- (partidx = get_partition_for_tuple(dispatch->key,
- dispatch->partdesc,
- values, isnull)) < 0)
+ (partidx = get_partition_for_tuple(dispatch, values, isnull)) < 0)
{
char *val_desc;
@@ -1344,12 +1341,12 @@ FormPartitionKeyDatum(PartitionDispatch pd,
* found or -1 if none found.
*/
static int
-get_partition_for_tuple(PartitionKey key,
- PartitionDesc partdesc,
- Datum *values, bool *isnull)
+get_partition_for_tuple(PartitionDispatch pd, Datum *values, bool *isnull)
{
int bound_offset;
int part_index = -1;
+ PartitionKey key = pd->key;
+ PartitionDesc partdesc = pd->partdesc;
PartitionBoundInfo boundinfo = partdesc->boundinfo;
/* Route as appropriate based on partitioning strategy. */
@@ -1442,165 +1439,6 @@ get_partition_for_tuple(PartitionKey key,
}
/*
- * ExecGetLeafPartitionForKey
- * Finds the leaf partition of partitioned table 'root_rel' that would
- * contain the specified key tuple.
- *
- * A subset of the table's columns (including all of the partition key columns)
- * must be specified:
- * - 'key_natts' indicats the number of columns contained in the key
- * - 'key_attnums' indicates their attribute numbers as defined in 'root_rel'
- * - 'key_vals' and 'key_nulls' specify the key tuple
- *
- * Returns the leaf partition, locked with the given lockmode, or NULL if
- * there isn't one. Caller is responsibly for closing it. All intermediate
- * partitions are also locked with the same lockmode. Caller must have locked
- * the root already.
- *
- * In addition, the OID of the index of a unique constraint on the root table
- * must be given as 'root_idxoid'; *leaf_idxoid will be set to the OID of the
- * corresponding index on the returned leaf partition. (This can be used by
- * caller to search for a tuple matching the key in the leaf partition.)
- *
- * This works because the unique key defined on the root relation is required
- * to contain the partition key columns of all of the ancestors that lead up to
- * a given leaf partition.
- */
-Relation
-ExecGetLeafPartitionForKey(Relation root_rel, int key_natts,
- const AttrNumber *key_attnums,
- Datum *key_vals, char *key_nulls,
- Oid root_idxoid, int lockmode,
- Oid *leaf_idxoid)
-{
- Relation found_leafpart = NULL;
- Relation rel = root_rel;
- Oid constr_idxoid = root_idxoid;
- PartitionDirectory partdir;
-
- Assert(root_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE);
-
- *leaf_idxoid = InvalidOid;
-
- partdir = CreatePartitionDirectory(CurrentMemoryContext, true);
-
- /*
- * Descend through partitioned parents to find the leaf partition that
- * would accept a row with the provided key values, starting with the root
- * parent.
- */
- for (;;)
- {
- PartitionKey partkey = RelationGetPartitionKey(rel);
- PartitionDesc partdesc;
- Datum partkey_vals[PARTITION_MAX_KEYS];
- bool partkey_isnull[PARTITION_MAX_KEYS];
- AttrNumber *root_partattrs = partkey->partattrs;
- int found_att;
- int partidx;
- Oid partoid;
-
- CHECK_FOR_INTERRUPTS();
-
- /*
- * Collect partition key values from the unique key.
- *
- * Because we only have the root table's copy of pk_attnums, must map
- * any non-root table's partition key attribute numbers to the root
- * table's.
- */
- if (rel != root_rel)
- {
- /*
- * map->attnums will contain root table attribute numbers for each
- * attribute of the current partitioned relation.
- */
- AttrMap *map;
-
- map = build_attrmap_by_name_if_req(RelationGetDescr(root_rel),
- RelationGetDescr(rel));
- if (map)
- {
- root_partattrs = palloc(partkey->partnatts *
- sizeof(AttrNumber));
- for (int att = 0; att < partkey->partnatts; att++)
- {
- AttrNumber partattno = partkey->partattrs[att];
-
- root_partattrs[att] = map->attnums[partattno - 1];
- }
-
- free_attrmap(map);
- }
- }
-
- /*
- * Map the values/isnulls to match the partition description, as
- * necessary.
- *
- * (Referenced key specification does not allow expressions, so there
- * would not be expressions in the partition keys either.)
- */
- Assert(partkey->partexprs == NIL);
- found_att = 0;
- for (int keyatt = 0; keyatt < key_natts; keyatt++)
- {
- for (int att = 0; att < partkey->partnatts; att++)
- {
- if (root_partattrs[att] == key_attnums[keyatt])
- {
- partkey_vals[found_att] = key_vals[keyatt];
- partkey_isnull[found_att] = (key_nulls[keyatt] == 'n');
- found_att++;
- break;
- }
- }
- }
- /* We had better have found values for all partition keys */
- Assert(found_att == partkey->partnatts);
-
- if (root_partattrs != partkey->partattrs)
- pfree(root_partattrs);
-
- /* Get the PartitionDesc using the partition directory machinery. */
- partdesc = PartitionDirectoryLookup(partdir, rel);
- if (partdesc->nparts == 0)
- break;
-
- /* Find the partition for the key. */
- partidx = get_partition_for_tuple(partkey, partdesc,
- partkey_vals, partkey_isnull);
- Assert(partidx < 0 || partidx < partdesc->nparts);
-
- /* close the previous parent if any, but keep lock */
- if (rel != root_rel)
- table_close(rel, NoLock);
-
- /* No partition found. */
- if (partidx < 0)
- break;
-
- partoid = partdesc->oids[partidx];
- rel = table_open(partoid, lockmode);
- constr_idxoid = index_get_partition(rel, constr_idxoid);
-
- /*
- * We're done if the partition is a leaf, else find its partition in
- * the next iteration.
- */
- if (partdesc->is_leaf[partidx])
- {
- *leaf_idxoid = constr_idxoid;
- found_leafpart = rel;
- break;
- }
- }
-
- DestroyPartitionDirectory(partdir);
- return found_leafpart;
-}
-
-/*
* ExecBuildSlotPartitionKeyDescription
*
* This works very much like BuildIndexValueDescription() and is currently
diff --git a/src/backend/executor/nodeLockRows.c b/src/backend/executor/nodeLockRows.c
index bbccafb2cfd..1a9dab25dd6 100644
--- a/src/backend/executor/nodeLockRows.c
+++ b/src/backend/executor/nodeLockRows.c
@@ -79,7 +79,10 @@ lnext:
Datum datum;
bool isNull;
ItemPointerData tid;
+ TM_FailureData tmfd;
LockTupleMode lockmode;
+ int lockflags = 0;
+ TM_Result test;
TupleTableSlot *markSlot;
/* clear any leftover test tuple for this rel */
@@ -176,11 +179,74 @@ lnext:
break;
}
- /* skip tuple if it couldn't be locked */
- if (!ExecLockTableTuple(erm->relation, &tid, markSlot,
- estate->es_snapshot, estate->es_output_cid,
- lockmode, erm->waitPolicy, &epq_needed))
- goto lnext;
+ lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
+ if (!IsolationUsesXactSnapshot())
+ lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
+
+ test = table_tuple_lock(erm->relation, &tid, estate->es_snapshot,
+ markSlot, estate->es_output_cid,
+ lockmode, erm->waitPolicy,
+ lockflags,
+ &tmfd);
+
+ switch (test)
+ {
+ case TM_WouldBlock:
+ /* couldn't lock tuple in SKIP LOCKED mode */
+ goto lnext;
+
+ case TM_SelfModified:
+
+ /*
+ * The target tuple was already updated or deleted by the
+ * current command, or by a later command in the current
+ * transaction. We *must* ignore the tuple in the former
+ * case, so as to avoid the "Halloween problem" of repeated
+ * update attempts. In the latter case it might be sensible
+ * to fetch the updated tuple instead, but doing so would
+ * require changing heap_update and heap_delete to not
+ * complain about updating "invisible" tuples, which seems
+ * pretty scary (table_tuple_lock will not complain, but few
+ * callers expect TM_Invisible, and we're not one of them). So
+ * for now, treat the tuple as deleted and do not process.
+ */
+ goto lnext;
+
+ case TM_Ok:
+
+ /*
+ * Got the lock successfully, the locked tuple saved in
+ * markSlot for, if needed, EvalPlanQual testing below.
+ */
+ if (tmfd.traversed)
+ epq_needed = true;
+ break;
+
+ case TM_Updated:
+ if (IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to concurrent update")));
+ elog(ERROR, "unexpected table_tuple_lock status: %u",
+ test);
+ break;
+
+ case TM_Deleted:
+ if (IsolationUsesXactSnapshot())
+ ereport(ERROR,
+ (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
+ errmsg("could not serialize access due to concurrent update")));
+ /* tuple was deleted so don't return it */
+ goto lnext;
+
+ case TM_Invisible:
+ elog(ERROR, "attempted to lock invisible tuple");
+ break;
+
+ default:
+ elog(ERROR, "unrecognized table_tuple_lock status: %u",
+ test);
+ }
/* Remember locked tuple's TID for EPQ testing and WHERE CURRENT OF */
erm->curCtid = tid;
@@ -215,91 +281,6 @@ lnext:
return slot;
}
-/*
- * ExecLockTableTuple
- * Locks tuple with the specified TID in lockmode following given wait
- * policy
- *
- * Returns true if the tuple was successfully locked. Locked tuple is loaded
- * into provided slot.
- */
-bool
-ExecLockTableTuple(Relation relation, ItemPointer tid, TupleTableSlot *slot,
- Snapshot snapshot, CommandId cid,
- LockTupleMode lockmode, LockWaitPolicy waitPolicy,
- bool *epq_needed)
-{
- TM_FailureData tmfd;
- int lockflags = TUPLE_LOCK_FLAG_LOCK_UPDATE_IN_PROGRESS;
- TM_Result test;
-
- if (!IsolationUsesXactSnapshot())
- lockflags |= TUPLE_LOCK_FLAG_FIND_LAST_VERSION;
-
- test = table_tuple_lock(relation, tid, snapshot, slot, cid, lockmode,
- waitPolicy, lockflags, &tmfd);
-
- switch (test)
- {
- case TM_WouldBlock:
- /* couldn't lock tuple in SKIP LOCKED mode */
- return false;
-
- case TM_SelfModified:
-
- /*
- * The target tuple was already updated or deleted by the current
- * command, or by a later command in the current transaction. We
- * *must* ignore the tuple in the former case, so as to avoid the
- * "Halloween problem" of repeated update attempts. In the latter
- * case it might be sensible to fetch the updated tuple instead,
- * but doing so would require changing heap_update and heap_delete
- * to not complain about updating "invisible" tuples, which seems
- * pretty scary (table_tuple_lock will not complain, but few
- * callers expect TM_Invisible, and we're not one of them). So for
- * now, treat the tuple as deleted and do not process.
- */
- return false;
-
- case TM_Ok:
-
- /*
- * Got the lock successfully, the locked tuple saved in slot for
- * EvalPlanQual, if asked by the caller.
- */
- if (tmfd.traversed && epq_needed)
- *epq_needed = true;
- break;
-
- case TM_Updated:
- if (IsolationUsesXactSnapshot())
- ereport(ERROR,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("could not serialize access due to concurrent update")));
- elog(ERROR, "unexpected table_tuple_lock status: %u",
- test);
- break;
-
- case TM_Deleted:
- if (IsolationUsesXactSnapshot())
- ereport(ERROR,
- (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
- errmsg("could not serialize access due to concurrent update")));
- /* tuple was deleted so don't return it */
- return false;
-
- case TM_Invisible:
- elog(ERROR, "attempted to lock invisible tuple");
- return false;
-
- default:
- elog(ERROR, "unrecognized table_tuple_lock status: %u", test);
- return false;
- }
-
- return true;
-}
-
/* ----------------------------------------------------------------
* ExecInitLockRows
*
diff --git a/src/backend/utils/adt/ri_triggers.c b/src/backend/utils/adt/ri_triggers.c
index 088b4027008..01d4c22cfce 100644
--- a/src/backend/utils/adt/ri_triggers.c
+++ b/src/backend/utils/adt/ri_triggers.c
@@ -23,7 +23,6 @@
#include "postgres.h"
-#include "access/genam.h"
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/table.h"
@@ -34,7 +33,6 @@
#include "catalog/pg_operator.h"
#include "catalog/pg_type.h"
#include "commands/trigger.h"
-#include "executor/execPartition.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "lib/ilist.h"
@@ -70,14 +68,19 @@
#define RI_KEYS_NONE_NULL 2
/* RI query type codes */
-#define RI_PLAN_CASCADE_ONDELETE 1
-#define RI_PLAN_CASCADE_ONUPDATE 2
+/* these queries are executed against the PK (referenced) table: */
+#define RI_PLAN_CHECK_LOOKUPPK 1
+#define RI_PLAN_CHECK_LOOKUPPK_FROM_PK 2
+#define RI_PLAN_LAST_ON_PK RI_PLAN_CHECK_LOOKUPPK_FROM_PK
+/* these queries are executed against the FK (referencing) table: */
+#define RI_PLAN_CASCADE_ONDELETE 3
+#define RI_PLAN_CASCADE_ONUPDATE 4
/* For RESTRICT, the same plan can be used for both ON DELETE and ON UPDATE triggers. */
-#define RI_PLAN_RESTRICT 3
-#define RI_PLAN_SETNULL_ONDELETE 4
-#define RI_PLAN_SETNULL_ONUPDATE 5
-#define RI_PLAN_SETDEFAULT_ONDELETE 6
-#define RI_PLAN_SETDEFAULT_ONUPDATE 7
+#define RI_PLAN_RESTRICT 5
+#define RI_PLAN_SETNULL_ONDELETE 6
+#define RI_PLAN_SETNULL_ONUPDATE 7
+#define RI_PLAN_SETDEFAULT_ONDELETE 8
+#define RI_PLAN_SETDEFAULT_ONUPDATE 9
#define MAX_QUOTED_NAME_LEN (NAMEDATALEN*2+3)
#define MAX_QUOTED_REL_NAME_LEN (MAX_QUOTED_NAME_LEN*2)
@@ -226,279 +229,10 @@ static void ri_ExtractValues(Relation rel, TupleTableSlot *slot,
static void ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- bool on_fk, bool partgone) pg_attribute_noreturn();
-static Oid get_fkey_unique_index(Oid conoid);
+ int queryno, bool partgone) pg_attribute_noreturn();
/*
- * Checks whether a tuple containing the unique key as extracted from the
- * tuple provided in 'slot' exists in 'pk_rel'. The key is extracted using the
- * constraint's index given in 'riinfo', which is also scanned to check the
- * existence of the key.
- *
- * If 'pk_rel' is a partitioned table, the check is performed on its leaf
- * partition that would contain the key.
- *
- * The provided tuple is either the one being inserted into the referencing
- * relation ('fk_rel' is non-NULL), or the one being deleted from the
- * referenced relation, that is, 'pk_rel' ('fk_rel' is NULL).
- */
-static bool
-ri_ReferencedKeyExists(Relation pk_rel, Relation fk_rel,
- TupleTableSlot *slot, const RI_ConstraintInfo *riinfo)
-{
- Oid constr_id = riinfo->constraint_id;
- Oid idxoid;
- Relation idxrel;
- Relation leaf_pk_rel = NULL;
- int num_pk;
- int i;
- bool found = false;
- const Oid *eq_oprs;
- Datum pk_vals[INDEX_MAX_KEYS];
- char pk_nulls[INDEX_MAX_KEYS];
- ScanKeyData skey[INDEX_MAX_KEYS];
- Snapshot snap = InvalidSnapshot;
- bool pushed_latest_snapshot = false;
- IndexScanDesc scan;
- TupleTableSlot *outslot;
- Oid saved_userid;
- int saved_sec_context;
- AclResult aclresult;
-
- /*
- * Extract the unique key from the provided slot and choose the equality
- * operators to use when scanning the index below.
- */
- if (fk_rel)
- {
- ri_ExtractValues(fk_rel, slot, riinfo, false, pk_vals, pk_nulls);
- /* Use PK = FK equality operator. */
- eq_oprs = riinfo->pf_eq_oprs;
-
- /*
- * May need to cast each of the individual values of the foreign key
- * to the corresponding PK column's type if the equality operator
- * demands it.
- */
- for (i = 0; i < riinfo->nkeys; i++)
- {
- if (pk_nulls[i] != 'n')
- {
- Oid eq_opr = eq_oprs[i];
- Oid typeid = RIAttType(fk_rel, riinfo->fk_attnums[i]);
- RI_CompareHashEntry *entry = ri_HashCompareOp(eq_opr, typeid);
-
- if (OidIsValid(entry->cast_func_finfo.fn_oid))
- pk_vals[i] = FunctionCall3(&entry->cast_func_finfo,
- pk_vals[i],
- Int32GetDatum(-1), /* typmod */
- BoolGetDatum(false)); /* implicit coercion */
- }
- }
- }
- else
- {
- ri_ExtractValues(pk_rel, slot, riinfo, true, pk_vals, pk_nulls);
- /* Use PK = PK equality operator. */
- eq_oprs = riinfo->pp_eq_oprs;
- }
-
- /*
- * Switch to referenced table's owner to perform the below operations as.
- * This matches what ri_PerformCheck() does.
- *
- * Note that as with queries done by ri_PerformCheck(), the way we select
- * the referenced row below effectively bypasses any RLS policies that may
- * be present on the referenced table.
- */
- GetUserIdAndSecContext(&saved_userid, &saved_sec_context);
- SetUserIdAndSecContext(RelationGetForm(pk_rel)->relowner,
- saved_sec_context | SECURITY_LOCAL_USERID_CHANGE);
-
- /*
- * Also check that the new user has permissions to look into the schema of
- * and SELECT from the referenced table.
- */
- aclresult = pg_namespace_aclcheck(RelationGetNamespace(pk_rel),
- GetUserId(), ACL_USAGE);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, OBJECT_SCHEMA,
- get_namespace_name(RelationGetNamespace(pk_rel)));
- aclresult = pg_class_aclcheck(RelationGetRelid(pk_rel), GetUserId(),
- ACL_SELECT);
- if (aclresult != ACLCHECK_OK)
- aclcheck_error(aclresult, OBJECT_TABLE,
- RelationGetRelationName(pk_rel));
-
- /* Make the changes of the current command visible in all cases. */
- CommandCounterIncrement();
-
- /*
- * In the case of scanning the PK index for ri_Check_Pk_Match(), we'd like
- * to see all rows that could be interesting, even those that would not be
- * visible to the transaction snapshot. To do so, force-push the latest
- * snapshot.
- */
- if (fk_rel == NULL)
- {
- snap = GetLatestSnapshot();
- PushActiveSnapshot(snap);
- pushed_latest_snapshot = true;
- }
- else
- {
- snap = GetTransactionSnapshot();
- PushActiveSnapshot(snap);
- }
-
- /*
- * Open the constraint index to be scanned.
- *
- * If the target table is partitioned, we must look up the leaf partition
- * and its corresponding unique index to search the keys in.
- */
- idxoid = get_fkey_unique_index(constr_id);
- if (pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE)
- {
- Oid leaf_idxoid;
- Snapshot mysnap = InvalidSnapshot;
-
- /*
- * XXX the partition descriptor machinery has a hack that assumes that
- * the queries originating in this module push the latest snapshot in
- * the transaction-snapshot mode. If we haven't pushed one already,
- * do so now.
- */
- if (!pushed_latest_snapshot)
- {
- mysnap = GetLatestSnapshot();
- PushActiveSnapshot(mysnap);
- }
-
- leaf_pk_rel = ExecGetLeafPartitionForKey(pk_rel, riinfo->nkeys,
- riinfo->pk_attnums,
- pk_vals, pk_nulls,
- idxoid, RowShareLock,
- &leaf_idxoid);
-
- /*
- * XXX done fiddling with the partition descriptor machinery so unset
- * the active snapshot if we must.
- */
- if (mysnap != InvalidSnapshot)
- PopActiveSnapshot();
-
- /*
- * If no suitable leaf partition exists, neither can the key we're
- * looking for.
- */
- if (leaf_pk_rel == NULL)
- {
- SetUserIdAndSecContext(saved_userid, saved_sec_context);
- PopActiveSnapshot();
- return false;
- }
-
- pk_rel = leaf_pk_rel;
- idxoid = leaf_idxoid;
- }
- idxrel = index_open(idxoid, RowShareLock);
-
- /* Set up ScanKeys for the index scan. */
- num_pk = IndexRelationGetNumberOfKeyAttributes(idxrel);
- for (i = 0; i < num_pk; i++)
- {
- int pkattno = i + 1;
- Oid operator = eq_oprs[i];
- Oid opfamily = idxrel->rd_opfamily[i];
- StrategyNumber strat = get_op_opfamily_strategy(operator, opfamily);
- RegProcedure regop = get_opcode(operator);
-
- /* Initialize the scankey. */
- ScanKeyInit(&skey[i],
- pkattno,
- strat,
- regop,
- pk_vals[i]);
-
- skey[i].sk_collation = idxrel->rd_indcollation[i];
-
- /*
- * Check for null value. Should not occur, because callers currently
- * take care of the cases in which they do occur.
- */
- if (pk_nulls[i] == 'n')
- skey[i].sk_flags |= SK_ISNULL;
- }
-
- scan = index_beginscan(pk_rel, idxrel, snap, num_pk, 0);
- index_rescan(scan, skey, num_pk, NULL, 0);
-
- /* Look for the tuple, and if found, try to lock it in key share mode. */
- outslot = table_slot_create(pk_rel, NULL);
- if (index_getnext_slot(scan, ForwardScanDirection, outslot))
- {
- /*
- * If we fail to lock the tuple for whatever reason, assume it doesn't
- * exist.
- */
- found = ExecLockTableTuple(pk_rel, &(outslot->tts_tid), outslot,
- snap,
- GetCurrentCommandId(false),
- LockTupleKeyShare,
- LockWaitBlock, NULL);
- }
-
- index_endscan(scan);
- ExecDropSingleTupleTableSlot(outslot);
-
- /* Don't release lock until commit. */
- index_close(idxrel, NoLock);
-
- /* Close leaf partition relation if any. */
- if (leaf_pk_rel)
- table_close(leaf_pk_rel, NoLock);
-
- /* Restore UID and security context */
- SetUserIdAndSecContext(saved_userid, saved_sec_context);
-
- PopActiveSnapshot();
-
- return found;
-}
-
-/*
- * get_fkey_unique_index
- * Returns the unique index used by a supposedly foreign key constraint
- *
- * XXX This is very similar to get_constraint_index; probably they should be
- * unified.
- */
-static Oid
-get_fkey_unique_index(Oid conoid)
-{
- Oid result = InvalidOid;
- HeapTuple tp;
-
- tp = SearchSysCache1(CONSTROID, ObjectIdGetDatum(conoid));
- if (HeapTupleIsValid(tp))
- {
- Form_pg_constraint contup = (Form_pg_constraint) GETSTRUCT(tp);
-
- if (contup->contype == CONSTRAINT_FOREIGN)
- result = contup->conindid;
- ReleaseSysCache(tp);
- }
-
- if (!OidIsValid(result))
- elog(ERROR, "unique index not found for foreign key constraint %u",
- conoid);
-
- return result;
-}
-
-/*
* RI_FKey_check -
*
* Check foreign key existence (combined for INSERT and UPDATE).
@@ -510,6 +244,8 @@ RI_FKey_check(TriggerData *trigdata)
Relation fk_rel;
Relation pk_rel;
TupleTableSlot *newslot;
+ RI_QueryKey qkey;
+ SPIPlanPtr qplan;
riinfo = ri_FetchConstraintInfo(trigdata->tg_trigger,
trigdata->tg_relation, false);
@@ -589,9 +325,9 @@ RI_FKey_check(TriggerData *trigdata)
/*
* MATCH PARTIAL - all non-null columns must match. (not
- * implemented, can be done by modifying
- * ri_ReferencedKeyExists() to only include non-null
- * columns.
+ * implemented, can be done by modifying the query below
+ * to only include non-null columns, or by writing a
+ * special version here)
*/
break;
#endif
@@ -606,12 +342,74 @@ RI_FKey_check(TriggerData *trigdata)
break;
}
- if (!ri_ReferencedKeyExists(pk_rel, fk_rel, newslot, riinfo))
- ri_ReportViolation(riinfo,
- pk_rel, fk_rel,
- newslot,
- NULL,
- true, false);
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ /* Fetch or prepare a saved plan for the real check */
+ ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK);
+
+ if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+ {
+ StringInfoData querybuf;
+ char pkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char attname[MAX_QUOTED_NAME_LEN];
+ char paramname[16];
+ const char *querysep;
+ Oid queryoids[RI_MAX_NUMKEYS];
+ const char *pk_only;
+
+ /* ----------
+ * The query string built is
+ * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+ * FOR KEY SHARE OF x
+ * The type id's for the $ parameters are those of the
+ * corresponding FK attributes.
+ * ----------
+ */
+ initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
+ quoteRelationName(pkrelname, pk_rel);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
+ querysep = "WHERE";
+ for (int i = 0; i < riinfo->nkeys; i++)
+ {
+ Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+ Oid fk_type = RIAttType(fk_rel, riinfo->fk_attnums[i]);
+
+ quoteOneName(attname,
+ RIAttName(pk_rel, riinfo->pk_attnums[i]));
+ sprintf(paramname, "$%d", i + 1);
+ ri_GenerateQual(&querybuf, querysep,
+ attname, pk_type,
+ riinfo->pf_eq_oprs[i],
+ paramname, fk_type);
+ querysep = "AND";
+ queryoids[i] = fk_type;
+ }
+ appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+ /* Prepare and save the plan */
+ qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+ &qkey, fk_rel, pk_rel);
+ }
+
+ /*
+ * Now check that foreign key exists in PK table
+ *
+ * XXX detectNewRows must be true when a partitioned table is on the
+ * referenced side. The reason is that our snapshot must be fresh in
+ * order for the hack in find_inheritance_children() to work.
+ */
+ ri_PerformCheck(riinfo, &qkey, qplan,
+ fk_rel, pk_rel,
+ NULL, newslot,
+ pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE,
+ SPI_OK_SELECT);
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
table_close(pk_rel, RowShareLock);
@@ -666,10 +464,81 @@ ri_Check_Pk_Match(Relation pk_rel, Relation fk_rel,
TupleTableSlot *oldslot,
const RI_ConstraintInfo *riinfo)
{
+ SPIPlanPtr qplan;
+ RI_QueryKey qkey;
+ bool result;
+
/* Only called for non-null rows */
Assert(ri_NullCheck(RelationGetDescr(pk_rel), oldslot, riinfo, true) == RI_KEYS_NONE_NULL);
- return ri_ReferencedKeyExists(pk_rel, NULL, oldslot, riinfo);
+ if (SPI_connect() != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed");
+
+ /*
+ * Fetch or prepare a saved plan for checking PK table with values coming
+ * from a PK row
+ */
+ ri_BuildQueryKey(&qkey, riinfo, RI_PLAN_CHECK_LOOKUPPK_FROM_PK);
+
+ if ((qplan = ri_FetchPreparedPlan(&qkey)) == NULL)
+ {
+ StringInfoData querybuf;
+ char pkrelname[MAX_QUOTED_REL_NAME_LEN];
+ char attname[MAX_QUOTED_NAME_LEN];
+ char paramname[16];
+ const char *querysep;
+ const char *pk_only;
+ Oid queryoids[RI_MAX_NUMKEYS];
+
+ /* ----------
+ * The query string built is
+ * SELECT 1 FROM [ONLY] <pktable> x WHERE pkatt1 = $1 [AND ...]
+ * FOR KEY SHARE OF x
+ * The type id's for the $ parameters are those of the
+ * PK attributes themselves.
+ * ----------
+ */
+ initStringInfo(&querybuf);
+ pk_only = pk_rel->rd_rel->relkind == RELKIND_PARTITIONED_TABLE ?
+ "" : "ONLY ";
+ quoteRelationName(pkrelname, pk_rel);
+ appendStringInfo(&querybuf, "SELECT 1 FROM %s%s x",
+ pk_only, pkrelname);
+ querysep = "WHERE";
+ for (int i = 0; i < riinfo->nkeys; i++)
+ {
+ Oid pk_type = RIAttType(pk_rel, riinfo->pk_attnums[i]);
+
+ quoteOneName(attname,
+ RIAttName(pk_rel, riinfo->pk_attnums[i]));
+ sprintf(paramname, "$%d", i + 1);
+ ri_GenerateQual(&querybuf, querysep,
+ attname, pk_type,
+ riinfo->pp_eq_oprs[i],
+ paramname, pk_type);
+ querysep = "AND";
+ queryoids[i] = pk_type;
+ }
+ appendStringInfoString(&querybuf, " FOR KEY SHARE OF x");
+
+ /* Prepare and save the plan */
+ qplan = ri_PlanCheck(querybuf.data, riinfo->nkeys, queryoids,
+ &qkey, fk_rel, pk_rel);
+ }
+
+ /*
+ * We have a plan now. Run it.
+ */
+ result = ri_PerformCheck(riinfo, &qkey, qplan,
+ fk_rel, pk_rel,
+ oldslot, NULL,
+ true, /* treat like update */
+ SPI_OK_SELECT);
+
+ if (SPI_finish() != SPI_OK_FINISH)
+ elog(ERROR, "SPI_finish failed");
+
+ return result;
}
@@ -1739,10 +1608,15 @@ RI_Initial_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
errtableconstraint(fk_rel,
NameStr(fake_riinfo.conname))));
+ /*
+ * We tell ri_ReportViolation we were doing the RI_PLAN_CHECK_LOOKUPPK
+ * query, which isn't true, but will cause it to use
+ * fake_riinfo.fk_attnums as we need.
+ */
ri_ReportViolation(&fake_riinfo,
pk_rel, fk_rel,
slot, tupdesc,
- true, false);
+ RI_PLAN_CHECK_LOOKUPPK, false);
ExecDropSingleTupleTableSlot(slot);
}
@@ -1959,7 +1833,7 @@ RI_PartitionRemove_Check(Trigger *trigger, Relation fk_rel, Relation pk_rel)
fake_riinfo.pk_attnums[i] = i + 1;
ri_ReportViolation(&fake_riinfo, pk_rel, fk_rel,
- slot, tupdesc, true, true);
+ slot, tupdesc, 0, true);
}
if (SPI_finish() != SPI_OK_FINISH)
@@ -2096,8 +1970,9 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
{
/*
* Inherited constraints with a common ancestor can share ri_query_cache
- * entries, because each query processes the other table involved in the
- * FK constraint (i.e., not the table on which the trigger has been
+ * entries for all query types except RI_PLAN_CHECK_LOOKUPPK_FROM_PK.
+ * Except in that case, the query processes the other table involved in
+ * the FK constraint (i.e., not the table on which the trigger has been
* fired), and so it will be the same for all members of the inheritance
* tree. So we may use the root constraint's OID in the hash key, rather
* than the constraint's own OID. This avoids creating duplicate SPI
@@ -2108,13 +1983,13 @@ ri_BuildQueryKey(RI_QueryKey *key, const RI_ConstraintInfo *riinfo,
* constraint, because partitions can have different column orders,
* resulting in different pk_attnums[] or fk_attnums[] array contents.)
*
- * (Note also that for a standalone or non-inherited constraint,
- * constraint_root_id is same as constraint_id.)
- *
* We assume struct RI_QueryKey contains no padding bytes, else we'd need
* to use memset to clear them.
*/
- key->constr_id = riinfo->constraint_root_id;
+ if (constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK)
+ key->constr_id = riinfo->constraint_root_id;
+ else
+ key->constr_id = riinfo->constraint_id;
key->constr_queryno = constr_queryno;
}
@@ -2385,12 +2260,19 @@ ri_PlanCheck(const char *querystr, int nargs, Oid *argtypes,
RI_QueryKey *qkey, Relation fk_rel, Relation pk_rel)
{
SPIPlanPtr qplan;
-
- /* There are currently no queries that run on PK table. */
- Relation query_rel = fk_rel;
+ Relation query_rel;
Oid save_userid;
int save_sec_context;
+ /*
+ * Use the query type code to determine whether the query is run against
+ * the PK or FK table; we'll do the check as that table's owner
+ */
+ if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+ query_rel = pk_rel;
+ else
+ query_rel = fk_rel;
+
/* Switch to proper UID to perform check as */
GetUserIdAndSecContext(&save_userid, &save_sec_context);
SetUserIdAndSecContext(RelationGetForm(query_rel)->relowner,
@@ -2423,9 +2305,9 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
TupleTableSlot *oldslot, TupleTableSlot *newslot,
bool detectNewRows, int expect_OK)
{
- /* There are currently no queries that run on PK table. */
- Relation query_rel = fk_rel,
- source_rel = pk_rel;
+ Relation query_rel,
+ source_rel;
+ bool source_is_pk;
Snapshot test_snapshot;
Snapshot crosscheck_snapshot;
int limit;
@@ -2435,17 +2317,46 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
Datum vals[RI_MAX_NUMKEYS * 2];
char nulls[RI_MAX_NUMKEYS * 2];
+ /*
+ * Use the query type code to determine whether the query is run against
+ * the PK or FK table; we'll do the check as that table's owner
+ */
+ if (qkey->constr_queryno <= RI_PLAN_LAST_ON_PK)
+ query_rel = pk_rel;
+ else
+ query_rel = fk_rel;
+
+ /*
+ * The values for the query are taken from the table on which the trigger
+ * is called - it is normally the other one with respect to query_rel. An
+ * exception is ri_Check_Pk_Match(), which uses the PK table for both (and
+ * sets queryno to RI_PLAN_CHECK_LOOKUPPK_FROM_PK). We might eventually
+ * need some less klugy way to determine this.
+ */
+ if (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK)
+ {
+ source_rel = fk_rel;
+ source_is_pk = false;
+ }
+ else
+ {
+ source_rel = pk_rel;
+ source_is_pk = true;
+ }
+
/* Extract the parameters to be passed into the query */
if (newslot)
{
- ri_ExtractValues(source_rel, newslot, riinfo, true, vals, nulls);
+ ri_ExtractValues(source_rel, newslot, riinfo, source_is_pk,
+ vals, nulls);
if (oldslot)
- ri_ExtractValues(source_rel, oldslot, riinfo, true,
+ ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
vals + riinfo->nkeys, nulls + riinfo->nkeys);
}
else
{
- ri_ExtractValues(source_rel, oldslot, riinfo, true, vals, nulls);
+ ri_ExtractValues(source_rel, oldslot, riinfo, source_is_pk,
+ vals, nulls);
}
/*
@@ -2509,12 +2420,14 @@ ri_PerformCheck(const RI_ConstraintInfo *riinfo,
errhint("This is most likely due to a rule having rewritten the query.")));
/* XXX wouldn't it be clearer to do this part at the caller? */
- if (expect_OK == SPI_OK_SELECT && SPI_processed != 0)
+ if (qkey->constr_queryno != RI_PLAN_CHECK_LOOKUPPK_FROM_PK &&
+ expect_OK == SPI_OK_SELECT &&
+ (SPI_processed == 0) == (qkey->constr_queryno == RI_PLAN_CHECK_LOOKUPPK))
ri_ReportViolation(riinfo,
pk_rel, fk_rel,
newslot ? newslot : oldslot,
NULL,
- false, false);
+ qkey->constr_queryno, false);
return SPI_processed != 0;
}
@@ -2545,9 +2458,9 @@ ri_ExtractValues(Relation rel, TupleTableSlot *slot,
/*
* Produce an error report
*
- * If the failed constraint was on insert/update to the FK table (on_fk is
- * true), we want the key names and values extracted from there, and the
- * error message to look like 'key blah is not present in PK'.
+ * If the failed constraint was on insert/update to the FK table,
+ * we want the key names and values extracted from there, and the error
+ * message to look like 'key blah is not present in PK'.
* Otherwise, the attr names and values come from the PK table and the
* message looks like 'key blah is still referenced from FK'.
*/
@@ -2555,20 +2468,22 @@ static void
ri_ReportViolation(const RI_ConstraintInfo *riinfo,
Relation pk_rel, Relation fk_rel,
TupleTableSlot *violatorslot, TupleDesc tupdesc,
- bool on_fk, bool partgone)
+ int queryno, bool partgone)
{
StringInfoData key_names;
StringInfoData key_values;
+ bool onfk;
const int16 *attnums;
Oid rel_oid;
AclResult aclresult;
bool has_perm = true;
/*
- * If tupdesc wasn't passed by caller, assume the violator tuple came from
- * there.
+ * Determine which relation to complain about. If tupdesc wasn't passed
+ * by caller, assume the violator tuple came from there.
*/
- if (on_fk)
+ onfk = (queryno == RI_PLAN_CHECK_LOOKUPPK);
+ if (onfk)
{
attnums = riinfo->fk_attnums;
rel_oid = fk_rel->rd_id;
@@ -2670,7 +2585,7 @@ ri_ReportViolation(const RI_ConstraintInfo *riinfo,
key_names.data, key_values.data,
RelationGetRelationName(fk_rel)),
errtableconstraint(fk_rel, NameStr(riinfo->conname))));
- else if (on_fk)
+ else if (onfk)
ereport(ERROR,
(errcode(ERRCODE_FOREIGN_KEY_VIOLATION),
errmsg("insert or update on table \"%s\" violates foreign key constraint \"%s\"",
@@ -2977,10 +2892,7 @@ ri_AttributesEqual(Oid eq_opr, Oid typeid,
* ri_HashCompareOp -
*
* See if we know how to compare two values, and create a new hash entry
- * if not. The entry contains the FmgrInfo of the equality operator function
- * and that of the cast function, if one is needed to convert the right
- * operand (whose type OID has been passed) before passing it to the equality
- * function.
+ * if not.
*/
static RI_CompareHashEntry *
ri_HashCompareOp(Oid eq_opr, Oid typeid)
@@ -3036,16 +2948,8 @@ ri_HashCompareOp(Oid eq_opr, Oid typeid)
* moment since that will never be generated for implicit coercions.
*/
op_input_types(eq_opr, &lefttype, &righttype);
-
- /*
- * Don't need to cast if the values that will be passed to the
- * operator will be of expected operand type(s). The operator can be
- * cross-type (such as when called by ri_ReferencedKeyExists()), in
- * which case, we only need the cast if the right operand value
- * doesn't match the type expected by the operator.
- */
- if ((lefttype == righttype && typeid == lefttype) ||
- (lefttype != righttype && typeid == righttype))
+ Assert(lefttype == righttype);
+ if (typeid == lefttype)
castfunc = InvalidOid; /* simplest case */
else
{
diff --git a/src/include/executor/execPartition.h b/src/include/executor/execPartition.h
index cbe1d996e6b..708435e9528 100644
--- a/src/include/executor/execPartition.h
+++ b/src/include/executor/execPartition.h
@@ -31,12 +31,6 @@ extern ResultRelInfo *ExecFindPartition(ModifyTableState *mtstate,
EState *estate);
extern void ExecCleanupTupleRouting(ModifyTableState *mtstate,
PartitionTupleRouting *proute);
-extern Relation ExecGetLeafPartitionForKey(Relation root_rel,
- int key_natts,
- const AttrNumber *key_attnums,
- Datum *key_vals, char *key_nulls,
- Oid root_idxoid, int lockmode,
- Oid *leaf_idxoid);
/*
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 216d28679a6..873772f1883 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -652,14 +652,6 @@ extern void CheckSubscriptionRelkind(char relkind, const char *nspname,
const char *relname);
/*
- * prototypes from functions in nodeLockRows.c
- */
-extern bool ExecLockTableTuple(Relation relation, ItemPointer tid,
- TupleTableSlot *slot, Snapshot snapshot,
- CommandId cid, LockTupleMode lockmode,
- LockWaitPolicy waitPolicy, bool *epq_needed);
-
-/*
* prototypes from functions in nodeModifyTable.c
*/
extern TupleTableSlot *ExecGetUpdateNewTuple(ResultRelInfo *relinfo,
diff --git a/src/test/isolation/expected/fk-snapshot.out b/src/test/isolation/expected/fk-snapshot.out
index 22752cc7429..5faf80d6ce0 100644
--- a/src/test/isolation/expected/fk-snapshot.out
+++ b/src/test/isolation/expected/fk-snapshot.out
@@ -47,12 +47,12 @@ a
step s2ifn2: INSERT INTO fk_noparted VALUES (2);
step s2c: COMMIT;
-ERROR: insert or update on table "fk_noparted" violates foreign key constraint "fk_noparted_a_fkey"
step s2sfn: SELECT * FROM fk_noparted;
a
-
1
-(1 row)
+2
+(2 rows)
starting permutation: s1brc s2brc s2ip2 s1sp s2c s1sp s1ifp2 s2brc s2sfp s1c s1sfp s2ifn2 s2c s2sfn
diff --git a/src/test/isolation/specs/fk-snapshot.spec b/src/test/isolation/specs/fk-snapshot.spec
index 64d27f29c3c..378507fbc38 100644
--- a/src/test/isolation/specs/fk-snapshot.spec
+++ b/src/test/isolation/specs/fk-snapshot.spec
@@ -46,7 +46,10 @@ step s2sfn { SELECT * FROM fk_noparted; }
# inserting into referencing tables in transaction-snapshot mode
# PK table is non-partitioned
permutation s1brr s2brc s2ip2 s1sp s2c s1sp s1ifp2 s1c s1sfp
-# PK table is partitioned
+# PK table is partitioned: buggy, because s2's serialization transaction can
+# see the uncommitted row thanks to the latest snapshot taken for
+# partition lookup to work correctly also ends up getting used by the PK index
+# scan
permutation s2ip2 s2brr s1brc s1ifp2 s2sfp s1c s2sfp s2ifn2 s2c s2sfn
# inserting into referencing tables in up-to-date snapshot mode