Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 726
1 file changed, 512 insertions(+), 214 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index f0f8d4259c5..929eb8f175f 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -19,6 +19,10 @@ * and pin it so that no one can destroy it while this process * is using it. * + * StartReadBuffer() -- as above, with separate wait step + * StartReadBuffers() -- multiple block version + * WaitReadBuffers() -- second step of above + * * ReleaseBuffer() -- unpin a buffer * * MarkBufferDirty() -- mark a pinned buffer's contents as "dirty". @@ -153,6 +157,13 @@ int effective_io_concurrency = DEFAULT_EFFECTIVE_IO_CONCURRENCY; int maintenance_io_concurrency = DEFAULT_MAINTENANCE_IO_CONCURRENCY; /* + * Limit on how many blocks should be handled in single I/O operations. + * StartReadBuffers() callers should respect it, as should other operations + * that call smgr APIs directly. + */ +int io_combine_limit = DEFAULT_IO_COMBINE_LIMIT; + +/* * GUC variables about triggering kernel writeback for buffers written; OS * dependent defaults are set via the GUC mechanism. */ @@ -471,10 +482,10 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref) ) -static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence, +static Buffer ReadBuffer_common(Relation rel, + SMgrRelation smgr, char smgr_persistence, ForkNumber forkNum, BlockNumber blockNum, - ReadBufferMode mode, BufferAccessStrategy strategy, - bool *hit); + ReadBufferMode mode, BufferAccessStrategy strategy); static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr, ForkNumber fork, BufferAccessStrategy strategy, @@ -500,18 +511,18 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf); static int SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context); static void WaitIO(BufferDesc *buf); -static bool StartBufferIO(BufferDesc *buf, bool forInput); +static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait); static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty, uint32 set_flag_bits, bool forget_owner); static void AbortBufferIO(Buffer buffer); static void shared_buffer_write_error_callback(void *arg); static void local_buffer_write_error_callback(void *arg); -static BufferDesc *BufferAlloc(SMgrRelation smgr, - char relpersistence, - ForkNumber forkNum, - BlockNumber blockNum, - BufferAccessStrategy strategy, - bool *foundPtr, IOContext io_context); +static inline BufferDesc *BufferAlloc(SMgrRelation smgr, + char relpersistence, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr, IOContext io_context); static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context); static void FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, IOContext io_context); @@ -777,11 +788,10 @@ ReadBuffer(Relation reln, BlockNumber blockNum) * If strategy is not NULL, a nondefault buffer access strategy is used. * See buffer/README for details. */ -Buffer +inline Buffer ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy) { - bool hit; Buffer buf; /* @@ -798,11 +808,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum, * Read the buffer, and update pgstat counters to reflect a cache hit or * miss. 
*/ - pgstat_count_buffer_read(reln); - buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence, - forkNum, blockNum, mode, strategy, &hit); - if (hit) - pgstat_count_buffer_hit(reln); + buf = ReadBuffer_common(reln, RelationGetSmgr(reln), 0, + forkNum, blockNum, mode, strategy); + return buf; } @@ -822,13 +830,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum, BlockNumber blockNum, ReadBufferMode mode, BufferAccessStrategy strategy, bool permanent) { - bool hit; - SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER); - return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT : - RELPERSISTENCE_UNLOGGED, forkNum, blockNum, - mode, strategy, &hit); + return ReadBuffer_common(NULL, smgr, + permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED, + forkNum, blockNum, + mode, strategy); } /* @@ -994,55 +1001,98 @@ ExtendBufferedRelTo(BufferManagerRelation bmr, */ if (buffer == InvalidBuffer) { - bool hit; - Assert(extended_by == 0); - buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence, - fork, extend_to - 1, mode, strategy, - &hit); + buffer = ReadBuffer_common(bmr.rel, bmr.smgr, 0, + fork, extend_to - 1, mode, strategy); } return buffer; } /* - * ReadBuffer_common -- common logic for all ReadBuffer variants - * - * *hit is set to true if the request was satisfied from shared buffer cache. + * Zero a buffer and lock it, as part of the implementation of + * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must be already + * pinned. It does not have to be valid, but it is valid and locked on + * return. */ -static Buffer -ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, - BlockNumber blockNum, ReadBufferMode mode, - BufferAccessStrategy strategy, bool *hit) +static void +ZeroBuffer(Buffer buffer, ReadBufferMode mode) +{ + BufferDesc *bufHdr; + uint32 buf_state; + + Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK); + + if (BufferIsLocal(buffer)) + bufHdr = GetLocalBufferDescriptor(-buffer - 1); + else + { + bufHdr = GetBufferDescriptor(buffer - 1); + if (mode == RBM_ZERO_AND_LOCK) + LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE); + else + LockBufferForCleanup(buffer); + } + + memset(BufferGetPage(buffer), 0, BLCKSZ); + + if (BufferIsLocal(buffer)) + { + buf_state = pg_atomic_read_u32(&bufHdr->state); + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + buf_state = LockBufHdr(bufHdr); + buf_state |= BM_VALID; + UnlockBufHdr(bufHdr, buf_state); + } +} + +/* + * Pin a buffer for a given block. *foundPtr is set to true if the block was + * already present, or false if more work is required to either read it in or + * zero it. + */ +static pg_attribute_always_inline Buffer +PinBufferForBlock(Relation rel, + SMgrRelation smgr, + char smgr_persistence, + ForkNumber forkNum, + BlockNumber blockNum, + BufferAccessStrategy strategy, + bool *foundPtr) { BufferDesc *bufHdr; - Block bufBlock; - bool found; IOContext io_context; IOObject io_object; - bool isLocalBuf = SmgrIsTemp(smgr); + char persistence; - *hit = false; + Assert(blockNum != P_NEW); /* - * Backward compatibility path, most code should use ExtendBufferedRel() - * instead, as acquiring the extension lock inside ExtendBufferedRel() - * scales a lot better. 
+ * If there is no Relation it usually implies recovery and thus permanent, + * but we take an argument because CreateAndCopyRelationData can reach us + * with only an SMgrRelation for an unlogged relation that we don't want + * to flag with BM_PERMANENT. */ - if (unlikely(blockNum == P_NEW)) - { - uint32 flags = EB_SKIP_EXTENSION_LOCK; - - /* - * Since no-one else can be looking at the page contents yet, there is - * no difference between an exclusive lock and a cleanup-strength - * lock. - */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - flags |= EB_LOCK_FIRST; + if (rel) + persistence = rel->rd_rel->relpersistence; + else if (smgr_persistence == 0) + persistence = RELPERSISTENCE_PERMANENT; + else + persistence = smgr_persistence; - return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence), - forkNum, strategy, flags); + if (persistence == RELPERSISTENCE_TEMP) + { + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(strategy); + io_object = IOOBJECT_RELATION; } TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum, @@ -1051,50 +1101,34 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, smgr->smgr_rlocator.locator.relNumber, smgr->smgr_rlocator.backend); - if (isLocalBuf) + if (persistence == RELPERSISTENCE_TEMP) { - /* - * We do not use a BufferAccessStrategy for I/O of temporary tables. - * However, in some cases, the "strategy" may not be NULL, so we can't - * rely on IOContextForStrategy() to set the right IOContext for us. - * This may happen in cases like CREATE TEMPORARY TABLE AS... - */ - io_context = IOCONTEXT_NORMAL; - io_object = IOOBJECT_TEMP_RELATION; - bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found); - if (found) + bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr); + if (*foundPtr) pgBufferUsage.local_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.local_blks_read++; } else { + bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum, + strategy, foundPtr, io_context); + if (*foundPtr) + pgBufferUsage.shared_blks_hit++; + } + if (rel) + { /* - * lookup the buffer. IO_IN_PROGRESS is set if the requested block is - * not currently in memory. + * While pgBufferUsage's "read" counter isn't bumped unless we reach + * WaitReadBuffers() (so, not for hits, and not for buffers that are + * zeroed instead), the per-relation stats always count them. */ - io_context = IOContextForStrategy(strategy); - io_object = IOOBJECT_RELATION; - bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum, - strategy, &found, io_context); - if (found) - pgBufferUsage.shared_blks_hit++; - else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG || - mode == RBM_ZERO_ON_ERROR) - pgBufferUsage.shared_blks_read++; + pgstat_count_buffer_read(rel); + if (*foundPtr) + pgstat_count_buffer_hit(rel); } - - /* At this point we do NOT hold any locks.
*/ - - /* if it was already in the buffer pool, we're done */ - if (found) + if (*foundPtr) { - /* Just need to update stats before we exit */ - *hit = true; VacuumPageHit++; pgstat_count_io_op(io_object, io_context, IOOP_HIT); - if (VacuumCostActive) VacuumCostBalance += VacuumCostPageHit; @@ -1103,119 +1137,398 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, smgr->smgr_rlocator.locator.dbOid, smgr->smgr_rlocator.locator.relNumber, smgr->smgr_rlocator.backend, - found); + true); + } + + return BufferDescriptorGetBuffer(bufHdr); +} + +/* + * ReadBuffer_common -- common logic for all ReadBuffer variants + * + * smgr is required, rel is optional unless using P_NEW. + */ +static pg_attribute_always_inline Buffer +ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence, + ForkNumber forkNum, + BlockNumber blockNum, ReadBufferMode mode, + BufferAccessStrategy strategy) +{ + ReadBuffersOperation operation; + Buffer buffer; + int flags; + + /* + * Backward compatibility path, most code should use ExtendBufferedRel() + * instead, as acquiring the extension lock inside ExtendBufferedRel() + * scales a lot better. + */ + if (unlikely(blockNum == P_NEW)) + { + uint32 flags = EB_SKIP_EXTENSION_LOCK; /* - * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked - * on return. + * Since no-one else can be looking at the page contents yet, there is + * no difference between an exclusive lock and a cleanup-strength + * lock. */ - if (!isLocalBuf) - { - if (mode == RBM_ZERO_AND_LOCK) - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), - LW_EXCLUSIVE); - else if (mode == RBM_ZERO_AND_CLEANUP_LOCK) - LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr)); - } + if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) + flags |= EB_LOCK_FIRST; - return BufferDescriptorGetBuffer(bufHdr); + return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags); } - /* - * if we have gotten to this point, we have allocated a buffer for the - * page but its contents are not yet valid. IO_IN_PROGRESS is set for it, - * if it's a shared buffer. - */ - Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */ + if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK || + mode == RBM_ZERO_AND_LOCK)) + { + bool found; - bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr); + buffer = PinBufferForBlock(rel, smgr, smgr_persistence, + forkNum, blockNum, strategy, &found); + ZeroBuffer(buffer, mode); + return buffer; + } - /* - * Read in the page, unless the caller intends to overwrite it and just - * wants us to allocate a buffer. 
- */ - if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) - MemSet((char *) bufBlock, 0, BLCKSZ); + if (mode == RBM_ZERO_ON_ERROR) + flags = READ_BUFFERS_ZERO_ON_ERROR; else - { - instr_time io_start = pgstat_prepare_io_time(track_io_timing); + flags = 0; + operation.smgr = smgr; + operation.rel = rel; + operation.smgr_persistence = smgr_persistence; + operation.forknum = forkNum; + operation.strategy = strategy; + if (StartReadBuffer(&operation, + &buffer, + blockNum, + flags)) + WaitReadBuffers(&operation); + + return buffer; +} + +static pg_attribute_always_inline bool +StartReadBuffersImpl(ReadBuffersOperation *operation, + Buffer *buffers, + BlockNumber blockNum, + int *nblocks, + int flags) +{ + int actual_nblocks = *nblocks; + int io_buffers_len = 0; - smgrread(smgr, forkNum, blockNum, bufBlock); + Assert(*nblocks > 0); + Assert(*nblocks <= MAX_IO_COMBINE_LIMIT); - pgstat_count_io_op_time(io_object, io_context, - IOOP_READ, io_start, 1); + for (int i = 0; i < actual_nblocks; ++i) + { + bool found; - /* check for garbage data */ - if (!PageIsVerifiedExtended((Page) bufBlock, blockNum, - PIV_LOG_WARNING | PIV_REPORT_STAT)) + buffers[i] = PinBufferForBlock(operation->rel, + operation->smgr, + operation->smgr_persistence, + operation->forknum, + blockNum + i, + operation->strategy, + &found); + + if (found) { - if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages) - { - ereport(WARNING, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s; zeroing out page", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); - MemSet((char *) bufBlock, 0, BLCKSZ); - } - else - ereport(ERROR, - (errcode(ERRCODE_DATA_CORRUPTED), - errmsg("invalid page in block %u of relation %s", - blockNum, - relpath(smgr->smgr_rlocator, forkNum)))); + /* + * Terminate the read as soon as we get a hit. It could be a + * single buffer hit, or it could be a hit that follows a readable + * range. We don't want to create more than one readable range, + * so we stop here. + */ + actual_nblocks = i + 1; + break; + } + else + { + /* Extend the readable range to cover this block. */ + io_buffers_len++; } } + *nblocks = actual_nblocks; - /* - * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer - * content lock before marking the page as valid, to make sure that no - * other backend sees the zeroed page before the caller has had a chance - * to initialize it. - * - * Since no-one else can be looking at the page contents yet, there is no - * difference between an exclusive lock and a cleanup-strength lock. (Note - * that we cannot use LockBuffer() or LockBufferForCleanup() here, because - * they assert that the buffer is already valid.) - */ - if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) && - !isLocalBuf) + if (likely(io_buffers_len == 0)) + return false; + + /* Populate information needed for I/O. */ + operation->buffers = buffers; + operation->blocknum = blockNum; + operation->flags = flags; + operation->nblocks = actual_nblocks; + operation->io_buffers_len = io_buffers_len; + + if (flags & READ_BUFFERS_ISSUE_ADVICE) { - LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE); + /* + * In theory we should only do this if PinBufferForBlock() had to + * allocate new buffers above. That way, if two calls to + * StartReadBuffers() were made for the same blocks before + * WaitReadBuffers(), only the first would issue the advice. 
That'd be + * a better simulation of true asynchronous I/O, which would only + * start the I/O once, but isn't done here for simplicity. Note also + * that the following call might actually issue two advice calls if we + * cross a segment boundary; in a true asynchronous version we might + * choose to process only one real I/O at a time in that case. + */ + smgrprefetch(operation->smgr, + operation->forknum, + blockNum, + operation->io_buffers_len); } - if (isLocalBuf) + /* Indicate that WaitReadBuffers() should be called. */ + return true; +} + +/* + * Begin reading a range of blocks beginning at blockNum and extending for + * *nblocks. On return, up to *nblocks pinned buffers holding those blocks + * are written into the buffers array, and *nblocks is updated to contain the + * actual number, which may be fewer than requested. Caller sets some of the + * members of operation; see struct definition. + * + * If false is returned, no I/O is necessary. If true is returned, one I/O + * has been started, and WaitReadBuffers() must be called with the same + * operation object before the buffers are accessed. Along with the operation + * object, the caller-supplied array of buffers must remain valid until + * WaitReadBuffers() is called. + * + * Currently the I/O is only started with optional operating system advice if + * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O + * happens synchronously in WaitReadBuffers(). In future work, true I/O could + * be initiated here. + */ +bool +StartReadBuffers(ReadBuffersOperation *operation, + Buffer *buffers, + BlockNumber blockNum, + int *nblocks, + int flags) +{ + return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags); +} + +/* + * Single block version of the StartReadBuffers(). This might save a few + * instructions when called from another translation unit, because it is + * specialized for nblocks == 1. + */ +bool +StartReadBuffer(ReadBuffersOperation *operation, + Buffer *buffer, + BlockNumber blocknum, + int flags) +{ + int nblocks = 1; + bool result; + + result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags); + Assert(nblocks == 1); /* single block can't be short */ + + return result; +} + +static inline bool +WaitReadBuffersCanStartIO(Buffer buffer, bool nowait) +{ + if (BufferIsLocal(buffer)) { - /* Only need to adjust flags */ - uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1); - buf_state |= BM_VALID; - pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0; } else + return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait); +} + +void +WaitReadBuffers(ReadBuffersOperation *operation) +{ + Buffer *buffers; + int nblocks; + BlockNumber blocknum; + ForkNumber forknum; + IOContext io_context; + IOObject io_object; + char persistence; + + /* + * Currently operations are only allowed to include a read of some range, + * with an optional extra buffer that is already pinned at the end. So + * nblocks can be at most one more than io_buffers_len. + */ + Assert((operation->nblocks == operation->io_buffers_len) || + (operation->nblocks == operation->io_buffers_len + 1)); + + /* Find the range of the physical read we need to perform. 
*/ + nblocks = operation->io_buffers_len; + if (nblocks == 0) + return; /* nothing to do */ + + buffers = &operation->buffers[0]; + blocknum = operation->blocknum; + forknum = operation->forknum; + + persistence = operation->rel + ? operation->rel->rd_rel->relpersistence + : RELPERSISTENCE_PERMANENT; + if (persistence == RELPERSISTENCE_TEMP) { - /* Set BM_VALID, terminate IO, and wake up any waiters */ - TerminateBufferIO(bufHdr, false, BM_VALID, true); + io_context = IOCONTEXT_NORMAL; + io_object = IOOBJECT_TEMP_RELATION; + } + else + { + io_context = IOContextForStrategy(operation->strategy); + io_object = IOOBJECT_RELATION; } - VacuumPageMiss++; - if (VacuumCostActive) - VacuumCostBalance += VacuumCostPageMiss; + /* + * We count all these blocks as read by this backend. This is traditional + * behavior, but might turn out to be not true if we find that someone + * else has beaten us and completed the read of some of these blocks. In + * that case the system globally double-counts, but we traditionally don't + * count this as a "hit", and we don't have a separate counter for "miss, + * but another backend completed the read". + */ + if (persistence == RELPERSISTENCE_TEMP) + pgBufferUsage.local_blks_read += nblocks; + else + pgBufferUsage.shared_blks_read += nblocks; - TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum, - smgr->smgr_rlocator.locator.spcOid, - smgr->smgr_rlocator.locator.dbOid, - smgr->smgr_rlocator.locator.relNumber, - smgr->smgr_rlocator.backend, - found); + for (int i = 0; i < nblocks; ++i) + { + int io_buffers_len; + Buffer io_buffers[MAX_IO_COMBINE_LIMIT]; + void *io_pages[MAX_IO_COMBINE_LIMIT]; + instr_time io_start; + BlockNumber io_first_block; - return BufferDescriptorGetBuffer(bufHdr); + /* + * Skip this block if someone else has already completed it. If an + * I/O is already in progress in another backend, this will wait for + * the outcome: either done, or something went wrong and we will + * retry. + */ + if (!WaitReadBuffersCanStartIO(buffers[i], false)) + { + /* + * Report this as a 'hit' for this backend, even though it must + * have started out as a miss in PinBufferForBlock(). + */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i, + operation->smgr->smgr_rlocator.locator.spcOid, + operation->smgr->smgr_rlocator.locator.dbOid, + operation->smgr->smgr_rlocator.locator.relNumber, + operation->smgr->smgr_rlocator.backend, + true); + continue; + } + + /* We found a buffer that we need to read in. */ + io_buffers[0] = buffers[i]; + io_pages[0] = BufferGetBlock(buffers[i]); + io_first_block = blocknum + i; + io_buffers_len = 1; + + /* + * How many neighboring-on-disk blocks can we scatter-read into + other buffers at the same time? In this case we don't wait if we + see an I/O already in progress. We already hold BM_IO_IN_PROGRESS + for the head block, so we should get on with that I/O as soon as + possible. We'll come back to this block again, above. + */ + while ((i + 1) < nblocks && + WaitReadBuffersCanStartIO(buffers[i + 1], true)) + { + /* Must be consecutive block numbers.
*/ + Assert(BufferGetBlockNumber(buffers[i + 1]) == + BufferGetBlockNumber(buffers[i]) + 1); + + io_buffers[io_buffers_len] = buffers[++i]; + io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]); + } + + io_start = pgstat_prepare_io_time(track_io_timing); + smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len); + pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start, + io_buffers_len); + + /* Verify each block we read, and terminate the I/O. */ + for (int j = 0; j < io_buffers_len; ++j) + { + BufferDesc *bufHdr; + Block bufBlock; + + if (persistence == RELPERSISTENCE_TEMP) + { + bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1); + bufBlock = LocalBufHdrGetBlock(bufHdr); + } + else + { + bufHdr = GetBufferDescriptor(io_buffers[j] - 1); + bufBlock = BufHdrGetBlock(bufHdr); + } + + /* check for garbage data */ + if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j, + PIV_LOG_WARNING | PIV_REPORT_STAT)) + { + if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages) + { + ereport(WARNING, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s; zeroing out page", + io_first_block + j, + relpath(operation->smgr->smgr_rlocator, forknum)))); + memset(bufBlock, 0, BLCKSZ); + } + else + ereport(ERROR, + (errcode(ERRCODE_DATA_CORRUPTED), + errmsg("invalid page in block %u of relation %s", + io_first_block + j, + relpath(operation->smgr->smgr_rlocator, forknum)))); + } + + /* Terminate I/O and set BM_VALID. */ + if (persistence == RELPERSISTENCE_TEMP) + { + uint32 buf_state = pg_atomic_read_u32(&bufHdr->state); + + buf_state |= BM_VALID; + pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state); + } + else + { + /* Set BM_VALID, terminate IO, and wake up any waiters */ + TerminateBufferIO(bufHdr, false, BM_VALID, true); + } + + /* Report I/Os as completing individually. */ + TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j, + operation->smgr->smgr_rlocator.locator.spcOid, + operation->smgr->smgr_rlocator.locator.dbOid, + operation->smgr->smgr_rlocator.locator.relNumber, + operation->smgr->smgr_rlocator.backend, + false); + } + + VacuumPageMiss += io_buffers_len; + if (VacuumCostActive) + VacuumCostBalance += VacuumCostPageMiss * io_buffers_len; + } } /* - * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared - * buffer. If no buffer exists already, selects a replacement - * victim and evicts the old page, but does NOT read in new page. + * BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared + * buffer. If no buffer exists already, selects a replacement victim and + * evicts the old page, but does NOT read in new page. * * "strategy" can be a buffer replacement strategy object, or NULL for * the default strategy. The selected buffer's usage_count is advanced when @@ -1223,11 +1536,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * The returned buffer is pinned and is already marked as holding the * desired page. If it already did have the desired page, *foundPtr is - * set true. Otherwise, *foundPtr is set false and the buffer is marked - * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it. - * - * *foundPtr is actually redundant with the buffer's BM_VALID flag, but - * we keep it for simplicity in ReadBuffer. + * set true. Otherwise, *foundPtr is set false. 
* * io_context is passed as an output parameter to avoid calling * IOContextForStrategy() when there is a shared buffers hit and no IO @@ -1235,7 +1544,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, * * No locks are held either at entry or exit. */ -static BufferDesc * +static pg_attribute_always_inline BufferDesc * BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, BlockNumber blockNum, BufferAccessStrategy strategy, @@ -1286,19 +1595,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(buf, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return buf; @@ -1363,19 +1663,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, { /* * We can only get here if (a) someone else is still reading in - * the page, or (b) a previous read attempt failed. We have to - * wait for any active read attempt to finish, and then set up our - * own read attempt if the page is still not BM_VALID. - * StartBufferIO does it all. + * the page, (b) a previous read attempt failed, or (c) someone + * called StartReadBuffers() but not yet WaitReadBuffers(). */ - if (StartBufferIO(existing_buf_hdr, true)) - { - /* - * If we get here, previous attempts to read the buffer must - * have failed ... but we shall bravely try again. - */ - *foundPtr = false; - } + *foundPtr = false; } return existing_buf_hdr; @@ -1407,15 +1698,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum, LWLockRelease(newPartitionLock); /* - * Buffer contents are currently invalid. Try to obtain the right to - * start I/O. If StartBufferIO returns false, then someone else managed - * to read it before we did, so there's nothing left for BufferAlloc() to - * do. + * Buffer contents are currently invalid. */ - if (StartBufferIO(victim_buf_hdr, true)) - *foundPtr = false; - else - *foundPtr = true; + *foundPtr = false; return victim_buf_hdr; } @@ -1769,7 +2054,7 @@ again: * pessimistic, but outside of toy-sized shared_buffers it should allow * sufficient pins. */ -static void +void LimitAdditionalPins(uint32 *additional_pins) { uint32 max_backends; @@ -2034,7 +2319,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, buf_state &= ~BM_VALID; UnlockBufHdr(existing_hdr, buf_state); - } while (!StartBufferIO(existing_hdr, true)); + } while (!StartBufferIO(existing_hdr, true, false)); } else { @@ -2057,7 +2342,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr, LWLockRelease(partition_lock); /* XXX: could combine the locked operations in it with the above */ - StartBufferIO(victim_buf_hdr, true); + StartBufferIO(victim_buf_hdr, true, false); } } @@ -2372,7 +2657,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) else { /* - * If we previously pinned the buffer, it must surely be valid. + * If we previously pinned the buffer, it is likely to be valid, but + * it may not be if StartReadBuffers() was called and + * WaitReadBuffers() hasn't been called yet. 
We'll check by loading + * the flags without locking. This is racy, but it's OK to return + * false spuriously: when WaitReadBuffers() calls StartBufferIO(), + * it'll see that it's now valid. * * Note: We deliberately avoid a Valgrind client request here. * Individual access methods can optionally superimpose buffer page @@ -2381,7 +2671,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy) * that the buffer page is legitimately non-accessible here. We * cannot meddle with that. */ - result = true; + result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0; } ref->refcount++; @@ -3449,7 +3739,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object, * someone else flushed the buffer before we could, so we need not do * anything. */ - if (!StartBufferIO(buf, false)) + if (!StartBufferIO(buf, false, false)) return; /* Setup error traceback support for ereport() */ @@ -5184,9 +5474,15 @@ WaitIO(BufferDesc *buf) * * Returns true if we successfully marked the buffer as I/O busy, * false if someone else already did the work. + * + * If nowait is true, then we don't wait for an I/O to be finished by another + * backend. In that case, false indicates either that the I/O was already + * finished, or is still in progress. This is useful for callers that want to + * find out if they can perform the I/O as part of a larger operation, without + * waiting for the answer or distinguishing the reasons why not. */ static bool -StartBufferIO(BufferDesc *buf, bool forInput) +StartBufferIO(BufferDesc *buf, bool forInput, bool nowait) { uint32 buf_state; @@ -5199,6 +5495,8 @@ StartBufferIO(BufferDesc *buf, bool forInput) if (!(buf_state & BM_IO_IN_PROGRESS)) break; UnlockBufHdr(buf, buf_state); + if (nowait) + return false; WaitIO(buf); } |
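
The header comment and the rewritten ReadBuffer_common() in this diff describe a two-step protocol: StartReadBuffer() pins a buffer and reports whether a read still has to happen, and WaitReadBuffers() later performs it. The sketch below only restates what ReadBuffer_common() does in this patch for a single block; the function name read_one_block and its arguments are illustrative, not part of the patch.

/*
 * Sketch only: a single-block caller of the new two-step API, mirroring the
 * rewritten ReadBuffer_common().  read_one_block() is hypothetical; the
 * ReadBuffersOperation fields set here are the caller-supplied ones named in
 * the patch.
 */
static Buffer
read_one_block(Relation rel, ForkNumber forknum, BlockNumber blocknum)
{
	ReadBuffersOperation operation;
	Buffer		buffer;

	operation.smgr = RelationGetSmgr(rel);
	operation.rel = rel;
	operation.smgr_persistence = 0; /* 0: derive persistence from rel */
	operation.forknum = forknum;
	operation.strategy = NULL;

	/* Returns true if an I/O was started and must be completed. */
	if (StartReadBuffer(&operation, &buffer, blocknum, 0))
		WaitReadBuffers(&operation);

	return buffer;				/* pinned and valid once we get here */
}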
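StartReadBuffers() may shorten *nblocks when it hits an already-valid buffer, because it never creates more than one readable range per call, so a caller reading a larger range loops and advances by however many buffers each call actually returned. A minimal sketch, assuming the operation object has already been filled in as in the previous example and that each batch is kept within io_combine_limit; read_range() and its arguments are hypothetical.

/*
 * Sketch only: reading "count" consecutive blocks with StartReadBuffers().
 * Each call may return fewer buffers than requested (a short read ending at
 * a cache hit), so we advance by the actual number and try again.  The
 * buffers array stays valid until WaitReadBuffers() returns, as required.
 */
static void
read_range(ReadBuffersOperation *operation, BlockNumber first, int count)
{
	BlockNumber blocknum = first;

	while (count > 0)
	{
		Buffer		buffers[MAX_IO_COMBINE_LIMIT];
		int			nblocks = Min(count, io_combine_limit);

		if (StartReadBuffers(operation, buffers, blocknum, &nblocks, 0))
			WaitReadBuffers(operation);

		/* ... use buffers[0..nblocks - 1], then ReleaseBuffer() each ... */

		blocknum += nblocks;
		count -= nblocks;
	}
}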
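As the StartReadBuffers() comment notes, this patch does not start true asynchronous I/O: READ_BUFFERS_ISSUE_ADVICE only issues smgrprefetch() advice at start time, and the actual smgrreadv() still happens synchronously in WaitReadBuffers(). The split nonetheless lets a caller overlap other work with kernel readahead, roughly as below; prefetch_then_read() and the work done between the two calls are hypothetical.

/*
 * Sketch only: overlapping other work with kernel readahead.  With
 * READ_BUFFERS_ISSUE_ADVICE, StartReadBuffers() issues advice for the blocks
 * it could not find in shared buffers; the synchronous read is deferred to
 * WaitReadBuffers().
 */
static void
prefetch_then_read(ReadBuffersOperation *operation, BlockNumber blocknum,
				   Buffer *buffers, int *nblocks)
{
	bool		need_wait;

	need_wait = StartReadBuffers(operation, buffers, blocknum, nblocks,
								 READ_BUFFERS_ISSUE_ADVICE);

	/* ... caller does unrelated work here while readahead proceeds ... */

	if (need_wait)
		WaitReadBuffers(operation); /* performs the combined read */
}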