aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c217
1 files changed, 194 insertions, 23 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 7671b166ed3..7df3698afb2 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -24,9 +24,11 @@
#include "access/xlog.h"
#include "miscadmin.h"
#include "pgstat.h"
+#include "replication/logicalproto.h"
#include "replication/walreceiver.h"
#include "storage/proc.h"
#include "utils/builtins.h"
+#include "utils/pg_lsn.h"
PG_MODULE_MAGIC;
@@ -44,26 +46,35 @@ struct WalReceiverConn
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
- bool logical, const char *appname);
+ bool logical, const char *appname,
+ char **err);
+static void libpqrcv_check_conninfo(const char *conninfo);
static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
- TimeLineID *primary_tli);
+ TimeLineID *primary_tli,
+ int *server_version);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
char **content, int *len);
static bool libpqrcv_startstreaming(WalReceiverConn *conn,
- TimeLineID tli, XLogRecPtr startpoint,
- const char *slotname);
+ const WalRcvStreamOptions *options);
static void libpqrcv_endstreaming(WalReceiverConn *conn,
TimeLineID *next_tli);
static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
pgsocket *wait_fd);
static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
int nbytes);
+static char *libpqrcv_create_slot(WalReceiverConn *conn,
+ const char *slotname,
+ bool temporary,
+ XLogRecPtr *lsn);
+static bool libpqrcv_command(WalReceiverConn *conn,
+ const char *cmd, char **err);
static void libpqrcv_disconnect(WalReceiverConn *conn);
static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_connect,
+ libpqrcv_check_conninfo,
libpqrcv_get_conninfo,
libpqrcv_identify_system,
libpqrcv_readtimelinehistoryfile,
@@ -71,11 +82,14 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
libpqrcv_endstreaming,
libpqrcv_receive,
libpqrcv_send,
+ libpqrcv_create_slot,
+ libpqrcv_command,
libpqrcv_disconnect
};
/* Prototypes for private functions */
static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
+static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
/*
* Module initialization function
@@ -90,9 +104,12 @@ _PG_init(void)
/*
* Establish the connection to the primary server for XLOG streaming
+ *
+ * Returns NULL on error and fills the err with palloc'ed error message.
*/
static WalReceiverConn *
-libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
+libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
+ char **err)
{
WalReceiverConn *conn;
const char *keys[5];
@@ -123,15 +140,35 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname)
conn = palloc0(sizeof(WalReceiverConn));
conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
- ereport(ERROR,
- (errmsg("could not connect to the primary server: %s",
- PQerrorMessage(conn->streamConn))));
+ {
+ *err = pstrdup(PQerrorMessage(conn->streamConn));
+ return NULL;
+ }
+
conn->logical = logical;
return conn;
}
/*
+ * Validate connection info string (just try to parse it)
+ */
+static void
+libpqrcv_check_conninfo(const char *conninfo)
+{
+ PQconninfoOption *opts = NULL;
+ char *err = NULL;
+
+ opts = PQconninfoParse(conninfo, &err);
+ if (opts == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid connection string syntax: %s", err)));
+
+ PQconninfoFree(opts);
+}
+
+/*
* Return a user-displayable conninfo string. Any security-sensitive fields
* are obfuscated.
*/
@@ -185,7 +222,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn)
* timeline ID of the primary.
*/
static char *
-libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
+libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli,
+ int *server_version)
{
PGresult *res;
char *primary_sysid;
@@ -218,11 +256,13 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
*primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
PQclear(res);
+ *server_version = PQserverVersion(conn->streamConn);
+
return primary_sysid;
}
/*
- * Start streaming WAL data from given startpoint and timeline.
+ * Start streaming WAL data from given streaming options.
*
* Returns true if we switched successfully to copy-both mode. False
* means the server received the command and executed it successfully, but
@@ -233,27 +273,54 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
*/
static bool
libpqrcv_startstreaming(WalReceiverConn *conn,
- TimeLineID tli, XLogRecPtr startpoint,
- const char *slotname)
+ const WalRcvStreamOptions *options)
{
StringInfoData cmd;
PGresult *res;
- Assert(!conn->logical);
+ Assert(options->logical == conn->logical);
+ Assert(options->slotname || !options->logical);
initStringInfo(&cmd);
- /* Start streaming from the point requested by startup process */
- if (slotname != NULL)
- appendStringInfo(&cmd,
- "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u",
- slotname,
- (uint32) (startpoint >> 32), (uint32) startpoint,
- tli);
+ /* Build the command. */
+ appendStringInfoString(&cmd, "START_REPLICATION");
+ if (options->slotname != NULL)
+ appendStringInfo(&cmd, " SLOT \"%s\"",
+ options->slotname);
+
+ if (options->logical)
+ appendStringInfo(&cmd, " LOGICAL");
+
+ appendStringInfo(&cmd, " %X/%X",
+ (uint32) (options->startpoint >> 32),
+ (uint32) options->startpoint);
+
+ /*
+ * Additional options are different depending on if we are doing logical
+ * or physical replication.
+ */
+ if (options->logical)
+ {
+ char *pubnames_str;
+ List *pubnames;
+
+ appendStringInfoString(&cmd, " (");
+ appendStringInfo(&cmd, "proto_version '%u'",
+ options->proto.logical.proto_version);
+ pubnames = options->proto.logical.publication_names;
+ pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
+ appendStringInfo(&cmd, ", publication_names %s",
+ PQescapeLiteral(conn->streamConn, pubnames_str,
+ strlen(pubnames_str)));
+ appendStringInfoChar(&cmd, ')');
+ pfree(pubnames_str);
+ }
else
- appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u",
- (uint32) (startpoint >> 32), (uint32) startpoint,
- tli);
+ appendStringInfo(&cmd, " TIMELINE %u",
+ options->proto.physical.startpointTLI);
+
+ /* Start streaming. */
res = libpqrcv_PQexec(conn->streamConn, cmd.data);
pfree(cmd.data);
@@ -577,3 +644,107 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
(errmsg("could not send data to WAL stream: %s",
PQerrorMessage(conn->streamConn))));
}
+
+/*
+ * Create new replication slot.
+ * Returns the name of the exported snapshot for logical slot or NULL for
+ * physical slot.
+ */
+static char *
+libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
+ bool temporary, XLogRecPtr *lsn)
+{
+ PGresult *res;
+ StringInfoData cmd;
+ char *snapshot;
+
+ initStringInfo(&cmd);
+
+ appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname);
+
+ if (temporary)
+ appendStringInfo(&cmd, "TEMPORARY ");
+
+ if (conn->logical)
+ appendStringInfo(&cmd, "LOGICAL pgoutput");
+
+ res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+ pfree(cmd.data);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not create replication slot \"%s\": %s",
+ slotname, PQerrorMessage(conn->streamConn))));
+ }
+
+ *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
+ CStringGetDatum(PQgetvalue(res, 0, 1))));
+ if (!PQgetisnull(res, 0, 2))
+ snapshot = pstrdup(PQgetvalue(res, 0, 2));
+ else
+ snapshot = NULL;
+
+ PQclear(res);
+
+ return snapshot;
+}
+
+/*
+ * Run command.
+ *
+ * Returns if the command has succeeded and fills the err with palloced
+ * error message if not.
+ */
+static bool
+libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err)
+{
+ PGresult *res;
+
+ res = libpqrcv_PQexec(conn->streamConn, cmd);
+
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ {
+ PQclear(res);
+ *err = pstrdup(PQerrorMessage(conn->streamConn));
+ return false;
+ }
+
+ PQclear(res);
+
+ return true;
+}
+
+/*
+ * Given a List of strings, return it as single comma separated
+ * string, quoting identifiers as needed.
+ *
+ * This is essentially the reverse of SplitIdentifierString.
+ *
+ * The caller should free the result.
+ */
+static char *
+stringlist_to_identifierstr(PGconn *conn, List *strings)
+{
+ ListCell *lc;
+ StringInfoData res;
+ bool first = true;
+
+ initStringInfo(&res);
+
+ foreach (lc, strings)
+ {
+ char *val = strVal(lfirst(lc));
+
+ if (first)
+ first = false;
+ else
+ appendStringInfoChar(&res, ',');
+
+ appendStringInfoString(&res,
+ PQescapeIdentifier(conn, val, strlen(val)));
+ }
+
+ return res.data;
+}