diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 112 |
1 files changed, 67 insertions, 45 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 213d5b791cb..f4e5c471ce5 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -926,8 +926,8 @@ begin:; * loop, write_len includes the backup block data. * * Also set the appropriate info bits to show which buffers were backed - * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct - * buffer value (ignoring InvalidBuffer) appearing in the rdata chain. + * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer + * value (ignoring InvalidBuffer) appearing in the rdata chain. */ write_len = len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) @@ -938,7 +938,7 @@ begin:; if (!dtbuf_bkp[i]) continue; - info |= XLR_SET_BKP_BLOCK(i); + info |= XLR_BKP_BLOCK(i); bkpb = &(dtbuf_xlg[i]); page = (char *) BufferGetBlock(dtbuf[i]); @@ -3560,9 +3560,16 @@ CleanupBackupHistory(void) } /* - * Restore the backup blocks present in an XLOG record, if any. + * Restore a full-page image from a backup block attached to an XLOG record. * - * We assume all of the record has been read into memory at *record. + * lsn: LSN of the XLOG record being replayed + * record: the complete XLOG record + * block_index: which backup block to restore (0 .. XLR_MAX_BKP_BLOCKS - 1) + * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock + * keep_buffer: TRUE to return the buffer still locked and pinned + * + * Returns the buffer number containing the page. Note this is not terribly + * useful unless keep_buffer is specified as TRUE. * * Note: when a backup block is available in XLOG, we restore it * unconditionally, even if the page in the database appears newer. @@ -3573,15 +3580,20 @@ CleanupBackupHistory(void) * modifications of the page that appear in XLOG, rather than possibly * ignoring them as already applied, but that's not a huge drawback. * - * If 'cleanup' is true, a cleanup lock is used when restoring blocks. - * Otherwise, a normal exclusive lock is used. During crash recovery, that's - * just pro forma because there can't be any regular backends in the system, - * but in hot standby mode the distinction is important. The 'cleanup' - * argument applies to all backup blocks in the WAL record, that suffices for - * now. + * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer, + * else a normal exclusive lock is used. During crash recovery, that's just + * pro forma because there can't be any regular backends in the system, but + * in hot standby mode the distinction is important. + * + * If 'keep_buffer' is true, return without releasing the buffer lock and pin; + * then caller is responsible for doing UnlockReleaseBuffer() later. This + * is needed in some cases when replaying XLOG records that touch multiple + * pages, to prevent inconsistent states from being visible to other backends. + * (Again, that's only important in hot standby mode.) */ -void -RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup) +Buffer +RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, + bool get_cleanup_lock, bool keep_buffer) { Buffer buffer; Page page; @@ -3589,49 +3601,59 @@ RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup) char *blk; int i; - if (!(record->xl_info & XLR_BKP_BLOCK_MASK)) - return; - + /* Locate requested BkpBlock in the record */ blk = (char *) XLogRecGetData(record) + record->xl_len; for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) + if (!(record->xl_info & XLR_BKP_BLOCK(i))) continue; memcpy(&bkpb, blk, sizeof(BkpBlock)); blk += sizeof(BkpBlock); - buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, - RBM_ZERO); - Assert(BufferIsValid(buffer)); - if (cleanup) - LockBufferForCleanup(buffer); - else - LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + if (i == block_index) + { + /* Found it, apply the update */ + buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, + RBM_ZERO); + Assert(BufferIsValid(buffer)); + if (get_cleanup_lock) + LockBufferForCleanup(buffer); + else + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - page = (Page) BufferGetPage(buffer); + page = (Page) BufferGetPage(buffer); - if (bkpb.hole_length == 0) - { - memcpy((char *) page, blk, BLCKSZ); - } - else - { - memcpy((char *) page, blk, bkpb.hole_offset); - /* must zero-fill the hole */ - MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length); - memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), - blk + bkpb.hole_offset, - BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); - } + if (bkpb.hole_length == 0) + { + memcpy((char *) page, blk, BLCKSZ); + } + else + { + memcpy((char *) page, blk, bkpb.hole_offset); + /* must zero-fill the hole */ + MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length); + memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), + blk + bkpb.hole_offset, + BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); + } + + PageSetLSN(page, lsn); + PageSetTLI(page, ThisTimeLineID); + MarkBufferDirty(buffer); - PageSetLSN(page, lsn); - PageSetTLI(page, ThisTimeLineID); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); + if (!keep_buffer) + UnlockReleaseBuffer(buffer); + + return buffer; + } blk += BLCKSZ - bkpb.hole_length; } + + /* Caller specified a bogus block_index */ + elog(ERROR, "failed to restore block_index %d", block_index); + return InvalidBuffer; /* keep compiler quiet */ } /* @@ -3660,7 +3682,7 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) { uint32 blen; - if (!(record->xl_info & XLR_SET_BKP_BLOCK(i))) + if (!(record->xl_info & XLR_BKP_BLOCK(i))) continue; memcpy(&bkpb, blk, sizeof(BkpBlock)); @@ -8714,8 +8736,8 @@ xlog_outrec(StringInfo buf, XLogRecord *record) for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) { - if (record->xl_info & XLR_SET_BKP_BLOCK(i)) - appendStringInfo(buf, "; bkpb%d", i + 1); + if (record->xl_info & XLR_BKP_BLOCK(i)) + appendStringInfo(buf, "; bkpb%d", i); } appendStringInfo(buf, ": %s", RmgrTable[record->xl_rmid].rm_name); |