diff options
Diffstat (limited to 'src/include/access/xlogreader.h')
-rw-r--r-- | src/include/access/xlogreader.h | 154 |
1 files changed, 124 insertions, 30 deletions
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h index 477f0efe26a..f4388cc9be8 100644 --- a/src/include/access/xlogreader.h +++ b/src/include/access/xlogreader.h @@ -144,6 +144,30 @@ typedef struct uint16 data_bufsz; } DecodedBkpBlock; +/* + * The decoded contents of a record. This occupies a contiguous region of + * memory, with main_data and blocks[n].data pointing to memory after the + * members declared here. + */ +typedef struct DecodedXLogRecord +{ + /* Private member used for resource management. */ + size_t size; /* total size of decoded record */ + bool oversized; /* outside the regular decode buffer? */ + struct DecodedXLogRecord *next; /* decoded record queue link */ + + /* Public members. */ + XLogRecPtr lsn; /* location */ + XLogRecPtr next_lsn; /* location of next record */ + XLogRecord header; /* header */ + RepOriginId record_origin; + TransactionId toplevel_xid; /* XID of top-level transaction */ + char *main_data; /* record's main data portion */ + uint32 main_data_len; /* main data portion's length */ + int max_block_id; /* highest block_id in use (-1 if none) */ + DecodedBkpBlock blocks[FLEXIBLE_ARRAY_MEMBER]; +} DecodedXLogRecord; + struct XLogReaderState { /* @@ -171,6 +195,9 @@ struct XLogReaderState * Start and end point of last record read. EndRecPtr is also used as the * position to read next. Calling XLogBeginRead() sets EndRecPtr to the * starting position and ReadRecPtr to invalid. + * + * Start and end point of last record returned by XLogReadRecord(). These + * are also available as record->lsn and record->next_lsn. */ XLogRecPtr ReadRecPtr; /* start of last record read */ XLogRecPtr EndRecPtr; /* end+1 of last record read */ @@ -192,21 +219,17 @@ struct XLogReaderState * Use XLogRecGet* functions to investigate the record; these fields * should not be accessed directly. * ---------------------------------------- + * Start and end point of the last record read and decoded by + * XLogReadRecordInternal(). NextRecPtr is also used as the position to + * decode next. Calling XLogBeginRead() sets NextRecPtr and EndRecPtr to + * the requested starting position. */ - XLogRecord *decoded_record; /* currently decoded record */ + XLogRecPtr DecodeRecPtr; /* start of last record decoded */ + XLogRecPtr NextRecPtr; /* end+1 of last record decoded */ + XLogRecPtr PrevRecPtr; /* start of previous record decoded */ - char *main_data; /* record's main data portion */ - uint32 main_data_len; /* main data portion's length */ - uint32 main_data_bufsz; /* allocated size of the buffer */ - - RepOriginId record_origin; - - TransactionId toplevel_xid; /* XID of top-level transaction */ - - /* information about blocks referenced by the record. */ - DecodedBkpBlock blocks[XLR_MAX_BLOCK_ID + 1]; - - int max_block_id; /* highest block_id in use (-1 if none) */ + /* Last record returned by XLogReadRecord(). */ + DecodedXLogRecord *record; /* ---------------------------------------- * private/internal state @@ -214,6 +237,26 @@ struct XLogReaderState */ /* + * Buffer for decoded records. This is a circular buffer, though + * individual records can't be split in the middle, so some space is often + * wasted at the end. Oversized records that don't fit in this space are + * allocated separately. + */ + char *decode_buffer; + size_t decode_buffer_size; + bool free_decode_buffer; /* need to free? */ + char *decode_buffer_head; /* data is read from the head */ + char *decode_buffer_tail; /* new data is written at the tail */ + + /* + * Queue of records that have been decoded. This is a linked list that + * usually consists of consecutive records in decode_buffer, but may also + * contain oversized records allocated with palloc(). + */ + DecodedXLogRecord *decode_queue_head; /* oldest decoded record */ + DecodedXLogRecord *decode_queue_tail; /* newest decoded record */ + + /* * Buffer for currently read page (XLOG_BLCKSZ bytes, valid up to at least * readLen bytes) */ @@ -262,8 +305,24 @@ struct XLogReaderState /* Buffer to hold error message */ char *errormsg_buf; + bool errormsg_deferred; + + /* + * Flag to indicate to XLogPageReadCB that it should not block waiting for + * data. + */ + bool nonblocking; }; +/* + * Check if XLogNextRecord() has any more queued records or an error to return. + */ +static inline bool +XLogReaderHasQueuedRecordOrError(XLogReaderState *state) +{ + return (state->decode_queue_head != NULL) || state->errormsg_deferred; +} + /* Get a new XLogReader */ extern XLogReaderState *XLogReaderAllocate(int wal_segment_size, const char *waldir, @@ -274,16 +333,40 @@ extern XLogReaderRoutine *LocalXLogReaderRoutine(void); /* Free an XLogReader */ extern void XLogReaderFree(XLogReaderState *state); +/* Optionally provide a circular decoding buffer to allow readahead. */ +extern void XLogReaderSetDecodeBuffer(XLogReaderState *state, + void *buffer, + size_t size); + /* Position the XLogReader to given record */ extern void XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr); #ifdef FRONTEND extern XLogRecPtr XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr); #endif /* FRONTEND */ +/* Return values from XLogPageReadCB. */ +typedef enum XLogPageReadResult +{ + XLREAD_SUCCESS = 0, /* record is successfully read */ + XLREAD_FAIL = -1, /* failed during reading a record */ + XLREAD_WOULDBLOCK = -2 /* nonblocking mode only, no data */ +} XLogPageReadResult; + /* Read the next XLog record. Returns NULL on end-of-WAL or failure */ extern struct XLogRecord *XLogReadRecord(XLogReaderState *state, char **errormsg); +/* Consume the next record or error. */ +extern DecodedXLogRecord *XLogNextRecord(XLogReaderState *state, + char **errormsg); + +/* Release the previously returned record, if necessary. */ +extern void XLogReleasePreviousRecord(XLogReaderState *state); + +/* Try to read ahead, if there is data and space. */ +extern DecodedXLogRecord *XLogReadAhead(XLogReaderState *state, + bool nonblocking); + /* Validate a page */ extern bool XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr, char *phdr); @@ -307,25 +390,36 @@ extern bool WALRead(XLogReaderState *state, /* Functions for decoding an XLogRecord */ -extern bool DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, +extern size_t DecodeXLogRecordRequiredSpace(size_t xl_tot_len); +extern bool DecodeXLogRecord(XLogReaderState *state, + DecodedXLogRecord *decoded, + XLogRecord *record, + XLogRecPtr lsn, char **errmsg); -#define XLogRecGetTotalLen(decoder) ((decoder)->decoded_record->xl_tot_len) -#define XLogRecGetPrev(decoder) ((decoder)->decoded_record->xl_prev) -#define XLogRecGetInfo(decoder) ((decoder)->decoded_record->xl_info) -#define XLogRecGetRmid(decoder) ((decoder)->decoded_record->xl_rmid) -#define XLogRecGetXid(decoder) ((decoder)->decoded_record->xl_xid) -#define XLogRecGetOrigin(decoder) ((decoder)->record_origin) -#define XLogRecGetTopXid(decoder) ((decoder)->toplevel_xid) -#define XLogRecGetData(decoder) ((decoder)->main_data) -#define XLogRecGetDataLen(decoder) ((decoder)->main_data_len) -#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->max_block_id >= 0) -#define XLogRecHasBlockRef(decoder, block_id) \ - ((decoder)->blocks[block_id].in_use) -#define XLogRecHasBlockImage(decoder, block_id) \ - ((decoder)->blocks[block_id].has_image) -#define XLogRecBlockImageApply(decoder, block_id) \ - ((decoder)->blocks[block_id].apply_image) +/* + * Macros that provide access to parts of the record most recently returned by + * XLogReadRecord() or XLogNextRecord(). + */ +#define XLogRecGetTotalLen(decoder) ((decoder)->record->header.xl_tot_len) +#define XLogRecGetPrev(decoder) ((decoder)->record->header.xl_prev) +#define XLogRecGetInfo(decoder) ((decoder)->record->header.xl_info) +#define XLogRecGetRmid(decoder) ((decoder)->record->header.xl_rmid) +#define XLogRecGetXid(decoder) ((decoder)->record->header.xl_xid) +#define XLogRecGetOrigin(decoder) ((decoder)->record->record_origin) +#define XLogRecGetTopXid(decoder) ((decoder)->record->toplevel_xid) +#define XLogRecGetData(decoder) ((decoder)->record->main_data) +#define XLogRecGetDataLen(decoder) ((decoder)->record->main_data_len) +#define XLogRecHasAnyBlockRefs(decoder) ((decoder)->record->max_block_id >= 0) +#define XLogRecMaxBlockId(decoder) ((decoder)->record->max_block_id) +#define XLogRecGetBlock(decoder, i) (&(decoder)->record->blocks[(i)]) +#define XLogRecHasBlockRef(decoder, block_id) \ + (((decoder)->record->max_block_id >= (block_id)) && \ + ((decoder)->record->blocks[block_id].in_use)) +#define XLogRecHasBlockImage(decoder, block_id) \ + ((decoder)->record->blocks[block_id].has_image) +#define XLogRecBlockImageApply(decoder, block_id) \ + ((decoder)->record->blocks[block_id].apply_image) #ifndef FRONTEND extern FullTransactionId XLogRecGetFullXid(XLogReaderState *record); |