diff options
Diffstat (limited to 'contrib/test_decoding')
-rw-r--r-- | contrib/test_decoding/Makefile | 3 | ||||
-rw-r--r-- | contrib/test_decoding/expected/replorigin.out | 141 | ||||
-rw-r--r-- | contrib/test_decoding/sql/replorigin.sql | 64 | ||||
-rw-r--r-- | contrib/test_decoding/test_decoding.c | 28 |
4 files changed, 235 insertions, 1 deletions
diff --git a/contrib/test_decoding/Makefile b/contrib/test_decoding/Makefile index 613e9c387b7..656eabfa005 100644 --- a/contrib/test_decoding/Makefile +++ b/contrib/test_decoding/Makefile @@ -37,7 +37,8 @@ submake-isolation: submake-test_decoding: $(MAKE) -C $(top_builddir)/contrib/test_decoding -REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel binary prepared +REGRESSCHECKS=ddl rewrite toast permissions decoding_in_xact decoding_into_rel \ + binary prepared replorigin regresscheck: all | submake-regress submake-test_decoding temp-install $(MKDIR_P) regression_output diff --git a/contrib/test_decoding/expected/replorigin.out b/contrib/test_decoding/expected/replorigin.out new file mode 100644 index 00000000000..c0f512579ca --- /dev/null +++ b/contrib/test_decoding/expected/replorigin.out @@ -0,0 +1,141 @@ +-- predictability +SET synchronous_commit = on; +CREATE TABLE origin_tbl(id serial primary key, data text); +CREATE TABLE target_tbl(id serial primary key, data text); +SELECT pg_replication_origin_create('test_decoding: regression_slot'); + pg_replication_origin_create +------------------------------ + 1 +(1 row) + +-- ensure duplicate creations fail +SELECT pg_replication_origin_create('test_decoding: regression_slot'); +ERROR: duplicate key value violates unique constraint "pg_replication_origin_roname_index" +DETAIL: Key (roname)=(test_decoding: regression_slot) already exists. +--ensure deletions work (once) +SELECT pg_replication_origin_create('test_decoding: temp'); + pg_replication_origin_create +------------------------------ + 2 +(1 row) + +SELECT pg_replication_origin_drop('test_decoding: temp'); + pg_replication_origin_drop +---------------------------- + +(1 row) + +SELECT pg_replication_origin_drop('test_decoding: temp'); +ERROR: cache lookup failed for replication origin 'test_decoding: temp' +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + ?column? +---------- + init +(1 row) + +-- origin tx +INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); +-- as is normal, the insert into target_tbl shows up +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + data +---------------------------------------------------------------------------------------------------------------------------------------------------------------------------- + BEGIN + table public.target_tbl: INSERT: id[integer]:1 data[text]:'BEGIN' + table public.target_tbl: INSERT: id[integer]:2 data[text]:'table public.origin_tbl: INSERT: id[integer]:1 data[text]:''will be replicated and decoded and decoded again''' + table public.target_tbl: INSERT: id[integer]:3 data[text]:'COMMIT' + COMMIT +(5 rows) + +INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); +-- mark session as replaying +SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); + pg_replication_origin_session_setup +------------------------------------- + +(1 row) + +-- ensure we prevent duplicate setup +SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); +ERROR: cannot setup replication origin when one is already setup +BEGIN; +-- setup transaction origin +SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); + pg_replication_origin_xact_setup +---------------------------------- + +(1 row) + +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); +COMMIT; +-- check replication progress for the session is correct +SELECT pg_replication_origin_session_progress(false); + pg_replication_origin_session_progress +---------------------------------------- + 0/AABBCCDD +(1 row) + +SELECT pg_replication_origin_session_progress(true); + pg_replication_origin_session_progress +---------------------------------------- + 0/AABBCCDD +(1 row) + +SELECT pg_replication_origin_session_reset(); + pg_replication_origin_session_reset +------------------------------------- + +(1 row) + +SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status; + local_id | external_id | remote_lsn | ?column? +----------+--------------------------------+------------+---------- + 1 | test_decoding: regression_slot | 0/AABBCCDD | t +(1 row) + +-- check replication progress identified by name is correct +SELECT pg_replication_origin_progress('test_decoding: regression_slot', false); + pg_replication_origin_progress +-------------------------------- + 0/AABBCCDD +(1 row) + +SELECT pg_replication_origin_progress('test_decoding: regression_slot', true); + pg_replication_origin_progress +-------------------------------- + 0/AABBCCDD +(1 row) + +-- ensure reset requires previously setup state +SELECT pg_replication_origin_session_reset(); +ERROR: no replication origin is configured +-- and magically the replayed xact will be filtered! +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + data +------ +(0 rows) + +--but new original changes still show up +INSERT INTO origin_tbl(data) VALUES ('will be replicated'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + data +-------------------------------------------------------------------------------- + BEGIN + table public.origin_tbl: INSERT: id[integer]:3 data[text]:'will be replicated' + COMMIT +(3 rows) + +SELECT pg_drop_replication_slot('regression_slot'); + pg_drop_replication_slot +-------------------------- + +(1 row) + +SELECT pg_replication_origin_drop('test_decoding: regression_slot'); + pg_replication_origin_drop +---------------------------- + +(1 row) + diff --git a/contrib/test_decoding/sql/replorigin.sql b/contrib/test_decoding/sql/replorigin.sql new file mode 100644 index 00000000000..e12404e106e --- /dev/null +++ b/contrib/test_decoding/sql/replorigin.sql @@ -0,0 +1,64 @@ +-- predictability +SET synchronous_commit = on; + +CREATE TABLE origin_tbl(id serial primary key, data text); +CREATE TABLE target_tbl(id serial primary key, data text); + +SELECT pg_replication_origin_create('test_decoding: regression_slot'); +-- ensure duplicate creations fail +SELECT pg_replication_origin_create('test_decoding: regression_slot'); + +--ensure deletions work (once) +SELECT pg_replication_origin_create('test_decoding: temp'); +SELECT pg_replication_origin_drop('test_decoding: temp'); +SELECT pg_replication_origin_drop('test_decoding: temp'); + +SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding'); + +-- origin tx +INSERT INTO origin_tbl(data) VALUES ('will be replicated and decoded and decoded again'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +-- as is normal, the insert into target_tbl shows up +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1'); + +INSERT INTO origin_tbl(data) VALUES ('will be replicated, but not decoded again'); + +-- mark session as replaying +SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); + +-- ensure we prevent duplicate setup +SELECT pg_replication_origin_session_setup('test_decoding: regression_slot'); + +BEGIN; +-- setup transaction origin +SELECT pg_replication_origin_xact_setup('0/aabbccdd', '2013-01-01 00:00'); +INSERT INTO target_tbl(data) +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); +COMMIT; + +-- check replication progress for the session is correct +SELECT pg_replication_origin_session_progress(false); +SELECT pg_replication_origin_session_progress(true); + +SELECT pg_replication_origin_session_reset(); + +SELECT local_id, external_id, remote_lsn, local_lsn <> '0/0' FROM pg_replication_origin_status; + +-- check replication progress identified by name is correct +SELECT pg_replication_origin_progress('test_decoding: regression_slot', false); +SELECT pg_replication_origin_progress('test_decoding: regression_slot', true); + +-- ensure reset requires previously setup state +SELECT pg_replication_origin_session_reset(); + +-- and magically the replayed xact will be filtered! +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + +--but new original changes still show up +INSERT INTO origin_tbl(data) VALUES ('will be replicated'); +SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'only-local', '1'); + +SELECT pg_drop_replication_slot('regression_slot'); +SELECT pg_replication_origin_drop('test_decoding: regression_slot'); diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c index 963d5df9dae..bca03ee21b4 100644 --- a/contrib/test_decoding/test_decoding.c +++ b/contrib/test_decoding/test_decoding.c @@ -21,6 +21,7 @@ #include "replication/output_plugin.h" #include "replication/logical.h" +#include "replication/origin.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -43,6 +44,7 @@ typedef struct bool include_timestamp; bool skip_empty_xacts; bool xact_wrote_changes; + bool only_local; } TestDecodingData; static void pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, @@ -59,6 +61,8 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, Relation rel, ReorderBufferChange *change); +static bool pg_decode_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id); void _PG_init(void) @@ -76,6 +80,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb) cb->begin_cb = pg_decode_begin_txn; cb->change_cb = pg_decode_change; cb->commit_cb = pg_decode_commit_txn; + cb->filter_by_origin_cb = pg_decode_filter; cb->shutdown_cb = pg_decode_shutdown; } @@ -97,6 +102,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, data->include_xids = true; data->include_timestamp = false; data->skip_empty_xacts = false; + data->only_local = false; ctx->output_plugin_private = data; @@ -155,6 +161,17 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt, errmsg("could not parse value \"%s\" for parameter \"%s\"", strVal(elem->arg), elem->defname))); } + else if (strcmp(elem->defname, "only-local") == 0) + { + + if (elem->arg == NULL) + data->only_local = true; + else if (!parse_bool(strVal(elem->arg), &data->only_local)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("could not parse value \"%s\" for parameter \"%s\"", + strVal(elem->arg), elem->defname))); + } else { ereport(ERROR, @@ -223,6 +240,17 @@ pg_decode_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, OutputPluginWrite(ctx, true); } +static bool +pg_decode_filter(LogicalDecodingContext *ctx, + RepOriginId origin_id) +{ + TestDecodingData *data = ctx->output_plugin_private; + + if (data->only_local && origin_id != InvalidRepOriginId) + return true; + return false; +} + /* * Print literal `outputstr' already represented as string of type `typid' * into stringbuf `s'. |