aboutsummaryrefslogtreecommitdiff
path: root/src/include/access/xlogreader.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/include/access/xlogreader.h')
-rw-r--r--src/include/access/xlogreader.h154
1 files changed, 124 insertions, 30 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 477f0efe26a..f4388cc9be8 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -144,6 +144,30 @@ typedef struct
uint16 data_bufsz;
} DecodedBkpBlock;
+/*
+ * The decoded contents of a record. This occupies a contiguous region of
+ * memory, with main_data and blocks[n].data pointing to memory after the
+ * members declared here.
+ */
+typedef struct DecodedXLogRecord
+{
+ /* Private member used for resource management. */
+ size_t size; /* total size of decoded record */
+ bool oversized; /* outside the regular decode buffer? */
+ struct DecodedXLogRecord *next; /* decoded record queue link */
+
+ /* Public members. */
+ XLogRecPtr lsn; /* location */
+ XLogRecPtr next_lsn; /* location of next record */
+ XLogRecord header; /* header */
+ RepOriginId record_origin;
+ TransactionId toplevel_xid; /* XID of top-level transaction */
+ char *main_data; /* record's main data portion */
+ uint32 main_data_len; /* main data portion's length */
+ int max_block_id; /* highest block_id in use (-1 if none) */
+ DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER];
+} DecodedXLogRecord;
+
struct XLogReaderState
{
/*
@@ -171,6 +195,9 @@ struct XLogReaderState
* Start and end point of last record read. EndRecPtr is also used as the
* position to read next. Calling XLogBeginRead() sets EndRecPtr to the
* starting position and ReadRecPtr to invalid.
+ *
+ * Start and end point of last record returned by XLogReadRecord(). These
+ * are also available as record->lsn and record->next_lsn.
*/
XLogRecPtr ReadRecPtr; /* start of last record read */
XLogRecPtr EndRecPtr; /* end+1 of last record read */
@@ -192,21 +219,17 @@ struct XLogReaderState
* Use XLogRecGet* functions to investigate the record; these fields
* should not be accessed directly.
* ----------------------------------------
+ * Start and end point of the last record read and decoded by
+ * XLogReadRecordInternal(). NextRecPtr is also used as the position to
+ * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to
+ * the requested starting position.
*/
- XLogRecord *decoded_record; /* currently decoded record */
+ XLogRecPtr DecodeRecPtr; /* start of last record decoded */
+ XLogRecPtr NextRecPtr; /* end+1 of last record decoded */
+ XLogRecPtr PrevRecPtr; /* start of previous record decoded */
- char *main_data; /* record's main data portion */
- uint32 main_data_len; /* main data portion's length */
- uint32 main_data_bufsz; /* allocated size of the buffer */
-
- RepOriginId record_origin;
-
- TransactionId toplevel_xid; /* XID of top-level transaction */
-
- /* information about blocks referenced by the record. */
- DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1];
-
- int max_block_id; /* highest block_id in use (-1 if none) */
+ /* Last record returned by XLogReadRecord(). */
+ DecodedXLogRecord *record;
/* ----------------------------------------
* private/internal state
@@ -214,6 +237,26 @@ struct XLogReaderState
*/
/*
+ * Buffer for decoded records. This is a circular buffer, though
+ * individual records can't be split in the middle, so some space is often
+ * wasted at the end. Oversized records that don't fit in this space are
+ * allocated separately.
+ */
+ char *decode_buffer;
+ size_t decode_buffer_size;
+ bool free_decode_buffer; /* need to free? */
+ char *decode_buffer_head; /* data is read from the head */
+ char *decode_buffer_tail; /* new data is written at the tail */
+
+ /*
+ * Queue of records that have been decoded. This is a linked list that
+ * usually consists of consecutive records in decode_buffer, but may also
+ * contain oversized records allocated with palloc().
+ */
+ DecodedXLogRecord *decode_queue_head; /* oldest decoded record */
+ DecodedXLogRecord *decode_queue_tail; /* newest decoded record */
+
+ /*
* Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least
* readLen bytes)
*/
@@ -262,8 +305,24 @@ struct XLogReaderState
/* Buffer to hold error message */
char *errormsg_buf;
+ bool errormsg_deferred;
+
+ /*
+ * Flag to indicate to XLogPageReadCB that it should not block waiting for
+ * data.
+ */
+ bool nonblocking;
};
+/*
+ * Check if XLogNextRecord() has any more queued records or an error to return.
+ */
+static inline bool
+XLogReaderHasQueuedRecordOrError(XLogReaderState *state)
+{
+ return (state->decode_queue_head != NULL) || state->errormsg_deferred;
+}
+
/* Get a new XLogReader */
extern XLogReaderState *XLogReaderAllocate(int wal_segment_size,
const char *waldir,
@@ -274,16 +333,40 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void);
/* Free an XLogReader */
extern void XLogReaderFree(XLogReaderState *state);
+/* Optionally provide a circular decoding buffer to allow readahead. */
+extern void XLogReaderSetDecodeBuffer(XLogReaderState *state,
+ void *buffer,
+ size_t size);
+
/* Position the XLogReader to given record */
extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr);
#ifdef FRONTEND
extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr);
#endif /* FRONTEND */
+/* Return values from XLogPageReadCB. */
+typedef enum XLogPageReadResult
+{
+ XLREAD_SUCCESS = 0, /* record is successfully read */
+ XLREAD_FAIL = -1, /* failed during reading a record */
+ XLREAD_WOULDBLOCK = -2 /* nonblocking mode only, no data */
+} XLogPageReadResult;
+
/* Read the next XLog record. Returns NULL on end-of-WAL or failure */
extern struct XLogRecord *XLogReadRecord(XLogReaderState *state,
char **errormsg);
+/* Consume the next record or error. */
+extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state,
+ char **errormsg);
+
+/* Release the previously returned record, if necessary. */
+extern void XLogReleasePreviousRecord(XLogReaderState *state);
+
+/* Try to read ahead, if there is data and space. */
+extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state,
+ bool nonblocking);
+
/* Validate a page */
extern bool XLogReaderValidatePageHeader(XLogReaderState *state,
XLogRecPtr recptr, char *phdr);
@@ -307,25 +390,36 @@ extern bool WALRead(XLogReaderState *state,
/* Functions for decoding an XLogRecord */
-extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record,
+extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len);
+extern bool DecodeXLogRecord(XLogReaderState *state,
+ DecodedXLogRecord *decoded,
+ XLogRecord *record,
+ XLogRecPtr lsn,
char **errmsg);
-#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len)
-#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev)
-#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info)
-#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid)
-#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid)
-#define XLogRecGetOrigin(decoder) ((decoder)->record_origin)
-#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid)
-#define XLogRecGetData(decoder) ((decoder)->main_data)
-#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len)
-#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0)
-#define XLogRecHasBlockRef(decoder, block_id) \
- ((decoder)->blocks[block_id].in_use)
-#define XLogRecHasBlockImage(decoder, block_id) \
- ((decoder)->blocks[block_id].has_image)
-#define XLogRecBlockImageApply(decoder, block_id) \
- ((decoder)->blocks[block_id].apply_image)
+/*
+ * Macros that provide access to parts of the record most recently returned by
+ * XLogReadRecord() or XLogNextRecord().
+ */
+#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len)
+#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev)
+#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info)
+#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid)
+#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid)
+#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin)
+#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid)
+#define XLogRecGetData(decoder) ((decoder)->record->main_data)
+#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len)
+#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0)
+#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id)
+#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)])
+#define XLogRecHasBlockRef(decoder, block_id) \
+ (((decoder)->record->max_block_id >= (block_id)) && \
+ ((decoder)->record->blocks[block_id].in_use))
+#define XLogRecHasBlockImage(decoder, block_id) \
+ ((decoder)->record->blocks[block_id].has_image)
+#define XLogRecBlockImageApply(decoder, block_id) \
+ ((decoder)->record->blocks[block_id].apply_image)
#ifndef FRONTEND
extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record);