diff options
Diffstat (limited to 'src/backend/utils/sort/tuplesort.c')
-rw-r--r-- | src/backend/utils/sort/tuplesort.c | 229 |
1 files changed, 107 insertions, 122 deletions
diff --git a/src/backend/utils/sort/tuplesort.c b/src/backend/utils/sort/tuplesort.c index b17347b2141..d5930f258d9 100644 --- a/src/backend/utils/sort/tuplesort.c +++ b/src/backend/utils/sort/tuplesort.c @@ -262,6 +262,7 @@ struct Tuplesortstate MemoryContext sortcontext; /* memory context holding most sort data */ MemoryContext tuplecontext; /* sub-context of sortcontext for tuple data */ LogicalTapeSet *tapeset; /* logtape.c object for tapes in a temp file */ + LogicalTape **tapes; /* * These function pointers decouple the routines that must know what kind @@ -290,7 +291,7 @@ struct Tuplesortstate * SortTuple struct!), and increase state->availMem by the amount of * memory space thereby released. */ - void (*writetup) (Tuplesortstate *state, int tapenum, + void (*writetup) (Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); /* @@ -299,7 +300,7 @@ struct Tuplesortstate * from the slab memory arena, or is palloc'd, see readtup_alloc(). */ void (*readtup) (Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); /* * This array holds the tuples now in sort memory. If we are in state @@ -393,7 +394,7 @@ struct Tuplesortstate * the next tuple to return. (In the tape case, the tape's current read * position is also critical state.) */ - int result_tape; /* actual tape number of finished output */ + LogicalTape *result_tape; /* tape of finished output */ int current; /* array index (only used if SORTEDINMEM) */ bool eof_reached; /* reached EOF (needed for cursors) */ @@ -599,9 +600,9 @@ struct Sharedsort */ /* When using this macro, beware of double evaluation of len */ -#define LogicalTapeReadExact(tapeset, tapenum, ptr, len) \ +#define LogicalTapeReadExact(tape, ptr, len) \ do { \ - if (LogicalTapeRead(tapeset, tapenum, ptr, len) != (size_t) (len)) \ + if (LogicalTapeRead(tape, ptr, len) != (size_t) (len)) \ elog(ERROR, "unexpected end of data"); \ } while(0) @@ -619,7 +620,7 @@ static void init_slab_allocator(Tuplesortstate *state, int numSlots); static void mergeruns(Tuplesortstate *state); static void mergeonerun(Tuplesortstate *state); static void beginmerge(Tuplesortstate *state); -static bool mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup); +static bool mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup); static void dumptuples(Tuplesortstate *state, bool alltuples); static void make_bounded_heap(Tuplesortstate *state); static void sort_bounded_heap(Tuplesortstate *state); @@ -628,39 +629,39 @@ static void tuplesort_heap_insert(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_replace_top(Tuplesortstate *state, SortTuple *tuple); static void tuplesort_heap_delete_top(Tuplesortstate *state); static void reversedirection(Tuplesortstate *state); -static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK); -static void markrunend(Tuplesortstate *state, int tapenum); +static unsigned int getlen(LogicalTape *tape, bool eofOK); +static void markrunend(LogicalTape *tape); static void *readtup_alloc(Tuplesortstate *state, Size tuplen); static int comparetup_heap(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_heap(Tuplesortstate *state, int tapenum, +static void writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_heap(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_cluster(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_cluster(Tuplesortstate *state, int tapenum, +static void writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_index_btree(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static int comparetup_index_hash(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_index(Tuplesortstate *state, int tapenum, +static void writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_index(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int comparetup_datum(const SortTuple *a, const SortTuple *b, Tuplesortstate *state); static void copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup); -static void writetup_datum(Tuplesortstate *state, int tapenum, +static void writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup); static void readtup_datum(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len); + LogicalTape *tape, unsigned int len); static int worker_get_identifier(Tuplesortstate *state); static void worker_freeze_result_tape(Tuplesortstate *state); static void worker_nomergeruns(Tuplesortstate *state); @@ -888,7 +889,7 @@ tuplesort_begin_batch(Tuplesortstate *state) * inittapes(), if needed */ - state->result_tape = -1; /* flag that result tape has not been formed */ + state->result_tape = NULL; /* flag that result tape has not been formed */ MemoryContextSwitchTo(oldcontext); } @@ -2221,7 +2222,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, if (state->eof_reached) return false; - if ((tuplen = getlen(state, state->result_tape, true)) != 0) + if ((tuplen = getlen(state->result_tape, true)) != 0) { READTUP(state, stup, state->result_tape, tuplen); @@ -2254,8 +2255,7 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * end of file; back up to fetch last tuple's ending length * word. If seek fails we must have a completely empty file. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, 2 * sizeof(unsigned int)); if (nmoved == 0) return false; @@ -2269,20 +2269,18 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, * Back up and fetch previously-returned tuple's ending length * word. If seek fails, assume we are at start of file. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, sizeof(unsigned int)); if (nmoved == 0) return false; else if (nmoved != sizeof(unsigned int)) elog(ERROR, "unexpected tape position"); - tuplen = getlen(state, state->result_tape, false); + tuplen = getlen(state->result_tape, false); /* * Back up to get ending length word of tuple before it. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, tuplen + 2 * sizeof(unsigned int)); if (nmoved == tuplen + sizeof(unsigned int)) { @@ -2299,15 +2297,14 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, elog(ERROR, "bogus tuple length in backward scan"); } - tuplen = getlen(state, state->result_tape, false); + tuplen = getlen(state->result_tape, false); /* * Now we have the length of the prior tuple, back up and read it. * Note: READTUP expects we are positioned after the initial * length word of the tuple, so back up to that point. */ - nmoved = LogicalTapeBackspace(state->tapeset, - state->result_tape, + nmoved = LogicalTapeBackspace(state->result_tape, tuplen); if (nmoved != tuplen) elog(ERROR, "bogus tuple length in backward scan"); @@ -2365,11 +2362,10 @@ tuplesort_gettuple_common(Tuplesortstate *state, bool forward, tuplesort_heap_delete_top(state); /* - * Rewind to free the read buffer. It'd go away at the - * end of the sort anyway, but better to release the - * memory early. + * Close the tape. It'd go away at the end of the sort + * anyway, but better to release the memory early. */ - LogicalTapeRewindForWrite(state->tapeset, srcTape); + LogicalTapeClose(state->tapes[srcTape]); return true; } newtup.srctape = srcTape; @@ -2667,9 +2663,12 @@ inittapes(Tuplesortstate *state, bool mergeruns) /* Create the tape set and allocate the per-tape data arrays */ inittapestate(state, maxTapes); state->tapeset = - LogicalTapeSetCreate(maxTapes, false, NULL, + LogicalTapeSetCreate(false, state->shared ? &state->shared->fileset : NULL, state->worker); + state->tapes = palloc(maxTapes * sizeof(LogicalTape *)); + for (j = 0; j < maxTapes; j++) + state->tapes[j] = LogicalTapeCreate(state->tapeset); state->currentRun = 0; @@ -2919,7 +2918,7 @@ mergeruns(Tuplesortstate *state) /* End of step D2: rewind all output tapes to prepare for merging */ for (tapenum = 0; tapenum < state->tapeRange; tapenum++) - LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size); + LogicalTapeRewindForRead(state->tapes[tapenum], state->read_buffer_size); for (;;) { @@ -2981,11 +2980,14 @@ mergeruns(Tuplesortstate *state) /* Step D6: decrease level */ if (--state->Level == 0) break; + /* rewind output tape T to use as new input */ - LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange], + LogicalTapeRewindForRead(state->tapes[state->tp_tapenum[state->tapeRange]], state->read_buffer_size); - /* rewind used-up input tape P, and prepare it for write pass */ - LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]); + + /* close used-up input tape P, and create a new one for write pass */ + LogicalTapeClose(state->tapes[state->tp_tapenum[state->tapeRange - 1]]); + state->tapes[state->tp_tapenum[state->tapeRange - 1]] = LogicalTapeCreate(state->tapeset); state->tp_runs[state->tapeRange - 1] = 0; /* @@ -3013,18 +3015,21 @@ mergeruns(Tuplesortstate *state) * output tape while rewinding it. The last iteration of step D6 would be * a waste of cycles anyway... */ - state->result_tape = state->tp_tapenum[state->tapeRange]; + state->result_tape = state->tapes[state->tp_tapenum[state->tapeRange]]; if (!WORKER(state)) - LogicalTapeFreeze(state->tapeset, state->result_tape, NULL); + LogicalTapeFreeze(state->result_tape, NULL); else worker_freeze_result_tape(state); state->status = TSS_SORTEDONTAPE; - /* Release the read buffers of all the other tapes, by rewinding them. */ + /* Close all the other tapes, to release their read buffers. */ for (tapenum = 0; tapenum < state->maxTapes; tapenum++) { - if (tapenum != state->result_tape) - LogicalTapeRewindForWrite(state->tapeset, tapenum); + if (state->tapes[tapenum] != state->result_tape) + { + LogicalTapeClose(state->tapes[tapenum]); + state->tapes[tapenum] = NULL; + } } } @@ -3037,7 +3042,8 @@ mergeruns(Tuplesortstate *state) static void mergeonerun(Tuplesortstate *state) { - int destTape = state->tp_tapenum[state->tapeRange]; + int destTapeNum = state->tp_tapenum[state->tapeRange]; + LogicalTape *destTape = state->tapes[destTapeNum]; int srcTape; /* @@ -3080,7 +3086,7 @@ mergeonerun(Tuplesortstate *state) * When the heap empties, we're done. Write an end-of-run marker on the * output tape, and increment its count of real runs. */ - markrunend(state, destTape); + markrunend(destTape); state->tp_runs[state->tapeRange]++; #ifdef TRACE_SORT @@ -3146,17 +3152,18 @@ beginmerge(Tuplesortstate *state) * Returns false on EOF. */ static bool -mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) +mergereadnext(Tuplesortstate *state, int srcTapeIndex, SortTuple *stup) { + LogicalTape *srcTape = state->tapes[srcTapeIndex]; unsigned int tuplen; - if (!state->mergeactive[srcTape]) + if (!state->mergeactive[srcTapeIndex]) return false; /* tape's run is already exhausted */ /* read next tuple, if any */ - if ((tuplen = getlen(state, srcTape, true)) == 0) + if ((tuplen = getlen(srcTape, true)) == 0) { - state->mergeactive[srcTape] = false; + state->mergeactive[srcTapeIndex] = false; return false; } READTUP(state, stup, srcTape, tuplen); @@ -3173,6 +3180,7 @@ mergereadnext(Tuplesortstate *state, int srcTape, SortTuple *stup) static void dumptuples(Tuplesortstate *state, bool alltuples) { + LogicalTape *destTape; int memtupwrite; int i; @@ -3239,10 +3247,10 @@ dumptuples(Tuplesortstate *state, bool alltuples) #endif memtupwrite = state->memtupcount; + destTape = state->tapes[state->tp_tapenum[state->destTape]]; for (i = 0; i < memtupwrite; i++) { - WRITETUP(state, state->tp_tapenum[state->destTape], - &state->memtuples[i]); + WRITETUP(state, destTape, &state->memtuples[i]); state->memtupcount--; } @@ -3255,7 +3263,7 @@ dumptuples(Tuplesortstate *state, bool alltuples) */ MemoryContextReset(state->tuplecontext); - markrunend(state, state->tp_tapenum[state->destTape]); + markrunend(destTape); state->tp_runs[state->destTape]++; state->tp_dummy[state->destTape]--; /* per Alg D step D2 */ @@ -3289,9 +3297,7 @@ tuplesort_rescan(Tuplesortstate *state) state->markpos_eof = false; break; case TSS_SORTEDONTAPE: - LogicalTapeRewindForRead(state->tapeset, - state->result_tape, - 0); + LogicalTapeRewindForRead(state->result_tape, 0); state->eof_reached = false; state->markpos_block = 0L; state->markpos_offset = 0; @@ -3322,8 +3328,7 @@ tuplesort_markpos(Tuplesortstate *state) state->markpos_eof = state->eof_reached; break; case TSS_SORTEDONTAPE: - LogicalTapeTell(state->tapeset, - state->result_tape, + LogicalTapeTell(state->result_tape, &state->markpos_block, &state->markpos_offset); state->markpos_eof = state->eof_reached; @@ -3354,8 +3359,7 @@ tuplesort_restorepos(Tuplesortstate *state) state->eof_reached = state->markpos_eof; break; case TSS_SORTEDONTAPE: - LogicalTapeSeek(state->tapeset, - state->result_tape, + LogicalTapeSeek(state->result_tape, state->markpos_block, state->markpos_offset); state->eof_reached = state->markpos_eof; @@ -3697,11 +3701,11 @@ reversedirection(Tuplesortstate *state) */ static unsigned int -getlen(Tuplesortstate *state, int tapenum, bool eofOK) +getlen(LogicalTape *tape, bool eofOK) { unsigned int len; - if (LogicalTapeRead(state->tapeset, tapenum, + if (LogicalTapeRead(tape, &len, sizeof(len)) != sizeof(len)) elog(ERROR, "unexpected end of tape"); if (len == 0 && !eofOK) @@ -3710,11 +3714,11 @@ getlen(Tuplesortstate *state, int tapenum, bool eofOK) } static void -markrunend(Tuplesortstate *state, int tapenum) +markrunend(LogicalTape *tape) { unsigned int len = 0; - LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len)); + LogicalTapeWrite(tape, (void *) &len, sizeof(len)); } /* @@ -3892,7 +3896,7 @@ copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_heap(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { MinimalTuple tuple = (MinimalTuple) stup->tuple; @@ -3903,13 +3907,10 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) /* total on-disk footprint: */ unsigned int tuplen = tupbodylen + sizeof(int); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) tupbody, tupbodylen); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -3920,7 +3921,7 @@ writetup_heap(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_heap(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tupbodylen = len - sizeof(int); unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET; @@ -3930,11 +3931,9 @@ readtup_heap(Tuplesortstate *state, SortTuple *stup, /* read in the tuple proper */ tuple->t_len = tuplen; - LogicalTapeReadExact(state->tapeset, tapenum, - tupbody, tupbodylen); + LogicalTapeReadExact(tape, tupbody, tupbodylen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value */ htup.t_len = tuple->t_len + MINIMAL_TUPLE_OFFSET; @@ -4135,21 +4134,17 @@ copytup_cluster(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_cluster(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { HeapTuple tuple = (HeapTuple) stup->tuple; unsigned int tuplen = tuple->t_len + sizeof(ItemPointerData) + sizeof(int); /* We need to store t_self, but not other fields of HeapTupleData */ - LogicalTapeWrite(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - &tuple->t_self, sizeof(ItemPointerData)); - LogicalTapeWrite(state->tapeset, tapenum, - tuple->t_data, tuple->t_len); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuple->t_self, sizeof(ItemPointerData)); + LogicalTapeWrite(tape, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -4160,7 +4155,7 @@ writetup_cluster(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_cluster(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int tuplen) + LogicalTape *tape, unsigned int tuplen) { unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int); HeapTuple tuple = (HeapTuple) readtup_alloc(state, @@ -4169,16 +4164,13 @@ readtup_cluster(Tuplesortstate *state, SortTuple *stup, /* Reconstruct the HeapTupleData header */ tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); tuple->t_len = t_len; - LogicalTapeReadExact(state->tapeset, tapenum, - &tuple->t_self, sizeof(ItemPointerData)); + LogicalTapeReadExact(tape, &tuple->t_self, sizeof(ItemPointerData)); /* We don't currently bother to reconstruct t_tableOid */ tuple->t_tableOid = InvalidOid; /* Read in the tuple body */ - LogicalTapeReadExact(state->tapeset, tapenum, - tuple->t_data, tuple->t_len); + LogicalTapeReadExact(tape, tuple->t_data, tuple->t_len); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value, if it's a simple column */ if (state->indexInfo->ii_IndexAttrNumbers[0] != 0) @@ -4392,19 +4384,16 @@ copytup_index(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_index(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { IndexTuple tuple = (IndexTuple) stup->tuple; unsigned int tuplen; tuplen = IndexTupleSize(tuple) + sizeof(tuplen); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) tuple, IndexTupleSize(tuple)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) tuple, IndexTupleSize(tuple)); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &tuplen, sizeof(tuplen)); + LogicalTapeWrite(tape, (void *) &tuplen, sizeof(tuplen)); if (!state->slabAllocatorUsed) { @@ -4415,16 +4404,14 @@ writetup_index(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_index(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tuplen = len - sizeof(unsigned int); IndexTuple tuple = (IndexTuple) readtup_alloc(state, tuplen); - LogicalTapeReadExact(state->tapeset, tapenum, - tuple, tuplen); + LogicalTapeReadExact(tape, tuple, tuplen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); stup->tuple = (void *) tuple; /* set up first-column key value */ stup->datum1 = index_getattr(tuple, @@ -4466,7 +4453,7 @@ copytup_datum(Tuplesortstate *state, SortTuple *stup, void *tup) } static void -writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) +writetup_datum(Tuplesortstate *state, LogicalTape *tape, SortTuple *stup) { void *waddr; unsigned int tuplen; @@ -4491,13 +4478,10 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) writtenlen = tuplen + sizeof(unsigned int); - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &writtenlen, sizeof(writtenlen)); - LogicalTapeWrite(state->tapeset, tapenum, - waddr, tuplen); + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); + LogicalTapeWrite(tape, waddr, tuplen); if (state->randomAccess) /* need trailing length word? */ - LogicalTapeWrite(state->tapeset, tapenum, - (void *) &writtenlen, sizeof(writtenlen)); + LogicalTapeWrite(tape, (void *) &writtenlen, sizeof(writtenlen)); if (!state->slabAllocatorUsed && stup->tuple) { @@ -4508,7 +4492,7 @@ writetup_datum(Tuplesortstate *state, int tapenum, SortTuple *stup) static void readtup_datum(Tuplesortstate *state, SortTuple *stup, - int tapenum, unsigned int len) + LogicalTape *tape, unsigned int len) { unsigned int tuplen = len - sizeof(unsigned int); @@ -4522,8 +4506,7 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, else if (!state->tuples) { Assert(tuplen == sizeof(Datum)); - LogicalTapeReadExact(state->tapeset, tapenum, - &stup->datum1, tuplen); + LogicalTapeReadExact(tape, &stup->datum1, tuplen); stup->isnull1 = false; stup->tuple = NULL; } @@ -4531,16 +4514,14 @@ readtup_datum(Tuplesortstate *state, SortTuple *stup, { void *raddr = readtup_alloc(state, tuplen); - LogicalTapeReadExact(state->tapeset, tapenum, - raddr, tuplen); + LogicalTapeReadExact(tape, raddr, tuplen); stup->datum1 = PointerGetDatum(raddr); stup->isnull1 = false; stup->tuple = raddr; } if (state->randomAccess) /* need trailing length word? */ - LogicalTapeReadExact(state->tapeset, tapenum, - &tuplen, sizeof(tuplen)); + LogicalTapeReadExact(tape, &tuplen, sizeof(tuplen)); } /* @@ -4652,7 +4633,7 @@ worker_freeze_result_tape(Tuplesortstate *state) TapeShare output; Assert(WORKER(state)); - Assert(state->result_tape != -1); + Assert(state->result_tape != NULL); Assert(state->memtupcount == 0); /* @@ -4668,7 +4649,7 @@ worker_freeze_result_tape(Tuplesortstate *state) * Parallel worker requires result tape metadata, which is to be stored in * shared memory for leader */ - LogicalTapeFreeze(state->tapeset, state->result_tape, &output); + LogicalTapeFreeze(state->result_tape, &output); /* Store properties of output tape, and update finished worker count */ SpinLockAcquire(&shared->mutex); @@ -4687,9 +4668,9 @@ static void worker_nomergeruns(Tuplesortstate *state) { Assert(WORKER(state)); - Assert(state->result_tape == -1); + Assert(state->result_tape == NULL); - state->result_tape = state->tp_tapenum[state->destTape]; + state->result_tape = state->tapes[state->tp_tapenum[state->destTape]]; worker_freeze_result_tape(state); } @@ -4733,9 +4714,13 @@ leader_takeover_tapes(Tuplesortstate *state) * randomAccess is disallowed for parallel sorts. */ inittapestate(state, nParticipants + 1); - state->tapeset = LogicalTapeSetCreate(nParticipants + 1, false, - shared->tapes, &shared->fileset, + state->tapeset = LogicalTapeSetCreate(false, + &shared->fileset, state->worker); + state->tapes = palloc(state->maxTapes * sizeof(LogicalTape *)); + for (j = 0; j < nParticipants; j++) + state->tapes[j] = LogicalTapeImport(state->tapeset, j, &shared->tapes[j]); + /* tapes[nParticipants] represents the "leader tape", which is not used */ /* mergeruns() relies on currentRun for # of runs (in one-pass cases) */ state->currentRun = nParticipants; |