diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 182 |
1 files changed, 94 insertions, 88 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1dcb0f57f44..717cbfd61c6 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -94,12 +94,13 @@ bool am_cascading_walsender = false; /* Am I cascading WAL to /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ -int wal_sender_timeout = 60 * 1000; /* maximum time to send one +int wal_sender_timeout = 60 * 1000; /* maximum time to send one * WAL data message */ + /* * State for WalSndWakeupRequest */ -bool wake_wal_senders = false; +bool wake_wal_senders = false; /* * These variables are used similarly to openLogFile/Id/Seg/Off, @@ -110,7 +111,7 @@ static XLogSegNo sendSegNo = 0; static uint32 sendOff = 0; /* Timeline ID of the currently open file */ -static TimeLineID curFileTimeLine = 0; +static TimeLineID curFileTimeLine = 0; /* * These variables keep track of the state of the timeline we're currently @@ -118,10 +119,10 @@ static TimeLineID curFileTimeLine = 0; * the timeline is not the latest timeline on this server, and the server's * history forked off from that timeline at sendTimeLineValidUpto. */ -static TimeLineID sendTimeLine = 0; -static TimeLineID sendTimeLineNextTLI = 0; -static bool sendTimeLineIsHistoric = false; -static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; +static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; /* * How far have we sent WAL already? This is also advertised in @@ -138,8 +139,9 @@ static StringInfoData tmpbuf; * Timestamp of the last receipt of the reply from the standby. */ static TimestampTz last_reply_timestamp; + /* Have we sent a heartbeat message asking for reply, since last reply? 
*/ -static bool ping_sent = false; +static bool ping_sent = false; /* * While streaming WAL in Copy mode, streamingDoneSending is set to true @@ -147,8 +149,8 @@ static bool ping_sent = false; * after that. streamingDoneReceiving is set to true when we receive CopyDone * from the other end. When both become true, it's time to exit Copy mode. */ -static bool streamingDoneSending; -static bool streamingDoneReceiving; +static bool streamingDoneSending; +static bool streamingDoneReceiving; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; @@ -322,8 +324,8 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) off_t bytesleft; /* - * Reply with a result set with one row, and two columns. The first col - * is the name of the history file, 2nd is the contents. + * Reply with a result set with one row, and two columns. The first col is + * the name of the history file, 2nd is the contents. */ TLHistoryFileName(histfname, cmd->timeline); @@ -343,7 +345,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) pq_sendint(&buf, 0, 2); /* format code */ /* second field */ - pq_sendstring(&buf, "content"); /* col name */ + pq_sendstring(&buf, "content"); /* col name */ pq_sendint(&buf, 0, 4); /* table oid */ pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, BYTEAOID, 4); /* type oid */ @@ -355,7 +357,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) /* Send a DataRow message */ pq_beginmessage(&buf, 'D'); pq_sendint(&buf, 2, 2); /* # of columns */ - pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ + pq_sendint(&buf, strlen(histfname), 4); /* col1 len */ pq_sendbytes(&buf, histfname, strlen(histfname)); fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666); @@ -373,15 +375,15 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) if (lseek(fd, 0, SEEK_SET) != 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not seek to beginning of file \"%s\": %m", path))); + errmsg("could not seek to beginning of file \"%s\": 
%m", path))); pq_sendint(&buf, histfilelen, 4); /* col2 len */ bytesleft = histfilelen; while (bytesleft > 0) { - char rbuf[BLCKSZ]; - int nread; + char rbuf[BLCKSZ]; + int nread; nread = read(fd, rbuf, sizeof(rbuf)); if (nread <= 0) @@ -407,7 +409,7 @@ static void StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; - XLogRecPtr FlushPtr; + XLogRecPtr FlushPtr; /* * We assume here that we're logging enough information in the WAL for @@ -420,8 +422,8 @@ StartReplication(StartReplicationCmd *cmd) /* * Select the timeline. If it was given explicitly by the client, use - * that. Otherwise use the timeline of the last replayed record, which - * is kept in ThisTimeLineID. + * that. Otherwise use the timeline of the last replayed record, which is + * kept in ThisTimeLineID. */ if (am_cascading_walsender) { @@ -448,8 +450,8 @@ StartReplication(StartReplicationCmd *cmd) sendTimeLineIsHistoric = true; /* - * Check that the timeline the client requested for exists, and the - * requested start location is on that timeline. + * Check that the timeline the client requested for exists, and + * the requested start location is on that timeline. */ timeLineHistory = readTimeLineHistory(ThisTimeLineID); switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, @@ -461,14 +463,14 @@ StartReplication(StartReplicationCmd *cmd) * requested startpoint is on that timeline in our history. * * This is quite loose on purpose. We only check that we didn't - * fork off the requested timeline before the switchpoint. We don't - * check that we switched *to* it before the requested starting - * point. This is because the client can legitimately request to - * start replication from the beginning of the WAL segment that - * contains switchpoint, but on the new timeline, so that it - * doesn't end up with a partial segment. If you ask for a too old - * starting point, you'll get an error later when we fail to find - * the requested WAL segment in pg_xlog. 
+ * fork off the requested timeline before the switchpoint. We + * don't check that we switched *to* it before the requested + * starting point. This is because the client can legitimately + * request to start replication from the beginning of the WAL + * segment that contains switchpoint, but on the new timeline, so + * that it doesn't end up with a partial segment. If you ask for a + * too old starting point, you'll get an error later when we fail + * to find the requested WAL segment in pg_xlog. * * XXX: we could be more strict here and only allow a startpoint * that's older than the switchpoint, if it's still in the same @@ -503,12 +505,13 @@ StartReplication(StartReplicationCmd *cmd) if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) { /* - * When we first start replication the standby will be behind the primary. - * For some applications, for example, synchronous replication, it is - * important to have a clear state for this initial catchup mode, so we - * can trigger actions when we change streaming state later. We may stay - * in this state for a long time, which is exactly why we want to be able - * to monitor whether or not we are still here. + * When we first start replication the standby will be behind the + * primary. For some applications, for example, synchronous + * replication, it is important to have a clear state for this initial + * catchup mode, so we can trigger actions when we change streaming + * state later. We may stay in this state for a long time, which is + * exactly why we want to be able to monitor whether or not we are + * still here. 
*/ WalSndSetState(WALSNDSTATE_CATCHUP); @@ -568,20 +571,21 @@ StartReplication(StartReplicationCmd *cmd) if (sendTimeLineIsHistoric) { char tli_str[11]; - char startpos_str[8+1+8+1]; + char startpos_str[8 + 1 + 8 + 1]; snprintf(tli_str, sizeof(tli_str), "%u", sendTimeLineNextTLI); snprintf(startpos_str, sizeof(startpos_str), "%X/%X", (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto); - pq_beginmessage(&buf, 'T'); /* RowDescription */ - pq_sendint(&buf, 2, 2); /* 2 fields */ + pq_beginmessage(&buf, 'T'); /* RowDescription */ + pq_sendint(&buf, 2, 2); /* 2 fields */ /* Field header */ pq_sendstring(&buf, "next_tli"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + /* * int8 may seem like a surprising data type for this, but in theory * int4 would not be wide enough for this, as TimeLineID is unsigned. @@ -592,8 +596,8 @@ StartReplication(StartReplicationCmd *cmd) pq_sendint(&buf, 0, 2); pq_sendstring(&buf, "next_tli_startpos"); - pq_sendint(&buf, 0, 4); /* table oid */ - pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ pq_sendint(&buf, TEXTOID, 4); /* type oid */ pq_sendint(&buf, -1, 2); pq_sendint(&buf, 0, 4); @@ -602,12 +606,12 @@ StartReplication(StartReplicationCmd *cmd) /* Data row */ pq_beginmessage(&buf, 'D'); - pq_sendint(&buf, 2, 2); /* number of columns */ + pq_sendint(&buf, 2, 2); /* number of columns */ pq_sendint(&buf, strlen(tli_str), 4); /* length */ pq_sendbytes(&buf, tli_str, strlen(tli_str)); - pq_sendint(&buf, strlen(startpos_str), 4); /* length */ + pq_sendint(&buf, strlen(startpos_str), 4); /* length */ pq_sendbytes(&buf, startpos_str, strlen(startpos_str)); pq_endmessage(&buf); @@ -840,7 +844,7 @@ ProcessStandbyReplyMessage(void) writePtr = pq_getmsgint64(&reply_message); flushPtr = pq_getmsgint64(&reply_message); applyPtr = 
pq_getmsgint64(&reply_message); - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ replyRequested = pq_getmsgbyte(&reply_message); elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s", @@ -887,7 +891,7 @@ ProcessStandbyHSFeedbackMessage(void) * Decipher the reply message. The caller already consumed the msgtype * byte. */ - (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ + (void) pq_getmsgint64(&reply_message); /* sendTime; not used ATM */ feedbackXmin = pq_getmsgint(&reply_message, 4); feedbackEpoch = pq_getmsgint(&reply_message, 4); @@ -932,11 +936,11 @@ ProcessStandbyHSFeedbackMessage(void) * cleanup conflicts on the standby server. * * There is a small window for a race condition here: although we just - * checked that feedbackXmin precedes nextXid, the nextXid could have gotten - * advanced between our fetching it and applying the xmin below, perhaps - * far enough to make feedbackXmin wrap around. In that case the xmin we - * set here would be "in the future" and have no effect. No point in - * worrying about this since it's too late to save the desired data + * checked that feedbackXmin precedes nextXid, the nextXid could have + * gotten advanced between our fetching it and applying the xmin below, + * perhaps far enough to make feedbackXmin wrap around. In that case the + * xmin we set here would be "in the future" and have no effect. No point + * in worrying about this since it's too late to save the desired data * anyway. Assuming that the standby sends us an increasing sequence of * xmins, this could only happen during the first reply cycle, else our * own xmin would prevent nextXid from advancing so far. @@ -969,8 +973,8 @@ WalSndLoop(void) ping_sent = false; /* - * Loop until we reach the end of this timeline or the client requests - * to stop streaming. 
+ * Loop until we reach the end of this timeline or the client requests to + * stop streaming. */ for (;;) { @@ -1082,8 +1086,8 @@ WalSndLoop(void) { /* * If half of wal_sender_timeout has lapsed without receiving - * any reply from standby, send a keep-alive message to standby - * requesting an immediate reply. + * any reply from standby, send a keep-alive message to + * standby requesting an immediate reply. */ timeout = TimestampTzPlusMilliseconds(last_reply_timestamp, wal_sender_timeout / 2); @@ -1133,6 +1137,7 @@ WalSndLoop(void) return; send_failure: + /* * Get here on send failure. Clean up and exit. * @@ -1290,7 +1295,7 @@ retry: curFileTimeLine = sendTimeLine; if (sendTimeLineIsHistoric) { - XLogSegNo endSegNo; + XLogSegNo endSegNo; XLByteToSeg(sendTimeLineValidUpto, endSegNo); if (sendSegNo == endSegNo) @@ -1311,7 +1316,7 @@ retry: ereport(ERROR, (errcode_for_file_access(), errmsg("requested WAL segment %s has already been removed", - XLogFileNameP(curFileTimeLine, sendSegNo)))); + XLogFileNameP(curFileTimeLine, sendSegNo)))); else ereport(ERROR, (errcode_for_file_access(), @@ -1327,9 +1332,9 @@ retry: if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) ereport(ERROR, (errcode_for_file_access(), - errmsg("could not seek in log segment %s to offset %u: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - startoff))); + errmsg("could not seek in log segment %s to offset %u: %m", + XLogFileNameP(curFileTimeLine, sendSegNo), + startoff))); sendOff = startoff; } @@ -1344,9 +1349,9 @@ retry: { ereport(ERROR, (errcode_for_file_access(), - errmsg("could not read from log segment %s, offset %u, length %lu: %m", - XLogFileNameP(curFileTimeLine, sendSegNo), - sendOff, (unsigned long) segbytes))); + errmsg("could not read from log segment %s, offset %u, length %lu: %m", + XLogFileNameP(curFileTimeLine, sendSegNo), + sendOff, (unsigned long) segbytes))); } /* Update state for read */ @@ -1431,16 +1436,16 @@ XLogSend(bool *caughtup) /* * Streaming the latest timeline 
on a standby. * - * Attempt to send all WAL that has already been replayed, so that - * we know it's valid. If we're receiving WAL through streaming + * Attempt to send all WAL that has already been replayed, so that we + * know it's valid. If we're receiving WAL through streaming * replication, it's also OK to send any WAL that has been received * but not replayed. * * The timeline we're recovering from can change, or we can be - * promoted. In either case, the current timeline becomes historic. - * We need to detect that so that we don't try to stream past the - * point where we switched to another timeline. We check for promotion - * or timeline switch after calculating FlushPtr, to avoid a race + * promoted. In either case, the current timeline becomes historic. We + * need to detect that so that we don't try to stream past the point + * where we switched to another timeline. We check for promotion or + * timeline switch after calculating FlushPtr, to avoid a race * condition: if the timeline becomes historic just after we checked * that it was still current, it's still OK to stream it up to the * FlushPtr that was calculated before it became historic. @@ -1496,7 +1501,7 @@ XLogSend(bool *caughtup) * * Attempt to send all data that's already been written out and * fsync'd to disk. We cannot go further than what's been written out - * given the current implementation of XLogRead(). And in any case + * given the current implementation of XLogRead(). And in any case * it's unsafe to send WAL that is not securely down to disk on the * master: if the master subsequently crashes and restarts, slaves * must not have applied any WAL that gets lost on the master. @@ -1509,13 +1514,14 @@ XLogSend(bool *caughtup) * forked to the next timeline, stop streaming. * * Note: We might already have sent WAL > sendTimeLineValidUpto. 
The - * startup process will normally replay all WAL that has been received from - * the master, before promoting, but if the WAL streaming is terminated at - * a WAL page boundary, the valid portion of the timeline might end in the - * middle of a WAL record. We might've already sent the first half of that - * partial WAL record to the cascading standby, so that sentPtr > - * sendTimeLineValidUpto. That's OK; the cascading standby can't replay the - * partial WAL record either, so it can still follow our timeline switch. + * startup process will normally replay all WAL that has been received + * from the master, before promoting, but if the WAL streaming is + * terminated at a WAL page boundary, the valid portion of the timeline + * might end in the middle of a WAL record. We might've already sent the + * first half of that partial WAL record to the cascading standby, so that + * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't + * replay the partial WAL record either, so it can still follow our + * timeline switch. */ if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) { @@ -1585,8 +1591,8 @@ XLogSend(bool *caughtup) pq_sendbyte(&output_message, 'w'); pq_sendint64(&output_message, startptr); /* dataStart */ - pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ - pq_sendint64(&output_message, 0); /* sendtime, filled in last */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ /* * Read the log directly into the output buffer to avoid extra memcpy @@ -1643,16 +1649,16 @@ XLogSend(bool *caughtup) static XLogRecPtr GetStandbyFlushRecPtr(void) { - XLogRecPtr replayPtr; - TimeLineID replayTLI; - XLogRecPtr receivePtr; - TimeLineID receiveTLI; + XLogRecPtr replayPtr; + TimeLineID replayTLI; + XLogRecPtr receivePtr; + TimeLineID receiveTLI; XLogRecPtr result; /* * We can safely send what's already been replayed. 
Also, if walreceiver - * is streaming WAL from the same timeline, we can send anything that - * it has streamed, but hasn't been replayed yet. + * is streaming WAL from the same timeline, we can send anything that it + * has streamed, but hasn't been replayed yet. */ receivePtr = GetWalRcvWriteRecPtr(NULL, &receiveTLI); @@ -1742,8 +1748,8 @@ WalSndSignals(void) pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config * file */ pqsignal(SIGINT, SIG_IGN); /* not used */ - pqsignal(SIGTERM, die); /* request shutdown */ - pqsignal(SIGQUIT, quickdie); /* hard crash time */ + pqsignal(SIGTERM, die); /* request shutdown */ + pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ |