diff options
Diffstat (limited to 'src/backend/access/transam')
-rw-r--r-- | src/backend/access/transam/README | 249
-rw-r--r-- | src/backend/access/transam/clog.c | 25
-rw-r--r-- | src/backend/access/transam/multixact.c | 33
-rw-r--r-- | src/backend/access/transam/twophase.c | 105
-rw-r--r-- | src/backend/access/transam/xact.c | 129
-rw-r--r-- | src/backend/access/transam/xlog.c | 348
-rw-r--r-- | src/backend/access/transam/xloginsert.c | 972
-rw-r--r-- | src/backend/access/transam/xlogreader.c | 486
-rw-r--r-- | src/backend/access/transam/xlogutils.c | 237
9 files changed, 1480 insertions, 1104 deletions
diff --git a/src/backend/access/transam/README b/src/backend/access/transam/README index 92b12fbb6c2..ba6ae05d653 100644 --- a/src/backend/access/transam/README +++ b/src/backend/access/transam/README @@ -440,96 +440,164 @@ happen before the WAL record is inserted; see notes in SyncOneBuffer().) Note that marking a buffer dirty with MarkBufferDirty() should only happen iff you write a WAL record; see Writing Hints below. -5. If the relation requires WAL-logging, build a WAL log record and pass it -to XLogInsert(); then update the page's LSN using the returned XLOG -location. For instance, +5. If the relation requires WAL-logging, build a WAL record using +XLogBeginInsert and XLogRegister* functions, and insert it. (See +"Constructing a WAL record" below). Then update the page's LSN using the +returned XLOG location. For instance, - recptr = XLogInsert(rmgr_id, info, rdata); + XLogBeginInsert(); + XLogRegisterBuffer(...) + XLogRegisterData(...) + recptr = XLogInsert(rmgr_id, info); PageSetLSN(dp, recptr); - // Note that we no longer do PageSetTLI() from 9.3 onwards - // since that field on a page has now changed its meaning. 6. END_CRIT_SECTION() 7. Unlock and unpin the buffer(s). -XLogInsert's "rdata" argument is an array of pointer/size items identifying -chunks of data to be written in the XLOG record, plus optional shared-buffer -IDs for chunks that are in shared buffers rather than temporary variables. -The "rdata" array must mention (at least once) each of the shared buffers -being modified, unless the action is such that the WAL replay routine can -reconstruct the entire page contents. XLogInsert includes the logic that -tests to see whether a shared buffer has been modified since the last -checkpoint. If not, the entire page contents are logged rather than just the -portion(s) pointed to by "rdata". 
- -Because XLogInsert drops the rdata components associated with buffers it -chooses to log in full, the WAL replay routines normally need to test to see -which buffers were handled that way --- otherwise they may be misled about -what the XLOG record actually contains. XLOG records that describe multi-page -changes therefore require some care to design: you must be certain that you -know what data is indicated by each "BKP" bit. An example of the trickiness -is that in a HEAP_UPDATE record, BKP(0) normally is associated with the source -page and BKP(1) is associated with the destination page --- but if these are -the same page, only BKP(0) would have been set. - -For this reason as well as the risk of deadlocking on buffer locks, it's best -to design WAL records so that they reflect small atomic actions involving just -one or a few pages. The current XLOG infrastructure cannot handle WAL records -involving references to more than four shared buffers, anyway. - -In the case where the WAL record contains enough information to re-generate -the entire contents of a page, do *not* show that page's buffer ID in the -rdata array, even if some of the rdata items point into the buffer. This is -because you don't want XLogInsert to log the whole page contents. The -standard replay-routine pattern for this case is - - buffer = XLogReadBuffer(rnode, blkno, true); - Assert(BufferIsValid(buffer)); - page = (Page) BufferGetPage(buffer); - - ... initialize the page ... - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - UnlockReleaseBuffer(buffer); - -In the case where the WAL record provides only enough information to -incrementally update the page, the rdata array *must* mention the buffer -ID at least once; otherwise there is no defense against torn-page problems. -The standard replay-routine pattern for this case is - - if (XLogReadBufferForRedo(lsn, record, N, rnode, blkno, &buffer) == BLK_NEEDS_REDO) - { - page = (Page) BufferGetPage(buffer); - - ... 
apply the change ... - - PageSetLSN(page, lsn); - MarkBufferDirty(buffer); - } - if (BufferIsValid(buffer)) - UnlockReleaseBuffer(buffer); - -XLogReadBufferForRedo reads the page from disk, and checks what action needs to -be taken to the page. If the XLR_BKP_BLOCK(N) flag is set, it restores the -full page image and returns BLK_RESTORED. If there is no full page image, but -page cannot be found or if the change has already been replayed (i.e. the -page's LSN >= the record we're replaying), it returns BLK_NOTFOUND or BLK_DONE, -respectively. Usually, the redo routine only needs to pay attention to the -BLK_NEEDS_REDO return code, which means that the routine should apply the -incremental change. In any case, the caller is responsible for unlocking and -releasing the buffer. Note that XLogReadBufferForRedo returns the buffer -locked even if no redo is required, unless the page does not exist. - -As noted above, for a multi-page update you need to be able to determine -which XLR_BKP_BLOCK(N) flag applies to each page. If a WAL record reflects -a combination of fully-rewritable and incremental updates, then the rewritable -pages don't count for the XLR_BKP_BLOCK(N) numbering. (XLR_BKP_BLOCK(N) is -associated with the N'th distinct buffer ID seen in the "rdata" array, and -per the above discussion, fully-rewritable buffers shouldn't be mentioned in -"rdata".) +Complex changes (such as a multilevel index insertion) normally need to be +described by a series of atomic-action WAL records. The intermediate states +must be self-consistent, so that if the replay is interrupted between any +two actions, the system is fully functional. In btree indexes, for example, +a page split requires a new page to be allocated, and an insertion of a new +key in the parent btree level, but for locking reasons this has to be +reflected by two separate WAL records. 
Replaying the first record, to +allocate the new page and move tuples to it, sets a flag on the page to +indicate that the key has not been inserted to the parent yet. Replaying the +second record clears the flag. This intermediate state is never seen by +other backends during normal operation, because the lock on the child page +is held across the two actions, but will be seen if the operation is +interrupted before writing the second WAL record. The search algorithm works +with the intermediate state as normal, but if an insertion encounters a page +with the incomplete-split flag set, it will finish the interrupted split by +inserting the key to the parent, before proceeding. + + +Constructing a WAL record +------------------------- + +A WAL record consists of a header common to all WAL record types, +record-specific data, and information about the data blocks modified. Each +modified data block is identified by an ID number, and can optionally have +more record-specific data associated with the block. If XLogInsert decides +that a full-page image of a block needs to be taken, the data associated +with that block is not included. + +The API for constructing a WAL record consists of five functions: +XLogBeginInsert, XLogRegisterBuffer, XLogRegisterData, XLogRegisterBufData, +and XLogInsert. First, call XLogBeginInsert(). Then register all the buffers +modified, and data needed to replay the changes, using XLogRegister* +functions. Finally, insert the constructed record to the WAL by calling +XLogInsert(). + + XLogBeginInsert(); + + /* register buffers modified as part of this WAL-logged action */ + XLogRegisterBuffer(0, lbuffer, REGBUF_STANDARD); + XLogRegisterBuffer(1, rbuffer, REGBUF_STANDARD); + + /* register data that is always included in the WAL record */ + XLogRegisterData(&xlrec, SizeOfFictionalAction); + + /* + * register data associated with a buffer. This will not be included + * in the record if a full-page image is taken. 
+ */ + XLogRegisterBufData(0, tuple->data, tuple->len); + + /* more data associated with the buffer */ + XLogRegisterBufData(0, data2, len2); + + /* + * Ok, all the data and buffers to include in the WAL record have + * been registered. Insert the record. + */ + recptr = XLogInsert(RM_FOO_ID, XLOG_FOOBAR_DO_STUFF); + +Details of the API functions: + +void XLogBeginInsert(void) + + Must be called before XLogRegisterBuffer and XLogRegisterData. + +void XLogResetInsertion(void) + + Clear any currently registered data and buffers from the WAL record + construction workspace. This is only needed if you have already called + XLogBeginInsert(), but decide to not insert the record after all. + +void XLogEnsureRecordSpace(int max_block_id, int nrdatas) + + Normally, the WAL record construction buffers have the following limits: + + * highest block ID that can be used is 4 (allowing five block references) + * Max 20 chunks of registered data + + These default limits are enough for most record types that change some + on-disk structures. For the odd case that requires more data, or needs to + modify more buffers, these limits can be raised by calling + XLogEnsureRecordSpace(). XLogEnsureRecordSpace() must be called before + XLogBeginInsert(), and outside a critical section. + +void XLogRegisterBuffer(uint8 block_id, Buffer buf, uint8 flags); + + XLogRegisterBuffer adds information about a data block to the WAL record. + block_id is an arbitrary number used to identify this page reference in + the redo routine. The information needed to re-find the page at redo - + relfilenode, fork, and block number - are included in the WAL record. + + XLogInsert will automatically include a full copy of the page contents, if + this is the first modification of the buffer since the last checkpoint. + It is important to register every buffer modified by the action with + XLogRegisterBuffer, to avoid torn-page hazards. 
+ + The flags control when and how the buffer contents are included in the + WAL record. Normally, a full-page image is taken only if the page has not + been modified since the last checkpoint, and only if full_page_writes=on + or an online backup is in progress. The REGBUF_FORCE_IMAGE flag can be + used to force a full-page image to always be included; that is useful + e.g. for an operation that rewrites most of the page, so that tracking the + details is not worth it. For the rare case where it is not necessary to + protect from torn pages, REGBUF_NO_IMAGE flag can be used to suppress + full page image from being taken. REGBUF_WILL_INIT also suppresses a full + page image, but the redo routine must re-generate the page from scratch, + without looking at the old page contents. Re-initializing the page + protects from torn page hazards like a full page image does. + + The REGBUF_STANDARD flag can be specified together with the other flags to + indicate that the page follows the standard page layout. It causes the + area between pd_lower and pd_upper to be left out from the image, reducing + WAL volume. + + If the REGBUF_KEEP_DATA flag is given, any per-buffer data registered with + XLogRegisterBufData() is included in the WAL record even if a full-page + image is taken. + +void XLogRegisterData(char *data, int len); + + XLogRegisterData is used to include arbitrary data in the WAL record. If + XLogRegisterData() is called multiple times, the data are appended, and + will be made available to the redo routine as one contiguous chunk. + +void XLogRegisterBufData(uint8 block_id, char *data, int len); + + XLogRegisterBufData is used to include data associated with a particular + buffer that was registered earlier with XLogRegisterBuffer(). If + XLogRegisterBufData() is called multiple times with the same block ID, the + data are appended, and will be made available to the redo routine as one + contiguous chunk. 
+ + If a full-page image of the buffer is taken at insertion, the data is not + included in the WAL record, unless the REGBUF_KEEP_DATA flag is used. + + +Writing a REDO routine +---------------------- + +A REDO routine uses the data and page references included in the WAL record +to reconstruct the new state of the page. The record decoding functions +and macros in xlogreader.c/h can be used to extract the data from the record. When replaying a WAL record that describes changes on multiple pages, you must be careful to lock the pages properly to prevent concurrent Hot Standby @@ -545,23 +613,6 @@ either an exclusive buffer lock or a shared lock plus buffer header lock, or be writing the data block directly rather than through shared buffers while holding AccessExclusiveLock on the relation. -Due to all these constraints, complex changes (such as a multilevel index -insertion) normally need to be described by a series of atomic-action WAL -records. The intermediate states must be self-consistent, so that if the -replay is interrupted between any two actions, the system is fully -functional. In btree indexes, for example, a page split requires a new page -to be allocated, and an insertion of a new key in the parent btree level, -but for locking reasons this has to be reflected by two separate WAL -records. Replaying the first record, to allocate the new page and move -tuples to it, sets a flag on the page to indicate that the key has not been -inserted to the parent yet. Replaying the second record clears the flag. -This intermediate state is never seen by other backends during normal -operation, because the lock on the child page is held across the two -actions, but will be seen if the operation is interrupted before writing -the second WAL record. 
The search algorithm works with the intermediate -state as normal, but if an insertion encounters a page with the -incomplete-split flag set, it will finish the interrupted split by -inserting the key to the parent, before proceeding. Writing Hints ------------- diff --git a/src/backend/access/transam/clog.c b/src/backend/access/transam/clog.c index 5ee070bd0a9..313bd042404 100644 --- a/src/backend/access/transam/clog.c +++ b/src/backend/access/transam/clog.c @@ -699,13 +699,9 @@ CLOGPagePrecedes(int page1, int page2) static void WriteZeroPageXlogRec(int pageno) { - XLogRecData rdata; - - rdata.data = (char *) (&pageno); - rdata.len = sizeof(int); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) (&pageno), sizeof(int)); + (void) XLogInsert(RM_CLOG_ID, CLOG_ZEROPAGE); } /* @@ -717,14 +713,11 @@ WriteZeroPageXlogRec(int pageno) static void WriteTruncateXlogRec(int pageno) { - XLogRecData rdata; XLogRecPtr recptr; - rdata.data = (char *) (&pageno); - rdata.len = sizeof(int); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) (&pageno), sizeof(int)); + recptr = XLogInsert(RM_CLOG_ID, CLOG_TRUNCATE); XLogFlush(recptr); } @@ -732,12 +725,12 @@ WriteTruncateXlogRec(int pageno) * CLOG resource manager's routines */ void -clog_redo(XLogRecPtr lsn, XLogRecord *record) +clog_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; /* Backup blocks are not used in clog records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + Assert(!XLogRecHasAnyBlockRefs(record)); if (info == CLOG_ZEROPAGE) { diff --git a/src/backend/access/transam/multixact.c b/src/backend/access/transam/multixact.c index 3c20bb37e4c..fff9f837330 100644 --- a/src/backend/access/transam/multixact.c +++ 
b/src/backend/access/transam/multixact.c @@ -720,7 +720,6 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members) { MultiXactId multi; MultiXactOffset offset; - XLogRecData rdata[2]; xl_multixact_create xlrec; debug_elog3(DEBUG2, "Create: %s", @@ -796,17 +795,11 @@ MultiXactIdCreateFromMembers(int nmembers, MultiXactMember *members) * the status flags in one XLogRecData, then all the xids in another one? * Not clear that it's worth the trouble though. */ - rdata[0].data = (char *) (&xlrec); - rdata[0].len = SizeOfMultiXactCreate; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), SizeOfMultiXactCreate); + XLogRegisterData((char *) members, nmembers * sizeof(MultiXactMember)); - rdata[1].data = (char *) members; - rdata[1].len = nmembers * sizeof(MultiXactMember); - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - - (void) XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID, rdata); + (void) XLogInsert(RM_MULTIXACT_ID, XLOG_MULTIXACT_CREATE_ID); /* Now enter the information into the OFFSETs and MEMBERs logs */ RecordNewMultiXact(multi, offset, nmembers, members); @@ -2705,25 +2698,21 @@ MultiXactOffsetPrecedes(MultiXactOffset offset1, MultiXactOffset offset2) static void WriteMZeroPageXlogRec(int pageno, uint8 info) { - XLogRecData rdata; - - rdata.data = (char *) (&pageno); - rdata.len = sizeof(int); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - (void) XLogInsert(RM_MULTIXACT_ID, info, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) (&pageno), sizeof(int)); + (void) XLogInsert(RM_MULTIXACT_ID, info); } /* * MULTIXACT resource manager's routines */ void -multixact_redo(XLogRecPtr lsn, XLogRecord *record) +multixact_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; /* Backup blocks are not used in multixact records */ - Assert(!(record->xl_info & 
XLR_BKP_BLOCK_MASK)); + Assert(!XLogRecHasAnyBlockRefs(record)); if (info == XLOG_MULTIXACT_ZERO_OFF_PAGE) { @@ -2775,7 +2764,7 @@ multixact_redo(XLogRecPtr lsn, XLogRecord *record) * should be unnecessary, since any XID found here ought to have other * evidence in the XLOG, but let's be safe. */ - max_xid = record->xl_xid; + max_xid = XLogRecGetXid(record); for (i = 0; i < xlrec->nmembers; i++) { if (TransactionIdPrecedes(max_xid, xlrec->members[i].xid)) diff --git a/src/backend/access/transam/twophase.c b/src/backend/access/transam/twophase.c index d23c292edcd..40de84e934e 100644 --- a/src/backend/access/transam/twophase.c +++ b/src/backend/access/transam/twophase.c @@ -889,14 +889,21 @@ typedef struct TwoPhaseRecordOnDisk /* * During prepare, the state file is assembled in memory before writing it - * to WAL and the actual state file. We use a chain of XLogRecData blocks - * so that we will be able to pass the state file contents directly to - * XLogInsert. + * to WAL and the actual state file. We use a chain of StateFileChunk blocks + * for that. 
*/ +typedef struct StateFileChunk +{ + char *data; + uint32 len; + struct StateFileChunk *next; +} StateFileChunk; + static struct xllist { - XLogRecData *head; /* first data block in the chain */ - XLogRecData *tail; /* last block in chain */ + StateFileChunk *head; /* first data block in the chain */ + StateFileChunk *tail; /* last block in chain */ + uint32 num_chunks; uint32 bytes_free; /* free bytes left in tail block */ uint32 total_len; /* total data bytes in chain */ } records; @@ -917,11 +924,11 @@ save_state_data(const void *data, uint32 len) if (padlen > records.bytes_free) { - records.tail->next = palloc0(sizeof(XLogRecData)); + records.tail->next = palloc0(sizeof(StateFileChunk)); records.tail = records.tail->next; - records.tail->buffer = InvalidBuffer; records.tail->len = 0; records.tail->next = NULL; + records.num_chunks++; records.bytes_free = Max(padlen, 512); records.tail->data = palloc(records.bytes_free); @@ -951,8 +958,7 @@ StartPrepare(GlobalTransaction gxact) SharedInvalidationMessage *invalmsgs; /* Initialize linked list */ - records.head = palloc0(sizeof(XLogRecData)); - records.head->buffer = InvalidBuffer; + records.head = palloc0(sizeof(StateFileChunk)); records.head->len = 0; records.head->next = NULL; @@ -960,6 +966,7 @@ StartPrepare(GlobalTransaction gxact) records.head->data = palloc(records.bytes_free); records.tail = records.head; + records.num_chunks = 1; records.total_len = 0; @@ -1019,7 +1026,7 @@ EndPrepare(GlobalTransaction gxact) TransactionId xid = pgxact->xid; TwoPhaseFileHeader *hdr; char path[MAXPGPATH]; - XLogRecData *record; + StateFileChunk *record; pg_crc32 statefile_crc; pg_crc32 bogus_crc; int fd; @@ -1117,12 +1124,16 @@ EndPrepare(GlobalTransaction gxact) * We save the PREPARE record's location in the gxact for later use by * CheckPointTwoPhase. 
*/ + XLogEnsureRecordSpace(0, records.num_chunks); + START_CRIT_SECTION(); MyPgXact->delayChkpt = true; - gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE, - records.head); + XLogBeginInsert(); + for (record = records.head; record != NULL; record = record->next) + XLogRegisterData(record->data, record->len); + gxact->prepare_lsn = XLogInsert(RM_XACT_ID, XLOG_XACT_PREPARE); XLogFlush(gxact->prepare_lsn); /* If we crash now, we have prepared: WAL replay will fix things */ @@ -1180,6 +1191,7 @@ EndPrepare(GlobalTransaction gxact) SyncRepWaitForLSN(gxact->prepare_lsn); records.tail = records.head = NULL; + records.num_chunks = 0; } /* @@ -2071,8 +2083,6 @@ RecordTransactionCommitPrepared(TransactionId xid, SharedInvalidationMessage *invalmsgs, bool initfileinval) { - XLogRecData rdata[4]; - int lastrdata = 0; xl_xact_commit_prepared xlrec; XLogRecPtr recptr; @@ -2094,39 +2104,24 @@ RecordTransactionCommitPrepared(TransactionId xid, xlrec.crec.nsubxacts = nchildren; xlrec.crec.nmsgs = ninvalmsgs; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactCommitPrepared; - rdata[0].buffer = InvalidBuffer; + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitPrepared); + /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); + /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); + /* dump cache invalidation messages */ if (ninvalmsgs > 0) - { - rdata[lastrdata].next = &(rdata[3]); - rdata[3].data = (char *) invalmsgs; - rdata[3].len = ninvalmsgs * 
sizeof(SharedInvalidationMessage); - rdata[3].buffer = InvalidBuffer; - lastrdata = 3; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) invalmsgs, + ninvalmsgs * sizeof(SharedInvalidationMessage)); - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED, rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_PREPARED); /* * We don't currently try to sleep before flush here ... nor is there any @@ -2169,8 +2164,6 @@ RecordTransactionAbortPrepared(TransactionId xid, int nrels, RelFileNode *rels) { - XLogRecData rdata[3]; - int lastrdata = 0; xl_xact_abort_prepared xlrec; XLogRecPtr recptr; @@ -2189,30 +2182,20 @@ RecordTransactionAbortPrepared(TransactionId xid, xlrec.arec.xact_time = GetCurrentTimestamp(); xlrec.arec.nrels = nrels; xlrec.arec.nsubxacts = nchildren; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactAbortPrepared; - rdata[0].buffer = InvalidBuffer; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactAbortPrepared); + /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); + /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); - recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED, rdata); + recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT_PREPARED); /* Always flush, since we're about to remove the 2PC state file */ XLogFlush(recptr); diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 6f92bad07ca..763e9deb6f5 100644 --- 
a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -571,7 +571,6 @@ AssignTransactionId(TransactionState s) if (nUnreportedXids >= PGPROC_MAX_CACHED_SUBXIDS || log_unknown_top) { - XLogRecData rdata[2]; xl_xact_assignment xlrec; /* @@ -582,17 +581,12 @@ AssignTransactionId(TransactionState s) Assert(TransactionIdIsValid(xlrec.xtop)); xlrec.nsubxacts = nUnreportedXids; - rdata[0].data = (char *) &xlrec; - rdata[0].len = MinSizeOfXactAssignment; - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &rdata[1]; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, MinSizeOfXactAssignment); + XLogRegisterData((char *) unreportedXids, + nUnreportedXids * sizeof(TransactionId)); - rdata[1].data = (char *) unreportedXids; - rdata[1].len = nUnreportedXids * sizeof(TransactionId); - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT, rdata); + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ASSIGNMENT); nUnreportedXids = 0; /* mark top, not current xact as having been logged */ @@ -1087,8 +1081,6 @@ RecordTransactionCommit(void) if (nrels > 0 || nmsgs > 0 || RelcacheInitFileInval || forceSyncCommit || XLogLogicalInfoActive()) { - XLogRecData rdata[4]; - int lastrdata = 0; xl_xact_commit xlrec; /* @@ -1107,63 +1099,38 @@ RecordTransactionCommit(void) xlrec.nrels = nrels; xlrec.nsubxacts = nchildren; xlrec.nmsgs = nmsgs; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactCommit; - rdata[0].buffer = InvalidBuffer; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommit); /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, + nrels * sizeof(RelFileNode)); /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = 
(char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); /* dump shared cache invalidation messages */ if (nmsgs > 0) - { - rdata[lastrdata].next = &(rdata[3]); - rdata[3].data = (char *) invalMessages; - rdata[3].len = nmsgs * sizeof(SharedInvalidationMessage); - rdata[3].buffer = InvalidBuffer; - lastrdata = 3; - } - rdata[lastrdata].next = NULL; - - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT, rdata); + XLogRegisterData((char *) invalMessages, + nmsgs * sizeof(SharedInvalidationMessage)); + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT); } else { - XLogRecData rdata[2]; - int lastrdata = 0; xl_xact_commit_compact xlrec; xlrec.xact_time = xactStopTimestamp; xlrec.nsubxacts = nchildren; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactCommitCompact; - rdata[0].buffer = InvalidBuffer; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), MinSizeOfXactCommitCompact); /* dump committed child Xids */ if (nchildren > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) children; - rdata[1].len = nchildren * sizeof(TransactionId); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT, rdata); + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_COMMIT_COMPACT); } } @@ -1436,8 +1403,6 @@ RecordTransactionAbort(bool isSubXact) RelFileNode *rels; int nchildren; TransactionId *children; - XLogRecData rdata[3]; - int lastrdata = 0; xl_xact_abort xlrec; /* @@ -1486,30 +1451,20 @@ RecordTransactionAbort(bool isSubXact) } xlrec.nrels = nrels; xlrec.nsubxacts = nchildren; - rdata[0].data = (char *) (&xlrec); - rdata[0].len = MinSizeOfXactAbort; - rdata[0].buffer = InvalidBuffer; + + XLogBeginInsert(); + XLogRegisterData((char *) (&xlrec), 
MinSizeOfXactAbort); + /* dump rels to delete */ if (nrels > 0) - { - rdata[0].next = &(rdata[1]); - rdata[1].data = (char *) rels; - rdata[1].len = nrels * sizeof(RelFileNode); - rdata[1].buffer = InvalidBuffer; - lastrdata = 1; - } + XLogRegisterData((char *) rels, nrels * sizeof(RelFileNode)); + /* dump committed child Xids */ if (nchildren > 0) - { - rdata[lastrdata].next = &(rdata[2]); - rdata[2].data = (char *) children; - rdata[2].len = nchildren * sizeof(TransactionId); - rdata[2].buffer = InvalidBuffer; - lastrdata = 2; - } - rdata[lastrdata].next = NULL; + XLogRegisterData((char *) children, + nchildren * sizeof(TransactionId)); - (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, rdata); + (void) XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT); /* * Report the latest async abort LSN, so that the WAL writer knows to @@ -2351,6 +2306,9 @@ AbortTransaction(void) AbortBufferIO(); UnlockBuffers(); + /* Reset WAL record construction state */ + XLogResetInsertion(); + /* * Also clean up any open wait for lock, since the lock manager will choke * if we try to wait for another lock before doing this. @@ -4299,6 +4257,9 @@ AbortSubTransaction(void) AbortBufferIO(); UnlockBuffers(); + /* Reset WAL record construction state */ + XLogResetInsertion(); + /* * Also clean up any open wait for lock, since the lock manager will choke * if we try to wait for another lock before doing this. 
@@ -4938,42 +4899,42 @@ xact_redo_abort(xl_xact_abort *xlrec, TransactionId xid) } void -xact_redo(XLogRecPtr lsn, XLogRecord *record) +xact_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; /* Backup blocks are not used in xact records */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + Assert(!XLogRecHasAnyBlockRefs(record)); if (info == XLOG_XACT_COMMIT_COMPACT) { xl_xact_commit_compact *xlrec = (xl_xact_commit_compact *) XLogRecGetData(record); - xact_redo_commit_compact(xlrec, record->xl_xid, lsn); + xact_redo_commit_compact(xlrec, XLogRecGetXid(record), record->EndRecPtr); } else if (info == XLOG_XACT_COMMIT) { xl_xact_commit *xlrec = (xl_xact_commit *) XLogRecGetData(record); - xact_redo_commit(xlrec, record->xl_xid, lsn); + xact_redo_commit(xlrec, XLogRecGetXid(record), record->EndRecPtr); } else if (info == XLOG_XACT_ABORT) { xl_xact_abort *xlrec = (xl_xact_abort *) XLogRecGetData(record); - xact_redo_abort(xlrec, record->xl_xid); + xact_redo_abort(xlrec, XLogRecGetXid(record)); } else if (info == XLOG_XACT_PREPARE) { /* the record contents are exactly the 2PC file */ - RecreateTwoPhaseFile(record->xl_xid, - XLogRecGetData(record), record->xl_len); + RecreateTwoPhaseFile(XLogRecGetXid(record), + XLogRecGetData(record), XLogRecGetDataLen(record)); } else if (info == XLOG_XACT_COMMIT_PREPARED) { xl_xact_commit_prepared *xlrec = (xl_xact_commit_prepared *) XLogRecGetData(record); - xact_redo_commit(&xlrec->crec, xlrec->xid, lsn); + xact_redo_commit(&xlrec->crec, xlrec->xid, record->EndRecPtr); RemoveTwoPhaseFile(xlrec->xid, false); } else if (info == XLOG_XACT_ABORT_PREPARED) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 60531277dc6..2059bbeda4a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -757,10 +757,10 @@ static MemoryContext walDebugCxt = NULL; static void 
readRecoveryCommandFile(void); static void exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo); -static bool recoveryStopsBefore(XLogRecord *record); -static bool recoveryStopsAfter(XLogRecord *record); +static bool recoveryStopsBefore(XLogReaderState *record); +static bool recoveryStopsAfter(XLogReaderState *record); static void recoveryPausesHere(void); -static bool recoveryApplyDelay(XLogRecord *record); +static bool recoveryApplyDelay(XLogReaderState *record); static void SetLatestXTime(TimestampTz xtime); static void SetCurrentChunkStartTime(TimestampTz xtime); static void CheckRequiredParameterValues(void); @@ -807,9 +807,9 @@ static char *str_time(pg_time_t tnow); static bool CheckForStandbyTrigger(void); #ifdef WAL_DEBUG -static void xlog_outrec(StringInfo buf, XLogRecord *record); +static void xlog_outrec(StringInfo buf, XLogReaderState *record); #endif -static void xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record); +static void xlog_outdesc(StringInfo buf, XLogReaderState *record); static void pg_start_backup_callback(int code, Datum arg); static bool read_backup_label(XLogRecPtr *checkPointLoc, bool *backupEndRequired, bool *backupFromStandby); @@ -861,7 +861,6 @@ XLogRecPtr XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn) { XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecData *rdt; pg_crc32 rdata_crc; bool inserted; XLogRecord *rechdr = (XLogRecord *) rdata->data; @@ -870,28 +869,13 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn) XLogRecPtr StartPos; XLogRecPtr EndPos; + /* we assume that all of the record header is in the first chunk */ + Assert(rdata->len >= SizeOfXLogRecord); + /* cross-check on whether we should be here or not */ if (!XLogInsertAllowed()) elog(ERROR, "cannot make new WAL entries during recovery"); - /* - * Calculate CRC of the data, including all the backup blocks - * - * Note that the record header isn't added into the CRC initially since we - * don't know the prev-link yet. 
Thus, the CRC will represent the CRC of - * the whole record in the order: rdata, then backup blocks, then record - * header. - */ - INIT_CRC32C(rdata_crc); - for (rdt = rdata->next; rdt != NULL; rdt = rdt->next) - COMP_CRC32C(rdata_crc, rdt->data, rdt->len); - - /* - * Calculate CRC of the header, except for prev-link, because we don't - * know it yet. It will be added later. - */ - COMP_CRC32C(rdata_crc, ((char *) rechdr), offsetof(XLogRecord, xl_prev)); - /*---------- * * We have now done all the preparatory work we can without holding a @@ -976,10 +960,11 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn) if (inserted) { /* - * Now that xl_prev has been filled in, finish CRC calculation of the - * record header. + * Now that xl_prev has been filled in, calculate CRC of the record + * header. */ - COMP_CRC32C(rdata_crc, ((char *) &rechdr->xl_prev), sizeof(XLogRecPtr)); + rdata_crc = rechdr->xl_crc; + COMP_CRC32C(rdata_crc, rechdr, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(rdata_crc); rechdr->xl_crc = rdata_crc; @@ -1053,34 +1038,47 @@ XLogInsertRecord(XLogRecData *rdata, XLogRecPtr fpw_lsn) #ifdef WAL_DEBUG if (XLOG_DEBUG) { + static XLogReaderState *debug_reader = NULL; StringInfoData buf; - MemoryContext oldCxt = MemoryContextSwitchTo(walDebugCxt); + StringInfoData recordBuf; + char *errormsg = NULL; + MemoryContext oldCxt; + + oldCxt = MemoryContextSwitchTo(walDebugCxt); initStringInfo(&buf); appendStringInfo(&buf, "INSERT @ %X/%X: ", (uint32) (EndPos >> 32), (uint32) EndPos); - xlog_outrec(&buf, rechdr); - if (rdata->data != NULL) - { - StringInfoData recordbuf; - /* - * We have to piece together the WAL record data from the - * XLogRecData entries, so that we can pass it to the rm_desc - * function as one contiguous chunk. 
- */ - initStringInfo(&recordbuf); - appendBinaryStringInfo(&recordbuf, (char *) rechdr, sizeof(XLogRecord)); - for (; rdata != NULL; rdata = rdata->next) - appendBinaryStringInfo(&recordbuf, rdata->data, rdata->len); + /* + * We have to piece together the WAL record data from the XLogRecData + * entries, so that we can pass it to the rm_desc function as one + * contiguous chunk. + */ + initStringInfo(&recordBuf); + for (; rdata != NULL; rdata = rdata->next) + appendBinaryStringInfo(&recordBuf, rdata->data, rdata->len); + + if (!debug_reader) + debug_reader = XLogReaderAllocate(NULL, NULL); + if (!debug_reader || + !DecodeXLogRecord(debug_reader, (XLogRecord *) recordBuf.data, + &errormsg)) + { + appendStringInfo(&buf, "error decoding record: %s", + errormsg ? errormsg : "no error message"); + } + else + { appendStringInfoString(&buf, " - "); - xlog_outdesc(&buf, rechdr->xl_rmid, (XLogRecord *) recordbuf.data); + xlog_outdesc(&buf, debug_reader); } elog(LOG, "%s", buf.data); + pfree(buf.data); + pfree(recordBuf.data); MemoryContextSwitchTo(oldCxt); - MemoryContextReset(walDebugCxt); } #endif @@ -1170,7 +1168,7 @@ ReserveXLogSwitch(XLogRecPtr *StartPos, XLogRecPtr *EndPos, XLogRecPtr *PrevPtr) uint64 startbytepos; uint64 endbytepos; uint64 prevbytepos; - uint32 size = SizeOfXLogRecord; + uint32 size = MAXALIGN(SizeOfXLogRecord); XLogRecPtr ptr; uint32 segleft; @@ -1234,9 +1232,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, XLogRecPtr CurrPos; XLogPageHeader pagehdr; - /* The first chunk is the record header */ - Assert(rdata->len == SizeOfXLogRecord); - /* * Get a pointer to the right place in the right WAL buffer to start * inserting to. 
@@ -1309,9 +1304,6 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, } Assert(written == write_len); - /* Align the end position, so that the next record starts aligned */ - CurrPos = MAXALIGN64(CurrPos); - /* * If this was an xlog-switch, it's not enough to write the switch record, * we also have to consume all the remaining space in the WAL segment. We @@ -1341,6 +1333,11 @@ CopyXLogRecordToWAL(int write_len, bool isLogSwitch, XLogRecData *rdata, CurrPos += XLOG_BLCKSZ; } } + else + { + /* Align the end position, so that the next record starts aligned */ + CurrPos = MAXALIGN64(CurrPos); + } if (CurrPos != EndPos) elog(PANIC, "space reserved for WAL record does not match what was written"); @@ -4470,6 +4467,7 @@ BootStrapXLOG(void) XLogPageHeader page; XLogLongPageHeader longpage; XLogRecord *record; + char *recptr; bool use_existent; uint64 sysidentifier; struct timeval tv; @@ -4541,17 +4539,23 @@ BootStrapXLOG(void) longpage->xlp_xlog_blcksz = XLOG_BLCKSZ; /* Insert the initial checkpoint record */ - record = (XLogRecord *) ((char *) page + SizeOfXLogLongPHD); + recptr = ((char *) page + SizeOfXLogLongPHD); + record = (XLogRecord *) recptr; record->xl_prev = 0; record->xl_xid = InvalidTransactionId; - record->xl_tot_len = SizeOfXLogRecord + sizeof(checkPoint); - record->xl_len = sizeof(checkPoint); + record->xl_tot_len = SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(checkPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; record->xl_rmid = RM_XLOG_ID; - memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint)); + recptr += SizeOfXLogRecord; + /* fill the XLogRecordDataHeaderShort struct */ + *(recptr++) = XLR_BLOCK_ID_DATA_SHORT; + *(recptr++) = sizeof(checkPoint); + memcpy(recptr, &checkPoint, sizeof(checkPoint)); + recptr += sizeof(checkPoint); + Assert(recptr - (char *) record == record->xl_tot_len); INIT_CRC32C(crc); - COMP_CRC32C(crc, &checkPoint, sizeof(checkPoint)); + COMP_CRC32C(crc, ((char *) record) + 
SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); record->xl_crc = crc; @@ -4984,36 +4988,37 @@ exitArchiveRecovery(TimeLineID endTLI, XLogSegNo endLogSegNo) * timestamps. */ static bool -getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime) +getRecordTimestamp(XLogReaderState *record, TimestampTz *recordXtime) { - uint8 record_info = record->xl_info & ~XLR_INFO_MASK; + uint8 record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + uint8 rmid = XLogRecGetRmid(record); - if (record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) + if (rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) { *recordXtime = ((xl_restore_point *) XLogRecGetData(record))->rp_time; return true; } - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT) + if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_COMPACT) { *recordXtime = ((xl_xact_commit_compact *) XLogRecGetData(record))->xact_time; return true; } - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) + if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT) { *recordXtime = ((xl_xact_commit *) XLogRecGetData(record))->xact_time; return true; } - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED) + if (rmid == RM_XACT_ID && record_info == XLOG_XACT_COMMIT_PREPARED) { *recordXtime = ((xl_xact_commit_prepared *) XLogRecGetData(record))->crec.xact_time; return true; } - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT) + if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT) { *recordXtime = ((xl_xact_abort *) XLogRecGetData(record))->xact_time; return true; } - if (record->xl_rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED) + if (rmid == RM_XACT_ID && record_info == XLOG_XACT_ABORT_PREPARED) { *recordXtime = ((xl_xact_abort_prepared *) XLogRecGetData(record))->arec.xact_time; return true; @@ -5030,7 
+5035,7 @@ getRecordTimestamp(XLogRecord *record, TimestampTz *recordXtime) * new timeline's history file. */ static bool -recoveryStopsBefore(XLogRecord *record) +recoveryStopsBefore(XLogReaderState *record) { bool stopsHere = false; uint8 record_info; @@ -5052,14 +5057,14 @@ recoveryStopsBefore(XLogRecord *record) } /* Otherwise we only consider stopping before COMMIT or ABORT records. */ - if (record->xl_rmid != RM_XACT_ID) + if (XLogRecGetRmid(record) != RM_XACT_ID) return false; - record_info = record->xl_info & ~XLR_INFO_MASK; + record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; if (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT) { isCommit = true; - recordXid = record->xl_xid; + recordXid = XLogRecGetXid(record); } else if (record_info == XLOG_XACT_COMMIT_PREPARED) { @@ -5069,7 +5074,7 @@ recoveryStopsBefore(XLogRecord *record) else if (record_info == XLOG_XACT_ABORT) { isCommit = false; - recordXid = record->xl_xid; + recordXid = XLogRecGetXid(record); } else if (record_info == XLOG_XACT_ABORT_PREPARED) { @@ -5140,19 +5145,21 @@ recoveryStopsBefore(XLogRecord *record) * record in XLogCtl->recoveryLastXTime. */ static bool -recoveryStopsAfter(XLogRecord *record) +recoveryStopsAfter(XLogReaderState *record) { uint8 record_info; + uint8 rmid; TimestampTz recordXtime; - record_info = record->xl_info & ~XLR_INFO_MASK; + record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + rmid = XLogRecGetRmid(record); /* * There can be many restore points that share the same name; we stop at * the first one. 
*/ if (recoveryTarget == RECOVERY_TARGET_NAME && - record->xl_rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) + rmid == RM_XLOG_ID && record_info == XLOG_RESTORE_POINT) { xl_restore_point *recordRestorePointData; @@ -5173,7 +5180,7 @@ recoveryStopsAfter(XLogRecord *record) } } - if (record->xl_rmid == RM_XACT_ID && + if (rmid == RM_XACT_ID && (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT || record_info == XLOG_XACT_COMMIT_PREPARED || @@ -5192,7 +5199,7 @@ recoveryStopsAfter(XLogRecord *record) else if (record_info == XLOG_XACT_ABORT_PREPARED) recordXid = ((xl_xact_abort_prepared *) XLogRecGetData(record))->xid; else - recordXid = record->xl_xid; + recordXid = XLogRecGetXid(record); /* * There can be only one transaction end record with this exact @@ -5307,7 +5314,7 @@ SetRecoveryPause(bool recoveryPause) * usability. */ static bool -recoveryApplyDelay(XLogRecord *record) +recoveryApplyDelay(XLogReaderState *record) { uint8 record_info; TimestampTz xtime; @@ -5326,8 +5333,8 @@ recoveryApplyDelay(XLogRecord *record) * so there is already opportunity for issues caused by early conflicts on * standbys. 
*/ - record_info = record->xl_info & ~XLR_INFO_MASK; - if (!(record->xl_rmid == RM_XACT_ID && + record_info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + if (!(XLogRecGetRmid(record) == RM_XACT_ID && (record_info == XLOG_XACT_COMMIT_COMPACT || record_info == XLOG_XACT_COMMIT || record_info == XLOG_XACT_COMMIT_PREPARED))) @@ -5696,7 +5703,7 @@ StartupXLOG(void) record = ReadCheckpointRecord(xlogreader, checkPointLoc, 0, true); if (record != NULL) { - memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); ereport(DEBUG1, (errmsg("checkpoint record is at %X/%X", @@ -5793,7 +5800,7 @@ StartupXLOG(void) ereport(PANIC, (errmsg("could not locate a valid checkpoint record"))); } - memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN); } @@ -6230,9 +6237,9 @@ StartupXLOG(void) appendStringInfo(&buf, "REDO @ %X/%X; LSN %X/%X: ", (uint32) (ReadRecPtr >> 32), (uint32) ReadRecPtr, (uint32) (EndRecPtr >> 32), (uint32) EndRecPtr); - xlog_outrec(&buf, record); + xlog_outrec(&buf, xlogreader); appendStringInfoString(&buf, " - "); - xlog_outdesc(&buf, record->xl_rmid, record); + xlog_outdesc(&buf, xlogreader); elog(LOG, "%s", buf.data); pfree(buf.data); } @@ -6260,7 +6267,7 @@ StartupXLOG(void) /* * Have we reached our recovery target? */ - if (recoveryStopsBefore(record)) + if (recoveryStopsBefore(xlogreader)) { reachedStopPoint = true; /* see below */ break; @@ -6270,7 +6277,7 @@ StartupXLOG(void) * If we've been asked to lag the master, wait on latch until * enough time has passed. */ - if (recoveryApplyDelay(record)) + if (recoveryApplyDelay(xlogreader)) { /* * We test for paused recovery again here. 
If user sets @@ -6285,7 +6292,7 @@ StartupXLOG(void) /* Setup error traceback support for ereport() */ errcallback.callback = rm_redo_error_callback; - errcallback.arg = (void *) record; + errcallback.arg = (void *) xlogreader; errcallback.previous = error_context_stack; error_context_stack = &errcallback; @@ -6324,7 +6331,7 @@ StartupXLOG(void) { CheckPoint checkPoint; - memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint)); + memcpy(&checkPoint, XLogRecGetData(xlogreader), sizeof(CheckPoint)); newTLI = checkPoint.ThisTimeLineID; prevTLI = checkPoint.PrevTimeLineID; } @@ -6332,7 +6339,7 @@ StartupXLOG(void) { xl_end_of_recovery xlrec; - memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_end_of_recovery)); + memcpy(&xlrec, XLogRecGetData(xlogreader), sizeof(xl_end_of_recovery)); newTLI = xlrec.ThisTimeLineID; prevTLI = xlrec.PrevTimeLineID; } @@ -6366,7 +6373,7 @@ StartupXLOG(void) RecordKnownAssignedTransactionIds(record->xl_xid); /* Now apply the WAL record itself */ - RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record); + RmgrTable[record->xl_rmid].rm_redo(xlogreader); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -6394,7 +6401,7 @@ StartupXLOG(void) WalSndWakeup(); /* Exit loop if we reached inclusive recovery target */ - if (recoveryStopsAfter(record)) + if (recoveryStopsAfter(xlogreader)) { reachedStopPoint = true; break; @@ -7148,8 +7155,7 @@ ReadCheckpointRecord(XLogReaderState *xlogreader, XLogRecPtr RecPtr, } return NULL; } - if (record->xl_len != sizeof(CheckPoint) || - record->xl_tot_len != SizeOfXLogRecord + sizeof(CheckPoint)) + if (record->xl_tot_len != SizeOfXLogRecord + SizeOfXLogRecordDataHeaderShort + sizeof(CheckPoint)) { switch (whichChkpt) { @@ -7194,6 +7200,9 @@ InitXLOGAccess(void) (void) GetRedoRecPtr(); /* Also update our copy of doPageWrites. 
*/ doPageWrites = (Insert->fullPageWrites || Insert->forcePageWrites); + + /* Also initialize the working areas for constructing WAL records */ + InitXLogInsert(); } /* @@ -7490,7 +7499,6 @@ CreateCheckPoint(int flags) CheckPoint checkPoint; XLogRecPtr recptr; XLogCtlInsert *Insert = &XLogCtl->Insert; - XLogRecData rdata; uint32 freespace; XLogSegNo _logSegNo; XLogRecPtr curInsert; @@ -7760,15 +7768,11 @@ CreateCheckPoint(int flags) /* * Now insert the checkpoint record into XLOG. */ - rdata.data = (char *) (&checkPoint); - rdata.len = sizeof(checkPoint); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - + XLogBeginInsert(); + XLogRegisterData((char *) (&checkPoint), sizeof(checkPoint)); recptr = XLogInsert(RM_XLOG_ID, shutdown ? XLOG_CHECKPOINT_SHUTDOWN : - XLOG_CHECKPOINT_ONLINE, - &rdata); + XLOG_CHECKPOINT_ONLINE); XLogFlush(recptr); @@ -7908,7 +7912,6 @@ static void CreateEndOfRecoveryRecord(void) { xl_end_of_recovery xlrec; - XLogRecData rdata; XLogRecPtr recptr; /* sanity check */ @@ -7926,12 +7929,9 @@ CreateEndOfRecoveryRecord(void) START_CRIT_SECTION(); - rdata.data = (char *) &xlrec; - rdata.len = sizeof(xl_end_of_recovery); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - - recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xl_end_of_recovery)); + recptr = XLogInsert(RM_XLOG_ID, XLOG_END_OF_RECOVERY); XLogFlush(recptr); @@ -8307,13 +8307,9 @@ KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo) void XLogPutNextOid(Oid nextOid) { - XLogRecData rdata; - - rdata.data = (char *) (&nextOid); - rdata.len = sizeof(Oid); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) (&nextOid), sizeof(Oid)); + (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID); /* * We need not flush the NEXTOID record immediately, because any of the @@ -8349,15 +8345,10 @@ XLogRecPtr RequestXLogSwitch(void) 
{ XLogRecPtr RecPtr; - XLogRecData rdata; - - /* XLOG SWITCH, alone among xlog record types, has no data */ - rdata.buffer = InvalidBuffer; - rdata.data = NULL; - rdata.len = 0; - rdata.next = NULL; - RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata); + /* XLOG SWITCH has no data */ + XLogBeginInsert(); + RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH); return RecPtr; } @@ -8369,18 +8360,15 @@ XLogRecPtr XLogRestorePoint(const char *rpName) { XLogRecPtr RecPtr; - XLogRecData rdata; xl_restore_point xlrec; xlrec.rp_time = GetCurrentTimestamp(); strlcpy(xlrec.rp_name, rpName, MAXFNAMELEN); - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof(xl_restore_point); - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xl_restore_point)); - RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT, &rdata); + RecPtr = XLogInsert(RM_XLOG_ID, XLOG_RESTORE_POINT); ereport(LOG, (errmsg("restore point \"%s\" created at %X/%X", @@ -8412,7 +8400,6 @@ XLogReportParameters(void) */ if (wal_level != ControlFile->wal_level || XLogIsNeeded()) { - XLogRecData rdata; xl_parameter_change xlrec; XLogRecPtr recptr; @@ -8423,12 +8410,10 @@ XLogReportParameters(void) xlrec.wal_level = wal_level; xlrec.wal_log_hints = wal_log_hints; - rdata.buffer = InvalidBuffer; - rdata.data = (char *) &xlrec; - rdata.len = sizeof(xlrec); - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) &xlrec, sizeof(xlrec)); - recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE, &rdata); + recptr = XLogInsert(RM_XLOG_ID, XLOG_PARAMETER_CHANGE); XLogFlush(recptr); } @@ -8486,14 +8471,10 @@ UpdateFullPageWrites(void) */ if (XLogStandbyInfoActive() && !RecoveryInProgress()) { - XLogRecData rdata; - - rdata.data = (char *) (&fullPageWrites); - rdata.len = sizeof(bool); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; + XLogBeginInsert(); + XLogRegisterData((char *) (&fullPageWrites), sizeof(bool)); - XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE, 
&rdata); + XLogInsert(RM_XLOG_ID, XLOG_FPW_CHANGE); } if (!fullPageWrites) @@ -8558,12 +8539,13 @@ checkTimeLineSwitch(XLogRecPtr lsn, TimeLineID newTLI, TimeLineID prevTLI) * not all record types are related to control file updates. */ void -xlog_redo(XLogRecPtr lsn, XLogRecord *record) +xlog_redo(XLogReaderState *record) { - uint8 info = record->xl_info & ~XLR_INFO_MASK; + uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; + XLogRecPtr lsn = record->EndRecPtr; - /* Backup blocks are not used by XLOG rmgr */ - Assert(!(record->xl_info & XLR_BKP_BLOCK_MASK)); + /* in XLOG rmgr, backup blocks are only used by XLOG_FPI records */ + Assert(!XLogRecHasAnyBlockRefs(record) || info == XLOG_FPI); if (info == XLOG_NEXTOID) { @@ -8750,14 +8732,12 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) } else if (info == XLOG_FPI) { - char *data; - BkpBlock bkpb; + Buffer buffer; /* - * Full-page image (FPI) records contain a backup block stored - * "inline" in the normal data since the locking when writing hint - * records isn't sufficient to use the normal backup block mechanism, - * which assumes exclusive lock on the buffer supplied. + * Full-page image (FPI) records contain nothing else but a backup + * block. The block reference must include a full-page image - + * otherwise there would be no point in this record. * * Since the only change in these backup block are hint bits, there * are no recovery conflicts generated. @@ -8766,11 +8746,9 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) * smgr implementation has no need to implement anything. 
Which means * nothing is needed in md.c etc */ - data = XLogRecGetData(record); - memcpy(&bkpb, data, sizeof(BkpBlock)); - data += sizeof(BkpBlock); - - RestoreBackupBlockContents(lsn, bkpb, data, false, false); + if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED) + elog(ERROR, "unexpected XLogReadBufferForRedo result when restoring backup block"); + UnlockReleaseBuffer(buffer); } else if (info == XLOG_BACKUP_END) { @@ -8867,22 +8845,42 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) #ifdef WAL_DEBUG static void -xlog_outrec(StringInfo buf, XLogRecord *record) +xlog_outrec(StringInfo buf, XLogReaderState *record) { - int i; + int block_id; appendStringInfo(buf, "prev %X/%X; xid %u", - (uint32) (record->xl_prev >> 32), - (uint32) record->xl_prev, - record->xl_xid); + (uint32) (XLogRecGetPrev(record) >> 32), + (uint32) XLogRecGetPrev(record), + XLogRecGetXid(record)); appendStringInfo(buf, "; len %u", - record->xl_len); + XLogRecGetDataLen(record)); - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + /* decode block references */ + for (block_id = 0; block_id <= record->max_block_id; block_id++) { - if (record->xl_info & XLR_BKP_BLOCK(i)) - appendStringInfo(buf, "; bkpb%d", i); + RelFileNode rnode; + ForkNumber forknum; + BlockNumber blk; + + if (!XLogRecHasBlockRef(record, block_id)) + continue; + + XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blk); + if (forknum != MAIN_FORKNUM) + appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, fork %u, blk %u", + block_id, + rnode.spcNode, rnode.dbNode, rnode.relNode, + forknum, + blk); + else + appendStringInfo(buf, "; blkref #%u: rel %u/%u/%u, blk %u", + block_id, + rnode.spcNode, rnode.dbNode, rnode.relNode, + blk); + if (XLogRecHasBlockImage(record, block_id)) + appendStringInfo(buf, " FPW"); } } #endif /* WAL_DEBUG */ @@ -8892,17 +8890,18 @@ xlog_outrec(StringInfo buf, XLogRecord *record) * optionally followed by a colon, a space, and a further description. 
*/ static void -xlog_outdesc(StringInfo buf, RmgrId rmid, XLogRecord *record) +xlog_outdesc(StringInfo buf, XLogReaderState *record) { + RmgrId rmid = XLogRecGetRmid(record); + uint8 info = XLogRecGetInfo(record); const char *id; appendStringInfoString(buf, RmgrTable[rmid].rm_name); appendStringInfoChar(buf, '/'); - id = RmgrTable[rmid].rm_identify(record->xl_info); + id = RmgrTable[rmid].rm_identify(info); if (id == NULL) - appendStringInfo(buf, "UNKNOWN (%X): ", - record->xl_info & ~XLR_INFO_MASK); + appendStringInfo(buf, "UNKNOWN (%X): ", info & ~XLR_INFO_MASK); else appendStringInfo(buf, "%s: ", id); @@ -9411,7 +9410,6 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) XLogRecPtr startpoint; XLogRecPtr stoppoint; TimeLineID stoptli; - XLogRecData rdata; pg_time_t stamp_time; char strfbuf[128]; char histfilepath[MAXPGPATH]; @@ -9618,11 +9616,9 @@ do_pg_stop_backup(char *labelfile, bool waitforarchive, TimeLineID *stoptli_p) /* * Write the backup-end xlog record */ - rdata.data = (char *) (&startpoint); - rdata.len = sizeof(startpoint); - rdata.buffer = InvalidBuffer; - rdata.next = NULL; - stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END, &rdata); + XLogBeginInsert(); + XLogRegisterData((char *) (&startpoint), sizeof(startpoint)); + stoppoint = XLogInsert(RM_XLOG_ID, XLOG_BACKUP_END); stoptli = ThisTimeLineID; /* @@ -9930,15 +9926,13 @@ read_backup_label(XLogRecPtr *checkPointLoc, bool *backupEndRequired, static void rm_redo_error_callback(void *arg) { - XLogRecord *record = (XLogRecord *) arg; + XLogReaderState *record = (XLogReaderState *) arg; StringInfoData buf; initStringInfo(&buf); - xlog_outdesc(&buf, record->xl_rmid, record); + xlog_outdesc(&buf, record); - /* don't bother emitting empty description */ - if (buf.len > 0) - errcontext("xlog redo %s", buf.data); + errcontext("xlog redo %s", buf.data); pfree(buf.data); } diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 
b83343bf5bd..89c407e521b 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -3,6 +3,12 @@ * xloginsert.c * Functions for constructing WAL records * + * Constructing a WAL record begins with a call to XLogBeginInsert, + * followed by a number of XLogRegister* calls. The registered data is + * collected in private working memory, and finally assembled into a chain + * of XLogRecData structs by a call to XLogRecordAssemble(). See + * access/transam/README for details. + * * Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * @@ -24,39 +30,366 @@ #include "utils/memutils.h" #include "pg_trace.h" +/* + * For each block reference registered with XLogRegisterBuffer, we fill in + * a registered_buffer struct. + */ +typedef struct +{ + bool in_use; /* is this slot in use? */ + uint8 flags; /* REGBUF_* flags */ + RelFileNode rnode; /* identifies the relation and block */ + ForkNumber forkno; + BlockNumber block; + Page page; /* page content */ + uint32 rdata_len; /* total length of data in rdata chain */ + XLogRecData *rdata_head; /* head of the chain of data registered with + * this block */ + XLogRecData *rdata_tail; /* last entry in the chain, or &rdata_head if + * empty */ + + XLogRecData bkp_rdatas[2]; /* temporary rdatas used to hold references to + * backup block data in XLogRecordAssemble() */ +} registered_buffer; + +static registered_buffer *registered_buffers; +static int max_registered_buffers; /* allocated size */ +static int max_registered_block_id = 0; /* highest block_id + 1 + * currently registered */ + +/* + * A chain of XLogRecDatas to hold the "main data" of a WAL record, registered + * with XLogRegisterData(...). 
+ */ +static XLogRecData *mainrdata_head; +static XLogRecData *mainrdata_last = (XLogRecData *) &mainrdata_head; +static uint32 mainrdata_len; /* total # of bytes in chain */ + +/* + * These are used to hold the record header while constructing a record. + * 'hdr_scratch' is not a plain variable, but is palloc'd at initialization, + * because we want it to be MAXALIGNed and padding bytes zeroed. + * + * For simplicity, it's allocated large enough to hold the headers for any + * WAL record. + */ +static XLogRecData hdr_rdt; +static char *hdr_scratch = NULL; + +#define HEADER_SCRATCH_SIZE \ + (SizeOfXLogRecord + \ + MaxSizeOfXLogRecordBlockHeader * (XLR_MAX_BLOCK_ID + 1) + \ + SizeOfXLogRecordDataHeaderLong) + +/* + * An array of XLogRecData structs, to hold registered data. + */ +static XLogRecData *rdatas; +static int num_rdatas; /* entries currently used */ +static int max_rdatas; /* allocated size */ + +static bool begininsert_called = false; + +/* Memory context to hold the registered buffer and data references. */ +static MemoryContext xloginsert_cxt; + static XLogRecData *XLogRecordAssemble(RmgrId rmid, uint8 info, - XLogRecData *rdata, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal); -static void XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb); + XLogRecPtr *fpw_lsn); + +/* + * Begin constructing a WAL record. This must be called before the + * XLogRegister* functions and XLogInsert(). 
+ */ +void +XLogBeginInsert(void) +{ + Assert(max_registered_block_id == 0); + Assert(mainrdata_last == (XLogRecData *) &mainrdata_head); + Assert(mainrdata_len == 0); + Assert(!begininsert_called); + + /* cross-check on whether we should be here or not */ + if (!XLogInsertAllowed()) + elog(ERROR, "cannot make new WAL entries during recovery"); + + begininsert_called = true; +} /* - * Insert an XLOG record having the specified RMID and info bytes, - * with the body of the record being the data chunk(s) described by - * the rdata chain (see xloginsert.h for notes about rdata). + * Ensure that there are enough buffer and data slots in the working area, + * for subsequent XLogRegisterBuffer, XLogRegisterData and XLogRegisterBufData + * calls. + * + * There is always space for a small number of buffers and data chunks, enough + * for most record types. This function is for the exceptional cases that need + * more. + */ +void +XLogEnsureRecordSpace(int max_block_id, int ndatas) +{ + int nbuffers; + + /* + * This must be called before entering a critical section, because + * allocating memory inside a critical section can fail. repalloc() will + * check the same, but better to check it here too so that we fail + * consistently even if the arrays happen to be large enough already. + */ + Assert(CritSectionCount == 0); + + /* the minimum values can't be decreased */ + if (max_block_id < XLR_NORMAL_MAX_BLOCK_ID) + max_block_id = XLR_NORMAL_MAX_BLOCK_ID; + if (ndatas < XLR_NORMAL_RDATAS) + ndatas = XLR_NORMAL_RDATAS; + + if (max_block_id > XLR_MAX_BLOCK_ID) + elog(ERROR, "maximum number of WAL record block references exceeded"); + nbuffers = max_block_id + 1; + + if (nbuffers > max_registered_buffers) + { + registered_buffers = (registered_buffer *) + repalloc(registered_buffers, sizeof(registered_buffer) * nbuffers); + + /* + * At least the padding bytes in the structs must be zeroed, because + * they are included in WAL data, but initialize it all for tidiness. 
+ */ + MemSet(®istered_buffers[max_registered_buffers], 0, + (nbuffers - max_registered_buffers) * sizeof(registered_buffer)); + max_registered_buffers = nbuffers; + } + + if (ndatas > max_rdatas) + { + rdatas = (XLogRecData *) repalloc(rdatas, sizeof(XLogRecData) * ndatas); + max_rdatas = ndatas; + } +} + +/* + * Reset WAL record construction buffers. + */ +void +XLogResetInsertion(void) +{ + int i; + + for (i = 0; i < max_registered_block_id; i++) + registered_buffers[i].in_use = false; + + num_rdatas = 0; + max_registered_block_id = 0; + mainrdata_len = 0; + mainrdata_last = (XLogRecData *) &mainrdata_head; + begininsert_called = false; +} + +/* + * Register a reference to a buffer with the WAL record being constructed. + * This must be called for every page that the WAL-logged operation modifies. + */ +void +XLogRegisterBuffer(uint8 block_id, Buffer buffer, uint8 flags) +{ + registered_buffer *regbuf; + + /* NO_IMAGE doesn't make sense with FORCE_IMAGE */ + Assert(!((flags & REGBUF_FORCE_IMAGE) && (flags & (REGBUF_NO_IMAGE)))); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + { + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + max_registered_block_id = block_id + 1; + } + + regbuf = ®istered_buffers[block_id]; + + BufferGetTag(buffer, ®buf->rnode, ®buf->forkno, ®buf->block); + regbuf->page = BufferGetPage(buffer); + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. 
+ */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Like XLogRegisterBuffer, but for registering a block that's not in the + * shared buffer pool (i.e. when you don't have a Buffer for it). + */ +void +XLogRegisterBlock(uint8 block_id, RelFileNode *rnode, ForkNumber forknum, + BlockNumber blknum, Page page, uint8 flags) +{ + registered_buffer *regbuf; + + /* This is currently only used to WAL-log a full-page image of a page */ + Assert(flags & REGBUF_FORCE_IMAGE); + Assert(begininsert_called); + + if (block_id >= max_registered_block_id) + max_registered_block_id = block_id + 1; + + if (block_id >= max_registered_buffers) + elog(ERROR, "too many registered buffers"); + + regbuf = ®istered_buffers[block_id]; + + regbuf->rnode = *rnode; + regbuf->forkno = forknum; + regbuf->block = blknum; + regbuf->page = page; + regbuf->flags = flags; + regbuf->rdata_tail = (XLogRecData *) ®buf->rdata_head; + regbuf->rdata_len = 0; + + /* + * Check that this page hasn't already been registered with some other + * block_id. + */ +#ifdef USE_ASSERT_CHECKING + { + int i; + + for (i = 0; i < max_registered_block_id; i++) + { + registered_buffer *regbuf_old = ®istered_buffers[i]; + + if (i == block_id || !regbuf_old->in_use) + continue; + + Assert(!RelFileNodeEquals(regbuf_old->rnode, regbuf->rnode) || + regbuf_old->forkno != regbuf->forkno || + regbuf_old->block != regbuf->block); + } + } +#endif + + regbuf->in_use = true; +} + +/* + * Add data to the WAL record that's being constructed. + * + * The data is appended to the "main chunk", available at replay with + * XLogGetRecData(). 
+ */ +void +XLogRegisterData(char *data, int len) +{ + XLogRecData *rdata; + + Assert(begininsert_called); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + /* + * we use the mainrdata_last pointer to track the end of the chain, so no + * need to clear 'next' here. + */ + + mainrdata_last->next = rdata; + mainrdata_last = rdata; + + mainrdata_len += len; +} + +/* + * Add buffer-specific data to the WAL record that's being constructed. + * + * Block_id must reference a block previously registered with + * XLogRegisterBuffer(). If this is called more than once for the same + * block_id, the data is appended. + * + * The maximum amount of data that can be registered per block is 65535 + * bytes. That should be plenty; if you need more than BLCKSZ bytes to + * reconstruct the changes to the page, you might as well just log a full + * copy of it. (the "main data" that's not associated with a block is not + * limited) + */ +void +XLogRegisterBufData(uint8 block_id, char *data, int len) +{ + registered_buffer *regbuf; + XLogRecData *rdata; + + Assert(begininsert_called); + + /* find the registered buffer struct */ + regbuf = ®istered_buffers[block_id]; + if (!regbuf->in_use) + elog(ERROR, "no block with id %d registered with WAL insertion", + block_id); + + if (num_rdatas >= max_rdatas) + elog(ERROR, "too much WAL data"); + rdata = &rdatas[num_rdatas++]; + + rdata->data = data; + rdata->len = len; + + regbuf->rdata_tail->next = rdata; + regbuf->rdata_tail = rdata; + regbuf->rdata_len += len; +} + +/* + * Insert an XLOG record having the specified RMID and info bytes, with the + * body of the record being the data and buffer references registered earlier + * with XLogRegister* calls. * * Returns XLOG pointer to end of record (beginning of next record). * This can be used as LSN for data pages affected by the logged action. 
* (LSN is the XLOG point up to which the XLOG must be flushed to disk * before the data page can be written out. This implements the basic * WAL rule "write the log before the data".) - * - * NB: this routine feels free to scribble on the XLogRecData structs, - * though not on the data they reference. This is OK since the XLogRecData - * structs are always just temporaries in the calling code. */ XLogRecPtr -XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) +XLogInsert(RmgrId rmid, uint8 info) { - XLogRecPtr RedoRecPtr; - bool doPageWrites; XLogRecPtr EndPos; - XLogRecPtr fpw_lsn; - XLogRecData *rdt; - XLogRecData *rdt_lastnormal; - /* info's high bits are reserved for use by me */ - if (info & XLR_INFO_MASK) + /* XLogBeginInsert() must have been called. */ + if (!begininsert_called) + elog(ERROR, "XLogBeginInsert was not called"); + + /* + * The caller can set rmgr bits and XLR_SPECIAL_REL_UPDATE; the rest are + * reserved for use by me. + */ + if ((info & ~(XLR_RMGR_INFO_MASK | XLR_SPECIAL_REL_UPDATE)) != 0) elog(PANIC, "invalid xlog info mask %02X", info); TRACE_POSTGRESQL_XLOG_INSERT(rmid, info); @@ -67,292 +400,282 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) */ if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID) { + XLogResetInsertion(); EndPos = SizeOfXLogLongPHD; /* start of 1st chkpt record */ return EndPos; } - /* - * Get values needed to decide whether to do full-page writes. Since we - * don't yet have an insertion lock, these could change under us, but - * XLogInsertRecord will recheck them once it has a lock. - */ - GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - - /* - * Assemble an XLogRecData chain representing the WAL record, including - * any backup blocks needed. - * - * We may have to loop back to here if a race condition is detected in - * XLogInsertRecord. We could prevent the race by doing all this work - * while holding an insertion lock, but it seems better to avoid doing CRC - * calculations while holding one. 
- */ -retry: - rdt = XLogRecordAssemble(rmid, info, rdata, RedoRecPtr, doPageWrites, - &fpw_lsn, &rdt_lastnormal); - - EndPos = XLogInsertRecord(rdt, fpw_lsn); - - if (EndPos == InvalidXLogRecPtr) + do { + XLogRecPtr RedoRecPtr; + bool doPageWrites; + XLogRecPtr fpw_lsn; + XLogRecData *rdt; + /* - * Undo the changes we made to the rdata chain, and retry. - * - * XXX: This doesn't undo *all* the changes; the XLogRecData - * entries for buffers that we had already decided to back up have - * had their data-pointers cleared. That's OK, as long as we - * decide to back them up on the next iteration as well. Hence, - * don't allow "doPageWrites" value to go from true to false after - * we've modified the rdata chain. + * Get values needed to decide whether to do full-page writes. Since + * we don't yet have an insertion lock, these could change under us, + * but XLogInsertRecData will recheck them once it has a lock. */ - bool newDoPageWrites; + GetFullPageWriteInfo(&RedoRecPtr, &doPageWrites); - GetFullPageWriteInfo(&RedoRecPtr, &newDoPageWrites); - doPageWrites = doPageWrites || newDoPageWrites; - rdt_lastnormal->next = NULL; + rdt = XLogRecordAssemble(rmid, info, RedoRecPtr, doPageWrites, + &fpw_lsn); - goto retry; - } + EndPos = XLogInsertRecord(rdt, fpw_lsn); + } while (EndPos == InvalidXLogRecPtr); + + XLogResetInsertion(); return EndPos; } /* - * Assemble a full WAL record, including backup blocks, from an XLogRecData - * chain, ready for insertion with XLogInsertRecord(). The record header - * fields are filled in, except for the xl_prev field and CRC. + * Assemble a WAL record from the registered data and buffers into an + * XLogRecData chain, ready for insertion with XLogInsertRecord(). * - * The rdata chain is modified, adding entries for full-page images. - * *rdt_lastnormal is set to point to the last normal (ie. not added by - * this function) entry. It can be used to reset the chain to its original - * state. 
+ * The record header fields are filled in, except for the xl_prev field. The + * calculated CRC does not include xl_prev either. * - * If the rdata chain contains any buffer references, and a full-page image - * was not taken of all the buffers, *fpw_lsn is set to the lowest LSN among - * such pages. This signals that the assembled record is only good for - * insertion on the assumption that the RedoRecPtr and doPageWrites values - * were up-to-date. + * If there are any registered buffers, and a full-page image was not taken + * of all them, *page_writes_omitted is set to true. This signals that the + * assembled record is only good for insertion on the assumption that the + * RedoRecPtr and doPageWrites values were up-to-date. */ static XLogRecData * -XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecData *rdata, +XLogRecordAssemble(RmgrId rmid, uint8 info, XLogRecPtr RedoRecPtr, bool doPageWrites, - XLogRecPtr *fpw_lsn, XLogRecData **rdt_lastnormal) + XLogRecPtr *fpw_lsn) { - bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH); XLogRecData *rdt; - Buffer dtbuf[XLR_MAX_BKP_BLOCKS]; - bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS]; - uint32 len, - total_len; - unsigned i; + uint32 total_len = 0; + int block_id; + pg_crc32 rdata_crc; + registered_buffer *prev_regbuf = NULL; + XLogRecData *rdt_datas_last; + XLogRecord *rechdr; + char *scratch = hdr_scratch; /* - * These need to be static because they are returned to the caller as part - * of the XLogRecData chain. + * Note: this function can be called multiple times for the same record. + * All the modifications we do to the rdata chains below must handle that. 
*/ - static BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt1[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt2[XLR_MAX_BKP_BLOCKS]; - static XLogRecData dtbuf_rdt3[XLR_MAX_BKP_BLOCKS]; - static XLogRecData hdr_rdt; - static XLogRecord *rechdr; - - if (rechdr == NULL) - { - static char rechdrbuf[SizeOfXLogRecord + MAXIMUM_ALIGNOF]; - rechdr = (XLogRecord *) MAXALIGN(&rechdrbuf); - MemSet(rechdr, 0, SizeOfXLogRecord); - } + /* The record begins with the fixed-size header */ + rechdr = (XLogRecord *) scratch; + scratch += SizeOfXLogRecord; - /* The record begins with the header */ - hdr_rdt.data = (char *) rechdr; - hdr_rdt.len = SizeOfXLogRecord; - hdr_rdt.next = rdata; - total_len = SizeOfXLogRecord; + hdr_rdt.next = NULL; + rdt_datas_last = &hdr_rdt; + hdr_rdt.data = hdr_scratch; /* - * Here we scan the rdata chain, to determine which buffers must be backed - * up. - * - * We add entries for backup blocks to the chain, so that they don't need - * any special treatment in the critical section where the chunks are - * copied into the WAL buffers. Those entries have to be unlinked from the - * chain if we have to loop back here. + * Make an rdata chain containing all the data portions of all block + * references. This includes the data for full-page images. Also append + * the headers for the block references in the scratch buffer. 
*/ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - dtbuf[i] = InvalidBuffer; - dtbuf_bkp[i] = false; - } - *fpw_lsn = InvalidXLogRecPtr; - len = 0; - for (rdt = rdata;;) + for (block_id = 0; block_id < max_registered_block_id; block_id++) { - if (rdt->buffer == InvalidBuffer) + registered_buffer *regbuf = ®istered_buffers[block_id]; + bool needs_backup; + bool needs_data; + XLogRecordBlockHeader bkpb; + XLogRecordBlockImageHeader bimg; + bool samerel; + + if (!regbuf->in_use) + continue; + + /* Determine if this block needs to be backed up */ + if (regbuf->flags & REGBUF_FORCE_IMAGE) + needs_backup = true; + else if (regbuf->flags & REGBUF_NO_IMAGE) + needs_backup = false; + else if (!doPageWrites) + needs_backup = false; + else { - /* Simple data, just include it */ - len += rdt->len; + /* + * We assume page LSN is first data on *every* page that can be + * passed to XLogInsert, whether it has the standard page layout + * or not. + */ + XLogRecPtr page_lsn = PageGetLSN(regbuf->page); + + needs_backup = (page_lsn <= RedoRecPtr); + if (!needs_backup) + { + if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn) + *fpw_lsn = page_lsn; + } } + + /* Determine if the buffer data needs to included */ + if (regbuf->rdata_len == 0) + needs_data = false; + else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) + needs_data = true; else + needs_data = !needs_backup; + + bkpb.id = block_id; + bkpb.fork_flags = regbuf->forkno; + bkpb.data_length = 0; + + if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT) + bkpb.fork_flags |= BKPBLOCK_WILL_INIT; + + if (needs_backup) { - /* Find info for buffer */ - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) + Page page = regbuf->page; + + /* + * The page needs to be backed up, so set up *bimg + */ + if (regbuf->flags & REGBUF_STANDARD) { - if (rdt->buffer == dtbuf[i]) + /* Assume we can omit data between pd_lower and pd_upper */ + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; + + if (lower 
>= SizeOfPageHeaderData && + upper > lower && + upper <= BLCKSZ) { - /* Buffer already referenced by earlier chain item */ - if (dtbuf_bkp[i]) - { - rdt->data = NULL; - rdt->len = 0; - } - else if (rdt->data) - len += rdt->len; - break; + bimg.hole_offset = lower; + bimg.hole_length = upper - lower; } - if (dtbuf[i] == InvalidBuffer) + else { - /* OK, put it in this slot */ - XLogRecPtr page_lsn; - bool needs_backup; - - dtbuf[i] = rdt->buffer; - - /* - * Determine whether the buffer has to be backed up. - * - * We assume page LSN is first data on *every* page that - * can be passed to XLogInsert, whether it has the - * standard page layout or not. We don't need to take the - * buffer header lock for PageGetLSN because we hold an - * exclusive lock on the page and/or the relation. - */ - page_lsn = PageGetLSN(BufferGetPage(rdt->buffer)); - if (!doPageWrites) - needs_backup = false; - else if (page_lsn <= RedoRecPtr) - needs_backup = true; - else - needs_backup = false; - - if (needs_backup) - { - /* - * The page needs to be backed up, so set up BkpBlock - */ - XLogFillBkpBlock(rdt->buffer, rdt->buffer_std, - &(dtbuf_xlg[i])); - dtbuf_bkp[i] = true; - rdt->data = NULL; - rdt->len = 0; - } - else - { - if (rdt->data) - len += rdt->len; - if (*fpw_lsn == InvalidXLogRecPtr || - page_lsn < *fpw_lsn) - { - *fpw_lsn = page_lsn; - } - } - break; + /* No "hole" to compress out */ + bimg.hole_offset = 0; + bimg.hole_length = 0; } } - if (i >= XLR_MAX_BKP_BLOCKS) - elog(PANIC, "can backup at most %d blocks per xlog record", - XLR_MAX_BKP_BLOCKS); - } - /* Break out of loop when rdt points to last chain item */ - if (rdt->next == NULL) - break; - rdt = rdt->next; - } - total_len += len; + else + { + /* Not a standard page header, don't try to eliminate "hole" */ + bimg.hole_offset = 0; + bimg.hole_length = 0; + } - /* - * Make additional rdata chain entries for the backup blocks, so that we - * don't need to special-case them in the write loop. 
This modifies the - * original rdata chain, but we keep a pointer to the last regular entry, - * rdt_lastnormal, so that we can undo this if we have to start over. - * - * At the exit of this loop, total_len includes the backup block data. - * - * Also set the appropriate info bits to show which buffers were backed - * up. The XLR_BKP_BLOCK(N) bit corresponds to the N'th distinct buffer - * value (ignoring InvalidBuffer) appearing in the rdata chain. - */ - *rdt_lastnormal = rdt; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - BkpBlock *bkpb; - char *page; + /* Fill in the remaining fields in the XLogRecordBlockData struct */ + bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE; - if (!dtbuf_bkp[i]) - continue; + total_len += BLCKSZ - bimg.hole_length; + + /* + * Construct XLogRecData entries for the page content. + */ + rdt_datas_last->next = ®buf->bkp_rdatas[0]; + rdt_datas_last = rdt_datas_last->next; + if (bimg.hole_length == 0) + { + rdt_datas_last->data = page; + rdt_datas_last->len = BLCKSZ; + } + else + { + /* must skip the hole */ + rdt_datas_last->data = page; + rdt_datas_last->len = bimg.hole_offset; - info |= XLR_BKP_BLOCK(i); + rdt_datas_last->next = ®buf->bkp_rdatas[1]; + rdt_datas_last = rdt_datas_last->next; - bkpb = &(dtbuf_xlg[i]); - page = (char *) BufferGetBlock(dtbuf[i]); + rdt_datas_last->data = page + (bimg.hole_offset + bimg.hole_length); + rdt_datas_last->len = BLCKSZ - (bimg.hole_offset + bimg.hole_length); + } + } - rdt->next = &(dtbuf_rdt1[i]); - rdt = rdt->next; + if (needs_data) + { + /* + * Link the caller-supplied rdata chain for this buffer to the + * overall list. 
+ */ + bkpb.fork_flags |= BKPBLOCK_HAS_DATA; + bkpb.data_length = regbuf->rdata_len; + total_len += regbuf->rdata_len; + + rdt_datas_last->next = regbuf->rdata_head; + rdt_datas_last = regbuf->rdata_tail; + } - rdt->data = (char *) bkpb; - rdt->len = sizeof(BkpBlock); - total_len += sizeof(BkpBlock); + if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode)) + { + samerel = true; + bkpb.fork_flags |= BKPBLOCK_SAME_REL; + prev_regbuf = regbuf; + } + else + samerel = false; - rdt->next = &(dtbuf_rdt2[i]); - rdt = rdt->next; + /* Ok, copy the header to the scratch buffer */ + memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader); + scratch += SizeOfXLogRecordBlockHeader; + if (needs_backup) + { + memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader); + scratch += SizeOfXLogRecordBlockImageHeader; + } + if (!samerel) + { + memcpy(scratch, ®buf->rnode, sizeof(RelFileNode)); + scratch += sizeof(RelFileNode); + } + memcpy(scratch, ®buf->block, sizeof(BlockNumber)); + scratch += sizeof(BlockNumber); + } - if (bkpb->hole_length == 0) + /* followed by main data, if any */ + if (mainrdata_len > 0) + { + if (mainrdata_len > 255) { - rdt->data = page; - rdt->len = BLCKSZ; - total_len += BLCKSZ; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_LONG; + memcpy(scratch, &mainrdata_len, sizeof(uint32)); + scratch += sizeof(uint32); } else { - /* must skip the hole */ - rdt->data = page; - rdt->len = bkpb->hole_offset; - total_len += bkpb->hole_offset; - - rdt->next = &(dtbuf_rdt3[i]); - rdt = rdt->next; - - rdt->data = page + (bkpb->hole_offset + bkpb->hole_length); - rdt->len = BLCKSZ - (bkpb->hole_offset + bkpb->hole_length); - total_len += rdt->len; - rdt->next = NULL; + *(scratch++) = XLR_BLOCK_ID_DATA_SHORT; + *(scratch++) = (uint8) mainrdata_len; } + rdt_datas_last->next = mainrdata_head; + rdt_datas_last = mainrdata_last; + total_len += mainrdata_len; } + rdt_datas_last->next = NULL; + + hdr_rdt.len = (scratch - hdr_scratch); + total_len += hdr_rdt.len; 
/* - * We disallow len == 0 because it provides a useful bit of extra error - * checking in ReadRecord. This means that all callers of XLogInsert - * must supply at least some not-in-a-buffer data. However, we make an - * exception for XLOG SWITCH records because we don't want them to ever - * cross a segment boundary. + * Calculate CRC of the data + * + * Note that the record header isn't added into the CRC initially since we + * don't know the prev-link yet. Thus, the CRC will represent the CRC of + * the whole record in the order: rdata, then backup blocks, then record + * header. */ - if (len == 0 && !isLogSwitch) - elog(PANIC, "invalid xlog record length %u", rechdr->xl_len); + INIT_CRC32C(rdata_crc); + COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord); + for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next) + COMP_CRC32C(rdata_crc, rdt->data, rdt->len); /* * Fill in the fields in the record header. Prev-link is filled in later, - * once we know where in the WAL the record will be inserted. CRC is also - * not calculated yet. + * once we know where in the WAL the record will be inserted. The CRC does + * not include the record header yet. */ rechdr->xl_xid = GetCurrentTransactionIdIfAny(); rechdr->xl_tot_len = total_len; - rechdr->xl_len = len; /* doesn't include backup blocks */ rechdr->xl_info = info; rechdr->xl_rmid = rmid; rechdr->xl_prev = InvalidXLogRecPtr; + rechdr->xl_crc = rdata_crc; return &hdr_rdt; } @@ -429,45 +752,41 @@ XLogSaveBufferForHint(Buffer buffer, bool buffer_std) if (lsn <= RedoRecPtr) { - XLogRecData rdata[2]; - BkpBlock bkpb; + int flags; char copied_buffer[BLCKSZ]; char *origdata = (char *) BufferGetBlock(buffer); - - /* Make a BkpBlock struct representing the buffer */ - XLogFillBkpBlock(buffer, buffer_std, &bkpb); + RelFileNode rnode; + ForkNumber forkno; + BlockNumber blkno; /* * Copy buffer so we don't have to worry about concurrent hint bit or * lsn updates. 
We assume pd_lower/upper cannot be changed without an * exclusive lock, so the contents bkp are not racy. - * - * With buffer_std set to false, XLogFillBkpBlock() sets hole_length - * and hole_offset to 0; so the following code is safe for either - * case. */ - memcpy(copied_buffer, origdata, bkpb.hole_offset); - memcpy(copied_buffer + bkpb.hole_offset, - origdata + bkpb.hole_offset + bkpb.hole_length, - BLCKSZ - bkpb.hole_offset - bkpb.hole_length); + if (buffer_std) + { + /* Assume we can omit data between pd_lower and pd_upper */ + Page page = BufferGetPage(buffer); + uint16 lower = ((PageHeader) page)->pd_lower; + uint16 upper = ((PageHeader) page)->pd_upper; - /* - * Header for backup block. - */ - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); + memcpy(copied_buffer, origdata, lower); + memcpy(copied_buffer + upper, origdata + upper, BLCKSZ - upper); + } + else + memcpy(copied_buffer, origdata, BLCKSZ); - /* - * Save copy of the buffer. 
- */ - rdata[1].data = copied_buffer; - rdata[1].len = BLCKSZ - bkpb.hole_length; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; + XLogBeginInsert(); - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + flags = REGBUF_FORCE_IMAGE; + if (buffer_std) + flags |= REGBUF_STANDARD; + + BufferGetTag(buffer, &rnode, &forkno, &blkno); + XLogRegisterBlock(0, &rnode, forkno, blkno, copied_buffer, flags); + + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); } return recptr; @@ -489,71 +808,16 @@ XLogRecPtr log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, Page page, bool page_std) { - BkpBlock bkpb; + int flags; XLogRecPtr recptr; - XLogRecData rdata[3]; - - /* NO ELOG(ERROR) from here till newpage op is logged */ - START_CRIT_SECTION(); - - bkpb.node = *rnode; - bkpb.fork = forkNum; - bkpb.block = blkno; + flags = REGBUF_FORCE_IMAGE; if (page_std) - { - /* Assume we can omit data between pd_lower and pd_upper */ - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb.hole_offset = lower; - bkpb.hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - } - else - { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb.hole_offset = 0; - bkpb.hole_length = 0; - } - - rdata[0].data = (char *) &bkpb; - rdata[0].len = sizeof(BkpBlock); - rdata[0].buffer = InvalidBuffer; - rdata[0].next = &(rdata[1]); - - if (bkpb.hole_length == 0) - { - rdata[1].data = (char *) page; - rdata[1].len = BLCKSZ; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = NULL; - } - else - { - /* must skip the hole */ - rdata[1].data = (char *) page; - rdata[1].len = bkpb.hole_offset; - rdata[1].buffer = InvalidBuffer; - rdata[1].next = &rdata[2]; - - rdata[2].data = (char *) page + (bkpb.hole_offset + bkpb.hole_length); - rdata[2].len = BLCKSZ - 
(bkpb.hole_offset + bkpb.hole_length); - rdata[2].buffer = InvalidBuffer; - rdata[2].next = NULL; - } + flags |= REGBUF_STANDARD; - recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI, rdata); + XLogBeginInsert(); + XLogRegisterBlock(0, rnode, forkNum, blkno, page, flags); + recptr = XLogInsert(RM_XLOG_ID, XLOG_FPI); /* * The page may be uninitialized. If so, we can't set the LSN because that @@ -564,8 +828,6 @@ log_newpage(RelFileNode *rnode, ForkNumber forkNum, BlockNumber blkno, PageSetLSN(page, recptr); } - END_CRIT_SECTION(); - return recptr; } @@ -596,38 +858,38 @@ log_newpage_buffer(Buffer buffer, bool page_std) } /* - * Fill a BkpBlock for a buffer. + * Allocate working buffers needed for WAL record construction. */ -static void -XLogFillBkpBlock(Buffer buffer, bool buffer_std, BkpBlock *bkpb) +void +InitXLogInsert(void) { - BufferGetTag(buffer, &bkpb->node, &bkpb->fork, &bkpb->block); + /* Initialize the working areas */ + if (xloginsert_cxt == NULL) + { + xloginsert_cxt = AllocSetContextCreate(TopMemoryContext, + "WAL record construction", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + } - if (buffer_std) + if (registered_buffers == NULL) { - /* Assume we can omit data between pd_lower and pd_upper */ - Page page = BufferGetPage(buffer); - uint16 lower = ((PageHeader) page)->pd_lower; - uint16 upper = ((PageHeader) page)->pd_upper; - - if (lower >= SizeOfPageHeaderData && - upper > lower && - upper <= BLCKSZ) - { - bkpb->hole_offset = lower; - bkpb->hole_length = upper - lower; - } - else - { - /* No "hole" to compress out */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; - } + registered_buffers = (registered_buffer *) + MemoryContextAllocZero(xloginsert_cxt, + sizeof(registered_buffer) * (XLR_NORMAL_MAX_BLOCK_ID + 1)); + max_registered_buffers = XLR_NORMAL_MAX_BLOCK_ID + 1; } - else + if (rdatas == NULL) { - /* Not a standard page header, don't try to eliminate "hole" */ - bkpb->hole_offset = 0; - bkpb->hole_length = 0; 
+ rdatas = MemoryContextAlloc(xloginsert_cxt, + sizeof(XLogRecData) * XLR_NORMAL_RDATAS); + max_rdatas = XLR_NORMAL_RDATAS; } + + /* + * Allocate a buffer to hold the header information for a WAL record. + */ + if (hdr_scratch == NULL) + hdr_scratch = palloc0(HEADER_SCRATCH_SIZE); } diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c index 7d573cc585d..67d62234369 100644 --- a/src/backend/access/transam/xlogreader.c +++ b/src/backend/access/transam/xlogreader.c @@ -37,6 +37,8 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) the supplied arguments. */ __attribute__((format(PG_PRINTF_ATTRIBUTE, 2, 3))); +static void ResetDecoder(XLogReaderState *state); + /* size of the buffer allocated for error message. */ #define MAX_ERRORMSG_LEN 1000 @@ -59,46 +61,33 @@ report_invalid_record(XLogReaderState *state, const char *fmt,...) /* * Allocate and initialize a new XLogReader. * - * Returns NULL if the xlogreader couldn't be allocated. + * The returned XLogReader is palloc'd. (In FRONTEND code, that means that + * running out-of-memory causes an immediate exit(1). */ XLogReaderState * XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) { XLogReaderState *state; - AssertArg(pagereadfunc != NULL); + state = (XLogReaderState *) palloc0(sizeof(XLogReaderState)); - state = (XLogReaderState *) malloc(sizeof(XLogReaderState)); - if (!state) - return NULL; - MemSet(state, 0, sizeof(XLogReaderState)); + state->max_block_id = -1; /* * Permanently allocate readBuf. We do it this way, rather than just * making a static array, for two reasons: (1) no need to waste the * storage in most instantiations of the backend; (2) a static char array - * isn't guaranteed to have any particular alignment, whereas malloc() + * isn't guaranteed to have any particular alignment, whereas palloc() * will provide MAXALIGN'd storage. 
*/ - state->readBuf = (char *) malloc(XLOG_BLCKSZ); - if (!state->readBuf) - { - free(state); - return NULL; - } + state->readBuf = (char *) palloc(XLOG_BLCKSZ); state->read_page = pagereadfunc; /* system_identifier initialized to zeroes above */ state->private_data = private_data; /* ReadRecPtr and EndRecPtr initialized to zeroes above */ /* readSegNo, readOff, readLen, readPageTLI initialized to zeroes above */ - state->errormsg_buf = malloc(MAX_ERRORMSG_LEN + 1); - if (!state->errormsg_buf) - { - free(state->readBuf); - free(state); - return NULL; - } + state->errormsg_buf = palloc(MAX_ERRORMSG_LEN + 1); state->errormsg_buf[0] = '\0'; /* @@ -107,9 +96,9 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) */ if (!allocate_recordbuf(state, 0)) { - free(state->errormsg_buf); - free(state->readBuf); - free(state); + pfree(state->errormsg_buf); + pfree(state->readBuf); + pfree(state); return NULL; } @@ -119,11 +108,24 @@ XLogReaderAllocate(XLogPageReadCB pagereadfunc, void *private_data) void XLogReaderFree(XLogReaderState *state) { - free(state->errormsg_buf); + int block_id; + + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + if (state->blocks[block_id].in_use) + { + if (state->blocks[block_id].data) + pfree(state->blocks[block_id].data); + } + } + if (state->main_data) + pfree(state->main_data); + + pfree(state->errormsg_buf); if (state->readRecordBuf) - free(state->readRecordBuf); - free(state->readBuf); - free(state); + pfree(state->readRecordBuf); + pfree(state->readBuf); + pfree(state); } /* @@ -146,14 +148,8 @@ allocate_recordbuf(XLogReaderState *state, uint32 reclength) newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ)); if (state->readRecordBuf) - free(state->readRecordBuf); - state->readRecordBuf = (char *) malloc(newSize); - if (!state->readRecordBuf) - { - state->readRecordBufSize = 0; - return false; - } - + pfree(state->readRecordBuf); + state->readRecordBuf = (char *) palloc(newSize); state->readRecordBufSize 
= newSize; return true; } @@ -191,6 +187,8 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) *errormsg = NULL; state->errormsg_buf[0] = '\0'; + ResetDecoder(state); + if (RecPtr == InvalidXLogRecPtr) { RecPtr = state->EndRecPtr; @@ -440,7 +438,10 @@ XLogReadRecord(XLogReaderState *state, XLogRecPtr RecPtr, char **errormsg) state->EndRecPtr -= state->EndRecPtr % XLogSegSize; } - return record; + if (DecodeXLogRecord(state, record, errormsg)) + return record; + else + return NULL; err: @@ -579,30 +580,7 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess) { - /* - * xl_len == 0 is bad data for everything except XLOG SWITCH, where it is - * required. - */ - if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH) - { - if (record->xl_len != 0) - { - report_invalid_record(state, - "invalid xlog switch record at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } - } - else if (record->xl_len == 0) - { - report_invalid_record(state, - "record with zero length at %X/%X", - (uint32) (RecPtr >> 32), (uint32) RecPtr); - return false; - } - if (record->xl_tot_len < SizeOfXLogRecord + record->xl_len || - record->xl_tot_len > SizeOfXLogRecord + record->xl_len + - XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ)) + if (record->xl_tot_len < SizeOfXLogRecord) { report_invalid_record(state, "invalid record length at %X/%X", @@ -663,79 +641,17 @@ ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, * We assume all of the record (that is, xl_tot_len bytes) has been read * into memory at *record. Also, ValidXLogRecordHeader() has accepted the * record's header, which means in particular that xl_tot_len is at least - * SizeOfXlogRecord, so it is safe to fetch xl_len. + * SizeOfXlogRecord. 
*/ static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) { pg_crc32 crc; - int i; - uint32 len = record->xl_len; - BkpBlock bkpb; - char *blk; - size_t remaining = record->xl_tot_len; - /* First the rmgr data */ - if (remaining < SizeOfXLogRecord + len) - { - /* ValidXLogRecordHeader() should've caught this already... */ - report_invalid_record(state, "invalid record length at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - remaining -= SizeOfXLogRecord + len; + /* Calculate the CRC */ INIT_CRC32C(crc); - COMP_CRC32C(crc, XLogRecGetData(record), len); - - /* Add in the backup blocks, if any */ - blk = (char *) XLogRecGetData(record) + len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - uint32 blen; - - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - if (remaining < sizeof(BkpBlock)) - { - report_invalid_record(state, - "invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - memcpy(&bkpb, blk, sizeof(BkpBlock)); - - if (bkpb.hole_offset + bkpb.hole_length > BLCKSZ) - { - report_invalid_record(state, - "incorrect hole size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - blen = sizeof(BkpBlock) + BLCKSZ - bkpb.hole_length; - - if (remaining < blen) - { - report_invalid_record(state, - "invalid backup block size in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - remaining -= blen; - COMP_CRC32C(crc, blk, blen); - blk += blen; - } - - /* Check that xl_tot_len agrees with our calculation */ - if (remaining != 0) - { - report_invalid_record(state, - "incorrect total length in record at %X/%X", - (uint32) (recptr >> 32), (uint32) recptr); - return false; - } - - /* Finally include the record header */ + COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); + /* include the record header last */ COMP_CRC32C(crc, (char *) 
record, offsetof(XLogRecord, xl_crc)); FIN_CRC32C(crc); @@ -985,3 +901,321 @@ out: } #endif /* FRONTEND */ + + +/* ---------------------------------------- + * Functions for decoding the data and block references in a record. + * ---------------------------------------- + */ + +/* private function to reset the state between records */ +static void +ResetDecoder(XLogReaderState *state) +{ + int block_id; + + state->decoded_record = NULL; + + state->main_data_len = 0; + + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + state->blocks[block_id].in_use = false; + state->blocks[block_id].has_image = false; + state->blocks[block_id].has_data = false; + } + state->max_block_id = -1; +} + +/* + * Decode the previously read record. + * + * On error, a human-readable error message is returned in *errormsg, and + * the return value is false. + */ +bool +DecodeXLogRecord(XLogReaderState *state, XLogRecord *record, char **errormsg) +{ + /* + * read next _size bytes from record buffer, but check for overrun first. 
+ */ +#define COPY_HEADER_FIELD(_dst, _size) \ + do { \ + if (remaining < _size) \ + goto shortdata_err; \ + memcpy(_dst, ptr, _size); \ + ptr += _size; \ + remaining -= _size; \ + } while(0) + + char *ptr; + uint32 remaining; + uint32 datatotal; + RelFileNode *rnode = NULL; + uint8 block_id; + + ResetDecoder(state); + + state->decoded_record = record; + + ptr = (char *) record; + ptr += SizeOfXLogRecord; + remaining = record->xl_tot_len - SizeOfXLogRecord; + + /* Decode the headers */ + datatotal = 0; + while (remaining > datatotal) + { + COPY_HEADER_FIELD(&block_id, sizeof(uint8)); + + if (block_id == XLR_BLOCK_ID_DATA_SHORT) + { + /* XLogRecordDataHeaderShort */ + uint8 main_data_len; + + COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); + + state->main_data_len = main_data_len; + datatotal += main_data_len; + break; /* by convention, the main data fragment is + * always last */ + } + else if (block_id == XLR_BLOCK_ID_DATA_LONG) + { + /* XLogRecordDataHeaderLong */ + uint32 main_data_len; + + COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); + state->main_data_len = main_data_len; + datatotal += main_data_len; + break; /* by convention, the main data fragment is + * always last */ + } + else if (block_id <= XLR_MAX_BLOCK_ID) + { + /* XLogRecordBlockHeader */ + DecodedBkpBlock *blk; + uint8 fork_flags; + + if (block_id <= state->max_block_id) + { + report_invalid_record(state, + "out-of-order block_id %u at %X/%X", + block_id, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + state->max_block_id = block_id; + + blk = &state->blocks[block_id]; + blk->in_use = true; + + COPY_HEADER_FIELD(&fork_flags, sizeof(uint8)); + blk->forknum = fork_flags & BKPBLOCK_FORK_MASK; + blk->flags = fork_flags; + blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); + blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); + + COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); + /* cross-check that the HAS_DATA flag is set iff data_length > 
0 */ + if (blk->has_data && blk->data_len == 0) + report_invalid_record(state, + "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + if (!blk->has_data && blk->data_len != 0) + report_invalid_record(state, + "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", + (unsigned int) blk->data_len, + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + datatotal += blk->data_len; + + if (blk->has_image) + { + COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16)); + COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); + datatotal += BLCKSZ - blk->hole_length; + } + if (!(fork_flags & BKPBLOCK_SAME_REL)) + { + COPY_HEADER_FIELD(&blk->rnode, sizeof(RelFileNode)); + rnode = &blk->rnode; + } + else + { + if (rnode == NULL) + { + report_invalid_record(state, + "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); + goto err; + } + + blk->rnode = *rnode; + } + COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber)); + } + else + { + report_invalid_record(state, + "invalid block_id %u at %X/%X", + block_id, + (uint32) (state->ReadRecPtr >> 32), + (uint32) state->ReadRecPtr); + goto err; + } + } + + if (remaining != datatotal) + goto shortdata_err; + + /* + * Ok, we've parsed the fragment headers, and verified that the total + * length of the payload in the fragments is equal to the amount of data + * left. Copy the data of each fragment to a separate buffer. + * + * We could just set up pointers into readRecordBuf, but we want to align + * the data for the convenience of the callers. Backup images are not + * copied, however; they don't need alignment. 
+ */ + + /* block data first */ + for (block_id = 0; block_id <= state->max_block_id; block_id++) + { + DecodedBkpBlock *blk = &state->blocks[block_id]; + + if (!blk->in_use) + continue; + if (blk->has_image) + { + blk->bkp_image = ptr; + ptr += BLCKSZ - blk->hole_length; + } + if (blk->has_data) + { + if (!blk->data || blk->data_len > blk->data_bufsz) + { + if (blk->data) + pfree(blk->data); + blk->data_bufsz = blk->data_len; + blk->data = palloc(blk->data_bufsz); + } + memcpy(blk->data, ptr, blk->data_len); + ptr += blk->data_len; + } + } + + /* and finally, the main data */ + if (state->main_data_len > 0) + { + if (!state->main_data || state->main_data_len > state->main_data_bufsz) + { + if (state->main_data) + pfree(state->main_data); + state->main_data_bufsz = state->main_data_len; + state->main_data = palloc(state->main_data_bufsz); + } + memcpy(state->main_data, ptr, state->main_data_len); + ptr += state->main_data_len; + } + + return true; + +shortdata_err: + report_invalid_record(state, + "record with invalid length at %X/%X", + (uint32) (state->ReadRecPtr >> 32), (uint32) state->ReadRecPtr); +err: + *errormsg = state->errormsg_buf; + + return false; +} + +/* + * Returns information about the block that a block reference refers to. + * + * If the WAL record contains a block reference with the given ID, *rnode, + * *forknum, and *blknum are filled in (if not NULL), and returns TRUE. + * Otherwise returns FALSE. + */ +bool +XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, + RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return false; + + bkpb = &record->blocks[block_id]; + if (rnode) + *rnode = bkpb->rnode; + if (forknum) + *forknum = bkpb->forknum; + if (blknum) + *blknum = bkpb->blkno; + return true; +} + +/* + * Returns the data associated with a block reference, or NULL if there is + * no data (e.g. because a full-page image was taken instead). 
The returned + * pointer points to a MAXALIGNed buffer. + */ +char * +XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return NULL; + + bkpb = &record->blocks[block_id]; + + if (!bkpb->has_data) + { + if (len) + *len = 0; + return NULL; + } + else + { + if (len) + *len = bkpb->data_len; + return bkpb->data; + } +} + +/* + * Restore a full-page image from a backup block attached to an XLOG record. + * + * Returns true if the image was copied into 'page', or false if the block + * reference is not in use or carries no full-page image. + */ +bool +RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page) +{ + DecodedBkpBlock *bkpb; + + if (!record->blocks[block_id].in_use) + return false; + if (!record->blocks[block_id].has_image) + return false; + + bkpb = &record->blocks[block_id]; + + if (bkpb->hole_length == 0) + { + memcpy(page, bkpb->bkp_image, BLCKSZ); + } + else + { + memcpy(page, bkpb->bkp_image, bkpb->hole_offset); + /* must zero-fill the hole */ + MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length); + memcpy(page + (bkpb->hole_offset + bkpb->hole_length), + bkpb->bkp_image + bkpb->hole_offset, + BLCKSZ - (bkpb->hole_offset + bkpb->hole_length)); + } + + return true; +} diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c index cf04081c19e..ae323a0db87 100644 --- a/src/backend/access/transam/xlogutils.c +++ b/src/backend/access/transam/xlogutils.c @@ -253,9 +253,8 @@ XLogCheckInvalidPages(void) * * 'lsn' is the LSN of the record being replayed. It is compared with the * page's LSN to determine if the record has already been replayed. - * 'rnode' and 'blkno' point to the block being replayed (main fork number - * is implied, use XLogReadBufferForRedoExtended for other forks). - * 'block_index' identifies the backup block in the record for the page. + * 'block_id' is the ID number the block was registered with, when the WAL + * record was created.
* * Returns one of the following: * @@ -272,15 +271,36 @@ XLogCheckInvalidPages(void) * single-process crash recovery, but some subroutines such as MarkBufferDirty * will complain if we don't have the lock. In hot standby mode it's * definitely necessary.) + * + * Note: when a backup block is available in XLOG, we restore it + * unconditionally, even if the page in the database appears newer. This is + * to protect ourselves against database pages that were partially or + * incorrectly written during a crash. We assume that the XLOG data must be + * good because it has passed a CRC check, while the database page might not + * be. This will force us to replay all subsequent modifications of the page + * that appear in XLOG, rather than possibly ignoring them as already + * applied, but that's not a huge drawback. */ XLogRedoAction -XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record, int block_index, - RelFileNode rnode, BlockNumber blkno, +XLogReadBufferForRedo(XLogReaderState *record, uint8 block_id, Buffer *buf) { - return XLogReadBufferForRedoExtended(lsn, record, block_index, - rnode, MAIN_FORKNUM, blkno, - RBM_NORMAL, false, buf); + return XLogReadBufferForRedoExtended(record, block_id, RBM_NORMAL, + false, buf); +} + +/* + * Pin and lock a buffer referenced by a WAL record, for the purpose of + * re-initializing it. + */ +Buffer +XLogInitBufferForRedo(XLogReaderState *record, uint8 block_id) +{ + Buffer buf; + + XLogReadBufferForRedoExtended(record, block_id, RBM_ZERO_AND_LOCK, false, + &buf); + return buf; } /* @@ -299,21 +319,54 @@ XLogReadBufferForRedo(XLogRecPtr lsn, XLogRecord *record, int block_index, * using LockBufferForCleanup(), instead of a regular exclusive lock. 
*/ XLogRedoAction -XLogReadBufferForRedoExtended(XLogRecPtr lsn, XLogRecord *record, - int block_index, RelFileNode rnode, - ForkNumber forkno, BlockNumber blkno, +XLogReadBufferForRedoExtended(XLogReaderState *record, + uint8 block_id, ReadBufferMode mode, bool get_cleanup_lock, Buffer *buf) { - if (record->xl_info & XLR_BKP_BLOCK(block_index)) + XLogRecPtr lsn = record->EndRecPtr; + RelFileNode rnode; + ForkNumber forknum; + BlockNumber blkno; + Page page; + + if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno)) + { + /* Caller specified a bogus block_id */ + elog(PANIC, "failed to locate backup block with ID %d", block_id); + } + + /* If it's a full-page image, restore it. */ + if (XLogRecHasBlockImage(record, block_id)) { - *buf = RestoreBackupBlock(lsn, record, block_index, - get_cleanup_lock, true); + *buf = XLogReadBufferExtended(rnode, forknum, blkno, + get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK); + page = BufferGetPage(*buf); + if (!RestoreBlockImage(record, block_id, page)) + elog(ERROR, "failed to restore block image"); + + /* + * The page may be uninitialized. If so, we can't set the LSN because + * that would corrupt the page. + */ + if (!PageIsNew(page)) + { + PageSetLSN(page, lsn); + } + + MarkBufferDirty(*buf); + return BLK_RESTORED; } else { - *buf = XLogReadBufferExtended(rnode, forkno, blkno, mode); + if ((record->blocks[block_id].flags & BKPBLOCK_WILL_INIT) != 0 && + mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK) + { + elog(PANIC, "block with WILL_INIT flag in WAL record must be zeroed by redo routine"); + } + + *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode); if (BufferIsValid(*buf)) { if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK) @@ -334,37 +387,6 @@ XLogReadBufferForRedoExtended(XLogRecPtr lsn, XLogRecord *record, } /* - * XLogReadBuffer - * Read a page during XLOG replay. 
- * - * This is a shorthand of XLogReadBufferExtended() followed by - * LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE), for reading from the main - * fork. - * - * (Getting the buffer lock is not really necessary during single-process - * crash recovery, but some subroutines such as MarkBufferDirty will complain - * if we don't have the lock. In hot standby mode it's definitely necessary.) - * - * The returned buffer is exclusively-locked. - * - * For historical reasons, instead of a ReadBufferMode argument, this only - * supports RBM_ZERO_AND_LOCK (init == true) and RBM_NORMAL (init == false) - * modes. - */ -Buffer -XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init) -{ - Buffer buf; - - buf = XLogReadBufferExtended(rnode, MAIN_FORKNUM, blkno, - init ? RBM_ZERO_AND_LOCK : RBM_NORMAL); - if (BufferIsValid(buf) && !init) - LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE); - - return buf; -} - -/* * XLogReadBufferExtended * Read a page during XLOG replay * @@ -383,6 +405,11 @@ XLogReadBuffer(RelFileNode rnode, BlockNumber blkno, bool init) * In RBM_NORMAL_NO_LOG mode, we return InvalidBuffer if the page doesn't * exist, and we don't check for all-zeroes. Thus, no log entry is made * to imply that the page should be dropped or truncated later. + * + * NB: A redo function should normally not call this directly. To get a page + * to modify, use XLogReadBufferForRedo instead. It is important that all pages + * modified by a WAL record are registered in the WAL records, or they will be + * invisible to tools that need to know which pages are modified. */ Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, @@ -474,124 +501,6 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum, } /* - * Restore a full-page image from a backup block attached to an XLOG record. - * - * lsn: LSN of the XLOG record being replayed - * record: the complete XLOG record - * block_index: which backup block to restore (0 ..
XLR_MAX_BKP_BLOCKS - 1) - * get_cleanup_lock: TRUE to get a cleanup rather than plain exclusive lock - * keep_buffer: TRUE to return the buffer still locked and pinned - * - * Returns the buffer number containing the page. Note this is not terribly - * useful unless keep_buffer is specified as TRUE. - * - * Note: when a backup block is available in XLOG, we restore it - * unconditionally, even if the page in the database appears newer. - * This is to protect ourselves against database pages that were partially - * or incorrectly written during a crash. We assume that the XLOG data - * must be good because it has passed a CRC check, while the database - * page might not be. This will force us to replay all subsequent - * modifications of the page that appear in XLOG, rather than possibly - * ignoring them as already applied, but that's not a huge drawback. - * - * If 'get_cleanup_lock' is true, a cleanup lock is obtained on the buffer, - * else a normal exclusive lock is used. During crash recovery, that's just - * pro forma because there can't be any regular backends in the system, but - * in hot standby mode the distinction is important. - * - * If 'keep_buffer' is true, return without releasing the buffer lock and pin; - * then caller is responsible for doing UnlockReleaseBuffer() later. This - * is needed in some cases when replaying XLOG records that touch multiple - * pages, to prevent inconsistent states from being visible to other backends. - * (Again, that's only important in hot standby mode.) 
- */ -Buffer -RestoreBackupBlock(XLogRecPtr lsn, XLogRecord *record, int block_index, - bool get_cleanup_lock, bool keep_buffer) -{ - BkpBlock bkpb; - char *blk; - int i; - - /* Locate requested BkpBlock in the record */ - blk = (char *) XLogRecGetData(record) + record->xl_len; - for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++) - { - if (!(record->xl_info & XLR_BKP_BLOCK(i))) - continue; - - memcpy(&bkpb, blk, sizeof(BkpBlock)); - blk += sizeof(BkpBlock); - - if (i == block_index) - { - /* Found it, apply the update */ - return RestoreBackupBlockContents(lsn, bkpb, blk, get_cleanup_lock, - keep_buffer); - } - - blk += BLCKSZ - bkpb.hole_length; - } - - /* Caller specified a bogus block_index */ - elog(ERROR, "failed to restore block_index %d", block_index); - return InvalidBuffer; /* keep compiler quiet */ -} - -/* - * Workhorse for RestoreBackupBlock usable without an xlog record - * - * Restores a full-page image from BkpBlock and a data pointer. - */ -Buffer -RestoreBackupBlockContents(XLogRecPtr lsn, BkpBlock bkpb, char *blk, - bool get_cleanup_lock, bool keep_buffer) -{ - Buffer buffer; - Page page; - - buffer = XLogReadBufferExtended(bkpb.node, bkpb.fork, bkpb.block, - get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK); - Assert(BufferIsValid(buffer)); - - page = (Page) BufferGetPage(buffer); - - if (bkpb.hole_length == 0) - { - memcpy((char *) page, blk, BLCKSZ); - } - else - { - memcpy((char *) page, blk, bkpb.hole_offset); - /* must zero-fill the hole */ - MemSet((char *) page + bkpb.hole_offset, 0, bkpb.hole_length); - memcpy((char *) page + (bkpb.hole_offset + bkpb.hole_length), - blk + bkpb.hole_offset, - BLCKSZ - (bkpb.hole_offset + bkpb.hole_length)); - } - - /* - * The checksum value on this page is currently invalid. We don't need to - * reset it here since it will be set before being written. - */ - - /* - * The page may be uninitialized. If so, we can't set the LSN because that - * would corrupt the page. 
- */ - if (!PageIsNew(page)) - { - PageSetLSN(page, lsn); - } - MarkBufferDirty(buffer); - - if (!keep_buffer) - UnlockReleaseBuffer(buffer); - - return buffer; -} - -/* * Struct actually returned by XLogFakeRelcacheEntry, though the declared * return type is Relation. */ |