Diffstat (limited to 'src/backend/storage/aio')
-rw-r--r-- | src/backend/storage/aio/aio.c             | 140
-rw-r--r-- | src/backend/storage/aio/aio_callback.c    |   7
-rw-r--r-- | src/backend/storage/aio/aio_io.c          |   4
-rw-r--r-- | src/backend/storage/aio/method_io_uring.c | 218
-rw-r--r-- | src/backend/storage/aio/method_worker.c   |  69
5 files changed, 379 insertions, 59 deletions
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c index ebb5a771bfd..3643f27ad6e 100644 --- a/src/backend/storage/aio/aio.c +++ b/src/backend/storage/aio/aio.c @@ -184,6 +184,8 @@ pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret) PgAioHandle * pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret) { + PgAioHandle *ioh = NULL; + if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE) { Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE); @@ -193,10 +195,17 @@ pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret) if (pgaio_my_backend->handed_out_io) elog(ERROR, "API violation: Only one IO can be handed out"); + /* + * Probably not needed today, as interrupts should not process this IO, + * but... + */ + HOLD_INTERRUPTS(); + if (!dclist_is_empty(&pgaio_my_backend->idle_ios)) { dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios); - PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion); + + ioh = dclist_container(PgAioHandle, node, ion); Assert(ioh->state == PGAIO_HS_IDLE); Assert(ioh->owner_procno == MyProcNumber); @@ -212,11 +221,11 @@ pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret) ioh->report_return = ret; ret->result.status = PGAIO_RS_UNKNOWN; } - - return ioh; } - return NULL; + RESUME_INTERRUPTS(); + + return ioh; } /* @@ -233,6 +242,12 @@ pgaio_io_release(PgAioHandle *ioh) Assert(ioh->resowner); pgaio_my_backend->handed_out_io = NULL; + + /* + * Note that no interrupts are processed between the handed_out_io + * check and the call to reclaim - that's important as otherwise an + * interrupt could have already reclaimed the handle. + */ pgaio_io_reclaim(ioh); } else @@ -251,6 +266,12 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) Assert(ioh->resowner); + /* + * Otherwise an interrupt, in the middle of releasing the IO, could end up + * trying to wait for the IO, leading to state confusion. + */ + HOLD_INTERRUPTS(); + ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); ioh->resowner = NULL; @@ -291,6 +312,8 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) */ if (ioh->report_return) ioh->report_return = NULL; + + RESUME_INTERRUPTS(); } /* @@ -359,6 +382,13 @@ pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow) static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state) { + /* + * All callers need to have held interrupts in some form, otherwise + * interrupt processing could wait for the IO to complete, while in an + * intermediary state. + */ + Assert(!INTERRUPTS_CAN_BE_PROCESSED()); + pgaio_debug_io(DEBUG5, ioh, "updating state to %s", pgaio_io_state_get_name(new_state)); @@ -396,6 +426,13 @@ pgaio_io_stage(PgAioHandle *ioh, PgAioOp op) Assert(pgaio_my_backend->handed_out_io == ioh); Assert(pgaio_io_has_target(ioh)); + /* + * Otherwise an interrupt, in the middle of staging and possibly executing + * the IO, could end up trying to wait for the IO, leading to state + * confusion. + */ + HOLD_INTERRUPTS(); + ioh->op = op; ioh->result = 0; @@ -435,6 +472,8 @@ pgaio_io_stage(PgAioHandle *ioh, PgAioOp op) pgaio_io_prepare_submit(ioh); pgaio_io_perform_synchronously(ioh); } + + RESUME_INTERRUPTS(); } bool @@ -517,6 +556,13 @@ bool pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state) { *state = ioh->state; + + /* + * Ensure that we don't see an earlier state of the handle than ioh->state + * due to compiler or CPU reordering. 
This protects both ->generation as + * directly used here, and other fields in the handle accessed in the + * caller if the handle was not reused. + */ pg_read_barrier(); return ioh->generation != ref_generation; @@ -544,8 +590,8 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation) && state != PGAIO_HS_COMPLETED_SHARED && state != PGAIO_HS_COMPLETED_LOCAL) { - elog(PANIC, "waiting for own IO in wrong state: %d", - state); + elog(PANIC, "waiting for own IO %d in wrong state: %s", + pgaio_io_get_id(ioh), pgaio_io_get_state_name(ioh)); } } @@ -599,7 +645,13 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation) case PGAIO_HS_COMPLETED_SHARED: case PGAIO_HS_COMPLETED_LOCAL: - /* see above */ + + /* + * Note that no interrupts are processed between + * pgaio_io_was_recycled() and this check - that's important + * as otherwise an interrupt could have already reclaimed the + * handle. + */ if (am_owner) pgaio_io_reclaim(ioh); return; @@ -610,6 +662,11 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation) /* * Make IO handle ready to be reused after IO has completed or after the * handle has been released without being used. + * + * Note that callers need to be careful about only calling this in the right + * state and that no interrupts can be processed between the state check and + * the call to pgaio_io_reclaim(). Otherwise interrupt processing could + * already have reclaimed the handle. */ static void pgaio_io_reclaim(PgAioHandle *ioh) @@ -618,6 +675,9 @@ pgaio_io_reclaim(PgAioHandle *ioh) Assert(ioh->owner_procno == MyProcNumber); Assert(ioh->state != PGAIO_HS_IDLE); + /* see comment in function header */ + HOLD_INTERRUPTS(); + /* * It's a bit ugly, but right now the easiest place to put the execution * of local completion callbacks is this function, as we need to execute @@ -685,6 +745,8 @@ pgaio_io_reclaim(PgAioHandle *ioh) * efficient in cases where only a few IOs are used. */ dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node); + + RESUME_INTERRUPTS(); } /* @@ -697,10 +759,10 @@ pgaio_io_wait_for_free(void) { int reclaimed = 0; - pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs", + pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %u in-flight, %u idle IOs", pgaio_my_backend->num_staged_ios, dclist_count(&pgaio_my_backend->in_flight_ios), - dclist_is_empty(&pgaio_my_backend->idle_ios)); + dclist_count(&pgaio_my_backend->idle_ios)); /* * First check if any of our IOs actually have completed - when using @@ -714,6 +776,16 @@ pgaio_io_wait_for_free(void) if (ioh->state == PGAIO_HS_COMPLETED_SHARED) { + /* + * Note that no interrupts are processed between the state check + * and the call to reclaim - that's important as otherwise an + * interrupt could have already reclaimed the handle. + * + * Need to ensure that there's no reordering, in the more common + * paths, where we wait for IO, that's done by + * pgaio_io_was_recycled(). 
+ */ + pg_read_barrier(); pgaio_io_reclaim(ioh); reclaimed++; } @@ -730,13 +802,17 @@ pgaio_io_wait_for_free(void) if (pgaio_my_backend->num_staged_ios > 0) pgaio_submit_staged(); + /* possibly some IOs finished during submission */ + if (!dclist_is_empty(&pgaio_my_backend->idle_ios)) + return; + if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0) ereport(ERROR, errmsg_internal("no free IOs despite no in-flight IOs"), - errdetail_internal("%d pending, %d in-flight, %d idle IOs", + errdetail_internal("%d pending, %u in-flight, %u idle IOs", pgaio_my_backend->num_staged_ios, dclist_count(&pgaio_my_backend->in_flight_ios), - dclist_is_empty(&pgaio_my_backend->idle_ios))); + dclist_count(&pgaio_my_backend->idle_ios))); /* * Wait for the oldest in-flight IO to complete. @@ -747,6 +823,7 @@ pgaio_io_wait_for_free(void) { PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios); + uint64 generation = ioh->generation; switch (ioh->state) { @@ -763,20 +840,36 @@ pgaio_io_wait_for_free(void) case PGAIO_HS_COMPLETED_IO: case PGAIO_HS_SUBMITTED: pgaio_debug_io(DEBUG2, ioh, - "waiting for free io with %d in flight", + "waiting for free io with %u in flight", dclist_count(&pgaio_my_backend->in_flight_ios)); /* * In a more general case this would be racy, because the * generation could increase after we read ioh->state above. * But we are only looking at IOs by the current backend and - * the IO can only be recycled by this backend. + * the IO can only be recycled by this backend. Even this is + * only OK because we get the handle's generation before + * potentially processing interrupts, e.g. as part of + * pgaio_debug_io(). */ - pgaio_io_wait(ioh, ioh->generation); + pgaio_io_wait(ioh, generation); break; case PGAIO_HS_COMPLETED_SHARED: - /* it's possible that another backend just finished this IO */ + + /* + * It's possible that another backend just finished this IO. + * + * Note that no interrupts are processed between the state + * check and the call to reclaim - that's important as + * otherwise an interrupt could have already reclaimed the + * handle. + * + * Need to ensure that there's no reordering, in the more + * common paths, where we wait for IO, that's done by + * pgaio_io_was_recycled(). + */ + pg_read_barrier(); pgaio_io_reclaim(ioh); break; } @@ -926,6 +1019,11 @@ pgaio_wref_check_done(PgAioWaitRef *iow) if (state == PGAIO_HS_COMPLETED_SHARED || state == PGAIO_HS_COMPLETED_LOCAL) { + /* + * Note that no interrupts are processed between + * pgaio_io_was_recycled() and this check - that's important as + * otherwise an interrupt could have already reclaimed the handle. 
+ */ if (am_owner) pgaio_io_reclaim(ioh); return true; @@ -1153,11 +1251,14 @@ pgaio_closing_fd(int fd) { dlist_iter iter; PgAioHandle *ioh = NULL; + uint64 generation; dclist_foreach(iter, &pgaio_my_backend->in_flight_ios) { ioh = dclist_container(PgAioHandle, node, iter.cur); + generation = ioh->generation; + if (pgaio_io_uses_fd(ioh, fd)) break; else @@ -1168,11 +1269,11 @@ pgaio_closing_fd(int fd) break; pgaio_debug_io(DEBUG2, ioh, - "waiting for IO before FD %d gets closed, %d in-flight IOs", + "waiting for IO before FD %d gets closed, %u in-flight IOs", fd, dclist_count(&pgaio_my_backend->in_flight_ios)); /* see comment in pgaio_io_wait_for_free() about raciness */ - pgaio_io_wait(ioh, ioh->generation); + pgaio_io_wait(ioh, generation); } } } @@ -1201,13 +1302,14 @@ pgaio_shutdown(int code, Datum arg) while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios)) { PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios); + uint64 generation = ioh->generation; pgaio_debug_io(DEBUG2, ioh, - "waiting for IO to complete during shutdown, %d in-flight IOs", + "waiting for IO to complete during shutdown, %u in-flight IOs", dclist_count(&pgaio_my_backend->in_flight_ios)); /* see comment in pgaio_io_wait_for_free() about raciness */ - pgaio_io_wait(ioh, ioh->generation); + pgaio_io_wait(ioh, generation); } pgaio_my_backend = NULL; diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c index 0ad9795bb7e..03c9bba0802 100644 --- a/src/backend/storage/aio/aio_callback.c +++ b/src/backend/storage/aio/aio_callback.c @@ -256,6 +256,9 @@ pgaio_io_call_complete_shared(PgAioHandle *ioh) pgaio_result_status_string(result.status), result.id, result.error_data, result.result); result = ce->cb->complete_shared(ioh, result, cb_data); + + /* the callback should never transition to unknown */ + Assert(result.status != PGAIO_RS_UNKNOWN); } ioh->distilled_result = result; @@ -290,6 +293,7 @@ pgaio_io_call_complete_local(PgAioHandle *ioh) /* start with distilled result from shared callback */ result = ioh->distilled_result; + Assert(result.status != PGAIO_RS_UNKNOWN); for (int i = ioh->num_callbacks; i > 0; i--) { @@ -306,6 +310,9 @@ pgaio_io_call_complete_local(PgAioHandle *ioh) pgaio_result_status_string(result.status), result.id, result.error_data, result.result); result = ce->cb->complete_local(ioh, result, cb_data); + + /* the callback should never transition to unknown */ + Assert(result.status != PGAIO_RS_UNKNOWN); } /* diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c index 00e176135a6..520b5077df2 100644 --- a/src/backend/storage/aio/aio_io.c +++ b/src/backend/storage/aio/aio_io.c @@ -181,9 +181,9 @@ pgaio_io_get_op_name(PgAioHandle *ioh) case PGAIO_OP_INVALID: return "invalid"; case PGAIO_OP_READV: - return "read"; + return "readv"; case PGAIO_OP_WRITEV: - return "write"; + return "writev"; } return NULL; /* silence compiler */ diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c index c719ba2727a..0a8c054162f 100644 --- a/src/backend/storage/aio/method_io_uring.c +++ b/src/backend/storage/aio/method_io_uring.c @@ -29,6 +29,9 @@ #ifdef IOMETHOD_IO_URING_ENABLED +#include <sys/mman.h> +#include <unistd.h> + #include <liburing.h> #include "miscadmin.h" @@ -94,12 +97,32 @@ PgAioUringContext struct io_uring io_uring_ring; } PgAioUringContext; +/* + * Information about the capabilities that io_uring has. 
+ * + * Depending on liburing and kernel version different features are + * supported. At least for the kernel a kernel version check does not suffice + * as various vendors do backport features to older kernels :(. + */ +typedef struct PgAioUringCaps +{ + bool checked; + /* -1 if io_uring_queue_init_mem() is unsupported */ + int mem_init_size; +} PgAioUringCaps; + + /* PgAioUringContexts for all backends */ static PgAioUringContext *pgaio_uring_contexts; /* the current backend's context */ static PgAioUringContext *pgaio_my_uring_context; +static PgAioUringCaps pgaio_uring_caps = +{ + .checked = false, + .mem_init_size = -1, +}; static uint32 pgaio_uring_procs(void) @@ -111,30 +134,184 @@ pgaio_uring_procs(void) return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS; } -static Size +/* + * Initializes pgaio_uring_caps, unless that's already done. + */ +static void +pgaio_uring_check_capabilities(void) +{ + if (pgaio_uring_caps.checked) + return; + + /* + * By default io_uring creates a shared memory mapping for each io_uring + * instance, leading to a large number of memory mappings. Unfortunately a + * large number of memory mappings slows things down, backend exit is + * particularly affected. To address that, newer kernels (6.5) support + * using user-provided memory for the memory, by putting the relevant + * memory into shared memory we don't need any additional mappings. + * + * To know whether this is supported, we unfortunately need to probe the + * kernel by trying to create a ring with userspace-provided memory. This + * also has a secondary benefit: We can determine precisely how much + * memory we need for each io_uring instance. + */ +#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP) + { + struct io_uring test_ring; + size_t ring_size; + void *ring_ptr; + struct io_uring_params p = {0}; + int ret; + + /* + * Liburing does not yet provide an API to query how much memory a + * ring will need. So we over-estimate it here. As the memory is freed + * just below that's small temporary waste of memory. + * + * 1MB is more than enough for rings within io_max_concurrency's + * range. + */ + ring_size = 1024 * 1024; + + /* + * Hard to believe a system exists where 1MB would not be a multiple + * of the page size. But it's cheap to ensure... + */ + ring_size -= ring_size % sysconf(_SC_PAGESIZE); + + ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); + if (ring_ptr == MAP_FAILED) + elog(ERROR, + "mmap(%zu) to determine io_uring_queue_init_mem() support failed: %m", + ring_size); + + ret = io_uring_queue_init_mem(io_max_concurrency, &test_ring, &p, ring_ptr, ring_size); + if (ret > 0) + { + pgaio_uring_caps.mem_init_size = ret; + + elog(DEBUG1, + "can use combined memory mapping for io_uring, each ring needs %d bytes", + ret); + + /* clean up the created ring, it was just for a test */ + io_uring_queue_exit(&test_ring); + } + else + { + /* + * There are different reasons for ring creation to fail, but it's + * ok to treat that just as io_uring_queue_init_mem() not being + * supported. We'll report a more detailed error in + * pgaio_uring_shmem_init(). 
+ */ + errno = -ret; + elog(DEBUG1, + "cannot use combined memory mapping for io_uring, ring creation failed: %m"); + + } + + if (munmap(ring_ptr, ring_size) != 0) + elog(ERROR, "munmap() failed: %m"); + } +#else + { + elog(DEBUG1, + "can't use combined memory mapping for io_uring, kernel or liburing too old"); + } +#endif + + pgaio_uring_caps.checked = true; +} + +/* + * Memory for all PgAioUringContext instances + */ +static size_t pgaio_uring_context_shmem_size(void) { return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext)); } +/* + * Memory for the combined memory used by io_uring instances. Returns 0 if + * that is not supported by kernel/liburing. + */ +static size_t +pgaio_uring_ring_shmem_size(void) +{ + size_t sz = 0; + + if (pgaio_uring_caps.mem_init_size > 0) + { + /* + * Memory for rings needs to be allocated to the page boundary, + * reserve space. Luckily it does not need to be aligned to hugepage + * boundaries, even if huge pages are used. + */ + sz = add_size(sz, sysconf(_SC_PAGESIZE)); + sz = add_size(sz, mul_size(pgaio_uring_procs(), + pgaio_uring_caps.mem_init_size)); + } + + return sz; +} + static size_t pgaio_uring_shmem_size(void) { - return pgaio_uring_context_shmem_size(); + size_t sz; + + /* + * Kernel and liburing support for various features influences how much + * shmem we need, perform the necessary checks. + */ + pgaio_uring_check_capabilities(); + + sz = pgaio_uring_context_shmem_size(); + sz = add_size(sz, pgaio_uring_ring_shmem_size()); + + return sz; } static void pgaio_uring_shmem_init(bool first_time) { - int TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS; + int TotalProcs = pgaio_uring_procs(); bool found; + char *shmem; + size_t ring_mem_remain = 0; + char *ring_mem_next = 0; - pgaio_uring_contexts = (PgAioUringContext *) - ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found); - + /* + * We allocate memory for all PgAioUringContext instances and, if + * supported, the memory required for each of the io_uring instances, in + * one ShmemInitStruct(). + */ + shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found); if (found) return; + pgaio_uring_contexts = (PgAioUringContext *) shmem; + shmem += pgaio_uring_context_shmem_size(); + + /* if supported, handle memory alignment / sizing for io_uring memory */ + if (pgaio_uring_caps.mem_init_size > 0) + { + ring_mem_remain = pgaio_uring_ring_shmem_size(); + ring_mem_next = (char *) shmem; + + /* align to page boundary, see also pgaio_uring_ring_shmem_size() */ + ring_mem_next = (char *) TYPEALIGN(sysconf(_SC_PAGESIZE), ring_mem_next); + + /* account for alignment */ + ring_mem_remain -= ring_mem_next - shmem; + shmem += ring_mem_next - shmem; + + shmem += ring_mem_remain; + } + for (int contextno = 0; contextno < TotalProcs; contextno++) { PgAioUringContext *context = &pgaio_uring_contexts[contextno]; @@ -158,7 +335,28 @@ pgaio_uring_shmem_init(bool first_time) * be worth using that - also need to evaluate if that causes * noticeable additional contention? */ - ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0); + + /* + * If supported (c.f. pgaio_uring_check_capabilities()), create ring + * with its data in shared memory. Otherwise fall back io_uring + * creating a memory mapping for each ring. 
+ */ +#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP) + if (pgaio_uring_caps.mem_init_size > 0) + { + struct io_uring_params p = {0}; + + ret = io_uring_queue_init_mem(io_max_concurrency, &context->io_uring_ring, &p, ring_mem_next, ring_mem_remain); + + ring_mem_remain -= ret; + ring_mem_next += ret; + } + else +#endif + { + ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0); + } + if (ret < 0) { char *hint = NULL; @@ -400,9 +598,9 @@ pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation) while (true) { pgaio_debug_io(DEBUG3, ioh, - "wait_one io_gen: %llu, ref_gen: %llu, cycle %d", - (long long unsigned) ioh->generation, - (long long unsigned) ref_generation, + "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d", + ioh->generation, + ref_generation, waited); if (pgaio_io_was_recycled(ioh, ref_generation, &state) || diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c index 743cccc2acd..bf8f77e6ff6 100644 --- a/src/backend/storage/aio/method_worker.c +++ b/src/backend/storage/aio/method_worker.c @@ -52,26 +52,26 @@ #define IO_WORKER_WAKEUP_FANOUT 2 -typedef struct AioWorkerSubmissionQueue +typedef struct PgAioWorkerSubmissionQueue { uint32 size; uint32 mask; uint32 head; uint32 tail; - uint32 ios[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerSubmissionQueue; + uint32 sqes[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerSubmissionQueue; -typedef struct AioWorkerSlot +typedef struct PgAioWorkerSlot { Latch *latch; bool in_use; -} AioWorkerSlot; +} PgAioWorkerSlot; -typedef struct AioWorkerControl +typedef struct PgAioWorkerControl { uint64 idle_worker_mask; - AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; -} AioWorkerControl; + PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER]; +} PgAioWorkerControl; static size_t pgaio_worker_shmem_size(void); @@ -96,8 +96,8 @@ int io_workers = 3; static int io_worker_queue_size = 64; static int MyIoWorkerId; -static AioWorkerSubmissionQueue *io_worker_submission_queue; -static AioWorkerControl *io_worker_control; +static PgAioWorkerSubmissionQueue *io_worker_submission_queue; +static PgAioWorkerControl *io_worker_control; static size_t @@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size) /* Round size up to next power of two so we can make a mask. */ *queue_size = pg_nextpower2_32(io_worker_queue_size); - return offsetof(AioWorkerSubmissionQueue, ios) + + return offsetof(PgAioWorkerSubmissionQueue, sqes) + sizeof(uint32) * *queue_size; } static size_t pgaio_worker_control_shmem_size(void) { - return offsetof(AioWorkerControl, workers) + - sizeof(AioWorkerSlot) * MAX_IO_WORKERS; + return offsetof(PgAioWorkerControl, workers) + + sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS; } static size_t @@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time) } static int -pgaio_choose_idle_worker(void) +pgaio_worker_choose_idle(void) { int worker; @@ -172,6 +172,7 @@ pgaio_choose_idle_worker(void) /* Find the lowest bit position, and clear it. 
*/ worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask); io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker); + Assert(io_worker_control->workers[worker].in_use); return worker; } @@ -179,7 +180,7 @@ pgaio_choose_idle_worker(void) static bool pgaio_worker_submission_queue_insert(PgAioHandle *ioh) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 new_head; queue = io_worker_submission_queue; @@ -191,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) return false; /* full */ } - queue->ios[queue->head] = pgaio_io_get_id(ioh); + queue->sqes[queue->head] = pgaio_io_get_id(ioh); queue->head = new_head; return true; @@ -200,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh) static uint32 pgaio_worker_submission_queue_consume(void) { - AioWorkerSubmissionQueue *queue; + PgAioWorkerSubmissionQueue *queue; uint32 result; queue = io_worker_submission_queue; if (queue->tail == queue->head) return UINT32_MAX; /* empty */ - result = queue->ios[queue->tail]; + result = queue->sqes[queue->tail]; queue->tail = (queue->tail + 1) & (queue->size - 1); return result; @@ -240,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh) } static void -pgaio_worker_submit_internal(int nios, PgAioHandle *ios[]) +pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios) { PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE]; int nsync = 0; Latch *wakeup = NULL; int worker; - Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE); + Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE); LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); - for (int i = 0; i < nios; ++i) + for (int i = 0; i < num_staged_ios; ++i) { - Assert(!pgaio_worker_needs_synchronous_execution(ios[i])); - if (!pgaio_worker_submission_queue_insert(ios[i])) + Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i])); + if (!pgaio_worker_submission_queue_insert(staged_ios[i])) { /* * We'll do it synchronously, but only after we've sent as many as * we can to workers, to maximize concurrency. */ - synchronous_ios[nsync++] = ios[i]; + synchronous_ios[nsync++] = staged_ios[i]; continue; } if (wakeup == NULL) { /* Choose an idle worker to wake up if we haven't already. */ - worker = pgaio_choose_idle_worker(); + worker = pgaio_worker_choose_idle(); if (worker >= 0) wakeup = io_worker_control->workers[worker].latch; - pgaio_debug_io(DEBUG4, ios[i], + pgaio_debug_io(DEBUG4, staged_ios[i], "choosing worker %d", worker); } @@ -316,6 +317,7 @@ pgaio_worker_die(int code, Datum arg) Assert(io_worker_control->workers[MyIoWorkerId].in_use); Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch); + io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId); io_worker_control->workers[MyIoWorkerId].in_use = false; io_worker_control->workers[MyIoWorkerId].latch = NULL; LWLockRelease(AioWorkerSubmissionQueueLock); @@ -461,7 +463,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) int nwakeups = 0; int worker; - /* Try to get a job to do. */ + /* + * Try to get a job to do. + * + * The lwlock acquisition also provides the necessary memory barrier + * to ensure that we don't see an outdated data in the handle. 
+ */ LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE); if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX) { @@ -483,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) IO_WORKER_WAKEUP_FANOUT); for (int i = 0; i < nwakeups; ++i) { - if ((worker = pgaio_choose_idle_worker()) < 0) + if ((worker = pgaio_worker_choose_idle()) < 0) break; latches[nlatches++] = io_worker_control->workers[worker].latch; } @@ -568,6 +575,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len) } CHECK_FOR_INTERRUPTS(); + + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } } error_context_stack = errcallback.previous; |
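The common thread in the aio.c hunks is that any check of an IO handle's state followed by an action on that handle must not allow interrupt processing in between, since ProcessInterrupts() can itself wait for and reclaim the same IO. A minimal sketch of the pattern, assuming the internal definitions from src/include/storage/aio_internal.h; reclaim_if_completed() is a hypothetical wrapper and pgaio_io_reclaim() is static to aio.c, so this is illustrative only:

#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio_internal.h"

static void
reclaim_if_completed(PgAioHandle *ioh)
{
	/* block ProcessInterrupts() across the check-then-act sequence */
	HOLD_INTERRUPTS();

	if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
	{
		/*
		 * Interrupts are held, so nothing can have reclaimed the handle
		 * between the state check above and this call.
		 */
		pgaio_io_reclaim(ioh);
	}

	RESUME_INTERRUPTS();
}

HOLD_INTERRUPTS()/RESUME_INTERRUPTS() only increment and decrement InterruptHoldoffCount, so the pattern nests; that is why pgaio_io_reclaim() can hold interrupts again internally.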
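The same race explains why the wait paths now capture a handle's generation up front: pgaio_debug_io() may process interrupts, during which the IO can complete and the handle be recycled with a bumped generation. The shape of the fix, as used in pgaio_io_wait_for_free(), pgaio_closing_fd() and pgaio_shutdown():

	uint64		generation = ioh->generation;	/* before any interrupt processing */

	pgaio_debug_io(DEBUG2, ioh,
				   "waiting for free io with %u in flight",
				   dclist_count(&pgaio_my_backend->in_flight_ios));

	/*
	 * Safe even if the handle was recycled meanwhile: pgaio_io_wait() ->
	 * pgaio_io_was_recycled() rereads ioh->state, issues pg_read_barrier(),
	 * and only trusts that state if ioh->generation still matches the
	 * generation captured above.
	 */
	pgaio_io_wait(ioh, generation);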
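The io_uring capability probe is self-contained enough to reproduce outside the server. A standalone sketch, assuming liburing 2.5 or newer (which provides io_uring_queue_init_mem()); on pre-6.5 kernels the call fails and the fallback branch is taken, exactly as in pgaio_uring_check_capabilities(). Build with "cc probe.c -luring":

#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#include <liburing.h>

int
main(void)
{
	struct io_uring ring;
	struct io_uring_params p = {0};
	size_t		ring_size = 1024 * 1024;	/* over-estimate, as in the patch */
	void	   *ring_ptr;
	int			ret;

	/* keep the probe allocation a multiple of the page size */
	ring_size -= ring_size % sysconf(_SC_PAGESIZE);

	ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE,
					MAP_SHARED | MAP_ANONYMOUS, -1, 0);
	if (ring_ptr == MAP_FAILED)
	{
		perror("mmap");
		return 1;
	}

	/* returns bytes consumed from the buffer on success, -errno on failure */
	ret = io_uring_queue_init_mem(512, &ring, &p, ring_ptr, ring_size);
	if (ret > 0)
	{
		printf("supported: one ring needs %d bytes\n", ret);
		io_uring_queue_exit(&ring);
	}
	else
		printf("not supported: %s\n", strerror(-ret));

	munmap(ring_ptr, ring_size);
	return 0;
}

The byte count returned on success is what pgaio_uring_caps.mem_init_size stores, and it is what lets pgaio_uring_ring_shmem_size() reserve exactly one page of alignment slack plus mem_init_size bytes per backend ring.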
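Finally, the renamed PgAioWorkerSubmissionQueue is an ordinary power-of-two ring buffer: io_worker_queue_size is rounded up with pg_nextpower2_32() so that "& (size - 1)" can replace a modulo, head == tail means empty, and one slot is left unused so a full queue is distinguishable from an empty one. All access happens under AioWorkerSubmissionQueueLock, so no atomics are needed. A minimal standalone sketch with hypothetical names (ExampleQueue, queue_insert, queue_consume):

#include <stdbool.h>
#include <stdint.h>

#define QUEUE_SIZE 64			/* must be a power of two */

typedef struct ExampleQueue
{
	uint32_t	mask;			/* QUEUE_SIZE - 1 */
	uint32_t	head;			/* next slot to write */
	uint32_t	tail;			/* next slot to read */
	uint32_t	sqes[QUEUE_SIZE];
} ExampleQueue;

static bool
queue_insert(ExampleQueue *q, uint32_t io_index)
{
	uint32_t	new_head = (q->head + 1) & q->mask;

	if (new_head == q->tail)
		return false;			/* full */
	q->sqes[q->head] = io_index;
	q->head = new_head;
	return true;
}

static uint32_t
queue_consume(ExampleQueue *q)
{
	uint32_t	result;

	if (q->tail == q->head)
		return UINT32_MAX;		/* empty, as in the patch */
	result = q->sqes[q->tail];
	q->tail = (q->tail + 1) & q->mask;
	return result;
}

The companion pgaio_worker_choose_idle() pops the lowest set bit of idle_worker_mask via pg_rightmost_one_pos64(); the diff also clears that bit in pgaio_worker_die(), so an exiting worker can never be chosen for a wakeup.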