aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogreader.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogreader.c')
-rw-r--r--src/backend/access/transam/xlogreader.c123
1 files changed, 67 insertions, 56 deletions
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index c9f9f6e98f0..58654b746ca 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -192,6 +192,9 @@ XLogReaderFree(XLogReaderState *state)
* XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start
* with. (That is enough for all "normal" records, but very large commit or
* abort records might need more space.)
+ *
+ * Note: This routine should *never* be called for xl_tot_len until the header
+ * of the record has been fully validated.
*/
static bool
allocate_recordbuf(XLogReaderState *state, uint32 reclength)
@@ -201,25 +204,6 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength)
newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ);
newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ));
-#ifndef FRONTEND
-
- /*
- * Note that in much unlucky circumstances, the random data read from a
- * recycled segment can cause this routine to be called with a size
- * causing a hard failure at allocation. For a standby, this would cause
- * the instance to stop suddenly with a hard failure, preventing it to
- * retry fetching WAL from one of its sources which could allow it to move
- * on with replay without a manual restart. If the data comes from a past
- * recycled segment and is still valid, then the allocation may succeed
- * but record checks are going to fail so this would be short-lived. If
- * the allocation fails because of a memory shortage, then this is not a
- * hard failure either per the guarantee given by MCXT_ALLOC_NO_OOM.
- */
- if (!AllocSizeIsValid(newSize))
- return false;
-
-#endif
-
if (state->readRecordBuf)
pfree(state->readRecordBuf);
state->readRecordBuf =
@@ -669,41 +653,26 @@ restart:
}
else
{
- /* XXX: more validation should be done here */
- if (total_len < SizeOfXLogRecord)
- {
- report_invalid_record(state,
- "invalid record length at %X/%X: expected at least %u, got %u",
- LSN_FORMAT_ARGS(RecPtr),
- (uint32) SizeOfXLogRecord, total_len);
- goto err;
- }
+ /* We'll validate the header once we have the next page. */
gotheader = false;
}
/*
- * Find space to decode this record. Don't allow oversized allocation if
- * the caller requested nonblocking. Otherwise, we *have* to try to
- * decode the record now because the caller has nothing else to do, so
- * allow an oversized record to be palloc'd if that turns out to be
- * necessary.
+ * Try to find space to decode this record, if we can do so without
+ * calling palloc. If we can't, we'll try again below after we've
+ * validated that total_len isn't garbage bytes from a recycled WAL page.
*/
decoded = XLogReadRecordAlloc(state,
total_len,
- !nonblocking /* allow_oversized */ );
- if (decoded == NULL)
+ false /* allow_oversized */ );
+ if (decoded == NULL && nonblocking)
{
/*
- * There is no space in the decode buffer. The caller should help
- * with that problem by consuming some records.
+ * There is no space in the circular decode buffer, and the caller is
+ * only reading ahead. The caller should consume existing records to
+ * make space.
*/
- if (nonblocking)
- return XLREAD_WOULDBLOCK;
-
- /* We failed to allocate memory for an oversized record. */
- report_invalid_record(state,
- "out of memory while trying to decode a record of length %u", total_len);
- goto err;
+ return XLREAD_WOULDBLOCK;
}
len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ;
@@ -718,16 +687,11 @@ restart:
assembled = true;
/*
- * Enlarge readRecordBuf as needed.
+ * We always have space for a couple of pages, enough to validate a
+ * boundary-spanning record header.
*/
- if (total_len > state->readRecordBufSize &&
- !allocate_recordbuf(state, total_len))
- {
- /* We treat this as a "bogus data" condition */
- report_invalid_record(state, "record length %u at %X/%X too long",
- total_len, LSN_FORMAT_ARGS(RecPtr));
- goto err;
- }
+ Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2);
+ Assert(state->readRecordBufSize >= len);
/* Copy the first fragment of the record from the first page. */
memcpy(state->readRecordBuf,
@@ -824,8 +788,35 @@ restart:
goto err;
gotheader = true;
}
- } while (gotlen < total_len);
+ /*
+ * We might need a bigger buffer. We have validated the record
+ * header, in the case that it split over a page boundary. We've
+ * also cross-checked total_len against xlp_rem_len on the second
+ * page, and verified xlp_pageaddr on both.
+ */
+ if (total_len > state->readRecordBufSize)
+ {
+ char save_copy[XLOG_BLCKSZ * 2];
+
+ /*
+ * Save and restore the data we already had. It can't be more
+ * than two pages.
+ */
+ Assert(gotlen <= lengthof(save_copy));
+ Assert(gotlen <= state->readRecordBufSize);
+ memcpy(save_copy, state->readRecordBuf, gotlen);
+ if (!allocate_recordbuf(state, total_len))
+ {
+ /* We treat this as a "bogus data" condition */
+ report_invalid_record(state, "record length %u at %X/%X too long",
+ total_len, LSN_FORMAT_ARGS(RecPtr));
+ goto err;
+ }
+ memcpy(state->readRecordBuf, save_copy, gotlen);
+ buffer = state->readRecordBuf + gotlen;
+ }
+ } while (gotlen < total_len);
Assert(gotheader);
record = (XLogRecord *) state->readRecordBuf;
@@ -867,6 +858,28 @@ restart:
state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize);
}
+ /*
+ * If we got here without a DecodedXLogRecord, it means we needed to
+ * validate total_len before trusting it, but by now now we've done that.
+ */
+ if (decoded == NULL)
+ {
+ Assert(!nonblocking);
+ decoded = XLogReadRecordAlloc(state,
+ total_len,
+ true /* allow_oversized */ );
+ if (decoded == NULL)
+ {
+ /*
+ * We failed to allocate memory for an oversized record. As
+ * above, we currently treat this as a "bogus data" condition.
+ */
+ report_invalid_record(state,
+ "out of memory while trying to decode a record of length %u", total_len);
+ goto err;
+ }
+ }
+
if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg))
{
/* Record the location of the next record. */
@@ -895,8 +908,6 @@ restart:
state->decode_queue_head = decoded;
return XLREAD_SUCCESS;
}
- else
- return XLREAD_FAIL;
err:
if (assembled)