diff options
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 202 |
1 files changed, 150 insertions, 52 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index bfaebeae842..180d96b6a69 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -46,9 +46,12 @@ static PGconn *streamConn = NULL; static char *recvBuf = NULL; /* Prototypes for interface functions */ -static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint); -static bool libpqrcv_receive(int timeout, unsigned char *type, - char **buffer, int *len); +static void libpqrcv_connect(char *conninfo); +static void libpqrcv_identify_system(TimeLineID *primary_tli); +static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len); +static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint); +static void libpqrcv_endstreaming(void); +static int libpqrcv_receive(int timeout, char **buffer); static void libpqrcv_send(const char *buffer, int nbytes); static void libpqrcv_disconnect(void); @@ -63,10 +66,17 @@ void _PG_init(void) { /* Tell walreceiver how to reach us */ - if (walrcv_connect != NULL || walrcv_receive != NULL || - walrcv_send != NULL || walrcv_disconnect != NULL) + if (walrcv_connect != NULL || walrcv_identify_system != NULL || + walrcv_readtimelinehistoryfile != NULL || + walrcv_startstreaming != NULL || walrcv_endstreaming != NULL || + walrcv_receive != NULL || walrcv_send != NULL || + walrcv_disconnect != NULL) elog(ERROR, "libpqwalreceiver already loaded"); walrcv_connect = libpqrcv_connect; + walrcv_identify_system = libpqrcv_identify_system; + walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile; + walrcv_startstreaming = libpqrcv_startstreaming; + walrcv_endstreaming = libpqrcv_endstreaming; walrcv_receive = libpqrcv_receive; walrcv_send = libpqrcv_send; walrcv_disconnect = libpqrcv_disconnect; @@ -75,16 +85,10 @@ _PG_init(void) /* * Establish the connection to the primary server for XLOG streaming */ -static bool -libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) +static void +libpqrcv_connect(char *conninfo) { char conninfo_repl[MAXCONNINFO + 75]; - char *primary_sysid; - char standby_sysid[32]; - TimeLineID primary_tli; - TimeLineID standby_tli; - PGresult *res; - char cmd[64]; /* * Connect using deliberately undocumented parameter: replication. The @@ -100,6 +104,18 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) ereport(ERROR, (errmsg("could not connect to the primary server: %s", PQerrorMessage(streamConn)))); +} + +/* + * Check that primary's system identifier matches ours, and fetch the current + * timeline ID of the primary. + */ +static void +libpqrcv_identify_system(TimeLineID *primary_tli) +{ + PGresult *res; + char *primary_sysid; + char standby_sysid[32]; /* * Get the system identifier and timeline ID as a DataRow message from the @@ -126,7 +142,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) ntuples, nfields))); } primary_sysid = PQgetvalue(res, 0, 0); - primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); /* * Confirm that the system identifier of the primary is the same as ours. @@ -141,24 +157,37 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) errdetail("The primary's identifier is %s, the standby's identifier is %s.", primary_sysid, standby_sysid))); } - - /* - * Confirm that the current timeline of the primary is the same as the - * recovery target timeline. - */ - standby_tli = GetRecoveryTargetTLI(); PQclear(res); - if (primary_tli != standby_tli) - ereport(ERROR, - (errmsg("timeline %u of the primary does not match recovery target timeline %u", - primary_tli, standby_tli))); - ThisTimeLineID = primary_tli; +} + +/* + * Start streaming WAL data from given startpoint and timeline. + * + * Returns true if we switched successfully to copy-both mode. False + * means the server received the command and executed it successfully, but + * didn't switch to copy-mode. That means that there was no WAL on the + * requested timeline and starting point, because the server switched to + * another timeline at or before the requested starting point. On failure, + * throws an ERROR. + */ +static bool +libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint) +{ + char cmd[64]; + PGresult *res; /* Start streaming from the point requested by startup process */ - snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", - (uint32) (startpoint >> 32), (uint32) startpoint); + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u", + (uint32) (startpoint >> 32), (uint32) startpoint, + tli); res = libpqrcv_PQexec(cmd); - if (PQresultStatus(res) != PGRES_COPY_BOTH) + + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + return false; + } + else if (PQresultStatus(res) != PGRES_COPY_BOTH) { PQclear(res); ereport(ERROR, @@ -166,11 +195,81 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) PQerrorMessage(streamConn)))); } PQclear(res); + return true; +} + +/* + * Stop streaming WAL data. + */ +static void +libpqrcv_endstreaming(void) +{ + PGresult *res; + + if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn)) + ereport(ERROR, + (errmsg("could not send end-of-streaming message to primary: %s", + PQerrorMessage(streamConn)))); - ereport(LOG, - (errmsg("streaming replication successfully connected to primary"))); + /* Read the command result after COPY is finished */ - return true; + while ((res = PQgetResult(streamConn)) != NULL) + { + if (PQresultStatus(res) != PGRES_COMMAND_OK) + ereport(ERROR, + (errmsg("error reading result of streaming command: %s", + PQerrorMessage(streamConn)))); + /* + * If we had not yet received CopyDone from the backend, PGRES_COPY_IN + * is also possible. However, at the moment this function is only + * called after receiving CopyDone from the backend - the walreceiver + * never terminates replication on its own initiative. + */ + + PQclear(res); + } +} + +/* + * Fetch the timeline history file for 'tli' from primary. + */ +static void +libpqrcv_readtimelinehistoryfile(TimeLineID tli, + char **filename, char **content, int *len) +{ + PGresult *res; + char cmd[64]; + + /* + * Request the primary to send over the history file for given timeline. + */ + snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli); + res = libpqrcv_PQexec(cmd); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive timeline history file from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.", + ntuples, nfields))); + } + *filename = pstrdup(PQgetvalue(res, 0, 0)); + + *len = PQgetlength(res, 0, 1); + *content = palloc(*len); + memcpy(*content, PQgetvalue(res, 0, 1), *len); + PQclear(res); } /* @@ -327,20 +426,19 @@ libpqrcv_disconnect(void) * * Returns: * - * True if data was received. *type, *buffer and *len are set to - * the type of the received data, buffer holding it, and length, - * respectively. + * If data was received, returns the length of the data. *buffer is set to + * point to a buffer holding the received message. The buffer is only valid + * until the next libpqrcv_* call. * - * False if no data was available within timeout, or wait was interrupted + * 0 if no data was available within timeout, or wait was interrupted * by signal. * - * The buffer returned is only valid until the next call of this function or - * libpq_connect/disconnect. + * -1 if the server ended the COPY. * * ereports on error. */ -static bool -libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) +static int +libpqrcv_receive(int timeout, char **buffer) { int rawlen; @@ -359,7 +457,7 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) if (timeout > 0) { if (!libpq_select(timeout)) - return false; + return 0; } if (PQconsumeInput(streamConn) == 0) @@ -370,23 +468,26 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) /* Now that we've consumed some input, try again */ rawlen = PQgetCopyData(streamConn, &recvBuf, 1); if (rawlen == 0) - return false; + return 0; } if (rawlen == -1) /* end-of-streaming or error */ { PGresult *res; res = PQgetResult(streamConn); - if (PQresultStatus(res) == PGRES_COMMAND_OK) + if (PQresultStatus(res) == PGRES_COMMAND_OK || + PQresultStatus(res) == PGRES_COPY_IN) + { + PQclear(res); + return -1; + } + else { PQclear(res); ereport(ERROR, - (errmsg("replication terminated by primary server"))); + (errmsg("could not receive data from WAL stream: %s", + PQerrorMessage(streamConn)))); } - PQclear(res); - ereport(ERROR, - (errmsg("could not receive data from WAL stream: %s", - PQerrorMessage(streamConn)))); } if (rawlen < -1) ereport(ERROR, @@ -394,11 +495,8 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len) PQerrorMessage(streamConn)))); /* Return received messages to caller */ - *type = *((unsigned char *) recvBuf); - *buffer = recvBuf + sizeof(*type); - *len = rawlen - sizeof(*type); - - return true; + *buffer = recvBuf; + return rawlen; } /* |