aboutsummaryrefslogtreecommitdiff
path: root/src/backend/storage/buffer/bufmgr.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/storage/buffer/bufmgr.c')
-rw-r--r--src/backend/storage/buffer/bufmgr.c274
1 files changed, 238 insertions, 36 deletions
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index 5b9192ed450..8f6efdbf0e0 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -37,6 +37,7 @@
#include "catalog/catalog.h"
#include "catalog/storage.h"
#include "executor/instrument.h"
+#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "pg_trace.h"
#include "pgstat.h"
@@ -75,6 +76,34 @@ typedef struct PrivateRefCountEntry
/* 64 bytes, about the size of a cache line on common systems */
#define REFCOUNT_ARRAY_ENTRIES 8
+/*
+ * Status of buffers to checkpoint for a particular tablespace, used
+ * internally in BufferSync.
+ */
+typedef struct CkptTsStatus
+{
+ /* oid of the tablespace */
+ Oid tsId;
+
+ /*
+ * Checkpoint progress for this tablespace. To make progress comparable
+ * between tablespaces the progress is, for each tablespace, measured as a
+ * number between 0 and the total number of to-be-checkpointed pages. Each
+ * page checkpointed in this tablespace increments this space's progress
+ * by progress_slice.
+ */
+ float8 progress;
+ float8 progress_slice;
+
+ /* number of to-be checkpointed pages in this tablespace */
+ int num_to_scan;
+ /* already processed pages in this tablespace */
+ int num_scanned;
+
+ /* current offset in CkptBufferIds for this tablespace */
+ int index;
+} CkptTsStatus;
+
/* GUC variables */
bool zero_damaged_pages = false;
int bgwriter_lru_maxpages = 100;
@@ -425,6 +454,8 @@ static void AtProcExit_Buffers(int code, Datum arg);
static void CheckForBufferLeaks(void);
static int rnode_comparator(const void *p1, const void *p2);
static int buffertag_comparator(const void *p1, const void *p2);
+static int ckpt_buforder_comparator(const void *pa, const void *pb);
+static int ts_ckpt_progress_comparator(Datum a, Datum b, void *arg);
/*
@@ -1658,8 +1689,13 @@ BufferSync(int flags)
{
int buf_id;
int num_to_scan;
- int num_to_write;
+ int num_spaces;
+ int num_processed;
int num_written;
+ CkptTsStatus *per_ts_stat = NULL;
+ Oid last_tsid;
+ binaryheap *ts_heap;
+ int i;
int mask = BM_DIRTY;
WritebackContext wb_context;
@@ -1677,7 +1713,7 @@ BufferSync(int flags)
/*
* Loop over all buffers, and mark the ones that need to be written with
- * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_write), so that we
+ * BM_CHECKPOINT_NEEDED. Count them as we go (num_to_scan), so that we
* can estimate how much work needs to be done.
*
* This allows us to write only those pages that were dirty when the
@@ -1691,7 +1727,7 @@ BufferSync(int flags)
* BM_CHECKPOINT_NEEDED still set. This is OK since any such buffer would
* certainly need to be written for the next checkpoint attempt, too.
*/
- num_to_write = 0;
+ num_to_scan = 0;
for (buf_id = 0; buf_id < NBuffers; buf_id++)
{
BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
@@ -1704,34 +1740,141 @@ BufferSync(int flags)
if ((bufHdr->flags & mask) == mask)
{
+ CkptSortItem *item;
+
bufHdr->flags |= BM_CHECKPOINT_NEEDED;
- num_to_write++;
+
+ item = &CkptBufferIds[num_to_scan++];
+ item->buf_id = buf_id;
+ item->tsId = bufHdr->tag.rnode.spcNode;
+ item->relNode = bufHdr->tag.rnode.relNode;
+ item->forkNum = bufHdr->tag.forkNum;
+ item->blockNum = bufHdr->tag.blockNum;
}
UnlockBufHdr(bufHdr);
}
- if (num_to_write == 0)
+ if (num_to_scan == 0)
return; /* nothing to do */
WritebackContextInit(&wb_context, &checkpoint_flush_after);
- TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
+ TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan);
/*
- * Loop over all buffers again, and write the ones (still) marked with
- * BM_CHECKPOINT_NEEDED. In this loop, we start at the clock sweep point
- * since we might as well dump soon-to-be-recycled buffers first.
- *
- * Note that we don't read the buffer alloc count here --- that should be
- * left untouched till the next BgBufferSync() call.
+ * Sort buffers that need to be written to reduce the likelihood of random
+ * IO. The sorting is also important for the implementation of balancing
+ * writes between tablespaces. Without balancing writes we'd potentially
+ * end up writing to the tablespaces one-by-one; possibly overloading the
+ * underlying system.
*/
- buf_id = StrategySyncStart(NULL, NULL);
- num_to_scan = NBuffers;
+ qsort(CkptBufferIds, num_to_scan, sizeof(CkptSortItem),
+ ckpt_buforder_comparator);
+
+ num_spaces = 0;
+
+ /*
+ * Allocate progress status for each tablespace with buffers that need to
+ * be flushed. This requires the to-be-flushed array to be sorted.
+ */
+ last_tsid = InvalidOid;
+ for (i = 0; i < num_to_scan; i++)
+ {
+ CkptTsStatus *s;
+ Oid cur_tsid;
+
+ cur_tsid = CkptBufferIds[i].tsId;
+
+ /*
+ * Grow array of per-tablespace status structs, every time a new
+ * tablespace is found.
+ */
+ if (last_tsid == InvalidOid || last_tsid != cur_tsid)
+ {
+ Size sz;
+
+ num_spaces++;
+
+ /*
+ * Not worth adding grow-by-power-of-2 logic here - even with a
+ * few hundred tablespaces this should be fine.
+ */
+ sz = sizeof(CkptTsStatus) * num_spaces;
+
+ if (per_ts_stat == NULL)
+ per_ts_stat = (CkptTsStatus *) palloc(sz);
+ else
+ per_ts_stat = (CkptTsStatus *) repalloc(per_ts_stat, sz);
+
+ s = &per_ts_stat[num_spaces - 1];
+ memset(s, 0, sizeof(*s));
+ s->tsId = cur_tsid;
+
+ /*
+ * The first buffer in this tablespace. As CkptBufferIds is sorted
+ * by tablespace all (s->num_to_scan) buffers in this tablespace
+ * will follow afterwards.
+ */
+ s->index = i;
+
+ /*
+ * progress_slice will be determined once we know how many buffers
+ * are in each tablespace, i.e. after this loop.
+ */
+
+ last_tsid = cur_tsid;
+ }
+ else
+ {
+ s = &per_ts_stat[num_spaces - 1];
+ }
+
+ s->num_to_scan++;
+ }
+
+ Assert(num_spaces > 0);
+
+ /*
+ * Build a min-heap over the write-progress in the individual tablespaces,
+ * and compute how large a portion of the total progress a single
+ * processed buffer is.
+ */
+ ts_heap = binaryheap_allocate(num_spaces,
+ ts_ckpt_progress_comparator,
+ NULL);
+
+ for (i = 0; i < num_spaces; i++)
+ {
+ CkptTsStatus *ts_stat = &per_ts_stat[i];
+
+ ts_stat->progress_slice = (float8) num_to_scan / ts_stat->num_to_scan;
+
+ binaryheap_add_unordered(ts_heap, PointerGetDatum(ts_stat));
+ }
+
+ binaryheap_build(ts_heap);
+
+ /*
+ * Iterate through to-be-checkpointed buffers and write the ones (still)
+ * marked with BM_CHECKPOINT_NEEDED. The writes are balanced between
+ * tablespaces; otherwise the sorting would lead to only one tablespace
+ * receiving writes at a time, making inefficient use of the hardware.
+ */
+ num_processed = 0;
num_written = 0;
- while (num_to_scan-- > 0)
+ while (!binaryheap_empty(ts_heap))
{
- BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
+ BufferDesc *bufHdr = NULL;
+ CkptTsStatus *ts_stat = (CkptTsStatus *)
+ DatumGetPointer(binaryheap_first(ts_heap));
+
+ buf_id = CkptBufferIds[ts_stat->index].buf_id;
+ Assert(buf_id != -1);
+
+ bufHdr = GetBufferDescriptor(buf_id);
+
+ num_processed++;
/*
* We don't need to acquire the lock here, because we're only looking
@@ -1752,35 +1895,41 @@ BufferSync(int flags)
TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
BgWriterStats.m_buf_written_checkpoints++;
num_written++;
+ }
+ }
- /*
- * We know there are at most num_to_write buffers with
- * BM_CHECKPOINT_NEEDED set; so we can stop scanning if
- * num_written reaches num_to_write.
- *
- * Note that num_written doesn't include buffers written by
- * other backends, or by the bgwriter cleaning scan. That
- * means that the estimate of how much progress we've made is
- * conservative, and also that this test will often fail to
- * trigger. But it seems worth making anyway.
- */
- if (num_written >= num_to_write)
- break;
+ /*
+ * Measure progress independent of actualy having to flush the buffer
+ * - otherwise writing become unbalanced.
+ */
+ ts_stat->progress += ts_stat->progress_slice;
+ ts_stat->num_scanned++;
+ ts_stat->index++;
- /*
- * Sleep to throttle our I/O rate.
- */
- CheckpointWriteDelay(flags, (double) num_written / num_to_write);
- }
+ /* Have all the buffers from the tablespace been processed? */
+ if (ts_stat->num_scanned == ts_stat->num_to_scan)
+ {
+ binaryheap_remove_first(ts_heap);
+ }
+ else
+ {
+ /* update heap with the new progress */
+ binaryheap_replace_first(ts_heap, PointerGetDatum(ts_stat));
}
- if (++buf_id >= NBuffers)
- buf_id = 0;
+ /*
+ * Sleep to throttle our I/O rate.
+ */
+ CheckpointWriteDelay(flags, (double) num_processed / num_to_scan);
}
/* issue all pending flushes */
IssuePendingWritebacks(&wb_context);
+ pfree(per_ts_stat);
+ per_ts_stat = NULL;
+ binaryheap_free(ts_heap);
+
/*
* Update checkpoint statistics. As noted above, this doesn't include
* buffers written by other backends or bgwriter scan.
@@ -3791,6 +3940,59 @@ buffertag_comparator(const void *a, const void *b)
}
/*
+ * Comparator determining the writeout order in a checkpoint.
+ *
+ * It is important that tablespaces are compared first, the logic balancing
+ * writes between tablespaces relies on it.
+ */
+static int
+ckpt_buforder_comparator(const void *pa, const void *pb)
+{
+ const CkptSortItem *a = (CkptSortItem *) pa;
+ const CkptSortItem *b = (CkptSortItem *) pb;
+
+ /* compare tablespace */
+ if (a->tsId < b->tsId)
+ return -1;
+ else if (a->tsId > b->tsId)
+ return 1;
+ /* compare relation */
+ if (a->relNode < b->relNode)
+ return -1;
+ else if (a->relNode > b->relNode)
+ return 1;
+ /* compare fork */
+ else if (a->forkNum < b->forkNum)
+ return -1;
+ else if (a->forkNum > b->forkNum)
+ return 1;
+ /* compare block number */
+ else if (a->blockNum < b->blockNum)
+ return -1;
+ else /* should not be the same block ... */
+ return 1;
+}
+
+/*
+ * Comparator for a Min-Heap over the per-tablespace checkpoint completion
+ * progress.
+ */
+static int
+ts_ckpt_progress_comparator(Datum a, Datum b, void *arg)
+{
+ CkptTsStatus *sa = (CkptTsStatus *) a;
+ CkptTsStatus *sb = (CkptTsStatus *) b;
+
+ /* we want a min-heap, so return 1 for the a < b */
+ if (sa->progress < sb->progress)
+ return 1;
+ else if (sa->progress == sb->progress)
+ return 0;
+ else
+ return -1;
+}
+
+/*
* Initialize a writeback context, discarding potential previous state.
*
* *max_coalesce is a pointer to a variable containing the current maximum