aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-08-04 07:47:06 +0530
committerAmit Kapila <akapila@postgresql.org>2021-08-04 07:47:06 +0530
commit63cf61cdeb7b0450dcf3b2f719c553177bac85a2 (patch)
tree91df5f810b6cd96ddd1d7f2f4bd1c286af0fa2dc /src/backend
parent6424337073589476303b10f6d7cc74f501b8d9d7 (diff)
downloadpostgresql-63cf61cdeb7b0450dcf3b2f719c553177bac85a2.tar.gz
postgresql-63cf61cdeb7b0450dcf3b2f719c553177bac85a2.zip
Add prepare API support for streaming transactions in logical replication.
Commit a8fd13cab0 added support for prepared transactions to built-in logical replication via a new option "two_phase" for a subscription. The "two_phase" option was not allowed with the existing streaming option. This commit permits the combination of "streaming" and "two_phase" subscription options. It extends the pgoutput plugin and the subscriber side code to add the prepare API for streaming transactions which will apply the changes accumulated in the spool-file at prepare time. Author: Peter Smith and Ajin Cherian Reviewed-by: Vignesh C, Amit Kapila, Greg Nancarrow Tested-By: Haiying Tang Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru Discussion: https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/commands/subscriptioncmds.c25
-rw-r--r--src/backend/replication/logical/proto.c29
-rw-r--r--src/backend/replication/logical/worker.c56
-rw-r--r--src/backend/replication/pgoutput/pgoutput.c33
4 files changed, 103 insertions, 40 deletions
diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c
index 22ae9823288..5157f44058f 100644
--- a/src/backend/commands/subscriptioncmds.c
+++ b/src/backend/commands/subscriptioncmds.c
@@ -335,25 +335,6 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errmsg("subscription with %s must also set %s",
"slot_name = NONE", "create_slot = false")));
}
-
- /*
- * Do additional checking for the disallowed combination of two_phase and
- * streaming. While streaming and two_phase can theoretically be
- * supported, it needs more analysis to allow them together.
- */
- if (opts->twophase &&
- IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT) &&
- IsSet(opts->specified_opts, SUBOPT_TWOPHASE_COMMIT))
- {
- if (opts->streaming &&
- IsSet(supported_opts, SUBOPT_STREAMING) &&
- IsSet(opts->specified_opts, SUBOPT_STREAMING))
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- /*- translator: both %s are strings of the form "option = value" */
- errmsg("%s and %s are mutually exclusive options",
- "two_phase = true", "streaming = true")));
- }
}
/*
@@ -933,12 +914,6 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
if (IsSet(opts.specified_opts, SUBOPT_STREAMING))
{
- if ((sub->twophasestate != LOGICALREP_TWOPHASE_STATE_DISABLED) && opts.streaming)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("cannot set %s for two-phase enabled subscription",
- "streaming = true")));
-
values[Anum_pg_subscription_substream - 1] =
BoolGetDatum(opts.streaming);
replaces[Anum_pg_subscription_substream - 1] = true;
diff --git a/src/backend/replication/logical/proto.c b/src/backend/replication/logical/proto.c
index 2d774567e08..52b65e95721 100644
--- a/src/backend/replication/logical/proto.c
+++ b/src/backend/replication/logical/proto.c
@@ -145,7 +145,8 @@ logicalrep_read_begin_prepare(StringInfo in, LogicalRepPreparedTxnData *begin_da
}
/*
- * The core functionality for logicalrep_write_prepare.
+ * The core functionality for logicalrep_write_prepare and
+ * logicalrep_write_stream_prepare.
*/
static void
logicalrep_write_prepare_common(StringInfo out, LogicalRepMsgType type,
@@ -188,7 +189,8 @@ logicalrep_write_prepare(StringInfo out, ReorderBufferTXN *txn,
}
/*
- * The core functionality for logicalrep_read_prepare.
+ * The core functionality for logicalrep_read_prepare and
+ * logicalrep_read_stream_prepare.
*/
static void
logicalrep_read_prepare_common(StringInfo in, char *msgtype,
@@ -209,6 +211,8 @@ logicalrep_read_prepare_common(StringInfo in, char *msgtype,
elog(ERROR, "end_lsn is not set in %s message", msgtype);
prepare_data->prepare_time = pq_getmsgint64(in);
prepare_data->xid = pq_getmsgint(in, 4);
+ if (prepare_data->xid == InvalidTransactionId)
+ elog(ERROR, "invalid two-phase transaction ID in %s message", msgtype);
/* read gid (copy it into a pre-allocated buffer) */
strlcpy(prepare_data->gid, pq_getmsgstring(in), sizeof(prepare_data->gid));
@@ -340,6 +344,27 @@ logicalrep_read_rollback_prepared(StringInfo in,
}
/*
+ * Write STREAM PREPARE to the output stream.
+ */
+void
+logicalrep_write_stream_prepare(StringInfo out,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ logicalrep_write_prepare_common(out, LOGICAL_REP_MSG_STREAM_PREPARE,
+ txn, prepare_lsn);
+}
+
+/*
+ * Read STREAM PREPARE from the stream.
+ */
+void
+logicalrep_read_stream_prepare(StringInfo in, LogicalRepPreparedTxnData *prepare_data)
+{
+ logicalrep_read_prepare_common(in, "stream prepare", prepare_data);
+}
+
+/*
* Write ORIGIN to the output stream.
*/
void
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 249de807984..ecaed157f29 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -1053,6 +1053,56 @@ apply_handle_rollback_prepared(StringInfo s)
}
/*
+ * Handle STREAM PREPARE.
+ *
+ * Logic is in two parts:
+ * 1. Replay all the spooled operations
+ * 2. Mark the transaction as prepared
+ */
+static void
+apply_handle_stream_prepare(StringInfo s)
+{
+ LogicalRepPreparedTxnData prepare_data;
+
+ if (in_streamed_transaction)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("STREAM PREPARE message without STREAM STOP")));
+
+ /* Tablesync should never receive prepare. */
+ if (am_tablesync_worker())
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("tablesync worker received a STREAM PREPARE message")));
+
+ logicalrep_read_stream_prepare(s, &prepare_data);
+
+ elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
+
+ /* Replay all the spooled operations. */
+ apply_spooled_messages(prepare_data.xid, prepare_data.prepare_lsn);
+
+ /* Mark the transaction as prepared. */
+ apply_handle_prepare_internal(&prepare_data);
+
+ CommitTransactionCommand();
+
+ pgstat_report_stat(false);
+
+ store_flush_position(prepare_data.end_lsn);
+
+ in_remote_transaction = false;
+
+ /* unlink the files with serialized changes and subxact info. */
+ stream_cleanup_files(MyLogicalRepWorker->subid, prepare_data.xid);
+
+ /* Process any tables that are being synchronized in parallel. */
+ process_syncing_tables(prepare_data.end_lsn);
+
+ pgstat_report_activity(STATE_IDLE, NULL);
+}
+
+/*
* Handle ORIGIN message.
*
* TODO, support tracking of multiple origins
@@ -1291,7 +1341,7 @@ apply_spooled_messages(TransactionId xid, XLogRecPtr lsn)
*/
oldcxt = MemoryContextSwitchTo(TopTransactionContext);
- /* open the spool file for the committed transaction */
+ /* Open the spool file for the committed/prepared transaction */
changes_filename(path, MyLogicalRepWorker->subid, xid);
elog(DEBUG1, "replaying changes from file \"%s\"", path);
@@ -2357,6 +2407,10 @@ apply_dispatch(StringInfo s)
case LOGICAL_REP_MSG_ROLLBACK_PREPARED:
apply_handle_rollback_prepared(s);
return;
+
+ case LOGICAL_REP_MSG_STREAM_PREPARE:
+ apply_handle_stream_prepare(s);
+ return;
}
ereport(ERROR,
diff --git a/src/backend/replication/pgoutput/pgoutput.c b/src/backend/replication/pgoutput/pgoutput.c
index e4314af13ae..286119c8c83 100644
--- a/src/backend/replication/pgoutput/pgoutput.c
+++ b/src/backend/replication/pgoutput/pgoutput.c
@@ -71,6 +71,8 @@ static void pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
static void pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
XLogRecPtr commit_lsn);
+static void pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr prepare_lsn);
static bool publications_valid;
static bool in_streaming;
@@ -175,7 +177,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_message_cb = pgoutput_message;
cb->stream_truncate_cb = pgoutput_truncate;
/* transaction streaming - two-phase commit */
- cb->stream_prepare_cb = NULL;
+ cb->stream_prepare_cb = pgoutput_stream_prepare_txn;
}
static void
@@ -280,17 +282,6 @@ parse_output_parameters(List *options, PGOutputData *data)
}
else
elog(ERROR, "unrecognized pgoutput option: %s", defel->defname);
-
- /*
- * Do additional checking for the disallowed combination of two_phase
- * and streaming. While streaming and two_phase can theoretically be
- * supported, it needs more analysis to allow them together.
- */
- if (data->two_phase && data->streaming)
- ereport(ERROR,
- (errcode(ERRCODE_SYNTAX_ERROR),
- errmsg("%s and %s are mutually exclusive options",
- "two_phase", "streaming")));
}
}
@@ -1030,6 +1021,24 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
}
/*
+ * PREPARE callback (for streaming two-phase commit).
+ *
+ * Notify the downstream to prepare the transaction.
+ */
+static void
+pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn,
+ XLogRecPtr prepare_lsn)
+{
+ Assert(rbtxn_is_streamed(txn));
+
+ OutputPluginUpdateProgress(ctx);
+ OutputPluginPrepareWrite(ctx, true);
+ logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
+ OutputPluginWrite(ctx, true);
+}
+
+/*
* Initialize the relation schema sync cache for a decoding session.
*
* The hash table is destroyed at the end of a decoding session. While