diff options
author | Amit Kapila <akapila@postgresql.org> | 2021-08-04 07:47:06 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2021-08-04 07:47:06 +0530 |
commit | 63cf61cdeb7b0450dcf3b2f719c553177bac85a2 (patch) | |
tree | 91df5f810b6cd96ddd1d7f2f4bd1c286af0fa2dc /src/backend | |
parent | 6424337073589476303b10f6d7cc74f501b8d9d7 (diff) | |
download | postgresql-63cf61cdeb7b0450dcf3b2f719c553177bac85a2.tar.gz postgresql-63cf61cdeb7b0450dcf3b2f719c553177bac85a2.zip |
Add prepare API support for streaming transactions in logical replication.
Commit a8fd13cab0 added support for prepared transactions to built-in
logical replication via a new option "two_phase" for a subscription. The
"two_phase" option was not allowed with the existing streaming option.
This commit permits the combination of "streaming" and "two_phase"
subscription options. It extends the pgoutput plugin and the subscriber
side code to add the prepare API for streaming transactions which will
apply the changes accumulated in the spool-file at prepare time.
Author: Peter Smith and Ajin Cherian
Reviewed-by: Vignesh C, Amit Kapila, Greg Nancarrow
Tested-By: Haiying Tang
Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru
Discussion: https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/commands/subscriptioncmds.c | 25 | ||||
-rw-r--r-- | src/backend/replication/logical/proto.c | 29 | ||||
-rw-r--r-- | src/backend/replication/logical/worker.c | 56 | ||||
-rw-r--r-- | src/backend/replication/pgoutput/pgoutput.c | 33 |
4 files changed, 103 insertions, 40 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index 22ae9823288..5157f44058f 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -335,25 +335,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options, errmsg("subscription with %s must also set %s", "slot_name = NONE", "create_slot = false"))); } - - /* - * Do additional checking for the disallowed combination of two_phase and - * streaming. While streaming and two_phase can theoretically be - * supported, it needs more analysis to allow them together. - */ - if (opts->twophase && - IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) && - IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT)) - { - if (opts->streaming && - IsSet(supported_opts, SUBOPT_STREAMING) && - IsSet(opts->specified_opts, SUBOPT_STREAMING)) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - /*- translator: both %s are strings of the form "option = value" */ - errmsg("%s and %s are mutually exclusive options", - "two_phase = true", "streaming = true"))); - } } /* @@ -933,12 +914,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, if (IsSet(opts.specified_opts, SUBOPT_STREAMING)) { - if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("cannot set %s for two-phase enabled subscription", - "streaming = true"))); - values[Anum_pg_subscription_substream - 1] = BoolGetDatum(opts.streaming); replaces[Anum_pg_subscription_substream - 1] = true; diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c index 2d774567e08..52b65e95721 100644 --- a/src/backend/replication/logical/proto.c +++ b/src/backend/replication/logical/proto.c @@ -145,7 +145,8 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da } /* - * The core functionality for logicalrep_write_prepare. + * The core functionality for logicalrep_write_prepare and + * logicalrep_write_stream_prepare. */ static void logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type, @@ -188,7 +189,8 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn, } /* - * The core functionality for logicalrep_read_prepare. + * The core functionality for logicalrep_read_prepare and + * logicalrep_read_stream_prepare. */ static void logicalrep_read_prepare_common(StringInfo in, char *msgtype, @@ -209,6 +211,8 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype, elog(ERROR, "end_lsn is not set in %s message", msgtype); prepare_data->prepare_time = pq_getmsgint64(in); prepare_data->xid = pq_getmsgint(in, 4); + if (prepare_data->xid == InvalidTransactionId) + elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype); /* read gid (copy it into a pre-allocated buffer) */ strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid)); @@ -340,6 +344,27 @@ logicalrep_read_rollback_prepared(StringInfo in, } /* + * Write STREAM PREPARE to the output stream. + */ +void +logicalrep_write_stream_prepare(StringInfo out, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE, + txn, prepare_lsn); +} + +/* + * Read STREAM PREPARE from the stream. + */ +void +logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data) +{ + logicalrep_read_prepare_common(in, "stream prepare", prepare_data); +} + +/* * Write ORIGIN to the output stream. */ void diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 249de807984..ecaed157f29 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -1053,6 +1053,56 @@ apply_handle_rollback_prepared(StringInfo s) } /* + * Handle STREAM PREPARE. + * + * Logic is in two parts: + * 1. Replay all the spooled operations + * 2. Mark the transaction as prepared + */ +static void +apply_handle_stream_prepare(StringInfo s) +{ + LogicalRepPreparedTxnData prepare_data; + + if (in_streamed_transaction) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("STREAM PREPARE message without STREAM STOP"))); + + /* Tablesync should never receive prepare. */ + if (am_tablesync_worker()) + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg_internal("tablesync worker received a STREAM PREPARE message"))); + + logicalrep_read_stream_prepare(s, &prepare_data); + + elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); + + /* Replay all the spooled operations. */ + apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn); + + /* Mark the transaction as prepared. */ + apply_handle_prepare_internal(&prepare_data); + + CommitTransactionCommand(); + + pgstat_report_stat(false); + + store_flush_position(prepare_data.end_lsn); + + in_remote_transaction = false; + + /* unlink the files with serialized changes and subxact info. */ + stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid); + + /* Process any tables that are being synchronized in parallel. */ + process_syncing_tables(prepare_data.end_lsn); + + pgstat_report_activity(STATE_IDLE, NULL); +} + +/* * Handle ORIGIN message. * * TODO, support tracking of multiple origins @@ -1291,7 +1341,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn) */ oldcxt = MemoryContextSwitchTo(TopTransactionContext); - /* open the spool file for the committed transaction */ + /* Open the spool file for the committed/prepared transaction */ changes_filename(path, MyLogicalRepWorker->subid, xid); elog(DEBUG1, "replaying changes from file \"%s\"", path); @@ -2357,6 +2407,10 @@ apply_dispatch(StringInfo s) case LOGICAL_REP_MSG_ROLLBACK_PREPARED: apply_handle_rollback_prepared(s); return; + + case LOGICAL_REP_MSG_STREAM_PREPARE: + apply_handle_stream_prepare(s); + return; } ereport(ERROR, diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c index e4314af13ae..286119c8c83 100644 --- a/src/backend/replication/pgoutput/pgoutput.c +++ b/src/backend/replication/pgoutput/pgoutput.c @@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx, static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); +static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr prepare_lsn); static bool publications_valid; static bool in_streaming; @@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_message_cb = pgoutput_message; cb->stream_truncate_cb = pgoutput_truncate; /* transaction streaming - two-phase commit */ - cb->stream_prepare_cb = NULL; + cb->stream_prepare_cb = pgoutput_stream_prepare_txn; } static void @@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data) } else elog(ERROR, "unrecognized pgoutput option: %s", defel->defname); - - /* - * Do additional checking for the disallowed combination of two_phase - * and streaming. While streaming and two_phase can theoretically be - * supported, it needs more analysis to allow them together. - */ - if (data->two_phase && data->streaming) - ereport(ERROR, - (errcode(ERRCODE_SYNTAX_ERROR), - errmsg("%s and %s are mutually exclusive options", - "two_phase", "streaming"))); } } @@ -1030,6 +1021,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx, } /* + * PREPARE callback (for streaming two-phase commit). + * + * Notify the downstream to prepare the transaction. + */ +static void +pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + Assert(rbtxn_is_streamed(txn)); + + OutputPluginUpdateProgress(ctx); + OutputPluginPrepareWrite(ctx, true); + logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn); + OutputPluginWrite(ctx, true); +} + +/* * Initialize the relation schema sync cache for a decoding session. * * The hash table is destroyed at the end of a decoding session. While |