aboutsummaryrefslogtreecommitdiff
path: root/src/include/libpq/libpq-be-fe-helpers.h
blob: 16205b824fa55de8c55a4b8de495817b32a0fb10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
/*-------------------------------------------------------------------------
 *
 * libpq-be-fe-helpers.h
 *	  Helper functions for using libpq in extensions
 *
 * Code built directly into the backend is not allowed to link to libpq
 * directly. Extension code is allowed to use libpq however. However, libpq
 * used in extensions has to be careful not to block inside libpq, otherwise
 * interrupts will not be processed, leading to issues like unresolvable
 * deadlocks. Backend code also needs to take care to acquire/release an
 * external fd for the connection, otherwise fd.c's accounting of fd's is
 * broken.
 *
 * This file provides helper functions to make it easier to comply with these
 * rules. It is a header only library as it needs to be linked into each
 * extension using libpq, and it seems too small to be worth adding a
 * dedicated static library for.
 *
 * TODO: For historical reasons the connections established here are not put
 * into non-blocking mode. That can lead to blocking even when only the async
 * libpq functions are used. This should be fixed.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * src/include/libpq/libpq-be-fe-helpers.h
 *
 *-------------------------------------------------------------------------
 */
#ifndef LIBPQ_BE_FE_HELPERS_H
#define LIBPQ_BE_FE_HELPERS_H

/*
 * Despite the name, BUILDING_DLL is set only when building code directly part
 * of the backend. Which also is where libpq isn't allowed to be
 * used. Obviously this doesn't protect against libpq-fe.h getting included
 * otherwise, but perhaps still protects against a few mistakes...
 */
#ifdef BUILDING_DLL
#error "libpq may not be used code directly built into the backend"
#endif

#include "libpq-fe.h"
#include "miscadmin.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/timestamp.h"
#include "utils/wait_event.h"


static inline void libpqsrv_connect_prepare(void);
static inline void libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info);
static inline PGresult *libpqsrv_get_result(PGconn *conn, uint32 wait_event_info);


/*
 * PQconnectdb() wrapper that reserves a file descriptor and processes
 * interrupts during connection establishment.
 *
 * Throws an error if AcquireExternalFD() fails, but does not throw if
 * connection establishment itself fails. Callers need to use PQstatus() to
 * check if connection establishment succeeded.
 */
static inline PGconn *
libpqsrv_connect(const char *conninfo, uint32 wait_event_info)
{
	PGconn	   *conn = NULL;

	libpqsrv_connect_prepare();

	conn = PQconnectStart(conninfo);

	libpqsrv_connect_internal(conn, wait_event_info);

	return conn;
}

/*
 * Like libpqsrv_connect(), except that this is a wrapper for
 * PQconnectdbParams().
  */
static inline PGconn *
libpqsrv_connect_params(const char *const *keywords,
						const char *const *values,
						int expand_dbname,
						uint32 wait_event_info)
{
	PGconn	   *conn = NULL;

	libpqsrv_connect_prepare();

	conn = PQconnectStartParams(keywords, values, expand_dbname);

	libpqsrv_connect_internal(conn, wait_event_info);

	return conn;
}

/*
 * PQfinish() wrapper that additionally releases the reserved file descriptor.
 *
 * It is allowed to call this with a NULL pgconn iff NULL was returned by
 * libpqsrv_connect*.
 */
static inline void
libpqsrv_disconnect(PGconn *conn)
{
	/*
	 * If no connection was established, we haven't reserved an FD for it (or
	 * already released it). This rule makes it easier to write PG_CATCH()
	 * handlers for this facility's users.
	 *
	 * See also libpqsrv_connect_internal().
	 */
	if (conn == NULL)
		return;

	ReleaseExternalFD();
	PQfinish(conn);
}


/* internal helper functions follow */


/*
 * Helper function for all connection establishment functions.
 */
static inline void
libpqsrv_connect_prepare(void)
{
	/*
	 * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
	 * a PGconn represents one long-lived FD.  (Doing this here also ensures
	 * that VFDs are closed if needed to make room.)
	 */
	if (!AcquireExternalFD())
	{
#ifndef WIN32					/* can't write #if within ereport() macro */
		ereport(ERROR,
				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
				 errmsg("could not establish connection"),
				 errdetail("There are too many open files on the local server."),
				 errhint("Raise the server's \"max_files_per_process\" and/or \"ulimit -n\" limits.")));
#else
		ereport(ERROR,
				(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
				 errmsg("could not establish connection"),
				 errdetail("There are too many open files on the local server."),
				 errhint("Raise the server's \"max_files_per_process\" setting.")));
#endif
	}
}

/*
 * Helper function for all connection establishment functions.
 */
static inline void
libpqsrv_connect_internal(PGconn *conn, uint32 wait_event_info)
{
	/*
	 * With conn == NULL libpqsrv_disconnect() wouldn't release the FD. So do
	 * that here.
	 */
	if (conn == NULL)
	{
		ReleaseExternalFD();
		return;
	}

	/*
	 * Can't wait without a socket. Note that we don't want to close the libpq
	 * connection yet, so callers can emit a useful error.
	 */
	if (PQstatus(conn) == CONNECTION_BAD)
		return;

	/*
	 * WaitLatchOrSocket() can conceivably fail, handle that case here instead
	 * of requiring all callers to do so.
	 */
	PG_TRY();
	{
		PostgresPollingStatusType status;

		/*
		 * Poll connection until we have OK or FAILED status.
		 *
		 * Per spec for PQconnectPoll, first wait till socket is write-ready.
		 */
		status = PGRES_POLLING_WRITING;
		while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED)
		{
			int			io_flag;
			int			rc;

			if (status == PGRES_POLLING_READING)
				io_flag = WL_SOCKET_READABLE;
#ifdef WIN32

			/*
			 * Windows needs a different test while waiting for
			 * connection-made
			 */
			else if (PQstatus(conn) == CONNECTION_STARTED)
				io_flag = WL_SOCKET_CONNECTED;
#endif
			else
				io_flag = WL_SOCKET_WRITEABLE;

			rc = WaitLatchOrSocket(MyLatch,
								   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
								   PQsocket(conn),
								   0,
								   wait_event_info);

			/* Interrupted? */
			if (rc & WL_LATCH_SET)
			{
				ResetLatch(MyLatch);
				CHECK_FOR_INTERRUPTS();
			}

			/* If socket is ready, advance the libpq state machine */
			if (rc & io_flag)
				status = PQconnectPoll(conn);
		}
	}
	PG_CATCH();
	{
		/*
		 * If an error is thrown here, the callers won't call
		 * libpqsrv_disconnect() with a conn, so release resources
		 * immediately.
		 */
		ReleaseExternalFD();
		PQfinish(conn);

		PG_RE_THROW();
	}
	PG_END_TRY();
}

/*
 * PQexec() wrapper that processes interrupts.
 *
 * Unless PQsetnonblocking(conn, 1) is in effect, this can't process
 * interrupts while pushing the query text to the server.  Consider that
 * setting if query strings can be long relative to TCP buffer size.
 *
 * This has the preconditions of PQsendQuery(), not those of PQexec().  Most
 * notably, PQexec() would silently discard any prior query results.
 */
static inline PGresult *
libpqsrv_exec(PGconn *conn, const char *query, uint32 wait_event_info)
{
	if (!PQsendQuery(conn, query))
		return NULL;
	return libpqsrv_get_result_last(conn, wait_event_info);
}

/*
 * PQexecParams() wrapper that processes interrupts.
 *
 * See notes at libpqsrv_exec().
 */
static inline PGresult *
libpqsrv_exec_params(PGconn *conn,
					 const char *command,
					 int nParams,
					 const Oid *paramTypes,
					 const char *const *paramValues,
					 const int *paramLengths,
					 const int *paramFormats,
					 int resultFormat,
					 uint32 wait_event_info)
{
	if (!PQsendQueryParams(conn, command, nParams, paramTypes, paramValues,
						   paramLengths, paramFormats, resultFormat))
		return NULL;
	return libpqsrv_get_result_last(conn, wait_event_info);
}

/*
 * Like PQexec(), loop over PQgetResult() until it returns NULL or another
 * terminal state.  Return the last non-NULL result or the terminal state.
 */
static inline PGresult *
libpqsrv_get_result_last(PGconn *conn, uint32 wait_event_info)
{
	PGresult   *volatile lastResult = NULL;

	/* In what follows, do not leak any PGresults on an error. */
	PG_TRY();
	{
		for (;;)
		{
			/* Wait for, and collect, the next PGresult. */
			PGresult   *result;

			result = libpqsrv_get_result(conn, wait_event_info);
			if (result == NULL)
				break;			/* query is complete, or failure */

			/*
			 * Emulate PQexec()'s behavior of returning the last result when
			 * there are many.
			 */
			PQclear(lastResult);
			lastResult = result;

			if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
				PQresultStatus(lastResult) == PGRES_COPY_OUT ||
				PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
				PQstatus(conn) == CONNECTION_BAD)
				break;
		}
	}
	PG_CATCH();
	{
		PQclear(lastResult);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return lastResult;
}

/*
 * Perform the equivalent of PQgetResult(), but watch for interrupts.
 */
static inline PGresult *
libpqsrv_get_result(PGconn *conn, uint32 wait_event_info)
{
	/*
	 * Collect data until PQgetResult is ready to get the result without
	 * blocking.
	 */
	while (PQisBusy(conn))
	{
		int			rc;

		rc = WaitLatchOrSocket(MyLatch,
							   WL_EXIT_ON_PM_DEATH | WL_LATCH_SET |
							   WL_SOCKET_READABLE,
							   PQsocket(conn),
							   0,
							   wait_event_info);

		/* Interrupted? */
		if (rc & WL_LATCH_SET)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}

		/* Consume whatever data is available from the socket */
		if (PQconsumeInput(conn) == 0)
		{
			/* trouble; expect PQgetResult() to return NULL */
			break;
		}
	}

	/* Now we can collect and return the next PGresult */
	return PQgetResult(conn);
}

/*
 * Submit a cancel request to the given connection, waiting only until
 * the given time.
 *
 * We sleep interruptibly until we receive confirmation that the cancel
 * request has been accepted, and if it is, return NULL; if the cancel
 * request fails, return an error message string (which is not to be
 * freed).
 *
 * For other problems (to wit: OOM when strdup'ing an error message from
 * libpq), this function can ereport(ERROR).
 *
 * Note: this function leaks a string's worth of memory when reporting
 * libpq errors.  Make sure to call it in a transient memory context.
 */
static inline const char *
libpqsrv_cancel(PGconn *conn, TimestampTz endtime)
{
	PGcancelConn *cancel_conn;
	const char *error = NULL;

	cancel_conn = PQcancelCreate(conn);
	if (cancel_conn == NULL)
		return "out of memory";

	/* In what follows, do not leak any PGcancelConn on any errors. */

	PG_TRY();
	{
		if (!PQcancelStart(cancel_conn))
		{
			error = pchomp(PQcancelErrorMessage(cancel_conn));
			goto exit;
		}

		for (;;)
		{
			PostgresPollingStatusType pollres;
			TimestampTz now;
			long		cur_timeout;
			int			waitEvents = WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH;

			pollres = PQcancelPoll(cancel_conn);
			if (pollres == PGRES_POLLING_OK)
				break;			/* success! */

			/* If timeout has expired, give up, else get sleep time. */
			now = GetCurrentTimestamp();
			cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
			if (cur_timeout <= 0)
			{
				error = "cancel request timed out";
				break;
			}

			switch (pollres)
			{
				case PGRES_POLLING_READING:
					waitEvents |= WL_SOCKET_READABLE;
					break;
				case PGRES_POLLING_WRITING:
					waitEvents |= WL_SOCKET_WRITEABLE;
					break;
				default:
					error = pchomp(PQcancelErrorMessage(cancel_conn));
					goto exit;
			}

			/* Sleep until there's something to do */
			WaitLatchOrSocket(MyLatch, waitEvents, PQcancelSocket(cancel_conn),
							  cur_timeout, PG_WAIT_CLIENT);

			ResetLatch(MyLatch);

			CHECK_FOR_INTERRUPTS();
		}
exit:	;
	}
	PG_FINALLY();
	{
		PQcancelFinish(cancel_conn);
	}
	PG_END_TRY();

	return error;
}

#endif							/* LIBPQ_BE_FE_HELPERS_H */