aboutsummaryrefslogtreecommitdiff
path: root/src/backend
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend')
-rw-r--r--src/backend/catalog/system_views.sql3
-rw-r--r--src/backend/replication/walsender.c71
2 files changed, 60 insertions, 14 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql
index 715995dd883..8630542bb34 100644
--- a/src/backend/catalog/system_views.sql
+++ b/src/backend/catalog/system_views.sql
@@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS
W.flush_lag,
W.replay_lag,
W.sync_priority,
- W.sync_state
+ W.sync_state,
+ W.reply_time
FROM pg_stat_get_activity(NULL) AS S
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 46edb525e88..d1a8113cb66 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void)
applyLag;
bool clearLagTimes;
TimestampTz now;
+ TimestampTz replyTime;
static bool fullyAppliedLastTime = false;
@@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void)
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
replyRequested = pq_getmsgbyte(&reply_message);
- elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
- (uint32) (writePtr >> 32), (uint32) writePtr,
- (uint32) (flushPtr >> 32), (uint32) flushPtr,
- (uint32) (applyPtr >> 32), (uint32) applyPtr,
- replyRequested ? " (reply requested)" : "");
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
+ (uint32) (writePtr >> 32), (uint32) writePtr,
+ (uint32) (flushPtr >> 32), (uint32) flushPtr,
+ (uint32) (applyPtr >> 32), (uint32) applyPtr,
+ replyRequested ? " (reply requested)" : "",
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
/* See if we can compute the round-trip lag for these positions. */
now = GetCurrentTimestamp();
@@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void)
walsnd->flushLag = flushLag;
if (applyLag != -1 || clearLagTimes)
walsnd->applyLag = applyLag;
+ walsnd->replyTime = replyTime;
SpinLockRelease(&walsnd->mutex);
}
@@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void)
uint32 feedbackEpoch;
TransactionId feedbackCatalogXmin;
uint32 feedbackCatalogEpoch;
+ TimestampTz replyTime;
/*
* Decipher the reply message. The caller already consumed the msgtype
* byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
* of this message.
*/
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ replyTime = pq_getmsgint64(&reply_message);
feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4);
feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
- elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u",
- feedbackXmin,
- feedbackEpoch,
- feedbackCatalogXmin,
- feedbackCatalogEpoch);
+ if (log_min_messages <= DEBUG2)
+ {
+ char *replyTimeStr;
+
+ /* Copy because timestamptz_to_str returns a static buffer */
+ replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
+
+ elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
+ feedbackXmin,
+ feedbackEpoch,
+ feedbackCatalogXmin,
+ feedbackCatalogEpoch,
+ replyTimeStr);
+
+ pfree(replyTimeStr);
+ }
+
+ /*
+ * Update shared state for this WalSender process based on reply data from
+ * standby.
+ */
+ {
+ WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->replyTime = replyTime;
+ SpinLockRelease(&walsnd->mutex);
+ }
/*
* Unset WalSender's xmins if the feedback message values are invalid.
@@ -2265,6 +2302,7 @@ InitWalSenderSlot(void)
walsnd->applyLag = -1;
walsnd->state = WALSNDSTATE_STARTUP;
walsnd->latch = &MyProc->procLatch;
+ walsnd->replyTime = 0;
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset)
Datum
pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
{
-#define PG_STAT_GET_WAL_SENDERS_COLS 11
+#define PG_STAT_GET_WAL_SENDERS_COLS 12
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc tupdesc;
Tuplestorestate *tupstore;
@@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
int priority;
int pid;
WalSndState state;
+ TimestampTz replyTime;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
@@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
flushLag = walsnd->flushLag;
applyLag = walsnd->applyLag;
priority = walsnd->sync_standby_priority;
+ replyTime = walsnd->replyTime;
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
@@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
else
values[10] = CStringGetTextDatum("potential");
+
+ if (replyTime == 0)
+ nulls[11] = true;
+ else
+ values[11] = TimestampTzGetDatum(replyTime);
}
tuplestore_putvalues(tupstore, tupdesc, values, nulls);