aboutsummaryrefslogtreecommitdiff
path: root/src/include/access/xlogreader.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/access/xlogreader.h')
-rw-r--r--src/include/access/xlogreader.h128
1 files changed, 98 insertions, 30 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index d27c0cd281c..010cbb59d6b 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult
{
XLREAD_SUCCESS, /* record is successfully read */
XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */
+ XLREAD_FULL, /* cannot hold more data while reading ahead */
XLREAD_FAIL /* failed during reading a record */
} XLogReadRecordResult;
@@ -120,6 +121,30 @@ typedef enum XLogReadRecordState
XLREAD_CONTINUATION
} XLogReadRecordState;
+/*
+ * The decoded contents of a record. This occupies a contiguous region of
+ * memory, with main_data and blocks[n].data pointing to memory after the
+ * members declared here.
+ */
+typedef struct DecodedXLogRecord
+{
+ /* Private member used for resource management. */
+ size_t size; /* total size of decoded record */
+ bool oversized; /* outside the regular decode buffer? */
+ struct DecodedXLogRecord *next; /* decoded record queue link */
+
+ /* Public members. */
+ XLogRecPtr lsn; /* location */
+ XLogRecPtr next_lsn; /* location of next record */
+ XLogRecord header; /* header */
+ RepOriginId record_origin;
+ TransactionId toplevel_xid; /* XID of top-level transaction */
+ char *main_data; /* record's main data portion */
+ uint32 main_data_len; /* main data portion's length */
+ int max_block_id; /* highest block_id in use (-1 if none) */
+ DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
+} DecodedXLogRecord;
+
struct XLogReaderState
{
/*
@@ -142,10 +167,12 @@ struct XLogReaderState
* Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid.
+ *
+ * Start and end point of last record returned by XLogReadRecord(). These
+ * are also available as record->lsn and record->next_lsn.
*/
XLogRecPtr ReadRecPtr; /* start of last record read or being read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
- XLogRecPtr PrevRecPtr; /* start of previous record read */
/* ----------------------------------------
* Communication with page reader
@@ -170,27 +197,43 @@ struct XLogReaderState
* Use XLogRecGet* functions to investigate the record; these fields
* should not be accessed directly.
* ----------------------------------------
+ * Start and end point of the last record read and decoded by
+ * XLogReadRecordInternal(). NextRecPtr is also used as the position to
+ * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
+ * the requested starting position.
*/
- XLogRecord *decoded_record; /* currently decoded record */
-
- char *main_data; /* record's main data portion */
- uint32 main_data_len; /* main data portion's length */
- uint32 main_data_bufsz; /* allocated size of the buffer */
-
- RepOriginId record_origin;
+ XLogRecPtr DecodeRecPtr; /* start of last record decoded */
+ XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
+ XLogRecPtr PrevRecPtr; /* start of previous record decoded */
- TransactionId toplevel_xid; /* XID of top-level transaction */
-
- /* information about blocks referenced by the record. */
- DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
-
- int max_block_id; /* highest block_id in use (-1 if none) */
+ /* Last record returned by XLogReadRecord(). */
+ DecodedXLogRecord *record;
/* ----------------------------------------
* private/internal state
* ----------------------------------------
*/
+ /*
+ * Buffer for decoded records. This is a circular buffer, though
+ * individual records can't be split in the middle, so some space is often
+ * wasted at the end. Oversized records that don't fit in this space are
+ * allocated separately.
+ */
+ char *decode_buffer;
+ size_t decode_buffer_size;
+ bool free_decode_buffer; /* need to free? */
+ char *decode_buffer_head; /* write head */
+ char *decode_buffer_tail; /* read head */
+
+ /*
+ * Queue of records that have been decoded. This is a linked list that
+ * usually consists of consecutive records in decode_buffer, but may also
+ * contain oversized records allocated with palloc().
+ */
+ DecodedXLogRecord *decode_queue_head; /* newest decoded record */
+ DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */
+
/* last read XLOG position for data currently in readBuf */
WALSegmentContext segcxt;
WALOpenSegment seg;
@@ -230,7 +273,7 @@ struct XLogReaderState
uint32 readRecordBufSize;
/*
- * XLogReadRecord() state
+ * XLogReadRecordInternal() state
*/
XLogReadRecordState readRecordState; /* state machine state */
int recordGotLen; /* amount of current record that has already
@@ -238,8 +281,11 @@ struct XLogReaderState
int recordRemainLen; /* length of current record that remains */
XLogRecPtr recordContRecPtr; /* where the current record continues */
+ DecodedXLogRecord *decoding; /* record currently being decoded */
+
/* Buffer to hold error message */
char *errormsg_buf;
+ bool errormsg_deferred;
};
struct XLogFindNextRecordState
@@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Optionally provide a circular decoding buffer to allow readahead. */
+extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
+ void *buffer,
+ size_t size);
+
/* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND
@@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s
extern bool XLogFindNextRecord(XLogFindNextRecordState *state);
#endif /* FRONTEND */
-/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
+/* Read the next record's header. Returns NULL on end-of-WAL or failure. */
extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state,
XLogRecord **record,
char **errormsg);
+/* Read the next decoded record. Returns NULL on end-of-WAL or failure. */
+extern XLogReadRecordResult XLogNextRecord(XLogReaderState *state,
+ DecodedXLogRecord **record,
+ char **errormsg);
+
+/* Try to read ahead, if there is space in the decoding buffer. */
+extern XLogReadRecordResult XLogReadAhead(XLogReaderState *state,
+ DecodedXLogRecord **record,
+ char **errormsg);
+
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
@@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state,
/* Functions for decoding an XLogRecord */
-extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
+extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
+extern bool DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
char **errmsg);
-#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len)
-#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev)
-#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
-#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
-#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
-#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
-#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
-#define XLogRecGetData(decoder) ((decoder)->main_data)
-#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
-#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
+#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
+#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
+#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
+#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
+#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
+#define XLogRecGetData(decoder) ((decoder)->record->main_data)
+#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
+#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
+#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
+#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
#define XLogRecHasBlockRef(decoder, block_id) \
- ((decoder)->blocks[block_id].in_use)
+ ((decoder)->record->max_block_id >= (block_id)) && \
+ ((decoder)->record->blocks[block_id].in_use)
#define XLogRecHasBlockImage(decoder, block_id) \
- ((decoder)->blocks[block_id].has_image)
+ ((decoder)->record->blocks[block_id].has_image)
#define XLogRecBlockImageApply(decoder, block_id) \
- ((decoder)->blocks[block_id].apply_image)
+ ((decoder)->record->blocks[block_id].apply_image)
#ifndef FRONTEND
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);