aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/tablecmds.c
diff options
context:
space:
mode:
authorMichael Paquier <michael@paquier.xyz>2019-12-18 16:23:02 +0900
committerMichael Paquier <michael@paquier.xyz>2019-12-18 16:23:02 +0900
commite1551f96e643a52a035c3b35777d968bc073f7fc (patch)
tree9010890c3289462dc42a51a4d03498423359d375 /src/backend/commands/tablecmds.c
parent04c8a69c0cccbc271e0feeb22a74c69fbd87c37e (diff)
downloadpostgresql-e1551f96e643a52a035c3b35777d968bc073f7fc.tar.gz
postgresql-e1551f96e643a52a035c3b35777d968bc073f7fc.zip
Refactor attribute mappings used in logical tuple conversion
Tuple conversion support in tupconvert.c is able to convert rowtypes between two relations, inner and outer, which are logically equivalent but have a different ordering or even dropped columns (used mainly for inheritance tree and partitions). This makes use of attribute mappings, which are simple arrays made of AttrNumber elements with a length matching the number of attributes of the outer relation. The length of the attribute mapping has been treated as completely independent of the mapping itself until now, making it easy to pass down an incorrect mapping length. This commit refactors the code related to attribute mappings and moves it into an independent facility called attmap.c, extracted from tupconvert.c. This merges the attribute mapping with its length, avoiding to try to guess what is the length of a mapping to use as this is computed once, when the map is built. This will avoid mistakes like what has been fixed in dc816e58, which has used an incorrect mapping length by matching it with the number of attributes of an inner relation (a child partition) instead of an outer relation (a partitioned table). Author: Michael Paquier Reviewed-by: Amit Langote Discussion: https://postgr.es/m/20191121042556.GD153437@paquier.xyz
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r--src/backend/commands/tablecmds.c98
1 files changed, 47 insertions, 51 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c
index daa80ec4aa0..e8e004eef4d 100644
--- a/src/backend/commands/tablecmds.c
+++ b/src/backend/commands/tablecmds.c
@@ -14,6 +14,7 @@
*/
#include "postgres.h"
+#include "access/attmap.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/heapam_xlog.h"
@@ -22,7 +23,6 @@
#include "access/relscan.h"
#include "access/sysattr.h"
#include "access/tableam.h"
-#include "access/tupconvert.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/catalog.h"
@@ -1070,7 +1070,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
foreach(cell, idxlist)
{
Relation idxRel = index_open(lfirst_oid(cell), AccessShareLock);
- AttrNumber *attmap;
+ AttrMap *attmap;
IndexStmt *idxstmt;
Oid constraintOid;
@@ -1090,12 +1090,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId,
}
}
- attmap = convert_tuples_by_name_map(RelationGetDescr(rel),
- RelationGetDescr(parent));
+ attmap = build_attrmap_by_name(RelationGetDescr(rel),
+ RelationGetDescr(parent));
idxstmt =
generateClonedIndexStmt(NULL, idxRel,
- attmap, RelationGetDescr(parent)->natts,
- &constraintOid);
+ attmap, &constraintOid);
DefineIndex(RelationGetRelid(rel),
idxstmt,
InvalidOid,
@@ -2156,7 +2155,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
Relation relation;
TupleDesc tupleDesc;
TupleConstr *constr;
- AttrNumber *newattno;
+ AttrMap *newattmap;
AttrNumber parent_attno;
/* caller already got lock */
@@ -2237,12 +2236,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
constr = tupleDesc->constr;
/*
- * newattno[] will contain the child-table attribute numbers for the
- * attributes of this parent table. (They are not the same for
- * parents after the first one, nor if we have dropped columns.)
+ * newattmap->attnums[] will contain the child-table attribute numbers
+ * for the attributes of this parent table. (They are not the same
+ * for parents after the first one, nor if we have dropped columns.)
*/
- newattno = (AttrNumber *)
- palloc0(tupleDesc->natts * sizeof(AttrNumber));
+ newattmap = make_attrmap(tupleDesc->natts);
for (parent_attno = 1; parent_attno <= tupleDesc->natts;
parent_attno++)
@@ -2257,7 +2255,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
* Ignore dropped columns in the parent.
*/
if (attribute->attisdropped)
- continue; /* leave newattno entry as zero */
+ continue; /* leave newattmap->attnums entry as zero */
/*
* Does it conflict with some previously inherited column?
@@ -2315,7 +2313,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/* Merge of NOT NULL constraints = OR 'em together */
def->is_not_null |= attribute->attnotnull;
/* Default and other constraints are handled below */
- newattno[parent_attno - 1] = exist_attno;
+ newattmap->attnums[parent_attno - 1] = exist_attno;
/* Check for GENERATED conflicts */
if (def->generated != attribute->attgenerated)
@@ -2346,7 +2344,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
def->constraints = NIL;
def->location = -1;
inhSchema = lappend(inhSchema, def);
- newattno[parent_attno - 1] = ++child_attno;
+ newattmap->attnums[parent_attno - 1] = ++child_attno;
}
/*
@@ -2394,7 +2392,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/*
* Now copy the CHECK constraints of this parent, adjusting attnos
- * using the completed newattno[] map. Identically named constraints
+ * using the completed newattmap map. Identically named constraints
* are merged if possible, else we throw error.
*/
if (constr && constr->num_check > 0)
@@ -2415,7 +2413,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
/* Adjust Vars to match new table's column numbering */
expr = map_variable_attnos(stringToNode(check[i].ccbin),
1, 0,
- newattno, tupleDesc->natts,
+ newattmap,
InvalidOid, &found_whole_row);
/*
@@ -2452,7 +2450,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence,
}
}
- pfree(newattno);
+ free_attrmap(newattmap);
/*
* Close the parent rel, but keep our lock on it until xact commit.
@@ -8168,7 +8166,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
for (int i = 0; i < pd->nparts; i++)
{
Relation partRel;
- AttrNumber *map;
+ AttrMap *map;
AttrNumber *mapped_pkattnum;
Oid partIndexId;
@@ -8178,13 +8176,13 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
* Map the attribute numbers in the referenced side of the FK
* definition to match the partition's column layout.
*/
- map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel),
- RelationGetDescr(pkrel));
+ map = build_attrmap_by_name_if_req(RelationGetDescr(partRel),
+ RelationGetDescr(pkrel));
if (map)
{
mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks);
for (int j = 0; j < numfks; j++)
- mapped_pkattnum[j] = map[pkattnum[j] - 1];
+ mapped_pkattnum[j] = map->attnums[pkattnum[j] - 1];
}
else
mapped_pkattnum = pkattnum;
@@ -8205,7 +8203,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel,
if (map)
{
pfree(mapped_pkattnum);
- pfree(map);
+ free_attrmap(map);
}
}
}
@@ -8309,7 +8307,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
Oid partitionId = pd->oids[i];
Relation partition = table_open(partitionId, lockmode);
List *partFKs;
- AttrNumber *attmap;
+ AttrMap *attmap;
AttrNumber mapped_fkattnum[INDEX_MAX_KEYS];
bool attached;
char *conname;
@@ -8320,10 +8318,10 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel,
CheckTableNotInUse(partition, "ALTER TABLE");
- attmap = convert_tuples_by_name_map(RelationGetDescr(partition),
- RelationGetDescr(rel));
+ attmap = build_attrmap_by_name(RelationGetDescr(partition),
+ RelationGetDescr(rel));
for (int j = 0; j < numfks; j++)
- mapped_fkattnum[j] = attmap[fkattnum[j] - 1];
+ mapped_fkattnum[j] = attmap->attnums[fkattnum[j] - 1];
/* Check whether an existing constraint can be repurposed */
partFKs = copyObject(RelationGetFKeyList(partition));
@@ -8471,7 +8469,7 @@ static void
CloneFkReferenced(Relation parentRel, Relation partitionRel)
{
Relation pg_constraint;
- AttrNumber *attmap;
+ AttrMap *attmap;
ListCell *cell;
SysScanDesc scan;
ScanKeyData key[2];
@@ -8510,8 +8508,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
systable_endscan(scan);
table_close(pg_constraint, RowShareLock);
- attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel),
- RelationGetDescr(parentRel));
+ attmap = build_attrmap_by_name(RelationGetDescr(partitionRel),
+ RelationGetDescr(parentRel));
foreach(cell, clone)
{
Oid constrOid = lfirst_oid(cell);
@@ -8549,8 +8547,9 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
conpfeqop,
conppeqop,
conffeqop);
+ Assert(numfks == attmap->maplen);
for (int i = 0; i < numfks; i++)
- mapped_confkey[i] = attmap[confkey[i] - 1];
+ mapped_confkey[i] = attmap->attnums[confkey[i] - 1];
fkconstraint = makeNode(Constraint);
/* for now this is all we need */
@@ -8617,7 +8616,7 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel)
static void
CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
{
- AttrNumber *attmap;
+ AttrMap *attmap;
List *partFKs;
List *clone = NIL;
ListCell *cell;
@@ -8646,8 +8645,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
* The constraint key may differ, if the columns in the partition are
* different. This map is used to convert them.
*/
- attmap = convert_tuples_by_name_map(RelationGetDescr(partRel),
- RelationGetDescr(parentRel));
+ attmap = build_attrmap_by_name(RelationGetDescr(partRel),
+ RelationGetDescr(parentRel));
partFKs = copyObject(RelationGetFKeyList(partRel));
@@ -8697,7 +8696,7 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel)
DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey,
conpfeqop, conppeqop, conffeqop);
for (int i = 0; i < numfks; i++)
- mapped_conkey[i] = attmap[conkey[i] - 1];
+ mapped_conkey[i] = attmap->attnums[conkey[i] - 1];
/*
* Before creating a new constraint, see whether any existing FKs are
@@ -10470,18 +10469,18 @@ ATPrepAlterColumnType(List **wqueue,
*/
if (def->cooked_default)
{
- AttrNumber *attmap;
+ AttrMap *attmap;
bool found_whole_row;
/* create a copy to scribble on */
cmd = copyObject(cmd);
- attmap = convert_tuples_by_name_map(RelationGetDescr(childrel),
- RelationGetDescr(rel));
+ attmap = build_attrmap_by_name(RelationGetDescr(childrel),
+ RelationGetDescr(rel));
((ColumnDef *) cmd->def)->cooked_default =
map_variable_attnos(def->cooked_default,
1, 0,
- attmap, RelationGetDescr(rel)->natts,
+ attmap,
InvalidOid, &found_whole_row);
if (found_whole_row)
ereport(ERROR,
@@ -15833,7 +15832,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
Oid idx = lfirst_oid(cell);
Relation idxRel = index_open(idx, AccessShareLock);
IndexInfo *info;
- AttrNumber *attmap;
+ AttrMap *attmap;
bool found = false;
Oid constraintOid;
@@ -15849,8 +15848,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
/* construct an indexinfo to compare existing indexes against */
info = BuildIndexInfo(idxRel);
- attmap = convert_tuples_by_name_map(RelationGetDescr(attachrel),
- RelationGetDescr(rel));
+ attmap = build_attrmap_by_name(RelationGetDescr(attachrel),
+ RelationGetDescr(rel));
constraintOid = get_relation_idx_constraint_oid(RelationGetRelid(rel), idx);
/*
@@ -15872,8 +15871,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
idxRel->rd_indcollation,
attachrelIdxRels[i]->rd_opfamily,
idxRel->rd_opfamily,
- attmap,
- RelationGetDescr(rel)->natts))
+ attmap))
{
/*
* If this index is being created in the parent because of a
@@ -15914,7 +15912,6 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel)
stmt = generateClonedIndexStmt(NULL,
idxRel, attmap,
- RelationGetDescr(rel)->natts,
&constraintOid);
DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid,
RelationGetRelid(idxRel),
@@ -16380,7 +16377,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
{
IndexInfo *childInfo;
IndexInfo *parentInfo;
- AttrNumber *attmap;
+ AttrMap *attmap;
bool found;
int i;
PartitionDesc partDesc;
@@ -16425,15 +16422,14 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
/* Ensure the indexes are compatible */
childInfo = BuildIndexInfo(partIdx);
parentInfo = BuildIndexInfo(parentIdx);
- attmap = convert_tuples_by_name_map(RelationGetDescr(partTbl),
- RelationGetDescr(parentTbl));
+ attmap = build_attrmap_by_name(RelationGetDescr(partTbl),
+ RelationGetDescr(parentTbl));
if (!CompareIndexInfo(childInfo, parentInfo,
partIdx->rd_indcollation,
parentIdx->rd_indcollation,
partIdx->rd_opfamily,
parentIdx->rd_opfamily,
- attmap,
- RelationGetDescr(parentTbl)->natts))
+ attmap))
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("cannot attach index \"%s\" as a partition of index \"%s\"",
@@ -16470,7 +16466,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name)
ConstraintSetParentConstraint(cldConstrId, constraintOid,
RelationGetRelid(partTbl));
- pfree(attmap);
+ free_attrmap(attmap);
validatePartitionedIndex(parentIdx, parentTbl);
}