diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 64 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 9 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 2 |
3 files changed, 46 insertions, 29 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 84ab25b0e2a..e6e670e9e4b 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo); static void libpqrcv_identify_system(TimeLineID *primary_tli); static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); -static void libpqrcv_endstreaming(void); +static void libpqrcv_endstreaming(TimeLineID *next_tli); static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -199,10 +199,11 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint) } /* - * Stop streaming WAL data. + * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as + * reported by the server, or 0 if it did not report it. */ static void -libpqrcv_endstreaming(void) +libpqrcv_endstreaming(TimeLineID *next_tli) { PGresult *res; @@ -211,33 +212,42 @@ libpqrcv_endstreaming(void) (errmsg("could not send end-of-streaming message to primary: %s", PQerrorMessage(streamConn)))); - /* Read the command result after COPY is finished */ - - while ((res = PQgetResult(streamConn)) != NULL) + /* + * After COPY is finished, we should receive a result set indicating the + * next timeline's ID, or just CommandComplete if the server was shut down. + * + * If we had not yet received CopyDone from the backend, PGRES_COPY_IN + * would also be possible. However, at the moment this function is only + * called after receiving CopyDone from the backend - the walreceiver + * never terminates replication on its own initiative. + */ + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_TUPLES_OK) { - /* - * After Copy, if the streaming ended because we reached end of the - * timeline, server sends one result set with the next timeline's ID. - * We don't need it, so just slurp and ignore it. - * - * If we had not yet received CopyDone from the backend, PGRES_COPY_IN - * is also possible. However, at the moment this function is only - * called after receiving CopyDone from the backend - the walreceiver - * never terminates replication on its own initiative. - */ - switch (PQresultStatus(res)) - { - case PGRES_COMMAND_OK: - case PGRES_TUPLES_OK: - break; - - default: - ereport(ERROR, - (errmsg("error reading result of streaming command: %s", - PQerrorMessage(streamConn)))); - } + /* Read the next timeline's ID */ + if (PQnfields(res) != 1 || PQntuples(res) != 1) + ereport(ERROR, + (errmsg("unexpected result set after end-of-streaming"))); + *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0); PQclear(res); + + /* the result set should be followed by CommandComplete */ + res = PQgetResult(streamConn); } + else + *next_tli = 0; + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errmsg("error reading result of streaming command: %s", + PQerrorMessage(streamConn)))); + + /* Verify that there are no more results */ + res = PQgetResult(streamConn); + if (res != NULL) + ereport(ERROR, + (errmsg("unexpected result after CommandComplete: %s", + PQerrorMessage(streamConn)))); } /* diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 444be9463bc..73592973ac6 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -505,8 +505,15 @@ WalReceiverMain(void) * our side, too. */ EnableWalRcvImmediateExit(); - walrcv_endstreaming(); + walrcv_endstreaming(&primaryTLI); DisableWalRcvImmediateExit(); + + /* + * If the server had switched to a new timeline that we didn't know + * about when we began streaming, fetch its timeline history file + * now. + */ + WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI); } else ereport(LOG, diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 3afc4a86bbd..72878f84c61 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -128,7 +128,7 @@ extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistor typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint); extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming; -typedef void (*walrcv_endstreaming_type) (void); +typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli); extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming; typedef int (*walrcv_receive_type) (int timeout, char **buffer); |