aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c24
1 files changed, 9 insertions, 15 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 1b993fb032d..6146c5acdb3 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -156,9 +156,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
}
static void
-parse_output_parameters(List *options, uint32 *protocol_version,
- List **publication_names, bool *binary,
- bool *enable_streaming)
+parse_output_parameters(List *options, PGOutputData *data)
{
ListCell *lc;
bool protocol_version_given = false;
@@ -166,7 +164,8 @@ parse_output_parameters(List *options, uint32 *protocol_version,
bool binary_option_given = false;
bool streaming_given = false;
- *binary = false;
+ data->binary = false;
+ data->streaming = false;
foreach(lc, options)
{
@@ -196,7 +195,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("proto_version \"%s\" out of range",
strVal(defel->arg))));
- *protocol_version = (uint32) parsed;
+ data->protocol_version = (uint32) parsed;
}
else if (strcmp(defel->defname, "publication_names") == 0)
{
@@ -207,7 +206,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
publication_names_given = true;
if (!SplitIdentifierString(strVal(defel->arg), ',',
- publication_names))
+ &data->publication_names))
ereport(ERROR,
(errcode(ERRCODE_INVALID_NAME),
errmsg("invalid publication_names syntax")));
@@ -220,7 +219,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("conflicting or redundant options")));
binary_option_given = true;
- *binary = defGetBoolean(defel);
+ data->binary = defGetBoolean(defel);
}
else if (strcmp(defel->defname, "streaming") == 0)
{
@@ -230,7 +229,7 @@ parse_output_parameters(List *options, uint32 *protocol_version,
errmsg("conflicting or redundant options")));
streaming_given = true;
- *enable_streaming = defGetBoolean(defel);
+ data->streaming = defGetBoolean(defel);
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
@@ -244,7 +243,6 @@ static void
pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
bool is_init)
{
- bool enable_streaming = false;
PGOutputData *data = palloc0(sizeof(PGOutputData));
/* Create our memory context for private allocations. */
@@ -265,11 +263,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
if (!is_init)
{
/* Parse the params and ERROR if we see any we don't recognize */
- parse_output_parameters(ctx->output_plugin_options,
- &data->protocol_version,
- &data->publication_names,
- &data->binary,
- &enable_streaming);
+ parse_output_parameters(ctx->output_plugin_options, data);
/* Check if we support requested protocol */
if (data->protocol_version > LOGICALREP_PROTO_MAX_VERSION_NUM)
@@ -295,7 +289,7 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
- if (!enable_streaming)
+ if (!data->streaming)
ctx->streaming = false;
else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,