diff options
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 486 |
1 files changed, 360 insertions, 126 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7d573cc585d..67d62234369 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -37,6 +37,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) the supplied arguments. */ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); +static void ResetDecoder(XLogReaderState *state); + /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 @@ -59,46 +61,33 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) /* * Allocate and initialize a new XLogReader. * - * Returns NULL if the xlogreader couldn't be allocated. + * The returned XLogReader is palloc'd. (In FRONTEND code, that means that + * running out-of-memory causes an immediate exit(1). */ XLogReaderState * XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) { XLogReaderState *state; - AssertArg(pagereadfunc != NULL); + state = (XLogReaderState *) palloc0(sizeof(XLogReaderState)); - state = (XLogReaderState *) malloc(sizeof(XLogReaderState)); - if (!state) - return NULL; - MemSet(state, 0, sizeof(XLogReaderState)); + state->max_block_id = -1; /* * Permanently allocate readBuf. We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the * storage in most instantiations of the backend; (2) a static char array - * isn't guaranteed to have any particular alignment, whereas malloc() + * isn't guaranteed to have any particular alignment, whereas palloc() * will provide MAXALIGN'd storage. */ - state->readBuf = (char *) malloc(XLOG_BLCKSZ); - if (!state->readBuf) - { - free(state); - return NULL; - } + state->readBuf = (char *) palloc(XLOG_BLCKSZ); state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr and EndRecPtr initialized to zeroes above */ /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */ - state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1); - if (!state->errormsg_buf) - { - free(state->readBuf); - free(state); - return NULL; - } + state->errormsg_buf = palloc(MAX_ERRORMSG_LEN + 1); state->errormsg_buf[0] = '\0'; /* @@ -107,9 +96,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) */ if (!allocate_recordbuf(state, 0)) { - free(state->errormsg_buf); - free(state->readBuf); - free(state); + pfree(state->errormsg_buf); + pfree(state->readBuf); + pfree(state); return NULL; } @@ -119,11 +108,24 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) void XLogReaderFree(XLogReaderState *state) { - free(state->errormsg_buf); + int block_id; + + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + if (state->blocks[block_id].in_use) + { + if (state->blocks[block_id].data) + pfree(state->blocks[block_id].data); + } + } + if (state->main_data) + pfree(state->main_data); + + pfree(state->errormsg_buf); if (state->readRecordBuf) - free(state->readRecordBuf); - free(state->readBuf); - free(state); + pfree(state->readRecordBuf); + pfree(state->readBuf); + pfree(state); } /* @@ -146,14 +148,8 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ)); if (state->readRecordBuf) - free(state->readRecordBuf); - state->readRecordBuf = (char *) malloc(newSize); - if (!state->readRecordBuf) - { - state->readRecordBufSize = 0; - return false; - } - + pfree(state->readRecordBuf); + state->readRecordBuf = (char *) palloc(newSize); state->readRecordBufSize = newSize; return true; } @@ -191,6 +187,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) *errormsg = NULL; state->errormsg_buf[0] = '\0'; + ResetDecoder(state); + if (RecPtr == InvalidXLogRecPtr) { RecPtr = state->EndRecPtr; @@ -440,7 +438,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) state->EndRecPtr -= state->EndRecPtr % XLogSegSize; } - return record; + if (DecodeXLogRecord(state, record, errormsg)) + return record; + else + return NULL; err: @@ -579,30 +580,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess) { - /* - * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is - * required. - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - if (record->xl_len != 0) - { - report_invalid_record(state, - "invalid xlog switch record at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } - } - else if (record->xl_len == 0) - { - report_invalid_record(state, - "record with zero length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } - if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || - record->xl_tot_len > SizeOfXLogRecord + record->xl_len + - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) + if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, "invalid record length at %X/%X", @@ -663,79 +641,17 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, * We assume all of the record (that is, xl_tot_len bytes) has been read * into memory at *record. Also, ValidXLogRecordHeader() has accepted the * record's header, which means in particular that xl_tot_len is at least - * SizeOfXlogRecord, so it is safe to fetch xl_len. + * SizeOfXlogRecord. */ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) { pg_crc32 crc; - int i; - uint32 len = record->xl_len; - BkpBlock bkpb; - char *blk; - size_t remaining = record->xl_tot_len; - /* First the rmgr data */ - if (remaining < SizeOfXLogRecord + len) - { - /* ValidXLogRecordHeader() should've caught this already... */ - report_invalid_record(state, "invalid record length at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - remaining -= SizeOfXLogRecord + len; + /* Calculate the CRC */ INIT_CRC32C(crc); - COMP_CRC32C(crc, XLogRecGetData(record), len); - - /* Add in the backup blocks, if any */ - blk = (char *) XLogRecGetData(record) + len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - uint32 blen; - - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - if (remaining < sizeof(BkpBlock)) - { - report_invalid_record(state, - "invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - memcpy(&bkpb, blk, sizeof(BkpBlock)); - - if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) - { - report_invalid_record(state, - "incorrect hole size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; - - if (remaining < blen) - { - report_invalid_record(state, - "invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - remaining -= blen; - COMP_CRC32C(crc, blk, blen); - blk += blen; - } - - /* Check that xl_tot_len agrees with our calculation */ - if (remaining != 0) - { - report_invalid_record(state, - "incorrect total length in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - - /* Finally include the record header */ + COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); + /* include the record header last */ COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); @@ -985,3 +901,321 @@ out: } #endif /* FRONTEND */ + + +/* ---------------------------------------- + * Functions for decoding the data and block references in a record. + * ---------------------------------------- + */ + +/* private function to reset the state between records */ +static void +ResetDecoder(XLogReaderState *state) +{ + int block_id; + + state->decoded_record = NULL; + + state->main_data_len = 0; + + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + state->blocks[block_id].in_use = false; + state->blocks[block_id].has_image = false; + state->blocks[block_id].has_data = false; + } + state->max_block_id = -1; +} + +/* + * Decode the previously read record. + * + * On error, a human-readable error message is returned in *errormsg, and + * the return value is false. + */ +bool +DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +{ + /* + * read next _size bytes from record buffer, but check for overrun first. + */ +#define COPY_HEADER_FIELD(_dst, _size) \ + do { \ + if (remaining < _size) \ + goto shortdata_err; \ + memcpy(_dst, ptr, _size); \ + ptr += _size; \ + remaining -= _size; \ + } while(0) + + char *ptr; + uint32 remaining; + uint32 datatotal; + RelFileNode *rnode = NULL; + uint8 block_id; + + ResetDecoder(state); + + state->decoded_record = record; + + ptr = (char *) record; + ptr += SizeOfXLogRecord; + remaining = record->xl_tot_len - SizeOfXLogRecord; + + /* Decode the headers */ + datatotal = 0; + while (remaining > datatotal) + { + COPY_HEADER_FIELD(&block_id, sizeof(uint8)); + + if (block_id == XLR_BLOCK_ID_DATA_SHORT) + { + /* XLogRecordDataHeaderShort */ + uint8 main_data_len; + + COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); + + state->main_data_len = main_data_len; + datatotal += main_data_len; + break; /* by convention, the main data fragment is + * always last */ + } + else if (block_id == XLR_BLOCK_ID_DATA_LONG) + { + /* XLogRecordDataHeaderLong */ + uint32 main_data_len; + + COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); + state->main_data_len = main_data_len; + datatotal += main_data_len; + break; /* by convention, the main data fragment is + * always last */ + } + else if (block_id <= XLR_MAX_BLOCK_ID) + { + /* XLogRecordBlockHeader */ + DecodedBkpBlock *blk; + uint8 fork_flags; + + if (block_id <= state->max_block_id) + { + report_invalid_record(state, + "out-of-order block_id %u at %X/%X", + block_id, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + state->max_block_id = block_id; + + blk = &state->blocks[block_id]; + blk->in_use = true; + + COPY_HEADER_FIELD(&fork_flags, sizeof(uint8)); + blk->forknum = fork_flags & BKPBLOCK_FORK_MASK; + blk->flags = fork_flags; + blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); + blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); + + COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); + /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ + if (blk->has_data && blk->data_len == 0) + report_invalid_record(state, + "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + if (!blk->has_data && blk->data_len != 0) + report_invalid_record(state, + "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", + (unsigned int) blk->data_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + datatotal += blk->data_len; + + if (blk->has_image) + { + COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16)); + COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); + datatotal += BLCKSZ - blk->hole_length; + } + if (!(fork_flags & BKPBLOCK_SAME_REL)) + { + COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode)); + rnode = &blk->rnode; + } + else + { + if (rnode == NULL) + { + report_invalid_record(state, + "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + } + + blk->rnode = *rnode; + } + COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber)); + } + else + { + report_invalid_record(state, + "invalid block_id %u at %X/%X", + block_id, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + } + + if (remaining != datatotal) + goto shortdata_err; + + /* + * Ok, we've parsed the fragment headers, and verified that the total + * length of the payload in the fragments is equal to the amount of data + * left. Copy the data of each fragment to a separate buffer. + * + * We could just set up pointers into readRecordBuf, but we want to align + * the data for the convenience of the callers. Backup images are not + * copied, however; they don't need alignment. + */ + + /* block data first */ + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + DecodedBkpBlock *blk = &state->blocks[block_id]; + + if (!blk->in_use) + continue; + if (blk->has_image) + { + blk->bkp_image = ptr; + ptr += BLCKSZ - blk->hole_length; + } + if (blk->has_data) + { + if (!blk->data || blk->data_len > blk->data_bufsz) + { + if (blk->data) + pfree(blk->data); + blk->data_bufsz = blk->data_len; + blk->data = palloc(blk->data_bufsz); + } + memcpy(blk->data, ptr, blk->data_len); + ptr += blk->data_len; + } + } + + /* and finally, the main data */ + if (state->main_data_len > 0) + { + if (!state->main_data || state->main_data_len > state->main_data_bufsz) + { + if (state->main_data) + pfree(state->main_data); + state->main_data_bufsz = state->main_data_len; + state->main_data = palloc(state->main_data_bufsz); + } + memcpy(state->main_data, ptr, state->main_data_len); + ptr += state->main_data_len; + } + + return true; + +shortdata_err: + report_invalid_record(state, + "record with invalid length at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); +err: + *errormsg = state->errormsg_buf; + + return false; +} + +/* + * Returns information about the block that a block reference refers to. + * + * If the WAL record contains a block reference with the given ID, *rnode, + * *forknum, and *blknum are filled in (if not NULL), and returns TRUE. + * Otherwise returns FALSE. + */ +bool +XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, + RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return false; + + bkpb = &record->blocks[block_id]; + if (rnode) + *rnode = bkpb->rnode; + if (forknum) + *forknum = bkpb->forknum; + if (blknum) + *blknum = bkpb->blkno; + return true; +} + +/* + * Returns the data associated with a block reference, or NULL if there is + * no data (e.g. because a full-page image was taken instead). The returned + * pointer points to a MAXALIGNed buffer. + */ +char * +XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return NULL; + + bkpb = &record->blocks[block_id]; + + if (!bkpb->has_data) + { + if (len) + *len = 0; + return NULL; + } + else + { + if (len) + *len = bkpb->data_len; + return bkpb->data; + } +} + +/* + * Restore a full-page image from a backup block attached to an XLOG record. + * + * Returns the buffer number containing the page. + */ +bool +RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return false; + if (!record->blocks[block_id].has_image) + return false; + + bkpb = &record->blocks[block_id]; + + if (bkpb->hole_length == 0) + { + memcpy(page, bkpb->bkp_image, BLCKSZ); + } + else + { + memcpy(page, bkpb->bkp_image, bkpb->hole_offset); + /* must zero-fill the hole */ + MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length); + memcpy(page + (bkpb->hole_offset + bkpb->hole_length), + bkpb->bkp_image + bkpb->hole_offset, + BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); + } + + return true; +} |