aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/backend/replication/walreceiver.c19
-rw-r--r--src/backend/replication/walreceiverfuncs.c7
-rw-r--r--src/include/replication/walreceiver.h17
3 files changed, 27 insertions, 16 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 57c305d0e54..1bf9be673b7 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -256,13 +256,14 @@ WalReceiverMain(void)
walrcv->lastMsgSendTime =
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
+ /* Report the latch to use to awaken this process */
+ walrcv->latch = &MyProc->procLatch;
+
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
- walrcv->latch = &MyProc->procLatch;
-
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
pqsignal(SIGINT, SIG_IGN);
@@ -777,8 +778,7 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
- walrcv->latch = NULL;
-
+ /* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
walrcv->walRcvState == WALRCV_RESTARTING ||
@@ -789,6 +789,7 @@ WalRcvDie(int code, Datum arg)
walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
walrcv->ready_to_display = false;
+ walrcv->latch = NULL;
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
@@ -1344,9 +1345,15 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
void
WalRcvForceReply(void)
{
+ Latch *latch;
+
WalRcv->force_reply = true;
- if (WalRcv->latch)
- SetLatch(WalRcv->latch);
+ /* fetching the latch pointer might not be atomic, so use spinlock */
+ SpinLockAcquire(&WalRcv->mutex);
+ latch = WalRcv->latch;
+ SpinLockRelease(&WalRcv->mutex);
+ if (latch)
+ SetLatch(latch);
}
/*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 78f8693ece7..b1f28d0fc4e 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -226,6 +226,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
WalRcvData *walrcv = WalRcv;
bool launch = false;
pg_time_t now = (pg_time_t) time(NULL);
+ Latch *latch;
/*
* We always start at the beginning of the segment. That prevents a broken
@@ -274,12 +275,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
walrcv->receiveStart = recptr;
walrcv->receiveStartTLI = tli;
+ latch = walrcv->latch;
+
SpinLockRelease(&walrcv->mutex);
if (launch)
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
- else if (walrcv->latch)
- SetLatch(walrcv->latch);
+ else if (latch)
+ SetLatch(latch);
}
/*
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9a8b2e207ec..e58fc49c681 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -117,14 +117,6 @@ typedef struct
/* set true once conninfo is ready to display (obfuscated pwds etc) */
bool ready_to_display;
- slock_t mutex; /* locks shared variables shown above */
-
- /*
- * force walreceiver reply? This doesn't need to be locked; memory
- * barriers for ordering are sufficient.
- */
- bool force_reply;
-
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
@@ -133,6 +125,15 @@ typedef struct
* normally mapped to procLatch when walreceiver is running.
*/
Latch *latch;
+
+ slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * force walreceiver reply? This doesn't need to be locked; memory
+ * barriers for ordering are sufficient. But we do need atomic fetch and
+ * store semantics, so use sig_atomic_t.
+ */
+ sig_atomic_t force_reply; /* used as a bool */
} WalRcvData;
extern WalRcvData *WalRcv;