about summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r-- src/backend/replication/walsender.c 72
-rw-r--r-- src/include/replication/walsender_private.h 5
2 files changed, 53 insertions, 24 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45b8b3684f6..d3a136b6f55 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -3309,6 +3309,9 @@ WalSndShmemInit(void)
SpinLockInit(&walsnd->mutex);
}
+
+ ConditionVariableInit(&WalSndCtl->wal_flush_cv);
+ ConditionVariableInit(&WalSndCtl->wal_replay_cv);
}
}
@@ -3330,31 +3333,17 @@ WalSndShmemInit(void)
void
WalSndWakeup(bool physical, bool logical)
{
- int i;
-
- for (i = 0; i < max_wal_senders; i++)
- {
- Latch *latch;
- ReplicationKind kind;
- WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
- /*
- * Get latch pointer with spinlock held, for the unlikely case that
- * pointer reads aren't atomic (as they're 8 bytes). While at it, also
- * get kind.
- */
- SpinLockAcquire(&walsnd->mutex);
- latch = walsnd->latch;
- kind = walsnd->kind;
- SpinLockRelease(&walsnd->mutex);
-
- if (latch == NULL)
- continue;
+ /*
+ * Wake up all the walsenders waiting on WAL being flushed (physical) or
+ * replayed (logical), respectively. Note that a waiting walsender will
+ * already have prepared to sleep on the CV (i.e., added itself to the
+ * CV's waitlist) in WalSndWait() before actually waiting.
+ */
+ if (physical)
+ ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
- if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
- (logical && kind == REPLICATION_KIND_LOGICAL))
- SetLatch(latch);
- }
+ if (logical)
+ ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
}
/*
@@ -3368,9 +3357,44 @@ WalSndWait(uint32 socket_events, long timeout, uint32 wait_event)
WaitEvent event;
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+
+ /*
+ * We use a condition variable to efficiently wake up walsenders in
+ * WalSndWakeup().
+ *
+ * Every walsender prepares to sleep on a shared memory CV. Note that it
+ * just prepares to sleep on the CV (i.e., adds itself to the CV's
+ * waitlist), but does not actually wait on the CV (IOW, it never calls
+ * ConditionVariableSleep()). It still uses WaitEventSetWait() for
+ * waiting, because we also need to wait for socket events. The processes
+ * (startup process, walreceiver etc.) wanting to wake up walsenders use
+ * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
+ * walsenders come out of WaitEventSetWait().
+ *
+ * This approach is simple and efficient because one doesn't have to loop
+ * through all the walsender slots, with a spinlock acquisition and
+ * release for every iteration, just to wake up only the waiting
+ * walsenders. It makes life easy for WalSndWakeup()'s callers.
+ *
+ * XXX: A desirable future improvement would be to add support for CVs
+ * into WaitEventSetWait().
+ *
+ * We also use separate shared memory CVs for physical and logical
+ * walsenders to allow selective wakeups; see WalSndWakeup() for details.
+ */
+ if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
+ {
+ ConditionVariableCancelSleep();
proc_exit(1);
+ }
+
+ ConditionVariableCancelSleep();
}
/*
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index ff25aa70a89..7d919583bd3 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -17,6 +17,7 @@
#include "nodes/nodes.h"
#include "nodes/replnodes.h"
#include "replication/syncrep.h"
+#include "storage/condition_variable.h"
#include "storage/latch.h"
#include "storage/shmem.h"
#include "storage/spin.h"
@@ -108,6 +109,10 @@ typedef struct
*/
bool sync_standbys_defined;
+ /* CVs used as registries of physical (flush) / logical (replay) walsenders to wake */
+ ConditionVariable wal_flush_cv;
+ ConditionVariable wal_replay_cv;
+
WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
} WalSndCtlData;