aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding/test_decoding.c
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding/test_decoding.c')
-rw-r--r--contrib/test_decoding/test_decoding.c65
1 files changed, 65 insertions, 0 deletions
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 42fe91a2f9c..6b6012e095b 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -35,6 +35,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool only_local;
+ bool include_sequences;
} TestDecodingData;
/*
@@ -76,6 +77,10 @@ static void pg_decode_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
TransactionId xid,
const char *gid);
@@ -116,6 +121,10 @@ static void pg_decode_stream_message(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, XLogRecPtr message_lsn,
bool transactional, const char *prefix,
Size sz, const char *message);
+static void pg_decode_stream_sequence(LogicalDecodingContext *ctx,
+ ReorderBufferTXN *txn, XLogRecPtr sequence_lsn,
+ Relation rel, bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called);
static void pg_decode_stream_truncate(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn,
int nrelations, Relation relations[],
@@ -141,6 +150,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
cb->message_cb = pg_decode_message;
+ cb->sequence_cb = pg_decode_sequence;
cb->filter_prepare_cb = pg_decode_filter_prepare;
cb->begin_prepare_cb = pg_decode_begin_prepare_txn;
cb->prepare_cb = pg_decode_prepare_txn;
@@ -153,6 +163,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->stream_commit_cb = pg_decode_stream_commit;
cb->stream_change_cb = pg_decode_stream_change;
cb->stream_message_cb = pg_decode_stream_message;
+ cb->stream_sequence_cb = pg_decode_stream_sequence;
cb->stream_truncate_cb = pg_decode_stream_truncate;
}
@@ -173,6 +184,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_xids = true;
data->include_timestamp = false;
data->skip_empty_xacts = false;
+ data->include_sequences = true;
data->only_local = false;
ctx->output_plugin_private = data;
@@ -265,6 +277,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "include-sequences") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->include_sequences = false;
+ else if (!parse_bool(strVal(elem->arg), &data->include_sequences))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -745,6 +768,27 @@ pg_decode_message(LogicalDecodingContext *ctx,
}
static void
+pg_decode_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
+ transactional, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
+static void
pg_decode_stream_start(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn)
{
@@ -943,6 +987,27 @@ pg_decode_stream_message(LogicalDecodingContext *ctx,
OutputPluginWrite(ctx, true);
}
+static void
+pg_decode_stream_sequence(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+ XLogRecPtr sequence_lsn, Relation rel,
+ bool transactional,
+ int64 last_value, int64 log_cnt, bool is_called)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (!data->include_sequences)
+ return;
+
+ OutputPluginPrepareWrite(ctx, true);
+ appendStringInfoString(ctx->out, "streaming sequence ");
+ appendStringInfoString(ctx->out,
+ quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(rel))),
+ RelationGetRelationName(rel)));
+ appendStringInfo(ctx->out, ": transactional:%d last_value: " INT64_FORMAT " log_cnt: " INT64_FORMAT " is_called:%d",
+ transactional, last_value, log_cnt, is_called);
+ OutputPluginWrite(ctx, true);
+}
+
/*
* In streaming mode, we don't display the detailed information of Truncate.
* See pg_decode_stream_change.