aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiverfuncs.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiverfuncs.c')
-rw-r--r--src/backend/replication/walreceiverfuncs.c90
1 files changed, 78 insertions, 12 deletions
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 9eba180f049..a8ccfc66398 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -64,12 +64,13 @@ WalRcvShmemInit(void)
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
+ InitSharedLatch(&WalRcv->latch);
}
}
-/* Is walreceiver in progress (or starting up)? */
+/* Is walreceiver running (or starting up)? */
bool
-WalRcvInProgress(void)
+WalRcvRunning(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@@ -111,6 +112,53 @@ WalRcvInProgress(void)
}
/*
+ * Is walreceiver running and streaming (or at least attempting to connect,
+ * or starting up)?
+ */
+bool
+WalRcvStreaming(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ WalRcvState state;
+ pg_time_t startTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
+
+ SpinLockRelease(&walrcv->mutex);
+
+ /*
+ * If it has taken too long for walreceiver to start up, give up. Setting
+ * the state to STOPPED ensures that if walreceiver later does start up
+ * after all, it will see that it's not supposed to be running and die
+ * without doing anything.
+ */
+ if (state == WALRCV_STARTING)
+ {
+ pg_time_t now = (pg_time_t) time(NULL);
+
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+ {
+ SpinLockAcquire(&walrcv->mutex);
+
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ state = walrcv->walRcvState = WALRCV_STOPPED;
+
+ SpinLockRelease(&walrcv->mutex);
+ }
+ }
+
+ if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ state == WALRCV_RESTARTING)
+ return true;
+ else
+ return false;
+}
+
+/*
* Stop walreceiver (if running) and wait for it to die.
* Executed by the Startup process.
*/
@@ -135,7 +183,9 @@ ShutdownWalRcv(void)
walrcv->walRcvState = WALRCV_STOPPED;
break;
- case WALRCV_RUNNING:
+ case WALRCV_STREAMING:
+ case WALRCV_WAITING:
+ case WALRCV_RESTARTING:
walrcv->walRcvState = WALRCV_STOPPING;
/* fall through */
case WALRCV_STOPPING:
@@ -154,7 +204,7 @@ ShutdownWalRcv(void)
* Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED.
*/
- while (WalRcvInProgress())
+ while (WalRcvRunning())
{
/*
* This possibly-long loop needs to handle interrupts of startup
@@ -173,10 +223,11 @@ ShutdownWalRcv(void)
* is a libpq connection string to use.
*/
void
-RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ bool launch = false;
pg_time_t now = (pg_time_t) time(NULL);
/*
@@ -190,14 +241,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
SpinLockAcquire(&walrcv->mutex);
- /* It better be stopped before we try to restart it */
- Assert(walrcv->walRcvState == WALRCV_STOPPED);
+ /* It better be stopped if we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED ||
+ walrcv->walRcvState == WALRCV_WAITING);
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
- walrcv->walRcvState = WALRCV_STARTING;
+
+ if (walrcv->walRcvState == WALRCV_STOPPED)
+ {
+ launch = true;
+ walrcv->walRcvState = WALRCV_STARTING;
+ }
+ else
+ walrcv->walRcvState = WALRCV_RESTARTING;
walrcv->startTime = now;
/*
@@ -210,10 +269,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
walrcv->latestChunkStart = recptr;
}
walrcv->receiveStart = recptr;
+ walrcv->receiveStartTLI = tli;
SpinLockRelease(&walrcv->mutex);
- SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+ if (launch)
+ SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+ else
+ SetLatch(&walrcv->latch);
}
/*
@@ -221,10 +284,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
- * interested in that value may pass NULL for latestChunkStart.
+ * interested in that value may pass NULL for latestChunkStart. Same for
+ * receiveTLI.
*/
XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
+GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
@@ -234,6 +298,8 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
recptr = walrcv->receivedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
+ if (receiveTLI)
+ *receiveTLI = walrcv->receivedTLI;
SpinLockRelease(&walrcv->mutex);
return recptr;
@@ -258,7 +324,7 @@ GetReplicationApplyDelay(void)
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
- replayPtr = GetXLogReplayRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr();
if (XLByteEQ(receivePtr, replayPtr))
return 0;