aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/catalog/pg_subscription.c3
-rw-r--r--src/backend/commands/subscriptioncmds.c4
-rw-r--r--src/backend/replication/logical/tablesync.c6
-rw-r--r--src/backend/replication/logical/worker.c33
-rw-r--r--src/include/catalog/pg_subscription.h1
-rw-r--r--src/test/subscription/t/027_nosuperuser.pl24
6 files changed, 60 insertions, 11 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index d07f88ce280..d6a978f1362 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -108,6 +108,9 @@ GetSubscription(Oid subid, bool missing_ok)
Anum_pg_subscription_suborigin);
sub->origin = TextDatumGetCString(datum);
+ /* Is the subscription owner a superuser? */
+ sub->ownersuperuser = superuser_arg(sub->owner);
+
ReleaseSysCache(tup);
return sub;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 6fe111e98d3..edc82c11beb 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -869,7 +869,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
load_file("libpqwalreceiver", false);
/* Try to connect to the publisher. */
- must_use_password = !superuser_arg(sub->owner) && sub->passwordrequired;
+ must_use_password = sub->passwordrequired && !sub->ownersuperuser;
wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
sub->name, &err);
if (!wrconn)
@@ -1249,7 +1249,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
load_file("libpqwalreceiver", false);
/* Check the connection info string. */
walrcv_check_conninfo(stmt->conninfo,
- sub->passwordrequired && !superuser_arg(sub->owner));
+ sub->passwordrequired && !sub->ownersuperuser);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(stmt->conninfo);
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index e2cee92cf26..37a0abe2f4d 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -1275,13 +1275,11 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
relstate = GetSubscriptionRelState(MyLogicalRepWorker->subid,
MyLogicalRepWorker->relid,
&relstate_lsn);
+ CommitTransactionCommand();
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
- !superuser_arg(MySubscription->owner);
-
- /* Note that the superuser_arg call can access the DB */
- CommitTransactionCommand();
+ !MySubscription->ownersuperuser;
SpinLockAcquire(&MyLogicalRepWorker->relmutex);
MyLogicalRepWorker->relstate = relstate;
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 597947410f8..54c14495bea 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -3966,6 +3966,24 @@ maybe_reread_subscription(void)
apply_worker_exit();
}
+ /*
+ * Exit if the subscription owner's superuser privileges have been
+ * revoked.
+ */
+ if (!newsub->ownersuperuser && MySubscription->ownersuperuser)
+ {
+ if (am_parallel_apply_worker())
+ ereport(LOG,
+ errmsg("logical replication parallel apply worker for subscription \"%s\" will stop because the subscription owner's superuser privileges have been revoked",
+ MySubscription->name));
+ else
+ ereport(LOG,
+ errmsg("logical replication worker for subscription \"%s\" will restart because the subscription owner's superuser privileges have been revoked",
+ MySubscription->name));
+
+ apply_worker_exit();
+ }
+
/* Check for other changes that should never happen too. */
if (newsub->dbid != MySubscription->dbid)
{
@@ -4492,13 +4510,11 @@ run_apply_worker()
replorigin_session_setup(originid, 0);
replorigin_session_origin = originid;
origin_startpos = replorigin_session_get_progress(false);
+ CommitTransactionCommand();
/* Is the use of a password mandatory? */
must_use_password = MySubscription->passwordrequired &&
- !superuser_arg(MySubscription->owner);
-
- /* Note that the superuser_arg call can access the DB */
- CommitTransactionCommand();
+ !MySubscription->ownersuperuser;
LogRepWorkerWalRcvConn = walrcv_connect(MySubscription->conninfo, true,
must_use_password,
@@ -4621,11 +4637,18 @@ InitializeLogRepWorker(void)
SetConfigOption("synchronous_commit", MySubscription->synccommit,
PGC_BACKEND, PGC_S_OVERRIDE);
- /* Keep us informed about subscription changes. */
+ /*
+ * Keep us informed about subscription or role changes. Note that the
+ * role's superuser privilege can be revoked.
+ */
CacheRegisterSyscacheCallback(SUBSCRIPTIONOID,
subscription_change_cb,
(Datum) 0);
+ CacheRegisterSyscacheCallback(AUTHOID,
+ subscription_change_cb,
+ (Datum) 0);
+
if (am_tablesync_worker())
ereport(LOG,
(errmsg("logical replication table synchronization worker for subscription \"%s\", table \"%s\" has started",
diff --git a/src/include/catalog/pg_subscription.h b/src/include/catalog/pg_subscription.h
index be36c4a8207..e0b91eacd2a 100644
--- a/src/include/catalog/pg_subscription.h
+++ b/src/include/catalog/pg_subscription.h
@@ -127,6 +127,7 @@ typedef struct Subscription
* skipped */
char *name; /* Name of the subscription */
Oid owner; /* Oid of the subscription owner */
+ bool ownersuperuser; /* Is the subscription owner a superuser? */
bool enabled; /* Indicates if the subscription is enabled */
bool binary; /* Indicates if the subscription wants data in
* binary format */
diff --git a/src/test/subscription/t/027_nosuperuser.pl b/src/test/subscription/t/027_nosuperuser.pl
index d7a7e3ef5bb..642baa5d7c9 100644
--- a/src/test/subscription/t/027_nosuperuser.pl
+++ b/src/test/subscription/t/027_nosuperuser.pl
@@ -104,6 +104,7 @@ for my $node ($node_publisher, $node_subscriber)
CREATE ROLE regress_admin SUPERUSER LOGIN;
CREATE ROLE regress_alice NOSUPERUSER LOGIN;
GRANT CREATE ON DATABASE postgres TO regress_alice;
+ GRANT PG_CREATE_SUBSCRIPTION TO regress_alice;
SET SESSION AUTHORIZATION regress_alice;
CREATE SCHEMA alice;
GRANT USAGE ON SCHEMA alice TO regress_admin;
@@ -303,4 +304,27 @@ GRANT SELECT ON alice.unpartitioned TO regress_alice;
expect_replication("alice.unpartitioned", 3, 17, 21,
"restoring SELECT permission permits replication to continue");
+# The apply worker should get restarted after the superuser privileges are
+# revoked for subscription owner alice.
+grant_superuser("regress_alice");
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+SET SESSION AUTHORIZATION regress_alice;
+CREATE SUBSCRIPTION regression_sub CONNECTION '$publisher_connstr' PUBLICATION alice;
+));
+
+# Wait for initial sync to finish
+$node_subscriber->wait_for_subscription_sync($node_publisher,
+ 'regression_sub');
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+revoke_superuser("regress_alice");
+
+# After the user becomes non-superuser the apply worker should be restarted.
+$node_subscriber->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? logical replication worker for subscription \"regression_sub\" will restart because the subscription owner's superuser privileges have been revoked/,
+ $offset);
+
done_testing();