Diffstat (limited to 'src/backend/access/gin/ginfast.c')
-rw-r--r-- | src/backend/access/gin/ginfast.c | 866
1 file changed, 866 insertions, 0 deletions
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
new file mode 100644
index 00000000000..d8624237ec1
--- /dev/null
+++ b/src/backend/access/gin/ginfast.c
@@ -0,0 +1,866 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginfast.c
+ *    Fast insert routines for the Postgres inverted index access method.
+ *    Pending entries are stored in a linear list of pages.  Later on
+ *    (typically during VACUUM), ginInsertCleanup() will be invoked to
+ *    transfer pending entries into the regular index structure.  This
+ *    wins because bulk insertion is much more efficient than retail.
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.1 2009/03/24 20:17:10 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/tuptoaster.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "utils/memutils.h"
+
+
+#define GIN_PAGE_FREESIZE \
+    ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
+
+typedef struct DatumArray
+{
+    Datum      *values;         /* expansible array */
+    int32       nvalues;        /* current number of valid entries */
+    int32       maxvalues;      /* allocated size of array */
+} DatumArray;
+
+
+/*
+ * Build a pending-list page from the given array of tuples, and write it out.
+ */
+static int32
+writeListPage(Relation index, Buffer buffer,
+              IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
+{
+    Page        page = BufferGetPage(buffer);
+    int         i, freesize, size = 0;
+    OffsetNumber l, off;
+    char       *workspace;
+    char       *ptr;
+
+    /* workspace could be a local array; we use palloc for alignment */
+    workspace = palloc(BLCKSZ);
+
+    START_CRIT_SECTION();
+
+    GinInitBuffer(buffer, GIN_LIST);
+
+    off = FirstOffsetNumber;
+    ptr = workspace;
+
+    for (i = 0; i < ntuples; i++)
+    {
+        int     this_size = IndexTupleSize(tuples[i]);
+
+        memcpy(ptr, tuples[i], this_size);
+        ptr += this_size;
+        size += this_size;
+
+        l = PageAddItem(page, (Item) tuples[i], this_size, off, false, false);
+
+        if (l == InvalidOffsetNumber)
+            elog(ERROR, "failed to add item to index page in \"%s\"",
+                 RelationGetRelationName(index));
+
+        off++;
+    }
+
+    Assert(size <= BLCKSZ);     /* else we overran workspace */
+
+    GinPageGetOpaque(page)->rightlink = rightlink;
+
+    /*
+     * The tail page may contain only whole row(s), or the final part of a
+     * row whose earlier parts were placed on previous pages
+     */
+    if (rightlink == InvalidBlockNumber)
+    {
+        GinPageSetFullRow(page);
+        GinPageGetOpaque(page)->maxoff = 1;
+    }
+    else
+    {
+        GinPageGetOpaque(page)->maxoff = 0;
+    }
+
+    freesize = PageGetFreeSpace(page);
+
+    MarkBufferDirty(buffer);
+
+    if (!index->rd_istemp)
+    {
+        XLogRecData rdata[2];
+        ginxlogInsertListPage data;
+        XLogRecPtr  recptr;
+
+        rdata[0].buffer = buffer;
+        rdata[0].buffer_std = true;
+        rdata[0].data = (char *) &data;
+        rdata[0].len = sizeof(ginxlogInsertListPage);
+        rdata[0].next = rdata + 1;
+
+        rdata[1].buffer = InvalidBuffer;
+        rdata[1].data = workspace;
+        rdata[1].len = size;
+        rdata[1].next = NULL;
+
+        data.blkno = BufferGetBlockNumber(buffer);
+        data.rightlink = rightlink;
+        data.ntuples = ntuples;
+
+        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
+        PageSetLSN(page, recptr);
+        PageSetTLI(page, ThisTimeLineID);
+    }
+
+    UnlockReleaseBuffer(buffer);
+
+    END_CRIT_SECTION();
+
+    pfree(workspace);
+
+    return freesize;
+}
+
+static void
+makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
+            GinMetaPageData *res)
+{
+    Buffer      curBuffer = InvalidBuffer;
+    Buffer      prevBuffer = InvalidBuffer;
+    int         i, size = 0, tupsize;
+    int         startTuple = 0;
+
+    Assert(ntuples > 0);
+
+    /*
+     * Split tuples into pages
+     */
+    for (i = 0; i < ntuples; i++)
+    {
+        if (curBuffer == InvalidBuffer)
+        {
+            curBuffer = GinNewBuffer(index);
+
+            if (prevBuffer != InvalidBuffer)
+            {
+                res->nPendingPages++;
+                writeListPage(index, prevBuffer,
+                              tuples + startTuple, i - startTuple,
+                              BufferGetBlockNumber(curBuffer));
+            }
+            else
+            {
+                res->head = BufferGetBlockNumber(curBuffer);
+            }
+
+            prevBuffer = curBuffer;
+            startTuple = i;
+            size = 0;
+        }
+
+        tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
+
+        if (size + tupsize >= GinListPageSize)
+        {
+            /* won't fit, force a new page and reprocess this tuple */
+            i--;
+            curBuffer = InvalidBuffer;
+        }
+        else
+        {
+            size += tupsize;
+        }
+    }
+
+    /*
+     * Write last page
+     */
+    res->tail = BufferGetBlockNumber(curBuffer);
+    res->tailFreeSize = writeListPage(index, curBuffer,
+                                      tuples + startTuple, ntuples - startTuple,
+                                      InvalidBlockNumber);
+    res->nPendingPages++;
+    /* that was only one heap tuple */
+    res->nPendingHeapTuples = 1;
+}
+
+/*
+ * Insert the collected values during normal insertion.  This function
+ * guarantees that all values belonging to one heap tuple are stored
+ * consecutively, preserving their order.
+ */
+void
+ginHeapTupleFastInsert(Relation index, GinState *ginstate,
+                       GinTupleCollector *collector)
+{
+    Buffer      metabuffer;
+    Page        metapage;
+    GinMetaPageData *metadata = NULL;
+    XLogRecData rdata[2];
+    Buffer      buffer = InvalidBuffer;
+    Page        page = NULL;
+    ginxlogUpdateMeta data;
+    bool        separateList = false;
+    bool        needCleanup = false;
+
+    if (collector->ntuples == 0)
+        return;
+
+    data.node = index->rd_node;
+    data.ntuples = 0;
+    data.newRightlink = data.prevTail = InvalidBlockNumber;
+
+    rdata[0].buffer = InvalidBuffer;
+    rdata[0].data = (char *) &data;
+    rdata[0].len = sizeof(ginxlogUpdateMeta);
+    rdata[0].next = NULL;
+
+    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+    metapage = BufferGetPage(metabuffer);
+
+    if (collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE)
+    {
+        /*
+         * Total size is greater than one page => make sublist
+         */
+        separateList = true;
+    }
+    else
+    {
+        LockBuffer(metabuffer, GIN_EXCLUSIVE);
+        metadata = GinPageGetMeta(metapage);
+
+        if (metadata->head == InvalidBlockNumber ||
+            collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize)
+        {
+            /*
+             * The pending list is empty, or the total size is greater than
+             * the free space on the tail page => make sublist
+             *
+             * We unlock the metabuffer to keep concurrency high
+             */
+            separateList = true;
+            LockBuffer(metabuffer, GIN_UNLOCK);
+        }
+    }
+
+    if (separateList)
+    {
+        GinMetaPageData sublist;
+
+        /*
+         * Build the sublist separately, then append it to the tail
+         */
+        memset(&sublist, 0, sizeof(GinMetaPageData));
+
+        makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+
+        /*
+         * metapage was unlocked, see above
+         */
+        LockBuffer(metabuffer, GIN_EXCLUSIVE);
+        metadata = GinPageGetMeta(metapage);
+
+        if (metadata->head == InvalidBlockNumber)
+        {
+            /*
+             * Sublist becomes the main list
+             */
+            START_CRIT_SECTION();
+            memcpy(metadata, &sublist, sizeof(GinMetaPageData));
+            memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData));
+        }
+        else
+        {
+            /*
+             * Merge the lists
+             */
+            data.prevTail = metadata->tail;
+            buffer = ReadBuffer(index, metadata->tail);
+            LockBuffer(buffer, GIN_EXCLUSIVE);
+            page = BufferGetPage(buffer);
+            Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
+
+            START_CRIT_SECTION();
+
+            GinPageGetOpaque(page)->rightlink = sublist.head;
+            metadata->tail = sublist.tail;
+            metadata->tailFreeSize = sublist.tailFreeSize;
+
+            metadata->nPendingPages += sublist.nPendingPages;
+            metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
+
+            memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+            data.newRightlink = sublist.head;
+
+            MarkBufferDirty(buffer);
+        }
+    }
+    else
+    {
+        /*
+         * Insert into the tail page; the metapage is already locked
+         */
+
+        OffsetNumber l, off;
+        int         i, tupsize;
+        char       *ptr;
+
+        buffer = ReadBuffer(index, metadata->tail);
+        LockBuffer(buffer, GIN_EXCLUSIVE);
+        page = BufferGetPage(buffer);
+        off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+            OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+        rdata[0].next = rdata + 1;
+
+        rdata[1].buffer = buffer;
+        rdata[1].buffer_std = true;
+        ptr = rdata[1].data = (char *) palloc(collector->sumsize);
+        rdata[1].len = collector->sumsize;
+        rdata[1].next = NULL;
+
+        data.ntuples = collector->ntuples;
+
+        START_CRIT_SECTION();
+
+        /*
+         * Increase the count of pending heap tuples
+         */
+        Assert(GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples);
+        GinPageGetOpaque(page)->maxoff++;
+        metadata->nPendingHeapTuples++;
+
+        for (i = 0; i < collector->ntuples; i++)
+        {
+            tupsize = IndexTupleSize(collector->tuples[i]);
+            l = PageAddItem(page, (Item) collector->tuples[i], tupsize, off, false, false);
+
+            if (l == InvalidOffsetNumber)
+                elog(ERROR, "failed to add item to index page in \"%s\"",
+                     RelationGetRelationName(index));
+
+            memcpy(ptr, collector->tuples[i], tupsize);
+            ptr += tupsize;
+
+            off++;
+        }
+
+        metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
+        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+        MarkBufferDirty(buffer);
+    }
+
+    /*
+     * Write out the changes, and make the XLOG entry
+     */
+    MarkBufferDirty(metabuffer);
+    if (!index->rd_istemp)
+    {
+        XLogRecPtr  recptr;
+
+        recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
+        PageSetLSN(metapage, recptr);
+        PageSetTLI(metapage, ThisTimeLineID);
+
+        if (buffer != InvalidBuffer)
+        {
+            PageSetLSN(page, recptr);
+            PageSetTLI(page, ThisTimeLineID);
+        }
+    }
+
+    if (buffer != InvalidBuffer)
+        UnlockReleaseBuffer(buffer);
+
+    /*
+     * Force pending-list cleanup when the list becomes too long.  Since
+     * ginInsertCleanup could take a significant amount of time, we prefer
+     * to call it when it can do all the work in a single collection cycle.
+     * In non-vacuum mode it shouldn't require maintenance_work_mem, so fire
+     * it while the pending list is still small enough to fit into work_mem.
+     *
+     * ginInsertCleanup() should not be called inside our CRIT_SECTION.
+     */
+    if (metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L)
+        needCleanup = true;
+
+    UnlockReleaseBuffer(metabuffer);
+
+    END_CRIT_SECTION();
+
+    if (needCleanup)
+        ginInsertCleanup(index, ginstate, false, NULL);
+}
+
+/*
+ * Collect values from one heap tuple to be indexed.  All values for
+ * one heap tuple must be written at once, to guarantee a consistent state.
+ */
+uint32
+ginHeapTupleFastCollect(Relation index, GinState *ginstate,
+                        GinTupleCollector *collector,
+                        OffsetNumber attnum, Datum value, ItemPointer item)
+{
+    Datum      *entries;
+    int32       i, nentries;
+
+    entries = extractEntriesSU(ginstate, attnum, value, &nentries);
+
+    if (nentries == 0)
+        /* nothing to insert */
+        return 0;
+
+    /*
+     * Allocate/reallocate memory for storing collected tuples
+     */
+    if (collector->tuples == NULL)
+    {
+        collector->lentuples = nentries * index->rd_att->natts;
+        collector->tuples = (IndexTuple *) palloc(sizeof(IndexTuple) * collector->lentuples);
+    }
+
+    while (collector->ntuples + nentries > collector->lentuples)
+    {
+        collector->lentuples *= 2;
+        collector->tuples = (IndexTuple *) repalloc(collector->tuples,
+                                                    sizeof(IndexTuple) * collector->lentuples);
+    }
+
+    /*
+     * Build an index tuple for each key value, all pointing at the given
+     * heap tuple
+     */
+    for (i = 0; i < nentries; i++)
+    {
+        int32       tupsize;
+
+        collector->tuples[collector->ntuples + i] = GinFormTuple(ginstate, attnum, entries[i], NULL, 0);
+        collector->tuples[collector->ntuples + i]->t_tid = *item;
+        tupsize = IndexTupleSize(collector->tuples[collector->ntuples + i]);
+
+        if (tupsize > TOAST_INDEX_TARGET || tupsize >= GinMaxItemSize)
+            elog(ERROR, "huge tuple");
+
+        collector->sumsize += tupsize;
+    }
+
+    collector->ntuples += nentries;
+
+    return nentries;
+}
+
+/*
+ * Delete pending-list pages up to (not including) the newHead page.
+ * If newHead == InvalidBlockNumber then the function drops the whole list.
+ *
+ * metapage is pinned and exclusive-locked throughout this function.
+ *
+ * Returns true if another cleanup process is running concurrently
+ * (if so, we can just abandon our own efforts)
+ */
+static bool
+shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
+          IndexBulkDeleteResult *stats)
+{
+    Page        metapage;
+    GinMetaPageData *metadata;
+    BlockNumber blknoToDelete;
+
+    metapage = BufferGetPage(metabuffer);
+    metadata = GinPageGetMeta(metapage);
+    blknoToDelete = metadata->head;
+
+    do
+    {
+        Page        page;
+        int         i;
+        int64       nDeletedHeapTuples = 0;
+        ginxlogDeleteListPages data;
+        XLogRecData rdata[1];
+        Buffer      buffers[GIN_NDELETE_AT_ONCE];
+
+        data.node = index->rd_node;
+
+        rdata[0].buffer = InvalidBuffer;
+        rdata[0].data = (char *) &data;
+        rdata[0].len = sizeof(ginxlogDeleteListPages);
+        rdata[0].next = NULL;
+
+        data.ndeleted = 0;
+        while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
+        {
+            data.toDelete[data.ndeleted] = blknoToDelete;
+            buffers[data.ndeleted] = ReadBuffer(index, blknoToDelete);
+            LockBuffer(buffers[data.ndeleted], GIN_EXCLUSIVE);
+            page = BufferGetPage(buffers[data.ndeleted]);
+
+            data.ndeleted++;
+
+            if (GinPageIsDeleted(page))
+            {
+                /* concurrent cleanup process is detected */
+                for (i = 0; i < data.ndeleted; i++)
+                    UnlockReleaseBuffer(buffers[i]);
+
+                return true;
+            }
+
+            nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
+            blknoToDelete = GinPageGetOpaque(page)->rightlink;
+        }
+
+        if (stats)
+            stats->pages_deleted += data.ndeleted;
+
+        START_CRIT_SECTION();
+
+        metadata->head = blknoToDelete;
+
+        Assert(metadata->nPendingPages >= data.ndeleted);
+        metadata->nPendingPages -= data.ndeleted;
+        Assert(metadata->nPendingHeapTuples >= nDeletedHeapTuples);
+        metadata->nPendingHeapTuples -= nDeletedHeapTuples;
+
+        if (blknoToDelete == InvalidBlockNumber)
+        {
+            metadata->tail = InvalidBlockNumber;
+            metadata->tailFreeSize = 0;
+            metadata->nPendingPages = 0;
+            metadata->nPendingHeapTuples = 0;
+        }
+        memcpy(&data.metadata, metadata, sizeof(GinMetaPageData));
+
+        MarkBufferDirty(metabuffer);
+
+        for (i = 0; i < data.ndeleted; i++)
+        {
+            page = BufferGetPage(buffers[i]);
+            GinPageGetOpaque(page)->flags = GIN_DELETED;
+            MarkBufferDirty(buffers[i]);
+        }
+
+        if (!index->rd_istemp)
+        {
+            XLogRecPtr  recptr;
+
+            recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
+            PageSetLSN(metapage, recptr);
+            PageSetTLI(metapage, ThisTimeLineID);
+
+            for (i = 0; i < data.ndeleted; i++)
+            {
+                page = BufferGetPage(buffers[i]);
+                PageSetLSN(page, recptr);
+                PageSetTLI(page, ThisTimeLineID);
+            }
+        }
+
+        for (i = 0; i < data.ndeleted; i++)
+            UnlockReleaseBuffer(buffers[i]);
+
+        END_CRIT_SECTION();
+    } while (blknoToDelete != newHead);
+
+    return false;
+}
+
+/* Add datum to DatumArray, resizing if needed */
+static void
+addDatum(DatumArray *datums, Datum datum)
+{
+    if (datums->nvalues >= datums->maxvalues)
+    {
+        datums->maxvalues *= 2;
+        datums->values = (Datum *) repalloc(datums->values,
+                                            sizeof(Datum) * datums->maxvalues);
+    }
+
+    datums->values[datums->nvalues++] = datum;
+}
+
+/*
+ * Go through all tuples >= startoff on page and collect values in memory
+ *
+ * Note that da is just workspace --- it does not carry any state across
+ * calls.
+ */
+static void
+processPendingPage(BuildAccumulator *accum, DatumArray *da,
+                   Page page, OffsetNumber startoff)
+{
+    ItemPointerData heapptr;
+    OffsetNumber i, maxoff;
+    OffsetNumber attrnum, curattnum;
+
+    /* reset *da to empty */
+    da->nvalues = 0;
+
+    maxoff = PageGetMaxOffsetNumber(page);
+    Assert(maxoff >= FirstOffsetNumber);
+    ItemPointerSetInvalid(&heapptr);
+    attrnum = 0;
+
+    for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
+    {
+        IndexTuple  itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+
+        curattnum = gintuple_get_attrnum(accum->ginstate, itup);
+
+        if (!ItemPointerIsValid(&heapptr))
+        {
+            heapptr = itup->t_tid;
+            attrnum = curattnum;
+        }
+        else if (!(ItemPointerEquals(&heapptr, &itup->t_tid) &&
+                   curattnum == attrnum))
+        {
+            /*
+             * We can insert several datums per call, but only for one heap
+             * tuple and one column.
+             */
+            ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
+            da->nvalues = 0;
+            heapptr = itup->t_tid;
+            attrnum = curattnum;
+        }
+        addDatum(da, gin_index_getattr(accum->ginstate, itup));
+    }
+
+    ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
+}
+
+/*
+ * Move tuples from pending pages into regular GIN structure.
+ *
+ * This can be called concurrently by multiple backends, so it must cope.
+ * At first glance it looks completely not concurrent-safe and not crash-safe
+ * either.  The reason it's okay is that multiple insertion of the same entry
+ * is detected and treated as a no-op by gininsert.c.  If we crash after
+ * posting entries to the main index and before removing them from the
+ * pending list, it's okay because when we redo the posting later on, nothing
+ * bad will happen.  Likewise, if two backends simultaneously try to post
+ * a pending entry into the main index, one will succeed and one will do
+ * nothing.  We try to notice when someone else is a little bit ahead of
+ * us in the process, but that's just to avoid wasting cycles.  Only the
+ * action of removing a page from the pending list really needs exclusive
+ * lock.
+ *
+ * vac_delay indicates that ginInsertCleanup is called from the vacuum
+ * process, so call vacuum_delay_point() periodically.
+ * If stats isn't null, we count deleted pending pages into the counts.
+ */
+void
+ginInsertCleanup(Relation index, GinState *ginstate,
+                 bool vac_delay, IndexBulkDeleteResult *stats)
+{
+    Buffer      metabuffer, buffer;
+    Page        metapage, page;
+    GinMetaPageData *metadata;
+    MemoryContext opCtx, oldCtx;
+    BuildAccumulator accum;
+    DatumArray  datums;
+    BlockNumber blkno;
+
+    metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+    LockBuffer(metabuffer, GIN_SHARE);
+    metapage = BufferGetPage(metabuffer);
+    metadata = GinPageGetMeta(metapage);
+
+    if (metadata->head == InvalidBlockNumber)
+    {
+        /* Nothing to do */
+        UnlockReleaseBuffer(metabuffer);
+        return;
+    }
+
+    /*
+     * Read and lock head of pending list
+     */
+    blkno = metadata->head;
+    buffer = ReadBuffer(index, blkno);
+    LockBuffer(buffer, GIN_SHARE);
+    page = BufferGetPage(buffer);
+
+    LockBuffer(metabuffer, GIN_UNLOCK);
+
+    /*
+     * Initialize.  All temporary space will be in opCtx
+     */
+    opCtx = AllocSetContextCreate(CurrentMemoryContext,
+                                  "GIN insert cleanup temporary context",
+                                  ALLOCSET_DEFAULT_MINSIZE,
+                                  ALLOCSET_DEFAULT_INITSIZE,
+                                  ALLOCSET_DEFAULT_MAXSIZE);
+
+    oldCtx = MemoryContextSwitchTo(opCtx);
+
+    datums.maxvalues = 128;
+    datums.nvalues = 0;
+    datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);
+
+    ginInitBA(&accum);
+    accum.ginstate = ginstate;
+
+    /*
+     * At the top of this loop, we have pin and lock on the current page of
+     * the pending list.  However, we'll release that before exiting the
+     * loop.  Note we also have pin but not lock on the metapage.
+     */
+    for (;;)
+    {
+        if (GinPageIsDeleted(page))
+        {
+            /* another cleanup process is running concurrently */
+            UnlockReleaseBuffer(buffer);
+            break;
+        }
+
+        /*
+         * read page's datums into memory
+         */
+        processPendingPage(&accum, &datums, page, FirstOffsetNumber);
+
+        if (vac_delay)
+            vacuum_delay_point();
+
+        /*
+         * Is it time to flush memory to disk?  Flush if we are at the end of
+         * the pending list, or if we have a full row and memory is getting
+         * full.
+         *
+         * XXX using up maintenance_work_mem here is probably unreasonably
+         * much, since vacuum might already be using that much.
+         */
+        if (GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
+            (GinPageHasFullRow(page) &&
+             accum.allocatedMemory > maintenance_work_mem * 1024L))
+        {
+            ItemPointerData *list;
+            uint32      nlist;
+            Datum       entry;
+            OffsetNumber maxoff, attnum;
+
+            /*
+             * Unlock the current page to increase concurrency.  Whether the
+             * page was changed meanwhile will be checked after the memory
+             * flush completes, by comparing maxoff.
+             */
+            maxoff = PageGetMaxOffsetNumber(page);
+            LockBuffer(buffer, GIN_UNLOCK);
+
+            /*
+             * Moving collected data into the regular structure can take a
+             * significant amount of time, so run it without holding a lock
+             * on the pending list.
+             */
+            while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
+            {
+                ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
+                if (vac_delay)
+                    vacuum_delay_point();
+            }
+
+            /*
+             * Lock the whole list to remove pages
+             */
+            LockBuffer(metabuffer, GIN_EXCLUSIVE);
+            LockBuffer(buffer, GIN_SHARE);
+
+            if (GinPageIsDeleted(page))
+            {
+                /* another cleanup process is running concurrently */
+                UnlockReleaseBuffer(buffer);
+                LockBuffer(metabuffer, GIN_UNLOCK);
+                break;
+            }
+
+            /*
+             * While we left the page unlocked, more stuff might have gotten
+             * added to it.  If so, process those entries immediately.  There
+             * shouldn't be very many, so we don't worry about the fact that
+             * we're doing this with exclusive lock.  The insertion algorithm
+             * guarantees that inserted row(s) will not continue on the next
+             * page.
+             *
+             * NOTE: intentionally no vacuum_delay_point in this loop.
+             */
+            if (PageGetMaxOffsetNumber(page) != maxoff)
+            {
+                ginInitBA(&accum);
+                processPendingPage(&accum, &datums, page, maxoff + 1);
+
+                while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
+                    ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
+            }
+
+            /*
+             * Remember the next page - it will become the new list head
+             */
+            blkno = GinPageGetOpaque(page)->rightlink;
+            UnlockReleaseBuffer(buffer);    /* shiftList will do exclusive locking */
+
+            /*
+             * Remove the pages we have read from the pending list; at this
+             * point all their contents have been transferred into the
+             * regular structure
+             */
+            if (shiftList(index, metabuffer, blkno, stats))
+            {
+                /* another cleanup process is running concurrently */
+                LockBuffer(metabuffer, GIN_UNLOCK);
+                break;
+            }
+
+            Assert(blkno == metadata->head);
+            LockBuffer(metabuffer, GIN_UNLOCK);
+
+            /*
+             * if we removed the whole pending list, just exit
+             */
+            if (blkno == InvalidBlockNumber)
+                break;
+
+            /*
+             * release memory used so far and reinit state
+             */
+            MemoryContextReset(opCtx);
+            ginInitBA(&accum);
+            datums.nvalues = 0;
+            datums.values = (Datum *) palloc(sizeof(Datum) * datums.maxvalues);
+        }
+        else
+        {
+            blkno = GinPageGetOpaque(page)->rightlink;
+            UnlockReleaseBuffer(buffer);
+        }
+
+        /*
+         * Read next page in pending list
+         */
+        CHECK_FOR_INTERRUPTS();
+        buffer = ReadBuffer(index, blkno);
+        LockBuffer(buffer, GIN_SHARE);
+        page = BufferGetPage(buffer);
+    }
+
+    ReleaseBuffer(metabuffer);
+
+    /* Clean up temporary space */
+    MemoryContextSwitchTo(oldCtx);
+    MemoryContextDelete(opCtx);
+}
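
The page-packing loop in makeSublist() is the subtlest control flow in this file: when a tuple will not fit on the current page, it decrements i so the same tuple is reprocessed at the top of a fresh page. The following standalone sketch models just that loop; PAGE_CAPACITY and the sizes[] array are made-up stand-ins for GinListPageSize and MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData), not the real PostgreSQL definitions.

/*
 * Standalone model of makeSublist()'s page-packing loop.  A tuple that
 * doesn't fit forces a page flush and is reprocessed via the i-- trick.
 * (The real code's "huge tuple" check guarantees no single tuple exceeds
 * the page capacity, so the reprocess cannot loop forever.)
 */
#include <stdio.h>

#define PAGE_CAPACITY 8000      /* hypothetical stand-in for GinListPageSize */

int
main(void)
{
    int     sizes[] = {3000, 3000, 3000, 5000, 2000, 7000};
    int     ntuples = 6;
    int     i, size = 0, startTuple = 0;
    int     onPage = 0;         /* plays the role of curBuffer != InvalidBuffer */
    int     npages = 0;

    for (i = 0; i < ntuples; i++)
    {
        if (!onPage)
        {
            /* "GinNewBuffer": flush the previous page, if any */
            if (npages > 0)
                printf("page %d: tuples %d..%d\n", npages, startTuple, i - 1);
            npages++;
            onPage = 1;
            startTuple = i;
            size = 0;
        }

        if (size + sizes[i] >= PAGE_CAPACITY)
        {
            /* won't fit: force a new page and reprocess this tuple */
            i--;
            onPage = 0;
        }
        else
            size += sizes[i];
    }

    /* the last page gets the remaining tuples, as writeListPage does */
    printf("page %d: tuples %d..%d (tail)\n", npages, startTuple, ntuples - 1);
    return 0;
}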
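The cleanup trigger at the end of ginHeapTupleFastInsert() compares the pending list's on-disk footprint against work_mem, which is measured in kilobytes (hence the * 1024L). A worked example under assumed sizes: an 8 kB BLCKSZ, with 24 and 8 bytes as stand-ins for the MAXALIGNed page header and GIN opaque data (actual values depend on platform alignment).

/*
 * Worked example of the trigger condition
 *     nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L
 * PAGE_HEADER and GIN_OPAQUE below are assumed values, not the real
 * MAXALIGN(SizeOfPageHeaderData) / MAXALIGN(sizeof(GinPageOpaqueData)).
 */
#include <stdio.h>

#define BLCKSZ 8192
#define PAGE_HEADER 24          /* assumed MAXALIGN(SizeOfPageHeaderData) */
#define GIN_OPAQUE 8            /* assumed MAXALIGN(sizeof(GinPageOpaqueData)) */
#define GIN_PAGE_FREESIZE (BLCKSZ - PAGE_HEADER - GIN_OPAQUE)

int
main(void)
{
    long    work_mem = 1024;    /* in kilobytes, i.e. 1 MB */
    long    nPendingPages;

    for (nPendingPages = 1;; nPendingPages++)
    {
        if (nPendingPages * (long) GIN_PAGE_FREESIZE > work_mem * 1024L)
        {
            printf("cleanup fires at %ld pending pages (~%ld kB of list pages)\n",
                   nPendingPages, nPendingPages * BLCKSZ / 1024);
            break;
        }
    }
    return 0;
}

Under these assumptions GIN_PAGE_FREESIZE is 8160 bytes, so for the 1 MB work_mem the cleanup fires once the pending list exceeds 128 pages.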
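Both ginHeapTupleFastCollect() and addDatum() grow their arrays by doubling, which keeps the amortized cost of appends constant. A minimal self-contained version of that pattern, with malloc/realloc standing in for palloc/repalloc (which in PostgreSQL report allocation failure via elog rather than returning NULL):

/*
 * Minimal model of the doubling-growth pattern used by addDatum() and the
 * collector's repalloc loop, using long in place of Datum.
 */
#include <stdio.h>
#include <stdlib.h>

typedef struct DatumArray
{
    long   *values;             /* expansible array */
    int     nvalues;            /* current number of valid entries */
    int     maxvalues;          /* allocated size of array */
} DatumArray;

static void
addDatum(DatumArray *datums, long datum)
{
    if (datums->nvalues >= datums->maxvalues)
    {
        /* double the capacity, as the real addDatum() does */
        datums->maxvalues *= 2;
        datums->values = realloc(datums->values,
                                 sizeof(long) * datums->maxvalues);
        if (datums->values == NULL)
        {
            perror("realloc");
            exit(1);
        }
    }
    datums->values[datums->nvalues++] = datum;
}

int
main(void)
{
    DatumArray  da;
    int         i;

    da.maxvalues = 4;           /* ginInsertCleanup starts at 128 */
    da.nvalues = 0;
    da.values = malloc(sizeof(long) * da.maxvalues);

    for (i = 0; i < 1000; i++)
        addDatum(&da, (long) i);

    printf("stored %d values, capacity grew to %d\n", da.nvalues, da.maxvalues);
    free(da.values);
    return 0;
}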
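processPendingPage() batches consecutive index tuples that share a heap TID and attribute number, calling ginInsertRecordBA() once per (TID, column) group and once more after the loop for the final group. A toy model of that grouping, where flush() stands in for ginInsertRecordBA() and plain integers stand in for ItemPointers; all values are invented:

/*
 * Model of the grouping logic in processPendingPage(): a batch is flushed
 * whenever the (tid, attnum) key changes, plus a final flush after the loop.
 */
#include <stdio.h>

typedef struct
{
    int         tid;            /* stands in for ItemPointerData */
    int         attnum;
    const char *datum;
} PendingTuple;

static void
flush(int tid, int attnum, int count)
{
    /* stands in for ginInsertRecordBA(accum, &heapptr, attrnum, ...) */
    printf("insert %d datum(s) for tid=%d attnum=%d\n", count, tid, attnum);
}

int
main(void)
{
    PendingTuple tuples[] = {
        {1, 1, "foo"}, {1, 1, "bar"}, {1, 2, "baz"},
        {2, 1, "quux"}, {2, 1, "zot"},
    };
    int     ntuples = 5;
    int     i, count = 0;
    int     tid = -1;           /* -1 plays the role of an invalid TID */
    int     attnum = 0;

    for (i = 0; i < ntuples; i++)
    {
        if (tid < 0)
        {
            tid = tuples[i].tid;
            attnum = tuples[i].attnum;
        }
        else if (!(tuples[i].tid == tid && tuples[i].attnum == attnum))
        {
            /* key changed: flush the batch collected so far */
            flush(tid, attnum, count);
            count = 0;
            tid = tuples[i].tid;
            attnum = tuples[i].attnum;
        }
        count++;                /* plays the role of addDatum() */
    }

    flush(tid, attnum, count);  /* final batch, as in processPendingPage() */
    return 0;
}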