aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/src/sgml/ref/checkpoint.sgml13
-rw-r--r--doc/src/sgml/runtime.sgml32
-rw-r--r--doc/src/sgml/wal.sgml15
-rw-r--r--src/backend/access/transam/transam.c3
-rw-r--r--src/backend/access/transam/varsup.c38
-rw-r--r--src/backend/access/transam/xact.c24
-rw-r--r--src/backend/access/transam/xlog.c2573
-rw-r--r--src/backend/access/transam/xlogutils.c4
-rw-r--r--src/backend/bootstrap/bootstrap.c72
-rw-r--r--src/backend/port/beos/shm.c23
-rw-r--r--src/backend/port/qnx4/shm.c25
-rw-r--r--src/backend/postmaster/postmaster.c159
-rw-r--r--src/backend/storage/ipc/ipc.c40
-rw-r--r--src/backend/tcop/postgres.c41
-rw-r--r--src/backend/utils/hash/Makefile4
-rw-r--r--src/backend/utils/hash/pg_crc.c417
-rw-r--r--src/backend/utils/init/globals.c52
-rw-r--r--src/backend/utils/init/miscinit.c180
-rw-r--r--src/backend/utils/misc/guc.c10
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample1
-rw-r--r--src/include/access/transam.h18
-rw-r--r--src/include/access/xlog.h171
-rw-r--r--src/include/access/xlogdefs.h41
-rw-r--r--src/include/access/xlogutils.h10
-rw-r--r--src/include/catalog/pg_control.h115
-rw-r--r--src/include/miscadmin.h4
-rw-r--r--src/include/storage/ipc.h4
-rw-r--r--src/include/tcop/tcopprot.h3
-rw-r--r--src/include/utils/pg_crc.h115
29 files changed, 2818 insertions, 1389 deletions
diff --git a/doc/src/sgml/ref/checkpoint.sgml b/doc/src/sgml/ref/checkpoint.sgml
index 021035888b4..5336c3f5529 100644
--- a/doc/src/sgml/ref/checkpoint.sgml
+++ b/doc/src/sgml/ref/checkpoint.sgml
@@ -1,4 +1,4 @@
-<!-- $Header: /cvsroot/pgsql/doc/src/sgml/ref/checkpoint.sgml,v 1.3 2001/01/27 10:19:52 petere Exp $ -->
+<!-- $Header: /cvsroot/pgsql/doc/src/sgml/ref/checkpoint.sgml,v 1.4 2001/03/13 01:17:05 tgl Exp $ -->
<refentry id="sql-checkpoint">
<docinfo>
@@ -26,11 +26,12 @@ CHECKPOINT
<para>
Write-Ahead Logging (WAL) puts a checkpoint in the transaction log
- every 300 seconds by default. (This may be changed by the run-time
- configuration option <parameter>CHECKPOINT_TIMEOUT</parameter>.)
- The <command>CHECKPOINT</command> command forces a checkpoint at
- the point at which the command is issued. The next automatic
- checkpoint will still happen after the original cycle expires.
+ every so often. (To adjust the automatic checkpoint interval, see
+ the run-time
+ configuration options <parameter>CHECKPOINT_SEGMENTS</parameter>
+ and <parameter>CHECKPOINT_TIMEOUT</parameter>.)
+ The <command>CHECKPOINT</command> command forces an immediate checkpoint
+ when the command is issued, without waiting for a scheduled checkpoint.
</para>
<para>
diff --git a/doc/src/sgml/runtime.sgml b/doc/src/sgml/runtime.sgml
index b23dcbf5a60..f321cea669b 100644
--- a/doc/src/sgml/runtime.sgml
+++ b/doc/src/sgml/runtime.sgml
@@ -1,5 +1,5 @@
<!--
-$Header: /cvsroot/pgsql/doc/src/sgml/runtime.sgml,v 1.55 2001/02/18 05:30:12 tgl Exp $
+$Header: /cvsroot/pgsql/doc/src/sgml/runtime.sgml,v 1.56 2001/03/13 01:17:05 tgl Exp $
-->
<Chapter Id="runtime">
@@ -976,6 +976,11 @@ env PGOPTIONS='-c geqo=off' psql
fsyncs because of performance problems, you may wish to reconsider
your choice.
</para>
+
+ <para>
+ This option can only be set at server start or in the
+ <filename>postgresql.conf</filename> file.
+ </para>
</listitem>
</varlistentry>
@@ -1193,10 +1198,24 @@ env PGOPTIONS='-c geqo=off' psql
<variablelist>
<varlistentry>
+ <term>CHECKPOINT_SEGMENTS (<type>integer</type>)</term>
+ <listitem>
+ <para>
+ Maximum distance between automatic WAL checkpoints, in logfile
+ segments (each segment is normally 16 megabytes).
+ This option can only be set at server start or in the
+ <filename>postgresql.conf</filename> file.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
<term>CHECKPOINT_TIMEOUT (<type>integer</type>)</term>
<listitem>
<para>
- Frequency of automatic WAL checkpoints, in seconds.
+ Maximum time between automatic WAL checkpoints, in seconds.
+ This option can only be set at server start or in the
+ <filename>postgresql.conf</filename> file.
</para>
</listitem>
</varlistentry>
@@ -1205,8 +1224,8 @@ env PGOPTIONS='-c geqo=off' psql
<term>WAL_BUFFERS (<type>integer</type>)</term>
<listitem>
<para>
- Number of buffers for WAL. This option can only be set at
- server start.
+ Number of disk-page buffers for WAL log. This option can only be set
+ at server start.
</para>
</listitem>
</varlistentry>
@@ -1226,7 +1245,8 @@ env PGOPTIONS='-c geqo=off' psql
<listitem>
<para>
Number of log files that are created in advance at checkpoint
- time. This option can only be set at server start.
+ time. This option can only be set at server start or in the
+ <filename>postgresql.conf</filename> file.
</para>
</listitem>
</varlistentry>
@@ -1909,7 +1929,7 @@ default:\
<listitem>
<para>
This is the <firstterm>Immediate Shutdown</firstterm> which
- will cause the postmaster to send a SIGUSR1 to all backends and
+ will cause the postmaster to send a SIGQUIT to all backends and
exit immediately (without properly shutting down the database
system). When WAL is implemented, this will lead to recovery on
start-up. Right now it's not recommendable to use this option.
diff --git a/doc/src/sgml/wal.sgml b/doc/src/sgml/wal.sgml
index e706ee271c6..c92ccd9d230 100644
--- a/doc/src/sgml/wal.sgml
+++ b/doc/src/sgml/wal.sgml
@@ -1,4 +1,4 @@
-<!-- $Header: /cvsroot/pgsql/doc/src/sgml/wal.sgml,v 1.3 2001/02/26 00:50:07 tgl Exp $ -->
+<!-- $Header: /cvsroot/pgsql/doc/src/sgml/wal.sgml,v 1.4 2001/03/13 01:17:05 tgl Exp $ -->
<chapter id="wal">
<title>Write-Ahead Logging (<acronym>WAL</acronym>)</title>
@@ -257,7 +257,7 @@
The <acronym>WAL</acronym> log is held on the disk as a set of 16
MB files called <firstterm>segments</firstterm>. By default a new
segment is created only if more than 75% of the current segment is
- used. One can instruct the server to create up to 64 log segments
+ used. One can instruct the server to pre-create up to 64 log segments
at checkpoint time by modifying the <varname>WAL_FILES</varname>
configuration parameter.
</para>
@@ -272,11 +272,12 @@
</para>
<para>
- By default, the postmaster spawns a special backend process to
- create the next checkpoint 300 seconds after the previous
- checkpoint's creation. One can change this interval by modifying
- the <varname>CHECKPOINT_TIMEOUT</varname> parameter. It is also
- possible to force a checkpoint by using the SQL command
+ The postmaster spawns a special backend process every so often
+ to create the next checkpoint. A checkpoint is created every
+ <varname>CHECKPOINT_SEGMENTS</varname> log segments, or every
+ <varname>CHECKPOINT_TIMEOUT</varname> seconds, whichever comes first.
+ The default settings are 3 segments and 300 seconds respectively.
+ It is also possible to force a checkpoint by using the SQL command
<command>CHECKPOINT</command>.
</para>
diff --git a/src/backend/access/transam/transam.c b/src/backend/access/transam/transam.c
index dcac8ac2e04..ec429c194b2 100644
--- a/src/backend/access/transam/transam.c
+++ b/src/backend/access/transam/transam.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.39 2001/01/24 19:42:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/transam.c,v 1.40 2001/03/13 01:17:05 tgl Exp $
*
* NOTES
* This file contains the high level access-method interface to the
@@ -430,6 +430,7 @@ InitializeTransactionLog(void)
Assert(!IsUnderPostmaster &&
ShmemVariableCache->nextXid <= FirstTransactionId);
ShmemVariableCache->nextXid = FirstTransactionId;
+ ShmemVariableCache->xidCount = 0; /* force an XLOG rec right away */
}
else if (RecoveryCheckingEnabled())
{
diff --git a/src/backend/access/transam/varsup.c b/src/backend/access/transam/varsup.c
index 885e8e236c1..e4271f5fa86 100644
--- a/src/backend/access/transam/varsup.c
+++ b/src/backend/access/transam/varsup.c
@@ -6,7 +6,7 @@
* Copyright (c) 2000, PostgreSQL Global Development Group
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.35 2001/01/24 19:42:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/varsup.c,v 1.36 2001/03/13 01:17:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -14,12 +14,17 @@
#include "postgres.h"
#include "access/transam.h"
+#include "access/xlog.h"
#include "storage/proc.h"
-SPINLOCK OidGenLockId;
-extern SPINLOCK XidGenLockId;
-extern void XLogPutNextOid(Oid nextOid);
+/* Number of XIDs and OIDs to prefetch (preallocate) per XLOG write */
+#define VAR_XID_PREFETCH 1024
+#define VAR_OID_PREFETCH 8192
+
+/* Spinlocks for serializing generation of XIDs and OIDs, respectively */
+SPINLOCK XidGenLockId;
+SPINLOCK OidGenLockId;
/* pointer to "variable cache" in shared memory (set up by shmem.c) */
VariableCache ShmemVariableCache = NULL;
@@ -38,23 +43,31 @@ GetNewTransactionId(TransactionId *xid)
}
SpinAcquire(XidGenLockId);
+
+ /* If we run out of logged for use xids then we must log more */
+ if (ShmemVariableCache->xidCount == 0)
+ {
+ XLogPutNextXid(ShmemVariableCache->nextXid + VAR_XID_PREFETCH);
+ ShmemVariableCache->xidCount = VAR_XID_PREFETCH;
+ }
+
*xid = ShmemVariableCache->nextXid;
- (ShmemVariableCache->nextXid)++;
- if (MyProc != (PROC *) NULL)
- MyProc->xid = *xid;
+ (ShmemVariableCache->nextXid)++;
+ (ShmemVariableCache->xidCount)--;
SpinRelease(XidGenLockId);
+ if (MyProc != (PROC *) NULL)
+ MyProc->xid = *xid;
}
/*
- * Like GetNewTransactionId reads nextXid but don't fetch it.
+ * Read nextXid but don't allocate it.
*/
void
ReadNewTransactionId(TransactionId *xid)
{
-
/*
* During bootstrap initialization, we return the special
* bootstrap transaction id.
@@ -68,7 +81,6 @@ ReadNewTransactionId(TransactionId *xid)
SpinAcquire(XidGenLockId);
*xid = ShmemVariableCache->nextXid;
SpinRelease(XidGenLockId);
-
}
/* ----------------------------------------------------------------
@@ -76,7 +88,6 @@ ReadNewTransactionId(TransactionId *xid)
* ----------------------------------------------------------------
*/
-#define VAR_OID_PREFETCH 8192
static Oid lastSeenOid = InvalidOid;
void
@@ -84,7 +95,7 @@ GetNewObjectId(Oid *oid_return)
{
SpinAcquire(OidGenLockId);
- /* If we run out of logged for use oids then we log more */
+ /* If we run out of logged for use oids then we must log more */
if (ShmemVariableCache->oidCount == 0)
{
XLogPutNextOid(ShmemVariableCache->nextOid + VAR_OID_PREFETCH);
@@ -103,11 +114,11 @@ GetNewObjectId(Oid *oid_return)
void
CheckMaxObjectId(Oid assigned_oid)
{
-
if (lastSeenOid != InvalidOid && assigned_oid < lastSeenOid)
return;
SpinAcquire(OidGenLockId);
+
if (assigned_oid < ShmemVariableCache->nextOid)
{
lastSeenOid = ShmemVariableCache->nextOid - 1;
@@ -138,5 +149,4 @@ CheckMaxObjectId(Oid assigned_oid)
ShmemVariableCache->nextOid = assigned_oid + 1;
SpinRelease(OidGenLockId);
-
}
diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c
index 0af25826589..1331c8e9834 100644
--- a/src/backend/access/transam/xact.c
+++ b/src/backend/access/transam/xact.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.98 2001/02/26 00:50:07 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xact.c,v 1.99 2001/03/13 01:17:05 tgl Exp $
*
* NOTES
* Transaction aborts can now occur two ways:
@@ -706,14 +706,18 @@ RecordTransactionCommit()
}
XLogFlush(recptr);
+
+ /* Break the chain of back-links in the XLOG records I output */
MyLastRecPtr.xrecoff = 0;
TransactionIdCommit(xid);
- MyProc->logRec.xrecoff = 0;
END_CRIT_SECTION();
}
+ /* Show myself as out of the transaction in PROC array */
+ MyProc->logRec.xrecoff = 0;
+
if (leak)
ResetBufferPool(true);
}
@@ -802,6 +806,10 @@ RecordTransactionAbort(void)
{
TransactionId xid = GetCurrentTransactionId();
+ /*
+ * Double check here is to catch case that we aborted partway through
+ * RecordTransactionCommit ...
+ */
if (MyLastRecPtr.xrecoff != 0 && !TransactionIdDidCommit(xid))
{
XLogRecData rdata;
@@ -815,13 +823,19 @@ RecordTransactionAbort(void)
rdata.next = NULL;
START_CRIT_SECTION();
+
recptr = XLogInsert(RM_XACT_ID, XLOG_XACT_ABORT, &rdata);
TransactionIdAbort(xid);
- MyProc->logRec.xrecoff = 0;
+
END_CRIT_SECTION();
}
+ /* Break the chain of back-links in the XLOG records I output */
+ MyLastRecPtr.xrecoff = 0;
+ /* Show myself as out of the transaction in PROC array */
+ MyProc->logRec.xrecoff = 0;
+
/*
* Tell bufmgr and smgr to release resources.
*/
@@ -1187,10 +1201,6 @@ AbortTransaction(void)
AtEOXact_CatCache(false);
AtAbort_Memory();
AtEOXact_Files();
-
- /* Here we'll rollback xaction changes */
- MyLastRecPtr.xrecoff = 0;
-
AtAbort_Locks();
SharedBufferChanged = false; /* safest place to do it */
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index fc861aba123..3d6b8255e96 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -6,7 +6,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.55 2001/02/26 00:50:07 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.56 2001/03/13 01:17:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -14,6 +14,7 @@
#include "postgres.h"
#include <fcntl.h>
+#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <sys/stat.h>
@@ -27,6 +28,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "catalog/catversion.h"
+#include "catalog/pg_control.h"
#include "storage/sinval.h"
#include "storage/proc.h"
#include "storage/spin.h"
@@ -36,7 +38,6 @@
#include "access/xlogutils.h"
#include "utils/builtins.h"
#include "utils/relcache.h"
-
#include "miscadmin.h"
@@ -45,160 +46,239 @@
/* Max time to wait to acquire checkpoint lock */
#define CHECKPOINT_LOCK_TIMEOUT (10*60*1000000) /* 10 minutes */
-
+/* User-settable parameters */
+int CheckPointSegments = 3;
int XLOGbuffers = 8;
-int XLOGfiles = 0; /* how many files to pre-allocate */
-XLogRecPtr MyLastRecPtr = {0, 0};
-bool InRecovery = false;
+int XLOGfiles = 0; /* how many files to pre-allocate during ckpt */
+int XLOG_DEBUG = 0;
+char XLOG_archive_dir[MAXPGPATH]; /* null string means delete 'em */
+
+#define MinXLOGbuffers 4
+
+
+/*
+ * ThisStartUpID will be same in all backends --- it identifies current
+ * instance of the database system.
+ */
StartUpID ThisStartUpID = 0;
-XLogRecPtr RedoRecPtr;
-int XLOG_DEBUG = 0;
+/* Are we doing recovery by reading XLOG? */
+bool InRecovery = false;
-/* To read/update control file and create new log file */
-SPINLOCK ControlFileLockId;
+/*
+ * MyLastRecPtr points to the start of the last XLOG record inserted by the
+ * current transaction. If MyLastRecPtr.xrecoff == 0, then we are not in
+ * a transaction or the transaction has not yet made any loggable changes.
+ *
+ * Note that XLOG records inserted outside transaction control are not
+ * reflected into MyLastRecPtr.
+ */
+XLogRecPtr MyLastRecPtr = {0, 0};
-/* To generate new xid */
-SPINLOCK XidGenLockId;
+/*
+ * ProcLastRecPtr points to the start of the last XLOG record inserted by the
+ * current backend. It is updated for all inserts, transaction-controlled
+ * or not.
+ */
+static XLogRecPtr ProcLastRecPtr = {0, 0};
-static char XLogDir[MAXPGPATH];
-static char ControlFilePath[MAXPGPATH];
+/*
+ * RedoRecPtr is this backend's local copy of the REDO record pointer
+ * (which is almost but not quite the same as a pointer to the most recent
+ * CHECKPOINT record). We update this from the shared-memory copy,
+ * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
+ * hold the Insert spinlock). See XLogInsert for details.
+ */
+static XLogRecPtr RedoRecPtr;
-#define MinXLOGbuffers 4
+/* This lock must be held to read/update control file or create new log file */
+SPINLOCK ControlFileLockId;
-typedef struct XLgwrRqst
+/*----------
+ * Shared-memory data structures for XLOG control
+ *
+ * LogwrtRqst indicates a byte position that we need to write and/or fsync
+ * the log up to (all records before that point must be written or fsynced).
+ * LogwrtResult indicates the byte positions we have already written/fsynced.
+ * These structs are identical but are declared separately to indicate their
+ * slightly different functions.
+ *
+ * We do a lot of pushups to minimize the amount of access to spinlocked
+ * shared memory values. There are actually three shared-memory copies of
+ * LogwrtResult, plus one unshared copy in each backend. Here's how it works:
+ * XLogCtl->LogwrtResult is protected by info_lck
+ * XLogCtl->Write.LogwrtResult is protected by logwrt_lck
+ * XLogCtl->Insert.LogwrtResult is protected by insert_lck
+ * One must hold the associated spinlock to read or write any of these, but
+ * of course no spinlock is needed to read/write the unshared LogwrtResult.
+ *
+ * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
+ * right", since both are updated by a write or flush operation before
+ * it releases logwrt_lck. The point of keeping XLogCtl->Write.LogwrtResult
+ * is that it can be examined/modified by code that already holds logwrt_lck
+ * without needing to grab info_lck as well.
+ *
+ * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
+ * but is updated when convenient. Again, it exists for the convenience of
+ * code that is already holding insert_lck but not the other locks.
+ *
+ * The unshared LogwrtResult may lag behind any or all of these, and again
+ * is updated when convenient.
+ *
+ * The request bookkeeping is simpler: there is a shared XLogCtl->LogwrtRqst
+ * (protected by info_lck), but we don't need to cache any copies of it.
+ *
+ * Note that this all works because the request and result positions can only
+ * advance forward, never back up, and so we can easily determine which of two
+ * values is "more up to date".
+ *----------
+ */
+typedef struct XLogwrtRqst
{
- XLogRecPtr Write; /* byte (1-based) to write out */
- XLogRecPtr Flush; /* byte (1-based) to flush */
-} XLgwrRqst;
+ XLogRecPtr Write; /* last byte + 1 to write out */
+ XLogRecPtr Flush; /* last byte + 1 to flush */
+} XLogwrtRqst;
-typedef struct XLgwrResult
+typedef struct XLogwrtResult
{
- XLogRecPtr Write; /* bytes written out */
- XLogRecPtr Flush; /* bytes flushed */
-} XLgwrResult;
+ XLogRecPtr Write; /* last byte + 1 written out */
+ XLogRecPtr Flush; /* last byte + 1 flushed */
+} XLogwrtResult;
+/*
+ * Shared state data for XLogInsert.
+ */
typedef struct XLogCtlInsert
{
- XLgwrResult LgwrResult;
- XLogRecPtr PrevRecord;
+ XLogwrtResult LogwrtResult; /* a recent value of LogwrtResult */
+ XLogRecPtr PrevRecord; /* start of previously-inserted record */
uint16 curridx; /* current block index in cache */
- XLogPageHeader currpage;
- char *currpos;
- XLogRecPtr RedoRecPtr;
+ XLogPageHeader currpage; /* points to header of block in cache */
+ char *currpos; /* current insertion point in cache */
+ XLogRecPtr RedoRecPtr; /* current redo point for insertions */
} XLogCtlInsert;
+/*
+ * Shared state data for XLogWrite/XLogFlush.
+ */
typedef struct XLogCtlWrite
{
- XLgwrResult LgwrResult;
- uint16 curridx; /* index of next block to write */
+ XLogwrtResult LogwrtResult; /* current value of LogwrtResult */
+ uint16 curridx; /* cache index of next block to write */
} XLogCtlWrite;
-
+/*
+ * Total shared-memory state for XLOG.
+ */
typedef struct XLogCtlData
{
+ /* Protected by insert_lck: */
XLogCtlInsert Insert;
- XLgwrRqst LgwrRqst;
- XLgwrResult LgwrResult;
+ /* Protected by info_lck: */
+ XLogwrtRqst LogwrtRqst;
+ XLogwrtResult LogwrtResult;
+ /* Protected by logwrt_lck: */
XLogCtlWrite Write;
- char *pages;
+ /*
+ * These values do not change after startup, although the pointed-to
+ * pages and xlblocks values certainly do. Permission to read/write
+ * the pages and xlblocks values depends on insert_lck and logwrt_lck.
+ */
+ char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
- uint32 XLogCacheByte;
- uint32 XLogCacheBlck;
+ uint32 XLogCacheByte; /* # bytes in xlog buffers */
+ uint32 XLogCacheBlck; /* highest allocated xlog buffer index */
StartUpID ThisStartUpID;
- XLogRecPtr RedoRecPtr; /* for postmaster */
- slock_t insert_lck;
- slock_t info_lck;
- slock_t lgwr_lck;
+
+ /* This value is not protected by *any* spinlock... */
+ XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */
+
+ slock_t insert_lck; /* XLogInsert lock */
+ slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
+ slock_t logwrt_lck; /* XLogWrite/XLogFlush lock */
slock_t chkp_lck; /* checkpoint lock */
} XLogCtlData;
static XLogCtlData *XLogCtl = NULL;
/*
- * Contents of pg_control
+ * We maintain an image of pg_control in shared memory.
*/
-
-typedef enum DBState
-{
- DB_STARTUP = 0,
- DB_SHUTDOWNED,
- DB_SHUTDOWNING,
- DB_IN_RECOVERY,
- DB_IN_PRODUCTION
-} DBState;
-
-#define LOCALE_NAME_BUFLEN 128
-
-typedef struct ControlFileData
-{
- crc64 crc;
- uint32 logId; /* current log file id */
- uint32 logSeg; /* current log file segment (1-based) */
- XLogRecPtr checkPoint; /* last check point record ptr */
- time_t time; /* time stamp of last modification */
- DBState state; /* see enum above */
-
- /*
- * this data is used to make sure that configuration of this DB is
- * compatible with the backend executable
- */
- uint32 blcksz; /* block size for this DB */
- uint32 relseg_size; /* blocks per segment of large relation */
- uint32 catalog_version_no; /* internal version number */
- /* active locales --- "C" if compiled without USE_LOCALE: */
- char lc_collate[LOCALE_NAME_BUFLEN];
- char lc_ctype[LOCALE_NAME_BUFLEN];
-
- /*
- * important directory locations
- */
- char archdir[MAXPGPATH]; /* where to move offline log files */
-} ControlFileData;
-
static ControlFileData *ControlFile = NULL;
-typedef struct CheckPoint
-{
- XLogRecPtr redo; /* next RecPtr available when we */
- /* began to create CheckPoint */
- /* (i.e. REDO start point) */
- XLogRecPtr undo; /* first record of oldest in-progress */
- /* transaction when we started */
- /* (i.e. UNDO end point) */
- StartUpID ThisStartUpID;
- TransactionId nextXid;
- Oid nextOid;
- bool Shutdown;
-} CheckPoint;
+/*
+ * Macros for managing XLogInsert state. In most cases, the calling routine
+ * has local copies of XLogCtl->Insert and/or XLogCtl->Insert->curridx,
+ * so these are passed as parameters instead of being fetched via XLogCtl.
+ */
-#define XLOG_CHECKPOINT 0x00
-#define XLOG_NEXTOID 0x10
+/* Free space remaining in the current xlog page buffer */
+#define INSERT_FREESPACE(Insert) \
+ (BLCKSZ - ((Insert)->currpos - (char *) (Insert)->currpage))
+
+/* Construct XLogRecPtr value for current insertion point */
+#define INSERT_RECPTR(recptr,Insert,curridx) \
+ ( \
+ (recptr).xlogid = XLogCtl->xlblocks[curridx].xlogid, \
+ (recptr).xrecoff = \
+ XLogCtl->xlblocks[curridx].xrecoff - INSERT_FREESPACE(Insert) \
+ )
+
+
+/* Increment an xlogid/segment pair */
+#define NextLogSeg(logId, logSeg) \
+ do { \
+ if ((logSeg) >= XLogSegsPerFile-1) \
+ { \
+ (logId)++; \
+ (logSeg) = 0; \
+ } \
+ else \
+ (logSeg)++; \
+ } while (0)
+
+/* Decrement an xlogid/segment pair (assume it's not 0,0) */
+#define PrevLogSeg(logId, logSeg) \
+ do { \
+ if (logSeg) \
+ (logSeg)--; \
+ else \
+ { \
+ (logId)--; \
+ (logSeg) = XLogSegsPerFile-1; \
+ } \
+ } while (0)
-typedef struct BkpBlock
-{
- crc64 crc;
- RelFileNode node;
- BlockNumber block;
-} BkpBlock;
+/*
+ * Compute ID and segment from an XLogRecPtr.
+ *
+ * For XLByteToSeg, do the computation at face value. For XLByteToPrevSeg,
+ * a boundary byte is taken to be in the previous segment. This is suitable
+ * for deciding which segment to write given a pointer to a record end,
+ * for example.
+ */
+#define XLByteToSeg(xlrp, logId, logSeg) \
+ ( logId = (xlrp).xlogid, \
+ logSeg = (xlrp).xrecoff / XLogSegSize \
+ )
+#define XLByteToPrevSeg(xlrp, logId, logSeg) \
+ ( logId = (xlrp).xlogid, \
+ logSeg = ((xlrp).xrecoff - 1) / XLogSegSize \
+ )
/*
- * We break each log file in 16Mb segments
+ * Is an XLogRecPtr within a particular XLOG segment?
+ *
+ * For XLByteInSeg, do the computation at face value. For XLByteInPrevSeg,
+ * a boundary byte is taken to be in the previous segment.
*/
-#define XLogSegSize ((uint32) (16*1024*1024))
-#define XLogLastSeg (((uint32) 0xffffffff) / XLogSegSize)
-#define XLogFileSize (XLogLastSeg * XLogSegSize)
-
-#define NextLogSeg(_logId, _logSeg) \
-{\
- if (_logSeg >= XLogLastSeg)\
- {\
- _logId++;\
- _logSeg = 0;\
- }\
- else\
- _logSeg++;\
-}
+#define XLByteInSeg(xlrp, logId, logSeg) \
+ ((xlrp).xlogid == (logId) && \
+ (xlrp).xrecoff / XLogSegSize == (logSeg))
+
+#define XLByteInPrevSeg(xlrp, logId, logSeg) \
+ ((xlrp).xlogid == (logId) && \
+ ((xlrp).xrecoff - 1) / XLogSegSize == (logSeg))
#define XLogFileName(path, log, seg) \
@@ -209,120 +289,133 @@ typedef struct BkpBlock
snprintf(path, MAXPGPATH, "%s%cT%08X%08X", \
XLogDir, SEP_CHAR, log, seg)
-#define PrevBufIdx(curridx) \
- ((curridx == 0) ? XLogCtl->XLogCacheBlck : (curridx - 1))
-
-#define NextBufIdx(curridx) \
- ((curridx == XLogCtl->XLogCacheBlck) ? 0 : (curridx + 1))
-
-#define InitXLBuffer(curridx) (\
- XLogCtl->xlblocks[curridx].xrecoff = \
- (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
- BLCKSZ : (XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ), \
- XLogCtl->xlblocks[curridx].xlogid = \
- (XLogCtl->xlblocks[Insert->curridx].xrecoff == XLogFileSize) ? \
- (XLogCtl->xlblocks[Insert->curridx].xlogid + 1) : \
- XLogCtl->xlblocks[Insert->curridx].xlogid, \
- Insert->curridx = curridx, \
- Insert->currpage = (XLogPageHeader) (XLogCtl->pages + curridx * BLCKSZ), \
- Insert->currpos = \
- ((char*) Insert->currpage) + SizeOfXLogPHD, \
- Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC, \
- Insert->currpage->xlp_info = 0 \
- )
+#define PrevBufIdx(idx) \
+ (((idx) == 0) ? XLogCtl->XLogCacheBlck : ((idx) - 1))
+
+#define NextBufIdx(idx) \
+ (((idx) == XLogCtl->XLogCacheBlck) ? 0 : ((idx) + 1))
#define XRecOffIsValid(xrecoff) \
- (xrecoff % BLCKSZ >= SizeOfXLogPHD && \
- (BLCKSZ - xrecoff % BLCKSZ) >= SizeOfXLogRecord)
-
-#define _INTL_MAXLOGRECSZ (3 * MAXLOGRECSZ)
-
-extern uint32 crc_table[];
-#define INIT_CRC64(crc) (crc.crc1 = 0xffffffff, crc.crc2 = 0xffffffff)
-#define FIN_CRC64(crc) (crc.crc1 ^= 0xffffffff, crc.crc2 ^= 0xffffffff)
-#define COMP_CRC64(crc, data, len) \
-{\
- uint32 __c1 = crc.crc1;\
- uint32 __c2 = crc.crc2;\
- char *__data = data;\
- uint32 __len = len;\
-\
- while (__len >= 2)\
- {\
- __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\
- __c2 = crc_table[(__c2 ^ *__data++) & 0xff] ^ (__c2 >> 8);\
- __len -= 2;\
- }\
- if (__len > 0)\
- __c1 = crc_table[(__c1 ^ *__data++) & 0xff] ^ (__c1 >> 8);\
- crc.crc1 = __c1;\
- crc.crc2 = __c2;\
-}
+ ((xrecoff) % BLCKSZ >= SizeOfXLogPHD && \
+ (BLCKSZ - (xrecoff) % BLCKSZ) >= SizeOfXLogRecord)
-void SetRedoRecPtr(void);
-void GetRedoRecPtr(void);
+/*
+ * _INTL_MAXLOGRECSZ: max space needed for a record including header and
+ * any backup-block data.
+ */
+#define _INTL_MAXLOGRECSZ (SizeOfXLogRecord + MAXLOGRECSZ + \
+ XLR_MAX_BKP_BLOCKS * (sizeof(BkpBlock) + BLCKSZ))
-static void GetFreeXLBuffer(void);
-static void XLogWrite(char *buffer);
-static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent);
-static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
-static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, char *buffer);
-static void WriteControlFile(void);
-static void ReadControlFile(void);
-static char *str_time(time_t tnow);
-static void xlog_outrec(char *buf, XLogRecord *record);
-static XLgwrResult LgwrResult = {{0, 0}, {0, 0}};
-static XLgwrRqst LgwrRqst = {{0, 0}, {0, 0}};
+/* File path names */
+static char XLogDir[MAXPGPATH];
+static char ControlFilePath[MAXPGPATH];
-static int logFile = -1;
-static uint32 logId = 0;
-static uint32 logSeg = 0;
-static uint32 logOff = 0;
+/*
+ * Private, possibly out-of-date copy of shared LogwrtResult.
+ * See discussion above.
+ */
+static XLogwrtResult LogwrtResult = {{0, 0}, {0, 0}};
-static XLogRecPtr ReadRecPtr;
-static XLogRecPtr EndRecPtr;
+/*
+ * openLogFile is -1 or a kernel FD for an open log file segment.
+ * When it's open, openLogOff is the current seek offset in the file.
+ * openLogId/openLogSeg identify the segment. These variables are only
+ * used to write the XLOG, and so will normally refer to the active segment.
+ */
+static int openLogFile = -1;
+static uint32 openLogId = 0;
+static uint32 openLogSeg = 0;
+static uint32 openLogOff = 0;
+
+/*
+ * These variables are used similarly to the ones above, but for reading
+ * the XLOG. Note, however, that readOff generally represents the offset
+ * of the page just read, not the seek position of the FD itself, which
+ * will be just past that page.
+ */
static int readFile = -1;
static uint32 readId = 0;
static uint32 readSeg = 0;
static uint32 readOff = 0;
-static char readBuf[BLCKSZ];
+/* Buffer for currently read page (BLCKSZ bytes) */
+static char *readBuf = NULL;
+/* State information for XLOG reading */
+static XLogRecPtr ReadRecPtr;
+static XLogRecPtr EndRecPtr;
static XLogRecord *nextRecord = NULL;
static bool InRedo = false;
+
+static bool AdvanceXLInsertBuffer(void);
+static void XLogWrite(XLogwrtRqst WriteRqst);
+static int XLogFileInit(uint32 log, uint32 seg, bool *usexistent);
+static int XLogFileOpen(uint32 log, uint32 seg, bool econt);
+static void PreallocXlogFiles(XLogRecPtr endptr);
+static void MoveOfflineLogs(uint32 log, uint32 seg);
+static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer);
+static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr,
+ const char *whichChkpt,
+ char *buffer);
+static void WriteControlFile(void);
+static void ReadControlFile(void);
+static char *str_time(time_t tnow);
+static void xlog_outrec(char *buf, XLogRecord *record);
+
+
+/*
+ * Insert an XLOG record having the specified RMID and info bytes,
+ * with the body of the record being the data chunk(s) described by
+ * the rdata list (see xlog.h for notes about rdata).
+ *
+ * Returns XLOG pointer to end of record (beginning of next record).
+ * This can be used as LSN for data pages affected by the logged action.
+ * (LSN is the XLOG point up to which the XLOG must be flushed to disk
+ * before the data page can be written out. This implements the basic
+ * WAL rule "write the log before the data".)
+ *
+ * NB: this routine feels free to scribble on the XLogRecData structs,
+ * though not on the data they reference. This is OK since the XLogRecData
+ * structs are always just temporaries in the calling code.
+ */
XLogRecPtr
XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecord *record;
- XLogSubRecord *subrecord;
+ XLogContRecord *contrecord;
XLogRecPtr RecPtr;
+ XLogRecPtr WriteRqst;
uint32 freespace;
uint16 curridx;
XLogRecData *rdt;
- Buffer dtbuf[2] = {InvalidBuffer, InvalidBuffer};
- bool dtbuf_bkp[2] = {false, false};
- XLogRecData dtbuf_rdt[4];
- BkpBlock dtbuf_xlg[2];
- XLogRecPtr dtbuf_lsn[2];
- crc64 dtbuf_crc[2],
- rdata_crc;
- uint32 len;
+ Buffer dtbuf[XLR_MAX_BKP_BLOCKS];
+ bool dtbuf_bkp[XLR_MAX_BKP_BLOCKS];
+ BkpBlock dtbuf_xlg[XLR_MAX_BKP_BLOCKS];
+ XLogRecPtr dtbuf_lsn[XLR_MAX_BKP_BLOCKS];
+ XLogRecData dtbuf_rdt[2 * XLR_MAX_BKP_BLOCKS];
+ crc64 rdata_crc;
+ uint32 len,
+ write_len;
unsigned i;
- bool updrqst = false;
- bool repeat = false;
+ bool do_logwrt;
+ bool updrqst;
bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
if (info & XLR_INFO_MASK)
{
if ((info & XLR_INFO_MASK) != XLOG_NO_TRAN)
elog(STOP, "XLogInsert: invalid info mask %02X",
- (info & XLR_INFO_MASK));
+ (info & XLR_INFO_MASK));
no_tran = true;
info &= ~XLR_INFO_MASK;
}
+ /*
+ * In bootstrap mode, we don't actually log anything but XLOG resources;
+ * return a phony record pointer.
+ */
if (IsBootstrapProcessingMode() && rmid != RM_XLOG_ID)
{
RecPtr.xlogid = 0;
@@ -330,180 +423,229 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
return (RecPtr);
}
+ /*
+ * Here we scan the rdata list, determine which buffers must be backed
+ * up, and compute the CRC values for the data. Note that the record
+ * header isn't added into the CRC yet since we don't know the final
+ * length or info bits quite yet.
+ *
+ * We may have to loop back to here if a race condition is detected below.
+ * We could prevent the race by doing all this work while holding the
+ * insert spinlock, but it seems better to avoid doing CRC calculations
+ * while holding the lock. This means we have to be careful about
+ * modifying the rdata list until we know we aren't going to loop back
+ * again. The only change we allow ourselves to make earlier is to set
+ * rdt->data = NULL in list items we have decided we will have to back
+ * up the whole buffer for. This is OK because we will certainly decide
+ * the same thing again for those items if we do it over; doing it here
+ * saves an extra pass over the list later.
+ */
begin:;
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ {
+ dtbuf[i] = InvalidBuffer;
+ dtbuf_bkp[i] = false;
+ }
+
INIT_CRC64(rdata_crc);
- for (len = 0, rdt = rdata; ; )
+ len = 0;
+ for (rdt = rdata; ; )
{
if (rdt->buffer == InvalidBuffer)
{
+ /* Simple data, just include it */
len += rdt->len;
COMP_CRC64(rdata_crc, rdt->data, rdt->len);
- if (rdt->next == NULL)
- break;
- rdt = rdt->next;
- continue;
}
- for (i = 0; i < 2; i++)
+ else
{
- if (rdt->buffer == dtbuf[i])
+ /* Find info for buffer */
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- if (dtbuf_bkp[i])
- rdt->data = NULL;
- else if (rdt->data)
+ if (rdt->buffer == dtbuf[i])
{
- len += rdt->len;
- COMP_CRC64(rdata_crc, rdt->data, rdt->len);
- }
- break;
- }
- if (dtbuf[i] == InvalidBuffer)
- {
- dtbuf[i] = rdt->buffer;
- dtbuf_lsn[i] = *((XLogRecPtr*)(BufferGetBlock(rdt->buffer)));
- if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
- {
- crc64 crc;
-
- dtbuf_bkp[i] = true;
- rdt->data = NULL;
- INIT_CRC64(crc);
- COMP_CRC64(crc, ((char*)BufferGetBlock(dtbuf[i])), BLCKSZ);
- dtbuf_crc[i] = crc;
+ /* Buffer already referenced by earlier list item */
+ if (dtbuf_bkp[i])
+ rdt->data = NULL;
+ else if (rdt->data)
+ {
+ len += rdt->len;
+ COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ }
+ break;
}
- else if (rdt->data)
+ if (dtbuf[i] == InvalidBuffer)
{
- len += rdt->len;
- COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ /* OK, put it in this slot */
+ dtbuf[i] = rdt->buffer;
+ /*
+ * XXX We assume page LSN is first data on page
+ */
+ dtbuf_lsn[i] = *((XLogRecPtr*)BufferGetBlock(rdt->buffer));
+ if (XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+ {
+ crc64 dtcrc;
+
+ dtbuf_bkp[i] = true;
+ rdt->data = NULL;
+ INIT_CRC64(dtcrc);
+ COMP_CRC64(dtcrc,
+ BufferGetBlock(dtbuf[i]),
+ BLCKSZ);
+ dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
+ dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
+ COMP_CRC64(dtcrc,
+ (char*) &(dtbuf_xlg[i]) + sizeof(crc64),
+ sizeof(BkpBlock) - sizeof(crc64));
+ FIN_CRC64(dtcrc);
+ dtbuf_xlg[i].crc = dtcrc;
+ }
+ else if (rdt->data)
+ {
+ len += rdt->len;
+ COMP_CRC64(rdata_crc, rdt->data, rdt->len);
+ }
+ break;
}
- break;
}
+ if (i >= XLR_MAX_BKP_BLOCKS)
+ elog(STOP, "XLogInsert: can backup %d blocks at most",
+ XLR_MAX_BKP_BLOCKS);
}
- if (i >= 2)
- elog(STOP, "XLogInsert: can backup 2 blocks at most");
+ /* Break out of loop when rdt points to last list item */
if (rdt->next == NULL)
break;
rdt = rdt->next;
}
+ /*
+ * NOTE: the test for len == 0 here is somewhat fishy, since in theory
+ * all of the rmgr data might have been suppressed in favor of backup
+ * blocks. Currently, all callers of XLogInsert provide at least some
+ * not-in-a-buffer data and so len == 0 should never happen, but that
+ * may not be true forever. If you need to remove the len == 0 check,
+ * also remove the check for xl_len == 0 in ReadRecord, below.
+ */
if (len == 0 || len > MAXLOGRECSZ)
elog(STOP, "XLogInsert: invalid record len %u", len);
START_CRIT_SECTION();
- /* obtain xlog insert lock */
- if (TAS(&(XLogCtl->insert_lck))) /* busy */
- {
- bool do_lgwr = true;
+ /* wait to obtain xlog insert lock */
+ do_logwrt = true;
- for (i = 0;;)
+ for (i = 0;;)
+ {
+ /* try to update LogwrtResult while waiting for insert lock */
+ if (!TAS(&(XLogCtl->info_lck)))
{
- /* try to read LgwrResult while waiting for insert lock */
- if (!TAS(&(XLogCtl->info_lck)))
- {
- LgwrRqst = XLogCtl->LgwrRqst;
- LgwrResult = XLogCtl->LgwrResult;
- S_UNLOCK(&(XLogCtl->info_lck));
+ XLogwrtRqst LogwrtRqst;
- /*
- * If cache is half filled then try to acquire lgwr lock
- * and do LGWR work, but only once.
- */
- if (do_lgwr &&
- (LgwrRqst.Write.xlogid != LgwrResult.Write.xlogid ||
- (LgwrRqst.Write.xrecoff - LgwrResult.Write.xrecoff >=
- XLogCtl->XLogCacheByte / 2)))
+ LogwrtRqst = XLogCtl->LogwrtRqst;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ S_UNLOCK(&(XLogCtl->info_lck));
+
+ /*
+ * If cache is half filled then try to acquire logwrt lock
+ * and do LOGWRT work, but only once per XLogInsert call.
+ * Ignore any fractional blocks in performing this check.
+ */
+ LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
+ if (do_logwrt &&
+ (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
+ (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
+ XLogCtl->XLogCacheByte / 2)))
+ {
+ if (!TAS(&(XLogCtl->logwrt_lck)))
{
- if (!TAS(&(XLogCtl->lgwr_lck)))
+ LogwrtResult = XLogCtl->Write.LogwrtResult;
+ if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
{
- LgwrResult = XLogCtl->Write.LgwrResult;
- if (!TAS(&(XLogCtl->info_lck)))
- {
- LgwrRqst = XLogCtl->LgwrRqst;
- S_UNLOCK(&(XLogCtl->info_lck));
- }
- if (XLByteLT(LgwrResult.Write, LgwrRqst.Write))
- {
- XLogWrite(NULL);
- do_lgwr = false;
- }
- S_UNLOCK(&(XLogCtl->lgwr_lck));
+ XLogWrite(LogwrtRqst);
+ do_logwrt = false;
}
+ S_UNLOCK(&(XLogCtl->logwrt_lck));
}
}
- S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
- if (!TAS(&(XLogCtl->insert_lck)))
- break;
}
+ if (!TAS(&(XLogCtl->insert_lck)))
+ break;
+ S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
}
- /* Race condition: RedoRecPtr was changed */
- RedoRecPtr = Insert->RedoRecPtr;
- repeat = false;
- for (i = 0; i < 2; i++)
+ /*
+ * Check to see if my RedoRecPtr is out of date. If so, may have to
+ * go back and recompute everything. This can only happen just after a
+ * checkpoint, so it's better to be slow in this case and fast otherwise.
+ */
+ if (!XLByteEQ(RedoRecPtr, Insert->RedoRecPtr))
{
- if (dtbuf[i] == InvalidBuffer)
- continue;
- if (dtbuf_bkp[i] == false &&
- XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+ Assert(XLByteLT(RedoRecPtr, Insert->RedoRecPtr));
+ RedoRecPtr = Insert->RedoRecPtr;
+
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- dtbuf[i] = InvalidBuffer;
- repeat = true;
+ if (dtbuf[i] == InvalidBuffer)
+ continue;
+ if (dtbuf_bkp[i] == false &&
+ XLByteLE(dtbuf_lsn[i], RedoRecPtr))
+ {
+ /*
+ * Oops, this buffer now needs to be backed up, but we didn't
+ * think so above. Start over.
+ */
+ S_UNLOCK(&(XLogCtl->insert_lck));
+ END_CRIT_SECTION();
+ goto begin;
+ }
}
}
- if (repeat)
- {
- S_UNLOCK(&(XLogCtl->insert_lck));
- END_CRIT_SECTION();
- goto begin;
- }
- /* Attach backup blocks to record data */
- for (i = 0; i < 2; i++)
+ /*
+ * Make additional rdata list entries for the backup blocks, so that
+ * we don't need to special-case them in the write loop. Note that we
+ * have now irrevocably changed the input rdata list. At the exit of
+ * this loop, write_len includes the backup block data.
+ *
+ * Also set the appropriate info bits to show which buffers were backed
+ * up. The i'th XLR_SET_BKP_BLOCK bit corresponds to the i'th distinct
+ * buffer value (ignoring InvalidBuffer) appearing in the rdata list.
+ */
+ write_len = len;
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (dtbuf[i] == InvalidBuffer || !(dtbuf_bkp[i]))
continue;
- info |= (XLR_SET_BKP_BLOCK(i));
-
- dtbuf_xlg[i].node = BufferGetFileNode(dtbuf[i]);
- dtbuf_xlg[i].block = BufferGetBlockNumber(dtbuf[i]);
- COMP_CRC64(dtbuf_crc[i],
- ((char*)&(dtbuf_xlg[i]) + offsetof(BkpBlock, node)),
- (sizeof(BkpBlock) - offsetof(BkpBlock, node)));
- FIN_CRC64(dtbuf_crc[i]);
- dtbuf_xlg[i].crc = dtbuf_crc[i];
+ info |= XLR_SET_BKP_BLOCK(i);
rdt->next = &(dtbuf_rdt[2 * i]);
- dtbuf_rdt[2 * i].data = (char*)&(dtbuf_xlg[i]);
+ dtbuf_rdt[2 * i].data = (char*) &(dtbuf_xlg[i]);
dtbuf_rdt[2 * i].len = sizeof(BkpBlock);
- len += sizeof(BkpBlock);
+ write_len += sizeof(BkpBlock);
rdt = dtbuf_rdt[2 * i].next = &(dtbuf_rdt[2 * i + 1]);
- dtbuf_rdt[2 * i + 1].data = (char*)(BufferGetBlock(dtbuf[i]));
+ dtbuf_rdt[2 * i + 1].data = (char*) BufferGetBlock(dtbuf[i]);
dtbuf_rdt[2 * i + 1].len = BLCKSZ;
- len += BLCKSZ;
+ write_len += BLCKSZ;
dtbuf_rdt[2 * i + 1].next = NULL;
}
- /* Insert record */
+ /* Insert record header */
- freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
+ updrqst = false;
+ freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
- curridx = NextBufIdx(Insert->curridx);
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- InitXLBuffer(curridx);
- else
- GetFreeXLBuffer();
+ updrqst = AdvanceXLInsertBuffer();
freespace = BLCKSZ - SizeOfXLogPHD;
}
- else
- curridx = Insert->curridx;
- freespace -= SizeOfXLogRecord;
+ curridx = Insert->curridx;
record = (XLogRecord *) Insert->currpos;
+
record->xl_prev = Insert->PrevRecord;
if (no_tran)
{
@@ -514,26 +656,26 @@ begin:;
record->xl_xact_prev = MyLastRecPtr;
record->xl_xid = GetCurrentTransactionId();
- record->xl_len = len;
+ record->xl_len = len; /* doesn't include backup blocks */
record->xl_info = info;
record->xl_rmid = rmid;
- COMP_CRC64(rdata_crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
- (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
+ /* Now we can finish computing the main CRC */
+ COMP_CRC64(rdata_crc, (char*) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
FIN_CRC64(rdata_crc);
record->xl_crc = rdata_crc;
- RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
- RecPtr.xrecoff =
- XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
- Insert->currpos - ((char *) Insert->currpage);
+ /* Compute record's XLOG location */
+ INSERT_RECPTR(RecPtr, Insert, curridx);
+
+ /* If first XLOG record of transaction, save it in PROC array */
if (MyLastRecPtr.xrecoff == 0 && !no_tran)
{
SpinAcquire(SInvalLock);
MyProc->logRec = RecPtr;
SpinRelease(SInvalLock);
}
- Insert->PrevRecord = RecPtr;
if (XLOG_DEBUG)
{
@@ -546,14 +688,22 @@ begin:;
strcat(buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(buf, record->xl_info, rdata->data);
}
- strcat(buf, "\n");
- write(2, buf, strlen(buf));
+ fprintf(stderr, "%s\n", buf);
}
- MyLastRecPtr = RecPtr; /* begin of record */
+ /* Record begin of record in appropriate places */
+ if (!no_tran)
+ MyLastRecPtr = RecPtr;
+ ProcLastRecPtr = RecPtr;
+ Insert->PrevRecord = RecPtr;
+
Insert->currpos += SizeOfXLogRecord;
+ freespace -= SizeOfXLogRecord;
- while (len)
+ /*
+ * Append the data, including backup blocks if any
+ */
+ while (write_len)
{
while (rdata->data == NULL)
rdata = rdata->next;
@@ -565,13 +715,13 @@ begin:;
memcpy(Insert->currpos, rdata->data, freespace);
rdata->data += freespace;
rdata->len -= freespace;
- len -= freespace;
+ write_len -= freespace;
}
else
{
memcpy(Insert->currpos, rdata->data, rdata->len);
freespace -= rdata->len;
- len -= rdata->len;
+ write_len -= rdata->len;
Insert->currpos += rdata->len;
rdata = rdata->next;
continue;
@@ -579,49 +729,44 @@ begin:;
}
/* Use next buffer */
- curridx = NextBufIdx(curridx);
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- {
- InitXLBuffer(curridx);
- updrqst = true;
- }
- else
- GetFreeXLBuffer();
- freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
- Insert->currpage->xlp_info |= XLP_FIRST_IS_SUBRECORD;
- subrecord = (XLogSubRecord *) Insert->currpos;
- subrecord->xl_len = len;
- Insert->currpos += SizeOfXLogSubRecord;
+ updrqst = AdvanceXLInsertBuffer();
+ curridx = Insert->curridx;
+ /* Insert cont-record header */
+ Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
+ contrecord = (XLogContRecord *) Insert->currpos;
+ contrecord->xl_rem_len = write_len;
+ Insert->currpos += SizeOfXLogContRecord;
+ freespace = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
}
- Insert->currpos = ((char *) Insert->currpage) +
- MAXALIGN(Insert->currpos - ((char *) Insert->currpage));
- freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
+ /* Ensure next record will be properly aligned */
+ Insert->currpos = (char *) Insert->currpage +
+ MAXALIGN(Insert->currpos - (char *) Insert->currpage);
+ freespace = INSERT_FREESPACE(Insert);
/*
- * Begin of the next record will be stored as LSN for
- * changed data page...
+ * The recptr I return is the beginning of the *next* record.
+ * This will be stored as LSN for changed data pages...
*/
- RecPtr.xlogid = XLogCtl->xlblocks[curridx].xlogid;
- RecPtr.xrecoff =
- XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
- Insert->currpos - ((char *) Insert->currpage);
+ INSERT_RECPTR(RecPtr, Insert, curridx);
- /* Need to update global LgwrRqst if some block was filled up */
+ /* Need to update shared LogwrtRqst if some block was filled up */
if (freespace < SizeOfXLogRecord)
updrqst = true; /* curridx is filled and available for writing out */
else
curridx = PrevBufIdx(curridx);
- LgwrRqst.Write = XLogCtl->xlblocks[curridx];
+ WriteRqst = XLogCtl->xlblocks[curridx];
S_UNLOCK(&(XLogCtl->insert_lck));
if (updrqst)
{
S_LOCK(&(XLogCtl->info_lck));
- LgwrResult = XLogCtl->LgwrResult;
- if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrRqst.Write))
- XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
+ /* advance global request to include new block(s) */
+ if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst))
+ XLogCtl->LogwrtRqst.Write = WriteRqst;
+ /* update local result copy while I have the chance */
+ LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
}
@@ -629,327 +774,403 @@ begin:;
return (RecPtr);
}
-void
-XLogFlush(XLogRecPtr record)
+/*
+ * Advance the Insert state to the next buffer page, writing out the next
+ * buffer if it still contains unwritten data.
+ *
+ * The global LogwrtRqst.Write pointer needs to be advanced to include the
+ * just-filled page. If we can do this for free (without an extra spinlock),
+ * we do so here. Otherwise the caller must do it. We return TRUE if the
+ * request update still needs to be done, FALSE if we did it internally.
+ *
+ * Must be called with insert_lck held.
+ */
+static bool
+AdvanceXLInsertBuffer(void)
{
- XLogRecPtr WriteRqst;
- char buffer[BLCKSZ];
- char *usebuf = NULL;
- unsigned spins = 0;
- bool force_lgwr = false;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogCtlWrite *Write = &XLogCtl->Write;
+ uint16 nextidx = NextBufIdx(Insert->curridx);
+ bool update_needed = true;
+ XLogRecPtr OldPageRqstPtr;
+ XLogwrtRqst WriteRqst;
- if (XLOG_DEBUG)
- {
- fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
- (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
- (InRedo) ? "(redo)" : "",
- record.xlogid, record.xrecoff,
- LgwrResult.Write.xlogid, LgwrResult.Write.xrecoff,
- LgwrResult.Flush.xlogid, LgwrResult.Flush.xrecoff);
- fflush(stderr);
- }
+ /* Use Insert->LogwrtResult copy if it's more fresh */
+ if (XLByteLT(LogwrtResult.Write, Insert->LogwrtResult.Write))
+ LogwrtResult = Insert->LogwrtResult;
- if (InRedo)
- return;
- if (XLByteLE(record, LgwrResult.Flush))
- return;
+ /*
+ * Get ending-offset of the buffer page we need to replace (this may be
+ * zero if the buffer hasn't been used yet). Fall through if it's already
+ * written out.
+ */
+ OldPageRqstPtr = XLogCtl->xlblocks[nextidx];
+ if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
+ {
+ /* nope, got work to do... */
+ unsigned spins = 0;
+ XLogRecPtr FinishedPageRqstPtr;
- START_CRIT_SECTION();
+ FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- WriteRqst = LgwrRqst.Write;
- for (;;)
- {
- /* try to read LgwrResult */
- if (!TAS(&(XLogCtl->info_lck)))
+ for (;;)
{
- LgwrResult = XLogCtl->LgwrResult;
- if (XLByteLE(record, LgwrResult.Flush))
+ /* While waiting, try to get info_lck and update LogwrtResult */
+ if (!TAS(&(XLogCtl->info_lck)))
{
+ if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
+ XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
+ update_needed = false; /* Did the shared-request update */
+ LogwrtResult = XLogCtl->LogwrtResult;
S_UNLOCK(&(XLogCtl->info_lck));
- END_CRIT_SECTION();
- return;
- }
- if (XLByteLT(XLogCtl->LgwrRqst.Flush, record))
- XLogCtl->LgwrRqst.Flush = record;
- if (XLByteLT(WriteRqst, XLogCtl->LgwrRqst.Write))
- {
- WriteRqst = XLogCtl->LgwrRqst.Write;
- usebuf = NULL;
- }
- S_UNLOCK(&(XLogCtl->info_lck));
- }
- /* if something was added to log cache then try to flush this too */
- if (!TAS(&(XLogCtl->insert_lck)))
- {
- XLogCtlInsert *Insert = &XLogCtl->Insert;
- uint32 freespace =
- ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
- if (freespace < SizeOfXLogRecord) /* buffer is full */
- {
- usebuf = NULL;
- LgwrRqst.Write = WriteRqst = XLogCtl->xlblocks[Insert->curridx];
- }
- else
- {
- usebuf = buffer;
- memcpy(usebuf, Insert->currpage, BLCKSZ - freespace);
- memset(usebuf + BLCKSZ - freespace, 0, freespace);
- WriteRqst = XLogCtl->xlblocks[Insert->curridx];
- WriteRqst.xrecoff = WriteRqst.xrecoff - BLCKSZ +
- Insert->currpos - ((char *) Insert->currpage);
- }
- S_UNLOCK(&(XLogCtl->insert_lck));
- force_lgwr = true;
- }
- if (force_lgwr || WriteRqst.xlogid > record.xlogid ||
- (WriteRqst.xlogid == record.xlogid &&
- WriteRqst.xrecoff >= record.xrecoff + BLCKSZ))
- {
- if (!TAS(&(XLogCtl->lgwr_lck)))
- {
- LgwrResult = XLogCtl->Write.LgwrResult;
- if (XLByteLE(record, LgwrResult.Flush))
+ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
- S_UNLOCK(&(XLogCtl->lgwr_lck));
- END_CRIT_SECTION();
- return;
+ /* OK, someone wrote it already */
+ Insert->LogwrtResult = LogwrtResult;
+ break;
}
- if (XLByteLT(LgwrResult.Write, WriteRqst))
+ }
+
+ /*
+ * LogwrtResult lock is busy or we know the page is still dirty.
+ * Try to acquire logwrt lock and write full blocks.
+ */
+ if (!TAS(&(XLogCtl->logwrt_lck)))
+ {
+ LogwrtResult = Write->LogwrtResult;
+ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
- LgwrRqst.Flush = LgwrRqst.Write = WriteRqst;
- XLogWrite(usebuf);
- S_UNLOCK(&(XLogCtl->lgwr_lck));
- if (XLByteLT(LgwrResult.Flush, record))
- elog(STOP, "XLogFlush: request is not satisfied");
- END_CRIT_SECTION();
- return;
+ S_UNLOCK(&(XLogCtl->logwrt_lck));
+ /* OK, someone wrote it already */
+ Insert->LogwrtResult = LogwrtResult;
+ break;
}
+ /*
+ * Have to write buffers while holding insert lock.
+ * This is not good, so only write as much as we absolutely
+ * must.
+ */
+ WriteRqst.Write = OldPageRqstPtr;
+ WriteRqst.Flush.xlogid = 0;
+ WriteRqst.Flush.xrecoff = 0;
+ XLogWrite(WriteRqst);
+ S_UNLOCK(&(XLogCtl->logwrt_lck));
+ Insert->LogwrtResult = LogwrtResult;
break;
}
+ S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
}
- S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++, XLOG_LOCK_TIMEOUT);
}
- if (logFile >= 0 && (LgwrResult.Write.xlogid != logId ||
- (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg))
+ /*
+ * Now the next buffer slot is free and we can set it up to be the
+ * next output page.
+ */
+ if (XLogCtl->xlblocks[Insert->curridx].xrecoff >= XLogFileSize)
{
- if (close(logFile) != 0)
- elog(STOP, "close(logfile %u seg %u) failed: %m",
- logId, logSeg);
- logFile = -1;
+ /* crossing a logid boundary */
+ XLogCtl->xlblocks[nextidx].xlogid =
+ XLogCtl->xlblocks[Insert->curridx].xlogid + 1;
+ XLogCtl->xlblocks[nextidx].xrecoff = BLCKSZ;
}
-
- if (logFile < 0)
+ else
{
- logId = LgwrResult.Write.xlogid;
- logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
- logOff = 0;
- logFile = XLogFileOpen(logId, logSeg, false);
+ XLogCtl->xlblocks[nextidx].xlogid =
+ XLogCtl->xlblocks[Insert->curridx].xlogid;
+ XLogCtl->xlblocks[nextidx].xrecoff =
+ XLogCtl->xlblocks[Insert->curridx].xrecoff + BLCKSZ;
}
+ Insert->curridx = nextidx;
+ Insert->currpage = (XLogPageHeader) (XLogCtl->pages + nextidx * BLCKSZ);
+ Insert->currpos = ((char*) Insert->currpage) + SizeOfXLogPHD;
+ /*
+ * Be sure to re-zero the buffer so that bytes beyond what we've written
+ * will look like zeroes and not valid XLOG records...
+ */
+ MemSet((char*) Insert->currpage, 0, BLCKSZ);
+ Insert->currpage->xlp_magic = XLOG_PAGE_MAGIC;
+ /* Insert->currpage->xlp_info = 0; */ /* done by memset */
- if (pg_fdatasync(logFile) != 0)
- elog(STOP, "fsync(logfile %u seg %u) failed: %m",
- logId, logSeg);
- LgwrResult.Flush = LgwrResult.Write;
-
- S_LOCK(&(XLogCtl->info_lck));
- XLogCtl->LgwrResult = LgwrResult;
- if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
- XLogCtl->LgwrRqst.Write = LgwrResult.Write;
- S_UNLOCK(&(XLogCtl->info_lck));
-
- XLogCtl->Write.LgwrResult = LgwrResult;
-
- S_UNLOCK(&(XLogCtl->lgwr_lck));
-
- END_CRIT_SECTION();
- return;
-
+ return update_needed;
}
/*
- * We use this routine when Insert->curridx block is full and the next XLOG
- * buffer looks as unwritten to OS' cache. insert_lck is assumed here.
+ * Write and/or fsync the log at least as far as WriteRqst indicates.
+ *
+ * Must be called with logwrt_lck held.
*/
static void
-GetFreeXLBuffer()
+XLogWrite(XLogwrtRqst WriteRqst)
{
- XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogCtlWrite *Write = &XLogCtl->Write;
- uint16 curridx = NextBufIdx(Insert->curridx);
- unsigned spins = 0;
+ char *from;
+ bool ispartialpage;
+ bool usexistent;
- /* Use Insert->LgwrResult copy if it's more fresh */
- if (XLByteLT(LgwrResult.Write, Insert->LgwrResult.Write))
- {
- LgwrResult = Insert->LgwrResult;
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- {
- InitXLBuffer(curridx);
- return;
- }
- }
+ /* Update local LogwrtResult (caller probably did this already, but...) */
+ LogwrtResult = Write->LogwrtResult;
- LgwrRqst.Write = XLogCtl->xlblocks[Insert->curridx];
- for (;;)
+ while (XLByteLT(LogwrtResult.Write, WriteRqst.Write))
{
- if (!TAS(&(XLogCtl->info_lck)))
- {
- LgwrResult = XLogCtl->LgwrResult;
- /* LgwrRqst.Write GE XLogCtl->LgwrRqst.Write */
- XLogCtl->LgwrRqst.Write = LgwrRqst.Write;
- S_UNLOCK(&(XLogCtl->info_lck));
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- {
- Insert->LgwrResult = LgwrResult;
- InitXLBuffer(curridx);
- return;
- }
- }
+ /* Advance LogwrtResult.Write to end of current buffer page */
+ LogwrtResult.Write = XLogCtl->xlblocks[Write->curridx];
+ ispartialpage = XLByteLT(WriteRqst.Write, LogwrtResult.Write);
- /*
- * LgwrResult lock is busy or un-updated. Try to acquire lgwr lock
- * and write full blocks.
- */
- if (!TAS(&(XLogCtl->lgwr_lck)))
+ if (!XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
{
- LgwrResult = Write->LgwrResult;
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- {
- S_UNLOCK(&(XLogCtl->lgwr_lck));
- Insert->LgwrResult = LgwrResult;
- InitXLBuffer(curridx);
- return;
- }
-
/*
- * Have to write buffers while holding insert lock - not
- * good...
+ * Switch to new logfile segment.
*/
- XLogWrite(NULL);
- S_UNLOCK(&(XLogCtl->lgwr_lck));
- Insert->LgwrResult = LgwrResult;
- InitXLBuffer(curridx);
- return;
- }
- S_LOCK_SLEEP(&(XLogCtl->lgwr_lck), spins++, XLOG_LOCK_TIMEOUT);
- }
-}
-
-static void
-XLogWrite(char *buffer)
-{
- XLogCtlWrite *Write = &XLogCtl->Write;
- char *from;
- uint32 wcnt = 0;
- bool usexistent;
-
- for (; XLByteLT(LgwrResult.Write, LgwrRqst.Write);)
- {
- LgwrResult.Write = XLogCtl->xlblocks[Write->curridx];
- if (LgwrResult.Write.xlogid != logId ||
- (LgwrResult.Write.xrecoff - 1) / XLogSegSize != logSeg)
- {
- if (wcnt > 0)
- {
- if (pg_fdatasync(logFile) != 0)
- elog(STOP, "fsync(logfile %u seg %u) failed: %m",
- logId, logSeg);
- if (LgwrResult.Write.xlogid != logId)
- LgwrResult.Flush.xrecoff = XLogFileSize;
- else
- LgwrResult.Flush.xrecoff = LgwrResult.Write.xrecoff - BLCKSZ;
- LgwrResult.Flush.xlogid = logId;
- if (!TAS(&(XLogCtl->info_lck)))
- {
- XLogCtl->LgwrResult.Flush = LgwrResult.Flush;
- XLogCtl->LgwrResult.Write = LgwrResult.Flush;
- if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Flush))
- XLogCtl->LgwrRqst.Write = LgwrResult.Flush;
- if (XLByteLT(XLogCtl->LgwrRqst.Flush, LgwrResult.Flush))
- XLogCtl->LgwrRqst.Flush = LgwrResult.Flush;
- S_UNLOCK(&(XLogCtl->info_lck));
- }
- }
- if (logFile >= 0)
+ if (openLogFile >= 0)
{
- if (close(logFile) != 0)
+ if (close(openLogFile) != 0)
elog(STOP, "close(logfile %u seg %u) failed: %m",
- logId, logSeg);
- logFile = -1;
+ openLogId, openLogSeg);
+ openLogFile = -1;
}
- logId = LgwrResult.Write.xlogid;
- logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
- logOff = 0;
+ XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
+
+ /* create/use new log file; need lock in case creating */
SpinAcquire(ControlFileLockId);
- /* create/use new log file */
usexistent = true;
- logFile = XLogFileInit(logId, logSeg, &usexistent);
- ControlFile->logId = logId;
- ControlFile->logSeg = logSeg + 1;
- ControlFile->time = time(NULL);
- UpdateControlFile();
+ openLogFile = XLogFileInit(openLogId, openLogSeg, &usexistent);
+ openLogOff = 0;
+ /* update pg_control, unless someone else already did */
+ if (ControlFile->logId != openLogId ||
+ ControlFile->logSeg != openLogSeg + 1)
+ {
+ ControlFile->logId = openLogId;
+ ControlFile->logSeg = openLogSeg + 1;
+ ControlFile->time = time(NULL);
+ UpdateControlFile();
+ }
SpinRelease(ControlFileLockId);
- if (!usexistent) /* there was no file */
+
+ if (!usexistent) /* there was no precreated file */
elog(LOG, "XLogWrite: new log file created - "
- "try to increase WAL_FILES");
+ "consider increasing WAL_FILES");
+
+ /*
+ * Signal postmaster to start a checkpoint if it's been too
+ * long since the last one. (We look at local copy of RedoRecPtr
+ * which might be a little out of date, but should be close enough
+ * for this purpose.)
+ */
+ if (IsUnderPostmaster &&
+ (openLogId != RedoRecPtr.xlogid ||
+ openLogSeg >= (RedoRecPtr.xrecoff / XLogSegSize) +
+ (uint32) CheckPointSegments))
+ {
+ if (XLOG_DEBUG)
+ fprintf(stderr, "XLogWrite: time for a checkpoint, signaling postmaster\n");
+ kill(getppid(), SIGUSR1);
+ }
}
- if (logFile < 0)
+ if (openLogFile < 0)
{
- logId = LgwrResult.Write.xlogid;
- logSeg = (LgwrResult.Write.xrecoff - 1) / XLogSegSize;
- logOff = 0;
- logFile = XLogFileOpen(logId, logSeg, false);
+ XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogOff = 0;
}
- if (logOff != (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
+ /* Need to seek in the file? */
+ if (openLogOff != (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize)
{
- logOff = (LgwrResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
- if (lseek(logFile, (off_t) logOff, SEEK_SET) < 0)
+ openLogOff = (LogwrtResult.Write.xrecoff - BLCKSZ) % XLogSegSize;
+ if (lseek(openLogFile, (off_t) openLogOff, SEEK_SET) < 0)
elog(STOP, "lseek(logfile %u seg %u off %u) failed: %m",
- logId, logSeg, logOff);
+ openLogId, openLogSeg, openLogOff);
}
- if (buffer != NULL && XLByteLT(LgwrRqst.Write, LgwrResult.Write))
- from = buffer;
- else
- from = XLogCtl->pages + Write->curridx * BLCKSZ;
-
- if (write(logFile, from, BLCKSZ) != BLCKSZ)
+ /* OK to write the page */
+ from = XLogCtl->pages + Write->curridx * BLCKSZ;
+ if (write(openLogFile, from, BLCKSZ) != BLCKSZ)
elog(STOP, "write(logfile %u seg %u off %u) failed: %m",
- logId, logSeg, logOff);
+ openLogId, openLogSeg, openLogOff);
+ openLogOff += BLCKSZ;
- wcnt++;
- logOff += BLCKSZ;
+ /*
+ * If we just wrote the whole last page of a logfile segment,
+ * fsync the segment immediately. This avoids having to go back
+ * and re-open prior segments when an fsync request comes along later.
+ * Doing it here ensures that one and only one backend will perform
+ * this fsync.
+ */
+ if (openLogOff >= XLogSegSize && !ispartialpage)
+ {
+ if (pg_fdatasync(openLogFile) != 0)
+ elog(STOP, "fsync(logfile %u seg %u) failed: %m",
+ openLogId, openLogSeg);
+ LogwrtResult.Flush = LogwrtResult.Write; /* end of current page */
+ }
- if (from != buffer)
- Write->curridx = NextBufIdx(Write->curridx);
- else
- LgwrResult.Write = LgwrRqst.Write;
+ if (ispartialpage)
+ {
+ /* Only asked to write a partial page */
+ LogwrtResult.Write = WriteRqst.Write;
+ break;
+ }
+ Write->curridx = NextBufIdx(Write->curridx);
}
- if (wcnt == 0)
- elog(STOP, "XLogWrite: nothing written");
- if (XLByteLT(LgwrResult.Flush, LgwrRqst.Flush) &&
- XLByteLE(LgwrRqst.Flush, LgwrResult.Write))
+ /*
+ * If asked to flush, do so
+ */
+ if (XLByteLT(LogwrtResult.Flush, WriteRqst.Flush) &&
+ XLByteLT(LogwrtResult.Flush, LogwrtResult.Write))
{
- if (pg_fdatasync(logFile) != 0)
+ /*
+ * Could get here without iterating above loop, in which case
+ * we might have no open file or the wrong one. However, we do
+ * not need to fsync more than one file.
+ */
+ if (openLogFile >= 0 &&
+ !XLByteInPrevSeg(LogwrtResult.Write, openLogId, openLogSeg))
+ {
+ if (close(openLogFile) != 0)
+ elog(STOP, "close(logfile %u seg %u) failed: %m",
+ openLogId, openLogSeg);
+ openLogFile = -1;
+ }
+ if (openLogFile < 0)
+ {
+ XLByteToPrevSeg(LogwrtResult.Write, openLogId, openLogSeg);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogOff = 0;
+ }
+
+ if (pg_fdatasync(openLogFile) != 0)
elog(STOP, "fsync(logfile %u seg %u) failed: %m",
- logId, logSeg);
- LgwrResult.Flush = LgwrResult.Write;
+ openLogId, openLogSeg);
+ LogwrtResult.Flush = LogwrtResult.Write;
}
+ /*
+ * Update shared-memory status
+ *
+ * We make sure that the shared 'request' values do not fall behind
+ * the 'result' values. This is not absolutely essential, but it saves
+ * some code in a couple of places.
+ */
S_LOCK(&(XLogCtl->info_lck));
- XLogCtl->LgwrResult = LgwrResult;
- if (XLByteLT(XLogCtl->LgwrRqst.Write, LgwrResult.Write))
- XLogCtl->LgwrRqst.Write = LgwrResult.Write;
+ XLogCtl->LogwrtResult = LogwrtResult;
+ if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write))
+ XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
+ if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush))
+ XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
S_UNLOCK(&(XLogCtl->info_lck));
- Write->LgwrResult = LgwrResult;
+ Write->LogwrtResult = LogwrtResult;
}
+/*
+ * Ensure that all XLOG data through the given position is flushed to disk.
+ *
+ * NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not
+ * already held, and we try to avoid acquiring it if possible.
+ */
+void
+XLogFlush(XLogRecPtr record)
+{
+ XLogRecPtr WriteRqstPtr;
+ XLogwrtRqst WriteRqst;
+ unsigned spins = 0;
+
+ if (XLOG_DEBUG)
+ {
+ fprintf(stderr, "XLogFlush%s%s: rqst %u/%u; wrt %u/%u; flsh %u/%u\n",
+ (IsBootstrapProcessingMode()) ? "(bootstrap)" : "",
+ (InRedo) ? "(redo)" : "",
+ record.xlogid, record.xrecoff,
+ LogwrtResult.Write.xlogid, LogwrtResult.Write.xrecoff,
+ LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
+ fflush(stderr);
+ }
+
+ /* Disabled during REDO */
+ if (InRedo)
+ return;
+
+ /* Quick exit if already known flushed */
+ if (XLByteLE(record, LogwrtResult.Flush))
+ return;
+
+ START_CRIT_SECTION();
+
+ /*
+ * Since fsync is usually a horribly expensive operation, we try to
+ * piggyback as much data as we can on each fsync: if we see any more
+ * data entered into the xlog buffer, we'll write and fsync that too,
+ * so that the final value of LogwrtResult.Flush is as large as possible.
+ * This gives us some chance of avoiding another fsync immediately after.
+ */
+
+ /* initialize to given target; may increase below */
+ WriteRqstPtr = record;
+
+ for (;;)
+ {
+ /* try to read LogwrtResult and update local state */
+ if (!TAS(&(XLogCtl->info_lck)))
+ {
+ if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
+ WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ S_UNLOCK(&(XLogCtl->info_lck));
+ if (XLByteLE(record, LogwrtResult.Flush))
+ {
+ /* Done already */
+ break;
+ }
+ }
+ /* if something was added to log cache then try to flush this too */
+ if (!TAS(&(XLogCtl->insert_lck)))
+ {
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ uint32 freespace = INSERT_FREESPACE(Insert);
+
+ if (freespace < SizeOfXLogRecord) /* buffer is full */
+ {
+ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+ }
+ else
+ {
+ WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
+ WriteRqstPtr.xrecoff -= freespace;
+ }
+ S_UNLOCK(&(XLogCtl->insert_lck));
+ }
+ /* now try to get the logwrt lock */
+ if (!TAS(&(XLogCtl->logwrt_lck)))
+ {
+ LogwrtResult = XLogCtl->Write.LogwrtResult;
+ if (XLByteLE(record, LogwrtResult.Flush))
+ {
+ /* Done already */
+ S_UNLOCK(&(XLogCtl->logwrt_lck));
+ break;
+ }
+ WriteRqst.Write = WriteRqstPtr;
+ WriteRqst.Flush = record;
+ XLogWrite(WriteRqst);
+ S_UNLOCK(&(XLogCtl->logwrt_lck));
+ if (XLByteLT(LogwrtResult.Flush, record))
+ elog(STOP, "XLogFlush: request is not satisfied");
+ break;
+ }
+ S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
+ }
+
+ END_CRIT_SECTION();
+}
+
+/*
+ * Create a new XLOG file segment, or open a pre-existing one.
+ *
+ * Returns FD of opened file.
+ */
static int
XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
{
@@ -962,7 +1183,7 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
XLogFileName(path, log, seg);
/*
- * Try to use existent file (checkpoint maker creates it sometimes).
+ * Try to use existent file (checkpoint maker may have created it already)
*/
if (*usexistent)
{
@@ -971,10 +1192,11 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
{
if (errno != ENOENT)
elog(STOP, "InitOpen(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
}
else
return(fd);
+ /* Set flag to tell caller there was no existent file */
*usexistent = false;
}
@@ -982,10 +1204,11 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
unlink(tpath);
unlink(path);
- fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY, S_IRUSR | S_IWUSR);
+ fd = BasicOpenFile(tpath, O_RDWR | O_CREAT | O_EXCL | PG_BINARY,
+ S_IRUSR | S_IWUSR);
if (fd < 0)
elog(STOP, "InitCreate(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
/*
* Zero-fill the file. We have to do this the hard way to ensure that
@@ -1000,13 +1223,21 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
for (nbytes = 0; nbytes < XLogSegSize; nbytes += sizeof(zbuffer))
{
if ((int) write(fd, zbuffer, sizeof(zbuffer)) != (int) sizeof(zbuffer))
+ {
+ int save_errno = errno;
+
+ /* If we fail to make the file, delete it to release disk space */
+ unlink(tpath);
+ errno = save_errno;
+
elog(STOP, "ZeroFill(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
+ }
}
if (pg_fsync(fd) != 0)
elog(STOP, "fsync(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
close(fd);
@@ -1018,22 +1249,25 @@ XLogFileInit(uint32 log, uint32 seg, bool *usexistent)
#ifndef __BEOS__
if (link(tpath, path) < 0)
elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
unlink(tpath);
#else
if (rename(tpath, path) < 0)
elog(STOP, "InitRelink(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
#endif
fd = BasicOpenFile(path, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
if (fd < 0)
elog(STOP, "InitReopen(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
return (fd);
}
+/*
+ * Open a pre-existing logfile segment.
+ */
static int
XLogFileOpen(uint32 log, uint32 seg, bool econt)
{
@@ -1048,51 +1282,94 @@ XLogFileOpen(uint32 log, uint32 seg, bool econt)
if (econt && errno == ENOENT)
{
elog(LOG, "open(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
return (fd);
}
elog(STOP, "open(logfile %u seg %u) failed: %m",
- logId, logSeg);
+ log, seg);
}
return (fd);
}
/*
- * (Re)move offline log files older or equal to passwd one
+ * Preallocate log files beyond the specified log endpoint, according to
+ * the XLOGfile user parameter.
*/
static void
-MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg)
+PreallocXlogFiles(XLogRecPtr endptr)
+{
+ uint32 _logId;
+ uint32 _logSeg;
+ int lf;
+ bool usexistent;
+ struct timeval delay;
+ int i;
+
+ XLByteToPrevSeg(endptr, _logId, _logSeg);
+ if (XLOGfiles > 0)
+ {
+ for (i = 1; i <= XLOGfiles; i++)
+ {
+ usexistent = true;
+ NextLogSeg(_logId, _logSeg);
+ SpinAcquire(ControlFileLockId);
+ lf = XLogFileInit(_logId, _logSeg, &usexistent);
+ close(lf);
+ SpinRelease(ControlFileLockId);
+ /*
+ * Give up ControlFileLockId for 1/50 sec to let other
+ * backends switch to new log file in XLogWrite()
+ */
+ delay.tv_sec = 0;
+ delay.tv_usec = 20000;
+ (void) select(0, NULL, NULL, NULL, &delay);
+ }
+ }
+ else if ((endptr.xrecoff - 1) % XLogSegSize >=
+ (uint32) (0.75 * XLogSegSize))
+ {
+ usexistent = true;
+ NextLogSeg(_logId, _logSeg);
+ SpinAcquire(ControlFileLockId);
+ lf = XLogFileInit(_logId, _logSeg, &usexistent);
+ close(lf);
+ SpinRelease(ControlFileLockId);
+ }
+}
+
+/*
+ * Remove or move offline all log files older or equal to passed log/seg#
+ */
+static void
+MoveOfflineLogs(uint32 log, uint32 seg)
{
DIR *xldir;
struct dirent *xlde;
char lastoff[32];
char path[MAXPGPATH];
- Assert(archdir[0] == 0); /* ! implemented yet */
+ Assert(XLOG_archive_dir[0] == 0); /* ! implemented yet */
xldir = opendir(XLogDir);
if (xldir == NULL)
elog(STOP, "MoveOfflineLogs: cannot open xlog dir: %m");
- sprintf(lastoff, "%08X%08X", _logId, _logSeg);
+ sprintf(lastoff, "%08X%08X", log, seg);
errno = 0;
while ((xlde = readdir(xldir)) != NULL)
{
- if (strlen(xlde->d_name) != 16 ||
- strspn(xlde->d_name, "0123456789ABCDEF") != 16)
- continue;
- if (strcmp(xlde->d_name, lastoff) > 0)
+ if (strlen(xlde->d_name) == 16 &&
+ strspn(xlde->d_name, "0123456789ABCDEF") == 16 &&
+ strcmp(xlde->d_name, lastoff) <= 0)
{
- errno = 0;
- continue;
+ elog(LOG, "MoveOfflineLogs: %s %s", (XLOG_archive_dir[0]) ?
+ "archive" : "remove", xlde->d_name);
+ sprintf(path, "%s%c%s", XLogDir, SEP_CHAR, xlde->d_name);
+ if (XLOG_archive_dir[0] == 0)
+ unlink(path);
}
- elog(LOG, "MoveOfflineLogs: %s %s", (archdir[0]) ?
- "archive" : "remove", xlde->d_name);
- sprintf(path, "%s%c%s", XLogDir, SEP_CHAR, xlde->d_name);
- if (archdir[0] == 0)
- unlink(path);
errno = 0;
}
if (errno)
@@ -1100,6 +1377,11 @@ MoveOfflineLogs(char *archdir, uint32 _logId, uint32 _logSeg)
closedir(xldir);
}
+/*
+ * Restore the backup blocks present in an XLOG record, if any.
+ *
+ * We assume all of the record has been read into memory at *record.
+ */
static void
RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
{
@@ -1110,9 +1392,10 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
char *blk;
int i;
- for (i = 0, blk = (char*)XLogRecGetData(record) + record->xl_len; i < 2; i++)
+ blk = (char*)XLogRecGetData(record) + record->xl_len;
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
continue;
memcpy((char*)&bkpb, blk, sizeof(BkpBlock));
@@ -1137,6 +1420,13 @@ RestoreBkpBlocks(XLogRecord *record, XLogRecPtr lsn)
}
}
+/*
+ * CRC-check an XLOG record. We do not believe the contents of an XLOG
+ * record (other than to the minimal extent of computing the amount of
+ * data to read in) until we've checked the CRCs.
+ *
+ * We assume all of the record has been read into memory at *record.
+ */
static bool
RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
{
@@ -1146,83 +1436,93 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode)
uint32 len = record->xl_len;
char *blk;
- for (i = 0; i < 2; i++)
- {
- if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
- continue;
-
- if (len <= (sizeof(BkpBlock) + BLCKSZ))
- {
- elog(emode, "ReadRecord: record at %u/%u is too short to keep bkp block",
- recptr.xlogid, recptr.xrecoff);
- return(false);
- }
- len -= sizeof(BkpBlock);
- len -= BLCKSZ;
- }
-
- /* CRC of rmgr data */
+ /* Check CRC of rmgr data and record header */
INIT_CRC64(crc);
- COMP_CRC64(crc, ((char*)XLogRecGetData(record)), len);
- COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
- (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
+ COMP_CRC64(crc, XLogRecGetData(record), len);
+ COMP_CRC64(crc, (char*) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
FIN_CRC64(crc);
- if (record->xl_crc.crc1 != crc.crc1 || record->xl_crc.crc2 != crc.crc2)
+ if (!EQ_CRC64(record->xl_crc, crc))
{
elog(emode, "ReadRecord: bad rmgr data CRC in record at %u/%u",
- recptr.xlogid, recptr.xrecoff);
+ recptr.xlogid, recptr.xrecoff);
return(false);
}
- if (record->xl_len == len)
- return(true);
-
- for (i = 0, blk = (char*)XLogRecGetData(record) + len; i < 2; i++)
+ /* Check CRCs of backup blocks, if any */
+ blk = (char*)XLogRecGetData(record) + len;
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
- if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
continue;
INIT_CRC64(crc);
- COMP_CRC64(crc, (blk + sizeof(BkpBlock)), BLCKSZ);
- COMP_CRC64(crc, (blk + offsetof(BkpBlock, node)),
- (sizeof(BkpBlock) - offsetof(BkpBlock, node)));
+ COMP_CRC64(crc, blk + sizeof(BkpBlock), BLCKSZ);
+ COMP_CRC64(crc, blk + sizeof(crc64),
+ sizeof(BkpBlock) - sizeof(crc64));
FIN_CRC64(crc);
- memcpy((char*)&cbuf, blk, sizeof(crc64));
+ memcpy((char*)&cbuf, blk, sizeof(crc64)); /* don't assume alignment */
- if (cbuf.crc1 != crc.crc1 || cbuf.crc2 != crc.crc2)
+ if (!EQ_CRC64(cbuf, crc))
{
elog(emode, "ReadRecord: bad bkp block %d CRC in record at %u/%u",
- i + 1, recptr.xlogid, recptr.xrecoff);
+ i + 1, recptr.xlogid, recptr.xrecoff);
return(false);
}
- blk += sizeof(BkpBlock);
- blk += BLCKSZ;
+ blk += sizeof(BkpBlock) + BLCKSZ;
}
- record->xl_len = len; /* !!! */
-
return(true);
}
+/*
+ * Attempt to read an XLOG record.
+ *
+ * If RecPtr is not NULL, try to read a record at that position. Otherwise
+ * try to read a record just after the last one previously read.
+ *
+ * If no valid record is available, returns NULL, or fails if emode is STOP.
+ * (emode must be either STOP or LOG.)
+ *
+ * buffer is a workspace at least _INTL_MAXLOGRECSZ bytes long. It is needed
+ * to reassemble a record that crosses block boundaries. Note that on
+ * successful return, the returned record pointer always points at buffer.
+ */
static XLogRecord *
-ReadRecord(XLogRecPtr *RecPtr, char *buffer)
+ReadRecord(XLogRecPtr *RecPtr, int emode, char *buffer)
{
XLogRecord *record;
XLogRecPtr tmpRecPtr = EndRecPtr;
- uint32 len;
- bool nextmode = (RecPtr == NULL);
- int emode = (nextmode) ? LOG : STOP;
- bool noBlck = false;
+ uint32 len,
+ total_len;
+ uint32 targetPageOff;
+ unsigned i;
- if (nextmode)
+ if (readBuf == NULL)
+ {
+ /*
+ * First time through, permanently allocate readBuf. We do it
+ * this way, rather than just making a static array, for two
+ * reasons: (1) no need to waste the storage in most instantiations
+ * of the backend; (2) a static char array isn't guaranteed to
+ * have any particular alignment, whereas malloc() will provide
+ * MAXALIGN'd storage.
+ */
+ readBuf = (char *) malloc(BLCKSZ);
+ Assert(readBuf != NULL);
+ }
+
+ if (RecPtr == NULL)
{
RecPtr = &tmpRecPtr;
+ /* fast case if next record is on same page */
if (nextRecord != NULL)
{
record = nextRecord;
goto got_record;
}
+ /* align old recptr to next page */
if (tmpRecPtr.xrecoff % BLCKSZ != 0)
tmpRecPtr.xrecoff += (BLCKSZ - tmpRecPtr.xrecoff % BLCKSZ);
if (tmpRecPtr.xrecoff >= XLogFileSize)
@@ -1236,31 +1536,36 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
elog(STOP, "ReadRecord: invalid record offset at (%u, %u)",
RecPtr->xlogid, RecPtr->xrecoff);
- if (readFile >= 0 && (RecPtr->xlogid != readId ||
- RecPtr->xrecoff / XLogSegSize != readSeg))
+ if (readFile >= 0 && !XLByteInSeg(*RecPtr, readId, readSeg))
{
close(readFile);
readFile = -1;
}
- readId = RecPtr->xlogid;
- readSeg = RecPtr->xrecoff / XLogSegSize;
+ XLByteToSeg(*RecPtr, readId, readSeg);
if (readFile < 0)
{
- noBlck = true;
- readFile = XLogFileOpen(readId, readSeg, nextmode);
+ readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
if (readFile < 0)
goto next_record_is_invalid;
+ readOff = (uint32) (-1); /* force read to occur below */
}
- if (noBlck || readOff != (RecPtr->xrecoff % XLogSegSize) / BLCKSZ)
+ targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / BLCKSZ) * BLCKSZ;
+ if (readOff != targetPageOff)
{
- readOff = (RecPtr->xrecoff % XLogSegSize) / BLCKSZ;
- if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0)
- elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
+ readOff = targetPageOff;
+ if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
+ {
+ elog(emode, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
readId, readSeg, readOff);
+ goto next_record_is_invalid;
+ }
if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
- elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
+ {
+ elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
readId, readSeg, readOff);
+ goto next_record_is_invalid;
+ }
if (((XLogPageHeader) readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
{
elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
@@ -1269,64 +1574,83 @@ ReadRecord(XLogRecPtr *RecPtr, char *buffer)
goto next_record_is_invalid;
}
}
- if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD) &&
+ if ((((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) &&
RecPtr->xrecoff % BLCKSZ == SizeOfXLogPHD)
{
- elog(emode, "ReadRecord: subrecord is requested by (%u, %u)",
+ elog(emode, "ReadRecord: contrecord is requested by (%u, %u)",
RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
record = (XLogRecord *) ((char *) readBuf + RecPtr->xrecoff % BLCKSZ);
got_record:;
+ /*
+ * Currently, xl_len == 0 must be bad data, but that might not be
+ * true forever. See note in XLogInsert.
+ */
if (record->xl_len == 0)
{
elog(emode, "ReadRecord: record with zero len at (%u, %u)",
- RecPtr->xlogid, RecPtr->xrecoff);
+ RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
- if (record->xl_len > _INTL_MAXLOGRECSZ)
+ /*
+ * Compute total length of record including any appended backup blocks.
+ */
+ total_len = SizeOfXLogRecord + record->xl_len;
+ for (i = 0; i < XLR_MAX_BKP_BLOCKS; i++)
+ {
+ if (!(record->xl_info & XLR_SET_BKP_BLOCK(i)))
+ continue;
+ total_len += sizeof(BkpBlock) + BLCKSZ;
+ }
+ /*
+ * Make sure it will fit in buffer (currently, it is mechanically
+ * impossible for this test to fail, but it seems like a good idea
+ * anyway).
+ */
+ if (total_len > _INTL_MAXLOGRECSZ)
{
elog(emode, "ReadRecord: too long record len %u at (%u, %u)",
- record->xl_len, RecPtr->xlogid, RecPtr->xrecoff);
+ total_len, RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
if (record->xl_rmid > RM_MAX_ID)
{
- elog(emode, "ReadRecord: invalid resource managed id %u at (%u, %u)",
+ elog(emode, "ReadRecord: invalid resource manager id %u at (%u, %u)",
record->xl_rmid, RecPtr->xlogid, RecPtr->xrecoff);
goto next_record_is_invalid;
}
nextRecord = NULL;
- len = BLCKSZ - RecPtr->xrecoff % BLCKSZ - SizeOfXLogRecord;
- if (record->xl_len > len)
+ len = BLCKSZ - RecPtr->xrecoff % BLCKSZ;
+ if (total_len > len)
{
- XLogSubRecord *subrecord;
+ /* Need to reassemble record */
+ XLogContRecord *contrecord;
uint32 gotlen = len;
- memcpy(buffer, record, len + SizeOfXLogRecord);
+ memcpy(buffer, record, len);
record = (XLogRecord *) buffer;
- buffer += len + SizeOfXLogRecord;
+ buffer += len;
for (;;)
{
- readOff++;
- if (readOff == XLogSegSize / BLCKSZ)
+ readOff += BLCKSZ;
+ if (readOff >= XLogSegSize)
{
- readSeg++;
- if (readSeg == XLogLastSeg)
- {
- readSeg = 0;
- readId++;
- }
close(readFile);
- readOff = 0;
- readFile = XLogFileOpen(readId, readSeg, nextmode);
+ readFile = -1;
+ NextLogSeg(readId, readSeg);
+ readFile = XLogFileOpen(readId, readSeg, (emode == LOG));
if (readFile < 0)
goto next_record_is_invalid;
+ readOff = 0;
}
if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
- elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
+ {
+ elog(emode, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
readId, readSeg, readOff);
+ goto next_record_is_invalid;
+ }
if (((XLogPageHeader) readBuf)->xlp_magic != XLOG_PAGE_MAGIC)
{
elog(emode, "ReadRecord: invalid magic number %u in logfile %u seg %u off %u",
@@ -1334,164 +1658,65 @@ got_record:;
readId, readSeg, readOff);
goto next_record_is_invalid;
}
- if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_SUBRECORD))
+ if (!(((XLogPageHeader) readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD))
{
- elog(emode, "ReadRecord: there is no subrecord flag in logfile %u seg %u off %u",
+ elog(emode, "ReadRecord: there is no ContRecord flag in logfile %u seg %u off %u",
readId, readSeg, readOff);
goto next_record_is_invalid;
}
- subrecord = (XLogSubRecord *) ((char *) readBuf + SizeOfXLogPHD);
- if (subrecord->xl_len == 0 ||
- record->xl_len < (subrecord->xl_len + gotlen))
+ contrecord = (XLogContRecord *) ((char *) readBuf + SizeOfXLogPHD);
+ if (contrecord->xl_rem_len == 0 ||
+ total_len != (contrecord->xl_rem_len + gotlen))
{
- elog(emode, "ReadRecord: invalid subrecord len %u in logfile %u seg %u off %u",
- subrecord->xl_len, readId, readSeg, readOff);
+ elog(emode, "ReadRecord: invalid cont-record len %u in logfile %u seg %u off %u",
+ contrecord->xl_rem_len, readId, readSeg, readOff);
goto next_record_is_invalid;
}
- len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogSubRecord;
-
- if (subrecord->xl_len > len)
+ len = BLCKSZ - SizeOfXLogPHD - SizeOfXLogContRecord;
+ if (contrecord->xl_rem_len > len)
{
- memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, len);
+ memcpy(buffer, (char *)contrecord + SizeOfXLogContRecord, len);
gotlen += len;
buffer += len;
continue;
}
- if (record->xl_len != (subrecord->xl_len + gotlen))
- {
- elog(emode, "ReadRecord: invalid len %u of constracted record in logfile %u seg %u off %u",
- subrecord->xl_len + gotlen, readId, readSeg, readOff);
- goto next_record_is_invalid;
- }
- memcpy(buffer, (char *) subrecord + SizeOfXLogSubRecord, subrecord->xl_len);
+ memcpy(buffer, (char *) contrecord + SizeOfXLogContRecord,
+ contrecord->xl_rem_len);
break;
}
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
- if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(subrecord->xl_len) +
- SizeOfXLogPHD + SizeOfXLogSubRecord)
+ if (BLCKSZ - SizeOfXLogRecord >= SizeOfXLogPHD +
+ SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len))
{
- nextRecord = (XLogRecord *) ((char *) subrecord +
- MAXALIGN(subrecord->xl_len) + SizeOfXLogSubRecord);
+ nextRecord = (XLogRecord *) ((char *) contrecord +
+ SizeOfXLogContRecord + MAXALIGN(contrecord->xl_rem_len));
}
EndRecPtr.xlogid = readId;
- EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff * BLCKSZ +
- SizeOfXLogPHD + SizeOfXLogSubRecord +
- MAXALIGN(subrecord->xl_len);
+ EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff +
+ SizeOfXLogPHD + SizeOfXLogContRecord +
+ MAXALIGN(contrecord->xl_rem_len);
ReadRecPtr = *RecPtr;
- return (record);
+ return record;
}
+
+ /* Record does not cross a page boundary */
if (!RecordIsValid(record, *RecPtr, emode))
goto next_record_is_invalid;
- if (BLCKSZ - SizeOfXLogRecord >= MAXALIGN(record->xl_len) +
- RecPtr->xrecoff % BLCKSZ + SizeOfXLogRecord)
- nextRecord = (XLogRecord *) ((char *) record +
- MAXALIGN(record->xl_len) + SizeOfXLogRecord);
+ if (BLCKSZ - SizeOfXLogRecord >= RecPtr->xrecoff % BLCKSZ +
+ MAXALIGN(total_len))
+ nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len));
EndRecPtr.xlogid = RecPtr->xlogid;
- EndRecPtr.xrecoff = RecPtr->xrecoff +
- MAXALIGN(record->xl_len) + SizeOfXLogRecord;
+ EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
ReadRecPtr = *RecPtr;
-
- return (record);
+ memcpy(buffer, record, total_len);
+ return (XLogRecord *) buffer;
next_record_is_invalid:;
close(readFile);
readFile = -1;
nextRecord = NULL;
- memset(buffer, 0, SizeOfXLogRecord);
- record = (XLogRecord *) buffer;
-
- /*
- * If we assumed that next record began on the same page where
- * previous one ended - zero end of page.
- */
- if (XLByteEQ(tmpRecPtr, EndRecPtr))
- {
- Assert(EndRecPtr.xrecoff % BLCKSZ > (SizeOfXLogPHD + SizeOfXLogSubRecord) &&
- BLCKSZ - EndRecPtr.xrecoff % BLCKSZ >= SizeOfXLogRecord);
- readId = EndRecPtr.xlogid;
- readSeg = EndRecPtr.xrecoff / XLogSegSize;
- readOff = (EndRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
- elog(LOG, "Formatting logfile %u seg %u block %u at offset %u",
- readId, readSeg, readOff, EndRecPtr.xrecoff % BLCKSZ);
- readFile = XLogFileOpen(readId, readSeg, false);
- if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0)
- elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- if (read(readFile, readBuf, BLCKSZ) != BLCKSZ)
- elog(STOP, "ReadRecord: read(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- memset(readBuf + EndRecPtr.xrecoff % BLCKSZ, 0,
- BLCKSZ - EndRecPtr.xrecoff % BLCKSZ);
- if (lseek(readFile, (off_t) (readOff * BLCKSZ), SEEK_SET) < 0)
- elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
- elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- readOff++;
- }
- else
- {
- Assert(EndRecPtr.xrecoff % BLCKSZ == 0 ||
- BLCKSZ - EndRecPtr.xrecoff % BLCKSZ < SizeOfXLogRecord);
- readId = tmpRecPtr.xlogid;
- readSeg = tmpRecPtr.xrecoff / XLogSegSize;
- readOff = (tmpRecPtr.xrecoff % XLogSegSize) / BLCKSZ;
- Assert(readOff > 0);
- }
- if (readOff > 0)
- {
- if (!XLByteEQ(tmpRecPtr, EndRecPtr))
- elog(LOG, "Formatting logfile %u seg %u block %u at offset 0",
- readId, readSeg, readOff);
- readOff *= BLCKSZ;
- memset(readBuf, 0, BLCKSZ);
- readFile = XLogFileOpen(readId, readSeg, false);
- if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0)
- elog(STOP, "ReadRecord: lseek(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- while (readOff < XLogSegSize)
- {
- if (write(readFile, readBuf, BLCKSZ) != BLCKSZ)
- elog(STOP, "ReadRecord: write(logfile %u seg %u off %u) failed: %m",
- readId, readSeg, readOff);
- readOff += BLCKSZ;
- }
- }
- if (readFile >= 0)
- {
- if (pg_fsync(readFile) < 0)
- elog(STOP, "ReadRecord: fsync(logfile %u seg %u) failed: %m",
- readId, readSeg);
- close(readFile);
- readFile = -1;
- }
-
- readId = EndRecPtr.xlogid;
- readSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize + 1;
- elog(LOG, "The last logId/logSeg is (%u, %u)", readId, readSeg - 1);
- if (ControlFile->logId != readId || ControlFile->logSeg != readSeg)
- {
- elog(LOG, "Set logId/logSeg in control file");
- ControlFile->logId = readId;
- ControlFile->logSeg = readSeg;
- ControlFile->time = time(NULL);
- UpdateControlFile();
- }
- if (readSeg == XLogLastSeg)
- {
- readSeg = 0;
- readId++;
- }
- {
- char path[MAXPGPATH];
-
- XLogFileName(path, readId, readSeg);
- unlink(path);
- }
-
- return (record);
+ return NULL;
}
/*
@@ -1521,17 +1746,18 @@ static void
WriteControlFile(void)
{
int fd;
- char buffer[BLCKSZ];
+ char buffer[BLCKSZ]; /* need not be aligned */
#ifdef USE_LOCALE
char *localeptr;
#endif
/*
- * Initialize compatibility-check fields
+ * Initialize version and compatibility-check fields
*/
+ ControlFile->pg_control_version = PG_CONTROL_VERSION;
+ ControlFile->catalog_version_no = CATALOG_VERSION_NO;
ControlFile->blcksz = BLCKSZ;
ControlFile->relseg_size = RELSEG_SIZE;
- ControlFile->catalog_version_no = CATALOG_VERSION_NO;
#ifdef USE_LOCALE
localeptr = setlocale(LC_COLLATE, NULL);
if (!localeptr)
@@ -1558,6 +1784,13 @@ WriteControlFile(void)
strcpy(ControlFile->lc_ctype, "C");
#endif
+ /* Contents are protected with a CRC */
+ INIT_CRC64(ControlFile->crc);
+ COMP_CRC64(ControlFile->crc,
+ (char*) ControlFile + sizeof(crc64),
+ sizeof(ControlFileData) - sizeof(crc64));
+ FIN_CRC64(ControlFile->crc);
+
/*
* We write out BLCKSZ bytes into pg_control, zero-padding the
* excess over sizeof(ControlFileData). This reduces the odds
@@ -1568,12 +1801,6 @@ WriteControlFile(void)
if (sizeof(ControlFileData) > BLCKSZ)
elog(STOP, "sizeof(ControlFileData) is too large ... fix xlog.c");
- INIT_CRC64(ControlFile->crc);
- COMP_CRC64(ControlFile->crc,
- ((char*)ControlFile + offsetof(ControlFileData, logId)),
- (sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
- FIN_CRC64(ControlFile->crc);
-
memset(buffer, 0, BLCKSZ);
memcpy(buffer, ControlFile, sizeof(ControlFileData));
@@ -1609,13 +1836,24 @@ ReadControlFile(void)
close(fd);
+ /*
+ * Check for expected pg_control format version. If this is wrong,
+ * the CRC check will likely fail because we'll be checking the wrong
+ * number of bytes. Complaining about wrong version will probably be
+ * more enlightening than complaining about wrong CRC.
+ */
+ if (ControlFile->pg_control_version != PG_CONTROL_VERSION)
+ elog(STOP, "database was initialized with PG_CONTROL_VERSION %d,\n\tbut the backend was compiled with PG_CONTROL_VERSION %d.\n\tlooks like you need to initdb.",
+ ControlFile->pg_control_version, PG_CONTROL_VERSION);
+
+ /* Now check the CRC. */
INIT_CRC64(crc);
COMP_CRC64(crc,
- ((char*)ControlFile + offsetof(ControlFileData, logId)),
- (sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
+ (char*) ControlFile + sizeof(crc64),
+ sizeof(ControlFileData) - sizeof(crc64));
FIN_CRC64(crc);
- if (crc.crc1 != ControlFile->crc.crc1 || crc.crc2 != ControlFile->crc.crc2)
+ if (!EQ_CRC64(crc, ControlFile->crc))
elog(STOP, "Invalid CRC in control file");
/*
@@ -1629,15 +1867,15 @@ ReadControlFile(void)
* for themselves. (These locale settings are considered critical
* compatibility items because they can affect sort order of indexes.)
*/
+ if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
+ elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
+ ControlFile->catalog_version_no, CATALOG_VERSION_NO);
if (ControlFile->blcksz != BLCKSZ)
elog(STOP, "database was initialized with BLCKSZ %d,\n\tbut the backend was compiled with BLCKSZ %d.\n\tlooks like you need to initdb.",
ControlFile->blcksz, BLCKSZ);
if (ControlFile->relseg_size != RELSEG_SIZE)
elog(STOP, "database was initialized with RELSEG_SIZE %d,\n\tbut the backend was compiled with RELSEG_SIZE %d.\n\tlooks like you need to initdb.",
ControlFile->relseg_size, RELSEG_SIZE);
- if (ControlFile->catalog_version_no != CATALOG_VERSION_NO)
- elog(STOP, "database was initialized with CATALOG_VERSION_NO %d,\n\tbut the backend was compiled with CATALOG_VERSION_NO %d.\n\tlooks like you need to initdb.",
- ControlFile->catalog_version_no, CATALOG_VERSION_NO);
#ifdef USE_LOCALE
if (setlocale(LC_COLLATE, ControlFile->lc_collate) == NULL)
elog(STOP, "database was initialized with LC_COLLATE '%s',\n\twhich is not recognized by setlocale().\n\tlooks like you need to initdb.",
@@ -1660,8 +1898,8 @@ UpdateControlFile(void)
INIT_CRC64(ControlFile->crc);
COMP_CRC64(ControlFile->crc,
- ((char*)ControlFile + offsetof(ControlFileData, logId)),
- (sizeof(ControlFileData) - offsetof(ControlFileData, logId)));
+ (char*) ControlFile + sizeof(crc64),
+ sizeof(ControlFileData) - sizeof(crc64));
FIN_CRC64(ControlFile->crc);
fd = BasicOpenFile(ControlFilePath, O_RDWR | PG_BINARY, S_IRUSR | S_IWUSR);
@@ -1678,7 +1916,7 @@ UpdateControlFile(void)
}
/*
- * Management of shared memory for XLOG
+ * Initialization of shared memory for XLOG
*/
int
@@ -1687,9 +1925,9 @@ XLOGShmemSize(void)
if (XLOGbuffers < MinXLOGbuffers)
XLOGbuffers = MinXLOGbuffers;
- return (sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
- sizeof(XLogRecPtr) * XLOGbuffers +
- sizeof(ControlFileData));
+ return MAXALIGN(sizeof(XLogCtlData) + sizeof(XLogRecPtr) * XLOGbuffers)
+ + BLCKSZ * XLOGbuffers +
+ MAXALIGN(sizeof(ControlFileData));
}
void
@@ -1702,13 +1940,46 @@ XLOGShmemInit(void)
XLOGbuffers = MinXLOGbuffers;
XLogCtl = (XLogCtlData *)
- ShmemInitStruct("XLOG Ctl", sizeof(XLogCtlData) + BLCKSZ * XLOGbuffers +
- sizeof(XLogRecPtr) * XLOGbuffers, &found);
+ ShmemInitStruct("XLOG Ctl",
+ MAXALIGN(sizeof(XLogCtlData) +
+ sizeof(XLogRecPtr) * XLOGbuffers)
+ + BLCKSZ * XLOGbuffers,
+ &found);
Assert(!found);
ControlFile = (ControlFileData *)
ShmemInitStruct("Control File", sizeof(ControlFileData), &found);
Assert(!found);
+ memset(XLogCtl, 0, sizeof(XLogCtlData));
+ /*
+ * Since XLogCtlData contains XLogRecPtr fields, its sizeof should be
+ * a multiple of the alignment for same, so no extra alignment padding
+ * is needed here.
+ */
+ XLogCtl->xlblocks = (XLogRecPtr *)
+ (((char *) XLogCtl) + sizeof(XLogCtlData));
+ memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
+ /*
+ * Here, on the other hand, we must MAXALIGN to ensure the page buffers
+ * have worst-case alignment.
+ */
+ XLogCtl->pages =
+ ((char *) XLogCtl) + MAXALIGN(sizeof(XLogCtlData) +
+ sizeof(XLogRecPtr) * XLOGbuffers);
+ memset(XLogCtl->pages, 0, BLCKSZ * XLOGbuffers);
+
+ /*
+ * Do basic initialization of XLogCtl shared data.
+ * (StartupXLOG will fill in additional info.)
+ */
+ XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
+ XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
+ XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
+ S_INIT_LOCK(&(XLogCtl->insert_lck));
+ S_INIT_LOCK(&(XLogCtl->info_lck));
+ S_INIT_LOCK(&(XLogCtl->logwrt_lck));
+ S_INIT_LOCK(&(XLogCtl->chkp_lck));
+
/*
* If we are not in bootstrap mode, pg_control should already exist.
* Read and validate it immediately (see comments in ReadControlFile()
@@ -1719,27 +1990,33 @@ XLOGShmemInit(void)
}
/*
- * This func must be called ONCE on system install
+ * This func must be called ONCE on system install. It creates pg_control
+ * and the initial XLOG segment.
*/
void
-BootStrapXLOG()
+BootStrapXLOG(void)
{
CheckPoint checkPoint;
- char buffer[BLCKSZ];
- bool usexistent = false;
- XLogPageHeader page = (XLogPageHeader) buffer;
+ char *buffer;
+ XLogPageHeader page;
XLogRecord *record;
+ bool usexistent = false;
crc64 crc;
+ /* Use malloc() to ensure buffer is MAXALIGNED */
+ buffer = (char *) malloc(BLCKSZ);
+ page = (XLogPageHeader) buffer;
+
checkPoint.redo.xlogid = 0;
checkPoint.redo.xrecoff = SizeOfXLogPHD;
checkPoint.undo = checkPoint.redo;
+ checkPoint.ThisStartUpID = 0;
checkPoint.nextXid = FirstTransactionId;
checkPoint.nextOid = BootstrapObjectIdData;
- checkPoint.ThisStartUpID = 0;
- checkPoint.Shutdown = true;
+ checkPoint.time = time(NULL);
ShmemVariableCache->nextXid = checkPoint.nextXid;
+ ShmemVariableCache->xidCount = 0;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
@@ -1752,34 +2029,36 @@ BootStrapXLOG()
record->xl_xact_prev = record->xl_prev;
record->xl_xid = InvalidTransactionId;
record->xl_len = sizeof(checkPoint);
- record->xl_info = 0;
+ record->xl_info = XLOG_CHECKPOINT_SHUTDOWN;
record->xl_rmid = RM_XLOG_ID;
- memcpy((char *) record + SizeOfXLogRecord, &checkPoint, sizeof(checkPoint));
+ memcpy(XLogRecGetData(record), &checkPoint, sizeof(checkPoint));
INIT_CRC64(crc);
- COMP_CRC64(crc, ((char*)&checkPoint), sizeof(checkPoint));
- COMP_CRC64(crc, ((char*)record + offsetof(XLogRecord, xl_prev)),
- (SizeOfXLogRecord - offsetof(XLogRecord, xl_prev)));
+ COMP_CRC64(crc, &checkPoint, sizeof(checkPoint));
+ COMP_CRC64(crc, (char*) record + sizeof(crc64),
+ SizeOfXLogRecord - sizeof(crc64));
FIN_CRC64(crc);
record->xl_crc = crc;
- logFile = XLogFileInit(0, 0, &usexistent);
+ openLogFile = XLogFileInit(0, 0, &usexistent);
- if (write(logFile, buffer, BLCKSZ) != BLCKSZ)
+ if (write(openLogFile, buffer, BLCKSZ) != BLCKSZ)
elog(STOP, "BootStrapXLOG failed to write logfile: %m");
- if (pg_fsync(logFile) != 0)
+ if (pg_fsync(openLogFile) != 0)
elog(STOP, "BootStrapXLOG failed to fsync logfile: %m");
- close(logFile);
- logFile = -1;
+ close(openLogFile);
+ openLogFile = -1;
memset(ControlFile, 0, sizeof(ControlFileData));
+ /* Initialize pg_control status fields */
+ ControlFile->state = DB_SHUTDOWNED;
+ ControlFile->time = checkPoint.time;
ControlFile->logId = 0;
ControlFile->logSeg = 1;
ControlFile->checkPoint = checkPoint.redo;
- ControlFile->time = time(NULL);
- ControlFile->state = DB_SHUTDOWNED;
+ ControlFile->checkPointCopy = checkPoint;
/* some additional ControlFile fields are set in WriteControlFile() */
WriteControlFile();
@@ -1788,47 +2067,35 @@ BootStrapXLOG()
static char *
str_time(time_t tnow)
{
- static char buf[20];
+ static char buf[32];
strftime(buf, sizeof(buf),
- "%Y-%m-%d %H:%M:%S",
+ "%Y-%m-%d %H:%M:%S %Z",
localtime(&tnow));
return buf;
}
/*
- * This func must be called ONCE on system startup
+ * This must be called ONCE during postmaster or standalone-backend startup
*/
void
-StartupXLOG()
+StartupXLOG(void)
{
XLogCtlInsert *Insert;
CheckPoint checkPoint;
+ bool wasShutdown;
XLogRecPtr RecPtr,
- LastRec;
+ LastRec,
+ checkPointLoc,
+ EndOfLog;
XLogRecord *record;
- char buffer[_INTL_MAXLOGRECSZ + SizeOfXLogRecord];
+ char *buffer;
- elog(LOG, "starting up");
- CritSectionCount++;
+ /* Use malloc() to ensure record buffer is MAXALIGNED */
+ buffer = (char *) malloc(_INTL_MAXLOGRECSZ);
- XLogCtl->xlblocks = (XLogRecPtr *) (((char *) XLogCtl) + sizeof(XLogCtlData));
- XLogCtl->pages = ((char *) XLogCtl->xlblocks + sizeof(XLogRecPtr) * XLOGbuffers);
- XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
- XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
- memset(XLogCtl->xlblocks, 0, sizeof(XLogRecPtr) * XLOGbuffers);
- XLogCtl->LgwrRqst = LgwrRqst;
- XLogCtl->LgwrResult = LgwrResult;
- XLogCtl->Insert.LgwrResult = LgwrResult;
- XLogCtl->Insert.curridx = 0;
- XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
- XLogCtl->Write.LgwrResult = LgwrResult;
- XLogCtl->Write.curridx = 0;
- S_INIT_LOCK(&(XLogCtl->insert_lck));
- S_INIT_LOCK(&(XLogCtl->info_lck));
- S_INIT_LOCK(&(XLogCtl->lgwr_lck));
- S_INIT_LOCK(&(XLogCtl->chkp_lck));
+ CritSectionCount++;
/*
* Read control file and check XLOG status looks valid.
@@ -1860,22 +2127,42 @@ StartupXLOG()
elog(LOG, "database system was interrupted at %s",
str_time(ControlFile->time));
- LastRec = RecPtr = ControlFile->checkPoint;
- if (!XRecOffIsValid(RecPtr.xrecoff))
- elog(STOP, "Invalid checkPoint in control file");
- elog(LOG, "CheckPoint record at (%u, %u)", RecPtr.xlogid, RecPtr.xrecoff);
-
- record = ReadRecord(&RecPtr, buffer);
- if (record->xl_rmid != RM_XLOG_ID)
- elog(STOP, "Invalid RMID in checkPoint record");
- if (record->xl_len != sizeof(checkPoint))
- elog(STOP, "Invalid length of checkPoint record");
- checkPoint = *((CheckPoint *) ((char *) record + SizeOfXLogRecord));
+ /*
+ * Get the last valid checkpoint record. If the latest one according
+ * to pg_control is broken, try the next-to-last one.
+ */
+ record = ReadCheckpointRecord(ControlFile->checkPoint,
+ "primary", buffer);
+ if (record != NULL)
+ {
+ checkPointLoc = ControlFile->checkPoint;
+ elog(LOG, "CheckPoint record at (%u, %u)",
+ checkPointLoc.xlogid, checkPointLoc.xrecoff);
+ }
+ else
+ {
+ record = ReadCheckpointRecord(ControlFile->prevCheckPoint,
+ "secondary", buffer);
+ if (record != NULL)
+ {
+ checkPointLoc = ControlFile->prevCheckPoint;
+ elog(LOG, "Using previous CheckPoint record at (%u, %u)",
+ checkPointLoc.xlogid, checkPointLoc.xrecoff);
+ InRecovery = true; /* force recovery even if SHUTDOWNED */
+ }
+ else
+ {
+ elog(STOP, "Unable to locate a valid CheckPoint record");
+ }
+ }
+ LastRec = RecPtr = checkPointLoc;
+ memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ wasShutdown = (record->xl_info == XLOG_CHECKPOINT_SHUTDOWN);
elog(LOG, "Redo record at (%u, %u); Undo record at (%u, %u); Shutdown %s",
checkPoint.redo.xlogid, checkPoint.redo.xrecoff,
checkPoint.undo.xlogid, checkPoint.undo.xrecoff,
- (checkPoint.Shutdown) ? "TRUE" : "FALSE");
+ wasShutdown ? "TRUE" : "FALSE");
elog(LOG, "NextTransactionId: %u; NextOid: %u",
checkPoint.nextXid, checkPoint.nextOid);
if (checkPoint.nextXid < FirstTransactionId ||
@@ -1883,6 +2170,7 @@ StartupXLOG()
elog(STOP, "Invalid NextTransactionId/NextOid");
ShmemVariableCache->nextXid = checkPoint.nextXid;
+ ShmemVariableCache->xidCount = 0;
ShmemVariableCache->nextOid = checkPoint.nextOid;
ShmemVariableCache->oidCount = 0;
@@ -1898,10 +2186,8 @@ StartupXLOG()
if (XLByteLT(checkPoint.undo, RecPtr) ||
XLByteLT(checkPoint.redo, RecPtr))
{
- if (checkPoint.Shutdown)
+ if (wasShutdown)
elog(STOP, "Invalid Redo/Undo record in shutdown checkpoint");
- if (ControlFile->state == DB_SHUTDOWNED)
- elog(STOP, "Invalid Redo/Undo record in shut down state");
InRecovery = true;
}
else if (ControlFile->state != DB_SHUTDOWNED)
@@ -1923,11 +2209,11 @@ StartupXLOG()
/* Is REDO required ? */
if (XLByteLT(checkPoint.redo, RecPtr))
- record = ReadRecord(&(checkPoint.redo), buffer);
+ record = ReadRecord(&(checkPoint.redo), STOP, buffer);
else /* read past CheckPoint record */
- record = ReadRecord(NULL, buffer);
+ record = ReadRecord(NULL, LOG, buffer);
- if (record->xl_len != 0)
+ if (record != NULL)
{
InRedo = true;
elog(LOG, "redo starts at (%u, %u)",
@@ -1935,7 +2221,11 @@ StartupXLOG()
do
{
if (record->xl_xid >= ShmemVariableCache->nextXid)
+ {
+ /* This probably shouldn't happen... */
ShmemVariableCache->nextXid = record->xl_xid + 1;
+ ShmemVariableCache->xidCount = 0;
+ }
if (XLOG_DEBUG)
{
char buf[8192];
@@ -1947,16 +2237,15 @@ StartupXLOG()
strcat(buf, " - ");
RmgrTable[record->xl_rmid].rm_desc(buf,
record->xl_info, XLogRecGetData(record));
- strcat(buf, "\n");
- write(2, buf, strlen(buf));
+ fprintf(stderr, "%s\n", buf);
}
- if (record->xl_info & (XLR_BKP_BLOCK_1|XLR_BKP_BLOCK_2))
+ if (record->xl_info & XLR_BKP_BLOCK_MASK)
RestoreBkpBlocks(record, EndRecPtr);
RmgrTable[record->xl_rmid].rm_redo(EndRecPtr, record);
- record = ReadRecord(NULL, buffer);
- } while (record->xl_len != 0);
+ record = ReadRecord(NULL, LOG, buffer);
+ } while (record != NULL);
elog(LOG, "redo done at (%u, %u)",
ReadRecPtr.xlogid, ReadRecPtr.xrecoff);
LastRec = ReadRecPtr;
@@ -1966,29 +2255,40 @@ StartupXLOG()
elog(LOG, "redo is not required");
}
- /* Init xlog buffer cache */
- record = ReadRecord(&LastRec, buffer);
- logId = EndRecPtr.xlogid;
- logSeg = (EndRecPtr.xrecoff - 1) / XLogSegSize;
- logOff = 0;
- logFile = XLogFileOpen(logId, logSeg, false);
- XLogCtl->xlblocks[0].xlogid = logId;
+ /*
+ * Init xlog buffer cache using the block containing the last valid
+ * record from the previous incarnation.
+ */
+ record = ReadRecord(&LastRec, STOP, buffer);
+ EndOfLog = EndRecPtr;
+ XLByteToPrevSeg(EndOfLog, openLogId, openLogSeg);
+ openLogFile = XLogFileOpen(openLogId, openLogSeg, false);
+ openLogOff = 0;
+ ControlFile->logId = openLogId;
+ ControlFile->logSeg = openLogSeg + 1;
+ XLogCtl->xlblocks[0].xlogid = openLogId;
XLogCtl->xlblocks[0].xrecoff =
- ((EndRecPtr.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
+ ((EndOfLog.xrecoff - 1) / BLCKSZ + 1) * BLCKSZ;
Insert = &XLogCtl->Insert;
- memcpy((char *) (Insert->currpage), readBuf, BLCKSZ);
- Insert->currpos = ((char *) Insert->currpage) +
- (EndRecPtr.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+ /* Tricky point here: readBuf contains the *last* block that the LastRec
+ * record spans, not the one it starts in, which is what we want.
+ */
+ Assert(readOff == (XLogCtl->xlblocks[0].xrecoff - BLCKSZ) % XLogSegSize);
+ memcpy((char *) Insert->currpage, readBuf, BLCKSZ);
+ Insert->currpos = (char *) Insert->currpage +
+ (EndOfLog.xrecoff + BLCKSZ - XLogCtl->xlblocks[0].xrecoff);
+ /* Make sure rest of page is zero */
+ memset(Insert->currpos, 0, INSERT_FREESPACE(Insert));
Insert->PrevRecord = LastRec;
- LgwrRqst.Write = LgwrRqst.Flush =
- LgwrResult.Write = LgwrResult.Flush = EndRecPtr;
+ LogwrtResult.Write = LogwrtResult.Flush = EndOfLog;
- XLogCtl->Write.LgwrResult = LgwrResult;
- Insert->LgwrResult = LgwrResult;
+ XLogCtl->Write.LogwrtResult = LogwrtResult;
+ Insert->LogwrtResult = LogwrtResult;
+ XLogCtl->LogwrtResult = LogwrtResult;
- XLogCtl->LgwrRqst = LgwrRqst;
- XLogCtl->LgwrResult = LgwrResult;
+ XLogCtl->LogwrtRqst.Write = EndOfLog;
+ XLogCtl->LogwrtRqst.Flush = EndOfLog;
#ifdef NOT_USED
/* UNDO */
@@ -2001,7 +2301,7 @@ StartupXLOG()
RecPtr.xlogid, RecPtr.xrecoff);
do
{
- record = ReadRecord(&RecPtr, buffer);
+ record = ReadRecord(&RecPtr, STOP, buffer);
if (TransactionIdIsValid(record->xl_xid) &&
!TransactionIdDidCommit(record->xl_xid))
RmgrTable[record->xl_rmid].rm_undo(EndRecPtr, record);
@@ -2017,25 +2317,21 @@ StartupXLOG()
if (InRecovery)
{
+ /*
+ * In case we had to use the secondary checkpoint, make sure that
+ * it will still be shown as the secondary checkpoint after this
+ * CreateCheckPoint operation; we don't want the broken primary
+ * checkpoint to become prevCheckPoint...
+ */
+ ControlFile->checkPoint = checkPointLoc;
CreateCheckPoint(true);
XLogCloseRelationCache();
}
- if (XLOGfiles > 0) /* pre-allocate log files */
- {
- uint32 _logId = logId,
- _logSeg = logSeg;
- int lf, i;
- bool usexistent;
-
- for (i = 1; i <= XLOGfiles; i++)
- {
- NextLogSeg(_logId, _logSeg);
- usexistent = false;
- lf = XLogFileInit(_logId, _logSeg, &usexistent);
- close(lf);
- }
- }
+ /*
+ * Preallocate additional log files, if wanted.
+ */
+ PreallocXlogFiles(EndOfLog);
InRecovery = false;
@@ -2049,11 +2345,63 @@ StartupXLOG()
elog(LOG, "database system is in production state");
CritSectionCount--;
- return;
+ /* Shut down readFile facility, free space */
+ if (readFile >= 0)
+ {
+ close(readFile);
+ readFile = -1;
+ }
+ if (readBuf)
+ {
+ free(readBuf);
+ readBuf = NULL;
+ }
+
+ free(buffer);
+}
+
+/* Subroutine to try to fetch and validate a prior checkpoint record */
+static XLogRecord *
+ReadCheckpointRecord(XLogRecPtr RecPtr,
+ const char *whichChkpt,
+ char *buffer)
+{
+ XLogRecord *record;
+
+ if (!XRecOffIsValid(RecPtr.xrecoff))
+ {
+ elog(LOG, "Invalid %s checkPoint link in control file", whichChkpt);
+ return NULL;
+ }
+
+ record = ReadRecord(&RecPtr, LOG, buffer);
+
+ if (record == NULL)
+ {
+ elog(LOG, "Invalid %s checkPoint record", whichChkpt);
+ return NULL;
+ }
+ if (record->xl_rmid != RM_XLOG_ID)
+ {
+ elog(LOG, "Invalid RMID in %s checkPoint record", whichChkpt);
+ return NULL;
+ }
+ if (record->xl_info != XLOG_CHECKPOINT_SHUTDOWN &&
+ record->xl_info != XLOG_CHECKPOINT_ONLINE)
+ {
+ elog(LOG, "Invalid xl_info in %s checkPoint record", whichChkpt);
+ return NULL;
+ }
+ if (record->xl_len != sizeof(CheckPoint))
+ {
+ elog(LOG, "Invalid length of %s checkPoint record", whichChkpt);
+ return NULL;
+ }
+ return record;
}
/*
- * Postmaster uses it to set ThisStartUpID & RedoRecPtr from
+ * Postmaster uses this to initialize ThisStartUpID & RedoRecPtr from
* XLogCtlData located in shmem after successful startup.
*/
void
@@ -2064,9 +2412,17 @@ SetThisStartUpID(void)
}
/*
- * CheckPoint-er called by postmaster creates copy of RedoRecPtr
- * for postmaster in shmem. Postmaster uses GetRedoRecPtr after
- * that to update its own copy of RedoRecPtr.
+ * CheckPoint process called by postmaster saves copy of new RedoRecPtr
+ * in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster
+ * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
+ * subsequently-spawned backends will start out with a reasonably up-to-date
+ * local RedoRecPtr. Since these operations are not protected by any spinlock
+ * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
+ * routines at other times!
+ *
+ * Note: once spawned, a backend must update its local RedoRecPtr from
+ * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock. This is
+ * done in XLogInsert().
*/
void
SetRedoRecPtr(void)
@@ -2081,13 +2437,16 @@ GetRedoRecPtr(void)
}
/*
- * This func must be called ONCE on system shutdown
+ * This must be called ONCE during postmaster or standalone-backend shutdown
*/
void
-ShutdownXLOG()
+ShutdownXLOG(void)
{
elog(LOG, "shutting down");
+ /* suppress in-transaction check in CreateCheckPoint */
+ MyLastRecPtr.xrecoff = 0;
+
CritSectionCount++;
CreateDummyCaches();
CreateCheckPoint(true);
@@ -2096,6 +2455,9 @@ ShutdownXLOG()
elog(LOG, "database system is shut down");
}
+/*
+ * Perform a checkpoint --- either during shutdown, or on-the-fly
+ */
void
CreateCheckPoint(bool shutdown)
{
@@ -2104,10 +2466,8 @@ CreateCheckPoint(bool shutdown)
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecData rdata;
uint32 freespace;
- uint16 curridx;
uint32 _logId;
uint32 _logSeg;
- char archdir[MAXPGPATH];
unsigned spins = 0;
if (MyLastRecPtr.xrecoff != 0)
@@ -2122,154 +2482,198 @@ CreateCheckPoint(bool shutdown)
CHECKPOINT_LOCK_TIMEOUT, 1000000);
}
- memset(&checkPoint, 0, sizeof(checkPoint));
if (shutdown)
{
ControlFile->state = DB_SHUTDOWNING;
ControlFile->time = time(NULL);
UpdateControlFile();
}
+
+ memset(&checkPoint, 0, sizeof(checkPoint));
checkPoint.ThisStartUpID = ThisStartUpID;
- checkPoint.Shutdown = shutdown;
+ checkPoint.time = time(NULL);
- /* Get REDO record ptr */
S_LOCK(&(XLogCtl->insert_lck));
- freespace = ((char *) Insert->currpage) + BLCKSZ - Insert->currpos;
+
+ /*
+ * If this isn't a shutdown, and we have not inserted any XLOG records
+ * since the start of the last checkpoint, skip the checkpoint. The
+ * idea here is to avoid inserting duplicate checkpoints when the system
+ * is idle. That wastes log space, and more importantly it exposes us to
+ * possible loss of both current and previous checkpoint records if the
+ * machine crashes just as we're writing the update. (Perhaps it'd make
+ * even more sense to checkpoint only when the previous checkpoint record
+ * is in a different xlog page?)
+ *
+ * We have to make two tests to determine that nothing has happened since
+ * the start of the last checkpoint: current insertion point must match
+ * the end of the last checkpoint record, and its redo pointer must point
+ * to itself.
+ */
+ if (!shutdown)
+ {
+ XLogRecPtr curInsert;
+
+ INSERT_RECPTR(curInsert, Insert, Insert->curridx);
+ if (curInsert.xlogid == ControlFile->checkPoint.xlogid &&
+ curInsert.xrecoff == ControlFile->checkPoint.xrecoff +
+ MAXALIGN(SizeOfXLogRecord + sizeof(CheckPoint)) &&
+ ControlFile->checkPoint.xlogid ==
+ ControlFile->checkPointCopy.redo.xlogid &&
+ ControlFile->checkPoint.xrecoff ==
+ ControlFile->checkPointCopy.redo.xrecoff)
+ {
+ S_UNLOCK(&(XLogCtl->insert_lck));
+ S_UNLOCK(&(XLogCtl->chkp_lck));
+ END_CRIT_SECTION();
+ return;
+ }
+ }
+
+ /*
+ * Compute new REDO record ptr = location of next XLOG record.
+ *
+ * NB: this is NOT necessarily where the checkpoint record itself will
+ * be, since other backends may insert more XLOG records while we're
+ * off doing the buffer flush work. Those XLOG records are logically
+ * after the checkpoint, even though physically before it. Got that?
+ */
+ freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
- curridx = NextBufIdx(Insert->curridx);
- if (XLByteLE(XLogCtl->xlblocks[curridx], LgwrResult.Write))
- InitXLBuffer(curridx);
- else
- GetFreeXLBuffer();
+ (void) AdvanceXLInsertBuffer();
+ /* OK to ignore update return flag, since we will do flush anyway */
freespace = BLCKSZ - SizeOfXLogPHD;
}
- else
- curridx = Insert->curridx;
- checkPoint.redo.xlogid = XLogCtl->xlblocks[curridx].xlogid;
- checkPoint.redo.xrecoff = XLogCtl->xlblocks[curridx].xrecoff - BLCKSZ +
- Insert->currpos - ((char *) Insert->currpage);
+ INSERT_RECPTR(checkPoint.redo, Insert, Insert->curridx);
+ /*
+ * Here we update the shared RedoRecPtr for future XLogInsert calls;
+ * this must be done while holding the insert lock.
+ */
RedoRecPtr = XLogCtl->Insert.RedoRecPtr = checkPoint.redo;
+ /*
+ * Get UNDO record ptr - this is oldest of PROC->logRec values.
+ * We do this while holding insert lock to ensure that we won't miss
+ * any about-to-commit transactions (UNDO must include all xacts that
+ * have commits after REDO point).
+ */
+ checkPoint.undo = GetUndoRecPtr();
+
+ if (shutdown && checkPoint.undo.xrecoff != 0)
+ elog(STOP, "Active transaction while data base is shutting down");
+
+ /*
+ * Now we can release insert lock, allowing other xacts to proceed
+ * even while we are flushing disk buffers.
+ */
S_UNLOCK(&(XLogCtl->insert_lck));
SpinAcquire(XidGenLockId);
checkPoint.nextXid = ShmemVariableCache->nextXid;
+ if (!shutdown)
+ checkPoint.nextXid += ShmemVariableCache->xidCount;
SpinRelease(XidGenLockId);
+
SpinAcquire(OidGenLockId);
checkPoint.nextOid = ShmemVariableCache->nextOid;
if (!shutdown)
checkPoint.nextOid += ShmemVariableCache->oidCount;
-
SpinRelease(OidGenLockId);
+ /*
+ * Having constructed the checkpoint record, ensure all shmem disk buffers
+ * are flushed to disk.
+ */
FlushBufferPool();
- /* Get UNDO record ptr - should use oldest of PROC->logRec */
- checkPoint.undo = GetUndoRecPtr();
-
- if (shutdown && checkPoint.undo.xrecoff != 0)
- elog(STOP, "Active transaction while data base is shutting down");
-
+ /*
+ * Now insert the checkpoint record into XLOG.
+ */
rdata.buffer = InvalidBuffer;
rdata.data = (char *)(&checkPoint);
rdata.len = sizeof(checkPoint);
rdata.next = NULL;
- recptr = XLogInsert(RM_XLOG_ID, XLOG_CHECKPOINT, &rdata);
+ recptr = XLogInsert(RM_XLOG_ID,
+ shutdown ? XLOG_CHECKPOINT_SHUTDOWN :
+ XLOG_CHECKPOINT_ONLINE,
+ &rdata);
+
+ XLogFlush(recptr);
- if (shutdown && !XLByteEQ(checkPoint.redo, MyLastRecPtr))
+ /*
+ * We now have ProcLastRecPtr = start of actual checkpoint record,
+ * recptr = end of actual checkpoint record.
+ */
+ if (shutdown && !XLByteEQ(checkPoint.redo, ProcLastRecPtr))
elog(STOP, "XLog concurrent activity while data base is shutting down");
- XLogFlush(recptr);
+ /*
+ * Remember location of prior checkpoint's earliest info.
+ * Oldest item is redo or undo, whichever is older; but watch out
+ * for case that undo = 0.
+ */
+ if (ControlFile->checkPointCopy.undo.xrecoff != 0 &&
+ XLByteLT(ControlFile->checkPointCopy.undo,
+ ControlFile->checkPointCopy.redo))
+ XLByteToSeg(ControlFile->checkPointCopy.undo, _logId, _logSeg);
+ else
+ XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg);
+ /*
+ * Update the control file.
+ */
SpinAcquire(ControlFileLockId);
if (shutdown)
- {
- /* probably should delete extra log files */
ControlFile->state = DB_SHUTDOWNED;
- }
- else /* create new log file(s) */
- {
- int lf;
- bool usexistent = true;
-
- _logId = recptr.xlogid;
- _logSeg = (recptr.xrecoff - 1) / XLogSegSize;
- if (XLOGfiles > 0)
- {
- struct timeval delay;
- int i;
-
- for (i = 1; i <= XLOGfiles; i++)
- {
- usexistent = true;
- NextLogSeg(_logId, _logSeg);
- lf = XLogFileInit(_logId, _logSeg, &usexistent);
- close(lf);
- /*
- * Give up ControlFileLockId for 1/50 sec to let other
- * backends switch to new log file in XLogWrite()
- */
- SpinRelease(ControlFileLockId);
- delay.tv_sec = 0;
- delay.tv_usec = 20000;
- (void) select(0, NULL, NULL, NULL, &delay);
- SpinAcquire(ControlFileLockId);
- }
- }
- else if ((recptr.xrecoff - 1) % XLogSegSize >=
- (uint32) (0.75 * XLogSegSize))
- {
- NextLogSeg(_logId, _logSeg);
- lf = XLogFileInit(_logId, _logSeg, &usexistent);
- close(lf);
- }
- }
-
- ControlFile->checkPoint = MyLastRecPtr;
- strcpy(archdir, ControlFile->archdir);
+ ControlFile->prevCheckPoint = ControlFile->checkPoint;
+ ControlFile->checkPoint = ProcLastRecPtr;
+ ControlFile->checkPointCopy = checkPoint;
ControlFile->time = time(NULL);
UpdateControlFile();
SpinRelease(ControlFileLockId);
/*
- * Delete offline log files. Get oldest online
- * log file from redo or undo record, whatever
- * is older.
+ * Delete offline log files (those no longer needed even for previous
+ * checkpoint).
*/
- if (checkPoint.undo.xrecoff != 0 &&
- XLByteLT(checkPoint.undo, checkPoint.redo))
- {
- _logId = checkPoint.undo.xlogid;
- _logSeg = checkPoint.undo.xrecoff / XLogSegSize;
- }
- else
- {
- _logId = checkPoint.redo.xlogid;
- _logSeg = checkPoint.redo.xrecoff / XLogSegSize;
- }
if (_logId || _logSeg)
{
- if (_logSeg)
- _logSeg--;
- else
- {
- _logId--;
- _logSeg = 0;
- }
- MoveOfflineLogs(archdir, _logId, _logSeg);
+ PrevLogSeg(_logId, _logSeg);
+ MoveOfflineLogs(_logId, _logSeg);
}
+ /*
+ * Make more log segments if needed. (Do this after deleting offline
+ * log segments, to avoid having peak disk space usage higher than
+ * necessary.)
+ */
+ if (!shutdown)
+ PreallocXlogFiles(recptr);
+
S_UNLOCK(&(XLogCtl->chkp_lck));
- MyLastRecPtr.xrecoff = 0; /* to avoid commit record */
END_CRIT_SECTION();
-
- return;
}
-void XLogPutNextOid(Oid nextOid);
+/*
+ * Write a NEXTXID log record
+ */
+void
+XLogPutNextXid(TransactionId nextXid)
+{
+ XLogRecData rdata;
+ rdata.buffer = InvalidBuffer;
+ rdata.data = (char *)(&nextXid);
+ rdata.len = sizeof(TransactionId);
+ rdata.next = NULL;
+ (void) XLogInsert(RM_XLOG_ID, XLOG_NEXTXID, &rdata);
+}
+
+/*
+ * Write a NEXTOID log record
+ */
void
XLogPutNextOid(Oid nextOid)
{
@@ -2282,18 +2686,63 @@ XLogPutNextOid(Oid nextOid)
(void) XLogInsert(RM_XLOG_ID, XLOG_NEXTOID, &rdata);
}
+/*
+ * XLOG resource manager's routines
+ */
void
xlog_redo(XLogRecPtr lsn, XLogRecord *record)
{
uint8 info = record->xl_info & ~XLR_INFO_MASK;
- if (info == XLOG_NEXTOID)
+ if (info == XLOG_NEXTXID)
+ {
+ TransactionId nextXid;
+
+ memcpy(&nextXid, XLogRecGetData(record), sizeof(TransactionId));
+ if (ShmemVariableCache->nextXid < nextXid)
+ {
+ ShmemVariableCache->nextXid = nextXid;
+ ShmemVariableCache->xidCount = 0;
+ }
+ }
+ else if (info == XLOG_NEXTOID)
{
Oid nextOid;
memcpy(&nextOid, XLogRecGetData(record), sizeof(Oid));
if (ShmemVariableCache->nextOid < nextOid)
+ {
ShmemVariableCache->nextOid = nextOid;
+ ShmemVariableCache->oidCount = 0;
+ }
+ }
+ else if (info == XLOG_CHECKPOINT_SHUTDOWN)
+ {
+ CheckPoint checkPoint;
+
+ memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ /* In a SHUTDOWN checkpoint, believe the counters exactly */
+ ShmemVariableCache->nextXid = checkPoint.nextXid;
+ ShmemVariableCache->xidCount = 0;
+ ShmemVariableCache->nextOid = checkPoint.nextOid;
+ ShmemVariableCache->oidCount = 0;
+ }
+ else if (info == XLOG_CHECKPOINT_ONLINE)
+ {
+ CheckPoint checkPoint;
+
+ memcpy(&checkPoint, XLogRecGetData(record), sizeof(CheckPoint));
+ /* In an ONLINE checkpoint, treat the counters like NEXTXID/NEXTOID */
+ if (ShmemVariableCache->nextXid < checkPoint.nextXid)
+ {
+ ShmemVariableCache->nextXid = checkPoint.nextXid;
+ ShmemVariableCache->xidCount = 0;
+ }
+ if (ShmemVariableCache->nextOid < checkPoint.nextOid)
+ {
+ ShmemVariableCache->nextOid = checkPoint.nextOid;
+ ShmemVariableCache->oidCount = 0;
+ }
}
}
@@ -2307,7 +2756,8 @@ xlog_desc(char *buf, uint8 xl_info, char* rec)
{
uint8 info = xl_info & ~XLR_INFO_MASK;
- if (info == XLOG_CHECKPOINT)
+ if (info == XLOG_CHECKPOINT_SHUTDOWN ||
+ info == XLOG_CHECKPOINT_ONLINE)
{
CheckPoint *checkpoint = (CheckPoint*) rec;
sprintf(buf + strlen(buf), "checkpoint: redo %u/%u; undo %u/%u; "
@@ -2316,7 +2766,14 @@ xlog_desc(char *buf, uint8 xl_info, char* rec)
checkpoint->undo.xlogid, checkpoint->undo.xrecoff,
checkpoint->ThisStartUpID, checkpoint->nextXid,
checkpoint->nextOid,
- (checkpoint->Shutdown) ? "shutdown" : "online");
+ (info == XLOG_CHECKPOINT_SHUTDOWN) ? "shutdown" : "online");
+ }
+ else if (info == XLOG_NEXTXID)
+ {
+ TransactionId nextXid;
+
+ memcpy(&nextXid, rec, sizeof(TransactionId));
+ sprintf(buf + strlen(buf), "nextXid: %u", nextXid);
}
else if (info == XLOG_NEXTOID)
{
@@ -2340,7 +2797,7 @@ xlog_outrec(char *buf, XLogRecord *record)
record->xl_xact_prev.xlogid, record->xl_xact_prev.xrecoff,
record->xl_xid);
- for (i = 0, bkpb = 0; i < 2; i++)
+ for (i = 0, bkpb = 0; i < XLR_MAX_BKP_BLOCKS; i++)
{
if (!(record->xl_info & (XLR_SET_BKP_BLOCK(i))))
continue;
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index 8735db1ae1c..8b80c326cab 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -6,10 +6,12 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlogutils.c,v 1.14 2001/03/13 01:17:05 tgl Exp $
+ *
*-------------------------------------------------------------------------
*/
-
#include "postgres.h"
+
#include "access/xlog.h"
#include "access/transam.h"
#include "access/xact.h"
diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c
index 836a6c87ef8..95f73c356ad 100644
--- a/src/backend/bootstrap/bootstrap.c
+++ b/src/backend/bootstrap/bootstrap.c
@@ -8,7 +8,7 @@
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.104 2001/01/24 19:42:51 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/bootstrap/bootstrap.c,v 1.105 2001/03/13 01:17:05 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -18,13 +18,12 @@
#include <time.h>
#include <signal.h>
#include <setjmp.h>
-
-#define BOOTSTRAP_INCLUDE /* mask out stuff in tcop/tcopprot.h */
-
#ifdef HAVE_GETOPT_H
#include <getopt.h>
#endif
+#define BOOTSTRAP_INCLUDE /* mask out stuff in tcop/tcopprot.h */
+
#include "access/genam.h"
#include "access/heapam.h"
#include "access/xlog.h"
@@ -147,8 +146,6 @@ static MemoryContext nogc = NULL; /* special no-gc mem context */
extern int optind;
extern char *optarg;
-extern void SetRedoRecPtr(void);
-
/*
* At bootstrap time, we first declare all the indices to be built, and
* then build them. The IndexList structure stores enough information
@@ -294,8 +291,16 @@ BootstrapMain(int argc, char *argv[])
else if (argc - optind == 1)
dbName = argv[optind];
- SetProcessingMode(BootstrapProcessing);
- IgnoreSystemIndexes(true);
+ if (dbName == NULL)
+ {
+ dbName = getenv("USER");
+ if (dbName == NULL)
+ {
+ fputs("bootstrap backend: failed, no db name specified\n", stderr);
+ fputs(" and no USER enviroment variable\n", stderr);
+ proc_exit(1);
+ }
+ }
if (!IsUnderPostmaster)
{
@@ -312,29 +317,52 @@ BootstrapMain(int argc, char *argv[])
}
Assert(DataDir);
- if (dbName == NULL)
+ if (IsUnderPostmaster)
{
- dbName = getenv("USER");
- if (dbName == NULL)
- {
- fputs("bootstrap backend: failed, no db name specified\n", stderr);
- fputs(" and no USER enviroment variable\n", stderr);
- proc_exit(1);
- }
+ /*
+ * Properly accept or ignore signals the postmaster might send us
+ */
+ pqsignal(SIGHUP, SIG_IGN);
+ pqsignal(SIGINT, SIG_IGN); /* ignore query-cancel */
+ pqsignal(SIGTERM, die);
+ pqsignal(SIGQUIT, quickdie);
+ pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR2, SIG_IGN);
+ /*
+ * Reset some signals that are accepted by postmaster but not here
+ */
+ pqsignal(SIGCHLD, SIG_IGN);
+ pqsignal(SIGTTIN, SIG_DFL);
+ pqsignal(SIGTTOU, SIG_DFL);
+ pqsignal(SIGCONT, SIG_DFL);
+ pqsignal(SIGWINCH, SIG_DFL);
+ /*
+ * Unblock signals (they were blocked when the postmaster forked us)
+ */
+ PG_SETMASK(&UnBlockSig);
}
-
- XLOGPathInit();
-
- BaseInit();
-
- if (!IsUnderPostmaster)
+ else
{
+ /* Set up appropriately for interactive use */
pqsignal(SIGHUP, die);
pqsignal(SIGINT, die);
pqsignal(SIGTERM, die);
pqsignal(SIGQUIT, die);
+
+ /*
+ * Create lockfile for data directory.
+ */
+ if (! CreateDataDirLockFile(DataDir, false))
+ proc_exit(1);
}
+ SetProcessingMode(BootstrapProcessing);
+ IgnoreSystemIndexes(true);
+
+ XLOGPathInit();
+
+ BaseInit();
+
/*
* XLOG operations
*/
diff --git a/src/backend/port/beos/shm.c b/src/backend/port/beos/shm.c
index ee5b67fc9cb..e213ecb6646 100644
--- a/src/backend/port/beos/shm.c
+++ b/src/backend/port/beos/shm.c
@@ -57,12 +57,25 @@ int* shmat(int memId,int m1,int m2)
}
}
-/* Control a shared mem area : Used only to delete it */
-int shmctl(int shmid,int flag, struct shmid_ds* dummy)
+/* Control a shared mem area */
+int shmctl(int shmid, int flag, struct shmid_ds* dummy)
{
- /* Delete the area */
- delete_area(shmid);
- return 0;
+ if (flag == IPC_RMID)
+ {
+ /* Delete the area */
+ delete_area(shmid);
+ return 0;
+ }
+ if (flag == IPC_STAT)
+ {
+ /* Is there a way to check existence of an area given its ID?
+ * For now, punt and assume it does not exist.
+ */
+ errno = EINVAL;
+ return -1;
+ }
+ errno = EINVAL;
+ return -1;
}
/* Get an area based on the IPC key */
diff --git a/src/backend/port/qnx4/shm.c b/src/backend/port/qnx4/shm.c
index 4cf0486544b..9958f799384 100644
--- a/src/backend/port/qnx4/shm.c
+++ b/src/backend/port/qnx4/shm.c
@@ -7,7 +7,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/port/qnx4/Attic/shm.c,v 1.2 2000/04/12 17:15:30 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/port/qnx4/Attic/shm.c,v 1.3 2001/03/13 01:17:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -173,20 +173,25 @@ shmctl(int shmid, int cmd, struct shmid_ds * buf)
struct shm_info info;
char name[NAME_MAX + 1];
- /* IPC_RMID supported only */
- if (cmd != IPC_RMID)
+ if (cmd == IPC_RMID)
{
- errno = EINVAL;
- return -1;
+ if (shm_getinfo(shmid, &info) == -1)
+ {
+ errno = EACCES;
+ return -1;
+ }
+ return shm_unlink(itoa(info.key, name, 16));
}
-
- if (shm_getinfo(shmid, &info) == -1)
+ if (cmd == IPC_STAT)
{
- errno = EACCES;
+ /* Can we support IPC_STAT? We only need shm_nattch ...
+ * For now, punt and assume the shm seg does not exist.
+ */
+ errno = EINVAL;
return -1;
}
-
- return shm_unlink(itoa(info.key, name, 16));
+ errno = EINVAL;
+ return -1;
}
int
diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c
index e807d3d0696..6a0d1198652 100644
--- a/src/backend/postmaster/postmaster.c
+++ b/src/backend/postmaster/postmaster.c
@@ -6,12 +6,29 @@
* to the Postmaster and the postmaster uses the info in the
* message to setup a backend process.
*
+ * The postmaster also manages system-wide operations such as
+ * startup, shutdown, and periodic checkpoints. The postmaster
+ * itself doesn't do those operations, mind you --- it just forks
+ * off a subprocess to do them at the right times. It also takes
+ * care of resetting the system if a backend crashes.
+ *
+ * The postmaster process creates the shared memory and semaphore
+ * pools during startup, but as a rule does not touch them itself.
+ * In particular, it is not a member of the PROC array of backends
+ * and so it cannot participate in lock-manager operations. Keeping
+ * the postmaster away from shared memory operations makes it simpler
+ * and more reliable. The postmaster is almost always able to recover
+ * from crashes of individual backends by resetting shared memory;
+ * if it did much with shared memory then it would be prone to crashing
+ * along with the backends.
+ *
+ *
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.208 2001/02/20 01:34:40 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/postmaster/postmaster.c,v 1.209 2001/03/13 01:17:05 tgl Exp $
*
* NOTES
*
@@ -21,16 +38,15 @@
* lock manager.
*
* Synchronization:
- * The Postmaster shares memory with the backends and will have to lock
- * the shared memory it accesses. The Postmaster should never block
- * on messages from clients.
+ * The Postmaster shares memory with the backends but should avoid
+ * touching shared memory, so as not to become stuck if a crashing
+ * backend screws up locks or shared memory. Likewise, the Postmaster
+ * should never block on messages from frontend clients.
*
* Garbage Collection:
* The Postmaster cleans up after backends if they have an emergency
* exit and/or core dump.
*
- * Communication:
- *
*-------------------------------------------------------------------------
*/
@@ -194,8 +210,6 @@ extern char *optarg;
extern int optind,
opterr;
-extern void GetRedoRecPtr(void);
-
/*
* postmaster.c - function prototypes
*/
@@ -207,6 +221,7 @@ static void reset_shared(unsigned short port);
static void SIGHUP_handler(SIGNAL_ARGS);
static void pmdie(SIGNAL_ARGS);
static void reaper(SIGNAL_ARGS);
+static void schedule_checkpoint(SIGNAL_ARGS);
static void dumpstatus(SIGNAL_ARGS);
static void CleanupProc(int pid, int exitstatus);
static int DoBackend(Port *port);
@@ -491,7 +506,7 @@ PostmasterMain(int argc, char *argv[])
/*
* In the event that some backend dumps core, send
- * SIGSTOP, rather than SIGUSR1, to all its peers. This
+ * SIGSTOP, rather than SIGQUIT, to all its peers. This
* lets the wily post_hacker collect core dumps from
* everyone.
*/
@@ -643,11 +658,11 @@ PostmasterMain(int argc, char *argv[])
pqsignal(SIGHUP, SIGHUP_handler); /* reread config file and have children do same */
pqsignal(SIGINT, pmdie); /* send SIGTERM and ShutdownDataBase */
- pqsignal(SIGQUIT, pmdie); /* send SIGUSR1 and die */
+ pqsignal(SIGQUIT, pmdie); /* send SIGQUIT and die */
pqsignal(SIGTERM, pmdie); /* wait for children and ShutdownDataBase */
pqsignal(SIGALRM, SIG_IGN); /* ignored */
pqsignal(SIGPIPE, SIG_IGN); /* ignored */
- pqsignal(SIGUSR1, pmdie); /* currently ignored, but see note in pmdie */
+ pqsignal(SIGUSR1, schedule_checkpoint); /* start a background checkpoint */
pqsignal(SIGUSR2, pmdie); /* send SIGUSR2, don't die */
pqsignal(SIGCHLD, reaper); /* handle child termination */
pqsignal(SIGTTIN, SIG_IGN); /* ignored */
@@ -773,25 +788,22 @@ ServerLoop(void)
struct timeval *timeout = NULL;
struct timeval timeout_tv;
- if (CheckPointPID == 0 && checkpointed && !FatalError)
+ if (CheckPointPID == 0 && checkpointed &&
+ Shutdown == NoShutdown && !FatalError)
{
time_t now = time(NULL);
if (CheckPointTimeout + checkpointed > now)
{
+ /* Not time for checkpoint yet, so set a timeout for select */
timeout_tv.tv_sec = CheckPointTimeout + checkpointed - now;
timeout_tv.tv_usec = 0;
timeout = &timeout_tv;
}
else
{
+ /* Time to make the checkpoint... */
CheckPointPID = CheckPointDataBase();
- /*
- * Since this code is executed periodically, it's a fine
- * place to do other actions that should happen every now
- * and then on no particular schedule. Such as...
- */
- TouchSocketLockFile();
}
}
@@ -1256,7 +1268,7 @@ ConnCreate(int serverFd)
{
fprintf(stderr, "%s: ConnCreate: malloc failed\n",
progname);
- SignalChildren(SIGUSR1);
+ SignalChildren(SIGQUIT);
ExitPostmaster(1);
}
@@ -1374,7 +1386,7 @@ SIGHUP_handler(SIGNAL_ARGS)
/*
- * pmdie -- signal handler for cleaning up after a kill signal.
+ * pmdie -- signal handler for processing various postmaster signals.
*/
static void
pmdie(SIGNAL_ARGS)
@@ -1388,18 +1400,6 @@ pmdie(SIGNAL_ARGS)
switch (postgres_signal_arg)
{
- case SIGUSR1:
- /*
- * Currently the postmaster ignores SIGUSR1 (maybe it should
- * do something useful instead?) But we must have some handler
- * installed for SIGUSR1, not just set it to SIG_IGN. Else, a
- * freshly spawned backend would likewise have it set to SIG_IGN,
- * which would mean the backend would ignore any attempt to kill
- * it before it had gotten as far as setting up its own handler.
- */
- errno = save_errno;
- return;
-
case SIGUSR2:
/*
@@ -1419,7 +1419,7 @@ pmdie(SIGNAL_ARGS)
/*
* Smart Shutdown:
*
- * let children to end their work and ShutdownDataBase.
+ * Wait for children to end their work and ShutdownDataBase.
*/
if (Shutdown >= SmartShutdown)
{
@@ -1458,7 +1458,7 @@ pmdie(SIGNAL_ARGS)
* Fast Shutdown:
*
* abort all children with SIGTERM (rollback active transactions
- * and exit) and ShutdownDataBase.
+ * and exit) and ShutdownDataBase when they are gone.
*/
if (Shutdown >= FastShutdown)
{
@@ -1509,7 +1509,7 @@ pmdie(SIGNAL_ARGS)
/*
* Immediate Shutdown:
*
- * abort all children with SIGUSR1 and exit without attempt to
+ * abort all children with SIGQUIT and exit without attempt to
* properly shutdown data base system.
*/
tnow = time(NULL);
@@ -1517,10 +1517,10 @@ pmdie(SIGNAL_ARGS)
fflush(stderr);
if (ShutdownPID > 0)
kill(ShutdownPID, SIGQUIT);
- else if (StartupPID > 0)
+ if (StartupPID > 0)
kill(StartupPID, SIGQUIT);
- else if (DLGetHead(BackendList))
- SignalChildren(SIGUSR1);
+ if (DLGetHead(BackendList))
+ SignalChildren(SIGQUIT);
break;
}
@@ -1593,11 +1593,13 @@ reaper(SIGNAL_ARGS)
}
/*
- * Startup succeeded - remember its ID
- * and RedoRecPtr
+ * Startup succeeded - remember its ID and RedoRecPtr
*/
SetThisStartUpID();
+ /*
+ * Arrange for first checkpoint to occur after standard delay.
+ */
CheckPointPID = 0;
checkpointed = time(NULL);
@@ -1697,6 +1699,7 @@ CleanupProc(int pid,
if (!FatalError)
{
checkpointed = time(NULL);
+ /* Update RedoRecPtr for future child backends */
GetRedoRecPtr();
}
}
@@ -1731,7 +1734,7 @@ CleanupProc(int pid,
* This backend is still alive. Unless we did so already,
* tell it to commit hara-kiri.
*
- * SIGUSR1 is the special signal that says exit without proc_exit
+ * SIGQUIT is the special signal that says exit without proc_exit
* and let the user know what's going on. But if SendStop is set
* (-s on command line), then we send SIGSTOP instead, so that we
* can get core dumps from all backends by hand.
@@ -1741,9 +1744,9 @@ CleanupProc(int pid,
if (DebugLvl)
fprintf(stderr, "%s: CleanupProc: sending %s to process %d\n",
progname,
- (SendStop ? "SIGSTOP" : "SIGUSR1"),
+ (SendStop ? "SIGSTOP" : "SIGQUIT"),
bp->pid);
- kill(bp->pid, (SendStop ? SIGSTOP : SIGUSR1));
+ kill(bp->pid, (SendStop ? SIGSTOP : SIGQUIT));
}
}
else
@@ -1772,7 +1775,7 @@ CleanupProc(int pid,
}
/*
- * Send a signal to all chidren processes.
+ * Send a signal to all backend children.
*/
static void
SignalChildren(int signal)
@@ -2108,6 +2111,24 @@ ExitPostmaster(int status)
proc_exit(status);
}
+/* Request to schedule a checkpoint (no-op if one is currently running) */
+static void
+schedule_checkpoint(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ PG_SETMASK(&BlockSig);
+
+ /* Ignore request if checkpointing is currently disabled */
+ if (CheckPointPID == 0 && checkpointed &&
+ Shutdown == NoShutdown && !FatalError)
+ {
+ CheckPointPID = CheckPointDataBase();
+ }
+
+ errno = save_errno;
+}
+
static void
dumpstatus(SIGNAL_ARGS)
{
@@ -2116,12 +2137,13 @@ dumpstatus(SIGNAL_ARGS)
PG_SETMASK(&BlockSig);
+ fprintf(stderr, "%s: dumpstatus:\n", progname);
+
curr = DLGetHead(PortList);
while (curr)
{
Port *port = DLE_VAL(curr);
- fprintf(stderr, "%s: dumpstatus:\n", progname);
fprintf(stderr, "\tsock %d\n", port->sock);
curr = DLGetSucc(curr);
}
@@ -2240,6 +2262,9 @@ InitSSL(void)
#endif
+/*
+ * Fire off a subprocess for startup/shutdown/checkpoint.
+ */
static pid_t
SSDataBase(int xlop)
{
@@ -2273,7 +2298,7 @@ SSDataBase(int xlop)
/* Close the postmaster's sockets */
ClosePostmasterPorts(NULL);
-
+ /* Set up command-line arguments for subprocess */
av[ac++] = "postgres";
av[ac++] = "-d";
@@ -2293,14 +2318,6 @@ SSDataBase(int xlop)
optind = 1;
- pqsignal(SIGQUIT, SIG_DFL);
-#ifdef HAVE_SIGPROCMASK
- sigdelset(&BlockSig, SIGQUIT);
-#else
- BlockSig &= ~(sigmask(SIGQUIT));
-#endif
- PG_SETMASK(&BlockSig);
-
BootstrapMain(ac, av);
ExitPostmaster(0);
}
@@ -2320,19 +2337,31 @@ SSDataBase(int xlop)
ExitPostmaster(1);
}
- if (xlop != BS_XLOG_CHECKPOINT)
- return(pid);
-
- if (!(bn = (Backend *) calloc(1, sizeof(Backend))))
+ /*
+ * The startup and shutdown processes are not considered normal backends,
+ * but the checkpoint process is. Checkpoint must be added to the list
+ * of backends.
+ */
+ if (xlop == BS_XLOG_CHECKPOINT)
{
- fprintf(stderr, "%s: CheckPointDataBase: malloc failed\n",
- progname);
- ExitPostmaster(1);
- }
+ if (!(bn = (Backend *) calloc(1, sizeof(Backend))))
+ {
+ fprintf(stderr, "%s: CheckPointDataBase: malloc failed\n",
+ progname);
+ ExitPostmaster(1);
+ }
- bn->pid = pid;
- bn->cancel_key = 0;
- DLAddHead(BackendList, DLNewElem(bn));
+ bn->pid = pid;
+ bn->cancel_key = PostmasterRandom();
+ DLAddHead(BackendList, DLNewElem(bn));
+
+ /*
+ * Since this code is executed periodically, it's a fine
+ * place to do other actions that should happen every now
+ * and then on no particular schedule. Such as...
+ */
+ TouchSocketLockFile();
+ }
return (pid);
}
diff --git a/src/backend/storage/ipc/ipc.c b/src/backend/storage/ipc/ipc.c
index c529547fcfe..eb8d488bdd3 100644
--- a/src/backend/storage/ipc/ipc.c
+++ b/src/backend/storage/ipc/ipc.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipc.c,v 1.62 2001/01/24 19:43:07 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/storage/ipc/ipc.c,v 1.63 2001/03/13 01:17:06 tgl Exp $
*
* NOTES
*
@@ -626,6 +626,9 @@ InternalIpcMemoryCreate(IpcMemoryKey memKey, uint32 size, int permission)
/* Register on-exit routine to detach new segment before deleting */
on_shmem_exit(IpcMemoryDetach, PointerGetDatum(memAddress));
+ /* Record key and ID in lockfile for data directory. */
+ RecordSharedMemoryInLockFile(memKey, shmid);
+
return memAddress;
}
@@ -660,6 +663,41 @@ IpcMemoryDelete(int status, Datum shmId)
*/
}
+/****************************************************************************/
+/* SharedMemoryIsInUse(shmKey, shmId) Is a shared memory segment in use? */
+/****************************************************************************/
+bool
+SharedMemoryIsInUse(IpcMemoryKey shmKey, IpcMemoryId shmId)
+{
+ struct shmid_ds shmStat;
+
+ /*
+ * We detect whether a shared memory segment is in use by seeing whether
+ * it (a) exists and (b) has any processes are attached to it.
+ *
+ * If we are unable to perform the stat operation for a reason other than
+ * nonexistence of the segment (most likely, because it doesn't belong to
+ * our userid), assume it is in use.
+ */
+ if (shmctl(shmId, IPC_STAT, &shmStat) < 0)
+ {
+ /*
+ * EINVAL actually has multiple possible causes documented in the
+ * shmctl man page, but we assume it must mean the segment no longer
+ * exists.
+ */
+ if (errno == EINVAL)
+ return false;
+ /* Else assume segment is in use */
+ return true;
+ }
+ /* If it has attached processes, it's in use */
+ if (shmStat.shm_nattch != 0)
+ return true;
+ return false;
+}
+
+
/* ----------------------------------------------------------------
* private memory support
*
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index fa0dbc13125..326a05a348f 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.209 2001/03/09 06:36:32 inoue Exp $
+ * $Header: /cvsroot/pgsql/src/backend/tcop/postgres.c,v 1.210 2001/03/13 01:17:06 tgl Exp $
*
* NOTES
* this is the "main" module of the postgres backend and
@@ -128,7 +128,6 @@ static void start_xact_command(void);
static void finish_xact_command(void);
static void SigHupHandler(SIGNAL_ARGS);
static void FloatExceptionHandler(SIGNAL_ARGS);
-static void quickdie(SIGNAL_ARGS);
/*
* Flag to mark SIGHUP. Whenever the main loop comes around it
@@ -895,12 +894,12 @@ finish_xact_command(void)
*/
/*
- * quickdie() occurs when signalled SIGUSR1 by the postmaster.
+ * quickdie() occurs when signalled SIGQUIT by the postmaster.
*
* Some backend has bought the farm,
* so we need to stop what we're doing and exit.
*/
-static void
+void
quickdie(SIGNAL_ARGS)
{
PG_SETMASK(&BlockSig);
@@ -917,7 +916,7 @@ quickdie(SIGNAL_ARGS)
* Just nail the windows shut and get out of town.
*
* Note we do exit(1) not exit(0). This is to force the postmaster
- * into a system reset cycle if some idiot DBA sends a manual SIGUSR1
+ * into a system reset cycle if some idiot DBA sends a manual SIGQUIT
* to a random backend. This is necessary precisely because we don't
* clean up our shared memory state.
*/
@@ -987,8 +986,8 @@ QueryCancelHandler(SIGNAL_ARGS)
InterruptHoldoffCount++;
if (LockWaitCancel())
{
- InterruptHoldoffCount--;
DisableNotifyInterrupt();
+ InterruptHoldoffCount--;
ProcessInterrupts();
}
else
@@ -1205,9 +1204,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
case 'D': /* PGDATA directory */
if (secure)
- {
potential_DataDir = optarg;
- }
break;
case 'd': /* debug level */
@@ -1243,13 +1240,10 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
case 'F':
/* --------------------
* turn off fsync
- *
- * 7.0 buffer manager can support different backends running
- * with different fsync settings, so this no longer needs
- * to be "if (secure)".
* --------------------
*/
- enableFsync = false;
+ if (secure)
+ enableFsync = false;
break;
case 'f':
@@ -1504,13 +1498,18 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
* Note that postmaster blocked all signals before forking child process,
* so there is no race condition whereby we might receive a signal before
* we have set up the handler.
+ *
+ * Also note: it's best not to use any signals that are SIG_IGNored in
+ * the postmaster. If such a signal arrives before we are able to change
+ * the handler to non-SIG_IGN, it'll get dropped. If necessary, make a
+ * dummy handler in the postmaster to reserve the signal.
*/
pqsignal(SIGHUP, SigHupHandler); /* set flag to read config file */
pqsignal(SIGINT, QueryCancelHandler); /* cancel current query */
pqsignal(SIGTERM, die); /* cancel current query and exit */
- pqsignal(SIGQUIT, die); /* could reassign this sig for another use */
- pqsignal(SIGALRM, HandleDeadLock);
+ pqsignal(SIGQUIT, quickdie); /* hard crash time */
+ pqsignal(SIGALRM, HandleDeadLock); /* check for deadlock after timeout */
/*
* Ignore failure to write to frontend. Note: if frontend closes
@@ -1519,7 +1518,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
* midst of output during who-knows-what operation...
*/
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, quickdie);
+ pqsignal(SIGUSR1, SIG_IGN); /* this signal available for use */
pqsignal(SIGUSR2, Async_NotifyHandler); /* flush also sinval cache */
pqsignal(SIGFPE, FloatExceptionHandler);
pqsignal(SIGCHLD, SIG_IGN); /* ignored (may get this in system() calls) */
@@ -1534,14 +1533,14 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
pqinitmask();
- /* We allow SIGUSR1 (quickdie) at all times */
+ /* We allow SIGQUIT (quickdie) at all times */
#ifdef HAVE_SIGPROCMASK
- sigdelset(&BlockSig, SIGUSR1);
+ sigdelset(&BlockSig, SIGQUIT);
#else
- BlockSig &= ~(sigmask(SIGUSR1));
+ BlockSig &= ~(sigmask(SIGQUIT));
#endif
- PG_SETMASK(&BlockSig); /* block everything except SIGUSR1 */
+ PG_SETMASK(&BlockSig); /* block everything except SIGQUIT */
if (IsUnderPostmaster)
@@ -1693,7 +1692,7 @@ PostgresMain(int argc, char *argv[], int real_argc, char *real_argv[], const cha
if (!IsUnderPostmaster)
{
puts("\nPOSTGRES backend interactive interface ");
- puts("$Revision: 1.209 $ $Date: 2001/03/09 06:36:32 $\n");
+ puts("$Revision: 1.210 $ $Date: 2001/03/13 01:17:06 $\n");
}
/*
diff --git a/src/backend/utils/hash/Makefile b/src/backend/utils/hash/Makefile
index 38db1cda30c..a3749e37d26 100644
--- a/src/backend/utils/hash/Makefile
+++ b/src/backend/utils/hash/Makefile
@@ -4,7 +4,7 @@
# Makefile for utils/hash
#
# IDENTIFICATION
-# $Header: /cvsroot/pgsql/src/backend/utils/hash/Makefile,v 1.9 2000/08/31 16:10:51 petere Exp $
+# $Header: /cvsroot/pgsql/src/backend/utils/hash/Makefile,v 1.10 2001/03/13 01:17:06 tgl Exp $
#
#-------------------------------------------------------------------------
@@ -12,7 +12,7 @@ subdir = src/backend/utils/hash
top_builddir = ../../../..
include $(top_builddir)/src/Makefile.global
-OBJS = dynahash.o hashfn.o
+OBJS = dynahash.o hashfn.o pg_crc.o
all: SUBSYS.o
diff --git a/src/backend/utils/hash/pg_crc.c b/src/backend/utils/hash/pg_crc.c
new file mode 100644
index 00000000000..96413f3b8b4
--- /dev/null
+++ b/src/backend/utils/hash/pg_crc.c
@@ -0,0 +1,417 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_crc.c
+ * PostgreSQL 64-bit CRC support
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * $Header: /cvsroot/pgsql/src/backend/utils/hash/pg_crc.c,v 1.1 2001/03/13 01:17:06 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include "utils/pg_crc.h"
+
+
+#ifdef INT64_IS_BUSTED
+
+const uint32 crc_table0[256] = {
+ 0x00000000, 0xA9EA3693,
+ 0x53D46D26, 0xFA3E5BB5,
+ 0x0E42ECDF, 0xA7A8DA4C,
+ 0x5D9681F9, 0xF47CB76A,
+ 0x1C85D9BE, 0xB56FEF2D,
+ 0x4F51B498, 0xE6BB820B,
+ 0x12C73561, 0xBB2D03F2,
+ 0x41135847, 0xE8F96ED4,
+ 0x90E185EF, 0x390BB37C,
+ 0xC335E8C9, 0x6ADFDE5A,
+ 0x9EA36930, 0x37495FA3,
+ 0xCD770416, 0x649D3285,
+ 0x8C645C51, 0x258E6AC2,
+ 0xDFB03177, 0x765A07E4,
+ 0x8226B08E, 0x2BCC861D,
+ 0xD1F2DDA8, 0x7818EB3B,
+ 0x21C30BDE, 0x88293D4D,
+ 0x721766F8, 0xDBFD506B,
+ 0x2F81E701, 0x866BD192,
+ 0x7C558A27, 0xD5BFBCB4,
+ 0x3D46D260, 0x94ACE4F3,
+ 0x6E92BF46, 0xC77889D5,
+ 0x33043EBF, 0x9AEE082C,
+ 0x60D05399, 0xC93A650A,
+ 0xB1228E31, 0x18C8B8A2,
+ 0xE2F6E317, 0x4B1CD584,
+ 0xBF6062EE, 0x168A547D,
+ 0xECB40FC8, 0x455E395B,
+ 0xADA7578F, 0x044D611C,
+ 0xFE733AA9, 0x57990C3A,
+ 0xA3E5BB50, 0x0A0F8DC3,
+ 0xF031D676, 0x59DBE0E5,
+ 0xEA6C212F, 0x438617BC,
+ 0xB9B84C09, 0x10527A9A,
+ 0xE42ECDF0, 0x4DC4FB63,
+ 0xB7FAA0D6, 0x1E109645,
+ 0xF6E9F891, 0x5F03CE02,
+ 0xA53D95B7, 0x0CD7A324,
+ 0xF8AB144E, 0x514122DD,
+ 0xAB7F7968, 0x02954FFB,
+ 0x7A8DA4C0, 0xD3679253,
+ 0x2959C9E6, 0x80B3FF75,
+ 0x74CF481F, 0xDD257E8C,
+ 0x271B2539, 0x8EF113AA,
+ 0x66087D7E, 0xCFE24BED,
+ 0x35DC1058, 0x9C3626CB,
+ 0x684A91A1, 0xC1A0A732,
+ 0x3B9EFC87, 0x9274CA14,
+ 0xCBAF2AF1, 0x62451C62,
+ 0x987B47D7, 0x31917144,
+ 0xC5EDC62E, 0x6C07F0BD,
+ 0x9639AB08, 0x3FD39D9B,
+ 0xD72AF34F, 0x7EC0C5DC,
+ 0x84FE9E69, 0x2D14A8FA,
+ 0xD9681F90, 0x70822903,
+ 0x8ABC72B6, 0x23564425,
+ 0x5B4EAF1E, 0xF2A4998D,
+ 0x089AC238, 0xA170F4AB,
+ 0x550C43C1, 0xFCE67552,
+ 0x06D82EE7, 0xAF321874,
+ 0x47CB76A0, 0xEE214033,
+ 0x141F1B86, 0xBDF52D15,
+ 0x49899A7F, 0xE063ACEC,
+ 0x1A5DF759, 0xB3B7C1CA,
+ 0x7D3274CD, 0xD4D8425E,
+ 0x2EE619EB, 0x870C2F78,
+ 0x73709812, 0xDA9AAE81,
+ 0x20A4F534, 0x894EC3A7,
+ 0x61B7AD73, 0xC85D9BE0,
+ 0x3263C055, 0x9B89F6C6,
+ 0x6FF541AC, 0xC61F773F,
+ 0x3C212C8A, 0x95CB1A19,
+ 0xEDD3F122, 0x4439C7B1,
+ 0xBE079C04, 0x17EDAA97,
+ 0xE3911DFD, 0x4A7B2B6E,
+ 0xB04570DB, 0x19AF4648,
+ 0xF156289C, 0x58BC1E0F,
+ 0xA28245BA, 0x0B687329,
+ 0xFF14C443, 0x56FEF2D0,
+ 0xACC0A965, 0x052A9FF6,
+ 0x5CF17F13, 0xF51B4980,
+ 0x0F251235, 0xA6CF24A6,
+ 0x52B393CC, 0xFB59A55F,
+ 0x0167FEEA, 0xA88DC879,
+ 0x4074A6AD, 0xE99E903E,
+ 0x13A0CB8B, 0xBA4AFD18,
+ 0x4E364A72, 0xE7DC7CE1,
+ 0x1DE22754, 0xB40811C7,
+ 0xCC10FAFC, 0x65FACC6F,
+ 0x9FC497DA, 0x362EA149,
+ 0xC2521623, 0x6BB820B0,
+ 0x91867B05, 0x386C4D96,
+ 0xD0952342, 0x797F15D1,
+ 0x83414E64, 0x2AAB78F7,
+ 0xDED7CF9D, 0x773DF90E,
+ 0x8D03A2BB, 0x24E99428,
+ 0x975E55E2, 0x3EB46371,
+ 0xC48A38C4, 0x6D600E57,
+ 0x991CB93D, 0x30F68FAE,
+ 0xCAC8D41B, 0x6322E288,
+ 0x8BDB8C5C, 0x2231BACF,
+ 0xD80FE17A, 0x71E5D7E9,
+ 0x85996083, 0x2C735610,
+ 0xD64D0DA5, 0x7FA73B36,
+ 0x07BFD00D, 0xAE55E69E,
+ 0x546BBD2B, 0xFD818BB8,
+ 0x09FD3CD2, 0xA0170A41,
+ 0x5A2951F4, 0xF3C36767,
+ 0x1B3A09B3, 0xB2D03F20,
+ 0x48EE6495, 0xE1045206,
+ 0x1578E56C, 0xBC92D3FF,
+ 0x46AC884A, 0xEF46BED9,
+ 0xB69D5E3C, 0x1F7768AF,
+ 0xE549331A, 0x4CA30589,
+ 0xB8DFB2E3, 0x11358470,
+ 0xEB0BDFC5, 0x42E1E956,
+ 0xAA188782, 0x03F2B111,
+ 0xF9CCEAA4, 0x5026DC37,
+ 0xA45A6B5D, 0x0DB05DCE,
+ 0xF78E067B, 0x5E6430E8,
+ 0x267CDBD3, 0x8F96ED40,
+ 0x75A8B6F5, 0xDC428066,
+ 0x283E370C, 0x81D4019F,
+ 0x7BEA5A2A, 0xD2006CB9,
+ 0x3AF9026D, 0x931334FE,
+ 0x692D6F4B, 0xC0C759D8,
+ 0x34BBEEB2, 0x9D51D821,
+ 0x676F8394, 0xCE85B507
+};
+
+const uint32 crc_table1[256] = {
+ 0x00000000, 0x42F0E1EB,
+ 0x85E1C3D7, 0xC711223C,
+ 0x49336645, 0x0BC387AE,
+ 0xCCD2A592, 0x8E224479,
+ 0x9266CC8A, 0xD0962D61,
+ 0x17870F5D, 0x5577EEB6,
+ 0xDB55AACF, 0x99A54B24,
+ 0x5EB46918, 0x1C4488F3,
+ 0x663D78FF, 0x24CD9914,
+ 0xE3DCBB28, 0xA12C5AC3,
+ 0x2F0E1EBA, 0x6DFEFF51,
+ 0xAAEFDD6D, 0xE81F3C86,
+ 0xF45BB475, 0xB6AB559E,
+ 0x71BA77A2, 0x334A9649,
+ 0xBD68D230, 0xFF9833DB,
+ 0x388911E7, 0x7A79F00C,
+ 0xCC7AF1FF, 0x8E8A1014,
+ 0x499B3228, 0x0B6BD3C3,
+ 0x854997BA, 0xC7B97651,
+ 0x00A8546D, 0x4258B586,
+ 0x5E1C3D75, 0x1CECDC9E,
+ 0xDBFDFEA2, 0x990D1F49,
+ 0x172F5B30, 0x55DFBADB,
+ 0x92CE98E7, 0xD03E790C,
+ 0xAA478900, 0xE8B768EB,
+ 0x2FA64AD7, 0x6D56AB3C,
+ 0xE374EF45, 0xA1840EAE,
+ 0x66952C92, 0x2465CD79,
+ 0x3821458A, 0x7AD1A461,
+ 0xBDC0865D, 0xFF3067B6,
+ 0x711223CF, 0x33E2C224,
+ 0xF4F3E018, 0xB60301F3,
+ 0xDA050215, 0x98F5E3FE,
+ 0x5FE4C1C2, 0x1D142029,
+ 0x93366450, 0xD1C685BB,
+ 0x16D7A787, 0x5427466C,
+ 0x4863CE9F, 0x0A932F74,
+ 0xCD820D48, 0x8F72ECA3,
+ 0x0150A8DA, 0x43A04931,
+ 0x84B16B0D, 0xC6418AE6,
+ 0xBC387AEA, 0xFEC89B01,
+ 0x39D9B93D, 0x7B2958D6,
+ 0xF50B1CAF, 0xB7FBFD44,
+ 0x70EADF78, 0x321A3E93,
+ 0x2E5EB660, 0x6CAE578B,
+ 0xABBF75B7, 0xE94F945C,
+ 0x676DD025, 0x259D31CE,
+ 0xE28C13F2, 0xA07CF219,
+ 0x167FF3EA, 0x548F1201,
+ 0x939E303D, 0xD16ED1D6,
+ 0x5F4C95AF, 0x1DBC7444,
+ 0xDAAD5678, 0x985DB793,
+ 0x84193F60, 0xC6E9DE8B,
+ 0x01F8FCB7, 0x43081D5C,
+ 0xCD2A5925, 0x8FDAB8CE,
+ 0x48CB9AF2, 0x0A3B7B19,
+ 0x70428B15, 0x32B26AFE,
+ 0xF5A348C2, 0xB753A929,
+ 0x3971ED50, 0x7B810CBB,
+ 0xBC902E87, 0xFE60CF6C,
+ 0xE224479F, 0xA0D4A674,
+ 0x67C58448, 0x253565A3,
+ 0xAB1721DA, 0xE9E7C031,
+ 0x2EF6E20D, 0x6C0603E6,
+ 0xF6FAE5C0, 0xB40A042B,
+ 0x731B2617, 0x31EBC7FC,
+ 0xBFC98385, 0xFD39626E,
+ 0x3A284052, 0x78D8A1B9,
+ 0x649C294A, 0x266CC8A1,
+ 0xE17DEA9D, 0xA38D0B76,
+ 0x2DAF4F0F, 0x6F5FAEE4,
+ 0xA84E8CD8, 0xEABE6D33,
+ 0x90C79D3F, 0xD2377CD4,
+ 0x15265EE8, 0x57D6BF03,
+ 0xD9F4FB7A, 0x9B041A91,
+ 0x5C1538AD, 0x1EE5D946,
+ 0x02A151B5, 0x4051B05E,
+ 0x87409262, 0xC5B07389,
+ 0x4B9237F0, 0x0962D61B,
+ 0xCE73F427, 0x8C8315CC,
+ 0x3A80143F, 0x7870F5D4,
+ 0xBF61D7E8, 0xFD913603,
+ 0x73B3727A, 0x31439391,
+ 0xF652B1AD, 0xB4A25046,
+ 0xA8E6D8B5, 0xEA16395E,
+ 0x2D071B62, 0x6FF7FA89,
+ 0xE1D5BEF0, 0xA3255F1B,
+ 0x64347D27, 0x26C49CCC,
+ 0x5CBD6CC0, 0x1E4D8D2B,
+ 0xD95CAF17, 0x9BAC4EFC,
+ 0x158E0A85, 0x577EEB6E,
+ 0x906FC952, 0xD29F28B9,
+ 0xCEDBA04A, 0x8C2B41A1,
+ 0x4B3A639D, 0x09CA8276,
+ 0x87E8C60F, 0xC51827E4,
+ 0x020905D8, 0x40F9E433,
+ 0x2CFFE7D5, 0x6E0F063E,
+ 0xA91E2402, 0xEBEEC5E9,
+ 0x65CC8190, 0x273C607B,
+ 0xE02D4247, 0xA2DDA3AC,
+ 0xBE992B5F, 0xFC69CAB4,
+ 0x3B78E888, 0x79880963,
+ 0xF7AA4D1A, 0xB55AACF1,
+ 0x724B8ECD, 0x30BB6F26,
+ 0x4AC29F2A, 0x08327EC1,
+ 0xCF235CFD, 0x8DD3BD16,
+ 0x03F1F96F, 0x41011884,
+ 0x86103AB8, 0xC4E0DB53,
+ 0xD8A453A0, 0x9A54B24B,
+ 0x5D459077, 0x1FB5719C,
+ 0x919735E5, 0xD367D40E,
+ 0x1476F632, 0x568617D9,
+ 0xE085162A, 0xA275F7C1,
+ 0x6564D5FD, 0x27943416,
+ 0xA9B6706F, 0xEB469184,
+ 0x2C57B3B8, 0x6EA75253,
+ 0x72E3DAA0, 0x30133B4B,
+ 0xF7021977, 0xB5F2F89C,
+ 0x3BD0BCE5, 0x79205D0E,
+ 0xBE317F32, 0xFCC19ED9,
+ 0x86B86ED5, 0xC4488F3E,
+ 0x0359AD02, 0x41A94CE9,
+ 0xCF8B0890, 0x8D7BE97B,
+ 0x4A6ACB47, 0x089A2AAC,
+ 0x14DEA25F, 0x562E43B4,
+ 0x913F6188, 0xD3CF8063,
+ 0x5DEDC41A, 0x1F1D25F1,
+ 0xD80C07CD, 0x9AFCE626
+};
+
+#else /* int64 works */
+
+const uint64 crc_table[256] = {
+ 0x0000000000000000, 0x42F0E1EBA9EA3693,
+ 0x85E1C3D753D46D26, 0xC711223CFA3E5BB5,
+ 0x493366450E42ECDF, 0x0BC387AEA7A8DA4C,
+ 0xCCD2A5925D9681F9, 0x8E224479F47CB76A,
+ 0x9266CC8A1C85D9BE, 0xD0962D61B56FEF2D,
+ 0x17870F5D4F51B498, 0x5577EEB6E6BB820B,
+ 0xDB55AACF12C73561, 0x99A54B24BB2D03F2,
+ 0x5EB4691841135847, 0x1C4488F3E8F96ED4,
+ 0x663D78FF90E185EF, 0x24CD9914390BB37C,
+ 0xE3DCBB28C335E8C9, 0xA12C5AC36ADFDE5A,
+ 0x2F0E1EBA9EA36930, 0x6DFEFF5137495FA3,
+ 0xAAEFDD6DCD770416, 0xE81F3C86649D3285,
+ 0xF45BB4758C645C51, 0xB6AB559E258E6AC2,
+ 0x71BA77A2DFB03177, 0x334A9649765A07E4,
+ 0xBD68D2308226B08E, 0xFF9833DB2BCC861D,
+ 0x388911E7D1F2DDA8, 0x7A79F00C7818EB3B,
+ 0xCC7AF1FF21C30BDE, 0x8E8A101488293D4D,
+ 0x499B3228721766F8, 0x0B6BD3C3DBFD506B,
+ 0x854997BA2F81E701, 0xC7B97651866BD192,
+ 0x00A8546D7C558A27, 0x4258B586D5BFBCB4,
+ 0x5E1C3D753D46D260, 0x1CECDC9E94ACE4F3,
+ 0xDBFDFEA26E92BF46, 0x990D1F49C77889D5,
+ 0x172F5B3033043EBF, 0x55DFBADB9AEE082C,
+ 0x92CE98E760D05399, 0xD03E790CC93A650A,
+ 0xAA478900B1228E31, 0xE8B768EB18C8B8A2,
+ 0x2FA64AD7E2F6E317, 0x6D56AB3C4B1CD584,
+ 0xE374EF45BF6062EE, 0xA1840EAE168A547D,
+ 0x66952C92ECB40FC8, 0x2465CD79455E395B,
+ 0x3821458AADA7578F, 0x7AD1A461044D611C,
+ 0xBDC0865DFE733AA9, 0xFF3067B657990C3A,
+ 0x711223CFA3E5BB50, 0x33E2C2240A0F8DC3,
+ 0xF4F3E018F031D676, 0xB60301F359DBE0E5,
+ 0xDA050215EA6C212F, 0x98F5E3FE438617BC,
+ 0x5FE4C1C2B9B84C09, 0x1D14202910527A9A,
+ 0x93366450E42ECDF0, 0xD1C685BB4DC4FB63,
+ 0x16D7A787B7FAA0D6, 0x5427466C1E109645,
+ 0x4863CE9FF6E9F891, 0x0A932F745F03CE02,
+ 0xCD820D48A53D95B7, 0x8F72ECA30CD7A324,
+ 0x0150A8DAF8AB144E, 0x43A04931514122DD,
+ 0x84B16B0DAB7F7968, 0xC6418AE602954FFB,
+ 0xBC387AEA7A8DA4C0, 0xFEC89B01D3679253,
+ 0x39D9B93D2959C9E6, 0x7B2958D680B3FF75,
+ 0xF50B1CAF74CF481F, 0xB7FBFD44DD257E8C,
+ 0x70EADF78271B2539, 0x321A3E938EF113AA,
+ 0x2E5EB66066087D7E, 0x6CAE578BCFE24BED,
+ 0xABBF75B735DC1058, 0xE94F945C9C3626CB,
+ 0x676DD025684A91A1, 0x259D31CEC1A0A732,
+ 0xE28C13F23B9EFC87, 0xA07CF2199274CA14,
+ 0x167FF3EACBAF2AF1, 0x548F120162451C62,
+ 0x939E303D987B47D7, 0xD16ED1D631917144,
+ 0x5F4C95AFC5EDC62E, 0x1DBC74446C07F0BD,
+ 0xDAAD56789639AB08, 0x985DB7933FD39D9B,
+ 0x84193F60D72AF34F, 0xC6E9DE8B7EC0C5DC,
+ 0x01F8FCB784FE9E69, 0x43081D5C2D14A8FA,
+ 0xCD2A5925D9681F90, 0x8FDAB8CE70822903,
+ 0x48CB9AF28ABC72B6, 0x0A3B7B1923564425,
+ 0x70428B155B4EAF1E, 0x32B26AFEF2A4998D,
+ 0xF5A348C2089AC238, 0xB753A929A170F4AB,
+ 0x3971ED50550C43C1, 0x7B810CBBFCE67552,
+ 0xBC902E8706D82EE7, 0xFE60CF6CAF321874,
+ 0xE224479F47CB76A0, 0xA0D4A674EE214033,
+ 0x67C58448141F1B86, 0x253565A3BDF52D15,
+ 0xAB1721DA49899A7F, 0xE9E7C031E063ACEC,
+ 0x2EF6E20D1A5DF759, 0x6C0603E6B3B7C1CA,
+ 0xF6FAE5C07D3274CD, 0xB40A042BD4D8425E,
+ 0x731B26172EE619EB, 0x31EBC7FC870C2F78,
+ 0xBFC9838573709812, 0xFD39626EDA9AAE81,
+ 0x3A28405220A4F534, 0x78D8A1B9894EC3A7,
+ 0x649C294A61B7AD73, 0x266CC8A1C85D9BE0,
+ 0xE17DEA9D3263C055, 0xA38D0B769B89F6C6,
+ 0x2DAF4F0F6FF541AC, 0x6F5FAEE4C61F773F,
+ 0xA84E8CD83C212C8A, 0xEABE6D3395CB1A19,
+ 0x90C79D3FEDD3F122, 0xD2377CD44439C7B1,
+ 0x15265EE8BE079C04, 0x57D6BF0317EDAA97,
+ 0xD9F4FB7AE3911DFD, 0x9B041A914A7B2B6E,
+ 0x5C1538ADB04570DB, 0x1EE5D94619AF4648,
+ 0x02A151B5F156289C, 0x4051B05E58BC1E0F,
+ 0x87409262A28245BA, 0xC5B073890B687329,
+ 0x4B9237F0FF14C443, 0x0962D61B56FEF2D0,
+ 0xCE73F427ACC0A965, 0x8C8315CC052A9FF6,
+ 0x3A80143F5CF17F13, 0x7870F5D4F51B4980,
+ 0xBF61D7E80F251235, 0xFD913603A6CF24A6,
+ 0x73B3727A52B393CC, 0x31439391FB59A55F,
+ 0xF652B1AD0167FEEA, 0xB4A25046A88DC879,
+ 0xA8E6D8B54074A6AD, 0xEA16395EE99E903E,
+ 0x2D071B6213A0CB8B, 0x6FF7FA89BA4AFD18,
+ 0xE1D5BEF04E364A72, 0xA3255F1BE7DC7CE1,
+ 0x64347D271DE22754, 0x26C49CCCB40811C7,
+ 0x5CBD6CC0CC10FAFC, 0x1E4D8D2B65FACC6F,
+ 0xD95CAF179FC497DA, 0x9BAC4EFC362EA149,
+ 0x158E0A85C2521623, 0x577EEB6E6BB820B0,
+ 0x906FC95291867B05, 0xD29F28B9386C4D96,
+ 0xCEDBA04AD0952342, 0x8C2B41A1797F15D1,
+ 0x4B3A639D83414E64, 0x09CA82762AAB78F7,
+ 0x87E8C60FDED7CF9D, 0xC51827E4773DF90E,
+ 0x020905D88D03A2BB, 0x40F9E43324E99428,
+ 0x2CFFE7D5975E55E2, 0x6E0F063E3EB46371,
+ 0xA91E2402C48A38C4, 0xEBEEC5E96D600E57,
+ 0x65CC8190991CB93D, 0x273C607B30F68FAE,
+ 0xE02D4247CAC8D41B, 0xA2DDA3AC6322E288,
+ 0xBE992B5F8BDB8C5C, 0xFC69CAB42231BACF,
+ 0x3B78E888D80FE17A, 0x7988096371E5D7E9,
+ 0xF7AA4D1A85996083, 0xB55AACF12C735610,
+ 0x724B8ECDD64D0DA5, 0x30BB6F267FA73B36,
+ 0x4AC29F2A07BFD00D, 0x08327EC1AE55E69E,
+ 0xCF235CFD546BBD2B, 0x8DD3BD16FD818BB8,
+ 0x03F1F96F09FD3CD2, 0x41011884A0170A41,
+ 0x86103AB85A2951F4, 0xC4E0DB53F3C36767,
+ 0xD8A453A01B3A09B3, 0x9A54B24BB2D03F20,
+ 0x5D45907748EE6495, 0x1FB5719CE1045206,
+ 0x919735E51578E56C, 0xD367D40EBC92D3FF,
+ 0x1476F63246AC884A, 0x568617D9EF46BED9,
+ 0xE085162AB69D5E3C, 0xA275F7C11F7768AF,
+ 0x6564D5FDE549331A, 0x279434164CA30589,
+ 0xA9B6706FB8DFB2E3, 0xEB46918411358470,
+ 0x2C57B3B8EB0BDFC5, 0x6EA7525342E1E956,
+ 0x72E3DAA0AA188782, 0x30133B4B03F2B111,
+ 0xF7021977F9CCEAA4, 0xB5F2F89C5026DC37,
+ 0x3BD0BCE5A45A6B5D, 0x79205D0E0DB05DCE,
+ 0xBE317F32F78E067B, 0xFCC19ED95E6430E8,
+ 0x86B86ED5267CDBD3, 0xC4488F3E8F96ED40,
+ 0x0359AD0275A8B6F5, 0x41A94CE9DC428066,
+ 0xCF8B0890283E370C, 0x8D7BE97B81D4019F,
+ 0x4A6ACB477BEA5A2A, 0x089A2AACD2006CB9,
+ 0x14DEA25F3AF9026D, 0x562E43B4931334FE,
+ 0x913F6188692D6F4B, 0xD3CF8063C0C759D8,
+ 0x5DEDC41A34BBEEB2, 0x1F1D25F19D51D821,
+ 0xD80C07CD676F8394, 0x9AFCE626CE85B507
+};
+
+#endif /* INT64_IS_BUSTED */
diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c
index 7e072a88b2a..43331badc7a 100644
--- a/src/backend/utils/init/globals.c
+++ b/src/backend/utils/init/globals.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.53 2001/01/24 19:43:16 momjian Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/init/globals.c,v 1.54 2001/03/13 01:17:06 tgl Exp $
*
* NOTES
* Globals used all over the place should be declared here and not
@@ -16,14 +16,14 @@
*
*-------------------------------------------------------------------------
*/
+#include "postgres.h"
+
#include <fcntl.h>
#include <sys/file.h>
#include <sys/types.h>
#include <math.h>
#include <unistd.h>
-#include "postgres.h"
-
#include "catalog/catname.h"
#include "catalog/indexing.h"
#include "libpq/pqcomm.h"
@@ -119,49 +119,3 @@ char *SharedSystemRelationNames[] = {
VariableRelationName,
0
};
-
-uint32 crc_table[] = {
-0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
-0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
-0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
-0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
-0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
-0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
-0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
-0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
-0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
-0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
-0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
-0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
-0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
-0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
-0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
-0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
-0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
-0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
-0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
-0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
-0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
-0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
-0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
-0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
-0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
-0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
-0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
-0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
-0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
-0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
-0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
-0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
-0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
-0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
-0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
-0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
-0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
-0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
-0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
-0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
-0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
-0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
-0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d
-};
diff --git a/src/backend/utils/init/miscinit.c b/src/backend/utils/init/miscinit.c
index 0ecfa272440..c6192aa0c6e 100644
--- a/src/backend/utils/init/miscinit.c
+++ b/src/backend/utils/init/miscinit.c
@@ -8,7 +8,7 @@
*
*
* IDENTIFICATION
- * $Header: /cvsroot/pgsql/src/backend/utils/init/miscinit.c,v 1.61 2001/01/27 00:05:31 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/init/miscinit.c,v 1.62 2001/03/13 01:17:06 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,7 +41,8 @@ unsigned char RecodeBackTable[128];
ProcessingMode Mode = InitProcessing;
-/* Note: we rely on this to initialize as zeroes */
+/* Note: we rely on these to initialize as zeroes */
+static char directoryLockFile[MAXPGPATH];
static char socketLockFile[MAXPGPATH];
@@ -458,6 +459,9 @@ GetUserName(Oid userid)
* The path is also just for informational purposes (so that a socket lockfile
* can be more easily traced to the associated postmaster).
*
+ * A data-directory lockfile can optionally contain a third line, containing
+ * the key and ID for the shared memory block used by this postmaster.
+ *
* On successful lockfile creation, a proc_exit callback to remove the
* lockfile is automatically created.
*-------------------------------------------------------------------------
@@ -476,16 +480,18 @@ UnlinkLockFile(int status, Datum filename)
/*
* Create a lockfile, if possible
*
- * Call CreateLockFile with the name of the lockfile to be created. If
- * successful, it returns zero. On detecting a collision, it returns
- * the PID or negated PID of the lockfile owner --- the caller is responsible
- * for producing an appropriate error message.
+ * Call CreateLockFile with the name of the lockfile to be created.
+ * Returns true if successful, false if not (with a message on stderr).
+ *
+ * amPostmaster is used to determine how to encode the output PID.
+ * isDDLock and refName are used to determine what error message to produce.
*/
-static int
-CreateLockFile(const char *filename, bool amPostmaster)
+static bool
+CreateLockFile(const char *filename, bool amPostmaster,
+ bool isDDLock, const char *refName)
{
int fd;
- char buffer[MAXPGPATH + 32];
+ char buffer[MAXPGPATH + 100];
int len;
int encoded_pid;
pid_t other_pid;
@@ -539,11 +545,61 @@ CreateLockFile(const char *filename, bool amPostmaster)
{
if (kill(other_pid, 0) == 0 ||
errno != ESRCH)
- return encoded_pid; /* lockfile belongs to a live process */
+ {
+ /* lockfile belongs to a live process */
+ fprintf(stderr, "Lock file \"%s\" already exists.\n",
+ filename);
+ if (isDDLock)
+ fprintf(stderr,
+ "Is another %s (pid %d) running in \"%s\"?\n",
+ (encoded_pid < 0 ? "postgres" : "postmaster"),
+ other_pid, refName);
+ else
+ fprintf(stderr,
+ "Is another %s (pid %d) using \"%s\"?\n",
+ (encoded_pid < 0 ? "postgres" : "postmaster"),
+ other_pid, refName);
+ return false;
+ }
+ }
+
+ /*
+ * No, the creating process did not exist. However, it could be that
+ * the postmaster crashed (or more likely was kill -9'd by a clueless
+ * admin) but has left orphan backends behind. Check for this by
+ * looking to see if there is an associated shmem segment that is
+ * still in use.
+ */
+ if (isDDLock)
+ {
+ char *ptr;
+ unsigned long shmKey,
+ shmId;
+
+ ptr = strchr(buffer, '\n');
+ if (ptr != NULL &&
+ (ptr = strchr(ptr+1, '\n')) != NULL)
+ {
+ ptr++;
+ if (sscanf(ptr, "%lu %lu", &shmKey, &shmId) == 2)
+ {
+ if (SharedMemoryIsInUse((IpcMemoryKey) shmKey,
+ (IpcMemoryId) shmId))
+ {
+ fprintf(stderr,
+ "Found a pre-existing shared memory block (ID %d) still in use.\n"
+ "If you're sure there are no old backends still running,\n"
+ "remove the shared memory block with ipcrm(1), or just\n"
+ "delete \"%s\".\n",
+ (int) shmId, filename);
+ return false;
+ }
+ }
+ }
}
/*
- * No, the process did not exist. Unlink the file and try again to
+ * Looks like nobody's home. Unlink the file and try again to
* create it. Need a loop because of possible race condition against
* other would-be creators.
*/
@@ -576,28 +632,19 @@ CreateLockFile(const char *filename, bool amPostmaster)
*/
on_proc_exit(UnlinkLockFile, PointerGetDatum(strdup(filename)));
- return 0; /* Success! */
+ return true; /* Success! */
}
bool
CreateDataDirLockFile(const char *datadir, bool amPostmaster)
{
char lockfile[MAXPGPATH];
- int encoded_pid;
snprintf(lockfile, sizeof(lockfile), "%s/postmaster.pid", datadir);
- encoded_pid = CreateLockFile(lockfile, amPostmaster);
- if (encoded_pid != 0)
- {
- fprintf(stderr, "Lock file \"%s\" already exists.\n", lockfile);
- if (encoded_pid < 0)
- fprintf(stderr, "Is another postgres (pid %d) running in \"%s\"?\n",
- -encoded_pid, datadir);
- else
- fprintf(stderr, "Is another postmaster (pid %d) running in \"%s\"?\n",
- encoded_pid, datadir);
+ if (! CreateLockFile(lockfile, amPostmaster, true, datadir))
return false;
- }
+ /* Save name of lockfile for RecordSharedMemoryInLockFile */
+ strcpy(directoryLockFile, lockfile);
return true;
}
@@ -605,21 +652,10 @@ bool
CreateSocketLockFile(const char *socketfile, bool amPostmaster)
{
char lockfile[MAXPGPATH];
- int encoded_pid;
snprintf(lockfile, sizeof(lockfile), "%s.lock", socketfile);
- encoded_pid = CreateLockFile(lockfile, amPostmaster);
- if (encoded_pid != 0)
- {
- fprintf(stderr, "Lock file \"%s\" already exists.\n", lockfile);
- if (encoded_pid < 0)
- fprintf(stderr, "Is another postgres (pid %d) using \"%s\"?\n",
- -encoded_pid, socketfile);
- else
- fprintf(stderr, "Is another postmaster (pid %d) using \"%s\"?\n",
- encoded_pid, socketfile);
+ if (! CreateLockFile(lockfile, amPostmaster, false, socketfile))
return false;
- }
/* Save name of lockfile for TouchSocketLockFile */
strcpy(socketLockFile, lockfile);
return true;
@@ -650,6 +686,78 @@ TouchSocketLockFile(void)
}
}
+/*
+ * Append information about a shared memory segment to the data directory
+ * lock file (if we have created one).
+ *
+ * This may be called multiple times in the life of a postmaster, if we
+ * delete and recreate shmem due to backend crash. Therefore, be prepared
+ * to overwrite existing information. (As of 7.1, a postmaster only creates
+ * one shm seg anyway; but for the purposes here, if we did have more than
+ * one then any one of them would do anyway.)
+ */
+void
+RecordSharedMemoryInLockFile(IpcMemoryKey shmKey, IpcMemoryId shmId)
+{
+ int fd;
+ int len;
+ char *ptr;
+ char buffer[BLCKSZ];
+
+ /*
+ * Do nothing if we did not create a lockfile (probably because we
+ * are running standalone).
+ */
+ if (directoryLockFile[0] == '\0')
+ return;
+
+ fd = open(directoryLockFile, O_RDWR | PG_BINARY, 0);
+ if (fd < 0)
+ {
+ elog(DEBUG, "Failed to rewrite %s: %m", directoryLockFile);
+ return;
+ }
+ len = read(fd, buffer, sizeof(buffer) - 100);
+ if (len <= 0)
+ {
+ elog(DEBUG, "Failed to read %s: %m", directoryLockFile);
+ close(fd);
+ return;
+ }
+ buffer[len] = '\0';
+ /*
+ * Skip over first two lines (PID and path).
+ */
+ ptr = strchr(buffer, '\n');
+ if (ptr == NULL ||
+ (ptr = strchr(ptr+1, '\n')) == NULL)
+ {
+ elog(DEBUG, "Bogus data in %s", directoryLockFile);
+ close(fd);
+ return;
+ }
+ ptr++;
+ /*
+ * Append shm key and ID. Format to try to keep it the same length
+ * always (trailing junk won't hurt, but might confuse humans).
+ */
+ sprintf(ptr, "%9lu %9lu\n",
+ (unsigned long) shmKey, (unsigned long) shmId);
+ /*
+ * And rewrite the data. Since we write in a single kernel call,
+ * this update should appear atomic to onlookers.
+ */
+ len = strlen(buffer);
+ if (lseek(fd, (off_t) 0, SEEK_SET) != 0 ||
+ (int) write(fd, buffer, len) != len)
+ {
+ elog(DEBUG, "Failed to write %s: %m", directoryLockFile);
+ close(fd);
+ return;
+ }
+ close(fd);
+}
+
/*-------------------------------------------------------------------------
* Version checking support
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index b2853917ac3..7e6769d3c89 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -4,7 +4,7 @@
* Support for grand unified configuration scheme, including SET
* command, configuration file, and command line options.
*
- * $Header: /cvsroot/pgsql/src/backend/utils/misc/guc.c,v 1.31 2001/02/26 00:50:07 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/utils/misc/guc.c,v 1.32 2001/03/13 01:17:06 tgl Exp $
*
* Copyright 2000 by PostgreSQL Global Development Group
* Written by Peter Eisentraut <peter_e@gmx.net>.
@@ -36,6 +36,7 @@
/* XXX should be in a header file */
extern bool Log_connections;
+extern int CheckPointSegments;
extern int CheckPointTimeout;
extern int XLOGbuffers;
extern int XLOGfiles;
@@ -279,13 +280,16 @@ ConfigureNamesInt[] =
{"unix_socket_permissions", PGC_POSTMASTER, &Unix_socket_permissions,
0777, 0000, 0777},
- {"checkpoint_timeout", PGC_POSTMASTER, &CheckPointTimeout,
+ {"checkpoint_segments", PGC_SIGHUP, &CheckPointSegments,
+ 3, 1, INT_MAX},
+
+ {"checkpoint_timeout", PGC_SIGHUP, &CheckPointTimeout,
300, 30, 3600},
{"wal_buffers", PGC_POSTMASTER, &XLOGbuffers,
8, 4, INT_MAX},
- {"wal_files", PGC_POSTMASTER, &XLOGfiles,
+ {"wal_files", PGC_SIGHUP, &XLOGfiles,
0, 0, 64},
{"wal_debug", PGC_SUSET, &XLOG_DEBUG,
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index f599d97cff4..88d1fe94370 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -110,6 +110,7 @@
#wal_debug = 0 # range 0-16
#commit_delay = 0 # range 0-100000
#commit_siblings = 5 # range 1-1000
+#checkpoint_segments = 3 # in logfile segments (16MB each), min 1
#checkpoint_timeout = 300 # in seconds, range 30-3600
diff --git a/src/include/access/transam.h b/src/include/access/transam.h
index 620f6e59105..460de699886 100644
--- a/src/include/access/transam.h
+++ b/src/include/access/transam.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: transam.h,v 1.29 2001/01/24 19:43:19 momjian Exp $
+ * $Id: transam.h,v 1.30 2001/03/13 01:17:06 tgl Exp $
*
* NOTES
* Transaction System Version 101 now support proper oid
@@ -79,7 +79,7 @@ typedef unsigned char XidStatus;/* (2 bits) */
* their numbering at 512.
*
* The first 4 bytes of this relation store the version
- * number of the transction system.
+ * number of the transaction system.
* ----------------
*/
typedef struct LogRelationContentsData
@@ -100,13 +100,16 @@ typedef LogRelationContentsData *LogRelationContents;
* is updated in place whenever the variables change.
*
* The first 4 bytes of this relation store the version
- * number of the transction system.
+ * number of the transaction system.
*
* Currently, the relation has only one page and the next
* available xid, the last committed xid and the next
* available oid are stored there.
+ *
+ * XXX As of 7.1, pg_variable isn't used anymore; this is dead code.
* ----------------
*/
+#ifdef NOT_USED
typedef struct VariableRelationContentsData
{
XLogRecPtr LSN;
@@ -117,6 +120,7 @@ typedef struct VariableRelationContentsData
} VariableRelationContentsData;
typedef VariableRelationContentsData *VariableRelationContents;
+#endif /* NOT_USED */
/*
* VariableCache is placed in shmem and used by
@@ -124,8 +128,9 @@ typedef VariableRelationContentsData *VariableRelationContents;
*/
typedef struct VariableCacheData
{
- TransactionId nextXid;
- Oid nextOid;
+ TransactionId nextXid; /* next XID to assign */
+ uint32 xidCount; /* XIDs available before must do XLOG work */
+ Oid nextOid; /* and similarly for OIDs */
uint32 oidCount;
} VariableCacheData;
@@ -184,7 +189,8 @@ extern int RecoveryCheckingEnableState;
extern bool AMI_OVERRIDE;
/* in varsup.c */
-extern int OidGenLockId;
+extern SPINLOCK OidGenLockId;
+extern SPINLOCK XidGenLockId;
extern VariableCache ShmemVariableCache;
#endif /* TRAMSAM_H */
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index c17bf32cc55..dd079496ed6 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -3,7 +3,10 @@
*
* PostgreSQL transaction log manager
*
- * $Header: /cvsroot/pgsql/src/include/access/xlog.h,v 1.18 2001/02/26 00:50:07 tgl Exp $
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: xlog.h,v 1.19 2001/03/13 01:17:06 tgl Exp $
*/
#ifndef XLOG_H
#define XLOG_H
@@ -11,97 +14,126 @@
#include "access/rmgr.h"
#include "access/transam.h"
#include "access/xlogdefs.h"
-#include "access/xlogutils.h"
+#include "utils/pg_crc.h"
-typedef struct crc64
-{
- uint32 crc1;
- uint32 crc2;
-} crc64;
+/*
+ * Header for each record in XLOG
+ *
+ * NOTE: xl_len counts only the rmgr data, not the XLogRecord header,
+ * and also not any backup blocks appended to the record (which are signaled
+ * by xl_info flag bits). The total space needed for an XLOG record is
+ * really:
+ *
+ * SizeOfXLogRecord + xl_len + n_backup_blocks * (sizeof(BkpBlock) + BLCKSZ)
+ */
typedef struct XLogRecord
{
- crc64 xl_crc;
+ crc64 xl_crc; /* CRC for this record */
XLogRecPtr xl_prev; /* ptr to previous record in log */
XLogRecPtr xl_xact_prev; /* ptr to previous record of this xact */
TransactionId xl_xid; /* xact id */
- uint16 xl_len; /* total len of record *data* */
- uint8 xl_info;
- RmgrId xl_rmid; /* resource manager inserted this record */
+ uint16 xl_len; /* total len of rmgr data */
+ uint8 xl_info; /* flag bits, see below */
+ RmgrId xl_rmid; /* resource manager for this record */
/* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
} XLogRecord;
-#define SizeOfXLogRecord DOUBLEALIGN(sizeof(XLogRecord))
-#define MAXLOGRECSZ (2 * BLCKSZ)
+#define SizeOfXLogRecord MAXALIGN(sizeof(XLogRecord))
+#define MAXLOGRECSZ 65535 /* the most that'll fit in xl_len */
-#define XLogRecGetData(record) \
- ((char*)record + SizeOfXLogRecord)
+#define XLogRecGetData(record) ((char*) (record) + SizeOfXLogRecord)
/*
- * When there is no space on current page we continue
- * on the next page with subrecord.
+ * XLOG uses only low 4 bits of xl_info. High 4 bits may be used by rmgr.
*/
-typedef struct XLogSubRecord
-{
- uint16 xl_len; /* len of data left */
-
- /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
-
-} XLogSubRecord;
-
-#define SizeOfXLogSubRecord DOUBLEALIGN(sizeof(XLogSubRecord))
+#define XLR_INFO_MASK 0x0F
/*
- * XLOG uses only low 4 bits of xl_info.
- * High 4 bits may be used by rmgr...
- *
- * We support backup of 2 blocks per record only.
- * If we backed up some of these blocks then we use
- * flags below to signal rmgr about this on recovery.
+ * We support backup of up to 2 disk blocks per XLOG record (could support
+ * more if we cared to dedicate more xl_info bits for this purpose; currently
+ * do not need more than 2 anyway). If we backed up any disk blocks then we
+ * use flag bits in xl_info to signal it.
*/
-#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> iblk)
+#define XLR_BKP_BLOCK_MASK 0x0C /* all info bits used for bkp blocks */
+#define XLR_MAX_BKP_BLOCKS 2
+#define XLR_SET_BKP_BLOCK(iblk) (0x08 >> (iblk))
#define XLR_BKP_BLOCK_1 XLR_SET_BKP_BLOCK(0) /* 0x08 */
#define XLR_BKP_BLOCK_2 XLR_SET_BKP_BLOCK(1) /* 0x04 */
-#define XLR_INFO_MASK 0x0F
/*
* Sometimes we log records which are out of transaction control.
- * Rmgr may use flag below for this purpose.
+ * Rmgr may "or" XLOG_NO_TRAN into info passed to XLogInsert to indicate this.
*/
#define XLOG_NO_TRAN XLR_INFO_MASK
-#define XLOG_PAGE_MAGIC 0x17345168
+/*
+ * Header info for a backup block appended to an XLOG record.
+ *
+ * Note that the backup block has its own CRC, and is not covered by
+ * the CRC of the XLOG record proper. Also note that we don't attempt
+ * to align either the BkpBlock struct or the block's data.
+ */
+typedef struct BkpBlock
+{
+ crc64 crc;
+ RelFileNode node;
+ BlockNumber block;
+} BkpBlock;
-typedef struct XLogPageHeaderData
+/*
+ * When there is not enough space on current page for whole record, we
+ * continue on the next page with continuation record. (However, the
+ * XLogRecord header will never be split across pages; if there's less than
+ * SizeOfXLogRecord space left at the end of a page, we just waste it.)
+ *
+ * Note that xl_rem_len includes backup-block data, unlike xl_len in the
+ * initial header.
+ */
+typedef struct XLogContRecord
{
- uint32 xlp_magic;
- uint16 xlp_info;
-} XLogPageHeaderData;
+ uint32 xl_rem_len; /* total len of remaining data for record */
-#define SizeOfXLogPHD DOUBLEALIGN(sizeof(XLogPageHeaderData))
+ /* ACTUAL LOG DATA FOLLOWS AT END OF STRUCT */
-typedef XLogPageHeaderData *XLogPageHeader;
+} XLogContRecord;
-/* When record crosses page boundary */
-#define XLP_FIRST_IS_SUBRECORD 0x0001
+#define SizeOfXLogContRecord MAXALIGN(sizeof(XLogContRecord))
-#define XLByteLT(left, right) \
- (right.xlogid > left.xlogid || \
- (right.xlogid == left.xlogid && right.xrecoff > left.xrecoff))
+/*
+ * Each page of XLOG file has a header like this:
+ */
+#define XLOG_PAGE_MAGIC 0x17345169 /* can be used as WAL version indicator */
-#define XLByteLE(left, right) \
- (right.xlogid > left.xlogid || \
- (right.xlogid == left.xlogid && right.xrecoff >= left.xrecoff))
+typedef struct XLogPageHeaderData
+{
+ uint32 xlp_magic; /* magic value for correctness checks */
+ uint16 xlp_info; /* flag bits, see below */
+} XLogPageHeaderData;
-#define XLByteEQ(left, right) \
- (right.xlogid == left.xlogid && right.xrecoff == left.xrecoff)
+#define SizeOfXLogPHD MAXALIGN(sizeof(XLogPageHeaderData))
-extern StartUpID ThisStartUpID; /* current SUI */
-extern bool InRecovery;
-extern XLogRecPtr MyLastRecPtr;
+typedef XLogPageHeaderData *XLogPageHeader;
+
+/* When record crosses page boundary, set this flag in new page's header */
+#define XLP_FIRST_IS_CONTRECORD 0x0001
+
+/*
+ * We break each logical log file (xlogid value) into 16Mb segments.
+ * One possible segment at the end of each log file is wasted, to ensure
+ * that we don't have problems representing last-byte-position-plus-1.
+ */
+#define XLogSegSize ((uint32) (16*1024*1024))
+#define XLogSegsPerFile (((uint32) 0xffffffff) / XLogSegSize)
+#define XLogFileSize (XLogSegsPerFile * XLogSegSize)
+/*
+ * Method table for resource managers.
+ *
+ * RmgrTable[] is indexed by RmgrId values (see rmgr.h).
+ */
typedef struct RmgrData
{
char *rm_name;
@@ -112,12 +144,19 @@ typedef struct RmgrData
extern RmgrData RmgrTable[];
-/*
- * List of these structs is used to pass data to XLOG.
- * If buffer is valid then XLOG will check if buffer must
- * be backup-ed. For backup-ed buffer data will not be
- * inserted into record (and XLOG sets
- * XLR_BKP_BLOCK_X bit in xl_info).
+/*--------------------
+ * List of these structs is used to pass data to XLogInsert().
+ *
+ * If buffer is valid then XLOG will check if buffer must be backed up
+ * (ie, whether this is first change of that page since last checkpoint).
+ * If so, the whole page contents are attached to the XLOG record, and XLOG
+ * sets XLR_BKP_BLOCK_X bit in xl_info. Note that the buffer must be pinned
+ * and locked while this is going on, so that it won't change under us.
+ * NB: when this happens, we do not bother to insert the associated data into
+ * the XLOG record, since we assume it's present in the buffer. Therefore,
+ * rmgr redo routines MUST pay attention to XLR_BKP_BLOCK_X to know what
+ * is actually stored in the XLOG record.
+ *--------------------
*/
typedef struct XLogRecData
{
@@ -127,11 +166,13 @@ typedef struct XLogRecData
struct XLogRecData *next;
} XLogRecData;
+extern StartUpID ThisStartUpID; /* current SUI */
+extern bool InRecovery;
+extern XLogRecPtr MyLastRecPtr;
+
extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata);
extern void XLogFlush(XLogRecPtr RecPtr);
-extern void CreateCheckPoint(bool shutdown);
-
extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record);
extern void xlog_undo(XLogRecPtr lsn, XLogRecord *record);
extern void xlog_desc(char *buf, uint8 xl_info, char* rec);
@@ -145,6 +186,10 @@ extern void StartupXLOG(void);
extern void ShutdownXLOG(void);
extern void CreateCheckPoint(bool shutdown);
extern void SetThisStartUpID(void);
+extern void XLogPutNextXid(TransactionId nextXid);
+extern void XLogPutNextOid(Oid nextOid);
+extern void SetRedoRecPtr(void);
+extern void GetRedoRecPtr(void);
/* in storage/ipc/sinval.c, but don't want to declare in sinval.h because
* we'd have to include xlog.h into that ...
diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h
index ce1b3ef8cf6..bc7f9e1a36d 100644
--- a/src/include/access/xlogdefs.h
+++ b/src/include/access/xlogdefs.h
@@ -1,21 +1,56 @@
/*
- *
* xlogdefs.h
*
* Postgres transaction log manager record pointer and
- * system stratup number definitions
+ * system startup number definitions
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
*
+ * $Id: xlogdefs.h,v 1.2 2001/03/13 01:17:06 tgl Exp $
*/
#ifndef XLOG_DEFS_H
#define XLOG_DEFS_H
+/*
+ * Pointer to a location in the XLOG. These pointers are 64 bits wide,
+ * because we don't want them ever to overflow.
+ *
+ * NOTE: xrecoff == 0 is used to indicate an invalid pointer. This is OK
+ * because we use page headers in the XLOG, so no XLOG record can start
+ * right at the beginning of a file.
+ *
+ * NOTE: the "log file number" is somewhat misnamed, since the actual files
+ * making up the XLOG are much smaller than 4Gb. Each actual file is an
+ * XLogSegSize-byte "segment" of a logical log file having the indicated
+ * xlogid. The log file number and segment number together identify a
+ * physical XLOG file. Segment number and offset within the physical file
+ * are computed from xrecoff div and mod XLogSegSize.
+ */
typedef struct XLogRecPtr
{
uint32 xlogid; /* log file #, 0 based */
- uint32 xrecoff; /* offset of record in log file */
+ uint32 xrecoff; /* byte offset of location in log file */
} XLogRecPtr;
/*
+ * Macros for comparing XLogRecPtrs
+ *
+ * Beware of passing expressions with side-effects to these macros,
+ * since the arguments may be evaluated multiple times.
+ */
+#define XLByteLT(a, b) \
+ ((a).xlogid < (b).xlogid || \
+ ((a).xlogid == (b).xlogid && (a).xrecoff < (b).xrecoff))
+
+#define XLByteLE(a, b) \
+ ((a).xlogid < (b).xlogid || \
+ ((a).xlogid == (b).xlogid && (a).xrecoff <= (b).xrecoff))
+
+#define XLByteEQ(a, b) \
+ ((a).xlogid == (b).xlogid && (a).xrecoff == (b).xrecoff)
+
+/*
* StartUpID (SUI) - system startups counter. It's to allow removing
* pg_log after shutdown, in future.
*/
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 016381f0d2b..1f1fff7c07f 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -1,3 +1,13 @@
+/*
+ * xlogutils.h
+ *
+ * PostgreSQL transaction log manager utility routines
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: xlogutils.h,v 1.6 2001/03/13 01:17:06 tgl Exp $
+ */
#ifndef XLOG_UTILS_H
#define XLOG_UTILS_H
diff --git a/src/include/catalog/pg_control.h b/src/include/catalog/pg_control.h
new file mode 100644
index 00000000000..97d0e13e783
--- /dev/null
+++ b/src/include/catalog/pg_control.h
@@ -0,0 +1,115 @@
+/*-------------------------------------------------------------------------
+ *
+ * pg_control.h
+ * The system control file "pg_control" is not a heap relation.
+ * However, we define it here so that the format is documented.
+ *
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: pg_control.h,v 1.1 2001/03/13 01:17:06 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef PG_CONTROL_H
+#define PG_CONTROL_H
+
+#include <time.h>
+
+#include "access/xlogdefs.h"
+#include "utils/pg_crc.h"
+
+
+/* Version identifier for this pg_control format */
+#define PG_CONTROL_VERSION 71
+
+/*
+ * Body of CheckPoint XLOG records. This is declared here because we keep
+ * a copy of the latest one in pg_control for possible disaster recovery.
+ */
+typedef struct CheckPoint
+{
+ XLogRecPtr redo; /* next RecPtr available when we */
+ /* began to create CheckPoint */
+ /* (i.e. REDO start point) */
+ XLogRecPtr undo; /* first record of oldest in-progress */
+ /* transaction when we started */
+ /* (i.e. UNDO end point) */
+ StartUpID ThisStartUpID; /* current SUI */
+ TransactionId nextXid; /* next free XID */
+ Oid nextOid; /* next free OID */
+ time_t time; /* time stamp of checkpoint */
+} CheckPoint;
+
+/* XLOG info values for XLOG rmgr */
+#define XLOG_CHECKPOINT_SHUTDOWN 0x00
+#define XLOG_CHECKPOINT_ONLINE 0x10
+#define XLOG_NEXTXID 0x20
+#define XLOG_NEXTOID 0x30
+
+
+/* System status indicator */
+typedef enum DBState
+{
+ DB_STARTUP = 0,
+ DB_SHUTDOWNED,
+ DB_SHUTDOWNING,
+ DB_IN_RECOVERY,
+ DB_IN_PRODUCTION
+} DBState;
+
+#define LOCALE_NAME_BUFLEN 128
+
+/*
+ * Contents of pg_control.
+ *
+ * NOTE: try to keep this under 512 bytes so that it will fit on one physical
+ * sector of typical disk drives. This reduces the odds of corruption due to
+ * power failure midway through a write. Currently it fits comfortably,
+ * but we could probably reduce LOCALE_NAME_BUFLEN if things get tight.
+ */
+
+typedef struct ControlFileData
+{
+ crc64 crc; /* CRC for remainder of struct */
+
+ /*
+ * Version identifier information. Keep these fields at the front,
+ * especially pg_control_version; they won't be real useful if they
+ * move around.
+ *
+ * pg_control_version identifies the format of pg_control itself.
+ * catalog_version_no identifies the format of the system catalogs.
+ *
+ * There are additional version identifiers in individual files;
+ * for example, WAL logs contain per-page magic numbers that can serve
+ * as version cues for the WAL log.
+ */
+ uint32 pg_control_version; /* PG_CONTROL_VERSION */
+ uint32 catalog_version_no; /* see catversion.h */
+
+ /*
+ * System status data
+ */
+ DBState state; /* see enum above */
+ time_t time; /* time stamp of last pg_control update */
+ uint32 logId; /* current log file id */
+ uint32 logSeg; /* current log file segment, + 1 */
+ XLogRecPtr checkPoint; /* last check point record ptr */
+ XLogRecPtr prevCheckPoint; /* previous check point record ptr */
+
+ CheckPoint checkPointCopy; /* copy of last check point record */
+
+ /*
+ * This data is used to make sure that configuration of this database
+ * is compatible with the backend executable.
+ */
+ uint32 blcksz; /* block size for this DB */
+ uint32 relseg_size; /* blocks per segment of large relation */
+ /* active locales --- "C" if compiled without USE_LOCALE: */
+ char lc_collate[LOCALE_NAME_BUFLEN];
+ char lc_ctype[LOCALE_NAME_BUFLEN];
+} ControlFileData;
+
+#endif /* PG_CONTROL_H */
diff --git a/src/include/miscadmin.h b/src/include/miscadmin.h
index 46d7ab534ea..851174f3966 100644
--- a/src/include/miscadmin.h
+++ b/src/include/miscadmin.h
@@ -12,7 +12,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: miscadmin.h,v 1.81 2001/02/10 02:31:28 tgl Exp $
+ * $Id: miscadmin.h,v 1.82 2001/03/13 01:17:06 tgl Exp $
*
* NOTES
* some of the information in this file should be moved to
@@ -283,6 +283,8 @@ extern ProcessingMode Mode;
extern bool CreateDataDirLockFile(const char *datadir, bool amPostmaster);
extern bool CreateSocketLockFile(const char *socketfile, bool amPostmaster);
extern void TouchSocketLockFile(void);
+extern void RecordSharedMemoryInLockFile(IpcMemoryKey shmKey,
+ IpcMemoryId shmId);
extern void ValidatePgVersion(const char *path);
diff --git a/src/include/storage/ipc.h b/src/include/storage/ipc.h
index 3173121850d..f1e1e6096cd 100644
--- a/src/include/storage/ipc.h
+++ b/src/include/storage/ipc.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: ipc.h,v 1.47 2001/02/10 02:31:28 tgl Exp $
+ * $Id: ipc.h,v 1.48 2001/03/13 01:17:06 tgl Exp $
*
* Some files that would normally need to include only sys/ipc.h must
* instead include this file because on Ultrix, sys/ipc.h is not designed
@@ -105,6 +105,8 @@ extern int IpcSemaphoreGetValue(IpcSemaphoreId semId, int sem);
extern PGShmemHeader *IpcMemoryCreate(uint32 size, bool makePrivate,
int permission);
+extern bool SharedMemoryIsInUse(IpcMemoryKey shmKey, IpcMemoryId shmId);
+
/* ipci.c */
extern void CreateSharedMemoryAndSemaphores(bool makePrivate,
int maxBackends);
diff --git a/src/include/tcop/tcopprot.h b/src/include/tcop/tcopprot.h
index 52d719c6c5f..94af6e71133 100644
--- a/src/include/tcop/tcopprot.h
+++ b/src/include/tcop/tcopprot.h
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Id: tcopprot.h,v 1.38 2001/01/24 19:43:28 momjian Exp $
+ * $Id: tcopprot.h,v 1.39 2001/03/13 01:17:06 tgl Exp $
*
* OLD COMMENTS
* This file was created so that other c files could get the two
@@ -41,6 +41,7 @@ extern void pg_exec_query_string(char *query_string,
#endif /* BOOTSTRAP_INCLUDE */
extern void die(SIGNAL_ARGS);
+extern void quickdie(SIGNAL_ARGS);
extern int PostgresMain(int argc, char *argv[],
int real_argc, char *real_argv[], const char *username);
extern void ResetUsage(void);
diff --git a/src/include/utils/pg_crc.h b/src/include/utils/pg_crc.h
new file mode 100644
index 00000000000..18e012bbcd0
--- /dev/null
+++ b/src/include/utils/pg_crc.h
@@ -0,0 +1,115 @@
+/*
+ * pg_crc.h
+ *
+ * PostgreSQL 64-bit CRC support
+ *
+ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * $Id: pg_crc.h,v 1.1 2001/03/13 01:17:06 tgl Exp $
+ */
+#ifndef PG_CRC_H
+#define PG_CRC_H
+
+/*
+ * If we have a 64-bit integer type, then a 64-bit CRC looks just like the
+ * usual sort of implementation. (See Ross Williams' excellent introduction
+ * A PAINLESS GUIDE TO CRC ERROR DETECTION ALGORITHMS, available from
+ * ftp://ftp.rocksoft.com/papers/crc_v3.txt or several other net sites.)
+ * If we have no working 64-bit type, then fake it with two 32-bit registers.
+ *
+ * The present implementation is a normal (not "reflected", in Williams'
+ * terms) 64-bit CRC, using initial all-ones register contents and a final
+ * bit inversion. The chosen polynomial is borrowed from the DLT1 spec
+ * (ECMA-182, available from http://www.ecma.ch/ecma1/STAND/ECMA-182.HTM):
+ *
+ * x^64 + x^62 + x^57 + x^55 + x^54 + x^53 + x^52 + x^47 + x^46 + x^45 +
+ * x^40 + x^39 + x^38 + x^37 + x^35 + x^33 + x^32 + x^31 + x^29 + x^27 +
+ * x^24 + x^23 + x^22 + x^21 + x^19 + x^17 + x^13 + x^12 + x^10 + x^9 +
+ * x^7 + x^4 + x + 1
+ */
+
+#ifdef INT64_IS_BUSTED
+
+/*
+ * crc0 represents the LSBs of the 64-bit value, crc1 the MSBs. Note that
+ * with crc0 placed first, the output of 32-bit and 64-bit implementations
+ * will be bit-compatible only on little-endian architectures. If it were
+ * important to make the two possible implementations bit-compatible on
+ * all machines, we could do a configure test to decide how to order the
+ * two fields, but it seems not worth the trouble.
+ */
+typedef struct crc64
+{
+ uint32 crc0;
+ uint32 crc1;
+} crc64;
+
+/* Initialize a CRC accumulator */
+#define INIT_CRC64(crc) ((crc).crc0 = 0xffffffff, (crc).crc1 = 0xffffffff)
+
+/* Finish a CRC calculation */
+#define FIN_CRC64(crc) ((crc).crc0 ^= 0xffffffff, (crc).crc1 ^= 0xffffffff)
+
+/* Accumulate some (more) bytes into a CRC */
+#define COMP_CRC64(crc, data, len) \
+do { \
+ uint32 __crc0 = (crc).crc0; \
+ uint32 __crc1 = (crc).crc1; \
+ unsigned char *__data = (unsigned char *) (data); \
+ uint32 __len = (len); \
+\
+ while (__len-- > 0) \
+ { \
+ int __tab_index = ((int) (__crc1 >> 24) ^ *__data++) & 0xFF; \
+ __crc1 = crc_table1[__tab_index] ^ ((__crc1 << 8) | (__crc0 >> 24)); \
+ __crc0 = crc_table0[__tab_index] ^ (__crc0 << 8); \
+ } \
+ (crc).crc0 = __crc0; \
+ (crc).crc1 = __crc1; \
+} while (0)
+
+/* Check for equality of two CRCs */
+#define EQ_CRC64(c1,c2) ((c1).crc0 == (c2).crc0 && (c1).crc1 == (c2).crc1)
+
+/* Constant table for CRC calculation */
+extern const uint32 crc_table0[];
+extern const uint32 crc_table1[];
+
+#else /* int64 works */
+
+typedef struct crc64
+{
+ uint64 crc0;
+} crc64;
+
+/* Initialize a CRC accumulator */
+#define INIT_CRC64(crc) ((crc).crc0 = (uint64) 0xffffffffffffffff)
+
+/* Finish a CRC calculation */
+#define FIN_CRC64(crc) ((crc).crc0 ^= (uint64) 0xffffffffffffffff)
+
+/* Accumulate some (more) bytes into a CRC */
+#define COMP_CRC64(crc, data, len) \
+do { \
+ uint64 __crc0 = (crc).crc0; \
+ unsigned char *__data = (unsigned char *) (data); \
+ uint32 __len = (len); \
+\
+ while (__len-- > 0) \
+ { \
+ int __tab_index = ((int) (__crc0 >> 56) ^ *__data++) & 0xFF; \
+ __crc0 = crc_table[__tab_index] ^ (__crc0 << 8); \
+ } \
+ (crc).crc0 = __crc0; \
+} while (0)
+
+/* Check for equality of two CRCs */
+#define EQ_CRC64(c1,c2) ((c1).crc0 == (c2).crc0)
+
+/* Constant table for CRC calculation */
+extern const uint64 crc_table[];
+
+#endif /* INT64_IS_BUSTED */
+
+#endif /* PG_CRC_H */