diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/access/transam/xlog.c | 14 | ||||
-rw-r--r-- | src/backend/catalog/system_views.sql | 3 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 277 | ||||
-rw-r--r-- | src/include/catalog/pg_proc.h | 2 | ||||
-rw-r--r-- | src/include/replication/logical.h | 2 | ||||
-rw-r--r-- | src/include/replication/walsender_private.h | 5 | ||||
-rw-r--r-- | src/test/regress/expected/rules.out | 5 |
7 files changed, 301 insertions, 7 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 3924738a336..de1937e013d 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -11555,6 +11555,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, { static TimestampTz last_fail_time = 0; TimestampTz now; + bool streaming_reply_sent = false; /*------- * Standby mode is implemented by a state machine: @@ -11878,6 +11879,19 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess, } /* + * Since we have replayed everything we have received so + * far and are about to start waiting for more WAL, let's + * tell the upstream server our replay location now so + * that pg_stat_replication doesn't show stale + * information. + */ + if (!streaming_reply_sent) + { + WalRcvForceReply(); + streaming_reply_sent = true; + } + + /* * Wait for more WAL to arrive. Time out after 5 seconds * to react to a trigger file promptly. */ diff --git a/src/backend/catalog/system_views.sql b/src/backend/catalog/system_views.sql index 5723714fb97..80d14296de2 100644 --- a/src/backend/catalog/system_views.sql +++ b/src/backend/catalog/system_views.sql @@ -705,6 +705,9 @@ CREATE VIEW pg_stat_replication AS W.write_location, W.flush_location, W.replay_location, + W.write_lag, + W.flush_lag, + W.replay_lag, W.sync_priority, W.sync_state FROM pg_stat_get_activity(NULL) AS S diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index c6ba916c49b..a29d0e7cf4b 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -190,6 +190,26 @@ static volatile sig_atomic_t replication_active = false; static LogicalDecodingContext *logical_decoding_ctx = NULL; static XLogRecPtr logical_startptr = InvalidXLogRecPtr; +/* A sample associating a log position with the time it was written. 
*/ +typedef struct +{ + XLogRecPtr lsn; + TimestampTz time; +} WalTimeSample; + +/* The size of our buffer of time samples. */ +#define LAG_TRACKER_BUFFER_SIZE 8192 + +/* A mechanism for tracking replication lag. */ +static struct +{ + XLogRecPtr last_lsn; + WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE]; + int write_head; + int read_heads[NUM_SYNC_REP_WAIT_MODE]; + WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE]; +} LagTracker; + /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); @@ -221,6 +241,7 @@ static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static void XLogRead(char *buf, XLogRecPtr startptr, Size count); @@ -246,6 +267,9 @@ InitWalSender(void) */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + + /* Initialize empty timestamp buffer for lag tracking. */ + memset(&LagTracker, 0, sizeof(LagTracker)); } /* @@ -1646,6 +1670,13 @@ ProcessStandbyReplyMessage(void) flushPtr, applyPtr; bool replyRequested; + TimeOffset writeLag, + flushLag, + applyLag; + bool clearLagTimes; + TimestampTz now; + + static bool fullyAppliedLastTime = false; /* the caller already consumed the msgtype byte */ writePtr = pq_getmsgint64(&reply_message); @@ -1660,6 +1691,30 @@ ProcessStandbyReplyMessage(void) (uint32) (applyPtr >> 32), (uint32) applyPtr, replyRequested ? " (reply requested)" : ""); + /* See if we can compute the round-trip lag for these positions. 
*/ + now = GetCurrentTimestamp(); + writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now); + flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now); + applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now); + + /* + * If the standby reports that it has fully replayed the WAL in two + * consecutive reply messages, then the second such message must result + * from wal_receiver_status_interval expiring on the standby. This is a + * convenient time to forget the lag times measured when it last + * wrote/flushed/applied a WAL record, to avoid displaying stale lag data + * until more WAL traffic arrives. + */ + clearLagTimes = false; + if (applyPtr == sentPtr) + { + if (fullyAppliedLastTime) + clearLagTimes = true; + fullyAppliedLastTime = true; + } + else + fullyAppliedLastTime = false; + /* Send a reply if the standby requested one. */ if (replyRequested) WalSndKeepalive(false); @@ -1675,6 +1730,12 @@ ProcessStandbyReplyMessage(void) walsnd->write = writePtr; walsnd->flush = flushPtr; walsnd->apply = applyPtr; + if (writeLag != -1 || clearLagTimes) + walsnd->writeLag = writeLag; + if (flushLag != -1 || clearLagTimes) + walsnd->flushLag = flushLag; + if (applyLag != -1 || clearLagTimes) + walsnd->applyLag = applyLag; SpinLockRelease(&walsnd->mutex); } @@ -2063,6 +2124,9 @@ InitWalSenderSlot(void) walsnd->write = InvalidXLogRecPtr; walsnd->flush = InvalidXLogRecPtr; walsnd->apply = InvalidXLogRecPtr; + walsnd->writeLag = -1; + walsnd->flushLag = -1; + walsnd->applyLag = -1; walsnd->state = WALSNDSTATE_STARTUP; walsnd->latch = &MyProc->procLatch; SpinLockRelease(&walsnd->mutex); @@ -2390,6 +2454,32 @@ XLogSendPhysical(void) } /* + * Record the current system time as an approximation of the time at which + * this WAL position was written for the purposes of lag tracking. 
+ * + * In theory we could make XLogFlush() record a time in shmem whenever WAL + * is flushed and we could get that time as well as the LSN when we call + * GetFlushRecPtr() above (and likewise for the cascading standby + * equivalent), but rather than putting any new code into the hot WAL path + * it seems good enough to capture the time here. We should reach this + * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that + * may take some time, we read the WAL flush pointer and take the time + * very close together here so that we'll get a later position if it + * is still moving. + * + * Because LagTrackerWrite ignores samples when the LSN hasn't advanced, + * this gives us a cheap approximation for the WAL flush time for this + * LSN. + * + * Note that the LSN is not necessarily the LSN for the data contained in + * the present message; it's the end of the WAL, which might be + * further ahead. All the lag tracking machinery cares about is finding + * out when that arbitrary LSN is eventually reported as written, flushed + * and applied, so that it can measure the elapsed time. + */ + LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, stop streaming. * @@ -2543,6 +2633,11 @@ XLogSendLogical(void) if (record != NULL) { + /* + * Note the lack of any call to LagTrackerWrite() which is the responsibility + * of the logical decoding plugin. Response messages are handled normally, + * so this responsibility does not extend to needing to call LagTrackerRead(). 
+ */ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); sentPtr = logical_decoding_ctx->reader->EndRecPtr; @@ -2839,6 +2934,17 @@ WalSndGetStateString(WalSndState state) return "UNKNOWN"; } +static Interval * +offset_to_interval(TimeOffset offset) +{ + Interval *result = palloc(sizeof(Interval)); + + result->month = 0; + result->day = 0; + result->time = offset; + + return result; +} /* * Returns activity of walsenders, including pids and xlog locations sent to @@ -2847,7 +2953,7 @@ WalSndGetStateString(WalSndState state) Datum pg_stat_get_wal_senders(PG_FUNCTION_ARGS) { -#define PG_STAT_GET_WAL_SENDERS_COLS 8 +#define PG_STAT_GET_WAL_SENDERS_COLS 11 ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc tupdesc; Tuplestorestate *tupstore; @@ -2895,6 +3001,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) XLogRecPtr write; XLogRecPtr flush; XLogRecPtr apply; + TimeOffset writeLag; + TimeOffset flushLag; + TimeOffset applyLag; int priority; WalSndState state; Datum values[PG_STAT_GET_WAL_SENDERS_COLS]; @@ -2909,6 +3018,9 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) write = walsnd->write; flush = walsnd->flush; apply = walsnd->apply; + writeLag = walsnd->writeLag; + flushLag = walsnd->flushLag; + applyLag = walsnd->applyLag; priority = walsnd->sync_standby_priority; SpinLockRelease(&walsnd->mutex); @@ -2950,7 +3062,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) */ priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority; - values[6] = Int32GetDatum(priority); + if (writeLag < 0) + nulls[6] = true; + else + values[6] = IntervalPGetDatum(offset_to_interval(writeLag)); + + if (flushLag < 0) + nulls[7] = true; + else + values[7] = IntervalPGetDatum(offset_to_interval(flushLag)); + + if (applyLag < 0) + nulls[8] = true; + else + values[8] = IntervalPGetDatum(offset_to_interval(applyLag)); + + values[9] = Int32GetDatum(priority); /* * More easily understood version of standby state. 
This is purely @@ -2964,12 +3091,12 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * states. We report just "quorum" for them. */ if (priority == 0) - values[7] = CStringGetTextDatum("async"); + values[10] = CStringGetTextDatum("async"); else if (list_member_int(sync_standbys, i)) - values[7] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? + values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ? CStringGetTextDatum("sync") : CStringGetTextDatum("quorum"); else - values[7] = CStringGetTextDatum("potential"); + values[10] = CStringGetTextDatum("potential"); } tuplestore_putvalues(tupstore, tupdesc, values, nulls); @@ -3037,3 +3164,143 @@ WalSndKeepaliveIfNecessary(TimestampTz now) WalSndShutdown(); } } + +/* + * Record the end of the WAL and the time it was flushed locally, so that + * LagTrackerRead can compute the elapsed time (lag) when this WAL position is + * eventually reported to have been written, flushed and applied by the + * standby in a reply message. + * Exported to allow logical decoding plugins to call this when they choose. + */ +void +LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) +{ + bool buffer_full; + int new_write_head; + int i; + + if (!am_walsender) + return; + + /* + * If the lsn hasn't advanced since last time, then do nothing. This way + * we only record a new sample when new WAL has been written. + */ + if (LagTracker.last_lsn == lsn) + return; + LagTracker.last_lsn = lsn; + + /* + * If advancing the write head of the circular buffer would crash into any + * of the read heads, then the buffer is full. In other words, the + * slowest reader (presumably apply) is the one that controls the release + * of space. 
+ */ + new_write_head = (LagTracker.write_head + 1) % LAG_TRACKER_BUFFER_SIZE; + buffer_full = false; + for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i) + { + if (new_write_head == LagTracker.read_heads[i]) + buffer_full = true; + } + + /* + * If the buffer is full, for now we just rewind by one slot and overwrite + * the last sample, as a simple (if somewhat uneven) way to lower the + * sampling rate. There may be better adaptive compaction algorithms. + */ + if (buffer_full) + { + new_write_head = LagTracker.write_head; + if (LagTracker.write_head > 0) + LagTracker.write_head--; + else + LagTracker.write_head = LAG_TRACKER_BUFFER_SIZE - 1; + } + + /* Store a sample at the current write head position. */ + LagTracker.buffer[LagTracker.write_head].lsn = lsn; + LagTracker.buffer[LagTracker.write_head].time = local_flush_time; + LagTracker.write_head = new_write_head; +} + +/* + * Find out how much time has elapsed between the moment WAL position 'lsn' + * (or the highest known earlier LSN) was flushed locally and the time 'now'. + * We have a separate read head for each of the reported LSN locations we + * receive in replies from standby; 'head' controls which read head is + * used. Whenever a read head crosses an LSN which was written into the + * lag buffer with LagTrackerWrite, we can use the associated timestamp to + * find out the time this LSN (or an earlier one) was flushed locally, and + * therefore compute the lag. + * + * Return -1 if no new sample data is available, and otherwise the elapsed + * time in microseconds. + */ +static TimeOffset +LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) +{ + TimestampTz time = 0; + + /* Read all unread samples up to this LSN or end of buffer. 
*/ + while (LagTracker.read_heads[head] != LagTracker.write_head && + LagTracker.buffer[LagTracker.read_heads[head]].lsn <= lsn) + { + time = LagTracker.buffer[LagTracker.read_heads[head]].time; + LagTracker.last_read[head] = + LagTracker.buffer[LagTracker.read_heads[head]]; + LagTracker.read_heads[head] = + (LagTracker.read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE; + } + + if (time > now) + { + /* If the clock somehow went backwards, treat as not found. */ + return -1; + } + else if (time == 0) + { + /* + * We didn't cross a time. If there is a future sample that we + * haven't reached yet, and we've already reached at least one sample, + * let's interpolate the local flushed time. This is mainly useful for + * reporting a completely stuck apply position as having increasing + * lag, since otherwise we'd have to wait for it to eventually start + * moving again and cross one of our samples before we can show the + * lag increasing. + */ + if (LagTracker.read_heads[head] != LagTracker.write_head && + LagTracker.last_read[head].time != 0) + { + double fraction; + WalTimeSample prev = LagTracker.last_read[head]; + WalTimeSample next = LagTracker.buffer[LagTracker.read_heads[head]]; + + Assert(lsn >= prev.lsn); + Assert(prev.lsn < next.lsn); + + if (prev.time > next.time) + { + /* If the clock somehow went backwards, treat as not found. */ + return -1; + } + + /* See how far we are between the previous and next samples. */ + fraction = + (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn); + + /* Scale the local flush time proportionally. */ + time = (TimestampTz) + ((double) prev.time + (next.time - prev.time) * fraction); + } + else + { + /* Couldn't interpolate due to lack of data. */ + return -1; + } + } + + /* Return the elapsed time since local flush time in microseconds. 
*/ + Assert(time != 0); + return now - time; +} diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 78c23e3f5d5..a5b415346b7 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -2804,7 +2804,7 @@ DATA(insert OID = 2022 ( pg_stat_get_activity PGNSP PGUID 12 1 100 0 0 f f f DESCR("statistics: information about currently active backends"); DATA(insert OID = 3318 ( pg_stat_get_progress_info PGNSP PGUID 12 1 100 0 0 f f f f t t s r 1 0 2249 "25" "{25,23,26,26,20,20,20,20,20,20,20,20,20,20}" "{i,o,o,o,o,o,o,o,o,o,o,o,o,o}" "{cmdtype,pid,datid,relid,param1,param2,param3,param4,param5,param6,param7,param8,param9,param10}" _null_ _null_ pg_stat_get_progress_info _null_ _null_ _null_ )); DESCR("statistics: information about progress of backends running maintenance command"); -DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,23,25}" "{o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); +DATA(insert OID = 3099 ( pg_stat_get_wal_senders PGNSP PGUID 12 1 10 0 0 f f f f f t s r 0 0 2249 "" "{23,25,3220,3220,3220,3220,1186,1186,1186,23,25}" "{o,o,o,o,o,o,o,o,o,o,o}" "{pid,state,sent_location,write_location,flush_location,replay_location,write_lag,flush_lag,replay_lag,sync_priority,sync_state}" _null_ _null_ pg_stat_get_wal_senders _null_ _null_ _null_ )); DESCR("statistics: information about currently active replication"); DATA(insert OID = 3317 ( pg_stat_get_wal_receiver PGNSP PGUID 12 1 0 0 0 f f f f f f s r 0 0 2249 "" "{23,25,3220,23,3220,23,1184,1184,3220,1184,25,25}" "{o,o,o,o,o,o,o,o,o,o,o,o}" "{pid,status,receive_start_lsn,receive_start_tli,received_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,conninfo}" _null_ _null_ pg_stat_get_wal_receiver _null_ _null_ 
_null_ )); DESCR("statistics: information about WAL receiver"); diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index d10dd2c90af..7d6c88efe34 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -106,6 +106,8 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn, XLogRecPtr restart_lsn); extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn); +extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); + extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id); #endif diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h index 5e6ccfc57b2..2c59056cefd 100644 --- a/src/include/replication/walsender_private.h +++ b/src/include/replication/walsender_private.h @@ -47,6 +47,11 @@ typedef struct WalSnd XLogRecPtr flush; XLogRecPtr apply; + /* Measured lag times, or -1 for unknown/none. */ + TimeOffset writeLag; + TimeOffset flushLag; + TimeOffset applyLag; + /* Protects shared variables shown above. 
*/ slock_t mutex; diff --git a/src/test/regress/expected/rules.out b/src/test/regress/expected/rules.out index f7c3a637b5d..c4c8450b830 100644 --- a/src/test/regress/expected/rules.out +++ b/src/test/regress/expected/rules.out @@ -1831,10 +1831,13 @@ pg_stat_replication| SELECT s.pid, w.write_location, w.flush_location, w.replay_location, + w.write_lag, + w.flush_lag, + w.replay_lag, w.sync_priority, w.sync_state FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, ssl, sslversion, sslcipher, sslbits, sslcompression, sslclientdn) - JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, sync_priority, sync_state) ON ((s.pid = w.pid))) + JOIN pg_stat_get_wal_senders() w(pid, state, sent_location, write_location, flush_location, replay_location, write_lag, flush_lag, replay_lag, sync_priority, sync_state) ON ((s.pid = w.pid))) LEFT JOIN pg_authid u ON ((s.usesysid = u.oid))); pg_stat_ssl| SELECT s.pid, s.ssl, |