aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c35
1 files changed, 17 insertions, 18 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index e7581160cc9..8f1c592f48b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -41,7 +41,6 @@ void _PG_init(void);
/* Current connection to the primary, if any */
static PGconn *streamConn = NULL;
-static bool justconnected = false;
/* Buffer for currently read records */
static char *recvBuf = NULL;
@@ -166,7 +165,6 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
}
PQclear(res);
- justconnected = true;
ereport(LOG,
(errmsg("streaming replication successfully connected to primary")));
@@ -318,7 +316,6 @@ libpqrcv_disconnect(void)
{
PQfinish(streamConn);
streamConn = NULL;
- justconnected = false;
}
/*
@@ -348,28 +345,30 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
PQfreemem(recvBuf);
recvBuf = NULL;
- /*
- * If the caller requested to block, wait for data to arrive. But if this
- * is the first call after connecting, don't wait, because there might
- * already be some data in libpq buffer that we haven't returned to
- * caller.
- */
- if (timeout > 0 && !justconnected)
+ /* Try to receive a CopyData message */
+ rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (rawlen == 0)
{
- if (!libpq_select(timeout))
- return false;
+ /*
+ * No data available yet. If the caller requested to block, wait for
+ * more data to arrive.
+ */
+ if (timeout > 0)
+ {
+ if (!libpq_select(timeout))
+ return false;
+ }
if (PQconsumeInput(streamConn) == 0)
ereport(ERROR,
(errmsg("could not receive data from WAL stream: %s",
PQerrorMessage(streamConn))));
- }
- justconnected = false;
- /* Receive CopyData message */
- rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
- if (rawlen == 0) /* no data available yet, then return */
- return false;
+ /* Now that we've consumed some input, try again */
+ rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+ if (rawlen == 0)
+ return false;
+ }
if (rawlen == -1) /* end-of-streaming or error */
{
PGresult *res;