aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c26
1 files changed, 10 insertions, 16 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 4ee4d7106da..a3bec498fa0 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -52,7 +52,7 @@ static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, ch
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint,
char *slotname);
static void libpqrcv_endstreaming(TimeLineID *next_tli);
-static int libpqrcv_receive(int timeout, char **buffer);
+static int libpqrcv_receive(char **buffer, int *wait_fd);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -463,8 +463,7 @@ libpqrcv_disconnect(void)
}
/*
- * Receive a message available from XLOG stream, blocking for
- * maximum of 'timeout' ms.
+ * Receive a message available from XLOG stream.
*
* Returns:
*
@@ -472,15 +471,15 @@ libpqrcv_disconnect(void)
* point to a buffer holding the received message. The buffer is only valid
* until the next libpqrcv_* call.
*
- * 0 if no data was available within timeout, or wait was interrupted
- * by signal.
+ * If no data was available immediately, returns 0, and *wait_fd is set to a
+ * file descriptor which can be waited on before trying again.
*
* -1 if the server ended the COPY.
*
* ereports on error.
*/
static int
-libpqrcv_receive(int timeout, char **buffer)
+libpqrcv_receive(char **buffer, int *wait_fd)
{
int rawlen;
@@ -492,16 +491,7 @@ libpqrcv_receive(int timeout, char **buffer)
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
{
- /*
- * No data available yet. If the caller requested to block, wait for
- * more data to arrive.
- */
- if (timeout > 0)
- {
- if (!libpq_select(timeout))
- return 0;
- }
-
+ /* Try consuming some data. */
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
@@ -510,7 +500,11 @@ libpqrcv_receive(int timeout, char **buffer)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
+ {
+ /* Tell caller to try again when our socket is ready. */
+ *wait_fd = PQsocket(streamConn);
return 0;
+ }
}
if (rawlen == -1) /* end-of-streaming or error */
{