aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/storage/aio/read_stream.c65
1 files changed, 45 insertions, 20 deletions
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 175f8410baf..9c0163eda3e 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -133,6 +133,7 @@ struct ReadStream
/* Next expected block, for detecting sequential access. */
BlockNumber seq_blocknum;
+ BlockNumber seq_until_processed;
/* The read operation we are currently preparing. */
BlockNumber pending_read_blocknum;
@@ -238,7 +239,7 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
* distance to a level that prevents look-ahead until buffers are released.
*/
static bool
-read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
+read_stream_start_pending_read(ReadStream *stream)
{
bool need_wait;
int nblocks;
@@ -262,16 +263,32 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
else
Assert(stream->next_buffer_index == stream->oldest_buffer_index);
- /*
- * If advice hasn't been suppressed, this system supports it, and this
- * isn't a strictly sequential pattern, then we'll issue advice.
- */
- if (!suppress_advice &&
- stream->advice_enabled &&
- stream->pending_read_blocknum != stream->seq_blocknum)
- flags = READ_BUFFERS_ISSUE_ADVICE;
- else
- flags = 0;
+ /* Do we need to issue read-ahead advice? */
+ flags = 0;
+ if (stream->advice_enabled)
+ {
+ if (stream->pending_read_blocknum == stream->seq_blocknum)
+ {
+ /*
+ * Sequential: Issue advice until the preadv() calls have caught
+ * up with the first advice issued for this sequential region, and
+ * then stay of the way of the kernel's own read-ahead.
+ */
+ if (stream->seq_until_processed != InvalidBlockNumber)
+ flags = READ_BUFFERS_ISSUE_ADVICE;
+ }
+ else
+ {
+ /*
+ * Random jump: Note the starting location of a new potential
+ * sequential region and start issuing advice. Skip it this time
+ * if the preadv() follows immediately, eg first block in stream.
+ */
+ stream->seq_until_processed = stream->pending_read_blocknum;
+ if (stream->pinned_buffers > 0)
+ flags = READ_BUFFERS_ISSUE_ADVICE;
+ }
+ }
/* How many more buffers is this backend allowed? */
if (stream->temporary)
@@ -360,7 +377,7 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
}
static void
-read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
+read_stream_look_ahead(ReadStream *stream)
{
while (stream->ios_in_progress < stream->max_ios &&
stream->pinned_buffers + stream->pending_read_nblocks < stream->distance)
@@ -371,8 +388,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
if (stream->pending_read_nblocks == stream->io_combine_limit)
{
- read_stream_start_pending_read(stream, suppress_advice);
- suppress_advice = false;
+ read_stream_start_pending_read(stream);
continue;
}
@@ -405,15 +421,13 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
/* We have to start the pending read before we can build another. */
while (stream->pending_read_nblocks > 0)
{
- if (!read_stream_start_pending_read(stream, suppress_advice) ||
+ if (!read_stream_start_pending_read(stream) ||
stream->ios_in_progress == stream->max_ios)
{
/* We've hit the buffer or I/O limit. Rewind and stop here. */
read_stream_unget_block(stream, blocknum);
return;
}
-
- suppress_advice = false;
}
/* This is the start of a new pending read. */
@@ -437,7 +451,7 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
stream->pinned_buffers == 0) ||
stream->distance == 0) &&
stream->ios_in_progress < stream->max_ios)
- read_stream_start_pending_read(stream, suppress_advice);
+ read_stream_start_pending_read(stream);
/*
* There should always be something pinned when we leave this function,
@@ -613,6 +627,8 @@ read_stream_begin_impl(int flags,
stream->callback = callback;
stream->callback_private_data = callback_private_data;
stream->buffered_blocknum = InvalidBlockNumber;
+ stream->seq_blocknum = InvalidBlockNumber;
+ stream->seq_until_processed = InvalidBlockNumber;
stream->temporary = SmgrIsTemp(smgr);
/*
@@ -793,7 +809,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
* space for more, but if we're just starting up we'll need to crank
* the handle to get started.
*/
- read_stream_look_ahead(stream, true);
+ read_stream_look_ahead(stream);
/* End of stream reached? */
if (stream->pinned_buffers == 0)
@@ -854,6 +870,15 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
stream->distance = distance;
}
}
+
+ /*
+ * If we've reached the first block of a sequential region we're
+ * issuing advice for, cancel that until the next jump. The kernel
+ * will see the sequential preadv() pattern starting here.
+ */
+ if (stream->advice_enabled &&
+ stream->ios[io_index].op.blocknum == stream->seq_until_processed)
+ stream->seq_until_processed = InvalidBlockNumber;
}
#ifdef CLOBBER_FREED_MEMORY
@@ -899,7 +924,7 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
stream->oldest_buffer_index = 0;
/* Prepare for the next call. */
- read_stream_look_ahead(stream, false);
+ read_stream_look_ahead(stream);
#ifndef READ_STREAM_DISABLE_FAST_PATH
/* See if we can take the fast path for all-cached scans next time. */