aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/pg_subscription.c1
-rw-r--r--src/backend/catalog/system_views.sql2
-rw-r--r--src/backend/commands/subscriptioncmds.c23
-rw-r--r--src/backend/replication/logical/worker.c46
4 files changed, 57 insertions, 15 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c
index 87e8ea7efaf..d07f88ce280 100644
--- a/src/backend/catalog/pg_subscription.c
+++ b/src/backend/catalog/pg_subscription.c
@@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;
sub->passwordrequired = subform->subpasswordrequired;
+ sub->runasowner = subform->subrunasowner;
/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 574cbc2e448..6b098234f8c 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -1319,7 +1319,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
- subpasswordrequired,
+ subpasswordrequired, subrunasowner,
subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 87eb23496eb..3251d89ba80 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -68,8 +68,9 @@
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
-#define SUBOPT_LSN 0x00001000
-#define SUBOPT_ORIGIN 0x00002000
+#define SUBOPT_RUN_AS_OWNER 0x00001000
+#define SUBOPT_LSN 0x00002000
+#define SUBOPT_ORIGIN 0x00004000
/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
@@ -93,6 +94,7 @@ typedef struct SubOpts
bool twophase;
bool disableonerr;
bool passwordrequired;
+ bool runasowner;
char *origin;
XLogRecPtr lsn;
} SubOpts;
@@ -151,6 +153,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->disableonerr = false;
if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED))
opts->passwordrequired = true;
+ if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
+ opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
@@ -290,6 +294,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED;
opts->passwordrequired = defGetBoolean(defel);
}
+ else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) &&
+ strcmp(defel->defname, "run_as_owner") == 0)
+ {
+ if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER))
+ errorConflictingDefElem(defel, pstate);
+
+ opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
+ opts->runasowner = defGetBoolean(defel);
+ }
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
@@ -578,7 +591,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
- SUBOPT_ORIGIN);
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
@@ -681,6 +694,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
+ values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
@@ -1115,7 +1129,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
- SUBOPT_PASSWORD_REQUIRED | SUBOPT_ORIGIN);
+ SUBOPT_PASSWORD_REQUIRED |
+ SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 61009fa8cda..3d58910c145 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -2401,6 +2401,7 @@ apply_handle_insert(StringInfo s)
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2425,8 +2426,13 @@ apply_handle_insert(StringInfo s)
return;
}
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Set relation for error callback */
apply_error_callback_arg.rel = rel;
@@ -2457,7 +2463,8 @@ apply_handle_insert(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -2546,6 +2553,7 @@ apply_handle_update(StringInfo s)
TupleTableSlot *remoteslot;
RTEPermissionInfo *target_perminfo;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2577,8 +2585,13 @@ apply_handle_update(StringInfo s)
/* Check if we can do the update. */
check_relation_updatable(rel);
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
@@ -2630,7 +2643,8 @@ apply_handle_update(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -2720,6 +2734,7 @@ apply_handle_delete(StringInfo s)
EState *estate;
TupleTableSlot *remoteslot;
MemoryContext oldctx;
+ bool run_as_owner;
/*
* Quick return if we are skipping data modification changes or handling
@@ -2750,8 +2765,13 @@ apply_handle_delete(StringInfo s)
/* Check if we can do the delete. */
check_relation_updatable(rel);
- /* Make sure that any user-supplied code runs as the table owner. */
- SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
+ /*
+ * Make sure that any user-supplied code runs as the table owner, unless
+ * the user has opted out of that behavior.
+ */
+ run_as_owner = MySubscription->runasowner;
+ if (!run_as_owner)
+ SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt);
/* Initialize the executor state. */
edata = create_edata_for_relation(rel);
@@ -2778,7 +2798,8 @@ apply_handle_delete(StringInfo s)
/* Reset relation for error callback */
apply_error_callback_arg.rel = NULL;
- RestoreUserContext(&ucxt);
+ if (!run_as_owner)
+ RestoreUserContext(&ucxt);
logicalrep_rel_close(rel, NoLock);
@@ -3225,13 +3246,18 @@ apply_handle_truncate(StringInfo s)
* Even if we used CASCADE on the upstream primary we explicitly default
* to replaying changes without further cascading. This might be later
* changeable with a user specified option.
+ *
+ * MySubscription->runasowner tells us whether we want to execute
+ * replication actions as the subscription owner; the last argument to
+ * TruncateGuts tells it whether we want to switch to the table owner.
+ * Those are exactly opposite conditions.
*/
ExecuteTruncateGuts(rels,
relids,
relids_logged,
DROP_RESTRICT,
restart_seqs,
- true);
+ !MySubscription->runasowner);
foreach(lc, remote_rels)
{
LogicalRepRelMapEntry *rel = lfirst(lc);