/*-------------------------------------------------------------------------
 *
 * method_worker.c
 *    AIO - perform AIO using worker processes
 *
 * IO workers consume IOs from a shared memory submission queue, run
 * traditional synchronous system calls, and perform the shared completion
 * handling immediately.  Client code submits most requests by pushing IOs
 * into the submission queue, and waits (if necessary) using condition
 * variables.  Some IOs cannot be performed in another process due to lack of
 * infrastructure for reopening the file, and must be processed synchronously
 * by the client code when submitted.
 *
 * So that the submitter can make just one system call when submitting a batch
 * of IOs, wakeups "fan out"; each woken IO worker can wake two more.  XXX This
 * could be improved by using futexes instead of latches to wake N waiters.
 *
 * This method of AIO is available in all builds on all operating systems, and
 * is the default.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *    src/backend/storage/aio/method_worker.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "libpq/pqsignal.h"
#include "miscadmin.h"
#include "port/pg_bitutils.h"
#include "postmaster/auxprocess.h"
#include "postmaster/interrupt.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/latch.h"
#include "storage/proc.h"
#include "tcop/tcopprot.h"
#include "utils/injection_point.h"
#include "utils/memdebug.h"
#include "utils/ps_status.h"
#include "utils/wait_event.h"


/* How many workers should each worker wake up if needed? */
#define IO_WORKER_WAKEUP_FANOUT 2


typedef struct AioWorkerSubmissionQueue
{
	uint32		size;
	uint32		mask;
	uint32		head;
	uint32		tail;
	uint32		ios[FLEXIBLE_ARRAY_MEMBER];
} AioWorkerSubmissionQueue;

typedef struct AioWorkerSlot
{
	Latch	   *latch;
	bool		in_use;
} AioWorkerSlot;

typedef struct AioWorkerControl
{
	uint64		idle_worker_mask;
	AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
} AioWorkerControl;


static size_t pgaio_worker_shmem_size(void);
static void pgaio_worker_shmem_init(bool first_time);

static bool pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh);
static int	pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios);


const IoMethodOps pgaio_worker_ops = {
	.shmem_size = pgaio_worker_shmem_size,
	.shmem_init = pgaio_worker_shmem_init,

	.needs_synchronous_execution = pgaio_worker_needs_synchronous_execution,
	.submit = pgaio_worker_submit,
};


/* GUCs */
int			io_workers = 3;
static int	io_worker_queue_size = 64;

static int	MyIoWorkerId;
static AioWorkerSubmissionQueue *io_worker_submission_queue;
static AioWorkerControl *io_worker_control;


static size_t
pgaio_worker_queue_shmem_size(int *queue_size)
{
	/* Round size up to next power of two so we can make a mask. */
	*queue_size = pg_nextpower2_32(io_worker_queue_size);

	return offsetof(AioWorkerSubmissionQueue, ios) +
		sizeof(uint32) * *queue_size;
}
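
/*
 * Illustrative arithmetic only, not relied on by the code above: with the
 * default io_worker_queue_size = 64, pg_nextpower2_32() leaves the queue size
 * at 64, so the struct occupies offsetof(AioWorkerSubmissionQueue, ios) plus
 * 64 * sizeof(uint32) bytes of shared memory.  A hypothetical setting of 100
 * would be rounded up to 128, which is what keeps "index & (size - 1)" a
 * valid substitute for "index % size" in the ring-buffer code below.
 */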

static size_t
pgaio_worker_control_shmem_size(void)
{
	return offsetof(AioWorkerControl, workers) +
		sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
}

static size_t
pgaio_worker_shmem_size(void)
{
	size_t		sz;
	int			queue_size;

	sz = pgaio_worker_queue_shmem_size(&queue_size);
	sz = add_size(sz, pgaio_worker_control_shmem_size());

	return sz;
}

static void
pgaio_worker_shmem_init(bool first_time)
{
	bool		found;
	int			queue_size;

	io_worker_submission_queue =
		ShmemInitStruct("AioWorkerSubmissionQueue",
						pgaio_worker_queue_shmem_size(&queue_size),
						&found);
	if (!found)
	{
		io_worker_submission_queue->size = queue_size;
		io_worker_submission_queue->head = 0;
		io_worker_submission_queue->tail = 0;
	}

	io_worker_control =
		ShmemInitStruct("AioWorkerControl",
						pgaio_worker_control_shmem_size(),
						&found);
	if (!found)
	{
		io_worker_control->idle_worker_mask = 0;
		for (int i = 0; i < MAX_IO_WORKERS; ++i)
		{
			io_worker_control->workers[i].latch = NULL;
			io_worker_control->workers[i].in_use = false;
		}
	}
}

static int
pgaio_choose_idle_worker(void)
{
	int			worker;

	if (io_worker_control->idle_worker_mask == 0)
		return -1;

	/* Find the lowest bit position, and clear it. */
	worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
	io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);

	return worker;
}

static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
	AioWorkerSubmissionQueue *queue;
	uint32		new_head;

	queue = io_worker_submission_queue;
	new_head = (queue->head + 1) & (queue->size - 1);
	if (new_head == queue->tail)
	{
		pgaio_debug(DEBUG3, "io queue is full, at %u elements",
					io_worker_submission_queue->size);
		return false;			/* full */
	}

	queue->ios[queue->head] = pgaio_io_get_id(ioh);
	queue->head = new_head;

	return true;
}

static uint32
pgaio_worker_submission_queue_consume(void)
{
	AioWorkerSubmissionQueue *queue;
	uint32		result;

	queue = io_worker_submission_queue;
	if (queue->tail == queue->head)
		return UINT32_MAX;		/* empty */

	result = queue->ios[queue->tail];
	queue->tail = (queue->tail + 1) & (queue->size - 1);

	return result;
}

static uint32
pgaio_worker_submission_queue_depth(void)
{
	uint32		head;
	uint32		tail;

	head = io_worker_submission_queue->head;
	tail = io_worker_submission_queue->tail;

	if (tail > head)
		head += io_worker_submission_queue->size;

	Assert(head >= tail);

	return head - tail;
}

static bool
pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
{
	return
		!IsUnderPostmaster
		|| ioh->flags & PGAIO_HF_REFERENCES_LOCAL
		|| !pgaio_io_can_reopen(ioh);
}
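
/*
 * Illustrative walk-through of the ring arithmetic above (not relied on by
 * the code): with size = 8, head = 2 and tail = 6, the depth works out to
 * (2 + 8) - 6 = 4 queued IOs.  Because an insert is rejected when advancing
 * head would make it equal to tail, the queue holds at most size - 1 entries;
 * IOs that don't fit are handled synchronously by the submitter, see
 * pgaio_worker_submit_internal() below.
 */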

static void
pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
{
	PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
	int			nsync = 0;
	Latch	   *wakeup = NULL;
	int			worker;

	Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);

	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	for (int i = 0; i < nios; ++i)
	{
		Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
		if (!pgaio_worker_submission_queue_insert(ios[i]))
		{
			/*
			 * We'll do it synchronously, but only after we've sent as many as
			 * we can to workers, to maximize concurrency.
			 */
			synchronous_ios[nsync++] = ios[i];
			continue;
		}

		if (wakeup == NULL)
		{
			/* Choose an idle worker to wake up if we haven't already. */
			worker = pgaio_choose_idle_worker();
			if (worker >= 0)
				wakeup = io_worker_control->workers[worker].latch;

			pgaio_debug_io(DEBUG4, ios[i],
						   "choosing worker %d",
						   worker);
		}
	}
	LWLockRelease(AioWorkerSubmissionQueueLock);

	if (wakeup)
		SetLatch(wakeup);

	/* Run whatever is left synchronously. */
	if (nsync > 0)
	{
		for (int i = 0; i < nsync; ++i)
		{
			pgaio_io_perform_synchronously(synchronous_ios[i]);
		}
	}
}

static int
pgaio_worker_submit(uint16 num_staged_ios, PgAioHandle **staged_ios)
{
	for (int i = 0; i < num_staged_ios; i++)
	{
		PgAioHandle *ioh = staged_ios[i];

		pgaio_io_prepare_submit(ioh);
	}

	pgaio_worker_submit_internal(num_staged_ios, staged_ios);

	return num_staged_ios;
}

/*
 * on_shmem_exit() callback that releases the worker's slot in
 * io_worker_control.
 */
static void
pgaio_worker_die(int code, Datum arg)
{
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
	Assert(io_worker_control->workers[MyIoWorkerId].in_use);
	Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);

	io_worker_control->workers[MyIoWorkerId].in_use = false;
	io_worker_control->workers[MyIoWorkerId].latch = NULL;
	LWLockRelease(AioWorkerSubmissionQueueLock);
}

/*
 * Register the worker in shared memory, assign MyIoWorkerId and register a
 * shutdown callback to release registration.
 */
static void
pgaio_worker_register(void)
{
	MyIoWorkerId = -1;

	/*
	 * XXX: This could do with more fine-grained locking. But it's also not
	 * very common for the number of workers to change at the moment...
	 */
	LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);

	for (int i = 0; i < MAX_IO_WORKERS; ++i)
	{
		if (!io_worker_control->workers[i].in_use)
		{
			Assert(io_worker_control->workers[i].latch == NULL);
			io_worker_control->workers[i].in_use = true;
			MyIoWorkerId = i;
			break;
		}
		else
			Assert(io_worker_control->workers[i].latch != NULL);
	}

	if (MyIoWorkerId == -1)
		elog(ERROR, "couldn't find a free worker slot");

	io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
	io_worker_control->workers[MyIoWorkerId].latch = MyLatch;
	LWLockRelease(AioWorkerSubmissionQueueLock);

	on_shmem_exit(pgaio_worker_die, 0);
}

static void
pgaio_worker_error_callback(void *arg)
{
	ProcNumber	owner;
	PGPROC	   *owner_proc;
	int32		owner_pid;
	PgAioHandle *ioh = arg;

	if (!ioh)
		return;

	Assert(ioh->owner_procno != MyProcNumber);
	Assert(MyBackendType == B_IO_WORKER);

	owner = ioh->owner_procno;
	owner_proc = GetPGProcByNumber(owner);
	owner_pid = owner_proc->pid;

	errcontext("I/O worker executing I/O on behalf of process %d", owner_pid);
}
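
/*
 * Illustrative sketch of the wakeup fan-out, purely for orientation: the
 * submitter above wakes at most one idle worker per batch.  Each worker that
 * finds work in IoWorkerMain() below wakes up to IO_WORKER_WAKEUP_FANOUT = 2
 * further idle workers, bounded by the current queue depth.  A batch of,
 * say, seven queued IOs can therefore ramp up roughly geometrically (1, then
 * 2, then 4 workers) as long as idle workers remain, while the submitter
 * itself makes just one latch wakeup.
 */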

void
IoWorkerMain(const void *startup_data, size_t startup_data_len)
{
	sigjmp_buf	local_sigjmp_buf;
	PgAioHandle *volatile error_ioh = NULL;
	ErrorContextCallback errcallback = {0};
	volatile int error_errno = 0;
	char		cmd[128];

	MyBackendType = B_IO_WORKER;
	AuxiliaryProcessMainCommon();

	pqsignal(SIGHUP, SignalHandlerForConfigReload);
	pqsignal(SIGINT, die);		/* to allow manually triggering worker restart */

	/*
	 * Ignore SIGTERM, will get explicit shutdown via SIGUSR2 later in the
	 * shutdown sequence, similar to checkpointer.
	 */
	pqsignal(SIGTERM, SIG_IGN);
	/* SIGQUIT handler was already set up by InitPostmasterChild */
	pqsignal(SIGALRM, SIG_IGN);
	pqsignal(SIGPIPE, SIG_IGN);
	pqsignal(SIGUSR1, procsignal_sigusr1_handler);
	pqsignal(SIGUSR2, SignalHandlerForShutdownRequest);

	/* also registers a shutdown callback to unregister */
	pgaio_worker_register();

	sprintf(cmd, "%d", MyIoWorkerId);
	set_ps_display(cmd);

	errcallback.callback = pgaio_worker_error_callback;
	errcallback.previous = error_context_stack;
	error_context_stack = &errcallback;

	/* see PostgresMain() */
	if (sigsetjmp(local_sigjmp_buf, 1) != 0)
	{
		error_context_stack = NULL;
		HOLD_INTERRUPTS();

		EmitErrorReport();

		/*
		 * In the - very unlikely - case that the IO failed in a way that
		 * raises an error we need to mark the IO as failed.
		 *
		 * Need to do just enough error recovery so that we can mark the IO
		 * as failed and then exit (postmaster will start a new worker).
		 */
		LWLockReleaseAll();

		if (error_ioh != NULL)
		{
			/* should never fail without setting error_errno */
			Assert(error_errno != 0);

			errno = error_errno;

			START_CRIT_SECTION();
			pgaio_io_process_completion(error_ioh, -error_errno);
			END_CRIT_SECTION();
		}

		proc_exit(1);
	}

	/* We can now handle ereport(ERROR) */
	PG_exception_stack = &local_sigjmp_buf;

	sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);

	while (!ShutdownRequestPending)
	{
		uint32		io_index;
		Latch	   *latches[IO_WORKER_WAKEUP_FANOUT];
		int			nlatches = 0;
		int			nwakeups = 0;
		int			worker;

		/* Try to get a job to do. */
		LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
		if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
		{
			/*
			 * Nothing to do. Mark self idle.
			 *
			 * XXX: Invent some kind of back pressure to reduce useless
			 * wakeups?
			 */
			io_worker_control->idle_worker_mask |= (UINT64_C(1) << MyIoWorkerId);
		}
		else
		{
			/* Got one. Clear idle flag. */
			io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);

			/* See if we can wake up some peers. */
			nwakeups = Min(pgaio_worker_submission_queue_depth(),
						   IO_WORKER_WAKEUP_FANOUT);
			for (int i = 0; i < nwakeups; ++i)
			{
				if ((worker = pgaio_choose_idle_worker()) < 0)
					break;
				latches[nlatches++] = io_worker_control->workers[worker].latch;
			}
		}
		LWLockRelease(AioWorkerSubmissionQueueLock);

		for (int i = 0; i < nlatches; ++i)
			SetLatch(latches[i]);

		if (io_index != UINT32_MAX)
		{
			PgAioHandle *ioh = NULL;

			ioh = &pgaio_ctl->io_handles[io_index];
			error_ioh = ioh;
			errcallback.arg = ioh;

			pgaio_debug_io(DEBUG4, ioh,
						   "worker %d processing IO",
						   MyIoWorkerId);

			/*
			 * Prevent interrupts between pgaio_io_reopen() and
			 * pgaio_io_perform_synchronously() that otherwise could lead to
			 * the FD getting closed in that window.
			 */
			HOLD_INTERRUPTS();

			/*
			 * It's very unlikely, but possible, that reopen fails. E.g. due
			 * to memory allocations failing or file permissions changing or
			 * such. In that case we need to fail the IO.
			 *
			 * There's not really a good errno we can report here.
			 */
			error_errno = ENOENT;
			pgaio_io_reopen(ioh);

			/*
			 * To be able to exercise the reopen-fails path, allow injection
			 * points to trigger a failure at this point.
			 */
			INJECTION_POINT("aio-worker-after-reopen", ioh);

			error_errno = 0;
			error_ioh = NULL;

			/*
			 * As part of IO completion the buffer will be marked as NOACCESS,
			 * until the buffer is pinned again - which never happens in io
			 * workers. Therefore the next time there is IO for the same
			 * buffer, the memory will be considered inaccessible. To avoid
			 * that, explicitly allow access to the memory before reading data
			 * into it.
			 */
#ifdef USE_VALGRIND
			{
				struct iovec *iov;
				uint16		iov_length = pgaio_io_get_iovec_length(ioh, &iov);

				for (int i = 0; i < iov_length; i++)
					VALGRIND_MAKE_MEM_UNDEFINED(iov[i].iov_base, iov[i].iov_len);
			}
#endif

			/*
			 * We don't expect this to ever fail with ERROR or FATAL, no need
			 * to keep error_ioh set to the IO.
			 * pgaio_io_perform_synchronously() contains a critical section to
			 * ensure we don't accidentally fail.
			 */
			pgaio_io_perform_synchronously(ioh);

			RESUME_INTERRUPTS();
			errcallback.arg = NULL;
		}
		else
		{
			WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
					  WAIT_EVENT_IO_WORKER_MAIN);
			ResetLatch(MyLatch);
		}

		CHECK_FOR_INTERRUPTS();
	}

	error_context_stack = errcallback.previous;
	proc_exit(0);
}
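
/*
 * For orientation only, a summary of the failure path above: if
 * pgaio_io_reopen() (or the injection point just after it) raises ERROR or
 * FATAL, control transfers to the sigsetjmp() block, which completes the
 * in-flight IO with -error_errno (ENOENT during that window) so the
 * submitting backend isn't left waiting on it, and then exits with
 * proc_exit(1); the postmaster subsequently starts a replacement IO worker.
 */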

bool
pgaio_workers_enabled(void)
{
	return io_method == IOMETHOD_WORKER;
}
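
/*
 * Illustrative configuration sketch, not a recommendation; these are simply
 * the defaults referenced above:
 *
 *     io_method = worker
 *     io_workers = 3
 *
 * The number of workers is capped by the compile-time MAX_IO_WORKERS limit
 * used to size the shared worker slots, and the registration code above
 * anticipates the pool size changing while the server is running.
 */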