1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
/*-------------------------------------------------------------------------
*
* aio_init.c
* AIO - Subsystem Initialization
*
* Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
* IDENTIFICATION
* src/backend/storage/aio/aio_init.c
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "storage/aio.h"
#include "storage/aio_internal.h"
#include "storage/aio_subsys.h"
#include "storage/bufmgr.h"
#include "storage/io_worker.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/shmem.h"
#include "utils/guc.h"
static Size
AioCtlShmemSize(void)
{
Size sz;
/* pgaio_ctl itself */
sz = offsetof(PgAioCtl, io_handles);
return sz;
}
static uint32
AioProcs(void)
{
/*
* While AIO workers don't need their own AIO context, we can't currently
* guarantee nothing gets assigned to the a ProcNumber for an IO worker if
* we just subtracted MAX_IO_WORKERS.
*/
return MaxBackends + NUM_AUXILIARY_PROCS;
}
static Size
AioBackendShmemSize(void)
{
return mul_size(AioProcs(), sizeof(PgAioBackend));
}
static Size
AioHandleShmemSize(void)
{
Size sz;
/* verify AioChooseMaxConcurrency() did its thing */
Assert(io_max_concurrency > 0);
/* io handles */
sz = mul_size(AioProcs(),
mul_size(io_max_concurrency, sizeof(PgAioHandle)));
return sz;
}
static Size
AioHandleIOVShmemSize(void)
{
/* each IO handle can have up to io_max_combine_limit iovec objects */
return mul_size(sizeof(struct iovec),
mul_size(mul_size(io_max_combine_limit, AioProcs()),
io_max_concurrency));
}
static Size
AioHandleDataShmemSize(void)
{
/* each buffer referenced by an iovec can have associated data */
return mul_size(sizeof(uint64),
mul_size(mul_size(io_max_combine_limit, AioProcs()),
io_max_concurrency));
}
/*
* Choose a suitable value for io_max_concurrency.
*
* It's unlikely that we could have more IOs in flight than buffers that we
* would be allowed to pin.
*
* On the upper end, apply a cap too - just because shared_buffers is large,
* it doesn't make sense have millions of buffers undergo IO concurrently.
*/
static int
AioChooseMaxConcurrency(void)
{
uint32 max_backends;
int max_proportional_pins;
/* Similar logic to LimitAdditionalPins() */
max_backends = MaxBackends + NUM_AUXILIARY_PROCS;
max_proportional_pins = NBuffers / max_backends;
max_proportional_pins = Max(max_proportional_pins, 1);
/* apply upper limit */
return Min(max_proportional_pins, 64);
}
Size
AioShmemSize(void)
{
Size sz = 0;
/*
* We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
* However, if the DBA explicitly set io_max_concurrency = -1 in the
* config file, then PGC_S_DYNAMIC_DEFAULT will fail to override that and
* we must force the matter with PGC_S_OVERRIDE.
*/
if (io_max_concurrency == -1)
{
char buf[32];
snprintf(buf, sizeof(buf), "%d", AioChooseMaxConcurrency());
SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
PGC_S_DYNAMIC_DEFAULT);
if (io_max_concurrency == -1) /* failed to apply it? */
SetConfigOption("io_max_concurrency", buf, PGC_POSTMASTER,
PGC_S_OVERRIDE);
}
sz = add_size(sz, AioCtlShmemSize());
sz = add_size(sz, AioBackendShmemSize());
sz = add_size(sz, AioHandleShmemSize());
sz = add_size(sz, AioHandleIOVShmemSize());
sz = add_size(sz, AioHandleDataShmemSize());
/* Reserve space for method specific resources. */
if (pgaio_method_ops->shmem_size)
sz = add_size(sz, pgaio_method_ops->shmem_size());
return sz;
}
void
AioShmemInit(void)
{
bool found;
uint32 io_handle_off = 0;
uint32 iovec_off = 0;
uint32 per_backend_iovecs = io_max_concurrency * io_max_combine_limit;
pgaio_ctl = (PgAioCtl *)
ShmemInitStruct("AioCtl", AioCtlShmemSize(), &found);
if (found)
goto out;
memset(pgaio_ctl, 0, AioCtlShmemSize());
pgaio_ctl->io_handle_count = AioProcs() * io_max_concurrency;
pgaio_ctl->iovec_count = AioProcs() * per_backend_iovecs;
pgaio_ctl->backend_state = (PgAioBackend *)
ShmemInitStruct("AioBackend", AioBackendShmemSize(), &found);
pgaio_ctl->io_handles = (PgAioHandle *)
ShmemInitStruct("AioHandle", AioHandleShmemSize(), &found);
pgaio_ctl->iovecs = (struct iovec *)
ShmemInitStruct("AioHandleIOV", AioHandleIOVShmemSize(), &found);
pgaio_ctl->handle_data = (uint64 *)
ShmemInitStruct("AioHandleData", AioHandleDataShmemSize(), &found);
for (int procno = 0; procno < AioProcs(); procno++)
{
PgAioBackend *bs = &pgaio_ctl->backend_state[procno];
bs->io_handle_off = io_handle_off;
io_handle_off += io_max_concurrency;
dclist_init(&bs->idle_ios);
memset(bs->staged_ios, 0, sizeof(PgAioHandle *) * PGAIO_SUBMIT_BATCH_SIZE);
dclist_init(&bs->in_flight_ios);
/* initialize per-backend IOs */
for (int i = 0; i < io_max_concurrency; i++)
{
PgAioHandle *ioh = &pgaio_ctl->io_handles[bs->io_handle_off + i];
ioh->generation = 1;
ioh->owner_procno = procno;
ioh->iovec_off = iovec_off;
ioh->handle_data_len = 0;
ioh->report_return = NULL;
ioh->resowner = NULL;
ioh->num_callbacks = 0;
ioh->distilled_result.status = PGAIO_RS_UNKNOWN;
ioh->flags = 0;
ConditionVariableInit(&ioh->cv);
dclist_push_tail(&bs->idle_ios, &ioh->node);
iovec_off += io_max_combine_limit;
}
}
out:
/* Initialize IO method specific resources. */
if (pgaio_method_ops->shmem_init)
pgaio_method_ops->shmem_init(!found);
}
void
pgaio_init_backend(void)
{
/* shouldn't be initialized twice */
Assert(!pgaio_my_backend);
if (MyBackendType == B_IO_WORKER)
return;
if (MyProc == NULL || MyProcNumber >= AioProcs())
elog(ERROR, "aio requires a normal PGPROC");
pgaio_my_backend = &pgaio_ctl->backend_state[MyProcNumber];
if (pgaio_method_ops->init_backend)
pgaio_method_ops->init_backend();
before_shmem_exit(pgaio_shutdown, 0);
}
|