diff options
-rw-r--r-- | src/backend/replication/logical/worker.c | 73 |
1 files changed, 33 insertions, 40 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c index 7e267f79607..92aa794706d 100644 --- a/src/backend/replication/logical/worker.c +++ b/src/backend/replication/logical/worker.c @@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg /* Remote node information */ int remote_attnum; /* -1 if invalid */ TransactionId remote_xid; - TimestampTz ts; /* commit, rollback, or prepare timestamp */ } ApplyErrorCallbackArg; static ApplyErrorCallbackArg apply_error_callback_arg = @@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg = .rel = NULL, .remote_attnum = -1, .remote_xid = InvalidTransactionId, - .ts = 0, }; static MemoryContext ApplyMessageContext = NULL; @@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn); /* Functions for apply error callback */ static void apply_error_callback(void *arg); -static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts); +static inline void set_apply_error_context_xact(TransactionId xid); static inline void reset_apply_error_context_info(void); /* @@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s) LogicalRepBeginData begin_data; logicalrep_read_begin(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.committime); + set_apply_error_context_xact(begin_data.xid); remote_final_lsn = begin_data.final_lsn; @@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s) errmsg_internal("tablesync worker received a BEGIN PREPARE message"))); logicalrep_read_begin_prepare(s, &begin_data); - set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time); + set_apply_error_context_xact(begin_data.xid); remote_final_lsn = begin_data.prepare_lsn; @@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_commit_prepared(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time); + set_apply_error_context_xact(prepare_data.xid); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid, @@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s) char gid[GIDSIZE]; logicalrep_read_rollback_prepared(s, &rollback_data); - set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time); + set_apply_error_context_xact(rollback_data.xid); /* Compute GID for two_phase transactions. */ TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid, @@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s) errmsg_internal("tablesync worker received a STREAM PREPARE message"))); logicalrep_read_stream_prepare(s, &prepare_data); - set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time); + set_apply_error_context_xact(prepare_data.xid); elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid); @@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s) (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid transaction ID in streamed replication transaction"))); - set_apply_error_context_xact(stream_xid, 0); + set_apply_error_context_xact(stream_xid); /* * Initialize the worker's stream_fileset if we haven't yet. This will be @@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s) */ if (xid == subxid) { - set_apply_error_context_xact(xid, 0); + set_apply_error_context_xact(xid); stream_cleanup_files(MyLogicalRepWorker->subid, xid); } else @@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s) bool found = false; char path[MAXPGPATH]; - set_apply_error_context_xact(subxid, 0); + set_apply_error_context_xact(subxid); subidx = -1; begin_replication_step(); @@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s) errmsg_internal("STREAM COMMIT message without STREAM STOP"))); xid = logicalrep_read_stream_commit(s, &commit_data); - set_apply_error_context_xact(xid, commit_data.committime); + set_apply_error_context_xact(xid); elog(DEBUG1, "received commit for streamed transaction %u", xid); @@ -3648,46 +3646,41 @@ IsLogicalWorker(void) static void apply_error_callback(void *arg) { - StringInfoData buf; ApplyErrorCallbackArg *errarg = &apply_error_callback_arg; if (apply_error_callback_arg.command == 0) return; - initStringInfo(&buf); - appendStringInfo(&buf, _("processing remote data during \"%s\""), - logicalrep_message_type(errarg->command)); - - /* append relation information */ - if (errarg->rel) - { - appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""), - errarg->rel->remoterel.nspname, - errarg->rel->remoterel.relname); - if (errarg->remote_attnum >= 0) - appendStringInfo(&buf, _(" column \"%s\""), - errarg->rel->remoterel.attnames[errarg->remote_attnum]); - } - - /* append transaction information */ - if (TransactionIdIsNormal(errarg->remote_xid)) + if (errarg->rel == NULL) { - appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid); - if (errarg->ts != 0) - appendStringInfo(&buf, _(" at %s"), - timestamptz_to_str(errarg->ts)); + if (!TransactionIdIsValid(errarg->remote_xid)) + errcontext("processing remote data during \"%s\"", + logicalrep_message_type(errarg->command)); + else + errcontext("processing remote data during \"%s\" in transaction %u", + logicalrep_message_type(errarg->command), + errarg->remote_xid); } - - errcontext("%s", buf.data); - pfree(buf.data); + else if (errarg->remote_attnum < 0) + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u", + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->remote_xid); + else + errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u", + logicalrep_message_type(errarg->command), + errarg->rel->remoterel.nspname, + errarg->rel->remoterel.relname, + errarg->rel->remoterel.attnames[errarg->remote_attnum], + errarg->remote_xid); } /* Set transaction information of apply error callback */ static inline void -set_apply_error_context_xact(TransactionId xid, TimestampTz ts) +set_apply_error_context_xact(TransactionId xid) { apply_error_callback_arg.remote_xid = xid; - apply_error_callback_arg.ts = ts; } /* Reset all information of apply error callback */ @@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void) apply_error_callback_arg.command = 0; apply_error_callback_arg.rel = NULL; apply_error_callback_arg.remote_attnum = -1; - set_apply_error_context_xact(InvalidTransactionId, 0); + set_apply_error_context_xact(InvalidTransactionId); } |