Diffstat (limited to 'src/backend/access/transam/xlogprefetch.c')
-rw-r--r-- | src/backend/access/transam/xlogprefetch.c | 922
1 file changed, 922 insertions, 0 deletions
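To cap the number of prefetches assumed to be in flight, the new file below keeps a small circular buffer of LSNs: an entry is added when PrefetchSharedBuffer() reports that it initiated I/O, and entries are retired once replay passes their LSN. The following standalone sketch is not part of the patch; the Demo* names and the fixed queue size of 8 are illustrative only (the patch sizes its queue from maintenance_io_concurrency). It shows the head/tail arithmetic that XLogPrefetcherNext(), XLogPrefetcherInitiatedIO(), XLogPrefetcherCompletedIO() and XLogPrefetcherSaturated() rely on.

/*
 * Standalone sketch of the queue-depth limiting technique used in the patch:
 * a circular buffer of LSNs records prefetches assumed to be in flight, and
 * entries are retired once replay passes their LSN.  Illustrative names only.
 */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define QUEUE_SLOTS (8 + 1)		/* at most 8 concurrent I/Os, plus a gap slot */

typedef uint64_t DemoLsn;

typedef struct DemoPrefetchQueue
{
	int			head;
	int			tail;
	DemoLsn		lsns[QUEUE_SLOTS];
} DemoPrefetchQueue;

/* (n + 1) % QUEUE_SLOTS without division, as in XLogPrefetcherNext(). */
static int
demo_next(int n)
{
	int			next = n + 1;

	return next == QUEUE_SLOTS ? 0 : next;
}

/* Full when advancing head would collide with tail (one slot stays empty). */
static bool
demo_saturated(const DemoPrefetchQueue *q)
{
	return demo_next(q->head) == q->tail;
}

/* Record an initiated prefetch; caller must check demo_saturated() first. */
static void
demo_initiated_io(DemoPrefetchQueue *q, DemoLsn prefetching_lsn)
{
	q->lsns[q->head] = prefetching_lsn;
	q->head = demo_next(q->head);
}

/* Retire entries whose LSN has already been replayed, freeing queue slots. */
static void
demo_completed_io(DemoPrefetchQueue *q, DemoLsn replaying_lsn)
{
	while (q->head != q->tail && q->lsns[q->tail] < replaying_lsn)
		q->tail = demo_next(q->tail);
}

int
main(void)
{
	DemoPrefetchQueue q = {0};
	DemoLsn		lsn = 1000;

	/* Prefetch until the queue saturates, then let "replay" catch up. */
	while (!demo_saturated(&q))
		demo_initiated_io(&q, lsn++);
	printf("saturated after %d prefetches\n", QUEUE_SLOTS - 1);

	demo_completed_io(&q, 1004);	/* replay has reached LSN 1004 */
	printf("saturated now? %s\n", demo_saturated(&q) ? "yes" : "no");
	return 0;
}

Leaving one slot unused is what distinguishes the full and empty states, which is why the patch declares the array as prefetch_queue[MAX_IO_CONCURRENCY + 1] and sets prefetch_queue_size to maintenance_io_concurrency + 1.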
diff --git a/src/backend/access/transam/xlogprefetch.c b/src/backend/access/transam/xlogprefetch.c new file mode 100644 index 00000000000..28764326bcc --- /dev/null +++ b/src/backend/access/transam/xlogprefetch.c @@ -0,0 +1,922 @@ +/*------------------------------------------------------------------------- + * + * xlogprefetch.c + * Prefetching support for recovery. + * + * Portions Copyright (c) 2021, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * + * IDENTIFICATION + * src/backend/access/transam/xlogprefetch.c + * + * The goal of this module is to read future WAL records and issue + * PrefetchSharedBuffer() calls for referenced blocks, so that we avoid I/O + * stalls in the main recovery loop. + * + * When examining a WAL record from the future, we need to consider that a + * referenced block or segment file might not exist on disk until this record + * or some earlier record has been replayed. After a crash, a file might also + * be missing because it was dropped by a later WAL record; in that case, it + * will be recreated when this record is replayed. These cases are handled by + * recognizing them and adding a "filter" that prevents all prefetching of a + * certain block range until the present WAL record has been replayed. Blocks + * skipped for these reasons are counted as "skip_new" (that is, cases where we + * didn't try to prefetch "new" blocks). + * + * Blocks found in the buffer pool already are counted as "skip_hit". + * Repeated access to the same buffer is detected and skipped, and this is + * counted with "skip_seq". Blocks that were logged with FPWs are skipped if + * recovery_prefetch_fpw is off, since on most systems there will be no I/O + * stall; this is counted with "skip_fpw". + * + * The only way we currently have to know that an I/O initiated with + * PrefetchSharedBuffer() has completed is that recovery will eventually call ReadBuffer(), + * and perform a synchronous read. Therefore, we track the number of + * potentially in-flight I/Os by using a circular buffer of LSNs. When it's + * full, we have to wait for recovery to replay records so that the queue + * depth can be reduced, before we can do any more prefetching. Ideally, this + * keeps us the right distance ahead to respect maintenance_io_concurrency. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/xlog.h" +#include "access/xlogprefetch.h" +#include "access/xlogreader.h" +#include "access/xlogutils.h" +#include "catalog/storage_xlog.h" +#include "utils/fmgrprotos.h" +#include "utils/timestamp.h" +#include "funcapi.h" +#include "pgstat.h" +#include "miscadmin.h" +#include "port/atomics.h" +#include "storage/bufmgr.h" +#include "storage/shmem.h" +#include "storage/smgr.h" +#include "utils/guc.h" +#include "utils/hsearch.h" + +/* + * Sample the queue depth and distance every time we replay this much WAL. + * This is used to compute avg_queue_depth and avg_distance for the log + * message that appears at the end of crash recovery. It's also used to send + * messages periodically to the stats collector, to save the counters on disk. + */ +#define XLOGPREFETCHER_SAMPLE_DISTANCE 0x40000 + +/* GUCs */ +bool recovery_prefetch = false; +bool recovery_prefetch_fpw = false; + +int XLogPrefetchReconfigureCount; + +/* + * A prefetcher object. There is at most one of these in existence at a time, + * recreated whenever there is a configuration change.
+ */ +struct XLogPrefetcher +{ + /* Reader and current reading state. */ + XLogReaderState *reader; + DecodedXLogRecord *record; + int next_block_id; + bool shutdown; + + /* Details of last prefetch to skip repeats and seq scans. */ + SMgrRelation last_reln; + RelFileNode last_rnode; + BlockNumber last_blkno; + + /* Online averages. */ + uint64 samples; + double avg_queue_depth; + double avg_distance; + XLogRecPtr next_sample_lsn; + + /* Book-keeping required to avoid accessing non-existing blocks. */ + HTAB *filter_table; + dlist_head filter_queue; + + /* Book-keeping required to limit concurrent prefetches. */ + int prefetch_head; + int prefetch_tail; + int prefetch_queue_size; + XLogRecPtr prefetch_queue[MAX_IO_CONCURRENCY + 1]; +}; + +/* + * A temporary filter used to track block ranges that haven't been created + * yet, whole relations that haven't been created yet, and whole relations + * that we must assume have already been dropped. + */ +typedef struct XLogPrefetcherFilter +{ + RelFileNode rnode; + XLogRecPtr filter_until_replayed; + BlockNumber filter_from_block; + dlist_node link; +} XLogPrefetcherFilter; + +/* + * Counters exposed in shared memory for pg_stat_prefetch_recovery. + */ +typedef struct XLogPrefetchStats +{ + pg_atomic_uint64 reset_time; /* Time of last reset. */ + pg_atomic_uint64 prefetch; /* Prefetches initiated. */ + pg_atomic_uint64 skip_hit; /* Blocks already buffered. */ + pg_atomic_uint64 skip_new; /* New/missing blocks filtered. */ + pg_atomic_uint64 skip_fpw; /* FPWs skipped. */ + pg_atomic_uint64 skip_seq; /* Repeat blocks skipped. */ + float avg_distance; + float avg_queue_depth; + + /* Reset counters */ + pg_atomic_uint32 reset_request; + uint32 reset_handled; + + /* Dynamic values */ + int distance; /* Number of bytes ahead in the WAL. */ + int queue_depth; /* Number of I/Os possibly in progress. 
*/ +} XLogPrefetchStats; + +static inline void XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno, + XLogRecPtr lsn); +static inline bool XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, + RelFileNode rnode, + BlockNumber blockno); +static inline void XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline void XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn); +static inline void XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static inline bool XLogPrefetcherSaturated(XLogPrefetcher *prefetcher); +static bool XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, + XLogRecPtr replaying_lsn); +static bool XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher); +static void XLogPrefetchSaveStats(void); +static void XLogPrefetchRestoreStats(void); + +static XLogPrefetchStats *SharedStats; + +size_t +XLogPrefetchShmemSize(void) +{ + return sizeof(XLogPrefetchStats); +} + +static void +XLogPrefetchResetStats(void) +{ + pg_atomic_write_u64(&SharedStats->reset_time, GetCurrentTimestamp()); + pg_atomic_write_u64(&SharedStats->prefetch, 0); + pg_atomic_write_u64(&SharedStats->skip_hit, 0); + pg_atomic_write_u64(&SharedStats->skip_new, 0); + pg_atomic_write_u64(&SharedStats->skip_fpw, 0); + pg_atomic_write_u64(&SharedStats->skip_seq, 0); + SharedStats->avg_distance = 0; + SharedStats->avg_queue_depth = 0; +} + +void +XLogPrefetchShmemInit(void) +{ + bool found; + + SharedStats = (XLogPrefetchStats *) + ShmemInitStruct("XLogPrefetchStats", + sizeof(XLogPrefetchStats), + &found); + + if (!found) + { + pg_atomic_init_u32(&SharedStats->reset_request, 0); + SharedStats->reset_handled = 0; + + pg_atomic_init_u64(&SharedStats->reset_time, GetCurrentTimestamp()); + pg_atomic_init_u64(&SharedStats->prefetch, 0); + pg_atomic_init_u64(&SharedStats->skip_hit, 0); + pg_atomic_init_u64(&SharedStats->skip_new, 0); + pg_atomic_init_u64(&SharedStats->skip_fpw, 0); + pg_atomic_init_u64(&SharedStats->skip_seq, 0); + SharedStats->avg_distance = 0; + SharedStats->avg_queue_depth = 0; + SharedStats->distance = 0; + SharedStats->queue_depth = 0; + } +} + +/* + * Called when any GUC is changed that affects prefetching. + */ +void +XLogPrefetchReconfigure(void) +{ + XLogPrefetchReconfigureCount++; +} + +/* + * Called by any backend to request that the stats be reset. + */ +void +XLogPrefetchRequestResetStats(void) +{ + pg_atomic_fetch_add_u32(&SharedStats->reset_request, 1); +} + +/* + * Tell the stats collector to serialize the shared memory counters into the + * stats file. + */ +static void +XLogPrefetchSaveStats(void) +{ + PgStat_RecoveryPrefetchStats serialized = { + .prefetch = pg_atomic_read_u64(&SharedStats->prefetch), + .skip_hit = pg_atomic_read_u64(&SharedStats->skip_hit), + .skip_new = pg_atomic_read_u64(&SharedStats->skip_new), + .skip_fpw = pg_atomic_read_u64(&SharedStats->skip_fpw), + .skip_seq = pg_atomic_read_u64(&SharedStats->skip_seq), + .stat_reset_timestamp = pg_atomic_read_u64(&SharedStats->reset_time) + }; + + pgstat_send_recoveryprefetch(&serialized); +} + +/* + * Try to restore the shared memory counters from the stats file. 
+ */ +static void +XLogPrefetchRestoreStats(void) +{ + PgStat_RecoveryPrefetchStats *serialized = pgstat_fetch_recoveryprefetch(); + + if (serialized->stat_reset_timestamp != 0) + { + pg_atomic_write_u64(&SharedStats->prefetch, serialized->prefetch); + pg_atomic_write_u64(&SharedStats->skip_hit, serialized->skip_hit); + pg_atomic_write_u64(&SharedStats->skip_new, serialized->skip_new); + pg_atomic_write_u64(&SharedStats->skip_fpw, serialized->skip_fpw); + pg_atomic_write_u64(&SharedStats->skip_seq, serialized->skip_seq); + pg_atomic_write_u64(&SharedStats->reset_time, serialized->stat_reset_timestamp); + } +} + +/* + * Increment a counter in shared memory. This is equivalent to (*counter)++ on a + * plain uint64 without any memory barrier or locking, except on platforms + * where readers can't read uint64 without possibly observing a torn value. + */ +static inline void +XLogPrefetchIncrement(pg_atomic_uint64 *counter) +{ + Assert(AmStartupProcess() || !IsUnderPostmaster); + pg_atomic_write_u64(counter, pg_atomic_read_u64(counter) + 1); +} + +/* + * Initialize an XLogPrefetchState object and restore the last saved + * statistics from disk. + */ +void +XLogPrefetchBegin(XLogPrefetchState *state, XLogReaderState *reader) +{ + XLogPrefetchRestoreStats(); + + /* We'll reconfigure on the first call to XLogPrefetch(). */ + state->reader = reader; + state->prefetcher = NULL; + state->reconfigure_count = XLogPrefetchReconfigureCount - 1; +} + +/* + * Shut down the prefetching infrastructure, if configured. + */ +void +XLogPrefetchEnd(XLogPrefetchState *state) +{ + XLogPrefetchSaveStats(); + + if (state->prefetcher) + XLogPrefetcherFree(state->prefetcher); + state->prefetcher = NULL; + + SharedStats->queue_depth = 0; + SharedStats->distance = 0; +} + +/* + * Create a prefetcher that is ready to begin prefetching blocks referenced by + * WAL records. + */ +XLogPrefetcher * +XLogPrefetcherAllocate(XLogReaderState *reader) +{ + XLogPrefetcher *prefetcher; + static HASHCTL hash_table_ctl = { + .keysize = sizeof(RelFileNode), + .entrysize = sizeof(XLogPrefetcherFilter) + }; + + /* + * The size of the queue is based on the maintenance_io_concurrency + * setting. In theory we might have a separate queue for each tablespace, + * but it's not clear how that should work, so for now we'll just use the + * general GUC to rate-limit all prefetching. The queue has space for up to + * the highest possible value of the GUC + 1, because our circular buffer + * has a gap between head and tail when full. + */ + prefetcher = palloc0(sizeof(XLogPrefetcher)); + prefetcher->prefetch_queue_size = maintenance_io_concurrency + 1; + prefetcher->reader = reader; + prefetcher->filter_table = hash_create("XLogPrefetcherFilterTable", 1024, + &hash_table_ctl, + HASH_ELEM | HASH_BLOBS); + dlist_init(&prefetcher->filter_queue); + + SharedStats->queue_depth = 0; + SharedStats->distance = 0; + + return prefetcher; +} + +/* + * Destroy a prefetcher and release all resources. + */ +void +XLogPrefetcherFree(XLogPrefetcher *prefetcher) +{ + /* Log final statistics.
*/ + ereport(LOG, + (errmsg("recovery finished prefetching at %X/%X; " + "prefetch = " UINT64_FORMAT ", " + "skip_hit = " UINT64_FORMAT ", " + "skip_new = " UINT64_FORMAT ", " + "skip_fpw = " UINT64_FORMAT ", " + "skip_seq = " UINT64_FORMAT ", " + "avg_distance = %f, " + "avg_queue_depth = %f", + (uint32) (prefetcher->reader->EndRecPtr >> 32), + (uint32) (prefetcher->reader->EndRecPtr), + pg_atomic_read_u64(&SharedStats->prefetch), + pg_atomic_read_u64(&SharedStats->skip_hit), + pg_atomic_read_u64(&SharedStats->skip_new), + pg_atomic_read_u64(&SharedStats->skip_fpw), + pg_atomic_read_u64(&SharedStats->skip_seq), + SharedStats->avg_distance, + SharedStats->avg_queue_depth))); + hash_destroy(prefetcher->filter_table); + pfree(prefetcher); +} + +/* + * Called when recovery is replaying a new LSN, to check if we can read ahead. + */ +bool +XLogPrefetcherReadAhead(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + uint32 reset_request; + + /* If an error has occurred or we've hit the end of the WAL, do nothing. */ + if (prefetcher->shutdown) + return false; + + /* + * Have any in-flight prefetches definitely completed, judging by the LSN + * that is currently being replayed? + */ + XLogPrefetcherCompletedIO(prefetcher, replaying_lsn); + + /* + * Do we already have the maximum permitted number of I/Os running + * (according to the information we have)? If so, we have to wait for at + * least one to complete, so give up early and let recovery catch up. + */ + if (XLogPrefetcherSaturated(prefetcher)) + return false; + + /* + * Can we drop any filters yet? This happens when the LSN that is + * currently being replayed has moved past a record that prevents + * prefetching of a block range, such as relation extension. + */ + XLogPrefetcherCompleteFilters(prefetcher, replaying_lsn); + + /* + * Have we been asked to reset our stats counters? This is checked with + * an unsynchronized memory read, but we'll see it eventually and we'll be + * accessing that cache line anyway. + */ + reset_request = pg_atomic_read_u32(&SharedStats->reset_request); + if (reset_request != SharedStats->reset_handled) + { + XLogPrefetchResetStats(); + SharedStats->reset_handled = reset_request; + + prefetcher->avg_distance = 0; + prefetcher->avg_queue_depth = 0; + prefetcher->samples = 0; + } + + /* OK, we can now try reading ahead. */ + return XLogPrefetcherScanRecords(prefetcher, replaying_lsn); +} + +/* + * Read ahead as far as we are allowed to, considering the LSN that recovery + * is currently replaying. + * + * Return true if the xlogreader would like more data. + */ +static bool +XLogPrefetcherScanRecords(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + XLogReaderState *reader = prefetcher->reader; + DecodedXLogRecord *record; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + for (;;) + { + char *error; + int64 distance; + + /* If we don't already have a record, then try to read one.
*/ + if (prefetcher->record == NULL) + { + switch (XLogReadAhead(reader, &record, &error)) + { + case XLREAD_NEED_DATA: + return true; + case XLREAD_FAIL: + if (error) + ereport(LOG, + (errmsg("recovery no longer prefetching: %s", + error))); + else + ereport(LOG, + (errmsg("recovery no longer prefetching"))); + prefetcher->shutdown = true; + SharedStats->queue_depth = 0; + SharedStats->distance = 0; + + return false; + case XLREAD_FULL: + return false; + case XLREAD_SUCCESS: + prefetcher->record = record; + prefetcher->next_block_id = 0; + break; + } + } + else + { + /* + * We ran out of I/O queue while part way through a record. We'll + * carry on where we left off, according to next_block_id. + */ + record = prefetcher->record; + } + + /* How far ahead of replay are we now? */ + distance = record->lsn - replaying_lsn; + + /* Update distance shown in shm. */ + SharedStats->distance = distance; + + /* Periodically recompute some statistics. */ + if (unlikely(replaying_lsn >= prefetcher->next_sample_lsn)) + { + /* Compute online averages. */ + prefetcher->samples++; + if (prefetcher->samples == 1) + { + prefetcher->avg_distance = SharedStats->distance; + prefetcher->avg_queue_depth = SharedStats->queue_depth; + } + else + { + prefetcher->avg_distance += + (SharedStats->distance - prefetcher->avg_distance) / + prefetcher->samples; + prefetcher->avg_queue_depth += + (SharedStats->queue_depth - prefetcher->avg_queue_depth) / + prefetcher->samples; + } + + /* Expose it in shared memory. */ + SharedStats->avg_distance = prefetcher->avg_distance; + SharedStats->avg_queue_depth = prefetcher->avg_queue_depth; + + /* Also periodically save the simple counters. */ + XLogPrefetchSaveStats(); + + prefetcher->next_sample_lsn = + replaying_lsn + XLOGPREFETCHER_SAMPLE_DISTANCE; + } + + /* Are we not far enough ahead? */ + if (distance <= 0) + { + /* XXX Is this still possible? */ + prefetcher->record = NULL; /* skip this record */ + continue; + } + + /* + * If this is a record that creates a new SMGR relation, we'll avoid + * prefetching anything from that rnode until it has been replayed. + */ + if (replaying_lsn < record->lsn && + record->header.xl_rmid == RM_SMGR_ID && + (record->header.xl_info & ~XLR_INFO_MASK) == XLOG_SMGR_CREATE) + { + xl_smgr_create *xlrec = (xl_smgr_create *) record->main_data; + + XLogPrefetcherAddFilter(prefetcher, xlrec->rnode, 0, record->lsn); + } + + /* Scan the record's block references. */ + if (!XLogPrefetcherScanBlocks(prefetcher)) + return false; + + /* Advance to the next record. */ + prefetcher->record = NULL; + } +} + +/* + * Scan the current record for block references, and consider prefetching. + * + * Return true if we processed the current record to completion and still have + * queue space to process a new record, and false if we saturated the I/O + * queue and need to wait for recovery to advance before we continue. + */ +static bool +XLogPrefetcherScanBlocks(XLogPrefetcher *prefetcher) +{ + DecodedXLogRecord *record = prefetcher->record; + + Assert(!XLogPrefetcherSaturated(prefetcher)); + + /* + * We might already have been partway through processing this record when + * our queue became saturated, so we need to start where we left off. + */ + for (int block_id = prefetcher->next_block_id; + block_id <= record->max_block_id; + ++block_id) + { + DecodedBkpBlock *block = &record->blocks[block_id]; + PrefetchBufferResult prefetch; + SMgrRelation reln; + + /* Ignore everything but the main fork for now. 
*/ + if (block->forknum != MAIN_FORKNUM) + continue; + + /* + * If there is a full page image attached, we won't be reading the + * page, so you might think we should skip it. However, if the + * underlying filesystem uses larger logical blocks than us, it might + * still need to perform a read-before-write some time later. + * Therefore, only prefetch if configured to do so. + */ + if (block->has_image && !recovery_prefetch_fpw) + { + XLogPrefetchIncrement(&SharedStats->skip_fpw); + continue; + } + + /* + * If this block will initialize a new page then it's probably a + * relation extension. Since that might create a new segment, we + * can't try to prefetch this block until the record has been + * replayed, or we might try to open a file that doesn't exist yet. + */ + if (block->flags & BKPBLOCK_WILL_INIT) + { + XLogPrefetcherAddFilter(prefetcher, block->rnode, block->blkno, + record->lsn); + XLogPrefetchIncrement(&SharedStats->skip_new); + continue; + } + + /* Should we skip this block due to a filter? */ + if (XLogPrefetcherIsFiltered(prefetcher, block->rnode, block->blkno)) + { + XLogPrefetchIncrement(&SharedStats->skip_new); + continue; + } + + /* Fast path for repeated references to the same relation. */ + if (RelFileNodeEquals(block->rnode, prefetcher->last_rnode)) + { + /* + * If this is a repeat access to the same block, then skip it. + * + * XXX We could also check for last_blkno + 1 too, and also update + * last_blkno; it's not clear if the kernel would do a better job + * of sequential prefetching. + */ + if (block->blkno == prefetcher->last_blkno) + { + XLogPrefetchIncrement(&SharedStats->skip_seq); + continue; + } + + /* We can avoid calling smgropen(). */ + reln = prefetcher->last_reln; + } + else + { + /* Otherwise we have to open it. */ + reln = smgropen(block->rnode, InvalidBackendId); + prefetcher->last_rnode = block->rnode; + prefetcher->last_reln = reln; + } + prefetcher->last_blkno = block->blkno; + + /* Try to prefetch this block! */ + prefetch = PrefetchSharedBuffer(reln, block->forknum, block->blkno); + if (BufferIsValid(prefetch.recent_buffer)) + { + /* + * It was already cached, so do nothing. We'll remember the + * buffer, so that recovery can try to avoid looking it up again. + */ + block->recent_buffer = prefetch.recent_buffer; + XLogPrefetchIncrement(&SharedStats->skip_hit); + } + else if (prefetch.initiated_io) + { + /* + * I/O has possibly been initiated (though we don't know if it was + * already cached by the kernel, so we just have to assume that it + * has due to lack of better information). Record this as an I/O + * in progress until eventually we replay this LSN. + */ + XLogPrefetchIncrement(&SharedStats->prefetch); + XLogPrefetcherInitiatedIO(prefetcher, record->lsn); + + /* + * If the queue is now full, we'll have to wait before processing + * any more blocks from this record, or move to a new record if + * that was the last block. + */ + if (XLogPrefetcherSaturated(prefetcher)) + { + prefetcher->next_block_id = block_id + 1; + return false; + } + } + else + { + /* + * Neither cached nor initiated. The underlying segment file + * doesn't exist. Presumably it will be unlinked by a later WAL + * record. When recovery reads this block, it will use the + * EXTENSION_CREATE_RECOVERY flag. We certainly don't want to do + * that sort of thing while merely prefetching, so let's just + * ignore references to this relation until this record is + * replayed, and let recovery create the dummy file or complain if + * something is wrong. 
+ */ + XLogPrefetcherAddFilter(prefetcher, block->rnode, 0, + record->lsn); + XLogPrefetchIncrement(&SharedStats->skip_new); + } + } + + return true; +} + +/* + * Expose statistics about recovery prefetching. + */ +Datum +pg_stat_get_prefetch_recovery(PG_FUNCTION_ARGS) +{ +#define PG_STAT_GET_PREFETCH_RECOVERY_COLS 10 + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + MemoryContext per_query_ctx; + MemoryContext oldcontext; + Datum values[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + bool nulls[PG_STAT_GET_PREFETCH_RECOVERY_COLS]; + + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mod required, but it is not allowed in this context"))); + + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + + MemoryContextSwitchTo(oldcontext); + + if (pg_atomic_read_u32(&SharedStats->reset_request) != SharedStats->reset_handled) + { + /* There's an unhandled reset request, so just show NULLs */ + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = true; + } + else + { + for (int i = 0; i < PG_STAT_GET_PREFETCH_RECOVERY_COLS; ++i) + nulls[i] = false; + } + + values[0] = TimestampTzGetDatum(pg_atomic_read_u64(&SharedStats->reset_time)); + values[1] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->prefetch)); + values[2] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_hit)); + values[3] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_new)); + values[4] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_fpw)); + values[5] = Int64GetDatum(pg_atomic_read_u64(&SharedStats->skip_seq)); + values[6] = Int32GetDatum(SharedStats->distance); + values[7] = Int32GetDatum(SharedStats->queue_depth); + values[8] = Float4GetDatum(SharedStats->avg_distance); + values[9] = Float4GetDatum(SharedStats->avg_queue_depth); + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + tuplestore_donestoring(tupstore); + + return (Datum) 0; +} + +/* + * Compute (n + 1) % prefetch_queue_size, assuming n < prefetch_queue_size, + * without using division. + */ +static inline int +XLogPrefetcherNext(XLogPrefetcher *prefetcher, int n) +{ + int next = n + 1; + + return next == prefetcher->prefetch_queue_size ? 0 : next; +} + +/* + * Don't prefetch any blocks >= 'blockno' from a given 'rnode', until 'lsn' + * has been replayed. + */ +static inline void +XLogPrefetcherAddFilter(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno, XLogRecPtr lsn) +{ + XLogPrefetcherFilter *filter; + bool found; + + filter = hash_search(prefetcher->filter_table, &rnode, HASH_ENTER, &found); + if (!found) + { + /* + * Don't allow any prefetching of this block or higher until replayed. + */ + filter->filter_until_replayed = lsn; + filter->filter_from_block = blockno; + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } + else + { + /* + * We were already filtering this rnode. 
Extend the filter's lifetime + * to cover this WAL record, but leave the (presumably lower) block + * number there because we don't want to have to track individual + * blocks. + */ + filter->filter_until_replayed = lsn; + dlist_delete(&filter->link); + dlist_push_head(&prefetcher->filter_queue, &filter->link); + } +} + +/* + * Have we replayed the records that caused us to begin filtering a block + * range? That means that relations should have been created, extended or + * dropped as required, so we can drop relevant filters. + */ +static inline void +XLogPrefetcherCompleteFilters(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = dlist_tail_element(XLogPrefetcherFilter, + link, + &prefetcher->filter_queue); + + if (filter->filter_until_replayed >= replaying_lsn) + break; + dlist_delete(&filter->link); + hash_search(prefetcher->filter_table, filter, HASH_REMOVE, NULL); + } +} + +/* + * Check if a given block should be skipped due to a filter. + */ +static inline bool +XLogPrefetcherIsFiltered(XLogPrefetcher *prefetcher, RelFileNode rnode, + BlockNumber blockno) +{ + /* + * Test for empty queue first, because we expect it to be empty most of + * the time and we can avoid the hash table lookup in that case. + */ + if (unlikely(!dlist_is_empty(&prefetcher->filter_queue))) + { + XLogPrefetcherFilter *filter = hash_search(prefetcher->filter_table, &rnode, + HASH_FIND, NULL); + + if (filter && filter->filter_from_block <= blockno) + return true; + } + + return false; +} + +/* + * Insert an LSN into the queue. The queue must not be full already. This + * tracks the fact that we have (to the best of our knowledge) initiated an + * I/O, so that we can impose a cap on concurrent prefetching. + */ +static inline void +XLogPrefetcherInitiatedIO(XLogPrefetcher *prefetcher, + XLogRecPtr prefetching_lsn) +{ + Assert(!XLogPrefetcherSaturated(prefetcher)); + prefetcher->prefetch_queue[prefetcher->prefetch_head] = prefetching_lsn; + prefetcher->prefetch_head = + XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head); + SharedStats->queue_depth++; + + Assert(SharedStats->queue_depth <= prefetcher->prefetch_queue_size); +} + +/* + * Have we replayed the records that caused us to initiate the oldest + * prefetches yet? That means that they're definitely finished, so we can + * forget about them and allow ourselves to initiate more prefetches. For now + * we don't have any awareness of when I/O really completes. + */ +static inline void +XLogPrefetcherCompletedIO(XLogPrefetcher *prefetcher, XLogRecPtr replaying_lsn) +{ + while (prefetcher->prefetch_head != prefetcher->prefetch_tail && + prefetcher->prefetch_queue[prefetcher->prefetch_tail] < replaying_lsn) + { + prefetcher->prefetch_tail = + XLogPrefetcherNext(prefetcher, prefetcher->prefetch_tail); + SharedStats->queue_depth--; + + Assert(SharedStats->queue_depth >= 0); + } +} + +/* + * Check if the maximum allowed number of I/Os is already in flight. + */ +static inline bool +XLogPrefetcherSaturated(XLogPrefetcher *prefetcher) +{ + int next = XLogPrefetcherNext(prefetcher, prefetcher->prefetch_head); + + return next == prefetcher->prefetch_tail; +} + +void +assign_recovery_prefetch(bool new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed.
*/ + recovery_prefetch = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +} + +void +assign_recovery_prefetch_fpw(bool new_value, void *extra) +{ + /* Reconfigure prefetching, because a setting it depends on changed. */ + recovery_prefetch_fpw = new_value; + if (AmStartupProcess()) + XLogPrefetchReconfigure(); +}
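For the avg_distance and avg_queue_depth figures reported above, the prefetcher keeps a running mean rather than storing every sample: on the first sample the average is set to the sample, and each later sample nudges the average by (sample - average) / samples. Below is a minimal standalone sketch of that update rule; the Demo* names and the sample values are illustrative and not code from the patch.

/*
 * Standalone sketch of the running-average update used for avg_distance and
 * avg_queue_depth: each sample moves the average by (sample - avg) / samples,
 * so no history needs to be kept.  Illustrative names only.
 */
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef struct DemoOnlineAvg
{
	uint64_t	samples;
	double		avg;
} DemoOnlineAvg;

/* Fold one new sample into the running average, as the prefetcher does. */
static void
demo_online_avg_add(DemoOnlineAvg *state, double sample)
{
	state->samples++;
	if (state->samples == 1)
		state->avg = sample;
	else
		state->avg += (sample - state->avg) / state->samples;
}

int
main(void)
{
	DemoOnlineAvg distance = {0};
	double		samples[] = {8192, 16384, 4096, 32768};

	for (size_t i = 0; i < sizeof(samples) / sizeof(samples[0]); i++)
		demo_online_avg_add(&distance, samples[i]);

	/* (8192 + 16384 + 4096 + 32768) / 4 = 15360 */
	printf("avg_distance = %.1f\n", distance.avg);
	return 0;
}

The incremental form converges on the ordinary arithmetic mean (15360 for the four samples above); its advantage is constant memory and a single cheap update each time another XLOGPREFETCHER_SAMPLE_DISTANCE of WAL has been replayed.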