diff options
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 41 |
1 files changed, 28 insertions, 13 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 63e60478ea6..fff6c54c45d 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -23,6 +23,7 @@ #include <signal.h> #include "access/xlog_internal.h" +#include "pgstat.h" #include "postmaster/startup.h" #include "replication/walreceiver.h" #include "storage/pmsignal.h" @@ -62,6 +63,7 @@ WalRcvShmemInit(void) /* First time through, so initialize */ MemSet(WalRcv, 0, WalRcvShmemSize()); WalRcv->walRcvState = WALRCV_STOPPED; + ConditionVariableInit(&WalRcv->walRcvStoppedCV); SpinLockInit(&WalRcv->mutex); pg_atomic_init_u64(&WalRcv->writtenUpto, 0); WalRcv->latch = NULL; @@ -95,12 +97,18 @@ WalRcvRunning(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - SpinLockAcquire(&walrcv->mutex); + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -140,12 +148,18 @@ WalRcvStreaming(void) if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) { - SpinLockAcquire(&walrcv->mutex); + bool stopped = false; + SpinLockAcquire(&walrcv->mutex); if (walrcv->walRcvState == WALRCV_STARTING) + { state = walrcv->walRcvState = WALRCV_STOPPED; - + stopped = true; + } SpinLockRelease(&walrcv->mutex); + + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); } } @@ -165,6 +179,7 @@ ShutdownWalRcv(void) { WalRcvData *walrcv = WalRcv; pid_t walrcvpid = 0; + bool stopped = false; /* * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED @@ -178,6 +193,7 @@ ShutdownWalRcv(void) break; case WALRCV_STARTING: walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; break; case WALRCV_STREAMING: @@ -191,6 +207,10 @@ ShutdownWalRcv(void) } SpinLockRelease(&walrcv->mutex); + /* Unnecessary but consistent. */ + if (stopped) + ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); + /* * Signal walreceiver process if it was still running. */ @@ -201,16 +221,11 @@ ShutdownWalRcv(void) * Wait for walreceiver to acknowledge its death by setting state to * WALRCV_STOPPED. */ + ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV); while (WalRcvRunning()) - { - /* - * This possibly-long loop needs to handle interrupts of startup - * process. - */ - HandleStartupProcInterrupts(); - - pg_usleep(100000); /* 100ms */ - } + ConditionVariableSleep(&walrcv->walRcvStoppedCV, + WAIT_EVENT_WALRCV_EXIT); + ConditionVariableCancelSleep(); } /* |