diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/replication/logical/worker.c | 35 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 17 |
2 files changed, 52 insertions, 0 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index cdea6295d8a..38c28953078 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -4713,6 +4713,17 @@ InitializeLogRepWorker(void) CommitTransactionCommand(); } +/* + * Reset the origin state. + */ +static void +replorigin_reset(int code, Datum arg) +{ + replorigin_session_origin = InvalidRepOriginId; + replorigin_session_origin_lsn = InvalidXLogRecPtr; + replorigin_session_origin_timestamp = 0; +} + /* Common function to setup the leader apply or tablesync worker. */ void SetupApplyOrSyncWorker(int worker_slot) @@ -4741,6 +4752,19 @@ SetupApplyOrSyncWorker(int worker_slot) InitializeLogRepWorker(); + /* + * Register a callback to reset the origin state before aborting any + * pending transaction during shutdown (see ShutdownPostgres()). This will + * avoid origin advancement for an in-complete transaction which could + * otherwise lead to its loss as such a transaction won't be sent by the + * server again. + * + * Note that even a LOG or DEBUG statement placed after setting the origin + * state may process a shutdown signal before committing the current apply + * operation. So, it is important to register such a callback here. + */ + before_shmem_exit(replorigin_reset, (Datum) 0); + /* Connect to the origin and start the replication. */ elog(DEBUG1, "connecting to publisher using connection string \"%s\"", MySubscription->conninfo); @@ -4967,12 +4991,23 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; + int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); + elevel = geterrlevel(); + + /* + * Reset the origin state to prevent the advancement of origin progress if + * we fail to apply. Otherwise, this will result in transaction loss as + * that transaction won't be sent again by the server. + */ + if (elevel >= ERROR) + replorigin_reset(0, (Datum) 0); + if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 943d8588f3d..5cbb5b54168 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1569,6 +1569,23 @@ geterrcode(void) } /* + * geterrlevel --- return the currently set error level + * + * This is only intended for use in error callback subroutines, since there + * is no other place outside elog.c where the concept is meaningful. + */ +int +geterrlevel(void) +{ + ErrorData *edata = &errordata[errordata_stack_depth]; + + /* we don't bother incrementing recursion_depth */ + CHECK_STACK_DEPTH(); + + return edata->elevel; +} + +/* * geterrposition --- return the currently set error position (0 if none) * * This is only intended for use in error callback subroutines, since there |