aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c486
1 files changed, 360 insertions, 126 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 7d573cc585d..67d62234369 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -37,6 +37,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
the supplied arguments. */
__attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3)));
+static void ResetDecoder(XLogReaderState *state);
+
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
@@ -59,46 +61,33 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
/*
* Allocate and initialize a new XLogReader.
*
- * Returns NULL if the xlogreader couldn't be allocated.
+ * The returned XLogReader is palloc'd. (In FRONTEND code, that means that
+ * running out-of-memory causes an immediate exit(1).
*/
XLogReaderState *
XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
{
XLogReaderState *state;
- AssertArg(pagereadfunc != NULL);
+ state = (XLogReaderState *) palloc0(sizeof(XLogReaderState));
- state = (XLogReaderState *) malloc(sizeof(XLogReaderState));
- if (!state)
- return NULL;
- MemSet(state, 0, sizeof(XLogReaderState));
+ state->max_block_id = -1;
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
* storage in most instantiations of the backend; (2) a static char array
- * isn't guaranteed to have any particular alignment, whereas malloc()
+ * isn't guaranteed to have any particular alignment, whereas palloc()
* will provide MAXALIGN'd storage.
*/
- state->readBuf = (char *) malloc(XLOG_BLCKSZ);
- if (!state->readBuf)
- {
- free(state);
- return NULL;
- }
+ state->readBuf = (char *) palloc(XLOG_BLCKSZ);
state->read_page = pagereadfunc;
/* system_identifier initialized to zeroes above */
state->private_data = private_data;
/* ReadRecPtr and EndRecPtr initialized to zeroes above */
/* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */
- state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1);
- if (!state->errormsg_buf)
- {
- free(state->readBuf);
- free(state);
- return NULL;
- }
+ state->errormsg_buf = palloc(MAX_ERRORMSG_LEN + 1);
state->errormsg_buf[0] = '\0';
/*
@@ -107,9 +96,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
*/
if (!allocate_recordbuf(state, 0))
{
- free(state->errormsg_buf);
- free(state->readBuf);
- free(state);
+ pfree(state->errormsg_buf);
+ pfree(state->readBuf);
+ pfree(state);
return NULL;
}
@@ -119,11 +108,24 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data)
void
XLogReaderFree(XLogReaderState *state)
{
- free(state->errormsg_buf);
+ int block_id;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ if (state->blocks[block_id].in_use)
+ {
+ if (state->blocks[block_id].data)
+ pfree(state->blocks[block_id].data);
+ }
+ }
+ if (state->main_data)
+ pfree(state->main_data);
+
+ pfree(state->errormsg_buf);
if (state->readRecordBuf)
- free(state->readRecordBuf);
- free(state->readBuf);
- free(state);
+ pfree(state->readRecordBuf);
+ pfree(state->readBuf);
+ pfree(state);
}
/*
@@ -146,14 +148,8 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
if (state->readRecordBuf)
- free(state->readRecordBuf);
- state->readRecordBuf = (char *) malloc(newSize);
- if (!state->readRecordBuf)
- {
- state->readRecordBufSize = 0;
- return false;
- }
-
+ pfree(state->readRecordBuf);
+ state->readRecordBuf = (char *) palloc(newSize);
state->readRecordBufSize = newSize;
return true;
}
@@ -191,6 +187,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
*errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ ResetDecoder(state);
+
if (RecPtr == InvalidXLogRecPtr)
{
RecPtr = state->EndRecPtr;
@@ -440,7 +438,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg)
state->EndRecPtr -= state->EndRecPtr % XLogSegSize;
}
- return record;
+ if (DecodeXLogRecord(state, record, errormsg))
+ return record;
+ else
+ return NULL;
err:
@@ -579,30 +580,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record,
bool randAccess)
{
- /*
- * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is
- * required.
- */
- if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
- {
- if (record->xl_len != 0)
- {
- report_invalid_record(state,
- "invalid xlog switch record at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- }
- else if (record->xl_len == 0)
- {
- report_invalid_record(state,
- "record with zero length at %X/%X",
- (uint32) (RecPtr >> 32), (uint32) RecPtr);
- return false;
- }
- if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len ||
- record->xl_tot_len > SizeOfXLogRecord + record->xl_len +
- XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
+ if (record->xl_tot_len < SizeOfXLogRecord)
{
report_invalid_record(state,
"invalid record length at %X/%X",
@@ -663,79 +641,17 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
* We assume all of the record (that is, xl_tot_len bytes) has been read
* into memory at *record. Also, ValidXLogRecordHeader() has accepted the
* record's header, which means in particular that xl_tot_len is at least
- * SizeOfXlogRecord, so it is safe to fetch xl_len.
+ * SizeOfXlogRecord.
*/
static bool
ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr)
{
pg_crc32 crc;
- int i;
- uint32 len = record->xl_len;
- BkpBlock bkpb;
- char *blk;
- size_t remaining = record->xl_tot_len;
- /* First the rmgr data */
- if (remaining < SizeOfXLogRecord + len)
- {
- /* ValidXLogRecordHeader() should've caught this already... */
- report_invalid_record(state, "invalid record length at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= SizeOfXLogRecord + len;
+ /* Calculate the CRC */
INIT_CRC32C(crc);
- COMP_CRC32C(crc, XLogRecGetData(record), len);
-
- /* Add in the backup blocks, if any */
- blk = (char *) XLogRecGetData(record) + len;
- for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
- {
- uint32 blen;
-
- if (!(record->xl_info & XLR_BKP_BLOCK(i)))
- continue;
-
- if (remaining < sizeof(BkpBlock))
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- memcpy(&bkpb, blk, sizeof(BkpBlock));
-
- if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ)
- {
- report_invalid_record(state,
- "incorrect hole size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length;
-
- if (remaining < blen)
- {
- report_invalid_record(state,
- "invalid backup block size in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
- remaining -= blen;
- COMP_CRC32C(crc, blk, blen);
- blk += blen;
- }
-
- /* Check that xl_tot_len agrees with our calculation */
- if (remaining != 0)
- {
- report_invalid_record(state,
- "incorrect total length in record at %X/%X",
- (uint32) (recptr >> 32), (uint32) recptr);
- return false;
- }
-
- /* Finally include the record header */
+ COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord);
+ /* include the record header last */
COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc));
FIN_CRC32C(crc);
@@ -985,3 +901,321 @@ out:
}
#endif /* FRONTEND */
+
+
+/* ----------------------------------------
+ * Functions for decoding the data and block references in a record.
+ * ----------------------------------------
+ */
+
+/* private function to reset the state between records */
+static void
+ResetDecoder(XLogReaderState *state)
+{
+ int block_id;
+
+ state->decoded_record = NULL;
+
+ state->main_data_len = 0;
+
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ state->blocks[block_id].in_use = false;
+ state->blocks[block_id].has_image = false;
+ state->blocks[block_id].has_data = false;
+ }
+ state->max_block_id = -1;
+}
+
+/*
+ * Decode the previously read record.
+ *
+ * On error, a human-readable error message is returned in *errormsg, and
+ * the return value is false.
+ */
+bool
+DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+{
+ /*
+ * read next _size bytes from record buffer, but check for overrun first.
+ */
+#define COPY_HEADER_FIELD(_dst, _size) \
+ do { \
+ if (remaining < _size) \
+ goto shortdata_err; \
+ memcpy(_dst, ptr, _size); \
+ ptr += _size; \
+ remaining -= _size; \
+ } while(0)
+
+ char *ptr;
+ uint32 remaining;
+ uint32 datatotal;
+ RelFileNode *rnode = NULL;
+ uint8 block_id;
+
+ ResetDecoder(state);
+
+ state->decoded_record = record;
+
+ ptr = (char *) record;
+ ptr += SizeOfXLogRecord;
+ remaining = record->xl_tot_len - SizeOfXLogRecord;
+
+ /* Decode the headers */
+ datatotal = 0;
+ while (remaining > datatotal)
+ {
+ COPY_HEADER_FIELD(&block_id, sizeof(uint8));
+
+ if (block_id == XLR_BLOCK_ID_DATA_SHORT)
+ {
+ /* XLogRecordDataHeaderShort */
+ uint8 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
+
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id == XLR_BLOCK_ID_DATA_LONG)
+ {
+ /* XLogRecordDataHeaderLong */
+ uint32 main_data_len;
+
+ COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
+ state->main_data_len = main_data_len;
+ datatotal += main_data_len;
+ break; /* by convention, the main data fragment is
+ * always last */
+ }
+ else if (block_id <= XLR_MAX_BLOCK_ID)
+ {
+ /* XLogRecordBlockHeader */
+ DecodedBkpBlock *blk;
+ uint8 fork_flags;
+
+ if (block_id <= state->max_block_id)
+ {
+ report_invalid_record(state,
+ "out-of-order block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ state->max_block_id = block_id;
+
+ blk = &state->blocks[block_id];
+ blk->in_use = true;
+
+ COPY_HEADER_FIELD(&fork_flags, sizeof(uint8));
+ blk->forknum = fork_flags & BKPBLOCK_FORK_MASK;
+ blk->flags = fork_flags;
+ blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
+ blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+
+ COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
+ /* cross-check that the HAS_DATA flag is set iff data_length > 0 */
+ if (blk->has_data && blk->data_len == 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA set, but no data included at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ if (!blk->has_data && blk->data_len != 0)
+ report_invalid_record(state,
+ "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X",
+ (unsigned int) blk->data_len,
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ datatotal += blk->data_len;
+
+ if (blk->has_image)
+ {
+ COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16));
+ COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16));
+ datatotal += BLCKSZ - blk->hole_length;
+ }
+ if (!(fork_flags & BKPBLOCK_SAME_REL))
+ {
+ COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode));
+ rnode = &blk->rnode;
+ }
+ else
+ {
+ if (rnode == NULL)
+ {
+ report_invalid_record(state,
+ "BKPBLOCK_SAME_REL set but no previous rel at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+ goto err;
+ }
+
+ blk->rnode = *rnode;
+ }
+ COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber));
+ }
+ else
+ {
+ report_invalid_record(state,
+ "invalid block_id %u at %X/%X",
+ block_id,
+ (uint32) (state->ReadRecPtr >> 32),
+ (uint32) state->ReadRecPtr);
+ goto err;
+ }
+ }
+
+ if (remaining != datatotal)
+ goto shortdata_err;
+
+ /*
+ * Ok, we've parsed the fragment headers, and verified that the total
+ * length of the payload in the fragments is equal to the amount of data
+ * left. Copy the data of each fragment to a separate buffer.
+ *
+ * We could just set up pointers into readRecordBuf, but we want to align
+ * the data for the convenience of the callers. Backup images are not
+ * copied, however; they don't need alignment.
+ */
+
+ /* block data first */
+ for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ {
+ DecodedBkpBlock *blk = &state->blocks[block_id];
+
+ if (!blk->in_use)
+ continue;
+ if (blk->has_image)
+ {
+ blk->bkp_image = ptr;
+ ptr += BLCKSZ - blk->hole_length;
+ }
+ if (blk->has_data)
+ {
+ if (!blk->data || blk->data_len > blk->data_bufsz)
+ {
+ if (blk->data)
+ pfree(blk->data);
+ blk->data_bufsz = blk->data_len;
+ blk->data = palloc(blk->data_bufsz);
+ }
+ memcpy(blk->data, ptr, blk->data_len);
+ ptr += blk->data_len;
+ }
+ }
+
+ /* and finally, the main data */
+ if (state->main_data_len > 0)
+ {
+ if (!state->main_data || state->main_data_len > state->main_data_bufsz)
+ {
+ if (state->main_data)
+ pfree(state->main_data);
+ state->main_data_bufsz = state->main_data_len;
+ state->main_data = palloc(state->main_data_bufsz);
+ }
+ memcpy(state->main_data, ptr, state->main_data_len);
+ ptr += state->main_data_len;
+ }
+
+ return true;
+
+shortdata_err:
+ report_invalid_record(state,
+ "record with invalid length at %X/%X",
+ (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr);
+err:
+ *errormsg = state->errormsg_buf;
+
+ return false;
+}
+
+/*
+ * Returns information about the block that a block reference refers to.
+ *
+ * If the WAL record contains a block reference with the given ID, *rnode,
+ * *forknum, and *blknum are filled in (if not NULL), and returns TRUE.
+ * Otherwise returns FALSE.
+ */
+bool
+XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+ if (rnode)
+ *rnode = bkpb->rnode;
+ if (forknum)
+ *forknum = bkpb->forknum;
+ if (blknum)
+ *blknum = bkpb->blkno;
+ return true;
+}
+
+/*
+ * Returns the data associated with a block reference, or NULL if there is
+ * no data (e.g. because a full-page image was taken instead). The returned
+ * pointer points to a MAXALIGNed buffer.
+ */
+char *
+XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return NULL;
+
+ bkpb = &record->blocks[block_id];
+
+ if (!bkpb->has_data)
+ {
+ if (len)
+ *len = 0;
+ return NULL;
+ }
+ else
+ {
+ if (len)
+ *len = bkpb->data_len;
+ return bkpb->data;
+ }
+}
+
+/*
+ * Restore a full-page image from a backup block attached to an XLOG record.
+ *
+ * Returns the buffer number containing the page.
+ */
+bool
+RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
+{
+ DecodedBkpBlock *bkpb;
+
+ if (!record->blocks[block_id].in_use)
+ return false;
+ if (!record->blocks[block_id].has_image)
+ return false;
+
+ bkpb = &record->blocks[block_id];
+
+ if (bkpb->hole_length == 0)
+ {
+ memcpy(page, bkpb->bkp_image, BLCKSZ);
+ }
+ else
+ {
+ memcpy(page, bkpb->bkp_image, bkpb->hole_offset);
+ /* must zero-fill the hole */
+ MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
+ memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
+ bkpb->bkp_image + bkpb->hole_offset,
+ BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
+ }
+
+ return true;
+}