aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/commands/subscriptioncmds.c8
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c119
-rw-r--r--src/backend/replication/logical/tablesync.c2
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/backend/replication/walreceiver.c2
-rw-r--r--src/include/replication/walreceiver.h21
6 files changed, 114 insertions, 40 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index b647a81fc86..a400ba0e40c 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -759,7 +759,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
/* Try to connect to the publisher. */
must_use_password = !superuser_arg(owner) && opts.passwordrequired;
- wrconn = walrcv_connect(conninfo, true, must_use_password,
+ wrconn = walrcv_connect(conninfo, true, true, must_use_password,
stmt->subname, &err);
if (!wrconn)
ereport(ERROR,
@@ -910,7 +910,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
@@ -1537,7 +1537,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
- wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
+ wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
@@ -1788,7 +1788,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
*/
load_file("libpqwalreceiver", false);
- wrconn = walrcv_connect(conninfo, true, must_use_password,
+ wrconn = walrcv_connect(conninfo, true, true, must_use_password,
subname, &err);
if (wrconn == NULL)
{
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2439733b55b..9270d7b855b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -48,7 +48,8 @@ struct WalReceiverConn
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
- bool logical, bool must_use_password,
+ bool replication, bool logical,
+ bool must_use_password,
const char *appname, char **err);
static void libpqrcv_check_conninfo(const char *conninfo,
bool must_use_password);
@@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char **sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli);
+static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
static int libpqrcv_server_version(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
@@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
+ .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@@ -121,7 +124,11 @@ _PG_init(void)
}
/*
- * Establish the connection to the primary server for XLOG streaming
+ * Establish the connection to the primary server.
+ *
+ * This function can be used for both replication and regular connections.
+ * If it is a replication connection, it could be either logical or physical
+ * based on input argument 'logical'.
*
* If an error occurs, this function will normally return NULL and set *err
* to a palloc'ed error message. However, if must_use_password is true and
@@ -132,8 +139,8 @@ _PG_init(void)
* case.
*/
static WalReceiverConn *
-libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
- const char *appname, char **err)
+libpqrcv_connect(const char *conninfo, bool replication, bool logical,
+ bool must_use_password, const char *appname, char **err)
{
WalReceiverConn *conn;
PostgresPollingStatusType status;
@@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
*/
keys[i] = "dbname";
vals[i] = conninfo;
- keys[++i] = "replication";
- vals[i] = logical ? "database" : "true";
- if (!logical)
+
+ /* We can not have logical without replication */
+ Assert(replication || !logical);
+
+ if (replication)
{
- /*
- * The database name is ignored by the server in replication mode, but
- * specify "replication" for .pgpass lookup.
- */
- keys[++i] = "dbname";
- vals[i] = "replication";
+ keys[++i] = "replication";
+ vals[i] = logical ? "database" : "true";
+
+ if (logical)
+ {
+ /* Tell the publisher to translate to our encoding */
+ keys[++i] = "client_encoding";
+ vals[i] = GetDatabaseEncodingName();
+
+ /*
+ * Force assorted GUC parameters to settings that ensure that the
+ * publisher will output data values in a form that is unambiguous
+ * to the subscriber. (We don't want to modify the subscriber's
+ * GUC settings, since that might surprise user-defined code
+ * running in the subscriber, such as triggers.) This should
+ * match what pg_dump does.
+ */
+ keys[++i] = "options";
+ vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
+ }
+ else
+ {
+ /*
+ * The database name is ignored by the server in replication mode,
+ * but specify "replication" for .pgpass lookup.
+ */
+ keys[++i] = "dbname";
+ vals[i] = "replication";
+ }
}
+
keys[++i] = "fallback_application_name";
vals[i] = appname;
- if (logical)
- {
- /* Tell the publisher to translate to our encoding */
- keys[++i] = "client_encoding";
- vals[i] = GetDatabaseEncodingName();
- /*
- * Force assorted GUC parameters to settings that ensure that the
- * publisher will output data values in a form that is unambiguous to
- * the subscriber. (We don't want to modify the subscriber's GUC
- * settings, since that might surprise user-defined code running in
- * the subscriber, such as triggers.) This should match what pg_dump
- * does.
- */
- keys[++i] = "options";
- vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
- }
keys[++i] = NULL;
vals[i] = NULL;
@@ -472,6 +489,50 @@ libpqrcv_server_version(WalReceiverConn *conn)
}
/*
+ * Get database name from the primary server's conninfo.
+ *
+ * If dbname is not found in connInfo, return NULL value.
+ */
+static char *
+libpqrcv_get_dbname_from_conninfo(const char *connInfo)
+{
+ PQconninfoOption *opts;
+ char *dbname = NULL;
+ char *err = NULL;
+
+ opts = PQconninfoParse(connInfo, &err);
+ if (opts == NULL)
+ {
+ /* The error string is malloc'd, so we must free it explicitly */
+ char *errcopy = err ? pstrdup(err) : "out of memory";
+
+ PQfreemem(err);
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid connection string syntax: %s", errcopy)));
+ }
+
+ for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /*
+ * If multiple dbnames are specified, then the last one will be
+ * returned
+ */
+ if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
+ *opt->val)
+ {
+ if (dbname)
+ pfree(dbname);
+
+ dbname = pstrdup(opt->val);
+ }
+ }
+
+ PQconninfoFree(opts);
+ return dbname;
+}
+
+/*
* Start streaming WAL data from given streaming options.
*
* Returns true if we switched successfully to copy-both mode. False
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 5acab3f3e23..ee066290881 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true,
+ walrcv_connect(MySubscription->conninfo, true, true,
must_use_password,
slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32ff4c03364..9dd2446fbfd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4519,7 +4519,7 @@ run_apply_worker()
!MySubscription->ownersuperuser;
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
- must_use_password,
+ true, must_use_password,
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e29a6196a3e..b80447d15f1 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -296,7 +296,7 @@ WalReceiverMain(void)
sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
/* Establish the connection to the primary for XLOG streaming */
- wrconn = walrcv_connect(conninfo, false, false,
+ wrconn = walrcv_connect(conninfo, true, false, false,
cluster_name[0] ? cluster_name : "walreceiver",
&err);
if (!wrconn)
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index f566a99ba16..b906bb5ce83 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -228,8 +228,10 @@ typedef struct WalRcvExecResult
/*
* walrcv_connect_fn
*
- * Establish connection to a cluster. 'logical' is true if the
- * connection is logical, and false if the connection is physical.
+ * Establish connection to a cluster. 'replication' is true if the
+ * connection is a replication connection, and false if it is a
+ * regular connection. If it is a replication connection, it could
+ * be either logical or physical based on input argument 'logical'.
* 'appname' is a name associated to the connection, to use for example
* with fallback_application_name or application_name. Returns the
* details about the connection established, as defined by
@@ -237,6 +239,7 @@ typedef struct WalRcvExecResult
* returned with 'err' including the error generated.
*/
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
+ bool replication,
bool logical,
bool must_use_password,
const char *appname,
@@ -280,6 +283,13 @@ typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
TimeLineID *primary_tli);
/*
+ * walrcv_get_dbname_from_conninfo_fn
+ *
+ * Returns the database name from the primary_conninfo
+ */
+typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
+
+/*
* walrcv_server_version_fn
*
* Returns the version number of the cluster connected to.
@@ -403,6 +413,7 @@ typedef struct WalReceiverFunctionsType
walrcv_get_conninfo_fn walrcv_get_conninfo;
walrcv_get_senderinfo_fn walrcv_get_senderinfo;
walrcv_identify_system_fn walrcv_identify_system;
+ walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
walrcv_server_version_fn walrcv_server_version;
walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
walrcv_startstreaming_fn walrcv_startstreaming;
@@ -418,8 +429,8 @@ typedef struct WalReceiverFunctionsType
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
-#define walrcv_connect(conninfo, logical, must_use_password, appname, err) \
- WalReceiverFunctions->walrcv_connect(conninfo, logical, must_use_password, appname, err)
+#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
+ WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
@@ -428,6 +439,8 @@ extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
+#define walrcv_get_dbname_from_conninfo(conninfo) \
+ WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_server_version(conn) \
WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \