aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlogprefetch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlogprefetch.c')
-rw-r--r--src/backend/access/transam/xlogprefetch.c922
1 files changed, 922 insertions, 0 deletions
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c
new file mode 100644
index 00000000000..28764326bcc
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetch.c
@@ -0,0 +1,922 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.c
+ * Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogprefetch.c
+ *
+ * The goal of this module is to read future WAL records and issue
+ * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
+ * stalls in the main recovery loop.
+ *
+ * When examining a WAL record from the future, we need to consider that a
+ * referenced block or segment file might not exist on disk until this record
+ * or some earlier record has been replayed. After a crash, a file might also
+ * be missing because it was dropped by a later WAL record; in that case, it
+ * will be recreated when this record is replayed. These cases are handled by
+ * recognizing them and adding a "filter" that prevents all prefetching of a
+ * certain block range until the present WAL record has been replayed. Blocks
+ * skipped for these reasons are counted as "skip_new" (that is, cases where we
+ * didn't try to prefetch "new" blocks).
+ *
+ * Blocks found in the buffer pool already are counted as "skip_hit".
+ * Repeated access to the same buffer is detected and skipped, and this is
+ * counted with "skip_seq". Blocks that were logged with FPWs are skipped if
+ * recovery_prefetch_fpw is off, since on most systems there will be no I/O
+ * stall; this is counted with "skip_fpw".
+ *
+ * The only way we currently have to know that an I/O initiated with
+ * PrefetchSharedBuffer() has that recovery will eventually call ReadBuffer(),
+ * and perform a synchronous read. Therefore, we track the number of
+ * potentially in-flight I/Os by using a circular buffer of LSNs. When it's
+ * full, we have to wait for recovery to replay records so that the queue
+ * depth can be reduced, before we can do any more prefetching. Ideally, this
+ * keeps us the right distance ahead to respect maintenance_io_concurrency.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetch.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log
+ * message that appears at the end of crash recovery. It's also used to send
+ * messages periodically to the stats collector, to save the counters on disk.
+ */
+#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
+
+/* GUCs */
+bool recovery_prefetch = false;
+bool recovery_prefetch_fpw = false;
+
+int XLogPrefetchReconfigureCount;
+
+/*
+ * A prefetcher object. There is at most one of these in existence at a time,
+ * recreated whenever there is a configuration change.
+ */
+struct XLogPrefetcher
+{
+ /* Reader and current reading state. */
+ XLogReaderState *reader;
+ DecodedXLogRecord *record;
+ int next_block_id;
+ bool shutdown;
+
+ /* Details of last prefetch to skip repeats and seq scans. */
+ SMgrRelation last_reln;
+ RelFileNode last_rnode;
+ BlockNumber last_blkno;
+
+ /* Online averages. */
+ uint64 samples;
+ double avg_queue_depth;
+ double avg_distance;
+ XLogRecPtr next_sample_lsn;
+
+ /* Book-keeping required to avoid accessing non-existing blocks. */
+ HTAB *filter_table;
+ dlist_head filter_queue;
+
+ /* Book-keeping required to limit concurrent prefetches. */
+ int prefetch_head;
+ int prefetch_tail;
+ int prefetch_queue_size;
+ XLogRecPtr prefetch_queue[MAX_IO_CONCURRENCY + 1];
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+ RelFileNode rnode;
+ XLogRecPtr filter_until_replayed;
+ BlockNumber filter_from_block;
+ dlist_node link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_prefetch_recovery.
+ */
+typedef struct XLogPrefetchStats
+{
+ pg_atomic_uint64 reset_time; /* Time of last reset. */
+ pg_atomic_uint64 prefetch; /* Prefetches initiated. */
+ pg_atomic_uint64 skip_hit; /* Blocks already buffered. */
+ pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
+ pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
+ pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */
+ float avg_distance;
+ float avg_queue_depth;
+
+ /* Reset counters */
+ pg_atomic_uint32 reset_request;
+ uint32 reset_handled;
+
+ /* Dynamic values */
+ int distance; /* Number of bytes ahead in the WAL. */
+ int queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno,
+ XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+static bool XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
+static void XLogPrefetchSaveStats(void);
+static void XLogPrefetchRestoreStats(void);
+
+static XLogPrefetchStats *SharedStats;
+
+size_t
+XLogPrefetchShmemSize(void)
+{
+ return sizeof(XLogPrefetchStats);
+}
+
+static void
+XLogPrefetchResetStats(void)
+{
+ pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_write_u64(&SharedStats->prefetch, 0);
+ pg_atomic_write_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_write_u64(&SharedStats->skip_new, 0);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_write_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+}
+
+void
+XLogPrefetchShmemInit(void)
+{
+ bool found;
+
+ SharedStats = (XLogPrefetchStats *)
+ ShmemInitStruct("XLogPrefetchStats",
+ sizeof(XLogPrefetchStats),
+ &found);
+
+ if (!found)
+ {
+ pg_atomic_init_u32(&SharedStats->reset_request, 0);
+ SharedStats->reset_handled = 0;
+
+ pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_init_u64(&SharedStats->prefetch, 0);
+ pg_atomic_init_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_init_u64(&SharedStats->skip_new, 0);
+ pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_init_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+ SharedStats->distance = 0;
+ SharedStats->queue_depth = 0;
+ }
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+ XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Called by any backend to request that the stats be reset.
+ */
+void
+XLogPrefetchRequestResetStats(void)
+{
+ pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1);
+}
+
+/*
+ * Tell the stats collector to serialize the shared memory counters into the
+ * stats file.
+ */
+static void
+XLogPrefetchSaveStats(void)
+{
+ PgStat_RecoveryPrefetchStats serialized = {
+ .prefetch = pg_atomic_read_u64(&SharedStats->prefetch),
+ .skip_hit = pg_atomic_read_u64(&SharedStats->skip_hit),
+ .skip_new = pg_atomic_read_u64(&SharedStats->skip_new),
+ .skip_fpw = pg_atomic_read_u64(&SharedStats->skip_fpw),
+ .skip_seq = pg_atomic_read_u64(&SharedStats->skip_seq),
+ .stat_reset_timestamp = pg_atomic_read_u64(&SharedStats->reset_time)
+ };
+
+ pgstat_send_recoveryprefetch(&serialized);
+}
+
+/*
+ * Try to restore the shared memory counters from the stats file.
+ */
+static void
+XLogPrefetchRestoreStats(void)
+{
+ PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
+
+ if (serialized->stat_reset_timestamp != 0)
+ {
+ pg_atomic_write_u64(&SharedStats->prefetch, serialized->prefetch);
+ pg_atomic_write_u64(&SharedStats->skip_hit, serialized->skip_hit);
+ pg_atomic_write_u64(&SharedStats->skip_new, serialized->skip_new);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, serialized->skip_fpw);
+ pg_atomic_write_u64(&SharedStats->skip_seq, serialized->skip_seq);
+ pg_atomic_write_u64(&SharedStats->reset_time, serialized->stat_reset_timestamp);
+ }
+}
+
+/*
+ * Increment a counter in shared memory. This is equivalent to *counter++ on a
+ * plain uint64 without any memory barrier or locking, except on platforms
+ * where readers can't read uint64 without possibly observing a torn value.
+ */
+static inline void
+XLogPrefetchIncrement(pg_atomic_uint64 *counter)
+{
+ Assert(AmStartupProcess() || !IsUnderPostmaster);
+ pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+/*
+ * Initialize an XLogPrefetchState object and restore the last saved
+ * statistics from disk.
+ */
+void
+XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader)
+{
+ XLogPrefetchRestoreStats();
+
+ /* We'll reconfigure on the first call to XLogPrefetch(). */
+ state->reader = reader;
+ state->prefetcher = NULL;
+ state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+}
+
+/*
+ * Shut down the prefetching infrastructure, if configured.
+ */
+void
+XLogPrefetchEnd(XLogPrefetchState *state)
+{
+ XLogPrefetchSaveStats();
+
+ if (state->prefetcher)
+ XLogPrefetcherFree(state->prefetcher);
+ state->prefetcher = NULL;
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL records.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogReaderState *reader)
+{
+ XLogPrefetcher *prefetcher;
+ static HASHCTL hash_table_ctl = {
+ .keysize = sizeof(RelFileNode),
+ .entrysize = sizeof(XLogPrefetcherFilter)
+ };
+
+ /*
+ * The size of the queue is based on the maintenance_io_concurrency
+ * setting. In theory we might have a separate queue for each tablespace,
+ * but it's not clear how that should work, so for now we'll just use the
+ * general GUC to rate-limit all prefetching. The queue has space for up
+ * the highest possible value of the GUC + 1, because our circular buffer
+ * has a gap between head and tail when full.
+ */
+ prefetcher = palloc0(sizeof(XLogPrefetcher));
+ prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
+ prefetcher->reader = reader;
+ prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+ &hash_table_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ dlist_init(&prefetcher->filter_queue);
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+ /* Log final statistics. */
+ ereport(LOG,
+ (errmsg("recovery finished prefetching at %X/%X; "
+ "prefetch = " UINT64_FORMAT ", "
+ "skip_hit = " UINT64_FORMAT ", "
+ "skip_new = " UINT64_FORMAT ", "
+ "skip_fpw = " UINT64_FORMAT ", "
+ "skip_seq = " UINT64_FORMAT ", "
+ "avg_distance = %f, "
+ "avg_queue_depth = %f",
+ (uint32) (prefetcher->reader->EndRecPtr << 32),
+ (uint32) (prefetcher->reader->EndRecPtr),
+ pg_atomic_read_u64(&SharedStats->prefetch),
+ pg_atomic_read_u64(&SharedStats->skip_hit),
+ pg_atomic_read_u64(&SharedStats->skip_new),
+ pg_atomic_read_u64(&SharedStats->skip_fpw),
+ pg_atomic_read_u64(&SharedStats->skip_seq),
+ SharedStats->avg_distance,
+ SharedStats->avg_queue_depth)));
+ hash_destroy(prefetcher->filter_table);
+ pfree(prefetcher);
+}
+
+/*
+ * Called when recovery is replaying a new LSN, to check if we can read ahead.
+ */
+bool
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ uint32 reset_request;
+
+ /* If an error has occurred or we've hit the end of the WAL, do nothing. */
+ if (prefetcher->shutdown)
+ return false;
+
+ /*
+ * Have any in-flight prefetches definitely completed, judging by the LSN
+ * that is currently being replayed?
+ */
+ XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+ /*
+ * Do we already have the maximum permitted number of I/Os running
+ * (according to the information we have)? If so, we have to wait for at
+ * least one to complete, so give up early and let recovery catch up.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ return false;
+
+ /*
+ * Can we drop any filters yet? This happens when the LSN that is
+ * currently being replayed has moved past a record that prevents
+ * prefetching of a block range, such as relation extension.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+ /*
+ * Have we been asked to reset our stats counters? This is checked with
+ * an unsynchronized memory read, but we'll see it eventually and we'll be
+ * accessing that cache line anyway.
+ */
+ reset_request = pg_atomic_read_u32(&SharedStats->reset_request);
+ if (reset_request != SharedStats->reset_handled)
+ {
+ XLogPrefetchResetStats();
+ SharedStats->reset_handled = reset_request;
+
+ prefetcher->avg_distance = 0;
+ prefetcher->avg_queue_depth = 0;
+ prefetcher->samples = 0;
+ }
+
+ /* OK, we can now try reading ahead. */
+ return XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
+}
+
+/*
+ * Read ahead as far as we are allowed to, considering the LSN that recovery
+ * is currently replaying.
+ *
+ * Return true if the xlogreader would like more data.
+ */
+static bool
+XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ XLogReaderState *reader = prefetcher->reader;
+ DecodedXLogRecord *record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ for (;;)
+ {
+ char *error;
+ int64 distance;
+
+ /* If we don't already have a record, then try to read one. */
+ if (prefetcher->record == NULL)
+ {
+ switch (XLogReadAhead(reader, &record, &error))
+ {
+ case XLREAD_NEED_DATA:
+ return true;
+ case XLREAD_FAIL:
+ if (error)
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching: %s",
+ error)));
+ else
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching")));
+ prefetcher->shutdown = true;
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return false;
+ case XLREAD_FULL:
+ return false;
+ case XLREAD_SUCCESS:
+ prefetcher->record = record;
+ prefetcher->next_block_id = 0;
+ break;
+ }
+ }
+ else
+ {
+ /*
+ * We ran out of I/O queue while part way through a record. We'll
+ * carry on where we left off, according to next_block_id.
+ */
+ record = prefetcher->record;
+ }
+
+ /* How far ahead of replay are we now? */
+ distance = record->lsn - replaying_lsn;
+
+ /* Update distance shown in shm. */
+ SharedStats->distance = distance;
+
+ /* Periodically recompute some statistics. */
+ if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+ {
+ /* Compute online averages. */
+ prefetcher->samples++;
+ if (prefetcher->samples == 1)
+ {
+ prefetcher->avg_distance = SharedStats->distance;
+ prefetcher->avg_queue_depth = SharedStats->queue_depth;
+ }
+ else
+ {
+ prefetcher->avg_distance +=
+ (SharedStats->distance - prefetcher->avg_distance) /
+ prefetcher->samples;
+ prefetcher->avg_queue_depth +=
+ (SharedStats->queue_depth - prefetcher->avg_queue_depth) /
+ prefetcher->samples;
+ }
+
+ /* Expose it in shared memory. */
+ SharedStats->avg_distance = prefetcher->avg_distance;
+ SharedStats->avg_queue_depth = prefetcher->avg_queue_depth;
+
+ /* Also periodically save the simple counters. */
+ XLogPrefetchSaveStats();
+
+ prefetcher->next_sample_lsn =
+ replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
+ }
+
+ /* Are we not far enough ahead? */
+ if (distance <= 0)
+ {
+ /* XXX Is this still possible? */
+ prefetcher->record = NULL; /* skip this record */
+ continue;
+ }
+
+ /*
+ * If this is a record that creates a new SMGR relation, we'll avoid
+ * prefetching anything from that rnode until it has been replayed.
+ */
+ if (replaying_lsn < record->lsn &&
+ record->header.xl_rmid == RM_SMGR_ID &&
+ (record->header.xl_info & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+ {
+ xl_smgr_create *xlrec = (xl_smgr_create *) record->main_data;
+
+ XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, record->lsn);
+ }
+
+ /* Scan the record's block references. */
+ if (!XLogPrefetcherScanBlocks(prefetcher))
+ return false;
+
+ /* Advance to the next record. */
+ prefetcher->record = NULL;
+ }
+}
+
+/*
+ * Scan the current record for block references, and consider prefetching.
+ *
+ * Return true if we processed the current record to completion and still have
+ * queue space to process a new record, and false if we saturated the I/O
+ * queue and need to wait for recovery to advance before we continue.
+ */
+static bool
+XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
+{
+ DecodedXLogRecord *record = prefetcher->record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ /*
+ * We might already have been partway through processing this record when
+ * our queue became saturated, so we need to start where we left off.
+ */
+ for (int block_id = prefetcher->next_block_id;
+ block_id <= record->max_block_id;
+ ++block_id)
+ {
+ DecodedBkpBlock *block = &record->blocks[block_id];
+ PrefetchBufferResult prefetch;
+ SMgrRelation reln;
+
+ /* Ignore everything but the main fork for now. */
+ if (block->forknum != MAIN_FORKNUM)
+ continue;
+
+ /*
+ * If there is a full page image attached, we won't be reading the
+ * page, so you might think we should skip it. However, if the
+ * underlying filesystem uses larger logical blocks than us, it might
+ * still need to perform a read-before-write some time later.
+ * Therefore, only prefetch if configured to do so.
+ */
+ if (block->has_image && !recovery_prefetch_fpw)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_fpw);
+ continue;
+ }
+
+ /*
+ * If this block will initialize a new page then it's probably a
+ * relation extension. Since that might create a new segment, we
+ * can't try to prefetch this block until the record has been
+ * replayed, or we might try to open a file that doesn't exist yet.
+ */
+ if (block->flags & BKPBLOCK_WILL_INIT)
+ {
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Should we skip this block due to a filter? */
+ if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Fast path for repeated references to the same relation. */
+ if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+ {
+ /*
+ * If this is a repeat access to the same block, then skip it.
+ *
+ * XXX We could also check for last_blkno + 1 too, and also update
+ * last_blkno; it's not clear if the kernel would do a better job
+ * of sequential prefetching.
+ */
+ if (block->blkno == prefetcher->last_blkno)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_seq);
+ continue;
+ }
+
+ /* We can avoid calling smgropen(). */
+ reln = prefetcher->last_reln;
+ }
+ else
+ {
+ /* Otherwise we have to open it. */
+ reln = smgropen(block->rnode, InvalidBackendId);
+ prefetcher->last_rnode = block->rnode;
+ prefetcher->last_reln = reln;
+ }
+ prefetcher->last_blkno = block->blkno;
+
+ /* Try to prefetch this block! */
+ prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+ if (BufferIsValid(prefetch.recent_buffer))
+ {
+ /*
+ * It was already cached, so do nothing. We'll remember the
+ * buffer, so that recovery can try to avoid looking it up again.
+ */
+ block->recent_buffer = prefetch.recent_buffer;
+ XLogPrefetchIncrement(&SharedStats->skip_hit);
+ }
+ else if (prefetch.initiated_io)
+ {
+ /*
+ * I/O has possibly been initiated (though we don't know if it was
+ * already cached by the kernel, so we just have to assume that it
+ * has due to lack of better information). Record this as an I/O
+ * in progress until eventually we replay this LSN.
+ */
+ XLogPrefetchIncrement(&SharedStats->prefetch);
+ XLogPrefetcherInitiatedIO(prefetcher, record->lsn);
+
+ /*
+ * If the queue is now full, we'll have to wait before processing
+ * any more blocks from this record, or move to a new record if
+ * that was the last block.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ {
+ prefetcher->next_block_id = block_id + 1;
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * Neither cached nor initiated. The underlying segment file
+ * doesn't exist. Presumably it will be unlinked by a later WAL
+ * record. When recovery reads this block, it will use the
+ * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do
+ * that sort of thing while merely prefetching, so let's just
+ * ignore references to this relation until this record is
+ * replayed, and let recovery create the dummy file or complain if
+ * something is wrong.
+ */
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ }
+ }
+
+ return true;
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+ bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mod required, but it is not allowed in this context")));
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled)
+ {
+ /* There's an unhandled reset request, so just show NULLs */
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = true;
+ }
+ else
+ {
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = false;
+ }
+
+ values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
+ values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
+ values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_hit));
+ values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
+ values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
+ values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_seq));
+ values[6] = Int32GetDatum(SharedStats->distance);
+ values[7] = Int32GetDatum(SharedStats->queue_depth);
+ values[8] = Float4GetDatum(SharedStats->avg_distance);
+ values[9] = Float4GetDatum(SharedStats->avg_queue_depth);
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
+
+/*
+ * Compute (n + 1) % prefetch_queue_size, assuming n < prefetch_queue_size,
+ * without using division.
+ */
+static inline int
+XLogPrefetcherNext(XLogPrefetcher *prefetcher, int n)
+{
+ int next = n + 1;
+
+ return next == prefetcher->prefetch_queue_size ? 0 : next;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno, XLogRecPtr lsn)
+{
+ XLogPrefetcherFilter *filter;
+ bool found;
+
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+ if (!found)
+ {
+ /*
+ * Don't allow any prefetching of this block or higher until replayed.
+ */
+ filter->filter_until_replayed = lsn;
+ filter->filter_from_block = blockno;
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+ else
+ {
+ /*
+ * We were already filtering this rnode. Extend the filter's lifetime
+ * to cover this WAL record, but leave the (presumably lower) block
+ * number there because we don't want to have to track individual
+ * blocks.
+ */
+ filter->filter_until_replayed = lsn;
+ dlist_delete(&filter->link);
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range? That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+ link,
+ &prefetcher->filter_queue);
+
+ if (filter->filter_until_replayed >= replaying_lsn)
+ break;
+ dlist_delete(&filter->link);
+ hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno)
+{
+ /*
+ * Test for empty queue first, because we expect it to be empty most of
+ * the time and we can avoid the hash table lookup in that case.
+ */
+ if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+ HASH_FIND, NULL);
+
+ if (filter && filter->filter_from_block <= blockno)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Insert an LSN into the queue. The queue must not be full already. This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn)
+{
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+ prefetcher->prefetch_queue[prefetcher->prefetch_head] = prefetching_lsn;
+ prefetcher->prefetch_head =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+ SharedStats->queue_depth++;
+
+ Assert(SharedStats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet? That means that they're definitely finished, so we can can
+ * forget about them and allow ourselves to initiate more prefetches. For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+ prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+ {
+ prefetcher->prefetch_tail =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_tail);
+ SharedStats->queue_depth--;
+
+ Assert(SharedStats->queue_depth >= 0);
+ }
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+ int next = XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+
+ return next == prefetcher->prefetch_tail;
+}
+
+void
+assign_recovery_prefetch(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
+
+void
+assign_recovery_prefetch_fpw(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch_fpw = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}