From 89395bfa6f2fafccec10be377fcf759030910654 Mon Sep 17 00:00:00 2001 From: Tom Lane Date: Thu, 30 Mar 2006 23:03:10 +0000 Subject: Improve gist XLOG code to follow the coding rules needed to prevent torn-page problems. This introduces some issues of its own, mainly that there are now some critical sections of unreasonably broad scope, but it's a step forward anyway. Further cleanup will require some code refactoring that I'd prefer to get Oleg and Teodor involved in. --- src/backend/access/gist/gistxlog.c | 297 ++++++++++++++++--------------------- 1 file changed, 126 insertions(+), 171 deletions(-) (limited to 'src/backend/access/gist/gistxlog.c') diff --git a/src/backend/access/gist/gistxlog.c b/src/backend/access/gist/gistxlog.c index 9a15061484f..12a521c75c9 100644 --- a/src/backend/access/gist/gistxlog.c +++ b/src/backend/access/gist/gistxlog.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.12 2006/03/29 21:17:36 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/gist/gistxlog.c,v 1.13 2006/03/30 23:03:10 tgl Exp $ *------------------------------------------------------------------------- */ #include "postgres.h" @@ -25,11 +25,11 @@ typedef struct { - gistxlogEntryUpdate *data; + gistxlogPageUpdate *data; int len; IndexTuple *itup; OffsetNumber *todelete; -} EntryUpdateRecord; +} PageUpdateRecord; typedef struct { @@ -58,16 +58,15 @@ typedef struct gistIncompleteInsert } gistIncompleteInsert; -MemoryContext opCtx; -MemoryContext insertCtx; +static MemoryContext opCtx; /* working memory for operations */ +static MemoryContext insertCtx; /* holds incomplete_inserts list */ static List *incomplete_inserts; -#define ItemPointerEQ( a, b ) \ - ( \ - ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ - ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) \ - ) +#define ItemPointerEQ(a, b) \ + ( ItemPointerGetOffsetNumber(a) == ItemPointerGetOffsetNumber(b) && \ + ItemPointerGetBlockNumber (a) == ItemPointerGetBlockNumber(b) ) + static void pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, @@ -101,7 +100,13 @@ pushIncompleteInsert(RelFileNode node, XLogRecPtr lsn, ItemPointerData key, } Assert(ninsert->lenblk > 0); - incomplete_inserts = lappend(incomplete_inserts, ninsert); + /* + * Stick the new incomplete insert onto the front of the list, not the + * back. This is so that gist_xlog_cleanup will process incompletions + * in last-in-first-out order. + */ + incomplete_inserts = lcons(ninsert, incomplete_inserts); + MemoryContextSwitchTo(oldCxt); } @@ -116,10 +121,9 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) if (RelFileNodeEquals(node, insert->node) && ItemPointerEQ(&(insert->key), &(key))) { - /* found */ - pfree(insert->blkno); incomplete_inserts = list_delete_ptr(incomplete_inserts, insert); + pfree(insert->blkno); pfree(insert); break; } @@ -127,25 +131,25 @@ forgetIncompleteInsert(RelFileNode node, ItemPointerData key) } static void -decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) +decodePageUpdateRecord(PageUpdateRecord *decoded, XLogRecord *record) { char *begin = XLogRecGetData(record), *ptr; int i = 0, addpath = 0; - decoded->data = (gistxlogEntryUpdate *) begin; + decoded->data = (gistxlogPageUpdate *) begin; if (decoded->data->ntodelete) { - decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogEntryUpdate) + addpath); + decoded->todelete = (OffsetNumber *) (begin + sizeof(gistxlogPageUpdate) + addpath); addpath = MAXALIGN(sizeof(OffsetNumber) * decoded->data->ntodelete); } else decoded->todelete = NULL; decoded->len = 0; - ptr = begin + sizeof(gistxlogEntryUpdate) + addpath; + ptr = begin + sizeof(gistxlogPageUpdate) + addpath; while (ptr - begin < record->xl_len) { decoded->len++; @@ -154,7 +158,7 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) decoded->itup = (IndexTuple *) palloc(sizeof(IndexTuple) * decoded->len); - ptr = begin + sizeof(gistxlogEntryUpdate) + addpath; + ptr = begin + sizeof(gistxlogPageUpdate) + addpath; while (ptr - begin < record->xl_len) { decoded->itup[i] = (IndexTuple) ptr; @@ -167,38 +171,30 @@ decodeEntryUpdateRecord(EntryUpdateRecord *decoded, XLogRecord *record) * redo any page update (except page split) */ static void -gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) +gistRedoPageUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) { - EntryUpdateRecord xlrec; + PageUpdateRecord xlrec; Relation reln; Buffer buffer; Page page; - decodeEntryUpdateRecord(&xlrec, record); + /* nothing to do if whole page was backed up (and no info to do it with) */ + if (record->xl_info & XLR_BKP_BLOCK_1) + return; + + decodePageUpdateRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); buffer = XLogReadBuffer(reln, xlrec.data->blkno, false); if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", xlrec.data->blkno); + return; page = (Page) BufferGetPage(buffer); - if (isnewroot) + if (XLByteLE(lsn, PageGetLSN(page))) { - if (!PageIsNew((PageHeader) page) && XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } - } - else - { - if (XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - return; - } + LockBuffer(buffer, BUFFER_LOCK_UNLOCK); + ReleaseBuffer(buffer); + return; } if (xlrec.data->isemptypage) @@ -237,9 +233,9 @@ gistRedoEntryUpdateRecord(XLogRecPtr lsn, XLogRecord *record, bool isnewroot) GistClearTuplesDeleted(page); } + GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - GistPageGetOpaque(page)->rightlink = InvalidBlockNumber; LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); @@ -294,38 +290,21 @@ gistRedoPageSplitRecord(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; int i; - int flags = 0; + int flags; decodePageSplitRecord(&xlrec, record); reln = XLogOpenRelation(xlrec.data->node); - - /* first of all wee need get F_LEAF flag from original page */ - buffer = XLogReadBuffer(reln, xlrec.data->origblkno, false); - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", xlrec.data->origblkno); - page = (Page) BufferGetPage(buffer); - flags = (GistPageIsLeaf(page)) ? F_LEAF : 0; - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); + flags = xlrec.data->origleaf ? F_LEAF : 0; /* loop around all pages */ for (i = 0; i < xlrec.data->npage; i++) { NewPage *newpage = xlrec.page + i; - bool isorigpage = (xlrec.data->origblkno == newpage->header->blkno) ? true : false; - buffer = XLogReadBuffer(reln, newpage->header->blkno, !isorigpage); - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", newpage->header->blkno); + buffer = XLogReadBuffer(reln, newpage->header->blkno, true); + Assert(BufferIsValid(buffer)); page = (Page) BufferGetPage(buffer); - if (XLByteLE(lsn, PageGetLSN(page))) - { - LockBuffer(buffer, BUFFER_LOCK_UNLOCK); - ReleaseBuffer(buffer); - continue; - } - /* ok, clear buffer */ GISTInitBuffer(buffer, flags); @@ -399,12 +378,11 @@ gist_redo(XLogRecPtr lsn, XLogRecord *record) oldCxt = MemoryContextSwitchTo(opCtx); switch (info) { - case XLOG_GIST_ENTRY_UPDATE: - case XLOG_GIST_ENTRY_DELETE: - gistRedoEntryUpdateRecord(lsn, record, false); + case XLOG_GIST_PAGE_UPDATE: + gistRedoPageUpdateRecord(lsn, record, false); break; case XLOG_GIST_NEW_ROOT: - gistRedoEntryUpdateRecord(lsn, record, true); + gistRedoPageUpdateRecord(lsn, record, true); break; case XLOG_GIST_PAGE_SPLIT: gistRedoPageSplitRecord(lsn, record); @@ -433,7 +411,7 @@ out_target(StringInfo buf, RelFileNode node, ItemPointerData key) } static void -out_gistxlogEntryUpdate(StringInfo buf, gistxlogEntryUpdate *xlrec) +out_gistxlogPageUpdate(StringInfo buf, gistxlogPageUpdate *xlrec) { out_target(buf, xlrec->node, xlrec->key); appendStringInfo(buf, "; block number %u", xlrec->blkno); @@ -455,17 +433,13 @@ gist_desc(StringInfo buf, uint8 xl_info, char *rec) switch (info) { - case XLOG_GIST_ENTRY_UPDATE: - appendStringInfo(buf, "entry_update: "); - out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec); - break; - case XLOG_GIST_ENTRY_DELETE: - appendStringInfo(buf, "entry_delete: "); - out_gistxlogEntryUpdate(buf, (gistxlogEntryUpdate *) rec); + case XLOG_GIST_PAGE_UPDATE: + appendStringInfo(buf, "page_update: "); + out_gistxlogPageUpdate(buf, (gistxlogPageUpdate *) rec); break; case XLOG_GIST_NEW_ROOT: appendStringInfo(buf, "new_root: "); - out_target(buf, ((gistxlogEntryUpdate *) rec)->node, ((gistxlogEntryUpdate *) rec)->key); + out_target(buf, ((gistxlogPageUpdate *) rec)->node, ((gistxlogPageUpdate *) rec)->key); break; case XLOG_GIST_PAGE_SPLIT: out_gistxlogPageSplit(buf, (gistxlogPageSplit *) rec); @@ -506,60 +480,47 @@ gist_form_invalid_tuple(BlockNumber blkno) return tuple; } -static Buffer -gistXLogReadAndLockBuffer(Relation r, BlockNumber blkno) -{ - Buffer buffer = XLogReadBuffer(r, blkno, false); - - if (!BufferIsValid(buffer)) - elog(PANIC, "block %u unfound", blkno); - - return buffer; -} - static void -gixtxlogFindPath(Relation index, gistIncompleteInsert *insert) +gistxlogFindPath(Relation index, gistIncompleteInsert *insert) { GISTInsertStack *top; insert->pathlen = 0; insert->path = NULL; - if ((top = gistFindPath(index, insert->origblkno, gistXLogReadAndLockBuffer)) != NULL) + if ((top = gistFindPath(index, insert->origblkno)) != NULL) { int i; - GISTInsertStack *ptr = top; + GISTInsertStack *ptr; - while (ptr) - { + for (ptr = top; ptr; ptr = ptr->parent) insert->pathlen++; - ptr = ptr->parent; - } insert->path = (BlockNumber *) palloc(sizeof(BlockNumber) * insert->pathlen); i = 0; - ptr = top; - while (ptr) - { - insert->path[i] = ptr->blkno; - i++; - ptr = ptr->parent; - } + for (ptr = top; ptr; ptr = ptr->parent) + insert->path[i++] = ptr->blkno; } else elog(LOG, "lost parent for block %u", insert->origblkno); } /* - * Continue insert after crash. In normal situation, there isn't any incomplete - * inserts, but if it might be after crash, WAL may has not a record of completetion. + * Continue insert after crash. In normal situations, there aren't any + * incomplete inserts, but if a crash occurs partway through an insertion + * sequence, we'll need to finish making the index valid at the end of WAL + * replay. + * + * Note that we assume the index is now in a valid state, except for the + * unfinished insertion. In particular it's safe to invoke gistFindPath(); + * there shouldn't be any garbage pages for it to run into. * * Although stored LSN in gistIncompleteInsert is a LSN of child page, * we can compare it with LSN of parent, because parent is always locked * while we change child page (look at gistmakedeal). So if parent's LSN is - * lesser than stored lsn then changes in parent doesn't do yet. + * less than stored lsn then changes in parent aren't done yet. */ static void gistContinueInsert(gistIncompleteInsert *insert) @@ -602,6 +563,12 @@ gistContinueInsert(gistIncompleteInsert *insert) LockBuffer(buffer, BUFFER_LOCK_UNLOCK); WriteBuffer(buffer); + + /* + * XXX fall out to avoid making LOG message at bottom of routine. + * I think the logic for when to emit that message is all wrong... + */ + return; } else { @@ -610,7 +577,7 @@ gistContinueInsert(gistIncompleteInsert *insert) int numbuffer; /* construct path */ - gixtxlogFindPath(index, insert); + gistxlogFindPath(index, insert); Assert(insert->pathlen > 0); @@ -625,9 +592,8 @@ gistContinueInsert(gistIncompleteInsert *insert) childfound = 0; numbuffer = 1; - buffers[numbuffer - 1] = XLogReadBuffer(index, insert->path[i], false); - if (!BufferIsValid(buffers[numbuffer - 1])) - elog(PANIC, "block %u unfound", insert->path[i]); + buffers[numbuffer - 1] = ReadBuffer(index, insert->path[i]); + LockBuffer(buffers[numbuffer - 1], GIST_EXCLUSIVE); pages[numbuffer - 1] = BufferGetPage(buffers[numbuffer - 1]); if (XLByteLE(insert->lsn, PageGetLSN(pages[numbuffer - 1]))) @@ -661,10 +627,9 @@ gistContinueInsert(gistIncompleteInsert *insert) if (gistnospace(pages[numbuffer - 1], itup, lenitup)) { - /* no space left on page, so we should split */ - buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); - if (!BufferIsValid(buffers[numbuffer])) - elog(PANIC, "could not obtain new block"); + /* no space left on page, so we must split */ + buffers[numbuffer] = ReadBuffer(index, P_NEW); + LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); gistfillbuffer(index, pages[numbuffer], itup, lenitup, FirstOffsetNumber); @@ -678,7 +643,8 @@ gistContinueInsert(gistIncompleteInsert *insert) * we split root, just copy tuples from old root to new * page */ - parentitup = gistextractbuffer(buffers[numbuffer - 1], &pituplen); + parentitup = gistextractbuffer(buffers[numbuffer - 1], + &pituplen); /* sanity check */ if (i + 1 != insert->pathlen) @@ -686,9 +652,8 @@ gistContinueInsert(gistIncompleteInsert *insert) RelationGetRelationName(index)); /* fill new page */ - buffers[numbuffer] = XLogReadBuffer(index, P_NEW, true); - if (!BufferIsValid(buffers[numbuffer])) - elog(PANIC, "could not obtain new block"); + buffers[numbuffer] = ReadBuffer(index, P_NEW); + LockBuffer(buffers[numbuffer], GIST_EXCLUSIVE); GISTInitBuffer(buffers[numbuffer], 0); pages[numbuffer] = BufferGetPage(buffers[numbuffer]); gistfillbuffer(index, pages[numbuffer], parentitup, pituplen, FirstOffsetNumber); @@ -748,16 +713,10 @@ void gist_xlog_cleanup(void) { ListCell *l; - List *reverse = NIL; - MemoryContext oldCxt = MemoryContextSwitchTo(insertCtx); - - /* we should call gistContinueInsert in reverse order */ + MemoryContext oldCxt; + oldCxt = MemoryContextSwitchTo(opCtx); foreach(l, incomplete_inserts) - reverse = lappend(reverse, lfirst(l)); - - MemoryContextSwitchTo(opCtx); - foreach(l, reverse) { gistIncompleteInsert *insert = (gistIncompleteInsert *) lfirst(l); @@ -772,10 +731,9 @@ gist_xlog_cleanup(void) XLogRecData * -formSplitRdata(RelFileNode node, BlockNumber blkno, +formSplitRdata(RelFileNode node, BlockNumber blkno, bool page_is_leaf, ItemPointer key, SplitedPageLayout *dist) { - XLogRecData *rdata; gistxlogPageSplit *xlrec = (gistxlogPageSplit *) palloc(sizeof(gistxlogPageSplit)); SplitedPageLayout *ptr; @@ -793,6 +751,7 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, xlrec->node = node; xlrec->origblkno = blkno; + xlrec->origleaf = page_is_leaf; xlrec->npage = (uint16) npage; if (key) xlrec->key = *key; @@ -825,68 +784,64 @@ formSplitRdata(RelFileNode node, BlockNumber blkno, return rdata; } - +/* + * Construct the rdata array for an XLOG record describing a page update + * (deletion and/or insertion of tuples on a single index page). + * + * Note that both the todelete array and the tuples are marked as belonging + * to the target buffer; they need not be stored in XLOG if XLogInsert decides + * to log the whole buffer contents instead. Also, we take care that there's + * at least one rdata item referencing the buffer, even when ntodelete and + * ituplen are both zero; this ensures that XLogInsert knows about the buffer. + */ XLogRecData * -formUpdateRdata(RelFileNode node, BlockNumber blkno, +formUpdateRdata(RelFileNode node, Buffer buffer, OffsetNumber *todelete, int ntodelete, bool emptypage, IndexTuple *itup, int ituplen, ItemPointer key) { XLogRecData *rdata; - gistxlogEntryUpdate *xlrec = (gistxlogEntryUpdate *) palloc(sizeof(gistxlogEntryUpdate)); + gistxlogPageUpdate *xlrec; + int cur, + i; + + /* ugly wart in API: emptypage causes us to ignore other inputs */ + if (emptypage) + ntodelete = ituplen = 0; + + rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen)); + xlrec = (gistxlogPageUpdate *) palloc(sizeof(gistxlogPageUpdate)); xlrec->node = node; - xlrec->blkno = blkno; + xlrec->blkno = BufferGetBlockNumber(buffer); + xlrec->ntodelete = ntodelete; + xlrec->isemptypage = emptypage; if (key) xlrec->key = *key; else ItemPointerSetInvalid(&(xlrec->key)); - if (emptypage) - { - xlrec->isemptypage = true; - xlrec->ntodelete = 0; - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData)); - rdata->buffer = InvalidBuffer; - rdata->data = (char *) xlrec; - rdata->len = sizeof(gistxlogEntryUpdate); - rdata->next = NULL; - } - else - { - int cur = 1, - i; - - xlrec->isemptypage = false; - xlrec->ntodelete = ntodelete; - - rdata = (XLogRecData *) palloc(sizeof(XLogRecData) * (2 + ituplen)); - - rdata->buffer = InvalidBuffer; - rdata->data = (char *) xlrec; - rdata->len = sizeof(gistxlogEntryUpdate); - rdata->next = NULL; + rdata[0].data = (char *) xlrec; + rdata[0].len = sizeof(gistxlogPageUpdate); + rdata[0].buffer = InvalidBuffer; + rdata[0].next = &(rdata[1]); - if (ntodelete) - { - rdata[cur - 1].next = &(rdata[cur]); - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) todelete; - rdata[cur].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); - rdata[cur].next = NULL; - cur++; - } + rdata[1].data = (char *) todelete; + rdata[1].len = MAXALIGN(sizeof(OffsetNumber) * ntodelete); + rdata[1].buffer = buffer; + rdata[1].buffer_std = true; + rdata[1].next = NULL; - /* new tuples */ - for (i = 0; i < ituplen; i++) - { - rdata[cur].buffer = InvalidBuffer; - rdata[cur].data = (char *) (itup[i]); - rdata[cur].len = IndexTupleSize(itup[i]); - rdata[cur].next = NULL; - rdata[cur - 1].next = &(rdata[cur]); - cur++; - } + /* new tuples */ + cur = 2; + for (i = 0; i < ituplen; i++) + { + rdata[cur - 1].next = &(rdata[cur]); + rdata[cur].data = (char *) (itup[i]); + rdata[cur].len = IndexTupleSize(itup[i]); + rdata[cur].buffer = buffer; + rdata[cur].buffer_std = true; + rdata[cur].next = NULL; + cur++; } return rdata; -- cgit v1.2.3