diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/walreceiver.c | 19 | ||||
-rw-r--r-- | src/backend/replication/walreceiverfuncs.c | 7 | ||||
-rw-r--r-- | src/include/replication/walreceiver.h | 5 |
3 files changed, 21 insertions, 10 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ec59d6e5621..c85ffb0908b 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -256,13 +256,14 @@ WalReceiverMain(void) walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now; + /* Report the latch to use to awaken this process */ + walrcv->latch = &MyProc->procLatch; + SpinLockRelease(&walrcv->mutex); /* Arrange to clean up at walreceiver exit */ on_shmem_exit(WalRcvDie, 0); - walrcv->latch = &MyProc->procLatch; - /* Properly accept or ignore signals the postmaster might send us */ pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ pqsignal(SIGINT, SIG_IGN); @@ -777,8 +778,7 @@ WalRcvDie(int code, Datum arg) /* Ensure that all WAL records received are flushed to disk */ XLogWalRcvFlush(true); - walrcv->latch = NULL; - + /* Mark ourselves inactive in shared memory */ SpinLockAcquire(&walrcv->mutex); Assert(walrcv->walRcvState == WALRCV_STREAMING || walrcv->walRcvState == WALRCV_RESTARTING || @@ -789,6 +789,7 @@ WalRcvDie(int code, Datum arg) walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; walrcv->ready_to_display = false; + walrcv->latch = NULL; SpinLockRelease(&walrcv->mutex); /* Terminate the connection gracefully. */ @@ -1344,9 +1345,15 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) void WalRcvForceReply(void) { + Latch *latch; + WalRcv->force_reply = true; - if (WalRcv->latch) - SetLatch(WalRcv->latch); + /* fetching the latch pointer might not be atomic, so use spinlock */ + SpinLockAcquire(&WalRcv->mutex); + latch = WalRcv->latch; + SpinLockRelease(&WalRcv->mutex); + if (latch) + SetLatch(latch); } /* diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c index 8ed7254b5c6..48e8498d620 100644 --- a/src/backend/replication/walreceiverfuncs.c +++ b/src/backend/replication/walreceiverfuncs.c @@ -226,6 +226,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, WalRcvData *walrcv = WalRcv; bool launch = false; pg_time_t now = (pg_time_t) time(NULL); + Latch *latch; /* * We always start at the beginning of the segment. That prevents a broken @@ -274,12 +275,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, walrcv->receiveStart = recptr; walrcv->receiveStartTLI = tli; + latch = walrcv->latch; + SpinLockRelease(&walrcv->mutex); if (launch) SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); - else if (walrcv->latch) - SetLatch(walrcv->latch); + else if (latch) + SetLatch(latch); } /* diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h index 9a8b2e207ec..742ab6be000 100644 --- a/src/include/replication/walreceiver.h +++ b/src/include/replication/walreceiver.h @@ -121,9 +121,10 @@ typedef struct /* * force walreceiver reply? This doesn't need to be locked; memory - * barriers for ordering are sufficient. + * barriers for ordering are sufficient. But we do need atomic fetch and + * store semantics, so use sig_atomic_t. */ - bool force_reply; + sig_atomic_t force_reply; /* used as a bool */ /* * Latch used by startup process to wake up walreceiver after telling it |