diff options
-rw-r--r-- | src/backend/replication/walsender.c | 43 |
1 files changed, 25 insertions, 18 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 81244541e2b..eb3f18ed487 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -244,6 +244,7 @@ static void WalSndKeepalive(bool requestReply); static void WalSndKeepaliveIfNecessary(void); static void WalSndCheckTimeOut(void); static long WalSndComputeSleeptime(TimestampTz now); +static void WalSndWait(uint32 socket_events, long timeout, uint32 wait_event); static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); @@ -1287,7 +1288,6 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, /* If we have pending write here, go to slow path */ for (;;) { - int wakeEvents; long sleeptime; /* Check for input from the client */ @@ -1304,13 +1304,9 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | - WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT; - /* Sleep until something happens or we time out */ - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WRITE_DATA); + WalSndWait(WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE, sleeptime, + WAIT_EVENT_WAL_SENDER_WRITE_DATA); /* Clear any already-pending wakeups */ ResetLatch(MyLatch); @@ -1480,15 +1476,12 @@ WalSndWaitForWal(XLogRecPtr loc) */ sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp()); - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | - WL_SOCKET_READABLE | WL_TIMEOUT; + wakeEvents = WL_SOCKET_READABLE; if (pq_is_send_pending()) wakeEvents |= WL_SOCKET_WRITEABLE; - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_WAIT_WAL); + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_WAIT_WAL); } /* reactivate latch so WalSndLoop knows to continue */ @@ -2348,10 +2341,10 @@ WalSndLoop(WalSndSendDataCallback send_data) long sleeptime; int wakeEvents; - wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT; - if (!streamingDoneReceiving) - wakeEvents |= WL_SOCKET_READABLE; + wakeEvents = WL_SOCKET_READABLE; + else + wakeEvents = 0; /* * Use fresh timestamp, not last_processing, to reduce the chance @@ -2363,9 +2356,7 @@ WalSndLoop(WalSndSendDataCallback send_data) wakeEvents |= WL_SOCKET_WRITEABLE; /* Sleep until something happens or we time out */ - (void) WaitLatchOrSocket(MyLatch, wakeEvents, - MyProcPort->sock, sleeptime, - WAIT_EVENT_WAL_SENDER_MAIN); + WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN); } } } @@ -3122,6 +3113,22 @@ WalSndWakeup(void) } /* + * Wait for readiness on the FeBe socket, or a timeout. The mask should be + * composed of optional WL_SOCKET_WRITEABLE and WL_SOCKET_READABLE flags. Exit + * on postmaster death. + */ +static void +WalSndWait(uint32 socket_events, long timeout, uint32 wait_event) +{ + WaitEvent event; + + ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL); + if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 && + (event.events & WL_POSTMASTER_DEATH)) + proc_exit(1); +} + +/* * Signal all walsenders to move to stopping state. * * This will trigger walsenders to move to a state where no further WAL can be |