diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 22 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 3 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 4 |
3 files changed, 24 insertions, 5 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index f66a6b46b98..d1ab36755f4 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,6 +50,7 @@ static char *recvBuf = NULL; static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); static bool libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len); +static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); /* Prototypes for private functions */ @@ -64,10 +65,11 @@ _PG_init(void) { /* Tell walreceiver how to reach us */ if (walrcv_connect != NULL || walrcv_receive != NULL || - walrcv_disconnect != NULL) + walrcv_send != NULL || walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; walrcv_receive = libpqrcv_receive; + walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; } @@ -157,7 +159,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); res = libpqrcv_PQexec(cmd); - if (PQresultStatus(res) != PGRES_COPY_OUT) + if (PQresultStatus(res) != PGRES_COPY_BOTH) { PQclear(res); ereport(ERROR, @@ -303,6 +305,7 @@ libpqrcv_PQexec(const char *query) if (PQresultStatus(lastResult) == PGRES_COPY_IN || PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQresultStatus(lastResult) == PGRES_COPY_BOTH || PQstatus(streamConn) == CONNECTION_BAD) break; } @@ -398,3 +401,18 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) return true; } + +/* + * Send a message to XLOG stream. + * + * ereports on error. + */ +static void +libpqrcv_send(const char *buffer, int nbytes) +{ + if (PQputCopyData(streamConn, buffer, nbytes) <= 0 || + PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send data to WAL stream: %s", + PQerrorMessage(streamConn)))); +} diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index a49ff6c896b..fac3be340f9 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -57,6 +57,7 @@ bool am_walreceiver; /* libpqreceiver hooks to these when loaded */ walrcv_connect_type walrcv_connect = NULL; walrcv_receive_type walrcv_receive = NULL; +walrcv_send_type walrcv_send = NULL; walrcv_disconnect_type walrcv_disconnect = NULL; #define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ @@ -247,7 +248,7 @@ WalReceiverMain(void) /* Load the libpq-specific functions */ load_file("libpqwalreceiver", false); if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) + walrcv_send == NULL || walrcv_disconnect == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); /* diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index d2b9e5c5f9a..c8d2433158e 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -287,8 +287,8 @@ WalSndHandshake(void) (errcode(ERRCODE_CANNOT_CONNECT_NOW), errmsg("standby connections not allowed because wal_level=minimal"))); - /* Send a CopyOutResponse message, and start streaming */ - pq_beginmessage(&buf, 'H'); + /* Send a CopyBothResponse message, and start streaming */ + pq_beginmessage(&buf, 'W'); pq_sendbyte(&buf, 0); pq_sendint(&buf, 0, 2); pq_endmessage(&buf); |