Diffstat (limited to 'src/backend/storage')
-rw-r--r--  src/backend/storage/buffer/buf_init.c |   5
-rw-r--r--  src/backend/storage/buffer/bufmgr.c   | 193
-rw-r--r--  src/backend/storage/file/copydir.c    |   4
-rw-r--r--  src/backend/storage/file/fd.c         | 157
-rw-r--r--  src/backend/storage/smgr/md.c         |  50
-rw-r--r--  src/backend/storage/smgr/smgr.c       |  19
6 files changed, 401 insertions, 27 deletions
diff --git a/src/backend/storage/buffer/buf_init.c b/src/backend/storage/buffer/buf_init.c
index f013a4d9581..e10071d9c0e 100644
--- a/src/backend/storage/buffer/buf_init.c
+++ b/src/backend/storage/buffer/buf_init.c
@@ -23,6 +23,7 @@ char	   *BufferBlocks;
 LWLockMinimallyPadded *BufferIOLWLockArray = NULL;
 LWLockTranche BufferIOLWLockTranche;
 LWLockTranche BufferContentLWLockTranche;
+WritebackContext BackendWritebackContext;
 
 /*
@@ -149,6 +150,10 @@ InitBufferPool(void)
 
 	/* Init other shared buffer-management stuff */
 	StrategyInitialize(!foundDescs);
+
+	/* Initialize per-backend file flush context */
+	WritebackContextInit(&BackendWritebackContext,
+						 &backend_flush_after);
 }
 
 /*
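WritebackContextInit() stores a pointer to the flush-after variable rather than its current value, so a configuration reload that changes backend_flush_after takes effect on the very next scheduled buffer, without re-initializing the context. A minimal standalone sketch of that indirection, using hypothetical names rather than the PostgreSQL types:

    #include <assert.h>
    #include <stdio.h>

    #define MAX_PENDING_FLUSHES 256     /* stand-in for WRITEBACK_MAX_PENDING_FLUSHES */

    /* stand-in for PostgreSQL's WritebackContext */
    typedef struct FlushContext
    {
        int        *max_pending;        /* points at a config variable */
        int         nr_pending;
    } FlushContext;

    static int config_flush_after = 32; /* models a GUC such as backend_flush_after */

    static void
    flush_context_init(FlushContext *ctx, int *max_pending)
    {
        assert(*max_pending <= MAX_PENDING_FLUSHES);
        ctx->max_pending = max_pending;
        ctx->nr_pending = 0;
    }

    int
    main(void)
    {
        FlushContext ctx;

        flush_context_init(&ctx, &config_flush_after);

        /* a config reload can change the limit; the context sees it immediately */
        config_flush_after = 0;         /* disables writeback control */
        printf("current limit: %d\n", *ctx.max_pending);    /* prints 0 */
        return 0;
    }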
diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c
index e8e0825eb0c..5b9192ed450 100644
--- a/src/backend/storage/buffer/bufmgr.c
+++ b/src/backend/storage/buffer/bufmgr.c
@@ -83,6 +83,14 @@ bool		track_io_timing = false;
 int			effective_io_concurrency = 0;
 
 /*
+ * GUC variables about triggering kernel writeback for buffers written;
+ * OS-dependent defaults are set via the GUC mechanism.
+ */
+int			checkpoint_flush_after = 0;
+int			bgwriter_flush_after = 0;
+int			backend_flush_after = 0;
+
+/*
  * How many buffers PrefetchBuffer callers should try to stay ahead of their
  * ReadBuffer calls by.  This is maintained by the assign hook for
  * effective_io_concurrency.  Zero means "never prefetch".  This value is
@@ -399,7 +407,7 @@ static bool PinBuffer(BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(BufferDesc *buf);
 static void UnpinBuffer(BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
-static int	SyncOneBuffer(int buf_id, bool skip_recently_used);
+static int	SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *flush_context);
 static void WaitIO(BufferDesc *buf);
 static bool StartBufferIO(BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(BufferDesc *buf, bool clear_dirty,
@@ -416,6 +424,7 @@ static void FlushBuffer(BufferDesc *buf, SMgrRelation reln);
 static void AtProcExit_Buffers(int code, Datum arg);
 static void CheckForBufferLeaks(void);
 static int	rnode_comparator(const void *p1, const void *p2);
+static int	buffertag_comparator(const void *p1, const void *p2);
 
 
 /*
@@ -818,6 +827,13 @@ ReadBuffer_common(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 			MemSet((char *) bufBlock, 0, BLCKSZ);
 		/* don't set checksum for all-zero page */
 		smgrextend(smgr, forkNum, blockNum, (char *) bufBlock, false);
+
+		/*
+		 * NB: we're *not* doing a ScheduleBufferTagForWriteback here,
+		 * although we're essentially performing a write.  At least on Linux
+		 * doing so defeats the 'delayed allocation' mechanism, leading to
+		 * increased file fragmentation.
+		 */
 	}
 	else
 	{
@@ -1084,6 +1100,9 @@ BufferAlloc(SMgrRelation smgr, char relpersistence, ForkNumber forkNum,
 				FlushBuffer(buf, NULL);
 				LWLockRelease(BufferDescriptorGetContentLock(buf));
 
+				ScheduleBufferTagForWriteback(&BackendWritebackContext,
+											  &buf->tag);
+
 				TRACE_POSTGRESQL_BUFFER_WRITE_DIRTY_DONE(forkNum, blockNum,
 											   smgr->smgr_rnode.node.spcNode,
 												smgr->smgr_rnode.node.dbNode,
@@ -1642,6 +1661,7 @@ BufferSync(int flags)
 	int			num_to_write;
 	int			num_written;
 	int			mask = BM_DIRTY;
+	WritebackContext wb_context;
 
 	/* Make sure we can handle the pin inside SyncOneBuffer */
 	ResourceOwnerEnlargeBuffers(CurrentResourceOwner);
@@ -1694,6 +1714,8 @@ BufferSync(int flags)
 	if (num_to_write == 0)
 		return;					/* nothing to do */
 
+	WritebackContextInit(&wb_context, &checkpoint_flush_after);
+
 	TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_write);
 
 	/*
@@ -1725,7 +1747,7 @@
 		 */
 		if (bufHdr->flags & BM_CHECKPOINT_NEEDED)
 		{
-			if (SyncOneBuffer(buf_id, false) & BUF_WRITTEN)
+			if (SyncOneBuffer(buf_id, false, &wb_context) & BUF_WRITTEN)
 			{
 				TRACE_POSTGRESQL_BUFFER_SYNC_WRITTEN(buf_id);
 				BgWriterStats.m_buf_written_checkpoints++;
@@ -1756,6 +1778,9 @@
 			buf_id = 0;
 	}
 
+	/* issue all pending flushes */
+	IssuePendingWritebacks(&wb_context);
+
 	/*
 	 * Update checkpoint statistics. As noted above, this doesn't include
 	 * buffers written by other backends or bgwriter scan.
@@ -1777,7 +1802,7 @@
  * bgwriter_lru_maxpages to 0.)
  */
 bool
-BgBufferSync(void)
+BgBufferSync(WritebackContext *wb_context)
 {
 	/* info obtained from freelist.c */
 	int			strategy_buf_id;
@@ -2002,7 +2027,8 @@
 	/* Execute the LRU scan */
 	while (num_to_scan > 0 && reusable_buffers < upcoming_alloc_est)
 	{
-		int			buffer_state = SyncOneBuffer(next_to_clean, true);
+		int			buffer_state = SyncOneBuffer(next_to_clean, true,
+												 wb_context);
 
 		if (++next_to_clean >= NBuffers)
 		{
@@ -2079,10 +2105,11 @@
  * Note: caller must have done ResourceOwnerEnlargeBuffers.
  */
 static int
-SyncOneBuffer(int buf_id, bool skip_recently_used)
+SyncOneBuffer(int buf_id, bool skip_recently_used, WritebackContext *wb_context)
 {
 	BufferDesc *bufHdr = GetBufferDescriptor(buf_id);
 	int			result = 0;
+	BufferTag	tag;
 
 	ReservePrivateRefCountEntry();
 
@@ -2123,8 +2150,13 @@ SyncOneBuffer(int buf_id, bool skip_recently_used)
 	FlushBuffer(bufHdr, NULL);
 
 	LWLockRelease(BufferDescriptorGetContentLock(bufHdr));
+
+	tag = bufHdr->tag;
+
 	UnpinBuffer(bufHdr, true);
 
+	ScheduleBufferTagForWriteback(wb_context, &tag);
+
 	return result | BUF_WRITTEN;
 }
 
@@ -3729,3 +3761,154 @@ rnode_comparator(const void *p1, const void *p2)
 	else
 		return 0;
 }
+
+/*
+ * BufferTag comparator.
+ */
+static int
+buffertag_comparator(const void *a, const void *b)
+{
+	const BufferTag *ba = (const BufferTag *) a;
+	const BufferTag *bb = (const BufferTag *) b;
+	int			ret;
+
+	ret = rnode_comparator(&ba->rnode, &bb->rnode);
+
+	if (ret != 0)
+		return ret;
+
+	if (ba->forkNum < bb->forkNum)
+		return -1;
+	if (ba->forkNum > bb->forkNum)
+		return 1;
+
+	if (ba->blockNum < bb->blockNum)
+		return -1;
+	if (ba->blockNum > bb->blockNum)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Initialize a writeback context, discarding potential previous state.
+ *
+ * *max_pending is a pointer to a variable containing the current maximum
+ * number of writeback requests that will be coalesced into a bigger one. A
+ * value <= 0 means that no writeback control will be performed.  max_pending
+ * is a pointer instead of an immediate value so the coalesce limit can
+ * easily be changed by the GUC mechanism, and so calling code does not have
+ * to check the current configuration.
+ */
+void
+WritebackContextInit(WritebackContext *context, int *max_pending)
+{
+	Assert(*max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+	context->max_pending = max_pending;
+	context->nr_pending = 0;
+}
+
+/*
+ * Add buffer to list of pending writeback requests.
+ */
+void
+ScheduleBufferTagForWriteback(WritebackContext *context, BufferTag *tag)
+{
+	PendingWriteback *pending;
+
+	/*
+	 * Add buffer to the pending writeback array, unless writeback control is
+	 * disabled.
+	 */
+	if (*context->max_pending > 0)
+	{
+		Assert(*context->max_pending <= WRITEBACK_MAX_PENDING_FLUSHES);
+
+		pending = &context->pending_writebacks[context->nr_pending++];
+
+		pending->tag = *tag;
+	}
+
+	/*
+	 * Perform pending flushes if the writeback limit is exceeded. This
+	 * includes the case where previously an item has been added, but control
+	 * is now disabled.
+	 */
+	if (context->nr_pending >= *context->max_pending)
+		IssuePendingWritebacks(context);
+}
+
+/*
+ * Issue all pending writeback requests, previously scheduled with
+ * ScheduleBufferTagForWriteback, to the OS.
+ *
+ * Because this is only used to improve the OS's I/O scheduling, we try
+ * never to error out - it's just a hint.
+ */
+void
+IssuePendingWritebacks(WritebackContext *context)
+{
+	int			i;
+
+	if (context->nr_pending == 0)
+		return;
+
+	/*
+	 * Executing the writes in order can make them a lot faster, and allows
+	 * writeback requests for consecutive blocks to be merged into larger
+	 * writebacks.
+	 */
+	qsort(&context->pending_writebacks, context->nr_pending,
+		  sizeof(PendingWriteback), buffertag_comparator);
+
+	/*
+	 * Coalesce neighbouring writes, but nothing else. For that we iterate
+	 * through the now-sorted array of pending flushes, looking ahead to
+	 * find all neighbouring (or identical) writes.
+	 */
+	for (i = 0; i < context->nr_pending; i++)
+	{
+		PendingWriteback *cur;
+		PendingWriteback *next;
+		SMgrRelation reln;
+		int			ahead;
+		BufferTag	tag;
+		Size		nblocks = 1;
+
+		cur = &context->pending_writebacks[i];
+		tag = cur->tag;
+
+		/*
+		 * Peek ahead, into following writeback requests, to see if they can
+		 * be combined with the current one.
+		 */
+		for (ahead = 0; i + ahead + 1 < context->nr_pending; ahead++)
+		{
+			next = &context->pending_writebacks[i + ahead + 1];
+
+			/* different file, stop */
+			if (!RelFileNodeEquals(cur->tag.rnode, next->tag.rnode) ||
+				cur->tag.forkNum != next->tag.forkNum)
+				break;
+
+			/* ok, block queued twice, skip */
+			if (cur->tag.blockNum == next->tag.blockNum)
+				continue;
+
+			/* only merge consecutive writes */
+			if (cur->tag.blockNum + 1 != next->tag.blockNum)
+				break;
+
+			nblocks++;
+			cur = next;
+		}
+
+		i += ahead;
+
+		/* and finally tell the kernel to write the data to storage */
+		reln = smgropen(tag.rnode, InvalidBackendId);
+		smgrwriteback(reln, tag.forkNum, tag.blockNum, nblocks);
+	}
+
+	context->nr_pending = 0;
+}
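The heart of IssuePendingWritebacks() is this sort-then-merge pass. The sketch below is a hypothetical reduction of the merge loop to bare block numbers within a single file, exercising the same peek-ahead rules: duplicates are skipped, consecutive blocks extend the current request, and a gap ends it:

    #include <stdio.h>
    #include <stdlib.h>

    static int
    cmp_uint(const void *a, const void *b)
    {
        unsigned    ua = *(const unsigned *) a;
        unsigned    ub = *(const unsigned *) b;

        return (ua > ub) - (ua < ub);
    }

    int
    main(void)
    {
        /* pending block numbers as they might be queued: unsorted, one dup */
        unsigned    pending[] = {7, 3, 4, 9, 4, 5};
        int         n = (int) (sizeof(pending) / sizeof(pending[0]));
        int         i;

        /* sorting first is what makes runs of consecutive blocks adjacent */
        qsort(pending, n, sizeof(unsigned), cmp_uint);

        for (i = 0; i < n; i++)
        {
            unsigned    start = pending[i];
            unsigned    cur = start;
            unsigned    nblocks = 1;
            int         ahead;

            /* peek ahead: absorb duplicates, extend over consecutive blocks */
            for (ahead = 0; i + ahead + 1 < n; ahead++)
            {
                unsigned    next = pending[i + ahead + 1];

                if (next == cur)
                    continue;       /* block queued twice, skip */
                if (next != cur + 1)
                    break;          /* gap: stop merging */
                nblocks++;
                cur = next;
            }
            i += ahead;

            /* one coalesced request instead of several single-block ones */
            printf("writeback start=%u nblocks=%u\n", start, nblocks);
        }
        return 0;                   /* prints runs 3..5, 7, and 9 */
    }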
diff --git a/src/backend/storage/file/copydir.c b/src/backend/storage/file/copydir.c
index 522f42079ed..a51ee815662 100644
--- a/src/backend/storage/file/copydir.c
+++ b/src/backend/storage/file/copydir.c
@@ -190,9 +190,9 @@ copy_file(char *fromfile, char *tofile)
 		/*
 		 * We fsync the files later but first flush them to avoid spamming the
 		 * cache and hopefully get the kernel to start writing them out before
-		 * the fsync comes.  Ignore any error, since it's only a hint.
+		 * the fsync comes.
 		 */
-		(void) pg_flush_data(dstfd, offset, nbytes);
+		pg_flush_data(dstfd, offset, nbytes);
 	}
 
 	if (CloseTransientFile(dstfd))
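The pattern copy_file() relies on - hint writeback per chunk, fsync once at the end - can be shown standalone. Below is a Linux-only sketch using sync_file_range() directly (hypothetical path and sizes); on other platforms pg_flush_data() falls back to the alternatives in the fd.c hunk that follows:

    #define _GNU_SOURCE
    #include <fcntl.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>

    #define CHUNK (64 * 1024)

    int
    main(void)
    {
        char        buf[CHUNK];
        off_t       offset = 0;
        int         fd = open("/tmp/copy_target", O_RDWR | O_CREAT | O_TRUNC, 0600);
        int         i;

        if (fd < 0)
            return 1;

        memset(buf, 'x', sizeof(buf));

        for (i = 0; i < 16; i++)
        {
            if (write(fd, buf, CHUNK) != CHUNK)
                return 1;

            /* start writeback now, without waiting; it is only a hint */
            (void) sync_file_range(fd, offset, CHUNK, SYNC_FILE_RANGE_WRITE);
            offset += CHUNK;
        }

        /* the fsync now finds most data already on its way to disk */
        if (fsync(fd) != 0)
            return 1;
        close(fd);
        return 0;
    }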
diff --git a/src/backend/storage/file/fd.c b/src/backend/storage/file/fd.c
index c1076992a33..046d1b3cc30 100644
--- a/src/backend/storage/file/fd.c
+++ b/src/backend/storage/file/fd.c
@@ -61,6 +61,9 @@
 #include <sys/file.h>
 #include <sys/param.h>
 #include <sys/stat.h>
+#ifndef WIN32
+#include <sys/mman.h>
+#endif
 #include <unistd.h>
 #include <fcntl.h>
 #ifdef HAVE_SYS_RESOURCE_H
@@ -82,6 +85,8 @@
 /* Define PG_FLUSH_DATA_WORKS if we have an implementation for pg_flush_data */
 #if defined(HAVE_SYNC_FILE_RANGE)
 #define PG_FLUSH_DATA_WORKS 1
+#elif !defined(WIN32) && defined(MS_ASYNC)
+#define PG_FLUSH_DATA_WORKS 1
 #elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
 #define PG_FLUSH_DATA_WORKS 1
 #endif
@@ -383,29 +388,126 @@ pg_fdatasync(int fd)
 }
 
 /*
- * pg_flush_data --- advise OS that the data described won't be needed soon
+ * pg_flush_data --- advise OS that the described dirty data should be flushed
  *
- * Not all platforms have sync_file_range or posix_fadvise; treat as no-op
- * if not available.  Also, treat as no-op if enableFsync is off; this is
- * because the call isn't free, and some platforms such as Linux will actually
- * block the requestor until the write is scheduled.
+ * An offset of 0 with an nbytes of 0 means that the entire file should be
+ * flushed.
  */
-int
-pg_flush_data(int fd, off_t offset, off_t amount)
+void
+pg_flush_data(int fd, off_t offset, off_t nbytes)
 {
-#ifdef PG_FLUSH_DATA_WORKS
-	if (enableFsync)
-	{
+	/*
+	 * Right now file flushing is primarily used to make later
+	 * fsync()/fdatasync() calls have less impact.  Thus don't trigger
+	 * flushes if fsyncs are disabled - that's a decision we might want to
+	 * make configurable at some point.
+	 */
+	if (!enableFsync)
+		return;
+
+	/*
+	 * XXX: compile all alternatives to find portability problems more easily
+	 */
 #if defined(HAVE_SYNC_FILE_RANGE)
-	return sync_file_range(fd, offset, amount, SYNC_FILE_RANGE_WRITE);
-#elif defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
-	return posix_fadvise(fd, offset, amount, POSIX_FADV_DONTNEED);
-#else
-#error PG_FLUSH_DATA_WORKS should not have been defined
+	{
+		int			rc = 0;
+
+		/*
+		 * sync_file_range(SYNC_FILE_RANGE_WRITE), currently Linux-specific,
+		 * tells the OS that writeback for the passed-in blocks should be
+		 * started, but that we don't want to wait for completion.  Note that
+		 * this call might block if too much dirty data exists in the range.
+		 * This is the preferable method on OSs supporting it, as it works
+		 * reliably when available (in contrast to msync()) and doesn't flush
+		 * out clean data (like FADV_DONTNEED).
+		 */
+		rc = sync_file_range(fd, offset, nbytes,
+							 SYNC_FILE_RANGE_WRITE);
+
+		/* don't error out, this is just a performance optimization */
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+		}
+
+		return;
+	}
 #endif
+#if !defined(WIN32) && defined(MS_ASYNC)
+	{
+		int			rc = 0;
+		void	   *p;
+
+		/*
+		 * On several OSs msync(MS_ASYNC) on a mmap'ed file triggers
+		 * writeback.  On Linux it only does so if MS_SYNC is specified, but
+		 * then it does the writeback synchronously.  Luckily all common
+		 * Linux systems have sync_file_range().  This is preferable over
+		 * FADV_DONTNEED because it doesn't flush out clean data.
+		 *
+		 * We map the file (mmap()), tell the kernel to sync back the
+		 * contents (msync()), and then remove the mapping again (munmap()).
+		 */
+		p = mmap(NULL, nbytes,
+				 PROT_READ | PROT_WRITE, MAP_SHARED,
+				 fd, offset);
+		if (p == MAP_FAILED)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not mmap while flushing dirty data: %m")));
+			return;
+		}
+
+		rc = msync(p, nbytes, MS_ASYNC);
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+			/* NB: need to fall through to munmap()! */
+		}
+
+		rc = munmap(p, nbytes);
+		if (rc != 0)
+		{
+			/* FATAL error because mapping would remain */
+			ereport(FATAL,
+					(errcode_for_file_access(),
+					 errmsg("could not munmap while flushing blocks: %m")));
+		}
+
+		return;
+	}
+#endif
+#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED)
+	{
+		int			rc = 0;
+
+		/*
+		 * Signal the kernel that the passed-in range should not be cached
+		 * anymore.  This has the desired side effect of writing out dirty
+		 * data, and the undesired side effect of likely discarding useful
+		 * clean cached blocks.  For the latter reason this is the least
+		 * preferable method.
+		 */
+		rc = posix_fadvise(fd, offset, nbytes, POSIX_FADV_DONTNEED);
+
+		/* don't error out, this is just a performance optimization */
+		if (rc != 0)
+		{
+			ereport(WARNING,
+					(errcode_for_file_access(),
+					 errmsg("could not flush dirty data: %m")));
+			return;
+		}
+
+		return;
+	}
 #endif
-	return 0;
 }
 
@@ -1396,6 +1498,24 @@ FilePrefetch(File file, off_t offset, int amount)
 #endif
 }
 
+void
+FileWriteback(File file, off_t offset, int amount)
+{
+	int			returnCode;
+
+	Assert(FileIsValid(file));
+
+	DO_DB(elog(LOG, "FileWriteback: %d (%s) " INT64_FORMAT " %d",
+			   file, VfdCache[file].fileName,
+			   (int64) offset, amount));
+
+	returnCode = FileAccess(file);
+	if (returnCode < 0)
+		return;
+
+	pg_flush_data(VfdCache[file].fd, offset, amount);
+}
+
 int
 FileRead(File file, char *buffer, int amount)
 {
@@ -2796,9 +2916,10 @@ pre_sync_fname(const char *fname, bool isdir, int elevel)
 	}
 
 	/*
-	 * We ignore errors from pg_flush_data() because this is only a hint.
+	 * pg_flush_data() ignores errors, which is ok because this is only a
+	 * hint.
 	 */
-	(void) pg_flush_data(fd, 0, 0);
+	pg_flush_data(fd, 0, 0);
 
 	(void) CloseTransientFile(fd);
 }
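The msync(MS_ASYNC) fallback above maps the dirty range, requests asynchronous writeback, and unmaps again. A self-contained sketch of just that sequence (hypothetical file name; the error handling mirrors the WARNING/FATAL split in pg_flush_data()):

    #include <fcntl.h>
    #include <stdio.h>
    #include <sys/mman.h>
    #include <unistd.h>

    static void
    flush_range(int fd, off_t offset, size_t nbytes)
    {
        void   *p = mmap(NULL, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED,
                         fd, offset);

        if (p == MAP_FAILED)
        {
            perror("mmap");     /* only a hint, so not treated as fatal */
            return;
        }

        if (msync(p, nbytes, MS_ASYNC) != 0)
            perror("msync");    /* fall through: the mapping must go away */

        if (munmap(p, nbytes) != 0)
            perror("munmap");   /* in PostgreSQL this case is FATAL */
    }

    int
    main(void)
    {
        int     fd = open("/tmp/flush_demo", O_RDWR | O_CREAT, 0600);

        if (fd < 0)
            return 1;
        if (write(fd, "hello", 5) != 5)
            return 1;

        flush_range(fd, 0, 5);  /* offset must be page-aligned; 0 is */
        close(fd);
        return 0;
    }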
+ * + * We map the file (mmap()), tell the kernel to sync back the contents + * (msync()), and then remove the mapping again (munmap()). + */ + p = mmap(NULL, nbytes, + PROT_READ | PROT_WRITE, MAP_SHARED, + fd, offset); + if (p == MAP_FAILED) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not mmap while flushing dirty data: %m"))); + return; + } + + rc = msync(p, nbytes, MS_ASYNC); + if (rc != 0) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not flush dirty data: %m"))); + /* NB: need to fall through to munmap()! */ + } + + rc = munmap(p, nbytes); + if (rc != 0) + { + /* FATAL error because mapping would remain */ + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not munmap while flushing blocks: %m"))); + } + + return; + } +#endif +#if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) + { + int rc = 0; + + /* + * Signal the kernel that the passed in range should not be cached + * anymore. This has the, desired, side effect of writing out dirty + * data, and the, undesired, side effect of likely discarding useful + * clean cached blocks. For the latter reason this is the least + * preferrable method. + */ + + rc = posix_fadvise(fd, offset, nbytes, POSIX_FADV_DONTNEED); + + /* don't error out, this is just a performance optimization */ + if (rc != 0) + { + ereport(WARNING, + (errcode_for_file_access(), + errmsg("could not flush dirty data: %m"))); + return; + } + + return; } #endif - return 0; } @@ -1396,6 +1498,24 @@ FilePrefetch(File file, off_t offset, int amount) #endif } +void +FileWriteback(File file, off_t offset, int amount) +{ + int returnCode; + + Assert(FileIsValid(file)); + + DO_DB(elog(LOG, "FileWriteback: %d (%s) " INT64_FORMAT " %d", + file, VfdCache[file].fileName, + (int64) offset, amount)); + + returnCode = FileAccess(file); + if (returnCode < 0) + return; + + pg_flush_data(VfdCache[file].fd, offset, amount); +} + int FileRead(File file, char *buffer, int amount) { @@ -2796,9 +2916,10 @@ pre_sync_fname(const char *fname, bool isdir, int elevel) } /* - * We ignore errors from pg_flush_data() because this is only a hint. + * pg_flush_data() ignores errors, which is ok because this is only a + * hint. */ - (void) pg_flush_data(fd, 0, 0); + pg_flush_data(fd, 0, 0); (void) CloseTransientFile(fd); } diff --git a/src/backend/storage/smgr/md.c b/src/backend/storage/smgr/md.c index f6b79a99689..764cfb53946 100644 --- a/src/backend/storage/smgr/md.c +++ b/src/backend/storage/smgr/md.c @@ -662,6 +662,56 @@ mdprefetch(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum) #endif /* USE_PREFETCH */ } +/* + * mdwriteback() -- Tell the kernel to write pages back to storage. + * + * This accepts a range of blocks because flushing several pages at once is + * considerably more efficient than doing so individually. + */ +void +mdwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, int nblocks) +{ + /* + * Issue flush requests in as few requests as possible; have to split at + * segment boundaries though, since those are actually separate files. + */ + while (nblocks != 0) + { + int nflush = nblocks; + off_t seekpos; + MdfdVec *v; + int segnum_start, + segnum_end; + + v = _mdfd_getseg(reln, forknum, blocknum, false, + EXTENSION_RETURN_NULL); + + /* + * We might be flushing buffers of already removed relations, that's + * ok, just ignore that case. 
+ */ + if (!v) + return; + + /* compute offset inside the current segment */ + segnum_start = blocknum / RELSEG_SIZE; + + /* compute number of desired writes within the current segment */ + segnum_end = (blocknum + nblocks - 1) / RELSEG_SIZE; + if (segnum_start != segnum_end) + nflush = RELSEG_SIZE - (blocknum % ((BlockNumber) RELSEG_SIZE)); + + Assert(nflush >= 1); + Assert(nflush <= nblocks); + + seekpos = (off_t) BLCKSZ *(blocknum % ((BlockNumber) RELSEG_SIZE)); + + FileWriteback(v->mdfd_vfd, seekpos, BLCKSZ * nflush); + + nblocks -= nflush; + blocknum += nflush; + } +} /* * mdread() -- Read the specified block from a relation. diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 87ff3583ff8..c0915c8d889 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -53,6 +53,8 @@ typedef struct f_smgr BlockNumber blocknum, char *buffer); void (*smgr_write) (SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, char *buffer, bool skipFsync); + void (*smgr_writeback) (SMgrRelation reln, ForkNumber forknum, + BlockNumber blocknum, int nblocks); BlockNumber (*smgr_nblocks) (SMgrRelation reln, ForkNumber forknum); void (*smgr_truncate) (SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); @@ -66,8 +68,8 @@ typedef struct f_smgr static const f_smgr smgrsw[] = { /* magnetic disk */ {mdinit, NULL, mdclose, mdcreate, mdexists, mdunlink, mdextend, - mdprefetch, mdread, mdwrite, mdnblocks, mdtruncate, mdimmedsync, - mdpreckpt, mdsync, mdpostckpt + mdprefetch, mdread, mdwrite, mdwriteback, mdnblocks, mdtruncate, + mdimmedsync, mdpreckpt, mdsync, mdpostckpt } }; @@ -649,6 +651,19 @@ smgrwrite(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, buffer, skipFsync); } + +/* + * smgrwriteback() -- Trigger kernel writeback for the supplied range of + * blocks. + */ +void +smgrwriteback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, + int nblocks) +{ + (*(smgrsw[reln->smgr_which].smgr_writeback)) (reln, forknum, blocknum, + nblocks); +} + /* * smgrnblocks() -- Calculate the number of blocks in the * supplied relation. |