diff options
Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r-- | src/backend/executor/nodeGatherMerge.c | 40 |
1 files changed, 22 insertions, 18 deletions
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c index 317ddb4ae27..47129344f32 100644 --- a/src/backend/executor/nodeGatherMerge.c +++ b/src/backend/executor/nodeGatherMerge.c @@ -45,7 +45,7 @@ */ typedef struct GMReaderTupleBuffer { - HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */ + MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */ int nTuples; /* number of tuples currently stored */ int readCounter; /* index of next tuple to extract */ bool done; /* true if reader is known exhausted */ @@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer static TupleTableSlot *ExecGatherMerge(PlanState *pstate); static int32 heap_compare_slots(Datum a, Datum b, void *arg); static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state); -static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, - bool nowait, bool *done); +static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, + bool nowait, bool *done); static void ExecShutdownGatherMergeWorkers(GatherMergeState *node); static void gather_merge_setup(GatherMergeState *gm_state); static void gather_merge_init(GatherMergeState *gm_state); @@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state) { /* Allocate the tuple array with length MAX_TUPLE_STORE */ gm_state->gm_tuple_buffers[i].tuple = - (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE); + (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE); /* Initialize tuple slot for worker */ gm_state->gm_slots[i + 1] = ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc, - &TTSOpsHeapTuple); + &TTSOpsMinimalTuple); } /* Allocate the resources for the merge */ @@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state) GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i]; while (tuple_buffer->readCounter < tuple_buffer->nTuples) - heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]); + pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]); ExecClearTuple(gm_state->gm_slots[i + 1]); } @@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader) /* Try to fill additional slots in the array. */ for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++) { - HeapTuple tuple; + MinimalTuple tuple; tuple = gm_readnext_tuple(gm_state, reader, true, &tuple_buffer->done); - if (!HeapTupleIsValid(tuple)) + if (!tuple) break; tuple_buffer->tuple[i] = tuple; tuple_buffer->nTuples++; @@ -637,7 +637,7 @@ static bool gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) { GMReaderTupleBuffer *tuple_buffer; - HeapTuple tup; + MinimalTuple tup; /* * If we're being asked to generate a tuple from the leader, then we just @@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) reader, nowait, &tuple_buffer->done); - if (!HeapTupleIsValid(tup)) + if (!tup) return false; /* @@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) load_tuple_array(gm_state, reader); } - Assert(HeapTupleIsValid(tup)); + Assert(tup); /* Build the TupleTableSlot for the given tuple */ - ExecStoreHeapTuple(tup, /* tuple to store */ - gm_state->gm_slots[reader], /* slot in which to store - * the tuple */ - true); /* pfree tuple when done with it */ + ExecStoreMinimalTuple(tup, /* tuple to store */ + gm_state->gm_slots[reader], /* slot in which to store + * the tuple */ + true); /* pfree tuple when done with it */ return true; } @@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait) /* * Attempt to read a tuple from given worker. */ -static HeapTuple +static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, bool *done) { TupleQueueReader *reader; - HeapTuple tup; + MinimalTuple tup; /* Check for async events, particularly messages from workers. */ CHECK_FOR_INTERRUPTS(); @@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait, reader = gm_state->reader[nreader - 1]; tup = TupleQueueReaderNext(reader, nowait, done); - return tup; + /* + * Since we'll be buffering these across multiple calls, we need to make a + * copy. + */ + return tup ? heap_copy_minimal_tuple(tup) : NULL; } /* |