diff options
Diffstat (limited to 'src/backend/access/heap/heapam.c')
-rw-r--r-- | src/backend/access/heap/heapam.c | 147 |
1 files changed, 100 insertions, 47 deletions
diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index 197067d4a1d..afe5e66b4bc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -4199,6 +4199,9 @@ heap_xlog_cleanup_info(XLogRecPtr lsn, XLogRecord *record) * conflict processing to occur before we begin index vacuum actions. see * vacuumlazy.c and also comments in btvacuumpage() */ + + /* Backup blocks are not used in cleanup_info records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); } /* @@ -4231,10 +4234,15 @@ heap_xlog_clean(XLogRecPtr lsn, XLogRecord *record) ResolveRecoveryConflictWithSnapshot(xlrec->latestRemovedXid, xlrec->node); - RestoreBkpBlocks(lsn, record, true); - - if (record->xl_info & XLR_BKP_BLOCK_1) + /* + * If we have a full-page image, restore it (using a cleanup lock) and + * we're done. + */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, true, false); return; + } buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL); if (!BufferIsValid(buffer)) @@ -4300,15 +4308,16 @@ heap_xlog_freeze(XLogRecPtr lsn, XLogRecord *record) if (InHotStandby) ResolveRecoveryConflictWithSnapshot(cutoff_xid, xlrec->node); - RestoreBkpBlocks(lsn, record, false); - - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } - buffer = XLogReadBufferExtended(xlrec->node, MAIN_FORKNUM, xlrec->block, RBM_NORMAL); + buffer = XLogReadBuffer(xlrec->node, xlrec->block, false); if (!BufferIsValid(buffer)) return; - LockBufferForCleanup(buffer); page = (Page) BufferGetPage(buffer); if (XLByteLE(lsn, PageGetLSN(page))) @@ -4349,6 +4358,9 @@ heap_xlog_newpage(XLogRecPtr lsn, XLogRecord *record) Buffer buffer; Page page; + /* Backup blocks are not used in newpage records */ + Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + /* * Note: the NEWPAGE log record is used for both heaps and indexes, so do * not do anything that assumes we are touching a heap. @@ -4401,8 +4413,12 @@ heap_xlog_delete(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, blkno, false); if (!BufferIsValid(buffer)) @@ -4479,8 +4495,12 @@ heap_xlog_insert(XLogRecPtr lsn, XLogRecord *record) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } if (record->xl_info & XLOG_HEAP_INIT_PAGE) { @@ -4562,9 +4582,10 @@ static void heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) { xl_heap_update *xlrec = (xl_heap_update *) XLogRecGetData(record); - Buffer buffer; bool samepage = (ItemPointerGetBlockNumber(&(xlrec->newtid)) == ItemPointerGetBlockNumber(&(xlrec->target.tid))); + Buffer obuffer, + nbuffer; Page page; OffsetNumber offnum; ItemId lp = NULL; @@ -4592,27 +4613,44 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_1) + /* + * In normal operation, it is important to lock the two pages in + * page-number order, to avoid possible deadlocks against other update + * operations going the other way. However, during WAL replay there can + * be no other update happening, so we don't need to worry about that. But + * we *do* need to worry that we don't expose an inconsistent state to Hot + * Standby queries --- so the original page can't be unlocked before we've + * added the new tuple to the new page. + */ + + if (record->xl_info & XLR_BKP_BLOCK(0)) { + obuffer = RestoreBackupBlock(lsn, record, 0, false, true); if (samepage) - return; /* backup block covered both changes */ + { + /* backup block covered both changes, so we're done */ + UnlockReleaseBuffer(obuffer); + return; + } goto newt; } /* Deal with old tuple version */ - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->target.tid)), - false); - if (!BufferIsValid(buffer)) + obuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->target.tid)), + false); + if (!BufferIsValid(obuffer)) goto newt; - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(obuffer); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - UnlockReleaseBuffer(buffer); if (samepage) + { + UnlockReleaseBuffer(obuffer); return; + } goto newt; } @@ -4650,11 +4688,14 @@ heap_xlog_update(XLogRecPtr lsn, XLogRecord *record, bool hot_update) * is already applied */ if (samepage) + { + nbuffer = obuffer; goto newsame; + } + PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + MarkBufferDirty(obuffer); /* Deal with new tuple */ @@ -4672,31 +4713,38 @@ newt:; FreeFakeRelcacheEntry(reln); } - if (record->xl_info & XLR_BKP_BLOCK_2) + if (record->xl_info & XLR_BKP_BLOCK(1)) + { + (void) RestoreBackupBlock(lsn, record, 1, false, false); + if (BufferIsValid(obuffer)) + UnlockReleaseBuffer(obuffer); return; + } if (record->xl_info & XLOG_HEAP_INIT_PAGE) { - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); + nbuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + true); + Assert(BufferIsValid(nbuffer)); + page = (Page) BufferGetPage(nbuffer); - PageInit(page, BufferGetPageSize(buffer), 0); + PageInit(page, BufferGetPageSize(nbuffer), 0); } else { - buffer = XLogReadBuffer(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), - false); - if (!BufferIsValid(buffer)) + nbuffer = XLogReadBuffer(xlrec->target.node, + ItemPointerGetBlockNumber(&(xlrec->newtid)), + false); + if (!BufferIsValid(nbuffer)) return; - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(nbuffer); if (XLByteLE(lsn, PageGetLSN(page))) /* changes are applied */ { - UnlockReleaseBuffer(buffer); + UnlockReleaseBuffer(nbuffer); + if (BufferIsValid(obuffer)) + UnlockReleaseBuffer(obuffer); return; } } @@ -4741,11 +4789,14 @@ newsame:; PageSetLSN(page, lsn); PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + MarkBufferDirty(nbuffer); + UnlockReleaseBuffer(nbuffer); + + if (BufferIsValid(obuffer) && obuffer != nbuffer) + UnlockReleaseBuffer(obuffer); /* - * If the page is running low on free space, update the FSM as well. + * If the new page is running low on free space, update the FSM as well. * Arbitrarily, our definition of "low" is less than 20%. We can't do much * better than that without knowing the fill-factor for the table. * @@ -4761,7 +4812,8 @@ newsame:; */ if (!hot_update && freespace < BLCKSZ / 5) XLogRecordPageWithFreeSpace(xlrec->target.node, - ItemPointerGetBlockNumber(&(xlrec->newtid)), freespace); + ItemPointerGetBlockNumber(&(xlrec->newtid)), + freespace); } static void @@ -4774,8 +4826,12 @@ heap_xlog_lock(XLogRecPtr lsn, XLogRecord *record) ItemId lp = NULL; HeapTupleHeader htup; - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), @@ -4833,8 +4889,12 @@ heap_xlog_inplace(XLogRecPtr lsn, XLogRecord *record) uint32 oldlen; uint32 newlen; - if (record->xl_info & XLR_BKP_BLOCK_1) + /* If we have a full-page image, restore it and we're done */ + if (record->xl_info & XLR_BKP_BLOCK(0)) + { + (void) RestoreBackupBlock(lsn, record, 0, false, false); return; + } buffer = XLogReadBuffer(xlrec->target.node, ItemPointerGetBlockNumber(&(xlrec->target.tid)), @@ -4883,8 +4943,6 @@ heap_redo(XLogRecPtr lsn, XLogRecord *record) * required. The ones in heap2 rmgr do. */ - RestoreBkpBlocks(lsn, record, false); - switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP_INSERT: @@ -4918,11 +4976,6 @@ heap2_redo(XLogRecPtr lsn, XLogRecord *record) { uint8 info = record->xl_info & ~XLR_INFO_MASK; - /* - * Note that RestoreBkpBlocks() is called after conflict processing within - * each record type handling function. - */ - switch (info & XLOG_HEAP_OPMASK) { case XLOG_HEAP2_FREEZE: |