aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/worker.c35
-rw-r--r--src/backend/utils/error/elog.c17
-rw-r--r--src/include/utils/elog.h1
-rw-r--r--src/test/subscription/t/021_twophase.pl14
4 files changed, 66 insertions, 1 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 832b1cf7642..18f86c73bd3 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4405,6 +4405,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname)
}
/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
+/*
* Run the apply loop with error handling. Disable the subscription,
* if necessary.
*
@@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg)
InitializeApplyWorker();
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an in-complete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
+
InitializingApplyWorker = false;
/* Connect to the origin and start the replication. */
@@ -4916,12 +4940,23 @@ void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+ int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
+ elevel = geterrlevel();
+
+ /*
+ * Reset the origin state to prevent the advancement of origin progress if
+ * we fail to apply. Otherwise, this will result in transaction loss as
+ * that transaction won't be sent again by the server.
+ */
+ if (elevel >= ERROR)
+ replorigin_reset(0, (Datum) 0);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 7112fb00069..3347f24d4a5 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1572,6 +1572,23 @@ geterrcode(void)
}
/*
+ * geterrlevel --- return the currently set SQLSTATE error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+ ErrorData *edata = &errordata[errordata_stack_depth];
+
+ /* we don't bother incrementing recursion_depth */
+ CHECK_STACK_DEPTH();
+
+ return edata->elevel;
+}
+
+/*
* geterrposition --- return the currently set error position (0 if none)
*
* This is only intended for use in error callback subroutines, since there
diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h
index 0292e88b4f2..8bb55e5b3cc 100644
--- a/src/include/utils/elog.h
+++ b/src/include/utils/elog.h
@@ -226,6 +226,7 @@ extern int internalerrquery(const char *query);
extern int err_generic_string(int field, const char *str);
extern int geterrcode(void);
+extern int geterrlevel(void);
extern int geterrposition(void);
extern int getinternalerrposition(void);
diff --git a/src/test/subscription/t/021_twophase.pl b/src/test/subscription/t/021_twophase.pl
index 8ce4cfc983c..822932c1dbc 100644
--- a/src/test/subscription/t/021_twophase.pl
+++ b/src/test/subscription/t/021_twophase.pl
@@ -23,7 +23,7 @@ $node_publisher->start;
my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
$node_subscriber->init(allows_streaming => 'logical');
$node_subscriber->append_conf('postgresql.conf',
- qq(max_prepared_transactions = 10));
+ qq(max_prepared_transactions = 0));
$node_subscriber->start;
# Create some pre-existing content on publisher
@@ -67,12 +67,24 @@ $node_subscriber->poll_query_until('postgres', $twophase_query)
# then COMMIT PREPARED
###############################
+# Save the log location, to see the failure of the application
+my $log_location = -s $node_subscriber->logfile;
+
$node_publisher->safe_psql(
'postgres', "
BEGIN;
INSERT INTO tab_full VALUES (11);
PREPARE TRANSACTION 'test_prepared_tab_full';");
+# Confirm the ERROR is reported becasue max_prepared_transactions is zero
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? prepared transactions are disabled/);
+
+# Set max_prepared_transactions to correct value to resume the replication
+$node_subscriber->append_conf('postgresql.conf',
+ qq(max_prepared_transactions = 10));
+$node_subscriber->restart;
+
$node_publisher->wait_for_catchup($appname);
# check that transaction is in prepared state on subscriber