Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--    src/backend/access/transam/xlog.c    325
1 file changed, 148 insertions(+), 177 deletions(-)
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 56a0e2ba6bb..057ee72d55a 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.77 2001/09/26 20:24:02 tgl Exp $
+ * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.78 2001/09/29 04:02:21 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -33,11 +33,11 @@
#include "access/xlogutils.h"
#include "catalog/catversion.h"
#include "catalog/pg_control.h"
-#include "storage/sinval.h"
+#include "storage/bufpage.h"
+#include "storage/lwlock.h"
#include "storage/proc.h"
+#include "storage/sinval.h"
#include "storage/spin.h"
-#include "storage/s_lock.h"
-#include "storage/bufpage.h"
#include "utils/builtins.h"
#include "utils/relcache.h"
#include "utils/selfuncs.h"
@@ -86,11 +86,6 @@
#endif
-/* Max time to wait to acquire XLog activity locks */
-#define XLOG_LOCK_TIMEOUT (5*60*1000000) /* 5 minutes */
-/* Max time to wait to acquire checkpoint lock */
-#define CHECKPOINT_LOCK_TIMEOUT (20*60*1000000) /* 20 minutes */
-
/* User-settable parameters */
int CheckPointSegments = 3;
int XLOGbuffers = 8;
@@ -155,13 +150,10 @@ static XLogRecPtr ProcLastRecPtr = {0, 0};
* (which is almost but not quite the same as a pointer to the most recent
* CHECKPOINT record). We update this from the shared-memory copy,
* XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we
- * hold the Insert spinlock). See XLogInsert for details.
+ * hold the Insert lock). See XLogInsert for details.
*/
static XLogRecPtr RedoRecPtr;
-/* This lock must be held to read/update control file or create new log file */
-SPINLOCK ControlFileLockId;
-
/*----------
* Shared-memory data structures for XLOG control
*
@@ -171,24 +163,24 @@ SPINLOCK ControlFileLockId;
* These structs are identical but are declared separately to indicate their
* slightly different functions.
*
- * We do a lot of pushups to minimize the amount of access to spinlocked
+ * We do a lot of pushups to minimize the amount of access to lockable
* shared memory values. There are actually three shared-memory copies of
* LogwrtResult, plus one unshared copy in each backend. Here's how it works:
* XLogCtl->LogwrtResult is protected by info_lck
- * XLogCtl->Write.LogwrtResult is protected by logwrt_lck
- * XLogCtl->Insert.LogwrtResult is protected by insert_lck
- * One must hold the associated spinlock to read or write any of these, but
- * of course no spinlock is needed to read/write the unshared LogwrtResult.
+ * XLogCtl->Write.LogwrtResult is protected by WALWriteLock
+ * XLogCtl->Insert.LogwrtResult is protected by WALInsertLock
+ * One must hold the associated lock to read or write any of these, but
+ * of course no lock is needed to read/write the unshared LogwrtResult.
*
* XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always
* right", since both are updated by a write or flush operation before
- * it releases logwrt_lck. The point of keeping XLogCtl->Write.LogwrtResult
- * is that it can be examined/modified by code that already holds logwrt_lck
+ * it releases WALWriteLock. The point of keeping XLogCtl->Write.LogwrtResult
+ * is that it can be examined/modified by code that already holds WALWriteLock
* without needing to grab info_lck as well.
*
* XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two,
* but is updated when convenient. Again, it exists for the convenience of
- * code that is already holding insert_lck but not the other locks.
+ * code that is already holding WALInsertLock but not the other locks.
*
* The unshared LogwrtResult may lag behind any or all of these, and again
* is updated when convenient.
@@ -199,6 +191,24 @@ SPINLOCK ControlFileLockId;
* Note that this all works because the request and result positions can only
* advance forward, never back up, and so we can easily determine which of two
* values is "more up to date".
+ *
+ * info_lck is only held long enough to read/update the protected variables,
+ * so it's a plain spinlock. The other locks are held longer (potentially
+ * over I/O operations), so we use LWLocks for them. These locks are:
+ *
+ * WALInsertLock: must be held to insert a record into the WAL buffers.
+ *
+ * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or
+ * XLogFlush).
+ *
+ * ControlFileLock: must be held to read/update control file or create
+ * new log file.
+ *
+ * CheckpointLock: must be held to do a checkpoint (ensures only one
+ * checkpointer at a time; even though the postmaster won't launch
+ * parallel checkpoint processes, we need this because manual checkpoints
+ * could be launched simultaneously).
+ *
*----------
*/
typedef struct XLogwrtRqst
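As a rough standalone illustration of the layering described in the comment above (POSIX threads, not PostgreSQL internals; every name below is invented for the sketch): a spinlock is held only long enough to copy the shared result, while an ordinary mutex plays the part of the longer-held WALWriteLock that spans the actual I/O.

    #include <pthread.h>
    #include <stdint.h>
    #include <unistd.h>

    typedef struct { uint32_t xlogid; uint32_t xrecoff; } RecPtr;

    static pthread_spinlock_t info_lck;      /* short holds: copy values only */
    static pthread_mutex_t    write_lock;    /* long holds: spans the "I/O"   */
    static RecPtr             shared_result; /* like XLogCtl->LogwrtResult    */

    static void locks_init(void)
    {
        pthread_spin_init(&info_lck, PTHREAD_PROCESS_PRIVATE);
        pthread_mutex_init(&write_lock, NULL);
    }

    /* Take a snapshot of the shared result; the spinlock covers only the copy. */
    static RecPtr snapshot_result(void)
    {
        RecPtr local;

        pthread_spin_lock(&info_lck);
        local = shared_result;
        pthread_spin_unlock(&info_lck);
        return local;
    }

    /* "Write" up to target, then publish the new result under the spinlock. */
    static void write_up_to(RecPtr target)
    {
        pthread_mutex_lock(&write_lock);     /* fine to hold across slow work */
        usleep(1000);                        /* stand-in for write()/fsync()  */
        pthread_spin_lock(&info_lck);
        shared_result = target;
        pthread_spin_unlock(&info_lck);
        pthread_mutex_unlock(&write_lock);
    }

The point, as in the patch, is that nothing slow ever happens while the spinlock is held.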
@@ -240,18 +250,18 @@ typedef struct XLogCtlWrite
*/
typedef struct XLogCtlData
{
- /* Protected by insert_lck: */
+ /* Protected by WALInsertLock: */
XLogCtlInsert Insert;
/* Protected by info_lck: */
XLogwrtRqst LogwrtRqst;
XLogwrtResult LogwrtResult;
- /* Protected by logwrt_lck: */
+ /* Protected by WALWriteLock: */
XLogCtlWrite Write;
/*
* These values do not change after startup, although the pointed-to
- * pages and xlblocks values certainly do. Permission to read/write
- * the pages and xlblocks values depends on insert_lck and logwrt_lck.
+ * pages and xlblocks values certainly do. Permission to read/write the
+ * pages and xlblocks values depends on WALInsertLock and WALWriteLock.
*/
char *pages; /* buffers for unwritten XLOG pages */
XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */
@@ -259,13 +269,10 @@ typedef struct XLogCtlData
uint32 XLogCacheBlck; /* highest allocated xlog buffer index */
StartUpID ThisStartUpID;
- /* This value is not protected by *any* spinlock... */
+ /* This value is not protected by *any* lock... */
XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */
- slock_t insert_lck; /* XLogInsert lock */
slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */
- slock_t logwrt_lck; /* XLogWrite/XLogFlush lock */
- slock_t chkp_lck; /* checkpoint lock */
} XLogCtlData;
static XLogCtlData *XLogCtl = NULL;
@@ -473,7 +480,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
uint32 len,
write_len;
unsigned i;
- bool do_logwrt;
+ XLogwrtRqst LogwrtRqst;
bool updrqst;
bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
@@ -505,7 +512,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
*
* We may have to loop back to here if a race condition is detected
* below. We could prevent the race by doing all this work while
- * holding the insert spinlock, but it seems better to avoid doing CRC
+ * holding the insert lock, but it seems better to avoid doing CRC
* calculations while holding the lock. This means we have to be
* careful about modifying the rdata list until we know we aren't
* going to loop back again. The only change we allow ourselves to
@@ -607,48 +614,33 @@ begin:;
START_CRIT_SECTION();
- /* wait to obtain xlog insert lock */
- do_logwrt = true;
+ /* update LogwrtResult before doing cache fill check */
+ SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
+ LogwrtRqst = XLogCtl->LogwrtRqst;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
- for (i = 0;;)
+ /*
+ * If cache is half filled then try to acquire write lock and
+ * do XLogWrite. Ignore any fractional blocks in performing this check.
+ */
+ LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
+ if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
+ (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
+ XLogCtl->XLogCacheByte / 2))
{
- /* try to update LogwrtResult while waiting for insert lock */
- if (!TAS(&(XLogCtl->info_lck)))
+ if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE))
{
- XLogwrtRqst LogwrtRqst;
-
- LogwrtRqst = XLogCtl->LogwrtRqst;
- LogwrtResult = XLogCtl->LogwrtResult;
- S_UNLOCK(&(XLogCtl->info_lck));
-
- /*
- * If cache is half filled then try to acquire logwrt lock and
- * do LOGWRT work, but only once per XLogInsert call. Ignore
- * any fractional blocks in performing this check.
- */
- LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ;
- if (do_logwrt &&
- (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid ||
- (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff +
- XLogCtl->XLogCacheByte / 2)))
- {
- if (!TAS(&(XLogCtl->logwrt_lck)))
- {
- LogwrtResult = XLogCtl->Write.LogwrtResult;
- if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
- {
- XLogWrite(LogwrtRqst);
- do_logwrt = false;
- }
- S_UNLOCK(&(XLogCtl->logwrt_lck));
- }
- }
+ LogwrtResult = XLogCtl->Write.LogwrtResult;
+ if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
+ XLogWrite(LogwrtRqst);
+ LWLockRelease(WALWriteLock);
}
- if (!TAS(&(XLogCtl->insert_lck)))
- break;
- S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT);
}
+ /* Now wait to get insert lock */
+ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
+
/*
* Check to see if my RedoRecPtr is out of date. If so, may have to
* go back and recompute everything. This can only happen just after
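A standalone sketch of the new entry sequence of XLogInsert shown above, with pthread mutexes standing in for the LWLocks and invented helper names for the cache-fullness test and the write (this is an analogy, not the patch's code): if the buffers look at least half full and nobody currently holds the write lock, do the write ourselves; if the trylock fails, skip it, since whoever holds the lock is presumably writing already. Only then block for the insert lock.

    #include <pthread.h>
    #include <stdbool.h>

    static pthread_mutex_t write_lock  = PTHREAD_MUTEX_INITIALIZER;  /* plays WALWriteLock  */
    static pthread_mutex_t insert_lock = PTHREAD_MUTEX_INITIALIZER;  /* plays WALInsertLock */

    /* Trivial stand-ins for the real cache-fullness test and XLogWrite. */
    static bool cache_half_full(void) { return true; }
    static void write_out_buffers(void) { /* would write/flush full pages here */ }

    static void insert_one_record(void)
    {
        /*
         * Opportunistic write: only if the trylock succeeds.  A failed trylock
         * means another backend is already writing, so don't wait for it.
         */
        if (cache_half_full() &&
            pthread_mutex_trylock(&write_lock) == 0)
        {
            write_out_buffers();
            pthread_mutex_unlock(&write_lock);
        }

        /* Now wait (if necessary) for the insert lock and insert the record. */
        pthread_mutex_lock(&insert_lock);
        /* ... copy the record into the shared buffers ... */
        pthread_mutex_unlock(&insert_lock);
    }

Compared with the old spin-and-retry loop, a backend never sleeps on the write lock here; it either helps out immediately or moves on.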
@@ -667,12 +659,11 @@ begin:;
if (dtbuf_bkp[i] == false &&
XLByteLE(dtbuf_lsn[i], RedoRecPtr))
{
-
/*
* Oops, this buffer now needs to be backed up, but we
* didn't think so above. Start over.
*/
- S_UNLOCK(&(XLogCtl->insert_lck));
+ LWLockRelease(WALInsertLock);
END_CRIT_SECTION();
goto begin;
}
@@ -751,9 +742,9 @@ begin:;
/* If first XLOG record of transaction, save it in PROC array */
if (MyLastRecPtr.xrecoff == 0 && !no_tran)
{
- SpinAcquire(SInvalLock);
+ LWLockAcquire(SInvalLock, LW_EXCLUSIVE);
MyProc->logRec = RecPtr;
- SpinRelease(SInvalLock);
+ LWLockRelease(SInvalLock);
}
if (XLOG_DEBUG)
@@ -837,17 +828,17 @@ begin:;
curridx = PrevBufIdx(curridx);
WriteRqst = XLogCtl->xlblocks[curridx];
- S_UNLOCK(&(XLogCtl->insert_lck));
+ LWLockRelease(WALInsertLock);
if (updrqst)
{
- S_LOCK(&(XLogCtl->info_lck));
+ SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
/* advance global request to include new block(s) */
if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst))
XLogCtl->LogwrtRqst.Write = WriteRqst;
/* update local result copy while I have the chance */
LogwrtResult = XLogCtl->LogwrtResult;
- S_UNLOCK(&(XLogCtl->info_lck));
+ SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
}
END_CRIT_SECTION();
@@ -859,11 +850,11 @@ begin:;
* buffer if it still contains unwritten data.
*
* The global LogwrtRqst.Write pointer needs to be advanced to include the
- * just-filled page. If we can do this for free (without an extra spinlock),
+ * just-filled page. If we can do this for free (without an extra lock),
* we do so here. Otherwise the caller must do it. We return TRUE if the
* request update still needs to be done, FALSE if we did it internally.
*
- * Must be called with insert_lck held.
+ * Must be called with WALInsertLock held.
*/
static bool
AdvanceXLInsertBuffer(void)
@@ -890,45 +881,37 @@ AdvanceXLInsertBuffer(void)
if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
/* nope, got work to do... */
- unsigned spins = 0;
XLogRecPtr FinishedPageRqstPtr;
FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx];
- for (;;)
+ /* Before waiting, get info_lck and update LogwrtResult */
+ SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
+ if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
+ XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
+
+ update_needed = false; /* Did the shared-request update */
+
+ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
+ {
+ /* OK, someone wrote it already */
+ Insert->LogwrtResult = LogwrtResult;
+ }
+ else
{
- /* While waiting, try to get info_lck and update LogwrtResult */
- if (!TAS(&(XLogCtl->info_lck)))
+ /* Must acquire write lock */
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ LogwrtResult = Write->LogwrtResult;
+ if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
{
- if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr))
- XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr;
- update_needed = false; /* Did the shared-request update */
- LogwrtResult = XLogCtl->LogwrtResult;
- S_UNLOCK(&(XLogCtl->info_lck));
-
- if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
- {
- /* OK, someone wrote it already */
- Insert->LogwrtResult = LogwrtResult;
- break;
- }
+ /* OK, someone wrote it already */
+ LWLockRelease(WALWriteLock);
+ Insert->LogwrtResult = LogwrtResult;
}
-
- /*
- * LogwrtResult lock is busy or we know the page is still
- * dirty. Try to acquire logwrt lock and write full blocks.
- */
- if (!TAS(&(XLogCtl->logwrt_lck)))
+ else
{
- LogwrtResult = Write->LogwrtResult;
- if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write))
- {
- S_UNLOCK(&(XLogCtl->logwrt_lck));
- /* OK, someone wrote it already */
- Insert->LogwrtResult = LogwrtResult;
- break;
- }
-
/*
* Have to write buffers while holding insert lock. This
* is not good, so only write as much as we absolutely
@@ -938,11 +921,9 @@ AdvanceXLInsertBuffer(void)
WriteRqst.Flush.xlogid = 0;
WriteRqst.Flush.xrecoff = 0;
XLogWrite(WriteRqst);
- S_UNLOCK(&(XLogCtl->logwrt_lck));
+ LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
- break;
}
- S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
}
}
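The recheck-after-acquire idea in AdvanceXLInsertBuffer above, reduced to a standalone sketch (a single counter replaces the page pointers, pthread mutexes replace info_lck and WALWriteLock, and all names are invented): check cheaply whether someone already wrote past the old page, and if not, take the write lock and check once more before writing, since another backend may have done the work while we waited.

    #include <pthread.h>
    #include <stdint.h>

    static pthread_mutex_t info_lck   = PTHREAD_MUTEX_INITIALIZER; /* short-held info lock */
    static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER; /* plays WALWriteLock   */
    static uint64_t        shared_written;   /* how far the log has been written */

    /* Caller holds write_lock; write forward and publish the new position. */
    static void write_through(uint64_t upto)
    {
        /* ... the actual write would happen here ... */
        pthread_mutex_lock(&info_lck);
        if (shared_written < upto)
            shared_written = upto;
        pthread_mutex_unlock(&info_lck);
    }

    static void make_old_page_free(uint64_t old_page_end)
    {
        uint64_t written;

        /* Cheap check first: maybe somebody already wrote past our page. */
        pthread_mutex_lock(&info_lck);
        written = shared_written;
        pthread_mutex_unlock(&info_lck);
        if (written >= old_page_end)
            return;                          /* already written: nothing to do */

        /* Otherwise take the write lock and check again before writing. */
        pthread_mutex_lock(&write_lock);
        pthread_mutex_lock(&info_lck);
        written = shared_written;
        pthread_mutex_unlock(&info_lck);
        if (written < old_page_end)          /* re-check: we may have lost the race */
            write_through(old_page_end);     /* write only as much as we must */
        pthread_mutex_unlock(&write_lock);
    }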
@@ -986,7 +967,7 @@ AdvanceXLInsertBuffer(void)
/*
* Write and/or fsync the log at least as far as WriteRqst indicates.
*
- * Must be called with logwrt_lck held.
+ * Must be called with WALWriteLock held.
*/
static void
XLogWrite(XLogwrtRqst WriteRqst)
@@ -1047,7 +1028,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
"consider increasing WAL_FILES");
/* update pg_control, unless someone else already did */
- SpinAcquire(ControlFileLockId);
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (ControlFile->logId < openLogId ||
(ControlFile->logId == openLogId &&
ControlFile->logSeg < openLogSeg + 1))
@@ -1073,7 +1054,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
kill(getppid(), SIGUSR1);
}
}
- SpinRelease(ControlFileLockId);
+ LWLockRelease(ControlFileLock);
}
if (openLogFile < 0)
@@ -1167,13 +1148,13 @@ XLogWrite(XLogwrtRqst WriteRqst)
* 'result' values. This is not absolutely essential, but it saves
* some code in a couple of places.
*/
- S_LOCK(&(XLogCtl->info_lck));
+ SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
XLogCtl->LogwrtResult = LogwrtResult;
if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write))
XLogCtl->LogwrtRqst.Write = LogwrtResult.Write;
if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush))
XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush;
- S_UNLOCK(&(XLogCtl->info_lck));
+ SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
Write->LogwrtResult = LogwrtResult;
}
@@ -1181,7 +1162,7 @@ XLogWrite(XLogwrtRqst WriteRqst)
/*
* Ensure that all XLOG data through the given position is flushed to disk.
*
- * NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not
+ * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not
* already held, and we try to avoid acquiring it if possible.
*/
void
@@ -1189,7 +1170,6 @@ XLogFlush(XLogRecPtr record)
{
XLogRecPtr WriteRqstPtr;
XLogwrtRqst WriteRqst;
- unsigned spins = 0;
if (XLOG_DEBUG)
{
@@ -1224,23 +1204,18 @@ XLogFlush(XLogRecPtr record)
/* initialize to given target; may increase below */
WriteRqstPtr = record;
- for (;;)
+ /* read LogwrtResult and update local state */
+ SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck);
+ if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
+ WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
+ LogwrtResult = XLogCtl->LogwrtResult;
+ SpinLockRelease_NoHoldoff(&XLogCtl->info_lck);
+
+ /* done already? */
+ if (!XLByteLE(record, LogwrtResult.Flush))
{
- /* try to read LogwrtResult and update local state */
- if (!TAS(&(XLogCtl->info_lck)))
- {
- if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write))
- WriteRqstPtr = XLogCtl->LogwrtRqst.Write;
- LogwrtResult = XLogCtl->LogwrtResult;
- S_UNLOCK(&(XLogCtl->info_lck));
- if (XLByteLE(record, LogwrtResult.Flush))
- {
- /* Done already */
- break;
- }
- }
/* if something was added to log cache then try to flush this too */
- if (!TAS(&(XLogCtl->insert_lck)))
+ if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE))
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
uint32 freespace = INSERT_FREESPACE(Insert);
@@ -1252,29 +1227,22 @@ XLogFlush(XLogRecPtr record)
WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx];
WriteRqstPtr.xrecoff -= freespace;
}
- S_UNLOCK(&(XLogCtl->insert_lck));
+ LWLockRelease(WALInsertLock);
}
- /* now try to get the logwrt lock */
- if (!TAS(&(XLogCtl->logwrt_lck)))
+ /* now wait for the write lock */
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ LogwrtResult = XLogCtl->Write.LogwrtResult;
+ if (!XLByteLE(record, LogwrtResult.Flush))
{
- LogwrtResult = XLogCtl->Write.LogwrtResult;
- if (XLByteLE(record, LogwrtResult.Flush))
- {
- /* Done already */
- S_UNLOCK(&(XLogCtl->logwrt_lck));
- break;
- }
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
XLogWrite(WriteRqst);
- S_UNLOCK(&(XLogCtl->logwrt_lck));
if (XLByteLT(LogwrtResult.Flush, record))
elog(STOP, "XLogFlush: request %X/%X is not satisfied --- flushed only to %X/%X",
record.xlogid, record.xrecoff,
LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff);
- break;
}
- S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT);
+ LWLockRelease(WALWriteLock);
}
END_CRIT_SECTION();
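The "done already?" tests in XLogFlush above all hinge on ordered comparisons of (xlogid, xrecoff) pairs via XLByteLT/XLByteLE. A minimal sketch of such comparisons written as plain functions (mirroring what those macros provide, not copied from the PostgreSQL headers), plus the flush fast path reduced to its essence:

    #include <stdbool.h>
    #include <stdint.h>

    /* A WAL position: 64 bits split across two 32-bit fields, as in XLogRecPtr. */
    typedef struct { uint32_t xlogid; uint32_t xrecoff; } RecPtr;

    /* Lexicographic comparisons, in the spirit of XLByteLT/XLByteLE. */
    static bool rec_lt(RecPtr a, RecPtr b)
    {
        return a.xlogid < b.xlogid ||
               (a.xlogid == b.xlogid && a.xrecoff < b.xrecoff);
    }

    static bool rec_le(RecPtr a, RecPtr b)
    {
        return a.xlogid < b.xlogid ||
               (a.xlogid == b.xlogid && a.xrecoff <= b.xrecoff);
    }

    /* Essence of the flush fast path: nothing to do if 'record' is already
     * at or before the position known to be flushed. */
    static bool flush_needed(RecPtr record, RecPtr flushed)
    {
        return !rec_le(record, flushed);
    }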
@@ -1289,9 +1257,9 @@ XLogFlush(XLogRecPtr record)
* pre-existing file will be deleted). On return, TRUE if a pre-existing
* file was used.
*
- * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * use_lock: if TRUE, acquire ControlFileLock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
- * caller must *not* hold the spinlock at call.
+ * caller must *not* hold the lock at call.
*
* Returns FD of opened file.
*/
@@ -1329,7 +1297,7 @@ XLogFileInit(uint32 log, uint32 seg,
* Initialize an empty (all zeroes) segment. NOTE: it is possible
* that another process is doing the same thing. If so, we will end
* up pre-creating an extra log segment. That seems OK, and better
- * than holding the spinlock throughout this lengthy process.
+ * than holding the lock throughout this lengthy process.
*/
snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d",
XLogDir, (int) getpid());
@@ -1423,9 +1391,9 @@ XLogFileInit(uint32 log, uint32 seg,
* point. Fail if no free slot is found in this range. (Irrelevant if
* find_free is FALSE.)
*
- * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into
+ * use_lock: if TRUE, acquire ControlFileLock while moving file into
* place. This should be TRUE except during bootstrap log creation. The
- * caller must *not* hold the spinlock at call.
+ * caller must *not* hold the lock at call.
*
* Returns TRUE if file installed, FALSE if not installed because of
* exceeding max_advance limit. (Any other kind of failure causes elog().)
@@ -1444,7 +1412,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
* We want to be sure that only one process does this at a time.
*/
if (use_lock)
- SpinAcquire(ControlFileLockId);
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (!find_free)
{
@@ -1462,7 +1430,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
{
/* Failed to find a free slot within specified range */
if (use_lock)
- SpinRelease(ControlFileLockId);
+ LWLockRelease(ControlFileLock);
return false;
}
NextLogSeg(log, seg);
@@ -1487,7 +1455,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath,
#endif
if (use_lock)
- SpinRelease(ControlFileLockId);
+ LWLockRelease(ControlFileLock);
return true;
}
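The division of labor between XLogFileInit and InstallXLogFileSegment, as a generic standalone sketch (plain POSIX calls and a pthread mutex in place of ControlFileLock; names invented, and the real code is more careful, probing for a free slot rather than overwriting): the slow zero-filling happens in a temp file with no lock held, and the lock is taken only around the quick move into the final name, so two racing processes at worst pre-create an extra segment.

    #include <pthread.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    static pthread_mutex_t install_lock = PTHREAD_MUTEX_INITIALIZER; /* plays ControlFileLock */

    /* Build a zero-filled segment in a temp file, then install it under the lock.
     * Returns an open fd for the installed file, or -1 on failure. */
    static int create_segment(const char *finalpath, size_t segsize, bool use_lock)
    {
        char   tmppath[] = "xlogtemp.XXXXXX";
        char   zeroes[8192] = {0};
        int    fd = mkstemp(tmppath);
        size_t done;

        if (fd < 0)
            return -1;

        /* Slow part: zero-fill the whole segment without holding any lock. */
        for (done = 0; done < segsize; done += sizeof(zeroes))
        {
            if (write(fd, zeroes, sizeof(zeroes)) != (ssize_t) sizeof(zeroes))
            {
                close(fd);
                unlink(tmppath);
                return -1;
            }
        }

        /* Quick part: move into place while holding the lock (if requested). */
        if (use_lock)
            pthread_mutex_lock(&install_lock);
        if (rename(tmppath, finalpath) != 0)
        {
            if (use_lock)
                pthread_mutex_unlock(&install_lock);
            close(fd);
            unlink(tmppath);
            return -1;
        }
        if (use_lock)
            pthread_mutex_unlock(&install_lock);

        return fd;
    }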
@@ -2319,10 +2287,7 @@ XLOGShmemInit(void)
XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers;
XLogCtl->XLogCacheBlck = XLOGbuffers - 1;
XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages);
- S_INIT_LOCK(&(XLogCtl->insert_lck));
- S_INIT_LOCK(&(XLogCtl->info_lck));
- S_INIT_LOCK(&(XLogCtl->logwrt_lck));
- S_INIT_LOCK(&(XLogCtl->chkp_lck));
+ SpinLockInit(&XLogCtl->info_lck);
/*
* If we are not in bootstrap mode, pg_control should already exist.
@@ -2821,12 +2786,12 @@ SetThisStartUpID(void)
* in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster
* calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that
* subsequently-spawned backends will start out with a reasonably up-to-date
- * local RedoRecPtr. Since these operations are not protected by any spinlock
+ * local RedoRecPtr. Since these operations are not protected by any lock
* and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these
* routines at other times!
*
* Note: once spawned, a backend must update its local RedoRecPtr from
- * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock. This is
+ * XLogCtl->Insert.RedoRecPtr while holding the insert lock. This is
* done in XLogInsert().
*/
void
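The non-atomicity point above is easy to demonstrate with a standalone program (plain pthreads; the struct shape echoes XLogRecPtr but nothing else here is PostgreSQL code): the two 32-bit halves are stored and loaded separately, so a reader that does not take the insert lock can see halves from two different updates, which is exactly why backends only copy RedoRecPtr while holding WALInsertLock.

    #include <pthread.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Shape of an XLogRecPtr: a 64-bit log position split into two 32-bit halves. */
    typedef struct { uint32_t xlogid; uint32_t xrecoff; } RecPtr;

    static volatile uint32_t cur_xlogid  = 0;
    static volatile uint32_t cur_xrecoff = 0;
    static pthread_mutex_t   insert_lock = PTHREAD_MUTEX_INITIALIZER; /* plays WALInsertLock */

    /* Writer: always updates both halves consistently, under the lock. */
    static void *writer(void *arg)
    {
        uint32_t i;

        (void) arg;
        for (i = 1; i <= 1000000; i++)
        {
            pthread_mutex_lock(&insert_lock);
            cur_xlogid  = i;          /* two separate stores: not atomic as a pair */
            cur_xrecoff = i;
            pthread_mutex_unlock(&insert_lock);
        }
        return NULL;
    }

    int main(void)
    {
        pthread_t t;
        int       i;

        pthread_create(&t, NULL, writer, NULL);

        /* Lockless reader: can observe halves from two different updates.
         * Taking insert_lock around the two loads would prevent this. */
        for (i = 0; i < 1000000; i++)
        {
            RecPtr p;

            p.xlogid  = cur_xlogid;
            p.xrecoff = cur_xrecoff;
            if (p.xlogid != p.xrecoff)
                printf("torn read: %u/%u\n", (unsigned) p.xlogid, (unsigned) p.xrecoff);
        }

        pthread_join(t, NULL);
        return 0;
    }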
@@ -2874,20 +2839,26 @@ CreateCheckPoint(bool shutdown)
uint32 freespace;
uint32 _logId;
uint32 _logSeg;
- unsigned spins = 0;
if (MyLastRecPtr.xrecoff != 0)
elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block");
- START_CRIT_SECTION();
-
- /* Grab lock, using larger than normal sleep between tries (1 sec) */
- while (TAS(&(XLogCtl->chkp_lck)))
+ /*
+ * The CheckpointLock can be held for quite a while, which is not good
+ * because we won't respond to a cancel/die request while waiting for an
+ * LWLock. (But the alternative of using a regular lock won't work for
+ * background checkpoint processes, which are not regular backends.)
+ * So, rather than use a plain LWLockAcquire, use this kluge to allow
+ * an interrupt to be accepted while we are waiting:
+ */
+ while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE))
{
- S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++,
- CHECKPOINT_LOCK_TIMEOUT, 1000000);
+ CHECK_FOR_INTERRUPTS();
+ sleep(1);
}
+ START_CRIT_SECTION();
+
if (shutdown)
{
ControlFile->state = DB_SHUTDOWNING;
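The shape of that CheckpointLock kluge as a standalone sketch: pthread_mutex_trylock plus a signal-set flag stand in for LWLockConditionalAcquire and CHECK_FOR_INTERRUPTS (in the real code the interrupt check does not return; here it simply gives up). Everything below is invented for illustration.

    #include <pthread.h>
    #include <signal.h>
    #include <stdbool.h>
    #include <unistd.h>

    static pthread_mutex_t checkpoint_lock = PTHREAD_MUTEX_INITIALIZER; /* plays CheckpointLock */
    static volatile sig_atomic_t cancel_pending = 0;   /* set from a signal handler */

    static void handle_cancel(int signo)
    {
        (void) signo;
        cancel_pending = 1;
    }

    /* Returns true once the lock is held, false if a cancel arrived first. */
    static bool acquire_checkpoint_lock(void)
    {
        while (pthread_mutex_trylock(&checkpoint_lock) != 0)
        {
            if (cancel_pending)        /* stand-in for CHECK_FOR_INTERRUPTS() */
                return false;
            sleep(1);                  /* retry once per second, as in the patch */
        }
        return true;
    }

    int main(void)
    {
        signal(SIGINT, handle_cancel);
        if (acquire_checkpoint_lock())
        {
            /* ... do the checkpoint work ... */
            pthread_mutex_unlock(&checkpoint_lock);
        }
        return 0;
    }

The effect is that a backend stuck behind a long checkpoint still notices a cancel within about a second, which a plain blocking acquire would not allow.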
@@ -2899,7 +2870,7 @@ CreateCheckPoint(bool shutdown)
checkPoint.ThisStartUpID = ThisStartUpID;
checkPoint.time = time(NULL);
- S_LOCK(&(XLogCtl->insert_lck));
+ LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
/*
* If this isn't a shutdown, and we have not inserted any XLOG records
@@ -2929,8 +2900,8 @@ CreateCheckPoint(bool shutdown)
ControlFile->checkPoint.xrecoff ==
ControlFile->checkPointCopy.redo.xrecoff)
{
- S_UNLOCK(&(XLogCtl->insert_lck));
- S_UNLOCK(&(XLogCtl->chkp_lck));
+ LWLockRelease(WALInsertLock);
+ LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
return;
}
@@ -2974,17 +2945,17 @@ CreateCheckPoint(bool shutdown)
* Now we can release insert lock, allowing other xacts to proceed
* even while we are flushing disk buffers.
*/
- S_UNLOCK(&(XLogCtl->insert_lck));
+ LWLockRelease(WALInsertLock);
- SpinAcquire(XidGenLockId);
+ LWLockAcquire(XidGenLock, LW_SHARED);
checkPoint.nextXid = ShmemVariableCache->nextXid;
- SpinRelease(XidGenLockId);
+ LWLockRelease(XidGenLock);
- SpinAcquire(OidGenLockId);
+ LWLockAcquire(OidGenLock, LW_SHARED);
checkPoint.nextOid = ShmemVariableCache->nextOid;
if (!shutdown)
checkPoint.nextOid += ShmemVariableCache->oidCount;
- SpinRelease(OidGenLockId);
+ LWLockRelease(OidGenLock);
/*
* Having constructed the checkpoint record, ensure all shmem disk
@@ -3039,7 +3010,7 @@ CreateCheckPoint(bool shutdown)
/*
* Update the control file.
*/
- SpinAcquire(ControlFileLockId);
+ LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
if (shutdown)
ControlFile->state = DB_SHUTDOWNED;
ControlFile->prevCheckPoint = ControlFile->checkPoint;
@@ -3047,7 +3018,7 @@ CreateCheckPoint(bool shutdown)
ControlFile->checkPointCopy = checkPoint;
ControlFile->time = time(NULL);
UpdateControlFile();
- SpinRelease(ControlFileLockId);
+ LWLockRelease(ControlFileLock);
/*
* Delete offline log files (those no longer needed even for previous
@@ -3067,7 +3038,7 @@ CreateCheckPoint(bool shutdown)
if (!shutdown)
PreallocXlogFiles(recptr);
- S_UNLOCK(&(XLogCtl->chkp_lck));
+ LWLockRelease(CheckpointLock);
END_CRIT_SECTION();
}