diff options
author | Amit Kapila <akapila@postgresql.org> | 2025-02-26 11:12:50 +0530 |
---|---|---|
committer | Amit Kapila <akapila@postgresql.org> | 2025-02-26 11:12:50 +0530 |
commit | e117cfb2f6c67fe4ba46720bc6917da3dbd48c10 (patch) | |
tree | ebe5e19afdf9b40a2f0ebe68b980e261476384aa /src | |
parent | adc6032fa8824e7653252b02abc6a59d8b9d01a6 (diff) | |
download | postgresql-e117cfb2f6c67fe4ba46720bc6917da3dbd48c10.tar.gz postgresql-e117cfb2f6c67fe4ba46720bc6917da3dbd48c10.zip |
Add two-phase option in pg_createsubscriber.
This patch introduces the '--enable-two-phase' option to the
'pg_createsubscriber' utility, allowing users to enable two-phase commit
for all subscriptions during their creation.
Note that even without this option users can enable the two_phase option
for the subscriptions created by pg_createsubscriber. However, it requires
the subscription to be disabled first which could be inconvenient for
users.
When two-phase commit is enabled, prepared transactions are sent to the
subscriber at the time of 'PREPARE TRANSACTION', and they are processed as
two-phase transactions on the subscriber as well. If disabled, prepared
transactions are sent only when committed and are processed immediately by
the subscriber.
Author: Shubham Khanna <khannashubham1197@gmail.com>
Reviewed-by: vignesh C <vignesh21@gmail.com>
Reviewed-by: Hayato Kuroda <kuroda.hayato@fujitsu.com>
Reviewed-by: Peter Smith <smithpb2250@gmail.com>
Reviewed-by: Ajin Cherian <itsajin@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Discussion: https://postgr.es/m/CAHv8RjLPdFP=kA5LNSmWZ=+GMXmO+LczvV6p9HJjsXxZz10KGA@mail.gmail.com
Diffstat (limited to 'src')
-rw-r--r-- | src/bin/pg_basebackup/pg_createsubscriber.c | 84 | ||||
-rw-r--r-- | src/bin/pg_basebackup/t/040_pg_createsubscriber.pl | 11 |
2 files changed, 66 insertions, 29 deletions
diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index 9fdf15e5ac0..a5a2d61165d 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -38,6 +38,7 @@ struct CreateSubscriberOptions char *socket_dir; /* directory for Unix-domain socket, if any */ char *sub_port; /* subscriber port number */ const char *sub_username; /* subscriber username */ + bool two_phase; /* enable-two-phase option */ SimpleStringList database_names; /* list of database names */ SimpleStringList pub_names; /* list of publication names */ SimpleStringList sub_names; /* list of subscription names */ @@ -45,6 +46,7 @@ struct CreateSubscriberOptions int recovery_timeout; /* stop recovery after this time */ }; +/* per-database publication/subscription info */ struct LogicalRepInfo { char *dbname; /* database name */ @@ -58,6 +60,16 @@ struct LogicalRepInfo bool made_publication; /* publication was created */ }; +/* + * Information shared across all the databases (or publications and + * subscriptions). + */ +struct LogicalRepInfos +{ + struct LogicalRepInfo *dbinfo; + bool two_phase; /* enable-two-phase option */ +}; + static void cleanup_objects_atexit(void); static void usage(); static char *get_base_conninfo(const char *conninfo, char **dbname); @@ -117,7 +129,7 @@ static bool dry_run = false; static bool success = false; -static struct LogicalRepInfo *dbinfo; +static struct LogicalRepInfos dbinfos; static int num_dbs = 0; /* number of specified databases */ static int num_pubs = 0; /* number of specified publications */ static int num_subs = 0; /* number of specified subscriptions */ @@ -172,17 +184,17 @@ cleanup_objects_atexit(void) for (int i = 0; i < num_dbs; i++) { - if (dbinfo[i].made_publication || dbinfo[i].made_replslot) + if (dbinfos.dbinfo[i].made_publication || dbinfos.dbinfo[i].made_replslot) { PGconn *conn; - conn = connect_database(dbinfo[i].pubconninfo, false); + conn = connect_database(dbinfos.dbinfo[i].pubconninfo, false); if (conn != NULL) { - if (dbinfo[i].made_publication) - drop_publication(conn, &dbinfo[i]); - if (dbinfo[i].made_replslot) - drop_replication_slot(conn, &dbinfo[i], dbinfo[i].replslotname); + if (dbinfos.dbinfo[i].made_publication) + drop_publication(conn, &dbinfos.dbinfo[i]); + if (dbinfos.dbinfo[i].made_replslot) + drop_replication_slot(conn, &dbinfos.dbinfo[i], dbinfos.dbinfo[i].replslotname); disconnect_database(conn, false); } else @@ -192,16 +204,18 @@ cleanup_objects_atexit(void) * that some objects were left on primary and should be * removed before trying again. */ - if (dbinfo[i].made_publication) + if (dbinfos.dbinfo[i].made_publication) { pg_log_warning("publication \"%s\" created in database \"%s\" on primary was left behind", - dbinfo[i].pubname, dbinfo[i].dbname); + dbinfos.dbinfo[i].pubname, + dbinfos.dbinfo[i].dbname); pg_log_warning_hint("Drop this publication before trying again."); } - if (dbinfo[i].made_replslot) + if (dbinfos.dbinfo[i].made_replslot) { pg_log_warning("replication slot \"%s\" created in database \"%s\" on primary was left behind", - dbinfo[i].replslotname, dbinfo[i].dbname); + dbinfos.dbinfo[i].replslotname, + dbinfos.dbinfo[i].dbname); pg_log_warning_hint("Drop this replication slot soon to avoid retention of WAL files."); } } @@ -227,6 +241,7 @@ usage(void) printf(_(" -P, --publisher-server=CONNSTR publisher connection string\n")); printf(_(" -s, --socketdir=DIR socket directory to use (default current dir.)\n")); printf(_(" -t, --recovery-timeout=SECS seconds to wait for recovery to end\n")); + printf(_(" -T, --enable-two-phase enable two-phase commit for all subscriptions\n")); printf(_(" -U, --subscriber-username=NAME user name for subscriber connection\n")); printf(_(" -v, --verbose output verbose messages\n")); printf(_(" --config-file=FILENAME use specified main server configuration\n" @@ -479,9 +494,10 @@ store_pub_sub_info(const struct CreateSubscriberOptions *opt, dbinfo[i].pubname ? dbinfo[i].pubname : "(auto)", dbinfo[i].replslotname ? dbinfo[i].replslotname : "(auto)", dbinfo[i].pubconninfo); - pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s", i, + pg_log_debug("subscriber(%d): subscription: %s ; connection string: %s, two_phase: %s", i, dbinfo[i].subname ? dbinfo[i].subname : "(auto)", - dbinfo[i].subconninfo); + dbinfo[i].subconninfo, + dbinfos.two_phase ? "true" : "false"); if (num_pubs > 0) pubcell = pubcell->next; @@ -938,11 +954,12 @@ check_publisher(const struct LogicalRepInfo *dbinfo) failed = true; } - if (max_prepared_transactions != 0) + if (max_prepared_transactions != 0 && !dbinfos.two_phase) { pg_log_warning("two_phase option will not be enabled for replication slots"); pg_log_warning_detail("Subscriptions will be created with the two_phase option disabled. " "Prepared transactions will be replicated at COMMIT PREPARED."); + pg_log_warning_hint("You can use --enable-two-phase switch to enable two_phase."); } /* @@ -1345,8 +1362,9 @@ create_logical_replication_slot(PGconn *conn, struct LogicalRepInfo *dbinfo) slot_name_esc = PQescapeLiteral(conn, slot_name, strlen(slot_name)); appendPQExpBuffer(str, - "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, false, false)", - slot_name_esc); + "SELECT lsn FROM pg_catalog.pg_create_logical_replication_slot(%s, 'pgoutput', false, %s, false)", + slot_name_esc, + dbinfos.two_phase ? "true" : "false"); PQfreemem(slot_name_esc); @@ -1722,8 +1740,9 @@ create_subscription(PGconn *conn, const struct LogicalRepInfo *dbinfo) appendPQExpBuffer(str, "CREATE SUBSCRIPTION %s CONNECTION %s PUBLICATION %s " "WITH (create_slot = false, enabled = false, " - "slot_name = %s, copy_data = false)", - subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc); + "slot_name = %s, copy_data = false, two_phase = %s)", + subname_esc, pubconninfo_esc, pubname_esc, replslotname_esc, + dbinfos.two_phase ? "true" : "false"); PQfreemem(pubname_esc); PQfreemem(subname_esc); @@ -1895,6 +1914,7 @@ main(int argc, char **argv) {"publisher-server", required_argument, NULL, 'P'}, {"socketdir", required_argument, NULL, 's'}, {"recovery-timeout", required_argument, NULL, 't'}, + {"enable-two-phase", no_argument, NULL, 'T'}, {"subscriber-username", required_argument, NULL, 'U'}, {"verbose", no_argument, NULL, 'v'}, {"version", no_argument, NULL, 'V'}, @@ -1950,6 +1970,7 @@ main(int argc, char **argv) opt.socket_dir = NULL; opt.sub_port = DEFAULT_SUB_PORT; opt.sub_username = NULL; + opt.two_phase = false; opt.database_names = (SimpleStringList) { 0 @@ -1972,7 +1993,7 @@ main(int argc, char **argv) get_restricted_token(); - while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:U:v", + while ((c = getopt_long(argc, argv, "d:D:np:P:s:t:TU:v", long_options, &option_index)) != -1) { switch (c) @@ -2009,6 +2030,9 @@ main(int argc, char **argv) case 't': opt.recovery_timeout = atoi(optarg); break; + case 'T': + opt.two_phase = true; + break; case 'U': opt.sub_username = pg_strdup(optarg); break; @@ -2170,12 +2194,14 @@ main(int argc, char **argv) /* Rudimentary check for a data directory */ check_data_directory(subscriber_dir); + dbinfos.two_phase = opt.two_phase; + /* * Store database information for publisher and subscriber. It should be * called before atexit() because its return is used in the * cleanup_objects_atexit(). */ - dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo); + dbinfos.dbinfo = store_pub_sub_info(&opt, pub_base_conninfo, sub_base_conninfo); /* Register a function to clean up objects in case of failure */ atexit(cleanup_objects_atexit); @@ -2184,7 +2210,7 @@ main(int argc, char **argv) * Check if the subscriber data directory has the same system identifier * than the publisher data directory. */ - pub_sysid = get_primary_sysid(dbinfo[0].pubconninfo); + pub_sysid = get_primary_sysid(dbinfos.dbinfo[0].pubconninfo); sub_sysid = get_standby_sysid(subscriber_dir); if (pub_sysid != sub_sysid) pg_fatal("subscriber data directory is not a copy of the source database cluster"); @@ -2214,10 +2240,10 @@ main(int argc, char **argv) start_standby_server(&opt, true, false); /* Check if the standby server is ready for logical replication */ - check_subscriber(dbinfo); + check_subscriber(dbinfos.dbinfo); /* Check if the primary server is ready for logical replication */ - check_publisher(dbinfo); + check_publisher(dbinfos.dbinfo); /* * Stop the target server. The recovery process requires that the server @@ -2230,10 +2256,10 @@ main(int argc, char **argv) stop_standby_server(subscriber_dir); /* Create the required objects for each database on publisher */ - consistent_lsn = setup_publisher(dbinfo); + consistent_lsn = setup_publisher(dbinfos.dbinfo); /* Write the required recovery parameters */ - setup_recovery(dbinfo, subscriber_dir, consistent_lsn); + setup_recovery(dbinfos.dbinfo, subscriber_dir, consistent_lsn); /* * Start subscriber so the recovery parameters will take effect. Wait @@ -2244,7 +2270,7 @@ main(int argc, char **argv) start_standby_server(&opt, true, true); /* Waiting the subscriber to be promoted */ - wait_for_end_recovery(dbinfo[0].subconninfo, &opt); + wait_for_end_recovery(dbinfos.dbinfo[0].subconninfo, &opt); /* * Create the subscription for each database on subscriber. It does not @@ -2252,13 +2278,13 @@ main(int argc, char **argv) * point to the LSN reported by setup_publisher(). It also cleans up * publications created by this tool and replication to the standby. */ - setup_subscriber(dbinfo, consistent_lsn); + setup_subscriber(dbinfos.dbinfo, consistent_lsn); /* Remove primary_slot_name if it exists on primary */ - drop_primary_replication_slot(dbinfo, primary_slot_name); + drop_primary_replication_slot(dbinfos.dbinfo, primary_slot_name); /* Remove failover replication slots if they exist on subscriber */ - drop_failover_replication_slots(dbinfo); + drop_failover_replication_slots(dbinfos.dbinfo); /* Stop the subscriber */ pg_log_info("stopping the subscriber"); diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index c8dbdb7e9b7..c35fa108ce3 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -373,6 +373,7 @@ command_ok( # Run pg_createsubscriber on node S. --verbose is used twice # to show more information. +# In passing, also test the --enable-two-phase option command_ok( [ 'pg_createsubscriber', @@ -388,6 +389,7 @@ command_ok( '--replication-slot' => 'replslot2', '--database' => $db1, '--database' => $db2, + '--enable-two-phase' ], 'run pg_createsubscriber on node S'); @@ -406,6 +408,15 @@ $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')"); # Start subscriber $node_s->start; +# Verify that all subtwophase states are pending or enabled, +# e.g. there are no subscriptions where subtwophase is disabled ('d') +is( $node_s->safe_psql( + 'postgres', + "SELECT count(1) = 0 FROM pg_subscription WHERE subtwophasestate = 'd'" + ), + 't', + 'subscriptions are created with the two-phase option enabled'); + # Confirm the pre-existing subscription has been removed $result = $node_s->safe_psql( 'postgres', qq( |