diff options
Diffstat (limited to 'src/include/access/xlogreader.h')
-rw-r--r-- | src/include/access/xlogreader.h | 128 |
1 files changed, 98 insertions, 30 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index d27c0cd281c..010cbb59d6b 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -101,6 +101,7 @@ typedef enum XLogReadRecordResult { XLREAD_SUCCESS, /* record is successfully read */ XLREAD_NEED_DATA, /* need more data. see XLogReadRecord. */ + XLREAD_FULL, /* cannot hold more data while reading ahead */ XLREAD_FAIL /* failed during reading a record */ } XLogReadRecordResult; @@ -120,6 +121,30 @@ typedef enum XLogReadRecordState XLREAD_CONTINUATION } XLogReadRecordState; +/* + * The decoded contents of a record. This occupies a contiguous region of + * memory, with main_data and blocks[n].data pointing to memory after the + * members declared here. + */ +typedef struct DecodedXLogRecord +{ + /* Private member used for resource management. */ + size_t size; /* total size of decoded record */ + bool oversized; /* outside the regular decode buffer? */ + struct DecodedXLogRecord *next; /* decoded record queue link */ + + /* Public members. */ + XLogRecPtr lsn; /* location */ + XLogRecPtr next_lsn; /* location of next record */ + XLogRecord header; /* header */ + RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + char *main_data; /* record's main data portion */ + uint32 main_data_len; /* main data portion's length */ + int max_block_id; /* highest block_id in use (-1 if none) */ + DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]; +} DecodedXLogRecord; + struct XLogReaderState { /* @@ -142,10 +167,12 @@ struct XLogReaderState * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid. + * + * Start and end point of last record returned by XLogReadRecord(). These + * are also available as record->lsn and record->next_lsn. */ XLogRecPtr ReadRecPtr; /* start of last record read or being read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ - XLogRecPtr PrevRecPtr; /* start of previous record read */ /* ---------------------------------------- * Communication with page reader @@ -170,27 +197,43 @@ struct XLogReaderState * Use XLogRecGet* functions to investigate the record; these fields * should not be accessed directly. * ---------------------------------------- + * Start and end point of the last record read and decoded by + * XLogReadRecordInternal(). NextRecPtr is also used as the position to + * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to + * the requested starting position. */ - XLogRecord *decoded_record; /* currently decoded record */ - - char *main_data; /* record's main data portion */ - uint32 main_data_len; /* main data portion's length */ - uint32 main_data_bufsz; /* allocated size of the buffer */ - - RepOriginId record_origin; + XLogRecPtr DecodeRecPtr; /* start of last record decoded */ + XLogRecPtr NextRecPtr; /* end+1 of last record decoded */ + XLogRecPtr PrevRecPtr; /* start of previous record decoded */ - TransactionId toplevel_xid; /* XID of top-level transaction */ - - /* information about blocks referenced by the record. */ - DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; - - int max_block_id; /* highest block_id in use (-1 if none) */ + /* Last record returned by XLogReadRecord(). */ + DecodedXLogRecord *record; /* ---------------------------------------- * private/internal state * ---------------------------------------- */ + /* + * Buffer for decoded records. This is a circular buffer, though + * individual records can't be split in the middle, so some space is often + * wasted at the end. Oversized records that don't fit in this space are + * allocated separately. + */ + char *decode_buffer; + size_t decode_buffer_size; + bool free_decode_buffer; /* need to free? */ + char *decode_buffer_head; /* write head */ + char *decode_buffer_tail; /* read head */ + + /* + * Queue of records that have been decoded. This is a linked list that + * usually consists of consecutive records in decode_buffer, but may also + * contain oversized records allocated with palloc(). + */ + DecodedXLogRecord *decode_queue_head; /* newest decoded record */ + DecodedXLogRecord *decode_queue_tail; /* oldest decoded record */ + /* last read XLOG position for data currently in readBuf */ WALSegmentContext segcxt; WALOpenSegment seg; @@ -230,7 +273,7 @@ struct XLogReaderState uint32 readRecordBufSize; /* - * XLogReadRecord() state + * XLogReadRecordInternal() state */ XLogReadRecordState readRecordState; /* state machine state */ int recordGotLen; /* amount of current record that has already @@ -238,8 +281,11 @@ struct XLogReaderState int recordRemainLen; /* length of current record that remains */ XLogRecPtr recordContRecPtr; /* where the current record continues */ + DecodedXLogRecord *decoding; /* record currently being decoded */ + /* Buffer to hold error message */ char *errormsg_buf; + bool errormsg_deferred; }; struct XLogFindNextRecordState @@ -264,6 +310,11 @@ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* Optionally provide a circular decoding buffer to allow readahead. */ +extern void XLogReaderSetDecodeBuffer(XLogReaderState *state, + void *buffer, + size_t size); + /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND @@ -271,11 +322,21 @@ extern XLogFindNextRecordState *InitXLogFindNextRecord(XLogReaderState *reader_s extern bool XLogFindNextRecord(XLogFindNextRecordState *state); #endif /* FRONTEND */ -/* Read the next XLog record. Returns NULL on end-of-WAL or failure */ +/* Read the next record's header. Returns NULL on end-of-WAL or failure. */ extern XLogReadRecordResult XLogReadRecord(XLogReaderState *state, XLogRecord **record, char **errormsg); +/* Read the next decoded record. Returns NULL on end-of-WAL or failure. */ +extern XLogReadRecordResult XLogNextRecord(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg); + +/* Try to read ahead, if there is space in the decoding buffer. */ +extern XLogReadRecordResult XLogReadAhead(XLogReaderState *state, + DecodedXLogRecord **record, + char **errormsg); + /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); @@ -300,25 +361,32 @@ extern bool WALRead(XLogReaderState *state, /* Functions for decoding an XLogRecord */ -extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, +extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +extern bool DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, char **errmsg); -#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) -#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) -#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) -#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) -#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) -#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) -#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) -#define XLogRecGetData(decoder) ((decoder)->main_data) -#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) -#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) +#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len) +#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev) +#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info) +#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid) +#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid) +#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid) +#define XLogRecGetData(decoder) ((decoder)->record->main_data) +#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len) +#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0) +#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id) +#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)]) #define XLogRecHasBlockRef(decoder, block_id) \ - ((decoder)->blocks[block_id].in_use) + ((decoder)->record->max_block_id >= (block_id)) && \ + ((decoder)->record->blocks[block_id].in_use) #define XLogRecHasBlockImage(decoder, block_id) \ - ((decoder)->blocks[block_id].has_image) + ((decoder)->record->blocks[block_id].has_image) #define XLogRecBlockImageApply(decoder, block_id) \ - ((decoder)->blocks[block_id].apply_image) + ((decoder)->record->blocks[block_id].apply_image) #ifndef FRONTEND extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record); |