aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/backend/access/transam/xlog.c6
-rw-r--r--src/backend/access/transam/xlogarchive.c2
-rw-r--r--src/backend/access/transam/xlogrecovery.c37
-rw-r--r--src/backend/replication/walreceiver.c2
-rw-r--r--src/backend/replication/walsender.c41
-rw-r--r--src/include/replication/walsender.h22
-rw-r--r--src/include/replication/walsender_private.h3
7 files changed, 84 insertions, 29 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 13f83dd57d6..1b7c2f23a41 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -2645,7 +2645,7 @@ XLogFlush(XLogRecPtr record)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* If we still haven't flushed to the request point then we have a
@@ -2816,7 +2816,7 @@ XLogBackgroundFlush(void)
END_CRIT_SECTION();
/* wake up walsenders now that we've released heavily contended locks */
- WalSndWakeupProcessRequests();
+ WalSndWakeupProcessRequests(true, !RecoveryInProgress());
/*
* Great, done. To take some work off the critical path, try to initialize
@@ -5765,7 +5765,7 @@ StartupXLOG(void)
* If there were cascading standby servers connected to us, nudge any wal
* sender processes to notice that we've been promoted.
*/
- WalSndWakeup();
+ WalSndWakeup(true, true);
/*
* If this was a promotion, request an (online) checkpoint now. This isn't
diff --git a/src/backend/access/transam/xlogarchive.c b/src/backend/access/transam/xlogarchive.c
index a0f5aa24b58..f3fb92c8f96 100644
--- a/src/backend/access/transam/xlogarchive.c
+++ b/src/backend/access/transam/xlogarchive.c
@@ -421,7 +421,7 @@ KeepFileRestoredFromArchive(const char *path, const char *xlogfname)
* if we restored something other than a WAL segment, but it does no harm
* either.
*/
- WalSndWakeup();
+ WalSndWakeup(true, false);
}
/*
diff --git a/src/backend/access/transam/xlogrecovery.c b/src/backend/access/transam/xlogrecovery.c
index dbe93947627..02d1b2cd6d8 100644
--- a/src/backend/access/transam/xlogrecovery.c
+++ b/src/backend/access/transam/xlogrecovery.c
@@ -1935,6 +1935,31 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
XLogRecoveryCtl->lastReplayedTLI = *replayTLI;
SpinLockRelease(&XLogRecoveryCtl->info_lck);
+ /* ------
+ * Wakeup walsenders:
+ *
+ * On the standby, the WAL is flushed first (which will only wake up
+ * physical walsenders) and then applied, which will only wake up logical
+ * walsenders.
+ *
+ * Indeed, logical walsenders on standby can't decode and send data until
+ * it's been applied.
+ *
+ * Physical walsenders don't need to be woken up during replay unless
+ * cascading replication is allowed and time line change occurred (so that
+ * they can notice that they are on a new time line).
+ *
+ * That's why the wake up conditions are for:
+ *
+ * - physical walsenders in case of new time line and cascade
+ * replication is allowed
+ * - logical walsenders in case cascade replication is allowed (could not
+ * be created otherwise)
+ * ------
+ */
+ if (AllowCascadeReplication())
+ WalSndWakeup(switchedTLI, true);
+
/*
* If rm_redo called XLogRequestWalReceiverReply, then we wake up the
* receiver so that it notices the updated lastReplayedEndRecPtr and sends
@@ -1958,12 +1983,6 @@ ApplyWalRecord(XLogReaderState *xlogreader, XLogRecord *record, TimeLineID *repl
*/
RemoveNonParentXlogFiles(xlogreader->EndRecPtr, *replayTLI);
- /*
- * Wake up any walsenders to notice that we are on a new timeline.
- */
- if (AllowCascadeReplication())
- WalSndWakeup();
-
/* Reset the prefetcher. */
XLogPrefetchReconfigure();
}
@@ -3050,9 +3069,9 @@ ReadRecord(XLogPrefetcher *xlogprefetcher, int emode,
{
/*
* When we find that WAL ends in an incomplete record, keep track
- * of that record. After recovery is done, we'll write a record to
- * indicate to downstream WAL readers that that portion is to be
- * ignored.
+ * of that record. After recovery is done, we'll write a record
+ * to indicate to downstream WAL readers that that portion is to
+ * be ignored.
*
* However, when ArchiveRecoveryRequested = true, we're going to
* switch to a new timeline at the end of recovery. We will only
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 685af51d5d3..feff7094351 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -1010,7 +1010,7 @@ XLogWalRcvFlush(bool dying, TimeLineID tli)
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
if (AllowCascadeReplication())
- WalSndWakeup();
+ WalSndWakeup(true, false);
/* Report XLOG streaming progress in PS display */
if (update_process_title)
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index e40a9b1ba7b..5423cf0a171 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -2603,6 +2603,23 @@ InitWalSenderSlot(void)
walsnd->sync_standby_priority = 0;
walsnd->latch = &MyProc->procLatch;
walsnd->replyTime = 0;
+
+ /*
+ * The kind assignment is done here and not in StartReplication()
+ * and StartLogicalReplication(). Indeed, the logical walsender
+ * needs to read WAL records (like snapshot of running
+ * transactions) during the slot creation. So it needs to be woken
+ * up based on its kind.
+ *
+ * The kind assignment could also be done in StartReplication(),
+ * StartLogicalReplication() and CREATE_REPLICATION_SLOT but it
+ * seems better to set it on one place.
+ */
+ if (MyDatabaseId == InvalidOid)
+ walsnd->kind = REPLICATION_KIND_PHYSICAL;
+ else
+ walsnd->kind = REPLICATION_KIND_LOGICAL;
+
SpinLockRelease(&walsnd->mutex);
/* don't need the lock anymore */
MyWalSnd = (WalSnd *) walsnd;
@@ -3280,30 +3297,46 @@ WalSndShmemInit(void)
}
/*
- * Wake up all walsenders
+ * Wake up physical, logical or both kinds of walsenders
+ *
+ * The distinction between physical and logical walsenders is done, because:
+ * - physical walsenders can't send data until it's been flushed
+ * - logical walsenders on standby can't decode and send data until it's been
+ * applied
+ *
+ * For cascading replication we need to wake up physical walsenders separately
+ * from logical walsenders (see the comment before calling WalSndWakeup() in
+ * ApplyWalRecord() for more details).
*
* This will be called inside critical sections, so throwing an error is not
* advisable.
*/
void
-WalSndWakeup(void)
+WalSndWakeup(bool physical, bool logical)
{
int i;
for (i = 0; i < max_wal_senders; i++)
{
Latch *latch;
+ ReplicationKind kind;
WalSnd *walsnd = &WalSndCtl->walsnds[i];
/*
* Get latch pointer with spinlock held, for the unlikely case that
- * pointer reads aren't atomic (as they're 8 bytes).
+ * pointer reads aren't atomic (as they're 8 bytes). While at it, also
+ * get kind.
*/
SpinLockAcquire(&walsnd->mutex);
latch = walsnd->latch;
+ kind = walsnd->kind;
SpinLockRelease(&walsnd->mutex);
- if (latch != NULL)
+ if (latch == NULL)
+ continue;
+
+ if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
+ (logical && kind == REPLICATION_KIND_LOGICAL))
SetLatch(latch);
}
}
diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h
index 52bb3e2aae3..9df7e50f943 100644
--- a/src/include/replication/walsender.h
+++ b/src/include/replication/walsender.h
@@ -42,7 +42,7 @@ extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
-extern void WalSndWakeup(void);
+extern void WalSndWakeup(bool physical, bool logical);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
@@ -60,15 +60,15 @@ extern void WalSndRqstFileReload(void);
/*
* wakeup walsenders if there is work to be done
*/
-#define WalSndWakeupProcessRequests() \
- do \
- { \
- if (wake_wal_senders) \
- { \
- wake_wal_senders = false; \
- if (max_wal_senders > 0) \
- WalSndWakeup(); \
- } \
- } while (0)
+static inline void
+WalSndWakeupProcessRequests(bool physical, bool logical)
+{
+ if (wake_wal_senders)
+ {
+ wake_wal_senders = false;
+ if (max_wal_senders > 0)
+ WalSndWakeup(physical, logical);
+ }
+}
#endif /* _WALSENDER_H */
diff --git a/src/include/replication/walsender_private.h b/src/include/replication/walsender_private.h
index 5310e054c48..ff25aa70a89 100644
--- a/src/include/replication/walsender_private.h
+++ b/src/include/replication/walsender_private.h
@@ -15,6 +15,7 @@
#include "access/xlog.h"
#include "lib/ilist.h"
#include "nodes/nodes.h"
+#include "nodes/replnodes.h"
#include "replication/syncrep.h"
#include "storage/latch.h"
#include "storage/shmem.h"
@@ -79,6 +80,8 @@ typedef struct WalSnd
* Timestamp of the last message received from standby.
*/
TimestampTz replyTime;
+
+ ReplicationKind kind;
} WalSnd;
extern PGDLLIMPORT WalSnd *MyWalSnd;