From 0aa8a01d04c8fe200b7a106878eebc3d0af9105c Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 30 Dec 2020 16:17:26 +0530 Subject: Extend the output plugin API to allow decoding of prepared xacts. This adds six methods to the output plugin API, adding support for streaming changes of two-phase transactions at prepare time. * begin_prepare * filter_prepare * prepare * commit_prepared * rollback_prepared * stream_prepare Most of this is a simple extension of the existing methods, with the semantic difference that the transaction is not yet committed and maybe aborted later. Until now two-phase transactions were translated into regular transactions on the subscriber, and the GID was not forwarded to it. None of the two-phase commands were communicated to the subscriber. This patch provides the infrastructure for logical decoding plugins to be informed of two-phase commands Like PREPARE TRANSACTION, COMMIT PREPARED and ROLLBACK PREPARED commands with the corresponding GID. This also extends the 'test_decoding' plugin, implementing these new methods. This commit simply adds these new APIs and the upcoming patch to "allow the decoding at prepare time in ReorderBuffer" will use these APIs. Author: Ajin Cherian and Amit Kapila based on previous work by Nikhil Sontakke and Stas Kelvich Reviewed-by: Amit Kapila, Peter Smith, Sawada Masahiko, and Dilip Kumar Discussion: https://postgr.es/m/02DA5F5E-CECE-4D9C-8B4B-418077E2C010@postgrespro.ru https://postgr.es/m/CAMGcDxeqEpWj3fTXwqhSwBdXd2RS9jzwWscO-XbeCfso6ts3+Q@mail.gmail.com --- contrib/test_decoding/test_decoding.c | 167 ++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) (limited to 'contrib/test_decoding/test_decoding.c') diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index e12278beb58..05763553a40 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -76,6 +76,20 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, + const char *gid); +static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn); +static void pg_decode_prepare_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); +static void pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr commit_lsn); +static void pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time); static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn); static void pg_output_stream_start(LogicalDecodingContext *ctx, @@ -87,6 +101,9 @@ static void pg_decode_stream_stop(LogicalDecodingContext *ctx, static void pg_decode_stream_abort(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr abort_lsn); +static void pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn); static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr commit_lsn); @@ -123,9 +140,15 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->filter_prepare_cb = pg_decode_filter_prepare; + cb->begin_prepare_cb = pg_decode_begin_prepare_txn; + cb->prepare_cb = pg_decode_prepare_txn; + cb->commit_prepared_cb = pg_decode_commit_prepared_txn; + cb->rollback_prepared_cb = pg_decode_rollback_prepared_txn; cb->stream_start_cb = pg_decode_stream_start; cb->stream_stop_cb = pg_decode_stream_stop; cb->stream_abort_cb = pg_decode_stream_abort; + cb->stream_prepare_cb = pg_decode_stream_prepare; cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; @@ -141,6 +164,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, ListCell *option; TestDecodingData *data; bool enable_streaming = false; + bool enable_twophase = false; data = palloc0(sizeof(TestDecodingData)); data->context = AllocSetContextCreate(ctx->context, @@ -241,6 +265,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "two-phase-commit") == 0) + { + if (elem->arg == NULL) + continue; + else if (!parse_bool(strVal(elem->arg), &enable_twophase)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -252,6 +286,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, } ctx->streaming &= enable_streaming; + ctx->twophase &= enable_twophase; } /* cleanup this plugin's resources */ @@ -320,6 +355,111 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +/* BEGIN PREPARE callback */ +static void +pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) +{ + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = + MemoryContextAllocZero(ctx->context, sizeof(TestDecodingTxnData)); + + txndata->xact_wrote_changes = false; + txn->output_plugin_private = txndata; + + if (data->skip_empty_xacts) + return; + + pg_output_begin(ctx, data, txn, true); +} + +/* PREPARE callback */ +static void +pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "PREPARE TRANSACTION %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, ", txid %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* COMMIT PREPARED callback */ +static void +pg_decode_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr commit_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "COMMIT PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, ", txid %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* ROLLBACK PREPARED callback */ +static void +pg_decode_rollback_prepared_txn(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_end_lsn, + TimestampTz prepare_time) +{ + TestDecodingData *data = ctx->output_plugin_private; + + OutputPluginPrepareWrite(ctx, true); + + appendStringInfo(ctx->out, "ROLLBACK PREPARED %s", + quote_literal_cstr(txn->gid)); + + if (data->include_xids) + appendStringInfo(ctx->out, ", txid %u", txn->xid); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + +/* + * Filter out two-phase transactions. + * + * Each plugin can implement its own filtering logic. Here we demonstrate a + * simple logic by checking the GID. If the GID contains the "_nodecode" + * substring, then we filter it out. + */ +static bool +pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid) +{ + if (strstr(gid, "_nodecode") != NULL) + return true; + + return false; +} + static bool pg_decode_filter(LogicalDecodingContext *ctx, RepOriginId origin_id) @@ -701,6 +841,33 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_stream_prepare(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, + XLogRecPtr prepare_lsn) +{ + TestDecodingData *data = ctx->output_plugin_private; + TestDecodingTxnData *txndata = txn->output_plugin_private; + + if (data->skip_empty_xacts && !txndata->xact_wrote_changes) + return; + + OutputPluginPrepareWrite(ctx, true); + + if (data->include_xids) + appendStringInfo(ctx->out, "preparing streamed transaction TXN %s, txid %u", + quote_literal_cstr(txn->gid), txn->xid); + else + appendStringInfo(ctx->out, "preparing streamed transaction %s", + quote_literal_cstr(txn->gid)); + + if (data->include_timestamp) + appendStringInfo(ctx->out, " (at %s)", + timestamptz_to_str(txn->commit_time)); + + OutputPluginWrite(ctx, true); +} + static void pg_decode_stream_commit(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, -- cgit v1.2.3