diff options
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 71 |
2 files changed, 60 insertions, 14 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 715995dd883..8630542bb34 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46edb525e88..d1a8113cb66 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz replyTime; static bool fullyAppliedLastTime = false; @@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", - (uint32) (writePtr >> 32), (uint32) writePtr, - (uint32) (flushPtr >> 32), (uint32) flushPtr, - (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + replyRequested ? " (reply requested)" : "", + replyTimeStr); + + pfree(replyTimeStr); + } /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } @@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void) uint32 feedbackEpoch; TransactionId feedbackCatalogXmin; uint32 feedbackCatalogEpoch; + TimestampTz replyTime; /* * Decipher the reply message. The caller already consumed the msgtype * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * of this message. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", - feedbackXmin, - feedbackEpoch, - feedbackCatalogXmin, - feedbackCatalogEpoch); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", + feedbackXmin, + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch, + replyTimeStr); + + pfree(replyTimeStr); + } + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } /* * Unset WalSender's xmins if the feedback message values are invalid. @@ -2265,6 +2302,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->replyTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz replyTime; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + replyTime = walsnd->replyTime; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (replyTime == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(replyTime); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); |