diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 832b1cf7642..18f86c73bd3 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4405,6 +4405,17 @@ start_table_sync(XLogRecPtr *origin_startpos, char **myslotname) } /* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + +/* * Run the apply loop with error handling. Disable the subscription, * if necessary. * @@ -4553,6 +4564,19 @@ ApplyWorkerMain(Datum main_arg) InitializeApplyWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + InitializingApplyWorker = false; /* Connect to the origin and start the replication. */ @@ -4916,12 +4940,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) |