Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 508
1 file changed, 334 insertions, 174 deletions
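This patch replaces the buffer header's separate spinlock, flags, refcount, and usage_count fields with a single atomic uint32 state word, so the hottest paths (pinning and unpinning) can avoid the header lock entirely. The layout of that word lives in the companion buf_internals.h change, which is not part of this file's diff; the sketch below reconstructs it from the masks and accessors the hunks use, so treat the exact bit assignments as an assumption rather than a quotation:

    /*
     * Assumed layout of the packed state word (per the companion
     * buf_internals.h change, not shown in this diff):
     *   bits  0-17  reference count
     *   bits 18-21  usage count (clock sweep)
     *   bits 22-31  flag bits, including BM_LOCKED, the "header spinlock" bit
     */
    #define BUF_REFCOUNT_ONE        1
    #define BUF_REFCOUNT_MASK       ((1U << 18) - 1)
    #define BUF_USAGECOUNT_MASK     0x003C0000U
    #define BUF_USAGECOUNT_ONE      (1U << 18)
    #define BUF_USAGECOUNT_SHIFT    18
    #define BUF_FLAG_MASK           0xFFC00000U

    /* extract counts from a state value that was read once */
    #define BUF_STATE_GET_REFCOUNT(state)   ((state) & BUF_REFCOUNT_MASK)
    #define BUF_STATE_GET_USAGECOUNT(state) \
        (((state) & BUF_USAGECOUNT_MASK) >> BUF_USAGECOUNT_SHIFT)

Packing everything into one word is what makes the compare-and-swap loops below possible: a single CAS can simultaneously bump the refcount, bump the usage count, and verify that the header is not locked.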
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index c664984d0a1..29f10e59568 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -436,11 +436,12 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
+static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
 static int  SyncOneBuffer(int buf_id, bool skip_recently_used,
               WritebackContext *flush_context);
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
-                  int set_flag_bits);
+                  uint32 set_flag_bits);
 static void shared_buffer_write_error_callback(void *arg);
 static void local_buffer_write_error_callback(void *arg);
 static BufferDesc *BufferAlloc(SMgrRelation smgr,
@@ -816,8 +817,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         if (isLocalBuf)
         {
             /* Only need to adjust flags */
-            Assert(bufHdr->flags & BM_VALID);
-            bufHdr->flags &= ~BM_VALID;
+            uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+            Assert(buf_state & BM_VALID);
+            buf_state &= ~BM_VALID;
+            pg_atomic_write_u32(&bufHdr->state, buf_state);
         }
         else
         {
@@ -828,10 +832,11 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
              */
             do
             {
-                LockBufHdr(bufHdr);
-                Assert(bufHdr->flags & BM_VALID);
-                bufHdr->flags &= ~BM_VALID;
-                UnlockBufHdr(bufHdr);
+                uint32      buf_state = LockBufHdr(bufHdr);
+
+                Assert(buf_state & BM_VALID);
+                buf_state &= ~BM_VALID;
+                UnlockBufHdr(bufHdr, buf_state);
             } while (!StartBufferIO(bufHdr, true));
         }
     }
@@ -848,7 +853,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
      * it's not been recycled) but come right back here to try smgrextend
      * again.
      */
-    Assert(!(bufHdr->flags & BM_VALID));    /* spinlock not needed */
+    Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID));   /* spinlock not needed */
 
     bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
 
@@ -933,7 +938,10 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
     if (isLocalBuf)
     {
         /* Only need to adjust flags */
-        bufHdr->flags |= BM_VALID;
+        uint32      buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+        buf_state |= BM_VALID;
+        pg_atomic_write_u32(&bufHdr->state, buf_state);
     }
     else
     {
@@ -987,10 +995,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
     BufferTag   oldTag;         /* previous identity of selected buffer */
     uint32      oldHash;        /* hash value for oldTag */
     LWLock     *oldPartitionLock;       /* buffer partition lock for it */
-    BufFlags    oldFlags;
+    uint32      oldFlags;
     int         buf_id;
     BufferDesc *buf;
     bool        valid;
+    uint32      buf_state;
 
     /* create a tag so we can lookup the buffer */
     INIT_BUFFERTAG(newTag, smgr->smgr_rnode.node, forkNum, blockNum);
@@ -1059,12 +1068,12 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
          * Select a victim buffer.  The buffer is returned with its header
          * spinlock still held!
          */
-        buf = StrategyGetBuffer(strategy);
+        buf = StrategyGetBuffer(strategy, &buf_state);
 
-        Assert(buf->refcount == 0);
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) == 0);
 
         /* Must copy buffer flags while we still hold the spinlock */
-        oldFlags = buf->flags;
+        oldFlags = buf_state & BUF_FLAG_MASK;
 
         /* Pin the buffer and then release the buffer spinlock */
         PinBuffer_Locked(buf);
@@ -1108,9 +1117,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
                 XLogRecPtr  lsn;
 
                 /* Read the LSN while holding buffer header lock */
-                LockBufHdr(buf);
+                buf_state = LockBufHdr(buf);
                 lsn = BufferGetLSN(buf);
-                UnlockBufHdr(buf);
+                UnlockBufHdr(buf, buf_state);
 
                 if (XLogNeedsFlush(lsn) &&
                     StrategyRejectBuffer(strategy, buf))
@@ -1254,7 +1263,7 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         /*
          * Need to lock the buffer header too in order to change its tag.
         */
-        LockBufHdr(buf);
+        buf_state = LockBufHdr(buf);
 
         /*
         * Somebody could have pinned or re-dirtied the buffer while we were
@@ -1262,11 +1271,11 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
         * recycle this buffer; we must undo everything we've done and start
         * over with a new victim buffer.
         */
-        oldFlags = buf->flags;
-        if (buf->refcount == 1 && !(oldFlags & BM_DIRTY))
+        oldFlags = buf_state & BUF_FLAG_MASK;
+        if (BUF_STATE_GET_REFCOUNT(buf_state) == 1 && !(oldFlags & BM_DIRTY))
             break;
 
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         BufTableDelete(&newTag, newHash);
         if ((oldFlags & BM_TAG_VALID) &&
             oldPartitionLock != newPartitionLock)
@@ -1284,14 +1293,15 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
      * 1 so that the buffer can survive one clock-sweep pass.)
      */
     buf->tag = newTag;
-    buf->flags &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED | BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT);
+    buf_state &= ~(BM_VALID | BM_DIRTY | BM_JUST_DIRTIED |
+                   BM_CHECKPOINT_NEEDED | BM_IO_ERROR | BM_PERMANENT |
+                   BUF_USAGECOUNT_MASK);
     if (relpersistence == RELPERSISTENCE_PERMANENT)
-        buf->flags |= BM_TAG_VALID | BM_PERMANENT;
+        buf_state |= BM_TAG_VALID | BM_PERMANENT | BUF_USAGECOUNT_ONE;
     else
-        buf->flags |= BM_TAG_VALID;
-    buf->usage_count = 1;
+        buf_state |= BM_TAG_VALID | BUF_USAGECOUNT_ONE;
 
-    UnlockBufHdr(buf);
+    UnlockBufHdr(buf, buf_state);
 
     if (oldFlags & BM_TAG_VALID)
     {
@@ -1338,12 +1348,15 @@ InvalidateBuffer(BufferDesc *buf)
     BufferTag   oldTag;
     uint32      oldHash;        /* hash value for oldTag */
     LWLock     *oldPartitionLock;       /* buffer partition lock for it */
-    BufFlags    oldFlags;
+    uint32      oldFlags;
+    uint32      buf_state;
 
     /* Save the original buffer tag before dropping the spinlock */
     oldTag = buf->tag;
 
-    UnlockBufHdr(buf);
+    buf_state = pg_atomic_read_u32(&buf->state);
+    Assert(buf_state & BM_LOCKED);
+    UnlockBufHdr(buf, buf_state);
 
     /*
      * Need to compute the old tag's hashcode and partition lock ID. XXX is it
@@ -1362,12 +1375,12 @@ retry:
     LWLockAcquire(oldPartitionLock, LW_EXCLUSIVE);
 
     /* Re-lock the buffer header */
-    LockBufHdr(buf);
+    buf_state = LockBufHdr(buf);
 
     /* If it's changed while we were waiting for lock, do nothing */
     if (!BUFFERTAGS_EQUAL(buf->tag, oldTag))
     {
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(oldPartitionLock);
         return;
     }
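The recurring mechanical change in the hunks above and below: LockBufHdr() now returns the state word as of lock acquisition (with BM_LOCKED set), callers edit that local copy instead of poking separate header fields, and UnlockBufHdr() publishes the edited value while clearing BM_LOCKED in the same atomic write. A minimal sketch of the idiom, under the signatures this patch introduces at the bottom of the file (the BM_VALID condition here is invented purely for illustration):

    uint32      buf_state = LockBufHdr(buf);    /* snapshot, BM_LOCKED set */

    if (BUF_STATE_GET_REFCOUNT(buf_state) == 0)
        buf_state |= BM_VALID;                  /* edit only the local copy */

    UnlockBufHdr(buf, buf_state);               /* one store publishes the new
                                                 * state and drops BM_LOCKED */

Because the lock holder republishes the whole word at unlock time, any lock-free update elsewhere must avoid racing with that store; that is the reason for the wait-then-CAS loops that follow.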
@@ -1381,9 +1394,9 @@ retry:
      * yet done StartBufferIO, WaitIO will fall through and we'll effectively
      * be busy-looping here.)
      */
-    if (buf->refcount != 0)
+    if (BUF_STATE_GET_REFCOUNT(buf_state) != 0)
     {
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(oldPartitionLock);
         /* safety check: should definitely not be our *own* pin */
         if (GetPrivateRefCount(BufferDescriptorGetBuffer(buf)) > 0)
@@ -1396,12 +1409,10 @@ retry:
      * Clear out the buffer's tag and flags.  We must do this to ensure that
      * linear scans of the buffer array don't think the buffer is valid.
      */
-    oldFlags = buf->flags;
+    oldFlags = buf_state & BUF_FLAG_MASK;
     CLEAR_BUFFERTAG(buf->tag);
-    buf->flags = 0;
-    buf->usage_count = 0;
-
-    UnlockBufHdr(buf);
+    buf_state &= ~(BUF_FLAG_MASK | BUF_USAGECOUNT_MASK);
+    UnlockBufHdr(buf, buf_state);
 
     /*
      * Remove the buffer from the lookup hashtable, if it was in there.
@@ -1433,6 +1444,8 @@ void
 MarkBufferDirty(Buffer buffer)
 {
     BufferDesc *bufHdr;
+    uint32      buf_state;
+    uint32      old_buf_state;
 
     if (!BufferIsValid(buffer))
         elog(ERROR, "bad buffer ID: %d", buffer);
@@ -1449,24 +1462,32 @@ MarkBufferDirty(Buffer buffer)
     /* unfortunately we can't check if the lock is held exclusively */
     Assert(LWLockHeldByMe(BufferDescriptorGetContentLock(bufHdr)));
 
-    LockBufHdr(bufHdr);
+    old_buf_state = pg_atomic_read_u32(&bufHdr->state);
+    for (;;)
+    {
+        if (old_buf_state & BM_LOCKED)
+            old_buf_state = WaitBufHdrUnlocked(bufHdr);
 
-    Assert(bufHdr->refcount > 0);
+        buf_state = old_buf_state;
+
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+        buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
+
+        if (pg_atomic_compare_exchange_u32(&bufHdr->state, &old_buf_state,
+                                           buf_state))
+            break;
+    }
 
     /*
      * If the buffer was not dirty already, do vacuum accounting.
      */
-    if (!(bufHdr->flags & BM_DIRTY))
+    if (!(old_buf_state & BM_DIRTY))
     {
         VacuumPageDirty++;
         pgBufferUsage.shared_blks_dirtied++;
         if (VacuumCostActive)
             VacuumCostBalance += VacuumCostPageDirty;
     }
-
-    bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-
-    UnlockBufHdr(bufHdr);
 }
 
 /*
@@ -1531,6 +1552,10 @@ ReleaseAndReadBuffer(Buffer buffer,
  *
  * This should be applied only to shared buffers, never local ones.
  *
+ * Since buffers are pinned/unpinned very frequently, pin buffers without
+ * taking the buffer header lock; instead update the state variable in loop of
+ * CAS operations. Hopefully it's just a single CAS.
+ *
  * Note that ResourceOwnerEnlargeBuffers must have been done already.
 *
 * Returns TRUE if buffer is BM_VALID, else FALSE.  This provision allows
@@ -1547,23 +1572,34 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
 
     if (ref == NULL)
     {
+        uint32      buf_state;
+        uint32      old_buf_state;
+
         ReservePrivateRefCountEntry();
         ref = NewPrivateRefCountEntry(b);
 
-        LockBufHdr(buf);
-        buf->refcount++;
-        if (strategy == NULL)
-        {
-            if (buf->usage_count < BM_MAX_USAGE_COUNT)
-                buf->usage_count++;
-        }
-        else
+        old_buf_state = pg_atomic_read_u32(&buf->state);
+        for (;;)
         {
-            if (buf->usage_count == 0)
-                buf->usage_count = 1;
+            if (old_buf_state & BM_LOCKED)
+                old_buf_state = WaitBufHdrUnlocked(buf);
+
+            buf_state = old_buf_state;
+
+            /* increase refcount */
+            buf_state += BUF_REFCOUNT_ONE;
+
+            /* increase usagecount unless already max */
+            if (BUF_STATE_GET_USAGECOUNT(buf_state) != BM_MAX_USAGE_COUNT)
+                buf_state += BUF_USAGECOUNT_ONE;
+
+            if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+                                               buf_state))
+            {
+                result = (buf_state & BM_VALID) != 0;
+                break;
+            }
         }
-        result = (buf->flags & BM_VALID) != 0;
-        UnlockBufHdr(buf);
     }
     else
     {
@@ -1603,6 +1639,7 @@ PinBuffer_Locked(BufferDesc *buf)
 {
     Buffer      b;
     PrivateRefCountEntry *ref;
+    uint32      buf_state;
 
     /*
      * As explained, We don't expect any preexisting pins. That allows us to
@@ -1610,8 +1647,14 @@ PinBuffer_Locked(BufferDesc *buf)
      */
     Assert(GetPrivateRefCountEntry(BufferDescriptorGetBuffer(buf), false) == NULL);
 
-    buf->refcount++;
-    UnlockBufHdr(buf);
+    /*
+     * Since we hold the buffer spinlock, we can update the buffer state and
+     * release the lock in one operation.
+     */
+    buf_state = pg_atomic_read_u32(&buf->state);
+    Assert(buf_state & BM_LOCKED);
+    buf_state += BUF_REFCOUNT_ONE;
+    UnlockBufHdr(buf, buf_state);
 
     b = BufferDescriptorGetBuffer(buf);
 
@@ -1646,30 +1689,59 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
     ref->refcount--;
     if (ref->refcount == 0)
     {
+        uint32      buf_state;
+        uint32      old_buf_state;
+
         /* I'd better not still hold any locks on the buffer */
         Assert(!LWLockHeldByMe(BufferDescriptorGetContentLock(buf)));
         Assert(!LWLockHeldByMe(BufferDescriptorGetIOLock(buf)));
 
-        LockBufHdr(buf);
+        /*
+         * Decrement the shared reference count.
+         *
+         * Since buffer spinlock holder can update status using just write,
+         * it's not safe to use atomic decrement here; thus use a CAS loop.
+         */
+        old_buf_state = pg_atomic_read_u32(&buf->state);
+        for (;;)
+        {
+            if (old_buf_state & BM_LOCKED)
+                old_buf_state = WaitBufHdrUnlocked(buf);
+
+            buf_state = old_buf_state;
+
+            buf_state -= BUF_REFCOUNT_ONE;
 
-        /* Decrement the shared reference count */
-        Assert(buf->refcount > 0);
-        buf->refcount--;
+            if (pg_atomic_compare_exchange_u32(&buf->state, &old_buf_state,
+                                               buf_state))
+                break;
+        }
 
         /* Support LockBufferForCleanup() */
-        if ((buf->flags & BM_PIN_COUNT_WAITER) &&
-            buf->refcount == 1)
+        if (buf_state & BM_PIN_COUNT_WAITER)
         {
-            /* we just released the last pin other than the waiter's */
-            int         wait_backend_pid = buf->wait_backend_pid;
+            /*
+             * Acquire the buffer header lock, re-check that there's a waiter.
+             * Another backend could have unpinned this buffer, and already
+             * woken up the waiter.  There's no danger of the buffer being
+             * replaced after we unpinned it above, as it's pinned by the
+             * waiter.
+             */
+            buf_state = LockBufHdr(buf);
 
-            buf->flags &= ~BM_PIN_COUNT_WAITER;
-            UnlockBufHdr(buf);
-            ProcSendSignal(wait_backend_pid);
-        }
-        else
-            UnlockBufHdr(buf);
+            if ((buf_state & BM_PIN_COUNT_WAITER) &&
+                BUF_STATE_GET_REFCOUNT(buf_state) == 1)
+            {
+                /* we just released the last pin other than the waiter's */
+                int         wait_backend_pid = buf->wait_backend_pid;
+
+                buf_state &= ~BM_PIN_COUNT_WAITER;
+                UnlockBufHdr(buf, buf_state);
+                ProcSendSignal(wait_backend_pid);
+            }
+            else
+                UnlockBufHdr(buf, buf_state);
+        }
         ForgetPrivateRefCountEntry(ref);
     }
 }
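The new PinBuffer/UnpinBuffer above implement the pattern the comment describes: wait for BM_LOCKED to clear, then CAS the whole state word; if the CAS fails, retry with the freshly observed value. A self-contained C11 sketch of the same idea follows -- hypothetical names and bit layout, not PostgreSQL's API:

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>

    #define ST_LOCKED        (1U << 31)     /* header "spinlock" bit */
    #define ST_VALID         (1U << 30)
    #define ST_REFCOUNT_ONE  1U

    /* Pin: bump the refcount without ever taking the header lock. */
    static bool
    pin(_Atomic uint32_t *state)
    {
        uint32_t    old_state = atomic_load(state);

        for (;;)
        {
            /* never CAS against a locked word; the holder may rewrite it */
            while (old_state & ST_LOCKED)
                old_state = atomic_load(state);

            uint32_t    new_state = old_state + ST_REFCOUNT_ONE;

            /* on failure, old_state is reloaded for the next attempt */
            if (atomic_compare_exchange_weak(state, &old_state, new_state))
                return (new_state & ST_VALID) != 0;
        }
    }

    /*
     * Unpin: a plain atomic_fetch_sub would be unsafe here, because a lock
     * holder publishes its update with a plain store of the whole word and
     * could overwrite (lose) a concurrent decrement.  The CAS only succeeds
     * against an unlocked value, so no update is ever lost.
     */
    static void
    unpin(_Atomic uint32_t *state)
    {
        uint32_t    old_state = atomic_load(state);

        for (;;)
        {
            while (old_state & ST_LOCKED)
                old_state = atomic_load(state);

            if (atomic_compare_exchange_weak(state, &old_state,
                                             old_state - ST_REFCOUNT_ONE))
                return;
        }
    }

In the uncontended case this is the hoped-for single CAS; contention costs retries on the same cache line rather than round trips through a separate spinlock.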
@@ -1687,6 +1759,7 @@ UnpinBuffer(BufferDesc *buf, bool fixOwner)
 static void
 BufferSync(int flags)
 {
+    uint32      buf_state;
     int         buf_id;
     int         num_to_scan;
     int         num_spaces;
@@ -1736,13 +1809,13 @@ BufferSync(int flags)
          * Header spinlock is enough to examine BM_DIRTY, see comment in
          * SyncOneBuffer.
          */
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
 
-        if ((bufHdr->flags & mask) == mask)
+        if ((buf_state & mask) == mask)
         {
             CkptSortItem *item;
 
-            bufHdr->flags |= BM_CHECKPOINT_NEEDED;
+            buf_state |= BM_CHECKPOINT_NEEDED;
 
             item = &CkptBufferIds[num_to_scan++];
             item->buf_id = buf_id;
@@ -1752,7 +1825,7 @@ BufferSync(int flags)
             item->blockNum = bufHdr->tag.blockNum;
         }
 
-        UnlockBufHdr(bufHdr);
+        UnlockBufHdr(bufHdr, buf_state);
     }
 
     if (num_to_scan == 0)
@@ -1888,7 +1961,7 @@ BufferSync(int flags)
          * write the buffer though we didn't need to.  It doesn't seem worth
          * guarding against this, though.
          */
-        if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
+        if (pg_atomic_read_u32(&bufHdr->state) & BM_CHECKPOINT_NEEDED)
         {
             if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
             {
@@ -2176,8 +2249,8 @@ BgBufferSync(WritebackContext *wb_context)
     /* Execute the LRU scan */
     while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
     {
-        int         buffer_state = SyncOneBuffer(next_to_clean, true,
-                                                 wb_context);
+        int         sync_state = SyncOneBuffer(next_to_clean, true,
+                                               wb_context);
 
         if (++next_to_clean >= NBuffers)
         {
@@ -2186,7 +2259,7 @@ BgBufferSync(WritebackContext *wb_context)
         }
         num_to_scan--;
 
-        if (buffer_state & BUF_WRITTEN)
+        if (sync_state & BUF_WRITTEN)
         {
             reusable_buffers++;
             if (++num_written >= bgwriter_lru_maxpages)
@@ -2195,7 +2268,7 @@ BgBufferSync(WritebackContext *wb_context)
                 break;
             }
         }
-        else if (buffer_state & BUF_REUSABLE)
+        else if (sync_state & BUF_REUSABLE)
             reusable_buffers++;
     }
 
@@ -2258,6 +2331,7 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 {
     BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
     int         result = 0;
+    uint32      buf_state;
     BufferTag   tag;
 
     ReservePrivateRefCountEntry();
@@ -2271,21 +2345,24 @@ SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
     * don't worry because our checkpoint.redo points before log record for
     * upcoming changes and so we are not required to write such dirty buffer.
      */
-    LockBufHdr(bufHdr);
+    buf_state = LockBufHdr(bufHdr);
 
-    if (bufHdr->refcount == 0 && bufHdr->usage_count == 0)
+    if (BUF_STATE_GET_REFCOUNT(buf_state) == 0 &&
+        BUF_STATE_GET_USAGECOUNT(buf_state) == 0)
+    {
         result |= BUF_REUSABLE;
+    }
     else if (skip_recently_used)
     {
         /* Caller told us not to write recently-used buffers */
-        UnlockBufHdr(bufHdr);
+        UnlockBufHdr(bufHdr, buf_state);
         return result;
     }
 
-    if (!(bufHdr->flags & BM_VALID) || !(bufHdr->flags & BM_DIRTY))
+    if (!(buf_state & BM_VALID) || !(buf_state & BM_DIRTY))
     {
         /* It's clean, so nothing to do */
-        UnlockBufHdr(bufHdr);
+        UnlockBufHdr(bufHdr, buf_state);
         return result;
     }
 
@@ -2439,6 +2516,7 @@ PrintBufferLeakWarning(Buffer buffer)
     int32       loccount;
     char       *path;
     BackendId   backend;
+    uint32      buf_state;
 
     Assert(BufferIsValid(buffer));
     if (BufferIsLocal(buffer))
@@ -2456,12 +2534,13 @@ PrintBufferLeakWarning(Buffer buffer)
 
     /* theoretically we should lock the bufhdr here */
     path = relpathbackend(buf->tag.rnode, backend, buf->tag.forkNum);
+    buf_state = pg_atomic_read_u32(&buf->state);
     elog(WARNING,
          "buffer refcount leak: [%03d] "
          "(rel=%s, blockNum=%u, flags=0x%x, refcount=%u %d)",
          buffer, path,
-         buf->tag.blockNum, buf->flags,
-         buf->refcount, loccount);
+         buf->tag.blockNum, buf_state & BUF_FLAG_MASK,
+         BUF_STATE_GET_REFCOUNT(buf_state), loccount);
     pfree(path);
 }
 
@@ -2573,6 +2652,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
                 io_time;
     Block       bufBlock;
     char       *bufToWrite;
+    uint32      buf_state;
 
     /*
      * Acquire the buffer's io_in_progress lock.  If StartBufferIO returns
@@ -2598,7 +2678,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
                                         reln->smgr_rnode.node.dbNode,
                                         reln->smgr_rnode.node.relNode);
 
-    LockBufHdr(buf);
+    buf_state = LockBufHdr(buf);
 
     /*
      * Run PageGetLSN while holding header lock, since we don't have the
@@ -2607,8 +2687,8 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
     recptr = BufferGetLSN(buf);
 
     /* To check if block content changes while flushing. - vadim 01/17/97 */
-    buf->flags &= ~BM_JUST_DIRTIED;
-    UnlockBufHdr(buf);
+    buf_state &= ~BM_JUST_DIRTIED;
+    UnlockBufHdr(buf, buf_state);
 
     /*
      * Force XLOG flush up to buffer's LSN.  This implements the basic WAL
@@ -2627,7 +2707,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln)
      * disastrous system-wide consequences.  To make sure that can't happen,
      * skip the flush if the buffer isn't permanent.
      */
-    if (buf->flags & BM_PERMANENT)
+    if (buf_state & BM_PERMANENT)
         XLogFlush(recptr);
 
     /*
@@ -2716,12 +2796,12 @@ BufferIsPermanent(Buffer buffer)
     /*
      * BM_PERMANENT can't be changed while we hold a pin on the buffer, so we
      * need not bother with the buffer header spinlock.  Even if someone else
-     * changes the buffer header flags while we're doing this, we assume that
-     * changing an aligned 2-byte BufFlags value is atomic, so we'll read the
-     * old value or the new value, but not random garbage.
+     * changes the buffer header state while we're doing this, the state is
+     * changed atomically, so we'll read the old value or the new value, but
+     * not random garbage.
      */
     bufHdr = GetBufferDescriptor(buffer - 1);
-    return (bufHdr->flags & BM_PERMANENT) != 0;
+    return (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT) != 0;
 }
 
 /*
@@ -2736,6 +2816,7 @@ BufferGetLSNAtomic(Buffer buffer)
     BufferDesc *bufHdr = GetBufferDescriptor(buffer - 1);
     char       *page = BufferGetPage(buffer, NULL, NULL, BGP_NO_SNAPSHOT_TEST);
     XLogRecPtr  lsn;
+    uint32      buf_state;
 
     /*
     * If we don't need locking for correctness, fastpath out.
@@ -2747,9 +2828,9 @@ BufferGetLSNAtomic(Buffer buffer)
     Assert(BufferIsValid(buffer));
     Assert(BufferIsPinned(buffer));
 
-    LockBufHdr(bufHdr);
+    buf_state = LockBufHdr(bufHdr);
     lsn = PageGetLSN(page);
-    UnlockBufHdr(bufHdr);
+    UnlockBufHdr(bufHdr, buf_state);
 
     return lsn;
 }
@@ -2797,6 +2878,7 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
     for (i = 0; i < NBuffers; i++)
     {
         BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32      buf_state;
 
         /*
          * We can make this a tad faster by prechecking the buffer tag before
@@ -2817,13 +2899,13 @@ DropRelFileNodeBuffers(RelFileNodeBackend rnode, ForkNumber forkNum,
         if (!RelFileNodeEquals(bufHdr->tag.rnode, rnode.node))
             continue;
 
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
         if (RelFileNodeEquals(bufHdr->tag.rnode, rnode.node) &&
             bufHdr->tag.forkNum == forkNum &&
             bufHdr->tag.blockNum >= firstDelBlock)
             InvalidateBuffer(bufHdr);   /* releases spinlock */
         else
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
     }
 }
 
@@ -2887,6 +2969,7 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
     {
         RelFileNode *rnode = NULL;
         BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32      buf_state;
 
         /*
          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
@@ -2917,11 +3000,11 @@ DropRelFileNodesAllBuffers(RelFileNodeBackend *rnodes, int nnodes)
         if (rnode == NULL)
             continue;
 
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
         if (RelFileNodeEquals(bufHdr->tag.rnode, (*rnode)))
             InvalidateBuffer(bufHdr);   /* releases spinlock */
         else
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
     }
 
     pfree(nodes);
@@ -2951,6 +3034,7 @@ DropDatabaseBuffers(Oid dbid)
     for (i = 0; i < NBuffers; i++)
    {
         BufferDesc *bufHdr = GetBufferDescriptor(i);
+        uint32      buf_state;
 
         /*
          * As in DropRelFileNodeBuffers, an unlocked precheck should be safe
@@ -2959,11 +3043,11 @@ DropDatabaseBuffers(Oid dbid)
         if (bufHdr->tag.rnode.dbNode != dbid)
             continue;
 
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
         if (bufHdr->tag.rnode.dbNode == dbid)
             InvalidateBuffer(bufHdr);   /* releases spinlock */
         else
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
     }
 }
 
@@ -3055,9 +3139,12 @@ FlushRelationBuffers(Relation rel)
     {
         for (i = 0; i < NLocBuffer; i++)
         {
+            uint32      buf_state;
+
             bufHdr = GetLocalBufferDescriptor(i);
             if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
-                (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+                ((buf_state = pg_atomic_read_u32(&bufHdr->state)) &
+                 (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
             {
                 ErrorContextCallback errcallback;
                 Page        localpage;
@@ -3078,7 +3165,8 @@ FlushRelationBuffers(Relation rel)
                           localpage,
                           false);
 
-                bufHdr->flags &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                buf_state &= ~(BM_DIRTY | BM_JUST_DIRTIED);
+                pg_atomic_write_u32(&bufHdr->state, buf_state);
 
                 /* Pop the error context stack */
                 error_context_stack = errcallback.previous;
@@ -3093,6 +3181,8 @@ FlushRelationBuffers(Relation rel)
 
     for (i = 0; i < NBuffers; i++)
     {
+        uint32      buf_state;
+
         bufHdr = GetBufferDescriptor(i);
 
         /*
@@ -3104,9 +3194,9 @@ FlushRelationBuffers(Relation rel)
 
         ReservePrivateRefCountEntry();
 
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
         if (RelFileNodeEquals(bufHdr->tag.rnode, rel->rd_node) &&
-            (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+            (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
         {
             PinBuffer_Locked(bufHdr);
             LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
@@ -3115,7 +3205,7 @@ FlushRelationBuffers(Relation rel)
             UnpinBuffer(bufHdr, true);
         }
         else
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
     }
 }
 
@@ -3145,6 +3235,8 @@ FlushDatabaseBuffers(Oid dbid)
 
     for (i = 0; i < NBuffers; i++)
     {
+        uint32      buf_state;
+
         bufHdr = GetBufferDescriptor(i);
 
         /*
@@ -3156,9 +3248,9 @@ FlushDatabaseBuffers(Oid dbid)
 
         ReservePrivateRefCountEntry();
 
-        LockBufHdr(bufHdr);
+        buf_state = LockBufHdr(bufHdr);
         if (bufHdr->tag.rnode.dbNode == dbid &&
-            (bufHdr->flags & BM_VALID) && (bufHdr->flags & BM_DIRTY))
+            (buf_state & (BM_VALID | BM_DIRTY)) == (BM_VALID | BM_DIRTY))
         {
             PinBuffer_Locked(bufHdr);
             LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_SHARED);
@@ -3167,7 +3259,7 @@ FlushDatabaseBuffers(Oid dbid)
             UnpinBuffer(bufHdr, true);
         }
         else
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
     }
 }
 
@@ -3297,12 +3389,13 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
      * is only intended to be used in cases where failing to write out the
      * data would be harmless anyway, it doesn't really matter.
     */
-    if ((bufHdr->flags & (BM_DIRTY | BM_JUST_DIRTIED)) !=
+    if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
         (BM_DIRTY | BM_JUST_DIRTIED))
     {
         XLogRecPtr  lsn = InvalidXLogRecPtr;
         bool        dirtied = false;
         bool        delayChkpt = false;
+        uint32      buf_state;
 
         /*
          * If we need to protect hint bit updates from torn writes, WAL-log a
@@ -3313,7 +3406,8 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
          * We don't check full_page_writes here because that logic is included
          * when we call XLogInsert() since the value changes dynamically.
          */
-        if (XLogHintBitIsNeeded() && (bufHdr->flags & BM_PERMANENT))
+        if (XLogHintBitIsNeeded() &&
+            (pg_atomic_read_u32(&bufHdr->state) & BM_PERMANENT))
         {
             /*
              * If we're in recovery we cannot dirty a page because of a hint.
@@ -3352,9 +3446,11 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
             lsn = XLogSaveBufferForHint(buffer, buffer_std);
         }
 
-        LockBufHdr(bufHdr);
-        Assert(bufHdr->refcount > 0);
-        if (!(bufHdr->flags & BM_DIRTY))
+        buf_state = LockBufHdr(bufHdr);
+
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+
+        if (!(buf_state & BM_DIRTY))
         {
             dirtied = true;     /* Means "will be dirtied by this action" */
 
@@ -3374,8 +3470,9 @@ MarkBufferDirtyHint(Buffer buffer, bool buffer_std)
             if (!XLogRecPtrIsInvalid(lsn))
                 PageSetLSN(page, lsn);
         }
-        bufHdr->flags |= (BM_DIRTY | BM_JUST_DIRTIED);
-        UnlockBufHdr(bufHdr);
+
+        buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
+        UnlockBufHdr(bufHdr, buf_state);
 
         if (delayChkpt)
             MyPgXact->delayChkpt = false;
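MarkBufferDirtyHint above is a good example of the double-checked pattern the atomic state word enables: a cheap, unlocked pg_atomic_read_u32() skips pages that are already dirty, and the real decision is re-made under the header lock, since the unlocked read can be stale. Condensed from the hunks above (elided work shown only as comments):

    /* unlocked precheck: may be stale, a miss only costs extra work */
    if ((pg_atomic_read_u32(&bufHdr->state) & (BM_DIRTY | BM_JUST_DIRTIED)) !=
        (BM_DIRTY | BM_JUST_DIRTIED))
    {
        /* ... WAL-log the hint page first if checksums require it ... */

        uint32      buf_state = LockBufHdr(bufHdr);     /* authoritative check */

        if (!(buf_state & BM_DIRTY))
        {
            /* ... vacuum accounting and PageSetLSN for the first dirtying ... */
        }

        buf_state |= BM_DIRTY | BM_JUST_DIRTIED;
        UnlockBufHdr(bufHdr, buf_state);
    }

The precheck is safe precisely because setting these flags is idempotent and a stale read merely repeats harmless work; code that cannot tolerate staleness (such as the waiter re-check in UnpinBuffer) still takes the header lock before acting.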
@@ -3406,17 +3503,19 @@ UnlockBuffers(void)
 
     if (buf)
     {
-        LockBufHdr(buf);
+        uint32      buf_state;
+
+        buf_state = LockBufHdr(buf);
 
         /*
          * Don't complain if flag bit not set; it could have been reset but we
          * got a cancel/die interrupt before getting the signal.
          */
-        if ((buf->flags & BM_PIN_COUNT_WAITER) != 0 &&
+        if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
             buf->wait_backend_pid == MyProcPid)
-            buf->flags &= ~BM_PIN_COUNT_WAITER;
+            buf_state &= ~BM_PIN_COUNT_WAITER;
 
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
 
         PinCountWaitBuf = NULL;
     }
@@ -3509,27 +3608,30 @@ LockBufferForCleanup(Buffer buffer)
 
     for (;;)
     {
+        uint32      buf_state;
+
         /* Try to acquire lock */
         LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
-        LockBufHdr(bufHdr);
-        Assert(bufHdr->refcount > 0);
-        if (bufHdr->refcount == 1)
+        buf_state = LockBufHdr(bufHdr);
+
+        Assert(BUF_STATE_GET_REFCOUNT(buf_state) > 0);
+        if (BUF_STATE_GET_REFCOUNT(buf_state) == 1)
         {
             /* Successfully acquired exclusive lock with pincount 1 */
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
             return;
         }
         /* Failed, so mark myself as waiting for pincount 1 */
-        if (bufHdr->flags & BM_PIN_COUNT_WAITER)
        {
-            UnlockBufHdr(bufHdr);
+            UnlockBufHdr(bufHdr, buf_state);
             LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
             elog(ERROR, "multiple backends attempting to wait for pincount 1");
         }
         bufHdr->wait_backend_pid = MyProcPid;
-        bufHdr->flags |= BM_PIN_COUNT_WAITER;
         PinCountWaitBuf = bufHdr;
-        UnlockBufHdr(bufHdr);
+        buf_state |= BM_PIN_COUNT_WAITER;
+        UnlockBufHdr(bufHdr, buf_state);
         LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
 
         /* Report the wait */
@@ -3558,11 +3660,11 @@ LockBufferForCleanup(Buffer buffer)
          * impossible with the current usages due to table level locking, but
          * better be safe.
          */
-        LockBufHdr(bufHdr);
-        if ((bufHdr->flags & BM_PIN_COUNT_WAITER) != 0 &&
+        buf_state = LockBufHdr(bufHdr);
+        if ((buf_state & BM_PIN_COUNT_WAITER) != 0 &&
             bufHdr->wait_backend_pid == MyProcPid)
-            bufHdr->flags &= ~BM_PIN_COUNT_WAITER;
-        UnlockBufHdr(bufHdr);
+            buf_state &= ~BM_PIN_COUNT_WAITER;
+        UnlockBufHdr(bufHdr, buf_state);
 
         PinCountWaitBuf = NULL;
         /* Loop back and try again */
@@ -3603,22 +3705,26 @@ bool
 ConditionalLockBufferForCleanup(Buffer buffer)
 {
     BufferDesc *bufHdr;
+    uint32      buf_state,
+                refcount;
 
     Assert(BufferIsValid(buffer));
 
     if (BufferIsLocal(buffer))
     {
+        refcount = LocalRefCount[-buffer - 1];
         /* There should be exactly one pin */
-        Assert(LocalRefCount[-buffer - 1] > 0);
-        if (LocalRefCount[-buffer - 1] != 1)
+        Assert(refcount > 0);
+        if (refcount != 1)
             return false;
         /* Nobody else to wait for */
         return true;
     }
 
     /* There should be exactly one local pin */
-    Assert(GetPrivateRefCount(buffer) > 0);
-    if (GetPrivateRefCount(buffer) != 1)
+    refcount = GetPrivateRefCount(buffer);
+    Assert(refcount);
+    if (refcount != 1)
         return false;
 
     /* Try to acquire lock */
@@ -3626,17 +3732,19 @@ ConditionalLockBufferForCleanup(Buffer buffer)
         return false;
 
     bufHdr = GetBufferDescriptor(buffer - 1);
-    LockBufHdr(bufHdr);
-    Assert(bufHdr->refcount > 0);
-    if (bufHdr->refcount == 1)
+    buf_state = LockBufHdr(bufHdr);
+    refcount = BUF_STATE_GET_REFCOUNT(buf_state);
+
+    Assert(refcount > 0);
+    if (refcount == 1)
     {
         /* Successfully acquired exclusive lock with pincount 1 */
-        UnlockBufHdr(bufHdr);
+        UnlockBufHdr(bufHdr, buf_state);
         return true;
     }
 
     /* Failed, so release the lock */
-    UnlockBufHdr(bufHdr);
+    UnlockBufHdr(bufHdr, buf_state);
     LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
     return false;
 }
@@ -3666,17 +3774,17 @@ WaitIO(BufferDesc *buf)
      */
     for (;;)
     {
-        BufFlags    sv_flags;
+        uint32      buf_state;
 
         /*
          * It may not be necessary to acquire the spinlock to check the flag
         * here, but since this test is essential for correctness, we'd better
        * play it safe.
         */
-        LockBufHdr(buf);
-        sv_flags = buf->flags;
-        UnlockBufHdr(buf);
-        if (!(sv_flags & BM_IO_IN_PROGRESS))
+        buf_state = LockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
+
+        if (!(buf_state & BM_IO_IN_PROGRESS))
             break;
         LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_SHARED);
         LWLockRelease(BufferDescriptorGetIOLock(buf));
@@ -3704,6 +3812,8 @@ WaitIO(BufferDesc *buf)
 static bool
 StartBufferIO(BufferDesc *buf, bool forInput)
 {
+    uint32      buf_state;
+
     Assert(!InProgressBuf);
 
     for (;;)
@@ -3714,9 +3824,9 @@ StartBufferIO(BufferDesc *buf, bool forInput)
          */
         LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
 
-        LockBufHdr(buf);
+        buf_state = LockBufHdr(buf);
 
-        if (!(buf->flags & BM_IO_IN_PROGRESS))
+        if (!(buf_state & BM_IO_IN_PROGRESS))
             break;
 
         /*
@@ -3725,24 +3835,23 @@ StartBufferIO(BufferDesc *buf, bool forInput)
          * an error (see AbortBufferIO).  If that's the case, we must wait for
         * him to get unwedged.
         */
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(BufferDescriptorGetIOLock(buf));
         WaitIO(buf);
     }
 
     /* Once we get here, there is definitely no I/O active on this buffer */
 
-    if (forInput ? (buf->flags & BM_VALID) : !(buf->flags & BM_DIRTY))
+    if (forInput ? (buf_state & BM_VALID) : !(buf_state & BM_DIRTY))
     {
         /* someone else already did the I/O */
-        UnlockBufHdr(buf);
+        UnlockBufHdr(buf, buf_state);
         LWLockRelease(BufferDescriptorGetIOLock(buf));
         return false;
     }
 
-    buf->flags |= BM_IO_IN_PROGRESS;
-
-    UnlockBufHdr(buf);
+    buf_state |= BM_IO_IN_PROGRESS;
+    UnlockBufHdr(buf, buf_state);
 
     InProgressBuf = buf;
     IsForInput = forInput;
@@ -3768,19 +3877,22 @@ StartBufferIO(BufferDesc *buf, bool forInput)
  * be 0, or BM_VALID if we just finished reading in the page.
 */
 static void
-TerminateBufferIO(BufferDesc *buf, bool clear_dirty, int set_flag_bits)
+TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits)
 {
+    uint32      buf_state;
+
     Assert(buf == InProgressBuf);
 
-    LockBufHdr(buf);
+    buf_state = LockBufHdr(buf);
+
+    Assert(buf_state & BM_IO_IN_PROGRESS);
 
-    Assert(buf->flags & BM_IO_IN_PROGRESS);
-    buf->flags &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
-    if (clear_dirty && !(buf->flags & BM_JUST_DIRTIED))
-        buf->flags &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
-    buf->flags |= set_flag_bits;
+    buf_state &= ~(BM_IO_IN_PROGRESS | BM_IO_ERROR);
+    if (clear_dirty && !(buf_state & BM_JUST_DIRTIED))
+        buf_state &= ~(BM_DIRTY | BM_CHECKPOINT_NEEDED);
 
-    UnlockBufHdr(buf);
+    buf_state |= set_flag_bits;
+    UnlockBufHdr(buf, buf_state);
 
     InProgressBuf = NULL;
 
@@ -3803,6 +3915,8 @@ AbortBufferIO(void)
 
     if (buf)
     {
+        uint32      buf_state;
+
         /*
          * Since LWLockReleaseAll has already been called, we're not holding
         * the buffer's io_in_progress_lock. We have to re-acquire it so that
         */
         LWLockAcquire(BufferDescriptorGetIOLock(buf), LW_EXCLUSIVE);
 
-        LockBufHdr(buf);
-        Assert(buf->flags & BM_IO_IN_PROGRESS);
+        buf_state = LockBufHdr(buf);
+        Assert(buf_state & BM_IO_IN_PROGRESS);
         if (IsForInput)
         {
-            Assert(!(buf->flags & BM_DIRTY));
+            Assert(!(buf_state & BM_DIRTY));
+
             /* We'd better not think buffer is valid yet */
-            Assert(!(buf->flags & BM_VALID));
-            UnlockBufHdr(buf);
+            Assert(!(buf_state & BM_VALID));
+            UnlockBufHdr(buf, buf_state);
         }
         else
         {
-            BufFlags    sv_flags;
-
-            sv_flags = buf->flags;
-            Assert(sv_flags & BM_DIRTY);
-            UnlockBufHdr(buf);
+            Assert(buf_state & BM_DIRTY);
+            UnlockBufHdr(buf, buf_state);
+
             /* Issue notice if this is not the first failure...
             */
-            if (sv_flags & BM_IO_ERROR)
+            if (buf_state & BM_IO_ERROR)
             {
                 /* Buffer is pinned, so we can read tag without spinlock */
                 char       *path;
@@ -3912,6 +4024,54 @@ rnode_comparator(const void *p1, const void *p2)
 }
 
 /*
+ * Lock buffer header - set BM_LOCKED in buffer state.
+ */
+uint32
+LockBufHdr(BufferDesc *desc)
+{
+    SpinDelayStatus delayStatus = init_spin_delay(desc);
+    uint32      old_buf_state;
+
+    while (true)
+    {
+        /* set BM_LOCKED flag */
+        old_buf_state = pg_atomic_fetch_or_u32(&desc->state, BM_LOCKED);
+        /* if it wasn't set before we're OK */
+        if (!(old_buf_state & BM_LOCKED))
+            break;
+        perform_spin_delay(&delayStatus);
+    }
+    finish_spin_delay(&delayStatus);
+    return old_buf_state | BM_LOCKED;
+}
+
+/*
+ * Wait until the BM_LOCKED flag isn't set anymore and return the buffer's
+ * state at that point.
+ *
+ * Obviously the buffer could be locked by the time the value is returned, so
+ * this is primarily useful in CAS style loops.
+ */
+static uint32
+WaitBufHdrUnlocked(BufferDesc *buf)
+{
+    SpinDelayStatus delayStatus = init_spin_delay(buf);
+    uint32      buf_state;
+
+    buf_state = pg_atomic_read_u32(&buf->state);
+
+    while (buf_state & BM_LOCKED)
+    {
+        perform_spin_delay(&delayStatus);
+        buf_state = pg_atomic_read_u32(&buf->state);
+    }
+
+    finish_spin_delay(&delayStatus);
+
+    return buf_state;
+}
+
+/*
  * BufferTag comparator.
  */
 static int
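LockBufHdr's fetch-or deserves a note: atomically OR-ing in BM_LOCKED is a test-and-set. The returned pre-image tells the caller whether it won (the bit was previously clear), and it doubles as the state snapshot the caller will edit and later pass to UnlockBufHdr(). Release needs no atomic read-modify-write at all, because only the lock holder is allowed to store the word directly; everyone else goes through CAS, which fails against a locked value. A standalone C11 rendering of the pair, with hypothetical names:

    #include <stdatomic.h>
    #include <stdint.h>

    #define ST_LOCKED   (1U << 31)

    /* Acquire: spin on test-and-set; returns the locked state value. */
    static uint32_t
    header_lock(_Atomic uint32_t *state)
    {
        uint32_t    old_state;

        /* the pre-image of the OR tells us whether the bit was already set */
        while ((old_state = atomic_fetch_or(state, ST_LOCKED)) & ST_LOCKED)
            ;                   /* spin; the real code backs off, then sleeps */

        return old_state | ST_LOCKED;
    }

    /* Release: publish the (possibly edited) state with the bit cleared. */
    static void
    header_unlock(_Atomic uint32_t *state, uint32_t new_state)
    {
        atomic_store_explicit(state, new_state & ~ST_LOCKED,
                              memory_order_release);
    }

Compared with a CAS acquisition loop, fetch-or is a single unconditional atomic operation per attempt, and losing spinners only re-set a bit that is already set, which is harmless. WaitBufHdrUnlocked is the read-only counterpart for the CAS paths above: it spins on plain atomic reads, so backends waiting to pin do not generate write traffic on the contended cache line.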