diff options
Diffstat (limited to 'src/backend/replication/logical/worker.c')
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 |
1 files changed, 35 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index b5a80fe3e84..d091a1dd27c 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4639,6 +4639,17 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + /* Common function to setup the leader apply or tablesync worker. */ void SetupApplyOrSyncWorker(int worker_slot) @@ -4667,6 +4678,19 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -4893,12 +4917,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) |