Diffstat (limited to 'src/backend/storage/aio/method_worker.c')
-rw-r--r--  src/backend/storage/aio/method_worker.c | 69
1 file changed, 41 insertions(+), 28 deletions(-)
diff --git a/src/backend/storage/aio/method_worker.c b/src/backend/storage/aio/method_worker.c
index 743cccc2acd..bf8f77e6ff6 100644
--- a/src/backend/storage/aio/method_worker.c
+++ b/src/backend/storage/aio/method_worker.c
@@ -52,26 +52,26 @@
#define IO_WORKER_WAKEUP_FANOUT 2
-typedef struct AioWorkerSubmissionQueue
+typedef struct PgAioWorkerSubmissionQueue
{
uint32 size;
uint32 mask;
uint32 head;
uint32 tail;
- uint32 ios[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerSubmissionQueue;
+ uint32 sqes[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerSubmissionQueue;
-typedef struct AioWorkerSlot
+typedef struct PgAioWorkerSlot
{
Latch *latch;
bool in_use;
-} AioWorkerSlot;
+} PgAioWorkerSlot;
-typedef struct AioWorkerControl
+typedef struct PgAioWorkerControl
{
uint64 idle_worker_mask;
- AioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
-} AioWorkerControl;
+ PgAioWorkerSlot workers[FLEXIBLE_ARRAY_MEMBER];
+} PgAioWorkerControl;
static size_t pgaio_worker_shmem_size(void);
@@ -96,8 +96,8 @@ int io_workers = 3;
static int io_worker_queue_size = 64;
static int MyIoWorkerId;
-static AioWorkerSubmissionQueue *io_worker_submission_queue;
-static AioWorkerControl *io_worker_control;
+static PgAioWorkerSubmissionQueue *io_worker_submission_queue;
+static PgAioWorkerControl *io_worker_control;
static size_t
@@ -106,15 +106,15 @@ pgaio_worker_queue_shmem_size(int *queue_size)
/* Round size up to next power of two so we can make a mask. */
*queue_size = pg_nextpower2_32(io_worker_queue_size);
- return offsetof(AioWorkerSubmissionQueue, ios) +
+ return offsetof(PgAioWorkerSubmissionQueue, sqes) +
sizeof(uint32) * *queue_size;
}
static size_t
pgaio_worker_control_shmem_size(void)
{
- return offsetof(AioWorkerControl, workers) +
- sizeof(AioWorkerSlot) * MAX_IO_WORKERS;
+ return offsetof(PgAioWorkerControl, workers) +
+ sizeof(PgAioWorkerSlot) * MAX_IO_WORKERS;
}
static size_t
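
[Editor's note: pgaio_worker_queue_shmem_size() rounds the configured queue size up to a power of two so the head and tail indexes can wrap with a bitwise AND instead of a modulo. Below is a minimal standalone sketch of that sizing, with a hand-rolled helper standing in for PostgreSQL's pg_nextpower2_32(); names and values are illustrative only.]

/*
 * Minimal sketch of the power-of-two sizing used for the submission queue.
 * next_power_of_two() is a stand-in for pg_nextpower2_32().
 */
#include <stdint.h>
#include <stdio.h>

static uint32_t
next_power_of_two(uint32_t v)
{
    v--;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    return v + 1;
}

int
main(void)
{
    uint32_t requested = 100;   /* e.g. a non-power-of-two io_worker_queue_size */
    uint32_t size = next_power_of_two(requested);
    uint32_t mask = size - 1;   /* valid only because size is 2^n */

    /* Index arithmetic can now use "& mask" instead of "% size". */
    printf("size=%u mask=0x%x, index %u wraps to %u\n",
           size, mask, size + 3, (size + 3) & mask);
    return 0;
}
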
@@ -162,7 +162,7 @@ pgaio_worker_shmem_init(bool first_time)
}
static int
-pgaio_choose_idle_worker(void)
+pgaio_worker_choose_idle(void)
{
int worker;
@@ -172,6 +172,7 @@ pgaio_choose_idle_worker(void)
/* Find the lowest bit position, and clear it. */
worker = pg_rightmost_one_pos64(io_worker_control->idle_worker_mask);
io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << worker);
+ Assert(io_worker_control->workers[worker].in_use);
return worker;
}
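
[Editor's note: pgaio_worker_choose_idle() takes the lowest set bit of the 64-bit idle mask and clears it; the new Assert holds because a dying worker now removes its own bit in pgaio_worker_die() (see that hunk further down). A standalone sketch of the bit trick, using the GCC/Clang builtin __builtin_ctzll as a stand-in for pg_rightmost_one_pos64; the real mask lives in shared memory and is accessed under AioWorkerSubmissionQueueLock.]

/*
 * Sketch of the idle-worker selection: take the lowest set bit of a 64-bit
 * mask and clear it.
 */
#include <stdint.h>
#include <stdio.h>

static int
choose_idle(uint64_t *idle_mask)
{
    int worker;

    if (*idle_mask == 0)
        return -1;              /* no idle worker available */

    worker = __builtin_ctzll(*idle_mask);   /* lowest set bit position */
    *idle_mask &= ~(UINT64_C(1) << worker); /* worker is no longer idle */
    return worker;
}

int
main(void)
{
    uint64_t idle = UINT64_C(0x29);     /* workers 0, 3 and 5 idle */
    int worker;

    while ((worker = choose_idle(&idle)) >= 0)
        printf("next idle worker: %d\n", worker);   /* 0, then 3, then 5 */
    return 0;
}
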
@@ -179,7 +180,7 @@ pgaio_choose_idle_worker(void)
static bool
pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 new_head;
queue = io_worker_submission_queue;
@@ -191,7 +192,7 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
return false; /* full */
}
- queue->ios[queue->head] = pgaio_io_get_id(ioh);
+ queue->sqes[queue->head] = pgaio_io_get_id(ioh);
queue->head = new_head;
return true;
@@ -200,14 +201,14 @@ pgaio_worker_submission_queue_insert(PgAioHandle *ioh)
static uint32
pgaio_worker_submission_queue_consume(void)
{
- AioWorkerSubmissionQueue *queue;
+ PgAioWorkerSubmissionQueue *queue;
uint32 result;
queue = io_worker_submission_queue;
if (queue->tail == queue->head)
return UINT32_MAX; /* empty */
- result = queue->ios[queue->tail];
+ result = queue->sqes[queue->tail];
queue->tail = (queue->tail + 1) & (queue->size - 1);
return result;
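
[Editor's note: the submission queue is a fixed-size ring buffer: insert advances head, consume advances tail, both wrap by masking, the queue is full when advancing head would touch tail and empty when they are equal. A self-contained sketch of that head/tail scheme; names are illustrative, and the real queue is a flexible-array struct in shared memory protected by AioWorkerSubmissionQueueLock.]

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define QUEUE_SIZE 8            /* must be a power of two */

typedef struct RingQueue
{
    uint32_t mask;
    uint32_t head;
    uint32_t tail;
    uint32_t entries[QUEUE_SIZE];
} RingQueue;

static bool
ring_insert(RingQueue *q, uint32_t id)
{
    uint32_t new_head = (q->head + 1) & q->mask;

    if (new_head == q->tail)
        return false;           /* full: caller falls back to synchronous IO */
    q->entries[q->head] = id;
    q->head = new_head;
    return true;
}

static uint32_t
ring_consume(RingQueue *q)
{
    uint32_t result;

    if (q->tail == q->head)
        return UINT32_MAX;      /* empty */
    result = q->entries[q->tail];
    q->tail = (q->tail + 1) & q->mask;
    return result;
}

int
main(void)
{
    RingQueue q = {.mask = QUEUE_SIZE - 1};
    uint32_t id;

    for (uint32_t i = 100; i < 110; i++)
        if (!ring_insert(&q, i))
            printf("queue full, IO %u would run synchronously\n", i);
    while ((id = ring_consume(&q)) != UINT32_MAX)
        printf("worker consumed IO %u\n", id);
    return 0;
}
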
@@ -240,37 +241,37 @@ pgaio_worker_needs_synchronous_execution(PgAioHandle *ioh)
}
static void
-pgaio_worker_submit_internal(int nios, PgAioHandle *ios[])
+pgaio_worker_submit_internal(int num_staged_ios, PgAioHandle **staged_ios)
{
PgAioHandle *synchronous_ios[PGAIO_SUBMIT_BATCH_SIZE];
int nsync = 0;
Latch *wakeup = NULL;
int worker;
- Assert(nios <= PGAIO_SUBMIT_BATCH_SIZE);
+ Assert(num_staged_ios <= PGAIO_SUBMIT_BATCH_SIZE);
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
- for (int i = 0; i < nios; ++i)
+ for (int i = 0; i < num_staged_ios; ++i)
{
- Assert(!pgaio_worker_needs_synchronous_execution(ios[i]));
- if (!pgaio_worker_submission_queue_insert(ios[i]))
+ Assert(!pgaio_worker_needs_synchronous_execution(staged_ios[i]));
+ if (!pgaio_worker_submission_queue_insert(staged_ios[i]))
{
/*
* We'll do it synchronously, but only after we've sent as many as
* we can to workers, to maximize concurrency.
*/
- synchronous_ios[nsync++] = ios[i];
+ synchronous_ios[nsync++] = staged_ios[i];
continue;
}
if (wakeup == NULL)
{
/* Choose an idle worker to wake up if we haven't already. */
- worker = pgaio_choose_idle_worker();
+ worker = pgaio_worker_choose_idle();
if (worker >= 0)
wakeup = io_worker_control->workers[worker].latch;
- pgaio_debug_io(DEBUG4, ios[i],
+ pgaio_debug_io(DEBUG4, staged_ios[i],
"choosing worker %d",
worker);
}
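
[Editor's note: the submit path never blocks on a full queue. IOs that do not fit are collected locally and executed synchronously only after everything that did fit has been queued, so the workers stay as busy as possible. A compile-and-run sketch of that overflow pattern; the queue, handle type, and helper names are simplified stand-ins, not the real AIO structures.]

#include <stdbool.h>
#include <stdio.h>

#define BATCH_SIZE 8
#define QUEUE_ROOM 3            /* pretend only three slots are free */

typedef int IoHandle;           /* stand-in for PgAioHandle */

static int queued;

static bool
enqueue(IoHandle io)
{
    if (queued >= QUEUE_ROOM)
        return false;           /* full */
    printf("queued IO %d for a worker\n", io);
    queued++;
    return true;
}

static void
submit_batch(int nios, const IoHandle *ios)
{
    IoHandle overflow[BATCH_SIZE];
    int noverflow = 0;

    /* (the real code holds AioWorkerSubmissionQueueLock around this loop) */
    for (int i = 0; i < nios; i++)
    {
        /* Send as many as possible to workers first, to maximize concurrency. */
        if (!enqueue(ios[i]))
            overflow[noverflow++] = ios[i];
    }

    /* Only what did not fit is executed synchronously, after the lock drops. */
    for (int i = 0; i < noverflow; i++)
        printf("running IO %d synchronously\n", overflow[i]);
}

int
main(void)
{
    const IoHandle batch[] = {101, 102, 103, 104, 105};

    submit_batch(5, batch);
    return 0;
}
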
@@ -316,6 +317,7 @@ pgaio_worker_die(int code, Datum arg)
Assert(io_worker_control->workers[MyIoWorkerId].in_use);
Assert(io_worker_control->workers[MyIoWorkerId].latch == MyLatch);
+ io_worker_control->idle_worker_mask &= ~(UINT64_C(1) << MyIoWorkerId);
io_worker_control->workers[MyIoWorkerId].in_use = false;
io_worker_control->workers[MyIoWorkerId].latch = NULL;
LWLockRelease(AioWorkerSubmissionQueueLock);

@@ -461,7 +463,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
int nwakeups = 0;
int worker;
- /* Try to get a job to do. */
+ /*
+ * Try to get a job to do.
+ *
+ * The lwlock acquisition also provides the necessary memory barrier
+ * to ensure that we don't see outdated data in the handle.
+ */
LWLockAcquire(AioWorkerSubmissionQueueLock, LW_EXCLUSIVE);
if ((io_index = pgaio_worker_submission_queue_consume()) == UINT32_MAX)
{
@@ -483,7 +490,7 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
IO_WORKER_WAKEUP_FANOUT);
for (int i = 0; i < nwakeups; ++i)
{
- if ((worker = pgaio_choose_idle_worker()) < 0)
+ if ((worker = pgaio_worker_choose_idle()) < 0)
break;
latches[nlatches++] = io_worker_control->workers[worker].latch;
}
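
[Editor's note: after taking a job, a worker wakes up to IO_WORKER_WAKEUP_FANOUT additional idle workers, collecting their latches while still holding the lock so they can be set once the lock is dropped (the release and the SetLatch calls fall outside the quoted hunk). Deep queues therefore drain with exponential wakeup fanout rather than a single submitter waking everyone. A simplified sketch of that collect-then-wake pattern, with a plain array, a toy Latch struct, and set_latch() standing in for the real latch machinery.]

#include <stdio.h>

#define WAKEUP_FANOUT 2         /* mirrors IO_WORKER_WAKEUP_FANOUT */

typedef struct Latch
{
    int owner;
} Latch;

static Latch latches[64];
static int idle_workers[] = {2, 3, 4};
static int nidle = 3;

static int
choose_idle(void)
{
    /* stand-in for pgaio_worker_choose_idle() */
    return (nidle > 0) ? idle_workers[--nidle] : -1;
}

static void
set_latch(Latch *latch)
{
    printf("waking worker %d\n", latch->owner);
}

int
main(void)
{
    Latch *wakeups[WAKEUP_FANOUT];
    int nwakeups = 0;

    for (int i = 0; i < 64; i++)
        latches[i].owner = i;

    /* While holding the lock: pick at most WAKEUP_FANOUT idle workers. */
    for (int i = 0; i < WAKEUP_FANOUT; i++)
    {
        int worker = choose_idle();

        if (worker < 0)
            break;
        wakeups[nwakeups++] = &latches[worker];
    }

    /* After the lock is released: actually wake them. */
    for (int i = 0; i < nwakeups; i++)
        set_latch(wakeups[i]);
    return 0;
}
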
@@ -568,6 +575,12 @@ IoWorkerMain(const void *startup_data, size_t startup_data_len)
}
CHECK_FOR_INTERRUPTS();
+
+ if (ConfigReloadPending)
+ {
+ ConfigReloadPending = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
}
error_context_stack = errcallback.previous;