diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/generic_xlog.c | 6 | ||||
-rw-r--r-- | src/backend/access/transam/xlog.c | 28 | ||||
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 744 | ||||
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 2 | ||||
-rw-r--r-- | src/backend/replication/logical/decode.c | 2 | ||||
-rw-r--r-- | src/bin/pg_rewind/parsexlog.c | 2 | ||||
-rw-r--r-- | src/bin/pg_waldump/pg_waldump.c | 22 | ||||
-rw-r--r-- | src/include/access/xlogreader.h | 128 |
8 files changed, 734 insertions, 200 deletions
diff --git a/src/backend/access/transam/generic_xlog.c b/src/backend/access/transam/generic_xlog.c index 63301a1ab16..0e9bcc71596 100644 --- a/src/backend/access/transam/generic_xlog.c +++ b/src/backend/access/transam/generic_xlog.c @@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record) uint8 block_id; /* Protect limited size of buffers[] array */ - Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES); + Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES); /* Iterate over blocks */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { XLogRedoAction action; @@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record) } /* Changes are done: unlock and release all buffers */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (BufferIsValid(buffers[block_id])) UnlockReleaseBuffer(buffers[block_id]); diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 7faac01bf24..729fc5ff13c 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata, StringInfoData recordBuf; char *errormsg = NULL; MemoryContext oldCxt; + DecodedXLogRecord *decoded; oldCxt = MemoryContextSwitchTo(walDebugCxt); @@ -1224,6 +1225,9 @@ XLogInsertRecord(XLogRecData *rdata, for (; rdata != NULL; rdata = rdata->next) appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); + /* How much space would it take to decode this record? */ + decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len)); + if (!debug_reader) debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL); @@ -1231,7 +1235,9 @@ XLogInsertRecord(XLogRecData *rdata, { appendStringInfoString(&buf, "error decoding record: out of memory"); } - else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, + else if (!DecodeXLogRecord(debug_reader, decoded, + (XLogRecord *) recordBuf.data, + EndPos, &errormsg)) { appendStringInfo(&buf, "error decoding record: %s", @@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata, else { appendStringInfoString(&buf, " - "); + /* + * Temporarily make this decoded record the current record for + * XLogRecGetXXX() macros. + */ + debug_reader->record = decoded; xlog_outdesc(&buf, debug_reader); + debug_reader->record = NULL; } elog(LOG, "%s", buf.data); + pfree(decoded); pfree(buf.data); pfree(recordBuf.data); MemoryContextSwitchTo(oldCxt); @@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record) Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0); - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { Buffer buf; Page page; @@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode, ReadRecPtr = xlogreader->ReadRecPtr; EndRecPtr = xlogreader->EndRecPtr; + if (record == NULL) { if (readFile >= 0) @@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record) * XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info * code just to distinguish them for statistics purposes. */ - for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++) + for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { Buffer buffer; @@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record) int block_id; /* decode block references */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { RelFileNode rnode; ForkNumber forknum; @@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state, XLogRecPtr targetPagePtr = state->readPagePtr; int reqLen = state->reqLen; int readLen = 0; - XLogRecPtr targetRecPtr = state->ReadRecPtr; + XLogRecPtr targetRecPtr = state->DecodeRecPtr; uint32 targetPageOff; XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY; int r; @@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state, /* * Request a restartpoint if we've replayed too much xlog since the * last one. + * + * XXX Why is this here? Move it to recovery loop, since it's based + * on replay position, not read position? */ if (bgwriterLaunched) { @@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, * be updated on each cycle. When we are behind, * XLogReceiptTime will not advance, so the grace time * allotted to conflicting queries will decrease. + * */ if (RecPtr < flushedUpto) havedata = true; diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 02257768ec8..f66592482a4 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -38,6 +38,9 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...) static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength); static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr, int reqLen, bool header_inclusive); +size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +static XLogReadRecordResult XLogDecodeOneRecord(XLogReaderState *state, + bool allow_oversized); static void XLogReaderInvalReadState(XLogReaderState *state); static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record); @@ -50,6 +53,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 +#define DEFAULT_DECODE_BUFFER_SIZE 0x10000 + /* * Construct a string in state->errormsg_buf explaining what's wrong with * the current record being read. @@ -64,6 +69,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) va_start(args, fmt); vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); va_end(args); + + state->errormsg_deferred = true; } /* @@ -86,8 +93,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, /* initialize caller-provided support functions */ state->cleanup_cb = cleanup_cb; - state->max_block_id = -1; - /* * Permanently allocate readBuf. We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the @@ -136,18 +141,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir, void XLogReaderFree(XLogReaderState *state) { - int block_id; - if (state->seg.ws_file >= 0) state->cleanup_cb(state); - for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++) - { - if (state->blocks[block_id].data) - pfree(state->blocks[block_id].data); - } - if (state->main_data) - pfree(state->main_data); + if (state->decode_buffer && state->free_decode_buffer) + pfree(state->decode_buffer); pfree(state->errormsg_buf); if (state->readRecordBuf) @@ -157,6 +155,22 @@ XLogReaderFree(XLogReaderState *state) } /* + * Set the size of the decoding buffer. A pointer to a caller supplied memory + * region may also be passed in, in which case non-oversized records will be + * decoded there. + */ +void +XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) +{ + Assert(state->decode_buffer == NULL); + + state->decode_buffer = buffer; + state->decode_buffer_size = size; + state->decode_buffer_head = buffer; + state->decode_buffer_tail = buffer; +} + +/* * Allocate readRecordBuf to fit a record of at least the given length. * Returns true if successful, false if out of memory. * @@ -243,22 +257,123 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) /* Begin at the passed-in record pointer. */ state->EndRecPtr = RecPtr; + state->NextRecPtr = RecPtr; state->ReadRecPtr = InvalidXLogRecPtr; + state->DecodeRecPtr = InvalidXLogRecPtr; state->readRecordState = XLREAD_NEXT_RECORD; } /* - * Attempt to read an XLOG record. - * - * XLogBeginRead() or XLogFindNextRecord() must be called before the first call - * to XLogReadRecord(). + * See if we can release the last record that was returned by + * XLogReadRecord(), to free up space. + */ +static void +XLogReleasePreviousRecord(XLogReaderState *state) +{ + DecodedXLogRecord *record; + + /* + * Remove it from the decoded record queue. It must be the oldest + * item decoded, decode_queue_tail. + */ + record = state->record; + Assert(record == state->decode_queue_tail); + state->record = NULL; + state->decode_queue_tail = record->next; + + /* It might also be the newest item decoded, decode_queue_head. */ + if (state->decode_queue_head == record) + state->decode_queue_head = NULL; + + /* Release the space. */ + if (unlikely(record->oversized)) + { + /* It's not in the the decode buffer, so free it to release space. */ + pfree(record); + } + else + { + /* It must be the tail record in the decode buffer. */ + Assert(state->decode_buffer_tail == (char *) record); + + /* + * We need to update tail to point to the next record that is in the + * decode buffer, if any, being careful to skip oversized ones + * (they're not in the decode buffer). + */ + record = record->next; + while (unlikely(record && record->oversized)) + record = record->next; + + if (record) + { + /* Adjust tail to release space up to the next record. */ + state->decode_buffer_tail = (char *) record; + } + else if (state->decoding && !state->decoding->oversized) + { + /* + * We're releasing the last fully decoded record in + * XLogReadRecord(), but some time earlier we partially decoded a + * record in XLogReadAhead() and were unable to complete the job. + * We'll set the buffer head and tail to point to the record we + * started working on, so that we can continue (perhaps from a + * different source). + */ + state->decode_buffer_tail = (char *) state->decoding; + state->decode_buffer_head = (char *) state->decoding; + } + else + { + /* + * Otherwise we might as well just reset head and tail to the + * start of the buffer space, because we're empty. This means + * we'll keep overwriting the same piece of memory if we're not + * doing any prefetching. + */ + state->decode_buffer_tail = state->decode_buffer; + state->decode_buffer_head = state->decode_buffer; + } + } +} + +/* + * Similar to XLogNextRecord(), but this traditional interface is for code + * that just wants the header, not the decoded record. Callers can access the + * decoded record through the XLogRecGetXXX() macros. + */ +XLogReadRecordResult +XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) +{ + XLogReadRecordResult result; + DecodedXLogRecord *decoded; + + /* Consume the next decoded record. */ + result = XLogNextRecord(state, &decoded, errormsg); + if (result == XLREAD_SUCCESS) + { + /* + * The traditional interface just returns the header, not the decoded + * record. The caller will access the decoded record through the + * XLogRecGetXXX() macros. + */ + *record = &decoded->header; + } + else + *record = NULL; + return result; +} + +/* + * Consume the next record. XLogBeginRead() or XLogFindNextRecord() must be + * called before the first call to XLogNextRecord(). * * This function may return XLREAD_NEED_DATA several times before returning a * result record. The caller shall read in some new data then call this * function again with the same parameters. * * When a record is successfully read, returns XLREAD_SUCCESS with result - * record being stored in *record. Otherwise *record is NULL. + * record being stored in *record. Otherwise *record is set to NULL. * * Returns XLREAD_NEED_DATA if more data is needed to finish decoding the * current record. In that case, state->readPagePtr and state->reqLen inform @@ -269,11 +384,249 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * length of data that is now available (which must be >= given reqLen), * respectively. * - * If invalid data is encountered, returns XLREAD_FAIL and sets *record to - * NULL. *errormsg is set to a string with details of the failure. The + * Returns XLREAD_FULL if allow_oversized is true, and no space is available. + * This is intended for readahead. + * + * If invalid data is encountered, returns XLREAD_FAIL with *record being set + * to NULL. *errormsg is set to a string with details of the failure. The * returned pointer (or *errormsg) points to an internal buffer that's valid * until the next call to XLogReadRecord. * + */ +XLogReadRecordResult +XLogNextRecord(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg) +{ + /* Release the space occupied by the last record we returned. */ + if (state->record) + XLogReleasePreviousRecord(state); + + for (;;) + { + XLogReadRecordResult result; + + /* We can now return the oldest item in the queue, if there is one. */ + if (state->decode_queue_tail) + { + /* + * Record this as the most recent record returned, so that we'll + * release it next time. This also exposes it to the + * XLogRecXXX(decoder) macros, which pass in the decoder rather + * than the record for historical reasons. + */ + state->record = state->decode_queue_tail; + + /* + * It should be immediately after the last the record returned by + * XLogReadRecord(), or at the position set by XLogBeginRead() if + * XLogReadRecord() hasn't been called yet. It may be after a + * page header, though. + */ + Assert(state->record->lsn == state->EndRecPtr || + (state->EndRecPtr % XLOG_BLCKSZ == 0 && + (state->record->lsn == state->EndRecPtr + SizeOfXLogShortPHD || + state->record->lsn == state->EndRecPtr + SizeOfXLogLongPHD))); + + /* + * Set ReadRecPtr and EndRecPtr to correspond to that + * record. + * + * Calling code could access these through the returned decoded + * record, but for now we'll update them directly here, for the + * benefit of all the existing code that accesses these variables + * directly. + */ + state->ReadRecPtr = state->record->lsn; + state->EndRecPtr = state->record->next_lsn; + + *errormsg = NULL; + *record = state->record; + + return XLREAD_SUCCESS; + } + else if (state->errormsg_deferred) + { + /* + * If we've run out of records, but we have a deferred error, now + * is the time to report it. + */ + state->errormsg_deferred = false; + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + else + *errormsg = NULL; + *record = NULL; + state->EndRecPtr = state->DecodeRecPtr; + + return XLREAD_FAIL; + } + + /* We need to get a decoded record into our queue first. */ + result = XLogDecodeOneRecord(state, true /* allow_oversized */ ); + switch(result) + { + case XLREAD_NEED_DATA: + *errormsg = NULL; + *record = NULL; + return result; + case XLREAD_SUCCESS: + Assert(state->decode_queue_tail != NULL); + break; + case XLREAD_FULL: + /* Not expected because we passed allow_oversized = true */ + Assert(false); + break; + case XLREAD_FAIL: + /* + * If that produced neither a queued record nor a queued error, + * then we're at the end (for example, archive recovery with no + * more files available). + */ + Assert(state->decode_queue_tail == NULL); + if (!state->errormsg_deferred) + { + state->EndRecPtr = state->DecodeRecPtr; + *errormsg = NULL; + *record = NULL; + return result; + } + break; + } + } + + /* unreachable */ + return XLREAD_FAIL; +} + +/* + * Try to decode the next available record. The next record will also be + * returned to XLogRecordRead(). + * + * In addition to the values that XLogReadRecord() can return, XLogReadAhead() + * can also return XLREAD_FULL to indicate that further readahead is not + * possible yet due to lack of space. + */ +XLogReadRecordResult +XLogReadAhead(XLogReaderState *state, DecodedXLogRecord **record, char **errormsg) +{ + XLogReadRecordResult result; + + /* We stop trying after encountering an error. */ + if (unlikely(state->errormsg_deferred)) + { + /* We only report the error message the first time, see below. */ + *errormsg = NULL; + return XLREAD_FAIL; + } + + /* + * Try to decode one more record, if we have space. Pass allow_oversized + * = false, so that this call returns fast if the decode buffer is full. + */ + result = XLogDecodeOneRecord(state, false); + switch (result) + { + case XLREAD_SUCCESS: + /* New record at head of decode record queue. */ + Assert(state->decode_queue_head != NULL); + *record = state->decode_queue_head; + return result; + case XLREAD_FULL: + /* No space in circular decode buffer. */ + return result; + case XLREAD_NEED_DATA: + /* The caller needs to insert more data. */ + return result; + case XLREAD_FAIL: + /* Report the error. XLogReadRecord() will also report it. */ + Assert(state->errormsg_deferred); + if (state->errormsg_buf[0] != '\0') + *errormsg = state->errormsg_buf; + return result; + } + + /* Unreachable. */ + return XLREAD_FAIL; +} + +/* + * Allocate space for a decoded record. The only member of the returned + * object that is initialized is the 'oversized' flag, indicating that the + * decoded record wouldn't fit in the decode buffer and must eventually be + * freed explicitly. + * + * Return NULL if there is no space in the decode buffer and allow_oversized + * is false, or if memory allocation fails for an oversized buffer. + */ +static DecodedXLogRecord * +XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized) +{ + size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len); + DecodedXLogRecord *decoded = NULL; + + /* Allocate a circular decode buffer if we don't have one already. */ + if (unlikely(state->decode_buffer == NULL)) + { + if (state->decode_buffer_size == 0) + state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE; + state->decode_buffer = palloc(state->decode_buffer_size); + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + state->free_decode_buffer = true; + } + if (state->decode_buffer_head >= state->decode_buffer_tail) + { + /* Empty, or head is to the right of tail. */ + if (state->decode_buffer_head + required_space <= + state->decode_buffer + state->decode_buffer_size) + { + /* There is space between head and end. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_head; + decoded->oversized = false; + return decoded; + } + else if (state->decode_buffer + required_space < + state->decode_buffer_tail) + { + /* There is space between start and tail. */ + decoded = (DecodedXLogRecord *) state->decode_buffer; + decoded->oversized = false; + return decoded; + } + } + else + { + /* Head is to the left of tail. */ + if (state->decode_buffer_head + required_space < + state->decode_buffer_tail) + { + /* There is space between head and tail. */ + decoded = (DecodedXLogRecord *) state->decode_buffer_head; + decoded->oversized = false; + return decoded; + } + } + + /* Not enough space in the decode buffer. Are we allowed to allocate? */ + if (allow_oversized) + { + decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM); + if (decoded == NULL) + return NULL; + decoded->oversized = true; + return decoded; + } + + return decoded; +} + +/* + * Try to read and decode the next record and add it to the head of the + * decoded record queue. If 'allow_oversized' is false, then XLREAD_FULL can + * be returned to indicate the decoding buffer is full. XLogBeginRead() or + * XLogFindNextRecord() must be called before the first call to + * XLogReadRecord(). * * This function runs a state machine consisting of the following states. * @@ -300,35 +653,35 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) * current state. This behavior allows us to continue reading a record * after switching to a different source, during streaming replication. */ -XLogReadRecordResult -XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) +static XLogReadRecordResult +XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized) { + XLogRecord *record; + char *errormsg; /* not used */ XLogRecord *prec; - *record = NULL; - /* reset error state */ - *errormsg = NULL; state->errormsg_buf[0] = '\0'; + record = NULL; switch (state->readRecordState) { case XLREAD_NEXT_RECORD: - ResetDecoder(state); + Assert(!state->decoding); - if (state->ReadRecPtr != InvalidXLogRecPtr) + if (state->DecodeRecPtr != InvalidXLogRecPtr) { /* read the record after the one we just read */ /* - * EndRecPtr is pointing to end+1 of the previous WAL record. + * NextRecPtr is pointing to end+1 of the previous WAL record. * If we're at a page boundary, no more records can fit on the * current page. We must skip over the page header, but we * can't do that until we've read in the page, since the * header size is variable. */ - state->PrevRecPtr = state->ReadRecPtr; - state->ReadRecPtr = state->EndRecPtr; + state->PrevRecPtr = state->DecodeRecPtr; + state->DecodeRecPtr = state->NextRecPtr; } else { @@ -338,8 +691,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * In this case, EndRecPtr should already be pointing to a * valid record starting position. */ - Assert(XRecOffIsValid(state->EndRecPtr)); - state->ReadRecPtr = state->EndRecPtr; + Assert(XRecOffIsValid(state->NextRecPtr)); + state->DecodeRecPtr = state->NextRecPtr; /* * We cannot verify the previous-record pointer when we're @@ -347,7 +700,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * won't try doing that. */ state->PrevRecPtr = InvalidXLogRecPtr; - state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */ } state->record_verified = false; @@ -362,9 +714,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) uint32 targetRecOff; XLogPageHeader pageHeader; + Assert(!state->decoding); + targetPagePtr = - state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); - targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ); + targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ; /* * Check if we have enough data. For the first record in the @@ -385,13 +739,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) if (targetRecOff == 0) { /* At page start, so skip over page header. */ - state->ReadRecPtr += pageHeaderSize; + state->DecodeRecPtr += pageHeaderSize; targetRecOff = pageHeaderSize; } else if (targetRecOff < pageHeaderSize) { report_invalid_record(state, "invalid record offset at %X/%X", - LSN_FORMAT_ARGS(state->ReadRecPtr)); + LSN_FORMAT_ARGS(state->DecodeRecPtr)); goto err; } @@ -400,8 +754,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) targetRecOff == pageHeaderSize) { report_invalid_record(state, "contrecord is requested by %X/%X", - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr); + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr); goto err; } @@ -419,9 +773,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * header. */ prec = (XLogRecord *) (state->readBuf + - state->ReadRecPtr % XLOG_BLCKSZ); + state->DecodeRecPtr % XLOG_BLCKSZ); total_len = prec->xl_tot_len; + /* Find space to decode this record. */ + Assert(state->decoding == NULL); + state->decoding = XLogReadRecordAlloc(state, total_len, + allow_oversized); + if (state->decoding == NULL) + { + /* + * We couldn't get space. If allow_oversized was true, + * then palloc() must have failed. Otherwise, report that + * our decoding buffer is full. This means that weare + * trying to read too far ahead. + */ + if (allow_oversized) + goto err; + return XLREAD_FULL; + } + /* * If the whole record header is on this page, validate it * immediately. Otherwise do just a basic sanity check on @@ -433,7 +804,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) */ if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, prec)) goto err; @@ -446,7 +817,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) { report_invalid_record(state, "invalid record length at %X/%X: wanted %u, got %u", - LSN_FORMAT_ARGS(state->ReadRecPtr), + LSN_FORMAT_ARGS(state->DecodeRecPtr), (uint32) SizeOfXLogRecord, total_len); goto err; } @@ -471,13 +842,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) XLogRecPtr targetPagePtr; uint32 targetRecOff; + Assert(state->decoding); + /* * Wait for the rest of the record on the first page to become * available */ targetPagePtr = - state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ); - targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ; + state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ); + targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ; request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ); record_len = request_len - targetRecOff; @@ -496,7 +869,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* validate record header if not yet */ if (!state->record_verified && record_len >= SizeOfXLogRecord) { - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, prec)) goto err; @@ -509,15 +882,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* Record does not cross a page boundary */ Assert(state->record_verified); - if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + if (!ValidXLogRecord(state, prec, state->DecodeRecPtr)) goto err; state->record_verified = true; /* to be tidy */ /* We already checked the header earlier */ - state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len); + state->NextRecPtr = state->DecodeRecPtr + MAXALIGN(record_len); - *record = prec; + record = prec; state->readRecordState = XLREAD_NEXT_RECORD; break; } @@ -536,7 +909,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) report_invalid_record(state, "record length %u at %X/%X too long", total_len, - LSN_FORMAT_ARGS(state->ReadRecPtr)); + LSN_FORMAT_ARGS(state->DecodeRecPtr)); goto err; } @@ -547,7 +920,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) state->recordRemainLen -= record_len; /* Calculate pointer to beginning of next page */ - state->recordContRecPtr = state->ReadRecPtr + record_len; + state->recordContRecPtr = state->DecodeRecPtr + record_len; Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0); state->readRecordState = XLREAD_CONTINUATION; @@ -564,6 +937,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) * we enter this state only if we haven't read the whole * record. */ + Assert(state->decoding); Assert(state->recordRemainLen > 0); while (state->recordRemainLen > 0) @@ -583,7 +957,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) return XLREAD_NEED_DATA; if (!state->page_verified) - goto err; + goto err_continue; Assert(SizeOfXLogShortPHD <= state->readLen); @@ -596,8 +970,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) "there is no contrecord flag at %X/%X reading %X/%X", (uint32) (state->recordContRecPtr >> 32), (uint32) state->recordContRecPtr, - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr); + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr); goto err; } @@ -614,8 +988,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) pageHeader->xlp_rem_len, (uint32) (state->recordContRecPtr >> 32), (uint32) state->recordContRecPtr, - (uint32) (state->ReadRecPtr >> 32), - (uint32) state->ReadRecPtr, + (uint32) (state->DecodeRecPtr >> 32), + (uint32) state->DecodeRecPtr, state->recordRemainLen); goto err; } @@ -651,7 +1025,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) if (!state->record_verified) { Assert(state->recordGotLen >= SizeOfXLogRecord); - if (!ValidXLogRecordHeader(state, state->ReadRecPtr, + if (!ValidXLogRecordHeader(state, state->DecodeRecPtr, state->PrevRecPtr, (XLogRecord *) state->readRecordBuf)) goto err; @@ -668,16 +1042,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* targetPagePtr is pointing the last-read page here */ prec = (XLogRecord *) state->readRecordBuf; - if (!ValidXLogRecord(state, prec, state->ReadRecPtr)) + if (!ValidXLogRecord(state, prec, state->DecodeRecPtr)) goto err; pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); - state->EndRecPtr = targetPagePtr + pageHeaderSize + state->NextRecPtr = targetPagePtr + pageHeaderSize + MAXALIGN(pageHeader->xlp_rem_len); - *record = prec; + record = prec; state->readRecordState = XLREAD_NEXT_RECORD; + break; } } @@ -685,32 +1060,65 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg) /* * Special processing if it's an XLOG SWITCH record */ - if ((*record)->xl_rmid == RM_XLOG_ID && - ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) + if (record->xl_rmid == RM_XLOG_ID && + (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) { /* Pretend it extends to end of segment */ - state->EndRecPtr += state->segcxt.ws_segsize - 1; - state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize); + state->NextRecPtr += state->segcxt.ws_segsize - 1; + state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); } - if (DecodeXLogRecord(state, *record, errormsg)) - return XLREAD_SUCCESS; + Assert(!record || state->readLen >= 0); + if (DecodeXLogRecord(state, state->decoding, record, state->DecodeRecPtr, &errormsg)) + { + /* Record the location of the next record. */ + state->decoding->next_lsn = state->NextRecPtr; - *record = NULL; - return XLREAD_FAIL; + /* + * If it's in the decode buffer (not an "oversized" record allocated + * with palloc()), mark the decode buffer space as occupied. + */ + if (!state->decoding->oversized) + { + /* The new decode buffer head must be MAXALIGNed. */ + Assert(state->decoding->size == MAXALIGN(state->decoding->size)); + if ((char *) state->decoding == state->decode_buffer) + state->decode_buffer_head = state->decode_buffer + + state->decoding->size; + else + state->decode_buffer_head += state->decoding->size; + } + + /* Insert it into the queue of decoded records. */ + Assert(state->decode_queue_head != state->decoding); + if (state->decode_queue_head) + state->decode_queue_head->next = state->decoding; + state->decode_queue_head = state->decoding; + if (!state->decode_queue_tail) + state->decode_queue_tail = state->decoding; + state->decoding = NULL; + + return XLREAD_SUCCESS; + } err: + if (state->decoding && state->decoding->oversized) + pfree(state->decoding); + state->decoding = NULL; +err_continue: /* * Invalidate the read page. We might read from a different source after * failure. */ XLogReaderInvalReadState(state); - if (state->errormsg_buf[0] != '\0') - *errormsg = state->errormsg_buf; + /* + * If an error was written to errmsg_buf, it'll be returned to the caller + * of XLogReadRecord() after all successfully decoded records from the + * read queue. + */ - *record = NULL; return XLREAD_FAIL; } @@ -1342,34 +1750,84 @@ WALRead(XLogReaderState *state, * ---------------------------------------- */ -/* private function to reset the state between records */ +/* + * Private function to reset the state, forgetting all decoded records, if we + * are asked to move to a new read position. + */ static void ResetDecoder(XLogReaderState *state) { - int block_id; - - state->decoded_record = NULL; + DecodedXLogRecord *r; - state->main_data_len = 0; - - for (block_id = 0; block_id <= state->max_block_id; block_id++) + /* Reset the decoded record queue, freeing any oversized records. */ + while ((r = state->decode_queue_tail)) { - state->blocks[block_id].in_use = false; - state->blocks[block_id].has_image = false; - state->blocks[block_id].has_data = false; - state->blocks[block_id].apply_image = false; + state->decode_queue_tail = r->next; + if (r->oversized) + pfree(r); } - state->max_block_id = -1; + state->decode_queue_head = NULL; + state->decode_queue_tail = NULL; + state->record = NULL; + state->decoding = NULL; + + /* Reset the decode buffer to empty. */ + state->decode_buffer_head = state->decode_buffer; + state->decode_buffer_tail = state->decode_buffer; + + /* Clear error state. */ + state->errormsg_buf[0] = '\0'; + state->errormsg_deferred = false; +} + +/* + * Compute the maximum possible amount of padding that could be required to + * decode a record, given xl_tot_len from the record's header. This is the + * amount of output buffer space that we need to decode a record, though we + * might not finish up using it all. + * + * This computation is pessimistic and assumes the maximum possible number of + * blocks, due to lack of better information. + */ +size_t +DecodeXLogRecordRequiredSpace(size_t xl_tot_len) +{ + size_t size = 0; + + /* Account for the fixed size part of the decoded record struct. */ + size += offsetof(DecodedXLogRecord, blocks[0]); + /* Account for the flexible blocks array of maximum possible size. */ + size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1); + /* Account for all the raw main and block data. */ + size += xl_tot_len; + /* We might insert padding before main_data. */ + size += (MAXIMUM_ALIGNOF - 1); + /* We might insert padding before each block's data. */ + size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1); + /* We might insert padding at the end. */ + size += (MAXIMUM_ALIGNOF - 1); + + return size; } /* - * Decode the previously read record. + * Decode a record. "decoded" must point to a MAXALIGNed memory area that has + * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On + * success, decoded->size contains the actual space occupied by the decoded + * record, which may turn out to be less. + * + * Only decoded->oversized member must be initialized already, and will not be + * modified. Other members will be initialized as required. * * On error, a human-readable error message is returned in *errormsg, and * the return value is false. */ bool -DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, + char **errormsg) { /* * read next _size bytes from record buffer, but check for overrun first. @@ -1384,17 +1842,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) } while(0) char *ptr; + char *out; uint32 remaining; uint32 datatotal; RelFileNode *rnode = NULL; uint8 block_id; - ResetDecoder(state); - - state->decoded_record = record; - state->record_origin = InvalidRepOriginId; - state->toplevel_xid = InvalidTransactionId; - + decoded->header = *record; + decoded->lsn = lsn; + decoded->next = NULL; + decoded->record_origin = InvalidRepOriginId; + decoded->toplevel_xid = InvalidTransactionId; + decoded->main_data = NULL; + decoded->main_data_len = 0; + decoded->max_block_id = -1; ptr = (char *) record; ptr += SizeOfXLogRecord; remaining = record->xl_tot_len - SizeOfXLogRecord; @@ -1412,7 +1873,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ @@ -1423,18 +1884,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) uint32 main_data_len; COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); - state->main_data_len = main_data_len; + decoded->main_data_len = main_data_len; datatotal += main_data_len; break; /* by convention, the main data fragment is * always last */ } else if (block_id == XLR_BLOCK_ID_ORIGIN) { - COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId)); + COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); } else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) { - COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId)); + COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId)); } else if (block_id <= XLR_MAX_BLOCK_ID) { @@ -1442,7 +1903,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) DecodedBkpBlock *blk; uint8 fork_flags; - if (block_id <= state->max_block_id) + /* mark any intervening block IDs as not in use */ + for (int i = decoded->max_block_id + 1; i < block_id; ++i) + decoded->blocks[i].in_use = false; + + if (block_id <= decoded->max_block_id) { report_invalid_record(state, "out-of-order block_id %u at %X/%X", @@ -1450,9 +1915,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) LSN_FORMAT_ARGS(state->ReadRecPtr)); goto err; } - state->max_block_id = block_id; + decoded->max_block_id = block_id; - blk = &state->blocks[block_id]; + blk = &decoded->blocks[block_id]; blk->in_use = true; blk->apply_image = false; @@ -1596,17 +2061,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) /* * Ok, we've parsed the fragment headers, and verified that the total * length of the payload in the fragments is equal to the amount of data - * left. Copy the data of each fragment to a separate buffer. - * - * We could just set up pointers into readRecordBuf, but we want to align - * the data for the convenience of the callers. Backup images are not - * copied, however; they don't need alignment. + * left. Copy the data of each fragment to contiguous space after the + * blocks array, inserting alignment padding before the data fragments so + * they can be cast to struct pointers by REDO routines. */ + out = ((char *) decoded) + + offsetof(DecodedXLogRecord, blocks) + + sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1); /* block data first */ - for (block_id = 0; block_id <= state->max_block_id; block_id++) + for (block_id = 0; block_id <= decoded->max_block_id; block_id++) { - DecodedBkpBlock *blk = &state->blocks[block_id]; + DecodedBkpBlock *blk = &decoded->blocks[block_id]; if (!blk->in_use) continue; @@ -1615,58 +2081,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) if (blk->has_image) { - blk->bkp_image = ptr; + /* no need to align image */ + blk->bkp_image = out; + memcpy(out, ptr, blk->bimg_len); ptr += blk->bimg_len; + out += blk->bimg_len; } if (blk->has_data) { - if (!blk->data || blk->data_len > blk->data_bufsz) - { - if (blk->data) - pfree(blk->data); - - /* - * Force the initial request to be BLCKSZ so that we don't - * waste time with lots of trips through this stanza as a - * result of WAL compression. - */ - blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ)); - blk->data = palloc(blk->data_bufsz); - } + out = (char *) MAXALIGN(out); + blk->data = out; memcpy(blk->data, ptr, blk->data_len); ptr += blk->data_len; + out += blk->data_len; } } /* and finally, the main data */ - if (state->main_data_len > 0) + if (decoded->main_data_len > 0) { - if (!state->main_data || state->main_data_len > state->main_data_bufsz) - { - if (state->main_data) - pfree(state->main_data); - - /* - * main_data_bufsz must be MAXALIGN'ed. In many xlog record - * types, we omit trailing struct padding on-disk to save a few - * bytes; but compilers may generate accesses to the xlog struct - * that assume that padding bytes are present. If the palloc - * request is not large enough to include such padding bytes then - * we'll get valgrind complaints due to otherwise-harmless fetches - * of the padding bytes. - * - * In addition, force the initial request to be reasonably large - * so that we don't waste time with lots of trips through this - * stanza. BLCKSZ / 2 seems like a good compromise choice. - */ - state->main_data_bufsz = MAXALIGN(Max(state->main_data_len, - BLCKSZ / 2)); - state->main_data = palloc(state->main_data_bufsz); - } - memcpy(state->main_data, ptr, state->main_data_len); - ptr += state->main_data_len; + out = (char *) MAXALIGN(out); + decoded->main_data = out; + memcpy(decoded->main_data, ptr, decoded->main_data_len); + ptr += decoded->main_data_len; + out += decoded->main_data_len; } + /* Report the actual size we used. */ + decoded->size = MAXALIGN(out - (char *) decoded); + Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= + decoded->size); + return true; shortdata_err: @@ -1692,10 +2137,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (rnode) *rnode = bkpb->rnode; if (forknum) @@ -1715,10 +2161,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) { DecodedBkpBlock *bkpb; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return NULL; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; if (!bkpb->has_data) { @@ -1746,12 +2193,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) char *ptr; PGAlignedBlock tmp; - if (!record->blocks[block_id].in_use) + if (block_id > record->record->max_block_id || + !record->record->blocks[block_id].in_use) return false; - if (!record->blocks[block_id].has_image) + if (!record->record->blocks[block_id].has_image) return false; - bkpb = &record->blocks[block_id]; + bkpb = &record->record->blocks[block_id]; ptr = bkpb->bkp_image; if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED) diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index e5de26dce54..eedd95cc137 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -350,7 +350,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record, * going to initialize it. And vice versa. */ zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); - willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0; + willinit = (record->record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0; if (willinit && !zeromode) elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine"); if (!willinit && zeromode) diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c index 9aab7136843..7924581cdcd 100644 --- a/src/backend/replication/logical/decode.c +++ b/src/backend/replication/logical/decode.c @@ -123,7 +123,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor { ReorderBufferAssignChild(ctx->reorder, txid, - record->decoded_record->xl_xid, + XLogRecGetXid(record), buf.origptr); } diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c index 79f71c0477f..81e186270a3 100644 --- a/src/bin/pg_rewind/parsexlog.c +++ b/src/bin/pg_rewind/parsexlog.c @@ -439,7 +439,7 @@ extractPageInfo(XLogReaderState *record) RmgrNames[rmid], info); } - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { RelFileNode rnode; ForkNumber forknum; diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c index 5db389aa2d2..d4d6bb25a9f 100644 --- a/src/bin/pg_waldump/pg_waldump.c +++ b/src/bin/pg_waldump/pg_waldump.c @@ -397,10 +397,10 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len) * add an accessor macro for this. */ *fpi_len = 0; - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (XLogRecHasBlockImage(record, block_id)) - *fpi_len += record->blocks[block_id].bimg_len; + *fpi_len += record->record->blocks[block_id].bimg_len; } /* @@ -498,7 +498,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) if (!config->bkp_details) { /* print block references (short format) */ - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (!XLogRecHasBlockRef(record, block_id)) continue; @@ -529,7 +529,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) { /* print block references (detailed format) */ putchar('\n'); - for (block_id = 0; block_id <= record->max_block_id; block_id++) + for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++) { if (!XLogRecHasBlockRef(record, block_id)) continue; @@ -542,26 +542,26 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record) blk); if (XLogRecHasBlockImage(record, block_id)) { - if (record->blocks[block_id].bimg_info & + if (record->record->blocks[block_id].bimg_info & BKPIMAGE_IS_COMPRESSED) { printf(" (FPW%s); hole: offset: %u, length: %u, " "compression saved: %u", XLogRecBlockImageApply(record, block_id) ? "" : " for WAL verification", - record->blocks[block_id].hole_offset, - record->blocks[block_id].hole_length, + record->record->blocks[block_id].hole_offset, + record->record->blocks[block_id].hole_length, BLCKSZ - - record->blocks[block_id].hole_length - - record->blocks[block_id].bimg_len); + record->record->blocks[block_id].hole_length - + record->record->blocks[block_id].bimg_len); } else { printf(" (FPW%s); hole: offset: %u, length: %u", XLogRecBlockImageApply(record, block_id) ? "" : " for WAL verification", - record->blocks[block_id].hole_offset, - record->blocks[block_id].hole_length); + record->record->blocks[block_id].hole_offset, + record->record->blocks[block_id].hole_length); } } putchar('\n'); diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d27c0cd281c..010cbb59d6b 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult { XLREAD_SUCCESS, /* record is successfully read */ XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ + XLREAD_FULL, /* cannot hold more data while reading ahead */ XLREAD_FAIL /* failed during reading a record */ } XLogReadRecordResult; @@ -120,6 +121,30 @@ typedef enum XLogReadRecordState XLREAD_CONTINUATION } XLogReadRecordState; +/* + * The decoded contents of a record. This occupies a contiguous region of + * memory, with main_data and blocks[n].data pointing to memory after the + * members declared here. + */ +typedef struct DecodedXLogRecord +{ + /* Private member used for resource management. */ + size_t size; /* total size of decoded record */ + bool oversized; /* outside the regular decode buffer? */ + struct DecodedXLogRecord *next; /* decoded record queue link */ + + /* Public members. */ + XLogRecPtr lsn; /* location */ + XLogRecPtr next_lsn; /* location of next record */ + XLogRecord header; /* header */ + RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + char *main_data; /* record's main data portion */ + uint32 main_data_len; /* main data portion's length */ + int max_block_id; /* highest block_id in use (-1 if none) */ + DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]; +} DecodedXLogRecord; + struct XLogReaderState { /* @@ -142,10 +167,12 @@ struct XLogReaderState * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid. + * + * Start and end point of last record returned by XLogReadRecord(). These + * are also available as record->lsn and record->next_lsn. */ XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ - XLogRecPtr PrevRecPtr; /* start of previous record read */ /* ---------------------------------------- * Communication with page reader @@ -170,27 +197,43 @@ struct XLogReaderState * Use XLogRecGet* functions to investigate the record; these fields * should not be accessed directly. * ---------------------------------------- + * Start and end point of the last record read and decoded by + * XLogReadRecordInternal(). NextRecPtr is also used as the position to + * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to + * the requested starting position. */ - XLogRecord *decoded_record; /* currently decoded record */ - - char *main_data; /* record's main data portion */ - uint32 main_data_len; /* main data portion's length */ - uint32 main_data_bufsz; /* allocated size of the buffer */ - - RepOriginId record_origin; + XLogRecPtr DecodeRecPtr; /* start of last record decoded */ + XLogRecPtr NextRecPtr; /* end+1 of last record decoded */ + XLogRecPtr PrevRecPtr; /* start of previous record decoded */ - TransactionId toplevel_xid; /* XID of top-level transaction */ - - /* information about blocks referenced by the record. */ - DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; - - int max_block_id; /* highest block_id in use (-1 if none) */ + /* Last record returned by XLogReadRecord(). */ + DecodedXLogRecord *record; /* ---------------------------------------- * private/internal state * ---------------------------------------- */ + /* + * Buffer for decoded records. This is a circular buffer, though + * individual records can't be split in the middle, so some space is often + * wasted at the end. Oversized records that don't fit in this space are + * allocated separately. + */ + char *decode_buffer; + size_t decode_buffer_size; + bool free_decode_buffer; /* need to free? */ + char *decode_buffer_head; /* write head */ + char *decode_buffer_tail; /* read head */ + + /* + * Queue of records that have been decoded. This is a linked list that + * usually consists of consecutive records in decode_buffer, but may also + * contain oversized records allocated with palloc(). + */ + DecodedXLogRecord *decode_queue_head; /* newest decoded record */ + DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */ + /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; @@ -230,7 +273,7 @@ struct XLogReaderState uint32 readRecordBufSize; /* - * XLogReadRecord() state + * XLogReadRecordInternal() state */ XLogReadRecordState readRecordState; /* state machine state */ int recordGotLen; /* amount of current record that has already @@ -238,8 +281,11 @@ struct XLogReaderState int recordRemainLen; /* length of current record that remains */ XLogRecPtr recordContRecPtr; /* where the current record continues */ + DecodedXLogRecord *decoding; /* record currently being decoded */ + /* Buffer to hold error message */ char *errormsg_buf; + bool errormsg_deferred; }; struct XLogFindNextRecordState @@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* Optionally provide a circular decoding buffer to allow readahead. */ +extern void XLogReaderSetDecodeBuffer(XLogReaderState *state, + void *buffer, + size_t size); + /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND @@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s extern bool XLogFindNextRecord(XLogFindNextRecordState *state); #endif /* FRONTEND */ -/* Read the next XLog record. Returns NULL on end-of-WAL or failure */ +/* Read the next record's header. Returns NULL on end-of-WAL or failure. */ extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg); +/* Read the next decoded record. Returns NULL on end-of-WAL or failure. */ +extern XLogReadRecordResult XLogNextRecord(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg); + +/* Try to read ahead, if there is space in the decoding buffer. */ +extern XLogReadRecordResult XLogReadAhead(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg); + /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); @@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state, /* Functions for decoding an XLogRecord */ -extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, +extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +extern bool DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, char **errmsg); -#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) -#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) -#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) -#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) -#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) -#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) -#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) -#define XLogRecGetData(decoder) ((decoder)->main_data) -#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) -#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) +#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len) +#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev) +#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info) +#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid) +#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid) +#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid) +#define XLogRecGetData(decoder) ((decoder)->record->main_data) +#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len) +#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0) +#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id) +#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)]) #define XLogRecHasBlockRef(decoder, block_id) \ - ((decoder)->blocks[block_id].in_use) + ((decoder)->record->max_block_id >= (block_id)) && \ + ((decoder)->record->blocks[block_id].in_use) #define XLogRecHasBlockImage(decoder, block_id) \ - ((decoder)->blocks[block_id].has_image) + ((decoder)->record->blocks[block_id].has_image) #define XLogRecBlockImageApply(decoder, block_id) \ - ((decoder)->blocks[block_id].apply_image) + ((decoder)->record->blocks[block_id].apply_image) #ifndef FRONTEND extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record); |