aboutsummaryrefslogtreecommitdiff
path: root/contrib/test_decoding
diff options
context:
space:
mode:
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r--contrib/test_decoding/Makefile3
-rw-r--r--contrib/test_decoding/expected/replorigin.out141
-rw-r--r--contrib/test_decoding/sql/replorigin.sql64
-rw-r--r--contrib/test_decoding/test_decoding.c28
4 files changed, 235 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile
index 613e9c387b7..656eabfa005 100644
--- a/contrib/test_decoding/Makefile
+++ b/contrib/test_decoding/Makefile
@@ -37,7 +37,8 @@ submake-isolation:
submake-test_decoding:
$(MAKE) -C $(top_builddir)/contrib/test_decoding
-REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared
+REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \
+ binary prepared replorigin
regresscheck: all | submake-regress submake-test_decoding temp-install
$(MKDIR_P) regression_output
diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out
new file mode 100644
index 00000000000..c0f512579ca
--- /dev/null
+++ b/contrib/test_decoding/expected/replorigin.out
@@ -0,0 +1,141 @@
+-- predictability
+SET synchronous_commit = on;
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ pg_replication_origin_create
+------------------------------
+ 1
+(1 row)
+
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index"
+DETAIL: Key (roname)=(test_decoding: regression_slot) already exists.
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+ pg_replication_origin_create
+------------------------------
+ 2
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: temp');
+ERROR: cache lookup failed for replication origin 'test_decoding: temp'
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column?
+----------
+ init
+(1 row)
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ BEGIN
+ table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN'
+ table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again'''
+ table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT'
+ COMMIT
+(5 rows)
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ pg_replication_origin_session_setup
+-------------------------------------
+
+(1 row)
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+ERROR: cannot setup replication origin when one is already setup
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+ pg_replication_origin_xact_setup
+----------------------------------
+
+(1 row)
+
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_progress(true);
+ pg_replication_origin_session_progress
+----------------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_session_reset();
+ pg_replication_origin_session_reset
+-------------------------------------
+
+(1 row)
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+ local_id | external_id | remote_lsn | ?column?
+----------+--------------------------------+------------+----------
+ 1 | test_decoding: regression_slot | 0/AABBCCDD | t
+(1 row)
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+ pg_replication_origin_progress
+--------------------------------
+ 0/AABBCCDD
+(1 row)
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+ERROR: no replication origin is configured
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+------
+(0 rows)
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+ data
+--------------------------------------------------------------------------------
+ BEGIN
+ table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated'
+ COMMIT
+(3 rows)
+
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot
+--------------------------
+
+(1 row)
+
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
+ pg_replication_origin_drop
+----------------------------
+
+(1 row)
+
diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql
new file mode 100644
index 00000000000..e12404e106e
--- /dev/null
+++ b/contrib/test_decoding/sql/replorigin.sql
@@ -0,0 +1,64 @@
+-- predictability
+SET synchronous_commit = on;
+
+CREATE TABLE origin_tbl(id serial primary key, data text);
+CREATE TABLE target_tbl(id serial primary key, data text);
+
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+-- ensure duplicate creations fail
+SELECT pg_replication_origin_create('test_decoding: regression_slot');
+
+--ensure deletions work (once)
+SELECT pg_replication_origin_create('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+SELECT pg_replication_origin_drop('test_decoding: temp');
+
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+-- origin tx
+INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- as is normal, the insert into target_tbl shows up
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again');
+
+-- mark session as replaying
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+-- ensure we prevent duplicate setup
+SELECT pg_replication_origin_session_setup('test_decoding: regression_slot');
+
+BEGIN;
+-- setup transaction origin
+SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00');
+INSERT INTO target_tbl(data)
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+COMMIT;
+
+-- check replication progress for the session is correct
+SELECT pg_replication_origin_session_progress(false);
+SELECT pg_replication_origin_session_progress(true);
+
+SELECT pg_replication_origin_session_reset();
+
+SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status;
+
+-- check replication progress identified by name is correct
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', false);
+SELECT pg_replication_origin_progress('test_decoding: regression_slot', true);
+
+-- ensure reset requires previously setup state
+SELECT pg_replication_origin_session_reset();
+
+-- and magically the replayed xact will be filtered!
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+--but new original changes still show up
+INSERT INTO origin_tbl(data) VALUES ('will be replicated');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1');
+
+SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_replication_origin_drop('test_decoding: regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 963d5df9dae..bca03ee21b4 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -21,6 +21,7 @@
#include "replication/output_plugin.h"
#include "replication/logical.h"
+#include "replication/origin.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@@ -43,6 +44,7 @@ typedef struct
bool include_timestamp;
bool skip_empty_xacts;
bool xact_wrote_changes;
+ bool only_local;
} TestDecodingData;
static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
@@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
static void pg_decode_change(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn, Relation rel,
ReorderBufferChange *change);
+static bool pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id);
void
_PG_init(void)
@@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
cb->begin_cb = pg_decode_begin_txn;
cb->change_cb = pg_decode_change;
cb->commit_cb = pg_decode_commit_txn;
+ cb->filter_by_origin_cb = pg_decode_filter;
cb->shutdown_cb = pg_decode_shutdown;
}
@@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
data->include_xids = true;
data->include_timestamp = false;
data->skip_empty_xacts = false;
+ data->only_local = false;
ctx->output_plugin_private = data;
@@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
+ else if (strcmp(elem->defname, "only-local") == 0)
+ {
+
+ if (elem->arg == NULL)
+ data->only_local = true;
+ else if (!parse_bool(strVal(elem->arg), &data->only_local))
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse value \"%s\" for parameter \"%s\"",
+ strVal(elem->arg), elem->defname)));
+ }
else
{
ereport(ERROR,
@@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
OutputPluginWrite(ctx, true);
}
+static bool
+pg_decode_filter(LogicalDecodingContext *ctx,
+ RepOriginId origin_id)
+{
+ TestDecodingData *data = ctx->output_plugin_private;
+
+ if (data->only_local && origin_id != InvalidRepOriginId)
+ return true;
+ return false;
+}
+
/*
* Print literal `outputstr' already represented as string of type `typid'
* into stringbuf `s'.