aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
authorHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 17:56:26 +0200
committerHeikki Linnakangas <heikki.linnakangas@iki.fi>2014-11-20 18:46:41 +0200
commit2c03216d831160bedd72d45f712601b6f7d03f1c (patch)
treeab6a03d031ffa605d848b0b7067add15e56e2207 /src/backend/access/transam/xlogreader.c
parent8dc626defec23016dd5988208d8704b858b9d21d (diff)
downloadpostgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.tar.gz
postgresql-2c03216d831160bedd72d45f712601b6f7d03f1c.zip
Revamp the WAL record format.
Each WAL record now carries information about the modified relation and block(s) in a standardized format. That makes it easier to write tools that need that information, like pg_rewind, prefetching the blocks to speed up recovery, etc. There's a whole new API for building WAL records, replacing the XLogRecData chains used previously. The new API consists of XLogRegister* functions, which are called for each buffer and chunk of data that is added to the record. The new API also gives more control over when a full-page image is written, by passing flags to the XLogRegisterBuffer function. This also simplifies the XLogReadBufferForRedo() calls. The function can dig the relation and block number from the WAL record, so they no longer need to be passed as arguments. For the convenience of redo routines, XLogReader now dissects each WAL record after reading it, copying the main data part and the per-block data into MAXALIGNed buffers. The data chunks are not aligned within the WAL record, but the redo routines can assume that the pointers returned by XLogRecGet* functions are. Redo routines are now passed the XLogReaderState, which contains the record in the already-dissected format, instead of the plain XLogRecord. The new record format also makes the fixed-size XLogRecord header smaller, by removing the xl_len field. The length of the "main data" portion is now stored at the end of the WAL record, and there's a separate header after XLogRecord for it. The alignment padding at the end of XLogRecord is also removed. This compensates for the fact that the new format would otherwise be more bulky than the old format. Reviewed by Andres Freund, Amit Kapila, Michael Paquier, Alvaro Herrera, Fujii Masao.
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c486
1 files changed, 360 insertions, 126 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7d573cc585d..67d62234369 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -37,6 +37,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
the supplied arguments. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
+static void ResetDecoder(XLogReaderState *state);
+
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
@@ -59,46 +61,33 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
/*
* Allocate and initialize a new XLogReader.
*
- * Returns NULL if the xlogreader couldn't be allocated.
+ * The returned XLogReader is palloc'd. (In FRONTEND code, that means that
+ * running out-of-memory causes an immediate exit(1).)
*/
XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
{
XLogReaderState *state;
- AssertArg(pagereadfunc != NULL);
+ state = (XLogReaderState *) palloc0(sizeof(XLogReaderState));
- state = (XLogReaderState *) malloc(sizeof(XLogReaderState));
- if (!state)
- return NULL;
- MemSet(state, 0, sizeof(XLogReaderState));
+ state->max_block_id = -1;
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
* storage in most instantiations of the backend; (2) a static char array
- * isn't guaranteed to have any particular alignment, whereas malloc()
+ * isn't guaranteed to have any particular alignment, whereas palloc()
* will provide MAXALIGN'd storage.
*/
- state->readBuf = (char *) malloc(XLOG_BLCKSZ);
- if (!state->readBuf)
- {
- free(state);
- return NULL;
- }
+ state->readBuf = (char *) palloc(XLOG_BLCKSZ);
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr and EndRecPtr initialized to zeroes above */
/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
- state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
- if (!state->errormsg_buf)
- {
- free(state->readBuf);
- free(state);
- return NULL;
- }
+ state->errormsg_buf = palloc(MAX_ERRORMSG_LEN + 1);
state->errormsg_buf[0] = '\0';
/*
@@ -107,9 +96,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
*/
if (!allocate_recordbuf(state, 0))
{
- free(state->errormsg_buf);
- free(state->readBuf);
- free(state);
+ pfree(state->errormsg_buf);
+ pfree(state->readBuf);
+ pfree(state);
return NULL;
}
@@ -119,11 +108,24 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
void
XLogReaderFree(XLogReaderState *state)
{
- free(state->errormsg_buf);
+ int block_id;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ if (state->blocks[block_id].in_use)
+ {
+ if (state->blocks[block_id].data)
+ pfree(state->blocks[block_id].data);
+ }
+ }
+ if (state->main_data)
+ pfree(state->main_data);
+
+ pfree(state->errormsg_buf);
if (state->readRecordBuf)
- free(state->readRecordBuf);
- free(state->readBuf);
- free(state);
+ pfree(state->readRecordBuf);
+ pfree(state->readBuf);
+ pfree(state);
}
/*
@@ -146,14 +148,8 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
if (state->readRecordBuf)
- free(state->readRecordBuf);
- state->readRecordBuf = (char *) malloc(newSize);
- if (!state->readRecordBuf)
- {
- state->readRecordBufSize = 0;
- return false;
- }
-
+ pfree(state->readRecordBuf);
+ state->readRecordBuf = (char *) palloc(newSize);
state->readRecordBufSize = newSize;
return true;
}
@@ -191,6 +187,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
*errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ ResetDecoder(state);
+
if (RecPtr == InvalidXLogRecPtr)
{
RecPtr = state->EndRecPtr;
@@ -440,7 +438,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
}
- return record;
+ if (DecodeXLogRecord(state, record, errormsg))
+ return record;
+ else
+ return NULL;
err:
@@ -579,30 +580,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record,
bool randAccess)
{
- /*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- if (record->xl_len != 0)
- {
- report_invalid_record(state,
- "invalid xlog switch record at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- }
- else if (record->xl_len == 0)
- {
- report_invalid_record(state,
- "record with zero length at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ if (record->xl_tot_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X",
@@ -663,79 +641,17 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
* We assume all of the record (that is, xl_tot_len bytes) has been read
* into memory at *record. Also, ValidXLogRecordHeader() has accepted the
* record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
+ * SizeOfXlogRecord.
*/
static bool
ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
{
pg_crc32 crc;
- int i;
- uint32 len = record->xl_len;
- BkpBlock bkpb;
- char *blk;
- size_t remaining = record->xl_tot_len;
- /* First the rmgr data */
- if (remaining < SizeOfXLogRecord + len)
- {
- /* ValidXLogRecordHeader() should've caught this already... */
- report_invalid_record(state, "invalid record length at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= SizeOfXLogRecord + len;
+ /* Calculate the CRC */
INIT_CRC32C(crc);
- COMP_CRC32C(crc, XLogRecGetData(record), len);
-
- /* Add in the backup blocks, if any */
- blk = (char *) XLogRecGetData(record) + len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- uint32 blen;
-
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- if (remaining < sizeof(BkpBlock))
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- memcpy(&bkpb, blk, sizeof(BkpBlock));
-
- if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- {
- report_invalid_record(state,
- "incorrect hole size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
- if (remaining < blen)
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= blen;
- COMP_CRC32C(crc, blk, blen);
- blk += blen;
- }
-
- /* Check that xl_tot_len agrees with our calculation */
- if (remaining != 0)
- {
- report_invalid_record(state,
- "incorrect total length in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
-
- /* Finally include the record header */
+ COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
+ /* include the record header last */
COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
@@ -985,3 +901,321 @@ out:
}
#endif /* FRONTEND */
+
+
+/* ----------------------------------------
+ * Functions for decoding the data and block references in a record.
+ * ----------------------------------------
+ */
+
+/* private function to reset the state between records */
+static void
+ResetDecoder(XLogReaderState *state)
+{
+ int block_id;
+
+ state->decoded_record = NULL;
+
+ state->main_data_len = 0;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ state->blocks[block_id].in_use = false;
+ state->blocks[block_id].has_image = false;
+ state->blocks[block_id].has_data = false;
+ }
+ state->max_block_id = -1;
+}
+
+/*
+ * Decode the previously read record.
+ *
+ * On error, a human-readable error message is returned in *errormsg, and
+ * the return value is false.
+ */
+bool
+DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+{
+ /*
+ * read next _size bytes from record buffer, but check for overrun first.
+ */
+#define COPY_HEADER_FIELD(_dst, _size) \
+ do { \
+ if (remaining < _size) \
+ goto shortdata_err; \
+ memcpy(_dst, ptr, _size); \
+ ptr += _size; \
+ remaining -= _size; \
+ } while(0)
+
+ char *ptr;
+ uint32 remaining;
+ uint32 datatotal;
+ RelFileNode *rnode = NULL;
+ uint8 block_id;
+
+ ResetDecoder(state);
+
+ state->decoded_record = record;
+
+ ptr = (char *) record;
+ ptr += SizeOfXLogRecord;
+ remaining = record->xl_tot_len - SizeOfXLogRecord;
+
+ /* Decode the headers */
+ datatotal = 0;
+ while (remaining > datatotal)
+ {
+ COPY_HEADER_FIELD(&block_id, sizeof(uint8));
+
+ if (block_id == XLR_BLOCK_ID_DATA_SHORT)
+ {
+ /* XLogRecordDataHeaderShort */
+ uint8 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
+
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id == XLR_BLOCK_ID_DATA_LONG)
+ {
+ /* XLogRecordDataHeaderLong */
+ uint32 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id <= XLR_MAX_BLOCK_ID)
+ {
+ /* XLogRecordBlockHeader */
+ DecodedBkpBlock *blk;
+ uint8 fork_flags;
+
+ if (block_id <= state->max_block_id)
+ {
+ report_invalid_record(state,
+ "out-of-order block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ state->max_block_id = block_id;
+
+ blk = &state->blocks[block_id];
+ blk->in_use = true;
+
+ COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
+ blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
+ blk->flags = fork_flags;
+ blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
+ blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+
+ COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
+ /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
+ if (blk->has_data && blk->data_len == 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ if (!blk->has_data && blk->data_len != 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
+ (unsigned int) blk->data_len,
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ datatotal += blk->data_len;
+
+ if (blk->has_image)
+ {
+ COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
+ COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
+ datatotal += BLCKSZ - blk->hole_length;
+ }
+ if (!(fork_flags & BKPBLOCK_SAME_REL))
+ {
+ COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
+ rnode = &blk->rnode;
+ }
+ else
+ {
+ if (rnode == NULL)
+ {
+ report_invalid_record(state,
+ "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
+
+ blk->rnode = *rnode;
+ }
+ COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
+ }
+ else
+ {
+ report_invalid_record(state,
+ "invalid block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ }
+
+ if (remaining != datatotal)
+ goto shortdata_err;
+
+ /*
+ * Ok, we've parsed the fragment headers, and verified that the total
+ * length of the payload in the fragments is equal to the amount of data
+ * left. Copy the data of each fragment to a separate buffer.
+ *
+ * We could just set up pointers into readRecordBuf, but we want to align
+ * the data for the convenience of the callers. Backup images are not
+ * copied, however; they don't need alignment.
+ */
+
+ /* block data first */
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ DecodedBkpBlock *blk = &state->blocks[block_id];
+
+ if (!blk->in_use)
+ continue;
+ if (blk->has_image)
+ {
+ blk->bkp_image = ptr;
+ ptr += BLCKSZ - blk->hole_length;
+ }
+ if (blk->has_data)
+ {
+ if (!blk->data || blk->data_len > blk->data_bufsz)
+ {
+ if (blk->data)
+ pfree(blk->data);
+ blk->data_bufsz = blk->data_len;
+ blk->data = palloc(blk->data_bufsz);
+ }
+ memcpy(blk->data, ptr, blk->data_len);
+ ptr += blk->data_len;
+ }
+ }
+
+ /* and finally, the main data */
+ if (state->main_data_len > 0)
+ {
+ if (!state->main_data || state->main_data_len > state->main_data_bufsz)
+ {
+ if (state->main_data)
+ pfree(state->main_data);
+ state->main_data_bufsz = state->main_data_len;
+ state->main_data = palloc(state->main_data_bufsz);
+ }
+ memcpy(state->main_data, ptr, state->main_data_len);
+ ptr += state->main_data_len;
+ }
+
+ return true;
+
+shortdata_err:
+ report_invalid_record(state,
+ "record with invalid length at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+err:
+ *errormsg = state->errormsg_buf;
+
+ return false;
+}
+
+/*
+ * Returns information about the block that a block reference refers to.
+ *
+ * If the WAL record contains a block reference with the given ID, *rnode,
+ * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
+ * Otherwise returns FALSE.
+ */
+bool
+XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+ if (rnode)
+ *rnode = bkpb->rnode;
+ if (forknum)
+ *forknum = bkpb->forknum;
+ if (blknum)
+ *blknum = bkpb->blkno;
+ return true;
+}
+
+/*
+ * Returns the data associated with a block reference, or NULL if there is
+ * no data (e.g. because a full-page image was taken instead). The returned
+ * pointer points to a MAXALIGNed buffer.
+ */
+char *
+XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return NULL;
+
+ bkpb = &record->blocks[block_id];
+
+ if (!bkpb->has_data)
+ {
+ if (len)
+ *len = 0;
+ return NULL;
+ }
+ else
+ {
+ if (len)
+ *len = bkpb->data_len;
+ return bkpb->data;
+ }
+}
+
+/*
+ * Restore a full-page image from a backup block attached to an XLOG record.
+ *
+ * Returns true if the image was copied into *page, or false if the block reference is not in use or carries no full-page image.
+ */
+bool
+RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+ if (!record->blocks[block_id].has_image)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+
+ if (bkpb->hole_length == 0)
+ {
+ memcpy(page, bkpb->bkp_image, BLCKSZ);
+ }
+ else
+ {
+ memcpy(page, bkpb->bkp_image, bkpb->hole_offset);
+ /* must zero-fill the hole */
+ MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
+ memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
+ bkpb->bkp_image + bkpb->hole_offset,
+ BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+ }
+
+ return true;
+}