diff options
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 67 |
1 files changed, 62 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index f15a332bae3..baff00dd74e 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -85,7 +85,7 @@ typedef struct SubOpts bool copy_data; bool refresh; bool binary; - bool streaming; + char streaming; bool twophase; bool disableonerr; char *origin; @@ -139,7 +139,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, if (IsSet(supported_opts, SUBOPT_BINARY)) opts->binary = false; if (IsSet(supported_opts, SUBOPT_STREAMING)) - opts->streaming = false; + opts->streaming = LOGICALREP_STREAM_OFF; if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT)) opts->twophase = false; if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR)) @@ -242,7 +242,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errorConflictingDefElem(defel, pstate); opts->specified_opts |= SUBOPT_STREAMING; - opts->streaming = defGetBoolean(defel); + opts->streaming = defGetStreamingMode(defel); } else if (strcmp(defel->defname, "two_phase") == 0) { @@ -630,7 +630,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt, values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner); values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled); values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); + values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming); values[Anum_pg_subscription_subtwophasestate - 1] = CharGetDatum(opts.twophase ? LOGICALREP_TWOPHASE_STATE_PENDING : @@ -1099,7 +1099,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { values[Anum_pg_subscription_substream - 1] = - BoolGetDatum(opts.streaming); + CharGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; } @@ -2128,3 +2128,60 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char * return oldpublist; } + +/* + * Extract the streaming mode value from a DefElem. This is like + * defGetBoolean() but also accepts the special value of "parallel". + */ +char +defGetStreamingMode(DefElem *def) +{ + /* + * If no parameter value given, assume "true" is meant. + */ + if (!def->arg) + return LOGICALREP_STREAM_ON; + + /* + * Allow 0, 1, "false", "true", "off", "on" or "parallel". + */ + switch (nodeTag(def->arg)) + { + case T_Integer: + switch (intVal(def->arg)) + { + case 0: + return LOGICALREP_STREAM_OFF; + case 1: + return LOGICALREP_STREAM_ON; + default: + /* otherwise, error out below */ + break; + } + break; + default: + { + char *sval = defGetString(def); + + /* + * The set of strings accepted here should match up with the + * grammar's opt_boolean_or_string production. + */ + if (pg_strcasecmp(sval, "false") == 0 || + pg_strcasecmp(sval, "off") == 0) + return LOGICALREP_STREAM_OFF; + if (pg_strcasecmp(sval, "true") == 0 || + pg_strcasecmp(sval, "on") == 0) + return LOGICALREP_STREAM_ON; + if (pg_strcasecmp(sval, "parallel") == 0) + return LOGICALREP_STREAM_PARALLEL; + } + break; + } + + ereport(ERROR, + (errcode(ERRCODE_SYNTAX_ERROR), + errmsg("%s requires a Boolean value or \"parallel\"", + def->defname))); + return LOGICALREP_STREAM_OFF; /* keep compiler quiet */ +} |