aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/logical/worker.c73
1 files changed, 33 insertions, 40 deletions
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index 7e267f79607..92aa794706d 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -226,7 +226,6 @@ typedef struct ApplyErrorCallbackArg
/* Remote node information */
int remote_attnum; /* -1 if invalid */
TransactionId remote_xid;
- TimestampTz ts; /* commit, rollback, or prepare timestamp */
} ApplyErrorCallbackArg;
static ApplyErrorCallbackArg apply_error_callback_arg =
@@ -235,7 +234,6 @@ static ApplyErrorCallbackArg apply_error_callback_arg =
.rel = NULL,
.remote_attnum = -1,
.remote_xid = InvalidTransactionId,
- .ts = 0,
};
static MemoryContext ApplyMessageContext = NULL;
@@ -334,7 +332,7 @@ static void apply_spooled_messages(TransactionId xid, XLogRecPtr lsn);
/* Functions for apply error callback */
static void apply_error_callback(void *arg);
-static inline void set_apply_error_context_xact(TransactionId xid, TimestampTz ts);
+static inline void set_apply_error_context_xact(TransactionId xid);
static inline void reset_apply_error_context_info(void);
/*
@@ -787,7 +785,7 @@ apply_handle_begin(StringInfo s)
LogicalRepBeginData begin_data;
logicalrep_read_begin(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.committime);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.final_lsn;
@@ -839,7 +837,7 @@ apply_handle_begin_prepare(StringInfo s)
errmsg_internal("tablesync worker received a BEGIN PREPARE message")));
logicalrep_read_begin_prepare(s, &begin_data);
- set_apply_error_context_xact(begin_data.xid, begin_data.prepare_time);
+ set_apply_error_context_xact(begin_data.xid);
remote_final_lsn = begin_data.prepare_lsn;
@@ -938,7 +936,7 @@ apply_handle_commit_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_commit_prepared(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.commit_time);
+ set_apply_error_context_xact(prepare_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, prepare_data.xid,
@@ -979,7 +977,7 @@ apply_handle_rollback_prepared(StringInfo s)
char gid[GIDSIZE];
logicalrep_read_rollback_prepared(s, &rollback_data);
- set_apply_error_context_xact(rollback_data.xid, rollback_data.rollback_time);
+ set_apply_error_context_xact(rollback_data.xid);
/* Compute GID for two_phase transactions. */
TwoPhaseTransactionGid(MySubscription->oid, rollback_data.xid,
@@ -1044,7 +1042,7 @@ apply_handle_stream_prepare(StringInfo s)
errmsg_internal("tablesync worker received a STREAM PREPARE message")));
logicalrep_read_stream_prepare(s, &prepare_data);
- set_apply_error_context_xact(prepare_data.xid, prepare_data.prepare_time);
+ set_apply_error_context_xact(prepare_data.xid);
elog(DEBUG1, "received prepare for streamed transaction %u", prepare_data.xid);
@@ -1126,7 +1124,7 @@ apply_handle_stream_start(StringInfo s)
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid transaction ID in streamed replication transaction")));
- set_apply_error_context_xact(stream_xid, 0);
+ set_apply_error_context_xact(stream_xid);
/*
* Initialize the worker's stream_fileset if we haven't yet. This will be
@@ -1215,7 +1213,7 @@ apply_handle_stream_abort(StringInfo s)
*/
if (xid == subxid)
{
- set_apply_error_context_xact(xid, 0);
+ set_apply_error_context_xact(xid);
stream_cleanup_files(MyLogicalRepWorker->subid, xid);
}
else
@@ -1241,7 +1239,7 @@ apply_handle_stream_abort(StringInfo s)
bool found = false;
char path[MAXPGPATH];
- set_apply_error_context_xact(subxid, 0);
+ set_apply_error_context_xact(subxid);
subidx = -1;
begin_replication_step();
@@ -1426,7 +1424,7 @@ apply_handle_stream_commit(StringInfo s)
errmsg_internal("STREAM COMMIT message without STREAM STOP")));
xid = logicalrep_read_stream_commit(s, &commit_data);
- set_apply_error_context_xact(xid, commit_data.committime);
+ set_apply_error_context_xact(xid);
elog(DEBUG1, "received commit for streamed transaction %u", xid);
@@ -3648,46 +3646,41 @@ IsLogicalWorker(void)
static void
apply_error_callback(void *arg)
{
- StringInfoData buf;
ApplyErrorCallbackArg *errarg = &apply_error_callback_arg;
if (apply_error_callback_arg.command == 0)
return;
- initStringInfo(&buf);
- appendStringInfo(&buf, _("processing remote data during \"%s\""),
- logicalrep_message_type(errarg->command));
-
- /* append relation information */
- if (errarg->rel)
- {
- appendStringInfo(&buf, _(" for replication target relation \"%s.%s\""),
- errarg->rel->remoterel.nspname,
- errarg->rel->remoterel.relname);
- if (errarg->remote_attnum >= 0)
- appendStringInfo(&buf, _(" column \"%s\""),
- errarg->rel->remoterel.attnames[errarg->remote_attnum]);
- }
-
- /* append transaction information */
- if (TransactionIdIsNormal(errarg->remote_xid))
+ if (errarg->rel == NULL)
{
- appendStringInfo(&buf, _(" in transaction %u"), errarg->remote_xid);
- if (errarg->ts != 0)
- appendStringInfo(&buf, _(" at %s"),
- timestamptz_to_str(errarg->ts));
+ if (!TransactionIdIsValid(errarg->remote_xid))
+ errcontext("processing remote data during \"%s\"",
+ logicalrep_message_type(errarg->command));
+ else
+ errcontext("processing remote data during \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->remote_xid);
}
-
- errcontext("%s", buf.data);
- pfree(buf.data);
+ else if (errarg->remote_attnum < 0)
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->remote_xid);
+ else
+ errcontext("processing remote data during \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u",
+ logicalrep_message_type(errarg->command),
+ errarg->rel->remoterel.nspname,
+ errarg->rel->remoterel.relname,
+ errarg->rel->remoterel.attnames[errarg->remote_attnum],
+ errarg->remote_xid);
}
/* Set transaction information of apply error callback */
static inline void
-set_apply_error_context_xact(TransactionId xid, TimestampTz ts)
+set_apply_error_context_xact(TransactionId xid)
{
apply_error_callback_arg.remote_xid = xid;
- apply_error_callback_arg.ts = ts;
}
/* Reset all information of apply error callback */
@@ -3697,5 +3690,5 @@ reset_apply_error_context_info(void)
apply_error_callback_arg.command = 0;
apply_error_callback_arg.rel = NULL;
apply_error_callback_arg.remote_attnum = -1;
- set_apply_error_context_xact(InvalidTransactionId, 0);
+ set_apply_error_context_xact(InvalidTransactionId);
}