aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access')
-rw-r--r--src/backend/access/spgist/spgscan.c31
-rw-r--r--src/backend/access/spgist/spgutils.c84
-rw-r--r--src/backend/access/spgist/spgvalidate.c27
3 files changed, 128 insertions, 14 deletions
diff --git a/src/backend/access/spgist/spgscan.c b/src/backend/access/spgist/spgscan.c
index 20e67c3f7d1..06011d2c024 100644
--- a/src/backend/access/spgist/spgscan.c
+++ b/src/backend/access/spgist/spgscan.c
@@ -81,7 +81,10 @@ pairingheap_SpGistSearchItem_cmp(const pairingheap_node *a,
static void
spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
{
- if (!so->state.attLeafType.attbyval &&
+ /* value is of type attType if isLeaf, else of type attLeafType */
+ /* (no, that is not backwards; yes, it's confusing) */
+ if (!(item->isLeaf ? so->state.attType.attbyval :
+ so->state.attLeafType.attbyval) &&
DatumGetPointer(item->value) != NULL)
pfree(DatumGetPointer(item->value));
@@ -296,6 +299,7 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
{
IndexScanDesc scan;
SpGistScanOpaque so;
+ TupleDesc outTupDesc;
int i;
scan = RelationGetIndexScan(rel, keysz, orderbysz);
@@ -314,8 +318,21 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
"SP-GiST traversal-value context",
ALLOCSET_DEFAULT_SIZES);
- /* Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan */
- so->indexTupDesc = scan->xs_hitupdesc = RelationGetDescr(rel);
+ /*
+ * Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan.
+ * (It's rather annoying to do this work when it might be wasted, but for
+ * most opclasses we can re-use the index reldesc instead of making one.)
+ */
+ if (so->state.attType.type ==
+ TupleDescAttr(RelationGetDescr(rel), 0)->atttypid)
+ outTupDesc = RelationGetDescr(rel);
+ else
+ {
+ outTupDesc = CreateTemplateTupleDesc(1);
+ TupleDescInitEntry(outTupDesc, 1, NULL,
+ so->state.attType.type, -1, 0);
+ }
+ so->indexTupDesc = scan->xs_hitupdesc = outTupDesc;
/* Allocate various arrays needed for order-by scans */
if (scan->numberOfOrderBys > 0)
@@ -447,9 +464,10 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
item->level = level;
item->heapPtr = *heapPtr;
/* copy value to queue cxt out of tmp cxt */
+ /* caution: "leafValue" is of type attType not leafType */
item->value = isnull ? (Datum) 0 :
- datumCopy(leafValue, so->state.attLeafType.attbyval,
- so->state.attLeafType.attlen);
+ datumCopy(leafValue, so->state.attType.attbyval,
+ so->state.attType.attlen);
item->traversalValue = NULL;
item->isLeaf = true;
item->recheck = recheck;
@@ -497,6 +515,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
in.nkeys = so->numberOfKeys;
in.orderbys = so->orderByData;
in.norderbys = so->numberOfNonNullOrderBys;
+ Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */
in.reconstructedValue = item->value;
in.traversalValue = item->traversalValue;
in.level = item->level;
@@ -563,6 +582,7 @@ spgInitInnerConsistentIn(spgInnerConsistentIn *in,
in->orderbys = so->orderByData;
in->nkeys = so->numberOfKeys;
in->norderbys = so->numberOfNonNullOrderBys;
+ Assert(!item->isLeaf); /* else reconstructedValue would be wrong type */
in->reconstructedValue = item->value;
in->traversalMemoryContext = so->traversalCxt;
in->traversalValue = item->traversalValue;
@@ -589,6 +609,7 @@ spgMakeInnerItem(SpGistScanOpaque so,
: parentItem->level;
/* Must copy value out of temp context */
+ /* (recall that reconstructed values are of type leafType) */
item->value = out->reconstructedValues
? datumCopy(out->reconstructedValues[i],
so->state.attLeafType.attbyval,
diff --git a/src/backend/access/spgist/spgutils.c b/src/backend/access/spgist/spgutils.c
index 949352ada76..f2ba72b5aa3 100644
--- a/src/backend/access/spgist/spgutils.c
+++ b/src/backend/access/spgist/spgutils.c
@@ -23,6 +23,7 @@
#include "access/xact.h"
#include "catalog/pg_amop.h"
#include "commands/vacuum.h"
+#include "nodes/nodeFuncs.h"
#include "storage/bufmgr.h"
#include "storage/indexfsm.h"
#include "storage/lmgr.h"
@@ -53,7 +54,7 @@ spghandler(PG_FUNCTION_ARGS)
amroutine->amoptionalkey = true;
amroutine->amsearcharray = false;
amroutine->amsearchnulls = true;
- amroutine->amstorage = false;
+ amroutine->amstorage = true;
amroutine->amclusterable = false;
amroutine->ampredlocks = false;
amroutine->amcanparallel = false;
@@ -89,6 +90,66 @@ spghandler(PG_FUNCTION_ARGS)
PG_RETURN_POINTER(amroutine);
}
+/*
+ * GetIndexInputType
+ * Determine the nominal input data type for an index column
+ *
+ * We define the "nominal" input type as the associated opclass's opcintype,
+ * or if that is a polymorphic type, the base type of the heap column or
+ * expression that is the index's input. The reason for preferring the
+ * opcintype is that non-polymorphic opclasses probably don't want to hear
+ * about binary-compatible input types. For instance, if a text opclass
+ * is being used with a varchar heap column, we want to report "text" not
+ * "varchar". Likewise, opclasses don't want to hear about domain types,
+ * so if we do consult the actual input type, we make sure to flatten domains.
+ *
+ * At some point maybe this should go somewhere else, but it's not clear
+ * if any other index AMs have a use for it.
+ */
+static Oid
+GetIndexInputType(Relation index, AttrNumber indexcol)
+{
+ Oid opcintype;
+ AttrNumber heapcol;
+ List *indexprs;
+ ListCell *indexpr_item;
+
+ Assert(index->rd_index != NULL);
+ Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts);
+ opcintype = index->rd_opcintype[indexcol - 1];
+ if (!IsPolymorphicType(opcintype))
+ return opcintype;
+ heapcol = index->rd_index->indkey.values[indexcol - 1];
+ if (heapcol != 0) /* Simple index column? */
+ return getBaseType(get_atttype(index->rd_index->indrelid, heapcol));
+
+ /*
+ * If the index expressions are already cached, skip calling
+ * RelationGetIndexExpressions, as it will make a copy which is overkill.
+ * We're not going to modify the trees, and we're not going to do anything
+ * that would invalidate the relcache entry before we're done.
+ */
+ if (index->rd_indexprs)
+ indexprs = index->rd_indexprs;
+ else
+ indexprs = RelationGetIndexExpressions(index);
+ indexpr_item = list_head(indexprs);
+ for (int i = 1; i <= index->rd_index->indnkeyatts; i++)
+ {
+ if (index->rd_index->indkey.values[i - 1] == 0)
+ {
+ /* expression column */
+ if (indexpr_item == NULL)
+ elog(ERROR, "wrong number of index expressions");
+ if (i == indexcol)
+ return getBaseType(exprType((Node *) lfirst(indexpr_item)));
+ indexpr_item = lnext(indexprs, indexpr_item);
+ }
+ }
+ elog(ERROR, "wrong number of index expressions");
+ return InvalidOid; /* keep compiler quiet */
+}
+
/* Fill in a SpGistTypeDesc struct with info about the specified data type */
static void
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
@@ -121,11 +182,11 @@ spgGetCache(Relation index)
Assert(index->rd_att->natts == 1);
/*
- * Get the actual data type of the indexed column from the index
- * tupdesc. We pass this to the opclass config function so that
+ * Get the actual (well, nominal) data type of the column being
+ * indexed. We pass this to the opclass config function so that
* polymorphic opclasses are possible.
*/
- atttype = TupleDescAttr(index->rd_att, 0)->atttypid;
+ atttype = GetIndexInputType(index, 1);
/* Call the config function to get config info for the opclass */
in.attType = atttype;
@@ -136,11 +197,21 @@ spgGetCache(Relation index)
PointerGetDatum(&in),
PointerGetDatum(&cache->config));
+ /*
+ * If leafType isn't specified, use the declared index column type,
+ * which index.c will have derived from the opclass's opcintype.
+ * (Although we now make spgvalidate.c warn if these aren't the same,
+ * old user-defined opclasses may not set the STORAGE parameter
+ * correctly, so believe leafType if it's given.)
+ */
+ if (!OidIsValid(cache->config.leafType))
+ cache->config.leafType =
+ TupleDescAttr(RelationGetDescr(index), 0)->atttypid;
+
/* Get the information we need about each relevant datatype */
fillTypeDesc(&cache->attType, atttype);
- if (OidIsValid(cache->config.leafType) &&
- cache->config.leafType != atttype)
+ if (cache->config.leafType != atttype)
{
if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
ereport(ERROR,
@@ -151,6 +222,7 @@ spgGetCache(Relation index)
}
else
{
+ /* Save lookups in this common case */
cache->attLeafType = cache->attType;
}
diff --git a/src/backend/access/spgist/spgvalidate.c b/src/backend/access/spgist/spgvalidate.c
index 8bc3889a4de..472a28b8080 100644
--- a/src/backend/access/spgist/spgvalidate.c
+++ b/src/backend/access/spgist/spgvalidate.c
@@ -43,6 +43,7 @@ spgvalidate(Oid opclassoid)
Form_pg_opclass classform;
Oid opfamilyoid;
Oid opcintype;
+ Oid opckeytype;
char *opclassname;
HeapTuple familytup;
Form_pg_opfamily familyform;
@@ -57,6 +58,7 @@ spgvalidate(Oid opclassoid)
spgConfigOut configOut;
Oid configOutLefttype = InvalidOid;
Oid configOutRighttype = InvalidOid;
+ Oid configOutLeafType = InvalidOid;
/* Fetch opclass information */
classtup = SearchSysCache1(CLAOID, ObjectIdGetDatum(opclassoid));
@@ -66,6 +68,7 @@ spgvalidate(Oid opclassoid)
opfamilyoid = classform->opcfamily;
opcintype = classform->opcintype;
+ opckeytype = classform->opckeytype;
opclassname = NameStr(classform->opcname);
/* Fetch opfamily information */
@@ -118,13 +121,31 @@ spgvalidate(Oid opclassoid)
configOutLefttype = procform->amproclefttype;
configOutRighttype = procform->amprocrighttype;
+ /* Default leaf type is opckeytype or input type */
+ if (OidIsValid(opckeytype))
+ configOutLeafType = opckeytype;
+ else
+ configOutLeafType = procform->amproclefttype;
+
+ /* If some other leaf datum type is specified, warn */
+ if (OidIsValid(configOut.leafType) &&
+ configOutLeafType != configOut.leafType)
+ {
+ ereport(INFO,
+ (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+ errmsg("SP-GiST leaf data type %s does not match declared type %s",
+ format_type_be(configOut.leafType),
+ format_type_be(configOutLeafType))));
+ result = false;
+ configOutLeafType = configOut.leafType;
+ }
+
/*
* When leaf and attribute types are the same, compress
* function is not required and we set corresponding bit in
* functionset for later group consistency check.
*/
- if (!OidIsValid(configOut.leafType) ||
- configOut.leafType == configIn.attType)
+ if (configOutLeafType == configIn.attType)
{
foreach(lc, grouplist)
{
@@ -156,7 +177,7 @@ spgvalidate(Oid opclassoid)
ok = false;
else
ok = check_amproc_signature(procform->amproc,
- configOut.leafType, true,
+ configOutLeafType, true,
1, 1, procform->amproclefttype);
break;
case SPGIST_OPTIONS_PROC: