diff options
Diffstat (limited to 'src/backend/access/transam/xloginsert.c')
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 972 |
1 files changed, 617 insertions, 355 deletions
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index b83343bf5bd..89c407e521b 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -3,6 +3,12 @@ * xloginsert.c * Functions for constructing WAL records * + * Constructing a WAL record begins with a call to XLogBeginInsert, + * followed by a number of XLogRegister* calls. The registered data is + * collected in private working memory, and finally assembled into a chain + * of XLogRecData structs by a call to XLogRecordAssemble(). See + * access/transam/README for details. + * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -24,39 +30,366 @@ #include "utils/memutils.h" #include "pg_trace.h" +/* + * For each block reference registered with XLogRegisterBuffer, we fill in + * a registered_buffer struct. + */ +typedef struct +{ + bool in_use; /* is this slot in use? */ + uint8 flags; /* REGBUF_* flags */ + RelFileNode rnode; /* identifies the relation and block */ + ForkNumber forkno; + BlockNumber block; + Page page; /* page content */ + uint32 rdata_len; /* total length of data in rdata chain */ + XLogRecData *rdata_head; /* head of the chain of data registered with + * this block */ + XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if + * empty */ + + XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to + * backup block data in XLogRecordAssemble() */ +} registered_buffer; + +static registered_buffer *registered_buffers; +static int max_registered_buffers; /* allocated size */ +static int max_registered_block_id = 0; /* highest block_id + 1 + * currently registered */ + +/* + * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered + * with XLogRegisterData(...). + */ +static XLogRecData *mainrdata_head; +static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; +static uint32 mainrdata_len; /* total # of bytes in chain */ + +/* + * These are used to hold the record header while constructing a record. + * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, + * because we want it to be MAXALIGNed and padding bytes zeroed. + * + * For simplicity, it's allocated large enough to hold the headers for any + * WAL record. + */ +static XLogRecData hdr_rdt; +static char *hdr_scratch = NULL; + +#define HEADER_SCRATCH_SIZE \ + (SizeOfXLogRecord + \ + MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ + SizeOfXLogRecordDataHeaderLong) + +/* + * An array of XLogRecData structs, to hold registered data. + */ +static XLogRecData *rdatas; +static int num_rdatas; /* entries currently used */ +static int max_rdatas; /* allocated size */ + +static bool begininsert_called = false; + +/* Memory context to hold the registered buffer and data references. */ +static MemoryContext xloginsert_cxt; + static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, - XLogRecData *rdata, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal); -static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb); + XLogRecPtr *fpw_lsn); + +/* + * Begin constructing a WAL record. This must be called before the + * XLogRegister* functions and XLogInsert(). + */ +void +XLogBeginInsert(void) +{ + Assert(max_registered_block_id == 0); + Assert(mainrdata_last == (XLogRecData *) &mainrdata_head); + Assert(mainrdata_len == 0); + Assert(!begininsert_called); + + /* cross-check on whether we should be here or not */ + if (!XLogInsertAllowed()) + elog(ERROR, "cannot make new WAL entries during recovery"); + + begininsert_called = true; +} /* - * Insert an XLOG record having the specified RMID and info bytes, - * with the body of the record being the data chunk(s) described by - * the rdata chain (see xloginsert.h for notes about rdata). + * Ensure that there are enough buffer and data slots in the working area, + * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData + * calls. + * + * There is always space for a small number of buffers and data chunks, enough + * for most record types. This function is for the exceptional cases that need + * more. + */ +void +XLogEnsureRecordSpace(int max_block_id, int ndatas) +{ + int nbuffers; + + /* + * This must be called before entering a critical section, because + * allocating memory inside a critical section can fail. repalloc() will + * check the same, but better to check it here too so that we fail + * consistently even if the arrays happen to be large enough already. + */ + Assert(CritSectionCount == 0); + + /* the minimum values can't be decreased */ + if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID) + max_block_id = XLR_NORMAL_MAX_BLOCK_ID; + if (ndatas < XLR_NORMAL_RDATAS) + ndatas = XLR_NORMAL_RDATAS; + + if (max_block_id > XLR_MAX_BLOCK_ID) + elog(ERROR, "maximum number of WAL record block references exceeded"); + nbuffers = max_block_id + 1; + + if (nbuffers > max_registered_buffers) + { + registered_buffers = (registered_buffer *) + repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers); + + /* + * At least the padding bytes in the structs must be zeroed, because + * they are included in WAL data, but initialize it all for tidiness. + */ + MemSet(®istered_buffers[max_registered_buffers], 0, + (nbuffers - max_registered_buffers) * sizeof(registered_buffer)); + max_registered_buffers = nbuffers; + } + + if (ndatas > max_rdatas) + { + rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas); + max_rdatas = ndatas; + } +} + +/* + * Reset WAL record construction buffers. + */ +void +XLogResetInsertion(void) +{ + int i; + + for (i = 0; i < max_registered_block_id; i++) + registered_buffers[i].in_use = false; + + num_rdatas = 0; + max_registered_block_id = 0; + mainrdata_len = 0; + mainrdata_last = (XLogRecData *) &mainrdata_head; + begininsert_called = false; +} + +/* + * Register a reference to a buffer with the WAL record being constructed. + * This must be called for every page that the WAL-logged operation modifies. + */ +void +XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) +{ + registered_buffer *regbuf; + + /* NO_IMAGE doesn't make sense with FORCE_IMAGE */ + Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE)))); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + { + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + max_registered_block_id = block_id + 1; + } + + regbuf = ®istered_buffers[block_id]; + + BufferGetTag(buffer, ®buf->rnode, ®buf->forkno, ®buf->block); + regbuf->page = BufferGetPage(buffer); + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Like XLogRegisterBuffer, but for registering a block that's not in the + * shared buffer pool (i.e. when you don't have a Buffer for it). + */ +void +XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum, + BlockNumber blknum, Page page, uint8 flags) +{ + registered_buffer *regbuf; + + /* This is currently only used to WAL-log a full-page image of a page */ + Assert(flags & REGBUF_FORCE_IMAGE); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + max_registered_block_id = block_id + 1; + + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + + regbuf = ®istered_buffers[block_id]; + + regbuf->rnode = *rnode; + regbuf->forkno = forknum; + regbuf->block = blknum; + regbuf->page = page; + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Add data to the WAL record that's being constructed. + * + * The data is appended to the "main chunk", available at replay with + * XLogGetRecData(). + */ +void +XLogRegisterData(char *data, int len) +{ + XLogRecData *rdata; + + Assert(begininsert_called); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + /* + * we use the mainrdata_last pointer to track the end of the chain, so no + * need to clear 'next' here. + */ + + mainrdata_last->next = rdata; + mainrdata_last = rdata; + + mainrdata_len += len; +} + +/* + * Add buffer-specific data to the WAL record that's being constructed. + * + * Block_id must reference a block previously registered with + * XLogRegisterBuffer(). If this is called more than once for the same + * block_id, the data is appended. + * + * The maximum amount of data that can be registered per block is 65535 + * bytes. That should be plenty; if you need more than BLCKSZ bytes to + * reconstruct the changes to the page, you might as well just log a full + * copy of it. (the "main data" that's not associated with a block is not + * limited) + */ +void +XLogRegisterBufData(uint8 block_id, char *data, int len) +{ + registered_buffer *regbuf; + XLogRecData *rdata; + + Assert(begininsert_called); + + /* find the registered buffer struct */ + regbuf = ®istered_buffers[block_id]; + if (!regbuf->in_use) + elog(ERROR, "no block with id %d registered with WAL insertion", + block_id); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + regbuf->rdata_tail->next = rdata; + regbuf->rdata_tail = rdata; + regbuf->rdata_len += len; +} + +/* + * Insert an XLOG record having the specified RMID and info bytes, with the + * body of the record being the data and buffer references registered earlier + * with XLogRegister* calls. * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. * (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) - * - * NB: this routine feels free to scribble on the XLogRecData structs, - * though not on the data they reference. This is OK since the XLogRecData - * structs are always just temporaries in the calling code. */ XLogRecPtr -XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) +XLogInsert(RmgrId rmid, uint8 info) { - XLogRecPtr RedoRecPtr; - bool doPageWrites; XLogRecPtr EndPos; - XLogRecPtr fpw_lsn; - XLogRecData *rdt; - XLogRecData *rdt_lastnormal; - /* info's high bits are reserved for use by me */ - if (info & XLR_INFO_MASK) + /* XLogBeginInsert() must have been called. */ + if (!begininsert_called) + elog(ERROR, "XLogBeginInsert was not called"); + + /* + * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are + * reserved for use by me. + */ + if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0) elog(PANIC, "invalid xlog info mask %02X", info); TRACE_POSTGRESQL_XLOG_INSERT(rmid, info); @@ -67,292 +400,282 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { + XLogResetInsertion(); EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return EndPos; } - /* - * Get values needed to decide whether to do full-page writes. Since we - * don't yet have an insertion lock, these could change under us, but - * XLogInsertRecord will recheck them once it has a lock. - */ - GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - - /* - * Assemble an XLogRecData chain representing the WAL record, including - * any backup blocks needed. - * - * We may have to loop back to here if a race condition is detected in - * XLogInsertRecord. We could prevent the race by doing all this work - * while holding an insertion lock, but it seems better to avoid doing CRC - * calculations while holding one. - */ -retry: - rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites, - &fpw_lsn, &rdt_lastnormal); - - EndPos = XLogInsertRecord(rdt, fpw_lsn); - - if (EndPos == InvalidXLogRecPtr) + do { + XLogRecPtr RedoRecPtr; + bool doPageWrites; + XLogRecPtr fpw_lsn; + XLogRecData *rdt; + /* - * Undo the changes we made to the rdata chain, and retry. - * - * XXX: This doesn't undo *all* the changes; the XLogRecData - * entries for buffers that we had already decided to back up have - * had their data-pointers cleared. That's OK, as long as we - * decide to back them up on the next iteration as well. Hence, - * don't allow "doPageWrites" value to go from true to false after - * we've modified the rdata chain. + * Get values needed to decide whether to do full-page writes. Since + * we don't yet have an insertion lock, these could change under us, + * but XLogInsertRecData will recheck them once it has a lock. */ - bool newDoPageWrites; + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites); - doPageWrites = doPageWrites || newDoPageWrites; - rdt_lastnormal->next = NULL; + rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, + &fpw_lsn); - goto retry; - } + EndPos = XLogInsertRecord(rdt, fpw_lsn); + } while (EndPos == InvalidXLogRecPtr); + + XLogResetInsertion(); return EndPos; } /* - * Assemble a full WAL record, including backup blocks, from an XLogRecData - * chain, ready for insertion with XLogInsertRecord(). The record header - * fields are filled in, except for the xl_prev field and CRC. + * Assemble a WAL record from the registered data and buffers into an + * XLogRecData chain, ready for insertion with XLogInsertRecord(). * - * The rdata chain is modified, adding entries for full-page images. - * *rdt_lastnormal is set to point to the last normal (ie. not added by - * this function) entry. It can be used to reset the chain to its original - * state. + * The record header fields are filled in, except for the xl_prev field. The + * calculated CRC does not include xl_prev either. * - * If the rdata chain contains any buffer references, and a full-page image - * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among - * such pages. This signals that the assembled record is only good for - * insertion on the assumption that the RedoRecPtr and doPageWrites values - * were up-to-date. + * If there are any registered buffers, and a full-page image was not taken + * of all them, *page_writes_omitted is set to true. This signals that the + * assembled record is only good for insertion on the assumption that the + * RedoRecPtr and doPageWrites values were up-to-date. */ static XLogRecData * -XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata, +XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal) + XLogRecPtr *fpw_lsn) { - bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); XLogRecData *rdt; - Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; - bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; - uint32 len, - total_len; - unsigned i; + uint32 total_len = 0; + int block_id; + pg_crc32 rdata_crc; + registered_buffer *prev_regbuf = NULL; + XLogRecData *rdt_datas_last; + XLogRecord *rechdr; + char *scratch = hdr_scratch; /* - * These need to be static because they are returned to the caller as part - * of the XLogRecData chain. + * Note: this function can be called multiple times for the same record. + * All the modifications we do to the rdata chains below must handle that. */ - static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; - static XLogRecData hdr_rdt; - static XLogRecord *rechdr; - - if (rechdr == NULL) - { - static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF]; - rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf); - MemSet(rechdr, 0, SizeOfXLogRecord); - } + /* The record begins with the fixed-size header */ + rechdr = (XLogRecord *) scratch; + scratch += SizeOfXLogRecord; - /* The record begins with the header */ - hdr_rdt.data = (char *) rechdr; - hdr_rdt.len = SizeOfXLogRecord; - hdr_rdt.next = rdata; - total_len = SizeOfXLogRecord; + hdr_rdt.next = NULL; + rdt_datas_last = &hdr_rdt; + hdr_rdt.data = hdr_scratch; /* - * Here we scan the rdata chain, to determine which buffers must be backed - * up. - * - * We add entries for backup blocks to the chain, so that they don't need - * any special treatment in the critical section where the chunks are - * copied into the WAL buffers. Those entries have to be unlinked from the - * chain if we have to loop back here. + * Make an rdata chain containing all the data portions of all block + * references. This includes the data for full-page images. Also append + * the headers for the block references in the scratch buffer. */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - dtbuf[i] = InvalidBuffer; - dtbuf_bkp[i] = false; - } - *fpw_lsn = InvalidXLogRecPtr; - len = 0; - for (rdt = rdata;;) + for (block_id = 0; block_id < max_registered_block_id; block_id++) { - if (rdt->buffer == InvalidBuffer) + registered_buffer *regbuf = ®istered_buffers[block_id]; + bool needs_backup; + bool needs_data; + XLogRecordBlockHeader bkpb; + XLogRecordBlockImageHeader bimg; + bool samerel; + + if (!regbuf->in_use) + continue; + + /* Determine if this block needs to be backed up */ + if (regbuf->flags & REGBUF_FORCE_IMAGE) + needs_backup = true; + else if (regbuf->flags & REGBUF_NO_IMAGE) + needs_backup = false; + else if (!doPageWrites) + needs_backup = false; + else { - /* Simple data, just include it */ - len += rdt->len; + /* + * We assume page LSN is first data on *every* page that can be + * passed to XLogInsert, whether it has the standard page layout + * or not. + */ + XLogRecPtr page_lsn = PageGetLSN(regbuf->page); + + needs_backup = (page_lsn <= RedoRecPtr); + if (!needs_backup) + { + if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) + *fpw_lsn = page_lsn; + } } + + /* Determine if the buffer data needs to included */ + if (regbuf->rdata_len == 0) + needs_data = false; + else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) + needs_data = true; else + needs_data = !needs_backup; + + bkpb.id = block_id; + bkpb.fork_flags = regbuf->forkno; + bkpb.data_length = 0; + + if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT) + bkpb.fork_flags |= BKPBLOCK_WILL_INIT; + + if (needs_backup) { - /* Find info for buffer */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + Page page = regbuf->page; + + /* + * The page needs to be backed up, so set up *bimg + */ + if (regbuf->flags & REGBUF_STANDARD) { - if (rdt->buffer == dtbuf[i]) + /* Assume we can omit data between pd_lower and pd_upper */ + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower >= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) { - /* Buffer already referenced by earlier chain item */ - if (dtbuf_bkp[i]) - { - rdt->data = NULL; - rdt->len = 0; - } - else if (rdt->data) - len += rdt->len; - break; + bimg.hole_offset = lower; + bimg.hole_length = upper - lower; } - if (dtbuf[i] == InvalidBuffer) + else { - /* OK, put it in this slot */ - XLogRecPtr page_lsn; - bool needs_backup; - - dtbuf[i] = rdt->buffer; - - /* - * Determine whether the buffer has to be backed up. - * - * We assume page LSN is first data on *every* page that - * can be passed to XLogInsert, whether it has the - * standard page layout or not. We don't need to take the - * buffer header lock for PageGetLSN because we hold an - * exclusive lock on the page and/or the relation. - */ - page_lsn = PageGetLSN(BufferGetPage(rdt->buffer)); - if (!doPageWrites) - needs_backup = false; - else if (page_lsn <= RedoRecPtr) - needs_backup = true; - else - needs_backup = false; - - if (needs_backup) - { - /* - * The page needs to be backed up, so set up BkpBlock - */ - XLogFillBkpBlock(rdt->buffer, rdt->buffer_std, - &(dtbuf_xlg[i])); - dtbuf_bkp[i] = true; - rdt->data = NULL; - rdt->len = 0; - } - else - { - if (rdt->data) - len += rdt->len; - if (*fpw_lsn == InvalidXLogRecPtr || - page_lsn < *fpw_lsn) - { - *fpw_lsn = page_lsn; - } - } - break; + /* No "hole" to compress out */ + bimg.hole_offset = 0; + bimg.hole_length = 0; } } - if (i >= XLR_MAX_BKP_BLOCKS) - elog(PANIC, "can backup at most %d blocks per xlog record", - XLR_MAX_BKP_BLOCKS); - } - /* Break out of loop when rdt points to last chain item */ - if (rdt->next == NULL) - break; - rdt = rdt->next; - } - total_len += len; + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bimg.hole_offset = 0; + bimg.hole_length = 0; + } - /* - * Make additional rdata chain entries for the backup blocks, so that we - * don't need to special-case them in the write loop. This modifies the - * original rdata chain, but we keep a pointer to the last regular entry, - * rdt_lastnormal, so that we can undo this if we have to start over. - * - * At the exit of this loop, total_len includes the backup block data. - * - * Also set the appropriate info bits to show which buffers were backed - * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer - * value (ignoring InvalidBuffer) appearing in the rdata chain. - */ - *rdt_lastnormal = rdt; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - BkpBlock *bkpb; - char *page; + /* Fill in the remaining fields in the XLogRecordBlockData struct */ + bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE; - if (!dtbuf_bkp[i]) - continue; + total_len += BLCKSZ - bimg.hole_length; + + /* + * Construct XLogRecData entries for the page content. + */ + rdt_datas_last->next = ®buf->bkp_rdatas[0]; + rdt_datas_last = rdt_datas_last->next; + if (bimg.hole_length == 0) + { + rdt_datas_last->data = page; + rdt_datas_last->len = BLCKSZ; + } + else + { + /* must skip the hole */ + rdt_datas_last->data = page; + rdt_datas_last->len = bimg.hole_offset; - info |= XLR_BKP_BLOCK(i); + rdt_datas_last->next = ®buf->bkp_rdatas[1]; + rdt_datas_last = rdt_datas_last->next; - bkpb = &(dtbuf_xlg[i]); - page = (char *) BufferGetBlock(dtbuf[i]); + rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length); + rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length); + } + } - rdt->next = &(dtbuf_rdt1[i]); - rdt = rdt->next; + if (needs_data) + { + /* + * Link the caller-supplied rdata chain for this buffer to the + * overall list. + */ + bkpb.fork_flags |= BKPBLOCK_HAS_DATA; + bkpb.data_length = regbuf->rdata_len; + total_len += regbuf->rdata_len; + + rdt_datas_last->next = regbuf->rdata_head; + rdt_datas_last = regbuf->rdata_tail; + } - rdt->data = (char *) bkpb; - rdt->len = sizeof(BkpBlock); - total_len += sizeof(BkpBlock); + if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode)) + { + samerel = true; + bkpb.fork_flags |= BKPBLOCK_SAME_REL; + prev_regbuf = regbuf; + } + else + samerel = false; - rdt->next = &(dtbuf_rdt2[i]); - rdt = rdt->next; + /* Ok, copy the header to the scratch buffer */ + memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader); + scratch += SizeOfXLogRecordBlockHeader; + if (needs_backup) + { + memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); + scratch += SizeOfXLogRecordBlockImageHeader; + } + if (!samerel) + { + memcpy(scratch, ®buf->rnode, sizeof(RelFileNode)); + scratch += sizeof(RelFileNode); + } + memcpy(scratch, ®buf->block, sizeof(BlockNumber)); + scratch += sizeof(BlockNumber); + } - if (bkpb->hole_length == 0) + /* followed by main data, if any */ + if (mainrdata_len > 0) + { + if (mainrdata_len > 255) { - rdt->data = page; - rdt->len = BLCKSZ; - total_len += BLCKSZ; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_LONG; + memcpy(scratch, &mainrdata_len, sizeof(uint32)); + scratch += sizeof(uint32); } else { - /* must skip the hole */ - rdt->data = page; - rdt->len = bkpb->hole_offset; - total_len += bkpb->hole_offset; - - rdt->next = &(dtbuf_rdt3[i]); - rdt = rdt->next; - - rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); - rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); - total_len += rdt->len; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_SHORT; + *(scratch++) = (uint8) mainrdata_len; } + rdt_datas_last->next = mainrdata_head; + rdt_datas_last = mainrdata_last; + total_len += mainrdata_len; } + rdt_datas_last->next = NULL; + + hdr_rdt.len = (scratch - hdr_scratch); + total_len += hdr_rdt.len; /* - * We disallow len == 0 because it provides a useful bit of extra error - * checking in ReadRecord. This means that all callers of XLogInsert - * must supply at least some not-in-a-buffer data. However, we make an - * exception for XLOG SWITCH records because we don't want them to ever - * cross a segment boundary. + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. */ - if (len == 0 && !isLogSwitch) - elog(PANIC, "invalid xlog record length %u", rechdr->xl_len); + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* * Fill in the fields in the record header. Prev-link is filled in later, - * once we know where in the WAL the record will be inserted. CRC is also - * not calculated yet. + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = total_len; - rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_crc = rdata_crc; return &hdr_rdt; } @@ -429,45 +752,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std) if (lsn <= RedoRecPtr) { - XLogRecData rdata[2]; - BkpBlock bkpb; + int flags; char copied_buffer[BLCKSZ]; char *origdata = (char *) BufferGetBlock(buffer); - - /* Make a BkpBlock struct representing the buffer */ - XLogFillBkpBlock(buffer, buffer_std, &bkpb); + RelFileNode rnode; + ForkNumber forkno; + BlockNumber blkno; /* * Copy buffer so we don't have to worry about concurrent hint bit or * lsn updates. We assume pd_lower/upper cannot be changed without an * exclusive lock, so the contents bkp are not racy. - * - * With buffer_std set to false, XLogFillBkpBlock() sets hole_length - * and hole_offset to 0; so the following code is safe for either - * case. */ - memcpy(copied_buffer, origdata, bkpb.hole_offset); - memcpy(copied_buffer + bkpb.hole_offset, - origdata + bkpb.hole_offset + bkpb.hole_length, - BLCKSZ - bkpb.hole_offset - bkpb.hole_length); + if (buffer_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + Page page = BufferGetPage(buffer); + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; - /* - * Header for backup block. - */ - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + memcpy(copied_buffer, origdata, lower); + memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper); + } + else + memcpy(copied_buffer, origdata, BLCKSZ); - /* - * Save copy of the buffer. - */ - rdata[1].data = copied_buffer; - rdata[1].len = BLCKSZ - bkpb.hole_length; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; + XLogBeginInsert(); - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + flags = REGBUF_FORCE_IMAGE; + if (buffer_std) + flags |= REGBUF_STANDARD; + + BufferGetTag(buffer, &rnode, &forkno, &blkno); + XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); } return recptr; @@ -489,71 +808,16 @@ XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std) { - BkpBlock bkpb; + int flags; XLogRecPtr recptr; - XLogRecData rdata[3]; - - /* NO ELOG(ERROR) from here till newpage op is logged */ - START_CRIT_SECTION(); - - bkpb.node = *rnode; - bkpb.fork = forkNum; - bkpb.block = blkno; + flags = REGBUF_FORCE_IMAGE; if (page_std) - { - /* Assume we can omit data between pd_lower and pd_upper */ - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb.hole_offset = lower; - bkpb.hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - } - else - { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - if (bkpb.hole_length == 0) - { - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - } - else - { - /* must skip the hole */ - rdata[1].data = (char *) page; - rdata[1].len = bkpb.hole_offset; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &rdata[2]; - - rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length); - rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length); - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - } + flags |= REGBUF_STANDARD; - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + XLogBeginInsert(); + XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags); + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); /* * The page may be uninitialized. If so, we can't set the LSN because that @@ -564,8 +828,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, PageSetLSN(page, recptr); } - END_CRIT_SECTION(); - return recptr; } @@ -596,38 +858,38 @@ log_newpage_buffer(Buffer buffer, bool page_std) } /* - * Fill a BkpBlock for a buffer. + * Allocate working buffers needed for WAL record construction. */ -static void -XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb) +void +InitXLogInsert(void) { - BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block); + /* Initialize the working areas */ + if (xloginsert_cxt == NULL) + { + xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, + "WAL record construction", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } - if (buffer_std) + if (registered_buffers == NULL) { - /* Assume we can omit data between pd_lower and pd_upper */ - Page page = BufferGetPage(buffer); - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb->hole_offset = lower; - bkpb->hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; - } + registered_buffers = (registered_buffer *) + MemoryContextAllocZero(xloginsert_cxt, + sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); + max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; } - else + if (rdatas == NULL) { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; + rdatas = MemoryContextAlloc(xloginsert_cxt, + sizeof(XLogRecData) * XLR_NORMAL_RDATAS); + max_rdatas = XLR_NORMAL_RDATAS; } + + /* + * Allocate a buffer to hold the header information for a WAL record. + */ + if (hdr_scratch == NULL) + hdr_scratch = palloc0(HEADER_SCRATCH_SIZE); } |