diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/storage/aio/read_stream.c | 106 |
1 files changed, 90 insertions, 16 deletions
diff --git a/src/backend/storage/aio/read_stream.c b/src/backend/storage/aio/read_stream.c index 36fb9fe152c..175f8410baf 100644 --- a/src/backend/storage/aio/read_stream.c +++ b/src/backend/storage/aio/read_stream.c @@ -116,6 +116,7 @@ struct ReadStream int16 pinned_buffers; int16 distance; bool advice_enabled; + bool temporary; /* * One-block buffer to support 'ungetting' a block number, to resolve flow @@ -213,8 +214,9 @@ read_stream_get_block(ReadStream *stream, void *per_buffer_data) } /* - * In order to deal with short reads in StartReadBuffers(), we sometimes need - * to defer handling of a block until later. + * In order to deal with buffer shortages and I/O limits after short reads, we + * sometimes need to defer handling of a block we've already consumed from the + * registered callback until later. */ static inline void read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) @@ -225,7 +227,17 @@ read_stream_unget_block(ReadStream *stream, BlockNumber blocknum) stream->buffered_blocknum = blocknum; } -static void +/* + * Start as much of the current pending read as we can. If we have to split it + * because of the per-backend buffer limit, or the buffer manager decides to + * split it, then the pending read is adjusted to hold the remaining portion. + * + * We can always start a read of at least size one if we have no progress yet. + * Otherwise it's possible that we can't start a read at all because of a lack + * of buffers, and then false is returned. Buffer shortages also reduce the + * distance to a level that prevents look-ahead until buffers are released. + */ +static bool read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) { bool need_wait; @@ -234,12 +246,13 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) int16 io_index; int16 overflow; int16 buffer_index; + int16 buffer_limit; /* This should only be called with a pending read. */ Assert(stream->pending_read_nblocks > 0); Assert(stream->pending_read_nblocks <= stream->io_combine_limit); - /* We had better not exceed the pin limit by starting this read. */ + /* We had better not exceed the per-stream buffer limit with this read. */ Assert(stream->pinned_buffers + stream->pending_read_nblocks <= stream->max_pinned_buffers); @@ -260,10 +273,39 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) else flags = 0; - /* We say how many blocks we want to read, but may be smaller on return. */ + /* How many more buffers is this backend allowed? */ + if (stream->temporary) + buffer_limit = Min(GetAdditionalLocalPinLimit(), PG_INT16_MAX); + else + buffer_limit = Min(GetAdditionalPinLimit(), PG_INT16_MAX); + if (buffer_limit == 0 && stream->pinned_buffers == 0) + buffer_limit = 1; /* guarantee progress */ + + /* Does the per-backend limit affect this read? */ + nblocks = stream->pending_read_nblocks; + if (buffer_limit < nblocks) + { + int16 new_distance; + + /* Shrink distance: no more look-ahead until buffers are released. */ + new_distance = stream->pinned_buffers + buffer_limit; + if (stream->distance > new_distance) + stream->distance = new_distance; + + /* Unless we have nothing to give the consumer, stop here. */ + if (stream->pinned_buffers > 0) + return false; + + /* A short read is required to make progress. */ + nblocks = buffer_limit; + } + + /* + * We say how many blocks we want to read, but it may be smaller on return + * if the buffer manager decides to shorten the read. + */ buffer_index = stream->next_buffer_index; io_index = stream->next_io_index; - nblocks = stream->pending_read_nblocks; need_wait = StartReadBuffers(&stream->ios[io_index].op, &stream->buffers[buffer_index], stream->pending_read_blocknum, @@ -313,6 +355,8 @@ read_stream_start_pending_read(ReadStream *stream, bool suppress_advice) /* Adjust the pending read to cover the remaining portion, if any. */ stream->pending_read_blocknum += nblocks; stream->pending_read_nblocks -= nblocks; + + return true; } static void @@ -361,14 +405,15 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) /* We have to start the pending read before we can build another. */ while (stream->pending_read_nblocks > 0) { - read_stream_start_pending_read(stream, suppress_advice); - suppress_advice = false; - if (stream->ios_in_progress == stream->max_ios) + if (!read_stream_start_pending_read(stream, suppress_advice) || + stream->ios_in_progress == stream->max_ios) { - /* And we've hit the limit. Rewind, and stop here. */ + /* We've hit the buffer or I/O limit. Rewind and stop here. */ read_stream_unget_block(stream, blocknum); return; } + + suppress_advice = false; } /* This is the start of a new pending read. */ @@ -382,15 +427,25 @@ read_stream_look_ahead(ReadStream *stream, bool suppress_advice) * io_combine_limit size once more buffers have been consumed. However, * if we've already reached io_combine_limit, or we've reached the * distance limit and there isn't anything pinned yet, or the callback has - * signaled end-of-stream, we start the read immediately. + * signaled end-of-stream, we start the read immediately. Note that the + * pending read can exceed the distance goal, if the latter was reduced + * after hitting the per-backend buffer limit. */ if (stream->pending_read_nblocks > 0 && (stream->pending_read_nblocks == stream->io_combine_limit || - (stream->pending_read_nblocks == stream->distance && + (stream->pending_read_nblocks >= stream->distance && stream->pinned_buffers == 0) || stream->distance == 0) && stream->ios_in_progress < stream->max_ios) read_stream_start_pending_read(stream, suppress_advice); + + /* + * There should always be something pinned when we leave this function, + * whether started by this call or not, unless we've hit the end of the + * stream. In the worst case we can always make progress one buffer at a + * time. + */ + Assert(stream->pinned_buffers > 0 || stream->distance == 0); } /* @@ -420,6 +475,7 @@ read_stream_begin_impl(int flags, int max_ios; int strategy_pin_limit; uint32 max_pinned_buffers; + uint32 max_possible_buffer_limit; Oid tablespace_id; /* @@ -475,12 +531,23 @@ read_stream_begin_impl(int flags, strategy_pin_limit = GetAccessStrategyPinLimit(strategy); max_pinned_buffers = Min(strategy_pin_limit, max_pinned_buffers); - /* Don't allow this backend to pin more than its share of buffers. */ + /* + * Also limit our queue to the maximum number of pins we could ever be + * allowed to acquire according to the buffer manager. We may not really + * be able to use them all due to other pins held by this backend, but + * we'll check that later in read_stream_start_pending_read(). + */ if (SmgrIsTemp(smgr)) - LimitAdditionalLocalPins(&max_pinned_buffers); + max_possible_buffer_limit = GetLocalPinLimit(); else - LimitAdditionalPins(&max_pinned_buffers); - Assert(max_pinned_buffers > 0); + max_possible_buffer_limit = GetPinLimit(); + max_pinned_buffers = Min(max_pinned_buffers, max_possible_buffer_limit); + + /* + * The limit might be zero on a system configured with too few buffers for + * the number of connections. We need at least one to make progress. + */ + max_pinned_buffers = Max(1, max_pinned_buffers); /* * We need one extra entry for buffers and per-buffer data, because users @@ -546,6 +613,7 @@ read_stream_begin_impl(int flags, stream->callback = callback; stream->callback_private_data = callback_private_data; stream->buffered_blocknum = InvalidBlockNumber; + stream->temporary = SmgrIsTemp(smgr); /* * Skip the initial ramp-up phase if the caller says we're going to be @@ -674,6 +742,12 @@ read_stream_next_buffer(ReadStream *stream, void **per_buffer_data) * arbitrary I/O entry (they're all free). We don't have to * adjust pinned_buffers because we're transferring one to caller * but pinning one more. + * + * In the fast path we don't need to check the pin limit. We're + * always allowed at least one pin so that progress can be made, + * and that's all we need here. Although two pins are momentarily + * held at the same time, the model used here is that the stream + * holds only one, and the other now belongs to the caller. */ if (likely(!StartReadBuffer(&stream->ios[0].op, &stream->buffers[oldest_buffer_index], |