diff options
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 42fe91a2f9c..6b6012e095b 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -35,6 +35,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool only_local; + bool include_sequences; } TestDecodingData; /* @@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid, const char *gid); @@ -116,6 +121,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz, const char *message); +static void pg_decode_stream_sequence(LogicalDecodingContext *ctx, + ReorderBufferTXN *txn, XLogRecPtr sequence_lsn, + Relation rel, bool transactional, + int64 last_value, int64 log_cnt, bool is_called); static void pg_decode_stream_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, int nrelations, Relation relations[], @@ -141,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; cb->message_cb = pg_decode_message; + cb->sequence_cb = pg_decode_sequence; cb->filter_prepare_cb = pg_decode_filter_prepare; cb->begin_prepare_cb = pg_decode_begin_prepare_txn; cb->prepare_cb = pg_decode_prepare_txn; @@ -153,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->stream_commit_cb = pg_decode_stream_commit; cb->stream_change_cb = pg_decode_stream_change; cb->stream_message_cb = pg_decode_stream_message; + cb->stream_sequence_cb = pg_decode_stream_sequence; cb->stream_truncate_cb = pg_decode_stream_truncate; } @@ -173,6 +184,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_xids = true; data->include_timestamp = false; data->skip_empty_xacts = false; + data->include_sequences = true; data->only_local = false; ctx->output_plugin_private = data; @@ -265,6 +277,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "include-sequences") == 0) + { + + if (elem->arg == NULL) + data->include_sequences = false; + else if (!parse_bool(strVal(elem->arg), &data->include_sequences)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -745,6 +768,27 @@ pg_decode_message(LogicalDecodingContext *ctx, } static void +pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (!data->include_sequences) + return; + + OutputPluginPrepareWrite(ctx, true); + appendStringInfoString(ctx->out, "sequence "); + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))), + RelationGetRelationName(rel))); + appendStringInfo(ctx->out, ": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d", + transactional, last_value, log_cnt, is_called); + OutputPluginWrite(ctx, true); +} + +static void pg_decode_stream_start(LogicalDecodingContext *ctx, ReorderBufferTXN *txn) { @@ -943,6 +987,27 @@ pg_decode_stream_message(LogicalDecodingContext *ctx, OutputPluginWrite(ctx, true); } +static void +pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, + XLogRecPtr sequence_lsn, Relation rel, + bool transactional, + int64 last_value, int64 log_cnt, bool is_called) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (!data->include_sequences) + return; + + OutputPluginPrepareWrite(ctx, true); + appendStringInfoString(ctx->out, "streaming sequence "); + appendStringInfoString(ctx->out, + quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))), + RelationGetRelationName(rel))); + appendStringInfo(ctx->out, ": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d", + transactional, last_value, log_cnt, is_called); + OutputPluginWrite(ctx, true); +} + /* * In streaming mode, we don't display the detailed information of Truncate. * See pg_decode_stream_change. |