diff options
-rw-r--r-- | src/backend/replication/logical/worker.c | 21 | ||||
-rw-r--r-- | src/backend/utils/error/elog.c | 17 | ||||
-rw-r--r-- | src/include/utils/elog.h | 1 | ||||
-rw-r--r-- | src/test/subscription/t/100_bugs.pl | 77 |
4 files changed, 87 insertions, 29 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 5ce596f4576..4151a4b2a96 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -414,6 +414,8 @@ static inline void reset_apply_error_context_info(void); static TransApplyAction get_transaction_apply_action(TransactionId xid, ParallelApplyWorkerInfo **winfo); +static void replorigin_reset(int code, Datum arg); + /* * Form the origin name for the subscription. * @@ -4516,6 +4518,14 @@ start_apply(XLogRecPtr origin_startpos) } PG_CATCH(); { + /* + * Reset the origin state to prevent the advancement of origin + * progress if we fail to apply. Otherwise, this will result in + * transaction loss as that transaction won't be sent again by the + * server. + */ + replorigin_reset(0, (Datum) 0); + if (MySubscription->disableonerr) DisableSubscriptionAndExit(); else @@ -5004,23 +5014,12 @@ void apply_error_callback(void *arg) { ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; - int elevel; if (apply_error_callback_arg.command == 0) return; Assert(errarg->origin_name); - elevel = geterrlevel(); - - /* - * Reset the origin state to prevent the advancement of origin progress if - * we fail to apply. Otherwise, this will result in transaction loss as - * that transaction won't be sent again by the server. - */ - if (elevel >= ERROR) - replorigin_reset(0, (Datum) 0); - if (errarg->rel == NULL) { if (!TransactionIdIsValid(errarg->remote_xid)) diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index d6299633ab7..47af743990f 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -1591,23 +1591,6 @@ geterrcode(void) } /* - * geterrlevel --- return the currently set error level - * - * This is only intended for use in error callback subroutines, since there - * is no other place outside elog.c where the concept is meaningful. - */ -int -geterrlevel(void) -{ - ErrorData *edata = &errordata[errordata_stack_depth]; - - /* we don't bother incrementing recursion_depth */ - CHECK_STACK_DEPTH(); - - return edata->elevel; -} - -/* * geterrposition --- return the currently set error position (0 if none) * * This is only intended for use in error callback subroutines, since there diff --git a/src/include/utils/elog.h b/src/include/utils/elog.h index a5313c5d2d5..5eac0e16970 100644 --- a/src/include/utils/elog.h +++ b/src/include/utils/elog.h @@ -227,7 +227,6 @@ extern int internalerrquery(const char *query); extern int err_generic_string(int field, const char *str); extern int geterrcode(void); -extern int geterrlevel(void); extern int geterrposition(void); extern int getinternalerrposition(void); diff --git a/src/test/subscription/t/100_bugs.pl b/src/test/subscription/t/100_bugs.pl index b3924ca4b09..5e357701183 100644 --- a/src/test/subscription/t/100_bugs.pl +++ b/src/test/subscription/t/100_bugs.pl @@ -498,4 +498,81 @@ is( $node_publisher->psql( $node_publisher->stop('fast'); $node_subscriber->stop('fast'); +# The bug was that when an ERROR was caught and handled by a (PL/pgSQL) +# function, the apply worker reset the replication origin but continued +# processing subsequent changes. So, we fail to update the replication origin +# during further apply operations. This can lead to the apply worker requesting +# the changes that have been applied again after restarting. + +$node_publisher->rotate_logfile(); +$node_publisher->start(); + +$node_subscriber->rotate_logfile(); +$node_subscriber->start(); + +# Set up a publication with a table +$node_publisher->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE PUBLICATION regress_pub FOR TABLE t1; +)); + +# Set up a subscription which subscribes the publication +$node_subscriber->safe_psql( + 'postgres', qq( + CREATE TABLE t1 (a int); + CREATE SUBSCRIPTION regress_sub CONNECTION '$publisher_connstr' PUBLICATION regress_pub; +)); + +$node_subscriber->wait_for_subscription_sync($node_publisher, 'regress_sub'); + +# Create an AFTER INSERT trigger on the table that raises and subsequently +# handles an exception. Subsequent insertions will trigger this exception, +# causing the apply worker to invoke its error callback with an ERROR. However, +# since the error is caught within the trigger, the apply worker will continue +# processing changes. +$node_subscriber->safe_psql( + 'postgres', q{ +CREATE FUNCTION handle_exception_trigger() +RETURNS TRIGGER AS $$ +BEGIN + BEGIN + -- Raise an exception + RAISE EXCEPTION 'This is a test exception'; + EXCEPTION + WHEN OTHERS THEN + RETURN NEW; + END; + + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER silent_exception_trigger +AFTER INSERT OR UPDATE ON t1 +FOR EACH ROW +EXECUTE FUNCTION handle_exception_trigger(); + +ALTER TABLE t1 ENABLE ALWAYS TRIGGER silent_exception_trigger; +}); + +# Obtain current remote_lsn value to check its advancement later +my $remote_lsn = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); + +# Insert a tuple to replicate changes +$node_publisher->safe_psql('postgres', "INSERT INTO t1 VALUES (1);"); +$node_publisher->wait_for_catchup('regress_sub'); + +# Confirms the origin can be advanced +$result = $node_subscriber->safe_psql('postgres', + "SELECT remote_lsn > '$remote_lsn' FROM pg_replication_origin_status os, pg_subscription s WHERE os.external_id = 'pg_' || s.oid AND s.subname = 'regress_sub'" +); +is($result, 't', + 'remote_lsn has advanced for apply worker raising an exception'); + +$node_publisher->stop('fast'); +$node_subscriber->stop('fast'); + done_testing(); |