diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 255 |
1 files changed, 244 insertions, 11 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9056f0b4549..c0e328bf619 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.134 2004/02/10 01:55:24 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.135 2004/02/11 22:55:24 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -439,6 +439,7 @@ static bool InRedo = false; static bool AdvanceXLInsertBuffer(void); +static bool WasteXLInsertBuffer(void); static void XLogWrite(XLogwrtRqst WriteRqst); static int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock); @@ -724,19 +725,51 @@ begin:; dtbuf_rdt[2 * i + 1].next = NULL; } - /* Insert record header */ + /* + * Determine exactly where we will place the new XLOG record. If there + * isn't enough space on the current XLOG page for a record header, + * advance to the next page (leaving the unused space as zeroes). + * If there isn't enough space in the current XLOG segment for the whole + * record, advance to the next segment (inserting wasted-space records). + * This avoids needing a continuation record at the start of a segment + * file, which would conflict with placing a FILE_HEADER record there. + * We assume that no XLOG record can be larger than a segment file... 
+ */ updrqst = false; freespace = INSERT_FREESPACE(Insert); if (freespace < SizeOfXLogRecord) { updrqst = AdvanceXLInsertBuffer(); - freespace = BLCKSZ - SizeOfXLogPHD; + freespace = INSERT_FREESPACE(Insert); + } + + if (freespace < (uint32) (SizeOfXLogRecord + write_len)) + { + /* Doesn't fit on this page, so check for overrunning the file */ + uint32 avail; + + /* First figure the space available in remaining pages of file */ + avail = XLogSegSize - BLCKSZ - + (Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize); + avail /= BLCKSZ; /* convert to pages, then usable bytes */ + avail *= (BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord); + avail += freespace; /* add in the current page too */ + if (avail < (uint32) (SizeOfXLogRecord + write_len)) + { + /* It overruns the file, so waste the rest of the file... */ + do { + updrqst = WasteXLInsertBuffer(); + } while ((Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize) != 0); + freespace = INSERT_FREESPACE(Insert); + } } curridx = Insert->curridx; record = (XLogRecord *) Insert->currpos; + /* Insert record header */ + record->xl_prev = Insert->PrevRecord; if (no_tran) { @@ -829,6 +862,8 @@ begin:; /* Use next buffer */ updrqst = AdvanceXLInsertBuffer(); curridx = Insert->curridx; + /* This assert checks we did not insert a file header record */ + Assert(INSERT_FREESPACE(Insert) == BLCKSZ - SizeOfXLogPHD); /* Insert cont-record header */ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD; contrecord = (XLogContRecord *) Insert->currpos; @@ -991,17 +1026,109 @@ AdvanceXLInsertBuffer(void) */ MemSet((char *) NewPage, 0, BLCKSZ); - /* And fill the new page's header */ + /* + * Fill the new page's header + */ NewPage->xlp_magic = XLOG_PAGE_MAGIC; /* NewPage->xlp_info = 0; */ /* done by memset */ NewPage->xlp_sui = ThisStartUpID; NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid; NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ; + /* + * If first page of an XLOG segment file, add a FILE_HEADER record. 
+ */ + if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0) + { + XLogRecPtr RecPtr; + XLogRecord *record; + XLogFileHeaderData *fhdr; + crc64 crc; + + record = (XLogRecord *) Insert->currpos; + record->xl_prev = Insert->PrevRecord; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + record->xl_xid = InvalidTransactionId; + record->xl_len = SizeOfXLogFHD; + record->xl_info = XLOG_FILE_HEADER; + record->xl_rmid = RM_XLOG_ID; + fhdr = (XLogFileHeaderData *) XLogRecGetData(record); + fhdr->xlfhd_sysid = ControlFile->system_identifier; + fhdr->xlfhd_xlogid = NewPage->xlp_pageaddr.xlogid; + fhdr->xlfhd_segno = NewPage->xlp_pageaddr.xrecoff / XLogSegSize; + fhdr->xlfhd_seg_size = XLogSegSize; + + INIT_CRC64(crc); + COMP_CRC64(crc, fhdr, SizeOfXLogFHD); + COMP_CRC64(crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(crc); + record->xl_crc = crc; + + /* Compute record's XLOG location */ + INSERT_RECPTR(RecPtr, Insert, nextidx); + + /* Record begin of record in appropriate places */ + Insert->PrevRecord = RecPtr; + + Insert->currpos += SizeOfXLogRecord + SizeOfXLogFHD; + } + return update_needed; } /* + * Fill the remainder of the current XLOG page with an XLOG_WASTED_SPACE + * record, and advance to the next page. This has the same calling and + * result conditions as AdvanceXLInsertBuffer, except that + * AdvanceXLInsertBuffer expects the current page to be already filled. 
+ */ +static bool +WasteXLInsertBuffer(void) +{ + XLogCtlInsert *Insert = &XLogCtl->Insert; + XLogRecord *record; + XLogRecPtr RecPtr; + uint32 freespace; + uint16 curridx; + crc64 rdata_crc; + + freespace = INSERT_FREESPACE(Insert); + Assert(freespace >= SizeOfXLogRecord); + freespace -= SizeOfXLogRecord; + + curridx = Insert->curridx; + record = (XLogRecord *) Insert->currpos; + + record->xl_prev = Insert->PrevRecord; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + + record->xl_xid = InvalidTransactionId; + record->xl_len = freespace; + record->xl_info = XLOG_WASTED_SPACE; + record->xl_rmid = RM_XLOG_ID; + + INIT_CRC64(rdata_crc); + COMP_CRC64(rdata_crc, XLogRecGetData(record), freespace); + COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(rdata_crc); + record->xl_crc = rdata_crc; + + /* Compute record's XLOG location */ + INSERT_RECPTR(RecPtr, Insert, curridx); + + /* Record begin of record in appropriate places */ + Insert->PrevRecord = RecPtr; + + /* We needn't bother to advance Insert->currpos */ + + return AdvanceXLInsertBuffer(); +} + +/* * Write and/or fsync the log at least as far as WriteRqst indicates. * * Must be called with WALWriteLock held. 
@@ -2142,6 +2269,7 @@ WriteControlFile(void) ControlFile->catalog_version_no = CATALOG_VERSION_NO; ControlFile->blcksz = BLCKSZ; ControlFile->relseg_size = RELSEG_SIZE; + ControlFile->xlog_seg_size = XLOG_SEG_SIZE; ControlFile->nameDataLen = NAMEDATALEN; ControlFile->funcMaxArgs = FUNC_MAX_ARGS; @@ -2295,6 +2423,13 @@ ReadControlFile(void) " but the server was compiled with RELSEG_SIZE %d.", ControlFile->relseg_size, RELSEG_SIZE), errhint("It looks like you need to recompile or initdb."))); + if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE) + ereport(FATAL, + (errmsg("database files are incompatible with server"), + errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d," + " but the server was compiled with XLOG_SEG_SIZE %d.", + ControlFile->xlog_seg_size, XLOG_SEG_SIZE), + errhint("It looks like you need to recompile or initdb."))); if (ControlFile->nameDataLen != NAMEDATALEN) ereport(FATAL, (errmsg("database files are incompatible with server"), @@ -2484,15 +2619,36 @@ BootStrapXLOG(void) char *buffer; XLogPageHeader page; XLogRecord *record; + XLogFileHeaderData *fhdr; bool use_existent; + uint64 sysidentifier; + struct timeval tv; crc64 crc; + /* + * Select a hopefully-unique system identifier code for this installation. + * We use the result of gettimeofday(), including the fractional seconds + * field, as being about as unique as we can easily get. (Think not to + * use random(), since it hasn't been seeded and there's no portable way + * to seed it other than the system clock value...) The upper half of the + * uint64 value is just the tv_sec part, while the lower half is the bitwise OR + * of tv_sec and tv_usec. This is to ensure that we don't lose uniqueness + * unnecessarily if "uint64" is really only 32 bits wide. A person + * knowing this encoding can determine the initialization time of the + * installation, which could perhaps be useful sometimes. 
+ */ + gettimeofday(&tv, NULL); + sysidentifier = ((uint64) tv.tv_sec) << 32; + sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec); + /* Use malloc() to ensure buffer is MAXALIGNED */ buffer = (char *) malloc(BLCKSZ); page = (XLogPageHeader) buffer; + memset(buffer, 0, BLCKSZ); + /* Set up information for the initial checkpoint record */ checkPoint.redo.xlogid = 0; - checkPoint.redo.xrecoff = SizeOfXLogPHD; + checkPoint.redo.xrecoff = SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD; checkPoint.undo = checkPoint.redo; checkPoint.ThisStartUpID = 0; checkPoint.nextXid = FirstNormalTransactionId; @@ -2503,16 +2659,42 @@ BootStrapXLOG(void) ShmemVariableCache->nextOid = checkPoint.nextOid; ShmemVariableCache->oidCount = 0; - memset(buffer, 0, BLCKSZ); + /* Set up the XLOG page header */ page->xlp_magic = XLOG_PAGE_MAGIC; page->xlp_info = 0; page->xlp_sui = checkPoint.ThisStartUpID; page->xlp_pageaddr.xlogid = 0; page->xlp_pageaddr.xrecoff = 0; + + /* Insert the file header record */ record = (XLogRecord *) ((char *) page + SizeOfXLogPHD); record->xl_prev.xlogid = 0; record->xl_prev.xrecoff = 0; - record->xl_xact_prev = record->xl_prev; + record->xl_xact_prev.xlogid = 0; + record->xl_xact_prev.xrecoff = 0; + record->xl_xid = InvalidTransactionId; + record->xl_len = SizeOfXLogFHD; + record->xl_info = XLOG_FILE_HEADER; + record->xl_rmid = RM_XLOG_ID; + fhdr = (XLogFileHeaderData *) XLogRecGetData(record); + fhdr->xlfhd_sysid = sysidentifier; + fhdr->xlfhd_xlogid = 0; + fhdr->xlfhd_segno = 0; + fhdr->xlfhd_seg_size = XLogSegSize; + + INIT_CRC64(crc); + COMP_CRC64(crc, fhdr, SizeOfXLogFHD); + COMP_CRC64(crc, (char *) record + sizeof(crc64), + SizeOfXLogRecord - sizeof(crc64)); + FIN_CRC64(crc); + record->xl_crc = crc; + + /* Insert the initial checkpoint record */ + record = (XLogRecord *) ((char *) page + SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD); + record->xl_prev.xlogid = 0; + record->xl_prev.xrecoff = SizeOfXLogPHD; + record->xl_xact_prev.xlogid = 0; + 
record->xl_xact_prev.xrecoff = 0; record->xl_xid = InvalidTransactionId; record->xl_len = sizeof(checkPoint); record->xl_info = XLOG_CHECKPOINT_SHUTDOWN; @@ -2526,9 +2708,11 @@ BootStrapXLOG(void) FIN_CRC64(crc); record->xl_crc = crc; + /* Create first XLOG segment file */ use_existent = false; openLogFile = XLogFileInit(0, 0, &use_existent, false); + /* Write the first page with the initial records */ errno = 0; if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ) { @@ -2552,8 +2736,11 @@ BootStrapXLOG(void) openLogFile = -1; + /* Now create pg_control */ + memset(ControlFile, 0, sizeof(ControlFileData)); /* Initialize pg_control status fields */ + ControlFile->system_identifier = sysidentifier; ControlFile->state = DB_SHUTDOWNED; ControlFile->time = checkPoint.time; ControlFile->logId = 0; @@ -2638,11 +2825,9 @@ StartupXLOG(void) /* This is just to allow attaching to startup process with a debugger */ #ifdef XLOG_REPLAY_DELAY -#ifdef WAL_DEBUG - if (XLOG_DEBUG && ControlFile->state != DB_SHUTDOWNED) + if (ControlFile->state != DB_SHUTDOWNED) sleep(60); #endif -#endif /* * Get the last valid checkpoint record. 
If the latest one according @@ -3241,7 +3426,7 @@ CreateCheckPoint(bool shutdown, bool force) { (void) AdvanceXLInsertBuffer(); /* OK to ignore update return flag, since we will do flush anyway */ - freespace = BLCKSZ - SizeOfXLogPHD; + freespace = INSERT_FREESPACE(Insert); } INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx); @@ -3468,6 +3653,38 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record) /* Any later WAL records should be run with the then-active SUI */ ThisStartUpID = checkPoint.ThisStartUpID; } + else if (info == XLOG_FILE_HEADER) + { + XLogFileHeaderData fhdr; + + memcpy(&fhdr, XLogRecGetData(record), sizeof(XLogFileHeaderData)); + if (fhdr.xlfhd_sysid != ControlFile->system_identifier) + { + char fhdrident_str[32]; + char sysident_str[32]; + + /* + * Format sysids separately to keep platform-dependent format + * code out of the translatable message string. + */ + snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT, + fhdr.xlfhd_sysid); + snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT, + ControlFile->system_identifier); + ereport(PANIC, + (errmsg("WAL file is from different system"), + errdetail("WAL file SYSID is %s, pg_control SYSID is %s", + fhdrident_str, sysident_str))); + } + if (fhdr.xlfhd_seg_size != XLogSegSize) + ereport(PANIC, + (errmsg("WAL file is from different system"), + errdetail("Incorrect XLOG_SEG_SIZE in file header."))); + } + else if (info == XLOG_WASTED_SPACE) + { + /* ignore */ + } } void @@ -3500,6 +3717,22 @@ xlog_desc(char *buf, uint8 xl_info, char *rec) memcpy(&nextOid, rec, sizeof(Oid)); sprintf(buf + strlen(buf), "nextOid: %u", nextOid); } + else if (info == XLOG_FILE_HEADER) + { + XLogFileHeaderData *fhdr = (XLogFileHeaderData *) rec; + + sprintf(buf + strlen(buf), + "file header: sysid " UINT64_FORMAT "; " + "xlogid %X segno %X; seg_size %X", + fhdr->xlfhd_sysid, + fhdr->xlfhd_xlogid, + fhdr->xlfhd_segno, + fhdr->xlfhd_seg_size); + } + else if (info == XLOG_WASTED_SPACE) + { + strcat(buf, "wasted 
space"); + } else strcat(buf, "UNKNOWN"); } |