diff options
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 90 |
1 files changed, 78 insertions, 12 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 9eba180f049..a8ccfc66398 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -64,12 +64,13 @@ WalRcvShmemInit(void) MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; SpinLockInit(&WalRcv->mutex); + InitSharedLatch(&WalRcv->latch); } } -/* Is walreceiver in progress (or starting up)? */ +/* Is walreceiver running (or starting up)? */ bool -WalRcvInProgress(void) +WalRcvRunning(void) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -111,6 +112,53 @@ WalRcvInProgress(void) } /* + * Is walreceiver running and streaming (or at least attempting to connect, + * or starting up)? + */ +bool +WalRcvStreaming(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + WalRcvState state; + pg_time_t startTime; + + SpinLockAcquire(&walrcv->mutex); + + state = walrcv->walRcvState; + startTime = walrcv->startTime; + + SpinLockRelease(&walrcv->mutex); + + /* + * If it has taken too long for walreceiver to start up, give up. Setting + * the state to STOPPED ensures that if walreceiver later does start up + * after all, it will see that it's not supposed to be running and die + * without doing anything. + */ + if (state == WALRCV_STARTING) + { + pg_time_t now = (pg_time_t) time(NULL); + + if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) + { + SpinLockAcquire(&walrcv->mutex); + + if (walrcv->walRcvState == WALRCV_STARTING) + state = walrcv->walRcvState = WALRCV_STOPPED; + + SpinLockRelease(&walrcv->mutex); + } + } + + if (state == WALRCV_STREAMING || state == WALRCV_STARTING || + state == WALRCV_RESTARTING) + return true; + else + return false; +} + +/* * Stop walreceiver (if running) and wait for it to die. * Executed by the Startup process. */ @@ -135,7 +183,9 @@ ShutdownWalRcv(void) walrcv->walRcvState = WALRCV_STOPPED; break; - case WALRCV_RUNNING: + case WALRCV_STREAMING: + case WALRCV_WAITING: + case WALRCV_RESTARTING: walrcv->walRcvState = WALRCV_STOPPING; /* fall through */ case WALRCV_STOPPING: @@ -154,7 +204,7 @@ ShutdownWalRcv(void) * Wait for walreceiver to acknowledge its death by setting state to * WALRCV_STOPPED. */ - while (WalRcvInProgress()) + while (WalRcvRunning()) { /* * This possibly-long loop needs to handle interrupts of startup @@ -173,10 +223,11 @@ ShutdownWalRcv(void) * is a libpq connection string to use. */ void -RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) +RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; + bool launch = false; pg_time_t now = (pg_time_t) time(NULL); /* @@ -190,14 +241,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) SpinLockAcquire(&walrcv->mutex); - /* It better be stopped before we try to restart it */ - Assert(walrcv->walRcvState == WALRCV_STOPPED); + /* It better be stopped if we try to restart it */ + Assert(walrcv->walRcvState == WALRCV_STOPPED || + walrcv->walRcvState == WALRCV_WAITING); if (conninfo != NULL) strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); else walrcv->conninfo[0] = '\0'; - walrcv->walRcvState = WALRCV_STARTING; + + if (walrcv->walRcvState == WALRCV_STOPPED) + { + launch = true; + walrcv->walRcvState = WALRCV_STARTING; + } + else + walrcv->walRcvState = WALRCV_RESTARTING; walrcv->startTime = now; /* @@ -210,10 +269,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) walrcv->latestChunkStart = recptr; } walrcv->receiveStart = recptr; + walrcv->receiveStartTLI = tli; SpinLockRelease(&walrcv->mutex); - SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); + if (launch) + SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); + else + SetLatch(&walrcv->latch); } /* @@ -221,10 +284,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) * * Optionally, returns the previous chunk start, that is the first byte * written in the most recent walreceiver flush cycle. Callers not - * interested in that value may pass NULL for latestChunkStart. + * interested in that value may pass NULL for latestChunkStart. Same for + * receiveTLI. */ XLogRecPtr -GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) +GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; @@ -234,6 +298,8 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart) recptr = walrcv->receivedUpto; if (latestChunkStart) *latestChunkStart = walrcv->latestChunkStart; + if (receiveTLI) + *receiveTLI = walrcv->receivedTLI; SpinLockRelease(&walrcv->mutex); return recptr; @@ -258,7 +324,7 @@ GetReplicationApplyDelay(void) receivePtr = walrcv->receivedUpto; SpinLockRelease(&walrcv->mutex); - replayPtr = GetXLogReplayRecPtr(NULL); + replayPtr = GetXLogReplayRecPtr(); if (XLByteEQ(receivePtr, replayPtr)) return 0; |