aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAmit Kapila <akapila@postgresql.org>2021-03-03 07:28:43 +0530
committerAmit Kapila <akapila@postgresql.org>2021-03-03 07:34:11 +0530
commit19890a064ebf53dedcefed0d8339ed3d449b06e6 (patch)
tree76adf39f2d69ce5d6f8bda9efc48c34c0879e6bc
parentee28cacf619f4d9c23af5a80e1171a5adae97381 (diff)
downloadpostgresql-19890a064ebf53dedcefed0d8339ed3d449b06e6.tar.gz
postgresql-19890a064ebf53dedcefed0d8339ed3d449b06e6.zip
Add option to enable two_phase commits via pg_create_logical_replication_slot.
Commit 0aa8a01d04 extends the output plugin API to allow decoding of prepared xacts and allowed the user to enable/disable the two-phase option via pg_logical_slot_get_changes(). This can lead to a problem such that the first time when it gets changes via pg_logical_slot_get_changes() without two_phase option enabled it will not get the prepared even though prepare is after consistent snapshot. Now next time during getting changes, if the two_phase option is enabled it can skip prepare because by that time start decoding point has been moved. So the user will only get commit prepared. Allow to enable/disable this option at the create slot time and default will be false. It will break the existing slots which is fine in a major release. Author: Ajin Cherian Reviewed-by: Amit Kapila and Vignesh C Discussion: https://postgr.es/m/d0f60d60-133d-bf8d-bd70-47784d8fabf3@enterprisedb.com
-rw-r--r--contrib/test_decoding/expected/twophase.out34
-rw-r--r--contrib/test_decoding/expected/twophase_snapshot.out6
-rw-r--r--contrib/test_decoding/expected/twophase_stream.out10
-rw-r--r--contrib/test_decoding/specs/twophase_snapshot.spec4
-rw-r--r--contrib/test_decoding/sql/twophase.sql34
-rw-r--r--contrib/test_decoding/sql/twophase_stream.sql10
-rw-r--r--contrib/test_decoding/test_decoding.c12
-rw-r--r--doc/src/sgml/catalogs.sgml10
-rw-r--r--doc/src/sgml/func.sgml10
-rw-r--r--doc/src/sgml/logicaldecoding.sgml19
-rw-r--r--src/backend/catalog/system_views.sql4
-rw-r--r--src/backend/replication/logical/logical.c12
-rw-r--r--src/backend/replication/slot.c10
-rw-r--r--src/backend/replication/slotfuncs.c14
-rw-r--r--src/backend/replication/walsender.c6
-rw-r--r--src/include/catalog/catversion.h2
-rw-r--r--src/include/catalog/pg_proc.dat14
-rw-r--r--src/include/nodes/replnodes.h1
-rw-r--r--src/include/replication/slot.h7
-rw-r--r--src/test/regress/expected/rules.out5
20 files changed, 131 insertions, 93 deletions
diff --git a/contrib/test_decoding/expected/twophase.out b/contrib/test_decoding/expected/twophase.out
index 8a1d06d706d..e5e0f968961 100644
--- a/contrib/test_decoding/expected/twophase.out
+++ b/contrib/test_decoding/expected/twophase.out
@@ -1,7 +1,7 @@
-- Test prepared transactions. When two-phase-commit is enabled, transactions are
-- decoded at PREPARE time rather than at COMMIT PREPARED time.
SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
?column?
----------
init
@@ -15,14 +15,14 @@ BEGIN;
INSERT INTO test_prepared1 VALUES (1);
INSERT INTO test_prepared1 VALUES (2);
-- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
PREPARE TRANSACTION 'test_prepared#1';
-- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
@@ -32,7 +32,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
(4 rows)
COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------
COMMIT PREPARED 'test_prepared#1'
@@ -42,7 +42,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
@@ -51,7 +51,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
(3 rows)
ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------
ROLLBACK PREPARED 'test_prepared#2'
@@ -74,7 +74,7 @@ WHERE locktype = 'relation'
(2 rows)
-- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------------------------------------
BEGIN
@@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
-- the ALTER will stop us inserting into the other one.
--
INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
----------------------------------------------------
BEGIN
@@ -98,7 +98,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
(3 rows)
COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-----------------------------------
COMMIT PREPARED 'test_prepared#3'
@@ -107,7 +107,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------------------------------------
BEGIN
@@ -138,7 +138,7 @@ WHERE locktype = 'relation'
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
SET statement_timeout = '180s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------------
BEGIN
@@ -150,7 +150,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
--------------------------------------
COMMIT PREPARED 'test_prepared_lock'
@@ -166,7 +166,7 @@ INSERT INTO test_prepared_savepoint VALUES (2);
ROLLBACK TO SAVEPOINT test_savepoint;
PREPARE TRANSACTION 'test_prepared_savepoint';
-- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------------------------------------------------------------
BEGIN
@@ -176,7 +176,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
-------------------------------------------
COMMIT PREPARED 'test_prepared_savepoint'
@@ -187,14 +187,14 @@ BEGIN;
INSERT INTO test_prepared1 VALUES (20);
PREPARE TRANSACTION 'test_prepared_nodecode';
-- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
COMMIT PREPARED 'test_prepared_nodecode';
-- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
---------------------------------------------------------------------
BEGIN
@@ -207,7 +207,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
-- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
data
------
(0 rows)
diff --git a/contrib/test_decoding/expected/twophase_snapshot.out b/contrib/test_decoding/expected/twophase_snapshot.out
index 14d93876462..0e8e1f50fe4 100644
--- a/contrib/test_decoding/expected/twophase_snapshot.out
+++ b/contrib/test_decoding/expected/twophase_snapshot.out
@@ -6,7 +6,7 @@ step s2txid: SELECT pg_current_xact_id() IS NULL;
?column?
f
-step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding'); <waiting ...>
+step s1init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true); <waiting ...>
step s3b: BEGIN;
step s3txid: SELECT pg_current_xact_id() IS NULL;
?column?
@@ -22,14 +22,14 @@ step s1init: <... completed>
init
step s1insert: INSERT INTO do_write DEFAULT VALUES;
-step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
data
BEGIN
table public.do_write: INSERT: id[integer]:2
COMMIT
step s2cp: COMMIT PREPARED 'test1';
-step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');
+step s1start: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');
data
BEGIN
diff --git a/contrib/test_decoding/expected/twophase_stream.out b/contrib/test_decoding/expected/twophase_stream.out
index d54e640b409..b08bb0e5730 100644
--- a/contrib/test_decoding/expected/twophase_stream.out
+++ b/contrib/test_decoding/expected/twophase_stream.out
@@ -1,6 +1,6 @@
-- Test streaming of two-phase commits
SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
?column?
----------
init
@@ -28,7 +28,7 @@ ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
@@ -59,7 +59,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------
COMMIT PREPARED 'test1'
@@ -81,7 +81,7 @@ ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
----------------------------------------------------------
streaming message: transactional: 1 prefix: test, sz: 50
@@ -89,7 +89,7 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
data
-------------------------------------------------------------
BEGIN
diff --git a/contrib/test_decoding/specs/twophase_snapshot.spec b/contrib/test_decoding/specs/twophase_snapshot.spec
index 3e700404e0e..e8d9567fb9a 100644
--- a/contrib/test_decoding/specs/twophase_snapshot.spec
+++ b/contrib/test_decoding/specs/twophase_snapshot.spec
@@ -15,8 +15,8 @@ teardown
session "s1"
setup { SET synchronous_commit=on; }
-step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');}
-step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1', 'two-phase-commit', '1');}
+step "s1init" {SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding', false, true);}
+step "s1start" {SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'include-xids', 'false', 'skip-empty-xacts', '1');}
step "s1insert" { INSERT INTO do_write DEFAULT VALUES; }
session "s2"
diff --git a/contrib/test_decoding/sql/twophase.sql b/contrib/test_decoding/sql/twophase.sql
index dacedfe1814..05f18e84948 100644
--- a/contrib/test_decoding/sql/twophase.sql
+++ b/contrib/test_decoding/sql/twophase.sql
@@ -1,7 +1,7 @@
-- Test prepared transactions. When two-phase-commit is enabled, transactions are
-- decoded at PREPARE time rather than at COMMIT PREPARED time.
SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
CREATE TABLE test_prepared1(id integer primary key);
CREATE TABLE test_prepared2(id integer primary key);
@@ -12,20 +12,20 @@ BEGIN;
INSERT INTO test_prepared1 VALUES (1);
INSERT INTO test_prepared1 VALUES (2);
-- should show nothing because the xact has not been prepared yet.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
PREPARE TRANSACTION 'test_prepared#1';
-- should show both the above inserts and the PREPARE TRANSACTION.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared#1';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that rollback of a prepared xact is decoded.
BEGIN;
INSERT INTO test_prepared1 VALUES (3);
PREPARE TRANSACTION 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
ROLLBACK PREPARED 'test_prepared#2';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test prepare of a xact containing ddl. Leaving xact uncommitted for next test.
BEGIN;
@@ -38,7 +38,7 @@ FROM pg_locks
WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
-- The insert should show the newly altered column but not the DDL.
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that we decode correctly while an uncommitted prepared xact
-- with ddl exists.
@@ -47,14 +47,14 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two
-- the ALTER will stop us inserting into the other one.
--
INSERT INTO test_prepared2 VALUES (5);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared#3';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- make sure stuff still works
INSERT INTO test_prepared1 VALUES (6);
INSERT INTO test_prepared2 VALUES (7);
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Check 'CLUSTER' (as operation that hold exclusive lock) doesn't block
-- logical decoding.
@@ -70,11 +70,11 @@ WHERE locktype = 'relation'
AND relation = 'test_prepared1'::regclass;
-- The above CLUSTER command shouldn't cause a timeout on 2pc decoding.
SET statement_timeout = '180s';
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
RESET statement_timeout;
COMMIT PREPARED 'test_prepared_lock';
-- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test savepoints and sub-xacts. Creating savepoints will create
-- sub-xacts implicitly.
@@ -86,26 +86,26 @@ INSERT INTO test_prepared_savepoint VALUES (2);
ROLLBACK TO SAVEPOINT test_savepoint;
PREPARE TRANSACTION 'test_prepared_savepoint';
-- should show only 1, not 2
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared_savepoint';
-- consume the commit
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test that a GID containing "_nodecode" gets decoded at commit prepared time.
BEGIN;
INSERT INTO test_prepared1 VALUES (20);
PREPARE TRANSACTION 'test_prepared_nodecode';
-- should show nothing
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
COMMIT PREPARED 'test_prepared_nodecode';
-- should be decoded now
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
-- Test 8:
-- cleanup and make sure results are also empty
DROP TABLE test_prepared1;
DROP TABLE test_prepared2;
-- show results. There should be nothing to show
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/sql/twophase_stream.sql b/contrib/test_decoding/sql/twophase_stream.sql
index e9dd44fdb37..646076da207 100644
--- a/contrib/test_decoding/sql/twophase_stream.sql
+++ b/contrib/test_decoding/sql/twophase_stream.sql
@@ -1,7 +1,7 @@
-- Test streaming of two-phase commits
SET synchronous_commit = on;
-SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
CREATE TABLE stream_test(data text);
@@ -18,11 +18,11 @@ ROLLBACK TO s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1';
-- should show the inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
COMMIT PREPARED 'test1';
--should show the COMMIT PREPARED and the other changes in the transaction
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
-- streaming test with sub-transaction and PREPARE/COMMIT PREPARED but with
-- filtered gid. gids with '_nodecode' will not be decoded at prepare time.
@@ -35,11 +35,11 @@ ROLLBACK to s1;
INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
PREPARE TRANSACTION 'test1_nodecode';
-- should NOT show inserts after a ROLLBACK
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
COMMIT PREPARED 'test1_nodecode';
-- should show the inserts but not show a COMMIT PREPARED but a COMMIT
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'two-phase-commit', '1', 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
DROP TABLE stream_test;
SELECT pg_drop_replication_slot('regression_slot');
diff --git a/contrib/test_decoding/test_decoding.c b/contrib/test_decoding/test_decoding.c
index 929255eac74..ae5f397f351 100644
--- a/contrib/test_decoding/test_decoding.c
+++ b/contrib/test_decoding/test_decoding.c
@@ -164,7 +164,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
ListCell *option;
TestDecodingData *data;
bool enable_streaming = false;
- bool enable_twophase = false;
data = palloc0(sizeof(TestDecodingData));
data->context = AllocSetContextCreate(ctx->context,
@@ -265,16 +264,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
errmsg("could not parse value \"%s\" for parameter \"%s\"",
strVal(elem->arg), elem->defname)));
}
- else if (strcmp(elem->defname, "two-phase-commit") == 0)
- {
- if (elem->arg == NULL)
- continue;
- else if (!parse_bool(strVal(elem->arg), &enable_twophase))
- ereport(ERROR,
- (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
- errmsg("could not parse value \"%s\" for parameter \"%s\"",
- strVal(elem->arg), elem->defname)));
- }
else
{
ereport(ERROR,
@@ -286,7 +275,6 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
}
ctx->streaming &= enable_streaming;
- ctx->twophase &= enable_twophase;
}
/* cleanup this plugin's resources */
diff --git a/doc/src/sgml/catalogs.sgml b/doc/src/sgml/catalogs.sgml
index db29905e91f..b1de6d0674b 100644
--- a/doc/src/sgml/catalogs.sgml
+++ b/doc/src/sgml/catalogs.sgml
@@ -11529,6 +11529,16 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
is <literal>-1</literal>.
</para></entry>
</row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>two_phase</structfield> <type>bool</type>
+ </para>
+ <para>
+ True if the slot is enabled for decoding prepared transactions. Always
+ false for physical slots.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
diff --git a/doc/src/sgml/func.sgml b/doc/src/sgml/func.sgml
index 6c189bfed25..bf99f821496 100644
--- a/doc/src/sgml/func.sgml
+++ b/doc/src/sgml/func.sgml
@@ -25559,7 +25559,7 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
<indexterm>
<primary>pg_create_logical_replication_slot</primary>
</indexterm>
- <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type> </optional> )
+ <function>pg_create_logical_replication_slot</function> ( <parameter>slot_name</parameter> <type>name</type>, <parameter>plugin</parameter> <type>name</type> <optional>, <parameter>temporary</parameter> <type>boolean</type>, <parameter>two_phase</parameter> <type>boolean</type> </optional> )
<returnvalue>record</returnvalue>
( <parameter>slot_name</parameter> <type>name</type>,
<parameter>lsn</parameter> <type>pg_lsn</type> )
@@ -25571,9 +25571,11 @@ postgres=# SELECT * FROM pg_walfile_name_offset(pg_stop_backup());
parameter, <parameter>temporary</parameter>, when set to true, specifies that
the slot should not be permanently stored to disk and is only meant
for use by the current session. Temporary slots are also
- released upon any error. A call to this function has the same
- effect as the replication protocol command
- <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
+ released upon any error. The optional fourth parameter,
+ <parameter>two_phase</parameter>, when set to true, specifies
+ that the decoding of prepared transactions is enabled for this
+ slot. A call to this function has the same effect as the replication
+ protocol command <literal>CREATE_REPLICATION_SLOT ... LOGICAL</literal>.
</para></entry>
</row>
diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml
index f1f13d81d56..80eb96d609a 100644
--- a/doc/src/sgml/logicaldecoding.sgml
+++ b/doc/src/sgml/logicaldecoding.sgml
@@ -55,7 +55,7 @@
<programlisting>
postgres=# -- Create a slot named 'regression_slot' using the output plugin 'test_decoding'
-postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding', false, true);
slot_name | lsn
-----------------+-----------
regression_slot | 0/16B1970
@@ -169,17 +169,18 @@ $ pg_recvlogical -d postgres --slot=test --drop-slot
<para>
The following example shows SQL interface that can be used to decode prepared
transactions. Before you use two-phase commit commands, you must set
- <varname>max_prepared_transactions</varname> to at least 1. You must also set
- the option 'two-phase-commit' to 1 while calling
- <function>pg_logical_slot_get_changes</function>. Note that we will stream
- the entire transaction after the commit if it is not already decoded.
+ <varname>max_prepared_transactions</varname> to at least 1. You must also have
+ set the two-phase parameter as 'true' while creating the slot using
+ <function>pg_create_logical_replication_slot</function>
+ Note that we will stream the entire transaction after the commit if it
+ is not already decoded.
</para>
<programlisting>
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('5');
postgres=*# PREPARE TRANSACTION 'test_prepared1';
-postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/1689DC0 | 529 | BEGIN 529
@@ -188,7 +189,7 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NU
(3 rows)
postgres=# COMMIT PREPARED 'test_prepared1';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+--------------------------------------------
0/168A060 | 529 | COMMIT PREPARED 'test_prepared1', txid 529
@@ -198,7 +199,7 @@ postgres=#-- you can also rollback a prepared transaction
postgres=# BEGIN;
postgres=*# INSERT INTO data(data) VALUES('6');
postgres=*# PREPARE TRANSACTION 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+---------------------------------------------------------
0/168A180 | 530 | BEGIN 530
@@ -207,7 +208,7 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
(3 rows)
postgres=# ROLLBACK PREPARED 'test_prepared2';
-postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'two-phase-commit', '1');
+postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NULL);
lsn | xid | data
-----------+-----+----------------------------------------------
0/168A4B8 | 530 | ROLLBACK PREPARED 'test_prepared2', txid 530
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index fa58afd9d78..fc94a73a54a 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -894,7 +894,8 @@ CREATE VIEW pg_replication_slots AS
L.restart_lsn,
L.confirmed_flush_lsn,
L.wal_status,
- L.safe_wal_size
+ L.safe_wal_size,
+ L.two_phase
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);
@@ -1318,6 +1319,7 @@ AS 'pg_create_physical_replication_slot';
CREATE OR REPLACE FUNCTION pg_create_logical_replication_slot(
IN slot_name name, IN plugin name,
IN temporary boolean DEFAULT false,
+ IN twophase boolean DEFAULT false,
OUT slot_name name, OUT lsn pg_lsn)
RETURNS RECORD
LANGUAGE INTERNAL
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index 3f6d723d096..37b75deb728 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -431,6 +431,12 @@ CreateInitDecodingContext(const char *plugin,
startup_cb_wrapper(ctx, &ctx->options, true);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
return ctx;
@@ -531,6 +537,12 @@ CreateDecodingContext(XLogRecPtr start_lsn,
startup_cb_wrapper(ctx, &ctx->options, false);
MemoryContextSwitchTo(old_context);
+ /*
+ * We allow decoding of prepared transactions iff the two_phase option is
+ * enabled at the time of slot creation.
+ */
+ ctx->twophase &= MyReplicationSlot->data.two_phase;
+
ctx->reorder->output_rewrites = ctx->options.receive_rewrites;
ereport(LOG,
diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c
index fb4af2ef52d..75a087c2f9d 100644
--- a/src/backend/replication/slot.c
+++ b/src/backend/replication/slot.c
@@ -216,10 +216,17 @@ ReplicationSlotValidateName(const char *name, int elevel)
* name: Name of the slot
* db_specific: logical decoding is db specific; if the slot is going to
* be used for that pass true, otherwise false.
+ * two_phase: Allows decoding of prepared transactions. We allow this option
+ * to be enabled only at the slot creation time. If we allow this option
+ * to be changed during decoding then it is quite possible that we skip
+ * prepare first time because this option was not enabled. Now next time
+ * during getting changes, if the two_phase option is enabled it can skip
+ * prepare because by that time start decoding point has been moved. So the
+ * user will only get commit prepared.
*/
void
ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency persistency)
+ ReplicationSlotPersistency persistency, bool two_phase)
{
ReplicationSlot *slot = NULL;
int i;
@@ -277,6 +284,7 @@ ReplicationSlotCreate(const char *name, bool db_specific,
namestrcpy(&slot->data.name, name);
slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
slot->data.persistency = persistency;
+ slot->data.two_phase = two_phase;
/* and then data only present in shared memory */
slot->just_dirtied = false;
diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c
index d24bb5b0b5f..9817b441136 100644
--- a/src/backend/replication/slotfuncs.c
+++ b/src/backend/replication/slotfuncs.c
@@ -50,7 +50,7 @@ create_physical_replication_slot(char *name, bool immediately_reserve,
/* acquire replication slot, this will check for conflicting names */
ReplicationSlotCreate(name, false,
- temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ temporary ? RS_TEMPORARY : RS_PERSISTENT, false);
if (immediately_reserve)
{
@@ -124,7 +124,8 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
*/
static void
create_logical_replication_slot(char *name, char *plugin,
- bool temporary, XLogRecPtr restart_lsn,
+ bool temporary, bool two_phase,
+ XLogRecPtr restart_lsn,
bool find_startpoint)
{
LogicalDecodingContext *ctx = NULL;
@@ -140,7 +141,7 @@ create_logical_replication_slot(char *name, char *plugin,
* error as well.
*/
ReplicationSlotCreate(name, true,
- temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+ temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase);
/*
* Create logical decoding context to find start point or, if we don't
@@ -177,6 +178,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
Name name = PG_GETARG_NAME(0);
Name plugin = PG_GETARG_NAME(1);
bool temporary = PG_GETARG_BOOL(2);
+ bool two_phase = PG_GETARG_BOOL(3);
Datum result;
TupleDesc tupdesc;
HeapTuple tuple;
@@ -193,6 +195,7 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
create_logical_replication_slot(NameStr(*name),
NameStr(*plugin),
temporary,
+ two_phase,
InvalidXLogRecPtr,
true);
@@ -236,7 +239,7 @@ pg_drop_replication_slot(PG_FUNCTION_ARGS)
Datum
pg_get_replication_slots(PG_FUNCTION_ARGS)
{
-#define PG_GET_REPLICATION_SLOTS_COLS 13
+#define PG_GET_REPLICATION_SLOTS_COLS 14
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -432,6 +435,8 @@ pg_get_replication_slots(PG_FUNCTION_ARGS)
values[i++] = Int64GetDatum(failLSN - currlsn);
}
+ values[i++] = BoolGetDatum(slot_contents.data.two_phase);
+
Assert(i == PG_GET_REPLICATION_SLOTS_COLS);
tuplestore_putvalues(tupstore, tupdesc, values, nulls);
@@ -796,6 +801,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
create_logical_replication_slot(NameStr(*dst_name),
plugin,
temporary,
+ false,
src_restart_lsn,
false);
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index eb3f18ed487..23baa4498af 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -938,7 +938,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
if (cmd->kind == REPLICATION_KIND_PHYSICAL)
{
ReplicationSlotCreate(cmd->slotname, false,
- cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
+ cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT,
+ false);
}
else
{
@@ -952,7 +953,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
* they get dropped on error as well.
*/
ReplicationSlotCreate(cmd->slotname, true,
- cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
+ cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL,
+ cmd->two_phase);
}
if (cmd->kind == REPLICATION_KIND_LOGICAL)
diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h
index 4cc94de224a..b19975c5c89 100644
--- a/src/include/catalog/catversion.h
+++ b/src/include/catalog/catversion.h
@@ -53,6 +53,6 @@
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202102191
+#define CATALOG_VERSION_NO 202103031
#endif
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 1487710d590..3d3974f4676 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -10496,16 +10496,16 @@
proname => 'pg_get_replication_slots', prorows => '10', proisstrict => 'f',
proretset => 't', provolatile => 's', prorettype => 'record',
proargtypes => '',
- proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8}',
- proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o}',
- proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size}',
+ proallargtypes => '{name,name,text,oid,bool,bool,int4,xid,xid,pg_lsn,pg_lsn,text,int8,bool}',
+ proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
+ proargnames => '{slot_name,plugin,slot_type,datoid,temporary,active,active_pid,xmin,catalog_xmin,restart_lsn,confirmed_flush_lsn,wal_status,safe_wal_size,two_phase}',
prosrc => 'pg_get_replication_slots' },
{ oid => '3786', descr => 'set up a logical replication slot',
proname => 'pg_create_logical_replication_slot', provolatile => 'v',
- proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool',
- proallargtypes => '{name,name,bool,name,pg_lsn}',
- proargmodes => '{i,i,i,o,o}',
- proargnames => '{slot_name,plugin,temporary,slot_name,lsn}',
+ proparallel => 'u', prorettype => 'record', proargtypes => 'name name bool bool',
+ proallargtypes => '{name,name,bool,bool,name,pg_lsn}',
+ proargmodes => '{i,i,i,i,o,o}',
+ proargnames => '{slot_name,plugin,temporary,twophase,slot_name,lsn}',
prosrc => 'pg_create_logical_replication_slot' },
{ oid => '4222',
descr => 'copy a logical replication slot, changing temporality and plugin',
diff --git a/src/include/nodes/replnodes.h b/src/include/nodes/replnodes.h
index faa3a251f26..ebc43a0293d 100644
--- a/src/include/nodes/replnodes.h
+++ b/src/include/nodes/replnodes.h
@@ -56,6 +56,7 @@ typedef struct CreateReplicationSlotCmd
ReplicationKind kind;
char *plugin;
bool temporary;
+ bool two_phase;
List *options;
} CreateReplicationSlotCmd;
diff --git a/src/include/replication/slot.h b/src/include/replication/slot.h
index 5c3fde20c69..1ad5e6c50df 100644
--- a/src/include/replication/slot.h
+++ b/src/include/replication/slot.h
@@ -98,6 +98,11 @@ typedef struct ReplicationSlotPersistentData
*/
XLogRecPtr initial_consistent_point;
+ /*
+ * Allow decoding of prepared transactions?
+ */
+ bool two_phase;
+
/* plugin name */
NameData plugin;
} ReplicationSlotPersistentData;
@@ -199,7 +204,7 @@ extern void ReplicationSlotsShmemInit(void);
/* management of individual slots */
extern void ReplicationSlotCreate(const char *name, bool db_specific,
- ReplicationSlotPersistency p);
+ ReplicationSlotPersistency p, bool two_phase);
extern void ReplicationSlotPersist(void);
extern void ReplicationSlotDrop(const char *name, bool nowait);
diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out
index 10a1f34ebc1..b1c9b7bdfe3 100644
--- a/src/test/regress/expected/rules.out
+++ b/src/test/regress/expected/rules.out
@@ -1477,8 +1477,9 @@ pg_replication_slots| SELECT l.slot_name,
l.restart_lsn,
l.confirmed_flush_lsn,
l.wal_status,
- l.safe_wal_size
- FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size)
+ l.safe_wal_size,
+ l.two_phase
+ FROM (pg_get_replication_slots() l(slot_name, plugin, slot_type, datoid, temporary, active, active_pid, xmin, catalog_xmin, restart_lsn, confirmed_flush_lsn, wal_status, safe_wal_size, two_phase)
LEFT JOIN pg_database d ON ((l.datoid = d.oid)));
pg_roles| SELECT pg_authid.rolname,
pg_authid.rolsuper,