diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 72 |
1 files changed, 57 insertions, 15 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cffb3482adf..75400a53f2f 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -242,14 +242,16 @@ static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); -static void WalSndKeepalive(bool requestReply); +static void ProcessPendingWrites(void); +static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); -static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); +static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool skipped_xact); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); @@ -1399,6 +1401,16 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, } /* If we have pending write here, go to slow path */ + ProcessPendingWrites(); +} + +/* + * Wait until there is no pending write. Also process replies from the other + * side and check timeouts during that. + */ +static void +ProcessPendingWrites(void) +{ for (;;) { long sleeptime; @@ -1447,9 +1459,12 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, * LogicalDecodingContext 'update_progress' callback. * * Write the current position to the lag tracker (see XLogSendPhysical). + * + * When skipping empty transactions, send a keepalive message if necessary. */ static void -WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid) +WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, + bool skipped_xact) { static TimestampTz sendTime = 0; TimestampTz now = GetCurrentTimestamp(); @@ -1459,12 +1474,35 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId * avoid flooding the lag tracker when we commit frequently. */ #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS 1000 - if (!TimestampDifferenceExceeds(sendTime, now, - WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) - return; + if (TimestampDifferenceExceeds(sendTime, now, + WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS)) + { + LagTrackerWrite(lsn, now); + sendTime = now; + } - LagTrackerWrite(lsn, now); - sendTime = now; + /* + * When skipping empty transactions in synchronous replication, we send a + * keepalive message to avoid delaying such transactions. + * + * It is okay to check sync_standbys_defined flag without lock here as + * in the worst case we will just send an extra keepalive message when it + * is really not required. + */ + if (skipped_xact && + SyncRepRequested() && + ((volatile WalSndCtlData *) WalSndCtl)->sync_standbys_defined) + { + WalSndKeepalive(false, lsn); + + /* Try to flush pending output to the client */ + if (pq_flush_if_writable() != 0) + WalSndShutdown(); + + /* If we have pending write here, make sure it's actually flushed */ + if (pq_is_send_pending()) + ProcessPendingWrites(); + } } /* @@ -1550,7 +1588,7 @@ WalSndWaitForWal(XLogRecPtr loc) if (MyWalSnd->flush < sentPtr && MyWalSnd->write < sentPtr && !waiting_for_ping_response) - WalSndKeepalive(false); + WalSndKeepalive(false, InvalidXLogRecPtr); /* check whether we're done */ if (loc <= RecentFlushPtr) @@ -2068,7 +2106,7 @@ ProcessStandbyReplyMessage(void) /* Send a reply if the standby requested one. */ if (replyRequested) - WalSndKeepalive(false); + WalSndKeepalive(false, InvalidXLogRecPtr); /* * Update shared state for this WalSender process based on reply data from @@ -3074,7 +3112,7 @@ WalSndDone(WalSndSendDataCallback send_data) proc_exit(0); } if (!waiting_for_ping_response) - WalSndKeepalive(true); + WalSndKeepalive(true, InvalidXLogRecPtr); } /* @@ -3563,18 +3601,22 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS) * * If requestReply is set, the message requests the other party to send * a message back to us, for heartbeat purposes. We also set a flag to - * let nearby code that we're waiting for that response, to avoid + * let nearby code know that we're waiting for that response, to avoid * repeated requests. + * + * writePtr is the location up to which the WAL is sent. It is essentially + * the same as sentPtr but in some cases, we need to send keep alive before + * sentPtr is updated like when skipping empty transactions. */ static void -WalSndKeepalive(bool requestReply) +WalSndKeepalive(bool requestReply, XLogRecPtr writePtr) { elog(DEBUG2, "sending replication keepalive"); /* construct the message... */ resetStringInfo(&output_message); pq_sendbyte(&output_message, 'k'); - pq_sendint64(&output_message, sentPtr); + pq_sendint64(&output_message, XLogRecPtrIsInvalid(writePtr) ? sentPtr : writePtr); pq_sendint64(&output_message, GetCurrentTimestamp()); pq_sendbyte(&output_message, requestReply ? 1 : 0); @@ -3613,7 +3655,7 @@ WalSndKeepaliveIfNecessary(void) wal_sender_timeout / 2); if (last_processing >= ping_time) { - WalSndKeepalive(true); + WalSndKeepalive(true, InvalidXLogRecPtr); /* Try to flush pending output to the client */ if (pq_flush_if_writable() != 0) |