aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/storage/aio/read_stream.c106
1 files changed, 90 insertions, 16 deletions
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c
index 36fb9fe152c..175f8410baf 100644
--- a/src/backend/storage/aio/read_stream.c
+++ b/src/backend/storage/aio/read_stream.c
@@ -116,6 +116,7 @@ struct ReadStream
int16 pinned_buffers;
int16 distance;
bool advice_enabled;
+ bool temporary;
/*
* One-block buffer to support 'ungetting' a block number, to resolve flow
@@ -213,8 +214,9 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data)
}
/*
- * In order to deal with short reads in StartReadBuffers(), we sometimes need
- * to defer handling of a block until later.
+ * In order to deal with buffer shortages and I/O limits after short reads, we
+ * sometimes need to defer handling of a block we've already consumed from the
+ * registered callback until later.
*/
static inline void
read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
@@ -225,7 +227,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum)
stream->buffered_blocknum = blocknum;
}
-static void
+/*
+ * Start as much of the current pending read as we can. If we have to split it
+ * because of the per-backend buffer limit, or the buffer manager decides to
+ * split it, then the pending read is adjusted to hold the remaining portion.
+ *
+ * We can always start a read of at least size one if we have no progress yet.
+ * Otherwise it's possible that we can't start a read at all because of a lack
+ * of buffers, and then false is returned. Buffer shortages also reduce the
+ * distance to a level that prevents look-ahead until buffers are released.
+ */
+static bool
read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
{
bool need_wait;
@@ -234,12 +246,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
int16 io_index;
int16 overflow;
int16 buffer_index;
+ int16 buffer_limit;
/* This should only be called with a pending read. */
Assert(stream->pending_read_nblocks > 0);
Assert(stream->pending_read_nblocks <= stream->io_combine_limit);
- /* We had better not exceed the pin limit by starting this read. */
+ /* We had better not exceed the per-stream buffer limit with this read. */
Assert(stream->pinned_buffers + stream->pending_read_nblocks <=
stream->max_pinned_buffers);
@@ -260,10 +273,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
else
flags = 0;
- /* We say how many blocks we want to read, but may be smaller on return. */
+ /* How many more buffers is this backend allowed? */
+ if (stream->temporary)
+ buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX);
+ else
+ buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX);
+ if (buffer_limit == 0 && stream->pinned_buffers == 0)
+ buffer_limit = 1; /* guarantee progress */
+
+ /* Does the per-backend limit affect this read? */
+ nblocks = stream->pending_read_nblocks;
+ if (buffer_limit < nblocks)
+ {
+ int16 new_distance;
+
+ /* Shrink distance: no more look-ahead until buffers are released. */
+ new_distance = stream->pinned_buffers + buffer_limit;
+ if (stream->distance > new_distance)
+ stream->distance = new_distance;
+
+ /* Unless we have nothing to give the consumer, stop here. */
+ if (stream->pinned_buffers > 0)
+ return false;
+
+ /* A short read is required to make progress. */
+ nblocks = buffer_limit;
+ }
+
+ /*
+ * We say how many blocks we want to read, but it may be smaller on return
+ * if the buffer manager decides to shorten the read.
+ */
buffer_index = stream->next_buffer_index;
io_index = stream->next_io_index;
- nblocks = stream->pending_read_nblocks;
need_wait = StartReadBuffers(&stream->ios[io_index].op,
&stream->buffers[buffer_index],
stream->pending_read_blocknum,
@@ -313,6 +355,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice)
/* Adjust the pending read to cover the remaining portion, if any. */
stream->pending_read_blocknum += nblocks;
stream->pending_read_nblocks -= nblocks;
+
+ return true;
}
static void
@@ -361,14 +405,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
/* We have to start the pending read before we can build another. */
while (stream->pending_read_nblocks > 0)
{
- read_stream_start_pending_read(stream, suppress_advice);
- suppress_advice = false;
- if (stream->ios_in_progress == stream->max_ios)
+ if (!read_stream_start_pending_read(stream, suppress_advice) ||
+ stream->ios_in_progress == stream->max_ios)
{
- /* And we've hit the limit. Rewind, and stop here. */
+ /* We've hit the buffer or I/O limit. Rewind and stop here. */
read_stream_unget_block(stream, blocknum);
return;
}
+
+ suppress_advice = false;
}
/* This is the start of a new pending read. */
@@ -382,15 +427,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice)
* io_combine_limit size once more buffers have been consumed. However,
* if we've already reached io_combine_limit, or we've reached the
* distance limit and there isn't anything pinned yet, or the callback has
- * signaled end-of-stream, we start the read immediately.
+ * signaled end-of-stream, we start the read immediately. Note that the
+ * pending read can exceed the distance goal, if the latter was reduced
+ * after hitting the per-backend buffer limit.
*/
if (stream->pending_read_nblocks > 0 &&
(stream->pending_read_nblocks == stream->io_combine_limit ||
- (stream->pending_read_nblocks == stream->distance &&
+ (stream->pending_read_nblocks >= stream->distance &&
stream->pinned_buffers == 0) ||
stream->distance == 0) &&
stream->ios_in_progress < stream->max_ios)
read_stream_start_pending_read(stream, suppress_advice);
+
+ /*
+ * There should always be something pinned when we leave this function,
+ * whether started by this call or not, unless we've hit the end of the
+ * stream. In the worst case we can always make progress one buffer at a
+ * time.
+ */
+ Assert(stream->pinned_buffers > 0 || stream->distance == 0);
}
/*
@@ -420,6 +475,7 @@ read_stream_begin_impl(int flags,
int max_ios;
int strategy_pin_limit;
uint32 max_pinned_buffers;
+ uint32 max_possible_buffer_limit;
Oid tablespace_id;
/*
@@ -475,12 +531,23 @@ read_stream_begin_impl(int flags,
strategy_pin_limit = GetAccessStrategyPinLimit(strategy);
max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers);
- /* Don't allow this backend to pin more than its share of buffers. */
+ /*
+ * Also limit our queue to the maximum number of pins we could ever be
+ * allowed to acquire according to the buffer manager. We may not really
+ * be able to use them all due to other pins held by this backend, but
+ * we'll check that later in read_stream_start_pending_read().
+ */
if (SmgrIsTemp(smgr))
- LimitAdditionalLocalPins(&max_pinned_buffers);
+ max_possible_buffer_limit = GetLocalPinLimit();
else
- LimitAdditionalPins(&max_pinned_buffers);
- Assert(max_pinned_buffers > 0);
+ max_possible_buffer_limit = GetPinLimit();
+ max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit);
+
+ /*
+ * The limit might be zero on a system configured with too few buffers for
+ * the number of connections. We need at least one to make progress.
+ */
+ max_pinned_buffers = Max(1, max_pinned_buffers);
/*
* We need one extra entry for buffers and per-buffer data, because users
@@ -546,6 +613,7 @@ read_stream_begin_impl(int flags,
stream->callback = callback;
stream->callback_private_data = callback_private_data;
stream->buffered_blocknum = InvalidBlockNumber;
+ stream->temporary = SmgrIsTemp(smgr);
/*
* Skip the initial ramp-up phase if the caller says we're going to be
@@ -674,6 +742,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data)
* arbitrary I/O entry (they're all free). We don't have to
* adjust pinned_buffers because we're transferring one to caller
* but pinning one more.
+ *
+ * In the fast path we don't need to check the pin limit. We're
+ * always allowed at least one pin so that progress can be made,
+ * and that's all we need here. Although two pins are momentarily
+ * held at the same time, the model used here is that the stream
+ * holds only one, and the other now belongs to the caller.
*/
if (likely(!StartReadBuffer(&stream->ios[0].op,
&stream->buffers[oldest_buffer_index],