diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 66 |
1 files changed, 27 insertions, 39 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 5f834a9c300..239d263f835 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -98,7 +98,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, * Caller is expected to have cleared 'opts'. */ static void -parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts) +parse_subscription_options(ParseState *pstate, List *stmt_options, + bits32 supported_opts, SubOpts *opts) { ListCell *lc; @@ -137,9 +138,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "connect") == 0) { if (IsSet(opts->specified_opts, SUBOPT_CONNECT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CONNECT; opts->connect = defGetBoolean(defel); @@ -148,9 +147,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "enabled") == 0) { if (IsSet(opts->specified_opts, SUBOPT_ENABLED)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_ENABLED; opts->enabled = defGetBoolean(defel); @@ -159,9 +156,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "create_slot") == 0) { if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_CREATE_SLOT; opts->create_slot = defGetBoolean(defel); @@ -170,9 +165,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "slot_name") == 0) { if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SLOT_NAME; opts->slot_name = defGetString(defel); @@ -185,9 +178,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "copy_data") == 0) { if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_COPY_DATA; opts->copy_data = defGetBoolean(defel); @@ -196,9 +187,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "synchronous_commit") == 0) { if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT; opts->synchronous_commit = defGetString(defel); @@ -212,9 +201,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "refresh") == 0) { if (IsSet(opts->specified_opts, SUBOPT_REFRESH)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_REFRESH; opts->refresh = defGetBoolean(defel); @@ -223,9 +210,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "binary") == 0) { if (IsSet(opts->specified_opts, SUBOPT_BINARY)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_BINARY; opts->binary = defGetBoolean(defel); @@ -234,9 +219,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o strcmp(defel->defname, "streaming") == 0) { if (IsSet(opts->specified_opts, SUBOPT_STREAMING)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; opts->streaming = defGetBoolean(defel); @@ -257,9 +240,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o errmsg("unrecognized subscription parameter: \"%s\"", defel->defname))); if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("conflicting or redundant options"))); + errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT; opts->twophase = defGetBoolean(defel); @@ -408,7 +389,8 @@ publicationListToArray(List *publist) * Create new subscription. */ ObjectAddress -CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) +CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, + bool isTopLevel) { Relation rel; ObjectAddress myself; @@ -432,7 +414,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel) SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA | SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT); - parse_subscription_options(stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, supported_opts, &opts); /* * Since creating a replication slot is not transactional, rolling back @@ -853,7 +835,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data) * Alter the existing subscription. */ ObjectAddress -AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) +AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, + bool isTopLevel) { Relation rel; ObjectAddress myself; @@ -906,7 +889,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY | SUBOPT_STREAMING); - parse_subscription_options(stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts); if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME)) { @@ -957,7 +941,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_ENABLED: { - parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts); + parse_subscription_options(pstate, stmt->options, + SUBOPT_ENABLED, &opts); Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED)); if (!sub->slotname && opts.enabled) @@ -991,7 +976,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) case ALTER_SUBSCRIPTION_SET_PUBLICATION: { supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH; - parse_subscription_options(stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts); values[Anum_pg_subscription_subpublications - 1] = publicationListToArray(stmt->publication); @@ -1040,7 +1026,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) if (isadd) supported_opts |= SUBOPT_COPY_DATA; - parse_subscription_options(stmt->options, supported_opts, &opts); + parse_subscription_options(pstate, stmt->options, + supported_opts, &opts); publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname); values[Anum_pg_subscription_subpublications - 1] = @@ -1087,7 +1074,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel) (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions"))); - parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts); + parse_subscription_options(pstate, stmt->options, + SUBOPT_COPY_DATA, &opts); /* * The subscription option "two_phase" requires that |