Diffstat (limited to 'src/backend/storage/aio')
-rw-r--r-- | src/backend/storage/aio/method_io_uring.c | 210
-rw-r--r-- | src/backend/storage/aio/method_worker.c | 62
2 files changed, 239 insertions, 33 deletions
diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
index b78048328e1..0a8c054162f 100644
--- a/src/backend/storage/aio/method_io_uring.c
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -29,6 +29,9 @@
 
 #ifdef IOMETHOD_IO_URING_ENABLED
 
+#include <sys/mman.h>
+#include <unistd.h>
+
 #include <liburing.h>
 
 #include "miscadmin.h"
@@ -94,12 +97,32 @@ PgAioUringContext
 	struct io_uring io_uring_ring;
 } PgAioUringContext;
 
+/*
+ * Information about the capabilities that io_uring has.
+ *
+ * Depending on liburing and kernel version different features are
+ * supported. At least for the kernel a kernel version check does not suffice
+ * as various vendors do backport features to older kernels :(.
+ */
+typedef struct PgAioUringCaps
+{
+	bool		checked;
+	/* -1 if io_uring_queue_init_mem() is unsupported */
+	int			mem_init_size;
+} PgAioUringCaps;
+
+
 /* PgAioUringContexts for all backends */
 static PgAioUringContext *pgaio_uring_contexts;
 
 /* the current backend's context */
 static PgAioUringContext *pgaio_my_uring_context;
 
+static PgAioUringCaps pgaio_uring_caps =
+{
+	.checked = false,
+	.mem_init_size = -1,
+};
+
 
 static uint32
 pgaio_uring_procs(void)
@@ -111,16 +134,145 @@ pgaio_uring_procs(void)
 	return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
 }
 
-static Size
+/*
+ * Initializes pgaio_uring_caps, unless that's already done.
+ */
+static void
+pgaio_uring_check_capabilities(void)
+{
+	if (pgaio_uring_caps.checked)
+		return;
+
+	/*
+	 * By default io_uring creates a shared memory mapping for each io_uring
+	 * instance, leading to a large number of memory mappings. Unfortunately a
+	 * large number of memory mappings slows things down; backend exit is
+	 * particularly affected. To address that, newer kernels (6.5) support
+	 * using user-provided memory for the rings; by putting the relevant
+	 * memory into shared memory we don't need any additional mappings.
+	 *
+	 * To know whether this is supported, we unfortunately need to probe the
+	 * kernel by trying to create a ring with userspace-provided memory. This
+	 * also has a secondary benefit: We can determine precisely how much
+	 * memory we need for each io_uring instance.
+	 */
+#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
+	{
+		struct io_uring test_ring;
+		size_t		ring_size;
+		void	   *ring_ptr;
+		struct io_uring_params p = {0};
+		int			ret;
+
+		/*
+		 * Liburing does not yet provide an API to query how much memory a
+		 * ring will need. So we over-estimate it here. As the memory is freed
+		 * just below, that's a small temporary waste of memory.
+		 *
+		 * 1MB is more than enough for rings within io_max_concurrency's
+		 * range.
+		 */
+		ring_size = 1024 * 1024;
+
+		/*
+		 * Hard to believe a system exists where 1MB would not be a multiple
+		 * of the page size. But it's cheap to ensure...
+		 */
+		ring_size -= ring_size % sysconf(_SC_PAGESIZE);
+
+		ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+		if (ring_ptr == MAP_FAILED)
+			elog(ERROR,
+				 "mmap(%zu) to determine io_uring_queue_init_mem() support failed: %m",
+				 ring_size);
+
+		ret = io_uring_queue_init_mem(io_max_concurrency, &test_ring, &p, ring_ptr, ring_size);
+		if (ret > 0)
+		{
+			pgaio_uring_caps.mem_init_size = ret;
+
+			elog(DEBUG1,
+				 "can use combined memory mapping for io_uring, each ring needs %d bytes",
+				 ret);
+
+			/* clean up the created ring, it was just for a test */
+			io_uring_queue_exit(&test_ring);
+		}
+		else
+		{
+			/*
+			 * There are different reasons for ring creation to fail, but it's
+			 * ok to treat that just as io_uring_queue_init_mem() not being
+			 * supported. We'll report a more detailed error in
+			 * pgaio_uring_shmem_init().
+			 */
+			errno = -ret;
+			elog(DEBUG1,
+				 "cannot use combined memory mapping for io_uring, ring creation failed: %m");
+
+		}
+
+		if (munmap(ring_ptr, ring_size) != 0)
+			elog(ERROR, "munmap() failed: %m");
+	}
+#else
+	{
+		elog(DEBUG1,
+			 "can't use combined memory mapping for io_uring, kernel or liburing too old");
+	}
+#endif
+
+	pgaio_uring_caps.checked = true;
+}
+
+/*
+ * Memory for all PgAioUringContext instances
+ */
+static size_t
 pgaio_uring_context_shmem_size(void)
 {
 	return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
 }
 
+/*
+ * Memory for the combined memory used by io_uring instances. Returns 0 if
+ * that is not supported by kernel/liburing.
+ */
+static size_t
+pgaio_uring_ring_shmem_size(void)
+{
+	size_t		sz = 0;
+
+	if (pgaio_uring_caps.mem_init_size > 0)
+	{
+		/*
+		 * Memory for rings needs to be allocated to the page boundary,
+		 * reserve space. Luckily it does not need to be aligned to hugepage
+		 * boundaries, even if huge pages are used.
+		 */
+		sz = add_size(sz, sysconf(_SC_PAGESIZE));
+		sz = add_size(sz, mul_size(pgaio_uring_procs(),
+								   pgaio_uring_caps.mem_init_size));
+	}
+
+	return sz;
+}
+
 static size_t
 pgaio_uring_shmem_size(void)
 {
-	return pgaio_uring_context_shmem_size();
+	size_t		sz;
+
+	/*
+	 * Kernel and liburing support for various features influences how much
+	 * shmem we need; perform the necessary checks.
+	 */
+	pgaio_uring_check_capabilities();
+
+	sz = pgaio_uring_context_shmem_size();
+	sz = add_size(sz, pgaio_uring_ring_shmem_size());
+
+	return sz;
 }
 
 static void
@@ -128,13 +280,38 @@ pgaio_uring_shmem_init(bool first_time)
 {
 	int			TotalProcs = pgaio_uring_procs();
 	bool		found;
+	char	   *shmem;
+	size_t		ring_mem_remain = 0;
+	char	   *ring_mem_next = 0;
 
-	pgaio_uring_contexts = (PgAioUringContext *)
-		ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);
-
+	/*
+	 * We allocate memory for all PgAioUringContext instances and, if
+	 * supported, the memory required for each of the io_uring instances, in
+	 * one ShmemInitStruct().
+	 */
+	shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found);
 	if (found)
 		return;
 
+	pgaio_uring_contexts = (PgAioUringContext *) shmem;
+	shmem += pgaio_uring_context_shmem_size();
+
+	/* if supported, handle memory alignment / sizing for io_uring memory */
+	if (pgaio_uring_caps.mem_init_size > 0)
+	{
+		ring_mem_remain = pgaio_uring_ring_shmem_size();
+		ring_mem_next = (char *) shmem;
+
+		/* align to page boundary, see also pgaio_uring_ring_shmem_size() */
+		ring_mem_next = (char *) TYPEALIGN(sysconf(_SC_PAGESIZE), ring_mem_next);
+
+		/* account for alignment */
+		ring_mem_remain -= ring_mem_next - shmem;
+		shmem += ring_mem_next - shmem;
+
+		shmem += ring_mem_remain;
+	}
+
 	for (int contextno = 0; contextno < TotalProcs; contextno++)
 	{
 		PgAioUringContext *context = &pgaio_uring_contexts[contextno];
@@ -158,7 +335,28 @@ pgaio_uring_shmem_init(bool first_time)
 		 * be worth using that - also need to evaluate if that causes
 		 * noticeable additional contention?
 		 */
-		ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+
+		/*
+		 * If supported (cf. pgaio_uring_check_capabilities()), create the
+		 * ring with its data in shared memory. Otherwise fall back to
+		 * io_uring creating a memory mapping for each ring.
+		 */
+#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
+		if (pgaio_uring_caps.mem_init_size > 0)
+		{
+			struct io_uring_params p = {0};
+
+			ret = io_uring_queue_init_mem(io_max_concurrency, &context->io_uring_ring, &p, ring_mem_next, ring_mem_remain);
+
+			ring_mem_remain -= ret;
+			ring_mem_next += ret;
+		}
+		else
+#endif
+		{
+			ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+		}
+
 		if (ret < 0)
 		{
 			char	   *hint = NULL;
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 36be179678d..bf8f77e6ff6 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -52,26 +52,26 @@
 
 #define IO_WORKER_WAKEUP_FANOUT 2
 
-typedef struct AioWorkerSubmissionQueue
+typedef struct PgAioWorkerSubmissionQueue
 {
 	uint32		size;
 	uint32		mask;
 	uint32		head;
 	uint32		tail;
-	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerSubmissionQueue;
+	uint32		sqes[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerSubmissionQueue;
 
-typedef struct AioWorkerSlot
+typedef struct PgAioWorkerSlot
 {
 	Latch	   *latch;
 	bool		in_use;
-} AioWorkerSlot;
+} PgAioWorkerSlot;
 
-typedef struct AioWorkerControl
+typedef struct PgAioWorkerControl
 {
 	uint64		idle_worker_mask;
-	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerControl;
+	PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerControl;
 
 
 static size_t pgaio_worker_shmem_size(void);
@@ -96,8 +96,8 @@ int			io_workers = 3;
 static int	io_worker_queue_size = 64;
 
 static int	MyIoWorkerId;
-static AioWorkerSubmissionQueue *io_worker_submission_queue;
-static AioWorkerControl *io_worker_control;
+static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
+static PgAioWorkerControl *io_worker_control;
 
 
 static size_t
@@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
 	/* Round size up to next power of two so we can make a mask. */
 	*queue_size = pg_nextpower2_32(io_worker_queue_size);
 
-	return offsetof(AioWorkerSubmissionQueue, ios) +
+	return offsetof(PgAioWorkerSubmissionQueue, sqes) +
 		sizeof(uint32) * *queue_size;
 }
 
 static size_t
 pgaio_worker_control_shmem_size(void)
 {
-	return offsetof(AioWorkerControl, workers) +
-		sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+	return offsetof(PgAioWorkerControl, workers) +
+		sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
 }
 
 static size_t
@@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time)
 }
 
 static int
-pgaio_choose_idle_worker(void)
+pgaio_worker_choose_idle(void)
 {
 	int			worker;
 
@@ -172,6 +172,7 @@ pgaio_choose_idle_worker(void)
 	/* Find the lowest bit position, and clear it. */
 	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
 	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+	Assert(io_worker_control->workers[worker].in_use);
 
 	return worker;
 }
@@ -179,7 +180,7 @@ pgaio_choose_idle_worker(void)
 static bool
 pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
 {
-	AioWorkerSubmissionQueue *queue;
+	PgAioWorkerSubmissionQueue *queue;
 	uint32		new_head;
 
 	queue = io_worker_submission_queue;
@@ -191,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
 		return false;			/* full */
 	}
 
-	queue->ios[queue->head] = pgaio_io_get_id(ioh);
+	queue->sqes[queue->head] = pgaio_io_get_id(ioh);
 	queue->head = new_head;
 
 	return true;
@@ -200,14 +201,14 @@ static uint32
 pgaio_worker_submission_queue_consume(void)
 {
-	AioWorkerSubmissionQueue *queue;
+	PgAioWorkerSubmissionQueue *queue;
 	uint32		result;
 
 	queue = io_worker_submission_queue;
 	if (queue->tail == queue->head)
 		return UINT32_MAX;		/* empty */
 
-	result = queue->ios[queue->tail];
+	result = queue->sqes[queue->tail];
 	queue->tail = (queue->tail + 1) & (queue->size - 1);
 
 	return result;
@@ -240,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
 }
 
 static void
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
 {
 	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
 	int			nsync = 0;
 	Latch	   *wakeup = NULL;
 	int			worker;
 
-	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+	Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
 
 	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
-	for (int i = 0; i < nios; ++i)
+	for (int i = 0; i < num_staged_ios; ++i)
 	{
-		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
-		if (!pgaio_worker_submission_queue_insert(ios[i]))
+		Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+		if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
 		{
 			/*
 			 * We'll do it synchronously, but only after we've sent as many as
 			 * we can to workers, to maximize concurrency.
 			 */
-			synchronous_ios[nsync++] = ios[i];
+			synchronous_ios[nsync++] = staged_ios[i];
 			continue;
 		}
 
 		if (wakeup == NULL)
 		{
 			/* Choose an idle worker to wake up if we haven't already. */
-			worker = pgaio_choose_idle_worker();
+			worker = pgaio_worker_choose_idle();
 			if (worker >= 0)
 				wakeup = io_worker_control->workers[worker].latch;
 
-			pgaio_debug_io(DEBUG4, ios[i],
+			pgaio_debug_io(DEBUG4, staged_ios[i],
 						   "choosing worker %d",
 						   worker);
 		}
@@ -316,6 +317,7 @@ pgaio_worker_die(int code, Datum arg)
 	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
 	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
 
+	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
 	io_worker_control->workers[MyIoWorkerId].in_use = false;
 	io_worker_control->workers[MyIoWorkerId].latch = NULL;
 	LWLockRelease(AioWorkerSubmissionQueueLock);
@@ -488,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 											IO_WORKER_WAKEUP_FANOUT);
 			for (int i = 0; i < nwakeups; ++i)
 			{
-				if ((worker = pgaio_choose_idle_worker()) < 0)
+				if ((worker = pgaio_worker_choose_idle()) < 0)
 					break;
 				latches[nlatches++] = io_worker_control->workers[worker].latch;
 			}
@@ -573,6 +575,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
 		}
 
 		CHECK_FOR_INTERRUPTS();
+
+		if (ConfigReloadPending)
+		{
+			ConfigReloadPending = false;
+			ProcessConfigFile(PGC_SIGHUP);
+		}
 	}
 
 	error_context_stack = errcallback.previous;
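
The capability probe that pgaio_uring_check_capabilities() performs can be reproduced outside the server. The following standalone sketch is not part of the patch; it assumes a liburing with io_uring_queue_init_mem() (2.5 or newer) and a kernel that supports IORING_SETUP_NO_MMAP (6.5 or newer), and uses a fixed queue depth of 64 where the patch uses io_max_concurrency.

/*
 * Sketch of the probe: try to create a ring backed by caller-provided
 * memory. A positive return value is the number of bytes the ring consumed
 * from the buffer; a negative value is -errno, meaning the combined-mapping
 * path is unavailable.
 */
#include <liburing.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

int
main(void)
{
	struct io_uring ring;
	struct io_uring_params p = {0};
	size_t		buf_size = 1024 * 1024; /* generous over-estimate */
	void	   *buf;
	int			ret;

	/* keep the mapping length a multiple of the page size */
	buf_size -= buf_size % sysconf(_SC_PAGESIZE);

	buf = mmap(NULL, buf_size, PROT_READ | PROT_WRITE,
			   MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (buf == MAP_FAILED)
	{
		perror("mmap");
		return 1;
	}

	/* bytes consumed from buf on success, -errno on failure */
	ret = io_uring_queue_init_mem(64, &ring, &p, buf, buf_size);
	if (ret > 0)
	{
		printf("supported, one ring needs %d bytes\n", ret);
		io_uring_queue_exit(&ring); /* the ring was only needed for the test */
	}
	else
		printf("not supported: %s\n", strerror(-ret));

	munmap(buf, buf_size);
	return 0;
}

The positive result is what the patch stores in pgaio_uring_caps.mem_init_size and later uses to size and carve up the single shared-memory allocation; when the call fails, the patch falls back to io_uring_queue_init(), letting the kernel create a separate mapping per ring.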