aboutsummaryrefslogtreecommitdiff
path: root/src/backend/executor/nodeGatherMerge.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/executor/nodeGatherMerge.c')
-rw-r--r--src/backend/executor/nodeGatherMerge.c40
1 files changed, 22 insertions, 18 deletions
diff --git a/src/backend/executor/nodeGatherMerge.c b/src/backend/executor/nodeGatherMerge.c
index 317ddb4ae27..47129344f32 100644
--- a/src/backend/executor/nodeGatherMerge.c
+++ b/src/backend/executor/nodeGatherMerge.c
@@ -45,7 +45,7 @@
*/
typedef struct GMReaderTupleBuffer
{
- HeapTuple *tuple; /* array of length MAX_TUPLE_STORE */
+ MinimalTuple *tuple; /* array of length MAX_TUPLE_STORE */
int nTuples; /* number of tuples currently stored */
int readCounter; /* index of next tuple to extract */
bool done; /* true if reader is known exhausted */
@@ -54,8 +54,8 @@ typedef struct GMReaderTupleBuffer
static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
-static HeapTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
- bool nowait, bool *done);
+static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
+ bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
@@ -419,12 +419,12 @@ gather_merge_setup(GatherMergeState *gm_state)
{
/* Allocate the tuple array with length MAX_TUPLE_STORE */
gm_state->gm_tuple_buffers[i].tuple =
- (HeapTuple *) palloc0(sizeof(HeapTuple) * MAX_TUPLE_STORE);
+ (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);
/* Initialize tuple slot for worker */
gm_state->gm_slots[i + 1] =
ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
- &TTSOpsHeapTuple);
+ &TTSOpsMinimalTuple);
}
/* Allocate the resources for the merge */
@@ -533,7 +533,7 @@ gather_merge_clear_tuples(GatherMergeState *gm_state)
GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];
while (tuple_buffer->readCounter < tuple_buffer->nTuples)
- heap_freetuple(tuple_buffer->tuple[tuple_buffer->readCounter++]);
+ pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);
ExecClearTuple(gm_state->gm_slots[i + 1]);
}
@@ -613,13 +613,13 @@ load_tuple_array(GatherMergeState *gm_state, int reader)
/* Try to fill additional slots in the array. */
for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
{
- HeapTuple tuple;
+ MinimalTuple tuple;
tuple = gm_readnext_tuple(gm_state,
reader,
true,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tuple))
+ if (!tuple)
break;
tuple_buffer->tuple[i] = tuple;
tuple_buffer->nTuples++;
@@ -637,7 +637,7 @@ static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
GMReaderTupleBuffer *tuple_buffer;
- HeapTuple tup;
+ MinimalTuple tup;
/*
* If we're being asked to generate a tuple from the leader, then we just
@@ -687,7 +687,7 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
reader,
nowait,
&tuple_buffer->done);
- if (!HeapTupleIsValid(tup))
+ if (!tup)
return false;
/*
@@ -697,13 +697,13 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
load_tuple_array(gm_state, reader);
}
- Assert(HeapTupleIsValid(tup));
+ Assert(tup);
/* Build the TupleTableSlot for the given tuple */
- ExecStoreHeapTuple(tup, /* tuple to store */
- gm_state->gm_slots[reader], /* slot in which to store
- * the tuple */
- true); /* pfree tuple when done with it */
+ ExecStoreMinimalTuple(tup, /* tuple to store */
+ gm_state->gm_slots[reader], /* slot in which to store
+ * the tuple */
+ true); /* pfree tuple when done with it */
return true;
}
@@ -711,12 +711,12 @@ gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
/*
* Attempt to read a tuple from given worker.
*/
-static HeapTuple
+static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
bool *done)
{
TupleQueueReader *reader;
- HeapTuple tup;
+ MinimalTuple tup;
/* Check for async events, particularly messages from workers. */
CHECK_FOR_INTERRUPTS();
@@ -732,7 +732,11 @@ gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
reader = gm_state->reader[nreader - 1];
tup = TupleQueueReaderNext(reader, nowait, done);
- return tup;
+ /*
+ * Since we'll be buffering these across multiple calls, we need to make a
+ * copy.
+ */
+ return tup ? heap_copy_minimal_tuple(tup) : NULL;
}
/*