aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/logical/worker.c35
-rw-r--r--src/backend/utils/error/elog.c17
-rw-r--r--src/include/utils/elog.h1
-rw-r--r--src/test/subscription/t/021_twophase.pl14
4 files changed, 66 insertions, 1 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cdea6295d8a..38c28953078 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
CommitTransactionCommand();
}
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
/* Common function to setup the leader apply or tablesync worker. */
void
SetupApplyOrSyncWorker(int worker_slot)
@@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
InitializeLogRepWorker();
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an in-complete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
@@ -4967,12 +4991,23 @@ void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+ int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
+ elevel = geterrlevel();
+
+ /*
+ * Reset the origin state to prevent the advancement of origin progress if
+ * we fail to apply. Otherwise, this will result in transaction loss as
+ * that transaction won't be sent again by the server.
+ */
+ if (elevel >= ERROR)
+ replorigin_reset(0, (Datum) 0);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 943d8588f3d..5cbb5b54168 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1569,6 +1569,23 @@ geterrcode(void)
}
/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+ ErrorData *edata = &errordata[errordata_stack_depth];
+
+ /* we don't bother incrementing recursion_depth */
+ CHECK_STACK_DEPTH();
+
+ return edata->elevel;
+}
+
+/*
* geterrposition --- return the currently set error position (0 if none)
*
* This is only intended for use in error callback subroutines, since there
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 054dd2bf62f..e54eca5b489 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int internalerrquery(const char *query);
extern int err_generic_string(int field, const char *str);
extern int geterrcode(void);
+extern int geterrlevel(void);
extern int geterrposition(void);
extern int getinternalerrposition(void);
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 5e50f1af338..19147f31e21 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init;
$node_subscriber->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+ qq(max_prepared_transactions = 0));
$node_subscriber->start;
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
# then COMMIT PREPARED
###############################
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
$node_publisher->safe_psql(
'postgres', "
BEGIN;
INSERT INTO tab_full VALUES (11);
PREPARE TRANSACTION 'test_prepared_tab_full';");
+# Confirm the ERROR is reported becasue max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber