diff options
Diffstat (limited to 'src/backend/access')
-rw-r--r-- | src/backend/access/spgist/spgscan.c | 31 | ||||
-rw-r--r-- | src/backend/access/spgist/spgutils.c | 84 | ||||
-rw-r--r-- | src/backend/access/spgist/spgvalidate.c | 27 |
3 files changed, 128 insertions, 14 deletions
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c index 20e67c3f7d1..06011d2c024 100644 --- a/src/backend/access/spgist/spgscan.c +++ b/src/backend/access/spgist/spgscan.c @@ -81,7 +81,10 @@ pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a, static void spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item) { - if (!so->state.attLeafType.attbyval && + /* value is of type attType if isLeaf, else of type attLeafType */ + /* (no, that is not backwards; yes, it's confusing) */ + if (!(item->isLeaf ? so->state.attType.attbyval : + so->state.attLeafType.attbyval) && DatumGetPointer(item->value) != NULL) pfree(DatumGetPointer(item->value)); @@ -296,6 +299,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) { IndexScanDesc scan; SpGistScanOpaque so; + TupleDesc outTupDesc; int i; scan = RelationGetIndexScan(rel, keysz, orderbysz); @@ -314,8 +318,21 @@ spgbeginscan(Relation rel, int keysz, int orderbysz) "SP-GiST traversal-value context", ALLOCSET_DEFAULT_SIZES); - /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */ - so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel); + /* + * Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan. + * (It's rather annoying to do this work when it might be wasted, but for + * most opclasses we can re-use the index reldesc instead of making one.) + */ + if (so->state.attType.type == + TupleDescAttr(RelationGetDescr(rel), 0)->atttypid) + outTupDesc = RelationGetDescr(rel); + else + { + outTupDesc = CreateTemplateTupleDesc(1); + TupleDescInitEntry(outTupDesc, 1, NULL, + so->state.attType.type, -1, 0); + } + so->indexTupDesc = scan->xs_hitupdesc = outTupDesc; /* Allocate various arrays needed for order-by scans */ if (scan->numberOfOrderBys > 0) @@ -447,9 +464,10 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr, item->level = level; item->heapPtr = *heapPtr; /* copy value to queue cxt out of tmp cxt */ + /* caution: "leafValue" is of type attType not leafType */ item->value = isnull ? (Datum) 0 : - datumCopy(leafValue, so->state.attLeafType.attbyval, - so->state.attLeafType.attlen); + datumCopy(leafValue, so->state.attType.attbyval, + so->state.attType.attlen); item->traversalValue = NULL; item->isLeaf = true; item->recheck = recheck; @@ -497,6 +515,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item, in.nkeys = so->numberOfKeys; in.orderbys = so->orderByData; in.norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ in.reconstructedValue = item->value; in.traversalValue = item->traversalValue; in.level = item->level; @@ -563,6 +582,7 @@ spgInitInnerConsistentIn(spgInnerConsistentIn *in, in->orderbys = so->orderByData; in->nkeys = so->numberOfKeys; in->norderbys = so->numberOfNonNullOrderBys; + Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */ in->reconstructedValue = item->value; in->traversalMemoryContext = so->traversalCxt; in->traversalValue = item->traversalValue; @@ -589,6 +609,7 @@ spgMakeInnerItem(SpGistScanOpaque so, : parentItem->level; /* Must copy value out of temp context */ + /* (recall that reconstructed values are of type leafType) */ item->value = out->reconstructedValues ? datumCopy(out->reconstructedValues[i], so->state.attLeafType.attbyval, diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c index 949352ada76..f2ba72b5aa3 100644 --- a/src/backend/access/spgist/spgutils.c +++ b/src/backend/access/spgist/spgutils.c @@ -23,6 +23,7 @@ #include "access/xact.h" #include "catalog/pg_amop.h" #include "commands/vacuum.h" +#include "nodes/nodeFuncs.h" #include "storage/bufmgr.h" #include "storage/indexfsm.h" #include "storage/lmgr.h" @@ -53,7 +54,7 @@ spghandler(PG_FUNCTION_ARGS) amroutine->amoptionalkey = true; amroutine->amsearcharray = false; amroutine->amsearchnulls = true; - amroutine->amstorage = false; + amroutine->amstorage = true; amroutine->amclusterable = false; amroutine->ampredlocks = false; amroutine->amcanparallel = false; @@ -89,6 +90,66 @@ spghandler(PG_FUNCTION_ARGS) PG_RETURN_POINTER(amroutine); } +/* + * GetIndexInputType + * Determine the nominal input data type for an index column + * + * We define the "nominal" input type as the associated opclass's opcintype, + * or if that is a polymorphic type, the base type of the heap column or + * expression that is the index's input. The reason for preferring the + * opcintype is that non-polymorphic opclasses probably don't want to hear + * about binary-compatible input types. For instance, if a text opclass + * is being used with a varchar heap column, we want to report "text" not + * "varchar". Likewise, opclasses don't want to hear about domain types, + * so if we do consult the actual input type, we make sure to flatten domains. + * + * At some point maybe this should go somewhere else, but it's not clear + * if any other index AMs have a use for it. + */ +static Oid +GetIndexInputType(Relation index, AttrNumber indexcol) +{ + Oid opcintype; + AttrNumber heapcol; + List *indexprs; + ListCell *indexpr_item; + + Assert(index->rd_index != NULL); + Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts); + opcintype = index->rd_opcintype[indexcol - 1]; + if (!IsPolymorphicType(opcintype)) + return opcintype; + heapcol = index->rd_index->indkey.values[indexcol - 1]; + if (heapcol != 0) /* Simple index column? */ + return getBaseType(get_atttype(index->rd_index->indrelid, heapcol)); + + /* + * If the index expressions are already cached, skip calling + * RelationGetIndexExpressions, as it will make a copy which is overkill. + * We're not going to modify the trees, and we're not going to do anything + * that would invalidate the relcache entry before we're done. + */ + if (index->rd_indexprs) + indexprs = index->rd_indexprs; + else + indexprs = RelationGetIndexExpressions(index); + indexpr_item = list_head(indexprs); + for (int i = 1; i <= index->rd_index->indnkeyatts; i++) + { + if (index->rd_index->indkey.values[i - 1] == 0) + { + /* expression column */ + if (indexpr_item == NULL) + elog(ERROR, "wrong number of index expressions"); + if (i == indexcol) + return getBaseType(exprType((Node *) lfirst(indexpr_item))); + indexpr_item = lnext(indexprs, indexpr_item); + } + } + elog(ERROR, "wrong number of index expressions"); + return InvalidOid; /* keep compiler quiet */ +} + /* Fill in a SpGistTypeDesc struct with info about the specified data type */ static void fillTypeDesc(SpGistTypeDesc *desc, Oid type) @@ -121,11 +182,11 @@ spgGetCache(Relation index) Assert(index->rd_att->natts == 1); /* - * Get the actual data type of the indexed column from the index - * tupdesc. We pass this to the opclass config function so that + * Get the actual (well, nominal) data type of the column being + * indexed. We pass this to the opclass config function so that * polymorphic opclasses are possible. */ - atttype = TupleDescAttr(index->rd_att, 0)->atttypid; + atttype = GetIndexInputType(index, 1); /* Call the config function to get config info for the opclass */ in.attType = atttype; @@ -136,11 +197,21 @@ spgGetCache(Relation index) PointerGetDatum(&in), PointerGetDatum(&cache->config)); + /* + * If leafType isn't specified, use the declared index column type, + * which index.c will have derived from the opclass's opcintype. + * (Although we now make spgvalidate.c warn if these aren't the same, + * old user-defined opclasses may not set the STORAGE parameter + * correctly, so believe leafType if it's given.) + */ + if (!OidIsValid(cache->config.leafType)) + cache->config.leafType = + TupleDescAttr(RelationGetDescr(index), 0)->atttypid; + /* Get the information we need about each relevant datatype */ fillTypeDesc(&cache->attType, atttype); - if (OidIsValid(cache->config.leafType) && - cache->config.leafType != atttype) + if (cache->config.leafType != atttype) { if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC))) ereport(ERROR, @@ -151,6 +222,7 @@ spgGetCache(Relation index) } else { + /* Save lookups in this common case */ cache->attLeafType = cache->attType; } diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c index 8bc3889a4de..472a28b8080 100644 --- a/src/backend/access/spgist/spgvalidate.c +++ b/src/backend/access/spgist/spgvalidate.c @@ -43,6 +43,7 @@ spgvalidate(Oid opclassoid) Form_pg_opclass classform; Oid opfamilyoid; Oid opcintype; + Oid opckeytype; char *opclassname; HeapTuple familytup; Form_pg_opfamily familyform; @@ -57,6 +58,7 @@ spgvalidate(Oid opclassoid) spgConfigOut configOut; Oid configOutLefttype = InvalidOid; Oid configOutRighttype = InvalidOid; + Oid configOutLeafType = InvalidOid; /* Fetch opclass information */ classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid)); @@ -66,6 +68,7 @@ spgvalidate(Oid opclassoid) opfamilyoid = classform->opcfamily; opcintype = classform->opcintype; + opckeytype = classform->opckeytype; opclassname = NameStr(classform->opcname); /* Fetch opfamily information */ @@ -118,13 +121,31 @@ spgvalidate(Oid opclassoid) configOutLefttype = procform->amproclefttype; configOutRighttype = procform->amprocrighttype; + /* Default leaf type is opckeytype or input type */ + if (OidIsValid(opckeytype)) + configOutLeafType = opckeytype; + else + configOutLeafType = procform->amproclefttype; + + /* If some other leaf datum type is specified, warn */ + if (OidIsValid(configOut.leafType) && + configOutLeafType != configOut.leafType) + { + ereport(INFO, + (errcode(ERRCODE_INVALID_OBJECT_DEFINITION), + errmsg("SP-GiST leaf data type %s does not match declared type %s", + format_type_be(configOut.leafType), + format_type_be(configOutLeafType)))); + result = false; + configOutLeafType = configOut.leafType; + } + /* * When leaf and attribute types are the same, compress * function is not required and we set corresponding bit in * functionset for later group consistency check. */ - if (!OidIsValid(configOut.leafType) || - configOut.leafType == configIn.attType) + if (configOutLeafType == configIn.attType) { foreach(lc, grouplist) { @@ -156,7 +177,7 @@ spgvalidate(Oid opclassoid) ok = false; else ok = check_amproc_signature(procform->amproc, - configOut.leafType, true, + configOutLeafType, true, 1, 1, procform->amproclefttype); break; case SPGIST_OPTIONS_PROC: |