aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/walsender.c43
1 files changed, 25 insertions, 18 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 81244541e2b..eb3f18ed487 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply);
static void WalSndKeepaliveIfNecessary(void);
static void WalSndCheckTimeOut(void);
static long WalSndComputeSleeptime(TimestampTz now);
+static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event);
static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
@@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
/* If we have pending write here, go to slow path */
for (;;)
{
- int wakeEvents;
long sleeptime;
/* Check for input from the client */
@@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
- WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
-
/* Sleep until something happens or we time out */
- (void) WaitLatchOrSocket(MyLatch, wakeEvents,
- MyProcPort->sock, sleeptime,
- WAIT_EVENT_WAL_SENDER_WRITE_DATA);
+ WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime,
+ WAIT_EVENT_WAL_SENDER_WRITE_DATA);
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
@@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc)
*/
sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
- WL_SOCKET_READABLE | WL_TIMEOUT;
+ wakeEvents = WL_SOCKET_READABLE;
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- (void) WaitLatchOrSocket(MyLatch, wakeEvents,
- MyProcPort->sock, sleeptime,
- WAIT_EVENT_WAL_SENDER_WAIT_WAL);
+ WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL);
}
/* reactivate latch so WalSndLoop knows to continue */
@@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data)
long sleeptime;
int wakeEvents;
- wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT;
-
if (!streamingDoneReceiving)
- wakeEvents |= WL_SOCKET_READABLE;
+ wakeEvents = WL_SOCKET_READABLE;
+ else
+ wakeEvents = 0;
/*
* Use fresh timestamp, not last_processing, to reduce the chance
@@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
wakeEvents |= WL_SOCKET_WRITEABLE;
/* Sleep until something happens or we time out */
- (void) WaitLatchOrSocket(MyLatch, wakeEvents,
- MyProcPort->sock, sleeptime,
- WAIT_EVENT_WAL_SENDER_MAIN);
+ WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
}
}
}
@@ -3122,6 +3113,22 @@ WalSndWakeup(void)
}
/*
+ * Wait for readiness on the FeBe socket, or a timeout. The mask should be
+ * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit
+ * on postmaster death.
+ */
+static void
+WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
+{
+ WaitEvent event;
+
+ ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+ if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
+ (event.events & WL_POSTMASTER_DEATH))
+ proc_exit(1);
+}
+
+/*
* Signal all walsenders to move to stopping state.
*
* This will trigger walsenders to move to a state where no further WAL can be