aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/Makefile1
-rw-r--r--src/backend/access/transam/xlog.c50
-rw-r--r--src/backend/access/transam/xlogprefetch.c922
-rw-r--r--src/backend/access/transam/xlogreader.c13
-rw-r--r--src/backend/access/transam/xlogutils.c23
-rw-r--r--src/backend/catalog/system_views.sql14
-rw-r--r--src/backend/postmaster/pgstat.c103
-rw-r--r--src/backend/storage/freespace/freespace.c3
-rw-r--r--src/backend/storage/ipc/ipci.c3
-rw-r--r--src/backend/utils/misc/guc.c51
-rw-r--r--src/backend/utils/misc/postgresql.conf.sample6
-rw-r--r--src/include/access/xlog.h1
-rw-r--r--src/include/access/xlogprefetch.h82
-rw-r--r--src/include/access/xlogreader.h7
-rw-r--r--src/include/access/xlogutils.h3
-rw-r--r--src/include/catalog/pg_proc.dat8
-rw-r--r--src/include/pgstat.h26
-rw-r--r--src/include/utils/guc.h4
-rw-r--r--src/test/regress/expected/rules.out11
-rw-r--r--src/tools/pgindent/typedefs.list4
20 files changed, 1318 insertions, 17 deletions
diff --git a/src/backend/access/transam/Makefile b/src/backend/access/transam/Makefile
index 595e02de722..39f9d4e77d4 100644
--- a/src/backend/access/transam/Makefile
+++ b/src/backend/access/transam/Makefile
@@ -31,6 +31,7 @@ OBJS = \
xlogarchive.o \
xlogfuncs.o \
xloginsert.o \
+ xlogprefetch.o \
xlogreader.o \
xlogutils.o
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 729fc5ff13c..adfc6f67e29 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -35,6 +35,7 @@
#include "access/xlog_internal.h"
#include "access/xlogarchive.h"
#include "access/xloginsert.h"
+#include "access/xlogprefetch.h"
#include "access/xlogreader.h"
#include "access/xlogutils.h"
#include "catalog/catversion.h"
@@ -110,6 +111,7 @@ int CommitDelay = 0; /* precommit delay in microseconds */
int CommitSiblings = 5; /* # concurrent xacts needed to sleep */
int wal_retrieve_retry_interval = 5000;
int max_slot_wal_keep_size_mb = -1;
+int wal_decode_buffer_size = 512 * 1024;
bool track_wal_io_timing = false;
#ifdef WAL_DEBUG
@@ -910,7 +912,8 @@ static int XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, int emode, XLogSource source);
static bool XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess);
+ bool fetching_ckpt, int emode, bool randAccess,
+ bool nowait);
static bool WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
bool fetching_ckpt,
XLogRecPtr tliRecPtr,
@@ -1461,7 +1464,7 @@ checkXLogConsistency(XLogReaderState *record)
* temporary page.
*/
buf = XLogReadBufferExtended(rnode, forknum, blkno,
- RBM_NORMAL_NO_LOG);
+ RBM_NORMAL_NO_LOG, InvalidBuffer);
if (!BufferIsValid(buf))
continue;
@@ -3729,7 +3732,6 @@ XLogFileRead(XLogSegNo segno, int emode, TimeLineID tli,
snprintf(activitymsg, sizeof(activitymsg), "waiting for %s",
xlogfname);
set_ps_display(activitymsg);
-
restoredFromArchive = RestoreArchivedFile(path, xlogfname,
"RECOVERYXLOG",
wal_segment_size,
@@ -4389,9 +4391,9 @@ ReadRecord(XLogReaderState *xlogreader, int emode,
while ((result = XLogReadRecord(xlogreader, &record, &errormsg))
== XLREAD_NEED_DATA)
{
- if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess))
+ if (!XLogPageRead(xlogreader, fetching_ckpt, emode, randAccess,
+ false /* wait for data if streaming */))
break;
-
}
ReadRecPtr = xlogreader->ReadRecPtr;
@@ -6633,6 +6635,12 @@ StartupXLOG(void)
xlogreader->system_identifier = ControlFile->system_identifier;
/*
+ * Set the WAL decode buffer size. This limits how far ahead we can read
+ * in the WAL.
+ */
+ XLogReaderSetDecodeBuffer(xlogreader, NULL, wal_decode_buffer_size);
+
+ /*
* Allocate two page buffers dedicated to WAL consistency checks. We do
* it this way, rather than just making static arrays, for two reasons:
* (1) no need to waste the storage in most instantiations of the backend;
@@ -7312,6 +7320,7 @@ StartupXLOG(void)
{
ErrorContextCallback errcallback;
TimestampTz xtime;
+ XLogPrefetchState prefetch;
PGRUsage ru0;
pg_rusage_init(&ru0);
@@ -7322,6 +7331,9 @@ StartupXLOG(void)
(errmsg("redo starts at %X/%X",
LSN_FORMAT_ARGS(ReadRecPtr))));
+ /* Prepare to prefetch, if configured. */
+ XLogPrefetchBegin(&prefetch, xlogreader);
+
/*
* main redo apply loop
*/
@@ -7351,6 +7363,14 @@ StartupXLOG(void)
/* Handle interrupt signals of startup process */
HandleStartupProcInterrupts();
+ /* Perform WAL prefetching, if enabled. */
+ while (XLogPrefetch(&prefetch, xlogreader->ReadRecPtr) == XLREAD_NEED_DATA)
+ {
+ if (!XLogPageRead(xlogreader, false, LOG, false,
+ true /* don't wait for streaming data */))
+ break;
+ }
+
/*
* Pause WAL replay, if requested by a hot-standby session via
* SetRecoveryPause().
@@ -7524,6 +7544,9 @@ StartupXLOG(void)
*/
if (AllowCascadeReplication())
WalSndWakeup();
+
+ /* Reset the prefetcher. */
+ XLogPrefetchReconfigure();
}
/* Exit loop if we reached inclusive recovery target */
@@ -7540,6 +7563,7 @@ StartupXLOG(void)
/*
* end of main redo apply loop
*/
+ XLogPrefetchEnd(&prefetch);
if (reachedRecoveryTarget)
{
@@ -12109,10 +12133,13 @@ CancelBackup(void)
* and call XLogPageRead() again with the same arguments. This lets
* XLogPageRead() to try fetching the record from another source, or to
* sleep and retry.
+ *
+ * If nowait is true, then return false immediately if the requested data isn't
+ * available yet.
*/
static bool
XLogPageRead(XLogReaderState *state,
- bool fetching_ckpt, int emode, bool randAccess)
+ bool fetching_ckpt, int emode, bool randAccess, bool nowait)
{
char *readBuf = state->readBuf;
XLogRecPtr targetPagePtr = state->readPagePtr;
@@ -12136,9 +12163,6 @@ XLogPageRead(XLogReaderState *state,
/*
* Request a restartpoint if we've replayed too much xlog since the
* last one.
- *
- * XXX Why is this here? Move it to recovery loop, since it's based
- * on replay position, not read position?
*/
if (bgwriterLaunched)
{
@@ -12163,6 +12187,12 @@ retry:
(readSource == XLOG_FROM_STREAM &&
flushedUpto < targetPagePtr + reqLen))
{
+ if (nowait)
+ {
+ XLogReaderSetInputData(state, -1);
+ return false;
+ }
+
if (!WaitForWALToBecomeAvailable(targetPagePtr + reqLen,
randAccess, fetching_ckpt,
targetRecPtr, state->seg.ws_segno))
@@ -12396,6 +12426,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
*/
currentSource = XLOG_FROM_STREAM;
startWalReceiver = true;
+ XLogPrefetchReconfigure();
break;
case XLOG_FROM_STREAM:
@@ -12651,6 +12682,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
else
havedata = false;
}
+
if (havedata)
{
/*
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c
new file mode 100644
index 00000000000..28764326bcc
--- /dev/null
+++ b/src/backend/access/transam/xlogprefetch.c
@@ -0,0 +1,922 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.c
+ * Prefetching support for recovery.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ *
+ * IDENTIFICATION
+ * src/backend/access/transam/xlogprefetch.c
+ *
+ * The goal of this module is to read future WAL records and issue
+ * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O
+ * stalls in the main recovery loop.
+ *
+ * When examining a WAL record from the future, we need to consider that a
+ * referenced block or segment file might not exist on disk until this record
+ * or some earlier record has been replayed. After a crash, a file might also
+ * be missing because it was dropped by a later WAL record; in that case, it
+ * will be recreated when this record is replayed. These cases are handled by
+ * recognizing them and adding a "filter" that prevents all prefetching of a
+ * certain block range until the present WAL record has been replayed. Blocks
+ * skipped for these reasons are counted as "skip_new" (that is, cases where we
+ * didn't try to prefetch "new" blocks).
+ *
+ * Blocks found in the buffer pool already are counted as "skip_hit".
+ * Repeated access to the same buffer is detected and skipped, and this is
+ * counted with "skip_seq". Blocks that were logged with FPWs are skipped if
+ * recovery_prefetch_fpw is off, since on most systems there will be no I/O
+ * stall; this is counted with "skip_fpw".
+ *
+ * The only way we currently have to know that an I/O initiated with
+ * PrefetchSharedBuffer() has that recovery will eventually call ReadBuffer(),
+ * and perform a synchronous read. Therefore, we track the number of
+ * potentially in-flight I/Os by using a circular buffer of LSNs. When it's
+ * full, we have to wait for recovery to replay records so that the queue
+ * depth can be reduced, before we can do any more prefetching. Ideally, this
+ * keeps us the right distance ahead to respect maintenance_io_concurrency.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xlog.h"
+#include "access/xlogprefetch.h"
+#include "access/xlogreader.h"
+#include "access/xlogutils.h"
+#include "catalog/storage_xlog.h"
+#include "utils/fmgrprotos.h"
+#include "utils/timestamp.h"
+#include "funcapi.h"
+#include "pgstat.h"
+#include "miscadmin.h"
+#include "port/atomics.h"
+#include "storage/bufmgr.h"
+#include "storage/shmem.h"
+#include "storage/smgr.h"
+#include "utils/guc.h"
+#include "utils/hsearch.h"
+
+/*
+ * Sample the queue depth and distance every time we replay this much WAL.
+ * This is used to compute avg_queue_depth and avg_distance for the log
+ * message that appears at the end of crash recovery. It's also used to send
+ * messages periodically to the stats collector, to save the counters on disk.
+ */
+#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000
+
+/* GUCs */
+bool recovery_prefetch = false;
+bool recovery_prefetch_fpw = false;
+
+int XLogPrefetchReconfigureCount;
+
+/*
+ * A prefetcher object. There is at most one of these in existence at a time,
+ * recreated whenever there is a configuration change.
+ */
+struct XLogPrefetcher
+{
+ /* Reader and current reading state. */
+ XLogReaderState *reader;
+ DecodedXLogRecord *record;
+ int next_block_id;
+ bool shutdown;
+
+ /* Details of last prefetch to skip repeats and seq scans. */
+ SMgrRelation last_reln;
+ RelFileNode last_rnode;
+ BlockNumber last_blkno;
+
+ /* Online averages. */
+ uint64 samples;
+ double avg_queue_depth;
+ double avg_distance;
+ XLogRecPtr next_sample_lsn;
+
+ /* Book-keeping required to avoid accessing non-existing blocks. */
+ HTAB *filter_table;
+ dlist_head filter_queue;
+
+ /* Book-keeping required to limit concurrent prefetches. */
+ int prefetch_head;
+ int prefetch_tail;
+ int prefetch_queue_size;
+ XLogRecPtr prefetch_queue[MAX_IO_CONCURRENCY + 1];
+};
+
+/*
+ * A temporary filter used to track block ranges that haven't been created
+ * yet, whole relations that haven't been created yet, and whole relations
+ * that we must assume have already been dropped.
+ */
+typedef struct XLogPrefetcherFilter
+{
+ RelFileNode rnode;
+ XLogRecPtr filter_until_replayed;
+ BlockNumber filter_from_block;
+ dlist_node link;
+} XLogPrefetcherFilter;
+
+/*
+ * Counters exposed in shared memory for pg_stat_prefetch_recovery.
+ */
+typedef struct XLogPrefetchStats
+{
+ pg_atomic_uint64 reset_time; /* Time of last reset. */
+ pg_atomic_uint64 prefetch; /* Prefetches initiated. */
+ pg_atomic_uint64 skip_hit; /* Blocks already buffered. */
+ pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */
+ pg_atomic_uint64 skip_fpw; /* FPWs skipped. */
+ pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */
+ float avg_distance;
+ float avg_queue_depth;
+
+ /* Reset counters */
+ pg_atomic_uint32 reset_request;
+ uint32 reset_handled;
+
+ /* Dynamic values */
+ int distance; /* Number of bytes ahead in the WAL. */
+ int queue_depth; /* Number of I/Os possibly in progress. */
+} XLogPrefetchStats;
+
+static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno,
+ XLogRecPtr lsn);
+static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher,
+ RelFileNode rnode,
+ BlockNumber blockno);
+static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn);
+static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher);
+static bool XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher,
+ XLogRecPtr replaying_lsn);
+static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher);
+static void XLogPrefetchSaveStats(void);
+static void XLogPrefetchRestoreStats(void);
+
+static XLogPrefetchStats *SharedStats;
+
+size_t
+XLogPrefetchShmemSize(void)
+{
+ return sizeof(XLogPrefetchStats);
+}
+
+static void
+XLogPrefetchResetStats(void)
+{
+ pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_write_u64(&SharedStats->prefetch, 0);
+ pg_atomic_write_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_write_u64(&SharedStats->skip_new, 0);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_write_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+}
+
+void
+XLogPrefetchShmemInit(void)
+{
+ bool found;
+
+ SharedStats = (XLogPrefetchStats *)
+ ShmemInitStruct("XLogPrefetchStats",
+ sizeof(XLogPrefetchStats),
+ &found);
+
+ if (!found)
+ {
+ pg_atomic_init_u32(&SharedStats->reset_request, 0);
+ SharedStats->reset_handled = 0;
+
+ pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp());
+ pg_atomic_init_u64(&SharedStats->prefetch, 0);
+ pg_atomic_init_u64(&SharedStats->skip_hit, 0);
+ pg_atomic_init_u64(&SharedStats->skip_new, 0);
+ pg_atomic_init_u64(&SharedStats->skip_fpw, 0);
+ pg_atomic_init_u64(&SharedStats->skip_seq, 0);
+ SharedStats->avg_distance = 0;
+ SharedStats->avg_queue_depth = 0;
+ SharedStats->distance = 0;
+ SharedStats->queue_depth = 0;
+ }
+}
+
+/*
+ * Called when any GUC is changed that affects prefetching.
+ */
+void
+XLogPrefetchReconfigure(void)
+{
+ XLogPrefetchReconfigureCount++;
+}
+
+/*
+ * Called by any backend to request that the stats be reset.
+ */
+void
+XLogPrefetchRequestResetStats(void)
+{
+ pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1);
+}
+
+/*
+ * Tell the stats collector to serialize the shared memory counters into the
+ * stats file.
+ */
+static void
+XLogPrefetchSaveStats(void)
+{
+ PgStat_RecoveryPrefetchStats serialized = {
+ .prefetch = pg_atomic_read_u64(&SharedStats->prefetch),
+ .skip_hit = pg_atomic_read_u64(&SharedStats->skip_hit),
+ .skip_new = pg_atomic_read_u64(&SharedStats->skip_new),
+ .skip_fpw = pg_atomic_read_u64(&SharedStats->skip_fpw),
+ .skip_seq = pg_atomic_read_u64(&SharedStats->skip_seq),
+ .stat_reset_timestamp = pg_atomic_read_u64(&SharedStats->reset_time)
+ };
+
+ pgstat_send_recoveryprefetch(&serialized);
+}
+
+/*
+ * Try to restore the shared memory counters from the stats file.
+ */
+static void
+XLogPrefetchRestoreStats(void)
+{
+ PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch();
+
+ if (serialized->stat_reset_timestamp != 0)
+ {
+ pg_atomic_write_u64(&SharedStats->prefetch, serialized->prefetch);
+ pg_atomic_write_u64(&SharedStats->skip_hit, serialized->skip_hit);
+ pg_atomic_write_u64(&SharedStats->skip_new, serialized->skip_new);
+ pg_atomic_write_u64(&SharedStats->skip_fpw, serialized->skip_fpw);
+ pg_atomic_write_u64(&SharedStats->skip_seq, serialized->skip_seq);
+ pg_atomic_write_u64(&SharedStats->reset_time, serialized->stat_reset_timestamp);
+ }
+}
+
+/*
+ * Increment a counter in shared memory. This is equivalent to *counter++ on a
+ * plain uint64 without any memory barrier or locking, except on platforms
+ * where readers can't read uint64 without possibly observing a torn value.
+ */
+static inline void
+XLogPrefetchIncrement(pg_atomic_uint64 *counter)
+{
+ Assert(AmStartupProcess() || !IsUnderPostmaster);
+ pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1);
+}
+
+/*
+ * Initialize an XLogPrefetchState object and restore the last saved
+ * statistics from disk.
+ */
+void
+XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader)
+{
+ XLogPrefetchRestoreStats();
+
+ /* We'll reconfigure on the first call to XLogPrefetch(). */
+ state->reader = reader;
+ state->prefetcher = NULL;
+ state->reconfigure_count = XLogPrefetchReconfigureCount - 1;
+}
+
+/*
+ * Shut down the prefetching infrastructure, if configured.
+ */
+void
+XLogPrefetchEnd(XLogPrefetchState *state)
+{
+ XLogPrefetchSaveStats();
+
+ if (state->prefetcher)
+ XLogPrefetcherFree(state->prefetcher);
+ state->prefetcher = NULL;
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+}
+
+/*
+ * Create a prefetcher that is ready to begin prefetching blocks referenced by
+ * WAL records.
+ */
+XLogPrefetcher *
+XLogPrefetcherAllocate(XLogReaderState *reader)
+{
+ XLogPrefetcher *prefetcher;
+ static HASHCTL hash_table_ctl = {
+ .keysize = sizeof(RelFileNode),
+ .entrysize = sizeof(XLogPrefetcherFilter)
+ };
+
+ /*
+ * The size of the queue is based on the maintenance_io_concurrency
+ * setting. In theory we might have a separate queue for each tablespace,
+ * but it's not clear how that should work, so for now we'll just use the
+ * general GUC to rate-limit all prefetching. The queue has space for up
+ * the highest possible value of the GUC + 1, because our circular buffer
+ * has a gap between head and tail when full.
+ */
+ prefetcher = palloc0(sizeof(XLogPrefetcher));
+ prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1;
+ prefetcher->reader = reader;
+ prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024,
+ &hash_table_ctl,
+ HASH_ELEM | HASH_BLOBS);
+ dlist_init(&prefetcher->filter_queue);
+
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return prefetcher;
+}
+
+/*
+ * Destroy a prefetcher and release all resources.
+ */
+void
+XLogPrefetcherFree(XLogPrefetcher *prefetcher)
+{
+ /* Log final statistics. */
+ ereport(LOG,
+ (errmsg("recovery finished prefetching at %X/%X; "
+ "prefetch = " UINT64_FORMAT ", "
+ "skip_hit = " UINT64_FORMAT ", "
+ "skip_new = " UINT64_FORMAT ", "
+ "skip_fpw = " UINT64_FORMAT ", "
+ "skip_seq = " UINT64_FORMAT ", "
+ "avg_distance = %f, "
+ "avg_queue_depth = %f",
+ (uint32) (prefetcher->reader->EndRecPtr << 32),
+ (uint32) (prefetcher->reader->EndRecPtr),
+ pg_atomic_read_u64(&SharedStats->prefetch),
+ pg_atomic_read_u64(&SharedStats->skip_hit),
+ pg_atomic_read_u64(&SharedStats->skip_new),
+ pg_atomic_read_u64(&SharedStats->skip_fpw),
+ pg_atomic_read_u64(&SharedStats->skip_seq),
+ SharedStats->avg_distance,
+ SharedStats->avg_queue_depth)));
+ hash_destroy(prefetcher->filter_table);
+ pfree(prefetcher);
+}
+
+/*
+ * Called when recovery is replaying a new LSN, to check if we can read ahead.
+ */
+bool
+XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ uint32 reset_request;
+
+ /* If an error has occurred or we've hit the end of the WAL, do nothing. */
+ if (prefetcher->shutdown)
+ return false;
+
+ /*
+ * Have any in-flight prefetches definitely completed, judging by the LSN
+ * that is currently being replayed?
+ */
+ XLogPrefetcherCompletedIO(prefetcher, replaying_lsn);
+
+ /*
+ * Do we already have the maximum permitted number of I/Os running
+ * (according to the information we have)? If so, we have to wait for at
+ * least one to complete, so give up early and let recovery catch up.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ return false;
+
+ /*
+ * Can we drop any filters yet? This happens when the LSN that is
+ * currently being replayed has moved past a record that prevents
+ * prefetching of a block range, such as relation extension.
+ */
+ XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn);
+
+ /*
+ * Have we been asked to reset our stats counters? This is checked with
+ * an unsynchronized memory read, but we'll see it eventually and we'll be
+ * accessing that cache line anyway.
+ */
+ reset_request = pg_atomic_read_u32(&SharedStats->reset_request);
+ if (reset_request != SharedStats->reset_handled)
+ {
+ XLogPrefetchResetStats();
+ SharedStats->reset_handled = reset_request;
+
+ prefetcher->avg_distance = 0;
+ prefetcher->avg_queue_depth = 0;
+ prefetcher->samples = 0;
+ }
+
+ /* OK, we can now try reading ahead. */
+ return XLogPrefetcherScanRecords(prefetcher, replaying_lsn);
+}
+
+/*
+ * Read ahead as far as we are allowed to, considering the LSN that recovery
+ * is currently replaying.
+ *
+ * Return true if the xlogreader would like more data.
+ */
+static bool
+XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ XLogReaderState *reader = prefetcher->reader;
+ DecodedXLogRecord *record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ for (;;)
+ {
+ char *error;
+ int64 distance;
+
+ /* If we don't already have a record, then try to read one. */
+ if (prefetcher->record == NULL)
+ {
+ switch (XLogReadAhead(reader, &record, &error))
+ {
+ case XLREAD_NEED_DATA:
+ return true;
+ case XLREAD_FAIL:
+ if (error)
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching: %s",
+ error)));
+ else
+ ereport(LOG,
+ (errmsg("recovery no longer prefetching")));
+ prefetcher->shutdown = true;
+ SharedStats->queue_depth = 0;
+ SharedStats->distance = 0;
+
+ return false;
+ case XLREAD_FULL:
+ return false;
+ case XLREAD_SUCCESS:
+ prefetcher->record = record;
+ prefetcher->next_block_id = 0;
+ break;
+ }
+ }
+ else
+ {
+ /*
+ * We ran out of I/O queue while part way through a record. We'll
+ * carry on where we left off, according to next_block_id.
+ */
+ record = prefetcher->record;
+ }
+
+ /* How far ahead of replay are we now? */
+ distance = record->lsn - replaying_lsn;
+
+ /* Update distance shown in shm. */
+ SharedStats->distance = distance;
+
+ /* Periodically recompute some statistics. */
+ if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn))
+ {
+ /* Compute online averages. */
+ prefetcher->samples++;
+ if (prefetcher->samples == 1)
+ {
+ prefetcher->avg_distance = SharedStats->distance;
+ prefetcher->avg_queue_depth = SharedStats->queue_depth;
+ }
+ else
+ {
+ prefetcher->avg_distance +=
+ (SharedStats->distance - prefetcher->avg_distance) /
+ prefetcher->samples;
+ prefetcher->avg_queue_depth +=
+ (SharedStats->queue_depth - prefetcher->avg_queue_depth) /
+ prefetcher->samples;
+ }
+
+ /* Expose it in shared memory. */
+ SharedStats->avg_distance = prefetcher->avg_distance;
+ SharedStats->avg_queue_depth = prefetcher->avg_queue_depth;
+
+ /* Also periodically save the simple counters. */
+ XLogPrefetchSaveStats();
+
+ prefetcher->next_sample_lsn =
+ replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE;
+ }
+
+ /* Are we not far enough ahead? */
+ if (distance <= 0)
+ {
+ /* XXX Is this still possible? */
+ prefetcher->record = NULL; /* skip this record */
+ continue;
+ }
+
+ /*
+ * If this is a record that creates a new SMGR relation, we'll avoid
+ * prefetching anything from that rnode until it has been replayed.
+ */
+ if (replaying_lsn < record->lsn &&
+ record->header.xl_rmid == RM_SMGR_ID &&
+ (record->header.xl_info & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE)
+ {
+ xl_smgr_create *xlrec = (xl_smgr_create *) record->main_data;
+
+ XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, record->lsn);
+ }
+
+ /* Scan the record's block references. */
+ if (!XLogPrefetcherScanBlocks(prefetcher))
+ return false;
+
+ /* Advance to the next record. */
+ prefetcher->record = NULL;
+ }
+}
+
+/*
+ * Scan the current record for block references, and consider prefetching.
+ *
+ * Return true if we processed the current record to completion and still have
+ * queue space to process a new record, and false if we saturated the I/O
+ * queue and need to wait for recovery to advance before we continue.
+ */
+static bool
+XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher)
+{
+ DecodedXLogRecord *record = prefetcher->record;
+
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+
+ /*
+ * We might already have been partway through processing this record when
+ * our queue became saturated, so we need to start where we left off.
+ */
+ for (int block_id = prefetcher->next_block_id;
+ block_id <= record->max_block_id;
+ ++block_id)
+ {
+ DecodedBkpBlock *block = &record->blocks[block_id];
+ PrefetchBufferResult prefetch;
+ SMgrRelation reln;
+
+ /* Ignore everything but the main fork for now. */
+ if (block->forknum != MAIN_FORKNUM)
+ continue;
+
+ /*
+ * If there is a full page image attached, we won't be reading the
+ * page, so you might think we should skip it. However, if the
+ * underlying filesystem uses larger logical blocks than us, it might
+ * still need to perform a read-before-write some time later.
+ * Therefore, only prefetch if configured to do so.
+ */
+ if (block->has_image && !recovery_prefetch_fpw)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_fpw);
+ continue;
+ }
+
+ /*
+ * If this block will initialize a new page then it's probably a
+ * relation extension. Since that might create a new segment, we
+ * can't try to prefetch this block until the record has been
+ * replayed, or we might try to open a file that doesn't exist yet.
+ */
+ if (block->flags & BKPBLOCK_WILL_INIT)
+ {
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Should we skip this block due to a filter? */
+ if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno))
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ continue;
+ }
+
+ /* Fast path for repeated references to the same relation. */
+ if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode))
+ {
+ /*
+ * If this is a repeat access to the same block, then skip it.
+ *
+ * XXX We could also check for last_blkno + 1 too, and also update
+ * last_blkno; it's not clear if the kernel would do a better job
+ * of sequential prefetching.
+ */
+ if (block->blkno == prefetcher->last_blkno)
+ {
+ XLogPrefetchIncrement(&SharedStats->skip_seq);
+ continue;
+ }
+
+ /* We can avoid calling smgropen(). */
+ reln = prefetcher->last_reln;
+ }
+ else
+ {
+ /* Otherwise we have to open it. */
+ reln = smgropen(block->rnode, InvalidBackendId);
+ prefetcher->last_rnode = block->rnode;
+ prefetcher->last_reln = reln;
+ }
+ prefetcher->last_blkno = block->blkno;
+
+ /* Try to prefetch this block! */
+ prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno);
+ if (BufferIsValid(prefetch.recent_buffer))
+ {
+ /*
+ * It was already cached, so do nothing. We'll remember the
+ * buffer, so that recovery can try to avoid looking it up again.
+ */
+ block->recent_buffer = prefetch.recent_buffer;
+ XLogPrefetchIncrement(&SharedStats->skip_hit);
+ }
+ else if (prefetch.initiated_io)
+ {
+ /*
+ * I/O has possibly been initiated (though we don't know if it was
+ * already cached by the kernel, so we just have to assume that it
+ * has due to lack of better information). Record this as an I/O
+ * in progress until eventually we replay this LSN.
+ */
+ XLogPrefetchIncrement(&SharedStats->prefetch);
+ XLogPrefetcherInitiatedIO(prefetcher, record->lsn);
+
+ /*
+ * If the queue is now full, we'll have to wait before processing
+ * any more blocks from this record, or move to a new record if
+ * that was the last block.
+ */
+ if (XLogPrefetcherSaturated(prefetcher))
+ {
+ prefetcher->next_block_id = block_id + 1;
+ return false;
+ }
+ }
+ else
+ {
+ /*
+ * Neither cached nor initiated. The underlying segment file
+ * doesn't exist. Presumably it will be unlinked by a later WAL
+ * record. When recovery reads this block, it will use the
+ * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do
+ * that sort of thing while merely prefetching, so let's just
+ * ignore references to this relation until this record is
+ * replayed, and let recovery create the dummy file or complain if
+ * something is wrong.
+ */
+ XLogPrefetcherAddFilter(prefetcher, block->rnode, 0,
+ record->lsn);
+ XLogPrefetchIncrement(&SharedStats->skip_new);
+ }
+ }
+
+ return true;
+}
+
+/*
+ * Expose statistics about recovery prefetching.
+ */
+Datum
+pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+ bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS];
+
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mod required, but it is not allowed in this context")));
+
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled)
+ {
+ /* There's an unhandled reset request, so just show NULLs */
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = true;
+ }
+ else
+ {
+ for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i)
+ nulls[i] = false;
+ }
+
+ values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time));
+ values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch));
+ values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_hit));
+ values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new));
+ values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw));
+ values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_seq));
+ values[6] = Int32GetDatum(SharedStats->distance);
+ values[7] = Int32GetDatum(SharedStats->queue_depth);
+ values[8] = Float4GetDatum(SharedStats->avg_distance);
+ values[9] = Float4GetDatum(SharedStats->avg_queue_depth);
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
+
+/*
+ * Compute (n + 1) % prefetch_queue_size, assuming n < prefetch_queue_size,
+ * without using division.
+ */
+static inline int
+XLogPrefetcherNext(XLogPrefetcher *prefetcher, int n)
+{
+ int next = n + 1;
+
+ return next == prefetcher->prefetch_queue_size ? 0 : next;
+}
+
+/*
+ * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn'
+ * has been replayed.
+ */
+static inline void
+XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno, XLogRecPtr lsn)
+{
+ XLogPrefetcherFilter *filter;
+ bool found;
+
+ filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found);
+ if (!found)
+ {
+ /*
+ * Don't allow any prefetching of this block or higher until replayed.
+ */
+ filter->filter_until_replayed = lsn;
+ filter->filter_from_block = blockno;
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+ else
+ {
+ /*
+ * We were already filtering this rnode. Extend the filter's lifetime
+ * to cover this WAL record, but leave the (presumably lower) block
+ * number there because we don't want to have to track individual
+ * blocks.
+ */
+ filter->filter_until_replayed = lsn;
+ dlist_delete(&filter->link);
+ dlist_push_head(&prefetcher->filter_queue, &filter->link);
+ }
+}
+
+/*
+ * Have we replayed the records that caused us to begin filtering a block
+ * range? That means that relations should have been created, extended or
+ * dropped as required, so we can drop relevant filters.
+ */
+static inline void
+XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter,
+ link,
+ &prefetcher->filter_queue);
+
+ if (filter->filter_until_replayed >= replaying_lsn)
+ break;
+ dlist_delete(&filter->link);
+ hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL);
+ }
+}
+
+/*
+ * Check if a given block should be skipped due to a filter.
+ */
+static inline bool
+XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode,
+ BlockNumber blockno)
+{
+ /*
+ * Test for empty queue first, because we expect it to be empty most of
+ * the time and we can avoid the hash table lookup in that case.
+ */
+ if (unlikely(!dlist_is_empty(&prefetcher->filter_queue)))
+ {
+ XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode,
+ HASH_FIND, NULL);
+
+ if (filter && filter->filter_from_block <= blockno)
+ return true;
+ }
+
+ return false;
+}
+
+/*
+ * Insert an LSN into the queue. The queue must not be full already. This
+ * tracks the fact that we have (to the best of our knowledge) initiated an
+ * I/O, so that we can impose a cap on concurrent prefetching.
+ */
+static inline void
+XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher,
+ XLogRecPtr prefetching_lsn)
+{
+ Assert(!XLogPrefetcherSaturated(prefetcher));
+ prefetcher->prefetch_queue[prefetcher->prefetch_head] = prefetching_lsn;
+ prefetcher->prefetch_head =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+ SharedStats->queue_depth++;
+
+ Assert(SharedStats->queue_depth <= prefetcher->prefetch_queue_size);
+}
+
+/*
+ * Have we replayed the records that caused us to initiate the oldest
+ * prefetches yet? That means that they're definitely finished, so we can can
+ * forget about them and allow ourselves to initiate more prefetches. For now
+ * we don't have any awareness of when I/O really completes.
+ */
+static inline void
+XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn)
+{
+ while (prefetcher->prefetch_head != prefetcher->prefetch_tail &&
+ prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn)
+ {
+ prefetcher->prefetch_tail =
+ XLogPrefetcherNext(prefetcher, prefetcher->prefetch_tail);
+ SharedStats->queue_depth--;
+
+ Assert(SharedStats->queue_depth >= 0);
+ }
+}
+
+/*
+ * Check if the maximum allowed number of I/Os is already in flight.
+ */
+static inline bool
+XLogPrefetcherSaturated(XLogPrefetcher *prefetcher)
+{
+ int next = XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head);
+
+ return next == prefetcher->prefetch_tail;
+}
+
+void
+assign_recovery_prefetch(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
+
+void
+assign_recovery_prefetch_fpw(bool new_value, void *extra)
+{
+ /* Reconfigure prefetching, because a setting it depends on changed. */
+ recovery_prefetch_fpw = new_value;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+}
diff --git a/src/backend/access/transam/xlogreader.c b/src/backend/access/transam/xlogreader.c
index f66592482a4..3ae4127b8a8 100644
--- a/src/backend/access/transam/xlogreader.c
+++ b/src/backend/access/transam/xlogreader.c
@@ -1927,6 +1927,8 @@ DecodeXLogRecord(XLogReaderState *state,
blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0);
blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0);
+ blk->recent_buffer = InvalidBuffer;
+
COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16));
/* cross-check that the HAS_DATA flag is set iff data_length > 0 */
if (blk->has_data && blk->data_len == 0)
@@ -2135,6 +2137,15 @@ bool
XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum, BlockNumber *blknum)
{
+ return XLogRecGetRecentBuffer(record, block_id, rnode, forknum, blknum,
+ NULL);
+}
+
+bool
+XLogRecGetRecentBuffer(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum,
+ BlockNumber *blknum, Buffer *recent_buffer)
+{
DecodedBkpBlock *bkpb;
if (block_id > record->record->max_block_id ||
@@ -2148,6 +2159,8 @@ XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
*forknum = bkpb->forknum;
if (blknum)
*blknum = bkpb->blkno;
+ if (recent_buffer)
+ *recent_buffer = bkpb->recent_buffer;
return true;
}
diff --git a/src/backend/access/transam/xlogutils.c b/src/backend/access/transam/xlogutils.c
index eedd95cc137..4d5c9bb08f5 100644
--- a/src/backend/access/transam/xlogutils.c
+++ b/src/backend/access/transam/xlogutils.c
@@ -335,11 +335,13 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
+ Buffer recent_buffer;
Page page;
bool zeromode;
bool willinit;
- if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
+ if (!XLogRecGetRecentBuffer(record, block_id, &rnode, &forknum, &blkno,
+ &recent_buffer))
{
/* Caller specified a bogus block_id */
elog(PANIC, "failed to locate backup block with ID %d", block_id);
@@ -361,7 +363,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
{
Assert(XLogRecHasBlockImage(record, block_id));
*buf = XLogReadBufferExtended(rnode, forknum, blkno,
- get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK);
+ get_cleanup_lock ? RBM_ZERO_AND_CLEANUP_LOCK : RBM_ZERO_AND_LOCK,
+ recent_buffer);
page = BufferGetPage(*buf);
if (!RestoreBlockImage(record, block_id, page))
elog(ERROR, "failed to restore block image");
@@ -390,7 +393,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
}
else
{
- *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode);
+ *buf = XLogReadBufferExtended(rnode, forknum, blkno, mode,
+ recent_buffer);
if (BufferIsValid(*buf))
{
if (mode != RBM_ZERO_AND_LOCK && mode != RBM_ZERO_AND_CLEANUP_LOCK)
@@ -437,7 +441,8 @@ XLogReadBufferForRedoExtended(XLogReaderState *record,
*/
Buffer
XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
- BlockNumber blkno, ReadBufferMode mode)
+ BlockNumber blkno, ReadBufferMode mode,
+ Buffer recent_buffer)
{
BlockNumber lastblock;
Buffer buffer;
@@ -445,6 +450,15 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
Assert(blkno != P_NEW);
+ /* Do we have a clue where the buffer might be already? */
+ if (BufferIsValid(recent_buffer) &&
+ mode == RBM_NORMAL &&
+ ReadRecentBuffer(rnode, forknum, blkno, recent_buffer))
+ {
+ buffer = recent_buffer;
+ goto recent_buffer_fast_path;
+ }
+
/* Open the relation at smgr level */
smgr = smgropen(rnode, InvalidBackendId);
@@ -503,6 +517,7 @@ XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
}
}
+recent_buffer_fast_path:
if (mode == RBM_NORMAL)
{
/* check that page has been initialized */
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index ff65b3edfa7..451db2ee0a0 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -911,6 +911,20 @@ CREATE VIEW pg_stat_wal_receiver AS
FROM pg_stat_get_wal_receiver() s
WHERE s.pid IS NOT NULL;
+CREATE VIEW pg_stat_prefetch_recovery AS
+ SELECT
+ s.stats_reset,
+ s.prefetch,
+ s.skip_hit,
+ s.skip_new,
+ s.skip_fpw,
+ s.skip_seq,
+ s.distance,
+ s.queue_depth,
+ s.avg_distance,
+ s.avg_queue_depth
+ FROM pg_stat_get_prefetch_recovery() s;
+
CREATE VIEW pg_stat_subscription AS
SELECT
su.oid AS subid,
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index 958183dd69d..f4467625f7f 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -38,6 +38,7 @@
#include "access/transam.h"
#include "access/twophase_rmgr.h"
#include "access/xact.h"
+#include "access/xlogprefetch.h"
#include "catalog/partition.h"
#include "catalog/pg_database.h"
#include "catalog/pg_proc.h"
@@ -278,6 +279,7 @@ static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
static PgStat_ReplSlotStats *replSlotStats;
static int nReplSlotStats;
+static PgStat_RecoveryPrefetchStats recoveryPrefetchStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
@@ -349,6 +351,7 @@ static void pgstat_recv_archiver(PgStat_MsgArchiver *msg, int len);
static void pgstat_recv_bgwriter(PgStat_MsgBgWriter *msg, int len);
static void pgstat_recv_wal(PgStat_MsgWal *msg, int len);
static void pgstat_recv_slru(PgStat_MsgSLRU *msg, int len);
+static void pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len);
static void pgstat_recv_funcstat(PgStat_MsgFuncstat *msg, int len);
static void pgstat_recv_funcpurge(PgStat_MsgFuncpurge *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
@@ -1424,11 +1427,20 @@ pgstat_reset_shared_counters(const char *target)
msg.m_resettarget = RESET_BGWRITER;
else if (strcmp(target, "wal") == 0)
msg.m_resettarget = RESET_WAL;
+ else if (strcmp(target, "prefetch_recovery") == 0)
+ {
+ /*
+ * We can't ask the stats collector to do this for us as it is not
+ * attached to shared memory.
+ */
+ XLogPrefetchRequestResetStats();
+ return;
+ }
else
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("unrecognized reset target: \"%s\"", target),
- errhint("Target must be \"archiver\", \"bgwriter\" or \"wal\".")));
+ errhint("Target must be \"archiver\", \"bgwriter\", \"wal\" or \"prefetch_recovery\".")));
pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETSHAREDCOUNTER);
pgstat_send(&msg, sizeof(msg));
@@ -2874,6 +2886,22 @@ pgstat_fetch_replslot(int *nslots_p)
}
/*
+ * ---------
+ * pgstat_fetch_recoveryprefetch() -
+ *
+ * Support function for restoring the counters managed by xlogprefetch.c.
+ * ---------
+ */
+PgStat_RecoveryPrefetchStats *
+pgstat_fetch_recoveryprefetch(void)
+{
+ backend_read_statsfile();
+
+ return &recoveryPrefetchStats;
+}
+
+
+/*
* Shut down a single backend's statistics reporting at process exit.
*
* Flush any remaining statistics counts out to the collector.
@@ -3149,6 +3177,23 @@ pgstat_send_slru(void)
/* ----------
+ * pgstat_send_recoveryprefetch() -
+ *
+ * Send recovery prefetch statistics to the collector
+ * ----------
+ */
+void
+pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats)
+{
+ PgStat_MsgRecoveryPrefetch msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RECOVERYPREFETCH);
+ msg.m_stats = *stats;
+ pgstat_send(&msg, sizeof(msg));
+}
+
+
+/* ----------
* PgstatCollectorMain() -
*
* Start up the statistics collector process. This is the body of the
@@ -3365,6 +3410,10 @@ PgstatCollectorMain(int argc, char *argv[])
pgstat_recv_slru(&msg.msg_slru, len);
break;
+ case PGSTAT_MTYPE_RECOVERYPREFETCH:
+ pgstat_recv_recoveryprefetch(&msg.msg_recoveryprefetch, len);
+ break;
+
case PGSTAT_MTYPE_FUNCSTAT:
pgstat_recv_funcstat(&msg.msg_funcstat, len);
break;
@@ -3659,6 +3708,13 @@ pgstat_write_statsfiles(bool permanent, bool allDbs)
(void) rc; /* we'll check for error with ferror */
/*
+ * Write recovery prefetch stats struct
+ */
+ rc = fwrite(&recoveryPrefetchStats, sizeof(recoveryPrefetchStats), 1,
+ fpout);
+ (void) rc; /* we'll check for error with ferror */
+
+ /*
* Walk through the database table.
*/
hash_seq_init(&hstat, pgStatDBHash);
@@ -3933,6 +3989,7 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
memset(&archiverStats, 0, sizeof(archiverStats));
memset(&walStats, 0, sizeof(walStats));
memset(&slruStats, 0, sizeof(slruStats));
+ memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
/*
* Set the current timestamp (will be kept only in case we can't load an
@@ -4039,6 +4096,18 @@ pgstat_read_statsfiles(Oid onlydb, bool permanent, bool deep)
}
/*
+ * Read recoveryPrefetchStats struct
+ */
+ if (fread(&recoveryPrefetchStats, 1, sizeof(recoveryPrefetchStats),
+ fpin) != sizeof(recoveryPrefetchStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
+ memset(&recoveryPrefetchStats, 0, sizeof(recoveryPrefetchStats));
+ goto done;
+ }
+
+ /*
* We found an existing collector stats file. Read it and put all the
* hashtable entries into place.
*/
@@ -4356,6 +4425,7 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
PgStat_ReplSlotStats myReplSlotStats;
+ PgStat_RecoveryPrefetchStats myRecoveryPrefetchStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
@@ -4432,6 +4502,18 @@ pgstat_read_db_statsfile_timestamp(Oid databaseid, bool permanent,
return false;
}
+ /*
+ * Read recovery prefetch stats struct
+ */
+ if (fread(&myRecoveryPrefetchStats, 1, sizeof(myRecoveryPrefetchStats),
+ fpin) != sizeof(myRecoveryPrefetchStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"", statfile)));
+ FreeFile(fpin);
+ return false;
+ }
+
/* By default, we're going to return the timestamp of the global file. */
*ts = myGlobalStats.stats_timestamp;
@@ -4615,6 +4697,13 @@ backend_read_statsfile(void)
if (ok && file_ts >= min_ts)
break;
+ /*
+ * If we're in crash recovery, the collector may not even be running,
+ * so work with what we have.
+ */
+ if (InRecovery)
+ break;
+
/* Not there or too old, so kick the collector and wait a bit */
if ((count % PGSTAT_INQ_LOOP_COUNT) == 0)
pgstat_send_inquiry(cur_ts, min_ts, inquiry_db);
@@ -5350,6 +5439,18 @@ pgstat_recv_slru(PgStat_MsgSLRU *msg, int len)
}
/* ----------
+ * pgstat_recv_recoveryprefetch() -
+ *
+ * Process a recovery prefetch message.
+ * ----------
+ */
+static void
+pgstat_recv_recoveryprefetch(PgStat_MsgRecoveryPrefetch *msg, int len)
+{
+ recoveryPrefetchStats = msg->m_stats;
+}
+
+/* ----------
* pgstat_recv_recoveryconflict() -
*
* Process a RECOVERYCONFLICT message.
diff --git a/src/backend/storage/freespace/freespace.c b/src/backend/storage/freespace/freespace.c
index 8c12dda2380..cfa0414e5ab 100644
--- a/src/backend/storage/freespace/freespace.c
+++ b/src/backend/storage/freespace/freespace.c
@@ -210,7 +210,8 @@ XLogRecordPageWithFreeSpace(RelFileNode rnode, BlockNumber heapBlk,
blkno = fsm_logical_to_physical(addr);
/* If the page doesn't exist already, extend */
- buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR);
+ buf = XLogReadBufferExtended(rnode, FSM_FORKNUM, blkno, RBM_ZERO_ON_ERROR,
+ InvalidBuffer);
LockBuffer(buf, BUFFER_LOCK_EXCLUSIVE);
page = BufferGetPage(buf);
diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c
index 3e4ec53a97e..47847563ef0 100644
--- a/src/backend/storage/ipc/ipci.c
+++ b/src/backend/storage/ipc/ipci.c
@@ -22,6 +22,7 @@
#include "access/subtrans.h"
#include "access/syncscan.h"
#include "access/twophase.h"
+#include "access/xlogprefetch.h"
#include "commands/async.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -126,6 +127,7 @@ CreateSharedMemoryAndSemaphores(void)
size = add_size(size, PredicateLockShmemSize());
size = add_size(size, ProcGlobalShmemSize());
size = add_size(size, XLOGShmemSize());
+ size = add_size(size, XLogPrefetchShmemSize());
size = add_size(size, CLOGShmemSize());
size = add_size(size, CommitTsShmemSize());
size = add_size(size, SUBTRANSShmemSize());
@@ -217,6 +219,7 @@ CreateSharedMemoryAndSemaphores(void)
* Set up xlog, clog, and buffers
*/
XLOGShmemInit();
+ XLogPrefetchShmemInit();
CLOGShmemInit();
CommitTsShmemInit();
SUBTRANSShmemInit();
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index bee976bae87..090abdad8ba 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -41,6 +41,7 @@
#include "access/twophase.h"
#include "access/xact.h"
#include "access/xlog_internal.h"
+#include "access/xlogprefetch.h"
#include "catalog/namespace.h"
#include "catalog/pg_authid.h"
#include "catalog/storage.h"
@@ -209,6 +210,7 @@ static bool check_effective_io_concurrency(int *newval, void **extra, GucSource
static bool check_maintenance_io_concurrency(int *newval, void **extra, GucSource source);
static bool check_huge_page_size(int *newval, void **extra, GucSource source);
static bool check_client_connection_check_interval(int *newval, void **extra, GucSource source);
+static void assign_maintenance_io_concurrency(int newval, void *extra);
static void assign_pgstat_temp_directory(const char *newval, void *extra);
static bool check_application_name(char **newval, void **extra, GucSource source);
static void assign_application_name(const char *newval, void *extra);
@@ -1294,6 +1296,27 @@ static struct config_bool ConfigureNamesBool[] =
true,
NULL, NULL, NULL
},
+ {
+ {"recovery_prefetch", PGC_SIGHUP, WAL_SETTINGS,
+ gettext_noop("Prefetch referenced blocks during recovery"),
+ gettext_noop("Read ahead of the current replay position to find uncached blocks.")
+ },
+ &recovery_prefetch,
+ false,
+ NULL, assign_recovery_prefetch, NULL
+ },
+ {
+ {"recovery_prefetch_fpw", PGC_SIGHUP, WAL_SETTINGS,
+ gettext_noop("Prefetch blocks that have full page images in the WAL"),
+ gettext_noop("On some systems, there is no benefit to prefetching pages that will be "
+ "entirely overwritten, but if the logical page size of the filesystem is "
+ "larger than PostgreSQL's, this can be beneficial. This option has no "
+ "effect unless recovery_prefetch is enabled.")
+ },
+ &recovery_prefetch_fpw,
+ false,
+ NULL, assign_recovery_prefetch_fpw, NULL
+ },
{
{"wal_log_hints", PGC_POSTMASTER, WAL_SETTINGS,
@@ -2749,6 +2772,17 @@ static struct config_int ConfigureNamesInt[] =
},
{
+ {"wal_decode_buffer_size", PGC_POSTMASTER, WAL_ARCHIVE_RECOVERY,
+ gettext_noop("Maximum buffer size for reading ahead in the WAL during recovery."),
+ gettext_noop("This controls the maximum distance we can read ahead n the WAL to prefetch referenced blocks."),
+ GUC_UNIT_BYTE
+ },
+ &wal_decode_buffer_size,
+ 512 * 1024, 64 * 1024, INT_MAX,
+ NULL, NULL, NULL
+ },
+
+ {
{"wal_keep_size", PGC_SIGHUP, REPLICATION_SENDING,
gettext_noop("Sets the size of WAL files held for standby servers."),
NULL,
@@ -3068,7 +3102,8 @@ static struct config_int ConfigureNamesInt[] =
0,
#endif
0, MAX_IO_CONCURRENCY,
- check_maintenance_io_concurrency, NULL, NULL
+ check_maintenance_io_concurrency, assign_maintenance_io_concurrency,
+ NULL
},
{
@@ -12073,6 +12108,20 @@ check_client_connection_check_interval(int *newval, void **extra, GucSource sour
}
static void
+assign_maintenance_io_concurrency(int newval, void *extra)
+{
+#ifdef USE_PREFETCH
+ /*
+ * Reconfigure recovery prefetching, because a setting it depends on
+ * changed.
+ */
+ maintenance_io_concurrency = newval;
+ if (AmStartupProcess())
+ XLogPrefetchReconfigure();
+#endif
+}
+
+static void
assign_pgstat_temp_directory(const char *newval, void *extra)
{
/* check_canonical_path already canonicalized newval for us */
diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample
index ff9fa006fef..9830cfe382e 100644
--- a/src/backend/utils/misc/postgresql.conf.sample
+++ b/src/backend/utils/misc/postgresql.conf.sample
@@ -235,6 +235,12 @@
#checkpoint_flush_after = 0 # measured in pages, 0 disables
#checkpoint_warning = 30s # 0 disables
+# - Prefetching during recovery -
+
+#wal_decode_buffer_size = 512kB # lookahead window used for prefetching
+#recovery_prefetch = off # prefetch pages referenced in the WAL?
+#recovery_prefetch_fpw = off # even pages logged with full page?
+
# - Archiving -
#archive_mode = off # enables archiving; off, on, or always
diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h
index 77187c12beb..f542af0a262 100644
--- a/src/include/access/xlog.h
+++ b/src/include/access/xlog.h
@@ -132,6 +132,7 @@ extern char *PrimaryConnInfo;
extern char *PrimarySlotName;
extern bool wal_receiver_create_temp_slot;
extern bool track_wal_io_timing;
+extern int wal_decode_buffer_size;
/* indirectly set via GUC system */
extern TransactionId recoveryTargetXid;
diff --git a/src/include/access/xlogprefetch.h b/src/include/access/xlogprefetch.h
new file mode 100644
index 00000000000..59ce9c64739
--- /dev/null
+++ b/src/include/access/xlogprefetch.h
@@ -0,0 +1,82 @@
+/*-------------------------------------------------------------------------
+ *
+ * xlogprefetch.h
+ * Declarations for the recovery prefetching module.
+ *
+ * Portions Copyright (c) 2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/access/xlogprefetch.h
+ *-------------------------------------------------------------------------
+ */
+#ifndef XLOGPREFETCH_H
+#define XLOGPREFETCH_H
+
+#include "access/xlogdefs.h"
+
+/* GUCs */
+extern bool recovery_prefetch;
+extern bool recovery_prefetch_fpw;
+
+struct XLogPrefetcher;
+typedef struct XLogPrefetcher XLogPrefetcher;
+
+extern int XLogPrefetchReconfigureCount;
+
+typedef struct XLogPrefetchState
+{
+ XLogReaderState *reader;
+ XLogPrefetcher *prefetcher;
+ int reconfigure_count;
+} XLogPrefetchState;
+
+extern size_t XLogPrefetchShmemSize(void);
+extern void XLogPrefetchShmemInit(void);
+
+extern void XLogPrefetchReconfigure(void);
+extern void XLogPrefetchRequestResetStats(void);
+
+extern void XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader);
+extern void XLogPrefetchEnd(XLogPrefetchState *state);
+
+/* Functions exposed only for the use of XLogPrefetch(). */
+extern XLogPrefetcher *XLogPrefetcherAllocate(XLogReaderState *reader);
+extern void XLogPrefetcherFree(XLogPrefetcher *prefetcher);
+extern bool XLogPrefetcherReadAhead(XLogPrefetcher *prefetch,
+ XLogRecPtr replaying_lsn);
+
+/*
+ * Tell the prefetching module that we are now replaying a given LSN, so that
+ * it can decide how far ahead to read in the WAL, if configured. Return
+ * true if more data is needed by the reader.
+ */
+static inline bool
+XLogPrefetch(XLogPrefetchState *state, XLogRecPtr replaying_lsn)
+{
+ /*
+ * Handle any configuration changes. Rather than trying to deal with
+ * various parameter changes, we just tear down and set up a new
+ * prefetcher if anything we depend on changes.
+ */
+ if (unlikely(state->reconfigure_count != XLogPrefetchReconfigureCount))
+ {
+ /* If we had a prefetcher, tear it down. */
+ if (state->prefetcher)
+ {
+ XLogPrefetcherFree(state->prefetcher);
+ state->prefetcher = NULL;
+ }
+ /* If we want a prefetcher, set it up. */
+ if (recovery_prefetch)
+ state->prefetcher = XLogPrefetcherAllocate(state->reader);
+ state->reconfigure_count = XLogPrefetchReconfigureCount;
+ }
+
+ if (state->prefetcher)
+ return XLogPrefetcherReadAhead(state->prefetcher, replaying_lsn);
+
+ return false;
+}
+
+#endif
diff --git a/src/include/access/xlogreader.h b/src/include/access/xlogreader.h
index 010cbb59d6b..d5308b25a28 100644
--- a/src/include/access/xlogreader.h
+++ b/src/include/access/xlogreader.h
@@ -39,6 +39,7 @@
#endif
#include "access/xlogrecord.h"
+#include "storage/buf.h"
/* WALOpenSegment represents a WAL segment being read. */
typedef struct WALOpenSegment
@@ -77,6 +78,9 @@ typedef struct
ForkNumber forknum;
BlockNumber blkno;
+ /* Workspace for remembering last known buffer holding this block. */
+ Buffer recent_buffer;
+
/* copy of the fork_flags field from the XLogRecordBlockHeader */
uint8 flags;
@@ -397,5 +401,8 @@ extern char *XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *
extern bool XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id,
RelFileNode *rnode, ForkNumber *forknum,
BlockNumber *blknum);
+extern bool XLogRecGetRecentBuffer(XLogReaderState *record, uint8 block_id,
+ RelFileNode *rnode, ForkNumber *forknum,
+ BlockNumber *blknum, Buffer *recent_buffer);
#endif /* XLOGREADER_H */
diff --git a/src/include/access/xlogutils.h b/src/include/access/xlogutils.h
index 397fb27fc22..bbc60851307 100644
--- a/src/include/access/xlogutils.h
+++ b/src/include/access/xlogutils.h
@@ -42,7 +42,8 @@ extern XLogRedoAction XLogReadBufferForRedoExtended(XLogReaderState *record,
Buffer *buf);
extern Buffer XLogReadBufferExtended(RelFileNode rnode, ForkNumber forknum,
- BlockNumber blkno, ReadBufferMode mode);
+ BlockNumber blkno, ReadBufferMode mode,
+ Buffer recent_buffer);
extern Relation CreateFakeRelcacheEntry(RelFileNode rnode);
extern void FreeFakeRelcacheEntry(Relation fakerel);
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 599dd10d10e..f4957653ae6 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -6291,6 +6291,14 @@
prorettype => 'text', proargtypes => '',
prosrc => 'pg_get_wal_replay_pause_state' },
+{ oid => '9085', descr => 'statistics: information about WAL prefetching',
+ proname => 'pg_stat_get_prefetch_recovery', prorows => '1', provolatile => 'v',
+ proretset => 't', prorettype => 'record', proargtypes => '',
+ proallargtypes => '{timestamptz,int8,int8,int8,int8,int8,int4,int4,float4,float4}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{stats_reset,prefetch,skip_hit,skip_new,skip_fpw,skip_seq,distance,queue_depth,avg_distance,avg_queue_depth}',
+ prosrc => 'pg_stat_get_prefetch_recovery' },
+
{ oid => '2621', descr => 'reload configuration files',
proname => 'pg_reload_conf', provolatile => 'v', prorettype => 'bool',
proargtypes => '', prosrc => 'pg_reload_conf' },
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index 89cd324454a..9a87e7cd884 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -74,6 +74,7 @@ typedef enum StatMsgType
PGSTAT_MTYPE_BGWRITER,
PGSTAT_MTYPE_WAL,
PGSTAT_MTYPE_SLRU,
+ PGSTAT_MTYPE_RECOVERYPREFETCH,
PGSTAT_MTYPE_FUNCSTAT,
PGSTAT_MTYPE_FUNCPURGE,
PGSTAT_MTYPE_RECOVERYCONFLICT,
@@ -197,6 +198,19 @@ typedef struct PgStat_TableXactStatus
struct PgStat_TableXactStatus *next; /* next of same subxact */
} PgStat_TableXactStatus;
+/*
+ * Recovery prefetching statistics persisted on disk by pgstat.c, but kept in
+ * shared memory by xlogprefetch.c.
+ */
+typedef struct PgStat_RecoveryPrefetchStats
+{
+ PgStat_Counter prefetch;
+ PgStat_Counter skip_hit;
+ PgStat_Counter skip_new;
+ PgStat_Counter skip_fpw;
+ PgStat_Counter skip_seq;
+ TimestampTz stat_reset_timestamp;
+} PgStat_RecoveryPrefetchStats;
/* ------------------------------------------------------------
* Message formats follow
@@ -536,6 +550,15 @@ typedef struct PgStat_MsgReplSlot
PgStat_Counter m_stream_bytes;
} PgStat_MsgReplSlot;
+/* ----------
+ * PgStat_MsgRecoveryPrefetch Sent by XLogPrefetch to save statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgRecoveryPrefetch
+{
+ PgStat_MsgHdr m_hdr;
+ PgStat_RecoveryPrefetchStats m_stats;
+} PgStat_MsgRecoveryPrefetch;
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
@@ -699,6 +722,7 @@ typedef union PgStat_Msg
PgStat_MsgBgWriter msg_bgwriter;
PgStat_MsgWal msg_wal;
PgStat_MsgSLRU msg_slru;
+ PgStat_MsgRecoveryPrefetch msg_recoveryprefetch;
PgStat_MsgFuncstat msg_funcstat;
PgStat_MsgFuncpurge msg_funcpurge;
PgStat_MsgRecoveryConflict msg_recoveryconflict;
@@ -1088,6 +1112,7 @@ extern void pgstat_twophase_postabort(TransactionId xid, uint16 info,
extern void pgstat_send_archiver(const char *xlog, bool failed);
extern void pgstat_send_bgwriter(void);
+extern void pgstat_send_recoveryprefetch(PgStat_RecoveryPrefetchStats *stats);
extern void pgstat_report_wal(void);
extern bool pgstat_send_wal(bool force);
@@ -1104,6 +1129,7 @@ extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
+extern PgStat_RecoveryPrefetchStats *pgstat_fetch_recoveryprefetch(void);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx);
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 9b6552b25b2..1892c7927b4 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -443,4 +443,8 @@ extern void assign_search_path(const char *newval, void *extra);
extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
extern void assign_xlog_sync_method(int new_sync_method, void *extra);
+/* in access/transam/xlogprefetch.c */
+extern void assign_recovery_prefetch(bool new_value, void *extra);
+extern void assign_recovery_prefetch_fpw(bool new_value, void *extra);
+
#endif /* GUC_H */
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index a8a1cc72d0d..186e6c966c6 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1879,6 +1879,17 @@ pg_stat_gssapi| SELECT s.pid,
s.gss_enc AS encrypted
FROM pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid, queryid)
WHERE (s.client_port IS NOT NULL);
+pg_stat_prefetch_recovery| SELECT s.stats_reset,
+ s.prefetch,
+ s.skip_hit,
+ s.skip_new,
+ s.skip_fpw,
+ s.skip_seq,
+ s.distance,
+ s.queue_depth,
+ s.avg_distance,
+ s.avg_queue_depth
+ FROM pg_stat_get_prefetch_recovery() s(stats_reset, prefetch, skip_hit, skip_new, skip_fpw, skip_seq, distance, queue_depth, avg_distance, avg_queue_depth);
pg_stat_progress_analyze| SELECT s.pid,
s.datid,
d.datname,
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index efb98111982..200015cac79 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2803,6 +2803,10 @@ XLogPageHeader
XLogPageHeaderData
XLogPageReadCB
XLogPageReadPrivate
+XLogPrefetcher
+XLogPrefetcherFilter
+XLogPrefetchState
+XLogPrefetchStats
XLogReaderRoutine
XLogReaderState
XLogRecData