aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication')
-rw-r--r--src/backend/replication/basebackup.c20
-rw-r--r--src/backend/replication/walreceiver.c30
-rw-r--r--src/backend/replication/walsender.c71
3 files changed, 50 insertions, 71 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c
index 0bc88a4040d..14c42b46c23 100644
--- a/src/backend/replication/basebackup.c
+++ b/src/backend/replication/basebackup.c
@@ -221,10 +221,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
* We've left the last tar file "open", so we can now append the
* required WAL files to it.
*/
- uint32 logid,
- logseg;
- uint32 endlogid,
- endlogseg;
+ XLogSegNo logsegno;
+ XLogSegNo endlogsegno;
struct stat statbuf;
MemSet(&statbuf, 0, sizeof(statbuf));
@@ -236,8 +234,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
statbuf.st_size = XLogSegSize;
statbuf.st_mtime = time(NULL);
- XLByteToSeg(startptr, logid, logseg);
- XLByteToPrevSeg(endptr, endlogid, endlogseg);
+ XLByteToSeg(startptr, logsegno);
+ XLByteToPrevSeg(endptr, endlogsegno);
while (true)
{
@@ -245,7 +243,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
char fn[MAXPGPATH];
int i;
- XLogFilePath(fn, ThisTimeLineID, logid, logseg);
+ XLogFilePath(fn, ThisTimeLineID, logsegno);
_tarWriteHeader(fn, NULL, &statbuf);
/* Send the actual WAL file contents, block-by-block */
@@ -254,8 +252,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
char buf[TAR_SEND_SIZE];
XLogRecPtr ptr;
- ptr.xlogid = logid;
- ptr.xrecoff = logseg * XLogSegSize + TAR_SEND_SIZE * i;
+ XLogSegNoOffsetToRecPtr(logsegno, TAR_SEND_SIZE * i, ptr);
/*
* Some old compilers, e.g. gcc 2.95.3/x86, think that passing
@@ -277,11 +274,10 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
/* Advance to the next WAL file */
- NextLogSeg(logid, logseg);
+ logsegno++;
/* Have we reached our stop position yet? */
- if (logid > endlogid ||
- (logid == endlogid && logseg > endlogseg))
+ if (logsegno > endlogsegno)
break;
}
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index 650b74fff7d..b3ba7089dfb 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -69,11 +69,12 @@ walrcv_disconnect_type walrcv_disconnect = NULL;
/*
* These variables are used similarly to openLogFile/Id/Seg/Off,
- * but for walreceiver to write the XLOG.
+ * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
+ * corresponding the filename of recvFile, used for error messages.
*/
static int recvFile = -1;
-static uint32 recvId = 0;
-static uint32 recvSeg = 0;
+static TimeLineID recvFileTLI = -1;
+static XLogSegNo recvSegNo = 0;
static uint32 recvOff = 0;
/*
@@ -481,7 +482,7 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
{
int segbytes;
- if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg))
+ if (recvFile < 0 || !XLByteInSeg(recptr, recvSegNo))
{
bool use_existent;
@@ -501,15 +502,16 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
if (close(recvFile) != 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not close log file %u, segment %u: %m",
- recvId, recvSeg)));
+ errmsg("could not close log segment %s: %m",
+ XLogFileNameP(recvFileTLI, recvSegNo))));
}
recvFile = -1;
/* Create/use new log file */
- XLByteToSeg(recptr, recvId, recvSeg);
+ XLByteToSeg(recptr, recvSegNo);
use_existent = true;
- recvFile = XLogFileInit(recvId, recvSeg, &use_existent, true);
+ recvFile = XLogFileInit(recvSegNo, &use_existent, true);
+ recvFileTLI = ThisTimeLineID;
recvOff = 0;
}
@@ -527,9 +529,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0)
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, "
- "segment %u to offset %u: %m",
- recvId, recvSeg, startoff)));
+ errmsg("could not seek in log segment %s, to offset %u: %m",
+ XLogFileNameP(recvFileTLI, recvSegNo),
+ startoff)));
recvOff = startoff;
}
@@ -544,9 +546,9 @@ XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr)
errno = ENOSPC;
ereport(PANIC,
(errcode_for_file_access(),
- errmsg("could not write to log file %u, segment %u "
+ errmsg("could not write to log segment %s "
"at offset %u, length %lu: %m",
- recvId, recvSeg,
+ XLogFileNameP(recvFileTLI, recvSegNo),
recvOff, (unsigned long) segbytes)));
}
@@ -575,7 +577,7 @@ XLogWalRcvFlush(bool dying)
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- issue_xlog_fsync(recvFile, recvId, recvSeg);
+ issue_xlog_fsync(recvFile, recvSegNo);
LogstreamResult.Flush = LogstreamResult.Write;
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c
index 45a3b2ef294..2c04df08ed1 100644
--- a/src/backend/replication/walsender.c
+++ b/src/backend/replication/walsender.c
@@ -87,8 +87,7 @@ int replication_timeout = 60 * 1000; /* maximum time to send one
* but for walsender to read the XLOG.
*/
static int sendFile = -1;
-static uint32 sendId = 0;
-static uint32 sendSeg = 0;
+static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
/*
@@ -977,10 +976,8 @@ XLogRead(char *buf, XLogRecPtr startptr, Size count)
char *p;
XLogRecPtr recptr;
Size nbytes;
- uint32 lastRemovedLog;
- uint32 lastRemovedSeg;
- uint32 log;
- uint32 seg;
+ XLogSegNo lastRemovedSegNo;
+ XLogSegNo segno;
retry:
p = buf;
@@ -995,7 +992,7 @@ retry:
startoff = recptr.xrecoff % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
{
char path[MAXPGPATH];
@@ -1003,8 +1000,8 @@ retry:
if (sendFile >= 0)
close(sendFile);
- XLByteToSeg(recptr, sendId, sendSeg);
- XLogFilePath(path, ThisTimeLineID, sendId, sendSeg);
+ XLByteToSeg(recptr, sendSegNo);
+ XLogFilePath(path, ThisTimeLineID, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
@@ -1015,20 +1012,15 @@ retry:
* removed or recycled.
*/
if (errno == ENOENT)
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, sendId, sendSeg);
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not open file \"%s\" (log file %u, segment %u): %m",
- path, sendId, sendSeg)));
+ errmsg("could not open file \"%s\": %m",
+ path)));
}
sendOff = 0;
}
@@ -1039,8 +1031,9 @@ retry:
if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in log file %u, segment %u to offset %u: %m",
- sendId, sendSeg, startoff)));
+ errmsg("could not seek in log segment %s to offset %u: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ startoff)));
sendOff = startoff;
}
@@ -1052,11 +1045,13 @@ retry:
readbytes = read(sendFile, p, segbytes);
if (readbytes <= 0)
+ {
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from log file %u, segment %u, offset %u, "
- "length %lu: %m",
- sendId, sendSeg, sendOff, (unsigned long) segbytes)));
+ errmsg("could not read from log segment %s, offset %u, length %lu: %m",
+ XLogFileNameP(ThisTimeLineID, sendSegNo),
+ sendOff, (unsigned long) segbytes)));
+ }
/* Update state for read */
XLByteAdvance(recptr, readbytes);
@@ -1073,19 +1068,13 @@ retry:
* read() succeeds in that case, but the data we tried to read might
* already have been overwritten with new WAL records.
*/
- XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg);
- XLByteToSeg(startptr, log, seg);
- if (log < lastRemovedLog ||
- (log == lastRemovedLog && seg <= lastRemovedSeg))
- {
- char filename[MAXFNAMELEN];
-
- XLogFileName(filename, ThisTimeLineID, log, seg);
+ XLogGetLastRemoved(&lastRemovedSegNo);
+ XLByteToSeg(startptr, segno);
+ if (segno <= lastRemovedSegNo)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- filename)));
- }
+ XLogFileNameP(ThisTimeLineID, segno))));
/*
* During recovery, the currently-open WAL file might be replaced with the
@@ -1165,24 +1154,13 @@ XLogSend(char *msgbuf, bool *caughtup)
* SendRqstPtr never points to the middle of a WAL record.
*/
startptr = sentPtr;
- if (startptr.xrecoff >= XLogFileSize)
- {
- /*
- * crossing a logid boundary, skip the non-existent last log segment
- * in previous logical log file.
- */
- startptr.xlogid += 1;
- startptr.xrecoff = 0;
- }
-
endptr = startptr;
XLByteAdvance(endptr, MAX_SEND_SIZE);
if (endptr.xlogid != startptr.xlogid)
{
/* Don't cross a logfile boundary within one message */
Assert(endptr.xlogid == startptr.xlogid + 1);
- endptr.xlogid = startptr.xlogid;
- endptr.xrecoff = XLogFileSize;
+ endptr.xrecoff = 0;
}
/* if we went beyond SendRqstPtr, back off */
@@ -1198,7 +1176,10 @@ XLogSend(char *msgbuf, bool *caughtup)
*caughtup = false;
}
- nbytes = endptr.xrecoff - startptr.xrecoff;
+ if (endptr.xrecoff == 0)
+ nbytes = 0x100000000L - (uint64) startptr.xrecoff;
+ else
+ nbytes = endptr.xrecoff - startptr.xrecoff;
Assert(nbytes <= MAX_SEND_SIZE);
/*