diff options
author | Michael Paquier <michael@paquier.xyz> | 2019-12-18 16:23:02 +0900 |
---|---|---|
committer | Michael Paquier <michael@paquier.xyz> | 2019-12-18 16:23:02 +0900 |
commit | e1551f96e643a52a035c3b35777d968bc073f7fc (patch) | |
tree | 9010890c3289462dc42a51a4d03498423359d375 /src/backend/commands/tablecmds.c | |
parent | 04c8a69c0cccbc271e0feeb22a74c69fbd87c37e (diff) | |
download | postgresql-e1551f96e643a52a035c3b35777d968bc073f7fc.tar.gz postgresql-e1551f96e643a52a035c3b35777d968bc073f7fc.zip |
Refactor attribute mappings used in logical tuple conversion
Tuple conversion support in tupconvert.c is able to convert rowtypes
between two relations, inner and outer, which are logically equivalent
but have a different ordering or even dropped columns (used mainly for
inheritance tree and partitions). This makes use of attribute mappings,
which are simple arrays made of AttrNumber elements with a length
matching the number of attributes of the outer relation. The length of
the attribute mapping has been treated as completely independent of the
mapping itself until now, making it easy to pass down an incorrect
mapping length.
This commit refactors the code related to attribute mappings and moves
it into an independent facility called attmap.c, extracted from
tupconvert.c. This merges the attribute mapping with its length,
avoiding to try to guess what is the length of a mapping to use as this
is computed once, when the map is built.
This will avoid mistakes like what has been fixed in dc816e58, which has
used an incorrect mapping length by matching it with the number of
attributes of an inner relation (a child partition) instead of an outer
relation (a partitioned table).
Author: Michael Paquier
Reviewed-by: Amit Langote
Discussion: https://postgr.es/m/20191121042556.GD153437@paquier.xyz
Diffstat (limited to 'src/backend/commands/tablecmds.c')
-rw-r--r-- | src/backend/commands/tablecmds.c | 98 |
1 files changed, 47 insertions, 51 deletions
diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index daa80ec4aa0..e8e004eef4d 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -14,6 +14,7 @@ */ #include "postgres.h" +#include "access/attmap.h" #include "access/genam.h" #include "access/heapam.h" #include "access/heapam_xlog.h" @@ -22,7 +23,6 @@ #include "access/relscan.h" #include "access/sysattr.h" #include "access/tableam.h" -#include "access/tupconvert.h" #include "access/xact.h" #include "access/xlog.h" #include "catalog/catalog.h" @@ -1070,7 +1070,7 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, foreach(cell, idxlist) { Relation idxRel = index_open(lfirst_oid(cell), AccessShareLock); - AttrNumber *attmap; + AttrMap *attmap; IndexStmt *idxstmt; Oid constraintOid; @@ -1090,12 +1090,11 @@ DefineRelation(CreateStmt *stmt, char relkind, Oid ownerId, } } - attmap = convert_tuples_by_name_map(RelationGetDescr(rel), - RelationGetDescr(parent)); + attmap = build_attrmap_by_name(RelationGetDescr(rel), + RelationGetDescr(parent)); idxstmt = generateClonedIndexStmt(NULL, idxRel, - attmap, RelationGetDescr(parent)->natts, - &constraintOid); + attmap, &constraintOid); DefineIndex(RelationGetRelid(rel), idxstmt, InvalidOid, @@ -2156,7 +2155,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, Relation relation; TupleDesc tupleDesc; TupleConstr *constr; - AttrNumber *newattno; + AttrMap *newattmap; AttrNumber parent_attno; /* caller already got lock */ @@ -2237,12 +2236,11 @@ MergeAttributes(List *schema, List *supers, char relpersistence, constr = tupleDesc->constr; /* - * newattno[] will contain the child-table attribute numbers for the - * attributes of this parent table. (They are not the same for - * parents after the first one, nor if we have dropped columns.) + * newattmap->attnums[] will contain the child-table attribute numbers + * for the attributes of this parent table. (They are not the same + * for parents after the first one, nor if we have dropped columns.) */ - newattno = (AttrNumber *) - palloc0(tupleDesc->natts * sizeof(AttrNumber)); + newattmap = make_attrmap(tupleDesc->natts); for (parent_attno = 1; parent_attno <= tupleDesc->natts; parent_attno++) @@ -2257,7 +2255,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, * Ignore dropped columns in the parent. */ if (attribute->attisdropped) - continue; /* leave newattno entry as zero */ + continue; /* leave newattmap->attnums entry as zero */ /* * Does it conflict with some previously inherited column? @@ -2315,7 +2313,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* Merge of NOT NULL constraints = OR 'em together */ def->is_not_null |= attribute->attnotnull; /* Default and other constraints are handled below */ - newattno[parent_attno - 1] = exist_attno; + newattmap->attnums[parent_attno - 1] = exist_attno; /* Check for GENERATED conflicts */ if (def->generated != attribute->attgenerated) @@ -2346,7 +2344,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, def->constraints = NIL; def->location = -1; inhSchema = lappend(inhSchema, def); - newattno[parent_attno - 1] = ++child_attno; + newattmap->attnums[parent_attno - 1] = ++child_attno; } /* @@ -2394,7 +2392,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* * Now copy the CHECK constraints of this parent, adjusting attnos - * using the completed newattno[] map. Identically named constraints + * using the completed newattmap map. Identically named constraints * are merged if possible, else we throw error. */ if (constr && constr->num_check > 0) @@ -2415,7 +2413,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, /* Adjust Vars to match new table's column numbering */ expr = map_variable_attnos(stringToNode(check[i].ccbin), 1, 0, - newattno, tupleDesc->natts, + newattmap, InvalidOid, &found_whole_row); /* @@ -2452,7 +2450,7 @@ MergeAttributes(List *schema, List *supers, char relpersistence, } } - pfree(newattno); + free_attrmap(newattmap); /* * Close the parent rel, but keep our lock on it until xact commit. @@ -8168,7 +8166,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, for (int i = 0; i < pd->nparts; i++) { Relation partRel; - AttrNumber *map; + AttrMap *map; AttrNumber *mapped_pkattnum; Oid partIndexId; @@ -8178,13 +8176,13 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, * Map the attribute numbers in the referenced side of the FK * definition to match the partition's column layout. */ - map = convert_tuples_by_name_map_if_req(RelationGetDescr(partRel), - RelationGetDescr(pkrel)); + map = build_attrmap_by_name_if_req(RelationGetDescr(partRel), + RelationGetDescr(pkrel)); if (map) { mapped_pkattnum = palloc(sizeof(AttrNumber) * numfks); for (int j = 0; j < numfks; j++) - mapped_pkattnum[j] = map[pkattnum[j] - 1]; + mapped_pkattnum[j] = map->attnums[pkattnum[j] - 1]; } else mapped_pkattnum = pkattnum; @@ -8205,7 +8203,7 @@ addFkRecurseReferenced(List **wqueue, Constraint *fkconstraint, Relation rel, if (map) { pfree(mapped_pkattnum); - pfree(map); + free_attrmap(map); } } } @@ -8309,7 +8307,7 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, Oid partitionId = pd->oids[i]; Relation partition = table_open(partitionId, lockmode); List *partFKs; - AttrNumber *attmap; + AttrMap *attmap; AttrNumber mapped_fkattnum[INDEX_MAX_KEYS]; bool attached; char *conname; @@ -8320,10 +8318,10 @@ addFkRecurseReferencing(List **wqueue, Constraint *fkconstraint, Relation rel, CheckTableNotInUse(partition, "ALTER TABLE"); - attmap = convert_tuples_by_name_map(RelationGetDescr(partition), - RelationGetDescr(rel)); + attmap = build_attrmap_by_name(RelationGetDescr(partition), + RelationGetDescr(rel)); for (int j = 0; j < numfks; j++) - mapped_fkattnum[j] = attmap[fkattnum[j] - 1]; + mapped_fkattnum[j] = attmap->attnums[fkattnum[j] - 1]; /* Check whether an existing constraint can be repurposed */ partFKs = copyObject(RelationGetFKeyList(partition)); @@ -8471,7 +8469,7 @@ static void CloneFkReferenced(Relation parentRel, Relation partitionRel) { Relation pg_constraint; - AttrNumber *attmap; + AttrMap *attmap; ListCell *cell; SysScanDesc scan; ScanKeyData key[2]; @@ -8510,8 +8508,8 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel) systable_endscan(scan); table_close(pg_constraint, RowShareLock); - attmap = convert_tuples_by_name_map(RelationGetDescr(partitionRel), - RelationGetDescr(parentRel)); + attmap = build_attrmap_by_name(RelationGetDescr(partitionRel), + RelationGetDescr(parentRel)); foreach(cell, clone) { Oid constrOid = lfirst_oid(cell); @@ -8549,8 +8547,9 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel) conpfeqop, conppeqop, conffeqop); + Assert(numfks == attmap->maplen); for (int i = 0; i < numfks; i++) - mapped_confkey[i] = attmap[confkey[i] - 1]; + mapped_confkey[i] = attmap->attnums[confkey[i] - 1]; fkconstraint = makeNode(Constraint); /* for now this is all we need */ @@ -8617,7 +8616,7 @@ CloneFkReferenced(Relation parentRel, Relation partitionRel) static void CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) { - AttrNumber *attmap; + AttrMap *attmap; List *partFKs; List *clone = NIL; ListCell *cell; @@ -8646,8 +8645,8 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) * The constraint key may differ, if the columns in the partition are * different. This map is used to convert them. */ - attmap = convert_tuples_by_name_map(RelationGetDescr(partRel), - RelationGetDescr(parentRel)); + attmap = build_attrmap_by_name(RelationGetDescr(partRel), + RelationGetDescr(parentRel)); partFKs = copyObject(RelationGetFKeyList(partRel)); @@ -8697,7 +8696,7 @@ CloneFkReferencing(List **wqueue, Relation parentRel, Relation partRel) DeconstructFkConstraintRow(tuple, &numfks, conkey, confkey, conpfeqop, conppeqop, conffeqop); for (int i = 0; i < numfks; i++) - mapped_conkey[i] = attmap[conkey[i] - 1]; + mapped_conkey[i] = attmap->attnums[conkey[i] - 1]; /* * Before creating a new constraint, see whether any existing FKs are @@ -10470,18 +10469,18 @@ ATPrepAlterColumnType(List **wqueue, */ if (def->cooked_default) { - AttrNumber *attmap; + AttrMap *attmap; bool found_whole_row; /* create a copy to scribble on */ cmd = copyObject(cmd); - attmap = convert_tuples_by_name_map(RelationGetDescr(childrel), - RelationGetDescr(rel)); + attmap = build_attrmap_by_name(RelationGetDescr(childrel), + RelationGetDescr(rel)); ((ColumnDef *) cmd->def)->cooked_default = map_variable_attnos(def->cooked_default, 1, 0, - attmap, RelationGetDescr(rel)->natts, + attmap, InvalidOid, &found_whole_row); if (found_whole_row) ereport(ERROR, @@ -15833,7 +15832,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) Oid idx = lfirst_oid(cell); Relation idxRel = index_open(idx, AccessShareLock); IndexInfo *info; - AttrNumber *attmap; + AttrMap *attmap; bool found = false; Oid constraintOid; @@ -15849,8 +15848,8 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) /* construct an indexinfo to compare existing indexes against */ info = BuildIndexInfo(idxRel); - attmap = convert_tuples_by_name_map(RelationGetDescr(attachrel), - RelationGetDescr(rel)); + attmap = build_attrmap_by_name(RelationGetDescr(attachrel), + RelationGetDescr(rel)); constraintOid = get_relation_idx_constraint_oid(RelationGetRelid(rel), idx); /* @@ -15872,8 +15871,7 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) idxRel->rd_indcollation, attachrelIdxRels[i]->rd_opfamily, idxRel->rd_opfamily, - attmap, - RelationGetDescr(rel)->natts)) + attmap)) { /* * If this index is being created in the parent because of a @@ -15914,7 +15912,6 @@ AttachPartitionEnsureIndexes(Relation rel, Relation attachrel) stmt = generateClonedIndexStmt(NULL, idxRel, attmap, - RelationGetDescr(rel)->natts, &constraintOid); DefineIndex(RelationGetRelid(attachrel), stmt, InvalidOid, RelationGetRelid(idxRel), @@ -16380,7 +16377,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) { IndexInfo *childInfo; IndexInfo *parentInfo; - AttrNumber *attmap; + AttrMap *attmap; bool found; int i; PartitionDesc partDesc; @@ -16425,15 +16422,14 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) /* Ensure the indexes are compatible */ childInfo = BuildIndexInfo(partIdx); parentInfo = BuildIndexInfo(parentIdx); - attmap = convert_tuples_by_name_map(RelationGetDescr(partTbl), - RelationGetDescr(parentTbl)); + attmap = build_attrmap_by_name(RelationGetDescr(partTbl), + RelationGetDescr(parentTbl)); if (!CompareIndexInfo(childInfo, parentInfo, partIdx->rd_indcollation, parentIdx->rd_indcollation, partIdx->rd_opfamily, parentIdx->rd_opfamily, - attmap, - RelationGetDescr(parentTbl)->natts)) + attmap)) ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), errmsg("cannot attach index \"%s\" as a partition of index \"%s\"", @@ -16470,7 +16466,7 @@ ATExecAttachPartitionIdx(List **wqueue, Relation parentIdx, RangeVar *name) ConstraintSetParentConstraint(cldConstrId, constraintOid, RelationGetRelid(partTbl)); - pfree(attmap); + free_attrmap(attmap); validatePartitionedIndex(parentIdx, parentTbl); } |