diff options
author | Andres Freund <andres@anarazel.de> | 2016-04-10 20:12:32 -0700 |
---|---|---|
committer | Andres Freund <andres@anarazel.de> | 2016-04-10 20:12:32 -0700 |
commit | 48354581a49c30f5757c203415aa8412d85b0f70 (patch) | |
tree | ca509a2c196f179e97993ac89979c361c4b5f431 /src/backend/storage | |
parent | cf223c3bf5ba16232147c66b5fef4037aafe747c (diff) | |
download | postgresql-48354581a49c30f5757c203415aa8412d85b0f70.tar.gz postgresql-48354581a49c30f5757c203415aa8412d85b0f70.zip |
Allow Pin/UnpinBuffer to operate in a lockfree manner.
Pinning/Unpinning a buffer is a very frequent operation; especially in
read-mostly cache resident workloads. Benchmarking shows that in various
scenarios the spinlock protecting a buffer header's state becomes a
significant bottleneck. The problem can be reproduced with pgbench -S on
larger machines, but can be considerably worse for queries which touch
the same buffers over and over at a high frequency (e.g. nested loops
over a small inner table).
To allow atomic operations to be used, cram BufferDesc's flags,
usage_count, buf_hdr_lock, refcount into a single 32bit atomic variable;
that allows to manipulate them together using 32bit compare-and-swap
operations. This requires reducing MAX_BACKENDS to 2^18-1 (which could
be lifted by using a 64bit field, but it's not a realistic configuration
atm).
As not all operations can easily implemented in a lockfree manner,
implement the previous buf_hdr_lock via a flag bit in the atomic
variable. That way we can continue to lock the header in places where
it's needed, but can get away without acquiring it in the more frequent
hot-paths. There's some additional operations which can be done without
the lock, but aren't in this patch; but the most important places are
covered.
As bufmgr.c now essentially re-implements spinlocks, abstract the delay
logic from s_lock.c into something more generic. It now has already two
users, and more are coming up; there's a follupw patch for lwlock.c at
least.
This patch is based on a proof-of-concept written by me, which Alexander
Korotkov made into a fully working patch; the committed version is again
revised by me. Benchmarking and testing has, amongst others, been
provided by Dilip Kumar, Alexander Korotkov, Robert Haas.
On a large x86 system improvements for readonly pgbench, with a high
client count, of a factor of 8 have been observed.
Author: Alexander Korotkov and Andres Freund
Discussion: 2400449.GjM57CE0Yg@dinodell
Diffstat (limited to 'src/backend/storage')
-rw-r--r-- | src/backend/storage/buffer/buf_init.c | 7 | ||||
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 508 | ||||
-rw-r--r-- | src/backend/storage/buffer/freelist.c | 44 | ||||
-rw-r--r-- | src/backend/storage/buffer/localbuf.c | 64 | ||||
-rw-r--r-- | src/backend/storage/lmgr/s_lock.c | 206 |
5 files changed, 517 insertions, 312 deletions
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c index bfa37f1c66b..a5cffc78968 100644 --- a/src/backend/storage/buffer/buf_init.c +++ b/src/backend/storage/buffer/buf_init.c @@ -135,12 +135,9 @@ InitBufferPool(void) BufferDesc *buf = GetBufferDescriptor(i); CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - buf->refcount = 0; - buf->wait_backend_pid = 0; - SpinLockInit(&buf->buf_hdr_lock); + pg_atomic_init_u32(&buf->state, 0); + buf->wait_backend_pid = 0; buf->buf_id = i; diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index c664984d0a1..29f10e59568 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy); static void PinBuffer_Locked(BufferDesc *buf); static void UnpinBuffer(BufferDesc *buf, bool fixOwner); static void BufferSync(int flags); +static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context); static void WaitIO(BufferDesc *buf); static bool StartBufferIO(BufferDesc *buf, bool forInput); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, - int set_flag_bits); + uint32 set_flag_bits); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); static BufferDesc *BufferAlloc(SMgrRelation smgr, @@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, */ do { - LockBufHdr(bufHdr); - Assert(bufHdr->flags & BM_VALID); - bufHdr->flags &= ~BM_VALID; - UnlockBufHdr(bufHdr); + uint32 buf_state = LockBufHdr(bufHdr); + + Assert(buf_state & BM_VALID); + buf_state &= ~BM_VALID; + UnlockBufHdr(bufHdr, buf_state); } while (!StartBufferIO(bufHdr, true)); } } @@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * it's not been recycled) but come right back here to try smgrextend * again. */ - Assert(!(bufHdr->flags & BM_VALID)); /* spinlock not needed */ + Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); @@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, if (isLocalBuf) { /* Only need to adjust flags */ - bufHdr->flags |= BM_VALID; + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_write_u32(&bufHdr->state, buf_state); } else { @@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BufferTag oldTag; /* previous identity of selected buffer */ uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; int buf_id; BufferDesc *buf; bool valid; + uint32 buf_state; /* create a tag so we can lookup the buffer */ INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * Select a victim buffer. The buffer is returned with its header * spinlock still held! */ - buf = StrategyGetBuffer(strategy); + buf = StrategyGetBuffer(strategy, &buf_state); - Assert(buf->refcount == 0); + Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0); /* Must copy buffer flags while we still hold the spinlock */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; /* Pin the buffer and then release the buffer spinlock */ PinBuffer_Locked(buf); @@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, XLogRecPtr lsn; /* Read the LSN while holding buffer header lock */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); lsn = BufferGetLSN(buf); - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (XLogNeedsFlush(lsn) && StrategyRejectBuffer(strategy, buf)) @@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, /* * Need to lock the buffer header too in order to change its tag. */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Somebody could have pinned or re-dirtied the buffer while we were @@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * recycle this buffer; we must undo everything we've done and start * over with a new victim buffer. */ - oldFlags = buf->flags; - if (buf->refcount == 1 && !(oldFlags & BM_DIRTY)) + oldFlags = buf_state & BUF_FLAG_MASK; + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY)) break; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); BufTableDelete(&newTag, newHash); if ((oldFlags & BM_TAG_VALID) && oldPartitionLock != newPartitionLock) @@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * 1 so that the buffer can survive one clock-sweep pass.) */ buf->tag = newTag; - buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT); + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | + BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT | + BUF_USAGECOUNT_MASK); if (relpersistence == RELPERSISTENCE_PERMANENT) - buf->flags |= BM_TAG_VALID | BM_PERMANENT; + buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE; else - buf->flags |= BM_TAG_VALID; - buf->usage_count = 1; + buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); if (oldFlags & BM_TAG_VALID) { @@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf) BufferTag oldTag; uint32 oldHash; /* hash value for oldTag */ LWLock *oldPartitionLock; /* buffer partition lock for it */ - BufFlags oldFlags; + uint32 oldFlags; + uint32 buf_state; /* Save the original buffer tag before dropping the spinlock */ oldTag = buf->tag; - UnlockBufHdr(buf); + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + UnlockBufHdr(buf, buf_state); /* * Need to compute the old tag's hashcode and partition lock ID. XXX is it @@ -1362,12 +1375,12 @@ retry: LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE); /* Re-lock the buffer header */ - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* If it's changed while we were waiting for lock, do nothing */ if (!BUFFERTAGS_EQUAL(buf->tag, oldTag)) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); return; } @@ -1381,9 +1394,9 @@ retry: * yet done StartBufferIO, WaitIO will fall through and we'll effectively * be busy-looping here.) */ - if (buf->refcount != 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) != 0) { - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(oldPartitionLock); /* safety check: should definitely not be our *own* pin */ if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0) @@ -1396,12 +1409,10 @@ retry: * Clear out the buffer's tag and flags. We must do this to ensure that * linear scans of the buffer array don't think the buffer is valid. */ - oldFlags = buf->flags; + oldFlags = buf_state & BUF_FLAG_MASK; CLEAR_BUFFERTAG(buf->tag); - buf->flags = 0; - buf->usage_count = 0; - - UnlockBufHdr(buf); + buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK); + UnlockBufHdr(buf, buf_state); /* * Remove the buffer from the lookup hashtable, if it was in there. @@ -1433,6 +1444,8 @@ void MarkBufferDirty(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state; + uint32 old_buf_state; if (!BufferIsValid(buffer)) elog(ERROR, "bad buffer ID: %d", buffer); @@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer) /* unfortunately we can't check if the lock is held exclusively */ Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr))); - LockBufHdr(bufHdr); + old_buf_state = pg_atomic_read_u32(&bufHdr->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(bufHdr); - Assert(bufHdr->refcount > 0); + buf_state = old_buf_state; + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + + if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state, + buf_state)) + break; + } /* * If the buffer was not dirty already, do vacuum accounting. */ - if (!(bufHdr->flags & BM_DIRTY)) + if (!(old_buf_state & BM_DIRTY)) { VacuumPageDirty++; pgBufferUsage.shared_blks_dirtied++; if (VacuumCostActive) VacuumCostBalance += VacuumCostPageDirty; } - - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - - UnlockBufHdr(bufHdr); } /* @@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer, * * This should be applied only to shared buffers, never local ones. * + * Since buffers are pinned/unpinned very frequently, pin buffers without + * taking the buffer header lock; instead update the state variable in loop of + * CAS operations. Hopefully it's just a single CAS. + * * Note that ResourceOwnerEnlargeBuffers must have been done already. * * Returns TRUE if buffer is BM_VALID, else FALSE. This provision allows @@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) if (ref == NULL) { + uint32 buf_state; + uint32 old_buf_state; + ReservePrivateRefCountEntry(); ref = NewPrivateRefCountEntry(b); - LockBufHdr(buf); - buf->refcount++; - if (strategy == NULL) - { - if (buf->usage_count < BM_MAX_USAGE_COUNT) - buf->usage_count++; - } - else + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) { - if (buf->usage_count == 0) - buf->usage_count = 1; + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + /* increase refcount */ + buf_state += BUF_REFCOUNT_ONE; + + /* increase usagecount unless already max */ + if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT) + buf_state += BUF_USAGECOUNT_ONE; + + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + { + result = (buf_state & BM_VALID) != 0; + break; + } } - result = (buf->flags & BM_VALID) != 0; - UnlockBufHdr(buf); } else { @@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf) { Buffer b; PrivateRefCountEntry *ref; + uint32 buf_state; /* * As explained, We don't expect any preexisting pins. That allows us to @@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf) */ Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL); - buf->refcount++; - UnlockBufHdr(buf); + /* + * Since we hold the buffer spinlock, we can update the buffer state and + * release the lock in one operation. + */ + buf_state = pg_atomic_read_u32(&buf->state); + Assert(buf_state & BM_LOCKED); + buf_state += BUF_REFCOUNT_ONE; + UnlockBufHdr(buf, buf_state); b = BufferDescriptorGetBuffer(buf); @@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) ref->refcount--; if (ref->refcount == 0) { + uint32 buf_state; + uint32 old_buf_state; + /* I'd better not still hold any locks on the buffer */ Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf))); Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf))); - LockBufHdr(buf); + /* + * Decrement the shared reference count. + * + * Since buffer spinlock holder can update status using just write, + * it's not safe to use atomic decrement here; thus use a CAS loop. + */ + old_buf_state = pg_atomic_read_u32(&buf->state); + for (;;) + { + if (old_buf_state & BM_LOCKED) + old_buf_state = WaitBufHdrUnlocked(buf); + + buf_state = old_buf_state; + + buf_state -= BUF_REFCOUNT_ONE; - /* Decrement the shared reference count */ - Assert(buf->refcount > 0); - buf->refcount--; + if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state, + buf_state)) + break; + } /* Support LockBufferForCleanup() */ - if ((buf->flags & BM_PIN_COUNT_WAITER) && - buf->refcount == 1) + if (buf_state & BM_PIN_COUNT_WAITER) { - /* we just released the last pin other than the waiter's */ - int wait_backend_pid = buf->wait_backend_pid; + /* + * Acquire the buffer header lock, re-check that there's a waiter. + * Another backend could have unpinned this buffer, and already + * woken up the waiter. There's no danger of the buffer being + * replaced after we unpinned it above, as it's pinned by the + * waiter. + */ + buf_state = LockBufHdr(buf); - buf->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); - ProcSendSignal(wait_backend_pid); - } - else - UnlockBufHdr(buf); + if ((buf_state & BM_PIN_COUNT_WAITER) && + BUF_STATE_GET_REFCOUNT(buf_state) == 1) + { + /* we just released the last pin other than the waiter's */ + int wait_backend_pid = buf->wait_backend_pid; + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(buf, buf_state); + ProcSendSignal(wait_backend_pid); + } + else + UnlockBufHdr(buf, buf_state); + } ForgetPrivateRefCountEntry(ref); } } @@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner) static void BufferSync(int flags) { + uint32 buf_state; int buf_id; int num_to_scan; int num_spaces; @@ -1736,13 +1809,13 @@ BufferSync(int flags) * Header spinlock is enough to examine BM_DIRTY, see comment in * SyncOneBuffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if ((bufHdr->flags & mask) == mask) + if ((buf_state & mask) == mask) { CkptSortItem *item; - bufHdr->flags |= BM_CHECKPOINT_NEEDED; + buf_state |= BM_CHECKPOINT_NEEDED; item = &CkptBufferIds[num_to_scan++]; item->buf_id = buf_id; @@ -1752,7 +1825,7 @@ BufferSync(int flags) item->blockNum = bufHdr->tag.blockNum; } - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } if (num_to_scan == 0) @@ -1888,7 +1961,7 @@ BufferSync(int flags) * write the buffer though we didn't need to. It doesn't seem worth * guarding against this, though. */ - if (bufHdr->flags & BM_CHECKPOINT_NEEDED) + if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED) { if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN) { @@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context) /* Execute the LRU scan */ while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est) { - int buffer_state = SyncOneBuffer(next_to_clean, true, - wb_context); + int sync_state = SyncOneBuffer(next_to_clean, true, + wb_context); if (++next_to_clean >= NBuffers) { @@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context) } num_to_scan--; - if (buffer_state & BUF_WRITTEN) + if (sync_state & BUF_WRITTEN) { reusable_buffers++; if (++num_written >= bgwriter_lru_maxpages) @@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context) break; } } - else if (buffer_state & BUF_REUSABLE) + else if (sync_state & BUF_REUSABLE) reusable_buffers++; } @@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); int result = 0; + uint32 buf_state; BufferTag tag; ReservePrivateRefCountEntry(); @@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context) * don't worry because our checkpoint.redo points before log record for * upcoming changes and so we are not required to write such dirty buffer. */ - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); - if (bufHdr->refcount == 0 && bufHdr->usage_count == 0) + if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 && + BUF_STATE_GET_USAGECOUNT(buf_state) == 0) + { result |= BUF_REUSABLE; + } else if (skip_recently_used) { /* Caller told us not to write recently-used buffers */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } - if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY)) + if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY)) { /* It's clean, so nothing to do */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return result; } @@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer) int32 loccount; char *path; BackendId backend; + uint32 buf_state; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) @@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer) /* theoretically we should lock the bufhdr here */ path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum); + buf_state = pg_atomic_read_u32(&buf->state); elog(WARNING, "buffer refcount leak: [%03d] " "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)", buffer, path, - buf->tag.blockNum, buf->flags, - buf->refcount, loccount); + buf->tag.blockNum, buf_state & BUF_FLAG_MASK, + BUF_STATE_GET_REFCOUNT(buf_state), loccount); pfree(path); } @@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) io_time; Block bufBlock; char *bufToWrite; + uint32 buf_state; /* * Acquire the buffer's io_in_progress lock. If StartBufferIO returns @@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) reln->smgr_rnode.node.dbNode, reln->smgr_rnode.node.relNode); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); /* * Run PageGetLSN while holding header lock, since we don't have the @@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) recptr = BufferGetLSN(buf); /* To check if block content changes while flushing. - vadim 01/17/97 */ - buf->flags &= ~BM_JUST_DIRTIED; - UnlockBufHdr(buf); + buf_state &= ~BM_JUST_DIRTIED; + UnlockBufHdr(buf, buf_state); /* * Force XLOG flush up to buffer's LSN. This implements the basic WAL @@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln) * disastrous system-wide consequences. To make sure that can't happen, * skip the flush if the buffer isn't permanent. */ - if (buf->flags & BM_PERMANENT) + if (buf_state & BM_PERMANENT) XLogFlush(recptr); /* @@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer) /* * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we * need not bother with the buffer header spinlock. Even if someone else - * changes the buffer header flags while we're doing this, we assume that - * changing an aligned 2-byte BufFlags value is atomic, so we'll read the - * old value or the new value, but not random garbage. + * changes the buffer header state while we're doing this, the state is + * changed atomically, so we'll read the old value or the new value, but + * not random garbage. */ bufHdr = GetBufferDescriptor(buffer - 1); - return (bufHdr->flags & BM_PERMANENT) != 0; + return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0; } /* @@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer) BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1); char *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST); XLogRecPtr lsn; + uint32 buf_state; /* * If we don't need locking for correctness, fastpath out. @@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer) Assert(BufferIsValid(buffer)); Assert(BufferIsPinned(buffer)); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); lsn = PageGetLSN(page); - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return lsn; } @@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * We can make this a tad faster by prechecking the buffer tag before @@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum, if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node)) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) { RelFileNode *rnode = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes) if (rnode == NULL) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode))) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } pfree(nodes); @@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); + uint32 buf_state; /* * As in DropRelFileNodeBuffers, an unlocked precheck should be safe @@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid) if (bufHdr->tag.rnode.dbNode != dbid) continue; - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid) InvalidateBuffer(bufHdr); /* releases spinlock */ else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel) { for (i = 0; i < NLocBuffer; i++) { + uint32 buf_state; + bufHdr = GetLocalBufferDescriptor(i); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + ((buf_state = pg_atomic_read_u32(&bufHdr->state)) & + (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { ErrorContextCallback errcallback; Page localpage; @@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel) localpage, false); - bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED); + buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED); + pg_atomic_write_u32(&bufHdr->state, buf_state); /* Pop the error context stack */ error_context_stack = errcallback.previous; @@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid) for (i = 0; i < NBuffers; i++) { + uint32 buf_state; + bufHdr = GetBufferDescriptor(i); /* @@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid) ReservePrivateRefCountEntry(); - LockBufHdr(bufHdr); + buf_state = LockBufHdr(bufHdr); if (bufHdr->tag.rnode.dbNode == dbid && - (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY)) + (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY)) { PinBuffer_Locked(bufHdr); LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED); @@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid) UnpinBuffer(bufHdr, true); } else - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); } } @@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * is only intended to be used in cases where failing to write out the * data would be harmless anyway, it doesn't really matter. */ - if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) != + if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) != (BM_DIRTY | BM_JUST_DIRTIED)) { XLogRecPtr lsn = InvalidXLogRecPtr; bool dirtied = false; bool delayChkpt = false; + uint32 buf_state; /* * If we need to protect hint bit updates from torn writes, WAL-log a @@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) * We don't check full_page_writes here because that logic is included * when we call XLogInsert() since the value changes dynamically. */ - if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT)) + if (XLogHintBitIsNeeded() && + (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT)) { /* * If we're in recovery we cannot dirty a page because of a hint. @@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) lsn = XLogSaveBufferForHint(buffer, buffer_std); } - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (!(bufHdr->flags & BM_DIRTY)) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + + if (!(buf_state & BM_DIRTY)) { dirtied = true; /* Means "will be dirtied by this action" */ @@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std) if (!XLogRecPtrIsInvalid(lsn)) PageSetLSN(page, lsn); } - bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED); - UnlockBufHdr(bufHdr); + + buf_state |= BM_DIRTY | BM_JUST_DIRTIED; + UnlockBufHdr(bufHdr, buf_state); if (delayChkpt) MyPgXact->delayChkpt = false; @@ -3406,17 +3503,19 @@ UnlockBuffers(void) if (buf) { - LockBufHdr(buf); + uint32 buf_state; + + buf_state = LockBufHdr(buf); /* * Don't complain if flag bit not set; it could have been reset but we * got a cancel/die interrupt before getting the signal. */ - if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 && + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && buf->wait_backend_pid == MyProcPid) - buf->flags &= ~BM_PIN_COUNT_WAITER; + buf_state &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); PinCountWaitBuf = NULL; } @@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer) for (;;) { + uint32 buf_state; + /* Try to acquire lock */ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + + Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0); + if (BUF_STATE_GET_REFCOUNT(buf_state) == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return; } /* Failed, so mark myself as waiting for pincount 1 */ - if (bufHdr->flags & BM_PIN_COUNT_WAITER) + if (buf_state & BM_PIN_COUNT_WAITER) { - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); elog(ERROR, "multiple backends attempting to wait for pincount 1"); } bufHdr->wait_backend_pid = MyProcPid; - bufHdr->flags |= BM_PIN_COUNT_WAITER; PinCountWaitBuf = bufHdr; - UnlockBufHdr(bufHdr); + buf_state |= BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); /* Report the wait */ @@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer) * impossible with the current usages due to table level locking, but * better be safe. */ - LockBufHdr(bufHdr); - if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 && + buf_state = LockBufHdr(bufHdr); + if ((buf_state & BM_PIN_COUNT_WAITER) != 0 && bufHdr->wait_backend_pid == MyProcPid) - bufHdr->flags &= ~BM_PIN_COUNT_WAITER; - UnlockBufHdr(bufHdr); + buf_state &= ~BM_PIN_COUNT_WAITER; + UnlockBufHdr(bufHdr, buf_state); PinCountWaitBuf = NULL; /* Loop back and try again */ @@ -3603,22 +3705,26 @@ bool ConditionalLockBufferForCleanup(Buffer buffer) { BufferDesc *bufHdr; + uint32 buf_state, + refcount; Assert(BufferIsValid(buffer)); if (BufferIsLocal(buffer)) { + refcount = LocalRefCount[-buffer - 1]; /* There should be exactly one pin */ - Assert(LocalRefCount[-buffer - 1] > 0); - if (LocalRefCount[-buffer - 1] != 1) + Assert(refcount > 0); + if (refcount != 1) return false; /* Nobody else to wait for */ return true; } /* There should be exactly one local pin */ - Assert(GetPrivateRefCount(buffer) > 0); - if (GetPrivateRefCount(buffer) != 1) + refcount = GetPrivateRefCount(buffer); + Assert(refcount); + if (refcount != 1) return false; /* Try to acquire lock */ @@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer) return false; bufHdr = GetBufferDescriptor(buffer - 1); - LockBufHdr(bufHdr); - Assert(bufHdr->refcount > 0); - if (bufHdr->refcount == 1) + buf_state = LockBufHdr(bufHdr); + refcount = BUF_STATE_GET_REFCOUNT(buf_state); + + Assert(refcount > 0); + if (refcount == 1) { /* Successfully acquired exclusive lock with pincount 1 */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); return true; } /* Failed, so release the lock */ - UnlockBufHdr(bufHdr); + UnlockBufHdr(bufHdr, buf_state); LockBuffer(buffer, BUFFER_LOCK_UNLOCK); return false; } @@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf) */ for (;;) { - BufFlags sv_flags; + uint32 buf_state; /* * It may not be necessary to acquire the spinlock to check the flag * here, but since this test is essential for correctness, we'd better * play it safe. */ - LockBufHdr(buf); - sv_flags = buf->flags; - UnlockBufHdr(buf); - if (!(sv_flags & BM_IO_IN_PROGRESS)) + buf_state = LockBufHdr(buf); + UnlockBufHdr(buf, buf_state); + + if (!(buf_state & BM_IO_IN_PROGRESS)) break; LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED); LWLockRelease(BufferDescriptorGetIOLock(buf)); @@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf) static bool StartBufferIO(BufferDesc *buf, bool forInput) { + uint32 buf_state; + Assert(!InProgressBuf); for (;;) @@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); - if (!(buf->flags & BM_IO_IN_PROGRESS)) + if (!(buf_state & BM_IO_IN_PROGRESS)) break; /* @@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput) * an error (see AbortBufferIO). If that's the case, we must wait for * him to get unwedged. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); WaitIO(buf); } /* Once we get here, there is definitely no I/O active on this buffer */ - if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY)) + if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY)) { /* someone else already did the I/O */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, buf_state); LWLockRelease(BufferDescriptorGetIOLock(buf)); return false; } - buf->flags |= BM_IO_IN_PROGRESS; - - UnlockBufHdr(buf); + buf_state |= BM_IO_IN_PROGRESS; + UnlockBufHdr(buf, buf_state); InProgressBuf = buf; IsForInput = forInput; @@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput) * be 0, or BM_VALID if we just finished reading in the page. */ static void -TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits) +TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits) { + uint32 buf_state; + Assert(buf == InProgressBuf); - LockBufHdr(buf); + buf_state = LockBufHdr(buf); + + Assert(buf_state & BM_IO_IN_PROGRESS); - Assert(buf->flags & BM_IO_IN_PROGRESS); - buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); - if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED)) - buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - buf->flags |= set_flag_bits; + buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR); + if (clear_dirty && !(buf_state & BM_JUST_DIRTIED)) + buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED); - UnlockBufHdr(buf); + buf_state |= set_flag_bits; + UnlockBufHdr(buf, buf_state); InProgressBuf = NULL; @@ -3803,6 +3915,8 @@ AbortBufferIO(void) if (buf) { + uint32 buf_state; + /* * Since LWLockReleaseAll has already been called, we're not holding * the buffer's io_in_progress_lock. We have to re-acquire it so that @@ -3811,24 +3925,22 @@ AbortBufferIO(void) */ LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE); - LockBufHdr(buf); - Assert(buf->flags & BM_IO_IN_PROGRESS); + buf_state = LockBufHdr(buf); + Assert(buf_state & BM_IO_IN_PROGRESS); if (IsForInput) { - Assert(!(buf->flags & BM_DIRTY)); + Assert(!(buf_state & BM_DIRTY)); + /* We'd better not think buffer is valid yet */ - Assert(!(buf->flags & BM_VALID)); - UnlockBufHdr(buf); + Assert(!(buf_state & BM_VALID)); + UnlockBufHdr(buf, buf_state); } else { - BufFlags sv_flags; - - sv_flags = buf->flags; - Assert(sv_flags & BM_DIRTY); - UnlockBufHdr(buf); + Assert(buf_state & BM_DIRTY); + UnlockBufHdr(buf, buf_state); /* Issue notice if this is not the first failure... */ - if (sv_flags & BM_IO_ERROR) + if (buf_state & BM_IO_ERROR) { /* Buffer is pinned, so we can read tag without spinlock */ char *path; @@ -3912,6 +4024,54 @@ rnode_comparator(const void *p1, const void *p2) } /* + * Lock buffer header - set BM_LOCKED in buffer state. + */ +uint32 +LockBufHdr(BufferDesc *desc) +{ + SpinDelayStatus delayStatus = init_spin_delay(desc); + uint32 old_buf_state; + + while (true) + { + /* set BM_LOCKED flag */ + old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED); + /* if it wasn't set before we're OK */ + if (!(old_buf_state & BM_LOCKED)) + break; + perform_spin_delay(&delayStatus); + } + finish_spin_delay(&delayStatus); + return old_buf_state | BM_LOCKED; +} + +/* + * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's + * state at that point. + * + * Obviously the buffer could be locked by the time the value is returned, so + * this is primarily useful in CAS style loops. + */ +static uint32 +WaitBufHdrUnlocked(BufferDesc *buf) +{ + SpinDelayStatus delayStatus = init_spin_delay(buf); + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&buf->state); + + while (buf_state & BM_LOCKED) + { + perform_spin_delay(&delayStatus); + buf_state = pg_atomic_read_u32(&buf->state); + } + + finish_spin_delay(&delayStatus); + + return buf_state; +} + +/* * BufferTag comparator. */ static int diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 551d15205ca..88b90dc5276 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -98,7 +98,8 @@ typedef struct BufferAccessStrategyData /* Prototypes for internal functions */ -static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy); +static BufferDesc *GetBufferFromRing(BufferAccessStrategy strategy, + uint32 *buf_state); static void AddBufferToRing(BufferAccessStrategy strategy, BufferDesc *buf); @@ -180,11 +181,12 @@ ClockSweepTick(void) * return the buffer with the buffer header spinlock still held. */ BufferDesc * -StrategyGetBuffer(BufferAccessStrategy strategy) +StrategyGetBuffer(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; int bgwprocno; int trycounter; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ /* * If given a strategy object, see whether it can select a buffer. We @@ -192,7 +194,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) */ if (strategy != NULL) { - buf = GetBufferFromRing(strategy); + buf = GetBufferFromRing(strategy, buf_state); if (buf != NULL) return buf; } @@ -279,14 +281,16 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * it before we got to it. It's probably impossible altogether as * of 8.3, but we'd better check anyway.) */ - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count == 0) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) == 0) { if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -295,19 +299,20 @@ StrategyGetBuffer(BufferAccessStrategy strategy) trycounter = NBuffers; for (;;) { - buf = GetBufferDescriptor(ClockSweepTick()); /* * If the buffer is pinned or has a nonzero usage_count, we cannot use * it; decrement the usage_count (unless pinned) and keep scanning. */ - LockBufHdr(buf); - if (buf->refcount == 0) + local_buf_state = LockBufHdr(buf); + + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0) { - if (buf->usage_count > 0) + if (BUF_STATE_GET_USAGECOUNT(local_buf_state) != 0) { - buf->usage_count--; + local_buf_state -= BUF_USAGECOUNT_ONE; + trycounter = NBuffers; } else @@ -315,6 +320,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy) /* Found a usable buffer */ if (strategy != NULL) AddBufferToRing(strategy, buf); + *buf_state = local_buf_state; return buf; } } @@ -327,10 +333,10 @@ StrategyGetBuffer(BufferAccessStrategy strategy) * probably better to fail than to risk getting stuck in an * infinite loop. */ - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); elog(ERROR, "no unpinned buffers available"); } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); } } @@ -585,10 +591,12 @@ FreeAccessStrategy(BufferAccessStrategy strategy) * The bufhdr spin lock is held on the returned buffer. */ static BufferDesc * -GetBufferFromRing(BufferAccessStrategy strategy) +GetBufferFromRing(BufferAccessStrategy strategy, uint32 *buf_state) { BufferDesc *buf; Buffer bufnum; + uint32 local_buf_state; /* to avoid repeated (de-)referencing */ + /* Advance to next ring slot */ if (++strategy->current >= strategy->ring_size) @@ -616,13 +624,15 @@ GetBufferFromRing(BufferAccessStrategy strategy) * shouldn't re-use it. */ buf = GetBufferDescriptor(bufnum - 1); - LockBufHdr(buf); - if (buf->refcount == 0 && buf->usage_count <= 1) + local_buf_state = LockBufHdr(buf); + if (BUF_STATE_GET_REFCOUNT(local_buf_state) == 0 + && BUF_STATE_GET_USAGECOUNT(local_buf_state) <= 1) { strategy->current_was_in_ring = true; + *buf_state = local_buf_state; return buf; } - UnlockBufHdr(buf); + UnlockBufHdr(buf, local_buf_state); /* * Tell caller to allocate a new buffer with the normal allocation diff --git a/src/backend/storage/buffer/localbuf.c b/src/backend/storage/buffer/localbuf.c index 17640cfe2a7..68b402023a1 100644 --- a/src/backend/storage/buffer/localbuf.c +++ b/src/backend/storage/buffer/localbuf.c @@ -108,6 +108,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, int b; int trycounter; bool found; + uint32 buf_state; INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum); @@ -128,16 +129,21 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, fprintf(stderr, "LB ALLOC (%u,%d,%d) %d\n", smgr->smgr_rnode.node.relNode, forkNum, blockNum, -b - 1); #endif + buf_state = pg_atomic_read_u32(&bufHdr->state); + /* this part is equivalent to PinBuffer for a shared buffer */ if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count < BM_MAX_USAGE_COUNT) - bufHdr->usage_count++; + if (BUF_STATE_GET_USAGECOUNT(buf_state) < BM_MAX_USAGE_COUNT) + { + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); + } } LocalRefCount[b]++; ResourceOwnerRememberBuffer(CurrentResourceOwner, BufferDescriptorGetBuffer(bufHdr)); - if (bufHdr->flags & BM_VALID) + if (buf_state & BM_VALID) *foundPtr = TRUE; else { @@ -169,9 +175,12 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, if (LocalRefCount[b] == 0) { - if (bufHdr->usage_count > 0) + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if (BUF_STATE_GET_USAGECOUNT(buf_state) > 0) { - bufHdr->usage_count--; + buf_state -= BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); trycounter = NLocBuffer; } else @@ -193,7 +202,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * this buffer is not referenced but it might still be dirty. if that's * the case, write it out before reusing it! */ - if (bufHdr->flags & BM_DIRTY) + if (buf_state & BM_DIRTY) { SMgrRelation oreln; Page localpage = (char *) LocalBufHdrGetBlock(bufHdr); @@ -211,7 +220,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, false); /* Mark not-dirty now in case we error out below */ - bufHdr->flags &= ~BM_DIRTY; + buf_state &= ~BM_DIRTY; + pg_atomic_write_u32(&bufHdr->state, buf_state); pgBufferUsage.local_blks_written++; } @@ -228,7 +238,7 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, /* * Update the hash table: remove old entry, if any, and make new one. */ - if (bufHdr->flags & BM_TAG_VALID) + if (buf_state & BM_TAG_VALID) { hresult = (LocalBufferLookupEnt *) hash_search(LocalBufHash, (void *) &bufHdr->tag, @@ -237,7 +247,8 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, elog(ERROR, "local buffer hash table corrupted"); /* mark buffer invalid just in case hash insert fails */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags &= ~(BM_VALID | BM_TAG_VALID); + buf_state &= ~(BM_VALID | BM_TAG_VALID); + pg_atomic_write_u32(&bufHdr->state, buf_state); } hresult = (LocalBufferLookupEnt *) @@ -250,9 +261,11 @@ LocalBufferAlloc(SMgrRelation smgr, ForkNumber forkNum, BlockNumber blockNum, * it's all ours now. */ bufHdr->tag = newTag; - bufHdr->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); - bufHdr->flags |= BM_TAG_VALID; - bufHdr->usage_count = 1; + buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_IO_ERROR); + buf_state |= BM_TAG_VALID; + buf_state &= ~BUF_USAGECOUNT_MASK; + buf_state += BUF_USAGECOUNT_ONE; + pg_atomic_write_u32(&bufHdr->state, buf_state); *foundPtr = FALSE; return bufHdr; @@ -267,6 +280,7 @@ MarkLocalBufferDirty(Buffer buffer) { int bufid; BufferDesc *bufHdr; + uint32 buf_state; Assert(BufferIsLocal(buffer)); @@ -280,10 +294,10 @@ MarkLocalBufferDirty(Buffer buffer) bufHdr = GetLocalBufferDescriptor(bufid); - if (!(bufHdr->flags & BM_DIRTY)) - pgBufferUsage.local_blks_dirtied++; + buf_state = pg_atomic_fetch_or_u32(&bufHdr->state, BM_DIRTY); - bufHdr->flags |= BM_DIRTY; + if (!(buf_state & BM_DIRTY)) + pgBufferUsage.local_blks_dirtied++; } /* @@ -307,8 +321,11 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; - if ((bufHdr->flags & BM_TAG_VALID) && + buf_state = pg_atomic_read_u32(&bufHdr->state); + + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode) && bufHdr->tag.forkNum == forkNum && bufHdr->tag.blockNum >= firstDelBlock) @@ -327,8 +344,9 @@ DropRelFileNodeLocalBuffers(RelFileNode rnode, ForkNumber forkNum, elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } @@ -349,8 +367,11 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) { BufferDesc *bufHdr = GetLocalBufferDescriptor(i); LocalBufferLookupEnt *hresult; + uint32 buf_state; + + buf_state = pg_atomic_read_u32(&bufHdr->state); - if ((bufHdr->flags & BM_TAG_VALID) && + if ((buf_state & BM_TAG_VALID) && RelFileNodeEquals(bufHdr->tag.rnode, rnode)) { if (LocalRefCount[i] != 0) @@ -367,8 +388,9 @@ DropRelFileNodeAllLocalBuffers(RelFileNode rnode) elog(ERROR, "local buffer hash table corrupted"); /* Mark buffer invalid */ CLEAR_BUFFERTAG(bufHdr->tag); - bufHdr->flags = 0; - bufHdr->usage_count = 0; + buf_state &= ~BUF_FLAG_MASK; + buf_state &= ~BUF_USAGECOUNT_MASK; + pg_atomic_write_u32(&bufHdr->state, buf_state); } } } diff --git a/src/backend/storage/lmgr/s_lock.c b/src/backend/storage/lmgr/s_lock.c index cc0bf5e01fb..4a6ffb4f890 100644 --- a/src/backend/storage/lmgr/s_lock.c +++ b/src/backend/storage/lmgr/s_lock.c @@ -3,6 +3,38 @@ * s_lock.c * Hardware-dependent implementation of spinlocks. * + * When waiting for a contended spinlock we loop tightly for awhile, then + * delay using pg_usleep() and try again. Preferably, "awhile" should be a + * small multiple of the maximum time we expect a spinlock to be held. 100 + * iterations seems about right as an initial guess. However, on a + * uniprocessor the loop is a waste of cycles, while in a multi-CPU scenario + * it's usually better to spin a bit longer than to call the kernel, so we try + * to adapt the spin loop count depending on whether we seem to be in a + * uniprocessor or multiprocessor. + * + * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd + * be wrong; there are platforms where that can result in a "stuck + * spinlock" failure. This has been seen particularly on Alphas; it seems + * that the first TAS after returning from kernel space will always fail + * on that hardware. + * + * Once we do decide to block, we use randomly increasing pg_usleep() + * delays. The first delay is 1 msec, then the delay randomly increases to + * about one second, after which we reset to 1 msec and start again. The + * idea here is that in the presence of heavy contention we need to + * increase the delay, else the spinlock holder may never get to run and + * release the lock. (Consider situation where spinlock holder has been + * nice'd down in priority by the scheduler --- it will not get scheduled + * until all would-be acquirers are sleeping, so if we always use a 1-msec + * sleep, there is a real possibility of starvation.) But we can't just + * clamp the delay to an upper bound, else it would take a long time to + * make a reasonable number of tries. + * + * We time out and declare error after NUM_DELAYS delays (thus, exactly + * that many tries). With the given settings, this will usually take 2 or + * so minutes. It seems better to fix the total number of tries (and thus + * the probability of unintended failure) than to fix the total time + * spent. * * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California @@ -21,6 +53,14 @@ #include "storage/s_lock.h" #include "storage/barrier.h" + +#define MIN_SPINS_PER_DELAY 10 +#define MAX_SPINS_PER_DELAY 1000 +#define NUM_DELAYS 1000 +#define MIN_DELAY_USEC 1000L +#define MAX_DELAY_USEC 1000000L + + slock_t dummy_spinlock; static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; @@ -30,117 +70,107 @@ static int spins_per_delay = DEFAULT_SPINS_PER_DELAY; * s_lock_stuck() - complain about a stuck spinlock */ static void -s_lock_stuck(volatile slock_t *lock, const char *file, int line) +s_lock_stuck(void *p, const char *file, int line) { #if defined(S_LOCK_TEST) fprintf(stderr, "\nStuck spinlock (%p) detected at %s:%d.\n", - lock, file, line); + p, file, line); exit(1); #else elog(PANIC, "stuck spinlock (%p) detected at %s:%d", - lock, file, line); + p, file, line); #endif } - /* * s_lock(lock) - platform-independent portion of waiting for a spinlock. */ int s_lock(volatile slock_t *lock, const char *file, int line) { - /* - * We loop tightly for awhile, then delay using pg_usleep() and try again. - * Preferably, "awhile" should be a small multiple of the maximum time we - * expect a spinlock to be held. 100 iterations seems about right as an - * initial guess. However, on a uniprocessor the loop is a waste of - * cycles, while in a multi-CPU scenario it's usually better to spin a bit - * longer than to call the kernel, so we try to adapt the spin loop count - * depending on whether we seem to be in a uniprocessor or multiprocessor. - * - * Note: you might think MIN_SPINS_PER_DELAY should be just 1, but you'd - * be wrong; there are platforms where that can result in a "stuck - * spinlock" failure. This has been seen particularly on Alphas; it seems - * that the first TAS after returning from kernel space will always fail - * on that hardware. - * - * Once we do decide to block, we use randomly increasing pg_usleep() - * delays. The first delay is 1 msec, then the delay randomly increases to - * about one second, after which we reset to 1 msec and start again. The - * idea here is that in the presence of heavy contention we need to - * increase the delay, else the spinlock holder may never get to run and - * release the lock. (Consider situation where spinlock holder has been - * nice'd down in priority by the scheduler --- it will not get scheduled - * until all would-be acquirers are sleeping, so if we always use a 1-msec - * sleep, there is a real possibility of starvation.) But we can't just - * clamp the delay to an upper bound, else it would take a long time to - * make a reasonable number of tries. - * - * We time out and declare error after NUM_DELAYS delays (thus, exactly - * that many tries). With the given settings, this will usually take 2 or - * so minutes. It seems better to fix the total number of tries (and thus - * the probability of unintended failure) than to fix the total time - * spent. - */ -#define MIN_SPINS_PER_DELAY 10 -#define MAX_SPINS_PER_DELAY 1000 -#define NUM_DELAYS 1000 -#define MIN_DELAY_USEC 1000L -#define MAX_DELAY_USEC 1000000L - - int spins = 0; - int delays = 0; - int cur_delay = 0; + SpinDelayStatus delayStatus = init_spin_delay((void *) lock); while (TAS_SPIN(lock)) { - /* CPU-specific delay each time through the loop */ - SPIN_DELAY(); + perform_spin_delay(&delayStatus); + } - /* Block the process every spins_per_delay tries */ - if (++spins >= spins_per_delay) - { - if (++delays > NUM_DELAYS) - s_lock_stuck(lock, file, line); + finish_spin_delay(&delayStatus); - if (cur_delay == 0) /* first time to delay? */ - cur_delay = MIN_DELAY_USEC; + return delayStatus.delays; +} - pg_usleep(cur_delay); +#ifdef USE_DEFAULT_S_UNLOCK +void +s_unlock(volatile slock_t *lock) +{ +#ifdef TAS_ACTIVE_WORD + /* HP's PA-RISC */ + *TAS_ACTIVE_WORD(lock) = -1; +#else + *lock = 0; +#endif +} +#endif + +/* + * Wait while spinning on a contended spinlock. + */ +void +perform_spin_delay(SpinDelayStatus *status) +{ + /* CPU-specific delay each time through the loop */ + SPIN_DELAY(); + + /* Block the process every spins_per_delay tries */ + if (++(status->spins) >= spins_per_delay) + { + if (++(status->delays) > NUM_DELAYS) + s_lock_stuck(status->ptr, status->file, status->line); + + if (status->cur_delay == 0) /* first time to delay? */ + status->cur_delay = MIN_DELAY_USEC; + + pg_usleep(status->cur_delay); #if defined(S_LOCK_TEST) - fprintf(stdout, "*"); - fflush(stdout); + fprintf(stdout, "*"); + fflush(stdout); #endif - /* increase delay by a random fraction between 1X and 2X */ - cur_delay += (int) (cur_delay * + /* increase delay by a random fraction between 1X and 2X */ + status->cur_delay += (int) (status->cur_delay * ((double) random() / (double) MAX_RANDOM_VALUE) + 0.5); - /* wrap back to minimum delay when max is exceeded */ - if (cur_delay > MAX_DELAY_USEC) - cur_delay = MIN_DELAY_USEC; + /* wrap back to minimum delay when max is exceeded */ + if (status->cur_delay > MAX_DELAY_USEC) + status->cur_delay = MIN_DELAY_USEC; - spins = 0; - } + status->spins = 0; } +} - /* - * If we were able to acquire the lock without delaying, it's a good - * indication we are in a multiprocessor. If we had to delay, it's a sign - * (but not a sure thing) that we are in a uniprocessor. Hence, we - * decrement spins_per_delay slowly when we had to delay, and increase it - * rapidly when we didn't. It's expected that spins_per_delay will - * converge to the minimum value on a uniprocessor and to the maximum - * value on a multiprocessor. - * - * Note: spins_per_delay is local within our current process. We want to - * average these observations across multiple backends, since it's - * relatively rare for this function to even get entered, and so a single - * backend might not live long enough to converge on a good value. That - * is handled by the two routines below. - */ - if (cur_delay == 0) +/* + * After acquiring a spinlock, update estimates about how long to loop. + * + * If we were able to acquire the lock without delaying, it's a good + * indication we are in a multiprocessor. If we had to delay, it's a sign + * (but not a sure thing) that we are in a uniprocessor. Hence, we + * decrement spins_per_delay slowly when we had to delay, and increase it + * rapidly when we didn't. It's expected that spins_per_delay will + * converge to the minimum value on a uniprocessor and to the maximum + * value on a multiprocessor. + * + * Note: spins_per_delay is local within our current process. We want to + * average these observations across multiple backends, since it's + * relatively rare for this function to even get entered, and so a single + * backend might not live long enough to converge on a good value. That + * is handled by the two routines below. + */ +void +finish_spin_delay(SpinDelayStatus *status) +{ + if (status->cur_delay == 0) { /* we never had to delay */ if (spins_per_delay < MAX_SPINS_PER_DELAY) @@ -151,22 +181,8 @@ s_lock(volatile slock_t *lock, const char *file, int line) if (spins_per_delay > MIN_SPINS_PER_DELAY) spins_per_delay = Max(spins_per_delay - 1, MIN_SPINS_PER_DELAY); } - return delays; } -#ifdef USE_DEFAULT_S_UNLOCK -void -s_unlock(volatile slock_t *lock) -{ -#ifdef TAS_ACTIVE_WORD - /* HP's PA-RISC */ - *TAS_ACTIVE_WORD(lock) = -1; -#else - *lock = 0; -#endif -} -#endif - /* * Set local copy of spins_per_delay during backend startup. * |