/*------------------------------------------------------------------------- * * aio.c * AIO - Core Logic * * For documentation about how AIO works on a higher level, including a * schematic example, see README.md. * * * AIO is a complicated subsystem. To keep things navigable, it is split * across a number of files: * * - method_*.c - different ways of executing AIO (e.g. worker process) * * - aio_target.c - IO on different kinds of targets * * - aio_io.c - method-independent code for specific IO ops (e.g. readv) * * - aio_callback.c - callbacks at IO operation lifecycle events * * - aio_init.c - per-server and per-backend initialization * * - aio.c - all other topics * * - read_stream.c - helper for reading buffered relation data * * - README.md - higher-level overview over AIO * * * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION * src/backend/storage/aio/aio.c * *------------------------------------------------------------------------- */ #include "postgres.h" #include "lib/ilist.h" #include "miscadmin.h" #include "port/atomics.h" #include "storage/aio.h" #include "storage/aio_internal.h" #include "storage/aio_subsys.h" #include "utils/guc.h" #include "utils/guc_hooks.h" #include "utils/injection_point.h" #include "utils/resowner.h" #include "utils/wait_event_types.h" static inline void pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state); static void pgaio_io_reclaim(PgAioHandle *ioh); static void pgaio_io_resowner_register(PgAioHandle *ioh); static void pgaio_io_wait_for_free(void); static PgAioHandle *pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation); static const char *pgaio_io_state_get_name(PgAioHandleState s); static void pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation); /* Options for io_method. 
*/
const struct config_enum_entry io_method_options[] = {
	{"sync", IOMETHOD_SYNC, false},
	{"worker", IOMETHOD_WORKER, false},
#ifdef IOMETHOD_IO_URING_ENABLED
	/* only offered when the build enables io_uring support */
	{"io_uring", IOMETHOD_IO_URING, false},
#endif
	{NULL, 0, false}
};

/*
 * GUCs
 *
 * io_max_concurrency defaults to -1, which check_io_max_concurrency() accepts
 * as "auto-tune later during startup".
 */
int			io_method = DEFAULT_IO_METHOD;
int			io_max_concurrency = -1;

/* global control for AIO */
PgAioCtl   *pgaio_ctl;

/* current backend's per-backend state */
PgAioBackend *pgaio_my_backend;


/*
 * Method implementations, indexed by the io_method value. assign_io_method()
 * looks up the entry matching the new setting.
 */
static const IoMethodOps *const pgaio_method_ops_table[] = {
	[IOMETHOD_SYNC] = &pgaio_sync_ops,
	[IOMETHOD_WORKER] = &pgaio_worker_ops,
#ifdef IOMETHOD_IO_URING_ENABLED
	[IOMETHOD_IO_URING] = &pgaio_uring_ops,
#endif
};

/* callbacks for the configured io_method, set by assign_io_method */
const IoMethodOps *pgaio_method_ops;


/* --------------------------------------------------------------------------------
 * Public Functions related to PgAioHandle
 * --------------------------------------------------------------------------------
 */

/*
 * Acquire an AioHandle, waiting for IO completion if necessary.
 *
 * Each backend can only have one AIO handle that has been "handed out" to
 * code, but not yet submitted or released. This restriction is necessary to
 * ensure that it is possible for code to wait for an unused handle by waiting
 * for in-flight IO to complete. There is a limited number of handles in each
 * backend, if multiple handles could be handed out without being submitted,
 * waiting for all in-flight IO to complete would not guarantee that handles
 * free up.
 *
 * It is cheap to acquire an IO handle, unless all handles are in use. In that
 * case this function waits for the oldest IO to complete. If that is not
 * desirable, use pgaio_io_acquire_nb().
 *
 * If a handle was acquired but then does not turn out to be needed,
 * e.g. because pgaio_io_acquire() is called before starting an IO in a
 * critical section, the handle needs to be released with pgaio_io_release().
*
 *
 * To react to the completion of the IO as soon as it is known to have
 * completed, callbacks can be registered with pgaio_io_register_callbacks().
 *
 * To actually execute IO using the returned handle, the pgaio_io_start_*()
 * family of functions is used. In many cases the pgaio_io_start_*() call will
 * not be done directly by code that acquired the handle, but by lower level
 * code that gets passed the handle. E.g. if code in bufmgr.c wants to perform
 * AIO, it typically will pass the handle to smgr.c, which will pass it on to
 * md.c, on to fd.c, which then finally calls pgaio_io_start_*(). This
 * forwarding allows the various layers to react to the IO's completion by
 * registering callbacks. These callbacks in turn can translate a lower
 * layer's result into a result understandable by a higher layer.
 *
 * During pgaio_io_start_*() the IO is staged (i.e. prepared for execution but
 * not submitted to the kernel). Unless in batchmode
 * (cf. pgaio_enter_batchmode()), the IO will also get submitted for
 * execution. Note that, whether in batchmode or not, the IO might even
 * complete before the functions return.
 *
 * After pgaio_io_start_*() the AioHandle is "consumed" and may not be
 * referenced by the IO issuing code. To e.g. wait for IO, references to the
 * IO can be established with pgaio_io_get_wref() *before* pgaio_io_start_*()
 * is called. pgaio_wref_wait() can be used to wait for the IO to complete.
 *
 *
 * To know if the IO [partially] succeeded or failed, a PgAioReturn * can be
 * passed to pgaio_io_acquire(). Once the issuing backend has called
 * pgaio_wref_wait(), the PgAioReturn contains information about whether the
 * operation succeeded and details about the first failure, if any. The error
 * can be raised / logged with pgaio_result_report().
 *
 * The lifetime of the memory pointed to by *ret needs to be at least as long
 * as the passed in resowner.
If the resowner releases resources before the IO * completes (typically due to an error), the reference to *ret will be * cleared. In case of resowner cleanup *ret will not be updated with the * results of the IO operation. */ PgAioHandle * pgaio_io_acquire(struct ResourceOwnerData *resowner, PgAioReturn *ret) { PgAioHandle *h; while (true) { h = pgaio_io_acquire_nb(resowner, ret); if (h != NULL) return h; /* * Evidently all handles by this backend are in use. Just wait for * some to complete. */ pgaio_io_wait_for_free(); } } /* * Acquire an AioHandle, returning NULL if no handles are free. * * See pgaio_io_acquire(). The only difference is that this function will return * NULL if there are no idle handles, instead of blocking. */ PgAioHandle * pgaio_io_acquire_nb(struct ResourceOwnerData *resowner, PgAioReturn *ret) { PgAioHandle *ioh = NULL; if (pgaio_my_backend->num_staged_ios >= PGAIO_SUBMIT_BATCH_SIZE) { Assert(pgaio_my_backend->num_staged_ios == PGAIO_SUBMIT_BATCH_SIZE); pgaio_submit_staged(); } if (pgaio_my_backend->handed_out_io) elog(ERROR, "API violation: Only one IO can be handed out"); /* * Probably not needed today, as interrupts should not process this IO, * but... */ HOLD_INTERRUPTS(); if (!dclist_is_empty(&pgaio_my_backend->idle_ios)) { dlist_node *ion = dclist_pop_head_node(&pgaio_my_backend->idle_ios); ioh = dclist_container(PgAioHandle, node, ion); Assert(ioh->state == PGAIO_HS_IDLE); Assert(ioh->owner_procno == MyProcNumber); pgaio_io_update_state(ioh, PGAIO_HS_HANDED_OUT); pgaio_my_backend->handed_out_io = ioh; if (resowner) pgaio_io_resowner_register(ioh); if (ret) { ioh->report_return = ret; ret->result.status = PGAIO_RS_UNKNOWN; } } RESUME_INTERRUPTS(); return ioh; } /* * Release IO handle that turned out to not be required. * * See pgaio_io_acquire() for more details. 
*/ void pgaio_io_release(PgAioHandle *ioh) { if (ioh == pgaio_my_backend->handed_out_io) { Assert(ioh->state == PGAIO_HS_HANDED_OUT); Assert(ioh->resowner); pgaio_my_backend->handed_out_io = NULL; /* * Note that no interrupts are processed between the handed_out_io * check and the call to reclaim - that's important as otherwise an * interrupt could have already reclaimed the handle. */ pgaio_io_reclaim(ioh); } else { elog(ERROR, "release in unexpected state"); } } /* * Release IO handle during resource owner cleanup. */ void pgaio_io_release_resowner(dlist_node *ioh_node, bool on_error) { PgAioHandle *ioh = dlist_container(PgAioHandle, resowner_node, ioh_node); Assert(ioh->resowner); /* * Otherwise an interrupt, in the middle of releasing the IO, could end up * trying to wait for the IO, leading to state confusion. */ HOLD_INTERRUPTS(); ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node); ioh->resowner = NULL; switch (ioh->state) { case PGAIO_HS_IDLE: elog(ERROR, "unexpected"); break; case PGAIO_HS_HANDED_OUT: Assert(ioh == pgaio_my_backend->handed_out_io || pgaio_my_backend->handed_out_io == NULL); if (ioh == pgaio_my_backend->handed_out_io) { pgaio_my_backend->handed_out_io = NULL; if (!on_error) elog(WARNING, "leaked AIO handle"); } pgaio_io_reclaim(ioh); break; case PGAIO_HS_DEFINED: case PGAIO_HS_STAGED: if (!on_error) elog(WARNING, "AIO handle was not submitted"); pgaio_submit_staged(); break; case PGAIO_HS_SUBMITTED: case PGAIO_HS_COMPLETED_IO: case PGAIO_HS_COMPLETED_SHARED: case PGAIO_HS_COMPLETED_LOCAL: /* this is expected to happen */ break; } /* * Need to unregister the reporting of the IO's result, the memory it's * referencing likely has gone away. */ if (ioh->report_return) ioh->report_return = NULL; RESUME_INTERRUPTS(); } /* * Add a [set of] flags to the IO. * * Note that this combines flags with already set flags, rather than set flags * to explicitly the passed in parameters. This is to allow multiple callsites * to set flags. 
*/
void
pgaio_io_set_flag(PgAioHandle *ioh, PgAioHandleFlags flag)
{
	/* flags may only be changed while the handle is handed out */
	Assert(ioh->state == PGAIO_HS_HANDED_OUT);

	ioh->flags |= flag;
}

/*
 * Returns an ID uniquely identifying the IO handle. This is only really
 * useful for logging, as handles are reused across multiple IOs.
 *
 * The ID is the handle's index within pgaio_ctl->io_handles.
 */
int
pgaio_io_get_id(PgAioHandle *ioh)
{
	Assert(ioh >= pgaio_ctl->io_handles &&
		   ioh < (pgaio_ctl->io_handles + pgaio_ctl->io_handle_count));
	return ioh - pgaio_ctl->io_handles;
}

/*
 * Return the ProcNumber for the process that can use an IO handle. The
 * mapping from IO handles to PGPROCs is static, therefore this even works
 * when the corresponding PGPROC is not in use.
 */
ProcNumber
pgaio_io_get_owner(PgAioHandle *ioh)
{
	return ioh->owner_procno;
}

/*
 * Return a wait reference for the IO. Only wait references can be used to
 * wait for an IOs completion, as handles themselves can be reused after
 * completion. See also the comment above pgaio_io_acquire().
 *
 * The reference consists of the handle's index and its current 64-bit
 * generation, stored as two 32-bit halves; pgaio_io_from_wref() reassembles
 * them.
 */
void
pgaio_io_get_wref(PgAioHandle *ioh, PgAioWaitRef *iow)
{
	/* references can only be created before the IO has been submitted */
	Assert(ioh->state == PGAIO_HS_HANDED_OUT ||
		   ioh->state == PGAIO_HS_DEFINED ||
		   ioh->state == PGAIO_HS_STAGED);
	Assert(ioh->generation != 0);

	iow->aio_index = ioh - pgaio_ctl->io_handles;
	iow->generation_upper = (uint32) (ioh->generation >> 32);
	iow->generation_lower = (uint32) ioh->generation;
}



/* --------------------------------------------------------------------------------
 * Internal Functions related to PgAioHandle
 * --------------------------------------------------------------------------------
 */

/*
 * Move the handle to new_state, making sure all earlier writes to the handle
 * are visible before the state change itself becomes visible.
 */
static inline void
pgaio_io_update_state(PgAioHandle *ioh, PgAioHandleState new_state)
{
	/*
	 * All callers need to have held interrupts in some form, otherwise
	 * interrupt processing could wait for the IO to complete, while in an
	 * intermediary state.
	 */
	Assert(!INTERRUPTS_CAN_BE_PROCESSED());

	pgaio_debug_io(DEBUG5, ioh,
				   "updating state to %s",
				   pgaio_io_state_get_name(new_state));

	/*
	 * Ensure the changes signified by the new state are visible before the
	 * new state becomes visible.
	 */
	pg_write_barrier();

	ioh->state = new_state;
}

/*
 * Register the handle with CurrentResourceOwner and remember that owner in
 * the handle. The handle must not be registered with a resource owner yet.
 */
static void
pgaio_io_resowner_register(PgAioHandle *ioh)
{
	Assert(!ioh->resowner);
	Assert(CurrentResourceOwner);

	ResourceOwnerRememberAioHandle(CurrentResourceOwner, &ioh->resowner_node);
	ioh->resowner = CurrentResourceOwner;
}

/*
 * Stage IO for execution and, if appropriate, submit it immediately.
 *
 * Should only be called from pgaio_io_start_*().
 *
 * The handle moves HANDED_OUT -> DEFINED -> STAGED here. Afterwards it is
 * either queued in the backend's staged_ios array (and submitted right away
 * unless in batchmode), or, if synchronous execution is required, executed
 * before this function returns.
 */
void
pgaio_io_stage(PgAioHandle *ioh, PgAioOp op)
{
	bool		needs_synchronous;

	Assert(ioh->state == PGAIO_HS_HANDED_OUT);
	Assert(pgaio_my_backend->handed_out_io == ioh);
	Assert(pgaio_io_has_target(ioh));

	/*
	 * Otherwise an interrupt, in the middle of staging and possibly executing
	 * the IO, could end up trying to wait for the IO, leading to state
	 * confusion.
	 */
	HOLD_INTERRUPTS();

	ioh->op = op;
	ioh->result = 0;

	pgaio_io_update_state(ioh, PGAIO_HS_DEFINED);

	/* allow a new IO to be staged */
	pgaio_my_backend->handed_out_io = NULL;

	pgaio_io_call_stage(ioh);

	pgaio_io_update_state(ioh, PGAIO_HS_STAGED);

	/*
	 * Synchronous execution has to be executed, well, synchronously, so check
	 * that first.
	 */
	needs_synchronous = pgaio_io_needs_synchronous_execution(ioh);

	pgaio_debug_io(DEBUG3, ioh,
				   "staged (synchronous: %d, in_batch: %d)",
				   needs_synchronous, pgaio_my_backend->in_batchmode);

	if (!needs_synchronous)
	{
		pgaio_my_backend->staged_ios[pgaio_my_backend->num_staged_ios++] = ioh;
		Assert(pgaio_my_backend->num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);

		/*
		 * Unless code explicitly opted into batching IOs, submit the IO
		 * immediately.
		 */
		if (!pgaio_my_backend->in_batchmode)
			pgaio_submit_staged();
	}
	else
	{
		/* bypasses the staged_ios array entirely */
		pgaio_io_prepare_submit(ioh);
		pgaio_io_perform_synchronously(ioh);
	}

	RESUME_INTERRUPTS();
}

/*
 * Does the IO have to be executed synchronously, either because the caller
 * asked for it via PGAIO_HF_SYNCHRONOUS or because the configured IO method
 * says so?
 */
bool
pgaio_io_needs_synchronous_execution(PgAioHandle *ioh)
{
	/*
	 * If the caller said to execute the IO synchronously, do so.
	 *
	 * XXX: We could optimize the logic when to execute synchronously by first
	 * checking if there are other IOs in flight and only synchronously
	 * executing if not. Unclear whether that'll be sufficiently common to be
	 * worth worrying about.
	 */
	if (ioh->flags & PGAIO_HF_SYNCHRONOUS)
		return true;

	/* Check if the IO method requires synchronous execution of IO */
	if (pgaio_method_ops->needs_synchronous_execution)
		return pgaio_method_ops->needs_synchronous_execution(ioh);

	return false;
}

/*
 * Handle IO being processed by IO method.
 *
 * Should be called by IO methods / synchronous IO execution, just before the
 * IO is performed.
 *
 * Marks the IO as submitted and appends it to the tail of the backend's
 * in-flight list.
 */
void
pgaio_io_prepare_submit(PgAioHandle *ioh)
{
	pgaio_io_update_state(ioh, PGAIO_HS_SUBMITTED);

	dclist_push_tail(&pgaio_my_backend->in_flight_ios, &ioh->node);
}

/*
 * Handle IO getting completed by a method.
 *
 * Should be called by IO methods / synchronous IO execution, just after the
 * IO has been performed.
 *
 * Expects to be called in a critical section. We expect IOs to be usable for
 * WAL etc, which requires being able to execute completion callbacks in a
 * critical section.
*/
void
pgaio_io_process_completion(PgAioHandle *ioh, int result)
{
	Assert(ioh->state == PGAIO_HS_SUBMITTED);

	Assert(CritSectionCount > 0);

	/* store the raw result before the state change is made visible */
	ioh->result = result;

	pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_IO);

	INJECTION_POINT("aio-process-completion-before-shared", ioh);

	pgaio_io_call_complete_shared(ioh);

	pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_SHARED);

	/* condition variable broadcast ensures state is visible before wakeup */
	ConditionVariableBroadcast(&ioh->cv);

	/* contains call to pgaio_io_call_complete_local() */
	if (ioh->owner_procno == MyProcNumber)
		pgaio_io_reclaim(ioh);
}

/*
 * Has the IO completed and thus the IO handle been reused?
 *
 * This is useful when waiting for IO completion at a low level (e.g. in an IO
 * method's ->wait_one() callback).
 *
 * *state is set to the handle's state, which is read before the generation
 * (separated by a read barrier).
 */
bool
pgaio_io_was_recycled(PgAioHandle *ioh, uint64 ref_generation, PgAioHandleState *state)
{
	*state = ioh->state;
	pg_read_barrier();

	return ioh->generation != ref_generation;
}

/*
 * Wait for IO to complete. External code should never use this, outside of
 * the AIO subsystem waits are only allowed via pgaio_wref_wait().
 *
 * Returns once the handle was recycled (its generation differs from
 * ref_generation) or the IO reached one of the COMPLETED_SHARED /
 * COMPLETED_LOCAL states; in the latter case the handle is reclaimed if it
 * belongs to this backend.
 */
static void
pgaio_io_wait(PgAioHandle *ioh, uint64 ref_generation)
{
	PgAioHandleState state;
	bool		am_owner;

	am_owner = ioh->owner_procno == MyProcNumber;

	if (pgaio_io_was_recycled(ioh, ref_generation, &state))
		return;

	/* the owner may only wait for its own IO once it has been submitted */
	if (am_owner)
	{
		if (state != PGAIO_HS_SUBMITTED
			&& state != PGAIO_HS_COMPLETED_IO
			&& state != PGAIO_HS_COMPLETED_SHARED
			&& state != PGAIO_HS_COMPLETED_LOCAL)
		{
			elog(PANIC, "waiting for own IO %d in wrong state: %s",
				 pgaio_io_get_id(ioh), pgaio_io_get_state_name(ioh));
		}
	}

	while (true)
	{
		/* re-read the state on every iteration */
		if (pgaio_io_was_recycled(ioh, ref_generation, &state))
			return;

		switch (state)
		{
			case PGAIO_HS_IDLE:
			case PGAIO_HS_HANDED_OUT:
				elog(ERROR, "IO in wrong state: %d", state);
				break;

			case PGAIO_HS_SUBMITTED:

				/*
				 * If we need to wait via the IO method, do so now. Don't
				 * check via the IO method if the issuing backend is executing
				 * the IO synchronously.
				 */
				if (pgaio_method_ops->wait_one && !(ioh->flags & PGAIO_HF_SYNCHRONOUS))
				{
					pgaio_method_ops->wait_one(ioh, ref_generation);
					continue;
				}
				/* fallthrough */

				/* waiting for owner to submit */
			case PGAIO_HS_DEFINED:
			case PGAIO_HS_STAGED:
				/* waiting for reaper to complete */
				/* fallthrough */
			case PGAIO_HS_COMPLETED_IO:
				/* shouldn't be able to hit this otherwise */
				Assert(IsUnderPostmaster);
				/* ensure we're going to get woken up */
				ConditionVariablePrepareToSleep(&ioh->cv);

				/*
				 * Sleep until the handle is recycled or shared completion
				 * processing has finished; the outer loop then re-examines
				 * the state.
				 */
				while (!pgaio_io_was_recycled(ioh, ref_generation, &state))
				{
					if (state == PGAIO_HS_COMPLETED_SHARED ||
						state == PGAIO_HS_COMPLETED_LOCAL)
						break;
					ConditionVariableSleep(&ioh->cv, WAIT_EVENT_AIO_IO_COMPLETION);
				}

				ConditionVariableCancelSleep();
				break;

			case PGAIO_HS_COMPLETED_SHARED:
			case PGAIO_HS_COMPLETED_LOCAL:

				/*
				 * Note that no interrupts are processed between
				 * pgaio_io_was_recycled() and this check - that's important
				 * as otherwise an interrupt could have already reclaimed the
				 * handle.
				 */
				if (am_owner)
					pgaio_io_reclaim(ioh);
				return;
		}
	}
}

/*
 * Make IO handle ready to be reused after IO has completed or after the
 * handle has been released without being used.
 *
 * Note that callers need to be careful about only calling this in the right
 * state and that no interrupts can be processed between the state check and
 * the call to pgaio_io_reclaim(). Otherwise interrupt processing could
 * already have reclaimed the handle.
 */
static void
pgaio_io_reclaim(PgAioHandle *ioh)
{
	/* This is only ok if it's our IO */
	Assert(ioh->owner_procno == MyProcNumber);
	Assert(ioh->state != PGAIO_HS_IDLE);

	/* see comment in function header */
	HOLD_INTERRUPTS();

	/*
	 * It's a bit ugly, but right now the easiest place to put the execution
	 * of local completion callbacks is this function, as we need to execute
	 * local callbacks just before reclaiming at multiple callsites.
	 */
	if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
	{
		PgAioResult local_result;

		local_result = pgaio_io_call_complete_local(ioh);
		pgaio_io_update_state(ioh, PGAIO_HS_COMPLETED_LOCAL);

		/* hand the result to the issuer, if it asked for one and still cares */
		if (ioh->report_return)
		{
			ioh->report_return->result = local_result;
			ioh->report_return->target_data = ioh->target_data;
		}
	}

	pgaio_debug_io(DEBUG4, ioh,
				   "reclaiming: distilled_result: (status %s, id %u, error_data %d), raw_result: %d",
				   pgaio_result_status_string(ioh->distilled_result.status),
				   ioh->distilled_result.id,
				   ioh->distilled_result.error_data,
				   ioh->result);

	/* if the IO has been defined, it's on the in-flight list, remove */
	if (ioh->state != PGAIO_HS_HANDED_OUT)
		dclist_delete_from(&pgaio_my_backend->in_flight_ios, &ioh->node);

	if (ioh->resowner)
	{
		ResourceOwnerForgetAioHandle(ioh->resowner, &ioh->resowner_node);
		ioh->resowner = NULL;
	}

	Assert(!ioh->resowner);

	/*
	 * Update generation & state first, before resetting the IO's fields,
	 * otherwise a concurrent "viewer" could think the fields are valid, even
	 * though they are being reset.  Increment the generation first, so that
	 * we can assert elsewhere that we never wait for an IDLE IO.  While it's
	 * a bit weird for the state to go backwards for a generation, it's OK
	 * here, as there cannot be references to the "reborn" IO yet.  Can't
	 * update both at once, so something has to give.
	 */
	ioh->generation++;
	pgaio_io_update_state(ioh, PGAIO_HS_IDLE);

	/* ensure the state update is visible before we reset fields */
	pg_write_barrier();

	ioh->op = PGAIO_OP_INVALID;
	ioh->target = PGAIO_TID_INVALID;
	ioh->flags = 0;
	ioh->num_callbacks = 0;
	ioh->handle_data_len = 0;
	ioh->report_return = NULL;
	ioh->result = 0;
	ioh->distilled_result.status = PGAIO_RS_UNKNOWN;

	/*
	 * We push the IO to the head of the idle IO list, that seems more cache
	 * efficient in cases where only a few IOs are used.
	 */
	dclist_push_head(&pgaio_my_backend->idle_ios, &ioh->node);

	RESUME_INTERRUPTS();
}

/*
 * Wait for an IO handle to become usable.
*
 * This only really is useful for pgaio_io_acquire().
 *
 * Tries, in order: reclaiming handles whose IO already completed, submitting
 * still-staged IOs, and finally waiting for the IO at the head of the
 * in-flight list.
 */
static void
pgaio_io_wait_for_free(void)
{
	int			reclaimed = 0;

	pgaio_debug(DEBUG2, "waiting for free IO with %d pending, %d in-flight, %d idle IOs",
				pgaio_my_backend->num_staged_ios,
				dclist_count(&pgaio_my_backend->in_flight_ios),
				dclist_count(&pgaio_my_backend->idle_ios));

	/*
	 * First check if any of our IOs actually have completed - when using
	 * worker, that'll often be the case. We could do so as part of the loop
	 * below, but that'd potentially lead us to wait for some IO submitted
	 * before.
	 */
	for (int i = 0; i < io_max_concurrency; i++)
	{
		/* this backend's handles start at io_handle_off in the global array */
		PgAioHandle *ioh = &pgaio_ctl->io_handles[pgaio_my_backend->io_handle_off + i];

		if (ioh->state == PGAIO_HS_COMPLETED_SHARED)
		{
			/*
			 * Note that no interrupts are processed between the state check
			 * and the call to reclaim - that's important as otherwise an
			 * interrupt could have already reclaimed the handle.
			 */
			pgaio_io_reclaim(ioh);
			reclaimed++;
		}
	}

	if (reclaimed > 0)
		return;

	/*
	 * If we have any unsubmitted IOs, submit them now. We'll start waiting in
	 * a second, so it's better they're in flight. This also addresses the
	 * edge-case that all IOs are unsubmitted.
	 */
	if (pgaio_my_backend->num_staged_ios > 0)
		pgaio_submit_staged();

	/* possibly some IOs finished during submission */
	if (!dclist_is_empty(&pgaio_my_backend->idle_ios))
		return;

	/* nothing idle and nothing in flight: there is nothing we could wait for */
	if (dclist_count(&pgaio_my_backend->in_flight_ios) == 0)
		ereport(ERROR,
				errmsg_internal("no free IOs despite no in-flight IOs"),
				errdetail_internal("%d pending, %d in-flight, %d idle IOs",
								   pgaio_my_backend->num_staged_ios,
								   dclist_count(&pgaio_my_backend->in_flight_ios),
								   dclist_count(&pgaio_my_backend->idle_ios)));

	/*
	 * Wait for the oldest in-flight IO to complete.
	 *
	 * XXX: Reusing the general IO wait is suboptimal, we don't need to wait
	 * for that specific IO to complete, we just need *any* IO to complete.
	 */
	{
		PgAioHandle *ioh = dclist_head_element(PgAioHandle, node,
											   &pgaio_my_backend->in_flight_ios);
		uint64		generation = ioh->generation;

		switch (ioh->state)
		{
				/* should not be in in-flight list */
			case PGAIO_HS_IDLE:
			case PGAIO_HS_DEFINED:
			case PGAIO_HS_HANDED_OUT:
			case PGAIO_HS_STAGED:
			case PGAIO_HS_COMPLETED_LOCAL:
				elog(ERROR, "shouldn't get here with io:%d in state %d",
					 pgaio_io_get_id(ioh), ioh->state);
				break;

			case PGAIO_HS_COMPLETED_IO:
			case PGAIO_HS_SUBMITTED:
				pgaio_debug_io(DEBUG2, ioh,
							   "waiting for free io with %d in flight",
							   dclist_count(&pgaio_my_backend->in_flight_ios));

				/*
				 * In a more general case this would be racy, because the
				 * generation could increase after we read ioh->state above.
				 * But we are only looking at IOs by the current backend and
				 * the IO can only be recycled by this backend.  Even this is
				 * only OK because we get the handle's generation before
				 * potentially processing interrupts, e.g. as part of
				 * pgaio_debug_io().
				 */
				pgaio_io_wait(ioh, generation);
				break;

			case PGAIO_HS_COMPLETED_SHARED:

				/*
				 * It's possible that another backend just finished this IO.
				 *
				 * Note that no interrupts are processed between the state
				 * check and the call to reclaim - that's important as
				 * otherwise an interrupt could have already reclaimed the
				 * handle.
				 */
				pgaio_io_reclaim(ioh);
				break;
		}

		/* either branch above must have produced at least one idle handle */
		if (dclist_count(&pgaio_my_backend->idle_ios) == 0)
			elog(PANIC, "no idle IO after waiting for IO to terminate");
		return;
	}
}

/*
 * Internal - code outside of AIO should never need this and it'd be hard for
 * such code to be safe.
*/ static PgAioHandle * pgaio_io_from_wref(PgAioWaitRef *iow, uint64 *ref_generation) { PgAioHandle *ioh; Assert(iow->aio_index < pgaio_ctl->io_handle_count); ioh = &pgaio_ctl->io_handles[iow->aio_index]; *ref_generation = ((uint64) iow->generation_upper) << 32 | iow->generation_lower; Assert(*ref_generation != 0); return ioh; } static const char * pgaio_io_state_get_name(PgAioHandleState s) { #define PGAIO_HS_TOSTR_CASE(sym) case PGAIO_HS_##sym: return #sym switch (s) { PGAIO_HS_TOSTR_CASE(IDLE); PGAIO_HS_TOSTR_CASE(HANDED_OUT); PGAIO_HS_TOSTR_CASE(DEFINED); PGAIO_HS_TOSTR_CASE(STAGED); PGAIO_HS_TOSTR_CASE(SUBMITTED); PGAIO_HS_TOSTR_CASE(COMPLETED_IO); PGAIO_HS_TOSTR_CASE(COMPLETED_SHARED); PGAIO_HS_TOSTR_CASE(COMPLETED_LOCAL); } #undef PGAIO_HS_TOSTR_CASE return NULL; /* silence compiler */ } const char * pgaio_io_get_state_name(PgAioHandle *ioh) { return pgaio_io_state_get_name(ioh->state); } const char * pgaio_result_status_string(PgAioResultStatus rs) { switch (rs) { case PGAIO_RS_UNKNOWN: return "UNKNOWN"; case PGAIO_RS_OK: return "OK"; case PGAIO_RS_WARNING: return "WARNING"; case PGAIO_RS_PARTIAL: return "PARTIAL"; case PGAIO_RS_ERROR: return "ERROR"; } return NULL; /* silence compiler */ } /* -------------------------------------------------------------------------------- * Functions primarily related to IO Wait References * -------------------------------------------------------------------------------- */ /* * Mark a wait reference as invalid */ void pgaio_wref_clear(PgAioWaitRef *iow) { iow->aio_index = PG_UINT32_MAX; } /* Is the wait reference valid? */ bool pgaio_wref_valid(PgAioWaitRef *iow) { return iow->aio_index != PG_UINT32_MAX; } /* * Similar to pgaio_io_get_id(), just for wait references. */ int pgaio_wref_get_id(PgAioWaitRef *iow) { Assert(pgaio_wref_valid(iow)); return iow->aio_index; } /* * Wait for the IO to have completed. Can be called in any process, not just * in the issuing backend. 
*/ void pgaio_wref_wait(PgAioWaitRef *iow) { uint64 ref_generation; PgAioHandle *ioh; ioh = pgaio_io_from_wref(iow, &ref_generation); pgaio_io_wait(ioh, ref_generation); } /* * Check if the referenced IO completed, without blocking. */ bool pgaio_wref_check_done(PgAioWaitRef *iow) { uint64 ref_generation; PgAioHandleState state; bool am_owner; PgAioHandle *ioh; ioh = pgaio_io_from_wref(iow, &ref_generation); if (pgaio_io_was_recycled(ioh, ref_generation, &state)) return true; if (state == PGAIO_HS_IDLE) return true; am_owner = ioh->owner_procno == MyProcNumber; if (state == PGAIO_HS_COMPLETED_SHARED || state == PGAIO_HS_COMPLETED_LOCAL) { /* * Note that no interrupts are processed between * pgaio_io_was_recycled() and this check - that's important as * otherwise an interrupt could have already reclaimed the handle. */ if (am_owner) pgaio_io_reclaim(ioh); return true; } /* * XXX: It likely would be worth checking in with the io method, to give * the IO method a chance to check if there are completion events queued. */ return false; } /* -------------------------------------------------------------------------------- * Actions on multiple IOs. * -------------------------------------------------------------------------------- */ /* * Submit IOs in batches going forward. * * Submitting multiple IOs at once can be substantially faster than doing so * one-by-one. At the same time, submitting multiple IOs at once requires more * care to avoid deadlocks. * * Consider backend A staging an IO for buffer 1 and then trying to start IO * on buffer 2, while backend B does the inverse. If A submitted the IO before * moving on to buffer 2, this works just fine, B will wait for the IO to * complete. But if batching were used, each backend will wait for IO that has * not yet been submitted to complete, i.e. forever. * * End batch submission mode with pgaio_exit_batchmode(). (Throwing errors is * allowed; error recovery will end the batch.) 
* * To avoid deadlocks, code needs to ensure that it will not wait for another * backend while there is unsubmitted IO. E.g. by using conditional lock * acquisition when acquiring buffer locks. To check if there currently are * staged IOs, call pgaio_have_staged() and to submit all staged IOs call * pgaio_submit_staged(). * * It is not allowed to enter batchmode while already in batchmode, it's * unlikely to ever be needed, as code needs to be explicitly aware of being * called in batchmode, to avoid the deadlock risks explained above. * * Note that IOs may get submitted before pgaio_exit_batchmode() is called, * e.g. because too many IOs have been staged or because pgaio_submit_staged() * was called. */ void pgaio_enter_batchmode(void) { if (pgaio_my_backend->in_batchmode) elog(ERROR, "starting batch while batch already in progress"); pgaio_my_backend->in_batchmode = true; } /* * Stop submitting IOs in batches. */ void pgaio_exit_batchmode(void) { Assert(pgaio_my_backend->in_batchmode); pgaio_submit_staged(); pgaio_my_backend->in_batchmode = false; } /* * Are there staged but unsubmitted IOs? * * See comment above pgaio_enter_batchmode() for why code may need to check if * there is IO in that state. */ bool pgaio_have_staged(void) { Assert(pgaio_my_backend->in_batchmode || pgaio_my_backend->num_staged_ios == 0); return pgaio_my_backend->num_staged_ios > 0; } /* * Submit all staged but not yet submitted IOs. * * Unless in batch mode, this never needs to be called, as IOs get submitted * as soon as possible. While in batchmode pgaio_submit_staged() can be called * before waiting on another backend, to avoid the risk of deadlocks. See * pgaio_enter_batchmode(). 
*/ void pgaio_submit_staged(void) { int total_submitted = 0; int did_submit; if (pgaio_my_backend->num_staged_ios == 0) return; START_CRIT_SECTION(); did_submit = pgaio_method_ops->submit(pgaio_my_backend->num_staged_ios, pgaio_my_backend->staged_ios); END_CRIT_SECTION(); total_submitted += did_submit; Assert(total_submitted == did_submit); pgaio_my_backend->num_staged_ios = 0; pgaio_debug(DEBUG4, "aio: submitted %d IOs", total_submitted); } /* -------------------------------------------------------------------------------- * Other * -------------------------------------------------------------------------------- */ /* * Perform AIO related cleanup after an error. * * This should be called early in the error recovery paths, as later steps may * need to issue AIO (e.g. to record a transaction abort WAL record). */ void pgaio_error_cleanup(void) { /* * It is possible that code errored out after pgaio_enter_batchmode() but * before pgaio_exit_batchmode() was called. In that case we need to * submit the IO now. */ if (pgaio_my_backend->in_batchmode) { pgaio_my_backend->in_batchmode = false; pgaio_submit_staged(); } /* * As we aren't in batchmode, there shouldn't be any unsubmitted IOs. */ Assert(pgaio_my_backend->num_staged_ios == 0); } /* * Perform AIO related checks at (sub-)transactional boundaries. * * This should be called late during (sub-)transactional commit/abort, after * all steps that might need to perform AIO, so that we can verify that the * AIO subsystem is in a valid state at the end of a transaction. */ void AtEOXact_Aio(bool is_commit) { /* * We should never be in batch mode at transactional boundaries. In case * an error was thrown while in batch mode, pgaio_error_cleanup() should * have exited batchmode. * * In case we are in batchmode somehow, make sure to submit all staged * IOs, other backends may need them to complete to continue. 
*/ if (pgaio_my_backend->in_batchmode) { pgaio_error_cleanup(); elog(WARNING, "open AIO batch at end of (sub-)transaction"); } /* * As we aren't in batchmode, there shouldn't be any unsubmitted IOs. */ Assert(pgaio_my_backend->num_staged_ios == 0); } /* * Need to submit staged but not yet submitted IOs using the fd, otherwise * the IO would end up targeting something bogus. */ void pgaio_closing_fd(int fd) { /* * Might be called before AIO is initialized or in a subprocess that * doesn't use AIO. */ if (!pgaio_my_backend) return; /* * For now just submit all staged IOs - we could be more selective, but * it's probably not worth it. */ if (pgaio_my_backend->num_staged_ios > 0) { pgaio_debug(DEBUG2, "submitting %d IOs before FD %d gets closed", pgaio_my_backend->num_staged_ios, fd); pgaio_submit_staged(); } /* * If requested by the IO method, wait for all IOs that use the * to-be-closed FD. */ if (pgaio_method_ops->wait_on_fd_before_close) { /* * As waiting for one IO to complete may complete multiple IOs, we * can't just use a mutable list iterator. The maximum number of * in-flight IOs is fairly small, so just restart the loop after * waiting for an IO. 
*/ while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios)) { dlist_iter iter; PgAioHandle *ioh = NULL; uint64 generation; dclist_foreach(iter, &pgaio_my_backend->in_flight_ios) { ioh = dclist_container(PgAioHandle, node, iter.cur); generation = ioh->generation; if (pgaio_io_uses_fd(ioh, fd)) break; else ioh = NULL; } if (!ioh) break; pgaio_debug_io(DEBUG2, ioh, "waiting for IO before FD %d gets closed, %d in-flight IOs", fd, dclist_count(&pgaio_my_backend->in_flight_ios)); /* see comment in pgaio_io_wait_for_free() about raciness */ pgaio_io_wait(ioh, generation); } } } /* * Registered as before_shmem_exit() callback in pgaio_init_backend() */ void pgaio_shutdown(int code, Datum arg) { Assert(pgaio_my_backend); Assert(!pgaio_my_backend->handed_out_io); /* first clean up resources as we would at a transaction boundary */ AtEOXact_Aio(code == 0); /* * Before exiting, make sure that all IOs are finished. That has two main * purposes: * * - Some kernel-level AIO mechanisms don't deal well with the issuer of * an AIO exiting before IO completed * * - It'd be confusing to see partially finished IOs in stats views etc */ while (!dclist_is_empty(&pgaio_my_backend->in_flight_ios)) { PgAioHandle *ioh = dclist_head_element(PgAioHandle, node, &pgaio_my_backend->in_flight_ios); uint64 generation = ioh->generation; pgaio_debug_io(DEBUG2, ioh, "waiting for IO to complete during shutdown, %d in-flight IOs", dclist_count(&pgaio_my_backend->in_flight_ios)); /* see comment in pgaio_io_wait_for_free() about raciness */ pgaio_io_wait(ioh, generation); } pgaio_my_backend = NULL; } void assign_io_method(int newval, void *extra) { Assert(pgaio_method_ops_table[newval] != NULL); Assert(newval < lengthof(io_method_options)); pgaio_method_ops = pgaio_method_ops_table[newval]; } bool check_io_max_concurrency(int *newval, void **extra, GucSource source) { if (*newval == -1) { /* * Auto-tuning will be applied later during startup, as auto-tuning * depends on the value of various GUCs. 
*/ return true; } else if (*newval == 0) { GUC_check_errdetail("Only -1 or values bigger than 0 are valid."); return false; } return true; }