diff options
author | Michael Paquier <michael@paquier.xyz> | 2018-12-09 16:35:06 +0900 |
---|---|---|
committer | Michael Paquier <michael@paquier.xyz> | 2018-12-09 16:35:06 +0900 |
commit | 7fee252f6fbf78ca5e50ee591573d59f98e75d37 (patch) | |
tree | 83d0719d6dc9fa454da312d3056a6b5c6e30cae4 /src/backend | |
parent | 1f66c657f2b4eb55c68593d5c14256115aa6a0ea (diff) | |
download | postgresql-7fee252f6fbf78ca5e50ee591573d59f98e75d37.tar.gz postgresql-7fee252f6fbf78ca5e50ee591573d59f98e75d37.zip |
Add timestamp of last received message from standby to pg_stat_replication
The timestamp generated by the standby at message transmission has been
included in the protocol since its introduction for both the status
update message and hot standby feedback message, but it has never
appeared in pg_stat_replication. Seeing this timestamp does not matter
much with a cluster which has a lot of activity, but on a mostly-idle
cluster, this makes monitoring able to react faster than the configured
timeouts.
Author: MyungKyu LIM
Reviewed-by: Michael Paquier, Masahiko Sawada
Discussion: https://postgr.es/m/1657809367.407321.1533027417725.JavaMail.jboss@ep2ml404
Diffstat (limited to 'src/backend')
-rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 71 |
2 files changed, 60 insertions, 14 deletions
diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 715995dd883..8630542bb34 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -734,7 +734,8 @@ CREATE VIEW pg_stat_replication AS W.flush_lag, W.replay_lag, W.sync_priority, - W.sync_state + W.sync_state, + W.reply_time FROM pg_stat_get_activity(NULL) AS S JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid) LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid); diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 46edb525e88..d1a8113cb66 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1763,6 +1763,7 @@ ProcessStandbyReplyMessage(void) applyLag; bool clearLagTimes; TimestampTz now; + TimestampTz replyTime; static bool fullyAppliedLastTime = false; @@ -1770,14 +1771,25 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); replyRequested = pq_getmsgbyte(&reply_message); - elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", - (uint32) (writePtr >> 32), (uint32) writePtr, - (uint32) (flushPtr >> 32), (uint32) flushPtr, - (uint32) (applyPtr >> 32), (uint32) applyPtr, - replyRequested ? " (reply requested)" : ""); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s", + (uint32) (writePtr >> 32), (uint32) writePtr, + (uint32) (flushPtr >> 32), (uint32) flushPtr, + (uint32) (applyPtr >> 32), (uint32) applyPtr, + replyRequested ? " (reply requested)" : "", + replyTimeStr); + + pfree(replyTimeStr); + } /* See if we can compute the round-trip lag for these positions. */ now = GetCurrentTimestamp(); @@ -1824,6 +1836,7 @@ ProcessStandbyReplyMessage(void) walsnd->flushLag = flushLag; if (applyLag != -1 || clearLagTimes) walsnd->applyLag = applyLag; + walsnd->replyTime = replyTime; SpinLockRelease(&walsnd->mutex); } @@ -1927,23 +1940,47 @@ ProcessStandbyHSFeedbackMessage(void) uint32 feedbackEpoch; TransactionId feedbackCatalogXmin; uint32 feedbackCatalogEpoch; + TimestampTz replyTime; /* * Decipher the reply message. The caller already consumed the msgtype * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation * of this message. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + replyTime = pq_getmsgint64(&reply_message); feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); feedbackCatalogXmin = pq_getmsgint(&reply_message, 4); feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4); - elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u", - feedbackXmin, - feedbackEpoch, - feedbackCatalogXmin, - feedbackCatalogEpoch); + if (log_min_messages <= DEBUG2) + { + char *replyTimeStr; + + /* Copy because timestamptz_to_str returns a static buffer */ + replyTimeStr = pstrdup(timestamptz_to_str(replyTime)); + + elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s", + feedbackXmin, + feedbackEpoch, + feedbackCatalogXmin, + feedbackCatalogEpoch, + replyTimeStr); + + pfree(replyTimeStr); + } + + /* + * Update shared state for this WalSender process based on reply data from + * standby. + */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->replyTime = replyTime; + SpinLockRelease(&walsnd->mutex); + } /* * Unset WalSender's xmins if the feedback message values are invalid. @@ -2265,6 +2302,7 @@ InitWalSenderSlot(void) walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; + walsnd->replyTime = 0; SpinLockRelease(&walsnd->mutex); /* don't need the lock anymore */ MyWalSnd = (WalSnd *) walsnd; @@ -3179,7 +3217,7 @@ offset_to_interval(TimeOffset offset) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 11 +#define PG_STAT_GET_WAL_SENDERS_COLS 12 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -3233,6 +3271,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) int priority; int pid; WalSndState state; + TimestampTz replyTime; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; bool nulls[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -3252,6 +3291,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) flushLag = walsnd->flushLag; applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; + replyTime = walsnd->replyTime; SpinLockRelease(&walsnd->mutex); memset(nulls, 0, sizeof(nulls)); @@ -3328,6 +3368,11 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else values[10] = CStringGetTextDatum("potential"); + + if (replyTime == 0) + nulls[11] = true; + else + values[11] = TimestampTzGetDatum(replyTime); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); |