diff options
Diffstat (limited to 'src/backend/port')
-rw-r--r-- | src/backend/port/unix_latch.c | 101 | ||||
-rw-r--r-- | src/backend/port/win32_latch.c | 98 |
2 files changed, 141 insertions, 58 deletions
diff --git a/src/backend/port/unix_latch.c b/src/backend/port/unix_latch.c index 6dae7c94c03..9940a42e33c 100644 --- a/src/backend/port/unix_latch.c +++ b/src/backend/port/unix_latch.c @@ -93,6 +93,7 @@ #endif #include "miscadmin.h" +#include "postmaster/postmaster.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -176,34 +177,44 @@ DisownLatch(volatile Latch *latch) } /* - * Wait for given latch to be set or until timeout is exceeded. - * If the latch is already set, the function returns immediately. + * Wait for a given latch to be set, postmaster death, or until timeout is + * exceeded. 'wakeEvents' is a bitmask that specifies which of those events + * to wait for. If the latch is already set (and WL_LATCH_SET is given), the + * function returns immediately. * - * The 'timeout' is given in microseconds, and -1 means wait forever. - * On some platforms, signals cause the timeout to be restarted, so beware - * that the function can sleep for several times longer than the specified - * timeout. + * The 'timeout' is given in microseconds. It must be >= 0 if WL_TIMEOUT + * event is given, otherwise it is ignored. On some platforms, signals cause + * the timeout to be restarted, so beware that the function can sleep for + * several times longer than the specified timeout. * * The latch must be owned by the current process, ie. it must be a * backend-local latch initialized with InitLatch, or a shared latch * associated with the current process by calling OwnLatch. * - * Returns 'true' if the latch was set, or 'false' if timeout was reached. + * Returns bit field indicating which condition(s) caused the wake-up. Note + * that if multiple wake-up conditions are true, there is no guarantee that + * we return all of them in one call, but we will return at least one. Also, + * according to the select(2) man page on Linux, select(2) may spuriously + * return and report a file descriptor as readable, when it's not. We use + * select(2), so WaitLatch can also spuriously claim that a socket is + * readable, or postmaster has died, even when none of the wake conditions + * have been satisfied. That should be rare in practice, but the caller + * should not use the return value for anything critical, re-checking the + * situation with PostmasterIsAlive() or read() on a socket if necessary. */ -bool -WaitLatch(volatile Latch *latch, long timeout) +int +WaitLatch(volatile Latch *latch, int wakeEvents, long timeout) { - return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; + return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout); } /* - * Like WaitLatch, but will also return when there's data available in - * 'sock' for reading or writing. Returns 0 if timeout was reached, - * 1 if the latch was set, 2 if the socket became readable or writable. + * Like WaitLatch, but with an extra socket argument for WL_SOCKET_* + * conditions. */ int -WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, - bool forWrite, long timeout) +WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, pgsocket sock, + long timeout) { struct timeval tv, *tvp = NULL; @@ -212,19 +223,26 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, int rc; int result = 0; - if (latch->owner_pid != MyProcPid) + /* Ignore WL_SOCKET_* events if no valid socket is given */ + if (sock == PGINVALID_SOCKET) + wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + + Assert(wakeEvents != 0); /* must have at least one wake event */ + + if ((wakeEvents & WL_LATCH_SET) && latch->owner_pid != MyProcPid) elog(ERROR, "cannot wait on a latch owned by another process"); /* Initialize timeout */ - if (timeout >= 0) + if (wakeEvents & WL_TIMEOUT) { + Assert(timeout >= 0); tv.tv_sec = timeout / 1000000L; tv.tv_usec = timeout % 1000000L; tvp = &tv; } waiting = true; - for (;;) + do { int hifd; @@ -235,16 +253,28 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, * do that), and the select() will return immediately. */ drainSelfPipe(); - if (latch->is_set) + if ((wakeEvents & WL_LATCH_SET) && latch->is_set) { - result = 1; + result |= WL_LATCH_SET; + /* + * Leave loop immediately, avoid blocking again. We don't attempt + * to report any other events that might also be satisfied. + */ break; } FD_ZERO(&input_mask); FD_SET(selfpipe_readfd, &input_mask); hifd = selfpipe_readfd; - if (sock != PGINVALID_SOCKET && forRead) + + if (wakeEvents & WL_POSTMASTER_DEATH) + { + FD_SET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask); + if (postmaster_alive_fds[POSTMASTER_FD_WATCH] > hifd) + hifd = postmaster_alive_fds[POSTMASTER_FD_WATCH]; + } + + if (wakeEvents & WL_SOCKET_READABLE) { FD_SET(sock, &input_mask); if (sock > hifd) @@ -252,14 +282,17 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, } FD_ZERO(&output_mask); - if (sock != PGINVALID_SOCKET && forWrite) + if (wakeEvents & WL_SOCKET_WRITEABLE) { FD_SET(sock, &output_mask); if (sock > hifd) hifd = sock; } + /* Sleep */ rc = select(hifd + 1, &input_mask, &output_mask, NULL, tvp); + + /* Check return code */ if (rc < 0) { if (errno == EINTR) @@ -268,20 +301,26 @@ WaitLatchOrSocket(volatile Latch *latch, pgsocket sock, bool forRead, (errcode_for_socket_access(), errmsg("select() failed: %m"))); } - if (rc == 0) + if (rc == 0 && (wakeEvents & WL_TIMEOUT)) { /* timeout exceeded */ - result = 0; - break; + result |= WL_TIMEOUT; } - if (sock != PGINVALID_SOCKET && - ((forRead && FD_ISSET(sock, &input_mask)) || - (forWrite && FD_ISSET(sock, &output_mask)))) + if ((wakeEvents & WL_SOCKET_READABLE) && FD_ISSET(sock, &input_mask)) { - result = 2; - break; /* data available in socket */ + /* data available in socket */ + result |= WL_SOCKET_READABLE; } - } + if ((wakeEvents & WL_SOCKET_WRITEABLE) && FD_ISSET(sock, &output_mask)) + { + result |= WL_SOCKET_WRITEABLE; + } + if ((wakeEvents & WL_POSTMASTER_DEATH) && + FD_ISSET(postmaster_alive_fds[POSTMASTER_FD_WATCH], &input_mask)) + { + result |= WL_POSTMASTER_DEATH; + } + } while(result == 0); waiting = false; return result; diff --git a/src/backend/port/win32_latch.c b/src/backend/port/win32_latch.c index 4bcf7b7a8f3..ef61b0184d1 100644 --- a/src/backend/port/win32_latch.c +++ b/src/backend/port/win32_latch.c @@ -23,6 +23,7 @@ #include <unistd.h> #include "miscadmin.h" +#include "postmaster/postmaster.h" #include "replication/walsender.h" #include "storage/latch.h" #include "storage/shmem.h" @@ -81,43 +82,67 @@ DisownLatch(volatile Latch *latch) latch->owner_pid = 0; } -bool -WaitLatch(volatile Latch *latch, long timeout) +int +WaitLatch(volatile Latch *latch, int wakeEvents, long timeout) { - return WaitLatchOrSocket(latch, PGINVALID_SOCKET, false, false, timeout) > 0; + return WaitLatchOrSocket(latch, wakeEvents, PGINVALID_SOCKET, timeout); } int -WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead, - bool forWrite, long timeout) +WaitLatchOrSocket(volatile Latch *latch, int wakeEvents, SOCKET sock, + long timeout) { DWORD rc; - HANDLE events[3]; + HANDLE events[4]; HANDLE latchevent; - HANDLE sockevent = WSA_INVALID_EVENT; /* silence compiler */ + HANDLE sockevent = WSA_INVALID_EVENT; int numevents; int result = 0; + int pmdeath_eventno; + long timeout_ms; + + Assert(wakeEvents != 0); + + /* Ignore WL_SOCKET_* events if no valid socket is given */ + if (sock == PGINVALID_SOCKET) + wakeEvents &= ~(WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE); + + /* Convert timeout to milliseconds for WaitForMultipleObjects() */ + if (wakeEvents & WL_TIMEOUT) + { + Assert(timeout >= 0); + timeout_ms = timeout / 1000; + } + else + timeout_ms = INFINITE; + /* Construct an array of event handles for WaitforMultipleObjects() */ latchevent = latch->event; events[0] = latchevent; events[1] = pgwin32_signal_event; numevents = 2; - if (sock != PGINVALID_SOCKET && (forRead || forWrite)) + if (((wakeEvents & WL_SOCKET_READABLE) || + (wakeEvents & WL_SOCKET_WRITEABLE))) { int flags = 0; - if (forRead) + if (wakeEvents & WL_SOCKET_READABLE) flags |= FD_READ; - if (forWrite) + if (wakeEvents & WL_SOCKET_WRITEABLE) flags |= FD_WRITE; sockevent = WSACreateEvent(); WSAEventSelect(sock, sockevent, flags); events[numevents++] = sockevent; } + if (wakeEvents & WL_POSTMASTER_DEATH) + { + pmdeath_eventno = numevents; + events[numevents++] = PostmasterHandle; + } - for (;;) + do { /* * Reset the event, and check if the latch is set already. If someone @@ -127,45 +152,64 @@ WaitLatchOrSocket(volatile Latch *latch, SOCKET sock, bool forRead, */ if (!ResetEvent(latchevent)) elog(ERROR, "ResetEvent failed: error code %d", (int) GetLastError()); - if (latch->is_set) + if (latch->is_set && (wakeEvents & WL_LATCH_SET)) { - result = 1; + result |= WL_LATCH_SET; + /* + * Leave loop immediately, avoid blocking again. We don't attempt + * to report any other events that might also be satisfied. + */ break; } - rc = WaitForMultipleObjects(numevents, events, FALSE, - (timeout >= 0) ? (timeout / 1000) : INFINITE); + rc = WaitForMultipleObjects(numevents, events, FALSE, timeout_ms); + if (rc == WAIT_FAILED) elog(ERROR, "WaitForMultipleObjects() failed: error code %d", (int) GetLastError()); + + /* Participate in Windows signal emulation */ + else if (rc == WAIT_OBJECT_0 + 1) + pgwin32_dispatch_queued_signals(); + + else if ((wakeEvents & WL_POSTMASTER_DEATH) && + rc == WAIT_OBJECT_0 + pmdeath_eventno) + { + /* Postmaster died */ + result |= WL_POSTMASTER_DEATH; + } else if (rc == WAIT_TIMEOUT) { - result = 0; - break; + result |= WL_TIMEOUT; } - else if (rc == WAIT_OBJECT_0 + 1) - pgwin32_dispatch_queued_signals(); - else if (rc == WAIT_OBJECT_0 + 2) + else if ((wakeEvents & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0 && + rc == WAIT_OBJECT_0 + 2) /* socket is at event slot 2 */ { WSANETWORKEVENTS resEvents; - Assert(sock != PGINVALID_SOCKET); - ZeroMemory(&resEvents, sizeof(resEvents)); if (WSAEnumNetworkEvents(sock, sockevent, &resEvents) == SOCKET_ERROR) ereport(FATAL, (errmsg_internal("failed to enumerate network events: %i", (int) GetLastError()))); - if ((forRead && resEvents.lNetworkEvents & FD_READ) || - (forWrite && resEvents.lNetworkEvents & FD_WRITE)) - result = 2; - break; + if ((wakeEvents & WL_SOCKET_READABLE) && + (resEvents.lNetworkEvents & FD_READ)) + { + result |= WL_SOCKET_READABLE; + } + if ((wakeEvents & WL_SOCKET_WRITEABLE) && + (resEvents.lNetworkEvents & FD_WRITE)) + { + result |= WL_SOCKET_WRITEABLE; + } } + /* Otherwise it must be the latch event */ else if (rc != WAIT_OBJECT_0) elog(ERROR, "unexpected return code from WaitForMultipleObjects(): %d", (int) rc); } + while(result == 0); /* Clean up the handle we created for the socket */ - if (sock != PGINVALID_SOCKET && (forRead || forWrite)) + if (sockevent != WSA_INVALID_EVENT) { WSAEventSelect(sock, sockevent, 0); WSACloseEvent(sockevent); |