aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/logical
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/logical')
-rw-r--r--src/backend/replication/logical/launcher.c42
-rw-r--r--src/backend/replication/logical/logical.c14
-rw-r--r--src/backend/replication/logical/origin.c6
-rw-r--r--src/backend/replication/logical/reorderbuffer.c2
-rw-r--r--src/backend/replication/logical/slotsync.c12
-rw-r--r--src/backend/replication/logical/snapbuild.c46
-rw-r--r--src/backend/replication/logical/tablesync.c21
-rw-r--r--src/backend/replication/logical/worker.c27
8 files changed, 96 insertions, 74 deletions
diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c
index 1c3c051403d..4aed0dfcebb 100644
--- a/src/backend/replication/logical/launcher.c
+++ b/src/backend/replication/logical/launcher.c
@@ -175,12 +175,14 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
uint16 generation,
BackgroundWorkerHandle *handle)
{
- BgwHandleStatus status;
- int rc;
+ bool result = false;
+ bool dropped_latch = false;
for (;;)
{
+ BgwHandleStatus status;
pid_t pid;
+ int rc;
CHECK_FOR_INTERRUPTS();
@@ -189,8 +191,9 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
/* Worker either died or has started. Return false if died. */
if (!worker->in_use || worker->proc)
{
+ result = worker->in_use;
LWLockRelease(LogicalRepWorkerLock);
- return worker->in_use;
+ break;
}
LWLockRelease(LogicalRepWorkerLock);
@@ -205,7 +208,7 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
if (generation == worker->generation)
logicalrep_worker_cleanup(worker);
LWLockRelease(LogicalRepWorkerLock);
- return false;
+ break; /* result is already false */
}
/*
@@ -220,8 +223,18 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
+ dropped_latch = true;
}
}
+
+ /*
+ * If we had to clear a latch event in order to wait, be sure to restore
+ * it before exiting. Otherwise caller may miss events.
+ */
+ if (dropped_latch)
+ SetLatch(MyLatch);
+
+ return result;
}
/*
@@ -328,7 +341,7 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
if (max_active_replication_origins == 0)
ereport(ERROR,
(errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
- errmsg("cannot start logical replication workers when \"max_active_replication_origins\"=0")));
+ errmsg("cannot start logical replication workers when \"max_active_replication_origins\" is 0")));
/*
* We need to do the modification of the shared memory under lock so that
@@ -1194,10 +1207,21 @@ ApplyLauncherMain(Datum main_arg)
(elapsed = TimestampDifferenceMilliseconds(last_start, now)) >= wal_retrieve_retry_interval)
{
ApplyLauncherSetWorkerStartTime(sub->oid, now);
- logicalrep_worker_launch(WORKERTYPE_APPLY,
- sub->dbid, sub->oid, sub->name,
- sub->owner, InvalidOid,
- DSM_HANDLE_INVALID);
+ if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
+ sub->dbid, sub->oid, sub->name,
+ sub->owner, InvalidOid,
+ DSM_HANDLE_INVALID))
+ {
+ /*
+ * We get here either if we failed to launch a worker
+ * (perhaps for resource-exhaustion reasons) or if we
+ * launched one but it immediately quit. Either way, it
+ * seems appropriate to try again after
+ * wal_retrieve_retry_interval.
+ */
+ wait_time = Min(wait_time,
+ wal_retrieve_retry_interval);
+ }
}
else
{
diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c
index f1eb798f3e9..7e363a7c05b 100644
--- a/src/backend/replication/logical/logical.c
+++ b/src/backend/replication/logical/logical.c
@@ -567,7 +567,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
* kinds of client errors; so the client may wish to check that
* confirmed_flush_lsn matches its expectations.
*/
- elog(LOG, "%X/%X has been already streamed, forwarding to %X/%X",
+ elog(LOG, "%X/%08X has been already streamed, forwarding to %X/%08X",
LSN_FORMAT_ARGS(start_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush));
@@ -610,7 +610,7 @@ CreateDecodingContext(XLogRecPtr start_lsn,
ereport(LOG,
(errmsg("starting logical decoding for slot \"%s\"",
NameStr(slot->data.name)),
- errdetail("Streaming transactions committing after %X/%X, reading WAL from %X/%X.",
+ errdetail("Streaming transactions committing after %X/%08X, reading WAL from %X/%08X.",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(slot->data.restart_lsn))));
@@ -637,7 +637,7 @@ DecodingContextFindStartpoint(LogicalDecodingContext *ctx)
/* Initialize from where to start reading WAL. */
XLogBeginRead(ctx->reader, slot->data.restart_lsn);
- elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%X",
+ elog(DEBUG1, "searching for logical decoding starting point, starting at %X/%08X",
LSN_FORMAT_ARGS(slot->data.restart_lsn));
/* Wait for a consistent starting point */
@@ -758,7 +758,7 @@ output_plugin_error_callback(void *arg)
/* not all callbacks have an associated LSN */
if (state->report_location != InvalidXLogRecPtr)
- errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%X",
+ errcontext("slot \"%s\", output plugin \"%s\", in the %s callback, associated LSN %X/%08X",
NameStr(state->ctx->slot->data.name),
NameStr(state->ctx->slot->data.plugin),
state->callback_name,
@@ -1725,7 +1725,7 @@ LogicalIncreaseXminForSlot(XLogRecPtr current_lsn, TransactionId xmin)
SpinLockRelease(&slot->mutex);
if (got_new_xmin)
- elog(DEBUG1, "got new catalog xmin %u at %X/%X", xmin,
+ elog(DEBUG1, "got new catalog xmin %u at %X/%08X", xmin,
LSN_FORMAT_ARGS(current_lsn));
/* candidate already valid with the current flush position, apply */
@@ -1785,7 +1785,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
slot->candidate_restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "got new restart lsn %X/%X at %X/%X",
+ elog(DEBUG1, "got new restart lsn %X/%08X at %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn));
}
@@ -1800,7 +1800,7 @@ LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart
confirmed_flush = slot->data.confirmed_flush;
SpinLockRelease(&slot->mutex);
- elog(DEBUG1, "failed to increase restart lsn: proposed %X/%X, after %X/%X, current candidate %X/%X, current after %X/%X, flushed up to %X/%X",
+ elog(DEBUG1, "failed to increase restart lsn: proposed %X/%08X, after %X/%08X, current candidate %X/%08X, current after %X/%08X, flushed up to %X/%08X",
LSN_FORMAT_ARGS(restart_lsn),
LSN_FORMAT_ARGS(current_lsn),
LSN_FORMAT_ARGS(candidate_restart_lsn),
diff --git a/src/backend/replication/logical/origin.c b/src/backend/replication/logical/origin.c
index a17bacf88e7..87f10e50dcc 100644
--- a/src/backend/replication/logical/origin.c
+++ b/src/backend/replication/logical/origin.c
@@ -826,9 +826,9 @@ StartupReplicationOrigin(void)
last_state++;
ereport(LOG,
- (errmsg("recovered replication state of node %d to %X/%X",
- disk_state.roident,
- LSN_FORMAT_ARGS(disk_state.remote_lsn))));
+ errmsg("recovered replication state of node %d to %X/%08X",
+ disk_state.roident,
+ LSN_FORMAT_ARGS(disk_state.remote_lsn)));
}
/* now check checksum */
diff --git a/src/backend/replication/logical/reorderbuffer.c b/src/backend/replication/logical/reorderbuffer.c
index c4299c76fb1..7b4e8629553 100644
--- a/src/backend/replication/logical/reorderbuffer.c
+++ b/src/backend/replication/logical/reorderbuffer.c
@@ -1415,7 +1415,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
int32 off;
/* nothing there anymore */
- if (state->heap->bh_size == 0)
+ if (binaryheap_empty(state->heap))
return NULL;
off = DatumGetInt32(binaryheap_first(state->heap));
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c
index 656e66e0ae0..2f0c08b8fbd 100644
--- a/src/backend/replication/logical/slotsync.c
+++ b/src/backend/replication/logical/slotsync.c
@@ -211,9 +211,9 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
* impact the users, so we used DEBUG1 level to log the message.
*/
ereport(slot->data.persistency == RS_TEMPORARY ? LOG : DEBUG1,
- errmsg("could not synchronize replication slot \"%s\" because remote slot precedes local slot",
+ errmsg("could not synchronize replication slot \"%s\"",
remote_slot->name),
- errdetail("The remote slot has LSN %X/%X and catalog xmin %u, but the local slot has LSN %X/%X and catalog xmin %u.",
+ errdetail("Synchronization could lead to data loss, because the remote slot needs WAL at LSN %X/%08X and catalog xmin %u, but the standby has LSN %X/%08X and catalog xmin %u.",
LSN_FORMAT_ARGS(remote_slot->restart_lsn),
remote_slot->catalog_xmin,
LSN_FORMAT_ARGS(slot->data.restart_lsn),
@@ -275,7 +275,7 @@ update_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
ereport(ERROR,
errmsg_internal("synchronized confirmed_flush for slot \"%s\" differs from remote slot",
remote_slot->name),
- errdetail_internal("Remote slot has LSN %X/%X but local slot has LSN %X/%X.",
+ errdetail_internal("Remote slot has LSN %X/%08X but local slot has LSN %X/%08X.",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
LSN_FORMAT_ARGS(slot->data.confirmed_flush)));
}
@@ -593,7 +593,7 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
{
ereport(LOG,
errmsg("could not synchronize replication slot \"%s\"", remote_slot->name),
- errdetail("Logical decoding could not find consistent point from local slot's LSN %X/%X.",
+ errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
LSN_FORMAT_ARGS(slot->data.restart_lsn)));
return false;
@@ -642,7 +642,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(AmLogicalSlotSyncWorkerProcess() ? LOG : ERROR,
errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("skipping slot synchronization because the received slot sync"
- " LSN %X/%X for slot \"%s\" is ahead of the standby position %X/%X",
+ " LSN %X/%08X for slot \"%s\" is ahead of the standby position %X/%08X",
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn),
remote_slot->name,
LSN_FORMAT_ARGS(latestFlushPtr)));
@@ -733,7 +733,7 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
ereport(ERROR,
errmsg_internal("cannot synchronize local slot \"%s\"",
remote_slot->name),
- errdetail_internal("Local slot's start streaming location LSN(%X/%X) is ahead of remote slot's LSN(%X/%X).",
+ errdetail_internal("Local slot's start streaming location LSN(%X/%08X) is ahead of remote slot's LSN(%X/%08X).",
LSN_FORMAT_ARGS(slot->data.confirmed_flush),
LSN_FORMAT_ARGS(remote_slot->confirmed_lsn)));
diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c
index adf18c397db..8532bfd27e5 100644
--- a/src/backend/replication/logical/snapbuild.c
+++ b/src/backend/replication/logical/snapbuild.c
@@ -774,7 +774,7 @@ SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, Transact
if (rbtxn_is_prepared(txn))
continue;
- elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%X",
+ elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
txn->xid, LSN_FORMAT_ARGS(lsn));
/*
@@ -1271,10 +1271,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->initial_xmin_horizon))
{
ereport(DEBUG1,
- (errmsg_internal("skipping snapshot at %X/%X while building logical decoding snapshot, xmin horizon too low",
- LSN_FORMAT_ARGS(lsn)),
- errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
- builder->initial_xmin_horizon, running->oldestRunningXid)));
+ errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
+ builder->initial_xmin_horizon, running->oldestRunningXid));
SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
@@ -1310,9 +1310,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no running transactions.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no running transactions."));
return false;
}
@@ -1359,10 +1359,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
Assert(TransactionIdIsNormal(builder->xmax));
ereport(LOG,
- (errmsg("logical decoding found initial starting point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial starting point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1383,10 +1383,10 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = running->nextXid;
ereport(LOG,
- (errmsg("logical decoding found initial consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Waiting for transactions (approximately %d) older than %u to end.",
- running->xcnt, running->nextXid)));
+ errmsg("logical decoding found initial consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Waiting for transactions (approximately %d) older than %u to end.",
+ running->xcnt, running->nextXid));
SnapBuildWaitSnapshot(running, running->nextXid);
}
@@ -1407,9 +1407,9 @@ SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *runn
builder->next_phase_at = InvalidTransactionId;
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("There are no old transactions anymore.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("There are no old transactions anymore."));
}
/*
@@ -1913,9 +1913,9 @@ SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
Assert(builder->state == SNAPBUILD_CONSISTENT);
ereport(LOG,
- (errmsg("logical decoding found consistent point at %X/%X",
- LSN_FORMAT_ARGS(lsn)),
- errdetail("Logical decoding will begin using saved snapshot.")));
+ errmsg("logical decoding found consistent point at %X/%08X",
+ LSN_FORMAT_ARGS(lsn)),
+ errdetail("Logical decoding will begin using saved snapshot."));
return true;
snapshot_not_interesting:
@@ -2061,7 +2061,7 @@ SnapBuildSnapshotExists(XLogRecPtr lsn)
int ret;
struct stat stat_buf;
- sprintf(path, "%s/%X-%X.snap",
+ sprintf(path, "%s/%08X-%08X.snap",
PG_LOGICAL_SNAPSHOTS_DIR,
LSN_FORMAT_ARGS(lsn));
diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c
index 8e1e8762f62..e4fd6347fd1 100644
--- a/src/backend/replication/logical/tablesync.c
+++ b/src/backend/replication/logical/tablesync.c
@@ -603,14 +603,19 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
TimestampDifferenceExceeds(hentry->last_start_time, now,
wal_retrieve_retry_interval))
{
- logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
- MyLogicalRepWorker->dbid,
- MySubscription->oid,
- MySubscription->name,
- MyLogicalRepWorker->userid,
- rstate->relid,
- DSM_HANDLE_INVALID);
+ /*
+ * Set the last_start_time even if we fail to start
+ * the worker, so that we won't retry until
+ * wal_retrieve_retry_interval has elapsed.
+ */
hentry->last_start_time = now;
+ (void) logicalrep_worker_launch(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->dbid,
+ MySubscription->oid,
+ MySubscription->name,
+ MyLogicalRepWorker->userid,
+ rstate->relid,
+ DSM_HANDLE_INVALID);
}
}
}
@@ -1548,7 +1553,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
copy_table_done:
elog(DEBUG1,
- "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%X",
+ "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
originname, LSN_FORMAT_ARGS(*origin_startpos));
/*
diff --git a/src/backend/replication/logical/worker.c b/src/backend/replication/logical/worker.c
index a23262957ac..c5fb627aa56 100644
--- a/src/backend/replication/logical/worker.c
+++ b/src/backend/replication/logical/worker.c
@@ -109,13 +109,6 @@
* If ever a user needs to be aware of the tri-state value, they can fetch it
* from the pg_subscription catalog (see column subtwophasestate).
*
- * We don't allow to toggle two_phase option of a subscription because it can
- * lead to an inconsistent replica. Consider, initially, it was on and we have
- * received some prepare then we turn it off, now at commit time the server
- * will send the entire transaction data along with the commit. With some more
- * analysis, we can allow changing this option from off to on but not sure if
- * that alone would be useful.
- *
* Finally, to avoid problems mentioned in previous paragraphs from any
* subsequent (not READY) tablesyncs (need to toggle two_phase option from 'on'
* to 'off' and then again back to 'on') there is a restriction for
@@ -1023,7 +1016,7 @@ apply_handle_commit(StringInfo s)
if (commit_data.commit_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect commit LSN %X/%X in commit message (expected %X/%X)",
+ errmsg_internal("incorrect commit LSN %X/%08X in commit message (expected %X/%08X)",
LSN_FORMAT_ARGS(commit_data.commit_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -1115,7 +1108,7 @@ apply_handle_prepare(StringInfo s)
if (prepare_data.prepare_lsn != remote_final_lsn)
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg_internal("incorrect prepare LSN %X/%X in prepare message (expected %X/%X)",
+ errmsg_internal("incorrect prepare LSN %X/%08X in prepare message (expected %X/%08X)",
LSN_FORMAT_ARGS(prepare_data.prepare_lsn),
LSN_FORMAT_ARGS(remote_final_lsn))));
@@ -3910,7 +3903,7 @@ send_feedback(XLogRecPtr recvpos, bool force, bool requestReply)
pq_sendint64(reply_message, now); /* sendTime */
pq_sendbyte(reply_message, requestReply); /* replyRequested */
- elog(DEBUG2, "sending feedback (force %d) to recv %X/%X, write %X/%X, flush %X/%X",
+ elog(DEBUG2, "sending feedback (force %d) to recv %X/%08X, write %X/%08X, flush %X/%08X",
force,
LSN_FORMAT_ARGS(recvpos),
LSN_FORMAT_ARGS(writepos),
@@ -4916,7 +4909,7 @@ maybe_start_skipping_changes(XLogRecPtr finish_lsn)
skip_xact_finish_lsn = finish_lsn;
ereport(LOG,
- errmsg("logical replication starts skipping transaction at LSN %X/%X",
+ errmsg("logical replication starts skipping transaction at LSN %X/%08X",
LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
}
@@ -4930,8 +4923,8 @@ stop_skipping_changes(void)
return;
ereport(LOG,
- (errmsg("logical replication completed skipping transaction at LSN %X/%X",
- LSN_FORMAT_ARGS(skip_xact_finish_lsn))));
+ errmsg("logical replication completed skipping transaction at LSN %X/%08X",
+ LSN_FORMAT_ARGS(skip_xact_finish_lsn)));
/* Stop skipping changes */
skip_xact_finish_lsn = InvalidXLogRecPtr;
@@ -5019,7 +5012,7 @@ clear_subscription_skip_lsn(XLogRecPtr finish_lsn)
if (myskiplsn != finish_lsn)
ereport(WARNING,
errmsg("skip-LSN of subscription \"%s\" cleared", MySubscription->name),
- errdetail("Remote transaction's finish WAL location (LSN) %X/%X did not match skip-LSN %X/%X.",
+ errdetail("Remote transaction's finish WAL location (LSN) %X/%08X did not match skip-LSN %X/%08X.",
LSN_FORMAT_ARGS(finish_lsn),
LSN_FORMAT_ARGS(myskiplsn)));
}
@@ -5056,7 +5049,7 @@ apply_error_callback(void *arg)
logicalrep_message_type(errarg->command),
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->remote_xid,
@@ -5074,7 +5067,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.relname,
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,
@@ -5093,7 +5086,7 @@ apply_error_callback(void *arg)
errarg->rel->remoterel.attnames[errarg->remote_attnum],
errarg->remote_xid);
else
- errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%X",
+ errcontext("processing remote data for replication origin \"%s\" during message type \"%s\" for replication target relation \"%s.%s\" column \"%s\" in transaction %u, finished at %X/%08X",
errarg->origin_name,
logicalrep_message_type(errarg->command),
errarg->rel->remoterel.nspname,