Diffstat (limited to 'src/backend/storage/aio')
-rw-r--r--  src/backend/storage/aio/aio.c             | 140
-rw-r--r--  src/backend/storage/aio/aio_callback.c    |   7
-rw-r--r--  src/backend/storage/aio/aio_io.c          |   4
-rw-r--r--  src/backend/storage/aio/method_io_uring.c | 218
-rw-r--r--  src/backend/storage/aio/method_worker.c   |  69
5 files changed, 379 insertions, 59 deletions
diff --git a/src/backend/storage/aio/aio.c b/src/backend/storage/aio/aio.c
index ebb5a771bfd..3643f27ad6e 100644
--- a/src/backend/storage/aio/aio.c
+++ b/src/backend/storage/aio/aio.c
@@ -184,6 +184,8 @@ pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret)
PgAioHandle *
pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
{
+ PgAioHandle *ioh = NULL;
+
if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE)
{
Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE);
@@ -193,10 +195,17 @@ pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
if (pgaio_my_backend->handed_out_io)
elog(ERROR, "API violation: Only one IO can be handed out");
+ /*
+ * Probably not needed today, as interrupts should not process this IO,
+ * but...
+ */
+ HOLD_INTERRUPTS();
+
if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
{
dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios);
- PgAioHandle *ioh = dclist_container(PgAioHandle, node, ion);
+
+ ioh = dclist_container(PgAioHandle, node, ion);
Assert(ioh->state == PGAIO_HS_IDLE);
Assert(ioh->owner_procno == MyProcNumber);
@@ -212,11 +221,11 @@ pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret)
ioh->report_return = ret;
ret->result.status = PGAIO_RS_UNKNOWN;
}
-
- return ioh;
}
- return NULL;
+ RESUME_INTERRUPTS();
+
+ return ioh;
}
/*
@@ -233,6 +242,12 @@ pgaio_io_release(PgAioHandle *ioh)
Assert(ioh->resowner);
pgaio_my_backend->handed_out_io = NULL;
+
+ /*
+ * Note that no interrupts are processed between the handed_out_io
+ * check and the call to reclaim - that's important as otherwise an
+ * interrupt could have already reclaimed the handle.
+ */
pgaio_io_reclaim(ioh);
}
else
@@ -251,6 +266,12 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
Assert(ioh->resowner);
+ /*
+ * Otherwise an interrupt, in the middle of releasing the IO, could end up
+ * trying to wait for the IO, leading to state confusion.
+ */
+ HOLD_INTERRUPTS();
+
ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
ioh->resowner = NULL;
@@ -291,6 +312,8 @@ pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error)
*/
if (ioh->report_return)
ioh->report_return = NULL;
+
+ RESUME_INTERRUPTS();
}
/*
@@ -359,6 +382,13 @@ pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
static inline void
pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
{
+ /*
+ * All callers need to have held interrupts in some form; otherwise
+ * interrupt processing could wait for the IO to complete while it is in
+ * an intermediate state.
+ */
+ Assert(!INTERRUPTS_CAN_BE_PROCESSED());
+
pgaio_debug_io(DEBUG5, ioh,
"updating state to %s",
pgaio_io_state_get_name(new_state));
@@ -396,6 +426,13 @@ pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
Assert(pgaio_my_backend->handed_out_io == ioh);
Assert(pgaio_io_has_target(ioh));
+ /*
+ * Otherwise an interrupt, in the middle of staging and possibly executing
+ * the IO, could end up trying to wait for the IO, leading to state
+ * confusion.
+ */
+ HOLD_INTERRUPTS();
+
ioh->op = op;
ioh->result = 0;
@@ -435,6 +472,8 @@ pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
pgaio_io_prepare_submit(ioh);
pgaio_io_perform_synchronously(ioh);
}
+
+ RESUME_INTERRUPTS();
}
bool
@@ -517,6 +556,13 @@ bool
pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
{
*state = ioh->state;
+
+ /*
+ * Ensure that we don't see an earlier state of the handle than ioh->state
+ * due to compiler or CPU reordering. This protects both ->generation as
+ * directly used here, and other fields in the handle accessed in the
+ * caller if the handle was not reused.
+ */
pg_read_barrier();
return ioh->generation != ref_generation;
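
The comment above describes the usual publish/consume ordering: a writer that recycles the handle would bump the generation and issue a write barrier before publishing the new state, while readers load the state, issue a read barrier, and only then compare generations. The following standalone C11 sketch illustrates that ordering, with stdatomic fences standing in for pg_write_barrier()/pg_read_barrier(); the type and field names are illustrative, not PostgreSQL's.

    #include <stdatomic.h>
    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    /* Illustrative stand-in for the concurrency-relevant fields of a handle. */
    typedef struct DemoHandle
    {
        _Atomic int      state;        /* 0 = IDLE, 1 = COMPLETED_SHARED */
        _Atomic uint64_t generation;   /* bumped whenever the handle is reused */
    } DemoHandle;

    /* Writer side: retire the handle, publish the new state last. */
    static void
    demo_reclaim(DemoHandle *h)
    {
        atomic_fetch_add_explicit(&h->generation, 1, memory_order_relaxed);
        /* like pg_write_barrier(): generation bump is visible before the state */
        atomic_thread_fence(memory_order_release);
        atomic_store_explicit(&h->state, 0, memory_order_relaxed);
    }

    /* Reader side: mirrors the load / read-barrier / compare in the hunk above. */
    static bool
    demo_was_recycled(DemoHandle *h, uint64_t ref_generation, int *state)
    {
        *state = atomic_load_explicit(&h->state, memory_order_relaxed);
        /* like pg_read_barrier(): the generation is not read before the state */
        atomic_thread_fence(memory_order_acquire);
        return atomic_load_explicit(&h->generation, memory_order_relaxed) != ref_generation;
    }

    int
    main(void)
    {
        DemoHandle h = {1, 42};
        int        state;

        printf("recycled: %d\n", demo_was_recycled(&h, 42, &state));   /* 0 */
        demo_reclaim(&h);
        printf("recycled: %d\n", demo_was_recycled(&h, 42, &state));   /* 1 */
        return 0;
    }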
@@ -544,8 +590,8 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
&& state != PGAIO_HS_COMPLETED_SHARED
&& state != PGAIO_HS_COMPLETED_LOCAL)
{
- elog(PANIC, "waiting for own IO in wrong state: %d",
- state);
+ elog(PANIC, "waiting for own IO %d in wrong state: %s",
+ pgaio_io_get_id(ioh), pgaio_io_get_state_name(ioh));
}
}
@@ -599,7 +645,13 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
case PGAIO_HS_COMPLETED_SHARED:
case PGAIO_HS_COMPLETED_LOCAL:
- /* see above */
+
+ /*
+ * Note that no interrupts are processed between
+ * pgaio_io_was_recycled() and this check - that's important
+ * as otherwise an interrupt could have already reclaimed the
+ * handle.
+ */
if (am_owner)
pgaio_io_reclaim(ioh);
return;
@@ -610,6 +662,11 @@ pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
/*
* Make IO handle ready to be reused after IO has completed or after the
* handle has been released without being used.
+ *
+ * Note that callers need to be careful to only call this in the right
+ * state and to ensure that no interrupts are processed between the state
+ * check and the call to pgaio_io_reclaim(). Otherwise interrupt processing
+ * could already have reclaimed the handle.
*/
static void
pgaio_io_reclaim(PgAioHandle *ioh)
@@ -618,6 +675,9 @@ pgaio_io_reclaim(PgAioHandle *ioh)
Assert(ioh->owner_procno == MyProcNumber);
Assert(ioh->state != PGAIO_HS_IDLE);
+ /* see comment in function header */
+ HOLD_INTERRUPTS();
+
/*
* It's a bit ugly, but right now the easiest place to put the execution
* of local completion callbacks is this function, as we need to execute
@@ -685,6 +745,8 @@ pgaio_io_reclaim(PgAioHandle *ioh)
* efficient in cases where only a few IOs are used.
*/
dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);
+
+ RESUME_INTERRUPTS();
}
/*
@@ -697,10 +759,10 @@ pgaio_io_wait_for_free(void)
{
int reclaimed = 0;
- pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",
+ pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %u in-flight, %u idle IOs",
pgaio_my_backend->num_staged_ios,
dclist_count(&pgaio_my_backend->in_flight_ios),
- dclist_is_empty(&pgaio_my_backend->idle_ios));
+ dclist_count(&pgaio_my_backend->idle_ios));
/*
* First check if any of our IOs actually have completed - when using
@@ -714,6 +776,16 @@ pgaio_io_wait_for_free(void)
if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
{
+ /*
+ * Note that no interrupts are processed between the state check
+ * and the call to reclaim - that's important as otherwise an
+ * interrupt could have already reclaimed the handle.
+ *
+ * Need to ensure that there's no reordering; in the more common
+ * paths, where we wait for IO, that's done by
+ * pgaio_io_was_recycled().
+ */
+ pg_read_barrier();
pgaio_io_reclaim(ioh);
reclaimed++;
}
@@ -730,13 +802,17 @@ pgaio_io_wait_for_free(void)
if (pgaio_my_backend->num_staged_ios > 0)
pgaio_submit_staged();
+ /* possibly some IOs finished during submission */
+ if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
+ return;
+
if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
ereport(ERROR,
errmsg_internal("no free IOs despite no in-flight IOs"),
- errdetail_internal("%d pending, %d in-flight, %d idle IOs",
+ errdetail_internal("%d pending, %u in-flight, %u idle IOs",
pgaio_my_backend->num_staged_ios,
dclist_count(&pgaio_my_backend->in_flight_ios),
- dclist_is_empty(&pgaio_my_backend->idle_ios)));
+ dclist_count(&pgaio_my_backend->idle_ios)));
/*
* Wait for the oldest in-flight IO to complete.
@@ -747,6 +823,7 @@ pgaio_io_wait_for_free(void)
{
PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
&pgaio_my_backend->in_flight_ios);
+ uint64 generation = ioh->generation;
switch (ioh->state)
{
@@ -763,20 +840,36 @@ pgaio_io_wait_for_free(void)
case PGAIO_HS_COMPLETED_IO:
case PGAIO_HS_SUBMITTED:
pgaio_debug_io(DEBUG2, ioh,
- "waiting for free io with %d in flight",
+ "waiting for free io with %u in flight",
dclist_count(&pgaio_my_backend->in_flight_ios));
/*
* In a more general case this would be racy, because the
* generation could increase after we read ioh->state above.
* But we are only looking at IOs by the current backend and
- * the IO can only be recycled by this backend.
+ * the IO can only be recycled by this backend. Even this is
+ * only OK because we get the handle's generation before
+ * potentially processing interrupts, e.g. as part of
+ * pgaio_debug_io().
*/
- pgaio_io_wait(ioh, ioh->generation);
+ pgaio_io_wait(ioh, generation);
break;
case PGAIO_HS_COMPLETED_SHARED:
- /* it's possible that another backend just finished this IO */
+
+ /*
+ * It's possible that another backend just finished this IO.
+ *
+ * Note that no interrupts are processed between the state
+ * check and the call to reclaim - that's important as
+ * otherwise an interrupt could have already reclaimed the
+ * handle.
+ *
+ * Need to ensure that there's no reordering; in the more
+ * common paths, where we wait for IO, that's done by
+ * pgaio_io_was_recycled().
+ */
+ pg_read_barrier();
pgaio_io_reclaim(ioh);
break;
}
@@ -926,6 +1019,11 @@ pgaio_wref_check_done(PgAioWaitRef *iow)
if (state == PGAIO_HS_COMPLETED_SHARED ||
state == PGAIO_HS_COMPLETED_LOCAL)
{
+ /*
+ * Note that no interrupts are processed between
+ * pgaio_io_was_recycled() and this check - that's important as
+ * otherwise an interrupt could have already reclaimed the handle.
+ */
if (am_owner)
pgaio_io_reclaim(ioh);
return true;
@@ -1153,11 +1251,14 @@ pgaio_closing_fd(int fd)
{
dlist_iter iter;
PgAioHandle *ioh = NULL;
+ uint64 generation;
dclist_foreach(iter, &pgaio_my_backend->in_flight_ios)
{
ioh = dclist_container(PgAioHandle, node, iter.cur);
+ generation = ioh->generation;
+
if (pgaio_io_uses_fd(ioh, fd))
break;
else
@@ -1168,11 +1269,11 @@ pgaio_closing_fd(int fd)
break;
pgaio_debug_io(DEBUG2, ioh,
- "waiting for IO before FD %d gets closed, %d in-flight IOs",
+ "waiting for IO before FD %d gets closed, %u in-flight IOs",
fd, dclist_count(&pgaio_my_backend->in_flight_ios));
/* see comment in pgaio_io_wait_for_free() about raciness */
- pgaio_io_wait(ioh, ioh->generation);
+ pgaio_io_wait(ioh, generation);
}
}
}
@@ -1201,13 +1302,14 @@ pgaio_shutdown(int code, Datum arg)
while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios))
{
PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios);
+ uint64 generation = ioh->generation;
pgaio_debug_io(DEBUG2, ioh,
- "waiting for IO to complete during shutdown, %d in-flight IOs",
+ "waiting for IO to complete during shutdown, %u in-flight IOs",
dclist_count(&pgaio_my_backend->in_flight_ios));
/* see comment in pgaio_io_wait_for_free() about raciness */
- pgaio_io_wait(ioh, ioh->generation);
+ pgaio_io_wait(ioh, generation);
}
pgaio_my_backend = NULL;
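
Most of the aio.c changes revolve around PostgreSQL's interrupt holdoff counter: HOLD_INTERRUPTS() increments a counter, RESUME_INTERRUPTS() decrements it, and CHECK_FOR_INTERRUPTS() only services pending interrupts once the counter is back at zero, so a held section can change handle state without interrupt processing waiting on (and possibly reclaiming) the same handle partway through. Below is a minimal standalone sketch of that counter discipline, with simplified names; the real macros live in miscadmin.h.

    #include <stdbool.h>
    #include <stdio.h>

    /*
     * Simplified stand-in for PostgreSQL's interrupt holdoff counter: while
     * the counter is non-zero, pending interrupts are remembered but not
     * serviced.
     */
    static volatile int  holdoff_count = 0;
    static volatile bool interrupt_pending = false;

    #define DEMO_HOLD_INTERRUPTS()   (holdoff_count++)
    #define DEMO_RESUME_INTERRUPTS() (holdoff_count--)

    static void
    demo_check_for_interrupts(void)
    {
        if (interrupt_pending && holdoff_count == 0)
        {
            interrupt_pending = false;
            printf("servicing interrupt\n");
        }
    }

    static void
    demo_reclaim_handle(void)
    {
        /*
         * State transitions must not be interleaved with interrupt processing,
         * which might itself wait on (and reclaim) the same handle.
         */
        DEMO_HOLD_INTERRUPTS();
        printf("handle: COMPLETED_LOCAL -> IDLE\n");
        demo_check_for_interrupts();    /* deferred: holdoff_count > 0 */
        DEMO_RESUME_INTERRUPTS();

        demo_check_for_interrupts();    /* now the interrupt is serviced */
    }

    int
    main(void)
    {
        interrupt_pending = true;
        demo_reclaim_handle();
        return 0;
    }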
diff --git a/src/backend/storage/aio/aio_callback.c b/src/backend/storage/aio/aio_callback.c
index 0ad9795bb7e..03c9bba0802 100644
--- a/src/backend/storage/aio/aio_callback.c
+++ b/src/backend/storage/aio/aio_callback.c
@@ -256,6 +256,9 @@ pgaio_io_call_complete_shared(PgAioHandle *ioh)
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result);
result = ce->cb->complete_shared(ioh, result, cb_data);
+
+ /* the callback should never transition to unknown */
+ Assert(result.status != PGAIO_RS_UNKNOWN);
}
ioh->distilled_result = result;
@@ -290,6 +293,7 @@ pgaio_io_call_complete_local(PgAioHandle *ioh)
/* start with distilled result from shared callback */
result = ioh->distilled_result;
+ Assert(result.status != PGAIO_RS_UNKNOWN);
for (int i = ioh->num_callbacks; i > 0; i--)
{
@@ -306,6 +310,9 @@ pgaio_io_call_complete_local(PgAioHandle *ioh)
pgaio_result_status_string(result.status),
result.id, result.error_data, result.result);
result = ce->cb->complete_local(ioh, result, cb_data);
+
+ /* the callback should never transition to unknown */
+ Assert(result.status != PGAIO_RS_UNKNOWN);
}
/*
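
The new assertions pin down an invariant of the completion-callback chain: each callback receives the previous (distilled) result and must hand back a definite status, never PGAIO_RS_UNKNOWN. A standalone sketch of that fold-over-callbacks shape, with illustrative stand-in types rather than the real aio_types.h definitions:

    #include <assert.h>
    #include <stdio.h>

    /* Illustrative stand-ins for the real AIO result types. */
    typedef enum DemoResultStatus
    {
        DEMO_RS_UNKNOWN,
        DEMO_RS_OK,
        DEMO_RS_WARNING,
        DEMO_RS_ERROR
    } DemoResultStatus;

    typedef struct DemoResult
    {
        DemoResultStatus status;
        int              result;
    } DemoResult;

    typedef DemoResult (*DemoCompleteCB) (DemoResult prior);

    static DemoResult
    demo_check_short_read(DemoResult prior)
    {
        /* a callback may refine the result, but never back to "unknown" */
        if (prior.result < 8192)
            prior.status = DEMO_RS_WARNING;
        return prior;
    }

    static DemoResult
    demo_call_complete_chain(DemoCompleteCB *cbs, int ncbs, DemoResult initial)
    {
        DemoResult result = initial;

        for (int i = 0; i < ncbs; i++)
        {
            result = cbs[i](result);
            /* mirrors the new Assert(): callbacks must return a definite status */
            assert(result.status != DEMO_RS_UNKNOWN);
        }
        return result;
    }

    int
    main(void)
    {
        DemoCompleteCB cbs[] = {demo_check_short_read};
        DemoResult     initial = {DEMO_RS_OK, 4096};
        DemoResult     final = demo_call_complete_chain(cbs, 1, initial);

        printf("status=%d result=%d\n", final.status, final.result);
        return 0;
    }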
diff --git a/src/backend/storage/aio/aio_io.c b/src/backend/storage/aio/aio_io.c
index 00e176135a6..520b5077df2 100644
--- a/src/backend/storage/aio/aio_io.c
+++ b/src/backend/storage/aio/aio_io.c
@@ -181,9 +181,9 @@ pgaio_io_get_op_name(PgAioHandle *ioh)
case PGAIO_OP_INVALID:
return "invalid";
case PGAIO_OP_READV:
- return "read";
+ return "readv";
case PGAIO_OP_WRITEV:
- return "write";
+ return "writev";
}
return NULL; /* silence compiler */
diff --git a/src/backend/storage/aio/method_io_uring.c b/src/backend/storage/aio/method_io_uring.c
index c719ba2727a..0a8c054162f 100644
--- a/src/backend/storage/aio/method_io_uring.c
+++ b/src/backend/storage/aio/method_io_uring.c
@@ -29,6 +29,9 @@
#ifdef IOMETHOD_IO_URING_ENABLED
+#include <sys/mman.h>
+#include <unistd.h>
+
#include <liburing.h>
#include "miscadmin.h"
@@ -94,12 +97,32 @@ PgAioUringContext
struct io_uring io_uring_ring;
} PgAioUringContext;
+/*
+ * Information about the capabilities that io_uring has.
+ *
+ * Depending on liburing and kernel version, different features are
+ * supported. At least for the kernel, a version check does not suffice,
+ * as various vendors backport features to older kernels :(.
+ */
+typedef struct PgAioUringCaps
+{
+ bool checked;
+ /* -1 if io_uring_queue_init_mem() is unsupported */
+ int mem_init_size;
+} PgAioUringCaps;
+
+
/* PgAioUringContexts for all backends */
static PgAioUringContext *pgaio_uring_contexts;
/* the current backend's context */
static PgAioUringContext *pgaio_my_uring_context;
+static PgAioUringCaps pgaio_uring_caps =
+{
+ .checked = false,
+ .mem_init_size = -1,
+};
static uint32
pgaio_uring_procs(void)
@@ -111,30 +134,184 @@ pgaio_uring_procs(void)
return MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
}
-static Size
+/*
+ * Initializes pgaio_uring_caps, unless that's already done.
+ */
+static void
+pgaio_uring_check_capabilities(void)
+{
+ if (pgaio_uring_caps.checked)
+ return;
+
+ /*
+ * By default io_uring creates a shared memory mapping for each io_uring
+ * instance, leading to a large number of memory mappings. Unfortunately a
+ * large number of memory mappings slows things down; backend exit is
+ * particularly affected. To address that, newer kernels (6.5) support
+ * using user-provided memory for the rings; by putting that memory into
+ * shared memory we don't need any additional mappings.
+ *
+ * To know whether this is supported, we unfortunately need to probe the
+ * kernel by trying to create a ring with userspace-provided memory. This
+ * also has a secondary benefit: We can determine precisely how much
+ * memory we need for each io_uring instance.
+ */
+#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
+ {
+ struct io_uring test_ring;
+ size_t ring_size;
+ void *ring_ptr;
+ struct io_uring_params p = {0};
+ int ret;
+
+ /*
+ * Liburing does not yet provide an API to query how much memory a
+ * ring will need. So we over-estimate it here. As the memory is freed
+ * just below, that's a small temporary waste of memory.
+ *
+ * 1MB is more than enough for rings within io_max_concurrency's
+ * range.
+ */
+ ring_size = 1024 * 1024;
+
+ /*
+ * Hard to believe a system exists where 1MB would not be a multiple
+ * of the page size. But it's cheap to ensure...
+ */
+ ring_size -= ring_size % sysconf(_SC_PAGESIZE);
+
+ ring_ptr = mmap(NULL, ring_size, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
+ if (ring_ptr == MAP_FAILED)
+ elog(ERROR,
+ "mmap(%zu) to determine io_uring_queue_init_mem() support failed: %m",
+ ring_size);
+
+ ret = io_uring_queue_init_mem(io_max_concurrency, &test_ring, &p, ring_ptr, ring_size);
+ if (ret > 0)
+ {
+ pgaio_uring_caps.mem_init_size = ret;
+
+ elog(DEBUG1,
+ "can use combined memory mapping for io_uring, each ring needs %d bytes",
+ ret);
+
+ /* clean up the created ring, it was just for a test */
+ io_uring_queue_exit(&test_ring);
+ }
+ else
+ {
+ /*
+ * There are different reasons for ring creation to fail, but it's
+ * ok to treat that just as io_uring_queue_init_mem() not being
+ * supported. We'll report a more detailed error in
+ * pgaio_uring_shmem_init().
+ */
+ errno = -ret;
+ elog(DEBUG1,
+ "cannot use combined memory mapping for io_uring, ring creation failed: %m");
+
+ }
+
+ if (munmap(ring_ptr, ring_size) != 0)
+ elog(ERROR, "munmap() failed: %m");
+ }
+#else
+ {
+ elog(DEBUG1,
+ "can't use combined memory mapping for io_uring, kernel or liburing too old");
+ }
+#endif
+
+ pgaio_uring_caps.checked = true;
+}
+
+/*
+ * Memory for all PgAioUringContext instances
+ */
+static size_t
pgaio_uring_context_shmem_size(void)
{
return mul_size(pgaio_uring_procs(), sizeof(PgAioUringContext));
}
+/*
+ * Size of the combined memory used by io_uring instances. Returns 0 if
+ * that is not supported by kernel/liburing.
+ */
+static size_t
+pgaio_uring_ring_shmem_size(void)
+{
+ size_t sz = 0;
+
+ if (pgaio_uring_caps.mem_init_size > 0)
+ {
+ /*
+ * Memory for rings needs to be allocated at a page boundary, so
+ * reserve space for that. Luckily it does not need to be aligned to hugepage
+ * boundaries, even if huge pages are used.
+ */
+ sz = add_size(sz, sysconf(_SC_PAGESIZE));
+ sz = add_size(sz, mul_size(pgaio_uring_procs(),
+ pgaio_uring_caps.mem_init_size));
+ }
+
+ return sz;
+}
+
static size_t
pgaio_uring_shmem_size(void)
{
- return pgaio_uring_context_shmem_size();
+ size_t sz;
+
+ /*
+ * Kernel and liburing support for various features influences how much
+ * shmem we need; perform the necessary checks.
+ */
+ pgaio_uring_check_capabilities();
+
+ sz = pgaio_uring_context_shmem_size();
+ sz = add_size(sz, pgaio_uring_ring_shmem_size());
+
+ return sz;
}
static void
pgaio_uring_shmem_init(bool first_time)
{
- int TotalProcs = MaxBackends + NUM_AUXILIARY_PROCS - MAX_IO_WORKERS;
+ int TotalProcs = pgaio_uring_procs();
bool found;
+ char *shmem;
+ size_t ring_mem_remain = 0;
+ char *ring_mem_next = 0;
- pgaio_uring_contexts = (PgAioUringContext *)
- ShmemInitStruct("AioUring", pgaio_uring_shmem_size(), &found);
-
+ /*
+ * We allocate memory for all PgAioUringContext instances and, if
+ * supported, the memory required for each of the io_uring instances, in
+ * one ShmemInitStruct().
+ */
+ shmem = ShmemInitStruct("AioUringContext", pgaio_uring_shmem_size(), &found);
if (found)
return;
+ pgaio_uring_contexts = (PgAioUringContext *) shmem;
+ shmem += pgaio_uring_context_shmem_size();
+
+ /* if supported, handle memory alignment / sizing for io_uring memory */
+ if (pgaio_uring_caps.mem_init_size > 0)
+ {
+ ring_mem_remain = pgaio_uring_ring_shmem_size();
+ ring_mem_next = (char *) shmem;
+
+ /* align to page boundary, see also pgaio_uring_ring_shmem_size() */
+ ring_mem_next = (char *) TYPEALIGN(sysconf(_SC_PAGESIZE), ring_mem_next);
+
+ /* account for alignment */
+ ring_mem_remain -= ring_mem_next - shmem;
+ shmem += ring_mem_next - shmem;
+
+ shmem += ring_mem_remain;
+ }
+
for (int contextno = 0; contextno < TotalProcs; contextno++)
{
PgAioUringContext *context = &pgaio_uring_contexts[contextno];
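
The alignment handling above carves page-aligned, per-ring chunks out of a single shared allocation: align the cursor up to the next page boundary, charge the skipped bytes against the remaining budget, then hand out the probed per-ring size for each ring. A standalone sketch of the same arithmetic follows, with an illustrative per-ring size and malloc() standing in for the shared memory segment.

    #include <stdint.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>

    /* Round "len" up to the next multiple of "align" (a power of two). */
    #define DEMO_TYPEALIGN(align, len) \
        (((uintptr_t) (len) + ((align) - 1)) & ~((uintptr_t) ((align) - 1)))

    int
    main(void)
    {
        size_t page = (size_t) sysconf(_SC_PAGESIZE);
        size_t per_ring = 18432;    /* illustrative: bytes reported by the probe */
        int    nrings = 4;

        /* reserve one extra page so the first ring can be pushed to a boundary */
        size_t total = page + per_ring * (size_t) nrings;
        char  *base = malloc(total);

        if (base == NULL)
            return 1;

        /* align the cursor up, and charge the skipped bytes against the budget */
        char  *next = (char *) DEMO_TYPEALIGN(page, (uintptr_t) base);
        size_t remain = total - (size_t) (next - base);

        for (int i = 0; i < nrings; i++)
        {
            printf("ring %d at offset %zu, %zu bytes left\n",
                   i, (size_t) (next - base), remain);
            next += per_ring;
            remain -= per_ring;
        }

        free(base);
        return 0;
    }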
@@ -158,7 +335,28 @@ pgaio_uring_shmem_init(bool first_time)
* be worth using that - also need to evaluate if that causes
* noticeable additional contention?
*/
- ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+
+ /*
+ * If supported (cf. pgaio_uring_check_capabilities()), create the ring
+ * with its data in shared memory. Otherwise fall back to io_uring
+ * creating a memory mapping for each ring.
+ */
+#if defined(HAVE_LIBURING_QUEUE_INIT_MEM) && defined(IORING_SETUP_NO_MMAP)
+ if (pgaio_uring_caps.mem_init_size > 0)
+ {
+ struct io_uring_params p = {0};
+
+ ret = io_uring_queue_init_mem(io_max_concurrency, &context->io_uring_ring, &p, ring_mem_next, ring_mem_remain);
+
+ ring_mem_remain -= ret;
+ ring_mem_next += ret;
+ }
+ else
+#endif
+ {
+ ret = io_uring_queue_init(io_max_concurrency, &context->io_uring_ring, 0);
+ }
+
if (ret < 0)
{
char *hint = NULL;
@@ -400,9 +598,9 @@ pgaio_uring_wait_one(PgAioHandle *ioh, uint64 ref_generation)
while (true)
{
pgaio_debug_io(DEBUG3, ioh,
- "wait_one io_gen: %llu, ref_gen: %llu, cycle %d",
- (long long unsigned) ioh->generation,
- (long long unsigned) ref_generation,
+ "wait_one io_gen: %" PRIu64 ", ref_gen: %" PRIu64 ", cycle %d",
+ ioh->generation,
+ ref_generation,
waited);
if (pgaio_io_was_recycled(ioh, ref_generation, &state) ||
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 743cccc2acd..bf8f77e6ff6 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -52,26 +52,26 @@
#define IO_WORKER_WAKEUP_FANOUT 2
-typedef struct AioWorkerSubmissionQueue
+typedef struct PgAioWorkerSubmissionQueue
{
uint32 size;
uint32 mask;
uint32 head;
uint32 tail;
- uint32 ios[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerSubmissionQueue;
+ uint32 sqes[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerSubmissionQueue;
-typedef struct AioWorkerSlot
+typedef struct PgAioWorkerSlot
{
Latch *latch;
bool in_use;
-} AioWorkerSlot;
+} PgAioWorkerSlot;
-typedef struct AioWorkerControl
+typedef struct PgAioWorkerControl
{
uint64 idle_worker_mask;
- AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerControl;
+ PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerControl;
static size_t pgaio_worker_shmem_size(void);
@@ -96,8 +96,8 @@ int io_workers = 3;
static int io_worker_queue_size = 64;
static int MyIoWorkerId;
-static AioWorkerSubmissionQueue *io_worker_submission_queue;
-static AioWorkerControl *io_worker_control;
+static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
+static PgAioWorkerControl *io_worker_control;
static size_t
@@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
/* Round size up to next power of two so we can make a mask. */
*queue_size = pg_nextpower2_32(io_worker_queue_size);
- return offsetof(AioWorkerSubmissionQueue, ios) +
+ return offsetof(PgAioWorkerSubmissionQueue, sqes) +
sizeof(uint32) * *queue_size;
}
static size_t
pgaio_worker_control_shmem_size(void)
{
- return offsetof(AioWorkerControl, workers) +
- sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+ return offsetof(PgAioWorkerControl, workers) +
+ sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
}
static size_t
@@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time)
}
static int
-pgaio_choose_idle_worker(void)
+pgaio_worker_choose_idle(void)
{
int worker;
@@ -172,6 +172,7 @@ pgaio_choose_idle_worker(void)
/* Find the lowest bit position, and clear it. */
worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+ Assert(io_worker_control->workers[worker].in_use);
return worker;
}
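
Idle workers are tracked in a 64-bit mask, so choosing one means finding the lowest set bit and clearing it; the added Assert checks that the chosen slot is actually marked in use. A standalone sketch of that bit manipulation, using GCC/Clang's __builtin_ctzll in place of pg_rightmost_one_pos64():

    #include <stdint.h>
    #include <stdio.h>

    /*
     * Pick the lowest set bit of *mask and clear it, returning its position,
     * or -1 if no bit is set.
     */
    static int
    demo_choose_idle(uint64_t *mask)
    {
        int worker;

        if (*mask == 0)
            return -1;

        worker = __builtin_ctzll(*mask);
        *mask &= ~(UINT64_C(1) << worker);
        return worker;
    }

    int
    main(void)
    {
        uint64_t idle_mask = (UINT64_C(1) << 2) | (UINT64_C(1) << 5);
        int      worker;

        while ((worker = demo_choose_idle(&idle_mask)) >= 0)
            printf("waking worker %d\n", worker);   /* prints 2, then 5 */
        return 0;
    }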
@@ -179,7 +180,7 @@ pgaio_choose_idle_worker(void)
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
queue = io_worker_submission_queue;
@@ -191,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
return false; /* full */
}
- queue->ios[queue->head] = pgaio_io_get_id(ioh);
+ queue->sqes[queue->head] = pgaio_io_get_id(ioh);
queue->head = new_head;
return true;
@@ -200,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static uint32
pgaio_worker_submission_queue_consume(void)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 result;
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
- result = queue->ios[queue->tail];
+ result = queue->sqes[queue->tail];
queue->tail = (queue->tail + 1) & (queue->size - 1);
return result;
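
The submission queue is a fixed-size ring whose capacity is rounded up to a power of two, so wrapping an index only needs a mask (index & (size - 1)) instead of a modulo; insert reports failure when the queue is full and consume returns UINT32_MAX when it is empty. A simplified standalone sketch of that ring follows (no locking; the real queue is protected by AioWorkerSubmissionQueueLock).

    #include <stdbool.h>
    #include <stdint.h>
    #include <stdio.h>

    #define DEMO_QUEUE_SIZE 8           /* must be a power of two */

    typedef struct DemoQueue
    {
        uint32_t head;
        uint32_t tail;
        uint32_t ids[DEMO_QUEUE_SIZE];
    } DemoQueue;

    /* Returns false if the queue is full; one slot is left unused on purpose. */
    static bool
    demo_queue_insert(DemoQueue *q, uint32_t id)
    {
        uint32_t new_head = (q->head + 1) & (DEMO_QUEUE_SIZE - 1);

        if (new_head == q->tail)
            return false;               /* full */
        q->ids[q->head] = id;
        q->head = new_head;
        return true;
    }

    /* Returns UINT32_MAX if the queue is empty. */
    static uint32_t
    demo_queue_consume(DemoQueue *q)
    {
        uint32_t result;

        if (q->tail == q->head)
            return UINT32_MAX;          /* empty */
        result = q->ids[q->tail];
        q->tail = (q->tail + 1) & (DEMO_QUEUE_SIZE - 1);
        return result;
    }

    int
    main(void)
    {
        DemoQueue q = {0};
        uint32_t  id;

        for (uint32_t i = 100; i < 110; i++)
            if (!demo_queue_insert(&q, i))
                printf("queue full at io %u, would run it synchronously\n", i);

        while ((id = demo_queue_consume(&q)) != UINT32_MAX)
            printf("worker consumes io %u\n", id);
        return 0;
    }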
@@ -240,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
}
static void
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
int nsync = 0;
Latch *wakeup = NULL;
int worker;
- Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+ Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- for (int i = 0; i < nios; ++i)
+ for (int i = 0; i < num_staged_ios; ++i)
{
- Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
- if (!pgaio_worker_submission_queue_insert(ios[i]))
+ Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+ if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
{
/*
* We'll do it synchronously, but only after we've sent as many as
* we can to workers, to maximize concurrency.
*/
- synchronous_ios[nsync++] = ios[i];
+ synchronous_ios[nsync++] = staged_ios[i];
continue;
}
if (wakeup == NULL)
{
/* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_choose_idle_worker();
+ worker = pgaio_worker_choose_idle();
if (worker >= 0)
wakeup = io_worker_control->workers[worker].latch;
- pgaio_debug_io(DEBUG4, ios[i],
+ pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
@@ -316,6 +317,7 @@ pgaio_worker_die(int code, Datum arg)
Assert(io_worker_control->workers[MyIoWorkerId].in_use);
Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
io_worker_control->workers[MyIoWorkerId].in_use = false;
io_worker_control->workers[MyIoWorkerId].latch = NULL;
LWLockRelease(AioWorkerSubmissionQueueLock);
@@ -461,7 +463,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
int nwakeups = 0;
int worker;
- /* Try to get a job to do. */
+ /*
+ * Try to get a job to do.
+ *
+ * The lwlock acquisition also provides the necessary memory barrier
+ * to ensure that we don't see outdated data in the handle.
+ */
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
{
@@ -483,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
IO_WORKER_WAKEUP_FANOUT);
for (int i = 0; i < nwakeups; ++i)
{
- if ((worker = pgaio_choose_idle_worker()) < 0)
+ if ((worker = pgaio_worker_choose_idle()) < 0)
break;
latches[nlatches++] = io_worker_control->workers[worker].latch;
}
@@ -568,6 +575,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
}
error_context_stack = errcallback.previous;
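
The last hunk adds the usual SIGHUP handling to the worker's main loop: the signal handler only sets ConfigReloadPending, and the loop re-reads the configuration at a safe point via ProcessConfigFile(PGC_SIGHUP). Below is a minimal standalone analogue of that deferred-reload pattern (plain signal handler and flag; not the actual IoWorkerMain loop).

    #define _POSIX_C_SOURCE 200809L

    #include <signal.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <unistd.h>

    /* The handler only sets a flag; the work happens in the main loop. */
    static volatile sig_atomic_t config_reload_pending = false;

    static void
    handle_sighup(int signo)
    {
        (void) signo;
        config_reload_pending = true;
    }

    int
    main(void)
    {
        struct sigaction sa = {0};

        sa.sa_handler = handle_sighup;
        sigaction(SIGHUP, &sa, NULL);

        printf("worker %d: send SIGHUP to trigger a reload\n", (int) getpid());
        for (int iteration = 0; iteration < 30; iteration++)
        {
            /* ... wait for work, process one queue entry ... */
            sleep(1);

            if (config_reload_pending)
            {
                config_reload_pending = false;
                printf("re-reading configuration file\n");
            }
        }
        return 0;
    }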