diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/utils/sort/tuplesort.c | 9 | ||||
-rw-r--r-- | src/backend/utils/sort/tuplesortvariants.c | 125 | ||||
-rw-r--r-- | src/include/utils/tuplesort.h | 7 |
3 files changed, 108 insertions, 33 deletions
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index e5a4e5b371e..c7a6c03f975 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -485,9 +485,6 @@ static void tuplesort_updatemax(Tuplesortstate *state); * is to try to sort two tuples without having to follow the pointers to the * comparator or the tuple. * - * XXX: For now, these fall back to comparator functions that will compare the - * leading datum a second time. - * * XXX: For now, there is no specialization for cases where datum1 is * authoritative and we don't even need to fall back to a callback at all (that * would be true for types like int4/int8/timestamp/date, but not true for @@ -513,7 +510,7 @@ qsort_tuple_unsigned_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) if (state->base.onlyKey != NULL) return 0; - return state->base.comparetup(a, b, state); + return state->base.comparetup_tiebreak(a, b, state); } #if SIZEOF_DATUM >= 8 @@ -537,7 +534,7 @@ qsort_tuple_signed_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) if (state->base.onlyKey != NULL) return 0; - return state->base.comparetup(a, b, state); + return state->base.comparetup_tiebreak(a, b, state); } #endif @@ -561,7 +558,7 @@ qsort_tuple_int32_compare(SortTuple *a, SortTuple *b, Tuplesortstate *state) if (state->base.onlyKey != NULL) return 0; - return state->base.comparetup(a, b, state); + return state->base.comparetup_tiebreak(a, b, state); } /* diff --git a/src/backend/utils/sort/tuplesortvariants.c b/src/backend/utils/sort/tuplesortvariants.c index eb6cfcfd002..84442a93c5a 100644 --- a/src/backend/utils/sort/tuplesortvariants.c +++ b/src/backend/utils/sort/tuplesortvariants.c @@ -47,26 +47,36 @@ static void removeabbrev_datum(Tuplesortstate *state, SortTuple *stups, int count); static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_heap_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape, unsigned int len); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_cluster_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape, unsigned int tuplen); static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, LogicalTape *tape, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); +static int comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state); static void writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, @@ -165,6 +175,7 @@ tuplesort_begin_heap(TupleDesc tupDesc, base->removeabbrev = removeabbrev_heap; base->comparetup = comparetup_heap; + base->comparetup_tiebreak = comparetup_heap_tiebreak; base->writetup = writetup_heap; base->readtup = readtup_heap; base->haveDatum1 = true; @@ -242,6 +253,7 @@ tuplesort_begin_cluster(TupleDesc tupDesc, base->removeabbrev = removeabbrev_cluster; base->comparetup = comparetup_cluster; + base->comparetup_tiebreak = comparetup_cluster_tiebreak; base->writetup = writetup_cluster; base->readtup = readtup_cluster; base->freestate = freestate_cluster; @@ -351,6 +363,7 @@ tuplesort_begin_index_btree(Relation heapRel, base->removeabbrev = removeabbrev_index; base->comparetup = comparetup_index_btree; + base->comparetup_tiebreak = comparetup_index_btree_tiebreak; base->writetup = writetup_index; base->readtup = readtup_index; base->haveDatum1 = true; @@ -431,6 +444,7 @@ tuplesort_begin_index_hash(Relation heapRel, base->removeabbrev = removeabbrev_index; base->comparetup = comparetup_index_hash; + base->comparetup_tiebreak = comparetup_index_hash_tiebreak; base->writetup = writetup_index; base->readtup = readtup_index; base->haveDatum1 = true; @@ -476,6 +490,7 @@ tuplesort_begin_index_gist(Relation heapRel, base->removeabbrev = removeabbrev_index; base->comparetup = comparetup_index_btree; + base->comparetup_tiebreak = comparetup_index_btree_tiebreak; base->writetup = writetup_index; base->readtup = readtup_index; base->haveDatum1 = true; @@ -546,6 +561,7 @@ tuplesort_begin_datum(Oid datumType, Oid sortOperator, Oid sortCollation, base->removeabbrev = removeabbrev_datum; base->comparetup = comparetup_datum; + base->comparetup_tiebreak = comparetup_datum_tiebreak; base->writetup = writetup_datum; base->readtup = readtup_datum; base->haveDatum1 = true; @@ -931,16 +947,7 @@ comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { TuplesortPublic *base = TuplesortstateGetPublic(state); SortSupport sortKey = base->sortKeys; - HeapTupleData ltup; - HeapTupleData rtup; - TupleDesc tupDesc; - int nkey; int32 compare; - AttrNumber attno; - Datum datum1, - datum2; - bool isnull1, - isnull2; /* Compare the leading sort key */ @@ -951,6 +958,25 @@ comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) return compare; /* Compare additional sort keys */ + return comparetup_heap_tiebreak(a, b, state); +} + +static int +comparetup_heap_tiebreak(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + SortSupport sortKey = base->sortKeys; + HeapTupleData ltup; + HeapTupleData rtup; + TupleDesc tupDesc; + int nkey; + int32 compare; + AttrNumber attno; + Datum datum1, + datum2; + bool isnull1, + isnull2; + ltup.t_len = ((MinimalTuple) a->tuple)->t_len + MINIMAL_TUPLE_OFFSET; ltup.t_data = (HeapTupleHeader) ((char *) a->tuple - MINIMAL_TUPLE_OFFSET); rtup.t_len = ((MinimalTuple) b->tuple)->t_len + MINIMAL_TUPLE_OFFSET; @@ -1063,19 +1089,39 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) { TuplesortPublic *base = TuplesortstateGetPublic(state); + SortSupport sortKey = base->sortKeys; + int32 compare; + + /* Compare the leading sort key, if it's simple */ + if (base->haveDatum1) + { + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); + if (compare != 0) + return compare; + } + + return comparetup_cluster_tiebreak(a, b, state); +} + +static int +comparetup_cluster_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); TuplesortClusterArg *arg = (TuplesortClusterArg *) base->arg; SortSupport sortKey = base->sortKeys; HeapTuple ltup; HeapTuple rtup; TupleDesc tupDesc; int nkey; - int32 compare; + int32 compare = 0; Datum datum1, datum2; bool isnull1, isnull2; - /* Be prepared to compare additional sort keys */ ltup = (HeapTuple) a->tuple; rtup = (HeapTuple) b->tuple; tupDesc = arg->tupDesc; @@ -1083,12 +1129,6 @@ comparetup_cluster(const SortTuple *a, const SortTuple *b, /* Compare the leading sort key, if it's simple */ if (base->haveDatum1) { - compare = ApplySortComparator(a->datum1, a->isnull1, - b->datum1, b->isnull1, - sortKey); - if (compare != 0) - return compare; - if (sortKey->abbrev_converter) { AttrNumber leading = arg->indexInfo->ii_IndexAttrNumbers[0]; @@ -1269,6 +1309,25 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, * treatment for equal keys at the end. */ TuplesortPublic *base = TuplesortstateGetPublic(state); + SortSupport sortKey = base->sortKeys; + int32 compare; + + /* Compare the leading sort key */ + compare = ApplySortComparator(a->datum1, a->isnull1, + b->datum1, b->isnull1, + sortKey); + if (compare != 0) + return compare; + + /* Compare additional sort keys */ + return comparetup_index_btree_tiebreak(a, b, state); +} + +static int +comparetup_index_btree_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); TuplesortIndexBTreeArg *arg = (TuplesortIndexBTreeArg *) base->arg; SortSupport sortKey = base->sortKeys; IndexTuple tuple1; @@ -1283,15 +1342,6 @@ comparetup_index_btree(const SortTuple *a, const SortTuple *b, bool isnull1, isnull2; - - /* Compare the leading sort key */ - compare = ApplySortComparator(a->datum1, a->isnull1, - b->datum1, b->isnull1, - sortKey); - if (compare != 0) - return compare; - - /* Compare additional sort keys */ tuple1 = (IndexTuple) a->tuple; tuple2 = (IndexTuple) b->tuple; keysz = base->nKeys; @@ -1467,6 +1517,19 @@ comparetup_index_hash(const SortTuple *a, const SortTuple *b, return 0; } +/* + * Sorting for hash indexes only uses one sort key, so this shouldn't ever be + * called. It's only here for consistency. + */ +static int +comparetup_index_hash_tiebreak(const SortTuple *a, const SortTuple *b, + Tuplesortstate *state) +{ + Assert(false); + + return 0; +} + static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { @@ -1526,8 +1589,16 @@ comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) if (compare != 0) return compare; - /* if we have abbreviations, then "tuple" has the original value */ + return comparetup_datum_tiebreak(a, b, state); +} +static int +comparetup_datum_tiebreak(const SortTuple *a, const SortTuple *b, Tuplesortstate *state) +{ + TuplesortPublic *base = TuplesortstateGetPublic(state); + int32 compare = 0; + + /* if we have abbreviations, then "tuple" has the original value */ if (base->sortKeys->abbrev_converter) compare = ApplySortAbbrevFullComparator(PointerGetDatum(a->tuple), a->isnull1, PointerGetDatum(b->tuple), b->isnull1, diff --git a/src/include/utils/tuplesort.h b/src/include/utils/tuplesort.h index af057b6358e..3f71c70f175 100644 --- a/src/include/utils/tuplesort.h +++ b/src/include/utils/tuplesort.h @@ -163,6 +163,13 @@ typedef struct SortTupleComparator comparetup; /* + * Fall back to the full tuple for comparison, but only compare the first + * sortkey if it was abbreviated. Otherwise, only compare second and later + * sortkeys. + */ + SortTupleComparator comparetup_tiebreak; + + /* * Alter datum1 representation in the SortTuple's array back from the * abbreviated key to the first column value. */ |