diff options
Diffstat (limited to 'src/backend/access/nbtree/nbtree.c')
-rw-r--r-- | src/backend/access/nbtree/nbtree.c | 149 |
1 files changed, 100 insertions, 49 deletions
diff --git a/src/backend/access/nbtree/nbtree.c b/src/backend/access/nbtree/nbtree.c index 3401bc5bdb2..60b7f599a74 100644 --- a/src/backend/access/nbtree/nbtree.c +++ b/src/backend/access/nbtree/nbtree.c @@ -276,39 +276,63 @@ btgettuple(PG_FUNCTION_ARGS) scan->xs_recheck = false; /* - * If we've already initialized this scan, we can just advance it in the - * appropriate direction. If we haven't done so yet, we call a routine to - * get the first item in the scan. + * If we have any array keys, initialize them during first call for a + * scan. We can't do this in btrescan because we don't know the scan + * direction at that time. */ - if (BTScanPosIsValid(so->currPos)) + if (so->numArrayKeys && !BTScanPosIsValid(so->currPos)) + { + /* punt if we have any unsatisfiable array keys */ + if (so->numArrayKeys < 0) + PG_RETURN_BOOL(false); + + _bt_start_array_keys(scan, dir); + } + + /* This loop handles advancing to the next array elements, if any */ + do { /* - * Check to see if we should kill the previously-fetched tuple. + * If we've already initialized this scan, we can just advance it in + * the appropriate direction. If we haven't done so yet, we call + * _bt_first() to get the first item in the scan. */ - if (scan->kill_prior_tuple) + if (!BTScanPosIsValid(so->currPos)) + res = _bt_first(scan, dir); + else { /* - * Yes, remember it for later. (We'll deal with all such tuples - * at once right before leaving the index page.) The test for - * numKilled overrun is not just paranoia: if the caller reverses - * direction in the indexscan then the same item might get entered - * multiple times. It's not worth trying to optimize that, so we - * don't detect it, but instead just forget any excess entries. + * Check to see if we should kill the previously-fetched tuple. */ - if (so->killedItems == NULL) - so->killedItems = (int *) - palloc(MaxIndexTuplesPerPage * sizeof(int)); - if (so->numKilled < MaxIndexTuplesPerPage) - so->killedItems[so->numKilled++] = so->currPos.itemIndex; + if (scan->kill_prior_tuple) + { + /* + * Yes, remember it for later. (We'll deal with all such + * tuples at once right before leaving the index page.) The + * test for numKilled overrun is not just paranoia: if the + * caller reverses direction in the indexscan then the same + * item might get entered multiple times. It's not worth + * trying to optimize that, so we don't detect it, but instead + * just forget any excess entries. + */ + if (so->killedItems == NULL) + so->killedItems = (int *) + palloc(MaxIndexTuplesPerPage * sizeof(int)); + if (so->numKilled < MaxIndexTuplesPerPage) + so->killedItems[so->numKilled++] = so->currPos.itemIndex; + } + + /* + * Now continue the scan. + */ + res = _bt_next(scan, dir); } - /* - * Now continue the scan. - */ - res = _bt_next(scan, dir); - } - else - res = _bt_first(scan, dir); + /* If we have a tuple, return it ... */ + if (res) + break; + /* ... otherwise see if we have more array keys to deal with */ + } while (so->numArrayKeys && _bt_advance_array_keys(scan, dir)); PG_RETURN_BOOL(res); } @@ -325,35 +349,50 @@ btgetbitmap(PG_FUNCTION_ARGS) int64 ntids = 0; ItemPointer heapTid; - /* Fetch the first page & tuple. */ - if (!_bt_first(scan, ForwardScanDirection)) + /* + * If we have any array keys, initialize them. + */ + if (so->numArrayKeys) { - /* empty scan */ - PG_RETURN_INT64(0); + /* punt if we have any unsatisfiable array keys */ + if (so->numArrayKeys < 0) + PG_RETURN_INT64(ntids); + + _bt_start_array_keys(scan, ForwardScanDirection); } - /* Save tuple ID, and continue scanning */ - heapTid = &scan->xs_ctup.t_self; - tbm_add_tuples(tbm, heapTid, 1, false); - ntids++; - for (;;) + /* This loop handles advancing to the next array elements, if any */ + do { - /* - * Advance to next tuple within page. This is the same as the easy - * case in _bt_next(). - */ - if (++so->currPos.itemIndex > so->currPos.lastItem) + /* Fetch the first page & tuple */ + if (_bt_first(scan, ForwardScanDirection)) { - /* let _bt_next do the heavy lifting */ - if (!_bt_next(scan, ForwardScanDirection)) - break; - } + /* Save tuple ID, and continue scanning */ + heapTid = &scan->xs_ctup.t_self; + tbm_add_tuples(tbm, heapTid, 1, false); + ntids++; - /* Save tuple ID, and continue scanning */ - heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid; - tbm_add_tuples(tbm, heapTid, 1, false); - ntids++; - } + for (;;) + { + /* + * Advance to next tuple within page. This is the same as the + * easy case in _bt_next(). + */ + if (++so->currPos.itemIndex > so->currPos.lastItem) + { + /* let _bt_next do the heavy lifting */ + if (!_bt_next(scan, ForwardScanDirection)) + break; + } + + /* Save tuple ID, and continue scanning */ + heapTid = &so->currPos.items[so->currPos.itemIndex].heapTid; + tbm_add_tuples(tbm, heapTid, 1, false); + ntids++; + } + } + /* Now see if we have more array keys to deal with */ + } while (so->numArrayKeys && _bt_advance_array_keys(scan, ForwardScanDirection)); PG_RETURN_INT64(ntids); } @@ -383,6 +422,12 @@ btbeginscan(PG_FUNCTION_ARGS) so->keyData = (ScanKey) palloc(scan->numberOfKeys * sizeof(ScanKeyData)); else so->keyData = NULL; + + so->arrayKeyData = NULL; /* assume no array keys for now */ + so->numArrayKeys = 0; + so->arrayKeys = NULL; + so->arrayContext = NULL; + so->killedItems = NULL; /* until needed */ so->numKilled = 0; @@ -460,6 +505,9 @@ btrescan(PG_FUNCTION_ARGS) scan->numberOfKeys * sizeof(ScanKeyData)); so->numberOfKeys = 0; /* until _bt_preprocess_keys sets it */ + /* If any keys are SK_SEARCHARRAY type, set up array-key info */ + _bt_preprocess_array_keys(scan); + PG_RETURN_VOID(); } @@ -490,10 +538,13 @@ btendscan(PG_FUNCTION_ARGS) so->markItemIndex = -1; /* Release storage */ - if (so->killedItems != NULL) - pfree(so->killedItems); if (so->keyData != NULL) pfree(so->keyData); + /* so->arrayKeyData and so->arrayKeys are in arrayContext */ + if (so->arrayContext != NULL) + MemoryContextDelete(so->arrayContext); + if (so->killedItems != NULL) + pfree(so->killedItems); if (so->currTuples != NULL) pfree(so->currTuples); /* so->markTuples should not be pfree'd, see btrescan */ |