aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xloginsert.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xloginsert.c')
-rw-r--r--src/backend/access/transam/xloginsert.c972
1 files changed, 617 insertions, 355 deletions
diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c
index b83343bf5bd..89c407e521b 100644
--- a/src/backend/access/transam/xloginsert.c
+++ b/src/backend/access/transam/xloginsert.c
@@ -3,6 +3,12 @@
* xloginsert.c
* Functions for constructing WAL records
*
+ * Constructing a WAL record begins with a call to XLogBeginInsert,
+ * followed by a number of XLogRegister* calls. The registered data is
+ * collected in private working memory, and finally assembled into a chain
+ * of XLogRecData structs by a call to XLogRecordAssemble(). See
+ * access/transam/README for details.
+ *
* Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
@@ -24,39 +30,366 @@
#include "utils/memutils.h"
#include "pg_trace.h"
+/*
+ * For each block reference registered with XLogRegisterBuffer, we fill in
+ * a registered_buffer struct.
+ */
+typedef struct
+{
+ bool in_use; /* is this slot in use? */
+ uint8 flags; /* REGBUF_* flags */
+ RelFileNode rnode; /* identifies the relation and block */
+ ForkNumber forkno;
+ BlockNumber block;
+ Page page; /* page content */
+ uint32 rdata_len; /* total length of data in rdata chain */
+ XLogRecData *rdata_head; /* head of the chain of data registered with
+ * this block */
+ XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if
+ * empty */
+
+ XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to
+ * backup block data in XLogRecordAssemble() */
+} registered_buffer;
+
+static registered_buffer *registered_buffers;
+static int max_registered_buffers; /* allocated size */
+static int max_registered_block_id = 0; /* highest block_id + 1
+ * currently registered */
+
+/*
+ * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered
+ * with XLogRegisterData(...).
+ */
+static XLogRecData *mainrdata_head;
+static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head;
+static uint32 mainrdata_len; /* total # of bytes in chain */
+
+/*
+ * These are used to hold the record header while constructing a record.
+ * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization,
+ * because we want it to be MAXALIGNed and padding bytes zeroed.
+ *
+ * For simplicity, it's allocated large enough to hold the headers for any
+ * WAL record.
+ */
+static XLogRecData hdr_rdt;
+static char *hdr_scratch = NULL;
+
+#define HEADER_SCRATCH_SIZE \
+ (SizeOfXLogRecord + \
+ MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \
+ SizeOfXLogRecordDataHeaderLong)
+
+/*
+ * An array of XLogRecData structs, to hold registered data.
+ */
+static XLogRecData *rdatas;
+static int num_rdatas; /* entries currently used */
+static int max_rdatas; /* allocated size */
+
+static bool begininsert_called = false;
+
+/* Memory context to hold the registered buffer and data references. */
+static MemoryContext xloginsert_cxt;
+
static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info,
- XLogRecData *rdata,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal);
-static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb);
+ XLogRecPtr *fpw_lsn);
+
+/*
+ * Begin constructing a WAL record. This must be called before the
+ * XLogRegister* functions and XLogInsert().
+ */
+void
+XLogBeginInsert(void)
+{
+ Assert(max_registered_block_id == 0);
+ Assert(mainrdata_last == (XLogRecData *) &mainrdata_head);
+ Assert(mainrdata_len == 0);
+ Assert(!begininsert_called);
+
+ /* cross-check on whether we should be here or not */
+ if (!XLogInsertAllowed())
+ elog(ERROR, "cannot make new WAL entries during recovery");
+
+ begininsert_called = true;
+}
/*
- * Insert an XLOG record having the specified RMID and info bytes,
- * with the body of the record being the data chunk(s) described by
- * the rdata chain (see xloginsert.h for notes about rdata).
+ * Ensure that there are enough buffer and data slots in the working area,
+ * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData
+ * calls.
+ *
+ * There is always space for a small number of buffers and data chunks, enough
+ * for most record types. This function is for the exceptional cases that need
+ * more.
+ */
+void
+XLogEnsureRecordSpace(int max_block_id, int ndatas)
+{
+ int nbuffers;
+
+ /*
+ * This must be called before entering a critical section, because
+ * allocating memory inside a critical section can fail. repalloc() will
+ * check the same, but better to check it here too so that we fail
+ * consistently even if the arrays happen to be large enough already.
+ */
+ Assert(CritSectionCount == 0);
+
+ /* the minimum values can't be decreased */
+ if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID)
+ max_block_id = XLR_NORMAL_MAX_BLOCK_ID;
+ if (ndatas < XLR_NORMAL_RDATAS)
+ ndatas = XLR_NORMAL_RDATAS;
+
+ if (max_block_id > XLR_MAX_BLOCK_ID)
+ elog(ERROR, "maximum number of WAL record block references exceeded");
+ nbuffers = max_block_id + 1;
+
+ if (nbuffers > max_registered_buffers)
+ {
+ registered_buffers = (registered_buffer *)
+ repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers);
+
+ /*
+ * At least the padding bytes in the structs must be zeroed, because
+ * they are included in WAL data, but initialize it all for tidiness.
+ */
+ MemSet(&registered_buffers[max_registered_buffers], 0,
+ (nbuffers - max_registered_buffers) * sizeof(registered_buffer));
+ max_registered_buffers = nbuffers;
+ }
+
+ if (ndatas > max_rdatas)
+ {
+ rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas);
+ max_rdatas = ndatas;
+ }
+}
+
+/*
+ * Reset WAL record construction buffers.
+ */
+void
+XLogResetInsertion(void)
+{
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ registered_buffers[i].in_use = false;
+
+ num_rdatas = 0;
+ max_registered_block_id = 0;
+ mainrdata_len = 0;
+ mainrdata_last = (XLogRecData *) &mainrdata_head;
+ begininsert_called = false;
+}
+
+/*
+ * Register a reference to a buffer with the WAL record being constructed.
+ * This must be called for every page that the WAL-logged operation modifies.
+ */
+void
+XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ /* NO_IMAGE doesn't make sense with FORCE_IMAGE */
+ Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE))));
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ {
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+ max_registered_block_id = block_id + 1;
+ }
+
+ regbuf = &registered_buffers[block_id];
+
+ BufferGetTag(buffer, &regbuf->rnode, &regbuf->forkno, &regbuf->block);
+ regbuf->page = BufferGetPage(buffer);
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Like XLogRegisterBuffer, but for registering a block that's not in the
+ * shared buffer pool (i.e. when you don't have a Buffer for it).
+ */
+void
+XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum,
+ BlockNumber blknum, Page page, uint8 flags)
+{
+ registered_buffer *regbuf;
+
+ /* This is currently only used to WAL-log a full-page image of a page */
+ Assert(flags & REGBUF_FORCE_IMAGE);
+ Assert(begininsert_called);
+
+ if (block_id >= max_registered_block_id)
+ max_registered_block_id = block_id + 1;
+
+ if (block_id >= max_registered_buffers)
+ elog(ERROR, "too many registered buffers");
+
+ regbuf = &registered_buffers[block_id];
+
+ regbuf->rnode = *rnode;
+ regbuf->forkno = forknum;
+ regbuf->block = blknum;
+ regbuf->page = page;
+ regbuf->flags = flags;
+ regbuf->rdata_tail = (XLogRecData *) &regbuf->rdata_head;
+ regbuf->rdata_len = 0;
+
+ /*
+ * Check that this page hasn't already been registered with some other
+ * block_id.
+ */
+#ifdef USE_ASSERT_CHECKING
+ {
+ int i;
+
+ for (i = 0; i < max_registered_block_id; i++)
+ {
+ registered_buffer *regbuf_old = &registered_buffers[i];
+
+ if (i == block_id || !regbuf_old->in_use)
+ continue;
+
+ Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) ||
+ regbuf_old->forkno != regbuf->forkno ||
+ regbuf_old->block != regbuf->block);
+ }
+ }
+#endif
+
+ regbuf->in_use = true;
+}
+
+/*
+ * Add data to the WAL record that's being constructed.
+ *
+ * The data is appended to the "main chunk", available at replay with
+ * XLogGetRecData().
+ */
+void
+XLogRegisterData(char *data, int len)
+{
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ /*
+ * we use the mainrdata_last pointer to track the end of the chain, so no
+ * need to clear 'next' here.
+ */
+
+ mainrdata_last->next = rdata;
+ mainrdata_last = rdata;
+
+ mainrdata_len += len;
+}
+
+/*
+ * Add buffer-specific data to the WAL record that's being constructed.
+ *
+ * Block_id must reference a block previously registered with
+ * XLogRegisterBuffer(). If this is called more than once for the same
+ * block_id, the data is appended.
+ *
+ * The maximum amount of data that can be registered per block is 65535
+ * bytes. That should be plenty; if you need more than BLCKSZ bytes to
+ * reconstruct the changes to the page, you might as well just log a full
+ * copy of it. (the "main data" that's not associated with a block is not
+ * limited)
+ */
+void
+XLogRegisterBufData(uint8 block_id, char *data, int len)
+{
+ registered_buffer *regbuf;
+ XLogRecData *rdata;
+
+ Assert(begininsert_called);
+
+ /* find the registered buffer struct */
+ regbuf = &registered_buffers[block_id];
+ if (!regbuf->in_use)
+ elog(ERROR, "no block with id %d registered with WAL insertion",
+ block_id);
+
+ if (num_rdatas >= max_rdatas)
+ elog(ERROR, "too much WAL data");
+ rdata = &rdatas[num_rdatas++];
+
+ rdata->data = data;
+ rdata->len = len;
+
+ regbuf->rdata_tail->next = rdata;
+ regbuf->rdata_tail = rdata;
+ regbuf->rdata_len += len;
+}
+
+/*
+ * Insert an XLOG record having the specified RMID and info bytes, with the
+ * body of the record being the data and buffer references registered earlier
+ * with XLogRegister* calls.
*
* Returns XLOG pointer to end of record (beginning of next record).
* This can be used as LSN for data pages affected by the logged action.
* (LSN is the XLOG point up to which the XLOG must be flushed to disk
* before the data page can be written out. This implements the basic
* WAL rule "write the log before the data".)
- *
- * NB: this routine feels free to scribble on the XLogRecData structs,
- * though not on the data they reference. This is OK since the XLogRecData
- * structs are always just temporaries in the calling code.
*/
XLogRecPtr
-XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
+XLogInsert(RmgrId rmid, uint8 info)
{
- XLogRecPtr RedoRecPtr;
- bool doPageWrites;
XLogRecPtr EndPos;
- XLogRecPtr fpw_lsn;
- XLogRecData *rdt;
- XLogRecData *rdt_lastnormal;
- /* info's high bits are reserved for use by me */
- if (info & XLR_INFO_MASK)
+ /* XLogBeginInsert() must have been called. */
+ if (!begininsert_called)
+ elog(ERROR, "XLogBeginInsert was not called");
+
+ /*
+ * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are
+ * reserved for use by me.
+ */
+ if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0)
elog(PANIC, "invalid xlog info mask %02X", info);
TRACE_POSTGRESQL_XLOG_INSERT(rmid, info);
@@ -67,292 +400,282 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
*/
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
+ XLogResetInsertion();
EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */
return EndPos;
}
- /*
- * Get values needed to decide whether to do full-page writes. Since we
- * don't yet have an insertion lock, these could change under us, but
- * XLogInsertRecord will recheck them once it has a lock.
- */
- GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
-
- /*
- * Assemble an XLogRecData chain representing the WAL record, including
- * any backup blocks needed.
- *
- * We may have to loop back to here if a race condition is detected in
- * XLogInsertRecord. We could prevent the race by doing all this work
- * while holding an insertion lock, but it seems better to avoid doing CRC
- * calculations while holding one.
- */
-retry:
- rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites,
- &fpw_lsn, &rdt_lastnormal);
-
- EndPos = XLogInsertRecord(rdt, fpw_lsn);
-
- if (EndPos == InvalidXLogRecPtr)
+ do
{
+ XLogRecPtr RedoRecPtr;
+ bool doPageWrites;
+ XLogRecPtr fpw_lsn;
+ XLogRecData *rdt;
+
/*
- * Undo the changes we made to the rdata chain, and retry.
- *
- * XXX: This doesn't undo *all* the changes; the XLogRecData
- * entries for buffers that we had already decided to back up have
- * had their data-pointers cleared. That's OK, as long as we
- * decide to back them up on the next iteration as well. Hence,
- * don't allow "doPageWrites" value to go from true to false after
- * we've modified the rdata chain.
+ * Get values needed to decide whether to do full-page writes. Since
+ * we don't yet have an insertion lock, these could change under us,
+ * but XLogInsertRecData will recheck them once it has a lock.
*/
- bool newDoPageWrites;
+ GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites);
- GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites);
- doPageWrites = doPageWrites || newDoPageWrites;
- rdt_lastnormal->next = NULL;
+ rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites,
+ &fpw_lsn);
- goto retry;
- }
+ EndPos = XLogInsertRecord(rdt, fpw_lsn);
+ } while (EndPos == InvalidXLogRecPtr);
+
+ XLogResetInsertion();
return EndPos;
}
/*
- * Assemble a full WAL record, including backup blocks, from an XLogRecData
- * chain, ready for insertion with XLogInsertRecord(). The record header
- * fields are filled in, except for the xl_prev field and CRC.
+ * Assemble a WAL record from the registered data and buffers into an
+ * XLogRecData chain, ready for insertion with XLogInsertRecord().
*
- * The rdata chain is modified, adding entries for full-page images.
- * *rdt_lastnormal is set to point to the last normal (ie. not added by
- * this function) entry. It can be used to reset the chain to its original
- * state.
+ * The record header fields are filled in, except for the xl_prev field. The
+ * calculated CRC does not include xl_prev either.
*
- * If the rdata chain contains any buffer references, and a full-page image
- * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among
- * such pages. This signals that the assembled record is only good for
- * insertion on the assumption that the RedoRecPtr and doPageWrites values
- * were up-to-date.
+ * If there are any registered buffers, and a full-page image was not taken
+ * of all them, *page_writes_omitted is set to true. This signals that the
+ * assembled record is only good for insertion on the assumption that the
+ * RedoRecPtr and doPageWrites values were up-to-date.
*/
static XLogRecData *
-XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata,
+XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
- XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal)
+ XLogRecPtr *fpw_lsn)
{
- bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
XLogRecData *rdt;
- Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
- bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
- uint32 len,
- total_len;
- unsigned i;
+ uint32 total_len = 0;
+ int block_id;
+ pg_crc32 rdata_crc;
+ registered_buffer *prev_regbuf = NULL;
+ XLogRecData *rdt_datas_last;
+ XLogRecord *rechdr;
+ char *scratch = hdr_scratch;
/*
- * These need to be static because they are returned to the caller as part
- * of the XLogRecData chain.
+ * Note: this function can be called multiple times for the same record.
+ * All the modifications we do to the rdata chains below must handle that.
*/
- static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS];
- static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS];
- static XLogRecData hdr_rdt;
- static XLogRecord *rechdr;
-
- if (rechdr == NULL)
- {
- static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF];
- rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf);
- MemSet(rechdr, 0, SizeOfXLogRecord);
- }
+ /* The record begins with the fixed-size header */
+ rechdr = (XLogRecord *) scratch;
+ scratch += SizeOfXLogRecord;
- /* The record begins with the header */
- hdr_rdt.data = (char *) rechdr;
- hdr_rdt.len = SizeOfXLogRecord;
- hdr_rdt.next = rdata;
- total_len = SizeOfXLogRecord;
+ hdr_rdt.next = NULL;
+ rdt_datas_last = &hdr_rdt;
+ hdr_rdt.data = hdr_scratch;
/*
- * Here we scan the rdata chain, to determine which buffers must be backed
- * up.
- *
- * We add entries for backup blocks to the chain, so that they don't need
- * any special treatment in the critical section where the chunks are
- * copied into the WAL buffers. Those entries have to be unlinked from the
- * chain if we have to loop back here.
+ * Make an rdata chain containing all the data portions of all block
+ * references. This includes the data for full-page images. Also append
+ * the headers for the block references in the scratch buffer.
*/
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- dtbuf[i] = InvalidBuffer;
- dtbuf_bkp[i] = false;
- }
-
*fpw_lsn = InvalidXLogRecPtr;
- len = 0;
- for (rdt = rdata;;)
+ for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
- if (rdt->buffer == InvalidBuffer)
+ registered_buffer *regbuf = &registered_buffers[block_id];
+ bool needs_backup;
+ bool needs_data;
+ XLogRecordBlockHeader bkpb;
+ XLogRecordBlockImageHeader bimg;
+ bool samerel;
+
+ if (!regbuf->in_use)
+ continue;
+
+ /* Determine if this block needs to be backed up */
+ if (regbuf->flags & REGBUF_FORCE_IMAGE)
+ needs_backup = true;
+ else if (regbuf->flags & REGBUF_NO_IMAGE)
+ needs_backup = false;
+ else if (!doPageWrites)
+ needs_backup = false;
+ else
{
- /* Simple data, just include it */
- len += rdt->len;
+ /*
+ * We assume page LSN is first data on *every* page that can be
+ * passed to XLogInsert, whether it has the standard page layout
+ * or not.
+ */
+ XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
+
+ needs_backup = (page_lsn <= RedoRecPtr);
+ if (!needs_backup)
+ {
+ if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
+ *fpw_lsn = page_lsn;
+ }
}
+
+ /* Determine if the buffer data needs to included */
+ if (regbuf->rdata_len == 0)
+ needs_data = false;
+ else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0)
+ needs_data = true;
else
+ needs_data = !needs_backup;
+
+ bkpb.id = block_id;
+ bkpb.fork_flags = regbuf->forkno;
+ bkpb.data_length = 0;
+
+ if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
+ bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
+
+ if (needs_backup)
{
- /* Find info for buffer */
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ Page page = regbuf->page;
+
+ /*
+ * The page needs to be backed up, so set up *bimg
+ */
+ if (regbuf->flags & REGBUF_STANDARD)
{
- if (rdt->buffer == dtbuf[i])
+ /* Assume we can omit data between pd_lower and pd_upper */
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
+
+ if (lower >= SizeOfPageHeaderData &&
+ upper > lower &&
+ upper <= BLCKSZ)
{
- /* Buffer already referenced by earlier chain item */
- if (dtbuf_bkp[i])
- {
- rdt->data = NULL;
- rdt->len = 0;
- }
- else if (rdt->data)
- len += rdt->len;
- break;
+ bimg.hole_offset = lower;
+ bimg.hole_length = upper - lower;
}
- if (dtbuf[i] == InvalidBuffer)
+ else
{
- /* OK, put it in this slot */
- XLogRecPtr page_lsn;
- bool needs_backup;
-
- dtbuf[i] = rdt->buffer;
-
- /*
- * Determine whether the buffer has to be backed up.
- *
- * We assume page LSN is first data on *every* page that
- * can be passed to XLogInsert, whether it has the
- * standard page layout or not. We don't need to take the
- * buffer header lock for PageGetLSN because we hold an
- * exclusive lock on the page and/or the relation.
- */
- page_lsn = PageGetLSN(BufferGetPage(rdt->buffer));
- if (!doPageWrites)
- needs_backup = false;
- else if (page_lsn <= RedoRecPtr)
- needs_backup = true;
- else
- needs_backup = false;
-
- if (needs_backup)
- {
- /*
- * The page needs to be backed up, so set up BkpBlock
- */
- XLogFillBkpBlock(rdt->buffer, rdt->buffer_std,
- &(dtbuf_xlg[i]));
- dtbuf_bkp[i] = true;
- rdt->data = NULL;
- rdt->len = 0;
- }
- else
- {
- if (rdt->data)
- len += rdt->len;
- if (*fpw_lsn == InvalidXLogRecPtr ||
- page_lsn < *fpw_lsn)
- {
- *fpw_lsn = page_lsn;
- }
- }
- break;
+ /* No "hole" to compress out */
+ bimg.hole_offset = 0;
+ bimg.hole_length = 0;
}
}
- if (i >= XLR_MAX_BKP_BLOCKS)
- elog(PANIC, "can backup at most %d blocks per xlog record",
- XLR_MAX_BKP_BLOCKS);
- }
- /* Break out of loop when rdt points to last chain item */
- if (rdt->next == NULL)
- break;
- rdt = rdt->next;
- }
- total_len += len;
+ else
+ {
+ /* Not a standard page header, don't try to eliminate "hole" */
+ bimg.hole_offset = 0;
+ bimg.hole_length = 0;
+ }
- /*
- * Make additional rdata chain entries for the backup blocks, so that we
- * don't need to special-case them in the write loop. This modifies the
- * original rdata chain, but we keep a pointer to the last regular entry,
- * rdt_lastnormal, so that we can undo this if we have to start over.
- *
- * At the exit of this loop, total_len includes the backup block data.
- *
- * Also set the appropriate info bits to show which buffers were backed
- * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer
- * value (ignoring InvalidBuffer) appearing in the rdata chain.
- */
- *rdt_lastnormal = rdt;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- BkpBlock *bkpb;
- char *page;
+ /* Fill in the remaining fields in the XLogRecordBlockData struct */
+ bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
- if (!dtbuf_bkp[i])
- continue;
+ total_len += BLCKSZ - bimg.hole_length;
+
+ /*
+ * Construct XLogRecData entries for the page content.
+ */
+ rdt_datas_last->next = &regbuf->bkp_rdatas[0];
+ rdt_datas_last = rdt_datas_last->next;
+ if (bimg.hole_length == 0)
+ {
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = BLCKSZ;
+ }
+ else
+ {
+ /* must skip the hole */
+ rdt_datas_last->data = page;
+ rdt_datas_last->len = bimg.hole_offset;
- info |= XLR_BKP_BLOCK(i);
+ rdt_datas_last->next = &regbuf->bkp_rdatas[1];
+ rdt_datas_last = rdt_datas_last->next;
- bkpb = &(dtbuf_xlg[i]);
- page = (char *) BufferGetBlock(dtbuf[i]);
+ rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length);
+ rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length);
+ }
+ }
- rdt->next = &(dtbuf_rdt1[i]);
- rdt = rdt->next;
+ if (needs_data)
+ {
+ /*
+ * Link the caller-supplied rdata chain for this buffer to the
+ * overall list.
+ */
+ bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
+ bkpb.data_length = regbuf->rdata_len;
+ total_len += regbuf->rdata_len;
+
+ rdt_datas_last->next = regbuf->rdata_head;
+ rdt_datas_last = regbuf->rdata_tail;
+ }
- rdt->data = (char *) bkpb;
- rdt->len = sizeof(BkpBlock);
- total_len += sizeof(BkpBlock);
+ if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
+ {
+ samerel = true;
+ bkpb.fork_flags |= BKPBLOCK_SAME_REL;
+ prev_regbuf = regbuf;
+ }
+ else
+ samerel = false;
- rdt->next = &(dtbuf_rdt2[i]);
- rdt = rdt->next;
+ /* Ok, copy the header to the scratch buffer */
+ memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
+ scratch += SizeOfXLogRecordBlockHeader;
+ if (needs_backup)
+ {
+ memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
+ scratch += SizeOfXLogRecordBlockImageHeader;
+ }
+ if (!samerel)
+ {
+ memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
+ scratch += sizeof(RelFileNode);
+ }
+ memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
+ scratch += sizeof(BlockNumber);
+ }
- if (bkpb->hole_length == 0)
+ /* followed by main data, if any */
+ if (mainrdata_len > 0)
+ {
+ if (mainrdata_len > 255)
{
- rdt->data = page;
- rdt->len = BLCKSZ;
- total_len += BLCKSZ;
- rdt->next = NULL;
+ *(scratch++) = XLR_BLOCK_ID_DATA_LONG;
+ memcpy(scratch, &mainrdata_len, sizeof(uint32));
+ scratch += sizeof(uint32);
}
else
{
- /* must skip the hole */
- rdt->data = page;
- rdt->len = bkpb->hole_offset;
- total_len += bkpb->hole_offset;
-
- rdt->next = &(dtbuf_rdt3[i]);
- rdt = rdt->next;
-
- rdt->data = page + (bkpb->hole_offset + bkpb->hole_length);
- rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length);
- total_len += rdt->len;
- rdt->next = NULL;
+ *(scratch++) = XLR_BLOCK_ID_DATA_SHORT;
+ *(scratch++) = (uint8) mainrdata_len;
}
+ rdt_datas_last->next = mainrdata_head;
+ rdt_datas_last = mainrdata_last;
+ total_len += mainrdata_len;
}
+ rdt_datas_last->next = NULL;
+
+ hdr_rdt.len = (scratch - hdr_scratch);
+ total_len += hdr_rdt.len;
/*
- * We disallow len == 0 because it provides a useful bit of extra error
- * checking in ReadRecord. This means that all callers of XLogInsert
- * must supply at least some not-in-a-buffer data. However, we make an
- * exception for XLOG SWITCH records because we don't want them to ever
- * cross a segment boundary.
+ * Calculate CRC of the data
+ *
+ * Note that the record header isn't added into the CRC initially since we
+ * don't know the prev-link yet. Thus, the CRC will represent the CRC of
+ * the whole record in the order: rdata, then backup blocks, then record
+ * header.
*/
- if (len == 0 && !isLogSwitch)
- elog(PANIC, "invalid xlog record length %u", rechdr->xl_len);
+ INIT_CRC32C(rdata_crc);
+ COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
+ for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
+ COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
* Fill in the fields in the record header. Prev-link is filled in later,
- * once we know where in the WAL the record will be inserted. CRC is also
- * not calculated yet.
+ * once we know where in the WAL the record will be inserted. The CRC does
+ * not include the record header yet.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
- rechdr->xl_len = len; /* doesn't include backup blocks */
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
+ rechdr->xl_crc = rdata_crc;
return &hdr_rdt;
}
@@ -429,45 +752,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std)
if (lsn <= RedoRecPtr)
{
- XLogRecData rdata[2];
- BkpBlock bkpb;
+ int flags;
char copied_buffer[BLCKSZ];
char *origdata = (char *) BufferGetBlock(buffer);
-
- /* Make a BkpBlock struct representing the buffer */
- XLogFillBkpBlock(buffer, buffer_std, &bkpb);
+ RelFileNode rnode;
+ ForkNumber forkno;
+ BlockNumber blkno;
/*
* Copy buffer so we don't have to worry about concurrent hint bit or
* lsn updates. We assume pd_lower/upper cannot be changed without an
* exclusive lock, so the contents bkp are not racy.
- *
- * With buffer_std set to false, XLogFillBkpBlock() sets hole_length
- * and hole_offset to 0; so the following code is safe for either
- * case.
*/
- memcpy(copied_buffer, origdata, bkpb.hole_offset);
- memcpy(copied_buffer + bkpb.hole_offset,
- origdata + bkpb.hole_offset + bkpb.hole_length,
- BLCKSZ - bkpb.hole_offset - bkpb.hole_length);
+ if (buffer_std)
+ {
+ /* Assume we can omit data between pd_lower and pd_upper */
+ Page page = BufferGetPage(buffer);
+ uint16 lower = ((PageHeader) page)->pd_lower;
+ uint16 upper = ((PageHeader) page)->pd_upper;
- /*
- * Header for backup block.
- */
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
+ memcpy(copied_buffer, origdata, lower);
+ memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper);
+ }
+ else
+ memcpy(copied_buffer, origdata, BLCKSZ);
- /*
- * Save copy of the buffer.
- */
- rdata[1].data = copied_buffer;
- rdata[1].len = BLCKSZ - bkpb.hole_length;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
+ XLogBeginInsert();
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+ flags = REGBUF_FORCE_IMAGE;
+ if (buffer_std)
+ flags |= REGBUF_STANDARD;
+
+ BufferGetTag(buffer, &rnode, &forkno, &blkno);
+ XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags);
+
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
}
return recptr;
@@ -489,71 +808,16 @@ XLogRecPtr
log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
Page page, bool page_std)
{
- BkpBlock bkpb;
+ int flags;
XLogRecPtr recptr;
- XLogRecData rdata[3];
-
- /* NO ELOG(ERROR) from here till newpage op is logged */
- START_CRIT_SECTION();
-
- bkpb.node = *rnode;
- bkpb.fork = forkNum;
- bkpb.block = blkno;
+ flags = REGBUF_FORCE_IMAGE;
if (page_std)
- {
- /* Assume we can omit data between pd_lower and pd_upper */
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb.hole_offset = lower;
- bkpb.hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
- }
- else
- {
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb.hole_offset = 0;
- bkpb.hole_length = 0;
- }
-
- rdata[0].data = (char *) &bkpb;
- rdata[0].len = sizeof(BkpBlock);
- rdata[0].buffer = InvalidBuffer;
- rdata[0].next = &(rdata[1]);
-
- if (bkpb.hole_length == 0)
- {
- rdata[1].data = (char *) page;
- rdata[1].len = BLCKSZ;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = NULL;
- }
- else
- {
- /* must skip the hole */
- rdata[1].data = (char *) page;
- rdata[1].len = bkpb.hole_offset;
- rdata[1].buffer = InvalidBuffer;
- rdata[1].next = &rdata[2];
-
- rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].len = BLCKSZ - (bkpb.hole_offset + bkpb.hole_length);
- rdata[2].buffer = InvalidBuffer;
- rdata[2].next = NULL;
- }
+ flags |= REGBUF_STANDARD;
- recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata);
+ XLogBeginInsert();
+ XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags);
+ recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI);
/*
* The page may be uninitialized. If so, we can't set the LSN because that
@@ -564,8 +828,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno,
PageSetLSN(page, recptr);
}
- END_CRIT_SECTION();
-
return recptr;
}
@@ -596,38 +858,38 @@ log_newpage_buffer(Buffer buffer, bool page_std)
}
/*
- * Fill a BkpBlock for a buffer.
+ * Allocate working buffers needed for WAL record construction.
*/
-static void
-XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb)
+void
+InitXLogInsert(void)
{
- BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block);
+ /* Initialize the working areas */
+ if (xloginsert_cxt == NULL)
+ {
+ xloginsert_cxt = AllocSetContextCreate(TopMemoryContext,
+ "WAL record construction",
+ ALLOCSET_DEFAULT_MINSIZE,
+ ALLOCSET_DEFAULT_INITSIZE,
+ ALLOCSET_DEFAULT_MAXSIZE);
+ }
- if (buffer_std)
+ if (registered_buffers == NULL)
{
- /* Assume we can omit data between pd_lower and pd_upper */
- Page page = BufferGetPage(buffer);
- uint16 lower = ((PageHeader) page)->pd_lower;
- uint16 upper = ((PageHeader) page)->pd_upper;
-
- if (lower >= SizeOfPageHeaderData &&
- upper > lower &&
- upper <= BLCKSZ)
- {
- bkpb->hole_offset = lower;
- bkpb->hole_length = upper - lower;
- }
- else
- {
- /* No "hole" to compress out */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
- }
+ registered_buffers = (registered_buffer *)
+ MemoryContextAllocZero(xloginsert_cxt,
+ sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1));
+ max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1;
}
- else
+ if (rdatas == NULL)
{
- /* Not a standard page header, don't try to eliminate "hole" */
- bkpb->hole_offset = 0;
- bkpb->hole_length = 0;
+ rdatas = MemoryContextAlloc(xloginsert_cxt,
+ sizeof(XLogRecData) * XLR_NORMAL_RDATAS);
+ max_rdatas = XLR_NORMAL_RDATAS;
}
+
+ /*
+ * Allocate a buffer to hold the header information for a WAL record.
+ */
+ if (hdr_scratch == NULL)
+ hdr_scratch = palloc0(HEADER_SCRATCH_SIZE);
}