aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c66
1 files changed, 27 insertions, 39 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 5f834a9c300..239d263f835 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -98,7 +98,8 @@ static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname,
* Caller is expected to have cleared 'opts'.
*/
static void
-parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *opts)
+parse_subscription_options(ParseState *pstate, List *stmt_options,
+ bits32 supported_opts, SubOpts *opts)
{
ListCell *lc;
@@ -137,9 +138,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "connect") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_CONNECT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_CONNECT;
opts->connect = defGetBoolean(defel);
@@ -148,9 +147,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "enabled") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_ENABLED))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_ENABLED;
opts->enabled = defGetBoolean(defel);
@@ -159,9 +156,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "create_slot") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_CREATE_SLOT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_CREATE_SLOT;
opts->create_slot = defGetBoolean(defel);
@@ -170,9 +165,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "slot_name") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_SLOT_NAME))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_SLOT_NAME;
opts->slot_name = defGetString(defel);
@@ -185,9 +178,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "copy_data") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_COPY_DATA))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_COPY_DATA;
opts->copy_data = defGetBoolean(defel);
@@ -196,9 +187,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "synchronous_commit") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_SYNCHRONOUS_COMMIT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_SYNCHRONOUS_COMMIT;
opts->synchronous_commit = defGetString(defel);
@@ -212,9 +201,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "refresh") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_REFRESH))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_REFRESH;
opts->refresh = defGetBoolean(defel);
@@ -223,9 +210,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "binary") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_BINARY))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_BINARY;
opts->binary = defGetBoolean(defel);
@@ -234,9 +219,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
strcmp(defel->defname, "streaming") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_STREAMING))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_STREAMING;
opts->streaming = defGetBoolean(defel);
@@ -257,9 +240,7 @@ parse_subscription_options(List *stmt_options, bits32 supported_opts, SubOpts *o
errmsg("unrecognized subscription parameter: \"%s\"", defel->defname)));
if (IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("conflicting or redundant options")));
+ errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
@@ -408,7 +389,8 @@ publicationListToArray(List *publist)
* Create new subscription.
*/
ObjectAddress
-CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
+CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
+ bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -432,7 +414,7 @@ CreateSubscription(CreateSubscriptionStmt *stmt, bool isTopLevel)
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
- parse_subscription_options(stmt->options, supported_opts, &opts);
+ parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
/*
* Since creating a replication slot is not transactional, rolling back
@@ -853,7 +835,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data)
* Alter the existing subscription.
*/
ObjectAddress
-AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
+AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
+ bool isTopLevel)
{
Relation rel;
ObjectAddress myself;
@@ -906,7 +889,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING);
- parse_subscription_options(stmt->options, supported_opts, &opts);
+ parse_subscription_options(pstate, stmt->options,
+ supported_opts, &opts);
if (IsSet(opts.specified_opts, SUBOPT_SLOT_NAME))
{
@@ -957,7 +941,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_ENABLED:
{
- parse_subscription_options(stmt->options, SUBOPT_ENABLED, &opts);
+ parse_subscription_options(pstate, stmt->options,
+ SUBOPT_ENABLED, &opts);
Assert(IsSet(opts.specified_opts, SUBOPT_ENABLED));
if (!sub->slotname && opts.enabled)
@@ -991,7 +976,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
case ALTER_SUBSCRIPTION_SET_PUBLICATION:
{
supported_opts = SUBOPT_COPY_DATA | SUBOPT_REFRESH;
- parse_subscription_options(stmt->options, supported_opts, &opts);
+ parse_subscription_options(pstate, stmt->options,
+ supported_opts, &opts);
values[Anum_pg_subscription_subpublications - 1] =
publicationListToArray(stmt->publication);
@@ -1040,7 +1026,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
if (isadd)
supported_opts |= SUBOPT_COPY_DATA;
- parse_subscription_options(stmt->options, supported_opts, &opts);
+ parse_subscription_options(pstate, stmt->options,
+ supported_opts, &opts);
publist = merge_publications(sub->publications, stmt->publication, isadd, stmt->subname);
values[Anum_pg_subscription_subpublications - 1] =
@@ -1087,7 +1074,8 @@ AlterSubscription(AlterSubscriptionStmt *stmt, bool isTopLevel)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("ALTER SUBSCRIPTION ... REFRESH is not allowed for disabled subscriptions")));
- parse_subscription_options(stmt->options, SUBOPT_COPY_DATA, &opts);
+ parse_subscription_options(pstate, stmt->options,
+ SUBOPT_COPY_DATA, &opts);
/*
* The subscription option "two_phase" requires that