aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/basebackup.c5
-rw-r--r--src/backend/replication/syncrep.c7
-rw-r--r--src/backend/replication/walreceiver.c7
-rw-r--r--src/backend/replication/walsender.c108
4 files changed, 100 insertions, 27 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index bcde19c71b6..74d28440bf4 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd)
MemoryContext old_context;
basebackup_options opt;
+ if (am_cascading_walsender)
+ ereport(FATAL,
+ (errcode(ERRCODE_CANNOT_CONNECT_NOW),
+ errmsg("recovery is still in progress, can't accept WAL streaming connections for backup")));
+
parse_basebackup_options(cmd->options, &opt);
backup_context = AllocSetContextCreate(CurrentMemoryContext,
diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c
index b73d225a8ef..32db2bc4c52 100644
--- a/src/backend/replication/syncrep.c
+++ b/src/backend/replication/syncrep.c
@@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void)
int priority = 0;
bool found = false;
+ /*
+ * Since synchronous cascade replication is not allowed, we always
+ * set the priority of cascading walsender to zero.
+ */
+ if (am_cascading_walsender)
+ return 0;
+
/* Need a modifiable copy of string */
rawstring = pstrdup(SyncRepStandbyNames);
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ea6f6cdcdaf..c24fa87394d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -44,6 +44,7 @@
#include "miscadmin.h"
#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
+#include "replication/walsender.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
#include "storage/procarray.h"
@@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying)
}
SpinLockRelease(&walrcv->mutex);
- /* Signal the startup process that new WAL has arrived */
+ /* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery();
+ if (AllowCascadeReplication())
+ WalSndWakeup();
/* Report XLOG streaming progress in PS display */
if (update_process_title)
@@ -625,7 +628,7 @@ XLogWalRcvSendReply(void)
/* Construct a new message */
reply_message.write = LogstreamResult.Write;
reply_message.flush = LogstreamResult.Flush;
- reply_message.apply = GetXLogReplayRecPtr();
+ reply_message.apply = GetXLogReplayRecPtr(NULL);
reply_message.sendTime = now;
elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X",
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index bc5b3300d23..63a63048dbb 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -48,6 +48,7 @@
#include "replication/basebackup.h"
#include "replication/replnodes.h"
#include "replication/walprotocol.h"
+#include "replication/walreceiver.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL;
/* Global state */
bool am_walsender = false; /* Am I a walsender process ? */
+bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
@@ -135,10 +137,7 @@ WalSenderMain(void)
{
MemoryContext walsnd_context;
- if (RecoveryInProgress())
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("recovery is still in progress, can't accept WAL streaming connections")));
+ am_cascading_walsender = RecoveryInProgress();
/* Create a per-walsender data structure in shared memory */
InitWalSnd();
@@ -165,6 +164,12 @@ WalSenderMain(void)
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
+ /*
+ * Use the recovery target timeline ID during recovery
+ */
+ if (am_cascading_walsender)
+ ThisTimeLineID = GetRecoveryTargetTLI();
+
/* Tell the standby that walsender is ready for receiving commands */
ReadyForQuery(DestRemote);
@@ -290,7 +295,7 @@ IdentifySystem(void)
GetSystemIdentifier());
snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = GetInsertRecPtr();
+ logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr();
snprintf(xpos, sizeof(xpos), "%X/%X",
logptr.xlogid, logptr.xrecoff);
@@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd)
SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
/*
- * Check that we're logging enough information in the WAL for
- * log-shipping.
+ * We assume here that we're logging enough information in the WAL for
+ * log-shipping, since this is checked in PostmasterMain().
*
- * NOTE: This only checks the current value of wal_level. Even if the
- * current setting is not 'minimal', there can be old WAL in the pg_xlog
- * directory that was created with 'minimal'. So this is not bulletproof,
- * the purpose is just to give a user-friendly error message that hints
- * how to configure the system correctly.
+ * NOTE: wal_level can only change at shutdown, so in most cases it is
+ * difficult for there to be WAL data that we can still see that was written
+ * at wal_level='minimal'.
*/
- if (wal_level == WAL_LEVEL_MINIMAL)
- ereport(FATAL,
- (errcode(ERRCODE_CANNOT_CONNECT_NOW),
- errmsg("standby connections not allowed because wal_level=minimal")));
/*
* When we first start replication the standby will be behind the primary.
@@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void)
SpinLockRelease(&walsnd->mutex);
}
- SyncRepReleaseWaiters();
+ if (!am_cascading_walsender)
+ SyncRepReleaseWaiters();
}
/*
@@ -764,6 +764,8 @@ WalSndLoop(void)
/*
* When SIGUSR2 arrives, we send any outstanding logs up to the
* shutdown checkpoint record (i.e., the latest record) and exit.
+ * This may be a normal termination at shutdown, or a promotion,
+ * the walsender is not sure which.
*/
if (walsender_ready_to_stop && !pq_is_send_pending())
{
@@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg)
}
/*
- * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ * Read 'count' bytes from WAL into 'buf', starting at location 'startptr'
*
* XXX probably this should be improved to suck data directly from the
* WAL buffers when possible.
@@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg)
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
+XLogRead(char *buf, XLogRecPtr startptr, Size count)
{
- XLogRecPtr startRecPtr = recptr;
- char path[MAXPGPATH];
+ char *p;
+ XLogRecPtr recptr;
+ Size nbytes;
uint32 lastRemovedLog;
uint32 lastRemovedSeg;
uint32 log;
uint32 seg;
+retry:
+ p = buf;
+ recptr = startptr;
+ nbytes = count;
+
while (nbytes > 0)
{
uint32 startoff;
@@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
{
+ char path[MAXPGPATH];
+
/* Switch to another logfile segment */
if (sendFile >= 0)
close(sendFile);
@@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
else
segbytes = nbytes;
- readbytes = read(sendFile, buf, segbytes);
+ readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
ereport(ERROR,
(errcode_for_file_access(),
@@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
sendOff += readbytes;
nbytes -= readbytes;
- buf += readbytes;
+ p += readbytes;
}
/*
@@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
* already have been overwritten with new WAL records.
*/
XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startRecPtr, log, seg);
+ XLByteToSeg(startptr, log, seg);
if (log < lastRemovedLog ||
(log == lastRemovedLog && seg <= lastRemovedSeg))
{
@@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
errmsg("requested WAL segment %s has already been removed",
filename)));
}
+
+ /*
+ * During recovery, the currently-open WAL file might be replaced with
+ * the file of the same name retrieved from archive. So we always need
+ * to check what we read was valid after reading into the buffer. If it's
+ * invalid, we try to open and read the file again.
+ */
+ if (am_cascading_walsender)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+ bool reload;
+
+ SpinLockAcquire(&walsnd->mutex);
+ reload = walsnd->needreload;
+ walsnd->needreload = false;
+ SpinLockRelease(&walsnd->mutex);
+
+ if (reload && sendFile >= 0)
+ {
+ close(sendFile);
+ sendFile = -1;
+
+ goto retry;
+ }
+ }
}
/*
@@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup)
* subsequently crashes and restarts, slaves must not have applied any WAL
* that gets lost on the master.
*/
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr();
/* Quick exit if nothing to do */
if (XLByteLE(SendRqstPtr, sentPtr))
@@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup)
return;
}
+/*
+ * Request walsenders to reload the currently-open WAL file
+ */
+void
+WalSndRqstFileReload(void)
+{
+ int i;
+
+ for (i = 0; i < max_wal_senders; i++)
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = &WalSndCtl->walsnds[i];
+
+ if (walsnd->pid == 0)
+ continue;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->needreload = true;
+ SpinLockRelease(&walsnd->mutex);
+ }
+}
+
/* SIGHUP: set flag to re-read config file at next convenient time */
static void
WalSndSigHupHandler(SIGNAL_ARGS)