diff options
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 26 |
1 files changed, 10 insertions, 16 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 4ee4d7106da..a3bec498fa0 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint, char *slotname); static void libpqrcv_endstreaming(TimeLineID *next_tli); -static int libpqrcv_receive(int timeout, char **buffer); +static int libpqrcv_receive(char **buffer, int *wait_fd); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -463,8 +463,7 @@ libpqrcv_disconnect(void) } /* - * Receive a message available from XLOG stream, blocking for - * maximum of 'timeout' ms. + * Receive a message available from XLOG stream. * * Returns: * @@ -472,15 +471,15 @@ libpqrcv_disconnect(void) * point to a buffer holding the received message. The buffer is only valid * until the next libpqrcv_* call. * - * 0 if no data was available within timeout, or wait was interrupted - * by signal. + * If no data was available immediately, returns 0, and *wait_fd is set to a + * file descriptor which can be waited on before trying again. * * -1 if the server ended the COPY. * * ereports on error. */ static int -libpqrcv_receive(int timeout, char **buffer) +libpqrcv_receive(char **buffer, int *wait_fd) { int rawlen; @@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer) rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) { - /* - * No data available yet. If the caller requested to block, wait for - * more data to arrive. - */ - if (timeout > 0) - { - if (!libpq_select(timeout)) - return 0; - } - + /* Try consuming some data. */ if (PQconsumeInput(streamConn) == 0) ereport(ERROR, (errmsg("could not receive data from WAL stream: %s", @@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer) /* Now that we've consumed some input, try again */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) + { + /* Tell caller to try again when our socket is ready. */ + *wait_fd = PQsocket(streamConn); return 0; + } } if (rawlen == -1) /* end-of-streaming or error */ { |