aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c64
-rw-r--r--src/backend/replication/walreceiver.c9
-rw-r--r--src/include/replication/walreceiver.h2
3 files changed, 46 insertions, 29 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 84ab25b0e2a..e6e670e9e4b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -50,7 +50,7 @@ static void libpqrcv_connect(char *conninfo);
static void libpqrcv_identify_system(TimeLineID *primary_tli);
static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
-static void libpqrcv_endstreaming(void);
+static void libpqrcv_endstreaming(TimeLineID *next_tli);
static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -199,10 +199,11 @@ libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
}
/*
- * Stop streaming WAL data.
+ * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
+ * reported by the server, or 0 if it did not report it.
*/
static void
-libpqrcv_endstreaming(void)
+libpqrcv_endstreaming(TimeLineID *next_tli)
{
PGresult *res;
@@ -211,33 +212,42 @@ libpqrcv_endstreaming(void)
(errmsg("could not send end-of-streaming message to primary: %s",
PQerrorMessage(streamConn))));
- /* Read the command result after COPY is finished */
-
- while ((res = PQgetResult(streamConn)) != NULL)
+ /*
+ * After COPY is finished, we should receive a result set indicating the
+ * next timeline's ID, or just CommandComplete if the server was shut down.
+ *
+ * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+ * would also be possible. However, at the moment this function is only
+ * called after receiving CopyDone from the backend - the walreceiver
+ * never terminates replication on its own initiative.
+ */
+ res = PQgetResult(streamConn);
+ if (PQresultStatus(res) == PGRES_TUPLES_OK)
{
- /*
- * After Copy, if the streaming ended because we reached end of the
- * timeline, server sends one result set with the next timeline's ID.
- * We don't need it, so just slurp and ignore it.
- *
- * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
- * is also possible. However, at the moment this function is only
- * called after receiving CopyDone from the backend - the walreceiver
- * never terminates replication on its own initiative.
- */
- switch (PQresultStatus(res))
- {
- case PGRES_COMMAND_OK:
- case PGRES_TUPLES_OK:
- break;
-
- default:
- ereport(ERROR,
- (errmsg("error reading result of streaming command: %s",
- PQerrorMessage(streamConn))));
- }
+ /* Read the next timeline's ID */
+ if (PQnfields(res) != 1 || PQntuples(res) != 1)
+ ereport(ERROR,
+ (errmsg("unexpected result set after end-of-streaming")));
+ *next_tli = pg_atoi(PQgetvalue(res, 0, 0), sizeof(uint32), 0);
PQclear(res);
+
+ /* the result set should be followed by CommandComplete */
+ res = PQgetResult(streamConn);
}
+ else
+ *next_tli = 0;
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ ereport(ERROR,
+ (errmsg("error reading result of streaming command: %s",
+ PQerrorMessage(streamConn))));
+
+ /* Verify that there are no more results */
+ res = PQgetResult(streamConn);
+ if (res != NULL)
+ ereport(ERROR,
+ (errmsg("unexpected result after CommandComplete: %s",
+ PQerrorMessage(streamConn))));
}
/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 444be9463bc..73592973ac6 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -505,8 +505,15 @@ WalReceiverMain(void)
* our side, too.
*/
EnableWalRcvImmediateExit();
- walrcv_endstreaming();
+ walrcv_endstreaming(&primaryTLI);
DisableWalRcvImmediateExit();
+
+ /*
+ * If the server had switched to a new timeline that we didn't know
+ * about when we began streaming, fetch its timeline history file
+ * now.
+ */
+ WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
}
else
ereport(LOG,
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 3afc4a86bbd..72878f84c61 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -128,7 +128,7 @@ extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistor
typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
-typedef void (*walrcv_endstreaming_type) (void);
+typedef void (*walrcv_endstreaming_type) (TimeLineID *next_tli);
extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
typedef int (*walrcv_receive_type) (int timeout, char **buffer);