Diffstat (limited to 'src/backend/access/gin/ginfast.c')
-rw-r--r--  src/backend/access/gin/ginfast.c | 866
1 file changed, 866 insertions, 0 deletions
diff --git a/src/backend/access/gin/ginfast.c b/src/backend/access/gin/ginfast.c
new file mode 100644
index 00000000000..d8624237ec1
--- /dev/null
+++ b/src/backend/access/gin/ginfast.c
@@ -0,0 +1,866 @@
+/*-------------------------------------------------------------------------
+ *
+ * ginfast.c
+ * Fast insert routines for the Postgres inverted index access method.
+ * Pending entries are stored in a linear list of pages. Later on
+ * (typically during VACUUM), ginInsertCleanup() will be invoked to
+ * transfer pending entries into the regular index structure. This
+ * wins because bulk insertion is much more efficient than retail.
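+ *
+ * A sketch of the pending-list layout implied by the code below:
+ *
+ *     metapage.head -> page -> page -> ... -> metapage.tail
+ *
+ * Entries on each page appear in insertion order.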
+ *
+ * Portions Copyright (c) 1996-2009, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * $PostgreSQL: pgsql/src/backend/access/gin/ginfast.c,v 1.1 2009/03/24 20:17:10 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/genam.h"
+#include "access/gin.h"
+#include "access/tuptoaster.h"
+#include "catalog/index.h"
+#include "commands/vacuum.h"
+#include "miscadmin.h"
+#include "storage/bufmgr.h"
+#include "utils/memutils.h"
+
+
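+/* Free space on an empty pending-list page, net of page header and opaque data */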
+#define GIN_PAGE_FREESIZE \
+ ( BLCKSZ - MAXALIGN(SizeOfPageHeaderData) - MAXALIGN(sizeof(GinPageOpaqueData)) )
+
+typedef struct DatumArray
+{
+ Datum *values; /* expansible array */
+ int32 nvalues; /* current number of valid entries */
+ int32 maxvalues; /* allocated size of array */
+} DatumArray;
+
+
+/*
+ * Build a pending-list page from the given array of tuples, and write it out.
+ */
+static int32
+writeListPage(Relation index, Buffer buffer,
+ IndexTuple *tuples, int32 ntuples, BlockNumber rightlink)
+{
+ Page page = BufferGetPage(buffer);
+ int i, freesize, size=0;
+ OffsetNumber l, off;
+ char *workspace;
+ char *ptr;
+
+ /* workspace could be a local array; we use palloc for alignment */
+ workspace = palloc(BLCKSZ);
+
+ START_CRIT_SECTION();
+
+ GinInitBuffer(buffer, GIN_LIST);
+
+ off = FirstOffsetNumber;
+ ptr = workspace;
+
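+ /* add tuples to the page, also accumulating their images for the WAL record */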
+ for(i=0; i<ntuples; i++)
+ {
+ int this_size = IndexTupleSize(tuples[i]);
+
+ memcpy(ptr, tuples[i], this_size);
+ ptr += this_size;
+ size += this_size;
+
+ l = PageAddItem(page, (Item)tuples[i], this_size, off, false, false);
+
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(index));
+
+ off++;
+ }
+
+ Assert(size <= BLCKSZ); /* else we overran workspace */
+
+ GinPageGetOpaque(page)->rightlink = rightlink;
+
+ /*
+ * the tail page may contain only whole row(s), or the final
+ * part of a row whose beginning is on previous pages
+ */
+ if ( rightlink == InvalidBlockNumber )
+ {
+ GinPageSetFullRow(page);
+ GinPageGetOpaque(page)->maxoff = 1;
+ }
+ else
+ {
+ GinPageGetOpaque(page)->maxoff = 0;
+ }
+
+ freesize = PageGetFreeSpace(page);
+
+ MarkBufferDirty(buffer);
+
+ if (!index->rd_istemp)
+ {
+ XLogRecData rdata[2];
+ ginxlogInsertListPage data;
+ XLogRecPtr recptr;
+
+ rdata[0].buffer = buffer;
+ rdata[0].buffer_std = true;
+ rdata[0].data = (char*)&data;
+ rdata[0].len = sizeof(ginxlogInsertListPage);
+ rdata[0].next = rdata+1;
+
+ rdata[1].buffer = InvalidBuffer;
+ rdata[1].data = workspace;
+ rdata[1].len = size;
+ rdata[1].next = NULL;
+
+ data.blkno = BufferGetBlockNumber(buffer);
+ data.rightlink = rightlink;
+ data.ntuples = ntuples;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT_LISTPAGE, rdata);
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+
+ UnlockReleaseBuffer(buffer);
+
+ END_CRIT_SECTION();
+
+ pfree(workspace);
+
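+ /* report remaining free space so the caller can update tailFreeSize */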
+ return freesize;
+}
+
+static void
+makeSublist(Relation index, IndexTuple *tuples, int32 ntuples,
+ GinMetaPageData *res)
+{
+ Buffer curBuffer = InvalidBuffer;
+ Buffer prevBuffer = InvalidBuffer;
+ int i, size = 0, tupsize;
+ int startTuple = 0;
+
+ Assert(ntuples > 0);
+
+ /*
+ * Split tuples into pages
+ */
+ for(i=0;i<ntuples;i++)
+ {
+ if ( curBuffer == InvalidBuffer )
+ {
+ curBuffer = GinNewBuffer(index);
+
+ if ( prevBuffer != InvalidBuffer )
+ {
+ res->nPendingPages++;
+ writeListPage(index, prevBuffer,
+ tuples+startTuple, i-startTuple,
+ BufferGetBlockNumber(curBuffer));
+ }
+ else
+ {
+ res->head = BufferGetBlockNumber(curBuffer);
+ }
+
+ prevBuffer = curBuffer;
+ startTuple = i;
+ size = 0;
+ }
+
+ tupsize = MAXALIGN(IndexTupleSize(tuples[i])) + sizeof(ItemIdData);
+
+ if ( size + tupsize >= GinListPageSize )
+ {
+ /* won't fit, force a new page and reprocess */
+ i--;
+ curBuffer = InvalidBuffer;
+ }
+ else
+ {
+ size += tupsize;
+ }
+ }
+
+ /*
+ * Write last page
+ */
+ res->tail = BufferGetBlockNumber(curBuffer);
+ res->tailFreeSize = writeListPage(index, curBuffer,
+ tuples+startTuple, ntuples-startTuple,
+ InvalidBlockNumber);
+ res->nPendingPages++;
+ /* the sublist contains entries for exactly one heap tuple */
+ res->nPendingHeapTuples = 1;
+}
+
+/*
+ * Write collected values into the pending list during normal insertion.
+ * Guarantees that all entries for a given heap tuple are stored
+ * consecutively, preserving order.
+ */
+void
+ginHeapTupleFastInsert(Relation index, GinState *ginstate,
+ GinTupleCollector *collector)
+{
+ Buffer metabuffer;
+ Page metapage;
+ GinMetaPageData *metadata = NULL;
+ XLogRecData rdata[2];
+ Buffer buffer = InvalidBuffer;
+ Page page = NULL;
+ ginxlogUpdateMeta data;
+ bool separateList = false;
+ bool needCleanup = false;
+
+ if ( collector->ntuples == 0 )
+ return;
+
+ data.node = index->rd_node;
+ data.ntuples = 0;
+ data.newRightlink = data.prevTail = InvalidBlockNumber;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogUpdateMeta);
+ rdata[0].next = NULL;
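+ /* rdata[1] is filled in below only when appending to an existing tail page */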
+
+ metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+ metapage = BufferGetPage(metabuffer);
+
+ if ( collector->sumsize + collector->ntuples * sizeof(ItemIdData) > GIN_PAGE_FREESIZE )
+ {
+ /*
+ * Total size is greater than one page => make sublist
+ */
+ separateList = true;
+ }
+ else
+ {
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ metadata = GinPageGetMeta(metapage);
+
+ if ( metadata->head == InvalidBlockNumber ||
+ collector->sumsize + collector->ntuples * sizeof(ItemIdData) > metadata->tailFreeSize )
+ {
+ /*
+ * The pending list is empty, or the total size exceeds the free
+ * space on the tail page => make a sublist
+ *
+ * We unlock the metabuffer while building it, to preserve concurrency
+ */
+ separateList = true;
+ LockBuffer(metabuffer, GIN_UNLOCK);
+ }
+ }
+
+ if ( separateList )
+ {
+ GinMetaPageData sublist;
+
+ /*
+ * Build the sublist separately, then append it to the tail
+ */
+ memset( &sublist, 0, sizeof(GinMetaPageData) );
+
+ makeSublist(index, collector->tuples, collector->ntuples, &sublist);
+
+ /*
+ * metapage was unlocked, see above
+ */
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ metadata = GinPageGetMeta(metapage);
+
+ if ( metadata->head == InvalidBlockNumber )
+ {
+ /*
+ * Sublist becomes main list
+ */
+ START_CRIT_SECTION();
+ memcpy(metadata, &sublist, sizeof(GinMetaPageData) );
+ memcpy(&data.metadata, &sublist, sizeof(GinMetaPageData) );
+ }
+ else
+ {
+ /*
+ * Merge the sublist onto the tail of the existing pending list
+ */
+
+ data.prevTail = metadata->tail;
+ buffer = ReadBuffer(index, metadata->tail);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(buffer);
+ Assert(GinPageGetOpaque(page)->rightlink == InvalidBlockNumber);
+
+ START_CRIT_SECTION();
+
+ GinPageGetOpaque(page)->rightlink = sublist.head;
+ metadata->tail = sublist.tail;
+ metadata->tailFreeSize = sublist.tailFreeSize;
+
+ metadata->nPendingPages += sublist.nPendingPages;
+ metadata->nPendingHeapTuples += sublist.nPendingHeapTuples;
+
+ memcpy(&data.metadata, metadata, sizeof(GinMetaPageData) );
+ data.newRightlink = sublist.head;
+
+ MarkBufferDirty(buffer);
+ }
+ }
+ else
+ {
+ /*
+ * Insert into tail page, metapage is already locked
+ */
+
+ OffsetNumber l, off;
+ int i, tupsize;
+ char *ptr;
+
+ buffer = ReadBuffer(index, metadata->tail);
+ LockBuffer(buffer, GIN_EXCLUSIVE);
+ page = BufferGetPage(buffer);
+ off = (PageIsEmpty(page)) ? FirstOffsetNumber :
+ OffsetNumberNext(PageGetMaxOffsetNumber(page));
+
+ rdata[0].next = rdata + 1;
+
+ rdata[1].buffer = buffer;
+ rdata[1].buffer_std = true;
+ ptr = rdata[1].data = (char *) palloc( collector->sumsize );
+ rdata[1].len = collector->sumsize;
+ rdata[1].next = NULL;
+
+ data.ntuples = collector->ntuples;
+
+ START_CRIT_SECTION();
+
+ /*
+ * Increment the heap-tuple counters. (On pending-list pages, the
+ * opaque maxoff field counts heap tuples, not index entries.)
+ */
+ Assert( GinPageGetOpaque(page)->maxoff <= metadata->nPendingHeapTuples );
+ GinPageGetOpaque(page)->maxoff++;
+ metadata->nPendingHeapTuples++;
+
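+ /* add each tuple to the page, accumulating its image for the WAL record */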
+ for(i=0; i<collector->ntuples; i++)
+ {
+ tupsize = IndexTupleSize(collector->tuples[i]);
+ l = PageAddItem(page, (Item)collector->tuples[i], tupsize, off, false, false);
+
+ if (l == InvalidOffsetNumber)
+ elog(ERROR, "failed to add item to index page in \"%s\"",
+ RelationGetRelationName(index));
+
+ memcpy(ptr, collector->tuples[i], tupsize);
+ ptr+=tupsize;
+
+ off++;
+ }
+
+ metadata->tailFreeSize -= collector->sumsize + collector->ntuples * sizeof(ItemIdData);
+ memcpy(&data.metadata, metadata, sizeof(GinMetaPageData) );
+ MarkBufferDirty(buffer);
+ }
+
+ /*
+ * Write out the metapage change (and WAL record, if needed)
+ */
+
+ MarkBufferDirty(metabuffer);
+ if ( !index->rd_istemp )
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_UPDATE_META_PAGE, rdata);
+ PageSetLSN(metapage, recptr);
+ PageSetTLI(metapage, ThisTimeLineID);
+
+ if ( buffer != InvalidBuffer )
+ {
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+ }
+
+ if (buffer != InvalidBuffer)
+ UnlockReleaseBuffer(buffer);
+
+ /*
+ * Force pending-list cleanup when the list becomes too long. Since
+ * ginInsertCleanup() can take a significant amount of time, we prefer
+ * to call it when it can do all the work in a single collection cycle.
+ * In non-vacuum mode it shouldn't require maintenance_work_mem, so
+ * fire it while the pending list is still small enough to fit into
+ * work_mem.
+ *
+ * ginInsertCleanup() should not be called inside our CRIT_SECTION.
+ */
+ if ( metadata->nPendingPages * GIN_PAGE_FREESIZE > work_mem * 1024L )
+ needCleanup = true;
+
+ UnlockReleaseBuffer(metabuffer);
+
+ END_CRIT_SECTION();
+
+ if ( needCleanup )
+ ginInsertCleanup(index, ginstate, false, NULL);
+}
+
+/*
+ * Collect the index entries for one heap tuple to be indexed. All entries
+ * for a given heap tuple must be written at once, to guarantee a
+ * consistent state.
+ */
+uint32
+ginHeapTupleFastCollect(Relation index, GinState *ginstate,
+ GinTupleCollector *collector,
+ OffsetNumber attnum, Datum value, ItemPointer item)
+{
+ Datum *entries;
+ int32 i,
+ nentries;
+
+ entries = extractEntriesSU(ginstate, attnum, value, &nentries);
+
+ if (nentries == 0)
+ /* nothing to insert */
+ return 0;
+
+ /*
+ * Allocate/reallocate memory for storing collected tuples
+ */
+ if ( collector->tuples == NULL )
+ {
+ collector->lentuples = nentries * index->rd_att->natts;
+ collector->tuples = (IndexTuple*)palloc(sizeof(IndexTuple) * collector->lentuples);
+ }
+
+ while ( collector->ntuples + nentries > collector->lentuples )
+ {
+ collector->lentuples *= 2;
+ collector->tuples = (IndexTuple*)repalloc( collector->tuples,
+ sizeof(IndexTuple) * collector->lentuples);
+ }
+
+ /*
+ * Build an index tuple for each entry, storing the heap TID in t_tid
+ */
+ for (i = 0; i < nentries; i++)
+ {
+ int32 tupsize;
+
+ collector->tuples[collector->ntuples + i] = GinFormTuple(ginstate, attnum, entries[i], NULL, 0);
+ collector->tuples[collector->ntuples + i]->t_tid = *item;
+ tupsize = IndexTupleSize(collector->tuples[collector->ntuples + i]);
+
+ if ( tupsize > TOAST_INDEX_TARGET || tupsize >= GinMaxItemSize)
+ elog(ERROR, "huge tuple");
+
+ collector->sumsize += tupsize;
+ }
+
+ collector->ntuples += nentries;
+
+ return nentries;
+}
+
+/*
+ * Deletes pending list pages up to (not including) newHead page.
+ * If newHead == InvalidBlockNumber then function drops the whole list.
+ *
+ * metapage is pinned and exclusive-locked throughout this function.
+ *
+ * Returns true if another cleanup process is running concurrently
+ * (if so, we can just abandon our own efforts)
+ */
+static bool
+shiftList(Relation index, Buffer metabuffer, BlockNumber newHead,
+ IndexBulkDeleteResult *stats)
+{
+ Page metapage;
+ GinMetaPageData *metadata;
+ BlockNumber blknoToDelete;
+
+ metapage = BufferGetPage(metabuffer);
+ metadata = GinPageGetMeta(metapage);
+ blknoToDelete = metadata->head;
+
+ do
+ {
+ Page page;
+ int i;
+ int64 nDeletedHeapTuples = 0;
+ ginxlogDeleteListPages data;
+ XLogRecData rdata[1];
+ Buffer buffers[GIN_NDELETE_AT_ONCE];
+
+ data.node = index->rd_node;
+
+ rdata[0].buffer = InvalidBuffer;
+ rdata[0].data = (char *) &data;
+ rdata[0].len = sizeof(ginxlogDeleteListPages);
+ rdata[0].next = NULL;
+
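+ /* collect and lock up to GIN_NDELETE_AT_ONCE pages per WAL record */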
+ data.ndeleted = 0;
+ while (data.ndeleted < GIN_NDELETE_AT_ONCE && blknoToDelete != newHead)
+ {
+ data.toDelete[ data.ndeleted ] = blknoToDelete;
+ buffers[ data.ndeleted ] = ReadBuffer(index, blknoToDelete);
+ LockBuffer( buffers[ data.ndeleted ], GIN_EXCLUSIVE );
+ page = BufferGetPage( buffers[ data.ndeleted ] );
+
+ data.ndeleted++;
+
+ if ( GinPageIsDeleted(page) )
+ {
+ /* concurrent cleanup process is detected */
+ for(i=0;i<data.ndeleted;i++)
+ UnlockReleaseBuffer( buffers[i] );
+
+ return true;
+ }
+
+ nDeletedHeapTuples += GinPageGetOpaque(page)->maxoff;
+ blknoToDelete = GinPageGetOpaque( page )->rightlink;
+ }
+
+ if (stats)
+ stats->pages_deleted += data.ndeleted;
+
+ START_CRIT_SECTION();
+
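+ /* unlink the batch by advancing the list head past the deleted pages */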
+ metadata->head = blknoToDelete;
+
+ Assert( metadata->nPendingPages >= data.ndeleted );
+ metadata->nPendingPages -= data.ndeleted;
+ Assert( metadata->nPendingHeapTuples >= nDeletedHeapTuples );
+ metadata->nPendingHeapTuples -= nDeletedHeapTuples;
+
+ if ( blknoToDelete == InvalidBlockNumber )
+ {
+ metadata->tail = InvalidBlockNumber;
+ metadata->tailFreeSize = 0;
+ metadata->nPendingPages = 0;
+ metadata->nPendingHeapTuples = 0;
+ }
+ memcpy( &data.metadata, metadata, sizeof(GinMetaPageData));
+
+ MarkBufferDirty( metabuffer );
+
+ for(i=0; i<data.ndeleted; i++)
+ {
+ page = BufferGetPage( buffers[ i ] );
+ GinPageGetOpaque( page )->flags = GIN_DELETED;
+ MarkBufferDirty( buffers[ i ] );
+ }
+
+ if ( !index->rd_istemp )
+ {
+ XLogRecPtr recptr;
+
+ recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_DELETE_LISTPAGE, rdata);
+ PageSetLSN(metapage, recptr);
+ PageSetTLI(metapage, ThisTimeLineID);
+
+ for(i=0; i<data.ndeleted; i++)
+ {
+ page = BufferGetPage( buffers[ i ] );
+ PageSetLSN(page, recptr);
+ PageSetTLI(page, ThisTimeLineID);
+ }
+ }
+
+ for(i=0; i<data.ndeleted; i++)
+ UnlockReleaseBuffer( buffers[ i ] );
+
+ END_CRIT_SECTION();
+ } while( blknoToDelete != newHead );
+
+ return false;
+}
+
+/* Add datum to DatumArray, resizing if needed */
+static void
+addDatum(DatumArray *datums, Datum datum)
+{
+ if ( datums->nvalues >= datums->maxvalues)
+ {
+ datums->maxvalues *= 2;
+ datums->values = (Datum*)repalloc(datums->values,
+ sizeof(Datum)*datums->maxvalues);
+ }
+
+ datums->values[ datums->nvalues++ ] = datum;
+}
+
+/*
+ * Go through all tuples >= startoff on page and collect values in memory
+ *
+ * Note that da is just workspace --- it does not carry any state across
+ * calls.
+ */
+static void
+processPendingPage(BuildAccumulator *accum, DatumArray *da,
+ Page page, OffsetNumber startoff)
+{
+ ItemPointerData heapptr;
+ OffsetNumber i,maxoff;
+ OffsetNumber attrnum, curattnum;
+
+ /* reset *da to empty */
+ da->nvalues = 0;
+
+ maxoff = PageGetMaxOffsetNumber(page);
+ Assert( maxoff >= FirstOffsetNumber );
+ ItemPointerSetInvalid(&heapptr);
+ attrnum = 0;
+
+ for (i = startoff; i <= maxoff; i = OffsetNumberNext(i))
+ {
+ IndexTuple itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, i));
+
+ curattnum = gintuple_get_attrnum(accum->ginstate, itup);
+
+ if ( !ItemPointerIsValid(&heapptr) )
+ {
+ heapptr = itup->t_tid;
+ attrnum = curattnum;
+ }
+ else if ( !(ItemPointerEquals(&heapptr, &itup->t_tid) &&
+ curattnum == attrnum) )
+ {
+ /*
+ * We can insert several datums per call, but only for one heap
+ * tuple and one column.
+ */
+ ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
+ da->nvalues = 0;
+ heapptr = itup->t_tid;
+ attrnum = curattnum;
+ }
+ addDatum(da, gin_index_getattr(accum->ginstate, itup));
+ }
+
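+ /* flush the final heap tuple's datums */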
+ ginInsertRecordBA(accum, &heapptr, attrnum, da->values, da->nvalues);
+}
+
+/*
+ * Move tuples from pending pages into regular GIN structure.
+ *
+ * This can be called concurrently by multiple backends, so it must cope.
+ * At first glance this looks neither concurrent-safe nor crash-safe.
+ * The reason it's okay is that multiple insertion of the same entry
+ * is detected and treated as a no-op by gininsert.c. If we crash after
+ * posting entries to the main index and before removing them from the
+ * pending list, it's okay because when we redo the posting later on, nothing
+ * bad will happen. Likewise, if two backends simultaneously try to post
+ * a pending entry into the main index, one will succeed and one will do
+ * nothing. We try to notice when someone else is a little bit ahead of
+ * us in the process, but that's just to avoid wasting cycles. Only the
+ * action of removing a page from the pending list really needs exclusive
+ * lock.
+ *
+ * vac_delay indicates that ginInsertCleanup is called from the vacuum
+ * process, so vacuum_delay_point() is called periodically.
+ * If stats isn't null, we count deleted pending pages in its counts.
+ */
+void
+ginInsertCleanup(Relation index, GinState *ginstate,
+ bool vac_delay, IndexBulkDeleteResult *stats)
+{
+ Buffer metabuffer, buffer;
+ Page metapage, page;
+ GinMetaPageData *metadata;
+ MemoryContext opCtx, oldCtx;
+ BuildAccumulator accum;
+ DatumArray datums;
+ BlockNumber blkno;
+
+ metabuffer = ReadBuffer(index, GIN_METAPAGE_BLKNO);
+ LockBuffer(metabuffer, GIN_SHARE);
+ metapage = BufferGetPage(metabuffer);
+ metadata = GinPageGetMeta(metapage);
+
+ if ( metadata->head == InvalidBlockNumber )
+ {
+ /* Nothing to do */
+ UnlockReleaseBuffer(metabuffer);
+ return;
+ }
+
+ /*
+ * Read and lock head of pending list
+ */
+ blkno = metadata->head;
+ buffer = ReadBuffer(index, blkno);
+ LockBuffer(buffer, GIN_SHARE);
+ page = BufferGetPage(buffer);
+
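+ /*
+ * It's now safe to release the metapage lock: our pin and lock on
+ * the head page keep a concurrent shiftList() from deleting it.
+ */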
+ LockBuffer(metabuffer, GIN_UNLOCK);
+
+ /*
+ * Initialize. All temporary space will be in opCtx
+ */
+ opCtx = AllocSetContextCreate(CurrentMemoryContext,
+ "GIN insert cleanup temporary context",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+
+ oldCtx = MemoryContextSwitchTo(opCtx);
+
+ datums.maxvalues=128;
+ datums.nvalues = 0;
+ datums.values = (Datum*)palloc(sizeof(Datum)*datums.maxvalues);
+
+ ginInitBA(&accum);
+ accum.ginstate = ginstate;
+
+ /*
+ * At the top of this loop, we have pin and lock on the current page
+ * of the pending list. However, we'll release that before exiting
+ * the loop. Note we also have pin but not lock on the metapage.
+ */
+ for(;;)
+ {
+ if ( GinPageIsDeleted(page) )
+ {
+ /* another cleanup process is running concurrently */
+ UnlockReleaseBuffer( buffer );
+ break;
+ }
+
+ /*
+ * read page's datums into memory
+ */
+ processPendingPage(&accum, &datums, page, FirstOffsetNumber);
+
+ if (vac_delay)
+ vacuum_delay_point();
+
+ /*
+ * Is it time to flush memory to disk? Flush if we are at the end
+ * of the pending list, or if we have a full row and memory is
+ * getting full.
+ *
+ * XXX allowing up to maintenance_work_mem here is probably
+ * excessive, since vacuum might already be using that much.
+ */
+ if ( GinPageGetOpaque(page)->rightlink == InvalidBlockNumber ||
+ ( GinPageHasFullRow(page) &&
+ accum.allocatedMemory > maintenance_work_mem * 1024L ) )
+ {
+ ItemPointerData *list;
+ uint32 nlist;
+ Datum entry;
+ OffsetNumber maxoff, attnum;
+
+ /*
+ * Unlock the current page to improve concurrency. Concurrent
+ * additions to the page will be detected later by comparing
+ * maxoff after the memory flush completes.
+ */
+ maxoff = PageGetMaxOffsetNumber(page);
+ LockBuffer(buffer, GIN_UNLOCK);
+
+ /*
+ * Moving the collected data into the regular structure can take
+ * a significant amount of time, so do it without holding a lock
+ * on the pending list.
+ */
+ while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
+ {
+ ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
+ if (vac_delay)
+ vacuum_delay_point();
+ }
+
+ /*
+ * Re-lock the metapage and current page so we can remove list pages
+ */
+ LockBuffer(metabuffer, GIN_EXCLUSIVE);
+ LockBuffer(buffer, GIN_SHARE);
+
+ if ( GinPageIsDeleted(page) )
+ {
+ /* another cleanup process is running concurrently */
+ UnlockReleaseBuffer(buffer);
+ LockBuffer(metabuffer, GIN_UNLOCK);
+ break;
+ }
+
+ /*
+ * While we left the page unlocked, more stuff might have gotten
+ * added to it. If so, process those entries immediately. There
+ * shouldn't be very many, so we don't worry about the fact that
+ * we're doing this with exclusive lock. The insertion algorithm
+ * guarantees that inserted row(s) will not continue onto the next page.
+ * NOTE: intentionally no vacuum_delay_point in this loop.
+ */
+ if ( PageGetMaxOffsetNumber(page) != maxoff )
+ {
+ ginInitBA(&accum);
+ processPendingPage(&accum, &datums, page, maxoff+1);
+
+ while ((list = ginGetEntry(&accum, &attnum, &entry, &nlist)) != NULL)
+ ginEntryInsert(index, ginstate, attnum, entry, list, nlist, FALSE);
+ }
+
+ /*
+ * Remember next page - it will become the new list head
+ */
+ blkno = GinPageGetOpaque(page)->rightlink;
+ UnlockReleaseBuffer(buffer); /* shiftList will do exclusive locking */
+
+ /*
+ * Remove the pages we have read from the pending list; at this
+ * point all of their content is in the regular structure
+ */
+ if ( shiftList(index, metabuffer, blkno, stats) )
+ {
+ /* another cleanup process is running concurrently */
+ LockBuffer(metabuffer, GIN_UNLOCK);
+ break;
+ }
+
+ Assert( blkno == metadata->head );
+ LockBuffer(metabuffer, GIN_UNLOCK);
+
+ /*
+ * if we removed the whole pending list, we're done
+ */
+ if ( blkno == InvalidBlockNumber )
+ break;
+
+ /*
+ * release memory used so far and reinit state
+ */
+ MemoryContextReset(opCtx);
+ ginInitBA(&accum);
+ datums.nvalues = 0;
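+ /* the context reset freed the old values array, so allocate a fresh one */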
+ datums.values = (Datum*)palloc(sizeof(Datum)*datums.maxvalues);
+ }
+ else
+ {
+ blkno = GinPageGetOpaque(page)->rightlink;
+ UnlockReleaseBuffer(buffer);
+ }
+
+ /*
+ * Read next page in pending list
+ */
+ CHECK_FOR_INTERRUPTS();
+ buffer = ReadBuffer(index, blkno);
+ LockBuffer(buffer, GIN_SHARE);
+ page = BufferGetPage(buffer);
+ }
+
+ ReleaseBuffer(metabuffer);
+
+ /* Clean up temporary space */
+ MemoryContextSwitchTo(oldCtx);
+ MemoryContextDelete(opCtx);
+}