aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/syncrep.c32
-rw-r--r--src/backend/replication/walreceiver.c42
-rw-r--r--src/backend/replication/walsender.c46
-rw-r--r--src/include/replication/walreceiver.h6
-rw-r--r--src/include/replication/walsender_private.h9
5 files changed, 80 insertions, 55 deletions
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index 5fd47689dd2..6a28becdad5 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -711,14 +711,24 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
@@ -726,7 +736,7 @@ SyncRepGetSyncStandbysQuorum(bool *am_sync)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
@@ -780,14 +790,24 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
*/
for (i = 0; i < max_wal_senders; i++)
{
+ XLogRecPtr flush;
+ WalSndState state;
+ int pid;
+
walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
+ pid = walsnd->pid;
+ flush = walsnd->flush;
+ state = walsnd->state;
+ SpinLockRelease(&walsnd->mutex);
+
/* Must be active */
- if (walsnd->pid == 0)
+ if (pid == 0)
continue;
/* Must be streaming */
- if (walsnd->state != WALSNDSTATE_STREAMING)
+ if (state != WALSNDSTATE_STREAMING)
continue;
/* Must be synchronous */
@@ -796,7 +816,7 @@ SyncRepGetSyncStandbysPriority(bool *am_sync)
continue;
/* Must have a valid flush position */
- if (XLogRecPtrIsInvalid(walsnd->flush))
+ if (XLogRecPtrIsInvalid(flush))
continue;
/*
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 8a249e22b9f..ea9d21a46b3 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1379,7 +1379,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
TupleDesc tupdesc;
Datum *values;
bool *nulls;
- WalRcvData *walrcv = WalRcv;
+ int pid;
+ bool ready_to_display;
WalRcvState state;
XLogRecPtr receive_start_lsn;
TimeLineID receive_start_tli;
@@ -1392,11 +1393,28 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
char *slotname;
char *conninfo;
+ /* Take a lock to ensure value consistency */
+ SpinLockAcquire(&WalRcv->mutex);
+ pid = (int) WalRcv->pid;
+ ready_to_display = WalRcv->ready_to_display;
+ state = WalRcv->walRcvState;
+ receive_start_lsn = WalRcv->receiveStart;
+ receive_start_tli = WalRcv->receiveStartTLI;
+ received_lsn = WalRcv->receivedUpto;
+ received_tli = WalRcv->receivedTLI;
+ last_send_time = WalRcv->lastMsgSendTime;
+ last_receipt_time = WalRcv->lastMsgReceiptTime;
+ latest_end_lsn = WalRcv->latestWalEnd;
+ latest_end_time = WalRcv->latestWalEndTime;
+ slotname = pstrdup(WalRcv->slotname);
+ conninfo = pstrdup(WalRcv->conninfo);
+ SpinLockRelease(&WalRcv->mutex);
+
/*
* No WAL receiver (or not ready yet), just return a tuple with NULL
* values
*/
- if (walrcv->pid == 0 || !walrcv->ready_to_display)
+ if (pid == 0 || !ready_to_display)
PG_RETURN_NULL();
/* determine result type */
@@ -1406,23 +1424,8 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
values = palloc0(sizeof(Datum) * tupdesc->natts);
nulls = palloc0(sizeof(bool) * tupdesc->natts);
- /* Take a lock to ensure value consistency */
- SpinLockAcquire(&walrcv->mutex);
- state = walrcv->walRcvState;
- receive_start_lsn = walrcv->receiveStart;
- receive_start_tli = walrcv->receiveStartTLI;
- received_lsn = walrcv->receivedUpto;
- received_tli = walrcv->receivedTLI;
- last_send_time = walrcv->lastMsgSendTime;
- last_receipt_time = walrcv->lastMsgReceiptTime;
- latest_end_lsn = walrcv->latestWalEnd;
- latest_end_time = walrcv->latestWalEndTime;
- slotname = pstrdup(walrcv->slotname);
- conninfo = pstrdup(walrcv->conninfo);
- SpinLockRelease(&walrcv->mutex);
-
/* Fetch values */
- values[0] = Int32GetDatum(walrcv->pid);
+ values[0] = Int32GetDatum(pid);
if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
{
@@ -1473,6 +1476,5 @@ pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
}
/* Returns the record as Datum */
- PG_RETURN_DATUM(HeapTupleGetDatum(
- heap_form_tuple(tupdesc, values, nulls)));
+ PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index da0553e016a..002143b26a2 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -668,13 +668,9 @@ StartReplication(StartReplicationCmd *cmd)
sentPtr = cmd->startpoint;
/* Initialize shared memory status, too */
- {
- WalSnd *walsnd = MyWalSnd;
-
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
- }
+ SpinLockAcquire(&MyWalSnd->mutex);
+ MyWalSnd->sentPtr = sentPtr;
+ SpinLockRelease(&MyWalSnd->mutex);
SyncRepInitConfig();
@@ -1093,13 +1089,9 @@ StartLogicalReplication(StartReplicationCmd *cmd)
sentPtr = MyReplicationSlot->data.confirmed_flush;
/* Also update the sent position status in shared memory */
- {
- WalSnd *walsnd = MyWalSnd;
-
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = MyReplicationSlot->data.restart_lsn;
- SpinLockRelease(&walsnd->mutex);
- }
+ SpinLockAcquire(&MyWalSnd->mutex);
+ MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
+ SpinLockRelease(&MyWalSnd->mutex);
replication_active = true;
@@ -2892,10 +2884,12 @@ WalSndRqstFileReload(void)
{
WalSnd *walsnd = &WalSndCtl->walsnds[i];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
walsnd->needreload = true;
SpinLockRelease(&walsnd->mutex);
}
@@ -3071,7 +3065,6 @@ WalSndWaitStopping(void)
for (i = 0; i < max_wal_senders; i++)
{
- WalSndState state;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
SpinLockAcquire(&walsnd->mutex);
@@ -3082,14 +3075,13 @@ WalSndWaitStopping(void)
continue;
}
- state = walsnd->state;
- SpinLockRelease(&walsnd->mutex);
-
- if (state != WALSNDSTATE_STOPPING)
+ if (walsnd->state != WALSNDSTATE_STOPPING)
{
all_stopped = false;
+ SpinLockRelease(&walsnd->mutex);
break;
}
+ SpinLockRelease(&walsnd->mutex);
}
/* safe to leave if confirmation is done for all WAL senders */
@@ -3210,14 +3202,18 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
TimeOffset flushLag;
TimeOffset applyLag;
int priority;
+ int pid;
WalSndState state;
Datum values[PG_STAT_GET_WAL_SENDERS_COLS];
bool nulls[PG_STAT_GET_WAL_SENDERS_COLS];
+ SpinLockAcquire(&walsnd->mutex);
if (walsnd->pid == 0)
+ {
+ SpinLockRelease(&walsnd->mutex);
continue;
-
- SpinLockAcquire(&walsnd->mutex);
+ }
+ pid = walsnd->pid;
sentPtr = walsnd->sentPtr;
state = walsnd->state;
write = walsnd->write;
@@ -3230,7 +3226,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
SpinLockRelease(&walsnd->mutex);
memset(nulls, 0, sizeof(nulls));
- values[0] = Int32GetDatum(walsnd->pid);
+ values[0] = Int32GetDatum(pid);
if (!superuser())
{
@@ -3265,7 +3261,7 @@ pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
* which always returns an invalid flush location, as an
* asynchronous standby.
*/
- priority = XLogRecPtrIsInvalid(walsnd->flush) ? 0 : priority;
+ priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
if (writeLag < 0)
nulls[6] = true;
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index c8652dbd489..9a8b2e207ec 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -114,6 +114,9 @@ typedef struct
*/
char slotname[NAMEDATALEN];
+ /* set true once conninfo is ready to display (obfuscated pwds etc) */
+ bool ready_to_display;
+
slock_t mutex; /* locks shared variables shown above */
/*
@@ -122,9 +125,6 @@ typedef struct
*/
bool force_reply;
- /* set true once conninfo is ready to display (obfuscated pwds etc) */
- bool ready_to_display;
-
/*
* Latch used by startup process to wake up walreceiver after telling it
* where to start streaming (after setting receiveStart and
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 0aa80d5c3e2..17c68cba235 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -30,10 +30,17 @@ typedef enum WalSndState
/*
* Each walsender has a WalSnd struct in shared memory.
+ *
+ * This struct is protected by 'mutex', with two exceptions: one is
+ * sync_standby_priority as noted below. The other exception is that some
+ * members are only written by the walsender process itself, and thus that
+ * process is free to read those members without holding spinlock. pid and
+ * needreload always require the spinlock to be held for all accesses.
*/
typedef struct WalSnd
{
- pid_t pid; /* this walsender's process id, or 0 */
+ pid_t pid; /* this walsender's PID, or 0 if not active */
+
WalSndState state; /* this walsender's state */
XLogRecPtr sentPtr; /* WAL has been sent up to this point */
bool needreload; /* does currently-open file need to be