aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c22
-rw-r--r--src/backend/replication/walreceiver.c3
-rw-r--r--src/backend/replication/walsender.c4
3 files changed, 24 insertions, 5 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index f66a6b46b98..d1ab36755f4 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -50,6 +50,7 @@ static char *recvBuf = NULL;
static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
static bool libpqrcv_receive(int timeout, unsigned char *type,
char **buffer, int *len);
+static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
/* Prototypes for private functions */
@@ -64,10 +65,11 @@ _PG_init(void)
{
/* Tell walreceiver how to reach us */
if (walrcv_connect != NULL || walrcv_receive != NULL ||
- walrcv_disconnect != NULL)
+ walrcv_send != NULL || walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
walrcv_receive = libpqrcv_receive;
+ walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
}
@@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
startpoint.xlogid, startpoint.xrecoff);
res = libpqrcv_PQexec(cmd);
- if (PQresultStatus(res) != PGRES_COPY_OUT)
+ if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
@@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query)
if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
PQresultStatus(lastResult) == PGRES_COPY_OUT ||
+ PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
PQstatus(streamConn) == CONNECTION_BAD)
break;
}
@@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
return true;
}
+
+/*
+ * Send a message to XLOG stream.
+ *
+ * ereports on error.
+ */
+static void
+libpqrcv_send(const char *buffer, int nbytes)
+{
+ if (PQputCopyData(streamConn, buffer, nbytes) <= 0 ||
+ PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send data to WAL stream: %s",
+ PQerrorMessage(streamConn))));
+}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index a49ff6c896b..fac3be340f9 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -57,6 +57,7 @@ bool am_walreceiver;
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
walrcv_receive_type walrcv_receive = NULL;
+walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
@@ -247,7 +248,7 @@ WalReceiverMain(void)
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_disconnect == NULL)
+ walrcv_send == NULL || walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d2b9e5c5f9a..c8d2433158e 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -287,8 +287,8 @@ WalSndHandshake(void)
(errcode(ERRCODE_CANNOT_CONNECT_NOW),
errmsg("standby connections not allowed because wal_level=minimal")));
- /* Send a CopyOutResponse message, and start streaming */
- pq_beginmessage(&buf, 'H');
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
pq_sendbyte(&buf, 0);
pq_sendint(&buf, 0, 2);
pq_endmessage(&buf);