aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/pgoutput/pgoutput.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/pgoutput/pgoutput.c')
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c22
1 files changed, 17 insertions, 5 deletions
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index 876adab38e5..19c10c028f9 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -18,6 +18,7 @@
#include "catalog/pg_publication_rel.h"
#include "catalog/pg_subscription.h"
#include "commands/defrem.h"
+#include "commands/subscriptioncmds.h"
#include "executor/executor.h"
#include "fmgr.h"
#include "nodes/makefuncs.h"
@@ -290,7 +291,7 @@ parse_output_parameters(List *options, PGOutputData *data)
bool origin_option_given = false;
data->binary = false;
- data->streaming = false;
+ data->streaming = LOGICALREP_STREAM_OFF;
data->messages = false;
data->two_phase = false;
@@ -369,7 +370,7 @@ parse_output_parameters(List *options, PGOutputData *data)
errmsg("conflicting or redundant options")));
streaming_given = true;
- data->streaming = defGetBoolean(defel);
+ data->streaming = defGetStreamingMode(defel);
}
else if (strcmp(defel->defname, "two_phase") == 0)
{
@@ -461,13 +462,20 @@ pgoutput_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
* we only allow it with sufficient version of the protocol, and when
* the output plugin supports it.
*/
- if (!data->streaming)
+ if (data->streaming == LOGICALREP_STREAM_OFF)
ctx->streaming = false;
- else if (data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
+ else if (data->streaming == LOGICALREP_STREAM_ON &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_VERSION_NUM)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("requested proto_version=%d does not support streaming, need %d or higher",
data->protocol_version, LOGICALREP_PROTO_STREAM_VERSION_NUM)));
+ else if (data->streaming == LOGICALREP_STREAM_PARALLEL &&
+ data->protocol_version < LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("requested proto_version=%d does not support parallel streaming, need %d or higher",
+ data->protocol_version, LOGICALREP_PROTO_STREAM_PARALLEL_VERSION_NUM)));
else if (!ctx->streaming)
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@@ -1841,6 +1849,8 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
XLogRecPtr abort_lsn)
{
ReorderBufferTXN *toptxn;
+ PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+ bool write_abort_info = (data->streaming == LOGICALREP_STREAM_PARALLEL);
/*
* The abort should happen outside streaming block, even for streamed
@@ -1854,7 +1864,9 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
Assert(rbtxn_is_streamed(toptxn));
OutputPluginPrepareWrite(ctx, true);
- logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid);
+ logicalrep_write_stream_abort(ctx->out, toptxn->xid, txn->xid, abort_lsn,
+ txn->xact_time.abort_time, write_abort_info);
+
OutputPluginWrite(ctx, true);
cleanup_rel_sync_cache(toptxn->xid, false);