aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c72
1 files changed, 57 insertions, 15 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index cffb3482adf..75400a53f2f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -242,14 +242,16 @@ static void ProcessStandbyMessage(void);
static void ProcessStandbyReplyMessage(void);
static void ProcessStandbyHSFeedbackMessage(void);
static void ProcessRepliesIfAny(void);
-static void WalSndKeepalive(bool requestReply);
+static void ProcessPendingWrites(void);
+static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
-static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
+static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+ bool skipped_xact);
static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
@@ -1399,6 +1401,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/* If we have pending write here, go to slow path */
+ ProcessPendingWrites();
+}
+
+/*
+ * Wait until there is no pending write. Also process replies from the other
+ * side and check timeouts during that.
+ */
+static void
+ProcessPendingWrites(void)
+{
for (;;)
{
long sleeptime;
@@ -1447,9 +1459,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
* LogicalDecodingContext 'update_progress' callback.
*
* Write the current position to the lag tracker (see XLogSendPhysical).
+ *
+ * When skipping empty transactions, send a keepalive message if necessary.
*/
static void
-WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
+WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
+ bool skipped_xact)
{
static TimestampTz sendTime = 0;
TimestampTz now = GetCurrentTimestamp();
@@ -1459,12 +1474,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
* avoid flooding the lag tracker when we commit frequently.
*/
#define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000
- if (!TimestampDifferenceExceeds(sendTime, now,
- WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
- return;
+ if (TimestampDifferenceExceeds(sendTime, now,
+ WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+ {
+ LagTrackerWrite(lsn, now);
+ sendTime = now;
+ }
- LagTrackerWrite(lsn, now);
- sendTime = now;
+ /*
+ * When skipping empty transactions in synchronous replication, we send a
+ * keepalive message to avoid delaying such transactions.
+ *
+ * It is okay to check sync_standbys_defined flag without lock here as
+ * in the worst case we will just send an extra keepalive message when it
+ * is really not required.
+ */
+ if (skipped_xact &&
+ SyncRepRequested() &&
+ ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined)
+ {
+ WalSndKeepalive(false, lsn);
+
+ /* Try to flush pending output to the client */
+ if (pq_flush_if_writable() != 0)
+ WalSndShutdown();
+
+ /* If we have pending write here, make sure it's actually flushed */
+ if (pq_is_send_pending())
+ ProcessPendingWrites();
+ }
}
/*
@@ -1550,7 +1588,7 @@ WalSndWaitForWal(XLogRecPtr loc)
if (MyWalSnd->flush < sentPtr &&
MyWalSnd->write < sentPtr &&
!waiting_for_ping_response)
- WalSndKeepalive(false);
+ WalSndKeepalive(false, InvalidXLogRecPtr);
/* check whether we're done */
if (loc <= RecentFlushPtr)
@@ -2068,7 +2106,7 @@ ProcessStandbyReplyMessage(void)
/* Send a reply if the standby requested one. */
if (replyRequested)
- WalSndKeepalive(false);
+ WalSndKeepalive(false, InvalidXLogRecPtr);
/*
* Update shared state for this WalSender process based on reply data from
@@ -3074,7 +3112,7 @@ WalSndDone(WalSndSendDataCallback send_data)
proc_exit(0);
}
if (!waiting_for_ping_response)
- WalSndKeepalive(true);
+ WalSndKeepalive(true, InvalidXLogRecPtr);
}
/*
@@ -3563,18 +3601,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
*
* If requestReply is set, the message requests the other party to send
* a message back to us, for heartbeat purposes. We also set a flag to
- * let nearby code that we're waiting for that response, to avoid
+ * let nearby code know that we're waiting for that response, to avoid
* repeated requests.
+ *
+ * writePtr is the location up to which the WAL is sent. It is essentially
+ * the same as sentPtr but in some cases, we need to send keep alive before
+ * sentPtr is updated like when skipping empty transactions.
*/
static void
-WalSndKeepalive(bool requestReply)
+WalSndKeepalive(bool requestReply, XLogRecPtr writePtr)
{
elog(DEBUG2, "sending replication keepalive");
/* construct the message... */
resetStringInfo(&output_message);
pq_sendbyte(&output_message, 'k');
- pq_sendint64(&output_message, sentPtr);
+ pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr);
pq_sendint64(&output_message, GetCurrentTimestamp());
pq_sendbyte(&output_message, requestReply ? 1 : 0);
@@ -3613,7 +3655,7 @@ WalSndKeepaliveIfNecessary(void)
wal_sender_timeout / 2);
if (last_processing >= ping_time)
{
- WalSndKeepalive(true);
+ WalSndKeepalive(true, InvalidXLogRecPtr);
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)