aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical/worker.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r--src/backend/replication/logical/worker.c21
1 files changed, 10 insertions, 11 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 5ce596f4576..4151a4b2a96 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void);
static TransApplyAction get_transaction_apply_action(TransactionId xid,
ParallelApplyWorkerInfo **winfo);
+static void replorigin_reset(int code, Datum arg);
+
/*
* Form the origin name for the subscription.
*
@@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos)
}
PG_CATCH();
{
+ /*
+ * Reset the origin state to prevent the advancement of origin
+ * progress if we fail to apply. Otherwise, this will result in
+ * transaction loss as that transaction won't be sent again by the
+ * server.
+ */
+ replorigin_reset(0, (Datum) 0);
+
if (MySubscription->disableonerr)
DisableSubscriptionAndExit();
else
@@ -5004,23 +5014,12 @@ void
apply_error_callback(void *arg)
{
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
- int elevel;
if (apply_error_callback_arg.command == 0)
return;
Assert(errarg->origin_name);
- elevel = geterrlevel();
-
- /*
- * Reset the origin state to prevent the advancement of origin progress if
- * we fail to apply. Otherwise, this will result in transaction loss as
- * that transaction won't be sent again by the server.
- */
- if (elevel >= ERROR)
- replorigin_reset(0, (Datum) 0);
-
if (errarg->rel == NULL)
{
if (!TransactionIdIsValid(errarg->remote_xid))