diff options
Diffstat (limited to 'src/backend/replication/libpqwalreceiver/libpqwalreceiver.c')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 217 |
1 files changed, 194 insertions, 23 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 7671b166ed3..7df3698afb2 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -24,9 +24,11 @@ #include "access/xlog.h" #include "miscadmin.h" #include "pgstat.h" +#include "replication/logicalproto.h" #include "replication/walreceiver.h" #include "storage/proc.h" #include "utils/builtins.h" +#include "utils/pg_lsn.h" PG_MODULE_MAGIC; @@ -44,26 +46,35 @@ struct WalReceiverConn /* Prototypes for interface functions */ static WalReceiverConn *libpqrcv_connect(const char *conninfo, - bool logical, const char *appname); + bool logical, const char *appname, + char **err); +static void libpqrcv_check_conninfo(const char *conninfo); static char *libpqrcv_get_conninfo(WalReceiverConn *conn); static char *libpqrcv_identify_system(WalReceiverConn *conn, - TimeLineID *primary_tli); + TimeLineID *primary_tli, + int *server_version); static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn, TimeLineID tli, char **filename, char **content, int *len); static bool libpqrcv_startstreaming(WalReceiverConn *conn, - TimeLineID tli, XLogRecPtr startpoint, - const char *slotname); + const WalRcvStreamOptions *options); static void libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli); static int libpqrcv_receive(WalReceiverConn *conn, char **buffer, pgsocket *wait_fd); static void libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes); +static char *libpqrcv_create_slot(WalReceiverConn *conn, + const char *slotname, + bool temporary, + XLogRecPtr *lsn); +static bool libpqrcv_command(WalReceiverConn *conn, + const char *cmd, char **err); static void libpqrcv_disconnect(WalReceiverConn *conn); static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_connect, + libpqrcv_check_conninfo, libpqrcv_get_conninfo, libpqrcv_identify_system, libpqrcv_readtimelinehistoryfile, @@ -71,11 +82,14 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = { libpqrcv_endstreaming, libpqrcv_receive, libpqrcv_send, + libpqrcv_create_slot, + libpqrcv_command, libpqrcv_disconnect }; /* Prototypes for private functions */ static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query); +static char *stringlist_to_identifierstr(PGconn *conn, List *strings); /* * Module initialization function @@ -90,9 +104,12 @@ _PG_init(void) /* * Establish the connection to the primary server for XLOG streaming + * + * Returns NULL on error and fills the err with palloc'ed error message. */ static WalReceiverConn * -libpqrcv_connect(const char *conninfo, bool logical, const char *appname) +libpqrcv_connect(const char *conninfo, bool logical, const char *appname, + char **err) { WalReceiverConn *conn; const char *keys[5]; @@ -123,15 +140,35 @@ libpqrcv_connect(const char *conninfo, bool logical, const char *appname) conn = palloc0(sizeof(WalReceiverConn)); conn->streamConn = PQconnectdbParams(keys, vals, /* expand_dbname = */ true); if (PQstatus(conn->streamConn) != CONNECTION_OK) - ereport(ERROR, - (errmsg("could not connect to the primary server: %s", - PQerrorMessage(conn->streamConn)))); + { + *err = pstrdup(PQerrorMessage(conn->streamConn)); + return NULL; + } + conn->logical = logical; return conn; } /* + * Validate connection info string (just try to parse it) + */ +static void +libpqrcv_check_conninfo(const char *conninfo) +{ + PQconninfoOption *opts = NULL; + char *err = NULL; + + opts = PQconninfoParse(conninfo, &err); + if (opts == NULL) + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("invalid connection string syntax: %s", err))); + + PQconninfoFree(opts); +} + +/* * Return a user-displayable conninfo string. Any security-sensitive fields * are obfuscated. */ @@ -185,7 +222,8 @@ libpqrcv_get_conninfo(WalReceiverConn *conn) * timeline ID of the primary. */ static char * -libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) +libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli, + int *server_version) { PGresult *res; char *primary_sysid; @@ -218,11 +256,13 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); PQclear(res); + *server_version = PQserverVersion(conn->streamConn); + return primary_sysid; } /* - * Start streaming WAL data from given startpoint and timeline. + * Start streaming WAL data from given streaming options. * * Returns true if we switched successfully to copy-both mode. False * means the server received the command and executed it successfully, but @@ -233,27 +273,54 @@ libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli) */ static bool libpqrcv_startstreaming(WalReceiverConn *conn, - TimeLineID tli, XLogRecPtr startpoint, - const char *slotname) + const WalRcvStreamOptions *options) { StringInfoData cmd; PGresult *res; - Assert(!conn->logical); + Assert(options->logical == conn->logical); + Assert(options->slotname || !options->logical); initStringInfo(&cmd); - /* Start streaming from the point requested by startup process */ - if (slotname != NULL) - appendStringInfo(&cmd, - "START_REPLICATION SLOT \"%s\" %X/%X TIMELINE %u", - slotname, - (uint32) (startpoint >> 32), (uint32) startpoint, - tli); + /* Build the command. */ + appendStringInfoString(&cmd, "START_REPLICATION"); + if (options->slotname != NULL) + appendStringInfo(&cmd, " SLOT \"%s\"", + options->slotname); + + if (options->logical) + appendStringInfo(&cmd, " LOGICAL"); + + appendStringInfo(&cmd, " %X/%X", + (uint32) (options->startpoint >> 32), + (uint32) options->startpoint); + + /* + * Additional options are different depending on if we are doing logical + * or physical replication. + */ + if (options->logical) + { + char *pubnames_str; + List *pubnames; + + appendStringInfoString(&cmd, " ("); + appendStringInfo(&cmd, "proto_version '%u'", + options->proto.logical.proto_version); + pubnames = options->proto.logical.publication_names; + pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames); + appendStringInfo(&cmd, ", publication_names %s", + PQescapeLiteral(conn->streamConn, pubnames_str, + strlen(pubnames_str))); + appendStringInfoChar(&cmd, ')'); + pfree(pubnames_str); + } else - appendStringInfo(&cmd, "START_REPLICATION %X/%X TIMELINE %u", - (uint32) (startpoint >> 32), (uint32) startpoint, - tli); + appendStringInfo(&cmd, " TIMELINE %u", + options->proto.physical.startpointTLI); + + /* Start streaming. */ res = libpqrcv_PQexec(conn->streamConn, cmd.data); pfree(cmd.data); @@ -577,3 +644,107 @@ libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes) (errmsg("could not send data to WAL stream: %s", PQerrorMessage(conn->streamConn)))); } + +/* + * Create new replication slot. + * Returns the name of the exported snapshot for logical slot or NULL for + * physical slot. + */ +static char * +libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname, + bool temporary, XLogRecPtr *lsn) +{ + PGresult *res; + StringInfoData cmd; + char *snapshot; + + initStringInfo(&cmd); + + appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\" ", slotname); + + if (temporary) + appendStringInfo(&cmd, "TEMPORARY "); + + if (conn->logical) + appendStringInfo(&cmd, "LOGICAL pgoutput"); + + res = libpqrcv_PQexec(conn->streamConn, cmd.data); + pfree(cmd.data); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not create replication slot \"%s\": %s", + slotname, PQerrorMessage(conn->streamConn)))); + } + + *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid, + CStringGetDatum(PQgetvalue(res, 0, 1)))); + if (!PQgetisnull(res, 0, 2)) + snapshot = pstrdup(PQgetvalue(res, 0, 2)); + else + snapshot = NULL; + + PQclear(res); + + return snapshot; +} + +/* + * Run command. + * + * Returns if the command has succeeded and fills the err with palloced + * error message if not. + */ +static bool +libpqrcv_command(WalReceiverConn *conn, const char *cmd, char **err) +{ + PGresult *res; + + res = libpqrcv_PQexec(conn->streamConn, cmd); + + if (PQresultStatus(res) != PGRES_COMMAND_OK) + { + PQclear(res); + *err = pstrdup(PQerrorMessage(conn->streamConn)); + return false; + } + + PQclear(res); + + return true; +} + +/* + * Given a List of strings, return it as single comma separated + * string, quoting identifiers as needed. + * + * This is essentially the reverse of SplitIdentifierString. + * + * The caller should free the result. + */ +static char * +stringlist_to_identifierstr(PGconn *conn, List *strings) +{ + ListCell *lc; + StringInfoData res; + bool first = true; + + initStringInfo(&res); + + foreach (lc, strings) + { + char *val = strVal(lfirst(lc)); + + if (first) + first = false; + else + appendStringInfoChar(&res, ','); + + appendStringInfoString(&res, + PQescapeIdentifier(conn, val, strlen(val))); + } + + return res.data; +} |