diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/pg_subscription.c | 1 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 2 | ||||
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 23 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 46 |
4 files changed, 57 insertions, 15 deletions
diff --git a/src/backend/catalog/pg_subscription.c b/src/backend/catalog/pg_subscription.c index 87e8ea7efaf..d07f88ce280 100644 --- a/src/backend/catalog/pg_subscription.c +++ b/src/backend/catalog/pg_subscription.c @@ -72,6 +72,7 @@ GetSubscription(Oid subid, bool missing_ok) sub->twophasestate = subform->subtwophasestate; sub->disableonerr = subform->subdisableonerr; sub->passwordrequired = subform->subpasswordrequired; + sub->runasowner = subform->subrunasowner; /* Get conninfo */ datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID, diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 574cbc2e448..6b098234f8c 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -1319,7 +1319,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public; REVOKE ALL ON pg_subscription FROM public; GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled, subbinary, substream, subtwophasestate, subdisableonerr, - subpasswordrequired, + subpasswordrequired, subrunasowner, subslotname, subsynccommit, subpublications, suborigin) ON pg_subscription TO public; diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 87eb23496eb..3251d89ba80 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -68,8 +68,9 @@ #define SUBOPT_TWOPHASE_COMMIT 0x00000200 #define SUBOPT_DISABLE_ON_ERR 0x00000400 #define SUBOPT_PASSWORD_REQUIRED 0x00000800 -#define SUBOPT_LSN 0x00001000 -#define SUBOPT_ORIGIN 0x00002000 +#define SUBOPT_RUN_AS_OWNER 0x00001000 +#define SUBOPT_LSN 0x00002000 +#define SUBOPT_ORIGIN 0x00004000 /* check if the 'val' has 'bits' set */ #define IsSet(val, bits) (((val) & (bits)) == (bits)) @@ -93,6 +94,7 @@ typedef struct SubOpts bool twophase; bool disableonerr; bool passwordrequired; + bool runasowner; char *origin; XLogRecPtr lsn; } SubOpts; @@ -151,6 +153,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->disableonerr = false; if (IsSet(supported_opts, SUBOPT_PASSWORD_REQUIRED)) opts->passwordrequired = true; + if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER)) + opts->runasowner = false; if (IsSet(supported_opts, SUBOPT_ORIGIN)) opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY); @@ -290,6 +294,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, opts->specified_opts |= SUBOPT_PASSWORD_REQUIRED; opts->passwordrequired = defGetBoolean(defel); } + else if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER) && + strcmp(defel->defname, "run_as_owner") == 0) + { + if (IsSet(opts->specified_opts, SUBOPT_RUN_AS_OWNER)) + errorConflictingDefElem(defel, pstate); + + opts->specified_opts |= SUBOPT_RUN_AS_OWNER; + opts->runasowner = defGetBoolean(defel); + } else if (IsSet(supported_opts, SUBOPT_ORIGIN) && strcmp(defel->defname, "origin") == 0) { @@ -578,7 +591,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT | SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED | - SUBOPT_ORIGIN); + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* @@ -681,6 +694,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, LOGICALREP_TWOPHASE_STATE_DISABLED); values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr); values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired); + values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner); values[Anum_pg_subscription_subconninfo - 1] = CStringGetTextDatum(conninfo); if (opts.slot_name) @@ -1115,7 +1129,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, supported_opts = (SUBOPT_SLOT_NAME | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR | - SUBOPT_PASSWORD_REQUIRED | SUBOPT_ORIGIN); + SUBOPT_PASSWORD_REQUIRED | + SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN); parse_subscription_options(pstate, stmt->options, supported_opts, &opts); diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 61009fa8cda..3d58910c145 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -2401,6 +2401,7 @@ apply_handle_insert(StringInfo s) EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; + bool run_as_owner; /* * Quick return if we are skipping data modification changes or handling @@ -2425,8 +2426,13 @@ apply_handle_insert(StringInfo s) return; } - /* Make sure that any user-supplied code runs as the table owner. */ - SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); /* Set relation for error callback */ apply_error_callback_arg.rel = rel; @@ -2457,7 +2463,8 @@ apply_handle_insert(StringInfo s) /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; - RestoreUserContext(&ucxt); + if (!run_as_owner) + RestoreUserContext(&ucxt); logicalrep_rel_close(rel, NoLock); @@ -2546,6 +2553,7 @@ apply_handle_update(StringInfo s) TupleTableSlot *remoteslot; RTEPermissionInfo *target_perminfo; MemoryContext oldctx; + bool run_as_owner; /* * Quick return if we are skipping data modification changes or handling @@ -2577,8 +2585,13 @@ apply_handle_update(StringInfo s) /* Check if we can do the update. */ check_relation_updatable(rel); - /* Make sure that any user-supplied code runs as the table owner. */ - SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); /* Initialize the executor state. */ edata = create_edata_for_relation(rel); @@ -2630,7 +2643,8 @@ apply_handle_update(StringInfo s) /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; - RestoreUserContext(&ucxt); + if (!run_as_owner) + RestoreUserContext(&ucxt); logicalrep_rel_close(rel, NoLock); @@ -2720,6 +2734,7 @@ apply_handle_delete(StringInfo s) EState *estate; TupleTableSlot *remoteslot; MemoryContext oldctx; + bool run_as_owner; /* * Quick return if we are skipping data modification changes or handling @@ -2750,8 +2765,13 @@ apply_handle_delete(StringInfo s) /* Check if we can do the delete. */ check_relation_updatable(rel); - /* Make sure that any user-supplied code runs as the table owner. */ - SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); + /* + * Make sure that any user-supplied code runs as the table owner, unless + * the user has opted out of that behavior. + */ + run_as_owner = MySubscription->runasowner; + if (!run_as_owner) + SwitchToUntrustedUser(rel->localrel->rd_rel->relowner, &ucxt); /* Initialize the executor state. */ edata = create_edata_for_relation(rel); @@ -2778,7 +2798,8 @@ apply_handle_delete(StringInfo s) /* Reset relation for error callback */ apply_error_callback_arg.rel = NULL; - RestoreUserContext(&ucxt); + if (!run_as_owner) + RestoreUserContext(&ucxt); logicalrep_rel_close(rel, NoLock); @@ -3225,13 +3246,18 @@ apply_handle_truncate(StringInfo s) * Even if we used CASCADE on the upstream primary we explicitly default * to replaying changes without further cascading. This might be later * changeable with a user specified option. + * + * MySubscription->runasowner tells us whether we want to execute + * replication actions as the subscription owner; the last argument to + * TruncateGuts tells it whether we want to switch to the table owner. + * Those are exactly opposite conditions. */ ExecuteTruncateGuts(rels, relids, relids_logged, DROP_RESTRICT, restart_seqs, - true); + !MySubscription->runasowner); foreach(lc, remote_rels) { LogicalRepRelMapEntry *rel = lfirst(lc); |