aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c45
1 files changed, 36 insertions, 9 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45d027803ab..e4e5337d549 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -245,7 +245,9 @@ static void WalSndCheckTimeOut(TimestampTz now);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
+static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
@@ -923,7 +925,8 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
logical_read_xlog_page,
- WalSndPrepareWrite, WalSndWriteData);
+ WalSndPrepareWrite, WalSndWriteData,
+ WalSndUpdateProgress);
/*
* Signal that we don't need the timeout mechanism. We're just
@@ -1077,10 +1080,11 @@ StartLogicalReplication(StartReplicationCmd *cmd)
* Initialize position to the last ack'ed one, then the xlog records begin
* to be shipped from that position.
*/
- logical_decoding_ctx = CreateDecodingContext(
- cmd->startpoint, cmd->options,
+ logical_decoding_ctx = CreateDecodingContext(cmd->startpoint, cmd->options,
logical_read_xlog_page,
- WalSndPrepareWrite, WalSndWriteData);
+ WalSndPrepareWrite,
+ WalSndWriteData,
+ WalSndUpdateProgress);
/* Start reading WAL from the oldest required WAL. */
logical_startptr = MyReplicationSlot->data.restart_lsn;
@@ -1240,6 +1244,30 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/*
+ * LogicalDecodingContext 'progress_update' callback.
+ *
+ * Write the current position to the log tracker (see XLogSendPhysical).
+ */
+static void
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+{
+ static TimestampTz sendTime = 0;
+ TimestampTz now = GetCurrentTimestamp();
+
+ /*
+ * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS
+ * to avoid flooding the lag tracker when we commit frequently.
+ */
+#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
+ if (!TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ return;
+
+ LagTrackerWrite(lsn, now);
+ sendTime = now;
+}
+
+/*
* Wait till WAL < loc is flushed to disk so it can be safely read.
*/
static XLogRecPtr
@@ -2730,9 +2758,9 @@ XLogSendLogical(void)
if (record != NULL)
{
/*
- * Note the lack of any call to LagTrackerWrite() which is the responsibility
- * of the logical decoding plugin. Response messages are handled normally,
- * so this responsibility does not extend to needing to call LagTrackerRead().
+ * Note the lack of any call to LagTrackerWrite() which is handled
+ * by WalSndUpdateProgress which is called by output plugin through
+ * logical decoding write api.
*/
LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
@@ -3328,9 +3356,8 @@ WalSndKeepaliveIfNecessary(TimestampTz now)
* LagTrackerRead can compute the elapsed time (lag) when this WAL position is
* eventually reported to have been written, flushed and applied by the
* standby in a reply message.
- * Exported to allow logical decoding plugins to call this when they choose.
*/
-void
+static void
LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
{
bool buffer_full;