aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c53
1 files changed, 7 insertions, 46 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index fefc8660259..560ec974fa7 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,6 +24,7 @@
#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
@@ -125,7 +126,6 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
char **err)
{
WalReceiverConn *conn;
- PostgresPollingStatusType status;
const char *keys[6];
const char *vals[6];
int i = 0;
@@ -172,49 +172,10 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
Assert(i < sizeof(keys));
conn = palloc0(sizeof(WalReceiverConn));
- conn->streamConn = PQconnectStartParams(keys, vals,
- /* expand_dbname = */ true);
- if (PQstatus(conn->streamConn) == CONNECTION_BAD)
- goto bad_connection_errmsg;
-
- /*
- * Poll connection until we have OK or FAILED status.
- *
- * Per spec for PQconnectPoll, first wait till socket is write-ready.
- */
- status = PGRES_POLLING_WRITING;
- do
- {
- int io_flag;
- int rc;
-
- if (status == PGRES_POLLING_READING)
- io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
- /* Windows needs a different test while waiting for connection-made */
- else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
- io_flag = WL_SOCKET_CONNECTED;
-#endif
- else
- io_flag = WL_SOCKET_WRITEABLE;
-
- rc = WaitLatchOrSocket(MyLatch,
- WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
- PQsocket(conn->streamConn),
- 0,
- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
- /* Interrupted? */
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
- }
-
- /* If socket is ready, advance the libpq state machine */
- if (rc & io_flag)
- status = PQconnectPoll(conn->streamConn);
- } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+ conn->streamConn =
+ libpqsrv_connect_params(keys, vals,
+ /* expand_dbname = */ true,
+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
goto bad_connection_errmsg;
@@ -245,7 +206,7 @@ bad_connection_errmsg:
/* error path, error already set */
bad_connection:
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
pfree(conn);
return NULL;
}
@@ -744,7 +705,7 @@ libpqrcv_PQgetResult(PGconn *streamConn)
static void
libpqrcv_disconnect(WalReceiverConn *conn)
{
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
PQfreemem(conn->recvBuf);
pfree(conn);
}