Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--  src/backend/storage/buffer/bufmgr.c  726
1 file changed, 512 insertions, 214 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index f0f8d4259c5..929eb8f175f 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -19,6 +19,10 @@
* and pin it so that no one can destroy it while this process
* is using it.
*
+ * StartReadBuffer() -- as above, with separate wait step
+ * StartReadBuffers() -- multiple block version
+ * WaitReadBuffers() -- second step of above
+ *
* ReleaseBuffer() -- unpin a buffer
*
* MarkBufferDirty() -- mark a pinned buffer's contents as "dirty".
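The three routines added above form a split read API: StartReadBuffer() and StartReadBuffers() pin buffers and optionally begin I/O, and WaitReadBuffers() completes it. A minimal caller sketch for the single-block form, assuming caller-supplied rel, forknum and blkno (placeholder names, not part of this patch):

	ReadBuffersOperation op;
	Buffer		buf;

	op.rel = rel;					/* used for per-relation stats */
	op.smgr = RelationGetSmgr(rel);
	op.smgr_persistence = 0;		/* 0 = infer persistence from rel */
	op.forknum = forknum;
	op.strategy = NULL;

	/* Returns true only if real I/O had to be started. */
	if (StartReadBuffer(&op, &buf, blkno, 0))
		WaitReadBuffers(&op);

	/* buf is now pinned and valid. */

This mirrors what ReadBuffer_common() itself does further down in this patch.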
@@ -153,6 +157,13 @@ int effective_io_concurrency = DEFAULT_EFFECTIVE_IO_CONCURRENCY;
int maintenance_io_concurrency = DEFAULT_MAINTENANCE_IO_CONCURRENCY;
/*
+ * Limit on how many blocks should be handled in a single I/O operation.
+ * StartReadBuffers() callers should respect it, as should other operations
+ * that call smgr APIs directly.
+ */
+int io_combine_limit = DEFAULT_IO_COMBINE_LIMIT;
+
+/*
* GUC variables about triggering kernel writeback for buffers written; OS
* dependent defaults are set via the GUC mechanism.
*/
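Callers assembling multi-block reads are expected to clamp their batch size with this limit before calling StartReadBuffers(). A sketch of the pattern, with nblocks_wanted standing in for a caller's request:

	/* Respect io_combine_limit when sizing a vectored read. */
	int		nblocks = Min(nblocks_wanted, io_combine_limit);

	Assert(nblocks <= MAX_IO_COMBINE_LIMIT);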
@@ -471,10 +482,10 @@ ForgetPrivateRefCountEntry(PrivateRefCountEntry *ref)
)
-static Buffer ReadBuffer_common(SMgrRelation smgr, char relpersistence,
+static Buffer ReadBuffer_common(Relation rel,
+ SMgrRelation smgr, char smgr_persistence,
ForkNumber forkNum, BlockNumber blockNum,
- ReadBufferMode mode, BufferAccessStrategy strategy,
- bool *hit);
+ ReadBufferMode mode, BufferAccessStrategy strategy);
static BlockNumber ExtendBufferedRelCommon(BufferManagerRelation bmr,
ForkNumber fork,
BufferAccessStrategy strategy,
@@ -500,18 +511,18 @@ static uint32 WaitBufHdrUnlocked(BufferDesc *buf);
static int SyncOneBuffer(int buf_id, bool skip_recently_used,
WritebackContext *wb_context);
static void WaitIO(BufferDesc *buf);
-static bool StartBufferIO(BufferDesc *buf, bool forInput);
+static bool StartBufferIO(BufferDesc *buf, bool forInput, bool nowait);
static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
uint32 set_flag_bits, bool forget_owner);
static void AbortBufferIO(Buffer buffer);
static void shared_buffer_write_error_callback(void *arg);
static void local_buffer_write_error_callback(void *arg);
-static BufferDesc *BufferAlloc(SMgrRelation smgr,
- char relpersistence,
- ForkNumber forkNum,
- BlockNumber blockNum,
- BufferAccessStrategy strategy,
- bool *foundPtr, IOContext io_context);
+static inline BufferDesc *BufferAlloc(SMgrRelation smgr,
+ char relpersistence,
+ ForkNumber forkNum,
+ BlockNumber blockNum,
+ BufferAccessStrategy strategy,
+ bool *foundPtr, IOContext io_context);
static Buffer GetVictimBuffer(BufferAccessStrategy strategy, IOContext io_context);
static void FlushBuffer(BufferDesc *buf, SMgrRelation reln,
IOObject io_object, IOContext io_context);
@@ -777,11 +788,10 @@ ReadBuffer(Relation reln, BlockNumber blockNum)
* If strategy is not NULL, a nondefault buffer access strategy is used.
* See buffer/README for details.
*/
-Buffer
+inline Buffer
ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
ReadBufferMode mode, BufferAccessStrategy strategy)
{
- bool hit;
Buffer buf;
/*
@@ -798,11 +808,9 @@ ReadBufferExtended(Relation reln, ForkNumber forkNum, BlockNumber blockNum,
* Read the buffer, and update pgstat counters to reflect a cache hit or
* miss.
*/
- pgstat_count_buffer_read(reln);
- buf = ReadBuffer_common(RelationGetSmgr(reln), reln->rd_rel->relpersistence,
- forkNum, blockNum, mode, strategy, &hit);
- if (hit)
- pgstat_count_buffer_hit(reln);
+ buf = ReadBuffer_common(reln, RelationGetSmgr(reln), 0,
+ forkNum, blockNum, mode, strategy);
+
return buf;
}
@@ -822,13 +830,12 @@ ReadBufferWithoutRelcache(RelFileLocator rlocator, ForkNumber forkNum,
BlockNumber blockNum, ReadBufferMode mode,
BufferAccessStrategy strategy, bool permanent)
{
- bool hit;
-
SMgrRelation smgr = smgropen(rlocator, INVALID_PROC_NUMBER);
- return ReadBuffer_common(smgr, permanent ? RELPERSISTENCE_PERMANENT :
- RELPERSISTENCE_UNLOGGED, forkNum, blockNum,
- mode, strategy, &hit);
+ return ReadBuffer_common(NULL, smgr,
+ permanent ? RELPERSISTENCE_PERMANENT : RELPERSISTENCE_UNLOGGED,
+ forkNum, blockNum,
+ mode, strategy);
}
/*
@@ -994,55 +1001,98 @@ ExtendBufferedRelTo(BufferManagerRelation bmr,
*/
if (buffer == InvalidBuffer)
{
- bool hit;
-
Assert(extended_by == 0);
- buffer = ReadBuffer_common(bmr.smgr, bmr.relpersistence,
- fork, extend_to - 1, mode, strategy,
- &hit);
+ buffer = ReadBuffer_common(bmr.rel, bmr.smgr, 0,
+ fork, extend_to - 1, mode, strategy);
}
return buffer;
}
/*
- * ReadBuffer_common -- common logic for all ReadBuffer variants
- *
- * *hit is set to true if the request was satisfied from shared buffer cache.
+ * Zero a buffer and lock it, as part of the implementation of
+ * RBM_ZERO_AND_LOCK or RBM_ZERO_AND_CLEANUP_LOCK. The buffer must already be
+ * pinned. It does not have to be valid, but it is valid and locked on
+ * return.
*/
-static Buffer
-ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
- BlockNumber blockNum, ReadBufferMode mode,
- BufferAccessStrategy strategy, bool *hit)
+static void
+ZeroBuffer(Buffer buffer, ReadBufferMode mode)
+{
+ BufferDesc *bufHdr;
+ uint32 buf_state;
+
+ Assert(mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK);
+
+ if (BufferIsLocal(buffer))
+ bufHdr = GetLocalBufferDescriptor(-buffer - 1);
+ else
+ {
+ bufHdr = GetBufferDescriptor(buffer - 1);
+ if (mode == RBM_ZERO_AND_LOCK)
+ LockBuffer(buffer, BUFFER_LOCK_EXCLUSIVE);
+ else
+ LockBufferForCleanup(buffer);
+ }
+
+ memset(BufferGetPage(buffer), 0, BLCKSZ);
+
+ if (BufferIsLocal(buffer))
+ {
+ buf_state = pg_atomic_read_u32(&bufHdr->state);
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ }
+ else
+ {
+ buf_state = LockBufHdr(bufHdr);
+ buf_state |= BM_VALID;
+ UnlockBufHdr(bufHdr, buf_state);
+ }
+}
+
+/*
+ * Pin a buffer for a given block. *foundPtr is set to true if the block was
+ * already present, or false if more work is required to either read it in or
+ * zero it.
+ */
+static pg_attribute_always_inline Buffer
+PinBufferForBlock(Relation rel,
+ SMgrRelation smgr,
+ char smgr_persistence,
+ ForkNumber forkNum,
+ BlockNumber blockNum,
+ BufferAccessStrategy strategy,
+ bool *foundPtr)
{
BufferDesc *bufHdr;
- Block bufBlock;
- bool found;
IOContext io_context;
IOObject io_object;
- bool isLocalBuf = SmgrIsTemp(smgr);
+ char persistence;
- *hit = false;
+ Assert(blockNum != P_NEW);
/*
- * Backward compatibility path, most code should use ExtendBufferedRel()
- * instead, as acquiring the extension lock inside ExtendBufferedRel()
- * scales a lot better.
+ * If there is no Relation, it usually implies recovery and thus permanent,
+ * but we take an argument because CreateAndCopyRelationData can reach us
+ * with only an SMgrRelation for an unlogged relation that we don't want
+ * to flag with BM_PERMANENT.
*/
- if (unlikely(blockNum == P_NEW))
- {
- uint32 flags = EB_SKIP_EXTENSION_LOCK;
-
- /*
- * Since no-one else can be looking at the page contents yet, there is
- * no difference between an exclusive lock and a cleanup-strength
- * lock.
- */
- if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
- flags |= EB_LOCK_FIRST;
+ if (rel)
+ persistence = rel->rd_rel->relpersistence;
+ else if (smgr_persistence == 0)
+ persistence = RELPERSISTENCE_PERMANENT;
+ else
+ persistence = smgr_persistence;
- return ExtendBufferedRel(BMR_SMGR(smgr, relpersistence),
- forkNum, strategy, flags);
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(strategy);
+ io_object = IOOBJECT_RELATION;
}
TRACE_POSTGRESQL_BUFFER_READ_START(forkNum, blockNum,
@@ -1051,50 +1101,34 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend);
- if (isLocalBuf)
+ if (persistence == RELPERSISTENCE_TEMP)
{
- /*
- * We do not use a BufferAccessStrategy for I/O of temporary tables.
- * However, in some cases, the "strategy" may not be NULL, so we can't
- * rely on IOContextForStrategy() to set the right IOContext for us.
- * This may happen in cases like CREATE TEMPORARY TABLE AS...
- */
- io_context = IOCONTEXT_NORMAL;
- io_object = IOOBJECT_TEMP_RELATION;
- bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, &found);
- if (found)
+ bufHdr = LocalBufferAlloc(smgr, forkNum, blockNum, foundPtr);
+ if (*foundPtr)
pgBufferUsage.local_blks_hit++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.local_blks_read++;
}
else
{
+ bufHdr = BufferAlloc(smgr, persistence, forkNum, blockNum,
+ strategy, foundPtr, io_context);
+ if (*foundPtr)
+ pgBufferUsage.shared_blks_hit++;
+ }
+ if (rel)
+ {
/*
- * lookup the buffer. IO_IN_PROGRESS is set if the requested block is
- * not currently in memory.
+ * While pgBufferUsage's "read" counter isn't bumped unless we reach
+ * WaitReadBuffers() (so, not for hits, and not for buffers that are
+ * zeroed instead), the per-relation stats always count them.
*/
- io_context = IOContextForStrategy(strategy);
- io_object = IOOBJECT_RELATION;
- bufHdr = BufferAlloc(smgr, relpersistence, forkNum, blockNum,
- strategy, &found, io_context);
- if (found)
- pgBufferUsage.shared_blks_hit++;
- else if (mode == RBM_NORMAL || mode == RBM_NORMAL_NO_LOG ||
- mode == RBM_ZERO_ON_ERROR)
- pgBufferUsage.shared_blks_read++;
+ pgstat_count_buffer_read(rel);
+ if (*foundPtr)
+ pgstat_count_buffer_hit(rel);
}
-
- /* At this point we do NOT hold any locks. */
-
- /* if it was already in the buffer pool, we're done */
- if (found)
+ if (*foundPtr)
{
- /* Just need to update stats before we exit */
- *hit = true;
VacuumPageHit++;
pgstat_count_io_op(io_object, io_context, IOOP_HIT);
-
if (VacuumCostActive)
VacuumCostBalance += VacuumCostPageHit;
@@ -1103,119 +1137,398 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
smgr->smgr_rlocator.locator.dbOid,
smgr->smgr_rlocator.locator.relNumber,
smgr->smgr_rlocator.backend,
- found);
+ true);
+ }
+
+ return BufferDescriptorGetBuffer(bufHdr);
+}
+
+/*
+ * ReadBuffer_common -- common logic for all ReadBuffer variants
+ *
+ * smgr is required; rel is optional unless using P_NEW.
+ */
+static pg_attribute_always_inline Buffer
+ReadBuffer_common(Relation rel, SMgrRelation smgr, char smgr_persistence,
+ ForkNumber forkNum,
+ BlockNumber blockNum, ReadBufferMode mode,
+ BufferAccessStrategy strategy)
+{
+ ReadBuffersOperation operation;
+ Buffer buffer;
+ int flags;
+
+ /*
+ * Backward compatibility path; most code should use ExtendBufferedRel()
+ * instead, as acquiring the extension lock inside ExtendBufferedRel()
+ * scales a lot better.
+ */
+ if (unlikely(blockNum == P_NEW))
+ {
+ uint32 flags = EB_SKIP_EXTENSION_LOCK;
/*
- * In RBM_ZERO_AND_LOCK mode the caller expects the page to be locked
- * on return.
+ * Since no-one else can be looking at the page contents yet, there is
+ * no difference between an exclusive lock and a cleanup-strength
+ * lock.
*/
- if (!isLocalBuf)
- {
- if (mode == RBM_ZERO_AND_LOCK)
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr),
- LW_EXCLUSIVE);
- else if (mode == RBM_ZERO_AND_CLEANUP_LOCK)
- LockBufferForCleanup(BufferDescriptorGetBuffer(bufHdr));
- }
+ if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
+ flags |= EB_LOCK_FIRST;
- return BufferDescriptorGetBuffer(bufHdr);
+ return ExtendBufferedRel(BMR_REL(rel), forkNum, strategy, flags);
}
- /*
- * if we have gotten to this point, we have allocated a buffer for the
- * page but its contents are not yet valid. IO_IN_PROGRESS is set for it,
- * if it's a shared buffer.
- */
- Assert(!(pg_atomic_read_u32(&bufHdr->state) & BM_VALID)); /* spinlock not needed */
+ if (unlikely(mode == RBM_ZERO_AND_CLEANUP_LOCK ||
+ mode == RBM_ZERO_AND_LOCK))
+ {
+ bool found;
- bufBlock = isLocalBuf ? LocalBufHdrGetBlock(bufHdr) : BufHdrGetBlock(bufHdr);
+ buffer = PinBufferForBlock(rel, smgr, smgr_persistence,
+ forkNum, blockNum, strategy, &found);
+ ZeroBuffer(buffer, mode);
+ return buffer;
+ }
- /*
- * Read in the page, unless the caller intends to overwrite it and just
- * wants us to allocate a buffer.
- */
- if (mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK)
- MemSet((char *) bufBlock, 0, BLCKSZ);
+ if (mode == RBM_ZERO_ON_ERROR)
+ flags = READ_BUFFERS_ZERO_ON_ERROR;
else
- {
- instr_time io_start = pgstat_prepare_io_time(track_io_timing);
+ flags = 0;
+ operation.smgr = smgr;
+ operation.rel = rel;
+ operation.smgr_persistence = smgr_persistence;
+ operation.forknum = forkNum;
+ operation.strategy = strategy;
+ if (StartReadBuffer(&operation,
+ &buffer,
+ blockNum,
+ flags))
+ WaitReadBuffers(&operation);
+
+ return buffer;
+}
+
+static pg_attribute_always_inline bool
+StartReadBuffersImpl(ReadBuffersOperation *operation,
+ Buffer *buffers,
+ BlockNumber blockNum,
+ int *nblocks,
+ int flags)
+{
+ int actual_nblocks = *nblocks;
+ int io_buffers_len = 0;
- smgrread(smgr, forkNum, blockNum, bufBlock);
+ Assert(*nblocks > 0);
+ Assert(*nblocks <= MAX_IO_COMBINE_LIMIT);
- pgstat_count_io_op_time(io_object, io_context,
- IOOP_READ, io_start, 1);
+ for (int i = 0; i < actual_nblocks; ++i)
+ {
+ bool found;
- /* check for garbage data */
- if (!PageIsVerifiedExtended((Page) bufBlock, blockNum,
- PIV_LOG_WARNING | PIV_REPORT_STAT))
+ buffers[i] = PinBufferForBlock(operation->rel,
+ operation->smgr,
+ operation->smgr_persistence,
+ operation->forknum,
+ blockNum + i,
+ operation->strategy,
+ &found);
+
+ if (found)
{
- if (mode == RBM_ZERO_ON_ERROR || zero_damaged_pages)
- {
- ereport(WARNING,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s; zeroing out page",
- blockNum,
- relpath(smgr->smgr_rlocator, forkNum))));
- MemSet((char *) bufBlock, 0, BLCKSZ);
- }
- else
- ereport(ERROR,
- (errcode(ERRCODE_DATA_CORRUPTED),
- errmsg("invalid page in block %u of relation %s",
- blockNum,
- relpath(smgr->smgr_rlocator, forkNum))));
+ /*
+ * Terminate the read as soon as we get a hit. It could be a
+ * single buffer hit, or it could be a hit that follows a readable
+ * range. We don't want to create more than one readable range,
+ * so we stop here.
+ */
+ actual_nblocks = i + 1;
+ break;
+ }
+ else
+ {
+ /* Extend the readable range to cover this block. */
+ io_buffers_len++;
}
}
+ *nblocks = actual_nblocks;
- /*
- * In RBM_ZERO_AND_LOCK / RBM_ZERO_AND_CLEANUP_LOCK mode, grab the buffer
- * content lock before marking the page as valid, to make sure that no
- * other backend sees the zeroed page before the caller has had a chance
- * to initialize it.
- *
- * Since no-one else can be looking at the page contents yet, there is no
- * difference between an exclusive lock and a cleanup-strength lock. (Note
- * that we cannot use LockBuffer() or LockBufferForCleanup() here, because
- * they assert that the buffer is already valid.)
- */
- if ((mode == RBM_ZERO_AND_LOCK || mode == RBM_ZERO_AND_CLEANUP_LOCK) &&
- !isLocalBuf)
+ if (likely(io_buffers_len == 0))
+ return false;
+
+ /* Populate information needed for I/O. */
+ operation->buffers = buffers;
+ operation->blocknum = blockNum;
+ operation->flags = flags;
+ operation->nblocks = actual_nblocks;
+ operation->io_buffers_len = io_buffers_len;
+
+ if (flags & READ_BUFFERS_ISSUE_ADVICE)
{
- LWLockAcquire(BufferDescriptorGetContentLock(bufHdr), LW_EXCLUSIVE);
+ /*
+ * In theory we should only do this if PinBufferForBlock() had to
+ * allocate new buffers above. That way, if two calls to
+ * StartReadBuffers() were made for the same blocks before
+ * WaitReadBuffers(), only the first would issue the advice. That'd be
+ * a better simulation of true asynchronous I/O, which would only
+ * start the I/O once, but isn't done here for simplicity. Note also
+ * that the following call might actually issue two advice calls if we
+ * cross a segment boundary; in a true asynchronous version we might
+ * choose to process only one real I/O at a time in that case.
+ */
+ smgrprefetch(operation->smgr,
+ operation->forknum,
+ blockNum,
+ operation->io_buffers_len);
}
- if (isLocalBuf)
+ /* Indicate that WaitReadBuffers() should be called. */
+ return true;
+}
+
+/*
+ * Begin reading a range of *nblocks blocks starting at blockNum. On return,
+ * up to *nblocks pinned buffers holding those blocks
+ * are written into the buffers array, and *nblocks is updated to contain the
+ * actual number, which may be fewer than requested. Caller sets some of the
+ * members of operation; see struct definition.
+ *
+ * If false is returned, no I/O is necessary. If true is returned, one I/O
+ * has been started, and WaitReadBuffers() must be called with the same
+ * operation object before the buffers are accessed. Along with the operation
+ * object, the caller-supplied array of buffers must remain valid until
+ * WaitReadBuffers() is called.
+ *
+ * Currently the I/O is only started with optional operating system advice if
+ * requested by the caller with READ_BUFFERS_ISSUE_ADVICE, and the real I/O
+ * happens synchronously in WaitReadBuffers(). In future work, true I/O could
+ * be initiated here.
+ */
+bool
+StartReadBuffers(ReadBuffersOperation *operation,
+ Buffer *buffers,
+ BlockNumber blockNum,
+ int *nblocks,
+ int flags)
+{
+ return StartReadBuffersImpl(operation, buffers, blockNum, nblocks, flags);
+}
+
+/*
+ * Single block version of StartReadBuffers(). This might save a few
+ * instructions when called from another translation unit, because it is
+ * specialized for nblocks == 1.
+ */
+bool
+StartReadBuffer(ReadBuffersOperation *operation,
+ Buffer *buffer,
+ BlockNumber blocknum,
+ int flags)
+{
+ int nblocks = 1;
+ bool result;
+
+ result = StartReadBuffersImpl(operation, buffer, blocknum, &nblocks, flags);
+ Assert(nblocks == 1); /* single block can't be short */
+
+ return result;
+}
+
+static inline bool
+WaitReadBuffersCanStartIO(Buffer buffer, bool nowait)
+{
+ if (BufferIsLocal(buffer))
{
- /* Only need to adjust flags */
- uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+ BufferDesc *bufHdr = GetLocalBufferDescriptor(-buffer - 1);
- buf_state |= BM_VALID;
- pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ return (pg_atomic_read_u32(&bufHdr->state) & BM_VALID) == 0;
}
else
+ return StartBufferIO(GetBufferDescriptor(buffer - 1), true, nowait);
+}
+
+void
+WaitReadBuffers(ReadBuffersOperation *operation)
+{
+ Buffer *buffers;
+ int nblocks;
+ BlockNumber blocknum;
+ ForkNumber forknum;
+ IOContext io_context;
+ IOObject io_object;
+ char persistence;
+
+ /*
+ * Currently operations are only allowed to include a read of some range,
+ * with an optional extra buffer that is already pinned at the end. So
+ * nblocks can be at most one more than io_buffers_len.
+ */
+ Assert((operation->nblocks == operation->io_buffers_len) ||
+ (operation->nblocks == operation->io_buffers_len + 1));
+
+ /* Find the range of the physical read we need to perform. */
+ nblocks = operation->io_buffers_len;
+ if (nblocks == 0)
+ return; /* nothing to do */
+
+ buffers = &operation->buffers[0];
+ blocknum = operation->blocknum;
+ forknum = operation->forknum;
+
+ persistence = operation->rel
+ ? operation->rel->rd_rel->relpersistence
+ : RELPERSISTENCE_PERMANENT;
+ if (persistence == RELPERSISTENCE_TEMP)
{
- /* Set BM_VALID, terminate IO, and wake up any waiters */
- TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ io_context = IOCONTEXT_NORMAL;
+ io_object = IOOBJECT_TEMP_RELATION;
+ }
+ else
+ {
+ io_context = IOContextForStrategy(operation->strategy);
+ io_object = IOOBJECT_RELATION;
}
- VacuumPageMiss++;
- if (VacuumCostActive)
- VacuumCostBalance += VacuumCostPageMiss;
+ /*
+ * We count all these blocks as read by this backend. This is traditional
+ * behavior, but might turn out not to be true if we find that someone
+ * else has beaten us and completed the read of some of these blocks. In
+ * that case the system globally double-counts, but we traditionally don't
+ * count this as a "hit", and we don't have a separate counter for "miss,
+ * but another backend completed the read".
+ */
+ if (persistence == RELPERSISTENCE_TEMP)
+ pgBufferUsage.local_blks_read += nblocks;
+ else
+ pgBufferUsage.shared_blks_read += nblocks;
- TRACE_POSTGRESQL_BUFFER_READ_DONE(forkNum, blockNum,
- smgr->smgr_rlocator.locator.spcOid,
- smgr->smgr_rlocator.locator.dbOid,
- smgr->smgr_rlocator.locator.relNumber,
- smgr->smgr_rlocator.backend,
- found);
+ for (int i = 0; i < nblocks; ++i)
+ {
+ int io_buffers_len;
+ Buffer io_buffers[MAX_IO_COMBINE_LIMIT];
+ void *io_pages[MAX_IO_COMBINE_LIMIT];
+ instr_time io_start;
+ BlockNumber io_first_block;
- return BufferDescriptorGetBuffer(bufHdr);
+ /*
+ * Skip this block if someone else has already completed it. If an
+ * I/O is already in progress in another backend, this will wait for
+ * the outcome: either done, or something went wrong and we will
+ * retry.
+ */
+ if (!WaitReadBuffersCanStartIO(buffers[i], false))
+ {
+ /*
+ * Report this as a 'hit' for this backend, even though it must
+ * have started out as a miss in PinBufferForBlock().
+ */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, blocknum + i,
+ operation->smgr->smgr_rlocator.locator.spcOid,
+ operation->smgr->smgr_rlocator.locator.dbOid,
+ operation->smgr->smgr_rlocator.locator.relNumber,
+ operation->smgr->smgr_rlocator.backend,
+ true);
+ continue;
+ }
+
+ /* We found a buffer that we need to read in. */
+ io_buffers[0] = buffers[i];
+ io_pages[0] = BufferGetBlock(buffers[i]);
+ io_first_block = blocknum + i;
+ io_buffers_len = 1;
+
+ /*
+ * How many neighboring-on-disk blocks can we scatter-read into
+ * other buffers at the same time? In this case we don't wait if we
+ * see an I/O already in progress. We already hold BM_IO_IN_PROGRESS
+ * for the head block, so we should get on with that I/O as soon as
+ * possible. We'll come back to this block again in the outer loop.
+ */
+ while ((i + 1) < nblocks &&
+ WaitReadBuffersCanStartIO(buffers[i + 1], true))
+ {
+ /* Must be consecutive block numbers. */
+ Assert(BufferGetBlockNumber(buffers[i + 1]) ==
+ BufferGetBlockNumber(buffers[i]) + 1);
+
+ io_buffers[io_buffers_len] = buffers[++i];
+ io_pages[io_buffers_len++] = BufferGetBlock(buffers[i]);
+ }
+
+ io_start = pgstat_prepare_io_time(track_io_timing);
+ smgrreadv(operation->smgr, forknum, io_first_block, io_pages, io_buffers_len);
+ pgstat_count_io_op_time(io_object, io_context, IOOP_READ, io_start,
+ io_buffers_len);
+
+ /* Verify each block we read, and terminate the I/O. */
+ for (int j = 0; j < io_buffers_len; ++j)
+ {
+ BufferDesc *bufHdr;
+ Block bufBlock;
+
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ bufHdr = GetLocalBufferDescriptor(-io_buffers[j] - 1);
+ bufBlock = LocalBufHdrGetBlock(bufHdr);
+ }
+ else
+ {
+ bufHdr = GetBufferDescriptor(io_buffers[j] - 1);
+ bufBlock = BufHdrGetBlock(bufHdr);
+ }
+
+ /* check for garbage data */
+ if (!PageIsVerifiedExtended((Page) bufBlock, io_first_block + j,
+ PIV_LOG_WARNING | PIV_REPORT_STAT))
+ {
+ if ((operation->flags & READ_BUFFERS_ZERO_ON_ERROR) || zero_damaged_pages)
+ {
+ ereport(WARNING,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s; zeroing out page",
+ io_first_block + j,
+ relpath(operation->smgr->smgr_rlocator, forknum))));
+ memset(bufBlock, 0, BLCKSZ);
+ }
+ else
+ ereport(ERROR,
+ (errcode(ERRCODE_DATA_CORRUPTED),
+ errmsg("invalid page in block %u of relation %s",
+ io_first_block + j,
+ relpath(operation->smgr->smgr_rlocator, forknum))));
+ }
+
+ /* Terminate I/O and set BM_VALID. */
+ if (persistence == RELPERSISTENCE_TEMP)
+ {
+ uint32 buf_state = pg_atomic_read_u32(&bufHdr->state);
+
+ buf_state |= BM_VALID;
+ pg_atomic_unlocked_write_u32(&bufHdr->state, buf_state);
+ }
+ else
+ {
+ /* Set BM_VALID, terminate IO, and wake up any waiters */
+ TerminateBufferIO(bufHdr, false, BM_VALID, true);
+ }
+
+ /* Report I/Os as completing individually. */
+ TRACE_POSTGRESQL_BUFFER_READ_DONE(forknum, io_first_block + j,
+ operation->smgr->smgr_rlocator.locator.spcOid,
+ operation->smgr->smgr_rlocator.locator.dbOid,
+ operation->smgr->smgr_rlocator.locator.relNumber,
+ operation->smgr->smgr_rlocator.backend,
+ false);
+ }
+
+ VacuumPageMiss += io_buffers_len;
+ if (VacuumCostActive)
+ VacuumCostBalance += VacuumCostPageMiss * io_buffers_len;
+ }
}
/*
- * BufferAlloc -- subroutine for ReadBuffer. Handles lookup of a shared
- * buffer. If no buffer exists already, selects a replacement
- * victim and evicts the old page, but does NOT read in new page.
+ * BufferAlloc -- subroutine for PinBufferForBlock. Handles lookup of a shared
+ * buffer. If no buffer exists already, selects a replacement victim and
+ * evicts the old page, but does NOT read in new page.
*
* "strategy" can be a buffer replacement strategy object, or NULL for
* the default strategy. The selected buffer's usage_count is advanced when
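Taken together, the hunk above lets a caller request a short contiguous range and have cache hits, combined physical reads, and optional read-ahead advice handled for it. A hedged usage sketch (rel, forknum and first_block are placeholders; the advice flag is optional):

	Buffer		buffers[MAX_IO_COMBINE_LIMIT];
	ReadBuffersOperation op;
	int			nblocks = Min(8, io_combine_limit);	/* ask for up to 8 */

	op.rel = rel;
	op.smgr = RelationGetSmgr(rel);
	op.smgr_persistence = 0;
	op.forknum = forknum;
	op.strategy = NULL;

	if (StartReadBuffers(&op, buffers, first_block, &nblocks,
						 READ_BUFFERS_ISSUE_ADVICE))
		WaitReadBuffers(&op);

	/*
	 * nblocks may have been reduced: the range is cut off at the first
	 * cache hit so that at most one readable range is created.
	 */
	for (int i = 0; i < nblocks; i++)
		ReleaseBuffer(buffers[i]);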
@@ -1223,11 +1536,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*
* The returned buffer is pinned and is already marked as holding the
* desired page. If it already did have the desired page, *foundPtr is
- * set true. Otherwise, *foundPtr is set false and the buffer is marked
- * as IO_IN_PROGRESS; ReadBuffer will now need to do I/O to fill it.
- *
- * *foundPtr is actually redundant with the buffer's BM_VALID flag, but
- * we keep it for simplicity in ReadBuffer.
+ * set true. Otherwise, *foundPtr is set false.
*
* io_context is passed as an output parameter to avoid calling
* IOContextForStrategy() when there is a shared buffers hit and no IO
@@ -1235,7 +1544,7 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
*
* No locks are held either at entry or exit.
*/
-static BufferDesc *
+static pg_attribute_always_inline BufferDesc *
BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
BlockNumber blockNum,
BufferAccessStrategy strategy,
@@ -1286,19 +1595,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
/*
* We can only get here if (a) someone else is still reading in
- * the page, or (b) a previous read attempt failed. We have to
- * wait for any active read attempt to finish, and then set up our
- * own read attempt if the page is still not BM_VALID.
- * StartBufferIO does it all.
+ * the page, (b) a previous read attempt failed, or (c) someone
+ * called StartReadBuffers() but not yet WaitReadBuffers().
*/
- if (StartBufferIO(buf, true))
- {
- /*
- * If we get here, previous attempts to read the buffer must
- * have failed ... but we shall bravely try again.
- */
- *foundPtr = false;
- }
+ *foundPtr = false;
}
return buf;
@@ -1363,19 +1663,10 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
{
/*
* We can only get here if (a) someone else is still reading in
- * the page, or (b) a previous read attempt failed. We have to
- * wait for any active read attempt to finish, and then set up our
- * own read attempt if the page is still not BM_VALID.
- * StartBufferIO does it all.
+ * the page, (b) a previous read attempt failed, or (c) someone
+ * called StartReadBuffers() but not yet WaitReadBuffers().
*/
- if (StartBufferIO(existing_buf_hdr, true))
- {
- /*
- * If we get here, previous attempts to read the buffer must
- * have failed ... but we shall bravely try again.
- */
- *foundPtr = false;
- }
+ *foundPtr = false;
}
return existing_buf_hdr;
@@ -1407,15 +1698,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
LWLockRelease(newPartitionLock);
/*
- * Buffer contents are currently invalid. Try to obtain the right to
- * start I/O. If StartBufferIO returns false, then someone else managed
- * to read it before we did, so there's nothing left for BufferAlloc() to
- * do.
+ * Buffer contents are currently invalid.
*/
- if (StartBufferIO(victim_buf_hdr, true))
- *foundPtr = false;
- else
- *foundPtr = true;
+ *foundPtr = false;
return victim_buf_hdr;
}
@@ -1769,7 +2054,7 @@ again:
* pessimistic, but outside of toy-sized shared_buffers it should allow
* sufficient pins.
*/
-static void
+void
LimitAdditionalPins(uint32 *additional_pins)
{
uint32 max_backends;
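LimitAdditionalPins() loses its static qualifier here, presumably so that external callers of the new multi-block API can size a batch by the number of extra pins they may fairly take. A sketch of the intended pattern (desired is a placeholder):

	uint32		desired = 8;		/* additional pins we would like */

	LimitAdditionalPins(&desired);	/* clamp to this backend's fair share */
	/* ... pin at most 'desired' more buffers ... */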
@@ -2034,7 +2319,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
buf_state &= ~BM_VALID;
UnlockBufHdr(existing_hdr, buf_state);
- } while (!StartBufferIO(existing_hdr, true));
+ } while (!StartBufferIO(existing_hdr, true, false));
}
else
{
@@ -2057,7 +2342,7 @@ ExtendBufferedRelShared(BufferManagerRelation bmr,
LWLockRelease(partition_lock);
/* XXX: could combine the locked operations in it with the above */
- StartBufferIO(victim_buf_hdr, true);
+ StartBufferIO(victim_buf_hdr, true, false);
}
}
@@ -2372,7 +2657,12 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
else
{
/*
- * If we previously pinned the buffer, it must surely be valid.
+ * If we previously pinned the buffer, it is likely to be valid, but
+ * it may not be if StartReadBuffers() was called and
+ * WaitReadBuffers() hasn't been called yet. We'll check by loading
+ * the flags without locking. This is racy, but it's OK to return
+ * false spuriously: when WaitReadBuffers() calls StartBufferIO(),
+ * it'll see that it's now valid.
*
* Note: We deliberately avoid a Valgrind client request here.
* Individual access methods can optionally superimpose buffer page
@@ -2381,7 +2671,7 @@ PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy)
* that the buffer page is legitimately non-accessible here. We
* cannot meddle with that.
*/
- result = true;
+ result = (pg_atomic_read_u32(&buf->state) & BM_VALID) != 0;
}
ref->refcount++;
@@ -3449,7 +3739,7 @@ FlushBuffer(BufferDesc *buf, SMgrRelation reln, IOObject io_object,
* someone else flushed the buffer before we could, so we need not do
* anything.
*/
- if (!StartBufferIO(buf, false))
+ if (!StartBufferIO(buf, false, false))
return;
/* Setup error traceback support for ereport() */
@@ -5184,9 +5474,15 @@ WaitIO(BufferDesc *buf)
*
* Returns true if we successfully marked the buffer as I/O busy,
* false if someone else already did the work.
+ *
+ * If nowait is true, then we don't wait for an I/O to be finished by another
+ * backend. In that case, false indicates either that the I/O has already
+ * finished or that it is still in progress. This is useful for callers that want to
+ * find out if they can perform the I/O as part of a larger operation, without
+ * waiting for the answer or distinguishing the reasons why not.
*/
static bool
-StartBufferIO(BufferDesc *buf, bool forInput)
+StartBufferIO(BufferDesc *buf, bool forInput, bool nowait)
{
uint32 buf_state;
@@ -5199,6 +5495,8 @@ StartBufferIO(BufferDesc *buf, bool forInput)
if (!(buf_state & BM_IO_IN_PROGRESS))
break;
UnlockBufHdr(buf, buf_state);
+ if (nowait)
+ return false;
WaitIO(buf);
}