author | Tom Lane <tgl@sss.pgh.pa.us> | 1999-05-18 21:33:06 +0000
---|---|---
committer | Tom Lane <tgl@sss.pgh.pa.us> | 1999-05-18 21:33:06 +0000
commit | 26069a58e8e4e4f3bef27e52d2d5cad2baa46c9f | (patch)
tree | c40f7d3e130df1fce0e1fc73520b9e465c02a607 | /src/backend/executor/nodeHash.c
parent | d261a5ec861c001f0331e36e01499d8dde7f5c67 | (diff)
download | postgresql-26069a58e8e4e4f3bef27e52d2d5cad2baa46c9f.tar.gz | postgresql-26069a58e8e4e4f3bef27e52d2d5cad2baa46c9f.zip
Rewrite hash join to use simple linked lists instead of a
fixed-size hashtable. This should prevent 'hashtable out of memory' errors,
unless you really do run out of memory. Note: target size for hashtable
is now taken from -S postmaster switch, not -B, since it is local memory
in the backend rather than shared memory.
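As a rough illustration of the data layout the commit moves to (this is not the patch's code: the names are invented, and plain malloc stands in for the executor's palloc and per-join memory contexts), a chained hash table stores each tuple as its own allocation linked into a bucket's list:

```c
#include <stdlib.h>
#include <string.h>

/* Illustrative stand-in for the patch's HashJoinTupleData. */
typedef struct ExampleTuple
{
    struct ExampleTuple *next;  /* link in the bucket's chain */
    size_t len;                 /* length of the copied tuple body */
    /* the tuple body follows the header in the same allocation */
} ExampleTuple;

typedef struct
{
    int nbuckets;
    ExampleTuple **buckets;     /* array of chain heads, one per bucket */
} ExampleHashTable;

/* Insert a copy of 'data' into 'bucketno'; the chain grows per-tuple,
 * so there is no fixed arena to overflow. Returns -1 only on real OOM. */
static int
example_insert(ExampleHashTable *ht, int bucketno,
               const void *data, size_t len)
{
    ExampleTuple *t = malloc(sizeof(ExampleTuple) + len);

    if (t == NULL)
        return -1;              /* genuinely out of memory */
    t->len = len;
    memcpy(t + 1, data, len);   /* body lives right after the header */
    t->next = ht->buckets[bucketno];    /* O(1) push onto the chain */
    ht->buckets[bucketno] = t;
    return 0;
}
```

Since a bucket is just a list head, insertion is a constant-time push and the table grows until the backend genuinely exhausts memory, which is why the old 'hashtable out of memory' failure path can be dropped.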
Diffstat (limited to 'src/backend/executor/nodeHash.c')
-rw-r--r-- | src/backend/executor/nodeHash.c | 585
1 file changed, 213 insertions(+), 372 deletions(-)
diff --git a/src/backend/executor/nodeHash.c b/src/backend/executor/nodeHash.c
index b36a2ba4051..4589da32bc1 100644
--- a/src/backend/executor/nodeHash.c
+++ b/src/backend/executor/nodeHash.c
@@ -6,7 +6,7 @@
  * Copyright (c) 1994, Regents of the University of California
  *
  *
- *    $Id: nodeHash.c,v 1.34 1999/05/09 00:53:20 tgl Exp $
+ *    $Id: nodeHash.c,v 1.35 1999/05/18 21:33:06 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -22,11 +22,6 @@
 #include <stdio.h>
 #include <math.h>
 #include <string.h>
-#include <sys/file.h>
-#include <sys/stat.h>
-#include <fcntl.h>
-
-#include <unistd.h>
 
 #include "postgres.h"
 #include "miscadmin.h"
@@ -34,17 +29,12 @@
 #include "executor/executor.h"
 #include "executor/nodeHash.h"
 #include "executor/nodeHashjoin.h"
-#include "storage/ipc.h"
 #include "utils/hsearch.h"
+#include "utils/portal.h"
 
-extern int NBuffers;
+extern int SortMem;
 
 static int hashFunc(Datum key, int len, bool byVal);
-static RelativeAddr hashTableAlloc(int size, HashJoinTable hashtable);
-static void *absHashTableAlloc(int size, HashJoinTable hashtable);
-static void ExecHashOverflowInsert(HashJoinTable hashtable,
-                    HashBucket bucket,
-                    HeapTuple heapTuple);
 
 /* ----------------------------------------------------------------
  *      ExecHash
@@ -63,11 +53,7 @@ ExecHash(Hash *node)
     HashJoinTable hashtable;
     TupleTableSlot *slot;
     ExprContext *econtext;
-
     int         nbatch;
-    File       *batches = NULL;
-    RelativeAddr *batchPos;
-    int        *batchSizes;
     int         i;
 
     /* ----------------
@@ -79,27 +65,25 @@ ExecHash(Hash *node)
     estate = node->plan.state;
     outerNode = outerPlan(node);
 
-    hashtable = node->hashtable;
+    hashtable = hashstate->hashtable;
     if (hashtable == NULL)
         elog(ERROR, "ExecHash: hash table is NULL.");
 
     nbatch = hashtable->nbatch;
 
     if (nbatch > 0)
-    {                           /* if needs hash partition */
-        /* --------------
-         *  allocate space for the file descriptors of batch files
-         *  then open the batch files in the current processes.
-         * --------------
+    {
+        /* ----------------
+         * Open temp files for inner batches, if needed.
+         * Note that file buffers are palloc'd in regular executor context.
+         * ----------------
         */
-        batches = (File *) palloc(nbatch * sizeof(File));
         for (i = 0; i < nbatch; i++)
         {
-            batches[i] = OpenTemporaryFile();
+            File        tfile = OpenTemporaryFile();
+
+            Assert(tfile >= 0);
+            hashtable->innerBatchFile[i] = BufFileCreate(tfile);
         }
-        hashstate->hashBatches = batches;
-        batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
-        batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
     }
 
     /* ----------------
@@ -110,7 +94,7 @@ ExecHash(Hash *node)
     econtext = hashstate->cstate.cs_ExprContext;
 
     /* ----------------
-     *  get tuple and insert into the hash table
+     *  get all inner tuples and insert into the hash table (or temp files)
     * ----------------
     */
     for (;;)
@@ -118,26 +102,11 @@ ExecHash(Hash *node)
         slot = ExecProcNode(outerNode, (Plan *) node);
         if (TupIsNull(slot))
             break;
-
         econtext->ecxt_innertuple = slot;
-        ExecHashTableInsert(hashtable, econtext, hashkey,
-                            hashstate->hashBatches);
-
+        ExecHashTableInsert(hashtable, econtext, hashkey);
         ExecClearTuple(slot);
     }
 
-    /*
-     * end of build phase, flush all the last pages of the batches.
-     */
-    for (i = 0; i < nbatch; i++)
-    {
-        if (FileSeek(batches[i], 0L, SEEK_END) < 0)
-            perror("FileSeek");
-        if (FileWrite(batches[i], ABSADDR(hashtable->batch) + i * BLCKSZ, BLCKSZ) < 0)
-            perror("FileWrite");
-        NDirectFileWrite++;
-    }
-
     /* ---------------------
     *  Return the slot so that we have the tuple descriptor
     *  when we need to save/restore them.  -Jeff 11 July 1991
@@ -173,10 +142,10 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
     */
     hashstate = makeNode(HashState);
     node->hashstate = hashstate;
-    hashstate->hashBatches = NULL;
+    hashstate->hashtable = NULL;
 
     /* ----------------
-     *  Miscellanious initialization
+     *  Miscellaneous initialization
     *
     *       +  assign node's base_id
     *       +  assign debugging hooks and
@@ -186,7 +155,6 @@ ExecInitHash(Hash *node, EState *estate, Plan *parent)
     ExecAssignNodeBaseInfo(estate, &hashstate->cstate, parent);
     ExecAssignExprContext(estate, &hashstate->cstate);
 
-#define HASH_NSLOTS 1
     /* ----------------
     *  initialize our result slot
     * ----------------
@@ -214,6 +182,7 @@
 int
 ExecCountSlotsHash(Hash *node)
 {
+#define HASH_NSLOTS 1
     return ExecCountSlotsNode(outerPlan(node)) +
     ExecCountSlotsNode(innerPlan(node)) +
     HASH_NSLOTS;
@@ -230,16 +199,12 @@ ExecEndHash(Hash *node)
 {
     HashState  *hashstate;
     Plan       *outerPlan;
-    File       *batches;
 
     /* ----------------
     *  get info from the hash state
     * ----------------
     */
     hashstate = node->hashstate;
-    batches = hashstate->hashBatches;
-    if (batches != NULL)
-        pfree(batches);
 
     /* ----------------
     *  free projection info.  no need to free result type info
@@ -256,21 +221,6 @@ ExecEndHash(Hash *node)
     ExecEndNode(outerPlan, (Plan *) node);
 }
 
-static RelativeAddr
-hashTableAlloc(int size, HashJoinTable hashtable)
-{
-    RelativeAddr p = hashtable->top;
-    hashtable->top += MAXALIGN(size);
-    return p;
-}
-
-static void *
-absHashTableAlloc(int size, HashJoinTable hashtable)
-{
-    RelativeAddr p = hashTableAlloc(size, hashtable);
-    return ABSADDR(p);
-}
-
 /* ----------------------------------------------------------------
  *      ExecHashTableCreate
@@ -285,22 +235,19 @@ HashJoinTable
 ExecHashTableCreate(Hash *node)
 {
     Plan       *outerNode;
-    int         HashTBSize;
-    int         nbatch;
     int         ntuples;
     int         tupsize;
-    int         pages;
-    int         sqrtpages;
-    IpcMemoryId shmid;
+    double      inner_rel_bytes;
+    double      hash_table_bytes;
+    int         nbatch;
     HashJoinTable hashtable;
-    HashBucket  bucket;
     int         nbuckets;
     int         totalbuckets;
     int         bucketsize;
     int         i;
-    RelativeAddr *outerbatchPos;
-    RelativeAddr *innerbatchPos;
-    int        *innerbatchSizes;
+    Portal      myPortal;
+    char        myPortalName[64];
+    MemoryContext oldcxt;
 
     /* ----------------
     *  Get information about the size of the relation to be hashed
@@ -314,38 +261,48 @@ ExecHashTableCreate(Hash *node)
     ntuples = outerNode->plan_size;
     if (ntuples <= 0)           /* force a plausible size if no info */
         ntuples = 1000;
-    tupsize = outerNode->plan_width + sizeof(HeapTupleData);
-    pages = (int) ceil((double) ntuples * tupsize * FUDGE_FAC / BLCKSZ);
+
+    /* estimate tupsize based on footprint of tuple in hashtable...
+     * but what about palloc overhead?
+     */
+    tupsize = MAXALIGN(outerNode->plan_width) +
+        MAXALIGN(sizeof(HashJoinTupleData));
+    inner_rel_bytes = (double) ntuples * tupsize * FUDGE_FAC;
 
     /*
-     * Max hashtable size is NBuffers pages, but not less than
+     * Target hashtable size is SortMem kilobytes, but not less than
     * sqrt(estimated inner rel size), so as to avoid horrible performance.
-     * XXX since the hashtable is not allocated in shared mem anymore,
-     * it would probably be more appropriate to drive this from -S than -B.
     */
-    sqrtpages = (int) ceil(sqrt((double) pages));
-    HashTBSize = NBuffers;
-    if (sqrtpages > HashTBSize)
-        HashTBSize = sqrtpages;
+    hash_table_bytes = sqrt(inner_rel_bytes);
+    if (hash_table_bytes < (SortMem * 1024L))
+        hash_table_bytes = SortMem * 1024L;
 
     /*
     * Count the number of hash buckets we want for the whole relation,
-     * and the number we can actually fit in the allowed memory.
+     * for an average bucket load of NTUP_PER_BUCKET (per virtual bucket!).
+     */
+    totalbuckets = (int) ceil((double) ntuples * FUDGE_FAC / NTUP_PER_BUCKET);
+
+    /*
+     * Count the number of buckets we think will actually fit in the
+     * target memory size, at a loading of NTUP_PER_BUCKET (physical buckets).
     * NOTE: FUDGE_FAC here determines the fraction of the hashtable space
-     * saved for overflow records.  Need a better approach...
+     * reserved to allow for nonuniform distribution of hash values.
+     * Perhaps this should be a different number from the other uses of
+     * FUDGE_FAC, but since we have no real good way to pick either one...
     */
-    totalbuckets = (int) ceil((double) ntuples / NTUP_PER_BUCKET);
-    bucketsize = MAXALIGN(NTUP_PER_BUCKET * tupsize + sizeof(*bucket));
-    nbuckets = (int) ((HashTBSize * BLCKSZ) / (bucketsize * FUDGE_FAC));
+    bucketsize = NTUP_PER_BUCKET * tupsize;
+    nbuckets = (int) (hash_table_bytes / (bucketsize * FUDGE_FAC));
+    if (nbuckets <= 0)
+        nbuckets = 1;
 
     if (totalbuckets <= nbuckets)
     {
         /* We have enough space, so no batching.  In theory we could
-         * even reduce HashTBSize, but as long as we don't have a way
-         * to deal with overflow-space overrun, best to leave the
-         * extra space available for overflow.
+         * even reduce nbuckets, but since that could lead to poor
+         * behavior if estimated ntuples is much less than reality,
+         * it seems better to make more buckets instead of fewer.
         */
-        nbuckets = totalbuckets;
+        totalbuckets = nbuckets;
         nbatch = 0;
     }
     else
@@ -356,7 +313,8 @@ ExecHashTableCreate(Hash *node)
         * of groups we will use for the part of the data that doesn't
         * fall into the first nbuckets hash buckets.
         */
-        nbatch = (int) ceil((double) (pages - HashTBSize) / HashTBSize);
+        nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
+                            hash_table_bytes);
         if (nbatch <= 0)
             nbatch = 1;
     }
@@ -374,90 +332,117 @@ ExecHashTableCreate(Hash *node)
 #endif
 
     /* ----------------
-     *  in non-parallel machines, we don't need to put the hash table
-     *  in the shared memory.  We just palloc it.  The space needed
-     *  is the hash area itself plus nbatch+1 I/O buffer pages.
-     * ----------------
-     */
-    hashtable = (HashJoinTable) palloc((HashTBSize + nbatch + 1) * BLCKSZ);
-    shmid = 0;
-
-    if (hashtable == NULL)
-        elog(ERROR, "not enough memory for hashjoin.");
-    /* ----------------
-     *  initialize the hash table header
+     *  Initialize the hash table control block.
+     *  The hashtable control block is just palloc'd from executor memory.
     * ----------------
     */
+    hashtable = (HashJoinTable) palloc(sizeof(HashTableData));
     hashtable->nbuckets = nbuckets;
     hashtable->totalbuckets = totalbuckets;
-    hashtable->bucketsize = bucketsize;
-    hashtable->shmid = shmid;
-    hashtable->top = MAXALIGN(sizeof(HashTableData));
-    hashtable->bottom = HashTBSize * BLCKSZ;
-    /*
-     * hashtable->readbuf has to be maxaligned!!!
-     * Note there are nbatch additional pages available after readbuf;
-     * these are used for buffering the outgoing batch data.
-     */
-    hashtable->readbuf = hashtable->bottom;
-    hashtable->batch = hashtable->bottom + BLCKSZ;
+    hashtable->buckets = NULL;
     hashtable->nbatch = nbatch;
     hashtable->curbatch = 0;
-    hashtable->pcount = hashtable->nprocess = 0;
+    hashtable->innerBatchFile = NULL;
+    hashtable->outerBatchFile = NULL;
+    hashtable->innerBatchSize = NULL;
+    hashtable->outerBatchSize = NULL;
+
+    /* ----------------
+     *  Create a named portal in which to keep the hashtable working storage.
+     *  Each hashjoin must have its own portal, so be wary of name conflicts.
+     * ----------------
+     */
+    i = 0;
+    do {
+        i++;
+        sprintf(myPortalName, "<hashtable %d>", i);
+        myPortal = GetPortalByName(myPortalName);
+    } while (PortalIsValid(myPortal));
+    myPortal = CreatePortal(myPortalName);
+    Assert(PortalIsValid(myPortal));
+    hashtable->myPortal = (void*) myPortal; /* kluge for circular includes */
+    hashtable->hashCxt = (MemoryContext) PortalGetVariableMemory(myPortal);
+    hashtable->batchCxt = (MemoryContext) PortalGetHeapMemory(myPortal);
+
+    /* Allocate data that will live for the life of the hashjoin */
+
+    oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
 
     if (nbatch > 0)
     {
         /* ---------------
-         *  allocate and initialize the outer batches
+         *  allocate and initialize the file arrays in hashCxt
         * ---------------
         */
-        outerbatchPos = (RelativeAddr *)
-            absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
+        hashtable->innerBatchFile = (BufFile **)
+            palloc(nbatch * sizeof(BufFile *));
+        hashtable->outerBatchFile = (BufFile **)
+            palloc(nbatch * sizeof(BufFile *));
+        hashtable->innerBatchSize = (long *)
+            palloc(nbatch * sizeof(long));
+        hashtable->outerBatchSize = (long *)
+            palloc(nbatch * sizeof(long));
         for (i = 0; i < nbatch; i++)
         {
-            outerbatchPos[i] = -1;
+            hashtable->innerBatchFile[i] = NULL;
+            hashtable->outerBatchFile[i] = NULL;
+            hashtable->innerBatchSize[i] = 0;
+            hashtable->outerBatchSize[i] = 0;
        }
-        hashtable->outerbatchPos = RELADDR(outerbatchPos);
-        /* ---------------
-         *  allocate and initialize the inner batches
-         * ---------------
-         */
-        innerbatchPos = (RelativeAddr *)
-            absHashTableAlloc(nbatch * sizeof(RelativeAddr), hashtable);
-        innerbatchSizes = (int *)
-            absHashTableAlloc(nbatch * sizeof(int), hashtable);
-        for (i = 0; i < nbatch; i++)
-        {
-            innerbatchPos[i] = -1;
-            innerbatchSizes[i] = 0;
-        }
-        hashtable->innerbatchPos = RELADDR(innerbatchPos);
-        hashtable->innerbatchSizes = RELADDR(innerbatchSizes);
-    }
-    else
-    {
-        hashtable->outerbatchPos = (RelativeAddr) NULL;
-        hashtable->innerbatchPos = (RelativeAddr) NULL;
-        hashtable->innerbatchSizes = (RelativeAddr) NULL;
+        /* The files will not be opened until later... */
     }
 
-    hashtable->overflownext = hashtable->top + bucketsize * nbuckets;
-    Assert(hashtable->overflownext < hashtable->bottom);
-    /* ----------------
-     *  initialize each hash bucket
-     * ----------------
+    /* Prepare portal for the first-scan space allocations;
+     * allocate the hashbucket array therein, and set each bucket "empty".
     */
-    bucket = (HashBucket) ABSADDR(hashtable->top);
+    MemoryContextSwitchTo(hashtable->batchCxt);
+    StartPortalAllocMode(DefaultAllocMode, 0);
+
+    hashtable->buckets = (HashJoinTuple *)
+        palloc(nbuckets * sizeof(HashJoinTuple));
+
+    if (hashtable->buckets == NULL)
+        elog(ERROR, "Insufficient memory for hash table.");
+
     for (i = 0; i < nbuckets; i++)
     {
-        bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
-        bucket->bottom = bucket->top;
-        bucket->firstotuple = bucket->lastotuple = -1;
-        bucket = (HashBucket) ((char *) bucket + bucketsize);
+        hashtable->buckets[i] = NULL;
     }
+
+    MemoryContextSwitchTo(oldcxt);
+
     return hashtable;
 }
 
 /* ----------------------------------------------------------------
+ *      ExecHashTableDestroy
+ *
+ *      destroy a hash table
+ * ----------------------------------------------------------------
+ */
+void
+ExecHashTableDestroy(HashJoinTable hashtable)
+{
+    int         i;
+
+    /* Make sure all the temp files are closed */
+    for (i = 0; i < hashtable->nbatch; i++)
+    {
+        if (hashtable->innerBatchFile[i])
+            BufFileClose(hashtable->innerBatchFile[i]);
+        if (hashtable->outerBatchFile[i])
+            BufFileClose(hashtable->outerBatchFile[i]);
+    }
+
+    /* Destroy the portal to release all working memory */
+    /* cast here is a kluge for circular includes... */
+    PortalDestroy((Portal*) & hashtable->myPortal);
+
+    /* And drop the control block */
+    pfree(hashtable);
+}
+
+/* ----------------------------------------------------------------
  *      ExecHashTableInsert
 *
 *      insert a tuple into the hash table depending on the hash value
@@ -467,32 +452,11 @@
 void
 ExecHashTableInsert(HashJoinTable hashtable,
                     ExprContext *econtext,
-                    Var *hashkey,
-                    File *batches)
+                    Var *hashkey)
 {
-    TupleTableSlot *slot;
-    HeapTuple   heapTuple;
-    HashBucket  bucket;
-    int         bucketno;
-    int         nbatch;
-    int         batchno;
-    char       *buffer;
-    RelativeAddr *batchPos;
-    int        *batchSizes;
-    char       *pos;
-
-    nbatch = hashtable->nbatch;
-    batchPos = (RelativeAddr *) ABSADDR(hashtable->innerbatchPos);
-    batchSizes = (int *) ABSADDR(hashtable->innerbatchSizes);
-
-    slot = econtext->ecxt_innertuple;
-    heapTuple = slot->val;
-
-#ifdef HJDEBUG
-    printf("Inserting ");
-#endif
-
-    bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+    int         bucketno = ExecHashGetBucket(hashtable, econtext, hashkey);
+    TupleTableSlot *slot = econtext->ecxt_innertuple;
+    HeapTuple   heapTuple = slot->val;
 
     /* ----------------
     *  decide whether to put the tuple in the hash table or a tmp file
@@ -504,22 +468,24 @@ ExecHashTableInsert(HashJoinTable hashtable,
         *  put the tuple in hash table
         * ---------------
         */
-        bucket = (HashBucket)
-            (ABSADDR(hashtable->top) + bucketno * hashtable->bucketsize);
-        if (((char *) MAXALIGN(ABSADDR(bucket->bottom)) - (char *) bucket)
-            + heapTuple->t_len + HEAPTUPLESIZE > hashtable->bucketsize)
-            ExecHashOverflowInsert(hashtable, bucket, heapTuple);
-        else
-        {
-            memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)),
-                    heapTuple,
-                    HEAPTUPLESIZE);
-            memmove((char *) MAXALIGN(ABSADDR(bucket->bottom)) + HEAPTUPLESIZE,
-                    heapTuple->t_data,
-                    heapTuple->t_len);
-            bucket->bottom = ((RelativeAddr) MAXALIGN(bucket->bottom) +
-                              heapTuple->t_len + HEAPTUPLESIZE);
-        }
+        HashJoinTuple hashTuple;
+        int         hashTupleSize;
+
+        hashTupleSize = MAXALIGN(sizeof(*hashTuple)) + heapTuple->t_len;
+        hashTuple = (HashJoinTuple) MemoryContextAlloc(hashtable->batchCxt,
+                                                       hashTupleSize);
+        if (hashTuple == NULL)
+            elog(ERROR, "Insufficient memory for hash table.");
+        memcpy((char *) & hashTuple->htup,
+               (char *) heapTuple,
+               sizeof(hashTuple->htup));
+        hashTuple->htup.t_data = (HeapTupleHeader)
+            (((char *) hashTuple) + MAXALIGN(sizeof(*hashTuple)));
+        memcpy((char *) hashTuple->htup.t_data,
+               (char *) heapTuple->t_data,
+               heapTuple->t_len);
+        hashTuple->next = hashtable->buckets[bucketno];
+        hashtable->buckets[bucketno] = hashTuple;
     }
     else
     {
@@ -527,32 +493,15 @@ ExecHashTableInsert(HashJoinTable hashtable,
         *  put the tuple into a tmp file for other batches
         * -----------------
         */
-        batchno = (nbatch * (bucketno - hashtable->nbuckets)) /
+        int         batchno = (hashtable->nbatch * (bucketno - hashtable->nbuckets)) /
             (hashtable->totalbuckets - hashtable->nbuckets);
-        buffer = ABSADDR(hashtable->batch) + batchno * BLCKSZ;
-        batchSizes[batchno]++;
-        pos = (char *)
-            ExecHashJoinSaveTuple(heapTuple,
-                                  buffer,
-                                  batches[batchno],
-                                  (char *) ABSADDR(batchPos[batchno]));
-        batchPos[batchno] = RELADDR(pos);
+
+        hashtable->innerBatchSize[batchno]++;
+        ExecHashJoinSaveTuple(heapTuple,
+                              hashtable->innerBatchFile[batchno]);
     }
 }
 
 /* ----------------------------------------------------------------
-*      ExecHashTableDestroy
-*
-*      destroy a hash table
-* ----------------------------------------------------------------
-*/
-void
-ExecHashTableDestroy(HashJoinTable hashtable)
-{
-    pfree(hashtable);
-}
-
-/* ----------------------------------------------------------------
 *      ExecHashGetBucket
 *
 *      Get the hash value for a tuple
@@ -567,12 +516,12 @@ ExecHashGetBucket(HashJoinTable hashtable,
     Datum       keyval;
     bool        isNull;
 
-
     /* ----------------
     *  Get the join attribute value of the tuple
-     * ----------------
+     *
     * ...It's quick hack - use ExecEvalExpr instead of ExecEvalVar:
     * hashkey may be T_ArrayRef, not just T_Var.       - vadim 04/22/97
+     * ----------------
     */
     keyval = ExecEvalExpr((Node *) hashkey, econtext, &isNull, NULL);
@@ -604,62 +553,6 @@ ExecHashGetBucket(HashJoinTable hashtable,
 }
 
 /* ----------------------------------------------------------------
- *      ExecHashOverflowInsert
- *
- *      insert into the overflow area of a hash bucket
- * ----------------------------------------------------------------
- */
-static void
-ExecHashOverflowInsert(HashJoinTable hashtable,
-                       HashBucket bucket,
-                       HeapTuple heapTuple)
-{
-    OverflowTuple otuple;
-    RelativeAddr newend;
-    OverflowTuple firstotuple;
-    OverflowTuple lastotuple;
-
-    firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
-    lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
-
-    /* ----------------
-     *  see if we run out of overflow space
-     * ----------------
-     */
-    newend = (RelativeAddr) MAXALIGN(hashtable->overflownext + sizeof(*otuple)
-                                     + heapTuple->t_len + HEAPTUPLESIZE);
-    if (newend > hashtable->bottom)
-        elog(ERROR,
-             "hash table out of memory. Use -B parameter to increase buffers.");
-
-    /* ----------------
-     *  establish the overflow chain
-     * ----------------
-     */
-    otuple = (OverflowTuple) ABSADDR(hashtable->overflownext);
-    hashtable->overflownext = newend;
-    if (firstotuple == NULL)
-        bucket->firstotuple = bucket->lastotuple = RELADDR(otuple);
-    else
-    {
-        lastotuple->next = RELADDR(otuple);
-        bucket->lastotuple = RELADDR(otuple);
-    }
-
-    /* ----------------
-     *  copy the tuple into the overflow area
-     * ----------------
-     */
-    otuple->next = -1;
-    otuple->tuple = RELADDR(MAXALIGN(((char *) otuple + sizeof(*otuple))));
-    memmove(ABSADDR(otuple->tuple),
-            heapTuple,
-            HEAPTUPLESIZE);
-    memmove(ABSADDR(otuple->tuple) + HEAPTUPLESIZE,
-            heapTuple->t_data,
-            heapTuple->t_len);
-}
-
-/* ----------------------------------------------------------------
  *      ExecScanHashBucket
 *
 *      scan a hash bucket of matches
@@ -667,95 +560,46 @@ ExecHashOverflowInsert(HashJoinTable hashtable,
  * ----------------------------------------------------------------
 */
 HeapTuple
 ExecScanHashBucket(HashJoinState *hjstate,
-                   HashBucket bucket,
-                   HeapTuple curtuple,
                    List *hjclauses,
                    ExprContext *econtext)
 {
-    HeapTuple   heapTuple;
-    bool        qualResult;
-    OverflowTuple otuple = NULL;
-    OverflowTuple curotuple;
-    TupleTableSlot *inntuple;
-    OverflowTuple firstotuple;
-    OverflowTuple lastotuple;
-    HashJoinTable hashtable;
+    HashJoinTable hashtable = hjstate->hj_HashTable;
+    HashJoinTuple hashTuple = hjstate->hj_CurTuple;
 
-    hashtable = hjstate->hj_HashTable;
-    firstotuple = (OverflowTuple) ABSADDR(bucket->firstotuple);
-    lastotuple = (OverflowTuple) ABSADDR(bucket->lastotuple);
-
-    /* ----------------
-     *  search the hash bucket
-     * ----------------
+    /* hj_CurTuple is NULL to start scanning a new bucket, or the address
+     * of the last tuple returned from the current bucket.
     */
-    if (curtuple == NULL || curtuple < (HeapTuple) ABSADDR(bucket->bottom))
+    if (hashTuple == NULL)
     {
-        if (curtuple == NULL)
-            heapTuple = (HeapTuple)
-                MAXALIGN(ABSADDR(bucket->top));
-        else
-            heapTuple = (HeapTuple)
-                MAXALIGN(((char *) curtuple + curtuple->t_len + HEAPTUPLESIZE));
-
-        while (heapTuple < (HeapTuple) ABSADDR(bucket->bottom))
-        {
-
-            heapTuple->t_data = (HeapTupleHeader)
-                ((char *) heapTuple + HEAPTUPLESIZE);
-
-            inntuple = ExecStoreTuple(heapTuple,        /* tuple to store */
-                                      hjstate->hj_HashTupleSlot,    /* slot */
-                                      InvalidBuffer,    /* tuple has no buffer */
-                                      false);   /* do not pfree this tuple */
-
-            econtext->ecxt_innertuple = inntuple;
-            qualResult = ExecQual((List *) hjclauses, econtext);
-
-            if (qualResult)
-                return heapTuple;
-
-            heapTuple = (HeapTuple)
-                MAXALIGN(((char *) heapTuple + heapTuple->t_len + HEAPTUPLESIZE));
-        }
-
-        if (firstotuple == NULL)
-            return NULL;
-        otuple = firstotuple;
+        hashTuple = hashtable->buckets[hjstate->hj_CurBucketNo];
     }
-
-    /* ----------------
-     *  search the overflow area of the hash bucket
-     * ----------------
-     */
-    if (otuple == NULL)
+    else
     {
-        curotuple = hjstate->hj_CurOTuple;
-        otuple = (OverflowTuple) ABSADDR(curotuple->next);
+        hashTuple = hashTuple->next;
     }
 
-    while (otuple != NULL)
+    while (hashTuple != NULL)
     {
-        heapTuple = (HeapTuple) ABSADDR(otuple->tuple);
-        heapTuple->t_data = (HeapTupleHeader)
-            ((char *) heapTuple + HEAPTUPLESIZE);
+        HeapTuple   heapTuple = & hashTuple->htup;
+        TupleTableSlot *inntuple;
+        bool        qualResult;
+
+        /* insert hashtable's tuple into exec slot so ExecQual sees it */
         inntuple = ExecStoreTuple(heapTuple,    /* tuple to store */
                                   hjstate->hj_HashTupleSlot,    /* slot */
-                                  InvalidBuffer,    /* SP?? this tuple has
-                                                     * no buffer */
+                                  InvalidBuffer,
                                   false);   /* do not pfree this tuple */
-
        econtext->ecxt_innertuple = inntuple;
-        qualResult = ExecQual((List *) hjclauses, econtext);
+
+        qualResult = ExecQual(hjclauses, econtext);
 
         if (qualResult)
         {
-            hjstate->hj_CurOTuple = otuple;
+            hjstate->hj_CurTuple = hashTuple;
            return heapTuple;
        }
 
-        otuple = (OverflowTuple) ABSADDR(otuple->next);
+        hashTuple = hashTuple->next;
     }
 
     /* ----------------
@@ -819,60 +663,57 @@ hashFunc(Datum key, int len, bool byVal)
  *      reset hash table header for new batch
 *
 *      ntuples is the number of tuples in the inner relation's batch
+ *      (which we currently don't actually use...)
 * ----------------------------------------------------------------
 */
 void
-ExecHashTableReset(HashJoinTable hashtable, int ntuples)
+ExecHashTableReset(HashJoinTable hashtable, long ntuples)
 {
+    MemoryContext oldcxt;
+    int         nbuckets = hashtable->nbuckets;
     int         i;
-    HashBucket  bucket;
 
     /*
-     * We can reset the number of hashbuckets since we are going to
-     * recalculate the hash values of all the tuples in the new batch
-     * anyway.  We might as well spread out the hash values as much as
-     * we can within the available space.  Note we must set nbuckets
-     * equal to totalbuckets since we will NOT generate any new output
-     * batches after this point.
+     * Release all the hash buckets and tuples acquired in the prior pass,
+     * and reinitialize the portal for a new pass.
     */
-    hashtable->nbuckets = hashtable->totalbuckets =
-        (int) (hashtable->bottom / (hashtable->bucketsize * FUDGE_FAC));
+    oldcxt = MemoryContextSwitchTo(hashtable->batchCxt);
+    EndPortalAllocMode();
+    StartPortalAllocMode(DefaultAllocMode, 0);
 
     /*
-     * reinitialize the overflow area to empty, and reinit each hash bucket.
+     * We still use the same number of physical buckets as in the first pass.
+     * (It could be different; but we already decided how many buckets would
+     * be appropriate for the allowed memory, so stick with that number.)
+     * We MUST set totalbuckets to equal nbuckets, because from now on
+     * no tuples will go out to temp files; there are no more virtual buckets,
+     * only real buckets.  (This implies that tuples will go into different
+     * bucket numbers than they did on the first pass, but that's OK.)
     */
-    hashtable->overflownext = hashtable->top + hashtable->bucketsize *
-        hashtable->nbuckets;
-    Assert(hashtable->overflownext < hashtable->bottom);
+    hashtable->totalbuckets = nbuckets;
+
+    /* Reallocate and reinitialize the hash bucket headers. */
+    hashtable->buckets = (HashJoinTuple *)
+        palloc(nbuckets * sizeof(HashJoinTuple));
+
+    if (hashtable->buckets == NULL)
+        elog(ERROR, "Insufficient memory for hash table.");
 
-    bucket = (HashBucket) ABSADDR(hashtable->top);
-    for (i = 0; i < hashtable->nbuckets; i++)
+    for (i = 0; i < nbuckets; i++)
     {
-        bucket->top = RELADDR((char *) bucket + MAXALIGN(sizeof(*bucket)));
-        bucket->bottom = bucket->top;
-        bucket->firstotuple = bucket->lastotuple = -1;
-        bucket = (HashBucket) ((char *) bucket + hashtable->bucketsize);
+        hashtable->buckets[i] = NULL;
     }
-    hashtable->pcount = hashtable->nprocess;
+
+    MemoryContextSwitchTo(oldcxt);
 }
 
 void
 ExecReScanHash(Hash *node, ExprContext *exprCtxt, Plan *parent)
 {
-    HashState  *hashstate = node->hashstate;
-
-    if (hashstate->hashBatches != NULL)
-    {
-        pfree(hashstate->hashBatches);
-        hashstate->hashBatches = NULL;
-    }
-
     /*
     * if chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.
     */
     if (((Plan *) node)->lefttree->chgParam == NULL)
         ExecReScan(((Plan *) node)->lefttree, exprCtxt, (Plan *) node);
-
 }
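The sizing arithmetic that the new ExecHashTableCreate performs can be followed as a standalone calculation. Everything below is an assumption for illustration: the fudge_fac and ntup_per_bucket values stand in for the real FUDGE_FAC and NTUP_PER_BUCKET constants in the executor headers, and the inputs stand in for the planner's estimates and the -S (SortMem) setting:

```c
#include <math.h>
#include <stdio.h>

int
main(void)
{
    /* Assumed stand-ins for the real constants and inputs. */
    const double fudge_fac = 2.0;
    const int   ntup_per_bucket = 10;
    const int   sort_mem_kb = 512;   /* the -S setting, in kilobytes */
    const int   ntuples = 100000;    /* planner's inner-rel row estimate */
    const int   tupsize = 144;       /* per-tuple footprint, in bytes */
    double      inner_rel_bytes;
    double      hash_table_bytes;
    int         totalbuckets;
    int         nbuckets;
    int         nbatch = 0;

    inner_rel_bytes = (double) ntuples * tupsize * fudge_fac;

    /* target size: SortMem kilobytes, but at least sqrt(inner rel size) */
    hash_table_bytes = sqrt(inner_rel_bytes);
    if (hash_table_bytes < sort_mem_kb * 1024.0)
        hash_table_bytes = sort_mem_kb * 1024.0;

    /* virtual buckets wanted for the whole relation... */
    totalbuckets = (int) ceil((double) ntuples * fudge_fac / ntup_per_bucket);
    /* ...versus physical buckets that fit in the target memory */
    nbuckets = (int) (hash_table_bytes /
                      (ntup_per_bucket * tupsize * fudge_fac));

    if (totalbuckets <= nbuckets)
        totalbuckets = nbuckets;    /* everything fits: no batching */
    else
        nbatch = (int) ceil((inner_rel_bytes - hash_table_bytes) /
                            hash_table_bytes);

    printf("nbuckets=%d totalbuckets=%d nbatch=%d\n",
           nbuckets, totalbuckets, nbatch);
    return 0;
}
```

With these inputs the build gets 182 physical buckets out of 20000 virtual ones, so tuples hashing into the remaining virtual buckets are written to one of 54 temp files and joined in later passes — exactly the batching path visible in ExecHashTableInsert above.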