aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c255
1 files changed, 244 insertions, 11 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 9056f0b4549..c0e328bf619 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2003, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.134 2004/02/10 01:55:24 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.135 2004/02/11 22:55:24 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -439,6 +439,7 @@ static bool InRedo = false;
static bool AdvanceXLInsertBuffer(void);
+static bool WasteXLInsertBuffer(void);
static void XLogWrite(XLogwrtRqst WriteRqst);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
@@ -724,19 +725,51 @@ begin:;
dtbuf_rdt[2 * i + 1].next = NULL;
}
- /* Insert record header */
+ /*
+ * Determine exactly where we will place the new XLOG record. If there
+ * isn't enough space on the current XLOG page for a record header,
+ * advance to the next page (leaving the unused space as zeroes).
+ * If there isn't enough space in the current XLOG segment for the whole
+ * record, advance to the next segment (inserting wasted-space records).
+ * This avoids needing a continuation record at the start of a segment
+ * file, which would conflict with placing a FILE_HEADER record there.
+ * We assume that no XLOG record can be larger than a segment file...
+ */
updrqst = false;
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
updrqst = AdvanceXLInsertBuffer();
- freespace = BLCKSZ - SizeOfXLogPHD;
+ freespace = INSERT_FREESPACE(Insert);
+ }
+
+ if (freespace < (uint32) (SizeOfXLogRecord + write_len))
+ {
+ /* Doesn't fit on this page, so check for overrunning the file */
+ uint32 avail;
+
+ /* First figure the space available in remaining pages of file */
+ avail = XLogSegSize - BLCKSZ -
+ (Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize);
+ avail /= BLCKSZ; /* convert to pages, then usable bytes */
+ avail *= (BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord);
+ avail += freespace; /* add in the current page too */
+ if (avail < (uint32) (SizeOfXLogRecord + write_len))
+ {
+ /* It overruns the file, so waste the rest of the file... */
+ do {
+ updrqst = WasteXLInsertBuffer();
+ } while ((Insert->currpage->xlp_pageaddr.xrecoff % XLogSegSize) != 0);
+ freespace = INSERT_FREESPACE(Insert);
+ }
}
curridx = Insert->curridx;
record = (XLogRecord *) Insert->currpos;
+ /* Insert record header */
+
record->xl_prev = Insert->PrevRecord;
if (no_tran)
{
@@ -829,6 +862,8 @@ begin:;
/* Use next buffer */
updrqst = AdvanceXLInsertBuffer();
curridx = Insert->curridx;
+ /* This assert checks we did not insert a file header record */
+ Assert(INSERT_FREESPACE(Insert) == BLCKSZ - SizeOfXLogPHD);
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
contrecord = (XLogContRecord *) Insert->currpos;
@@ -991,17 +1026,109 @@ AdvanceXLInsertBuffer(void)
*/
MemSet((char *) NewPage, 0, BLCKSZ);
- /* And fill the new page's header */
+ /*
+ * Fill the new page's header
+ */
NewPage->xlp_magic = XLOG_PAGE_MAGIC;
/* NewPage->xlp_info = 0; */ /* done by memset */
NewPage->xlp_sui = ThisStartUpID;
NewPage->xlp_pageaddr.xlogid = NewPageEndPtr.xlogid;
NewPage->xlp_pageaddr.xrecoff = NewPageEndPtr.xrecoff - BLCKSZ;
+ /*
+ * If first page of an XLOG segment file, add a FILE_HEADER record.
+ */
+ if ((NewPage->xlp_pageaddr.xrecoff % XLogSegSize) == 0)
+ {
+ XLogRecPtr RecPtr;
+ XLogRecord *record;
+ XLogFileHeaderData *fhdr;
+ crc64 crc;
+
+ record = (XLogRecord *) Insert->currpos;
+ record->xl_prev = Insert->PrevRecord;
+ record->xl_xact_prev.xlogid = 0;
+ record->xl_xact_prev.xrecoff = 0;
+ record->xl_xid = InvalidTransactionId;
+ record->xl_len = SizeOfXLogFHD;
+ record->xl_info = XLOG_FILE_HEADER;
+ record->xl_rmid = RM_XLOG_ID;
+ fhdr = (XLogFileHeaderData *) XLogRecGetData(record);
+ fhdr->xlfhd_sysid = ControlFile->system_identifier;
+ fhdr->xlfhd_xlogid = NewPage->xlp_pageaddr.xlogid;
+ fhdr->xlfhd_segno = NewPage->xlp_pageaddr.xrecoff / XLogSegSize;
+ fhdr->xlfhd_seg_size = XLogSegSize;
+
+ INIT_CRC64(crc);
+ COMP_CRC64(crc, fhdr, SizeOfXLogFHD);
+ COMP_CRC64(crc, (char *) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
+ FIN_CRC64(crc);
+ record->xl_crc = crc;
+
+ /* Compute record's XLOG location */
+ INSERT_RECPTR(RecPtr, Insert, nextidx);
+
+ /* Record begin of record in appropriate places */
+ Insert->PrevRecord = RecPtr;
+
+ Insert->currpos += SizeOfXLogRecord + SizeOfXLogFHD;
+ }
+
return update_needed;
}
/*
+ * WasteXLInsertBuffer
+ *
+ * Fill the remainder of the current XLOG page with an XLOG_WASTED_SPACE
+ * record, and advance to the next page. This has the same calling and
+ * result conditions as AdvanceXLInsertBuffer, except that
+ * AdvanceXLInsertBuffer expects the current page to be already filled.
+ *
+ * The filler record is built in place at Insert->currpos. Its xl_len is
+ * the page's remaining free space minus one record header, so that header
+ * plus data exactly consume the rest of the page. Insert->PrevRecord is
+ * updated to point at the filler record, so the next record inserted will
+ * link back to it through xl_prev.
+ *
+ * The return value is whatever AdvanceXLInsertBuffer() returns.
+ */
+static bool
+WasteXLInsertBuffer(void)
+{
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogRecord *record;
+ XLogRecPtr RecPtr;
+ uint32 freespace;
+ uint16 curridx;
+ crc64 rdata_crc;
+ /*
+  * Work out how many data bytes the filler record must claim: everything
+  * left on the page after the record header itself. The caller must have
+  * left room for at least a header (checked by the Assert).
+  *
+  * NOTE(review): if exactly SizeOfXLogRecord bytes are free, xl_len ends
+  * up as 0 -- confirm the record reader accepts a zero-length record.
+  */
+ freespace = INSERT_FREESPACE(Insert);
+ Assert(freespace >= SizeOfXLogRecord);
+ freespace -= SizeOfXLogRecord;
+ /*
+  * Capture the current buffer index and record start before the page is
+  * advanced; both are needed to compute the record's location below.
+  */
+ curridx = Insert->curridx;
+ record = (XLogRecord *) Insert->currpos;
+ /* Back-link to the previous record; no transaction owns this record */
+ record->xl_prev = Insert->PrevRecord;
+ record->xl_xact_prev.xlogid = 0;
+ record->xl_xact_prev.xrecoff = 0;
+ /* Identify the record as wasted space belonging to the XLOG rmgr */
+ record->xl_xid = InvalidTransactionId;
+ record->xl_len = freespace;
+ record->xl_info = XLOG_WASTED_SPACE;
+ record->xl_rmid = RM_XLOG_ID;
+ /*
+  * Checksum the data area first, then the header minus its leading
+  * sizeof(crc64) bytes (presumably the xl_crc field itself -- confirm
+  * against the XLogRecord layout).
+  *
+  * NOTE(review): nothing in this function writes the data bytes being
+  * checksummed; presumably they are still zero from page initialization
+  * -- confirm.
+  */
+ INIT_CRC64(rdata_crc);
+ COMP_CRC64(rdata_crc, XLogRecGetData(record), freespace);
+ COMP_CRC64(rdata_crc, (char *) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
+ FIN_CRC64(rdata_crc);
+ record->xl_crc = rdata_crc;
+
+ /* Compute record's XLOG location (uses the pre-advance buffer index) */
+ INSERT_RECPTR(RecPtr, Insert, curridx);
+
+ /*
+  * Remember where this record begins, so that the next record inserted
+  * picks it up as its xl_prev.
+  */
+ Insert->PrevRecord = RecPtr;
+
+ /*
+  * We needn't bother to advance Insert->currpos: the page is now fully
+  * consumed, and AdvanceXLInsertBuffer() presumably repositions currpos
+  * on the new page -- TODO confirm.
+  */
+ /* Move on to the next page and pass its result back to the caller */
+ return AdvanceXLInsertBuffer();
+}
+
+/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
* Must be called with WALWriteLock held.
@@ -2142,6 +2269,7 @@ WriteControlFile(void)
ControlFile->catalog_version_no = CATALOG_VERSION_NO;
ControlFile->blcksz = BLCKSZ;
ControlFile->relseg_size = RELSEG_SIZE;
+ ControlFile->xlog_seg_size = XLOG_SEG_SIZE;
ControlFile->nameDataLen = NAMEDATALEN;
ControlFile->funcMaxArgs = FUNC_MAX_ARGS;
@@ -2295,6 +2423,13 @@ ReadControlFile(void)
" but the server was compiled with RELSEG_SIZE %d.",
ControlFile->relseg_size, RELSEG_SIZE),
errhint("It looks like you need to recompile or initdb.")));
+ if (ControlFile->xlog_seg_size != XLOG_SEG_SIZE)
+ ereport(FATAL,
+ (errmsg("database files are incompatible with server"),
+ errdetail("The database cluster was initialized with XLOG_SEG_SIZE %d,"
+ " but the server was compiled with XLOG_SEG_SIZE %d.",
+ ControlFile->xlog_seg_size, XLOG_SEG_SIZE),
+ errhint("It looks like you need to recompile or initdb.")));
if (ControlFile->nameDataLen != NAMEDATALEN)
ereport(FATAL,
(errmsg("database files are incompatible with server"),
@@ -2484,15 +2619,36 @@ BootStrapXLOG(void)
char *buffer;
XLogPageHeader page;
XLogRecord *record;
+ XLogFileHeaderData *fhdr;
bool use_existent;
+ uint64 sysidentifier;
+ struct timeval tv;
crc64 crc;
+ /*
+ * Select a hopefully-unique system identifier code for this installation.
+ * We use the result of gettimeofday(), including the fractional seconds
+ * field, as being about as unique as we can easily get. (Think not to
+ * use random(), since it hasn't been seeded and there's no portable way
+ * to seed it other than the system clock value...) The upper half of the
+ * uint64 value is just the tv_sec part, while the lower half is intended to
+ * be the XOR of tv_sec and tv_usec (NOTE: the code below currently combines
+ * them with bitwise OR, not XOR). This is to ensure that we don't lose uniqueness
+ * unnecessarily if "uint64" is really only 32 bits wide. A person
+ * knowing this encoding can determine the initialization time of the
+ * installation, which could perhaps be useful sometimes.
+ */
+ gettimeofday(&tv, NULL);
+ sysidentifier = ((uint64) tv.tv_sec) << 32;
+ sysidentifier |= (uint32) (tv.tv_sec | tv.tv_usec);
+
/* Use malloc() to ensure buffer is MAXALIGNED */
buffer = (char *) malloc(BLCKSZ);
page = (XLogPageHeader) buffer;
+ memset(buffer, 0, BLCKSZ);
+ /* Set up information for the initial checkpoint record */
checkPoint.redo.xlogid = 0;
- checkPoint.redo.xrecoff = SizeOfXLogPHD;
+ checkPoint.redo.xrecoff = SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD;
checkPoint.undo = checkPoint.redo;
checkPoint.ThisStartUpID = 0;
checkPoint.nextXid = FirstNormalTransactionId;
@@ -2503,16 +2659,42 @@ BootStrapXLOG(void)
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
- memset(buffer, 0, BLCKSZ);
+ /* Set up the XLOG page header */
page->xlp_magic = XLOG_PAGE_MAGIC;
page->xlp_info = 0;
page->xlp_sui = checkPoint.ThisStartUpID;
page->xlp_pageaddr.xlogid = 0;
page->xlp_pageaddr.xrecoff = 0;
+
+ /* Insert the file header record */
record = (XLogRecord *) ((char *) page + SizeOfXLogPHD);
record->xl_prev.xlogid = 0;
record->xl_prev.xrecoff = 0;
- record->xl_xact_prev = record->xl_prev;
+ record->xl_xact_prev.xlogid = 0;
+ record->xl_xact_prev.xrecoff = 0;
+ record->xl_xid = InvalidTransactionId;
+ record->xl_len = SizeOfXLogFHD;
+ record->xl_info = XLOG_FILE_HEADER;
+ record->xl_rmid = RM_XLOG_ID;
+ fhdr = (XLogFileHeaderData *) XLogRecGetData(record);
+ fhdr->xlfhd_sysid = sysidentifier;
+ fhdr->xlfhd_xlogid = 0;
+ fhdr->xlfhd_segno = 0;
+ fhdr->xlfhd_seg_size = XLogSegSize;
+
+ INIT_CRC64(crc);
+ COMP_CRC64(crc, fhdr, SizeOfXLogFHD);
+ COMP_CRC64(crc, (char *) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
+ FIN_CRC64(crc);
+ record->xl_crc = crc;
+
+ /* Insert the initial checkpoint record */
+ record = (XLogRecord *) ((char *) page + SizeOfXLogPHD + SizeOfXLogRecord + SizeOfXLogFHD);
+ record->xl_prev.xlogid = 0;
+ record->xl_prev.xrecoff = SizeOfXLogPHD;
+ record->xl_xact_prev.xlogid = 0;
+ record->xl_xact_prev.xrecoff = 0;
record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(checkPoint);
record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
@@ -2526,9 +2708,11 @@ BootStrapXLOG(void)
FIN_CRC64(crc);
record->xl_crc = crc;
+ /* Create first XLOG segment file */
use_existent = false;
openLogFile = XLogFileInit(0, 0, &use_existent, false);
+ /* Write the first page with the initial records */
errno = 0;
if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
{
@@ -2552,8 +2736,11 @@ BootStrapXLOG(void)
openLogFile = -1;
+ /* Now create pg_control */
+
memset(ControlFile, 0, sizeof(ControlFileData));
/* Initialize pg_control status fields */
+ ControlFile->system_identifier = sysidentifier;
ControlFile->state = DB_SHUTDOWNED;
ControlFile->time = checkPoint.time;
ControlFile->logId = 0;
@@ -2638,11 +2825,9 @@ StartupXLOG(void)
/* This is just to allow attaching to startup process with a debugger */
#ifdef XLOG_REPLAY_DELAY
-#ifdef WAL_DEBUG
- if (XLOG_DEBUG && ControlFile->state != DB_SHUTDOWNED)
+ if (ControlFile->state != DB_SHUTDOWNED)
sleep(60);
#endif
-#endif
/*
* Get the last valid checkpoint record. If the latest one according
@@ -3241,7 +3426,7 @@ CreateCheckPoint(bool shutdown, bool force)
{
(void) AdvanceXLInsertBuffer();
/* OK to ignore update return flag, since we will do flush anyway */
- freespace = BLCKSZ - SizeOfXLogPHD;
+ freespace = INSERT_FREESPACE(Insert);
}
INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
@@ -3468,6 +3653,38 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
/* Any later WAL records should be run with the then-active SUI */
ThisStartUpID = checkPoint.ThisStartUpID;
}
+ else if (info == XLOG_FILE_HEADER)
+ {
+ XLogFileHeaderData fhdr;
+
+ memcpy(&fhdr, XLogRecGetData(record), sizeof(XLogFileHeaderData));
+ if (fhdr.xlfhd_sysid != ControlFile->system_identifier)
+ {
+ char fhdrident_str[32];
+ char sysident_str[32];
+
+ /*
+ * Format sysids separately to keep platform-dependent format
+ * code out of the translatable message string.
+ */
+ snprintf(fhdrident_str, sizeof(fhdrident_str), UINT64_FORMAT,
+ fhdr.xlfhd_sysid);
+ snprintf(sysident_str, sizeof(sysident_str), UINT64_FORMAT,
+ ControlFile->system_identifier);
+ ereport(PANIC,
+ (errmsg("WAL file is from different system"),
+ errdetail("WAL file SYSID is %s, pg_control SYSID is %s",
+ fhdrident_str, sysident_str)));
+ }
+ if (fhdr.xlfhd_seg_size != XLogSegSize)
+ ereport(PANIC,
+ (errmsg("WAL file is from different system"),
+ errdetail("Incorrect XLOG_SEG_SIZE in file header.")));
+ }
+ else if (info == XLOG_WASTED_SPACE)
+ {
+ /* ignore */
+ }
}
void
@@ -3500,6 +3717,22 @@ xlog_desc(char *buf, uint8 xl_info, char *rec)
memcpy(&nextOid, rec, sizeof(Oid));
sprintf(buf + strlen(buf), "nextOid: %u", nextOid);
}
+ else if (info == XLOG_FILE_HEADER)
+ {
+ XLogFileHeaderData *fhdr = (XLogFileHeaderData *) rec;
+
+ sprintf(buf + strlen(buf),
+ "file header: sysid " UINT64_FORMAT "; "
+ "xlogid %X segno %X; seg_size %X",
+ fhdr->xlfhd_sysid,
+ fhdr->xlfhd_xlogid,
+ fhdr->xlfhd_segno,
+ fhdr->xlfhd_seg_size);
+ }
+ else if (info == XLOG_WASTED_SPACE)
+ {
+ strcat(buf, "wasted space");
+ }
else
strcat(buf, "UNKNOWN");
}