Diffstat (limited to 'src/backend/executor/nodeIncrementalSort.c')
-rw-r--r-- | src/backend/executor/nodeIncrementalSort.c | 1263
1 files changed, 1263 insertions, 0 deletions
diff --git a/src/backend/executor/nodeIncrementalSort.c b/src/backend/executor/nodeIncrementalSort.c
new file mode 100644
index 00000000000..bcab7c054c1
--- /dev/null
+++ b/src/backend/executor/nodeIncrementalSort.c
@@ -0,0 +1,1263 @@
+/*-------------------------------------------------------------------------
+ *
+ * nodeIncrementalSort.c
+ *	  Routines to handle incremental sorting of relations.
+ *
+ * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *	  src/backend/executor/nodeIncrementalSort.c
+ *
+ * DESCRIPTION
+ *
+ *	Incremental sort is an optimized variant of multikey sort for cases
+ *	when the input is already sorted by a prefix of the sort keys.  For
+ *	example, when a sort by (key1, key2 ... keyN) is requested, and the
+ *	input is already sorted by (key1, key2 ... keyM), M < N, we can
+ *	divide the input into groups where keys (key1, ... keyM) are equal,
+ *	and only sort on the remaining columns.
+ *
+ *	Consider the following example.  We have input tuples consisting of
+ *	two integers (X, Y), already presorted by X, while it's required to
+ *	sort them by both X and Y.  Let the input tuples be the following:
+ *
+ *	(1, 5)
+ *	(1, 2)
+ *	(2, 9)
+ *	(2, 1)
+ *	(2, 5)
+ *	(3, 3)
+ *	(3, 7)
+ *
+ *	An incremental sort algorithm would split the input into the following
+ *	groups, which have equal X, and then sort them by Y individually:
+ *
+ *		(1, 5) (1, 2)
+ *		(2, 9) (2, 1) (2, 5)
+ *		(3, 3) (3, 7)
+ *
+ *	After sorting these groups and putting them altogether, we would get
+ *	the following result, which is sorted by X and Y, as requested:
+ *
+ *	(1, 2)
+ *	(1, 5)
+ *	(2, 1)
+ *	(2, 5)
+ *	(2, 9)
+ *	(3, 3)
+ *	(3, 7)
+ *
+ *	Incremental sort may be more efficient than plain sort, particularly
+ *	on large datasets, as it reduces the amount of data to sort at once,
+ *	making it more likely to fit into work_mem (eliminating the need to
+ *	spill to disk).  But the main advantage of incremental sort is that
+ *	it can start producing rows early, before sorting the whole dataset,
+ *	which is a significant benefit especially for queries with LIMIT.
+ *
+ *	The algorithm we've implemented here is modified from the theoretical
+ *	base described above by operating in two different modes:
+ *	  - Fetching a minimum number of tuples without checking prefix key
+ *	    group membership and sorting on all columns when safe.
+ *	  - Fetching all tuples for a single prefix key group and sorting on
+ *	    solely the unsorted columns.
+ *	We always begin in the first mode, and employ a heuristic to switch
+ *	into the second mode if we believe it's beneficial.
+ *
+ *	Sorting incrementally can potentially use less memory, avoid fetching
+ *	and sorting all tuples in the dataset, and begin returning tuples
+ *	before the entire result set is available.
+ *
+ *	The hybrid mode approach allows us to optimize for both very small
+ *	groups (where the overhead of a new tuplesort is high) and very large
+ *	groups (where we can lower cost by not having to sort on already sorted
+ *	columns), albeit at some extra cost while switching between modes.
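+ *
+ *	As an illustration (hypothetical table and index names; exact EXPLAIN
+ *	output will vary), a query like
+ *
+ *		CREATE TABLE tbl (a int, b int);
+ *		CREATE INDEX ON tbl (a);
+ *		SELECT * FROM tbl ORDER BY a, b LIMIT 10;
+ *
+ *	is the kind of case this node targets: the index scan provides tuples
+ *	presorted by (a), so we only need to sort within groups of equal a
+ *	values, yielding a plan shaped roughly like:
+ *
+ *		Limit
+ *		  ->  Incremental Sort
+ *				Sort Key: a, b
+ *				Presorted Key: a
+ *				->  Index Scan using tbl_a_idx on tbl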
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/htup_details.h"
+#include "executor/execdebug.h"
+#include "executor/nodeIncrementalSort.h"
+#include "miscadmin.h"
+#include "utils/lsyscache.h"
+#include "utils/tuplesort.h"
+
+/*
+ * We need to store the instrumentation information in either the local
+ * node's sort info or, for a parallel worker process, in the shared info
+ * (this avoids having to additionally memcpy the info from local memory to
+ * shared memory at each instrumentation call).  This macro expands to choose
+ * the proper sort state and group info.  It is wrapped in do {} while (0) so
+ * that call sites can use it like an ordinary statement.
+ *
+ * Arguments:
+ * - node: type IncrementalSortState *
+ * - groupName: the token fullsort or prefixsort
+ */
+#define INSTRUMENT_SORT_GROUP(node, groupName) \
+	do { \
+		if (node->ss.ps.instrument != NULL) \
+		{ \
+			if (node->shared_info && node->am_worker) \
+			{ \
+				Assert(IsParallelWorker()); \
+				Assert(ParallelWorkerNumber <= node->shared_info->num_workers); \
+				instrumentSortedGroup(&node->shared_info->sinfo[ParallelWorkerNumber].groupName##GroupInfo, \
+									  node->groupName##_state); \
+			} \
+			else \
+			{ \
+				instrumentSortedGroup(&node->incsort_info.groupName##GroupInfo, \
+									  node->groupName##_state); \
+			} \
+		} \
+	} while (0)
+
+/* ----------------------------------------------------------------
+ * instrumentSortedGroup
+ *
+ * Because incremental sort processes (potentially many) sort batches, we need
+ * to capture tuplesort stats each time we finalize a sort state.  This summary
+ * data is later used for EXPLAIN ANALYZE output.
+ * ----------------------------------------------------------------
+ */
+static void
+instrumentSortedGroup(IncrementalSortGroupInfo *groupInfo,
+					  Tuplesortstate *sortState)
+{
+	TuplesortInstrumentation sort_instr;
+
+	groupInfo->groupCount++;
+
+	tuplesort_get_stats(sortState, &sort_instr);
+
+	/* Calculate total and maximum memory and disk space used. */
+	switch (sort_instr.spaceType)
+	{
+		case SORT_SPACE_TYPE_DISK:
+			groupInfo->totalDiskSpaceUsed += sort_instr.spaceUsed;
+			if (sort_instr.spaceUsed > groupInfo->maxDiskSpaceUsed)
+				groupInfo->maxDiskSpaceUsed = sort_instr.spaceUsed;
+			break;
+		case SORT_SPACE_TYPE_MEMORY:
+			groupInfo->totalMemorySpaceUsed += sort_instr.spaceUsed;
+			if (sort_instr.spaceUsed > groupInfo->maxMemorySpaceUsed)
+				groupInfo->maxMemorySpaceUsed = sort_instr.spaceUsed;
+			break;
+	}
+
+	/* Track each sort method we've used. */
+	groupInfo->sortMethods |= sort_instr.sortMethod;
+}
+
+/* ----------------------------------------------------------------
+ * preparePresortedCols
+ *
+ * Prepare information for presorted_keys comparisons.
+ * ----------------------------------------------------------------
+ */
+static void
+preparePresortedCols(IncrementalSortState *node)
+{
+	IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
+
+	node->presorted_keys =
+		(PresortedKeyData *) palloc(plannode->nPresortedCols *
+									sizeof(PresortedKeyData));
+
+	/* Pre-cache comparison functions for each pre-sorted key. */
+	for (int i = 0; i < plannode->nPresortedCols; i++)
+	{
+		Oid			equalityOp,
+					equalityFunc;
+		PresortedKeyData *key;
+
+		key = &node->presorted_keys[i];
+		key->attno = plannode->sort.sortColIdx[i];
+
+		equalityOp = get_equality_op_for_ordering_op(plannode->sort.sortOperators[i],
+													 NULL);
+		if (!OidIsValid(equalityOp))
+			elog(ERROR, "missing equality operator for ordering operator %u",
+				 plannode->sort.sortOperators[i]);
+
+		equalityFunc = get_opcode(equalityOp);
+		if (!OidIsValid(equalityFunc))
+			elog(ERROR, "missing function for operator %u", equalityOp);
+
+		/* Lookup the comparison function */
+		fmgr_info_cxt(equalityFunc, &key->flinfo, CurrentMemoryContext);
+
+		/* We can initialize the callinfo just once and re-use it */
+		key->fcinfo = palloc0(SizeForFunctionCallInfo(2));
+		InitFunctionCallInfoData(*key->fcinfo, &key->flinfo, 2,
+								 plannode->sort.collations[i], NULL, NULL);
+		key->fcinfo->args[0].isnull = false;
+		key->fcinfo->args[1].isnull = false;
+	}
+}
+
+/* ----------------------------------------------------------------
+ * isCurrentGroup
+ *
+ * Check whether a given tuple belongs to the current sort group by comparing
+ * the presorted column values to the pivot tuple of the current group.
+ * ----------------------------------------------------------------
+ */
+static bool
+isCurrentGroup(IncrementalSortState *node, TupleTableSlot *pivot, TupleTableSlot *tuple)
+{
+	int			nPresortedCols;
+
+	nPresortedCols = castNode(IncrementalSort, node->ss.ps.plan)->nPresortedCols;
+
+	/*
+	 * That the input is sorted by keys (0, ... n) implies that the tail keys
+	 * are more likely to change.  Therefore we do our comparison starting
+	 * from the last pre-sorted column to optimize for early detection of
+	 * inequality and minimize the number of function calls.
+	 */
+	for (int i = nPresortedCols - 1; i >= 0; i--)
+	{
+		Datum		datumA,
+					datumB,
+					result;
+		bool		isnullA,
+					isnullB;
+		AttrNumber	attno = node->presorted_keys[i].attno;
+		PresortedKeyData *key;
+
+		datumA = slot_getattr(pivot, attno, &isnullA);
+		datumB = slot_getattr(tuple, attno, &isnullB);
+
+		/* Special case for NULL-vs-NULL, else use standard comparison */
+		if (isnullA || isnullB)
+		{
+			if (isnullA == isnullB)
+				continue;
+			else
+				return false;
+		}
+
+		key = &node->presorted_keys[i];
+
+		key->fcinfo->args[0].value = datumA;
+		key->fcinfo->args[1].value = datumB;
+
+		/* just for paranoia's sake, we reset isnull each time */
+		key->fcinfo->isnull = false;
+
+		result = FunctionCallInvoke(key->fcinfo);
+
+		/* Check for null result, since caller is clearly not expecting one */
+		if (key->fcinfo->isnull)
+			elog(ERROR, "function %u returned NULL", key->flinfo.fn_oid);
+
+		if (!DatumGetBool(result))
+			return false;
+	}
+	return true;
+}
+
+/* ----------------------------------------------------------------
+ * switchToPresortedPrefixMode
+ *
+ * When we determine that we've likely encountered a large batch of tuples all
+ * having the same presorted prefix values, we want to optimize tuplesort by
+ * only sorting on unsorted suffix keys.
+ *
+ * The problem is that we've already accumulated several tuples in another
+ * tuplesort configured to sort by all columns (assuming that there may be
+ * more than one prefix key group).  So to switch to presorted prefix mode we
+ * have to go back and look at all the tuples we've already accumulated to
+ * verify they're all part of the same prefix key group before sorting them
+ * solely by unsorted suffix keys.
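+ *
+ * As a small worked illustration (values invented for this comment): if the
+ * full sort state currently holds tuples whose prefix keys are A, A, A, B,
+ * we copy the three A tuples into the prefix sort state, hit B, stash it in
+ * the transfer_tuple slot, sort and emit the A group, and only come back
+ * for B on a later call.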
+ *
+ * While it's likely that all the tuples we've already fetched are part of a
+ * single prefix group, we also have to handle the possibility that there is
+ * at least one different prefix key group before the large prefix key group.
+ * ----------------------------------------------------------------
+ */
+static void
+switchToPresortedPrefixMode(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	ScanDirection dir;
+	int64		nTuples = 0;
+	bool		lastTuple = false;
+	bool		firstTuple = true;
+	TupleDesc	tupDesc;
+	PlanState  *outerNode;
+	IncrementalSort *plannode = castNode(IncrementalSort, node->ss.ps.plan);
+
+	dir = node->ss.ps.state->es_direction;
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	/* Configure the prefix sort state the first time around. */
+	if (node->prefixsort_state == NULL)
+	{
+		Tuplesortstate *prefixsort_state;
+		int			nPresortedCols = plannode->nPresortedCols;
+
+		/*
+		 * Optimize the sort by assuming the prefix columns are all equal and
+		 * thus we only need to sort by any remaining columns.
+		 */
+		prefixsort_state = tuplesort_begin_heap(tupDesc,
+												plannode->sort.numCols - nPresortedCols,
+												&(plannode->sort.sortColIdx[nPresortedCols]),
+												&(plannode->sort.sortOperators[nPresortedCols]),
+												&(plannode->sort.collations[nPresortedCols]),
+												&(plannode->sort.nullsFirst[nPresortedCols]),
+												work_mem,
+												NULL,
+												false);
+		node->prefixsort_state = prefixsort_state;
+	}
+	else
+	{
+		/* Next group of presorted data */
+		tuplesort_reset(node->prefixsort_state);
+	}
+
+	/*
+	 * If the current node has a bound, then it's reasonably likely that a
+	 * large prefix key group will benefit from bounded sort, so configure the
+	 * tuplesort to allow for that optimization.
+	 */
+	if (node->bounded)
+	{
+		SO1_printf("Setting bound on presorted prefix tuplesort to: %ld\n",
+				   node->bound - node->bound_Done);
+		tuplesort_set_bound(node->prefixsort_state,
+							node->bound - node->bound_Done);
+	}
+
+	/*
+	 * Copy as many tuples as we can (i.e., in the same prefix key group) from
+	 * the full sort state to the prefix sort state.
+	 */
+	for (;;)
+	{
+		lastTuple = node->n_fullsort_remaining - nTuples == 1;
+
+		/*
+		 * When we encounter multiple prefix key groups inside the full sort
+		 * tuplesort we have to carry over the last read tuple into the next
+		 * batch.
+		 */
+		if (firstTuple && !TupIsNull(node->transfer_tuple))
+		{
+			tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
+			nTuples++;
+
+			/* The carried over tuple is our new group pivot tuple. */
+			ExecCopySlot(node->group_pivot, node->transfer_tuple);
+		}
+		else
+		{
+			tuplesort_gettupleslot(node->fullsort_state,
+								   ScanDirectionIsForward(dir),
+								   false, node->transfer_tuple, NULL);
+
+			/*
+			 * If this is our first time through the loop, then we need to
+			 * save the first tuple we get as our new group pivot.
+			 */
+			if (TupIsNull(node->group_pivot))
+				ExecCopySlot(node->group_pivot, node->transfer_tuple);
+
+			if (isCurrentGroup(node, node->group_pivot, node->transfer_tuple))
+			{
+				tuplesort_puttupleslot(node->prefixsort_state, node->transfer_tuple);
+				nTuples++;
+			}
+			else
+			{
+				/*
+				 * The tuple isn't part of the current batch so we need to
+				 * carry it over into the next batch of tuples we transfer out
+				 * of the full sort tuplesort into the presorted prefix
+				 * tuplesort.
+				 * We don't actually have to do anything special to save the
+				 * tuple since we've already loaded it into the
+				 * node->transfer_tuple slot, and, even though that slot
+				 * points to memory inside the full sort tuplesort, we can't
+				 * reset that tuplesort anyway until we've fully transferred
+				 * out its tuples, so this reference is safe.  We do need to
+				 * reset the group pivot tuple though since we've finished the
+				 * current prefix key group.
+				 */
+				ExecClearTuple(node->group_pivot);
+				break;
+			}
+		}
+
+		firstTuple = false;
+
+		/*
+		 * If we've copied all of the tuples from the full sort state into the
+		 * prefix sort state, then we don't actually know that we've yet found
+		 * the last tuple in that prefix key group until we check the next
+		 * tuple from the outer plan node, so we retain the current group
+		 * pivot tuple for the prefix key group comparison.
+		 */
+		if (lastTuple)
+			break;
+	}
+
+	/*
+	 * Track how many tuples remain in the full sort batch so that we know if
+	 * we need to sort multiple prefix key groups before processing tuples
+	 * remaining in the large single prefix key group we think we've
+	 * encountered.
+	 */
+	SO1_printf("Moving %ld tuples to presorted prefix tuplesort\n", nTuples);
+	node->n_fullsort_remaining -= nTuples;
+	SO1_printf("Setting n_fullsort_remaining to %ld\n", node->n_fullsort_remaining);
+
+	if (lastTuple)
+	{
+		/*
+		 * We've confirmed that all tuples remaining in the full sort batch
+		 * are in the same prefix key group and moved all of those tuples into
+		 * the presorted prefix tuplesort.  Now we can save our pivot
+		 * comparison tuple and continue fetching tuples from the outer
+		 * execution node to load into the presorted prefix tuplesort.
+		 */
+		ExecCopySlot(node->group_pivot, node->transfer_tuple);
+		SO_printf("Setting execution_status to INCSORT_LOADPREFIXSORT (switchToPresortedPrefixMode)\n");
+		node->execution_status = INCSORT_LOADPREFIXSORT;
+
+		/*
+		 * Make sure we clear the transfer tuple slot so that next time we
+		 * encounter a large prefix key group we don't incorrectly assume we
+		 * have a tuple carried over from the previous group.
+		 */
+		ExecClearTuple(node->transfer_tuple);
+	}
+	else
+	{
+		/*
+		 * We finished a group but didn't consume all of the tuples from the
+		 * full sort state, so we'll sort this batch, let the outer node read
+		 * out all of those tuples, and then come back around to find another
+		 * batch.
+		 */
+		SO1_printf("Sorting presorted prefix tuplesort with %ld tuples\n", nTuples);
+		tuplesort_performsort(node->prefixsort_state);
+
+		INSTRUMENT_SORT_GROUP(node, prefixsort);
+
+		if (node->bounded)
+		{
+			/*
+			 * If the current node has a bound and we've already sorted n
+			 * tuples, then the functional bound remaining is (original bound
+			 * - n), so store the current number of processed tuples for use
+			 * in configuring the sorting bound.
+			 */
+			SO2_printf("Changing bound_Done from %ld to %ld\n",
+					   node->bound_Done,
+					   Min(node->bound, node->bound_Done + nTuples));
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+		}
+
+		SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (switchToPresortedPrefixMode)\n");
+		node->execution_status = INCSORT_READPREFIXSORT;
+	}
+}
+
+/*
+ * Sorting many small groups with tuplesort is inefficient.  In order to
+ * cope with this problem we don't start a new group until the current one
+ * contains at least DEFAULT_MIN_GROUP_SIZE tuples (unfortunately this also
+ * means we can't assume small groups of tuples all have the same prefix keys.)
+ * When we have a bound that's less than DEFAULT_MIN_GROUP_SIZE we start
+ * looking for the new group as soon as we've met our bound to avoid fetching
+ * more tuples than we absolutely have to fetch.
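+ *
+ * For example (numbers invented for this comment): for a query with LIMIT 5
+ * the remaining bound is 5, so we use Min(DEFAULT_MIN_GROUP_SIZE, 5) = 5 as
+ * the minimum group size and begin prefix key checks after just five tuples.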
+ */
+#define DEFAULT_MIN_GROUP_SIZE 32
+
+/*
+ * While we've optimized for small prefix key groups by not starting our
+ * prefix key comparisons until we've reached a minimum number of tuples, we
+ * don't want that optimization to cause us to lose out on the benefits of
+ * being able to assume a large group of tuples is fully presorted by its
+ * prefix keys.  Therefore we use the DEFAULT_MAX_FULL_SORT_GROUP_SIZE cutoff
+ * as a heuristic for determining when we believe we've encountered a large
+ * group, and, if we get to that point without finding a new prefix key
+ * group, we transition to presorted prefix key mode.
+ */
+#define DEFAULT_MAX_FULL_SORT_GROUP_SIZE (2 * DEFAULT_MIN_GROUP_SIZE)
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSort
+ *
+ *		Assuming that the outer subtree returns tuples presorted by some
+ *		prefix of the target sort columns, performs incremental sort.
+ *
+ *		Conditions:
+ *		  -- none.
+ *
+ *		Initial States:
+ *		  -- the outer child is prepared to return the first tuple.
+ * ----------------------------------------------------------------
+ */
+static TupleTableSlot *
+ExecIncrementalSort(PlanState *pstate)
+{
+	IncrementalSortState *node = castNode(IncrementalSortState, pstate);
+	EState	   *estate;
+	ScanDirection dir;
+	Tuplesortstate *read_sortstate;
+	Tuplesortstate *fullsort_state;
+	TupleTableSlot *slot;
+	IncrementalSort *plannode = (IncrementalSort *) node->ss.ps.plan;
+	PlanState  *outerNode;
+	TupleDesc	tupDesc;
+	int64		nTuples = 0;
+	int64		minGroupSize;
+
+	CHECK_FOR_INTERRUPTS();
+
+	estate = node->ss.ps.state;
+	dir = estate->es_direction;
+	fullsort_state = node->fullsort_state;
+
+	/*
+	 * If a previous iteration has sorted a batch, then we need to check to
+	 * see if there are any remaining tuples in that batch that we can return
+	 * before moving on to other execution states.
+	 */
+	if (node->execution_status == INCSORT_READFULLSORT
+		|| node->execution_status == INCSORT_READPREFIXSORT)
+	{
+		/*
+		 * Return next tuple from the current sorted group set if available.
+		 */
+		read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
+			fullsort_state : node->prefixsort_state;
+		slot = node->ss.ps.ps_ResultTupleSlot;
+
+		/*
+		 * We have to populate the slot from the tuplesort before checking
+		 * outerNodeDone because it will set the slot to NULL if no more
+		 * tuples remain.  If the tuplesort is empty, but we don't have any
+		 * more tuples available for sort from the outer node, then
+		 * outerNodeDone will have been set so we'll return that now-empty
+		 * slot to the caller.
+		 */
+		if (tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
+								   false, slot, NULL) || node->outerNodeDone)
+
+			/*
+			 * Note: there isn't a good test case for the node->outerNodeDone
+			 * check directly, but we need it for any plan where the outer
+			 * node will fail when trying to fetch too many tuples.
+			 */
+			return slot;
+		else if (node->n_fullsort_remaining > 0)
+		{
+			/*
+			 * When we transition to presorted prefix mode, we might have
+			 * accumulated at least one additional prefix key group in the
+			 * full sort tuplesort.
+			 * The first call to switchToPresortedPrefixMode() will have
+			 * pulled the first one of those groups out, and we've returned
+			 * those tuples to the parent node, but if at this point we still
+			 * have tuples remaining in the full sort state (i.e.,
+			 * n_fullsort_remaining > 0), then we need to re-execute the
+			 * prefix mode transition function to pull out the next prefix
+			 * key group.
+			 */
+			SO1_printf("Re-calling switchToPresortedPrefixMode() because n_fullsort_remaining is > 0 (%ld)\n",
+					   node->n_fullsort_remaining);
+			switchToPresortedPrefixMode(pstate);
+		}
+		else
+		{
+			/*
+			 * If we don't have any sorted tuples to read and we're not
+			 * currently transitioning into presorted prefix sort mode, then
+			 * it's time to start the process all over again by building a new
+			 * group in the full sort state.
+			 */
+			SO_printf("Setting execution_status to INCSORT_LOADFULLSORT (n_fullsort_remaining > 0)\n");
+			node->execution_status = INCSORT_LOADFULLSORT;
+		}
+	}
+
+	/*
+	 * Scan the subplan in the forward direction while creating the sorted
+	 * data.
+	 */
+	estate->es_direction = ForwardScanDirection;
+
+	outerNode = outerPlanState(node);
+	tupDesc = ExecGetResultType(outerNode);
+
+	/* Load tuples into the full sort state. */
+	if (node->execution_status == INCSORT_LOADFULLSORT)
+	{
+		/*
+		 * Initialize sorting structures.
+		 */
+		if (fullsort_state == NULL)
+		{
+			/*
+			 * Initialize presorted column support structures for
+			 * isCurrentGroup().  It's correct to do this along with the
+			 * initial initialization for the full sort state (and not for the
+			 * prefix sort state) since we always load the full sort state
+			 * first.
+			 */
+			preparePresortedCols(node);
+
+			/*
+			 * Since we optimize small prefix key groups by accumulating a
+			 * minimum number of tuples before sorting, we can't assume that a
+			 * group of tuples all have the same prefix key values.  Hence we
+			 * set up the full sort tuplesort to sort by all requested sort
+			 * keys.
+			 */
+			fullsort_state = tuplesort_begin_heap(tupDesc,
+												  plannode->sort.numCols,
+												  plannode->sort.sortColIdx,
+												  plannode->sort.sortOperators,
+												  plannode->sort.collations,
+												  plannode->sort.nullsFirst,
+												  work_mem,
+												  NULL,
+												  false);
+			node->fullsort_state = fullsort_state;
+		}
+		else
+		{
+			/* Reset sort for the next batch. */
+			tuplesort_reset(fullsort_state);
+		}
+
+		/*
+		 * Calculate the remaining tuples left if bounded and configure both
+		 * bounded sort and the minimum group size accordingly.
+		 */
+		if (node->bounded)
+		{
+			int64		currentBound = node->bound - node->bound_Done;
+
+			/*
+			 * Bounded sort isn't likely to be a useful optimization for full
+			 * sort mode since we limit full sort mode to a relatively small
+			 * number of tuples and tuplesort doesn't switch over to top-n
+			 * heap sort anyway unless it hits (2 * bound) tuples.
+			 */
+			if (currentBound < DEFAULT_MIN_GROUP_SIZE)
+				tuplesort_set_bound(fullsort_state, currentBound);
+
+			minGroupSize = Min(DEFAULT_MIN_GROUP_SIZE, currentBound);
+		}
+		else
+			minGroupSize = DEFAULT_MIN_GROUP_SIZE;
+
+		/*
+		 * Because we have to read the next tuple to find out that we've
+		 * encountered a new prefix key group, on subsequent groups we have to
+		 * carry over that extra tuple and add it to the new group's sort here
+		 * before we read any new tuples from the outer node.
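+		 * As an invented example: if the previous batch ended because we
+		 * read the first X = 2 tuple while emitting the X = 1 group, that
+		 * X = 2 tuple is sitting in group_pivot now and must seed this
+		 * batch's sort.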
+		 */
+		if (!TupIsNull(node->group_pivot))
+		{
+			tuplesort_puttupleslot(fullsort_state, node->group_pivot);
+			nTuples++;
+
+			/*
+			 * We're in full sort mode accumulating a minimum number of tuples
+			 * and not checking for prefix key equality yet, so we can't
+			 * assume the group pivot tuple will remain the same -- unless
+			 * we're using a minimum group size of 1, in which case the pivot
+			 * is obviously still the pivot.
+			 */
+			if (nTuples != minGroupSize)
+				ExecClearTuple(node->group_pivot);
+		}
+
+		/*
+		 * Pull as many tuples from the outer node as possible given our
+		 * current operating mode.
+		 */
+		for (;;)
+		{
+			slot = ExecProcNode(outerNode);
+
+			/*
+			 * If the outer node can't provide us any more tuples, then we can
+			 * sort the current group and return those tuples.
+			 */
+			if (TupIsNull(slot))
+			{
+				/*
+				 * We need to know later if the outer node has completed to be
+				 * able to distinguish between being done with a batch and
+				 * being done with the whole node.
+				 */
+				node->outerNodeDone = true;
+
+				SO1_printf("Sorting fullsort with %ld tuples\n", nTuples);
+				tuplesort_performsort(fullsort_state);
+
+				INSTRUMENT_SORT_GROUP(node, fullsort);
+
+				SO_printf("Setting execution_status to INCSORT_READFULLSORT (final tuple)\n");
+				node->execution_status = INCSORT_READFULLSORT;
+				break;
+			}
+
+			/* Accumulate the next group of presorted tuples. */
+			if (nTuples < minGroupSize)
+			{
+				/*
+				 * If we haven't yet hit our target minimum group size, then
+				 * we don't need to bother checking for inclusion in the
+				 * current prefix group since at this point we'll assume that
+				 * we'll full sort this batch to avoid a large number of very
+				 * tiny (and thus inefficient) sorts.
+				 */
+				tuplesort_puttupleslot(fullsort_state, slot);
+				nTuples++;
+
+				/*
+				 * If we've reached our minimum group size, then we need to
+				 * store the most recent tuple as a pivot.
+				 */
+				if (nTuples == minGroupSize)
+					ExecCopySlot(node->group_pivot, slot);
+			}
+			else
+			{
+				/*
+				 * If we've already accumulated enough tuples to reach our
+				 * minimum group size, then we need to compare any additional
+				 * tuples to our pivot tuple to see if we reach the end of
+				 * that prefix key group.  Only after we find changed prefix
+				 * keys can we guarantee sort stability of the tuples we've
+				 * already accumulated.
+				 */
+				if (isCurrentGroup(node, node->group_pivot, slot))
+				{
+					/*
+					 * As long as the prefix keys match the pivot tuple then
+					 * load the tuple into the tuplesort.
+					 */
+					tuplesort_puttupleslot(fullsort_state, slot);
+					nTuples++;
+				}
+				else
+				{
+					/*
+					 * Since the tuple we fetched isn't part of the current
+					 * prefix key group we don't want to sort it as part of
+					 * the current batch.  Instead we use the group_pivot slot
+					 * to carry it over to the next batch (even though we
+					 * won't actually treat it as a group pivot).
+					 */
+					ExecCopySlot(node->group_pivot, slot);
+
+					if (node->bounded)
+					{
+						/*
+						 * If the current node has a bound, and we've already
+						 * sorted n tuples, then the functional bound
+						 * remaining is (original bound - n), so store the
+						 * current number of processed tuples for later use
+						 * configuring the sort state's bound.
+						 */
+						SO2_printf("Changing bound_Done from %ld to %ld\n",
+								   node->bound_Done,
+								   Min(node->bound, node->bound_Done + nTuples));
+						node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+					}
+
+					/*
+					 * Once we find changed prefix keys we can complete the
+					 * sort and transition modes to reading out the sorted
+					 * tuples.
+					 */
+					SO1_printf("Sorting fullsort tuplesort with %ld tuples\n",
+							   nTuples);
+					tuplesort_performsort(fullsort_state);
+
+					INSTRUMENT_SORT_GROUP(node, fullsort);
+
+					SO_printf("Setting execution_status to INCSORT_READFULLSORT (found end of group)\n");
+					node->execution_status = INCSORT_READFULLSORT;
+					break;
+				}
+			}
+
+			/*
+			 * Unless we've already transitioned modes to reading from the
+			 * full sort state, then we assume that having read at least
+			 * DEFAULT_MAX_FULL_SORT_GROUP_SIZE tuples means it's likely we're
+			 * processing a large group of tuples all having equal prefix keys
+			 * (but haven't yet found the final tuple in that prefix key
+			 * group), so we need to transition into presorted prefix mode.
+			 */
+			if (nTuples > DEFAULT_MAX_FULL_SORT_GROUP_SIZE &&
+				node->execution_status != INCSORT_READFULLSORT)
+			{
+				/*
+				 * The group pivot we have stored has already been put into
+				 * the tuplesort; we don't want to carry it over.  Since we
+				 * haven't yet found the end of the prefix key group, it might
+				 * seem like we should keep this, but we don't actually know
+				 * how many prefix key groups might be represented in the full
+				 * sort state, so we'll let the mode transition function
+				 * manage this state for us.
+				 */
+				ExecClearTuple(node->group_pivot);
+
+				/*
+				 * Unfortunately the tuplesort API doesn't include a way to
+				 * retrieve tuples unless a sort has been performed, so we
+				 * perform the sort even though we could just as easily rely
+				 * on FIFO retrieval semantics when transferring them to the
+				 * presorted prefix tuplesort.
+				 */
+				SO1_printf("Sorting fullsort tuplesort with %ld tuples\n", nTuples);
+				tuplesort_performsort(fullsort_state);
+
+				INSTRUMENT_SORT_GROUP(node, fullsort);
+
+				/*
+				 * If the full sort tuplesort happened to switch into top-n
+				 * heapsort mode then we will only be able to retrieve
+				 * currentBound tuples (since the tuplesort will have only
+				 * retained the top-n tuples).  This is safe even though we
+				 * haven't yet completed fetching the current prefix key group
+				 * because the tuples we've "lost" already sorted "below" the
+				 * retained ones, and we're already contractually guaranteed
+				 * to not need any more than the currentBound tuples.
+				 */
+				if (tuplesort_used_bound(node->fullsort_state))
+				{
+					int64		currentBound = node->bound - node->bound_Done;
+
+					SO2_printf("Read %ld tuples, but setting to %ld because we used bounded sort\n",
+							   nTuples, Min(currentBound, nTuples));
+					nTuples = Min(currentBound, nTuples);
+				}
+
+				SO1_printf("Setting n_fullsort_remaining to %ld and calling switchToPresortedPrefixMode()\n",
+						   nTuples);
+
+				/*
+				 * We might have multiple prefix key groups in the full sort
+				 * state, so the mode transition function needs to know that
+				 * it needs to move from the fullsort to presorted prefix
+				 * sort.
+				 */
+				node->n_fullsort_remaining = nTuples;
+
+				/* Transition the tuples to the presorted prefix tuplesort. */
+				switchToPresortedPrefixMode(pstate);
+
+				/*
+				 * Since we know we had tuples to move to the presorted prefix
+				 * tuplesort, we know that unless that transition has verified
+				 * that all tuples belonged to the same prefix key group (in
+				 * which case we can go straight to continuing to load tuples
+				 * into that tuplesort), we should have a tuple to return
+				 * here.
+				 *
+				 * Either way, the appropriate execution status should have
+				 * been set by switchToPresortedPrefixMode(), so we can drop
+				 * out of the loop here and let the appropriate path kick in.
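+				 *
+				 * As a rough summary (paraphrasing this file's logic, not an
+				 * authoritative spec), execution_status moves like so:
+				 *
+				 *	LOADFULLSORT --(group or input ends)--> READFULLSORT
+				 *	LOADFULLSORT --(group looks large)----> transition, which
+				 *	  sets LOADPREFIXSORT (all one group) or READPREFIXSORT
+				 *	  (another group boundary found)
+				 *	LOADPREFIXSORT --(group or input ends)-> READPREFIXSORT
+				 *	READ* --(batch exhausted)--------------> LOADFULLSORT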
+				 */
+				break;
+			}
+		}
+	}
+
+	if (node->execution_status == INCSORT_LOADPREFIXSORT)
+	{
+		/*
+		 * We only enter this state after the mode transition function has
+		 * confirmed all remaining tuples from the full sort state have the
+		 * same prefix and moved those tuples to the prefix sort state.  That
+		 * function has also set a group pivot tuple (which doesn't need to be
+		 * carried over; it's already been put into the prefix sort state).
+		 */
+		Assert(!TupIsNull(node->group_pivot));
+
+		/*
+		 * Read tuples from the outer node and load them into the prefix sort
+		 * state until we encounter a tuple whose prefix keys don't match the
+		 * current group_pivot tuple, since we can't guarantee sort stability
+		 * until we have all tuples matching those prefix keys.
+		 */
+		for (;;)
+		{
+			slot = ExecProcNode(outerNode);
+
+			/*
+			 * If we've exhausted tuples from the outer node we're done
+			 * loading the prefix sort state.
+			 */
+			if (TupIsNull(slot))
+			{
+				/*
+				 * We need to know later if the outer node has completed to be
+				 * able to distinguish between being done with a batch and
+				 * being done with the whole node.
+				 */
+				node->outerNodeDone = true;
+				break;
+			}
+
+			/*
+			 * If the tuple's prefix keys match our pivot tuple, we're not
+			 * done yet and can load it into the prefix sort state.  If not,
+			 * we don't want to sort it as part of the current batch.  Instead
+			 * we use the group_pivot slot to carry it over to the next batch
+			 * (even though we won't actually treat it as a group pivot).
+			 */
+			if (isCurrentGroup(node, node->group_pivot, slot))
+			{
+				tuplesort_puttupleslot(node->prefixsort_state, slot);
+				nTuples++;
+			}
+			else
+			{
+				ExecCopySlot(node->group_pivot, slot);
+				break;
+			}
+		}
+
+		/*
+		 * Perform the sort and begin returning the tuples to the parent plan
+		 * node.
+		 */
+		SO1_printf("Sorting presorted prefix tuplesort with >= %ld tuples\n", nTuples);
+		tuplesort_performsort(node->prefixsort_state);
+
+		INSTRUMENT_SORT_GROUP(node, prefixsort);
+
+		SO_printf("Setting execution_status to INCSORT_READPREFIXSORT (found end of group)\n");
+		node->execution_status = INCSORT_READPREFIXSORT;
+
+		if (node->bounded)
+		{
+			/*
+			 * If the current node has a bound, and we've already sorted n
+			 * tuples, then the functional bound remaining is (original bound
+			 * - n), so store the current number of processed tuples for use
+			 * in configuring the sorting bound.
+			 */
+			SO2_printf("Changing bound_Done from %ld to %ld\n",
+					   node->bound_Done,
+					   Min(node->bound, node->bound_Done + nTuples));
+			node->bound_Done = Min(node->bound, node->bound_Done + nTuples);
+		}
+	}
+
+	/* Restore to user specified direction. */
+	estate->es_direction = dir;
+
+	/*
+	 * Get the first or next tuple from tuplesort.  Returns NULL if no more
+	 * tuples.
+	 */
+	read_sortstate = node->execution_status == INCSORT_READFULLSORT ?
+		fullsort_state : node->prefixsort_state;
+	slot = node->ss.ps.ps_ResultTupleSlot;
+	(void) tuplesort_gettupleslot(read_sortstate, ScanDirectionIsForward(dir),
+								  false, slot, NULL);
+	return slot;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecInitIncrementalSort
+ *
+ *		Creates the run-time state information for the sort node
+ *		produced by the planner and initializes its outer subtree.
+ * ---------------------------------------------------------------- + */ +IncrementalSortState * +ExecInitIncrementalSort(IncrementalSort *node, EState *estate, int eflags) +{ + IncrementalSortState *incrsortstate; + + SO_printf("ExecInitIncrementalSort: initializing sort node\n"); + + /* + * Incremental sort can't be used with either EXEC_FLAG_REWIND, + * EXEC_FLAG_BACKWARD or EXEC_FLAG_MARK, because we only one of many sort + * batches in the current sort state. + */ + Assert((eflags & (EXEC_FLAG_BACKWARD | + EXEC_FLAG_MARK)) == 0); + + /* Initialize state structure. */ + incrsortstate = makeNode(IncrementalSortState); + incrsortstate->ss.ps.plan = (Plan *) node; + incrsortstate->ss.ps.state = estate; + incrsortstate->ss.ps.ExecProcNode = ExecIncrementalSort; + + incrsortstate->execution_status = INCSORT_LOADFULLSORT; + incrsortstate->bounded = false; + incrsortstate->outerNodeDone = false; + incrsortstate->bound_Done = 0; + incrsortstate->fullsort_state = NULL; + incrsortstate->prefixsort_state = NULL; + incrsortstate->group_pivot = NULL; + incrsortstate->transfer_tuple = NULL; + incrsortstate->n_fullsort_remaining = 0; + incrsortstate->presorted_keys = NULL; + + if (incrsortstate->ss.ps.instrument != NULL) + { + IncrementalSortGroupInfo *fullsortGroupInfo = + &incrsortstate->incsort_info.fullsortGroupInfo; + IncrementalSortGroupInfo *prefixsortGroupInfo = + &incrsortstate->incsort_info.prefixsortGroupInfo; + + fullsortGroupInfo->groupCount = 0; + fullsortGroupInfo->maxDiskSpaceUsed = 0; + fullsortGroupInfo->totalDiskSpaceUsed = 0; + fullsortGroupInfo->maxMemorySpaceUsed = 0; + fullsortGroupInfo->totalMemorySpaceUsed = 0; + fullsortGroupInfo->sortMethods = 0; + prefixsortGroupInfo->groupCount = 0; + prefixsortGroupInfo->maxDiskSpaceUsed = 0; + prefixsortGroupInfo->totalDiskSpaceUsed = 0; + prefixsortGroupInfo->maxMemorySpaceUsed = 0; + prefixsortGroupInfo->totalMemorySpaceUsed = 0; + prefixsortGroupInfo->sortMethods = 0; + } + + /* + * Miscellaneous initialization + * + * Sort nodes don't initialize their ExprContexts because they never call + * ExecQual or ExecProject. + */ + + /* + * Initialize child nodes. + * + * We shield the child node from the need to support REWIND, BACKWARD, or + * MARK/RESTORE. + */ + eflags &= ~(EXEC_FLAG_REWIND | EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK); + + outerPlanState(incrsortstate) = ExecInitNode(outerPlan(node), estate, eflags); + + /* + * Initialize scan slot and type. + */ + ExecCreateScanSlotFromOuterPlan(estate, &incrsortstate->ss, &TTSOpsMinimalTuple); + + /* + * Initialize return slot and type. No need to initialize projection info + * because we don't do any projections. + */ + ExecInitResultTupleSlotTL(&incrsortstate->ss.ps, &TTSOpsMinimalTuple); + incrsortstate->ss.ps.ps_ProjInfo = NULL; + + /* + * Initialize standalone slots to store a tuple for pivot prefix keys and + * for carrying over a tuple from one batch to the next. 
+	 */
+	incrsortstate->group_pivot =
+		MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
+								 &TTSOpsMinimalTuple);
+	incrsortstate->transfer_tuple =
+		MakeSingleTupleTableSlot(ExecGetResultType(outerPlanState(incrsortstate)),
+								 &TTSOpsMinimalTuple);
+
+	SO_printf("ExecInitIncrementalSort: sort node initialized\n");
+
+	return incrsortstate;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecEndIncrementalSort(node)
+ * ----------------------------------------------------------------
+ */
+void
+ExecEndIncrementalSort(IncrementalSortState *node)
+{
+	SO_printf("ExecEndIncrementalSort: shutting down sort node\n");
+
+	/* clean out the scan tuple */
+	ExecClearTuple(node->ss.ss_ScanTupleSlot);
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+	/* must drop standalone tuple slots from outer node */
+	ExecDropSingleTupleTableSlot(node->group_pivot);
+	ExecDropSingleTupleTableSlot(node->transfer_tuple);
+
+	/*
+	 * Release tuplesort resources.
+	 */
+	if (node->fullsort_state != NULL)
+	{
+		tuplesort_end(node->fullsort_state);
+		node->fullsort_state = NULL;
+	}
+	if (node->prefixsort_state != NULL)
+	{
+		tuplesort_end(node->prefixsort_state);
+		node->prefixsort_state = NULL;
+	}
+
+	/*
+	 * Shut down the subplan.
+	 */
+	ExecEndNode(outerPlanState(node));
+
+	SO_printf("ExecEndIncrementalSort: sort node shutdown\n");
+}
+
+void
+ExecReScanIncrementalSort(IncrementalSortState *node)
+{
+	PlanState  *outerPlan = outerPlanState(node);
+
+	/*
+	 * Incremental sort doesn't support efficient rescan even when parameters
+	 * haven't changed (e.g., rewind) because, unlike a regular sort, we don't
+	 * store all tuples at once for the full sort.
+	 *
+	 * So even if EXEC_FLAG_REWIND is set we just reset all of our state and
+	 * re-execute the sort along with the child node below us.
+	 *
+	 * In theory if we've only filled the full sort with one batch (and
+	 * haven't reset it for a new batch yet) then we could efficiently rewind,
+	 * but that seems a narrow enough case that it's not worth handling
+	 * specially at this time.
+	 */
+
+	/* must drop pointer to sort result tuple */
+	ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
+
+	if (node->group_pivot != NULL)
+		ExecClearTuple(node->group_pivot);
+	if (node->transfer_tuple != NULL)
+		ExecClearTuple(node->transfer_tuple);
+
+	node->bounded = false;
+	node->outerNodeDone = false;
+	node->n_fullsort_remaining = 0;
+	node->bound_Done = 0;
+	node->presorted_keys = NULL;
+
+	node->execution_status = INCSORT_LOADFULLSORT;
+
+	/*
+	 * If we've already set up either of the sort states, we need to reset
+	 * them.  We could end them and null out the pointers, but there's no
+	 * reason to repay the setup cost, and because we guard setting up the
+	 * pivot comparator state similarly, doing so might actually cause a
+	 * leak.  (Note we keep the pointers after resetting, for that reason.)
+	 */
+	if (node->fullsort_state != NULL)
+		tuplesort_reset(node->fullsort_state);
+	if (node->prefixsort_state != NULL)
+		tuplesort_reset(node->prefixsort_state);
+
+	/*
+	 * If chgParam of subnode is not null, then the plan will be re-scanned
+	 * by the first ExecProcNode.
+	 */
+	if (outerPlan->chgParam == NULL)
+		ExecReScan(outerPlan);
+}
+
+/* ----------------------------------------------------------------
+ *						Parallel Query Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortEstimate
+ *
+ *		Estimate space required to propagate sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortEstimate(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = mul_size(pcxt->nworkers, sizeof(IncrementalSortInfo));
+	size = add_size(size, offsetof(SharedIncrementalSortInfo, sinfo));
+	shm_toc_estimate_chunk(&pcxt->estimator, size);
+	shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeDSM
+ *
+ *		Initialize DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeDSM(IncrementalSortState *node, ParallelContext *pcxt)
+{
+	Size		size;
+
+	/* don't need this if not instrumenting or no workers */
+	if (!node->ss.ps.instrument || pcxt->nworkers == 0)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ pcxt->nworkers * sizeof(IncrementalSortInfo);
+	node->shared_info = shm_toc_allocate(pcxt->toc, size);
+	/* ensure any unfilled slots will contain zeroes */
+	memset(node->shared_info, 0, size);
+	node->shared_info->num_workers = pcxt->nworkers;
+	shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id,
+				   node->shared_info);
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortInitializeWorker
+ *
+ *		Attach worker to DSM space for sort statistics.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortInitializeWorker(IncrementalSortState *node, ParallelWorkerContext *pwcxt)
+{
+	node->shared_info =
+		shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, true);
+	node->am_worker = true;
+}
+
+/* ----------------------------------------------------------------
+ *		ExecIncrementalSortRetrieveInstrumentation
+ *
+ *		Transfer sort statistics from DSM to private memory.
+ * ----------------------------------------------------------------
+ */
+void
+ExecIncrementalSortRetrieveInstrumentation(IncrementalSortState *node)
+{
+	Size		size;
+	SharedIncrementalSortInfo *si;
+
+	if (node->shared_info == NULL)
+		return;
+
+	size = offsetof(SharedIncrementalSortInfo, sinfo)
+		+ node->shared_info->num_workers * sizeof(IncrementalSortInfo);
+	si = palloc(size);
+	memcpy(si, node->shared_info, size);
+	node->shared_info = si;
+}