aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c202
1 files changed, 150 insertions, 52 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index bfaebeae842..180d96b6a69 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -46,9 +46,12 @@ static PGconn *streamConn = NULL;
static char *recvBuf = NULL;
/* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, unsigned char *type,
- char **buffer, int *len);
+static void libpqrcv_connect(char *conninfo);
+static void libpqrcv_identify_system(TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static void libpqrcv_endstreaming(void);
+static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
@@ -63,10 +66,17 @@ void
_PG_init(void)
{
/* Tell walreceiver how to reach us */
- if (walrcv_connect != NULL || walrcv_receive != NULL ||
- walrcv_send != NULL || walrcv_disconnect != NULL)
+ if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
+ walrcv_readtimelinehistoryfile != NULL ||
+ walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
+ walrcv_receive != NULL || walrcv_send != NULL ||
+ walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
+ walrcv_identify_system = libpqrcv_identify_system;
+ walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
+ walrcv_startstreaming = libpqrcv_startstreaming;
+ walrcv_endstreaming = libpqrcv_endstreaming;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
@@ -75,16 +85,10 @@ _PG_init(void)
/*
* Establish the connection to the primary server for XLOG streaming
*/
-static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+static void
+libpqrcv_connect(char *conninfo)
{
char conninfo_repl[MAXCONNINFO + 75];
- char *primary_sysid;
- char standby_sysid[32];
- TimeLineID primary_tli;
- TimeLineID standby_tli;
- PGresult *res;
- char cmd[64];
/*
* Connect using deliberately undocumented parameter: replication. The
@@ -100,6 +104,18 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn))));
+}
+
+/*
+ * Check that primary's system identifier matches ours, and fetch the current
+ * timeline ID of the primary.
+ */
+static void
+libpqrcv_identify_system(TimeLineID *primary_tli)
+{
+ PGresult *res;
+ char *primary_sysid;
+ char standby_sysid[32];
/*
* Get the system identifier and timeline ID as a DataRow message from the
@@ -126,7 +142,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
ntuples, nfields)));
}
primary_sysid = PQgetvalue(res, 0, 0);
- primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+ *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
/*
* Confirm that the system identifier of the primary is the same as ours.
@@ -141,24 +157,37 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
-
- /*
- * Confirm that the current timeline of the primary is the same as the
- * recovery target timeline.
- */
- standby_tli = GetRecoveryTargetTLI();
PQclear(res);
- if (primary_tli != standby_tli)
- ereport(ERROR,
- (errmsg("timeline %u of the primary does not match recovery target timeline %u",
- primary_tli, standby_tli)));
- ThisTimeLineID = primary_tli;
+}
+
+/*
+ * Start streaming WAL data from given startpoint and timeline.
+ *
+ * Returns true if we switched successfully to copy-both mode. False
+ * means the server received the command and executed it successfully, but
+ * didn't switch to copy-mode. That means that there was no WAL on the
+ * requested timeline and starting point, because the server switched to
+ * another timeline at or before the requested starting point. On failure,
+ * throws an ERROR.
+ */
+static bool
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+{
+ char cmd[64];
+ PGresult *res;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
- (uint32) (startpoint >> 32), (uint32) startpoint);
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ tli);
res = libpqrcv_PQexec(cmd);
- if (PQresultStatus(res) != PGRES_COPY_BOTH)
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ PQclear(res);
+ return false;
+ }
+ else if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
@@ -166,11 +195,81 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
PQerrorMessage(streamConn))));
}
PQclear(res);
+ return true;
+}
+
+/*
+ * Stop streaming WAL data.
+ */
+static void
+libpqrcv_endstreaming(void)
+{
+ PGresult *res;
+
+ if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send end-of-streaming message to primary: %s",
+ PQerrorMessage(streamConn))));
- ereport(LOG,
- (errmsg("streaming replication successfully connected to primary")));
+ /* Read the command result after COPY is finished */
- return true;
+ while ((res = PQgetResult(streamConn)) != NULL)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ ereport(ERROR,
+ (errmsg("error reading result of streaming command: %s",
+ PQerrorMessage(streamConn))));
+ /*
+ * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+ * is also possible. However, at the moment this function is only
+ * called after receiving CopyDone from the backend - the walreceiver
+ * never terminates replication on its own initiative.
+ */
+
+ PQclear(res);
+ }
+}
+
+/*
+ * Fetch the timeline history file for 'tli' from primary.
+ */
+static void
+libpqrcv_readtimelinehistoryfile(TimeLineID tli,
+ char **filename, char **content, int *len)
+{
+ PGresult *res;
+ char cmd[64];
+
+ /*
+ * Request the primary to send over the history file for given timeline.
+ */
+ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
+ res = libpqrcv_PQexec(cmd);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not receive timeline history file from "
+ "the primary server: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (PQnfields(res) != 2 || PQntuples(res) != 1)
+ {
+ int ntuples = PQntuples(res);
+ int nfields = PQnfields(res);
+
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
+ ntuples, nfields)));
+ }
+ *filename = pstrdup(PQgetvalue(res, 0, 0));
+
+ *len = PQgetlength(res, 0, 1);
+ *content = palloc(*len);
+ memcpy(*content, PQgetvalue(res, 0, 1), *len);
+ PQclear(res);
}
/*
@@ -327,20 +426,19 @@ libpqrcv_disconnect(void)
*
* Returns:
*
- * True if data was received. *type, *buffer and *len are set to
- * the type of the received data, buffer holding it, and length,
- * respectively.
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next libpqrcv_* call.
*
- * False if no data was available within timeout, or wait was interrupted
+ * 0 if no data was available within timeout, or wait was interrupted
* by signal.
*
- * The buffer returned is only valid until the next call of this function or
- * libpq_connect/disconnect.
+ * -1 if the server ended the COPY.
*
* ereports on error.
*/
-static bool
-libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
+static int
+libpqrcv_receive(int timeout, char **buffer)
{
int rawlen;
@@ -359,7 +457,7 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
if (timeout > 0)
{
if (!libpq_select(timeout))
- return false;
+ return 0;
}
if (PQconsumeInput(streamConn) == 0)
@@ -370,23 +468,26 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
- return false;
+ return 0;
}
if (rawlen == -1) /* end-of-streaming or error */
{
PGresult *res;
res = PQgetResult(streamConn);
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COMMAND_OK ||
+ PQresultStatus(res) == PGRES_COPY_IN)
+ {
+ PQclear(res);
+ return -1;
+ }
+ else
{
PQclear(res);
ereport(ERROR,
- (errmsg("replication terminated by primary server")));
+ (errmsg("could not receive data from WAL stream: %s",
+ PQerrorMessage(streamConn))));
}
- PQclear(res);
- ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
- PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
@@ -394,11 +495,8 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
PQerrorMessage(streamConn))));
/* Return received messages to caller */
- *type = *((unsigned char *) recvBuf);
- *buffer = recvBuf + sizeof(*type);
- *len = rawlen - sizeof(*type);
-
- return true;
+ *buffer = recvBuf;
+ return rawlen;
}
/*