Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r-- | src/backend/storage/buffer/bufmgr.c | 274
1 file changed, 238 insertions(+), 36 deletions(-)
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5b9192ed450..8f6efdbf0e0 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
 #include "catalog/catalog.h"
 #include "catalog/storage.h"
 #include "executor/instrument.h"
+#include "lib/binaryheap.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
 #include "pgstat.h"
@@ -75,6 +76,34 @@ typedef struct PrivateRefCountEntry
 /* 64 bytes, about the size of a cache line on common systems */
 #define REFCOUNT_ARRAY_ENTRIES 8

+/*
+ * Status of buffers to checkpoint for a particular tablespace, used
+ * internally in BufferSync.
+ */
+typedef struct CkptTsStatus
+{
+    /* oid of the tablespace */
+    Oid         tsId;
+
+    /*
+     * Checkpoint progress for this tablespace.  To make progress comparable
+     * between tablespaces, the progress is, for each tablespace, measured as
+     * a number between 0 and the total number of to-be-checkpointed pages.
+     * Each page checkpointed in this tablespace increments this space's
+     * progress by progress_slice.
+     */
+    float8      progress;
+    float8      progress_slice;
+
+    /* number of to-be-checkpointed pages in this tablespace */
+    int         num_to_scan;
+    /* already processed pages in this tablespace */
+    int         num_scanned;
+
+    /* current offset in CkptBufferIds for this tablespace */
+    int         index;
+} CkptTsStatus;
+
 /* GUC variables */
 bool        zero_damaged_pages = false;
 int         bgwriter_lru_maxpages = 100;
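For context: CkptBufferIds, referenced in the struct comment above, is the array this patch sorts and walks; its entries are of type CkptSortItem, which the patch series defines elsewhere (storage/buf_internals.h, not part of this diff). A plausible sketch of that entry layout, inferred from the fields BufferSync assigns below; the exact definition may differ:

typedef struct CkptSortItem
{
    Oid         tsId;       /* tablespace, compared first when sorting */
    Oid         relNode;    /* relation file node */
    ForkNumber  forkNum;    /* fork (main, fsm, vm, ...) */
    BlockNumber blockNum;   /* block number within the fork */
    int         buf_id;     /* index of the buffer in shared buffers */
} CkptSortItem;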
@@ -425,6 +454,8 @@ static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int  rnode_comparator(const void *p1, const void *p2);
 static int  buffertag_comparator(const void *p1, const void *p2);
+static int  ckpt_buforder_comparator(const void *pa, const void *pb);
+static int  ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);


 /*
@@ -1658,8 +1689,13 @@ BufferSync(int flags)
 {
     int         buf_id;
     int         num_to_scan;
-    int         num_to_write;
+    int         num_spaces;
+    int         num_processed;
     int         num_written;
+    CkptTsStatus *per_ts_stat = NULL;
+    Oid         last_tsid;
+    binaryheap *ts_heap;
+    int         i;
     int         mask = BM_DIRTY;
     WritebackContext wb_context;
@@ -1677,7 +1713,7 @@ BufferSync(int flags)

     /*
      * Loop over all buffers, and mark the ones that need to be written with
-     * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_write), so that we
+     * BM_CHECKPOINT_NEEDED.  Count them as we go (num_to_scan), so that we
      * can estimate how much work needs to be done.
      *
      * This allows us to write only those pages that were dirty when the
@@ -1691,7 +1727,7 @@ BufferSync(int flags)
      * BM_CHECKPOINT_NEEDED still set.  This is OK since any such buffer would
      * certainly need to be written for the next checkpoint attempt, too.
      */
-    num_to_write = 0;
+    num_to_scan = 0;
     for (buf_id = 0; buf_id < NBuffers; buf_id++)
     {
         BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
@@ -1704,34 +1740,141 @@ BufferSync(int flags)

         if ((bufHdr->flags & mask) == mask)
         {
+            CkptSortItem *item;
+
             bufHdr->flags |= BM_CHECKPOINT_NEEDED;
-            num_to_write++;
+
+            item = &CkptBufferIds[num_to_scan++];
+            item->buf_id = buf_id;
+            item->tsId = bufHdr->tag.rnode.spcNode;
+            item->relNode = bufHdr->tag.rnode.relNode;
+            item->forkNum = bufHdr->tag.forkNum;
+            item->blockNum = bufHdr->tag.blockNum;
         }

         UnlockBufHdr(bufHdr);
     }

-    if (num_to_write == 0)
+    if (num_to_scan == 0)
         return;                 /* nothing to do */

     WritebackContextInit(&wb_context, &checkpoint_flush_after);

-    TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
+    TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);

     /*
-     * Loop over all buffers again, and write the ones (still) marked with
-     * BM_CHECKPOINT_NEEDED.  In this loop, we start at the clock sweep point
-     * since we might as well dump soon-to-be-recycled buffers first.
-     *
-     * Note that we don't read the buffer alloc count here --- that should be
-     * left untouched till the next BgBufferSync() call.
+     * Sort the buffers that need to be written to reduce the likelihood of
+     * random IO.  The sorting is also important for the implementation of
+     * balancing writes between tablespaces.  Without balancing writes we'd
+     * potentially end up writing to the tablespaces one-by-one, possibly
+     * overloading the underlying system.
      */
-    buf_id = StrategySyncStart(NULL, NULL);
-    num_to_scan = NBuffers;
+    qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
+          ckpt_buforder_comparator);
+
+    num_spaces = 0;
+
+    /*
+     * Allocate progress status for each tablespace with buffers that need to
+     * be flushed.  This requires the to-be-flushed array to be sorted.
+     */
+    last_tsid = InvalidOid;
+    for (i = 0; i < num_to_scan; i++)
+    {
+        CkptTsStatus *s;
+        Oid         cur_tsid;
+
+        cur_tsid = CkptBufferIds[i].tsId;
+
+        /*
+         * Grow the array of per-tablespace status structs every time a new
+         * tablespace is found.
+         */
+        if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+        {
+            Size        sz;
+
+            num_spaces++;
+
+            /*
+             * Not worth adding grow-by-power-of-2 logic here - even with a
+             * few hundred tablespaces this should be fine.
+             */
+            sz = sizeof(CkptTsStatus) * num_spaces;
+
+            if (per_ts_stat == NULL)
+                per_ts_stat = (CkptTsStatus *) palloc(sz);
+            else
+                per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+
+            s = &per_ts_stat[num_spaces - 1];
+            memset(s, 0, sizeof(*s));
+            s->tsId = cur_tsid;
+
+            /*
+             * The first buffer in this tablespace.  As CkptBufferIds is
+             * sorted by tablespace, all (s->num_to_scan) buffers in this
+             * tablespace will follow afterwards.
+             */
+            s->index = i;
+
+            /*
+             * progress_slice will be determined once we know how many
+             * buffers are in each tablespace, i.e. after this loop.
+             */
+
+            last_tsid = cur_tsid;
+        }
+        else
+        {
+            s = &per_ts_stat[num_spaces - 1];
+        }
+
+        s->num_to_scan++;
+    }
+
+    Assert(num_spaces > 0);
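To see how progress_slice keeps tablespaces in step, here is a small standalone simulation (not PostgreSQL code; the tablespace sizes are made up, and a linear minimum search stands in for the binary heap). Each tablespace's progress advances by total/num_to_scan_ts per write, so all tablespaces reach the shared finish line of num_to_scan at the same time, and writes interleave in proportion to tablespace size:

#include <stdio.h>

int
main(void)
{
    /* hypothetical per-tablespace dirty-buffer counts, summing to total */
    int     num_to_scan_ts[] = {100, 50, 850};
    int     scanned[] = {0, 0, 0};
    double  progress[] = {0.0, 0.0, 0.0};
    double  slice[3];
    int     total = 1000;
    int     done = 0;

    for (int i = 0; i < 3; i++)
        slice[i] = (double) total / num_to_scan_ts[i];

    while (done < total)
    {
        int     min = -1;

        /* stand-in for binaryheap_first: tablespace with least progress */
        for (int i = 0; i < 3; i++)
        {
            if (scanned[i] == num_to_scan_ts[i])
                continue;
            if (min < 0 || progress[i] < progress[min])
                min = i;
        }

        /* "write" one buffer from that tablespace */
        progress[min] += slice[min];
        scanned[min]++;
        done++;

        if (done % 250 == 0)
            printf("after %4d writes: ts1=%3d ts2=%3d ts3=%3d\n",
                   done, scanned[0], scanned[1], scanned[2]);
    }
    return 0;
}

At every checkpoint of 250 writes the per-tablespace counts stay proportional to the tablespace sizes (roughly 25/13/212, then 50/25/425, and so on), which is exactly the interleaving the heap-driven loop below produces.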
+
+    /*
+     * Build a min-heap over the write-progress in the individual
+     * tablespaces, and compute how large a portion of the total progress a
+     * single processed buffer is.
+     */
+    ts_heap = binaryheap_allocate(num_spaces,
+                                  ts_ckpt_progress_comparator,
+                                  NULL);
+
+    for (i = 0; i < num_spaces; i++)
+    {
+        CkptTsStatus *ts_stat = &per_ts_stat[i];
+
+        ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+
+        binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+    }
+
+    binaryheap_build(ts_heap);
+
+    /*
+     * Iterate through to-be-checkpointed buffers and write the ones (still)
+     * marked with BM_CHECKPOINT_NEEDED.  The writes are balanced between
+     * tablespaces; otherwise the sorting would lead to only one tablespace
+     * receiving writes at a time, making inefficient use of the hardware.
+     */
+    num_processed = 0;
     num_written = 0;
-    while (num_to_scan-- > 0)
+    while (!binaryheap_empty(ts_heap))
     {
-        BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+        BufferDesc *bufHdr = NULL;
+        CkptTsStatus *ts_stat = (CkptTsStatus *)
+            DatumGetPointer(binaryheap_first(ts_heap));
+
+        buf_id = CkptBufferIds[ts_stat->index].buf_id;
+        Assert(buf_id != -1);
+
+        bufHdr = GetBufferDescriptor(buf_id);
+
+        num_processed++;

         /*
          * We don't need to acquire the lock here, because we're only looking
@@ -1752,35 +1895,41 @@ BufferSync(int flags)
                 TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
                 BgWriterStats.m_buf_written_checkpoints++;
                 num_written++;
+            }
+        }

-                /*
-                 * We know there are at most num_to_write buffers with
-                 * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
-                 * num_written reaches num_to_write.
-                 *
-                 * Note that num_written doesn't include buffers written by
-                 * other backends, or by the bgwriter cleaning scan.  That
-                 * means that the estimate of how much progress we've made is
-                 * conservative, and also that this test will often fail to
-                 * trigger.  But it seems worth making anyway.
-                 */
-                if (num_written >= num_to_write)
-                    break;
+        /*
+         * Measure progress independent of actually having to flush the
+         * buffer; otherwise the writes become unbalanced.
+         */
+        ts_stat->progress += ts_stat->progress_slice;
+        ts_stat->num_scanned++;
+        ts_stat->index++;

-                /*
-                 * Sleep to throttle our I/O rate.
-                 */
-                CheckpointWriteDelay(flags, (double) num_written / num_to_write);
-            }
+        /* Have all the buffers from the tablespace been processed? */
+        if (ts_stat->num_scanned == ts_stat->num_to_scan)
+        {
+            binaryheap_remove_first(ts_heap);
+        }
+        else
+        {
+            /* update heap with the new progress */
+            binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
         }

-        if (++buf_id >= NBuffers)
-            buf_id = 0;
+        /*
+         * Sleep to throttle our I/O rate.
+         */
+        CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
     }

     /* issue all pending flushes */
     IssuePendingWritebacks(&wb_context);

+    pfree(per_ts_stat);
+    per_ts_stat = NULL;
+    binaryheap_free(ts_heap);
+
     /*
      * Update checkpoint statistics.  As noted above, this doesn't include
      * buffers written by other backends or bgwriter scan.
@@ -3791,6 +3940,59 @@ buffertag_comparator(const void *a, const void *b)
 }

 /*
+ * Comparator determining the writeout order in a checkpoint.
+ *
+ * It is important that tablespaces are compared first; the logic balancing
+ * writes between tablespaces relies on it.
+ */
+static int
+ckpt_buforder_comparator(const void *pa, const void *pb)
+{
+    const CkptSortItem *a = (CkptSortItem *) pa;
+    const CkptSortItem *b = (CkptSortItem *) pb;
+
+    /* compare tablespace */
+    if (a->tsId < b->tsId)
+        return -1;
+    else if (a->tsId > b->tsId)
+        return 1;
+    /* compare relation */
+    if (a->relNode < b->relNode)
+        return -1;
+    else if (a->relNode > b->relNode)
+        return 1;
+    /* compare fork */
+    else if (a->forkNum < b->forkNum)
+        return -1;
+    else if (a->forkNum > b->forkNum)
+        return 1;
+    /* compare block number */
+    else if (a->blockNum < b->blockNum)
+        return -1;
+    else                        /* should not be the same block ... */
+        return 1;
+}
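As a quick, self-contained illustration (not part of the patch) of the writeout order this comparator produces, with simplified integer types and made-up oids and block numbers:

#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    unsigned    tsId;
    unsigned    relNode;
    int         forkNum;
    unsigned    blockNum;
} Item;

/* same ordering rules as ckpt_buforder_comparator above */
static int
item_cmp(const void *pa, const void *pb)
{
    const Item *a = pa;
    const Item *b = pb;

    if (a->tsId != b->tsId)
        return a->tsId < b->tsId ? -1 : 1;
    if (a->relNode != b->relNode)
        return a->relNode < b->relNode ? -1 : 1;
    if (a->forkNum != b->forkNum)
        return a->forkNum < b->forkNum ? -1 : 1;
    return a->blockNum < b->blockNum ? -1 : 1;
}

int
main(void)
{
    Item        items[] = {
        {1664, 12000, 0, 7},
        {1663, 16384, 0, 3},
        {1663, 16384, 0, 1},
        {1663, 16385, 0, 0},
    };

    qsort(items, 4, sizeof(Item), item_cmp);

    /* prints the 1663 entries first, each relation's blocks ascending */
    for (int i = 0; i < 4; i++)
        printf("ts=%u rel=%u fork=%d block=%u\n",
               items[i].tsId, items[i].relNode,
               items[i].forkNum, items[i].blockNum);
    return 0;
}

Blocks of the same relation come out adjacent and ascending, so neighboring checkpoint writes tend to target neighboring on-disk blocks, and each tablespace forms one contiguous run in CkptBufferIds - which is what the per-tablespace index/num_to_scan bookkeeping in BufferSync relies on.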
+
+/*
+ * Comparator for a Min-Heap over the per-tablespace checkpoint completion
+ * progress.
+ */
+static int
+ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+{
+    CkptTsStatus *sa = (CkptTsStatus *) a;
+    CkptTsStatus *sb = (CkptTsStatus *) b;
+
+    /* we want a min-heap, so return 1 if a < b */
+    if (sa->progress < sb->progress)
+        return 1;
+    else if (sa->progress == sb->progress)
+        return 0;
+    else
+        return -1;
+}
+
+/*
  * Initialize a writeback context, discarding potential previous state.
  *
  * *max_coalesce is a pointer to a variable containing the current maximum
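A note on the inverted comparison: lib/binaryheap keeps the element the comparator ranks greatest at the top, so returning 1 when a's progress is smaller makes binaryheap_first() hand back the tablespace that is furthest behind. A standalone sketch of that selection rule (hypothetical progress values, a plain scan in place of the heap):

#include <stdio.h>

typedef struct { const char *name; double progress; } Ts;

/* same shape as ts_ckpt_progress_comparator: rank smaller progress higher */
static int
inverted_cmp(const Ts *a, const Ts *b)
{
    if (a->progress < b->progress)
        return 1;
    else if (a->progress == b->progress)
        return 0;
    else
        return -1;
}

int
main(void)
{
    Ts      ts[] = {{"ts1", 40.0}, {"ts2", 10.0}, {"ts3", 25.0}};
    int     top = 0;

    /* stand-in for binaryheap_first: pick the comparator-greatest element */
    for (int i = 1; i < 3; i++)
        if (inverted_cmp(&ts[i], &ts[top]) > 0)
            top = i;

    printf("heap top: %s (progress %.0f)\n", ts[top].name, ts[top].progress);
    return 0;
}

This prints "heap top: ts2 (progress 10)": the least-advanced tablespace wins, which is what lets the BufferSync loop above always write next to whichever tablespace has fallen behind.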