aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walreceiver.c19
-rw-r--r--src/backend/replication/walreceiverfuncs.c7
-rw-r--r--src/include/replication/walreceiver.h5
3 files changed, 21 insertions, 10 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ec59d6e5621..c85ffb0908b 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -256,13 +256,14 @@ WalReceiverMain(void)
walrcv->lastMsgSendTime =
walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
+ /* Report the latch to use to awaken this process */
+ walrcv->latch = &MyProc->procLatch;
+
SpinLockRelease(&walrcv->mutex);
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
- walrcv->latch = &MyProc->procLatch;
-
/* Properly accept or ignore signals the postmaster might send us */
pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */
pqsignal(SIGINT, SIG_IGN);
@@ -777,8 +778,7 @@ WalRcvDie(int code, Datum arg)
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
- walrcv->latch = NULL;
-
+ /* Mark ourselves inactive in shared memory */
SpinLockAcquire(&walrcv->mutex);
Assert(walrcv->walRcvState == WALRCV_STREAMING ||
walrcv->walRcvState == WALRCV_RESTARTING ||
@@ -789,6 +789,7 @@ WalRcvDie(int code, Datum arg)
walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
walrcv->ready_to_display = false;
+ walrcv->latch = NULL;
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
@@ -1344,9 +1345,15 @@ ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
void
WalRcvForceReply(void)
{
+ Latch *latch;
+
WalRcv->force_reply = true;
- if (WalRcv->latch)
- SetLatch(WalRcv->latch);
+ /* fetching the latch pointer might not be atomic, so use spinlock */
+ SpinLockAcquire(&WalRcv->mutex);
+ latch = WalRcv->latch;
+ SpinLockRelease(&WalRcv->mutex);
+ if (latch)
+ SetLatch(latch);
}
/*
diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c
index 8ed7254b5c6..48e8498d620 100644
--- a/src/backend/replication/walreceiverfuncs.c
+++ b/src/backend/replication/walreceiverfuncs.c
@@ -226,6 +226,7 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
WalRcvData *walrcv = WalRcv;
bool launch = false;
pg_time_t now = (pg_time_t) time(NULL);
+ Latch *latch;
/*
* We always start at the beginning of the segment. That prevents a broken
@@ -274,12 +275,14 @@ RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
walrcv->receiveStart = recptr;
walrcv->receiveStartTLI = tli;
+ latch = walrcv->latch;
+
SpinLockRelease(&walrcv->mutex);
if (launch)
SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
- else if (walrcv->latch)
- SetLatch(walrcv->latch);
+ else if (latch)
+ SetLatch(latch);
}
/*
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 9a8b2e207ec..742ab6be000 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -121,9 +121,10 @@ typedef struct
/*
* force walreceiver reply? This doesn't need to be locked; memory
- * barriers for ordering are sufficient.
+ * barriers for ordering are sufficient. But we do need atomic fetch and
+ * store semantics, so use sig_atomic_t.
*/
- bool force_reply;
+ sig_atomic_t force_reply; /* used as a bool */
/*
* Latch used by startup process to wake up walreceiver after telling it