aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walsender.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r--src/backend/replication/walsender.c182
1 files changed, 94 insertions, 88 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 1dcb0f57f44..717cbfd61c6 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -94,12 +94,13 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
-int wal_sender_timeout = 60 * 1000; /* maximum time to send one
+int wal_sender_timeout = 60 * 1000; /* maximum time to send one
* WAL data message */
+
/*
* State for WalSndWakeupRequest
*/
-bool wake_wal_senders = false;
+bool wake_wal_senders = false;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
@@ -110,7 +111,7 @@ static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/* Timeline ID of the currently open file */
-static TimeLineID curFileTimeLine = 0;
+static TimeLineID curFileTimeLine = 0;
/*
* These variables keep track of the state of the timeline we're currently
@@ -118,10 +119,10 @@ static TimeLineID curFileTimeLine = 0;
* the timeline is not the latest timeline on this server, and the server's
* history forked off from that timeline at sendTimeLineValidUpto.
*/
-static TimeLineID sendTimeLine = 0;
-static TimeLineID sendTimeLineNextTLI = 0;
-static bool sendTimeLineIsHistoric = false;
-static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
+static TimeLineID sendTimeLine = 0;
+static TimeLineID sendTimeLineNextTLI = 0;
+static bool sendTimeLineIsHistoric = false;
+static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
/*
* How far have we sent WAL already? This is also advertised in
@@ -138,8 +139,9 @@ static StringInfoData tmpbuf;
* Timestamp of the last receipt of the reply from the standby.
*/
static TimestampTz last_reply_timestamp;
+
/* Have we sent a heartbeat message asking for reply, since last reply? */
-static bool ping_sent = false;
+static bool ping_sent = false;
/*
* While streaming WAL in Copy mode, streamingDoneSending is set to true
@@ -147,8 +149,8 @@ static bool ping_sent = false;
* after that. streamingDoneReceiving is set to true when we receive CopyDone
* from the other end. When both become true, it's time to exit Copy mode.
*/
-static bool streamingDoneSending;
-static bool streamingDoneReceiving;
+static bool streamingDoneSending;
+static bool streamingDoneReceiving;
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
@@ -322,8 +324,8 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
off_t bytesleft;
/*
- * Reply with a result set with one row, and two columns. The first col
- * is the name of the history file, 2nd is the contents.
+ * Reply with a result set with one row, and two columns. The first col is
+ * the name of the history file, 2nd is the contents.
*/
TLHistoryFileName(histfname, cmd->timeline);
@@ -343,7 +345,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
pq_sendint(&buf, 0, 2); /* format code */
/* second field */
- pq_sendstring(&buf, "content"); /* col name */
+ pq_sendstring(&buf, "content"); /* col name */
pq_sendint(&buf, 0, 4); /* table oid */
pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, BYTEAOID, 4); /* type oid */
@@ -355,7 +357,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
/* Send a DataRow message */
pq_beginmessage(&buf, 'D');
pq_sendint(&buf, 2, 2); /* # of columns */
- pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
+ pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
pq_sendbytes(&buf, histfname, strlen(histfname));
fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
@@ -373,15 +375,15 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd)
if (lseek(fd, 0, SEEK_SET) != 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek to beginning of file \"%s\": %m", path)));
+ errmsg("could not seek to beginning of file \"%s\": %m", path)));
pq_sendint(&buf, histfilelen, 4); /* col2 len */
bytesleft = histfilelen;
while (bytesleft > 0)
{
- char rbuf[BLCKSZ];
- int nread;
+ char rbuf[BLCKSZ];
+ int nread;
nread = read(fd, rbuf, sizeof(rbuf));
if (nread <= 0)
@@ -407,7 +409,7 @@ static void
StartReplication(StartReplicationCmd *cmd)
{
StringInfoData buf;
- XLogRecPtr FlushPtr;
+ XLogRecPtr FlushPtr;
/*
* We assume here that we're logging enough information in the WAL for
@@ -420,8 +422,8 @@ StartReplication(StartReplicationCmd *cmd)
/*
* Select the timeline. If it was given explicitly by the client, use
- * that. Otherwise use the timeline of the last replayed record, which
- * is kept in ThisTimeLineID.
+ * that. Otherwise use the timeline of the last replayed record, which is
+ * kept in ThisTimeLineID.
*/
if (am_cascading_walsender)
{
@@ -448,8 +450,8 @@ StartReplication(StartReplicationCmd *cmd)
sendTimeLineIsHistoric = true;
/*
- * Check that the timeline the client requested for exists, and the
- * requested start location is on that timeline.
+ * Check that the timeline the client requested for exists, and
+ * the requested start location is on that timeline.
*/
timeLineHistory = readTimeLineHistory(ThisTimeLineID);
switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
@@ -461,14 +463,14 @@ StartReplication(StartReplicationCmd *cmd)
* requested startpoint is on that timeline in our history.
*
* This is quite loose on purpose. We only check that we didn't
- * fork off the requested timeline before the switchpoint. We don't
- * check that we switched *to* it before the requested starting
- * point. This is because the client can legitimately request to
- * start replication from the beginning of the WAL segment that
- * contains switchpoint, but on the new timeline, so that it
- * doesn't end up with a partial segment. If you ask for a too old
- * starting point, you'll get an error later when we fail to find
- * the requested WAL segment in pg_xlog.
+ * fork off the requested timeline before the switchpoint. We
+ * don't check that we switched *to* it before the requested
+ * starting point. This is because the client can legitimately
+ * request to start replication from the beginning of the WAL
+ * segment that contains switchpoint, but on the new timeline, so
+ * that it doesn't end up with a partial segment. If you ask for a
+ * too old starting point, you'll get an error later when we fail
+ * to find the requested WAL segment in pg_xlog.
*
* XXX: we could be more strict here and only allow a startpoint
* that's older than the switchpoint, if it it's still in the same
@@ -503,12 +505,13 @@ StartReplication(StartReplicationCmd *cmd)
if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
{
/*
- * When we first start replication the standby will be behind the primary.
- * For some applications, for example, synchronous replication, it is
- * important to have a clear state for this initial catchup mode, so we
- * can trigger actions when we change streaming state later. We may stay
- * in this state for a long time, which is exactly why we want to be able
- * to monitor whether or not we are still here.
+ * When we first start replication the standby will be behind the
+ * primary. For some applications, for example, synchronous
+ * replication, it is important to have a clear state for this initial
+ * catchup mode, so we can trigger actions when we change streaming
+ * state later. We may stay in this state for a long time, which is
+ * exactly why we want to be able to monitor whether or not we are
+ * still here.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
@@ -568,20 +571,21 @@ StartReplication(StartReplicationCmd *cmd)
if (sendTimeLineIsHistoric)
{
char tli_str[11];
- char startpos_str[8+1+8+1];
+ char startpos_str[8 + 1 + 8 + 1];
snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI);
snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
(uint32) (sendTimeLineValidUpto >> 32),
(uint32) sendTimeLineValidUpto);
- pq_beginmessage(&buf, 'T'); /* RowDescription */
- pq_sendint(&buf, 2, 2); /* 2 fields */
+ pq_beginmessage(&buf, 'T'); /* RowDescription */
+ pq_sendint(&buf, 2, 2); /* 2 fields */
/* Field header */
pq_sendstring(&buf, "next_tli");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+
/*
* int8 may seem like a surprising data type for this, but in theory
* int4 would not be wide enough for this, as TimeLineID is unsigned.
@@ -592,8 +596,8 @@ StartReplication(StartReplicationCmd *cmd)
pq_sendint(&buf, 0, 2);
pq_sendstring(&buf, "next_tli_startpos");
- pq_sendint(&buf, 0, 4); /* table oid */
- pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
pq_sendint(&buf, TEXTOID, 4); /* type oid */
pq_sendint(&buf, -1, 2);
pq_sendint(&buf, 0, 4);
@@ -602,12 +606,12 @@ StartReplication(StartReplicationCmd *cmd)
/* Data row */
pq_beginmessage(&buf, 'D');
- pq_sendint(&buf, 2, 2); /* number of columns */
+ pq_sendint(&buf, 2, 2); /* number of columns */
pq_sendint(&buf, strlen(tli_str), 4); /* length */
pq_sendbytes(&buf, tli_str, strlen(tli_str));
- pq_sendint(&buf, strlen(startpos_str), 4); /* length */
+ pq_sendint(&buf, strlen(startpos_str), 4); /* length */
pq_sendbytes(&buf, startpos_str, strlen(startpos_str));
pq_endmessage(&buf);
@@ -840,7 +844,7 @@ ProcessStandbyReplyMessage(void)
writePtr = pq_getmsgint64(&reply_message);
flushPtr = pq_getmsgint64(&reply_message);
applyPtr = pq_getmsgint64(&reply_message);
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
replyRequested = pq_getmsgbyte(&reply_message);
elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s",
@@ -887,7 +891,7 @@ ProcessStandbyHSFeedbackMessage(void)
* Decipher the reply message. The caller already consumed the msgtype
* byte.
*/
- (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
+ (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */
feedbackXmin = pq_getmsgint(&reply_message, 4);
feedbackEpoch = pq_getmsgint(&reply_message, 4);
@@ -932,11 +936,11 @@ ProcessStandbyHSFeedbackMessage(void)
* cleanup conflicts on the standby server.
*
* There is a small window for a race condition here: although we just
- * checked that feedbackXmin precedes nextXid, the nextXid could have gotten
- * advanced between our fetching it and applying the xmin below, perhaps
- * far enough to make feedbackXmin wrap around. In that case the xmin we
- * set here would be "in the future" and have no effect. No point in
- * worrying about this since it's too late to save the desired data
+ * checked that feedbackXmin precedes nextXid, the nextXid could have
+ * gotten advanced between our fetching it and applying the xmin below,
+ * perhaps far enough to make feedbackXmin wrap around. In that case the
+ * xmin we set here would be "in the future" and have no effect. No point
+ * in worrying about this since it's too late to save the desired data
* anyway. Assuming that the standby sends us an increasing sequence of
* xmins, this could only happen during the first reply cycle, else our
* own xmin would prevent nextXid from advancing so far.
@@ -969,8 +973,8 @@ WalSndLoop(void)
ping_sent = false;
/*
- * Loop until we reach the end of this timeline or the client requests
- * to stop streaming.
+ * Loop until we reach the end of this timeline or the client requests to
+ * stop streaming.
*/
for (;;)
{
@@ -1082,8 +1086,8 @@ WalSndLoop(void)
{
/*
* If half of wal_sender_timeout has lapsed without receiving
- * any reply from standby, send a keep-alive message to standby
- * requesting an immediate reply.
+ * any reply from standby, send a keep-alive message to
+ * standby requesting an immediate reply.
*/
timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
wal_sender_timeout / 2);
@@ -1133,6 +1137,7 @@ WalSndLoop(void)
return;
send_failure:
+
/*
* Get here on send failure. Clean up and exit.
*
@@ -1290,7 +1295,7 @@ retry:
curFileTimeLine = sendTimeLine;
if (sendTimeLineIsHistoric)
{
- XLogSegNo endSegNo;
+ XLogSegNo endSegNo;
XLByteToSeg(sendTimeLineValidUpto, endSegNo);
if (sendSegNo == endSegNo)
@@ -1311,7 +1316,7 @@ retry:
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(curFileTimeLine, sendSegNo))));
+ XLogFileNameP(curFileTimeLine, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
@@ -1327,9 +1332,9 @@ retry:
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(curFileTimeLine, sendSegNo),
+ startoff)));
sendOff = startoff;
}
@@ -1344,9 +1349,9 @@ retry:
{
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- XLogFileNameP(curFileTimeLine, sendSegNo),
- sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(curFileTimeLine, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
}
/* Update state for read */
@@ -1431,16 +1436,16 @@ XLogSend(bool *caughtup)
/*
* Streaming the latest timeline on a standby.
*
- * Attempt to send all WAL that has already been replayed, so that
- * we know it's valid. If we're receiving WAL through streaming
+ * Attempt to send all WAL that has already been replayed, so that we
+ * know it's valid. If we're receiving WAL through streaming
* replication, it's also OK to send any WAL that has been received
* but not replayed.
*
* The timeline we're recovering from can change, or we can be
- * promoted. In either case, the current timeline becomes historic.
- * We need to detect that so that we don't try to stream past the
- * point where we switched to another timeline. We check for promotion
- * or timeline switch after calculating FlushPtr, to avoid a race
+ * promoted. In either case, the current timeline becomes historic. We
+ * need to detect that so that we don't try to stream past the point
+ * where we switched to another timeline. We check for promotion or
+ * timeline switch after calculating FlushPtr, to avoid a race
* condition: if the timeline becomes historic just after we checked
* that it was still current, it's still be OK to stream it up to the
* FlushPtr that was calculated before it became historic.
@@ -1496,7 +1501,7 @@ XLogSend(bool *caughtup)
*
* Attempt to send all data that's already been written out and
* fsync'd to disk. We cannot go further than what's been written out
- * given the current implementation of XLogRead(). And in any case
+ * given the current implementation of XLogRead(). And in any case
* it's unsafe to send WAL that is not securely down to disk on the
* master: if the master subsequently crashes and restarts, slaves
* must not have applied any WAL that gets lost on the master.
@@ -1509,13 +1514,14 @@ XLogSend(bool *caughtup)
* forked to the next timeline, stop streaming.
*
* Note: We might already have sent WAL > sendTimeLineValidUpto. The
- * startup process will normally replay all WAL that has been received from
- * the master, before promoting, but if the WAL streaming is terminated at
- * a WAL page boundary, the valid portion of the timeline might end in the
- * middle of a WAL record. We might've already sent the first half of that
- * partial WAL record to the cascading standby, so that sentPtr >
- * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the
- * partial WAL record either, so it can still follow our timeline switch.
+ * startup process will normally replay all WAL that has been received
+ * from the master, before promoting, but if the WAL streaming is
+ * terminated at a WAL page boundary, the valid portion of the timeline
+ * might end in the middle of a WAL record. We might've already sent the
+ * first half of that partial WAL record to the cascading standby, so that
+ * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
+ * replay the partial WAL record either, so it can still follow our
+ * timeline switch.
*/
if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
{
@@ -1585,8 +1591,8 @@ XLogSend(bool *caughtup)
pq_sendbyte(&output_message, 'w');
pq_sendint64(&output_message, startptr); /* dataStart */
- pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
- pq_sendint64(&output_message, 0); /* sendtime, filled in last */
+ pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
+ pq_sendint64(&output_message, 0); /* sendtime, filled in last */
/*
* Read the log directly into the output buffer to avoid extra memcpy
@@ -1643,16 +1649,16 @@ XLogSend(bool *caughtup)
static XLogRecPtr
GetStandbyFlushRecPtr(void)
{
- XLogRecPtr replayPtr;
- TimeLineID replayTLI;
- XLogRecPtr receivePtr;
- TimeLineID receiveTLI;
+ XLogRecPtr replayPtr;
+ TimeLineID replayTLI;
+ XLogRecPtr receivePtr;
+ TimeLineID receiveTLI;
XLogRecPtr result;
/*
* We can safely send what's already been replayed. Also, if walreceiver
- * is streaming WAL from the same timeline, we can send anything that
- * it has streamed, but hasn't been replayed yet.
+ * is streaming WAL from the same timeline, we can send anything that it
+ * has streamed, but hasn't been replayed yet.
*/
receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI);
@@ -1742,8 +1748,8 @@ WalSndSignals(void)
pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config
* file */
pqsignal(SIGINT, SIG_IGN); /* not used */
- pqsignal(SIGTERM, die); /* request shutdown */
- pqsignal(SIGQUIT, quickdie); /* hard crash time */
+ pqsignal(SIGTERM, die); /* request shutdown */
+ pqsignal(SIGQUIT, quickdie); /* hard crash time */
InitializeTimeouts(); /* establishes SIGALRM handler */
pqsignal(SIGPIPE, SIG_IGN);
pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */