aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/libpqwalreceiver/libpqwalreceiver.c119
-rw-r--r--src/backend/replication/logical/tablesync.c2
-rw-r--r--src/backend/replication/logical/worker.c2
-rw-r--r--src/backend/replication/walreceiver.c2
4 files changed, 93 insertions, 32 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
index 2439733b55b..9270d7b855b 100644
--- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
+++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
@@ -48,7 +48,8 @@ struct WalReceiverConn
/* Prototypes for interface functions */
static WalReceiverConn *libpqrcv_connect(const char *conninfo,
- bool logical, bool must_use_password,
+ bool replication, bool logical,
+ bool must_use_password,
const char *appname, char **err);
static void libpqrcv_check_conninfo(const char *conninfo,
bool must_use_password);
@@ -57,6 +58,7 @@ static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
char **sender_host, int *sender_port);
static char *libpqrcv_identify_system(WalReceiverConn *conn,
TimeLineID *primary_tli);
+static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
static int libpqrcv_server_version(WalReceiverConn *conn);
static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
TimeLineID tli, char **filename,
@@ -99,6 +101,7 @@ static WalReceiverFunctionsType PQWalReceiverFunctions = {
.walrcv_send = libpqrcv_send,
.walrcv_create_slot = libpqrcv_create_slot,
.walrcv_alter_slot = libpqrcv_alter_slot,
+ .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
.walrcv_get_backend_pid = libpqrcv_get_backend_pid,
.walrcv_exec = libpqrcv_exec,
.walrcv_disconnect = libpqrcv_disconnect
@@ -121,7 +124,11 @@ _PG_init(void)
}
/*
- * Establish the connection to the primary server for XLOG streaming
+ * Establish the connection to the primary server.
+ *
+ * This function can be used for both replication and regular connections.
+ * If it is a replication connection, it could be either logical or physical
+ * based on input argument 'logical'.
*
* If an error occurs, this function will normally return NULL and set *err
* to a palloc'ed error message. However, if must_use_password is true and
@@ -132,8 +139,8 @@ _PG_init(void)
* case.
*/
static WalReceiverConn *
-libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
- const char *appname, char **err)
+libpqrcv_connect(const char *conninfo, bool replication, bool logical,
+ bool must_use_password, const char *appname, char **err)
{
WalReceiverConn *conn;
PostgresPollingStatusType status;
@@ -156,36 +163,46 @@ libpqrcv_connect(const char *conninfo, bool logical, bool must_use_password,
*/
keys[i] = "dbname";
vals[i] = conninfo;
- keys[++i] = "replication";
- vals[i] = logical ? "database" : "true";
- if (!logical)
+
+ /* We can not have logical without replication */
+ Assert(replication || !logical);
+
+ if (replication)
{
- /*
- * The database name is ignored by the server in replication mode, but
- * specify "replication" for .pgpass lookup.
- */
- keys[++i] = "dbname";
- vals[i] = "replication";
+ keys[++i] = "replication";
+ vals[i] = logical ? "database" : "true";
+
+ if (logical)
+ {
+ /* Tell the publisher to translate to our encoding */
+ keys[++i] = "client_encoding";
+ vals[i] = GetDatabaseEncodingName();
+
+ /*
+ * Force assorted GUC parameters to settings that ensure that the
+ * publisher will output data values in a form that is unambiguous
+ * to the subscriber. (We don't want to modify the subscriber's
+ * GUC settings, since that might surprise user-defined code
+ * running in the subscriber, such as triggers.) This should
+ * match what pg_dump does.
+ */
+ keys[++i] = "options";
+ vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
+ }
+ else
+ {
+ /*
+ * The database name is ignored by the server in replication mode,
+ * but specify "replication" for .pgpass lookup.
+ */
+ keys[++i] = "dbname";
+ vals[i] = "replication";
+ }
}
+
keys[++i] = "fallback_application_name";
vals[i] = appname;
- if (logical)
- {
- /* Tell the publisher to translate to our encoding */
- keys[++i] = "client_encoding";
- vals[i] = GetDatabaseEncodingName();
- /*
- * Force assorted GUC parameters to settings that ensure that the
- * publisher will output data values in a form that is unambiguous to
- * the subscriber. (We don't want to modify the subscriber's GUC
- * settings, since that might surprise user-defined code running in
- * the subscriber, such as triggers.) This should match what pg_dump
- * does.
- */
- keys[++i] = "options";
- vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
- }
keys[++i] = NULL;
vals[i] = NULL;
@@ -472,6 +489,50 @@ libpqrcv_server_version(WalReceiverConn *conn)
}
/*
+ * Get database name from the primary server's conninfo.
+ *
+ * If dbname is not found in connInfo, return NULL value.
+ */
+static char *
+libpqrcv_get_dbname_from_conninfo(const char *connInfo)
+{
+ PQconninfoOption *opts;
+ char *dbname = NULL;
+ char *err = NULL;
+
+ opts = PQconninfoParse(connInfo, &err);
+ if (opts == NULL)
+ {
+ /* The error string is malloc'd, so we must free it explicitly */
+ char *errcopy = err ? pstrdup(err) : "out of memory";
+
+ PQfreemem(err);
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("invalid connection string syntax: %s", errcopy)));
+ }
+
+ for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
+ {
+ /*
+ * If multiple dbnames are specified, then the last one will be
+ * returned
+ */
+ if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
+ *opt->val)
+ {
+ if (dbname)
+ pfree(dbname);
+
+ dbname = pstrdup(opt->val);
+ }
+ }
+
+ PQconninfoFree(opts);
+ return dbname;
+}
+
+/*
* Start streaming WAL data from given streaming options.
*
* Returns true if we switched successfully to copy-both mode. False
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 5acab3f3e23..ee066290881 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1329,7 +1329,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
* so that synchronous replication can distinguish them.
*/
LogRepWorkerWalRcvConn =
- walrcv_connect(MySubscription->conninfo, true,
+ walrcv_connect(MySubscription->conninfo, true, true,
must_use_password,
slotname, &err);
if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 32ff4c03364..9dd2446fbfd 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4519,7 +4519,7 @@ run_apply_worker()
!MySubscription->ownersuperuser;
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
- must_use_password,
+ true, must_use_password,
MySubscription->name, &err);
if (LogRepWorkerWalRcvConn == NULL)
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index e29a6196a3e..b80447d15f1 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -296,7 +296,7 @@ WalReceiverMain(void)
sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
/* Establish the connection to the primary for XLOG streaming */
- wrconn = walrcv_connect(conninfo, false, false,
+ wrconn = walrcv_connect(conninfo, true, false, false,
cluster_name[0] ? cluster_name : "walreceiver",
&err);
if (!wrconn)