aboutsummaryrefslogtreecommitdiff
path: root/src/backend/commands/subscriptioncmds.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/commands/subscriptioncmds.c')
-rw-r--r--src/backend/commands/subscriptioncmds.c67
1 files changed, 62 insertions, 5 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index f15a332bae3..baff00dd74e 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -85,7 +85,7 @@ typedef struct SubOpts
bool copy_data;
bool refresh;
bool binary;
- bool streaming;
+ char streaming;
bool twophase;
bool disableonerr;
char *origin;
@@ -139,7 +139,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
if (IsSet(supported_opts, SUBOPT_BINARY))
opts->binary = false;
if (IsSet(supported_opts, SUBOPT_STREAMING))
- opts->streaming = false;
+ opts->streaming = LOGICALREP_STREAM_OFF;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
@@ -242,7 +242,7 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errorConflictingDefElem(defel, pstate);
opts->specified_opts |= SUBOPT_STREAMING;
- opts->streaming = defGetBoolean(defel);
+ opts->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@@ -630,7 +630,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subowner - 1] = ObjectIdGetDatum(owner);
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(opts.enabled);
values[Anum_pg_subscription_subbinary - 1] = BoolGetDatum(opts.binary);
- values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming);
+ values[Anum_pg_subscription_substream - 1] = CharGetDatum(opts.streaming);
values[Anum_pg_subscription_subtwophasestate - 1] =
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
@@ -1099,7 +1099,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
values[Anum_pg_subscription_substream - 1] =
- BoolGetDatum(opts.streaming);
+ CharGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
}
@@ -2128,3 +2128,60 @@ merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *
return oldpublist;
}
+
+/*
+ * Extract the streaming mode value from a DefElem. This is like
+ * defGetBoolean() but also accepts the special value of "parallel".
+ */
+char
+defGetStreamingMode(DefElem *def)
+{
+ /*
+ * If no parameter value given, assume "true" is meant.
+ */
+ if (!def->arg)
+ return LOGICALREP_STREAM_ON;
+
+ /*
+ * Allow 0, 1, "false", "true", "off", "on" or "parallel".
+ */
+ switch (nodeTag(def->arg))
+ {
+ case T_Integer:
+ switch (intVal(def->arg))
+ {
+ case 0:
+ return LOGICALREP_STREAM_OFF;
+ case 1:
+ return LOGICALREP_STREAM_ON;
+ default:
+ /* otherwise, error out below */
+ break;
+ }
+ break;
+ default:
+ {
+ char *sval = defGetString(def);
+
+ /*
+ * The set of strings accepted here should match up with the
+ * grammar's opt_boolean_or_string production.
+ */
+ if (pg_strcasecmp(sval, "false") == 0 ||
+ pg_strcasecmp(sval, "off") == 0)
+ return LOGICALREP_STREAM_OFF;
+ if (pg_strcasecmp(sval, "true") == 0 ||
+ pg_strcasecmp(sval, "on") == 0)
+ return LOGICALREP_STREAM_ON;
+ if (pg_strcasecmp(sval, "parallel") == 0)
+ return LOGICALREP_STREAM_PARALLEL;
+ }
+ break;
+ }
+
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("%s requires a Boolean value or \"parallel\"",
+ def->defname)));
+ return LOGICALREP_STREAM_OFF; /* keep compiler quiet */
+}