diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtutils.c')
-rw-r--r-- | src/backend/access/nbtree/nbtutils.c | 457 |
1 files changed, 450 insertions, 7 deletions
diff --git a/src/backend/access/nbtree/nbtutils.c b/src/backend/access/nbtree/nbtutils.c index b6fb3867bf0..134abe6d596 100644 --- a/src/backend/access/nbtree/nbtutils.c +++ b/src/backend/access/nbtree/nbtutils.c @@ -21,10 +21,26 @@ #include "access/reloptions.h" #include "access/relscan.h" #include "miscadmin.h" +#include "utils/array.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" +typedef struct BTSortArrayContext +{ + FmgrInfo flinfo; + Oid collation; + bool reverse; +} BTSortArrayContext; + +static Datum _bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, + StrategyNumber strat, + Datum *elems, int nelems); +static int _bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, + bool reverse, + Datum *elems, int nelems); +static int _bt_compare_array_elements(const void *a, const void *b, void *arg); static bool _bt_compare_scankey_args(IndexScanDesc scan, ScanKey op, ScanKey leftarg, ScanKey rightarg, bool *result); @@ -159,12 +175,432 @@ _bt_freestack(BTStack stack) /* + * _bt_preprocess_array_keys() -- Preprocess SK_SEARCHARRAY scan keys + * + * If there are any SK_SEARCHARRAY scan keys, deconstruct the array(s) and + * set up BTArrayKeyInfo info for each one that is an equality-type key. + * Prepare modified scan keys in so->arrayKeyData, which will hold the current + * array elements during each primitive indexscan operation. For inequality + * array keys, it's sufficient to find the extreme element value and replace + * the whole array with that scalar value. + * + * Note: the reason we need so->arrayKeyData, rather than just scribbling + * on scan->keyData, is that callers are permitted to call btrescan without + * supplying a new set of scankey data. + */ +void +_bt_preprocess_array_keys(IndexScanDesc scan) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + int numberOfKeys = scan->numberOfKeys; + int16 *indoption = scan->indexRelation->rd_indoption; + int numArrayKeys; + ScanKey cur; + int i; + MemoryContext oldContext; + + /* Quick check to see if there are any array keys */ + numArrayKeys = 0; + for (i = 0; i < numberOfKeys; i++) + { + cur = &scan->keyData[i]; + if (cur->sk_flags & SK_SEARCHARRAY) + { + numArrayKeys++; + Assert(!(cur->sk_flags & (SK_ROW_HEADER | SK_SEARCHNULL | SK_SEARCHNOTNULL))); + /* If any arrays are null as a whole, we can quit right now. */ + if (cur->sk_flags & SK_ISNULL) + { + so->numArrayKeys = -1; + so->arrayKeyData = NULL; + return; + } + } + } + + /* Quit if nothing to do. */ + if (numArrayKeys == 0) + { + so->numArrayKeys = 0; + so->arrayKeyData = NULL; + return; + } + + /* + * Make a scan-lifespan context to hold array-associated data, or reset + * it if we already have one from a previous rescan cycle. + */ + if (so->arrayContext == NULL) + so->arrayContext = AllocSetContextCreate(CurrentMemoryContext, + "BTree Array Context", + ALLOCSET_SMALL_MINSIZE, + ALLOCSET_SMALL_INITSIZE, + ALLOCSET_SMALL_MAXSIZE); + else + MemoryContextReset(so->arrayContext); + + oldContext = MemoryContextSwitchTo(so->arrayContext); + + /* Create modifiable copy of scan->keyData in the workspace context */ + so->arrayKeyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData)); + memcpy(so->arrayKeyData, + scan->keyData, + scan->numberOfKeys * sizeof(ScanKeyData)); + + /* Allocate space for per-array data in the workspace context */ + so->arrayKeys = (BTArrayKeyInfo *) palloc0(numArrayKeys * sizeof(BTArrayKeyInfo)); + + /* Now process each array key */ + numArrayKeys = 0; + for (i = 0; i < numberOfKeys; i++) + { + ArrayType *arrayval; + int16 elmlen; + bool elmbyval; + char elmalign; + int num_elems; + Datum *elem_values; + bool *elem_nulls; + int num_nonnulls; + int j; + + cur = &so->arrayKeyData[i]; + if (!(cur->sk_flags & SK_SEARCHARRAY)) + continue; + + /* + * First, deconstruct the array into elements. Anything allocated + * here (including a possibly detoasted array value) is in the + * workspace context. + */ + arrayval = DatumGetArrayTypeP(cur->sk_argument); + /* We could cache this data, but not clear it's worth it */ + get_typlenbyvalalign(ARR_ELEMTYPE(arrayval), + &elmlen, &elmbyval, &elmalign); + deconstruct_array(arrayval, + ARR_ELEMTYPE(arrayval), + elmlen, elmbyval, elmalign, + &elem_values, &elem_nulls, &num_elems); + + /* + * Compress out any null elements. We can ignore them since we assume + * all btree operators are strict. + */ + num_nonnulls = 0; + for (j = 0; j < num_elems; j++) + { + if (!elem_nulls[j]) + elem_values[num_nonnulls++] = elem_values[j]; + } + + /* We could pfree(elem_nulls) now, but not worth the cycles */ + + /* If there's no non-nulls, the scan qual is unsatisfiable */ + if (num_nonnulls == 0) + { + numArrayKeys = -1; + break; + } + + /* + * If the comparison operator is not equality, then the array qual + * degenerates to a simple comparison against the smallest or largest + * non-null array element, as appropriate. + */ + switch (cur->sk_strategy) + { + case BTLessStrategyNumber: + case BTLessEqualStrategyNumber: + cur->sk_argument = + _bt_find_extreme_element(scan, cur, + BTGreaterStrategyNumber, + elem_values, num_nonnulls); + continue; + case BTEqualStrategyNumber: + /* proceed with rest of loop */ + break; + case BTGreaterEqualStrategyNumber: + case BTGreaterStrategyNumber: + cur->sk_argument = + _bt_find_extreme_element(scan, cur, + BTLessStrategyNumber, + elem_values, num_nonnulls); + continue; + default: + elog(ERROR, "unrecognized StrategyNumber: %d", + (int) cur->sk_strategy); + break; + } + + /* + * Sort the non-null elements and eliminate any duplicates. We must + * sort in the same ordering used by the index column, so that the + * successive primitive indexscans produce data in index order. + */ + num_elems = _bt_sort_array_elements(scan, cur, + (indoption[cur->sk_attno - 1] & INDOPTION_DESC) != 0, + elem_values, num_nonnulls); + + /* + * And set up the BTArrayKeyInfo data. + */ + so->arrayKeys[numArrayKeys].scan_key = i; + so->arrayKeys[numArrayKeys].num_elems = num_elems; + so->arrayKeys[numArrayKeys].elem_values = elem_values; + numArrayKeys++; + } + + so->numArrayKeys = numArrayKeys; + + MemoryContextSwitchTo(oldContext); +} + +/* + * _bt_find_extreme_element() -- get least or greatest array element + * + * scan and skey identify the index column, whose opfamily determines the + * comparison semantics. strat should be BTLessStrategyNumber to get the + * least element, or BTGreaterStrategyNumber to get the greatest. + */ +static Datum +_bt_find_extreme_element(IndexScanDesc scan, ScanKey skey, + StrategyNumber strat, + Datum *elems, int nelems) +{ + Relation rel = scan->indexRelation; + Oid elemtype, + cmp_op; + RegProcedure cmp_proc; + FmgrInfo flinfo; + Datum result; + int i; + + /* + * Determine the nominal datatype of the array elements. We have to + * support the convention that sk_subtype == InvalidOid means the opclass + * input type; this is a hack to simplify life for ScanKeyInit(). + */ + elemtype = skey->sk_subtype; + if (elemtype == InvalidOid) + elemtype = rel->rd_opcintype[skey->sk_attno - 1]; + + /* + * Look up the appropriate comparison operator in the opfamily. + * + * Note: it's possible that this would fail, if the opfamily is incomplete, + * but it seems quite unlikely that an opfamily would omit non-cross-type + * comparison operators for any datatype that it supports at all. + */ + cmp_op = get_opfamily_member(rel->rd_opfamily[skey->sk_attno - 1], + elemtype, + elemtype, + strat); + if (!OidIsValid(cmp_op)) + elog(ERROR, "missing operator %d(%u,%u) in opfamily %u", + strat, elemtype, elemtype, + rel->rd_opfamily[skey->sk_attno - 1]); + cmp_proc = get_opcode(cmp_op); + if (!RegProcedureIsValid(cmp_proc)) + elog(ERROR, "missing oprcode for operator %u", cmp_op); + + fmgr_info(cmp_proc, &flinfo); + + Assert(nelems > 0); + result = elems[0]; + for (i = 1; i < nelems; i++) + { + if (DatumGetBool(FunctionCall2Coll(&flinfo, + skey->sk_collation, + elems[i], + result))) + result = elems[i]; + } + + return result; +} + +/* + * _bt_sort_array_elements() -- sort and de-dup array elements + * + * The array elements are sorted in-place, and the new number of elements + * after duplicate removal is returned. + * + * scan and skey identify the index column, whose opfamily determines the + * comparison semantics. If reverse is true, we sort in descending order. + */ +static int +_bt_sort_array_elements(IndexScanDesc scan, ScanKey skey, + bool reverse, + Datum *elems, int nelems) +{ + Relation rel = scan->indexRelation; + Oid elemtype; + RegProcedure cmp_proc; + BTSortArrayContext cxt; + int last_non_dup; + int i; + + if (nelems <= 1) + return nelems; /* no work to do */ + + /* + * Determine the nominal datatype of the array elements. We have to + * support the convention that sk_subtype == InvalidOid means the opclass + * input type; this is a hack to simplify life for ScanKeyInit(). + */ + elemtype = skey->sk_subtype; + if (elemtype == InvalidOid) + elemtype = rel->rd_opcintype[skey->sk_attno - 1]; + + /* + * Look up the appropriate comparison function in the opfamily. + * + * Note: it's possible that this would fail, if the opfamily is incomplete, + * but it seems quite unlikely that an opfamily would omit non-cross-type + * support functions for any datatype that it supports at all. + */ + cmp_proc = get_opfamily_proc(rel->rd_opfamily[skey->sk_attno - 1], + elemtype, + elemtype, + BTORDER_PROC); + if (!RegProcedureIsValid(cmp_proc)) + elog(ERROR, "missing support function %d(%u,%u) in opfamily %u", + BTORDER_PROC, elemtype, elemtype, + rel->rd_opfamily[skey->sk_attno - 1]); + + /* Sort the array elements */ + fmgr_info(cmp_proc, &cxt.flinfo); + cxt.collation = skey->sk_collation; + cxt.reverse = reverse; + qsort_arg((void *) elems, nelems, sizeof(Datum), + _bt_compare_array_elements, (void *) &cxt); + + /* Now scan the sorted elements and remove duplicates */ + last_non_dup = 0; + for (i = 1; i < nelems; i++) + { + int32 compare; + + compare = DatumGetInt32(FunctionCall2Coll(&cxt.flinfo, + cxt.collation, + elems[last_non_dup], + elems[i])); + if (compare != 0) + elems[++last_non_dup] = elems[i]; + } + + return last_non_dup + 1; +} + +/* + * qsort_arg comparator for sorting array elements + */ +static int +_bt_compare_array_elements(const void *a, const void *b, void *arg) +{ + Datum da = *((const Datum *) a); + Datum db = *((const Datum *) b); + BTSortArrayContext *cxt = (BTSortArrayContext *) arg; + int32 compare; + + compare = DatumGetInt32(FunctionCall2Coll(&cxt->flinfo, + cxt->collation, + da, db)); + if (cxt->reverse) + compare = -compare; + return compare; +} + +/* + * _bt_start_array_keys() -- Initialize array keys at start of a scan + * + * Set up the cur_elem counters and fill in the first sk_argument value for + * each array scankey. We can't do this until we know the scan direction. + */ +void +_bt_start_array_keys(IndexScanDesc scan, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + int i; + + for (i = 0; i < so->numArrayKeys; i++) + { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + + Assert(curArrayKey->num_elems > 0); + if (ScanDirectionIsBackward(dir)) + curArrayKey->cur_elem = curArrayKey->num_elems - 1; + else + curArrayKey->cur_elem = 0; + skey->sk_argument = curArrayKey->elem_values[curArrayKey->cur_elem]; + } +} + +/* + * _bt_advance_array_keys() -- Advance to next set of array elements + * + * Returns TRUE if there is another set of values to consider, FALSE if not. + * On TRUE result, the scankeys are initialized with the next set of values. + */ +bool +_bt_advance_array_keys(IndexScanDesc scan, ScanDirection dir) +{ + BTScanOpaque so = (BTScanOpaque) scan->opaque; + bool found = false; + int i; + + /* + * We must advance the last array key most quickly, since it will + * correspond to the lowest-order index column among the available + * qualifications. This is necessary to ensure correct ordering of output + * when there are multiple array keys. + */ + for (i = so->numArrayKeys - 1; i >= 0; i--) + { + BTArrayKeyInfo *curArrayKey = &so->arrayKeys[i]; + ScanKey skey = &so->arrayKeyData[curArrayKey->scan_key]; + int cur_elem = curArrayKey->cur_elem; + int num_elems = curArrayKey->num_elems; + + if (ScanDirectionIsBackward(dir)) + { + if (--cur_elem < 0) + { + cur_elem = num_elems - 1; + found = false; /* need to advance next array key */ + } + else + found = true; + } + else + { + if (++cur_elem >= num_elems) + { + cur_elem = 0; + found = false; /* need to advance next array key */ + } + else + found = true; + } + + curArrayKey->cur_elem = cur_elem; + skey->sk_argument = curArrayKey->elem_values[cur_elem]; + if (found) + break; + } + + return found; +} + + +/* * _bt_preprocess_keys() -- Preprocess scan keys * - * The caller-supplied search-type keys (in scan->keyData[]) are copied to - * so->keyData[] with possible transformation. scan->numberOfKeys is - * the number of input keys, so->numberOfKeys gets the number of output - * keys (possibly less, never greater). + * The given search-type keys (in scan->keyData[] or so->arrayKeyData[]) + * are copied to so->keyData[] with possible transformation. + * scan->numberOfKeys is the number of input keys, so->numberOfKeys gets + * the number of output keys (possibly less, never greater). * * The output keys are marked with additional sk_flag bits beyond the * system-standard bits supplied by the caller. The DESC and NULLS_FIRST @@ -226,8 +662,8 @@ _bt_freestack(BTStack stack) * * Note: the reason we have to copy the preprocessed scan keys into private * storage is that we are modifying the array based on comparisons of the - * key argument values, which could change on a rescan. Therefore we can't - * overwrite the caller's data structure. + * key argument values, which could change on a rescan or after moving to + * new elements of array keys. Therefore we can't overwrite the source data. */ void _bt_preprocess_keys(IndexScanDesc scan) @@ -253,7 +689,14 @@ _bt_preprocess_keys(IndexScanDesc scan) if (numberOfKeys < 1) return; /* done if qual-less scan */ - inkeys = scan->keyData; + /* + * Read so->arrayKeyData if array keys are present, else scan->keyData + */ + if (so->arrayKeyData != NULL) + inkeys = so->arrayKeyData; + else + inkeys = scan->keyData; + outkeys = so->keyData; cur = &inkeys[0]; /* we check that input keys are correctly ordered */ |