diff options
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r-- | src/backend/access/transam/xlog.c | 325 |
1 files changed, 148 insertions, 177 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 56a0e2ba6bb..057ee72d55a 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2001, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.77 2001/09/26 20:24:02 tgl Exp $ + * $Header: /cvsroot/pgsql/src/backend/access/transam/xlog.c,v 1.78 2001/09/29 04:02:21 tgl Exp $ * *------------------------------------------------------------------------- */ @@ -33,11 +33,11 @@ #include "access/xlogutils.h" #include "catalog/catversion.h" #include "catalog/pg_control.h" -#include "storage/sinval.h" +#include "storage/bufpage.h" +#include "storage/lwlock.h" #include "storage/proc.h" +#include "storage/sinval.h" #include "storage/spin.h" -#include "storage/s_lock.h" -#include "storage/bufpage.h" #include "utils/builtins.h" #include "utils/relcache.h" #include "utils/selfuncs.h" @@ -86,11 +86,6 @@ #endif -/* Max time to wait to acquire XLog activity locks */ -#define XLOG_LOCK_TIMEOUT (5*60*1000000) /* 5 minutes */ -/* Max time to wait to acquire checkpoint lock */ -#define CHECKPOINT_LOCK_TIMEOUT (20*60*1000000) /* 20 minutes */ - /* User-settable parameters */ int CheckPointSegments = 3; int XLOGbuffers = 8; @@ -155,13 +150,10 @@ static XLogRecPtr ProcLastRecPtr = {0, 0}; * (which is almost but not quite the same as a pointer to the most recent * CHECKPOINT record). We update this from the shared-memory copy, * XLogCtl->Insert.RedoRecPtr, whenever we can safely do so (ie, when we - * hold the Insert spinlock). See XLogInsert for details. + * hold the Insert lock). See XLogInsert for details. */ static XLogRecPtr RedoRecPtr; -/* This lock must be held to read/update control file or create new log file */ -SPINLOCK ControlFileLockId; - /*---------- * Shared-memory data structures for XLOG control * @@ -171,24 +163,24 @@ SPINLOCK ControlFileLockId; * These structs are identical but are declared separately to indicate their * slightly different functions. * - * We do a lot of pushups to minimize the amount of access to spinlocked + * We do a lot of pushups to minimize the amount of access to lockable * shared memory values. There are actually three shared-memory copies of * LogwrtResult, plus one unshared copy in each backend. Here's how it works: * XLogCtl->LogwrtResult is protected by info_lck - * XLogCtl->Write.LogwrtResult is protected by logwrt_lck - * XLogCtl->Insert.LogwrtResult is protected by insert_lck - * One must hold the associated spinlock to read or write any of these, but - * of course no spinlock is needed to read/write the unshared LogwrtResult. + * XLogCtl->Write.LogwrtResult is protected by WALWriteLock + * XLogCtl->Insert.LogwrtResult is protected by WALInsertLock + * One must hold the associated lock to read or write any of these, but + * of course no lock is needed to read/write the unshared LogwrtResult. * * XLogCtl->LogwrtResult and XLogCtl->Write.LogwrtResult are both "always * right", since both are updated by a write or flush operation before - * it releases logwrt_lck. The point of keeping XLogCtl->Write.LogwrtResult - * is that it can be examined/modified by code that already holds logwrt_lck + * it releases WALWriteLock. The point of keeping XLogCtl->Write.LogwrtResult + * is that it can be examined/modified by code that already holds WALWriteLock * without needing to grab info_lck as well. * * XLogCtl->Insert.LogwrtResult may lag behind the reality of the other two, * but is updated when convenient. Again, it exists for the convenience of - * code that is already holding insert_lck but not the other locks. + * code that is already holding WALInsertLock but not the other locks. * * The unshared LogwrtResult may lag behind any or all of these, and again * is updated when convenient. @@ -199,6 +191,24 @@ SPINLOCK ControlFileLockId; * Note that this all works because the request and result positions can only * advance forward, never back up, and so we can easily determine which of two * values is "more up to date". + * + * info_lck is only held long enough to read/update the protected variables, + * so it's a plain spinlock. The other locks are held longer (potentially + * over I/O operations), so we use LWLocks for them. These locks are: + * + * WALInsertLock: must be held to insert a record into the WAL buffers. + * + * WALWriteLock: must be held to write WAL buffers to disk (XLogWrite or + * XLogFlush). + * + * ControlFileLock: must be held to read/update control file or create + * new log file. + * + * CheckpointLock: must be held to do a checkpoint (ensures only one + * checkpointer at a time; even though the postmaster won't launch + * parallel checkpoint processes, we need this because manual checkpoints + * could be launched simultaneously). + * *---------- */ typedef struct XLogwrtRqst @@ -240,18 +250,18 @@ typedef struct XLogCtlWrite */ typedef struct XLogCtlData { - /* Protected by insert_lck: */ + /* Protected by WALInsertLock: */ XLogCtlInsert Insert; /* Protected by info_lck: */ XLogwrtRqst LogwrtRqst; XLogwrtResult LogwrtResult; - /* Protected by logwrt_lck: */ + /* Protected by WALWriteLock: */ XLogCtlWrite Write; /* * These values do not change after startup, although the pointed-to - * pages and xlblocks values certainly do. Permission to read/write - * the pages and xlblocks values depends on insert_lck and logwrt_lck. + * pages and xlblocks values certainly do. Permission to read/write the + * pages and xlblocks values depends on WALInsertLock and WALWriteLock. */ char *pages; /* buffers for unwritten XLOG pages */ XLogRecPtr *xlblocks; /* 1st byte ptr-s + BLCKSZ */ @@ -259,13 +269,10 @@ typedef struct XLogCtlData uint32 XLogCacheBlck; /* highest allocated xlog buffer index */ StartUpID ThisStartUpID; - /* This value is not protected by *any* spinlock... */ + /* This value is not protected by *any* lock... */ XLogRecPtr RedoRecPtr; /* see SetRedoRecPtr/GetRedoRecPtr */ - slock_t insert_lck; /* XLogInsert lock */ slock_t info_lck; /* locks shared LogwrtRqst/LogwrtResult */ - slock_t logwrt_lck; /* XLogWrite/XLogFlush lock */ - slock_t chkp_lck; /* checkpoint lock */ } XLogCtlData; static XLogCtlData *XLogCtl = NULL; @@ -473,7 +480,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) uint32 len, write_len; unsigned i; - bool do_logwrt; + XLogwrtRqst LogwrtRqst; bool updrqst; bool no_tran = (rmid == RM_XLOG_ID) ? true : false; @@ -505,7 +512,7 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata) * * We may have to loop back to here if a race condition is detected * below. We could prevent the race by doing all this work while - * holding the insert spinlock, but it seems better to avoid doing CRC + * holding the insert lock, but it seems better to avoid doing CRC * calculations while holding the lock. This means we have to be * careful about modifying the rdata list until we know we aren't * going to loop back again. The only change we allow ourselves to @@ -607,48 +614,33 @@ begin:; START_CRIT_SECTION(); - /* wait to obtain xlog insert lock */ - do_logwrt = true; + /* update LogwrtResult before doing cache fill check */ + SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck); + LogwrtRqst = XLogCtl->LogwrtRqst; + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease_NoHoldoff(&XLogCtl->info_lck); - for (i = 0;;) + /* + * If cache is half filled then try to acquire write lock and + * do XLogWrite. Ignore any fractional blocks in performing this check. + */ + LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ; + if (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid || + (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff + + XLogCtl->XLogCacheByte / 2)) { - /* try to update LogwrtResult while waiting for insert lock */ - if (!TAS(&(XLogCtl->info_lck))) + if (LWLockConditionalAcquire(WALWriteLock, LW_EXCLUSIVE)) { - XLogwrtRqst LogwrtRqst; - - LogwrtRqst = XLogCtl->LogwrtRqst; - LogwrtResult = XLogCtl->LogwrtResult; - S_UNLOCK(&(XLogCtl->info_lck)); - - /* - * If cache is half filled then try to acquire logwrt lock and - * do LOGWRT work, but only once per XLogInsert call. Ignore - * any fractional blocks in performing this check. - */ - LogwrtRqst.Write.xrecoff -= LogwrtRqst.Write.xrecoff % BLCKSZ; - if (do_logwrt && - (LogwrtRqst.Write.xlogid != LogwrtResult.Write.xlogid || - (LogwrtRqst.Write.xrecoff >= LogwrtResult.Write.xrecoff + - XLogCtl->XLogCacheByte / 2))) - { - if (!TAS(&(XLogCtl->logwrt_lck))) - { - LogwrtResult = XLogCtl->Write.LogwrtResult; - if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write)) - { - XLogWrite(LogwrtRqst); - do_logwrt = false; - } - S_UNLOCK(&(XLogCtl->logwrt_lck)); - } - } + LogwrtResult = XLogCtl->Write.LogwrtResult; + if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write)) + XLogWrite(LogwrtRqst); + LWLockRelease(WALWriteLock); } - if (!TAS(&(XLogCtl->insert_lck))) - break; - S_LOCK_SLEEP(&(XLogCtl->insert_lck), i++, XLOG_LOCK_TIMEOUT); } + /* Now wait to get insert lock */ + LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); + /* * Check to see if my RedoRecPtr is out of date. If so, may have to * go back and recompute everything. This can only happen just after @@ -667,12 +659,11 @@ begin:; if (dtbuf_bkp[i] == false && XLByteLE(dtbuf_lsn[i], RedoRecPtr)) { - /* * Oops, this buffer now needs to be backed up, but we * didn't think so above. Start over. */ - S_UNLOCK(&(XLogCtl->insert_lck)); + LWLockRelease(WALInsertLock); END_CRIT_SECTION(); goto begin; } @@ -751,9 +742,9 @@ begin:; /* If first XLOG record of transaction, save it in PROC array */ if (MyLastRecPtr.xrecoff == 0 && !no_tran) { - SpinAcquire(SInvalLock); + LWLockAcquire(SInvalLock, LW_EXCLUSIVE); MyProc->logRec = RecPtr; - SpinRelease(SInvalLock); + LWLockRelease(SInvalLock); } if (XLOG_DEBUG) @@ -837,17 +828,17 @@ begin:; curridx = PrevBufIdx(curridx); WriteRqst = XLogCtl->xlblocks[curridx]; - S_UNLOCK(&(XLogCtl->insert_lck)); + LWLockRelease(WALInsertLock); if (updrqst) { - S_LOCK(&(XLogCtl->info_lck)); + SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck); /* advance global request to include new block(s) */ if (XLByteLT(XLogCtl->LogwrtRqst.Write, WriteRqst)) XLogCtl->LogwrtRqst.Write = WriteRqst; /* update local result copy while I have the chance */ LogwrtResult = XLogCtl->LogwrtResult; - S_UNLOCK(&(XLogCtl->info_lck)); + SpinLockRelease_NoHoldoff(&XLogCtl->info_lck); } END_CRIT_SECTION(); @@ -859,11 +850,11 @@ begin:; * buffer if it still contains unwritten data. * * The global LogwrtRqst.Write pointer needs to be advanced to include the - * just-filled page. If we can do this for free (without an extra spinlock), + * just-filled page. If we can do this for free (without an extra lock), * we do so here. Otherwise the caller must do it. We return TRUE if the * request update still needs to be done, FALSE if we did it internally. * - * Must be called with insert_lck held. + * Must be called with WALInsertLock held. */ static bool AdvanceXLInsertBuffer(void) @@ -890,45 +881,37 @@ AdvanceXLInsertBuffer(void) if (!XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { /* nope, got work to do... */ - unsigned spins = 0; XLogRecPtr FinishedPageRqstPtr; FinishedPageRqstPtr = XLogCtl->xlblocks[Insert->curridx]; - for (;;) + /* Before waiting, get info_lck and update LogwrtResult */ + SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck); + if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr)) + XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr; + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease_NoHoldoff(&XLogCtl->info_lck); + + update_needed = false; /* Did the shared-request update */ + + if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) + { + /* OK, someone wrote it already */ + Insert->LogwrtResult = LogwrtResult; + } + else { - /* While waiting, try to get info_lck and update LogwrtResult */ - if (!TAS(&(XLogCtl->info_lck))) + /* Must acquire write lock */ + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + LogwrtResult = Write->LogwrtResult; + if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) { - if (XLByteLT(XLogCtl->LogwrtRqst.Write, FinishedPageRqstPtr)) - XLogCtl->LogwrtRqst.Write = FinishedPageRqstPtr; - update_needed = false; /* Did the shared-request update */ - LogwrtResult = XLogCtl->LogwrtResult; - S_UNLOCK(&(XLogCtl->info_lck)); - - if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) - { - /* OK, someone wrote it already */ - Insert->LogwrtResult = LogwrtResult; - break; - } + /* OK, someone wrote it already */ + LWLockRelease(WALWriteLock); + Insert->LogwrtResult = LogwrtResult; } - - /* - * LogwrtResult lock is busy or we know the page is still - * dirty. Try to acquire logwrt lock and write full blocks. - */ - if (!TAS(&(XLogCtl->logwrt_lck))) + else { - LogwrtResult = Write->LogwrtResult; - if (XLByteLE(OldPageRqstPtr, LogwrtResult.Write)) - { - S_UNLOCK(&(XLogCtl->logwrt_lck)); - /* OK, someone wrote it already */ - Insert->LogwrtResult = LogwrtResult; - break; - } - /* * Have to write buffers while holding insert lock. This * is not good, so only write as much as we absolutely @@ -938,11 +921,9 @@ AdvanceXLInsertBuffer(void) WriteRqst.Flush.xlogid = 0; WriteRqst.Flush.xrecoff = 0; XLogWrite(WriteRqst); - S_UNLOCK(&(XLogCtl->logwrt_lck)); + LWLockRelease(WALWriteLock); Insert->LogwrtResult = LogwrtResult; - break; } - S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT); } } @@ -986,7 +967,7 @@ AdvanceXLInsertBuffer(void) /* * Write and/or fsync the log at least as far as WriteRqst indicates. * - * Must be called with logwrt_lck held. + * Must be called with WALWriteLock held. */ static void XLogWrite(XLogwrtRqst WriteRqst) @@ -1047,7 +1028,7 @@ XLogWrite(XLogwrtRqst WriteRqst) "consider increasing WAL_FILES"); /* update pg_control, unless someone else already did */ - SpinAcquire(ControlFileLockId); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (ControlFile->logId < openLogId || (ControlFile->logId == openLogId && ControlFile->logSeg < openLogSeg + 1)) @@ -1073,7 +1054,7 @@ XLogWrite(XLogwrtRqst WriteRqst) kill(getppid(), SIGUSR1); } } - SpinRelease(ControlFileLockId); + LWLockRelease(ControlFileLock); } if (openLogFile < 0) @@ -1167,13 +1148,13 @@ XLogWrite(XLogwrtRqst WriteRqst) * 'result' values. This is not absolutely essential, but it saves * some code in a couple of places. */ - S_LOCK(&(XLogCtl->info_lck)); + SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck); XLogCtl->LogwrtResult = LogwrtResult; if (XLByteLT(XLogCtl->LogwrtRqst.Write, LogwrtResult.Write)) XLogCtl->LogwrtRqst.Write = LogwrtResult.Write; if (XLByteLT(XLogCtl->LogwrtRqst.Flush, LogwrtResult.Flush)) XLogCtl->LogwrtRqst.Flush = LogwrtResult.Flush; - S_UNLOCK(&(XLogCtl->info_lck)); + SpinLockRelease_NoHoldoff(&XLogCtl->info_lck); Write->LogwrtResult = LogwrtResult; } @@ -1181,7 +1162,7 @@ XLogWrite(XLogwrtRqst WriteRqst) /* * Ensure that all XLOG data through the given position is flushed to disk. * - * NOTE: this differs from XLogWrite mainly in that the logwrt_lck is not + * NOTE: this differs from XLogWrite mainly in that the WALWriteLock is not * already held, and we try to avoid acquiring it if possible. */ void @@ -1189,7 +1170,6 @@ XLogFlush(XLogRecPtr record) { XLogRecPtr WriteRqstPtr; XLogwrtRqst WriteRqst; - unsigned spins = 0; if (XLOG_DEBUG) { @@ -1224,23 +1204,18 @@ XLogFlush(XLogRecPtr record) /* initialize to given target; may increase below */ WriteRqstPtr = record; - for (;;) + /* read LogwrtResult and update local state */ + SpinLockAcquire_NoHoldoff(&XLogCtl->info_lck); + if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write)) + WriteRqstPtr = XLogCtl->LogwrtRqst.Write; + LogwrtResult = XLogCtl->LogwrtResult; + SpinLockRelease_NoHoldoff(&XLogCtl->info_lck); + + /* done already? */ + if (!XLByteLE(record, LogwrtResult.Flush)) { - /* try to read LogwrtResult and update local state */ - if (!TAS(&(XLogCtl->info_lck))) - { - if (XLByteLT(WriteRqstPtr, XLogCtl->LogwrtRqst.Write)) - WriteRqstPtr = XLogCtl->LogwrtRqst.Write; - LogwrtResult = XLogCtl->LogwrtResult; - S_UNLOCK(&(XLogCtl->info_lck)); - if (XLByteLE(record, LogwrtResult.Flush)) - { - /* Done already */ - break; - } - } /* if something was added to log cache then try to flush this too */ - if (!TAS(&(XLogCtl->insert_lck))) + if (LWLockConditionalAcquire(WALInsertLock, LW_EXCLUSIVE)) { XLogCtlInsert *Insert = &XLogCtl->Insert; uint32 freespace = INSERT_FREESPACE(Insert); @@ -1252,29 +1227,22 @@ XLogFlush(XLogRecPtr record) WriteRqstPtr = XLogCtl->xlblocks[Insert->curridx]; WriteRqstPtr.xrecoff -= freespace; } - S_UNLOCK(&(XLogCtl->insert_lck)); + LWLockRelease(WALInsertLock); } - /* now try to get the logwrt lock */ - if (!TAS(&(XLogCtl->logwrt_lck))) + /* now wait for the write lock */ + LWLockAcquire(WALWriteLock, LW_EXCLUSIVE); + LogwrtResult = XLogCtl->Write.LogwrtResult; + if (!XLByteLE(record, LogwrtResult.Flush)) { - LogwrtResult = XLogCtl->Write.LogwrtResult; - if (XLByteLE(record, LogwrtResult.Flush)) - { - /* Done already */ - S_UNLOCK(&(XLogCtl->logwrt_lck)); - break; - } WriteRqst.Write = WriteRqstPtr; WriteRqst.Flush = record; XLogWrite(WriteRqst); - S_UNLOCK(&(XLogCtl->logwrt_lck)); if (XLByteLT(LogwrtResult.Flush, record)) elog(STOP, "XLogFlush: request %X/%X is not satisfied --- flushed only to %X/%X", record.xlogid, record.xrecoff, LogwrtResult.Flush.xlogid, LogwrtResult.Flush.xrecoff); - break; } - S_LOCK_SLEEP(&(XLogCtl->logwrt_lck), spins++, XLOG_LOCK_TIMEOUT); + LWLockRelease(WALWriteLock); } END_CRIT_SECTION(); @@ -1289,9 +1257,9 @@ XLogFlush(XLogRecPtr record) * pre-existing file will be deleted). On return, TRUE if a pre-existing * file was used. * - * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into + * use_lock: if TRUE, acquire ControlFileLock while moving file into * place. This should be TRUE except during bootstrap log creation. The - * caller must *not* hold the spinlock at call. + * caller must *not* hold the lock at call. * * Returns FD of opened file. */ @@ -1329,7 +1297,7 @@ XLogFileInit(uint32 log, uint32 seg, * Initialize an empty (all zeroes) segment. NOTE: it is possible * that another process is doing the same thing. If so, we will end * up pre-creating an extra log segment. That seems OK, and better - * than holding the spinlock throughout this lengthy process. + * than holding the lock throughout this lengthy process. */ snprintf(tmppath, MAXPGPATH, "%s/xlogtemp.%d", XLogDir, (int) getpid()); @@ -1423,9 +1391,9 @@ XLogFileInit(uint32 log, uint32 seg, * point. Fail if no free slot is found in this range. (Irrelevant if * find_free is FALSE.) * - * use_lock: if TRUE, acquire ControlFileLock spinlock while moving file into + * use_lock: if TRUE, acquire ControlFileLock while moving file into * place. This should be TRUE except during bootstrap log creation. The - * caller must *not* hold the spinlock at call. + * caller must *not* hold the lock at call. * * Returns TRUE if file installed, FALSE if not installed because of * exceeding max_advance limit. (Any other kind of failure causes elog().) @@ -1444,7 +1412,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, * We want to be sure that only one process does this at a time. */ if (use_lock) - SpinAcquire(ControlFileLockId); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (!find_free) { @@ -1462,7 +1430,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, { /* Failed to find a free slot within specified range */ if (use_lock) - SpinRelease(ControlFileLockId); + LWLockRelease(ControlFileLock); return false; } NextLogSeg(log, seg); @@ -1487,7 +1455,7 @@ InstallXLogFileSegment(uint32 log, uint32 seg, char *tmppath, #endif if (use_lock) - SpinRelease(ControlFileLockId); + LWLockRelease(ControlFileLock); return true; } @@ -2319,10 +2287,7 @@ XLOGShmemInit(void) XLogCtl->XLogCacheByte = BLCKSZ * XLOGbuffers; XLogCtl->XLogCacheBlck = XLOGbuffers - 1; XLogCtl->Insert.currpage = (XLogPageHeader) (XLogCtl->pages); - S_INIT_LOCK(&(XLogCtl->insert_lck)); - S_INIT_LOCK(&(XLogCtl->info_lck)); - S_INIT_LOCK(&(XLogCtl->logwrt_lck)); - S_INIT_LOCK(&(XLogCtl->chkp_lck)); + SpinLockInit(&XLogCtl->info_lck); /* * If we are not in bootstrap mode, pg_control should already exist. @@ -2821,12 +2786,12 @@ SetThisStartUpID(void) * in shmem (using SetRedoRecPtr). When checkpointer completes, postmaster * calls GetRedoRecPtr to update its own copy of RedoRecPtr, so that * subsequently-spawned backends will start out with a reasonably up-to-date - * local RedoRecPtr. Since these operations are not protected by any spinlock + * local RedoRecPtr. Since these operations are not protected by any lock * and copying an XLogRecPtr isn't atomic, it's unsafe to use either of these * routines at other times! * * Note: once spawned, a backend must update its local RedoRecPtr from - * XLogCtl->Insert.RedoRecPtr while holding the insert spinlock. This is + * XLogCtl->Insert.RedoRecPtr while holding the insert lock. This is * done in XLogInsert(). */ void @@ -2874,20 +2839,26 @@ CreateCheckPoint(bool shutdown) uint32 freespace; uint32 _logId; uint32 _logSeg; - unsigned spins = 0; if (MyLastRecPtr.xrecoff != 0) elog(ERROR, "CreateCheckPoint: cannot be called inside transaction block"); - START_CRIT_SECTION(); - - /* Grab lock, using larger than normal sleep between tries (1 sec) */ - while (TAS(&(XLogCtl->chkp_lck))) + /* + * The CheckpointLock can be held for quite a while, which is not good + * because we won't respond to a cancel/die request while waiting for an + * LWLock. (But the alternative of using a regular lock won't work for + * background checkpoint processes, which are not regular backends.) + * So, rather than use a plain LWLockAcquire, use this kluge to allow + * an interrupt to be accepted while we are waiting: + */ + while (!LWLockConditionalAcquire(CheckpointLock, LW_EXCLUSIVE)) { - S_LOCK_SLEEP_INTERVAL(&(XLogCtl->chkp_lck), spins++, - CHECKPOINT_LOCK_TIMEOUT, 1000000); + CHECK_FOR_INTERRUPTS(); + sleep(1); } + START_CRIT_SECTION(); + if (shutdown) { ControlFile->state = DB_SHUTDOWNING; @@ -2899,7 +2870,7 @@ CreateCheckPoint(bool shutdown) checkPoint.ThisStartUpID = ThisStartUpID; checkPoint.time = time(NULL); - S_LOCK(&(XLogCtl->insert_lck)); + LWLockAcquire(WALInsertLock, LW_EXCLUSIVE); /* * If this isn't a shutdown, and we have not inserted any XLOG records @@ -2929,8 +2900,8 @@ CreateCheckPoint(bool shutdown) ControlFile->checkPoint.xrecoff == ControlFile->checkPointCopy.redo.xrecoff) { - S_UNLOCK(&(XLogCtl->insert_lck)); - S_UNLOCK(&(XLogCtl->chkp_lck)); + LWLockRelease(WALInsertLock); + LWLockRelease(CheckpointLock); END_CRIT_SECTION(); return; } @@ -2974,17 +2945,17 @@ CreateCheckPoint(bool shutdown) * Now we can release insert lock, allowing other xacts to proceed * even while we are flushing disk buffers. */ - S_UNLOCK(&(XLogCtl->insert_lck)); + LWLockRelease(WALInsertLock); - SpinAcquire(XidGenLockId); + LWLockAcquire(XidGenLock, LW_SHARED); checkPoint.nextXid = ShmemVariableCache->nextXid; - SpinRelease(XidGenLockId); + LWLockRelease(XidGenLock); - SpinAcquire(OidGenLockId); + LWLockAcquire(OidGenLock, LW_SHARED); checkPoint.nextOid = ShmemVariableCache->nextOid; if (!shutdown) checkPoint.nextOid += ShmemVariableCache->oidCount; - SpinRelease(OidGenLockId); + LWLockRelease(OidGenLock); /* * Having constructed the checkpoint record, ensure all shmem disk @@ -3039,7 +3010,7 @@ CreateCheckPoint(bool shutdown) /* * Update the control file. */ - SpinAcquire(ControlFileLockId); + LWLockAcquire(ControlFileLock, LW_EXCLUSIVE); if (shutdown) ControlFile->state = DB_SHUTDOWNED; ControlFile->prevCheckPoint = ControlFile->checkPoint; @@ -3047,7 +3018,7 @@ CreateCheckPoint(bool shutdown) ControlFile->checkPointCopy = checkPoint; ControlFile->time = time(NULL); UpdateControlFile(); - SpinRelease(ControlFileLockId); + LWLockRelease(ControlFileLock); /* * Delete offline log files (those no longer needed even for previous @@ -3067,7 +3038,7 @@ CreateCheckPoint(bool shutdown) if (!shutdown) PreallocXlogFiles(recptr); - S_UNLOCK(&(XLogCtl->chkp_lck)); + LWLockRelease(CheckpointLock); END_CRIT_SECTION(); } |