aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/replication/logical/worker.c35
-rw-r--r--src/backend/utils/error/elog.c17
2 files changed, 52 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index cdea6295d8a..38c28953078 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void)
CommitTransactionCommand();
}
+/*
+ * Reset the origin state.
+ */
+static void
+replorigin_reset(int code, Datum arg)
+{
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+}
+
/* Common function to setup the leader apply or tablesync worker. */
void
SetupApplyOrSyncWorker(int worker_slot)
@@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot)
InitializeLogRepWorker();
+ /*
+ * Register a callback to reset the origin state before aborting any
+ * pending transaction during shutdown (see ShutdownPostgres()). This will
+ * avoid origin advancement for an in-complete transaction which could
+ * otherwise lead to its loss as such a transaction won't be sent by the
+ * server again.
+ *
+ * Note that even a LOG or DEBUG statement placed after setting the origin
+ * state may process a shutdown signal before committing the current apply
+ * operation. So, it is important to register such a callback here.
+ */
+ before_shmem_exit(replorigin_reset, (Datum) 0);
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
@@ -4967,12 +4991,23 @@ void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
+ int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
+ elevel = geterrlevel();
+
+ /*
+ * Reset the origin state to prevent the advancement of origin progress if
+ * we fail to apply. Otherwise, this will result in transaction loss as
+ * that transaction won't be sent again by the server.
+ */
+ if (elevel >= ERROR)
+ replorigin_reset(0, (Datum) 0);
+
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))
diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c
index 943d8588f3d..5cbb5b54168 100644
--- a/src/backend/utils/error/elog.c
+++ b/src/backend/utils/error/elog.c
@@ -1569,6 +1569,23 @@ geterrcode(void)
}
/*
+ * geterrlevel --- return the currently set error level
+ *
+ * This is only intended for use in error callback subroutines, since there
+ * is no other place outside elog.c where the concept is meaningful.
+ */
+int
+geterrlevel(void)
+{
+ ErrorData *edata = &errordata[errordata_stack_depth];
+
+ /* we don't bother incrementing recursion_depth */
+ CHECK_STACK_DEPTH();
+
+ return edata->elevel;
+}
+
+/*
* geterrposition --- return the currently set error position (0 if none)
*
* This is only intended for use in error callback subroutines, since there