diff options
author | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 17:56:26 +0200 |
---|---|---|
committer | Heikki Linnakangas <heikki.linnakangas@iki.fi> | 2014-11-20 18:46:41 +0200 |
commit | 2c03216d831160bedd72d45f712601b6f7d03f1c (patch) | |
tree | ab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/transam/xloginsert.c | |
parent | 8dc626defec23016dd5988208d8704b858b9d21d (diff) | |
download | postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip |
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and
block(s) in a standardized format. That makes it easier to write tools that
need that information, like pg_rewind, prefetching the blocks to speed up
recovery, etc.
There's a whole new API for building WAL records, replacing the XLogRecData
chains used previously. The new API consists of XLogRegister* functions,
which are called for each buffer and chunk of data that is added to the
record. The new API also gives more control over when a full-page image is
written, by passing flags to the XLogRegisterBuffer function.
This also simplifies the XLogReadBufferForRedo() calls. The function can dig
the relation and block number from the WAL record, so they no longer need to
be passed as arguments.
For the convenience of redo routines, XLogReader now disects each WAL record
after reading it, copying the main data part and the per-block data into
MAXALIGNed buffers. The data chunks are not aligned within the WAL record,
but the redo routines can assume that the pointers returned by XLogRecGet*
functions are. Redo routines are now passed the XLogReaderState, which
contains the record in the already-disected format, instead of the plain
XLogRecord.
The new record format also makes the fixed size XLogRecord header smaller,
by removing the xl_len field. The length of the "main data" portion is now
stored at the end of the WAL record, and there's a separate header after
XLogRecord for it. The alignment padding at the end of XLogRecord is also
removed. This compansates for the fact that the new format would otherwise
be more bulky than the old format.
Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera,
Fujii Masao.
Diffstat (limited to 'src/backend/access/transam/xloginsert.c')
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 972 |
1 files changed, 617 insertions, 355 deletions
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b83343bf5bd..89c407e521b 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -3,6 +3,12 @@ * xloginsert.c * Functions for constructing WAL records * + * Constructing a WAL record begins with a call to XLogBeginInsert, + * followed by a number of XLogRegister* calls. The registered data is + * collected in private working memory, and finally assembled into a chain + * of XLogRecData structs by a call to XLogRecordAssemble(). See + * access/transam/README for details. + * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -24,39 +30,366 @@ #include "utils/memutils.h" #include "pg_trace.h" +/* + * For each block reference registered with XLogRegisterBuffer, we fill in + * a registered_buffer struct. + */ +typedef struct +{ + bool in_use; /* is this slot in use? */ + uint8 flags; /* REGBUF_* flags */ + RelFileNode rnode; /* identifies the relation and block */ + ForkNumber forkno; + BlockNumber block; + Page page; /* page content */ + uint32 rdata_len; /* total length of data in rdata chain */ + XLogRecData *rdata_head; /* head of the chain of data registered with + * this block */ + XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if + * empty */ + + XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to + * backup block data in XLogRecordAssemble() */ +} registered_buffer; + +static registered_buffer *registered_buffers; +static int max_registered_buffers; /* allocated size */ +static int max_registered_block_id = 0; /* highest block_id + 1 + * currently registered */ + +/* + * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered + * with XLogRegisterData(...). + */ +static XLogRecData *mainrdata_head; +static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; +static uint32 mainrdata_len; /* total # of bytes in chain */ + +/* + * These are used to hold the record header while constructing a record. + * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, + * because we want it to be MAXALIGNed and padding bytes zeroed. + * + * For simplicity, it's allocated large enough to hold the headers for any + * WAL record. + */ +static XLogRecData hdr_rdt; +static char *hdr_scratch = NULL; + +#define HEADER_SCRATCH_SIZE \ + (SizeOfXLogRecord + \ + MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ + SizeOfXLogRecordDataHeaderLong) + +/* + * An array of XLogRecData structs, to hold registered data. + */ +static XLogRecData *rdatas; +static int num_rdatas; /* entries currently used */ +static int max_rdatas; /* allocated size */ + +static bool begininsert_called = false; + +/* Memory context to hold the registered buffer and data references. */ +static MemoryContext xloginsert_cxt; + static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, - XLogRecData *rdata, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal); -static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb); + XLogRecPtr *fpw_lsn); + +/* + * Begin constructing a WAL record. This must be called before the + * XLogRegister* functions and XLogInsert(). + */ +void +XLogBeginInsert(void) +{ + Assert(max_registered_block_id == 0); + Assert(mainrdata_last == (XLogRecData *) &mainrdata_head); + Assert(mainrdata_len == 0); + Assert(!begininsert_called); + + /* cross-check on whether we should be here or not */ + if (!XLogInsertAllowed()) + elog(ERROR, "cannot make new WAL entries during recovery"); + + begininsert_called = true; +} /* - * Insert an XLOG record having the specified RMID and info bytes, - * with the body of the record being the data chunk(s) described by - * the rdata chain (see xloginsert.h for notes about rdata). + * Ensure that there are enough buffer and data slots in the working area, + * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData + * calls. + * + * There is always space for a small number of buffers and data chunks, enough + * for most record types. This function is for the exceptional cases that need + * more. + */ +void +XLogEnsureRecordSpace(int max_block_id, int ndatas) +{ + int nbuffers; + + /* + * This must be called before entering a critical section, because + * allocating memory inside a critical section can fail. repalloc() will + * check the same, but better to check it here too so that we fail + * consistently even if the arrays happen to be large enough already. + */ + Assert(CritSectionCount == 0); + + /* the minimum values can't be decreased */ + if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID) + max_block_id = XLR_NORMAL_MAX_BLOCK_ID; + if (ndatas < XLR_NORMAL_RDATAS) + ndatas = XLR_NORMAL_RDATAS; + + if (max_block_id > XLR_MAX_BLOCK_ID) + elog(ERROR, "maximum number of WAL record block references exceeded"); + nbuffers = max_block_id + 1; + + if (nbuffers > max_registered_buffers) + { + registered_buffers = (registered_buffer *) + repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers); + + /* + * At least the padding bytes in the structs must be zeroed, because + * they are included in WAL data, but initialize it all for tidiness. + */ + MemSet(®istered_buffers[max_registered_buffers], 0, + (nbuffers - max_registered_buffers) * sizeof(registered_buffer)); + max_registered_buffers = nbuffers; + } + + if (ndatas > max_rdatas) + { + rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas); + max_rdatas = ndatas; + } +} + +/* + * Reset WAL record construction buffers. + */ +void +XLogResetInsertion(void) +{ + int i; + + for (i = 0; i < max_registered_block_id; i++) + registered_buffers[i].in_use = false; + + num_rdatas = 0; + max_registered_block_id = 0; + mainrdata_len = 0; + mainrdata_last = (XLogRecData *) &mainrdata_head; + begininsert_called = false; +} + +/* + * Register a reference to a buffer with the WAL record being constructed. + * This must be called for every page that the WAL-logged operation modifies. + */ +void +XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) +{ + registered_buffer *regbuf; + + /* NO_IMAGE doesn't make sense with FORCE_IMAGE */ + Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE)))); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + { + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + max_registered_block_id = block_id + 1; + } + + regbuf = ®istered_buffers[block_id]; + + BufferGetTag(buffer, ®buf->rnode, ®buf->forkno, ®buf->block); + regbuf->page = BufferGetPage(buffer); + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Like XLogRegisterBuffer, but for registering a block that's not in the + * shared buffer pool (i.e. when you don't have a Buffer for it). + */ +void +XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum, + BlockNumber blknum, Page page, uint8 flags) +{ + registered_buffer *regbuf; + + /* This is currently only used to WAL-log a full-page image of a page */ + Assert(flags & REGBUF_FORCE_IMAGE); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + max_registered_block_id = block_id + 1; + + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + + regbuf = ®istered_buffers[block_id]; + + regbuf->rnode = *rnode; + regbuf->forkno = forknum; + regbuf->block = blknum; + regbuf->page = page; + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Add data to the WAL record that's being constructed. + * + * The data is appended to the "main chunk", available at replay with + * XLogGetRecData(). + */ +void +XLogRegisterData(char *data, int len) +{ + XLogRecData *rdata; + + Assert(begininsert_called); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + /* + * we use the mainrdata_last pointer to track the end of the chain, so no + * need to clear 'next' here. + */ + + mainrdata_last->next = rdata; + mainrdata_last = rdata; + + mainrdata_len += len; +} + +/* + * Add buffer-specific data to the WAL record that's being constructed. + * + * Block_id must reference a block previously registered with + * XLogRegisterBuffer(). If this is called more than once for the same + * block_id, the data is appended. + * + * The maximum amount of data that can be registered per block is 65535 + * bytes. That should be plenty; if you need more than BLCKSZ bytes to + * reconstruct the changes to the page, you might as well just log a full + * copy of it. (the "main data" that's not associated with a block is not + * limited) + */ +void +XLogRegisterBufData(uint8 block_id, char *data, int len) +{ + registered_buffer *regbuf; + XLogRecData *rdata; + + Assert(begininsert_called); + + /* find the registered buffer struct */ + regbuf = ®istered_buffers[block_id]; + if (!regbuf->in_use) + elog(ERROR, "no block with id %d registered with WAL insertion", + block_id); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + regbuf->rdata_tail->next = rdata; + regbuf->rdata_tail = rdata; + regbuf->rdata_len += len; +} + +/* + * Insert an XLOG record having the specified RMID and info bytes, with the + * body of the record being the data and buffer references registered earlier + * with XLogRegister* calls. * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) - * - * NB: this routine feels free to scribble on the XLogRecData structs, - * though not on the data they reference. This is OK since the XLogRecData - * structs are always just temporaries in the calling code. */ XLogRecPtr -XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) +XLogInsert(RmgrId rmid, uint8 info) { - XLogRecPtr RedoRecPtr; - bool doPageWrites; XLogRecPtr EndPos; - XLogRecPtr fpw_lsn; - XLogRecData *rdt; - XLogRecData *rdt_lastnormal; - /* info's high bits are reserved for use by me */ - if (info & XLR_INFO_MASK) + /* XLogBeginInsert() must have been called. */ + if (!begininsert_called) + elog(ERROR, "XLogBeginInsert was not called"); + + /* + * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are + * reserved for use by me. + */ + if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0) elog(PANIC, "invalid xlog info mask %02X", info); TRACE_POSTGRESQL_XLOG_INSERT(rmid, info); @@ -67,292 +400,282 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { + XLogResetInsertion(); EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return EndPos; } - /* - * Get values needed to decide whether to do full-page writes. Since we - * don't yet have an insertion lock, these could change under us, but - * XLogInsertRecord will recheck them once it has a lock. - */ - GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - - /* - * Assemble an XLogRecData chain representing the WAL record, including - * any backup blocks needed. - * - * We may have to loop back to here if a race condition is detected in - * XLogInsertRecord. We could prevent the race by doing all this work - * while holding an insertion lock, but it seems better to avoid doing CRC - * calculations while holding one. - */ -retry: - rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites, - &fpw_lsn, &rdt_lastnormal); - - EndPos = XLogInsertRecord(rdt, fpw_lsn); - - if (EndPos == InvalidXLogRecPtr) + do { + XLogRecPtr RedoRecPtr; + bool doPageWrites; + XLogRecPtr fpw_lsn; + XLogRecData *rdt; + /* - * Undo the changes we made to the rdata chain, and retry. - * - * XXX: This doesn't undo *all* the changes; the XLogRecData - * entries for buffers that we had already decided to back up have - * had their data-pointers cleared. That's OK, as long as we - * decide to back them up on the next iteration as well. Hence, - * don't allow "doPageWrites" value to go from true to false after - * we've modified the rdata chain. + * Get values needed to decide whether to do full-page writes. Since + * we don't yet have an insertion lock, these could change under us, + * but XLogInsertRecData will recheck them once it has a lock. */ - bool newDoPageWrites; + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites); - doPageWrites = doPageWrites || newDoPageWrites; - rdt_lastnormal->next = NULL; + rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, + &fpw_lsn); - goto retry; - } + EndPos = XLogInsertRecord(rdt, fpw_lsn); + } while (EndPos == InvalidXLogRecPtr); + + XLogResetInsertion(); return EndPos; } /* - * Assemble a full WAL record, including backup blocks, from an XLogRecData - * chain, ready for insertion with XLogInsertRecord(). The record header - * fields are filled in, except for the xl_prev field and CRC. + * Assemble a WAL record from the registered data and buffers into an + * XLogRecData chain, ready for insertion with XLogInsertRecord(). * - * The rdata chain is modified, adding entries for full-page images. - * *rdt_lastnormal is set to point to the last normal (ie. not added by - * this function) entry. It can be used to reset the chain to its original - * state. + * The record header fields are filled in, except for the xl_prev field. The + * calculated CRC does not include xl_prev either. * - * If the rdata chain contains any buffer references, and a full-page image - * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among - * such pages. This signals that the assembled record is only good for - * insertion on the assumption that the RedoRecPtr and doPageWrites values - * were up-to-date. + * If there are any registered buffers, and a full-page image was not taken + * of all them, *page_writes_omitted is set to true. This signals that the + * assembled record is only good for insertion on the assumption that the + * RedoRecPtr and doPageWrites values were up-to-date. */ static XLogRecData * -XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata, +XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal) + XLogRecPtr *fpw_lsn) { - bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); XLogRecData *rdt; - Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; - bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; - uint32 len, - total_len; - unsigned i; + uint32 total_len = 0; + int block_id; + pg_crc32 rdata_crc; + registered_buffer *prev_regbuf = NULL; + XLogRecData *rdt_datas_last; + XLogRecord *rechdr; + char *scratch = hdr_scratch; /* - * These need to be static because they are returned to the caller as part - * of the XLogRecData chain. + * Note: this function can be called multiple times for the same record. + * All the modifications we do to the rdata chains below must handle that. */ - static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; - static XLogRecData hdr_rdt; - static XLogRecord *rechdr; - - if (rechdr == NULL) - { - static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF]; - rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf); - MemSet(rechdr, 0, SizeOfXLogRecord); - } + /* The record begins with the fixed-size header */ + rechdr = (XLogRecord *) scratch; + scratch += SizeOfXLogRecord; - /* The record begins with the header */ - hdr_rdt.data = (char *) rechdr; - hdr_rdt.len = SizeOfXLogRecord; - hdr_rdt.next = rdata; - total_len = SizeOfXLogRecord; + hdr_rdt.next = NULL; + rdt_datas_last = &hdr_rdt; + hdr_rdt.data = hdr_scratch; /* - * Here we scan the rdata chain, to determine which buffers must be backed - * up. - * - * We add entries for backup blocks to the chain, so that they don't need - * any special treatment in the critical section where the chunks are - * copied into the WAL buffers. Those entries have to be unlinked from the - * chain if we have to loop back here. + * Make an rdata chain containing all the data portions of all block + * references. This includes the data for full-page images. Also append + * the headers for the block references in the scratch buffer. */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - dtbuf[i] = InvalidBuffer; - dtbuf_bkp[i] = false; - } - *fpw_lsn = InvalidXLogRecPtr; - len = 0; - for (rdt = rdata;;) + for (block_id = 0; block_id < max_registered_block_id; block_id++) { - if (rdt->buffer == InvalidBuffer) + registered_buffer *regbuf = ®istered_buffers[block_id]; + bool needs_backup; + bool needs_data; + XLogRecordBlockHeader bkpb; + XLogRecordBlockImageHeader bimg; + bool samerel; + + if (!regbuf->in_use) + continue; + + /* Determine if this block needs to be backed up */ + if (regbuf->flags & REGBUF_FORCE_IMAGE) + needs_backup = true; + else if (regbuf->flags & REGBUF_NO_IMAGE) + needs_backup = false; + else if (!doPageWrites) + needs_backup = false; + else { - /* Simple data, just include it */ - len += rdt->len; + /* + * We assume page LSN is first data on *every* page that can be + * passed to XLogInsert, whether it has the standard page layout + * or not. + */ + XLogRecPtr page_lsn = PageGetLSN(regbuf->page); + + needs_backup = (page_lsn <= RedoRecPtr); + if (!needs_backup) + { + if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) + *fpw_lsn = page_lsn; + } } + + /* Determine if the buffer data needs to included */ + if (regbuf->rdata_len == 0) + needs_data = false; + else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) + needs_data = true; else + needs_data = !needs_backup; + + bkpb.id = block_id; + bkpb.fork_flags = regbuf->forkno; + bkpb.data_length = 0; + + if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT) + bkpb.fork_flags |= BKPBLOCK_WILL_INIT; + + if (needs_backup) { - /* Find info for buffer */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + Page page = regbuf->page; + + /* + * The page needs to be backed up, so set up *bimg + */ + if (regbuf->flags & REGBUF_STANDARD) { - if (rdt->buffer == dtbuf[i]) + /* Assume we can omit data between pd_lower and pd_upper */ + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower >= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) { - /* Buffer already referenced by earlier chain item */ - if (dtbuf_bkp[i]) - { - rdt->data = NULL; - rdt->len = 0; - } - else if (rdt->data) - len += rdt->len; - break; + bimg.hole_offset = lower; + bimg.hole_length = upper - lower; } - if (dtbuf[i] == InvalidBuffer) + else { - /* OK, put it in this slot */ - XLogRecPtr page_lsn; - bool needs_backup; - - dtbuf[i] = rdt->buffer; - - /* - * Determine whether the buffer has to be backed up. - * - * We assume page LSN is first data on *every* page that - * can be passed to XLogInsert, whether it has the - * standard page layout or not. We don't need to take the - * buffer header lock for PageGetLSN because we hold an - * exclusive lock on the page and/or the relation. - */ - page_lsn = PageGetLSN(BufferGetPage(rdt->buffer)); - if (!doPageWrites) - needs_backup = false; - else if (page_lsn <= RedoRecPtr) - needs_backup = true; - else - needs_backup = false; - - if (needs_backup) - { - /* - * The page needs to be backed up, so set up BkpBlock - */ - XLogFillBkpBlock(rdt->buffer, rdt->buffer_std, - &(dtbuf_xlg[i])); - dtbuf_bkp[i] = true; - rdt->data = NULL; - rdt->len = 0; - } - else - { - if (rdt->data) - len += rdt->len; - if (*fpw_lsn == InvalidXLogRecPtr || - page_lsn < *fpw_lsn) - { - *fpw_lsn = page_lsn; - } - } - break; + /* No "hole" to compress out */ + bimg.hole_offset = 0; + bimg.hole_length = 0; } } - if (i >= XLR_MAX_BKP_BLOCKS) - elog(PANIC, "can backup at most %d blocks per xlog record", - XLR_MAX_BKP_BLOCKS); - } - /* Break out of loop when rdt points to last chain item */ - if (rdt->next == NULL) - break; - rdt = rdt->next; - } - total_len += len; + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bimg.hole_offset = 0; + bimg.hole_length = 0; + } - /* - * Make additional rdata chain entries for the backup blocks, so that we - * don't need to special-case them in the write loop. This modifies the - * original rdata chain, but we keep a pointer to the last regular entry, - * rdt_lastnormal, so that we can undo this if we have to start over. - * - * At the exit of this loop, total_len includes the backup block data. - * - * Also set the appropriate info bits to show which buffers were backed - * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer - * value (ignoring InvalidBuffer) appearing in the rdata chain. - */ - *rdt_lastnormal = rdt; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - BkpBlock *bkpb; - char *page; + /* Fill in the remaining fields in the XLogRecordBlockData struct */ + bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE; - if (!dtbuf_bkp[i]) - continue; + total_len += BLCKSZ - bimg.hole_length; + + /* + * Construct XLogRecData entries for the page content. + */ + rdt_datas_last->next = ®buf->bkp_rdatas[0]; + rdt_datas_last = rdt_datas_last->next; + if (bimg.hole_length == 0) + { + rdt_datas_last->data = page; + rdt_datas_last->len = BLCKSZ; + } + else + { + /* must skip the hole */ + rdt_datas_last->data = page; + rdt_datas_last->len = bimg.hole_offset; - info |= XLR_BKP_BLOCK(i); + rdt_datas_last->next = ®buf->bkp_rdatas[1]; + rdt_datas_last = rdt_datas_last->next; - bkpb = &(dtbuf_xlg[i]); - page = (char *) BufferGetBlock(dtbuf[i]); + rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length); + rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length); + } + } - rdt->next = &(dtbuf_rdt1[i]); - rdt = rdt->next; + if (needs_data) + { + /* + * Link the caller-supplied rdata chain for this buffer to the + * overall list. + */ + bkpb.fork_flags |= BKPBLOCK_HAS_DATA; + bkpb.data_length = regbuf->rdata_len; + total_len += regbuf->rdata_len; + + rdt_datas_last->next = regbuf->rdata_head; + rdt_datas_last = regbuf->rdata_tail; + } - rdt->data = (char *) bkpb; - rdt->len = sizeof(BkpBlock); - total_len += sizeof(BkpBlock); + if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode)) + { + samerel = true; + bkpb.fork_flags |= BKPBLOCK_SAME_REL; + prev_regbuf = regbuf; + } + else + samerel = false; - rdt->next = &(dtbuf_rdt2[i]); - rdt = rdt->next; + /* Ok, copy the header to the scratch buffer */ + memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader); + scratch += SizeOfXLogRecordBlockHeader; + if (needs_backup) + { + memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); + scratch += SizeOfXLogRecordBlockImageHeader; + } + if (!samerel) + { + memcpy(scratch, ®buf->rnode, sizeof(RelFileNode)); + scratch += sizeof(RelFileNode); + } + memcpy(scratch, ®buf->block, sizeof(BlockNumber)); + scratch += sizeof(BlockNumber); + } - if (bkpb->hole_length == 0) + /* followed by main data, if any */ + if (mainrdata_len > 0) + { + if (mainrdata_len > 255) { - rdt->data = page; - rdt->len = BLCKSZ; - total_len += BLCKSZ; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_LONG; + memcpy(scratch, &mainrdata_len, sizeof(uint32)); + scratch += sizeof(uint32); } else { - /* must skip the hole */ - rdt->data = page; - rdt->len = bkpb->hole_offset; - total_len += bkpb->hole_offset; - - rdt->next = &(dtbuf_rdt3[i]); - rdt = rdt->next; - - rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); - rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); - total_len += rdt->len; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_SHORT; + *(scratch++) = (uint8) mainrdata_len; } + rdt_datas_last->next = mainrdata_head; + rdt_datas_last = mainrdata_last; + total_len += mainrdata_len; } + rdt_datas_last->next = NULL; + + hdr_rdt.len = (scratch - hdr_scratch); + total_len += hdr_rdt.len; /* - * We disallow len == 0 because it provides a useful bit of extra error - * checking in ReadRecord. This means that all callers of XLogInsert - * must supply at least some not-in-a-buffer data. However, we make an - * exception for XLOG SWITCH records because we don't want them to ever - * cross a segment boundary. + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. */ - if (len == 0 && !isLogSwitch) - elog(PANIC, "invalid xlog record length %u", rechdr->xl_len); + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* * Fill in the fields in the record header. Prev-link is filled in later, - * once we know where in the WAL the record will be inserted. CRC is also - * not calculated yet. + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = total_len; - rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_crc = rdata_crc; return &hdr_rdt; } @@ -429,45 +752,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std) if (lsn <= RedoRecPtr) { - XLogRecData rdata[2]; - BkpBlock bkpb; + int flags; char copied_buffer[BLCKSZ]; char *origdata = (char *) BufferGetBlock(buffer); - - /* Make a BkpBlock struct representing the buffer */ - XLogFillBkpBlock(buffer, buffer_std, &bkpb); + RelFileNode rnode; + ForkNumber forkno; + BlockNumber blkno; /* * Copy buffer so we don't have to worry about concurrent hint bit or * lsn updates. We assume pd_lower/upper cannot be changed without an * exclusive lock, so the contents bkp are not racy. - * - * With buffer_std set to false, XLogFillBkpBlock() sets hole_length - * and hole_offset to 0; so the following code is safe for either - * case. */ - memcpy(copied_buffer, origdata, bkpb.hole_offset); - memcpy(copied_buffer + bkpb.hole_offset, - origdata + bkpb.hole_offset + bkpb.hole_length, - BLCKSZ - bkpb.hole_offset - bkpb.hole_length); + if (buffer_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + Page page = BufferGetPage(buffer); + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; - /* - * Header for backup block. - */ - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + memcpy(copied_buffer, origdata, lower); + memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper); + } + else + memcpy(copied_buffer, origdata, BLCKSZ); - /* - * Save copy of the buffer. - */ - rdata[1].data = copied_buffer; - rdata[1].len = BLCKSZ - bkpb.hole_length; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; + XLogBeginInsert(); - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + flags = REGBUF_FORCE_IMAGE; + if (buffer_std) + flags |= REGBUF_STANDARD; + + BufferGetTag(buffer, &rnode, &forkno, &blkno); + XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); } return recptr; @@ -489,71 +808,16 @@ XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std) { - BkpBlock bkpb; + int flags; XLogRecPtr recptr; - XLogRecData rdata[3]; - - /* NO ELOG(ERROR) from here till newpage op is logged */ - START_CRIT_SECTION(); - - bkpb.node = *rnode; - bkpb.fork = forkNum; - bkpb.block = blkno; + flags = REGBUF_FORCE_IMAGE; if (page_std) - { - /* Assume we can omit data between pd_lower and pd_upper */ - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb.hole_offset = lower; - bkpb.hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - } - else - { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - if (bkpb.hole_length == 0) - { - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - } - else - { - /* must skip the hole */ - rdata[1].data = (char *) page; - rdata[1].len = bkpb.hole_offset; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &rdata[2]; - - rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length); - rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length); - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - } + flags |= REGBUF_STANDARD; - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + XLogBeginInsert(); + XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags); + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); /* * The page may be uninitialized. If so, we can't set the LSN because that @@ -564,8 +828,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, PageSetLSN(page, recptr); } - END_CRIT_SECTION(); - return recptr; } @@ -596,38 +858,38 @@ log_newpage_buffer(Buffer buffer, bool page_std) } /* - * Fill a BkpBlock for a buffer. + * Allocate working buffers needed for WAL record construction. */ -static void -XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb) +void +InitXLogInsert(void) { - BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block); + /* Initialize the working areas */ + if (xloginsert_cxt == NULL) + { + xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, + "WAL record construction", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } - if (buffer_std) + if (registered_buffers == NULL) { - /* Assume we can omit data between pd_lower and pd_upper */ - Page page = BufferGetPage(buffer); - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb->hole_offset = lower; - bkpb->hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; - } + registered_buffers = (registered_buffer *) + MemoryContextAllocZero(xloginsert_cxt, + sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); + max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; } - else + if (rdatas == NULL) { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; + rdatas = MemoryContextAlloc(xloginsert_cxt, + sizeof(XLogRecData) * XLR_NORMAL_RDATAS); + max_rdatas = XLR_NORMAL_RDATAS; } + + /* + * Allocate a buffer to hold the header information for a WAL record. + */ + if (hdr_scratch == NULL) + hdr_scratch = palloc0(HEADER_SCRATCH_SIZE); } |