aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walsender.c48
1 files changed, 31 insertions, 17 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e4ecbe37ee9..9c320ff1edf 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -764,15 +764,14 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
- /* more than one block available */
- if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
- count = XLOG_BLCKSZ;
- /* not enough WAL synced, that can happen during shutdown */
- else if (targetPagePtr + reqLen > flushptr)
+ /* fail if not (implies we are going to shut down) */
+ if (flushptr < targetPagePtr + reqLen)
return -1;
- /* part of the page available */
+
+ if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
+ count = XLOG_BLCKSZ; /* more than one block available */
else
- count = flushptr - targetPagePtr;
+ count = flushptr - targetPagePtr; /* part of the page available */
/* now actually read the data, we know it's there */
XLogRead(cur_page, targetPagePtr, XLOG_BLCKSZ);
@@ -1158,7 +1157,11 @@ WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
}
/*
- * Wait till WAL < loc is flushed to disk so it can be safely read.
+ * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
+ *
+ * Returns end LSN of flushed WAL. Normally this will be >= loc, but
+ * if we detect a shutdown request (either from postmaster or client)
+ * we will return early, so caller must always check.
*/
static XLogRecPtr
WalSndWaitForWal(XLogRecPtr loc)
@@ -1225,9 +1228,7 @@ WalSndWaitForWal(XLogRecPtr loc)
RecentFlushPtr = GetXLogReplayRecPtr(NULL);
/*
- * If postmaster asked us to stop, don't wait here anymore. This will
- * cause the xlogreader to return without reading a full record, which
- * is the fastest way to reach the mainloop which then can quit.
+ * If postmaster asked us to stop, don't wait anymore.
*
* It's important to do this check after the recomputation of
* RecentFlushPtr, so we can send all remaining data before shutting
@@ -1258,14 +1259,20 @@ WalSndWaitForWal(XLogRecPtr loc)
WalSndCaughtUp = true;
/*
- * Try to flush pending output to the client. Also wait for the socket
- * becoming writable, if there's still pending output after an attempt
- * to flush. Otherwise we might just sit on output data while waiting
- * for new WAL being generated.
+ * Try to flush any pending output to the client.
*/
if (pq_flush_if_writable() != 0)
WalSndShutdown();
+ /*
+ * If we have received CopyDone from the client, sent CopyDone
+ * ourselves, and the output buffer is empty, it's time to exit
+ * streaming, so fail the current WAL fetch request.
+ */
+ if (streamingDoneReceiving && streamingDoneSending &&
+ !pq_is_send_pending())
+ break;
+
now = GetCurrentTimestamp();
/* die if timeout was reached */
@@ -1274,6 +1281,13 @@ WalSndWaitForWal(XLogRecPtr loc)
/* Send keepalive if the time has come */
WalSndKeepaliveIfNecessary(now);
+ /*
+ * Sleep until something happens or we time out. Also wait for the
+ * socket becoming writable, if there's still pending output.
+ * Otherwise we might sit on sendable output data while waiting for
+ * new WAL to be generated. (But if we have nothing to send, we don't
+ * want to wake on socket-writable.)
+ */
sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
@@ -1282,7 +1296,6 @@ WalSndWaitForWal(XLogRecPtr loc)
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
- /* Sleep until something happens or we time out */
WaitLatchOrSocket(MyLatch, wakeEvents,
MyProcPort->sock, sleeptime);
}
@@ -1870,7 +1883,8 @@ WalSndLoop(WalSndSendDataCallback send_data)
* ourselves, and the output buffer is empty, it's time to exit
* streaming.
*/
- if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
+ if (streamingDoneReceiving && streamingDoneSending &&
+ !pq_is_send_pending())
break;
/*