aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/walreceiver.c171
-rw-r--r--src/tools/pgindent/typedefs.list1
2 files changed, 115 insertions, 57 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 6cbb67c92a3..8bd2ba37ddf 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -95,8 +95,6 @@ bool hot_standby_feedback;
static WalReceiverConn *wrconn = NULL;
WalReceiverFunctionsType *WalReceiverFunctions = NULL;
-#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */
-
/*
* These variables are used similarly to openLogFile/SegNo,
* but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
@@ -116,6 +114,23 @@ static struct
XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
} LogstreamResult;
+/*
+ * Reasons to wake up and perform periodic tasks.
+ */
+typedef enum WalRcvWakeupReason
+{
+ WALRCV_WAKEUP_TERMINATE,
+ WALRCV_WAKEUP_PING,
+ WALRCV_WAKEUP_REPLY,
+ WALRCV_WAKEUP_HSFEEDBACK,
+ NUM_WALRCV_WAKEUPS
+} WalRcvWakeupReason;
+
+/*
+ * Wake up times for periodic tasks.
+ */
+static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
+
static StringInfoData reply_message;
static StringInfoData incoming_message;
@@ -132,6 +147,7 @@ static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
+static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
/*
* Process any interrupts the walreceiver process may have received.
@@ -179,9 +195,7 @@ WalReceiverMain(void)
TimeLineID primaryTLI;
bool first_stream;
WalRcvData *walrcv = WalRcv;
- TimestampTz last_recv_timestamp;
- TimestampTz starttime;
- bool ping_sent;
+ TimestampTz now;
char *err;
char *sender_host = NULL;
int sender_port = 0;
@@ -192,7 +206,7 @@ WalReceiverMain(void)
*/
Assert(walrcv != NULL);
- starttime = GetCurrentTimestamp();
+ now = GetCurrentTimestamp();
/*
* Mark walreceiver as running in shared memory.
@@ -248,7 +262,7 @@ WalReceiverMain(void)
/* Initialise to a sanish value */
walrcv->lastMsgSendTime =
- walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = starttime;
+ walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
/* Report the latch to use to awaken this process */
walrcv->latch = &MyProc->procLatch;
@@ -414,9 +428,10 @@ WalReceiverMain(void)
initStringInfo(&reply_message);
initStringInfo(&incoming_message);
- /* Initialize the last recv timestamp */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ /* Initialize nap wakeup times. */
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(i, now);
/* Loop until end-of-streaming or error */
for (;;)
@@ -426,6 +441,8 @@ WalReceiverMain(void)
bool endofwal = false;
pgsocket wait_fd = PGINVALID_SOCKET;
int rc;
+ TimestampTz nextWakeup;
+ int nap;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -443,11 +460,15 @@ WalReceiverMain(void)
{
ConfigReloadPending = false;
ProcessConfigFile(PGC_SIGHUP);
+ now = GetCurrentTimestamp();
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ WalRcvComputeNextWakeup(i, now);
XLogWalRcvSendHSFeedback(true);
}
/* See if we can read data immediately */
len = walrcv_receive(wrconn, &buf, &wait_fd);
+ now = GetCurrentTimestamp();
if (len != 0)
{
/*
@@ -459,11 +480,12 @@ WalReceiverMain(void)
if (len > 0)
{
/*
- * Something was received from primary, so reset
- * timeout
+ * Something was received from primary, so adjust
+ * the ping and terminate wakeup times.
*/
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
+ now);
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
startpointTLI);
}
@@ -480,6 +502,7 @@ WalReceiverMain(void)
break;
}
len = walrcv_receive(wrconn, &buf, &wait_fd);
+ now = GetCurrentTimestamp();
}
/* Let the primary know that we received some data. */
@@ -497,6 +520,20 @@ WalReceiverMain(void)
if (endofwal)
break;
+ /* Find the soonest wakeup time, to limit our nap. */
+ nextWakeup = PG_INT64_MAX;
+ for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
+ nextWakeup = Min(wakeup[i], nextWakeup);
+
+ /*
+ * Calculate the nap time. WaitLatchOrSocket() doesn't accept
+ * timeouts longer than INT_MAX milliseconds, so we limit the
+ * result accordingly. Also, we round up to the next
+ * millisecond to avoid waking up too early and spinning until
+ * one of the wakeup times.
+ */
+ nap = (int) Min(INT_MAX, Max(0, (nextWakeup - now + 999) / 1000));
+
/*
* Ideally we would reuse a WaitEventSet object repeatedly
* here to avoid the overheads of WaitLatchOrSocket on epoll
@@ -513,8 +550,9 @@ WalReceiverMain(void)
WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
WL_TIMEOUT | WL_LATCH_SET,
wait_fd,
- NAPTIME_PER_CYCLE,
+ nap,
WAIT_EVENT_WAL_RECEIVER_MAIN);
+ now = GetCurrentTimestamp();
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
@@ -550,34 +588,19 @@ WalReceiverMain(void)
* Check if time since last receive from primary has
* reached the configured limit.
*/
- if (wal_receiver_timeout > 0)
- {
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
-
- timeout =
- TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
+ if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("terminating walreceiver due to timeout")));
- if (now >= timeout)
- ereport(ERROR,
- (errcode(ERRCODE_CONNECTION_FAILURE),
- errmsg("terminating walreceiver due to timeout")));
-
- /*
- * We didn't receive anything new, for half of
- * receiver replication timeout. Ping the server.
- */
- if (!ping_sent)
- {
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout / 2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
- }
+ /*
+ * We didn't receive anything new, for half of receiver
+ * replication timeout. Ping the server.
+ */
+ if (now >= wakeup[WALRCV_WAKEUP_PING])
+ {
+ requestReply = true;
+ wakeup[WALRCV_WAKEUP_PING] = PG_INT64_MAX;
}
XLogWalRcvSendReply(requestReply, requestReply);
@@ -1076,7 +1099,6 @@ XLogWalRcvSendReply(bool force, bool requestReply)
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
- static TimestampTz sendTime = 0;
TimestampTz now;
/*
@@ -1101,10 +1123,11 @@ XLogWalRcvSendReply(bool force, bool requestReply)
if (!force
&& writePtr == LogstreamResult.Write
&& flushPtr == LogstreamResult.Flush
- && !TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
+ && now < wakeup[WALRCV_WAKEUP_REPLY])
return;
- sendTime = now;
+
+ /* Make sure we wake up when it's time to send another reply. */
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
/* Construct a new message */
writePtr = LogstreamResult.Write;
@@ -1149,7 +1172,6 @@ XLogWalRcvSendHSFeedback(bool immed)
catalog_xmin_epoch;
TransactionId xmin,
catalog_xmin;
- static TimestampTz sendTime = 0;
/* initially true so we always send at least one feedback message */
static bool primary_has_standby_xmin = true;
@@ -1165,16 +1187,12 @@ XLogWalRcvSendHSFeedback(bool immed)
/* Get current timestamp. */
now = GetCurrentTimestamp();
- if (!immed)
- {
- /*
- * Send feedback at most once per wal_receiver_status_interval.
- */
- if (!TimestampDifferenceExceeds(sendTime, now,
- wal_receiver_status_interval * 1000))
- return;
- sendTime = now;
- }
+ /* Send feedback at most once per wal_receiver_status_interval. */
+ if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
+ return;
+
+ /* Make sure we wake up when it's time to send feedback again. */
+ WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
/*
* If Hot Standby is not yet accepting connections there is nothing to
@@ -1286,6 +1304,45 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
/*
+ * Compute the next wakeup time for a given wakeup reason. Can be called to
+ * initialize a wakeup time, to adjust it for the next wakeup, or to
+ * reinitialize it when GUCs have changed.
+ */
+static void
+WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
+{
+ switch (reason)
+ {
+ case WALRCV_WAKEUP_TERMINATE:
+ if (wal_receiver_timeout <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_timeout * INT64CONST(1000);
+ break;
+ case WALRCV_WAKEUP_PING:
+ if (wal_receiver_timeout <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + (wal_receiver_timeout / 2) * INT64CONST(1000);
+ break;
+ case WALRCV_WAKEUP_HSFEEDBACK:
+ if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+ break;
+ case WALRCV_WAKEUP_REPLY:
+ if (wal_receiver_status_interval <= 0)
+ wakeup[reason] = PG_INT64_MAX;
+ else
+ wakeup[reason] = now + wal_receiver_status_interval * INT64CONST(1000000);
+ break;
+ default:
+ break;
+ }
+}
+
+/*
* Wake up the walreceiver main loop.
*
* This is called by the startup process whenever interesting xlog records
diff --git a/src/tools/pgindent/typedefs.list b/src/tools/pgindent/typedefs.list
index 9683b0a88e5..245aea1dd14 100644
--- a/src/tools/pgindent/typedefs.list
+++ b/src/tools/pgindent/typedefs.list
@@ -2927,6 +2927,7 @@ WALInsertLock
WALInsertLockPadded
WALOpenSegment
WALReadError
+WalRcvWakeupReason
WALSegmentCloseCB
WALSegmentContext
WALSegmentOpenCB