aboutsummaryrefslogtreecommitdiff
path: root/src/backend/access/transam/xlog.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/access/transam/xlog.c')
-rw-r--r--src/backend/access/transam/xlog.c441
1 files changed, 393 insertions, 48 deletions
diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c
index 6b5c5ac0da7..de8e682d849 100644
--- a/src/backend/access/transam/xlog.c
+++ b/src/backend/access/transam/xlog.c
@@ -7,7 +7,7 @@
* Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
* Portions Copyright (c) 1994, Regents of the University of California
*
- * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.245 2006/07/30 02:07:18 alvherre Exp $
+ * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.246 2006/08/06 03:53:44 tgl Exp $
*
*-------------------------------------------------------------------------
*/
@@ -312,7 +312,8 @@ static XLogRecPtr RedoRecPtr;
* CheckpointLock: must be held to do a checkpoint (ensures only one
* checkpointer at a time; even though the postmaster won't launch
* parallel checkpoint processes, we need this because manual checkpoints
- * could be launched simultaneously).
+ * could be launched simultaneously). XXX now that all checkpoints are
+ * done by the bgwriter, isn't this lock redundant?
*
*----------
*/
@@ -465,8 +466,8 @@ static bool recoveryStopsHere(XLogRecord *record, bool *includeThis);
static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites,
XLogRecPtr *lsn, BkpBlock *bkpb);
-static bool AdvanceXLInsertBuffer(void);
-static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible);
+static bool AdvanceXLInsertBuffer(bool new_segment);
+static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch);
static int XLogFileInit(uint32 log, uint32 seg,
bool *use_existent, bool use_lock);
static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath,
@@ -543,7 +544,8 @@ XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata)
XLogwrtRqst LogwrtRqst;
bool updrqst;
bool doPageWrites;
- bool no_tran = (rmid == RM_XLOG_ID) ? true : false;
+ bool isLogSwitch = (rmid == RM_XLOG_ID && info == XLOG_SWITCH);
+ bool no_tran = (rmid == RM_XLOG_ID);
if (info & XLR_INFO_MASK)
{
@@ -687,14 +689,13 @@ begin:;
}
/*
- * NOTE: the test for len == 0 here is somewhat fishy, since in theory all
- * of the rmgr data might have been suppressed in favor of backup blocks.
- * Currently, all callers of XLogInsert provide at least some
- * not-in-a-buffer data and so len == 0 should never happen, but that may
- * not be true forever. If you need to remove the len == 0 check, also
- * remove the check for xl_len == 0 in ReadRecord, below.
+ * NOTE: We disallow len == 0 because it provides a useful bit of extra
+ * error checking in ReadRecord. This means that all callers of
+ * XLogInsert must supply at least some not-in-a-buffer data. However,
+ * we make an exception for XLOG SWITCH records because we don't want
+ * them to ever cross a segment boundary.
*/
- if (len == 0)
+ if (len == 0 && !isLogSwitch)
elog(PANIC, "invalid xlog record length %u", len);
START_CRIT_SECTION();
@@ -731,7 +732,7 @@ begin:;
*/
LogwrtResult = XLogCtl->Write.LogwrtResult;
if (XLByteLT(LogwrtResult.Write, LogwrtRqst.Write))
- XLogWrite(LogwrtRqst, true);
+ XLogWrite(LogwrtRqst, true, false);
LWLockRelease(WALWriteLock);
}
}
@@ -854,15 +855,55 @@ begin:;
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
- updrqst = AdvanceXLInsertBuffer();
+ updrqst = AdvanceXLInsertBuffer(false);
freespace = INSERT_FREESPACE(Insert);
}
+ /* Compute record's XLOG location */
curridx = Insert->curridx;
- record = (XLogRecord *) Insert->currpos;
+ INSERT_RECPTR(RecPtr, Insert, curridx);
+
+ /*
+ * If the record is an XLOG_SWITCH, and we are exactly at the start
+ * of a segment, we need not insert it (and don't want to because
+ * we'd like consecutive switch requests to be no-ops). Instead,
+ * make sure everything is written and flushed through the end of
+ * the prior segment, and return the prior segment's end address.
+ */
+ if (isLogSwitch &&
+ (RecPtr.xrecoff % XLogSegSize) == SizeOfXLogLongPHD)
+ {
+ /* We can release insert lock immediately */
+ LWLockRelease(WALInsertLock);
+
+ RecPtr.xrecoff -= SizeOfXLogLongPHD;
+ if (RecPtr.xrecoff == 0)
+ {
+ /* crossing a logid boundary */
+ RecPtr.xlogid -= 1;
+ RecPtr.xrecoff = XLogFileSize;
+ }
+
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+ LogwrtResult = XLogCtl->Write.LogwrtResult;
+ if (!XLByteLE(RecPtr, LogwrtResult.Flush))
+ {
+ XLogwrtRqst FlushRqst;
+
+ FlushRqst.Write = RecPtr;
+ FlushRqst.Flush = RecPtr;
+ XLogWrite(FlushRqst, false, false);
+ }
+ LWLockRelease(WALWriteLock);
+
+ END_CRIT_SECTION();
+
+ return RecPtr;
+ }
/* Insert record header */
+ record = (XLogRecord *) Insert->currpos;
record->xl_prev = Insert->PrevRecord;
record->xl_xid = GetCurrentTransactionIdIfAny();
record->xl_tot_len = SizeOfXLogRecord + write_len;
@@ -876,17 +917,14 @@ begin:;
FIN_CRC32(rdata_crc);
record->xl_crc = rdata_crc;
- /* Compute record's XLOG location */
- INSERT_RECPTR(RecPtr, Insert, curridx);
-
#ifdef WAL_DEBUG
if (XLOG_DEBUG)
{
StringInfoData buf;
initStringInfo(&buf);
- appendStringInfo(&buf, "INSERT @ %X/%X: ",
- RecPtr.xlogid, RecPtr.xrecoff);
+ appendStringInfo(&buf, "INSERT @ %X/%X: ",
+ RecPtr.xlogid, RecPtr.xrecoff);
xlog_outrec(&buf, record);
if (rdata->data != NULL)
{
@@ -937,7 +975,7 @@ begin:;
}
/* Use next buffer */
- updrqst = AdvanceXLInsertBuffer();
+ updrqst = AdvanceXLInsertBuffer(false);
curridx = Insert->curridx;
/* Insert cont-record header */
Insert->currpage->xlp_info |= XLP_FIRST_IS_CONTRECORD;
@@ -958,13 +996,92 @@ begin:;
*/
INSERT_RECPTR(RecPtr, Insert, curridx);
- /* Need to update shared LogwrtRqst if some block was filled up */
- if (freespace < SizeOfXLogRecord)
- updrqst = true; /* curridx is filled and available for writing
- * out */
+ /*
+ * If the record is an XLOG_SWITCH, we must now write and flush all the
+ * existing data, and then forcibly advance to the start of the next
+ * segment. It's not good to do this I/O while holding the insert lock,
+ * but there seems too much risk of confusion if we try to release the
+ * lock sooner. Fortunately xlog switch needn't be a high-performance
+ * operation anyway...
+ */
+ if (isLogSwitch)
+ {
+ XLogCtlWrite *Write = &XLogCtl->Write;
+ XLogwrtRqst FlushRqst;
+ XLogRecPtr OldSegEnd;
+
+ LWLockAcquire(WALWriteLock, LW_EXCLUSIVE);
+
+ /*
+ * Flush through the end of the page containing XLOG_SWITCH,
+ * and perform end-of-segment actions (eg, notifying archiver).
+ */
+ WriteRqst = XLogCtl->xlblocks[curridx];
+ FlushRqst.Write = WriteRqst;
+ FlushRqst.Flush = WriteRqst;
+ XLogWrite(FlushRqst, false, true);
+
+ /* Set up the next buffer as first page of next segment */
+ /* Note: AdvanceXLInsertBuffer cannot need to do I/O here */
+ (void) AdvanceXLInsertBuffer(true);
+
+ /* There should be no unwritten data */
+ curridx = Insert->curridx;
+ Assert(curridx == Write->curridx);
+
+ /* Compute end address of old segment */
+ OldSegEnd = XLogCtl->xlblocks[curridx];
+ OldSegEnd.xrecoff -= XLOG_BLCKSZ;
+ if (OldSegEnd.xrecoff == 0)
+ {
+ /* crossing a logid boundary */
+ OldSegEnd.xlogid -= 1;
+ OldSegEnd.xrecoff = XLogFileSize;
+ }
+
+ /* Make it look like we've written and synced all of old segment */
+ LogwrtResult.Write = OldSegEnd;
+ LogwrtResult.Flush = OldSegEnd;
+
+ /*
+ * Update shared-memory status --- this code should match XLogWrite
+ */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile XLogCtlData *xlogctl = XLogCtl;
+
+ SpinLockAcquire(&xlogctl->info_lck);
+ xlogctl->LogwrtResult = LogwrtResult;
+ if (XLByteLT(xlogctl->LogwrtRqst.Write, LogwrtResult.Write))
+ xlogctl->LogwrtRqst.Write = LogwrtResult.Write;
+ if (XLByteLT(xlogctl->LogwrtRqst.Flush, LogwrtResult.Flush))
+ xlogctl->LogwrtRqst.Flush = LogwrtResult.Flush;
+ SpinLockRelease(&xlogctl->info_lck);
+ }
+
+ Write->LogwrtResult = LogwrtResult;
+
+ LWLockRelease(WALWriteLock);
+
+ updrqst = false; /* done already */
+ }
else
- curridx = PrevBufIdx(curridx);
- WriteRqst = XLogCtl->xlblocks[curridx];
+ {
+ /* normal case, ie not xlog switch */
+
+ /* Need to update shared LogwrtRqst if some block was filled up */
+ if (freespace < SizeOfXLogRecord)
+ {
+ /* curridx is filled and available for writing out */
+ updrqst = true;
+ }
+ else
+ {
+ /* if updrqst already set, write through end of previous buf */
+ curridx = PrevBufIdx(curridx);
+ }
+ WriteRqst = XLogCtl->xlblocks[curridx];
+ }
LWLockRelease(WALInsertLock);
@@ -1173,6 +1290,10 @@ XLogArchiveCleanup(const char *xlog)
* Advance the Insert state to the next buffer page, writing out the next
* buffer if it still contains unwritten data.
*
+ * If new_segment is TRUE then we set up the next buffer page as the first
+ * page of the next xlog segment file, possibly but not usually the next
+ * consecutive file page.
+ *
* The global LogwrtRqst.Write pointer needs to be advanced to include the
* just-filled page. If we can do this for free (without an extra lock),
* we do so here. Otherwise the caller must do it. We return TRUE if the
@@ -1181,7 +1302,7 @@ XLogArchiveCleanup(const char *xlog)
* Must be called with WALInsertLock held.
*/
static bool
-AdvanceXLInsertBuffer(void)
+AdvanceXLInsertBuffer(bool new_segment)
{
XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogCtlWrite *Write = &XLogCtl->Write;
@@ -1248,7 +1369,7 @@ AdvanceXLInsertBuffer(void)
WriteRqst.Write = OldPageRqstPtr;
WriteRqst.Flush.xlogid = 0;
WriteRqst.Flush.xrecoff = 0;
- XLogWrite(WriteRqst, false);
+ XLogWrite(WriteRqst, false, false);
LWLockRelease(WALWriteLock);
Insert->LogwrtResult = LogwrtResult;
}
@@ -1260,6 +1381,14 @@ AdvanceXLInsertBuffer(void)
* output page.
*/
NewPageEndPtr = XLogCtl->xlblocks[Insert->curridx];
+
+ if (new_segment)
+ {
+ /* force it to a segment start point */
+ NewPageEndPtr.xrecoff += XLogSegSize - 1;
+ NewPageEndPtr.xrecoff -= NewPageEndPtr.xrecoff % XLogSegSize;
+ }
+
if (NewPageEndPtr.xrecoff >= XLogFileSize)
{
/* crossing a logid boundary */
@@ -1318,13 +1447,20 @@ AdvanceXLInsertBuffer(void)
* This option allows us to avoid uselessly issuing multiple writes when a
* single one would do.
*
+ * If xlog_switch == TRUE, we are intending an xlog segment switch, so
+ * perform end-of-segment actions after writing the last page, even if
+ * it's not physically the end of its segment. (NB: this will work properly
+ * only if caller specifies WriteRqst == page-end and flexible == false,
+ * and there is some data to write.)
+ *
* Must be called with WALWriteLock held.
*/
static void
-XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
+XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch)
{
XLogCtlWrite *Write = &XLogCtl->Write;
bool ispartialpage;
+ bool last_iteration;
bool finishing_seg;
bool use_existent;
int curridx;
@@ -1468,10 +1604,12 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* contiguous in memory), or if we are at the end of the logfile
* segment.
*/
+ last_iteration = !XLByteLT(LogwrtResult.Write, WriteRqst.Write);
+
finishing_seg = !ispartialpage &&
(startoffset + npages * XLOG_BLCKSZ) >= XLogSegSize;
- if (!XLByteLT(LogwrtResult.Write, WriteRqst.Write) ||
+ if (last_iteration ||
curridx == XLogCtl->XLogCacheBlck ||
finishing_seg)
{
@@ -1519,10 +1657,13 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible)
* later. Doing it here ensures that one and only one backend will
* perform this fsync.
*
+ * We also do this if this is the last page written for an xlog
+ * switch.
+ *
* This is also the right place to notify the Archiver that the
* segment is ready to copy to archival storage.
*/
- if (finishing_seg)
+ if (finishing_seg || (xlog_switch && last_iteration))
{
issue_xlog_fsync();
LogwrtResult.Flush = LogwrtResult.Write; /* end of page */
@@ -1681,7 +1822,7 @@ XLogFlush(XLogRecPtr record)
WriteRqst.Write = WriteRqstPtr;
WriteRqst.Flush = record;
}
- XLogWrite(WriteRqst, false);
+ XLogWrite(WriteRqst, false, false);
}
LWLockRelease(WALWriteLock);
}
@@ -2828,10 +2969,20 @@ ReadRecord(XLogRecPtr *RecPtr, int emode)
got_record:;
/*
- * Currently, xl_len == 0 must be bad data, but that might not be true
- * forever. See note in XLogInsert.
+ * xl_len == 0 is bad data for everything except XLOG SWITCH, where
+ * it is required.
*/
- if (record->xl_len == 0)
+ if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ {
+ if (record->xl_len != 0)
+ {
+ ereport(emode,
+ (errmsg("invalid xlog switch record at %X/%X",
+ RecPtr->xlogid, RecPtr->xrecoff)));
+ goto next_record_is_invalid;
+ }
+ }
+ else if (record->xl_len == 0)
{
ereport(emode,
(errmsg("record with zero length at %X/%X",
@@ -2994,6 +3145,7 @@ got_record:;
pageHeaderSize +
MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len);
ReadRecPtr = *RecPtr;
+ /* needn't worry about XLOG SWITCH, it can't cross page boundaries */
return record;
}
@@ -3007,6 +3159,22 @@ got_record:;
EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len);
ReadRecPtr = *RecPtr;
memcpy(buffer, record, total_len);
+ /*
+ * Special processing if it's an XLOG SWITCH record
+ */
+ if (record->xl_rmid == RM_XLOG_ID && record->xl_info == XLOG_SWITCH)
+ {
+ /* Pretend it extends to end of segment */
+ EndRecPtr.xrecoff += XLogSegSize - 1;
+ EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize;
+ nextRecord = NULL; /* definitely not on same page */
+ /*
+ * Pretend that readBuf contains the last page of the segment.
+ * This is just to avoid Assert failure in StartupXLOG if XLOG
+ * ends with this segment.
+ */
+ readOff = XLogSegSize - XLOG_BLCKSZ;
+ }
return (XLogRecord *) buffer;
next_record_is_invalid:;
@@ -5262,7 +5430,7 @@ CreateCheckPoint(bool shutdown, bool force)
freespace = INSERT_FREESPACE(Insert);
if (freespace < SizeOfXLogRecord)
{
- (void) AdvanceXLInsertBuffer();
+ (void) AdvanceXLInsertBuffer(false);
/* OK to ignore update return flag, since we will do flush anyway */
freespace = INSERT_FREESPACE(Insert);
}
@@ -5448,6 +5616,33 @@ XLogPutNextOid(Oid nextOid)
}
/*
+ * Write an XLOG SWITCH record.
+ *
+ * Here we just blindly issue an XLogInsert request for the record.
+ * All the magic happens inside XLogInsert.
+ *
+ * The return value is either the end+1 address of the switch record,
+ * or the end+1 address of the prior segment if we did not need to
+ * write a switch record because we are already at segment start.
+ */
+static XLogRecPtr
+RequestXLogSwitch(void)
+{
+ XLogRecPtr RecPtr;
+ XLogRecData rdata;
+
+ /* XLOG SWITCH, alone among xlog record types, has no data */
+ rdata.buffer = InvalidBuffer;
+ rdata.data = NULL;
+ rdata.len = 0;
+ rdata.next = NULL;
+
+ RecPtr = XLogInsert(RM_XLOG_ID, XLOG_SWITCH, &rdata);
+
+ return RecPtr;
+}
+
+/*
* XLOG resource manager's routines
*/
void
@@ -5515,6 +5710,10 @@ xlog_redo(XLogRecPtr lsn, XLogRecord *record)
(errmsg("unexpected timeline ID %u (should be %u) in checkpoint record",
checkPoint.ThisTimeLineID, ThisTimeLineID)));
}
+ else if (info == XLOG_SWITCH)
+ {
+ /* nothing to do here */
+ }
}
void
@@ -5544,6 +5743,10 @@ xlog_desc(StringInfo buf, uint8 xl_info, char *rec)
memcpy(&nextOid, rec, sizeof(Oid));
appendStringInfo(buf, "nextOid: %u", nextOid);
}
+ else if (info == XLOG_SWITCH)
+ {
+ appendStringInfo(buf, "xlog switch");
+ }
else
appendStringInfo(buf, "UNKNOWN");
}
@@ -5694,7 +5897,7 @@ issue_xlog_fsync(void)
* where it will be archived as part of the backup dump. The label file
* contains the user-supplied label string (typically this would be used
* to tell where the backup dump will be stored) and the starting time and
- * starting WAL offset for the dump.
+ * starting WAL location for the dump.
*/
Datum
pg_start_backup(PG_FUNCTION_ARGS)
@@ -5844,7 +6047,7 @@ pg_start_backup(PG_FUNCTION_ARGS)
PG_END_TRY();
/*
- * We're done. As a convenience, return the starting WAL offset.
+ * We're done. As a convenience, return the starting WAL location.
*/
snprintf(xlogfilename, sizeof(xlogfilename), "%X/%X",
startpoint.xlogid, startpoint.xrecoff);
@@ -5859,13 +6062,12 @@ pg_start_backup(PG_FUNCTION_ARGS)
* We remove the backup label file created by pg_start_backup, and instead
* create a backup history file in pg_xlog (whence it will immediately be
* archived). The backup history file contains the same info found in
- * the label file, plus the backup-end time and WAL offset.
+ * the label file, plus the backup-end time and WAL location.
*/
Datum
pg_stop_backup(PG_FUNCTION_ARGS)
{
text *result;
- XLogCtlInsert *Insert = &XLogCtl->Insert;
XLogRecPtr startpoint;
XLogRecPtr stoppoint;
time_t stamp_time;
@@ -5886,15 +6088,20 @@ pg_stop_backup(PG_FUNCTION_ARGS)
(errmsg("must be superuser to run a backup"))));
/*
- * Get the current end-of-WAL position; it will be unsafe to use this dump
- * to restore to a point in advance of this time. We can also clear
- * forcePageWrites here.
+ * OK to clear forcePageWrites
*/
LWLockAcquire(WALInsertLock, LW_EXCLUSIVE);
- INSERT_RECPTR(stoppoint, Insert, Insert->curridx);
XLogCtl->Insert.forcePageWrites = false;
LWLockRelease(WALInsertLock);
+ /*
+ * Force a switch to a new xlog segment file, so that the backup
+ * is valid as soon as archiver moves out the current segment file.
+ * We'll report the end address of the XLOG SWITCH record as the backup
+ * stopping point.
+ */
+ stoppoint = RequestXLogSwitch();
+
XLByteToSeg(stoppoint, _logId, _logSeg);
XLogFileName(stopxlogfilename, ThisTimeLineID, _logId, _logSeg);
@@ -5984,7 +6191,7 @@ pg_stop_backup(PG_FUNCTION_ARGS)
CleanupBackupHistory();
/*
- * We're done. As a convenience, return the ending WAL offset.
+ * We're done. As a convenience, return the ending WAL location.
*/
snprintf(stopxlogfilename, sizeof(stopxlogfilename), "%X/%X",
stoppoint.xlogid, stoppoint.xrecoff);
@@ -5994,6 +6201,144 @@ pg_stop_backup(PG_FUNCTION_ARGS)
}
/*
+ * pg_switch_xlog: switch to next xlog file
+ */
+Datum
+pg_switch_xlog(PG_FUNCTION_ARGS)
+{
+ text *result;
+ XLogRecPtr switchpoint;
+ char location[MAXFNAMELEN];
+
+ if (!superuser())
+ ereport(ERROR,
+ (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+ (errmsg("must be superuser to switch xlog files"))));
+
+ switchpoint = RequestXLogSwitch();
+
+ /*
+ * As a convenience, return the WAL location of the switch record
+ */
+ snprintf(location, sizeof(location), "%X/%X",
+ switchpoint.xlogid, switchpoint.xrecoff);
+ result = DatumGetTextP(DirectFunctionCall1(textin,
+ CStringGetDatum(location)));
+ PG_RETURN_TEXT_P(result);
+}
+
+/*
+ * Report the current WAL location (same format as pg_start_backup etc)
+ */
+Datum
+pg_current_xlog_location(PG_FUNCTION_ARGS)
+{
+ text *result;
+ XLogCtlInsert *Insert = &XLogCtl->Insert;
+ XLogRecPtr current_recptr;
+ char location[MAXFNAMELEN];
+
+ /*
+ * Get the current end-of-WAL position ... shared lock is sufficient
+ */
+ LWLockAcquire(WALInsertLock, LW_SHARED);
+ INSERT_RECPTR(current_recptr, Insert, Insert->curridx);
+ LWLockRelease(WALInsertLock);
+
+ snprintf(location, sizeof(location), "%X/%X",
+ current_recptr.xlogid, current_recptr.xrecoff);
+
+ result = DatumGetTextP(DirectFunctionCall1(textin,
+ CStringGetDatum(location)));
+ PG_RETURN_TEXT_P(result);
+}
+
+/*
+ * Compute an xlog file name and decimal byte offset given a WAL location,
+ * such as is returned by pg_stop_backup() or pg_xlog_switch().
+ *
+ * Note that a location exactly at a segment boundary is taken to be in
+ * the previous segment. This is usually the right thing, since the
+ * expected usage is to determine which xlog file(s) are ready to archive.
+ */
+Datum
+pg_xlogfile_name_offset(PG_FUNCTION_ARGS)
+{
+ text *location = PG_GETARG_TEXT_P(0);
+ text *result;
+ char *locationstr;
+ unsigned int uxlogid;
+ unsigned int uxrecoff;
+ uint32 xlogid;
+ uint32 xlogseg;
+ uint32 xrecoff;
+ XLogRecPtr locationpoint;
+ char xlogfilename[MAXFNAMELEN];
+
+ locationstr = DatumGetCString(DirectFunctionCall1(textout,
+ PointerGetDatum(location)));
+
+ if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse xlog location \"%s\"",
+ locationstr)));
+
+ locationpoint.xlogid = uxlogid;
+ locationpoint.xrecoff = uxrecoff;
+
+ XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
+ XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
+
+ xrecoff = locationpoint.xrecoff - xlogseg * XLogSegSize;
+ snprintf(xlogfilename + strlen(xlogfilename),
+ sizeof(xlogfilename) - strlen(xlogfilename),
+ " %u",
+ (unsigned int) xrecoff);
+
+ result = DatumGetTextP(DirectFunctionCall1(textin,
+ CStringGetDatum(xlogfilename)));
+ PG_RETURN_TEXT_P(result);
+}
+
+/*
+ * Compute an xlog file name given a WAL location,
+ * such as is returned by pg_stop_backup() or pg_xlog_switch().
+ */
+Datum
+pg_xlogfile_name(PG_FUNCTION_ARGS)
+{
+ text *location = PG_GETARG_TEXT_P(0);
+ text *result;
+ char *locationstr;
+ unsigned int uxlogid;
+ unsigned int uxrecoff;
+ uint32 xlogid;
+ uint32 xlogseg;
+ XLogRecPtr locationpoint;
+ char xlogfilename[MAXFNAMELEN];
+
+ locationstr = DatumGetCString(DirectFunctionCall1(textout,
+ PointerGetDatum(location)));
+
+ if (sscanf(locationstr, "%X/%X", &uxlogid, &uxrecoff) != 2)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("could not parse xlog location \"%s\"",
+ locationstr)));
+
+ locationpoint.xlogid = uxlogid;
+ locationpoint.xrecoff = uxrecoff;
+
+ XLByteToPrevSeg(locationpoint, xlogid, xlogseg);
+ XLogFileName(xlogfilename, ThisTimeLineID, xlogid, xlogseg);
+
+ result = DatumGetTextP(DirectFunctionCall1(textin,
+ CStringGetDatum(xlogfilename)));
+ PG_RETURN_TEXT_P(result);
+}
+
+/*
* read_backup_label: check to see if a backup_label file is present
*
* If we see a backup_label during recovery, we assume that we are recovering
@@ -6133,8 +6478,8 @@ rm_redo_error_callback(void *arg)
StringInfoData buf;
initStringInfo(&buf);
- RmgrTable[record->xl_rmid].rm_desc(&buf,
- record->xl_info,
+ RmgrTable[record->xl_rmid].rm_desc(&buf,
+ record->xl_info,
XLogRecGetData(record));
/* don't bother emitting empty description */