diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 45 |
1 files changed, 36 insertions, 9 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 45d027803ab..e4e5337d549 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); +static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); @@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, WalSndWriteData, + WalSndUpdateProgress); /* * Signal that we don't need the timeout mechanism. We're just @@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd) * Initialize position to the last ack'ed one, then the xlog records begin * to be shipped from that position. */ - logical_decoding_ctx = CreateDecodingContext( - cmd->startpoint, cmd->options, + logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options, logical_read_xlog_page, - WalSndPrepareWrite, WalSndWriteData); + WalSndPrepareWrite, + WalSndWriteData, + WalSndUpdateProgress); /* Start reading WAL from the oldest required WAL. */ logical_startptr = MyReplicationSlot->data.restart_lsn; @@ -1240,6 +1244,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* + * LogicalDecodingContext 'progress_update' callback. + * + * Write the current position to the log tracker (see XLogSendPhysical). + */ +static void +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +{ + static TimestampTz sendTime = 0; + TimestampTz now = GetCurrentTimestamp(); + + /* + * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS + * to avoid flooding the lag tracker when we commit frequently. + */ +#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 + if (!TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + return; + + LagTrackerWrite(lsn, now); + sendTime = now; +} + +/* * Wait till WAL < loc is flushed to disk so it can be safely read. */ static XLogRecPtr @@ -2730,9 +2758,9 @@ XLogSendLogical(void) if (record != NULL) { /* - * Note the lack of any call to LagTrackerWrite() which is the responsibility - * of the logical decoding plugin. Response messages are handled normally, - * so this responsibility does not extend to needing to call LagTrackerRead(). + * Note the lack of any call to LagTrackerWrite() which is handled + * by WalSndUpdateProgress which is called by output plugin through + * logical decoding write api. */ LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader); @@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now) * LagTrackerRead can compute the elapsed time (lag) when this WAL position is * eventually reported to have been written, flushed and applied by the * standby in a reply message. - * Exported to allow logical decoding plugins to call this when they choose. */ -void +static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) { bool buffer_full; |