aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/generic_xlog.c6
-rw-r--r--src/backend/access/transam/xlog.c28
-rw-r--r--src/backend/access/transam/xlogreader.c744
-rw-r--r--src/backend/access/transam/xlogutils.c2
-rw-r--r--src/backend/replication/logical/decode.c2
-rw-r--r--src/bin/pg_rewind/parsexlog.c2
-rw-r--r--src/bin/pg_waldump/pg_waldump.c22
-rw-r--r--src/include/access/xlogreader.h128
8 files changed, 734 insertions, 200 deletions
diff --git a/src/backend/access/transam/generic_xlog.c b/src/backend/access/transam/generic_xlog.c
index 63301a1ab16..0e9bcc71596 100644
--- a/src/backend/access/transam/generic_xlog.c
+++ b/src/backend/access/transam/generic_xlog.c
@@ -482,10 +482,10 @@ generic_redo(XLogReaderState *record)
uint8 block_id;
/* Protect limited size of buffers[] array */
- Assert(record->max_block_id < MAX_GENERIC_XLOG_PAGES);
+ Assert(XLogRecMaxBlockId(record) < MAX_GENERIC_XLOG_PAGES);
/* Iterate over blocks */
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
XLogRedoAction action;
@@ -525,7 +525,7 @@ generic_redo(XLogReaderState *record)
}
/* Changes are done: unlock and release all buffers */
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
if (BufferIsValid(buffers[block_id]))
UnlockReleaseBuffer(buffers[block_id]);
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 7faac01bf24..729fc5ff13c 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -1209,6 +1209,7 @@ XLogInsertRecord(XLogRecData *rdata,
StringInfoData recordBuf;
char *errormsg = NULL;
MemoryContext oldCxt;
+ DecodedXLogRecord *decoded;
oldCxt = MemoryContextSwitchTo(walDebugCxt);
@@ -1224,6 +1225,9 @@ XLogInsertRecord(XLogRecData *rdata,
for (; rdata != NULL; rdata = rdata->next)
appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len);
+ /* How much space would it take to decode this record? */
+ decoded = palloc(DecodeXLogRecordRequiredSpace(recordBuf.len));
+
if (!debug_reader)
debug_reader = XLogReaderAllocate(wal_segment_size, NULL, NULL);
@@ -1231,7 +1235,9 @@ XLogInsertRecord(XLogRecData *rdata,
{
appendStringInfoString(&buf, "error decoding record: out of memory");
}
- else if (!DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data,
+ else if (!DecodeXLogRecord(debug_reader, decoded,
+ (XLogRecord *) recordBuf.data,
+ EndPos,
&errormsg))
{
appendStringInfo(&buf, "error decoding record: %s",
@@ -1240,10 +1246,17 @@ XLogInsertRecord(XLogRecData *rdata,
else
{
appendStringInfoString(&buf, " - ");
+ /*
+ * Temporarily make this decoded record the current record for
+ * XLogRecGetXXX() macros.
+ */
+ debug_reader->record = decoded;
xlog_outdesc(&buf, debug_reader);
+ debug_reader->record = NULL;
}
elog(LOG, "%s", buf.data);
+ pfree(decoded);
pfree(buf.data);
pfree(recordBuf.data);
MemoryContextSwitchTo(oldCxt);
@@ -1417,7 +1430,7 @@ checkXLogConsistency(XLogReaderState *record)
Assert((XLogRecGetInfo(record) & XLR_CHECK_CONSISTENCY) != 0);
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
Buffer buf;
Page page;
@@ -4383,6 +4396,7 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
ReadRecPtr = xlogreader->ReadRecPtr;
EndRecPtr = xlogreader->EndRecPtr;
+
if (record == NULL)
{
if (readFile >= 0)
@@ -10300,7 +10314,7 @@ xlog_redo(XLogReaderState *record)
* XLOG_FPI and XLOG_FPI_FOR_HINT records, they use a different info
* code just to distinguish them for statistics purposes.
*/
- for (uint8 block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (uint8 block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
Buffer buffer;
@@ -10435,7 +10449,7 @@ xlog_block_info(StringInfo buf, XLogReaderState *record)
int block_id;
/* decode block references */
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
RelFileNode rnode;
ForkNumber forknum;
@@ -12104,7 +12118,7 @@ XLogPageRead(XLogReaderState *state,
XLogRecPtr targetPagePtr = state->readPagePtr;
int reqLen = state->reqLen;
int readLen = 0;
- XLogRecPtr targetRecPtr = state->ReadRecPtr;
+ XLogRecPtr targetRecPtr = state->DecodeRecPtr;
uint32 targetPageOff;
XLogSegNo targetSegNo PG_USED_FOR_ASSERTS_ONLY;
int r;
@@ -12122,6 +12136,9 @@ XLogPageRead(XLogReaderState *state,
/*
* Request a restartpoint if we've replayed too much xlog since the
* last one.
+ *
+ * XXX Why is this here? Move it to recovery loop, since it's based
+ * on replay position, not read position?
*/
if (bgwriterLaunched)
{
@@ -12613,6 +12630,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
* be updated on each cycle. When we are behind,
* XLogReceiptTime will not advance, so the grace time
* allotted to conflicting queries will decrease.
+ *
*/
if (RecPtr < flushedUpto)
havedata = true;
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index 02257768ec8..f66592482a4 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -38,6 +38,9 @@ static void report_invalid_record(XLogReaderState *state, const char *fmt,...)
static bool allocate_recordbuf(XLogReaderState *state, uint32 reclength);
static bool XLogNeedData(XLogReaderState *state, XLogRecPtr pageptr,
int reqLen, bool header_inclusive);
+size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
+static XLogReadRecordResult XLogDecodeOneRecord(XLogReaderState *state,
+ bool allow_oversized);
static void XLogReaderInvalReadState(XLogReaderState *state);
static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr,
XLogRecPtr PrevRecPtr, XLogRecord *record);
@@ -50,6 +53,8 @@ static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt,
/* size of the buffer allocated for error message. */
#define MAX_ERRORMSG_LEN 1000
+#define DEFAULT_DECODE_BUFFER_SIZE 0x10000
+
/*
* Construct a string in state->errormsg_buf explaining what's wrong with
* the current record being read.
@@ -64,6 +69,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...)
va_start(args, fmt);
vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args);
va_end(args);
+
+ state->errormsg_deferred = true;
}
/*
@@ -86,8 +93,6 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
/* initialize caller-provided support functions */
state->cleanup_cb = cleanup_cb;
- state->max_block_id = -1;
-
/*
* Permanently allocate readBuf. We do it this way, rather than just
* making a static array, for two reasons: (1) no need to waste the
@@ -136,18 +141,11 @@ XLogReaderAllocate(int wal_segment_size, const char *waldir,
void
XLogReaderFree(XLogReaderState *state)
{
- int block_id;
-
if (state->seg.ws_file >= 0)
state->cleanup_cb(state);
- for (block_id = 0; block_id <= XLR_MAX_BLOCK_ID; block_id++)
- {
- if (state->blocks[block_id].data)
- pfree(state->blocks[block_id].data);
- }
- if (state->main_data)
- pfree(state->main_data);
+ if (state->decode_buffer && state->free_decode_buffer)
+ pfree(state->decode_buffer);
pfree(state->errormsg_buf);
if (state->readRecordBuf)
@@ -157,6 +155,22 @@ XLogReaderFree(XLogReaderState *state)
}
/*
+ * Set the size of the decoding buffer. A pointer to a caller supplied memory
+ * region may also be passed in, in which case non-oversized records will be
+ * decoded there.
+ */
+void
+XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size)
+{
+ Assert(state->decode_buffer == NULL);
+
+ state->decode_buffer = buffer;
+ state->decode_buffer_size = size;
+ state->decode_buffer_head = buffer;
+ state->decode_buffer_tail = buffer;
+}
+
+/*
* Allocate readRecordBuf to fit a record of at least the given length.
* Returns true if successful, false if out of memory.
*
@@ -243,22 +257,123 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
/* Begin at the passed-in record pointer. */
state->EndRecPtr = RecPtr;
+ state->NextRecPtr = RecPtr;
state->ReadRecPtr = InvalidXLogRecPtr;
+ state->DecodeRecPtr = InvalidXLogRecPtr;
state->readRecordState = XLREAD_NEXT_RECORD;
}
/*
- * Attempt to read an XLOG record.
- *
- * XLogBeginRead() or XLogFindNextRecord() must be called before the first call
- * to XLogReadRecord().
+ * See if we can release the last record that was returned by
+ * XLogReadRecord(), to free up space.
+ */
+static void
+XLogReleasePreviousRecord(XLogReaderState *state)
+{
+ DecodedXLogRecord *record;
+
+ /*
+ * Remove it from the decoded record queue. It must be the oldest
+ * item decoded, decode_queue_tail.
+ */
+ record = state->record;
+ Assert(record == state->decode_queue_tail);
+ state->record = NULL;
+ state->decode_queue_tail = record->next;
+
+ /* It might also be the newest item decoded, decode_queue_head. */
+ if (state->decode_queue_head == record)
+ state->decode_queue_head = NULL;
+
+ /* Release the space. */
+ if (unlikely(record->oversized))
+ {
+ /* It's not in the the decode buffer, so free it to release space. */
+ pfree(record);
+ }
+ else
+ {
+ /* It must be the tail record in the decode buffer. */
+ Assert(state->decode_buffer_tail == (char *) record);
+
+ /*
+ * We need to update tail to point to the next record that is in the
+ * decode buffer, if any, being careful to skip oversized ones
+ * (they're not in the decode buffer).
+ */
+ record = record->next;
+ while (unlikely(record && record->oversized))
+ record = record->next;
+
+ if (record)
+ {
+ /* Adjust tail to release space up to the next record. */
+ state->decode_buffer_tail = (char *) record;
+ }
+ else if (state->decoding && !state->decoding->oversized)
+ {
+ /*
+ * We're releasing the last fully decoded record in
+ * XLogReadRecord(), but some time earlier we partially decoded a
+ * record in XLogReadAhead() and were unable to complete the job.
+ * We'll set the buffer head and tail to point to the record we
+ * started working on, so that we can continue (perhaps from a
+ * different source).
+ */
+ state->decode_buffer_tail = (char *) state->decoding;
+ state->decode_buffer_head = (char *) state->decoding;
+ }
+ else
+ {
+ /*
+ * Otherwise we might as well just reset head and tail to the
+ * start of the buffer space, because we're empty. This means
+ * we'll keep overwriting the same piece of memory if we're not
+ * doing any prefetching.
+ */
+ state->decode_buffer_tail = state->decode_buffer;
+ state->decode_buffer_head = state->decode_buffer;
+ }
+ }
+}
+
+/*
+ * Similar to XLogNextRecord(), but this traditional interface is for code
+ * that just wants the header, not the decoded record. Callers can access the
+ * decoded record through the XLogRecGetXXX() macros.
+ */
+XLogReadRecordResult
+XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
+{
+ XLogReadRecordResult result;
+ DecodedXLogRecord *decoded;
+
+ /* Consume the next decoded record. */
+ result = XLogNextRecord(state, &decoded, errormsg);
+ if (result == XLREAD_SUCCESS)
+ {
+ /*
+ * The traditional interface just returns the header, not the decoded
+ * record. The caller will access the decoded record through the
+ * XLogRecGetXXX() macros.
+ */
+ *record = &decoded->header;
+ }
+ else
+ *record = NULL;
+ return result;
+}
+
+/*
+ * Consume the next record. XLogBeginRead() or XLogFindNextRecord() must be
+ * called before the first call to XLogNextRecord().
*
* This function may return XLREAD_NEED_DATA several times before returning a
* result record. The caller shall read in some new data then call this
* function again with the same parameters.
*
* When a record is successfully read, returns XLREAD_SUCCESS with result
- * record being stored in *record. Otherwise *record is NULL.
+ * record being stored in *record. Otherwise *record is set to NULL.
*
* Returns XLREAD_NEED_DATA if more data is needed to finish decoding the
* current record. In that case, state->readPagePtr and state->reqLen inform
@@ -269,11 +384,249 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* length of data that is now available (which must be >= given reqLen),
* respectively.
*
- * If invalid data is encountered, returns XLREAD_FAIL and sets *record to
- * NULL. *errormsg is set to a string with details of the failure. The
+ * Returns XLREAD_FULL if allow_oversized is true, and no space is available.
+ * This is intended for readahead.
+ *
+ * If invalid data is encountered, returns XLREAD_FAIL with *record being set
+ * to NULL. *errormsg is set to a string with details of the failure. The
* returned pointer (or *errormsg) points to an internal buffer that's valid
* until the next call to XLogReadRecord.
*
+ */
+XLogReadRecordResult
+XLogNextRecord(XLogReaderState *state,
+ DecodedXLogRecord **record,
+ char **errormsg)
+{
+ /* Release the space occupied by the last record we returned. */
+ if (state->record)
+ XLogReleasePreviousRecord(state);
+
+ for (;;)
+ {
+ XLogReadRecordResult result;
+
+ /* We can now return the oldest item in the queue, if there is one. */
+ if (state->decode_queue_tail)
+ {
+ /*
+ * Record this as the most recent record returned, so that we'll
+ * release it next time. This also exposes it to the
+ * XLogRecXXX(decoder) macros, which pass in the decoder rather
+ * than the record for historical reasons.
+ */
+ state->record = state->decode_queue_tail;
+
+ /*
+ * It should be immediately after the last the record returned by
+ * XLogReadRecord(), or at the position set by XLogBeginRead() if
+ * XLogReadRecord() hasn't been called yet. It may be after a
+ * page header, though.
+ */
+ Assert(state->record->lsn == state->EndRecPtr ||
+ (state->EndRecPtr % XLOG_BLCKSZ == 0 &&
+ (state->record->lsn == state->EndRecPtr + SizeOfXLogShortPHD ||
+ state->record->lsn == state->EndRecPtr + SizeOfXLogLongPHD)));
+
+ /*
+ * Set ReadRecPtr and EndRecPtr to correspond to that
+ * record.
+ *
+ * Calling code could access these through the returned decoded
+ * record, but for now we'll update them directly here, for the
+ * benefit of all the existing code that accesses these variables
+ * directly.
+ */
+ state->ReadRecPtr = state->record->lsn;
+ state->EndRecPtr = state->record->next_lsn;
+
+ *errormsg = NULL;
+ *record = state->record;
+
+ return XLREAD_SUCCESS;
+ }
+ else if (state->errormsg_deferred)
+ {
+ /*
+ * If we've run out of records, but we have a deferred error, now
+ * is the time to report it.
+ */
+ state->errormsg_deferred = false;
+ if (state->errormsg_buf[0] != '\0')
+ *errormsg = state->errormsg_buf;
+ else
+ *errormsg = NULL;
+ *record = NULL;
+ state->EndRecPtr = state->DecodeRecPtr;
+
+ return XLREAD_FAIL;
+ }
+
+ /* We need to get a decoded record into our queue first. */
+ result = XLogDecodeOneRecord(state, true /* allow_oversized */ );
+ switch(result)
+ {
+ case XLREAD_NEED_DATA:
+ *errormsg = NULL;
+ *record = NULL;
+ return result;
+ case XLREAD_SUCCESS:
+ Assert(state->decode_queue_tail != NULL);
+ break;
+ case XLREAD_FULL:
+ /* Not expected because we passed allow_oversized = true */
+ Assert(false);
+ break;
+ case XLREAD_FAIL:
+ /*
+ * If that produced neither a queued record nor a queued error,
+ * then we're at the end (for example, archive recovery with no
+ * more files available).
+ */
+ Assert(state->decode_queue_tail == NULL);
+ if (!state->errormsg_deferred)
+ {
+ state->EndRecPtr = state->DecodeRecPtr;
+ *errormsg = NULL;
+ *record = NULL;
+ return result;
+ }
+ break;
+ }
+ }
+
+ /* unreachable */
+ return XLREAD_FAIL;
+}
+
+/*
+ * Try to decode the next available record. The next record will also be
+ * returned to XLogRecordRead().
+ *
+ * In addition to the values that XLogReadRecord() can return, XLogReadAhead()
+ * can also return XLREAD_FULL to indicate that further readahead is not
+ * possible yet due to lack of space.
+ */
+XLogReadRecordResult
+XLogReadAhead(XLogReaderState *state, DecodedXLogRecord **record, char **errormsg)
+{
+ XLogReadRecordResult result;
+
+ /* We stop trying after encountering an error. */
+ if (unlikely(state->errormsg_deferred))
+ {
+ /* We only report the error message the first time, see below. */
+ *errormsg = NULL;
+ return XLREAD_FAIL;
+ }
+
+ /*
+ * Try to decode one more record, if we have space. Pass allow_oversized
+ * = false, so that this call returns fast if the decode buffer is full.
+ */
+ result = XLogDecodeOneRecord(state, false);
+ switch (result)
+ {
+ case XLREAD_SUCCESS:
+ /* New record at head of decode record queue. */
+ Assert(state->decode_queue_head != NULL);
+ *record = state->decode_queue_head;
+ return result;
+ case XLREAD_FULL:
+ /* No space in circular decode buffer. */
+ return result;
+ case XLREAD_NEED_DATA:
+ /* The caller needs to insert more data. */
+ return result;
+ case XLREAD_FAIL:
+ /* Report the error. XLogReadRecord() will also report it. */
+ Assert(state->errormsg_deferred);
+ if (state->errormsg_buf[0] != '\0')
+ *errormsg = state->errormsg_buf;
+ return result;
+ }
+
+ /* Unreachable. */
+ return XLREAD_FAIL;
+}
+
+/*
+ * Allocate space for a decoded record. The only member of the returned
+ * object that is initialized is the 'oversized' flag, indicating that the
+ * decoded record wouldn't fit in the decode buffer and must eventually be
+ * freed explicitly.
+ *
+ * Return NULL if there is no space in the decode buffer and allow_oversized
+ * is false, or if memory allocation fails for an oversized buffer.
+ */
+static DecodedXLogRecord *
+XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
+{
+ size_t required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
+ DecodedXLogRecord *decoded = NULL;
+
+ /* Allocate a circular decode buffer if we don't have one already. */
+ if (unlikely(state->decode_buffer == NULL))
+ {
+ if (state->decode_buffer_size == 0)
+ state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
+ state->decode_buffer = palloc(state->decode_buffer_size);
+ state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
+ state->free_decode_buffer = true;
+ }
+ if (state->decode_buffer_head >= state->decode_buffer_tail)
+ {
+ /* Empty, or head is to the right of tail. */
+ if (state->decode_buffer_head + required_space <=
+ state->decode_buffer + state->decode_buffer_size)
+ {
+ /* There is space between head and end. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer_head;
+ decoded->oversized = false;
+ return decoded;
+ }
+ else if (state->decode_buffer + required_space <
+ state->decode_buffer_tail)
+ {
+ /* There is space between start and tail. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer;
+ decoded->oversized = false;
+ return decoded;
+ }
+ }
+ else
+ {
+ /* Head is to the left of tail. */
+ if (state->decode_buffer_head + required_space <
+ state->decode_buffer_tail)
+ {
+ /* There is space between head and tail. */
+ decoded = (DecodedXLogRecord *) state->decode_buffer_head;
+ decoded->oversized = false;
+ return decoded;
+ }
+ }
+
+ /* Not enough space in the decode buffer. Are we allowed to allocate? */
+ if (allow_oversized)
+ {
+ decoded = palloc_extended(required_space, MCXT_ALLOC_NO_OOM);
+ if (decoded == NULL)
+ return NULL;
+ decoded->oversized = true;
+ return decoded;
+ }
+
+ return decoded;
+}
+
+/*
+ * Try to read and decode the next record and add it to the head of the
+ * decoded record queue. If 'allow_oversized' is false, then XLREAD_FULL can
+ * be returned to indicate the decoding buffer is full. XLogBeginRead() or
+ * XLogFindNextRecord() must be called before the first call to
+ * XLogReadRecord().
*
* This function runs a state machine consisting of the following states.
*
@@ -300,35 +653,35 @@ XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr)
* current state. This behavior allows us to continue reading a record
* after switching to a different source, during streaming replication.
*/
-XLogReadRecordResult
-XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
+static XLogReadRecordResult
+XLogDecodeOneRecord(XLogReaderState *state, bool allow_oversized)
{
+ XLogRecord *record;
+ char *errormsg; /* not used */
XLogRecord *prec;
- *record = NULL;
-
/* reset error state */
- *errormsg = NULL;
state->errormsg_buf[0] = '\0';
+ record = NULL;
switch (state->readRecordState)
{
case XLREAD_NEXT_RECORD:
- ResetDecoder(state);
+ Assert(!state->decoding);
- if (state->ReadRecPtr != InvalidXLogRecPtr)
+ if (state->DecodeRecPtr != InvalidXLogRecPtr)
{
/* read the record after the one we just read */
/*
- * EndRecPtr is pointing to end+1 of the previous WAL record.
+ * NextRecPtr is pointing to end+1 of the previous WAL record.
* If we're at a page boundary, no more records can fit on the
* current page. We must skip over the page header, but we
* can't do that until we've read in the page, since the
* header size is variable.
*/
- state->PrevRecPtr = state->ReadRecPtr;
- state->ReadRecPtr = state->EndRecPtr;
+ state->PrevRecPtr = state->DecodeRecPtr;
+ state->DecodeRecPtr = state->NextRecPtr;
}
else
{
@@ -338,8 +691,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* In this case, EndRecPtr should already be pointing to a
* valid record starting position.
*/
- Assert(XRecOffIsValid(state->EndRecPtr));
- state->ReadRecPtr = state->EndRecPtr;
+ Assert(XRecOffIsValid(state->NextRecPtr));
+ state->DecodeRecPtr = state->NextRecPtr;
/*
* We cannot verify the previous-record pointer when we're
@@ -347,7 +700,6 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* won't try doing that.
*/
state->PrevRecPtr = InvalidXLogRecPtr;
- state->EndRecPtr = InvalidXLogRecPtr; /* to be tidy */
}
state->record_verified = false;
@@ -362,9 +714,11 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
uint32 targetRecOff;
XLogPageHeader pageHeader;
+ Assert(!state->decoding);
+
targetPagePtr =
- state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
- targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+ state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
+ targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
/*
* Check if we have enough data. For the first record in the
@@ -385,13 +739,13 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
if (targetRecOff == 0)
{
/* At page start, so skip over page header. */
- state->ReadRecPtr += pageHeaderSize;
+ state->DecodeRecPtr += pageHeaderSize;
targetRecOff = pageHeaderSize;
}
else if (targetRecOff < pageHeaderSize)
{
report_invalid_record(state, "invalid record offset at %X/%X",
- LSN_FORMAT_ARGS(state->ReadRecPtr));
+ LSN_FORMAT_ARGS(state->DecodeRecPtr));
goto err;
}
@@ -400,8 +754,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
targetRecOff == pageHeaderSize)
{
report_invalid_record(state, "contrecord is requested by %X/%X",
- (uint32) (state->ReadRecPtr >> 32),
- (uint32) state->ReadRecPtr);
+ (uint32) (state->DecodeRecPtr >> 32),
+ (uint32) state->DecodeRecPtr);
goto err;
}
@@ -419,9 +773,26 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* header.
*/
prec = (XLogRecord *) (state->readBuf +
- state->ReadRecPtr % XLOG_BLCKSZ);
+ state->DecodeRecPtr % XLOG_BLCKSZ);
total_len = prec->xl_tot_len;
+ /* Find space to decode this record. */
+ Assert(state->decoding == NULL);
+ state->decoding = XLogReadRecordAlloc(state, total_len,
+ allow_oversized);
+ if (state->decoding == NULL)
+ {
+ /*
+ * We couldn't get space. If allow_oversized was true,
+ * then palloc() must have failed. Otherwise, report that
+ * our decoding buffer is full. This means that weare
+ * trying to read too far ahead.
+ */
+ if (allow_oversized)
+ goto err;
+ return XLREAD_FULL;
+ }
+
/*
* If the whole record header is on this page, validate it
* immediately. Otherwise do just a basic sanity check on
@@ -433,7 +804,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
*/
if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord)
{
- if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr, prec))
goto err;
@@ -446,7 +817,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
{
report_invalid_record(state,
"invalid record length at %X/%X: wanted %u, got %u",
- LSN_FORMAT_ARGS(state->ReadRecPtr),
+ LSN_FORMAT_ARGS(state->DecodeRecPtr),
(uint32) SizeOfXLogRecord, total_len);
goto err;
}
@@ -471,13 +842,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
XLogRecPtr targetPagePtr;
uint32 targetRecOff;
+ Assert(state->decoding);
+
/*
* Wait for the rest of the record on the first page to become
* available
*/
targetPagePtr =
- state->ReadRecPtr - (state->ReadRecPtr % XLOG_BLCKSZ);
- targetRecOff = state->ReadRecPtr % XLOG_BLCKSZ;
+ state->DecodeRecPtr - (state->DecodeRecPtr % XLOG_BLCKSZ);
+ targetRecOff = state->DecodeRecPtr % XLOG_BLCKSZ;
request_len = Min(targetRecOff + total_len, XLOG_BLCKSZ);
record_len = request_len - targetRecOff;
@@ -496,7 +869,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* validate record header if not yet */
if (!state->record_verified && record_len >= SizeOfXLogRecord)
{
- if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr, prec))
goto err;
@@ -509,15 +882,15 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* Record does not cross a page boundary */
Assert(state->record_verified);
- if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+ if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
goto err;
state->record_verified = true; /* to be tidy */
/* We already checked the header earlier */
- state->EndRecPtr = state->ReadRecPtr + MAXALIGN(record_len);
+ state->NextRecPtr = state->DecodeRecPtr + MAXALIGN(record_len);
- *record = prec;
+ record = prec;
state->readRecordState = XLREAD_NEXT_RECORD;
break;
}
@@ -536,7 +909,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
report_invalid_record(state,
"record length %u at %X/%X too long",
total_len,
- LSN_FORMAT_ARGS(state->ReadRecPtr));
+ LSN_FORMAT_ARGS(state->DecodeRecPtr));
goto err;
}
@@ -547,7 +920,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
state->recordRemainLen -= record_len;
/* Calculate pointer to beginning of next page */
- state->recordContRecPtr = state->ReadRecPtr + record_len;
+ state->recordContRecPtr = state->DecodeRecPtr + record_len;
Assert(state->recordContRecPtr % XLOG_BLCKSZ == 0);
state->readRecordState = XLREAD_CONTINUATION;
@@ -564,6 +937,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
* we enter this state only if we haven't read the whole
* record.
*/
+ Assert(state->decoding);
Assert(state->recordRemainLen > 0);
while (state->recordRemainLen > 0)
@@ -583,7 +957,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
return XLREAD_NEED_DATA;
if (!state->page_verified)
- goto err;
+ goto err_continue;
Assert(SizeOfXLogShortPHD <= state->readLen);
@@ -596,8 +970,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
"there is no contrecord flag at %X/%X reading %X/%X",
(uint32) (state->recordContRecPtr >> 32),
(uint32) state->recordContRecPtr,
- (uint32) (state->ReadRecPtr >> 32),
- (uint32) state->ReadRecPtr);
+ (uint32) (state->DecodeRecPtr >> 32),
+ (uint32) state->DecodeRecPtr);
goto err;
}
@@ -614,8 +988,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
pageHeader->xlp_rem_len,
(uint32) (state->recordContRecPtr >> 32),
(uint32) state->recordContRecPtr,
- (uint32) (state->ReadRecPtr >> 32),
- (uint32) state->ReadRecPtr,
+ (uint32) (state->DecodeRecPtr >> 32),
+ (uint32) state->DecodeRecPtr,
state->recordRemainLen);
goto err;
}
@@ -651,7 +1025,7 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
if (!state->record_verified)
{
Assert(state->recordGotLen >= SizeOfXLogRecord);
- if (!ValidXLogRecordHeader(state, state->ReadRecPtr,
+ if (!ValidXLogRecordHeader(state, state->DecodeRecPtr,
state->PrevRecPtr,
(XLogRecord *) state->readRecordBuf))
goto err;
@@ -668,16 +1042,17 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/* targetPagePtr is pointing the last-read page here */
prec = (XLogRecord *) state->readRecordBuf;
- if (!ValidXLogRecord(state, prec, state->ReadRecPtr))
+ if (!ValidXLogRecord(state, prec, state->DecodeRecPtr))
goto err;
pageHeaderSize =
XLogPageHeaderSize((XLogPageHeader) state->readBuf);
- state->EndRecPtr = targetPagePtr + pageHeaderSize
+ state->NextRecPtr = targetPagePtr + pageHeaderSize
+ MAXALIGN(pageHeader->xlp_rem_len);
- *record = prec;
+ record = prec;
state->readRecordState = XLREAD_NEXT_RECORD;
+
break;
}
}
@@ -685,32 +1060,65 @@ XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg)
/*
* Special processing if it's an XLOG SWITCH record
*/
- if ((*record)->xl_rmid == RM_XLOG_ID &&
- ((*record)->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
+ if (record->xl_rmid == RM_XLOG_ID &&
+ (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH)
{
/* Pretend it extends to end of segment */
- state->EndRecPtr += state->segcxt.ws_segsize - 1;
- state->EndRecPtr -= XLogSegmentOffset(state->EndRecPtr, state->segcxt.ws_segsize);
+ state->NextRecPtr += state->segcxt.ws_segsize - 1;
+ state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
}
- if (DecodeXLogRecord(state, *record, errormsg))
- return XLREAD_SUCCESS;
+ Assert(!record || state->readLen >= 0);
+ if (DecodeXLogRecord(state, state->decoding, record, state->DecodeRecPtr, &errormsg))
+ {
+ /* Record the location of the next record. */
+ state->decoding->next_lsn = state->NextRecPtr;
- *record = NULL;
- return XLREAD_FAIL;
+ /*
+ * If it's in the decode buffer (not an "oversized" record allocated
+ * with palloc()), mark the decode buffer space as occupied.
+ */
+ if (!state->decoding->oversized)
+ {
+ /* The new decode buffer head must be MAXALIGNed. */
+ Assert(state->decoding->size == MAXALIGN(state->decoding->size));
+ if ((char *) state->decoding == state->decode_buffer)
+ state->decode_buffer_head = state->decode_buffer +
+ state->decoding->size;
+ else
+ state->decode_buffer_head += state->decoding->size;
+ }
+
+ /* Insert it into the queue of decoded records. */
+ Assert(state->decode_queue_head != state->decoding);
+ if (state->decode_queue_head)
+ state->decode_queue_head->next = state->decoding;
+ state->decode_queue_head = state->decoding;
+ if (!state->decode_queue_tail)
+ state->decode_queue_tail = state->decoding;
+ state->decoding = NULL;
+
+ return XLREAD_SUCCESS;
+ }
err:
+ if (state->decoding && state->decoding->oversized)
+ pfree(state->decoding);
+ state->decoding = NULL;
+err_continue:
/*
* Invalidate the read page. We might read from a different source after
* failure.
*/
XLogReaderInvalReadState(state);
- if (state->errormsg_buf[0] != '\0')
- *errormsg = state->errormsg_buf;
+ /*
+ * If an error was written to errmsg_buf, it'll be returned to the caller
+ * of XLogReadRecord() after all successfully decoded records from the
+ * read queue.
+ */
- *record = NULL;
return XLREAD_FAIL;
}
@@ -1342,34 +1750,84 @@ WALRead(XLogReaderState *state,
* ----------------------------------------
*/
-/* private function to reset the state between records */
+/*
+ * Private function to reset the state, forgetting all decoded records, if we
+ * are asked to move to a new read position.
+ */
static void
ResetDecoder(XLogReaderState *state)
{
- int block_id;
-
- state->decoded_record = NULL;
+ DecodedXLogRecord *r;
- state->main_data_len = 0;
-
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ /* Reset the decoded record queue, freeing any oversized records. */
+ while ((r = state->decode_queue_tail))
{
- state->blocks[block_id].in_use = false;
- state->blocks[block_id].has_image = false;
- state->blocks[block_id].has_data = false;
- state->blocks[block_id].apply_image = false;
+ state->decode_queue_tail = r->next;
+ if (r->oversized)
+ pfree(r);
}
- state->max_block_id = -1;
+ state->decode_queue_head = NULL;
+ state->decode_queue_tail = NULL;
+ state->record = NULL;
+ state->decoding = NULL;
+
+ /* Reset the decode buffer to empty. */
+ state->decode_buffer_head = state->decode_buffer;
+ state->decode_buffer_tail = state->decode_buffer;
+
+ /* Clear error state. */
+ state->errormsg_buf[0] = '\0';
+ state->errormsg_deferred = false;
+}
+
+/*
+ * Compute the maximum possible amount of padding that could be required to
+ * decode a record, given xl_tot_len from the record's header. This is the
+ * amount of output buffer space that we need to decode a record, though we
+ * might not finish up using it all.
+ *
+ * This computation is pessimistic and assumes the maximum possible number of
+ * blocks, due to lack of better information.
+ */
+size_t
+DecodeXLogRecordRequiredSpace(size_t xl_tot_len)
+{
+ size_t size = 0;
+
+ /* Account for the fixed size part of the decoded record struct. */
+ size += offsetof(DecodedXLogRecord, blocks[0]);
+ /* Account for the flexible blocks array of maximum possible size. */
+ size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1);
+ /* Account for all the raw main and block data. */
+ size += xl_tot_len;
+ /* We might insert padding before main_data. */
+ size += (MAXIMUM_ALIGNOF - 1);
+ /* We might insert padding before each block's data. */
+ size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1);
+ /* We might insert padding at the end. */
+ size += (MAXIMUM_ALIGNOF - 1);
+
+ return size;
}
/*
- * Decode the previously read record.
+ * Decode a record. "decoded" must point to a MAXALIGNed memory area that has
+ * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On
+ * success, decoded->size contains the actual space occupied by the decoded
+ * record, which may turn out to be less.
+ *
+ * Only decoded->oversized member must be initialized already, and will not be
+ * modified. Other members will be initialized as required.
*
* On error, a human-readable error message is returned in *errormsg, and
* the return value is false.
*/
bool
-DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
+DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
+ char **errormsg)
{
/*
* read next _size bytes from record buffer, but check for overrun first.
@@ -1384,17 +1842,20 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
} while(0)
char *ptr;
+ char *out;
uint32 remaining;
uint32 datatotal;
RelFileNode *rnode = NULL;
uint8 block_id;
- ResetDecoder(state);
-
- state->decoded_record = record;
- state->record_origin = InvalidRepOriginId;
- state->toplevel_xid = InvalidTransactionId;
-
+ decoded->header = *record;
+ decoded->lsn = lsn;
+ decoded->next = NULL;
+ decoded->record_origin = InvalidRepOriginId;
+ decoded->toplevel_xid = InvalidTransactionId;
+ decoded->main_data = NULL;
+ decoded->main_data_len = 0;
+ decoded->max_block_id = -1;
ptr = (char *) record;
ptr += SizeOfXLogRecord;
remaining = record->xl_tot_len - SizeOfXLogRecord;
@@ -1412,7 +1873,7 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
COPY_HEADER_FIELD(&main_data_len, sizeof(uint8));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
@@ -1423,18 +1884,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
uint32 main_data_len;
COPY_HEADER_FIELD(&main_data_len, sizeof(uint32));
- state->main_data_len = main_data_len;
+ decoded->main_data_len = main_data_len;
datatotal += main_data_len;
break; /* by convention, the main data fragment is
* always last */
}
else if (block_id == XLR_BLOCK_ID_ORIGIN)
{
- COPY_HEADER_FIELD(&state->record_origin, sizeof(RepOriginId));
+ COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId));
}
else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID)
{
- COPY_HEADER_FIELD(&state->toplevel_xid, sizeof(TransactionId));
+ COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId));
}
else if (block_id <= XLR_MAX_BLOCK_ID)
{
@@ -1442,7 +1903,11 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
DecodedBkpBlock *blk;
uint8 fork_flags;
- if (block_id <= state->max_block_id)
+ /* mark any intervening block IDs as not in use */
+ for (int i = decoded->max_block_id + 1; i < block_id; ++i)
+ decoded->blocks[i].in_use = false;
+
+ if (block_id <= decoded->max_block_id)
{
report_invalid_record(state,
"out-of-order block_id %u at %X/%X",
@@ -1450,9 +1915,9 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
LSN_FORMAT_ARGS(state->ReadRecPtr));
goto err;
}
- state->max_block_id = block_id;
+ decoded->max_block_id = block_id;
- blk = &state->blocks[block_id];
+ blk = &decoded->blocks[block_id];
blk->in_use = true;
blk->apply_image = false;
@@ -1596,17 +2061,18 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
/*
* Ok, we've parsed the fragment headers, and verified that the total
* length of the payload in the fragments is equal to the amount of data
- * left. Copy the data of each fragment to a separate buffer.
- *
- * We could just set up pointers into readRecordBuf, but we want to align
- * the data for the convenience of the callers. Backup images are not
- * copied, however; they don't need alignment.
+ * left. Copy the data of each fragment to contiguous space after the
+ * blocks array, inserting alignment padding before the data fragments so
+ * they can be cast to struct pointers by REDO routines.
*/
+ out = ((char *) decoded) +
+ offsetof(DecodedXLogRecord, blocks) +
+ sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1);
/* block data first */
- for (block_id = 0; block_id <= state->max_block_id; block_id++)
+ for (block_id = 0; block_id <= decoded->max_block_id; block_id++)
{
- DecodedBkpBlock *blk = &state->blocks[block_id];
+ DecodedBkpBlock *blk = &decoded->blocks[block_id];
if (!blk->in_use)
continue;
@@ -1615,58 +2081,37 @@ DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg)
if (blk->has_image)
{
- blk->bkp_image = ptr;
+ /* no need to align image */
+ blk->bkp_image = out;
+ memcpy(out, ptr, blk->bimg_len);
ptr += blk->bimg_len;
+ out += blk->bimg_len;
}
if (blk->has_data)
{
- if (!blk->data || blk->data_len > blk->data_bufsz)
- {
- if (blk->data)
- pfree(blk->data);
-
- /*
- * Force the initial request to be BLCKSZ so that we don't
- * waste time with lots of trips through this stanza as a
- * result of WAL compression.
- */
- blk->data_bufsz = MAXALIGN(Max(blk->data_len, BLCKSZ));
- blk->data = palloc(blk->data_bufsz);
- }
+ out = (char *) MAXALIGN(out);
+ blk->data = out;
memcpy(blk->data, ptr, blk->data_len);
ptr += blk->data_len;
+ out += blk->data_len;
}
}
/* and finally, the main data */
- if (state->main_data_len > 0)
+ if (decoded->main_data_len > 0)
{
- if (!state->main_data || state->main_data_len > state->main_data_bufsz)
- {
- if (state->main_data)
- pfree(state->main_data);
-
- /*
- * main_data_bufsz must be MAXALIGN'ed. In many xlog record
- * types, we omit trailing struct padding on-disk to save a few
- * bytes; but compilers may generate accesses to the xlog struct
- * that assume that padding bytes are present. If the palloc
- * request is not large enough to include such padding bytes then
- * we'll get valgrind complaints due to otherwise-harmless fetches
- * of the padding bytes.
- *
- * In addition, force the initial request to be reasonably large
- * so that we don't waste time with lots of trips through this
- * stanza. BLCKSZ / 2 seems like a good compromise choice.
- */
- state->main_data_bufsz = MAXALIGN(Max(state->main_data_len,
- BLCKSZ / 2));
- state->main_data = palloc(state->main_data_bufsz);
- }
- memcpy(state->main_data, ptr, state->main_data_len);
- ptr += state->main_data_len;
+ out = (char *) MAXALIGN(out);
+ decoded->main_data = out;
+ memcpy(decoded->main_data, ptr, decoded->main_data_len);
+ ptr += decoded->main_data_len;
+ out += decoded->main_data_len;
}
+ /* Report the actual size we used. */
+ decoded->size = MAXALIGN(out - (char *) decoded);
+ Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >=
+ decoded->size);
+
return true;
shortdata_err:
@@ -1692,10 +2137,11 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (rnode)
*rnode = bkpb->rnode;
if (forknum)
@@ -1715,10 +2161,11 @@ XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len)
{
DecodedBkpBlock *bkpb;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return NULL;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
if (!bkpb->has_data)
{
@@ -1746,12 +2193,13 @@ RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
char *ptr;
PGAlignedBlock tmp;
- if (!record->blocks[block_id].in_use)
+ if (block_id > record->record->max_block_id ||
+ !record->record->blocks[block_id].in_use)
return false;
- if (!record->blocks[block_id].has_image)
+ if (!record->record->blocks[block_id].has_image)
return false;
- bkpb = &record->blocks[block_id];
+ bkpb = &record->record->blocks[block_id];
ptr = bkpb->bkp_image;
if (bkpb->bimg_info & BKPIMAGE_IS_COMPRESSED)
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index e5de26dce54..eedd95cc137 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -350,7 +350,7 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
* going to initialize it. And vice versa.
*/
zeromode = (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
- willinit = (record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
+ willinit = (record->record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0;
if (willinit && !zeromode)
elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine");
if (!willinit && zeromode)
diff --git a/src/backend/replication/logical/decode.c b/src/backend/replication/logical/decode.c
index 9aab7136843..7924581cdcd 100644
--- a/src/backend/replication/logical/decode.c
+++ b/src/backend/replication/logical/decode.c
@@ -123,7 +123,7 @@ LogicalDecodingProcessRecord(LogicalDecodingContext *ctx, XLogReaderState *recor
{
ReorderBufferAssignChild(ctx->reorder,
txid,
- record->decoded_record->xl_xid,
+ XLogRecGetXid(record),
buf.origptr);
}
diff --git a/src/bin/pg_rewind/parsexlog.c b/src/bin/pg_rewind/parsexlog.c
index 79f71c0477f..81e186270a3 100644
--- a/src/bin/pg_rewind/parsexlog.c
+++ b/src/bin/pg_rewind/parsexlog.c
@@ -439,7 +439,7 @@ extractPageInfo(XLogReaderState *record)
RmgrNames[rmid], info);
}
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
RelFileNode rnode;
ForkNumber forknum;
diff --git a/src/bin/pg_waldump/pg_waldump.c b/src/bin/pg_waldump/pg_waldump.c
index 5db389aa2d2..d4d6bb25a9f 100644
--- a/src/bin/pg_waldump/pg_waldump.c
+++ b/src/bin/pg_waldump/pg_waldump.c
@@ -397,10 +397,10 @@ XLogDumpRecordLen(XLogReaderState *record, uint32 *rec_len, uint32 *fpi_len)
* add an accessor macro for this.
*/
*fpi_len = 0;
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
if (XLogRecHasBlockImage(record, block_id))
- *fpi_len += record->blocks[block_id].bimg_len;
+ *fpi_len += record->record->blocks[block_id].bimg_len;
}
/*
@@ -498,7 +498,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
if (!config->bkp_details)
{
/* print block references (short format) */
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
if (!XLogRecHasBlockRef(record, block_id))
continue;
@@ -529,7 +529,7 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
{
/* print block references (detailed format) */
putchar('\n');
- for (block_id = 0; block_id <= record->max_block_id; block_id++)
+ for (block_id = 0; block_id <= XLogRecMaxBlockId(record); block_id++)
{
if (!XLogRecHasBlockRef(record, block_id))
continue;
@@ -542,26 +542,26 @@ XLogDumpDisplayRecord(XLogDumpConfig *config, XLogReaderState *record)
blk);
if (XLogRecHasBlockImage(record, block_id))
{
- if (record->blocks[block_id].bimg_info &
+ if (record->record->blocks[block_id].bimg_info &
BKPIMAGE_IS_COMPRESSED)
{
printf(" (FPW%s); hole: offset: %u, length: %u, "
"compression saved: %u",
XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification",
- record->blocks[block_id].hole_offset,
- record->blocks[block_id].hole_length,
+ record->record->blocks[block_id].hole_offset,
+ record->record->blocks[block_id].hole_length,
BLCKSZ -
- record->blocks[block_id].hole_length -
- record->blocks[block_id].bimg_len);
+ record->record->blocks[block_id].hole_length -
+ record->record->blocks[block_id].bimg_len);
}
else
{
printf(" (FPW%s); hole: offset: %u, length: %u",
XLogRecBlockImageApply(record, block_id) ?
"" : " for WAL verification",
- record->blocks[block_id].hole_offset,
- record->blocks[block_id].hole_length);
+ record->record->blocks[block_id].hole_offset,
+ record->record->blocks[block_id].hole_length);
}
}
putchar('\n');
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d27c0cd281c..010cbb59d6b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult
{
XLREAD_SUCCESS, /* record is successfully read */
XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */
+ XLREAD_FULL, /* cannot hold more data while reading ahead */
XLREAD_FAIL /* failed during reading a record */
} XLogReadRecordResult;
@@ -120,6 +121,30 @@ typedef enum XLogReadRecordState
XLREAD_CONTINUATION
} XLogReadRecordState;
+/*
+ * The decoded contents of a record. This occupies a contiguous region of
+ * memory, with main_data and blocks[n].data pointing to memory after the
+ * members declared here.
+ */
+typedef struct DecodedXLogRecord
+{
+ /* Private member used for resource management. */
+ size_t size; /* total size of decoded record */
+ bool oversized; /* outside the regular decode buffer? */
+ struct DecodedXLogRecord *next; /* decoded record queue link */
+
+ /* Public members. */
+ XLogRecPtr lsn; /* location */
+ XLogRecPtr next_lsn; /* location of next record */
+ XLogRecord header; /* header */
+ RepOriginId record_origin;
+ TransactionId toplevel_xid; /* XID of top-level transaction */
+ char *main_data; /* record's main data portion */
+ uint32 main_data_len; /* main data portion's length */
+ int max_block_id; /* highest block_id in use (-1 if none) */
+ DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
+} DecodedXLogRecord;
+
struct XLogReaderState
{
/*
@@ -142,10 +167,12 @@ struct XLogReaderState
* Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid.
+ *
+ * Start and end point of last record returned by XLogReadRecord(). These
+ * are also available as record->lsn and record->next_lsn.
*/
XLogRecPtr ReadRecPtr; /* start of last record read or being read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
- XLogRecPtr PrevRecPtr; /* start of previous record read */
/* ----------------------------------------
* Communication with page reader
@@ -170,27 +197,43 @@ struct XLogReaderState
* Use XLogRecGet* functions to investigate the record; these fields
* should not be accessed directly.
* ----------------------------------------
+ * Start and end point of the last record read and decoded by
+ * XLogReadRecordInternal(). NextRecPtr is also used as the position to
+ * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
+ * the requested starting position.
*/
- XLogRecord *decoded_record; /* currently decoded record */
-
- char *main_data; /* record's main data portion */
- uint32 main_data_len; /* main data portion's length */
- uint32 main_data_bufsz; /* allocated size of the buffer */
-
- RepOriginId record_origin;
+ XLogRecPtr DecodeRecPtr; /* start of last record decoded */
+ XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
+ XLogRecPtr PrevRecPtr; /* start of previous record decoded */
- TransactionId toplevel_xid; /* XID of top-level transaction */
-
- /* information about blocks referenced by the record. */
- DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
-
- int max_block_id; /* highest block_id in use (-1 if none) */
+ /* Last record returned by XLogReadRecord(). */
+ DecodedXLogRecord *record;
/* ----------------------------------------
* private/internal state
* ----------------------------------------
*/
+ /*
+ * Buffer for decoded records. This is a circular buffer, though
+ * individual records can't be split in the middle, so some space is often
+ * wasted at the end. Oversized records that don't fit in this space are
+ * allocated separately.
+ */
+ char *decode_buffer;
+ size_t decode_buffer_size;
+ bool free_decode_buffer; /* need to free? */
+ char *decode_buffer_head; /* write head */
+ char *decode_buffer_tail; /* read head */
+
+ /*
+ * Queue of records that have been decoded. This is a linked list that
+ * usually consists of consecutive records in decode_buffer, but may also
+ * contain oversized records allocated with palloc().
+ */
+ DecodedXLogRecord *decode_queue_head; /* newest decoded record */
+ DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */
+
/* last read XLOG position for data currently in readBuf */
WALSegmentContext segcxt;
WALOpenSegment seg;
@@ -230,7 +273,7 @@ struct XLogReaderState
uint32 readRecordBufSize;
/*
- * XLogReadRecord() state
+ * XLogReadRecordInternal() state
*/
XLogReadRecordState readRecordState; /* state machine state */
int recordGotLen; /* amount of current record that has already
@@ -238,8 +281,11 @@ struct XLogReaderState
int recordRemainLen; /* length of current record that remains */
XLogRecPtr recordContRecPtr; /* where the current record continues */
+ DecodedXLogRecord *decoding; /* record currently being decoded */
+
/* Buffer to hold error message */
char *errormsg_buf;
+ bool errormsg_deferred;
};
struct XLogFindNextRecordState
@@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Optionally provide a circular decoding buffer to allow readahead. */
+extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
+ void *buffer,
+ size_t size);
+
/* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND
@@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s
extern bool XLogFindNextRecord(XLogFindNextRecordState *state);
#endif /* FRONTEND */
-/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
+/* Read the next record's header. Returns NULL on end-of-WAL or failure. */
extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
XLogRecord **record,
char **errormsg);
+/* Read the next decoded record. Returns NULL on end-of-WAL or failure. */
+extern XLogReadRecordResult XLogNextRecord(XLogReaderState *state,
+ DecodedXLogRecord **record,
+ char **errormsg);
+
+/* Try to read ahead, if there is space in the decoding buffer. */
+extern XLogReadRecordResult XLogReadAhead(XLogReaderState *state,
+ DecodedXLogRecord **record,
+ char **errormsg);
+
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
@@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state,
/* Functions for decoding an XLogRecord */
-extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
+extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
+extern bool DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
char **errmsg);
-#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len)
-#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev)
-#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
-#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
-#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
-#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
-#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
-#define XLogRecGetData(decoder) ((decoder)->main_data)
-#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
-#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
+#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
+#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
+#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
+#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
+#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
+#define XLogRecGetData(decoder) ((decoder)->record->main_data)
+#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
+#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
+#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
+#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
#define XLogRecHasBlockRef(decoder, block_id) \
- ((decoder)->blocks[block_id].in_use)
+ ((decoder)->record->max_block_id >= (block_id)) && \
+ ((decoder)->record->blocks[block_id].in_use)
#define XLogRecHasBlockImage(decoder, block_id) \
- ((decoder)->blocks[block_id].has_image)
+ ((decoder)->record->blocks[block_id].has_image)
#define XLogRecBlockImageApply(decoder, block_id) \
- ((decoder)->blocks[block_id].apply_image)
+ ((decoder)->record->blocks[block_id].apply_image)
#ifndef FRONTEND
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);