aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c85
1 files changed, 70 insertions, 15 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 7b36e02faa9..057c250793d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -352,8 +352,6 @@ WalReceiverMain(void)
if (walrcv_startstreaming(startpointTLI, startpoint,
slotname[0] != '\0' ? slotname : NULL))
{
- bool endofwal = false;
-
if (first_stream)
ereport(LOG,
(errmsg("started streaming WAL from primary at %X/%X on timeline %u",
@@ -376,18 +374,13 @@ WalReceiverMain(void)
ping_sent = false;
/* Loop until end-of-streaming or error */
- while (!endofwal)
+ for (;;)
{
char *buf;
int len;
-
- /*
- * Emergency bailout if postmaster has died. This is to avoid
- * the necessity for manual cleanup of all postmaster
- * children.
- */
- if (!PostmasterIsAlive())
- exit(1);
+ bool endofwal = false;
+ int wait_fd = PGINVALID_SOCKET;
+ int rc;
/*
* Exit walreceiver if we're not in recovery. This should not
@@ -407,8 +400,8 @@ WalReceiverMain(void)
XLogWalRcvSendHSFeedback(true);
}
- /* Wait a while for data to arrive */
- len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ /* See if we can read data immediately */
+ len = walrcv_receive(&buf, &wait_fd);
if (len != 0)
{
/*
@@ -439,7 +432,7 @@ WalReceiverMain(void)
endofwal = true;
break;
}
- len = walrcv_receive(0, &buf);
+ len = walrcv_receive(&buf, &wait_fd);
}
/* Let the master know that we received some data. */
@@ -452,7 +445,54 @@ WalReceiverMain(void)
*/
XLogWalRcvFlush(false);
}
- else
+
+ /* Check if we need to exit the streaming loop. */
+ if (endofwal)
+ break;
+
+ /*
+ * Ideally we would reuse a WaitEventSet object repeatedly
+ * here to avoid the overheads of WaitLatchOrSocket on epoll
+ * systems, but we can't be sure that libpq (or any other
+ * walreceiver implementation) has the same socket (even if
+ * the fd is the same number, it may have been closed and
+ * reopened since the last time). In future, if there is a
+ * function for removing sockets from WaitEventSet, then we
+ * could add and remove just the socket each time, potentially
+ * avoiding some system calls.
+ */
+ Assert(wait_fd != PGINVALID_SOCKET);
+ rc = WaitLatchOrSocket(&walrcv->latch,
+ WL_POSTMASTER_DEATH | WL_SOCKET_READABLE |
+ WL_TIMEOUT | WL_LATCH_SET,
+ wait_fd,
+ NAPTIME_PER_CYCLE);
+ if (rc & WL_LATCH_SET)
+ {
+ ResetLatch(&walrcv->latch);
+ if (walrcv->force_reply)
+ {
+ /*
+ * The recovery process has asked us to send apply
+ * feedback now. Make sure the flag is really set to
+ * false in shared memory before sending the reply,
+ * so we don't miss a new request for a reply.
+ */
+ walrcv->force_reply = false;
+ pg_memory_barrier();
+ XLogWalRcvSendReply(true, false);
+ }
+ }
+ if (rc & WL_POSTMASTER_DEATH)
+ {
+ /*
+ * Emergency bailout if postmaster has died. This is to
+ * avoid the necessity for manual cleanup of all
+ * postmaster children.
+ */
+ exit(1);
+ }
+ if (rc & WL_TIMEOUT)
{
/*
* We didn't receive anything new. If we haven't heard
@@ -1222,6 +1262,21 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
}
/*
+ * Wake up the walreceiver main loop.
+ *
+ * This is called by the startup process whenever interesting xlog records
+ * are applied, so that walreceiver can check if it needs to send an apply
+ * notification back to the master which may be waiting in a COMMIT with
+ * synchronous_commit = remote_apply.
+ */
+void
+WalRcvForceReply(void)
+{
+ WalRcv->force_reply = true;
+ SetLatch(&WalRcv->latch);
+}
+
+/*
* Return a string constant representing the state. This is used
* in system functions and views, and should *not* be translated.
*/