From 0ab43c96781f37a72ae4e1e897d1e10f56c484d2 Mon Sep 17 00:00:00 2001 From: Teodor Sigaev Date: Sat, 23 Aug 2008 10:40:03 +0000 Subject: Fix possible duplicate tuples while GiST scan. Now page is processed at once and ItemPointers are collected in memory. Remove tuple's killing by killtuple() if tuple was moved to another page - it could produce unaceptable overhead. Backpatch up to 8.1 because the bug was introduced by GiST's concurrency support. --- src/backend/access/gist/gistget.c | 186 +++++++++++++++++++------------------- 1 file changed, 94 insertions(+), 92 deletions(-) (limited to 'src/backend/access/gist/gistget.c') diff --git a/src/backend/access/gist/gistget.c b/src/backend/access/gist/gistget.c index 22a7d0ceae6..9a9927ca05f 100644 --- a/src/backend/access/gist/gistget.c +++ b/src/backend/access/gist/gistget.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.69 2008/01/01 19:45:46 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistget.c,v 1.69.2.1 2008/08/23 10:40:03 teodor Exp $ * *------------------------------------------------------------------------- */ @@ -29,63 +29,39 @@ static bool gistindex_keytest(IndexTuple tuple, IndexScanDesc scan, static void killtuple(Relation r, GISTScanOpaque so, ItemPointer iptr) { - Buffer buffer = so->curbuf; - - for (;;) - { - Page p; - BlockNumber blkno; - OffsetNumber offset, - maxoff; - - LockBuffer(buffer, GIST_SHARE); - gistcheckpage(r, buffer); - p = (Page) BufferGetPage(buffer); + Page p; + OffsetNumber offset; - if (buffer == so->curbuf && XLByteEQ(so->stack->lsn, PageGetLSN(p))) - { - /* page unchanged, so all is simple */ - offset = ItemPointerGetOffsetNumber(iptr); - ItemIdMarkDead(PageGetItemId(p, offset)); - SetBufferCommitInfoNeedsSave(buffer); - LockBuffer(buffer, GIST_UNLOCK); - break; - } + LockBuffer(so->curbuf, GIST_SHARE); + gistcheckpage(r, so->curbuf); + p = (Page) BufferGetPage(so->curbuf); - maxoff = PageGetMaxOffsetNumber(p); + if (XLByteEQ(so->stack->lsn, PageGetLSN(p))) + { + /* page unchanged, so all is simple */ + offset = ItemPointerGetOffsetNumber(iptr); + ItemIdMarkDead(PageGetItemId(p, offset)); + SetBufferCommitInfoNeedsSave(so->curbuf); + } + else + { + OffsetNumber maxoff = PageGetMaxOffsetNumber(p); for (offset = FirstOffsetNumber; offset <= maxoff; offset = OffsetNumberNext(offset)) { - IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset)); + IndexTuple ituple = (IndexTuple) PageGetItem(p, PageGetItemId(p, offset)); if (ItemPointerEquals(&(ituple->t_tid), iptr)) { /* found */ ItemIdMarkDead(PageGetItemId(p, offset)); - SetBufferCommitInfoNeedsSave(buffer); - LockBuffer(buffer, GIST_UNLOCK); - if (buffer != so->curbuf) - ReleaseBuffer(buffer); - return; + SetBufferCommitInfoNeedsSave(so->curbuf); + break; } } - - /* follow right link */ - - /* - * ??? is it good? if tuple dropped by concurrent vacuum, we will read - * all leaf pages... - */ - blkno = GistPageGetOpaque(p)->rightlink; - LockBuffer(buffer, GIST_UNLOCK); - if (buffer != so->curbuf) - ReleaseBuffer(buffer); - - if (blkno == InvalidBlockNumber) - /* can't found, dropped by somebody else */ - return; - buffer = ReadBuffer(r, blkno); } + + LockBuffer(so->curbuf, GIST_UNLOCK); } /* @@ -147,7 +123,6 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, GISTSearchStack *stk; IndexTuple it; GISTPageOpaque opaque; - bool resetoffset = false; int ntids = 0; so = (GISTScanOpaque) scan->opaque; @@ -172,6 +147,42 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, return 0; } + /* + * check stored pointers from last visit + */ + if ( so->nPageData > 0 ) + { + while( ntids < maxtids && so->curPageData < so->nPageData ) + { + tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ]; + + so->curPageData ++; + ntids++; + } + + if ( ntids == maxtids ) + return ntids; + + /* + * Go to the next page + */ + stk = so->stack->next; + pfree(so->stack); + so->stack = stk; + + /* If we're out of stack entries, we're done */ + if (so->stack == NULL) + { + ReleaseBuffer(so->curbuf); + so->curbuf = InvalidBuffer; + return ntids; + } + + so->curbuf = ReleaseAndReadBuffer(so->curbuf, + scan->indexRelation, + stk->block); + } + for (;;) { /* First of all, we need lock buffer */ @@ -180,30 +191,25 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, gistcheckpage(scan->indexRelation, so->curbuf); p = BufferGetPage(so->curbuf); opaque = GistPageGetOpaque(p); - resetoffset = false; - if (XLogRecPtrIsInvalid(so->stack->lsn) || !XLByteEQ(so->stack->lsn, PageGetLSN(p))) - { - /* page changed from last visit or visit first time , reset offset */ - so->stack->lsn = PageGetLSN(p); - resetoffset = true; - - /* check page split, occured from last visit or visit to parent */ - if (!XLogRecPtrIsInvalid(so->stack->parentlsn) && - XLByteLT(so->stack->parentlsn, opaque->nsn) && - opaque->rightlink != InvalidBlockNumber /* sanity check */ && - (so->stack->next == NULL || so->stack->next->block != opaque->rightlink) /* check if already - added */ ) - { - /* detect page split, follow right link to add pages */ + /* remember lsn to identify page changed for tuple's killing */ + so->stack->lsn = PageGetLSN(p); - stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); - stk->next = so->stack->next; - stk->block = opaque->rightlink; - stk->parentlsn = so->stack->parentlsn; - memset(&(stk->lsn), 0, sizeof(GistNSN)); - so->stack->next = stk; - } + /* check page split, occured from last visit or visit to parent */ + if (!XLogRecPtrIsInvalid(so->stack->parentlsn) && + XLByteLT(so->stack->parentlsn, opaque->nsn) && + opaque->rightlink != InvalidBlockNumber /* sanity check */ && + (so->stack->next == NULL || so->stack->next->block != opaque->rightlink) /* check if already + added */ ) + { + /* detect page split, follow right link to add pages */ + + stk = (GISTSearchStack *) palloc(sizeof(GISTSearchStack)); + stk->next = so->stack->next; + stk->block = opaque->rightlink; + stk->parentlsn = so->stack->parentlsn; + memset(&(stk->lsn), 0, sizeof(GistNSN)); + so->stack->next = stk; } /* if page is empty, then just skip it */ @@ -226,25 +232,13 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, continue; } - if (!GistPageIsLeaf(p) || resetoffset || - !ItemPointerIsValid(&so->curpos)) - { - if (ScanDirectionIsBackward(dir)) - n = PageGetMaxOffsetNumber(p); - else - n = FirstOffsetNumber; - } + if (ScanDirectionIsBackward(dir)) + n = PageGetMaxOffsetNumber(p); else - { - n = ItemPointerGetOffsetNumber(&(so->curpos)); - - if (ScanDirectionIsBackward(dir)) - n = OffsetNumberPrev(n); - else - n = OffsetNumberNext(n); - } + n = FirstOffsetNumber; /* wonderful, we can look at page */ + so->nPageData = so->curPageData = 0; for (;;) { @@ -252,6 +246,20 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, if (!OffsetNumberIsValid(n)) { + while( ntids < maxtids && so->curPageData < so->nPageData ) + { + tids[ ntids ] = scan->xs_ctup.t_self = so->pageData[ so->curPageData ]; + + so->curPageData ++; + ntids++; + } + + if ( ntids == maxtids ) + { + LockBuffer(so->curbuf, GIST_UNLOCK); + return ntids; + } + /* * We ran out of matching index entries on the current page, * so pop the top stack entry and use it to continue the @@ -292,14 +300,8 @@ gistnext(IndexScanDesc scan, ScanDirection dir, ItemPointer tids, if (!(ignore_killed_tuples && ItemIdIsDead(PageGetItemId(p, n)))) { it = (IndexTuple) PageGetItem(p, PageGetItemId(p, n)); - tids[ntids] = scan->xs_ctup.t_self = it->t_tid; - ntids++; - - if (ntids == maxtids) - { - LockBuffer(so->curbuf, GIST_UNLOCK); - return ntids; - } + so->pageData[ so->nPageData ] = it->t_tid; + so->nPageData ++; } } else -- cgit v1.2.3