aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTom Lane <tgl@sss.pgh.pa.us>2010-06-03 22:17:32 +0000
committerTom Lane <tgl@sss.pgh.pa.us>2010-06-03 22:17:32 +0000
commit0cc59cc1f38d91587d52b14789e20bdd4c1af70a (patch)
treecb7b46dbdfb470b2b66f2e12206e6c06257e309a /src
parent572ec5a2760dfa12b74001428431a5f7d9027e27 (diff)
downloadpostgresql-0cc59cc1f38d91587d52b14789e20bdd4c1af70a.tar.gz
postgresql-0cc59cc1f38d91587d52b14789e20bdd4c1af70a.zip
Add current WAL end (as seen by walsender, ie, GetWriteRecPtr() result)
and current server clock time to SR data messages. These are not currently used on the slave side but seem likely to be useful in future, and it'd be better not to change the SR protocol after release. Per discussion. Also do some minor code review and cleanup on walsender.c, and improve the protocol documentation.
Diffstat (limited to 'src')
-rw-r--r--src/backend/replication/walreceiver.c17
-rw-r--r--src/backend/replication/walsender.c188
-rw-r--r--src/include/replication/walprotocol.h53
-rw-r--r--src/include/replication/walreceiver.h3
4 files changed, 169 insertions, 92 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index ecb2c3a6d39..b31cfb4147d 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.10 2010/04/20 22:55:03 tgl Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.11 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -41,6 +41,7 @@
#include "access/xlog_internal.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "replication/walprotocol.h"
#include "replication/walreceiver.h"
#include "storage/ipc.h"
#include "storage/pmsignal.h"
@@ -393,18 +394,18 @@ XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
case 'w': /* WAL records */
{
- XLogRecPtr recptr;
+ WalDataMessageHeader msghdr;
- if (len < sizeof(XLogRecPtr))
+ if (len < sizeof(WalDataMessageHeader))
ereport(ERROR,
(errcode(ERRCODE_PROTOCOL_VIOLATION),
errmsg_internal("invalid WAL message received from primary")));
+ /* memcpy is required here for alignment reasons */
+ memcpy(&msghdr, buf, sizeof(WalDataMessageHeader));
+ buf += sizeof(WalDataMessageHeader);
+ len -= sizeof(WalDataMessageHeader);
- memcpy(&recptr, buf, sizeof(XLogRecPtr));
- buf += sizeof(XLogRecPtr);
- len -= sizeof(XLogRecPtr);
-
- XLogWalRcvWrite(buf, len, recptr);
+ XLogWalRcvWrite(buf, len, msghdr.dataStart);
break;
}
default:
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index d2e37fd0086..e337e7e5a6f 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -5,11 +5,10 @@
* The WAL sender process (walsender) is new as of Postgres 9.0. It takes
* charge of XLOG streaming sender in the primary server. At first, it is
* started by the postmaster when the walreceiver in the standby server
- * connects to the primary server and requests XLOG streaming replication,
- * i.e., unlike any auxiliary process, it is not an always-running process.
+ * connects to the primary server and requests XLOG streaming replication.
* It attempts to keep reading XLOG records from the disk and sending them
* to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is an one to one relationship between a connection
+ * any backend, there is a one-to-one relationship between a connection
* and a walsender process).
*
* Normal termination is by SIGTERM, which instructs the walsender to
@@ -30,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.24 2010/06/03 21:02:12 petere Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.25 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -44,6 +43,7 @@
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
#include "miscadmin.h"
+#include "replication/walprotocol.h"
#include "replication/walsender.h"
#include "storage/fd.h"
#include "storage/ipc.h"
@@ -80,7 +80,7 @@ static uint32 sendOff = 0;
/*
* How far have we sent WAL already? This is also advertised in
- * MyWalSnd->sentPtr.
+ * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
*/
static XLogRecPtr sentPtr = {0, 0};
@@ -100,19 +100,9 @@ static void InitWalSnd(void);
static void WalSndHandshake(void);
static void WalSndKill(int code, Datum arg);
static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes);
-static bool XLogSend(StringInfo outMsg, bool *caughtup);
+static bool XLogSend(char *msgbuf, bool *caughtup);
static void CheckClosedConnection(void);
-/*
- * How much WAL to send in one message? Must be >= XLOG_BLCKSZ.
- *
- * We don't have a good idea of what a good value would be; there's some
- * overhead per message in both walsender and walreceiver, but on the other
- * hand sending large batches makes walsender less responsive to signals
- * because signals are checked only between messages. 128kB (with
- * default 8k blocks) seems like a reasonable guess for now.
- */
-#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
/* Main entry point for walsender process */
int
@@ -157,6 +147,9 @@ WalSenderMain(void)
return WalSndLoop();
}
+/*
+ * Execute commands from walreceiver, until we enter streaming mode.
+ */
static void
WalSndHandshake(void)
{
@@ -173,6 +166,13 @@ WalSndHandshake(void)
firstchar = pq_getbyte();
/*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive(true))
+ exit(1);
+
+ /*
* Check for any other interesting events that happened while we
* slept.
*/
@@ -211,7 +211,7 @@ WalSndHandshake(void)
/*
* Reply with a result set with one row, two columns.
- * First col is system ID, and second if timeline ID
+ * First col is system ID, and second is timeline ID
*/
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
@@ -253,6 +253,7 @@ WalSndHandshake(void)
/* Send CommandComplete and ReadyForQuery messages */
EndCommand("SELECT", DestRemote);
ReadyForQuery(DestRemote);
+ /* ReadyForQuery did pq_flush for us */
}
else if (sscanf(query_string, "START_REPLICATION %X/%X",
&recptr.xlogid, &recptr.xrecoff) == 2)
@@ -365,12 +366,17 @@ CheckClosedConnection(void)
static int
WalSndLoop(void)
{
- StringInfoData output_message;
+ char *output_message;
bool caughtup = false;
- initStringInfo(&output_message);
+ /*
+ * Allocate buffer that will be used for each output message. We do this
+ * just once to reduce palloc overhead. The buffer must be made large
+ * enough for maximum-sized messages.
+ */
+ output_message = palloc(1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE);
- /* Loop forever */
+ /* Loop forever, unless we get an error */
for (;;)
{
long remain; /* remaining time (us) */
@@ -381,6 +387,7 @@ WalSndLoop(void)
*/
if (!PostmasterIsAlive(true))
exit(1);
+
/* Process any requests or signals received recently */
if (got_SIGHUP)
{
@@ -394,8 +401,8 @@ WalSndLoop(void)
*/
if (ready_to_stop)
{
- if (!XLogSend(&output_message, &caughtup))
- goto eof;
+ if (!XLogSend(output_message, &caughtup))
+ break;
if (caughtup)
shutdown_requested = true;
}
@@ -435,17 +442,15 @@ WalSndLoop(void)
remain -= NAPTIME_PER_CYCLE;
}
}
+
/* Attempt to send the log once every loop */
- if (!XLogSend(&output_message, &caughtup))
- goto eof;
+ if (!XLogSend(output_message, &caughtup))
+ break;
}
- /* can't get here because the above loop never exits */
- return 1;
-
-eof:
-
/*
+ * Get here on send failure. Clean up and exit.
+ *
* Reset whereToSendOutput to prevent ereport from attempting to send any
* more messages to the standby.
*/
@@ -524,6 +529,9 @@ WalSndKill(int code, Datum arg)
/*
* Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr'
+ *
+ * XXX probably this should be improved to suck data directly from the
+ * WAL buffers when possible.
*/
static void
XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
@@ -634,51 +642,46 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes)
/*
* Read up to MAX_SEND_SIZE bytes of WAL that's been written (and flushed),
- * but not yet sent to the client, and send it. If there is no unsent WAL,
- * *caughtup is set to true and nothing is sent, otherwise *caughtup is set
- * to false.
+ * but not yet sent to the client, and send it.
+ *
+ * msgbuf is a work area in which the output message is constructed. It's
+ * passed in just so we can avoid re-palloc'ing the buffer on each cycle.
+ * It must be of size 1 + sizeof(WalDataMessageHeader) + MAX_SEND_SIZE.
+ *
+ * If there is no unsent WAL remaining, *caughtup is set to true, otherwise
+ * *caughtup is set to false.
*
* Returns true if OK, false if trouble.
*/
static bool
-XLogSend(StringInfo outMsg, bool *caughtup)
+XLogSend(char *msgbuf, bool *caughtup)
{
XLogRecPtr SendRqstPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
- char activitymsg[50];
-
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
+ WalDataMessageHeader msghdr;
/* Attempt to send all records flushed to the disk already */
SendRqstPtr = GetWriteRecPtr();
/* Quick exit if nothing to do */
- if (!XLByteLT(sentPtr, SendRqstPtr))
+ if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
return true;
}
- /*
- * Otherwise let the caller know that we're not fully caught up. Unless
- * there's a huge backlog, we'll be caught up to the current WriteRecPtr
- * after we've sent everything below, but more WAL could accumulate while
- * we're busy sending.
- */
- *caughtup = false;
/*
- * Figure out how much to send in one message. If there's less than
+ * Figure out how much to send in one message. If there's no more than
* MAX_SEND_SIZE bytes to send, send everything. Otherwise send
- * MAX_SEND_SIZE bytes, but round to page boundary.
+ * MAX_SEND_SIZE bytes, but round to logfile or page boundary.
*
* The rounding is not only for performance reasons. Walreceiver
* relies on the fact that we never split a WAL record across two
* messages. Since a long WAL record is split at page boundary into
* continuation records, page boundary is always a safe cut-off point.
- * We also assume that SendRqstPtr never points in the middle of a WAL
+ * We also assume that SendRqstPtr never points to the middle of a WAL
* record.
*/
startptr = sentPtr;
@@ -694,59 +697,78 @@ XLogSend(StringInfo outMsg, bool *caughtup)
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
- /* round down to page boundary. */
- endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
- /* if we went beyond SendRqstPtr, back off */
- if (XLByteLT(SendRqstPtr, endptr))
- endptr = SendRqstPtr;
-
- /*
- * OK to read and send the slice.
- *
- * We don't need to convert the xlogid/xrecoff from host byte order to
- * network byte order because the both server can be expected to have
- * the same byte order. If they have different byte order, we don't
- * reach here.
- */
- pq_sendbyte(outMsg, 'w');
- pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr));
-
if (endptr.xlogid != startptr.xlogid)
{
+ /* Don't cross a logfile boundary within one message */
Assert(endptr.xlogid == startptr.xlogid + 1);
- nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff;
+ endptr.xlogid = startptr.xlogid;
+ endptr.xrecoff = XLogFileSize;
+ }
+
+ /* if we went beyond SendRqstPtr, back off */
+ if (XLByteLE(SendRqstPtr, endptr))
+ {
+ endptr = SendRqstPtr;
+ *caughtup = true;
}
else
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ {
+ /* round down to page boundary. */
+ endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ);
+ *caughtup = false;
+ }
- sentPtr = endptr;
+ nbytes = endptr.xrecoff - startptr.xrecoff;
+ Assert(nbytes <= MAX_SEND_SIZE);
/*
- * Read the log directly into the output buffer to prevent extra
- * memcpy calls.
+ * OK to read and send the slice.
*/
- enlargeStringInfo(outMsg, nbytes);
+ msgbuf[0] = 'w';
- XLogRead(&outMsg->data[outMsg->len], startptr, nbytes);
- outMsg->len += nbytes;
- outMsg->data[outMsg->len] = '\0';
+ /*
+ * Read the log directly into the output buffer to avoid extra memcpy
+ * calls.
+ */
+ XLogRead(msgbuf + 1 + sizeof(WalDataMessageHeader), startptr, nbytes);
- pq_putmessage('d', outMsg->data, outMsg->len);
- resetStringInfo(outMsg);
+ /*
+ * We fill the message header last so that the send timestamp is taken
+ * as late as possible.
+ */
+ msghdr.dataStart = startptr;
+ msghdr.walEnd = SendRqstPtr;
+ msghdr.sendTime = GetCurrentTimestamp();
- /* Update shared memory status */
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
+ memcpy(msgbuf + 1, &msghdr, sizeof(WalDataMessageHeader));
+
+ pq_putmessage('d', msgbuf, 1 + sizeof(WalDataMessageHeader) + nbytes);
/* Flush pending output */
if (pq_flush())
return false;
+ sentPtr = endptr;
+
+ /* Update shared memory status */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
/* Report progress of XLOG streaming in PS display */
- snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
- sentPtr.xlogid, sentPtr.xrecoff);
- set_ps_display(activitymsg, false);
+ if (update_process_title)
+ {
+ char activitymsg[50];
+
+ snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
+ sentPtr.xlogid, sentPtr.xrecoff);
+ set_ps_display(activitymsg, false);
+ }
return true;
}
diff --git a/src/include/replication/walprotocol.h b/src/include/replication/walprotocol.h
new file mode 100644
index 00000000000..15025a277c0
--- /dev/null
+++ b/src/include/replication/walprotocol.h
@@ -0,0 +1,53 @@
+/*-------------------------------------------------------------------------
+ *
+ * walprotocol.h
+ * Definitions relevant to the streaming WAL transmission protocol.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ * $PostgreSQL: pgsql/src/include/replication/walprotocol.h,v 1.1 2010/06/03 22:17:32 tgl Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef _WALPROTOCOL_H
+#define _WALPROTOCOL_H
+
+#include "access/xlogdefs.h"
+#include "utils/timestamp.h"
+
+
+/*
+ * Header for a WAL data message (message type 'w'). This is wrapped within
+ * a CopyData message at the FE/BE protocol level.
+ *
+ * The header is followed by actual WAL data. Note that the data length is
+ * not specified in the header --- it's just whatever remains in the message.
+ *
+ * walEnd and sendTime are not essential data, but are provided in case
+ * the receiver wants to adjust its behavior depending on how far behind
+ * it is.
+ */
+typedef struct
+{
+ /* WAL start location of the data included in this message */
+ XLogRecPtr dataStart;
+
+ /* Current end of WAL on the sender */
+ XLogRecPtr walEnd;
+
+ /* Sender's system clock at the time of transmission */
+ TimestampTz sendTime;
+} WalDataMessageHeader;
+
+/*
+ * Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
+ *
+ * We don't have a good idea of what a good value would be; there's some
+ * overhead per message in both walsender and walreceiver, but on the other
+ * hand sending large batches makes walsender less responsive to signals
+ * because signals are checked only between messages. 128kB (with
+ * default 8k blocks) seems like a reasonable guess for now.
+ */
+#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
+
+#endif /* _WALPROTOCOL_H */
diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h
index 4300b80b278..5dcaeba3f33 100644
--- a/src/include/replication/walreceiver.h
+++ b/src/include/replication/walreceiver.h
@@ -5,7 +5,7 @@
*
* Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
*
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.8 2010/02/26 02:01:27 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.9 2010/06/03 22:17:32 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -14,6 +14,7 @@
#include "access/xlogdefs.h"
#include "storage/spin.h"
+#include "pgtime.h"
extern bool am_walreceiver;