diff options
Diffstat (limited to 'src')
46 files changed, 3182 insertions, 206 deletions
diff --git a/src/Makefile b/src/Makefile index 966cb946879..80c38cb568a 100644 --- a/src/Makefile +++ b/src/Makefile @@ -4,7 +4,7 @@ # # Copyright (c) 1994, Regents of the University of California # -# $PostgreSQL: pgsql/src/Makefile,v 1.47 2009/08/26 22:24:42 petere Exp $ +# $PostgreSQL: pgsql/src/Makefile,v 1.48 2010/01/15 09:18:59 heikki Exp $ # #------------------------------------------------------------------------- @@ -18,6 +18,7 @@ all install installdirs uninstall distprep: $(MAKE) -C timezone $@ $(MAKE) -C backend $@ $(MAKE) -C backend/utils/mb/conversion_procs $@ + $(MAKE) -C backend/replication/walreceiver $@ $(MAKE) -C backend/snowball $@ $(MAKE) -C include $@ $(MAKE) -C interfaces $@ diff --git a/src/backend/Makefile b/src/backend/Makefile index 21f8d4d32a7..7a95139eb70 100644 --- a/src/backend/Makefile +++ b/src/backend/Makefile @@ -5,7 +5,7 @@ # Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group # Portions Copyright (c) 1994, Regents of the University of California # -# $PostgreSQL: pgsql/src/backend/Makefile,v 1.139 2010/01/05 01:20:35 tgl Exp $ +# $PostgreSQL: pgsql/src/backend/Makefile,v 1.140 2010/01/15 09:18:59 heikki Exp $ # #------------------------------------------------------------------------- @@ -15,7 +15,7 @@ top_builddir = ../.. 
include $(top_builddir)/src/Makefile.global SUBDIRS = access bootstrap catalog parser commands executor foreign lib libpq \ - main nodes optimizer port postmaster regex rewrite \ + main nodes optimizer port postmaster regex replication rewrite \ storage tcop tsearch utils $(top_builddir)/src/timezone include $(srcdir)/common.mk diff --git a/src/backend/access/nbtree/nbtsort.c b/src/backend/access/nbtree/nbtsort.c index 9ea85559412..68747beae5f 100644 --- a/src/backend/access/nbtree/nbtsort.c +++ b/src/backend/access/nbtree/nbtsort.c @@ -59,7 +59,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.121 2010/01/02 16:57:35 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/access/nbtree/nbtsort.c,v 1.122 2010/01/15 09:19:00 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -210,10 +210,10 @@ _bt_leafbuild(BTSpool *btspool, BTSpool *btspool2) wstate.index = btspool->index; /* - * We need to log index creation in WAL iff WAL archiving is enabled AND - * it's not a temp index. + * We need to log index creation in WAL iff WAL archiving/streaming is + * enabled AND it's not a temp index. */ - wstate.btws_use_wal = XLogArchivingActive() && !wstate.index->rd_istemp; + wstate.btws_use_wal = XLogIsNeeded() && !wstate.index->rd_istemp; /* reserve the metapage */ wstate.btws_pages_alloced = BTREE_METAPAGE + 1; diff --git a/src/backend/access/transam/recovery.conf.sample b/src/backend/access/transam/recovery.conf.sample index cdbb49295fd..34d3b613bcd 100644 --- a/src/backend/access/transam/recovery.conf.sample +++ b/src/backend/access/transam/recovery.conf.sample @@ -2,13 +2,14 @@ # PostgreSQL recovery config file # ------------------------------- # -# Edit this file to provide the parameters that PostgreSQL -# needs to perform an archive recovery of a database. 
+# Edit this file to provide the parameters that PostgreSQL needs to +# perform an archive recovery of a database, or to act as a log-streaming +# replication standby. # # If "recovery.conf" is present in the PostgreSQL data directory, it is # read on postmaster startup. After successful recovery, it is renamed # to "recovery.done" to ensure that we do not accidentally re-enter -# archive recovery mode. +# archive recovery or standby mode. # # This file consists of lines of the form: # @@ -23,7 +24,7 @@ # are example values. # #--------------------------------------------------------------------------- -# REQUIRED PARAMETERS +# ARCHIVE RECOVERY PARAMETERS #--------------------------------------------------------------------------- # # restore_command @@ -33,6 +34,9 @@ # which is replaced by the name of the desired log file, and %p, # which is replaced by the absolute path to copy the log file to. # +# This parameter is *required* for an archive recovery, but optional +# for replication. +# # It is important that the command return nonzero exit status on failure. # The command *will* be asked for log files that are not present in the # archive; it must return nonzero when so asked. @@ -43,10 +47,6 @@ #restore_command = 'cp /mnt/server/archivedir/%f %p' # # -#--------------------------------------------------------------------------- -# OPTIONAL PARAMETERS -#--------------------------------------------------------------------------- -# # recovery_end_command # # specifies an optional shell command to execute at completion of recovery. @@ -79,6 +79,28 @@ # # #--------------------------------------------------------------------------- +# LOG-STREAMING REPLICATION PARAMETERS +#--------------------------------------------------------------------------- +# +# When standby_mode is enabled, the PostgreSQL server will work as +# a standby. It tries to connect to the primary according to the +# connection settings primary_conninfo, and receives XLOG records +# continuously. 
+# +#standby_mode = 'false' # 'true' or 'false' +# +#primary_conninfo = 'host=localhost port=5432' +# +# +# By default, a standby server keeps streaming XLOG records from the +# primary indefinitely. If you want to stop streaming and finish recovery, +# opening up the system in read/write mode, specify path to a trigger file. +# Server will poll the trigger file path periodically and stop streaming +# when it's found. +# +#trigger_file = '' +# +#--------------------------------------------------------------------------- # HOT STANDBY PARAMETERS #--------------------------------------------------------------------------- # diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index 9a13a98cbc1..a0dddcad813 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.357 2010/01/04 12:50:49 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/access/transam/xlog.c,v 1.358 2010/01/15 09:19:00 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -41,6 +41,8 @@ #include "miscadmin.h" #include "pgstat.h" #include "postmaster/bgwriter.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -142,6 +144,16 @@ HotStandbyState standbyState = STANDBY_DISABLED; static XLogRecPtr LastRec; /* + * Are we doing recovery from XLOG stream? If so, we recover without using + * offline XLOG archives even though InArchiveRecovery==true. This flag is + * used only in standby mode. + */ +static bool InStreamingRecovery = false; + +/* The current log page is partially-filled, and so needs to be read again? 
*/ +static bool needReread = false; + +/* * Local copy of SharedRecoveryInProgress variable. True actually means "not * known, need to check the shared state". */ @@ -165,7 +177,7 @@ static bool InArchiveRecovery = false; /* Was the last xlog file restored from archive, or local? */ static bool restoredFromArchive = false; -/* options taken from recovery.conf */ +/* options taken from recovery.conf for archive recovery */ static char *recoveryRestoreCommand = NULL; static char *recoveryEndCommand = NULL; static bool recoveryTarget = false; @@ -175,6 +187,11 @@ static TransactionId recoveryTargetXid; static TimestampTz recoveryTargetTime; static TimestampTz recoveryLastXTime = 0; +/* options taken from recovery.conf for XLOG streaming */ +static bool StandbyMode = false; +static char *PrimaryConnInfo = NULL; +char *TriggerFile = NULL; + /* if recoveryStopsHere returns true, it saves actual stop xid/time here */ static TransactionId recoveryStopXid; static TimestampTz recoveryStopTime; @@ -229,6 +246,18 @@ XLogRecPtr XactLastRecEnd = {0, 0}; */ static XLogRecPtr RedoRecPtr; +/* + * RedoStartLSN points to the checkpoint's REDO location which is specified + * in a backup label file, backup history file or control file. In standby + * mode, XLOG streaming usually starts from the position where an invalid + * record was found. But if we fail to read even the initial checkpoint + * record, we use the REDO location instead of the checkpoint location as + * the start position of XLOG streaming. Otherwise we would have to jump + * backwards to the REDO location after reading the checkpoint record, + * because the REDO record can precede the checkpoint record. 
+ */ +static XLogRecPtr RedoStartLSN = {0, 0}; + /*---------- * Shared-memory data structures for XLOG control * @@ -349,6 +378,7 @@ typedef struct XLogCtlData XLogRecPtr *xlblocks; /* 1st byte ptr-s + XLOG_BLCKSZ */ int XLogCacheBlck; /* highest allocated xlog buffer index */ TimeLineID ThisTimeLineID; + TimeLineID RecoveryTargetTLI; /* * SharedRecoveryInProgress indicates if we're still in crash or archive @@ -369,6 +399,8 @@ typedef struct XLogCtlData XLogRecPtr replayEndRecPtr; /* timestamp of last record replayed (or being replayed) */ TimestampTz recoveryLastXTime; + /* end+1 of the last record replayed */ + XLogRecPtr recoveryLastRecPtr; slock_t info_lck; /* locks shared variables shown above */ } XLogCtlData; @@ -481,12 +513,9 @@ static bool XLogCheckBuffer(XLogRecData *rdata, bool doPageWrites, XLogRecPtr *lsn, BkpBlock *bkpb); static bool AdvanceXLInsertBuffer(bool new_segment); static void XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch); -static int XLogFileInit(uint32 log, uint32 seg, - bool *use_existent, bool use_lock); static bool InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, bool find_free, int *max_advance, bool use_lock); -static int XLogFileOpen(uint32 log, uint32 seg); static int XLogFileRead(uint32 log, uint32 seg, int emode); static void XLogFileClose(void); static bool RestoreArchivedFile(char *path, const char *xlogfname, @@ -497,6 +526,7 @@ static void RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr); static void ValidateXLOGDirectoryStructure(void); static void CleanupBackupHistory(void); static void UpdateMinRecoveryPoint(XLogRecPtr lsn, bool force); +static XLogRecord *FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt); static XLogRecord *ReadRecord(XLogRecPtr *RecPtr, int emode); static bool ValidXLOGHeader(XLogPageHeader hdr, int emode); static XLogRecord *ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt); @@ -513,7 +543,6 @@ static char *str_time(pg_time_t tnow); 
#ifdef WAL_DEBUG static void xlog_outrec(StringInfo buf, XLogRecord *record); #endif -static void issue_xlog_fsync(void); static void pg_start_backup_callback(int code, Datum arg); static bool read_backup_label(XLogRecPtr *checkPointLoc); static void rm_redo_error_callback(void *arg); @@ -1690,7 +1719,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) */ if (finishing_seg || (xlog_switch && last_iteration)) { - issue_xlog_fsync(); + issue_xlog_fsync(openLogFile, openLogId, openLogSeg); LogwrtResult.Flush = LogwrtResult.Write; /* end of page */ if (XLogArchivingActive()) @@ -1754,7 +1783,7 @@ XLogWrite(XLogwrtRqst WriteRqst, bool flexible, bool xlog_switch) openLogFile = XLogFileOpen(openLogId, openLogSeg); openLogOff = 0; } - issue_xlog_fsync(); + issue_xlog_fsync(openLogFile, openLogId, openLogSeg); } LogwrtResult.Flush = LogwrtResult.Write; } @@ -2189,7 +2218,7 @@ XLogNeedsFlush(XLogRecPtr record) * take down the system on failure). They will promote to PANIC if we are * in a critical section. */ -static int +int XLogFileInit(uint32 log, uint32 seg, bool *use_existent, bool use_lock) { @@ -2536,7 +2565,7 @@ InstallXLogFileSegment(uint32 *log, uint32 *seg, char *tmppath, /* * Open a pre-existing logfile segment for writing. */ -static int +int XLogFileOpen(uint32 log, uint32 seg) { char path[MAXPGPATH]; @@ -2586,7 +2615,7 @@ XLogFileRead(uint32 log, uint32 seg, int emode) XLogFileName(xlogfname, tli, log, seg); - if (InArchiveRecovery) + if (InArchiveRecovery && !InStreamingRecovery) { /* Report recovery progress in PS display */ snprintf(activitymsg, sizeof(activitymsg), "waiting for %s", @@ -2641,12 +2670,13 @@ XLogFileClose(void) /* * WAL segment files will not be re-read in normal operation, so we advise * the OS to release any cached pages. But do not do so if WAL archiving - * is active, because archiver process could use the cache to read the WAL - * segment. 
Also, don't bother with it if we are using O_DIRECT, since - * the kernel is presumably not caching in that case. + * or streaming is active, because archiver and walsender process could use + * the cache to read the WAL segment. Also, don't bother with it if we + * are using O_DIRECT, since the kernel is presumably not caching in that + * case. */ #if defined(USE_POSIX_FADVISE) && defined(POSIX_FADV_DONTNEED) - if (!XLogArchivingActive() && + if (!XLogIsNeeded() && (get_sync_bit(sync_method) & PG_O_DIRECT) == 0) (void) posix_fadvise(openLogFile, 0, 0, POSIX_FADV_DONTNEED); #endif @@ -2689,6 +2719,10 @@ RestoreArchivedFile(char *path, const char *xlogfname, uint32 restartLog; uint32 restartSeg; + /* In standby mode, restore_command might not be supplied */ + if (StandbyMode && recoveryRestoreCommand == NULL) + goto not_available; + /* * When doing archive recovery, we always prefer an archived log file even * if a file of the same name exists in XLOGDIR. The reason is that the @@ -2913,6 +2947,7 @@ RestoreArchivedFile(char *path, const char *xlogfname, (errmsg("could not restore file \"%s\" from archive: return code %d", xlogfname, rc))); +not_available: /* * if an archived file is not available, there might still be a version of * this file in XLOGDIR, so return that as the filename to open. @@ -3117,7 +3152,18 @@ RemoveOldXlogFiles(uint32 log, uint32 seg, XLogRecPtr endptr) strspn(xlde->d_name, "0123456789ABCDEF") == 24 && strcmp(xlde->d_name + 8, lastoff + 8) <= 0) { - if (XLogArchiveCheckDone(xlde->d_name)) + /* + * Normally we don't delete old XLOG files during recovery to + * avoid accidentally deleting a file that looks stale due to a + * bug or hardware issue, but in fact contains important data. + * During streaming recovery, however, we will eventually fill the + * disk if we never clean up, so we have to. 
That's not an issue + * with file-based archive recovery because in that case we + * restore one XLOG file at a time, on-demand, and with a + * different filename that can't be confused with regular XLOG + * files. + */ + if (InStreamingRecovery || XLogArchiveCheckDone(xlde->d_name)) { snprintf(path, MAXPGPATH, XLOGDIR "/%s", xlde->d_name); @@ -3428,19 +3474,92 @@ RecordIsValid(XLogRecord *record, XLogRecPtr recptr, int emode) } /* + * Attempt to fetch an XLOG record. + * + * If RecPtr is not NULL, try to fetch a record at that position. Otherwise + * try to fetch a record just after the last one previously read. + * + * In standby mode, if we fail to read a valid record and are not doing + * recovery from XLOG stream yet, we ignore the failure and start the + * walreceiver process to fetch the record from the primary. Otherwise, + * returns NULL, or fails if emode is PANIC. (emode must be either PANIC or LOG.) + * + * If fetching_ckpt is TRUE, RecPtr points to the checkpoint location. In + * this case, if we have to start XLOG streaming, we use RedoStartLSN as the + * streaming start position instead of RecPtr. + * + * The record is copied into readRecordBuf, so that on successful return, + * the returned record pointer always points there. + */ +static XLogRecord * +FetchRecord(XLogRecPtr *RecPtr, int emode, bool fetching_ckpt) +{ + if (StandbyMode && !InStreamingRecovery) + { + XLogRecord *record; + XLogRecPtr startlsn; + bool haveNextRecord = (nextRecord != NULL); + + /* An invalid record is OK here, so we set emode to DEBUG2 */ + record = ReadRecord(RecPtr, DEBUG2); + if (record != NULL) + return record; + + /* + * Start XLOG streaming if there are no more valid records available + * in the archive. + * + * We need to calculate the start position of XLOG streaming. If we + * read a record in the middle of a segment which doesn't exist in + * pg_xlog, we use the start of the segment as the start position.
+ * That prevents a broken segment (i.e., with no records in the + * first half of a segment) from being created by XLOG streaming, + * which might cause trouble later on if the segment is e.g + * archived. + */ + startlsn = fetching_ckpt ? RedoStartLSN : EndRecPtr; + if (startlsn.xrecoff % XLogSegSize != 0) + { + char xlogpath[MAXPGPATH]; + struct stat stat_buf; + uint32 log; + uint32 seg; + + XLByteToSeg(startlsn, log, seg); + XLogFilePath(xlogpath, recoveryTargetTLI, log, seg); + + if (stat(xlogpath, &stat_buf) != 0) + startlsn.xrecoff -= startlsn.xrecoff % XLogSegSize; + } + RequestXLogStreaming(startlsn, PrimaryConnInfo); + + /* Needs to read the current page again if the next record is in it */ + needReread = haveNextRecord; + nextRecord = NULL; + + InStreamingRecovery = true; + ereport(LOG, + (errmsg("starting streaming recovery at %X/%X", + startlsn.xlogid, startlsn.xrecoff))); + } + + return ReadRecord(RecPtr, emode); +} + +/* * Attempt to read an XLOG record. * * If RecPtr is not NULL, try to read a record at that position. Otherwise * try to read a record just after the last one previously read. * * If no valid record is available, returns NULL, or fails if emode is PANIC. - * (emode must be either PANIC or LOG.) + * (emode must be either PANIC, LOG or DEBUG2.) * * The record is copied into readRecordBuf, so that on successful return, * the returned record pointer always points there. */ static XLogRecord * -ReadRecord(XLogRecPtr *RecPtr, int emode) +ReadRecord(XLogRecPtr *RecPtr, int emode_arg) { XLogRecord *record; char *buffer; @@ -3451,6 +3570,19 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) uint32 targetPageOff; uint32 targetRecOff; uint32 pageHeaderSize; + XLogRecPtr receivedUpto = {0,0}; + bool finished; + int emode; + + /* + * We don't expect any invalid records during streaming recovery: we + * should never hit the end of WAL because we wait for it to be streamed. + * Therefore treat any broken WAL as PANIC, instead of failing over. 
+ */ + if (InStreamingRecovery) + emode = PANIC; + else + emode = emode_arg; if (readBuf == NULL) { @@ -3474,14 +3606,13 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) record = nextRecord; goto got_record; } - /* align old recptr to next page */ - if (tmpRecPtr.xrecoff % XLOG_BLCKSZ != 0) - tmpRecPtr.xrecoff += (XLOG_BLCKSZ - tmpRecPtr.xrecoff % XLOG_BLCKSZ); - if (tmpRecPtr.xrecoff >= XLogFileSize) - { - (tmpRecPtr.xlogid)++; - tmpRecPtr.xrecoff = 0; - } + + /* + * Align old recptr to next page if the current page is filled and + * doesn't need to be read again. + */ + if (!needReread) + NextLogPage(tmpRecPtr); /* We will account for page header size below */ } else @@ -3507,6 +3638,21 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) close(readFile); readFile = -1; } + + /* Is the target record ready yet? */ + if (InStreamingRecovery) + { + receivedUpto = WaitNextXLogAvailable(*RecPtr, &finished); + if (finished) + { + if (emode_arg == PANIC) + ereport(PANIC, + (errmsg("streaming recovery ended"))); + else + return NULL; + } + } + XLByteToSeg(*RecPtr, readId, readSeg); if (readFile < 0) { @@ -3539,9 +3685,10 @@ ReadRecord(XLogRecPtr *RecPtr, int emode) } targetPageOff = ((RecPtr->xrecoff % XLogSegSize) / XLOG_BLCKSZ) * XLOG_BLCKSZ; - if (readOff != targetPageOff) + if (readOff != targetPageOff || needReread) { readOff = targetPageOff; + needReread = false; if (lseek(readFile, (off_t) readOff, SEEK_SET) < 0) { ereport(emode, @@ -3697,6 +3844,7 @@ got_record:; { /* Need to reassemble record */ XLogContRecord *contrecord; + XLogRecPtr nextpagelsn = *RecPtr; uint32 gotlen = len; memcpy(buffer, record, len); @@ -3704,6 +3852,23 @@ got_record:; buffer += len; for (;;) { + /* Is the next page ready yet? 
*/ + if (InStreamingRecovery) + { + if (gotlen != len) + nextpagelsn.xrecoff += XLOG_BLCKSZ; + NextLogPage(nextpagelsn); + receivedUpto = WaitNextXLogAvailable(nextpagelsn, &finished); + if (finished) + { + if (emode_arg == PANIC) + ereport(PANIC, + (errmsg("streaming recovery ended"))); + else + return NULL; + } + } + readOff += XLOG_BLCKSZ; if (readOff >= XLogSegSize) { @@ -3768,6 +3933,21 @@ got_record:; EndRecPtr.xrecoff = readSeg * XLogSegSize + readOff + pageHeaderSize + MAXALIGN(SizeOfXLogContRecord + contrecord->xl_rem_len); + + /* + * Check whether the current page needs to be read again. If there is no + * unread record in the current page (nextRecord == NULL), obviously we + * don't need to reread it. If we're not in streaming recovery mode yet, + * partially-filled page doesn't need to be reread because it is the + * last valid page. + */ + if (nextRecord != NULL && InStreamingRecovery && + XLByteLE(receivedUpto, EndRecPtr)) + { + nextRecord = NULL; + needReread = true; + } + ReadRecPtr = *RecPtr; /* needn't worry about XLOG SWITCH, it can't cross page boundaries */ return record; @@ -3781,6 +3961,21 @@ got_record:; nextRecord = (XLogRecord *) ((char *) record + MAXALIGN(total_len)); EndRecPtr.xlogid = RecPtr->xlogid; EndRecPtr.xrecoff = RecPtr->xrecoff + MAXALIGN(total_len); + + /* + * Check whether the current page needs to be read again. If there is no + * unread record in the current page (nextRecord == NULL), obviously we + * don't need to reread it. If we're not in streaming recovery mode yet, + * partially-filled page doesn't need to be reread because it is the last + * valid page. 
+ */ + if (nextRecord != NULL && InStreamingRecovery && + XLByteLE(receivedUpto, EndRecPtr)) + { + nextRecord = NULL; + needReread = true; + } + ReadRecPtr = *RecPtr; memcpy(buffer, record, total_len); @@ -3793,6 +3988,7 @@ got_record:; EndRecPtr.xrecoff += XLogSegSize - 1; EndRecPtr.xrecoff -= EndRecPtr.xrecoff % XLogSegSize; nextRecord = NULL; /* definitely not on same page */ + needReread = false; /* * Pretend that readBuf contains the last page of the segment. This is @@ -4587,6 +4783,16 @@ UpdateControlFile(void) } /* + * Returns the unique system identifier from control file. + */ +uint64 +GetSystemIdentifier(void) +{ + Assert(ControlFile != NULL); + return ControlFile->system_identifier; +} + +/* * Initialization of shared memory for XLOG */ Size @@ -4822,7 +5028,7 @@ str_time(pg_time_t tnow) /* * See if there is a recovery command file (recovery.conf), and if so - * read in parameters for archive recovery. + * read in parameters for archive recovery and XLOG streaming. * * XXX longer term intention is to expand this to * cater for additional parameters and controls @@ -4974,6 +5180,29 @@ readRecoveryCommandFile(void) ereport(LOG, (errmsg("recovery_target_inclusive = %s", tok2))); } + else if (strcmp(tok1, "standby_mode") == 0) + { + if (!parse_bool(tok2, &StandbyMode)) + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("parameter \"standby_mode\" requires a Boolean value"))); + ereport(LOG, + (errmsg("standby_mode = '%s'", tok2))); + } + else if (strcmp(tok1, "primary_conninfo") == 0) + { + PrimaryConnInfo = pstrdup(tok2); + ereport(LOG, + (errmsg("primary_conninfo = '%s'", + PrimaryConnInfo))); + } + else if (strcmp(tok1, "trigger_file") == 0) + { + TriggerFile = pstrdup(tok2); + ereport(LOG, + (errmsg("trigger_file = '%s'", + TriggerFile))); + } else ereport(FATAL, (errmsg("unrecognized recovery parameter \"%s\"", @@ -4988,10 +5217,10 @@ readRecoveryCommandFile(void) cmdline), errhint("Lines should have the format parameter = 
'value'."))); - /* Check that required parameters were supplied */ - if (recoveryRestoreCommand == NULL) + /* If not in standby mode, restore_command must be supplied */ + if (!StandbyMode && recoveryRestoreCommand == NULL) ereport(FATAL, - (errmsg("recovery command file \"%s\" did not specify restore_command", + (errmsg("recovery command file \"%s\" did not specify restore_command nor standby_mode", RECOVERY_COMMAND_FILE))); /* Enable fetching from archive recovery area */ @@ -5452,6 +5681,9 @@ StartupXLOG(void) recoveryTargetTLI, ControlFile->checkPointCopy.ThisTimeLineID))); + /* Save the selected recovery target timeline ID in shared memory */ + XLogCtl->RecoveryTargetTLI = recoveryTargetTLI; + if (read_backup_label(&checkPointLoc)) { /* @@ -5482,6 +5714,7 @@ StartupXLOG(void) * to pg_control is broken, try the next-to-last one. */ checkPointLoc = ControlFile->checkPoint; + RedoStartLSN = ControlFile->checkPointCopy.redo; record = ReadCheckpointRecord(checkPointLoc, 1); if (record != NULL) { @@ -5489,6 +5722,15 @@ StartupXLOG(void) (errmsg("checkpoint record is at %X/%X", checkPointLoc.xlogid, checkPointLoc.xrecoff))); } + else if (InStreamingRecovery) + { + /* + * The last valid checkpoint record required for a streaming + * recovery exists in neither standby nor the primary. 
+ */ + ereport(PANIC, + (errmsg("could not locate a valid checkpoint record"))); + } else { checkPointLoc = ControlFile->prevCheckPoint; @@ -5688,12 +5930,12 @@ StartupXLOG(void) if (XLByteLT(checkPoint.redo, RecPtr)) { /* back up to find the record */ - record = ReadRecord(&(checkPoint.redo), PANIC); + record = FetchRecord(&(checkPoint.redo), PANIC, false); } else { /* just have to read next record after CheckPoint */ - record = ReadRecord(NULL, LOG); + record = FetchRecord(NULL, LOG, false); } if (record != NULL) @@ -5706,9 +5948,10 @@ StartupXLOG(void) /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; - /* initialize shared replayEndRecPtr */ + /* initialize shared replayEndRecPtr and recoveryLastRecPtr */ SpinLockAcquire(&xlogctl->info_lck); xlogctl->replayEndRecPtr = ReadRecPtr; + xlogctl->recoveryLastRecPtr = ReadRecPtr; SpinLockRelease(&xlogctl->info_lck); InRedo = true; @@ -5762,14 +6005,8 @@ StartupXLOG(void) } #endif - /* - * Check if we were requested to re-read config file. - */ - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } + /* Handle interrupt signals of startup process */ + HandleStartupProcInterrupts(); /* * Have we passed our safe starting point? @@ -5841,9 +6078,17 @@ StartupXLOG(void) /* Pop the error context stack */ error_context_stack = errcontext.previous; + /* + * Update shared recoveryLastRecPtr after this record has been + * replayed. + */ + SpinLockAcquire(&xlogctl->info_lck); + xlogctl->recoveryLastRecPtr = EndRecPtr; + SpinLockRelease(&xlogctl->info_lck); + LastRec = ReadRecPtr; - record = ReadRecord(NULL, LOG); + record = FetchRecord(NULL, LOG, false); } while (record != NULL && recoveryContinue); /* @@ -5868,6 +6113,27 @@ StartupXLOG(void) } /* + * If we launched a WAL receiver, it should be gone by now. It will trump + * over the startup checkpoint and subsequent records if it's still alive, + * so be extra sure that it's gone. 
+ */ + if (WalRcvInProgress()) + elog(PANIC, "wal receiver still active"); + + /* + * We are now done reading the xlog from stream. Turn off streaming + * recovery, and restart fetching the files (which would be required + * at end of recovery, e.g., timeline history file) from archive. + */ + if (InStreamingRecovery) + { + /* We are no longer in streaming recovery state */ + InStreamingRecovery = false; + ereport(LOG, + (errmsg("streaming recovery complete"))); + } + + /* * Re-fetch the last valid or last applied record, so we can identify the * exact endpoint of what we consider the valid portion of WAL. */ @@ -6241,7 +6507,7 @@ ReadCheckpointRecord(XLogRecPtr RecPtr, int whichChkpt) return NULL; } - record = ReadRecord(&RecPtr, LOG); + record = FetchRecord(&RecPtr, LOG, true); if (record == NULL) { @@ -6388,6 +6654,26 @@ GetInsertRecPtr(void) } /* + * GetWriteRecPtr -- Returns the current write position. + * + * NOTE: The value returned lags behind the real write position. But, + * an approximation is enough for the current usage of this function. + */ +XLogRecPtr +GetWriteRecPtr(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr recptr; + + SpinLockAcquire(&xlogctl->info_lck); + recptr = xlogctl->LogwrtResult.Write; + SpinLockRelease(&xlogctl->info_lck); + + return recptr; +} + +/* * Get the time of the last xlog segment switch */ pg_time_t @@ -6444,6 +6730,16 @@ GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch) } /* + * GetRecoveryTargetTLI - get the recovery target timeline ID + */ +TimeLineID +GetRecoveryTargetTLI(void) +{ + /* RecoveryTargetTLI doesn't change so we need no lock to copy it */ + return XLogCtl->RecoveryTargetTLI; +} + +/* * This must be called ONCE during postmaster or standalone-backend shutdown */ void @@ -6917,8 +7213,34 @@ CreateCheckPoint(int flags) smgrpostckpt(); /* - * Delete old log files (those no longer needed even for previous - * checkpoint). 
+ * If there are connected standby servers doing XLOG streaming, don't + * delete XLOG files that have not been streamed to all of them yet. + * This does nothing to prevent them from being deleted when the + * standby is disconnected (e.g. because of network problems), but at + * least it prevents an open replication connection from failing because + * of that. + */ + if ((_logId || _logSeg) && MaxWalSenders > 0) + { + XLogRecPtr oldest; + uint32 log; + uint32 seg; + + oldest = GetOldestWALSendPointer(); + if (oldest.xlogid != 0 || oldest.xrecoff != 0) + { + XLByteToSeg(oldest, log, seg); + if (log < _logId || (log == _logId && seg < _logSeg)) + { + _logId = log; + _logSeg = seg; + } + } + } + + /* + * Delete old log files (those no longer needed even for + * previous checkpoint or the standbys in XLOG streaming). */ if (_logId || _logSeg) { @@ -7036,6 +7358,8 @@ CreateRestartPoint(int flags) { XLogRecPtr lastCheckPointRecPtr; CheckPoint lastCheckPoint; + uint32 _logId; + uint32 _logSeg; /* use volatile pointer to prevent code rearrangement */ volatile XLogCtlData *xlogctl = XLogCtl; @@ -7106,6 +7430,12 @@ CreateRestartPoint(int flags) CheckPointGuts(lastCheckPoint.redo, flags); /* + * Select point at which we can truncate the xlog, which we base on the + * prior checkpoint's earliest info. + */ + XLByteToSeg(ControlFile->checkPointCopy.redo, _logId, _logSeg); + + /* * Update pg_control, using current time. Check that it still shows * IN_ARCHIVE_RECOVERY state and an older checkpoint, else do nothing; * this is a quick hack to make sure nothing really bad happens if @@ -7123,6 +7453,34 @@ CreateRestartPoint(int flags) } LWLockRelease(ControlFileLock); + /* Are we doing recovery from XLOG stream? */ + if (!InStreamingRecovery) + InStreamingRecovery = WalRcvInProgress(); + + /* + * Delete old log files (those no longer needed even for previous + * checkpoint/restartpoint) to prevent the disk holding the xlog from + * growing full.
We don't need to do this during normal recovery, but during + * streaming recovery we have to, or the disk will eventually fill up with + * old log files streamed from the master. + */ + if (InStreamingRecovery && (_logId || _logSeg)) + { + XLogRecPtr endptr; + + /* Get the current (or recent) end of xlog */ + endptr = GetWalRcvWriteRecPtr(); + + PrevLogSeg(_logId, _logSeg); + RemoveOldXlogFiles(_logId, _logSeg, endptr); + + /* + * Make more log segments if needed. (Do this after recycling old log + * segments, since that may supply some of the needed files.) + */ + PreallocXlogFiles(endptr); + } + /* * Currently, there is no need to truncate pg_subtrans during recovery. If * we did do that, we will need to have called StartupSUBTRANS() already @@ -7495,36 +7853,39 @@ assign_xlog_sync_method(int new_sync_method, bool doit, GucSource source) /* - * Issue appropriate kind of fsync (if any) on the current XLOG output file + * Issue appropriate kind of fsync (if any) for an XLOG output file. + * + * 'fd' is a file descriptor for the XLOG file to be fsync'd. + * 'log' and 'seg' are for error reporting purposes.
*/ -static void -issue_xlog_fsync(void) +void +issue_xlog_fsync(int fd, uint32 log, uint32 seg) { switch (sync_method) { case SYNC_METHOD_FSYNC: - if (pg_fsync_no_writethrough(openLogFile) != 0) + if (pg_fsync_no_writethrough(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fsync log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #ifdef HAVE_FSYNC_WRITETHROUGH case SYNC_METHOD_FSYNC_WRITETHROUGH: - if (pg_fsync_writethrough(openLogFile) != 0) + if (pg_fsync_writethrough(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fsync write-through log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #endif #ifdef HAVE_FDATASYNC case SYNC_METHOD_FDATASYNC: - if (pg_fdatasync(openLogFile) != 0) + if (pg_fdatasync(fd) != 0) ereport(PANIC, (errcode_for_file_access(), errmsg("could not fdatasync log file %u, segment %u: %m", - openLogId, openLogSeg))); + log, seg))); break; #endif case SYNC_METHOD_OPEN: @@ -8021,6 +8382,48 @@ pg_current_xlog_insert_location(PG_FUNCTION_ARGS) } /* + * Report the last WAL receive location (same format as pg_start_backup etc) + * + * This is useful for determining how much of WAL is guaranteed to be received + * and synced to disk by walreceiver. + */ +Datum +pg_last_xlog_receive_location(PG_FUNCTION_ARGS) +{ + XLogRecPtr recptr; + char location[MAXFNAMELEN]; + + recptr = GetWalRcvWriteRecPtr(); + + snprintf(location, sizeof(location), "%X/%X", + recptr.xlogid, recptr.xrecoff); + PG_RETURN_TEXT_P(cstring_to_text(location)); +} + +/* + * Report the last WAL replay location (same format as pg_start_backup etc) + * + * This is useful for determining how much of WAL is visible to read-only + * connections during recovery. 
+ */ +Datum +pg_last_xlog_replay_location(PG_FUNCTION_ARGS) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile XLogCtlData *xlogctl = XLogCtl; + XLogRecPtr recptr; + char location[MAXFNAMELEN]; + + SpinLockAcquire(&xlogctl->info_lck); + recptr = xlogctl->recoveryLastRecPtr; + SpinLockRelease(&xlogctl->info_lck); + + snprintf(location, sizeof(location), "%X/%X", + recptr.xlogid, recptr.xrecoff); + PG_RETURN_TEXT_P(cstring_to_text(location)); +} + +/* * Compute an xlog file name and decimal byte offset given a WAL location, * such as is returned by pg_stop_backup() or pg_xlog_switch(). * @@ -8143,12 +8546,12 @@ pg_xlogfile_name(PG_FUNCTION_ARGS) * point, we will fail to restore a consistent database state. * * Returns TRUE if a backup_label was found (and fills the checkpoint - * location into *checkPointLoc); returns FALSE if not. + * location and its REDO location into *checkPointLoc and RedoStartLSN, + * respectively); returns FALSE if not. */ static bool read_backup_label(XLogRecPtr *checkPointLoc) { - XLogRecPtr startpoint; char startxlogfilename[MAXFNAMELEN]; TimeLineID tli; FILE *lfp; @@ -8174,7 +8577,7 @@ read_backup_label(XLogRecPtr *checkPointLoc) * format). */ if (fscanf(lfp, "START WAL LOCATION: %X/%X (file %08X%16s)%c", - &startpoint.xlogid, &startpoint.xrecoff, &tli, + &RedoStartLSN.xlogid, &RedoStartLSN.xrecoff, &tli, startxlogfilename, &ch) != 5 || ch != '\n') ereport(FATAL, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -8319,6 +8722,25 @@ StartupProcShutdownHandler(SIGNAL_ARGS) shutdown_requested = true; } +/* Handle SIGHUP and SIGTERM signals of startup process */ +void +HandleStartupProcInterrupts(void) +{ + /* + * Check if we were requested to re-read config file. + */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + /* + * Check if we were requested to exit without finishing recovery. 
+ */ + if (shutdown_requested) + proc_exit(1); +} + /* Main entry point for startup process */ void StartupProcessMain(void) diff --git a/src/backend/bootstrap/bootstrap.c b/src/backend/bootstrap/bootstrap.c index 748420401ce..84dd6638efc 100644 --- a/src/backend/bootstrap/bootstrap.c +++ b/src/backend/bootstrap/bootstrap.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.255 2010/01/02 16:57:36 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -32,6 +32,7 @@ #include "nodes/makefuncs.h" #include "postmaster/bgwriter.h" #include "postmaster/walwriter.h" +#include "replication/walreceiver.h" #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/proc.h" @@ -173,7 +174,7 @@ static IndexList *ILHead = NULL; * AuxiliaryProcessMain * * The main entry point for auxiliary processes, such as the bgwriter, - * walwriter, bootstrapper and the shared memory checker code. + * walwriter, walreceiver, bootstrapper and the shared memory checker code. * * This code is here just because of historical reasons. */ @@ -314,6 +315,9 @@ AuxiliaryProcessMain(int argc, char *argv[]) case WalWriterProcess: statmsg = "wal writer process"; break; + case WalReceiverProcess: + statmsg = "wal receiver process"; + break; default: statmsg = "??? process"; break; @@ -419,6 +423,24 @@ AuxiliaryProcessMain(int argc, char *argv[]) WalWriterMain(); proc_exit(1); /* should never return */ + case WalReceiverProcess: + /* don't set signals, walreceiver has its own agenda */ + { + PGFunction WalReceiverMain; + + /* + * Walreceiver is not linked directly into the server + * binary because we would then need to link the server + * with libpq. It's compiled as a dynamically loaded module + * to avoid that. 
+ */ + WalReceiverMain = load_external_function("walreceiver", + "WalReceiverMain", + true, NULL); + WalReceiverMain(NULL); + } + proc_exit(1); /* should never return */ + default: elog(PANIC, "unrecognized process type: %d", auxType); proc_exit(1); diff --git a/src/backend/commands/cluster.c b/src/backend/commands/cluster.c index 0f0ebd3ebc0..913ba1681c5 100644 --- a/src/backend/commands/cluster.c +++ b/src/backend/commands/cluster.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.192 2010/01/06 11:25:39 itagaki Exp $ + * $PostgreSQL: pgsql/src/backend/commands/cluster.c,v 1.193 2010/01/15 09:19:01 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -816,10 +816,10 @@ copy_heap_data(Oid OIDNewHeap, Oid OIDOldHeap, Oid OIDOldIndex, isnull = (bool *) palloc(natts * sizeof(bool)); /* - * We need to log the copied data in WAL iff WAL archiving is enabled AND - * it's not a temp rel. + * We need to log the copied data in WAL iff WAL archiving/streaming + * is enabled AND it's not a temp rel. 
*/ - use_wal = XLogArchivingActive() && !NewHeap->rd_istemp; + use_wal = XLogIsNeeded() && !NewHeap->rd_istemp; /* use_wal off requires rd_targblock be initially invalid */ Assert(NewHeap->rd_targblock == InvalidBlockNumber); diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c index 265b50cc1ce..b7e1a175b53 100644 --- a/src/backend/commands/copy.c +++ b/src/backend/commands/copy.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.320 2010/01/02 16:57:37 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/copy.c,v 1.321 2010/01/15 09:19:01 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -1725,7 +1725,7 @@ CopyFrom(CopyState cstate) /*---------- * Check to see if we can avoid writing WAL * - * If archive logging is not enabled *and* either + * If archive logging/streaming is not enabled *and* either * - table was created in same transaction as this COPY * - data is being written to relfilenode created in this transaction * then we can skip writing WAL. 
It's safe because if the transaction @@ -1753,7 +1753,7 @@ CopyFrom(CopyState cstate) cstate->rel->rd_newRelfilenodeSubid != InvalidSubTransactionId) { hi_options |= HEAP_INSERT_SKIP_FSM; - if (!XLogArchivingActive()) + if (!XLogIsNeeded()) hi_options |= HEAP_INSERT_SKIP_WAL; } diff --git a/src/backend/commands/tablecmds.c b/src/backend/commands/tablecmds.c index dd7da742d22..bf1234614c9 100644 --- a/src/backend/commands/tablecmds.c +++ b/src/backend/commands/tablecmds.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.314 2010/01/06 03:04:00 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/commands/tablecmds.c,v 1.315 2010/01/15 09:19:01 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -7044,10 +7044,10 @@ copy_relation_data(SMgrRelation src, SMgrRelation dst, Page page = (Page) buf; /* - * We need to log the copied data in WAL iff WAL archiving is enabled AND - * it's not a temp rel. + * We need to log the copied data in WAL iff WAL archiving/streaming is + * enabled AND it's not a temp rel. */ - use_wal = XLogArchivingActive() && !istemp; + use_wal = XLogIsNeeded() && !istemp; nblocks = smgrnblocks(src, forkNum); diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index 8d7adfffbb6..557ecc7e15c 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -26,7 +26,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.341 2010/01/08 02:44:00 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/executor/execMain.c,v 1.342 2010/01/15 09:19:02 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -2213,11 +2213,11 @@ OpenIntoRel(QueryDesc *queryDesc) myState->rel = intoRelationDesc; /* - * We can skip WAL-logging the insertions, unless PITR is in use. We can - * skip the FSM in any case. 
+ * We can skip WAL-logging the insertions, unless PITR or streaming + * replication is in use. We can skip the FSM in any case. */ myState->hi_options = HEAP_INSERT_SKIP_FSM | - (XLogArchivingActive() ? 0 : HEAP_INSERT_SKIP_WAL); + (XLogIsNeeded() ? 0 : HEAP_INSERT_SKIP_WAL); myState->bistate = GetBulkInsertState(); /* Not using WAL requires rd_targblock be initially invalid */ diff --git a/src/backend/libpq/be-secure.c b/src/backend/libpq/be-secure.c index 4fe19e92c39..ea6b16c13ba 100644 --- a/src/backend/libpq/be-secure.c +++ b/src/backend/libpq/be-secure.c @@ -11,7 +11,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/libpq/be-secure.c,v 1.95 2010/01/02 16:57:45 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/libpq/be-secure.c,v 1.96 2010/01/15 09:19:02 heikki Exp $ * * Since the server static private key ($DataDir/server.key) * will normally be stored unencrypted so that the database @@ -255,6 +255,8 @@ rloop: break; case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_WRITE: + if (port->noblock) + return 0; #ifdef WIN32 pgwin32_waitforsinglesocket(SSL_get_fd(port->ssl), (err == SSL_ERROR_WANT_READ) ? 
diff --git a/src/backend/libpq/hba.c b/src/backend/libpq/hba.c index c3dc1ad5891..98011c2822b 100644 --- a/src/backend/libpq/hba.c +++ b/src/backend/libpq/hba.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/libpq/hba.c,v 1.194 2010/01/02 16:57:45 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/libpq/hba.c,v 1.195 2010/01/15 09:19:02 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -28,6 +28,7 @@ #include "libpq/ip.h" #include "libpq/libpq.h" #include "regex/regex.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "utils/acl.h" #include "utils/guc.h" @@ -190,7 +191,8 @@ next_token(FILE *fp, char *buf, int bufsz) (strcmp(start_buf, "all") == 0 || strcmp(start_buf, "sameuser") == 0 || strcmp(start_buf, "samegroup") == 0 || - strcmp(start_buf, "samerole") == 0)) + strcmp(start_buf, "samerole") == 0 || + strcmp(start_buf, "replication") == 0)) { /* append newline to a magical keyword */ *buf++ = '\n'; @@ -514,6 +516,9 @@ check_db(const char *dbname, const char *role, Oid roleid, char *param_str) if (is_member(roleid, dbname)) return true; } + else if (strcmp(tok, "replication\n") == 0 && + am_walsender) + return true; else if (strcmp(tok, dbname) == 0) return true; } diff --git a/src/backend/libpq/pg_hba.conf.sample b/src/backend/libpq/pg_hba.conf.sample index cfcd246aae6..54b369d5f22 100644 --- a/src/backend/libpq/pg_hba.conf.sample +++ b/src/backend/libpq/pg_hba.conf.sample @@ -20,8 +20,8 @@ # "host" is either a plain or SSL-encrypted TCP/IP socket, "hostssl" is an # SSL-encrypted TCP/IP socket, and "hostnossl" is a plain TCP/IP socket. # -# DATABASE can be "all", "sameuser", "samerole", a database name, or -# a comma-separated list thereof. +# DATABASE can be "all", "sameuser", "samerole", "replication", +# a database name, or a comma-separated list thereof. # # USER can be "all", a user name, a group name prefixed with "+", or # a comma-separated list thereof. 
In both the DATABASE and USER fields @@ -47,9 +47,9 @@ # for a list of which options are available for which authentication methods. # # Database and user names containing spaces, commas, quotes and other special -# characters must be quoted. Quoting one of the keywords "all", "sameuser" or -# "samerole" makes the name lose its special character, and just match a -# database or username with that name. +# characters must be quoted. Quoting one of the keywords "all", "sameuser", +# "samerole" or "replication" makes the name lose its special character, +# and just match a database or username with that name. # # This file is read on server startup and when the postmaster receives # a SIGHUP signal. If you edit the file on a running system, you have diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c index b99c9da2ab5..f4e69742524 100644 --- a/src/backend/libpq/pqcomm.c +++ b/src/backend/libpq/pqcomm.c @@ -30,7 +30,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.201 2010/01/10 14:16:07 mha Exp $ + * $PostgreSQL: pgsql/src/backend/libpq/pqcomm.c,v 1.202 2010/01/15 09:19:02 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -55,6 +55,7 @@ * pq_peekbyte - peek at next byte from connection * pq_putbytes - send bytes to connection (not flushed until pq_flush) * pq_flush - flush pending output + * pq_getbyte_if_available - get a byte if available without blocking * * message-level I/O (and old-style-COPY-OUT cruft): * pq_putmessage - send a normal message (suppressed in COPY OUT mode) @@ -815,6 +816,56 @@ pq_peekbyte(void) return (unsigned char) PqRecvBuffer[PqRecvPointer]; } + +/* -------------------------------- + * pq_getbyte_if_available - get a single byte from connection, + * if available + * + * The received byte is stored in *c. 
Returns 1 if a byte was read, 0 if + * if no data was available, or EOF. + * -------------------------------- + */ +int +pq_getbyte_if_available(unsigned char *c) +{ + int r; + + if (PqRecvPointer < PqRecvLength) + { + *c = PqRecvBuffer[PqRecvPointer++]; + return 1; + } + + /* Temporarily put the socket into non-blocking mode */ + if (!pg_set_noblock(MyProcPort->sock)) + ereport(ERROR, + (errmsg("couldn't put socket to non-blocking mode: %m"))); + MyProcPort->noblock = true; + PG_TRY(); + { + r = secure_read(MyProcPort, c, 1); + } + PG_CATCH(); + { + /* + * The rest of the backend code assumes the socket is in blocking + * mode, so treat failure as FATAL. + */ + if (!pg_set_block(MyProcPort->sock)) + ereport(FATAL, + (errmsg("couldn't put socket to blocking mode: %m"))); + MyProcPort->noblock = false; + PG_RE_THROW(); + } + PG_END_TRY(); + if (!pg_set_block(MyProcPort->sock)) + ereport(FATAL, + (errmsg("couldn't put socket to blocking mode: %m"))); + MyProcPort->noblock = false; + + return r; +} + /* -------------------------------- * pq_getbytes - get a known number of bytes from connection * diff --git a/src/backend/postmaster/bgwriter.c b/src/backend/postmaster/bgwriter.c index e26befacc6f..b51cceea74d 100644 --- a/src/backend/postmaster/bgwriter.c +++ b/src/backend/postmaster/bgwriter.c @@ -38,7 +38,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/bgwriter.c,v 1.65 2010/01/02 16:57:50 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/bgwriter.c,v 1.66 2010/01/15 09:19:02 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -354,6 +354,12 @@ BackgroundWriterMain(void) PG_SETMASK(&UnBlockSig); /* + * Use the recovery target timeline ID during recovery + */ + if (RecoveryInProgress()) + ThisTimeLineID = GetRecoveryTargetTLI(); + + /* * Loop forever */ for (;;) diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 94672be0c09..59f994bd16e 100644 --- 
a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -37,7 +37,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.600 2010/01/10 14:16:08 mha Exp $ + * $PostgreSQL: pgsql/src/backend/postmaster/postmaster.c,v 1.601 2010/01/15 09:19:02 heikki Exp $ * * NOTES * @@ -108,6 +108,7 @@ #include "postmaster/pgarch.h" #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" +#include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -131,8 +132,8 @@ * children we have and send them appropriate signals when necessary. * * "Special" children such as the startup, bgwriter and autovacuum launcher - * tasks are not in this list. Autovacuum worker processes are in it. - * Also, "dead_end" children are in it: these are children launched just + * tasks are not in this list. Autovacuum worker and walsender processes are + * in it. Also, "dead_end" children are in it: these are children launched just * for the purpose of sending a friendly rejection message to a would-be * client. We must track them because they are attached to shared memory, * but we know they will never become live backends. dead_end children are @@ -207,6 +208,7 @@ char *bonjour_name; static pid_t StartupPID = 0, BgWriterPID = 0, WalWriterPID = 0, + WalReceiverPID = 0, AutoVacPID = 0, PgArchPID = 0, PgStatPID = 0, @@ -222,6 +224,9 @@ static int Shutdown = NoShutdown; static bool FatalError = false; /* T if recovering from backend crash */ static bool RecoveryError = false; /* T if WAL recovery failed */ +/* If WalReceiverActive is true, restart walreceiver if it dies */ +static bool WalReceiverActive = false; + /* * We use a simple state machine to control startup, shutdown, and * crash recovery (which is rather like shutdown followed by startup). 
@@ -278,7 +283,7 @@ typedef enum PM_WAIT_BACKUP, /* waiting for online backup mode to end */ PM_WAIT_BACKENDS, /* waiting for live backends to exit */ PM_SHUTDOWN, /* waiting for bgwriter to do shutdown ckpt */ - PM_SHUTDOWN_2, /* waiting for archiver to finish */ + PM_SHUTDOWN_2, /* waiting for archiver and walsenders to finish */ PM_WAIT_DEAD_END, /* waiting for dead_end children to exit */ PM_NO_CHILDREN /* all important children have exited */ } PMState; @@ -348,11 +353,21 @@ static enum CAC_state canAcceptConnections(void); static long PostmasterRandom(void); static void RandomSalt(char *md5Salt); static void signal_child(pid_t pid, int signal); -static void SignalSomeChildren(int signal, bool only_autovac); +static bool SignalSomeChildren(int signal, int targets); + +#define SignalChildren(sig) SignalSomeChildren(sig, BACKEND_TYPE_ALL) +#define SignalAutovacWorkers(sig) SignalSomeChildren(sig, BACKEND_TYPE_AUTOVAC) + +/* + * Possible types of a backend. These are OR-able request flag bits + * for SignalSomeChildren() and CountChildren(). 
+ */ +#define BACKEND_TYPE_NORMAL 0x0001 /* normal backend */ +#define BACKEND_TYPE_AUTOVAC 0x0002 /* autovacuum worker process */ +#define BACKEND_TYPE_WALSND 0x0004 /* walsender process */ +#define BACKEND_TYPE_ALL 0x0007 /* OR of all the above */ -#define SignalChildren(sig) SignalSomeChildren(sig, false) -#define SignalAutovacWorkers(sig) SignalSomeChildren(sig, true) -static int CountChildren(void); +static int CountChildren(int target); static bool CreateOptsFile(int argc, char *argv[], char *fullprogname); static pid_t StartChildProcess(AuxProcType type); static void StartAutovacuumWorker(void); @@ -451,6 +466,7 @@ static void ShmemBackendArrayRemove(Backend *bn); #define StartupDataBase() StartChildProcess(StartupProcess) #define StartBackgroundWriter() StartChildProcess(BgWriterProcess) #define StartWalWriter() StartChildProcess(WalWriterProcess) +#define StartWalReceiver() StartChildProcess(WalReceiverProcess) /* Macros to check exit status of a child process */ #define EXIT_STATUS_0(st) ((st) == 0) @@ -1453,6 +1469,11 @@ ServerLoop(void) if (PgStatPID == 0 && pmState == PM_RUN) PgStatPID = pgstat_start(); + /* If we have lost walreceiver, try to start a new one */ + if (WalReceiverPID == 0 && WalReceiverActive && + (pmState == PM_RECOVERY || pmState == PM_RECOVERY_CONSISTENT)) + WalReceiverPID = StartWalReceiver(); + /* If we need to signal the autovacuum launcher, do so now */ if (avlauncher_needs_signal) { @@ -1678,6 +1699,13 @@ retry1: port->user_name = pstrdup(valptr); else if (strcmp(nameptr, "options") == 0) port->cmdline_options = pstrdup(valptr); + else if (strcmp(nameptr, "replication") == 0) + { + if(!parse_bool(valptr, &am_walsender)) + ereport(FATAL, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("invalid value for boolean option \"replication\""))); + } else { /* Assume it's a generic GUC option */ @@ -1762,6 +1790,10 @@ retry1: if (strlen(port->user_name) >= NAMEDATALEN) port->user_name[NAMEDATALEN - 1] = '\0'; + /* Walsender is not 
related to a particular database */ + if (am_walsender) + port->database_name[0] = '\0'; + /* * Done putting stuff in TopMemoryContext. */ @@ -1907,7 +1939,7 @@ canAcceptConnections(void) * The limit here must match the sizes of the per-child-process arrays; * see comments for MaxLivePostmasterChildren(). */ - if (CountChildren() >= MaxLivePostmasterChildren()) + if (CountChildren(BACKEND_TYPE_ALL) >= MaxLivePostmasterChildren()) return CAC_TOOMANY; return CAC_OK; @@ -2071,6 +2103,8 @@ SIGHUP_handler(SIGNAL_ARGS) signal_child(BgWriterPID, SIGHUP); if (WalWriterPID != 0) signal_child(WalWriterPID, SIGHUP); + if (WalReceiverPID != 0) + signal_child(WalReceiverPID, SIGHUP); if (AutoVacPID != 0) signal_child(AutoVacPID, SIGHUP); if (PgArchPID != 0) @@ -2166,6 +2200,8 @@ pmdie(SIGNAL_ARGS) if (StartupPID != 0) signal_child(StartupPID, SIGTERM); + if (WalReceiverPID != 0) + signal_child(WalReceiverPID, SIGTERM); if (pmState == PM_RECOVERY) { /* only bgwriter is active in this state */ @@ -2179,7 +2215,8 @@ pmdie(SIGNAL_ARGS) ereport(LOG, (errmsg("aborting any active transactions"))); /* shut down all backends and autovac workers */ - SignalChildren(SIGTERM); + SignalSomeChildren(SIGTERM, + BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC); /* and the autovac launcher too */ if (AutoVacPID != 0) signal_child(AutoVacPID, SIGTERM); @@ -2213,6 +2250,8 @@ pmdie(SIGNAL_ARGS) signal_child(BgWriterPID, SIGQUIT); if (WalWriterPID != 0) signal_child(WalWriterPID, SIGQUIT); + if (WalReceiverPID != 0) + signal_child(WalReceiverPID, SIGQUIT); if (AutoVacPID != 0) signal_child(AutoVacPID, SIGQUIT); if (PgArchPID != 0) @@ -2364,19 +2403,22 @@ reaper(SIGNAL_ARGS) * have dead_end children to wait for. * * If we have an archiver subprocess, tell it to do a last - * archive cycle and quit; otherwise we can go directly to - * PM_WAIT_DEAD_END state. + * archive cycle and quit. Likewise, if we have walsender + * processes, tell them to send any remaining WAL and quit. 
*/ Assert(Shutdown > NoShutdown); + /* Waken archiver for the last time */ if (PgArchPID != 0) - { - /* Waken archiver for the last time */ signal_child(PgArchPID, SIGUSR2); - pmState = PM_SHUTDOWN_2; - } - else - pmState = PM_WAIT_DEAD_END; + + /* + * Waken walsenders for the last time. No regular backends + * should be around anymore. + */ + SignalChildren(SIGUSR2); + + pmState = PM_SHUTDOWN_2; /* * We can also shut down the stats collector now; there's @@ -2413,6 +2455,20 @@ reaper(SIGNAL_ARGS) } /* + * Was it the wal receiver? If exit status is zero (normal) or one + * (FATAL exit), we assume everything is all right just like normal + * backends. + */ + if (pid == WalReceiverPID) + { + WalReceiverPID = 0; + if (!EXIT_STATUS_0(exitstatus) && !EXIT_STATUS_1(exitstatus)) + HandleChildCrash(pid, exitstatus, + _("WAL receiver process")); + continue; + } + + /* * Was it the autovacuum launcher? Normal exit can be ignored; we'll * start a new one at the next iteration of the postmaster's main * loop, if necessary. Any other exit condition is treated as a @@ -2430,8 +2486,10 @@ reaper(SIGNAL_ARGS) /* * Was it the archiver? If so, just try to start a new one; no need * to force reset of the rest of the system. (If fail, we'll try - * again in future cycles of the main loop.) But if we were waiting - * for it to shut down, advance to the next shutdown step. + * again in future cycles of the main loop.). Unless we were + * waiting for it to shut down; don't restart it in that case, and + * and PostmasterStateMachine() will advance to the next shutdown + * step. */ if (pid == PgArchPID) { @@ -2441,8 +2499,6 @@ reaper(SIGNAL_ARGS) pid, exitstatus); if (XLogArchivingActive() && pmState == PM_RUN) PgArchPID = pgarch_start(); - else if (pmState == PM_SHUTDOWN_2) - pmState = PM_WAIT_DEAD_END; continue; } @@ -2653,6 +2709,18 @@ HandleChildCrash(int pid, int exitstatus, const char *procname) signal_child(WalWriterPID, (SendStop ? 
SIGSTOP : SIGQUIT)); } + /* Take care of the walreceiver too */ + if (pid == WalReceiverPID) + WalReceiverPID = 0; + else if (WalReceiverPID != 0 && !FatalError) + { + ereport(DEBUG2, + (errmsg_internal("sending %s to process %d", + (SendStop ? "SIGSTOP" : "SIGQUIT"), + (int) WalReceiverPID))); + signal_child(WalReceiverPID, (SendStop ? SIGSTOP : SIGQUIT)); + } + /* Take care of the autovacuum launcher too */ if (pid == AutoVacPID) AutoVacPID = 0; @@ -2791,10 +2859,13 @@ PostmasterStateMachine(void) * If we are doing crash recovery then we expect the bgwriter to exit * too, otherwise not. The archiver, stats, and syslogger processes * are disregarded since they are not connected to shared memory; we - * also disregard dead_end children here. + * also disregard dead_end children here. Walsenders are also + * disregarded, they will be terminated later after writing the + * checkpoint record, like the archiver process. */ - if (CountChildren() == 0 && + if (CountChildren(BACKEND_TYPE_NORMAL | BACKEND_TYPE_AUTOVAC) == 0 && StartupPID == 0 && + WalReceiverPID == 0 && (BgWriterPID == 0 || !FatalError) && WalWriterPID == 0 && AutoVacPID == 0) @@ -2840,7 +2911,8 @@ PostmasterStateMachine(void) FatalError = true; pmState = PM_WAIT_DEAD_END; - /* Kill the archiver and stats collector too */ + /* Kill the walsenders, archiver and stats collector too */ + SignalSomeChildren(SIGQUIT, BACKEND_TYPE_ALL); if (PgArchPID != 0) signal_child(PgArchPID, SIGQUIT); if (PgStatPID != 0) @@ -2850,6 +2922,24 @@ PostmasterStateMachine(void) } } + if (pmState == PM_SHUTDOWN_2) + { + /* + * PM_SHUTDOWN_2 state ends when there's no other children than + * dead_end children left. There shouldn't be any regular backends + * left by now anyway; what we're really waiting for is walsenders + * and archiver. + * + * Walreceiver should normally be dead by now, but not when a fast + * shutdown is performed during recovery. 
+ */ + if (PgArchPID == 0 && CountChildren(BACKEND_TYPE_ALL) == 0 && + WalReceiverPID == 0) + { + pmState = PM_WAIT_DEAD_END; + } + } + if (pmState == PM_WAIT_DEAD_END) { /* @@ -2870,6 +2960,7 @@ PostmasterStateMachine(void) { /* These other guys should be dead already */ Assert(StartupPID == 0); + Assert(WalReceiverPID == 0); Assert(BgWriterPID == 0); Assert(WalWriterPID == 0); Assert(AutoVacPID == 0); @@ -2976,14 +3067,14 @@ signal_child(pid_t pid, int signal) } /* - * Send a signal to all backend children, including autovacuum workers - * (but NOT special children; dead_end children are never signaled, either). - * If only_autovac is TRUE, only the autovacuum worker processes are signalled. + * Send a signal to the targeted children (but NOT special children; + * dead_end children are never signaled, either). */ -static void -SignalSomeChildren(int signal, bool only_autovac) +static bool +SignalSomeChildren(int signal, int target) { Dlelem *curr; + bool signaled = false; for (curr = DLGetHead(BackendList); curr; curr = DLGetSucc(curr)) { @@ -2991,14 +3082,21 @@ SignalSomeChildren(int signal, bool only_autovac) if (bp->dead_end) continue; - if (only_autovac && !bp->is_autovacuum) + if (!(target & BACKEND_TYPE_NORMAL) && !bp->is_autovacuum) + continue; + if (!(target & BACKEND_TYPE_AUTOVAC) && bp->is_autovacuum) + continue; + if (!(target & BACKEND_TYPE_WALSND) && + IsPostmasterChildWalSender(bp->child_slot)) continue; ereport(DEBUG4, (errmsg_internal("sending signal %d to process %d", signal, (int) bp->pid))); signal_child(bp->pid, signal); + signaled = true; } + return signaled; } /* @@ -3279,9 +3377,21 @@ BackendInitialize(Port *port) /* * Now that we have the user and database name, we can set the process * title for ps. It's good to do this as early as possible in startup. 
+ * + * For a walsender, the ps display is set in the following form: + * + * postgres: wal sender process <user> <host> <activity> + * + * To achieve that, we pass "wal sender process" as username and username + * as dbname to init_ps_display(). XXX: should add a new variant of + * init_ps_display() to avoid abusing the parameters like this. */ - init_ps_display(port->user_name, port->database_name, remote_ps_data, - update_process_title ? "authentication" : ""); + if (am_walsender) + init_ps_display("wal sender process", port->user_name, remote_ps_data, + update_process_title ? "authentication" : ""); + else + init_ps_display(port->user_name, port->database_name, remote_ps_data, + update_process_title ? "authentication" : ""); /* * Disable the timeout, and prevent SIGTERM/SIGQUIT again. @@ -4053,6 +4163,20 @@ sigusr1_handler(SIGNAL_ARGS) StartAutovacuumWorker(); } + if (CheckPostmasterSignal(PMSIGNAL_START_WALRECEIVER) && + WalReceiverPID == 0) + { + /* Startup Process wants us to start the walreceiver process. */ + WalReceiverActive = true; + WalReceiverPID = StartWalReceiver(); + } + + if (CheckPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER)) + { + /* The walreceiver process doesn't want to be restarted anymore */ + WalReceiverActive = false; + } + PG_SETMASK(&UnBlockSig); errno = save_errno; @@ -4146,11 +4270,11 @@ PostmasterRandom(void) } /* - * Count up number of child processes (excluding special children and - * dead_end children) + * Count up number of child processes of specified types (dead_end chidren + * are always excluded). 
*/ static int -CountChildren(void) +CountChildren(int target) { Dlelem *curr; int cnt = 0; @@ -4159,8 +4283,17 @@ CountChildren(void) { Backend *bp = (Backend *) DLE_VAL(curr); - if (!bp->dead_end) - cnt++; + if (bp->dead_end) + continue; + if (!(target & BACKEND_TYPE_NORMAL) && !bp->is_autovacuum) + continue; + if (!(target & BACKEND_TYPE_AUTOVAC) && bp->is_autovacuum) + continue; + if (!(target & BACKEND_TYPE_WALSND) && + IsPostmasterChildWalSender(bp->child_slot)) + continue; + + cnt++; } return cnt; } @@ -4244,6 +4377,10 @@ StartChildProcess(AuxProcType type) ereport(LOG, (errmsg("could not fork WAL writer process: %m"))); break; + case WalReceiverProcess: + ereport(LOG, + (errmsg("could not fork WAL receiver process: %m"))); + break; default: ereport(LOG, (errmsg("could not fork process: %m"))); @@ -4383,11 +4520,11 @@ CreateOptsFile(int argc, char *argv[], char *fullprogname) * * This reports the number of entries needed in per-child-process arrays * (the PMChildFlags array, and if EXEC_BACKEND the ShmemBackendArray). - * These arrays include regular backends and autovac workers, but not special - * children nor dead_end children. This allows the arrays to have a fixed - * maximum size, to wit the same too-many-children limit enforced by - * canAcceptConnections(). The exact value isn't too critical as long as - * it's more than MaxBackends. + * These arrays include regular backends, autovac workers and walsenders, + * but not special children nor dead_end children. This allows the arrays + * to have a fixed maximum size, to wit the same too-many-children limit + * enforced by canAcceptConnections(). The exact value isn't too critical + * as long as it's more than MaxBackends. 
*/ int MaxLivePostmasterChildren(void) diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile new file mode 100644 index 00000000000..7903c1ac5e4 --- /dev/null +++ b/src/backend/replication/Makefile @@ -0,0 +1,17 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication +# +# IDENTIFICATION +# $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# +#------------------------------------------------------------------------- + +subdir = src/backend/replication +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global + +OBJS = walsender.o walreceiverfuncs.o + +include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/replication/README b/src/backend/replication/README new file mode 100644 index 00000000000..0f40dc79e90 --- /dev/null +++ b/src/backend/replication/README @@ -0,0 +1,58 @@ +$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $ + +Walreceiver IPC +--------------- + +When the WAL replay in startup process has reached the end of archived WAL, +recoverable using restore_command, it starts up the walreceiver process +to fetch more WAL (if streaming replication is configured). + +Walreceiver is a postmaster subprocess, so the startup process can't fork it +directly. Instead, it sends a signal to postmaster, asking postmaster to launch +it. Before that, however, startup process fills in WalRcvData->conninfo, +and initializes the starting point in WalRcvData->receivedUpto. + +As walreceiver receives WAL from the master server, and writes and flushes +it to disk (in pg_xlog), it updates WalRcvData->receivedUpto. Startup process +polls that to know how far it can proceed with WAL replay. + +Walsender IPC +------------- + +At shutdown, postmaster handles walsender processes differently from regular +backends. 
It waits for regular backends to die before writing the +shutdown checkpoint and terminating pgarch and other auxiliary processes, but +that's not desirable for walsenders, because we want the standby servers to +receive all the WAL, including the shutdown checkpoint, before the master +is shut down. Therefore postmaster treats walsenders like the pgarch process, +and instructs them to terminate at PM_SHUTDOWN_2 phase, after all regular +backends have died and bgwriter has written the shutdown checkpoint. + +When postmaster accepts a connection, it immediately forks a new process +to handle the handshake and authentication, and the process initializes to +become a backend. Postmaster doesn't know if the process becomes a regular +backend or a walsender process at that time - that's indicated in the +connection handshake - so we need some extra signaling to let postmaster +identify walsender processes. + +When walsender process starts up, it marks itself as a walsender process in +the PMSignal array. That way postmaster can tell it apart from regular +backends. + +Note that no big harm is done if postmaster thinks that a walsender is a +regular backend; it will just terminate the walsender earlier in the shutdown +phase. A walsender will look like a regular backend until it's done with the +initialization and has marked itself in the PMSignal array, and again at process +termination, after unmarking the PMSignal slot. + +Each walsender allocates an entry from the WalSndCtl array, and advertises +there how far it has streamed WAL already. This is used at checkpoints, to +avoid recycling WAL that hasn't been streamed to a slave yet. However, +that doesn't stop such WAL from being recycled when the connection is not +established. + + +Walsender - walreceiver protocol +-------------------------------- + +See manual. 
diff --git a/src/backend/replication/walreceiver/Makefile b/src/backend/replication/walreceiver/Makefile new file mode 100644 index 00000000000..2b26a95b4de --- /dev/null +++ b/src/backend/replication/walreceiver/Makefile @@ -0,0 +1,32 @@ +#------------------------------------------------------------------------- +# +# Makefile-- +# Makefile for src/backend/replication/walreceiver +# +# IDENTIFICATION +# $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $ +# +#------------------------------------------------------------------------- + +subdir = src/backend/postmaster/walreceiver +top_builddir = ../../../.. +include $(top_builddir)/src/Makefile.global + +override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) + +OBJS = walreceiver.o +SHLIB_LINK = $(libpq) +NAME = walreceiver + +all: submake-libpq all-shared-lib + +include $(top_srcdir)/src/Makefile.shlib + +install: all installdirs install-lib + +installdirs: installdirs-lib + +uninstall: uninstall-lib + +clean distclean maintainer-clean: clean-lib + rm -f $(OBJS) diff --git a/src/backend/replication/walreceiver/walreceiver.c b/src/backend/replication/walreceiver/walreceiver.c new file mode 100644 index 00000000000..1d0d6404b90 --- /dev/null +++ b/src/backend/replication/walreceiver/walreceiver.c @@ -0,0 +1,796 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.c + * + * The WAL receiver process (walreceiver) is new as of Postgres 8.5. It + * is the process in the standby server that takes charge of receiving + * XLOG records from a primary server during streaming replication. + * + * When the startup process determines that it's time to start streaming, + * it instructs postmaster to start walreceiver. 
Walreceiver first connects + * to the primary server (it will be served by a walsender process + * in the primary server), and then keeps receiving XLOG records and + * writing them to the disk as long as the connection is alive. As XLOG + * records are received and flushed to disk, it updates the + * WalRcv->receivedUpto variable in shared memory, to inform the startup + * process of how far it can proceed with XLOG replay. + * + * Normal termination is by SIGTERM, which instructs the walreceiver to + * exit(0). Emergency termination is by SIGQUIT; like any postmaster child + * process, the walreceiver will simply abort and exit on SIGQUIT. A close + * of the connection and a FATAL error are treated not as a crash but as + * normal operation. + * + * Walreceiver is a postmaster child process like others, but it's compiled + * as a dynamic module to avoid linking libpq with the main server binary. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "libpq-fe.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "replication/walreceiver.h" +#include "storage/ipc.h" +#include "storage/pmsignal.h" +#include "utils/builtins.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" +#include "utils/resowner.h" + +#ifdef HAVE_POLL_H +#include <poll.h> +#endif +#ifdef HAVE_SYS_POLL_H +#include <sys/poll.h> +#endif +#ifdef HAVE_SYS_SELECT_H +#include <sys/select.h> +#endif + +PG_MODULE_MAGIC; + +PG_FUNCTION_INFO_V1(WalReceiverMain); +Datum WalReceiverMain(PG_FUNCTION_ARGS); + +/* streamConn is a PGconn object of a connection to walsender from walreceiver */ +static PGconn *streamConn = 
NULL; + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off, + * but for walreceiver to write the XLOG. + */ +static int recvFile = -1; +static uint32 recvId = 0; +static uint32 recvSeg = 0; +static uint32 recvOff = 0; + +/* Buffer for currently read records */ +static char *recvBuf = NULL; + +/* Flags set by interrupt handlers of walreceiver for later service in the main loop */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t got_SIGTERM = false; + +static void ProcessWalRcvInterrupts(void); +static void EnableImmediateExit(void); +static void DisableImmediateExit(void); + +/* + * About SIGTERM handling: + * + * We can't just exit(1) within SIGTERM signal handler, because the signal + * might arrive in the middle of some critical operation, like while we're + * holding a spinlock. We also can't just set a flag in signal handler and + * check it in the main loop, because we perform some blocking libpq + * operations like PQexec(), which can take a long time to finish. + * + * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's + * safe for the signal handler to elog(FATAL) immediately. Otherwise it just + * sets got_SIGTERM flag, which is checked in the main loop when convenient. + * + * This is very much like what regular backends do with ImmediateInterruptOK, + * ProcessInterrupts() etc. + */ +static volatile bool WalRcvImmediateInterruptOK = false; + +static void +ProcessWalRcvInterrupts(void) +{ + /* + * Although walreceiver interrupt handling doesn't use the same scheme + * as regular backends, call CHECK_FOR_INTERRUPTS() to make sure we + * receive any incoming signals on Win32. 
+ */ + CHECK_FOR_INTERRUPTS(); + + if (got_SIGTERM) + { + WalRcvImmediateInterruptOK = false; + ereport(FATAL, + (errcode(ERRCODE_ADMIN_SHUTDOWN), + errmsg("terminating walreceiver process due to administrator command"))); + } +} + +static void +EnableImmediateExit() +{ + WalRcvImmediateInterruptOK = true; + ProcessWalRcvInterrupts(); +} + +static void +DisableImmediateExit() +{ + WalRcvImmediateInterruptOK = false; + ProcessWalRcvInterrupts(); +} + +/* Signal handlers */ +static void WalRcvSigHupHandler(SIGNAL_ARGS); +static void WalRcvShutdownHandler(SIGNAL_ARGS); +static void WalRcvQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static void WalRcvLoop(void); +static void InitWalRcv(void); +static void WalRcvConnect(void); +static bool WalRcvWait(int timeout_ms); +static void WalRcvKill(int code, Datum arg); +static void XLogRecv(void); +static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); +static void XLogWalRcvFlush(void); + +/* + * LogstreamResult indicates the byte positions that we have already + * written/fsynced. + */ +static struct +{ + XLogRecPtr Write; /* last byte + 1 written out in the standby */ + XLogRecPtr Flush; /* last byte + 1 flushed in the standby */ +} LogstreamResult; + +/* Main entry point for walreceiver process */ +Datum +WalReceiverMain(PG_FUNCTION_ARGS) +{ + sigjmp_buf local_sigjmp_buf; + MemoryContext walrcv_context; + + /* Mark walreceiver in progress */ + InitWalRcv(); + + /* + * If possible, make this process a group leader, so that the postmaster + * can signal any child processes too. (walreceiver probably never has + * any child processes, but for consistency we make all postmaster child + * processes do this.) 
+ */ +#ifdef HAVE_SETSID + if (setsid() < 0) + elog(FATAL, "setsid() failed: %m"); +#endif + + /* Properly accept or ignore signals the postmaster might send us */ + pqsignal(SIGHUP, WalRcvSigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, SIG_IGN); + pqsignal(SIGTERM, WalRcvShutdownHandler); /* request shutdown */ + pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); + pqsignal(SIGUSR2, SIG_IGN); + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); + + /* We allow SIGQUIT (quickdie) at all times */ + sigdelset(&BlockSig, SIGQUIT); + + /* + * Create a resource owner to keep track of our resources (not clear that + * we need this, but may as well have one). + */ + CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. + */ + walrcv_context = AllocSetContextCreate(TopMemoryContext, + "Wal Receiver", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(walrcv_context); + + /* + * If an exception is encountered, processing resumes here. + * + * This code is heavily based on bgwriter.c, q.v. 
+ */ + if (sigsetjmp(local_sigjmp_buf, 1) != 0) + { + /* Since not using PG_TRY, must reset error stack by hand */ + error_context_stack = NULL; + + /* Reset WalRcvImmediateInterruptOK */ + DisableImmediateExit(); + + /* Prevent interrupts while cleaning up */ + HOLD_INTERRUPTS(); + + /* Report the error to the server log */ + EmitErrorReport(); + + /* Free the data structure related to a connection */ + PQfinish(streamConn); + streamConn = NULL; + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + + /* + * Now return to normal top-level context and clear ErrorContext for + * next time. + */ + MemoryContextSwitchTo(walrcv_context); + FlushErrorState(); + + /* Flush any leaked data in the top-level context */ + MemoryContextResetAndDeleteChildren(walrcv_context); + + /* Now we can allow interrupts again */ + RESUME_INTERRUPTS(); + + /* + * Sleep at least 1 second after any error. A write error is likely + * to be repeated, and we don't want to be filling the error logs as + * fast as we can. + */ + pg_usleep(1000000L); + } + + /* We can now handle ereport(ERROR) */ + PG_exception_stack = &local_sigjmp_buf; + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Establish the connection to the primary for XLOG streaming */ + WalRcvConnect(); + + /* Main loop of walreceiver */ + WalRcvLoop(); + + PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */ +} + +/* Main loop of walreceiver process */ +static void +WalRcvLoop(void) +{ + /* Loop until end-of-streaming or error */ + for (;;) + { + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + + /* + * Exit walreceiver if we're not in recovery. This should not happen, + * but cross-check the status here. 
+ */ + if (!RecoveryInProgress()) + ereport(FATAL, + (errmsg("cannot continue XLOG streaming, recovery has already ended"))); + + /* Process any requests or signals received recently */ + ProcessWalRcvInterrupts(); + + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* Wait a while for data to arrive */ + if (WalRcvWait(NAPTIME_PER_CYCLE)) + { + /* data has arrived. Process it */ + if (PQconsumeInput(streamConn) == 0) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + XLogRecv(); + } + } +} + +/* Advertise our pid in shared memory, so that startup process can kill us. */ +static void +InitWalRcv(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + if (walrcv == NULL) + elog(PANIC, "walreceiver control data uninitialized"); + + /* If we've already been requested to stop, don't start up */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + if (walrcv->walRcvState == WALRCV_STOPPED || + walrcv->walRcvState == WALRCV_STOPPING) + { + walrcv->walRcvState = WALRCV_STOPPED; + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + } + walrcv->pid = MyProcPid; + SpinLockRelease(&walrcv->mutex); + + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvKill, 0); +} + +/* + * Establish the connection to the primary server for XLOG streaming + */ +static void +WalRcvConnect(void) +{ + char conninfo[MAXCONNINFO + 14]; + char *primary_sysid; + char standby_sysid[32]; + TimeLineID primary_tli; + TimeLineID standby_tli; + PGresult *res; + XLogRecPtr recptr; + char cmd[64]; + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* + * Set up a connection for XLOG streaming + */ + SpinLockAcquire(&walrcv->mutex); + 
snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo); + recptr = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* initialize local XLOG pointers */ + LogstreamResult.Write = LogstreamResult.Flush = recptr; + + Assert(recptr.xlogid != 0 || recptr.xrecoff != 0); + + EnableImmediateExit(); + streamConn = PQconnectdb(conninfo); + DisableImmediateExit(); + if (PQstatus(streamConn) != CONNECTION_OK) + ereport(ERROR, + (errmsg("could not connect to the primary server : %s", + PQerrorMessage(streamConn)))); + + /* + * Get the system identifier and timeline ID as a DataRow message + * from the primary server. + */ + EnableImmediateExit(); + res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + DisableImmediateExit(); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("could not receive the SYSID and timeline ID from " + "the primary server: %s", + PQerrorMessage(streamConn)))); + } + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + int ntuples = PQntuples(res); + int nfields = PQnfields(res); + PQclear(res); + ereport(ERROR, + (errmsg("invalid response from primary server"), + errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields", + ntuples, nfields))); + } + primary_sysid = PQgetvalue(res, 0, 0); + primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0); + + /* + * Confirm that the system identifier of the primary is the same + * as ours. + */ + snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT, + GetSystemIdentifier()); + if (strcmp(primary_sysid, standby_sysid) != 0) + { + PQclear(res); + ereport(ERROR, + (errmsg("system differs between the primary and standby"), + errdetail("the primary SYSID is %s, standby SYSID is %s", + primary_sysid, standby_sysid))); + } + + /* + * Confirm that the current timeline of the primary is the same + * as the recovery target timeline. 
+ */ + standby_tli = GetRecoveryTargetTLI(); + PQclear(res); + if (primary_tli != standby_tli) + ereport(ERROR, + (errmsg("timeline %u of the primary does not match recovery target timeline %u", + primary_tli, standby_tli))); + ThisTimeLineID = primary_tli; + + /* Start streaming from the point requested by startup process */ + snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff); + EnableImmediateExit(); + res = PQexec(streamConn, cmd); + DisableImmediateExit(); + if (PQresultStatus(res) != PGRES_COPY_OUT) + ereport(ERROR, + (errmsg("could not start XLOG streaming: %s", + PQerrorMessage(streamConn)))); + PQclear(res); + + /* + * Process the outstanding messages before beginning to wait for + * new message to arrive. + */ + XLogRecv(); +} + +/* + * Wait until we can read WAL stream, or timeout. + * + * Returns true if data has become available for reading, false if timed out + * or interrupted by signal. + * + * This is based on pqSocketCheck. + */ +static bool +WalRcvWait(int timeout_ms) +{ + int ret; + + Assert(streamConn != NULL); + if (PQsocket(streamConn) < 0) + ereport(ERROR, + (errcode_for_socket_access(), + errmsg("socket not open"))); + + /* We use poll(2) if available, otherwise select(2) */ + { +#ifdef HAVE_POLL + struct pollfd input_fd; + + input_fd.fd = PQsocket(streamConn); + input_fd.events = POLLIN | POLLERR; + input_fd.revents = 0; + + ret = poll(&input_fd, 1, timeout_ms); +#else /* !HAVE_POLL */ + + fd_set input_mask; + struct timeval timeout; + struct timeval *ptr_timeout; + + FD_ZERO(&input_mask); + FD_SET(PQsocket(streamConn), &input_mask); + + if (timeout_ms < 0) + ptr_timeout = NULL; + else + { + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_usec = (timeout_ms % 1000) * 1000; + ptr_timeout = &timeout; + } + + ret = select(PQsocket(streamConn) + 1, &input_mask, + NULL, NULL, ptr_timeout); +#endif /* HAVE_POLL */ + } + + if (ret == 0 || (ret < 0 && errno == EINTR)) + return false; + if (ret < 0) + 
ereport(ERROR, + (errcode_for_socket_access(), + errmsg("select() failed: %m"))); + return true; +} + +/* + * Clear our pid from shared memory at exit. + */ +static void +WalRcvKill(int code, Datum arg) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + bool stopped = false; + + SpinLockAcquire(&walrcv->mutex); + if (walrcv->walRcvState == WALRCV_STOPPING || + walrcv->walRcvState == WALRCV_STOPPED) + { + walrcv->walRcvState = WALRCV_STOPPED; + stopped = true; + elog(LOG, "walreceiver stopped"); + } + walrcv->pid = 0; + SpinLockRelease(&walrcv->mutex); + + PQfinish(streamConn); + + /* If requested to stop, tell postmaster to not restart us. */ + if (stopped) + SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +WalRcvSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +/* SIGTERM: set flag for main loop, or shutdown immediately if safe */ +static void +WalRcvShutdownHandler(SIGNAL_ARGS) +{ + got_SIGTERM = true; + + /* Don't joggle the elbow of proc_exit */ + if (!proc_exit_inprogress && WalRcvImmediateInterruptOK) + ProcessWalRcvInterrupts(); +} + +/* + * WalRcvQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, so we need to stop what we're doing and + * exit. + */ +static void +WalRcvQuickDieHandler(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(2) not exit(0). 
This is to force the postmaster into a + * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random + * backend. This is necessary precisely because we don't clean up our + * shared memory state. (The "dead man switch" mechanism in pmsignal.c + * should ensure the postmaster sees this as a crash, too, but no harm + * in being doubly sure.) + */ + exit(2); +} + +/* + * Receive any WAL records available without blocking from XLOG stream and + * write it to the disk. + */ +static void +XLogRecv(void) +{ + XLogRecPtr *recptr; + int len; + + for (;;) + { + /* Receive CopyData message */ + len = PQgetCopyData(streamConn, &recvBuf, 1); + if (len == 0) /* no records available yet, then return */ + break; + if (len == -1) /* end-of-streaming or error */ + { + PGresult *res; + + res = PQgetResult(streamConn); + if (PQresultStatus(res) == PGRES_COMMAND_OK) + { + PQclear(res); + ereport(ERROR, + (errmsg("replication terminated by primary server"))); + } + PQclear(res); + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + } + if (len < -1) + ereport(ERROR, + (errmsg("could not read xlog records: %s", + PQerrorMessage(streamConn)))); + + if (len < sizeof(XLogRecPtr)) + ereport(ERROR, + (errmsg("invalid WAL message received from primary"))); + + /* Write received WAL records to disk */ + recptr = (XLogRecPtr *) recvBuf; + XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr), + len - sizeof(XLogRecPtr), *recptr); + + if (recvBuf != NULL) + PQfreemem(recvBuf); + recvBuf = NULL; + } + + /* + * Now that we've written some records, flush them to disk and let the + * startup process know about them. + */ + XLogWalRcvFlush(); +} + +/* + * Write XLOG data to disk. 
+ */ +static void +XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + if (recvFile < 0 || !XLByteInSeg(recptr, recvId, recvSeg)) + { + bool use_existent; + + /* + * XLOG segment files will be re-read in recovery operation soon, + * so we don't need to advise the OS to release any cache page. + */ + if (recvFile >= 0) + { + /* + * fsync() before we switch to next file. We would otherwise + * have to reopen this file to fsync it later + */ + XLogWalRcvFlush(); + if (close(recvFile) != 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log file %u, segment %u: %m", + recvId, recvSeg))); + } + recvFile = -1; + + /* Create/use new log file */ + XLByteToSeg(recptr, recvId, recvSeg); + use_existent = true; + recvFile = XLogFileInit(recvId, recvSeg, + &use_existent, true); + recvOff = 0; + } + + /* Calculate the start offset of the received logs */ + startoff = recptr.xrecoff % XLogSegSize; + + if (startoff + nbytes > XLogSegSize) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + /* Need to seek in the file? 
*/ + if (recvOff != startoff) + { + if (lseek(recvFile, (off_t) startoff, SEEK_SET) < 0) + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, " + "segment %u to offset %u: %m", + recvId, recvSeg, startoff))); + recvOff = startoff; + } + + /* OK to write the logs */ + errno = 0; + + byteswritten = write(recvFile, buf, segbytes); + if (byteswritten <= 0) + { + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log file %u, segment %u " + "at offset %u, length %lu: %m", + recvId, recvSeg, + recvOff, (unsigned long) segbytes))); + } + + /* Update state for write */ + XLByteAdvance(recptr, byteswritten); + + recvOff += byteswritten; + nbytes -= byteswritten; + buf += byteswritten; + + LogstreamResult.Write = recptr; + + /* + * XXX: Should we signal bgwriter to start a restartpoint + * if we've consumed too much xlog since the last one, like + * in normal processing? But this is not worth doing unless + * a restartpoint can be created independently from a + * checkpoint record. 
+ */ + } +} + +/* Flush the log to disk */ +static void +XLogWalRcvFlush(void) +{ + if (XLByteLT(LogstreamResult.Flush, LogstreamResult.Write)) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + char activitymsg[50]; + + issue_xlog_fsync(recvFile, recvId, recvSeg); + + LogstreamResult.Flush = LogstreamResult.Write; + + /* Update shared-memory status */ + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = LogstreamResult.Flush; + SpinLockRelease(&walrcv->mutex); + + /* Report XLOG streaming progress in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LogstreamResult.Write.xlogid, LogstreamResult.Write.xrecoff); + set_ps_display(activitymsg, false); + } +} diff --git a/src/backend/replication/walreceiverfuncs.c b/src/backend/replication/walreceiverfuncs.c new file mode 100644 index 00000000000..4342e252d65 --- /dev/null +++ b/src/backend/replication/walreceiverfuncs.c @@ -0,0 +1,262 @@ +/*------------------------------------------------------------------------- + * + * walreceiverfuncs.c + * + * This file contains functions used by the startup process to communicate + * with the walreceiver process. Functions implementing walreceiver itself + * are in src/backend/replication/walreceiver subdirectory. 
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <sys/types.h> +#include <sys/stat.h> +#include <unistd.h> +#include <signal.h> + +#include "access/xlog_internal.h" +#include "replication/walreceiver.h" +#include "storage/fd.h" +#include "storage/pmsignal.h" +#include "storage/shmem.h" +#include "utils/guc.h" + +WalRcvData *WalRcv = NULL; + +static bool CheckForStandbyTrigger(void); +static void ShutdownWalRcv(void); + +/* Report shared memory space needed by WalRcvShmemInit */ +Size +WalRcvShmemSize(void) +{ + Size size = 0; + + size = add_size(size, sizeof(WalRcvData)); + + return size; +} + +/* Allocate and initialize walreceiver-related shared memory */ +void +WalRcvShmemInit(void) +{ + bool found; + + WalRcv = (WalRcvData *) + ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found); + + if (WalRcv == NULL) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("not enough shared memory for walreceiver"))); + if (found) + return; /* already initialized */ + + /* Initialize the data structures */ + MemSet(WalRcv, 0, WalRcvShmemSize()); + WalRcv->walRcvState = WALRCV_NOT_STARTED; + SpinLockInit(&WalRcv->mutex); +} + +/* Is walreceiver in progress (or starting up)? */ +bool +WalRcvInProgress(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + WalRcvState state; + + SpinLockAcquire(&walrcv->mutex); + state = walrcv->walRcvState; + SpinLockRelease(&walrcv->mutex); + + if (state == WALRCV_RUNNING || state == WALRCV_STOPPING) + return true; + else + return false; +} + +/* + * Wait for the XLOG record at given position to become available. 
+ * + * 'recptr' indicates the byte position which caller wants to read the + * XLOG record up to. The byte position actually written and flushed + * by walreceiver is returned. It can be higher than the requested + * location, and the caller can safely read up to that point without + * calling WaitNextXLogAvailable() again. + * + * If WAL streaming is ended (because a trigger file is found), *finished + * is set to true and function returns immediately. The returned position + * can be lower than requested in that case. + * + * Called by the startup process during streaming recovery. + */ +XLogRecPtr +WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished) +{ + static XLogRecPtr receivedUpto = {0, 0}; + + *finished = false; + + /* Quick exit if already known available */ + if (XLByteLT(recptr, receivedUpto)) + return receivedUpto; + + for (;;) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + /* Update local status */ + SpinLockAcquire(&walrcv->mutex); + receivedUpto = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + /* If available already, leave here */ + if (XLByteLT(recptr, receivedUpto)) + return receivedUpto; + + /* Check to see if the trigger file exists */ + if (CheckForStandbyTrigger()) + { + *finished = true; + return receivedUpto; + } + + pg_usleep(100000L); /* 100ms */ + + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + } +} + +/* + * Stop walreceiver and wait for it to die. + */ +static void +ShutdownWalRcv(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + pid_t walrcvpid; + + /* + * Request walreceiver to stop. 
Walreceiver will switch to WALRCV_STOPPED + * mode once it's finished, and will also request postmaster to not + * restart itself. + */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->walRcvState == WALRCV_RUNNING); + walrcv->walRcvState = WALRCV_STOPPING; + walrcvpid = walrcv->pid; + SpinLockRelease(&walrcv->mutex); + + /* + * Pid can be 0, if no walreceiver process is active right now. + * Postmaster should restart it, and when it does, it will see the + * STOPPING state. + */ + if (walrcvpid != 0) + kill(walrcvpid, SIGTERM); + + /* + * Wait for walreceiver to acknowledge its death by setting state to + * WALRCV_STOPPED. + */ + while (WalRcvInProgress()) + { + /* + * This possibly-long loop needs to handle interrupts of startup + * process. + */ + HandleStartupProcInterrupts(); + + pg_usleep(100000); /* 100ms */ + } +} + +/* + * Check to see if the trigger file exists. If it does, request postmaster + * to shut down walreceiver and wait for it to exit, and remove the trigger + * file. + */ +static bool +CheckForStandbyTrigger(void) +{ + struct stat stat_buf; + + if (TriggerFile == NULL) + return false; + + if (stat(TriggerFile, &stat_buf) == 0) + { + ereport(LOG, + (errmsg("trigger file found: %s", TriggerFile))); + ShutdownWalRcv(); + unlink(TriggerFile); + return true; + } + return false; +} + +/* + * Request postmaster to start walreceiver. + * + * recptr indicates the position where streaming should begin, and conninfo + * is a libpq connection string to use. 
+ */ +void +RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + + Assert(walrcv->walRcvState == WALRCV_NOT_STARTED); + + /* locking is just pro forma here; walreceiver isn't started yet */ + SpinLockAcquire(&walrcv->mutex); + walrcv->receivedUpto = recptr; + if (conninfo != NULL) + strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO); + else + walrcv->conninfo[0] = '\0'; + walrcv->walRcvState = WALRCV_RUNNING; + SpinLockRelease(&walrcv->mutex); + + SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); +} + +/* + * Returns the byte position that walreceiver has written + */ +XLogRecPtr +GetWalRcvWriteRecPtr(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalRcvData *walrcv = WalRcv; + XLogRecPtr recptr; + + SpinLockAcquire(&walrcv->mutex); + recptr = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); + + return recptr; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c new file mode 100644 index 00000000000..8a3352e0755 --- /dev/null +++ b/src/backend/replication/walsender.c @@ -0,0 +1,851 @@ +/*------------------------------------------------------------------------- + * + * walsender.c + * + * The WAL sender process (walsender) is new as of Postgres 8.5. It takes + * charge of XLOG streaming sender in the primary server. At first, it is + * started by the postmaster when the walreceiver in the standby server + * connects to the primary server and requests XLOG streaming replication, + * i.e., unlike any auxiliary process, it is not an always-running process. + * It attempts to keep reading XLOG records from the disk and sending them + * to the standby server, as long as the connection is alive (i.e., like + * any backend, there is an one to one relationship between a connection + * and a walsender process). 
+ * + * Normal termination is by SIGTERM, which instructs the walsender to + * close the connection and exit(0) at next convenient moment. Emergency + * termination is by SIGQUIT; like any backend, the walsender will simply + * abort and exit on SIGQUIT. A close of the connection and a FATAL error + * are treated as not a crash but approximately normal termination; + * the walsender will exit quickly without sending any more XLOG records. + * + * If the server is shut down, postmaster sends us SIGUSR2 after all + * regular backends have exited and the shutdown checkpoint has been written. + * This instruct walsender to send any outstanding WAL, including the + * shutdown checkpoint record, and then exit. + * + * Note that there can be more than one walsender process concurrently. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * + * IDENTIFICATION + * $PostgreSQL: pgsql/src/backend/replication/walsender.c,v 1.1 2010/01/15 09:19:03 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#include "postgres.h" + +#include <unistd.h> + +#include "access/xlog_internal.h" +#include "catalog/pg_type.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "libpq/pqsignal.h" +#include "miscadmin.h" +#include "replication/walsender.h" +#include "storage/fd.h" +#include "storage/ipc.h" +#include "storage/lock.h" +#include "storage/pmsignal.h" +#include "tcop/tcopprot.h" +#include "utils/guc.h" +#include "utils/memutils.h" +#include "utils/ps_status.h" + +/* Array of WalSnds in shared memory */ +WalSndCtlData *WalSndCtl = NULL; + +/* My slot in the shared memory array */ +static WalSnd *MyWalSnd = NULL; + +/* Global state */ +bool am_walsender = false; /* Am I a walsender process ? 
*/ + +/* User-settable parameters for walsender */ +int MaxWalSenders = 0; /* the maximum number of concurrent walsenders */ +int WalSndDelay = 200; /* max sleep time between some actions */ + +#define NAPTIME_PER_CYCLE 100 /* max sleep time between cycles (100ms) */ + +/* + * These variables are used similarly to openLogFile/Id/Seg/Off, + * but for walsender to read the XLOG. + */ +static int sendFile = -1; +static uint32 sendId = 0; +static uint32 sendSeg = 0; +static uint32 sendOff = 0; + +/* + * How far have we sent WAL already? This is also advertised in + * MyWalSnd->sentPtr. + */ +static XLogRecPtr sentPtr = {0, 0}; + +/* Flags set by signal handlers for later service in main loop */ +static volatile sig_atomic_t got_SIGHUP = false; +static volatile sig_atomic_t shutdown_requested = false; +static volatile sig_atomic_t ready_to_stop = false; + +/* Signal handlers */ +static void WalSndSigHupHandler(SIGNAL_ARGS); +static void WalSndShutdownHandler(SIGNAL_ARGS); +static void WalSndQuickDieHandler(SIGNAL_ARGS); + +/* Prototypes for private functions */ +static int WalSndLoop(void); +static void InitWalSnd(void); +static void WalSndHandshake(void); +static void WalSndKill(int code, Datum arg); +static void XLogRead(char *buf, XLogRecPtr recptr, Size nbytes); +static bool XLogSend(StringInfo outMsg); +static void CheckClosedConnection(void); + +/* + * How much WAL to send in one message? Must be >= XLOG_BLCKSZ. + */ +#define MAX_SEND_SIZE (XLOG_SEG_SIZE / 2) + +/* Main entry point for walsender process */ +int +WalSenderMain(void) +{ + MemoryContext walsnd_context; + + if (!superuser()) + ereport(FATAL, + (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE), + errmsg("must be superuser to start walsender"))); + + /* Create a per-walsender data structure in shared memory */ + InitWalSnd(); + + /* + * Create a memory context that we will do all our work in. We do this so + * that we can reset the context during error recovery and thereby avoid + * possible memory leaks. 
Formerly this code just ran in + * TopMemoryContext, but resetting that would be a really bad idea. + * + * XXX: we don't actually attempt error recovery in walsender, we just + * close the connection and exit. + */ + walsnd_context = AllocSetContextCreate(TopMemoryContext, + "Wal Sender", + ALLOCSET_DEFAULT_MINSIZE, + ALLOCSET_DEFAULT_INITSIZE, + ALLOCSET_DEFAULT_MAXSIZE); + MemoryContextSwitchTo(walsnd_context); + + /* Unblock signals (they were blocked when the postmaster forked us) */ + PG_SETMASK(&UnBlockSig); + + /* Tell the standby that walsender is ready for receiving commands */ + ReadyForQuery(DestRemote); + + /* Handle handshake messages before streaming */ + WalSndHandshake(); + + /* Main loop of walsender */ + return WalSndLoop(); +} + +static void +WalSndHandshake(void) +{ + StringInfoData input_message; + bool replication_started = false; + + initStringInfo(&input_message); + + while (!replication_started) + { + int firstchar; + + /* Wait for a command to arrive */ + firstchar = pq_getbyte(); + + /* + * Check for any other interesting events that happened while we + * slept. + */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + if (firstchar == EOF) + { + /* standby disconnected */ + ereport(COMMERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + } + else + { + /* + * Read the message contents. This is expected to be done without + * blocking because we've been able to get message type code. 
+ */ + if (pq_getmessage(&input_message, 0)) + firstchar = EOF; /* suitable message already logged */ + } + + + /* Handle the very limited subset of commands expected in this phase */ + + switch (firstchar) + { + case 'Q': /* Query message */ + { + const char *query_string; + XLogRecPtr recptr; + + query_string = pq_getmsgstring(&input_message); + pq_getmsgend(&input_message); + + if (strcmp(query_string, "IDENTIFY_SYSTEM") == 0) + { + StringInfoData buf; + char sysid[32]; + char tli[11]; + + /* + * Reply with a result set with one row, two columns. + * First col is the system ID, and second is the timeline ID. + * Both values are formatted as strings before being sent. + */ + + snprintf(sysid, sizeof(sysid), UINT64_FORMAT, + GetSystemIdentifier()); + snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); + + /* Send a RowDescription message */ + pq_beginmessage(&buf, 'T'); + pq_sendint(&buf, 2, 2); /* 2 fields */ + + /* first field */ + pq_sendstring(&buf, "systemid"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, TEXTOID, 4); /* type oid */ + pq_sendint(&buf, -1, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + + /* second field */ + pq_sendstring(&buf, "timeline"); /* col name */ + pq_sendint(&buf, 0, 4); /* table oid */ + pq_sendint(&buf, 0, 2); /* attnum */ + pq_sendint(&buf, INT4OID, 4); /* type oid */ + pq_sendint(&buf, 4, 2); /* typlen */ + pq_sendint(&buf, 0, 4); /* typmod */ + pq_sendint(&buf, 0, 2); /* format code */ + pq_endmessage(&buf); + + /* Send a DataRow message */ + pq_beginmessage(&buf, 'D'); + pq_sendint(&buf, 2, 2); /* # of columns */ + pq_sendint(&buf, strlen(sysid), 4); /* col1 len */ + pq_sendbytes(&buf, (char *) &sysid, strlen(sysid)); + pq_sendint(&buf, strlen(tli), 4); /* col2 len */ + pq_sendbytes(&buf, (char *) tli, strlen(tli)); + pq_endmessage(&buf); + + /* Send CommandComplete and ReadyForQuery messages */ + EndCommand("SELECT", DestRemote); + ReadyForQuery(DestRemote); + } + else if
(sscanf(query_string, "START_REPLICATION %X/%X", + &recptr.xlogid, &recptr.xrecoff) == 2) + { + StringInfoData buf; + + /* Send a CopyOutResponse message, and start streaming */ + pq_beginmessage(&buf, 'H'); + pq_sendbyte(&buf, 0); + pq_sendint(&buf, 0, 2); + pq_endmessage(&buf); + + /* + * Initialize position to the received one, then + * the xlog records begin to be shipped from that position + */ + sentPtr = recptr; + + /* break out of the loop */ + replication_started = true; + } + else + { + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby query string: %s", query_string))); + } + break; + } + + /* 'X' means that the standby is closing the connection */ + case 'X': + proc_exit(0); + + case EOF: + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby handshake message type %d", firstchar))); + } + } +} + +/* + * Check if the remote end has closed the connection. + * + * Does a non-blocking read of one byte from the standby. Returns to the + * caller only when no data is pending or the read was interrupted; every + * other outcome (socket error, EOF, or a received message byte) is + * reported and ends the walsender. + */ +static void +CheckClosedConnection(void) +{ + unsigned char firstchar; + int r; + + r = pq_getbyte_if_available(&firstchar); + if (r < 0) + { + /* no data available */ + if (errno == EAGAIN || errno == EWOULDBLOCK) + return; + + /* + * Ok if interrupted, though it shouldn't really happen with + * a non-blocking operation. + */ + if (errno == EINTR) + return; + + ereport(COMMERROR, + (errcode_for_socket_access(), + errmsg("could not receive data from client: %m"))); + + /* + * COMMERROR only logs and returns. firstchar was never filled in, + * so exit here instead of falling through to the switch below and + * reading an uninitialized value. + */ + proc_exit(0); + } + if (r == 0) + { + /* standby disconnected unexpectedly */ + ereport(ERROR, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("unexpected EOF on standby connection"))); + } + + /* Handle the very limited subset of commands expected in this phase */ + switch (firstchar) + { + /* + * 'X' means that the standby is closing down the socket. EOF means + * unexpected loss of standby connection. Either way, perform normal + * shutdown.
+ */ + case 'X': + proc_exit(0); + + default: + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("invalid standby closing message type %d", + firstchar))); + } +} + +/* Main loop of walsender process */ +static int +WalSndLoop(void) +{ + StringInfoData output_message; + + initStringInfo(&output_message); + + /* Loop forever */ + for (;;) + { + int remain; /* remaining time (ms) */ + + /* + * Emergency bailout if postmaster has died. This is to avoid the + * necessity for manual cleanup of all postmaster children. + */ + if (!PostmasterIsAlive(true)) + exit(1); + /* Process any requests or signals received recently */ + if (got_SIGHUP) + { + got_SIGHUP = false; + ProcessConfigFile(PGC_SIGHUP); + } + + /* + * When SIGUSR2 arrives, we send all outstanding logs up to the + * shutdown checkpoint record (i.e., the latest record) and exit. + */ + if (ready_to_stop) + { + XLogSend(&output_message); + shutdown_requested = true; + } + + /* Normal exit from the walsender is here */ + if (shutdown_requested) + { + /* Inform the standby that XLOG streaming was done */ + pq_puttextmessage('C', "COPY 0"); + pq_flush(); + + proc_exit(0); + } + + /* + * Nap for the configured time or until a message arrives. + * + * On some platforms, signals won't interrupt the sleep. To ensure we + * respond reasonably promptly when someone signals us, break down the + * sleep into NAPTIME_PER_CYCLE (ms) increments, and check for + * interrupts after each nap. + */ + remain = WalSndDelay; + while (remain > 0) + { + if (got_SIGHUP || shutdown_requested || ready_to_stop) + break; + + /* + * Check to see whether a message from the standby or an interrupt + * from other processes has arrived. + */ + pg_usleep(remain > NAPTIME_PER_CYCLE ? 
NAPTIME_PER_CYCLE * 1000L : remain * 1000L); /* pg_usleep() takes microseconds; remain and NAPTIME_PER_CYCLE are in ms */ + CheckClosedConnection(); + + remain -= NAPTIME_PER_CYCLE; + } + + /* Attempt to send the log once every loop */ + if (!XLogSend(&output_message)) + goto eof; + } + + /* can't get here because the above loop never exits */ + return 1; + +eof: + /* + * Reset whereToSendOutput to prevent ereport from attempting + * to send any more messages to the standby. + */ + if (whereToSendOutput == DestRemote) + whereToSendOutput = DestNone; + + proc_exit(0); + return 1; /* keep the compiler quiet */ +} + +/* Initialize a per-walsender data structure for this walsender process */ +static void +InitWalSnd(void) +{ + /* use volatile pointer to prevent code rearrangement */ + volatile WalSndCtlData *walsndctl = WalSndCtl; + int i; + + /* + * WalSndCtl should be set up already (we inherit this by fork() or + * EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walsndctl != NULL); + Assert(MyWalSnd == NULL); + + /* + * Find a free walsender slot and reserve it. If this fails, we must be + * out of WalSnd structures. + */ + for (i = 0; i < MaxWalSenders; i++) + { + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + SpinLockAcquire(&walsnd->mutex); + + if (walsnd->pid != 0) + { + SpinLockRelease(&walsnd->mutex); + continue; + } + else + { + /* found */ + MyWalSnd = (WalSnd *) walsnd; + walsnd->pid = MyProcPid; + MemSet(&MyWalSnd->sentPtr, 0, sizeof(XLogRecPtr)); + SpinLockRelease(&walsnd->mutex); + break; + } + } + if (MyWalSnd == NULL) + ereport(FATAL, + (errcode(ERRCODE_TOO_MANY_CONNECTIONS), + errmsg("sorry, too many standbys already"))); + + /* Arrange to clean up at walsender exit */ + on_shmem_exit(WalSndKill, 0); +} + +/* Destroy the per-walsender data structure for this walsender process */ +static void +WalSndKill(int code, Datum arg) +{ + Assert(MyWalSnd != NULL); + + /* + * Mark WalSnd struct no longer in use. Assume that no lock is required + * for this.
+ */ + MyWalSnd->pid = 0; + + /* WalSnd struct isn't mine anymore */ + MyWalSnd = NULL; +} + +/* + * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + */ +void +XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) +{ + char path[MAXPGPATH]; + uint32 startoff; + + while (nbytes > 0) + { + int segbytes; + int readbytes; + + startoff = recptr.xrecoff % XLogSegSize; + + if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) + { + /* Switch to another logfile segment */ + if (sendFile >= 0) + close(sendFile); + + XLByteToSeg(recptr, sendId, sendSeg); + XLogFilePath(path, ThisTimeLineID, sendId, sendSeg); + + sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0); + if (sendFile < 0) + ereport(FATAL, /* XXX: Why FATAL? */ + (errcode_for_file_access(), + errmsg("could not open file \"%s\" (log file %u, segment %u): %m", + path, sendId, sendSeg))); + sendOff = 0; + } + + /* Need to seek in the file? */ + if (sendOff != startoff) + { + if (lseek(sendFile, (off_t) startoff, SEEK_SET) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not seek in log file %u, segment %u to offset %u: %m", + sendId, sendSeg, startoff))); + sendOff = startoff; + } + + /* How many bytes are within this segment? */ + if (nbytes > (XLogSegSize - startoff)) + segbytes = XLogSegSize - startoff; + else + segbytes = nbytes; + + readbytes = read(sendFile, buf, segbytes); + if (readbytes <= 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg("could not read from log file %u, segment %u, offset %u, " + "length %lu: %m", + sendId, sendSeg, sendOff, (unsigned long) segbytes))); + + /* Update state for read */ + XLByteAdvance(recptr, readbytes); + + sendOff += readbytes; + nbytes -= readbytes; + buf += readbytes; + } +} + +/* + * Read all WAL that's been written (and flushed) since last cycle, and send + * it to client. + * + * Returns true if OK, false if trouble. 
+ */ +static bool +XLogSend(StringInfo outMsg) +{ + XLogRecPtr SendRqstPtr; + char activitymsg[50]; + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + /* + * An invalid (all-zero) position means that no start position has been + * set yet; sentPtr is assigned when the standby's START_REPLICATION + * command is handled in WalSndHandshake(). There is nothing to send + * until then. + */ + if (sentPtr.xlogid == 0 && + sentPtr.xrecoff == 0) + return true; + + /* Attempt to send all the records which were written to the disk */ + SendRqstPtr = GetWriteRecPtr(); + + /* Quick exit if nothing to do */ + if (!XLByteLT(sentPtr, SendRqstPtr)) + return true; + + /* + * Since successive pages in a segment are consecutively written, + * we can gather multiple records together by issuing just one + * read() call, and send them as one CopyData message at one time. + * Each iteration of the loop below builds and sends one such message + * covering the range from startptr up to endptr, then advances + * sentPtr to endptr. + */ + + while (XLByteLT(sentPtr, SendRqstPtr)) + { + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; + + /* + * Figure out how much to send in one message. If there's less than + * MAX_SEND_SIZE bytes to send, send everything. Otherwise send + * MAX_SEND_SIZE bytes, but round to page boundary for efficiency. + */ + startptr = sentPtr; + endptr = startptr; + XLByteAdvance(endptr, MAX_SEND_SIZE); + + /* + * Round down to page boundary. This is not only for performance + * reasons, walreceiver relies on the fact that we never split a WAL + * record across two messages. Since a long WAL record is split at + * page boundary into continuation records, page boundary is always + * safe cut-off point.
We also assume that SendRqstPtr never points + * in the middle of a WAL record. + */ + endptr.xrecoff -= (endptr.xrecoff % XLOG_BLCKSZ); + + if (XLByteLT(SendRqstPtr, endptr)) + endptr = SendRqstPtr; + + /* + * OK to read and send the log. + * + * We don't need to convert the xlogid/xrecoff from host byte order + * to network byte order because the both server can be expected to + * have the same byte order. If they have the different order, we + * don't reach here. + */ + pq_sendbytes(outMsg, (char *) &startptr, sizeof(startptr)); + + if (endptr.xlogid != startptr.xlogid) + { + Assert(endptr.xlogid == startptr.xlogid + 1); + nbytes = endptr.xrecoff + XLogFileSize - startptr.xrecoff; + } + else + nbytes = endptr.xrecoff - startptr.xrecoff; + + sentPtr = endptr; + + /* + * Read the log into the output buffer directly to prevent + * extra memcpy calls. + */ + enlargeStringInfo(outMsg, nbytes); + + XLogRead(&outMsg->data[outMsg->len], startptr, nbytes); + outMsg->len += nbytes; + outMsg->data[outMsg->len] = '\0'; + + pq_putmessage('d', outMsg->data, outMsg->len); + resetStringInfo(outMsg); + } + + /* Update shared memory status */ + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + + /* Flush pending output */ + if (pq_flush()) + return false; + + /* Report progress of XLOG streaming in PS display */ + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + sentPtr.xlogid, sentPtr.xrecoff); + set_ps_display(activitymsg, false); + + return true; +} + +/* SIGHUP: set flag to re-read config file at next convenient time */ +static void +WalSndSigHupHandler(SIGNAL_ARGS) +{ + got_SIGHUP = true; +} + +/* SIGTERM: set flag to shut down */ +static void +WalSndShutdownHandler(SIGNAL_ARGS) +{ + shutdown_requested = true; +} + +/* + * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. + * + * Some backend has bought the farm, + * so we need to stop what we're doing and exit. 
+ */ +static void +WalSndQuickDieHandler(SIGNAL_ARGS) +{ + PG_SETMASK(&BlockSig); + + /* + * We DO NOT want to run proc_exit() callbacks -- we're here because + * shared memory may be corrupted, so we don't want to try to clean up our + * transaction. Just nail the windows shut and get out of town. Now that + * there's an atexit callback to prevent third-party code from breaking + * things by calling exit() directly, we have to reset the callbacks + * explicitly to make this work as intended. + */ + on_exit_reset(); + + /* + * Note we do exit(2) not exit(0). This is to force the postmaster into a + * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random + * backend. This is necessary precisely because we don't clean up our + * shared memory state. (The "dead man switch" mechanism in pmsignal.c + * should ensure the postmaster sees this as a crash, too, but no harm + * in being doubly sure.) + */ + exit(2); +} + +/* SIGUSR2: set flag to do a last cycle and shut down afterwards */ +static void +WalSndLastCycleHandler(SIGNAL_ARGS) +{ + ready_to_stop = true; +} + +/* Set up signal handlers */ +void +WalSndSignals(void) +{ + /* Set up signal handlers */ + pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, SIG_IGN); /* not used */ + pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */ + pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ + pqsignal(SIGALRM, SIG_IGN); + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, SIG_IGN); /* not used */ + pqsignal(SIGUSR2, WalSndLastCycleHandler); /* request a last cycle and shutdown */ + + /* Reset some signals that are accepted by postmaster but not here */ + pqsignal(SIGCHLD, SIG_DFL); + pqsignal(SIGTTIN, SIG_DFL); + pqsignal(SIGTTOU, SIG_DFL); + pqsignal(SIGCONT, SIG_DFL); + pqsignal(SIGWINCH, SIG_DFL); +} + +/* Report shared-memory space needed by WalSndShmemInit */ +Size +WalSndShmemSize(void) +{ + Size size = 0; + + size = 
offsetof(WalSndCtlData, walsnds); + size = add_size(size, mul_size(MaxWalSenders, sizeof(WalSnd))); + + return size; +} + +/* Allocate and initialize walsender-related shared memory */ +void +WalSndShmemInit(void) +{ + bool found; + int i; + + WalSndCtl = (WalSndCtlData *) + ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found); + + if (WalSndCtl == NULL) + ereport(FATAL, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("not enough shared memory for walsender"))); + if (found) + return; /* already initialized */ + + /* Initialize the data structures */ + MemSet(WalSndCtl, 0, WalSndShmemSize()); + + for (i = 0; i < MaxWalSenders; i++) + { + WalSnd *walsnd = &WalSndCtl->walsnds[i]; + SpinLockInit(&walsnd->mutex); + } +} + +/* + * Returns the oldest Send position among walsenders. Or InvalidXLogRecPtr + * if none. + */ +XLogRecPtr +GetOldestWALSendPointer(void) +{ + XLogRecPtr oldest = {0, 0}; + int i; + bool found = false; + + for (i = 0; i < MaxWalSenders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + XLogRecPtr recptr; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + recptr = walsnd->sentPtr; + SpinLockRelease(&walsnd->mutex); + + if (recptr.xlogid == 0 && recptr.xrecoff == 0) + continue; + + if (!found || XLByteLT(recptr, oldest)) + oldest = recptr; + found = true; + } + return oldest; +} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 134521c91cd..01a9fabc8c9 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.102 2010/01/02 16:57:51 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/ipci.c,v 1.103 2010/01/15 09:19:03 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -25,6 +25,8 @@ #include "postmaster/autovacuum.h" #include "postmaster/bgwriter.h" 
#include "postmaster/postmaster.h" +#include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -116,6 +118,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, ProcSignalShmemSize()); size = add_size(size, BgWriterShmemSize()); size = add_size(size, AutoVacuumShmemSize()); + size = add_size(size, WalSndShmemSize()); + size = add_size(size, WalRcvShmemSize()); size = add_size(size, BTreeShmemSize()); size = add_size(size, SyncScanShmemSize()); #ifdef EXEC_BACKEND @@ -213,6 +217,8 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) ProcSignalShmemInit(); BgWriterShmemInit(); AutoVacuumShmemInit(); + WalSndShmemInit(); + WalRcvShmemInit(); /* * Set up other modules that need some shared memory space diff --git a/src/backend/storage/ipc/pmsignal.c b/src/backend/storage/ipc/pmsignal.c index 0199c8b577c..2a414a0c2f3 100644 --- a/src/backend/storage/ipc/pmsignal.c +++ b/src/backend/storage/ipc/pmsignal.c @@ -8,7 +8,7 @@ * Portions Copyright (c) 1994, Regents of the University of California * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/ipc/pmsignal.c,v 1.29 2010/01/02 16:57:51 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/ipc/pmsignal.c,v 1.30 2010/01/15 09:19:03 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -45,11 +45,15 @@ * process is actively using shared memory. The slots are assigned to * child processes at random, and postmaster.c is responsible for tracking * which one goes with which PID. + * + * The fourth state, WALSENDER, is just like ACTIVE, but carries the extra + * information that the child is a WAL sender. 
*/ #define PM_CHILD_UNUSED 0 /* these values must fit in sig_atomic_t */ #define PM_CHILD_ASSIGNED 1 #define PM_CHILD_ACTIVE 2 +#define PM_CHILD_WALSENDER 3 /* "typedef struct PMSignalData PMSignalData" appears in pmsignal.h */ struct PMSignalData @@ -193,6 +197,22 @@ ReleasePostmasterChildSlot(int slot) } /* + * IsPostmasterChildWalSender - check if given slot is in use by a + * walsender process. + */ +bool +IsPostmasterChildWalSender(int slot) +{ + Assert(slot > 0 && slot <= PMSignalState->num_child_flags); + slot--; + + if (PMSignalState->PMChildFlags[slot] == PM_CHILD_WALSENDER) + return true; + else + return false; +} + +/* * MarkPostmasterChildActive - mark a postmaster child as about to begin * actively using shared memory. This is called in the child process. */ @@ -208,6 +228,22 @@ MarkPostmasterChildActive(void) } /* + * MarkPostmasterChildWalSender - like MarkPostmasterChildActive(), but + * marks the postmaster child as a WAL sender instead of a regular backend. + * This is called in the child process. + */ +void +MarkPostmasterChildWalSender(void) +{ + int slot = MyPMChildSlot; + + Assert(slot > 0 && slot <= PMSignalState->num_child_flags); + slot--; + Assert(PMSignalState->PMChildFlags[slot] == PM_CHILD_ASSIGNED); + PMSignalState->PMChildFlags[slot] = PM_CHILD_WALSENDER; +} + +/* * MarkPostmasterChildInactive - mark a postmaster child as done using * shared memory. This is called in the child process. 
*/ @@ -218,7 +254,8 @@ MarkPostmasterChildInactive(void) Assert(slot > 0 && slot <= PMSignalState->num_child_flags); slot--; - Assert(PMSignalState->PMChildFlags[slot] == PM_CHILD_ACTIVE); + Assert(PMSignalState->PMChildFlags[slot] == PM_CHILD_ACTIVE || + PMSignalState->PMChildFlags[slot] == PM_CHILD_WALSENDER); PMSignalState->PMChildFlags[slot] = PM_CHILD_ASSIGNED; } diff --git a/src/backend/storage/lmgr/proc.c b/src/backend/storage/lmgr/proc.c index ca7a19e4f4c..91065b368cd 100644 --- a/src/backend/storage/lmgr/proc.c +++ b/src/backend/storage/lmgr/proc.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.211 2010/01/02 16:57:52 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/storage/lmgr/proc.c,v 1.212 2010/01/15 09:19:03 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -39,6 +39,7 @@ #include "access/xact.h" #include "miscadmin.h" #include "postmaster/autovacuum.h" +#include "replication/walsender.h" #include "storage/ipc.h" #include "storage/lmgr.h" #include "storage/pmsignal.h" @@ -290,7 +291,12 @@ InitProcess(void) * this; it probably should.) 
*/ if (IsUnderPostmaster && !IsAutoVacuumLauncherProcess()) - MarkPostmasterChildActive(); + { + if (am_walsender) + MarkPostmasterChildWalSender(); + else + MarkPostmasterChildActive(); + } /* * Initialize all fields of MyProc, except for the semaphore which was diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index 28560b94083..ad8678e2200 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.581 2010/01/07 16:29:58 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/tcop/postgres.c,v 1.582 2010/01/15 09:19:04 heikki Exp $ * * NOTES * this is the "main" module of the postgres backend and @@ -56,6 +56,7 @@ #include "parser/parser.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/walsender.h" #include "rewrite/rewriteHandler.h" #include "storage/bufmgr.h" #include "storage/ipc.h" @@ -3331,36 +3332,41 @@ PostgresMain(int argc, char *argv[], const char *username) * an issue for signals that are locally generated, such as SIGALRM and * SIGPIPE.) */ - pqsignal(SIGHUP, SigHupHandler); /* set flag to read config file */ - pqsignal(SIGINT, StatementCancelHandler); /* cancel current query */ - pqsignal(SIGTERM, die); /* cancel current query and exit */ - - /* - * In a standalone backend, SIGQUIT can be generated from the keyboard - * easily, while SIGTERM cannot, so we make both signals do die() rather - * than quickdie(). 
- */ - if (IsUnderPostmaster) - pqsignal(SIGQUIT, quickdie); /* hard crash time */ + if (am_walsender) + WalSndSignals(); else - pqsignal(SIGQUIT, die); /* cancel current query and exit */ - pqsignal(SIGALRM, handle_sig_alarm); /* timeout conditions */ + { + pqsignal(SIGHUP, SigHupHandler); /* set flag to read config file */ + pqsignal(SIGINT, StatementCancelHandler); /* cancel current query */ + pqsignal(SIGTERM, die); /* cancel current query and exit */ - /* - * Ignore failure to write to frontend. Note: if frontend closes - * connection, we will notice it and exit cleanly when control next - * returns to outer loop. This seems safer than forcing exit in the midst - * of output during who-knows-what operation... - */ - pqsignal(SIGPIPE, SIG_IGN); - pqsignal(SIGUSR1, procsignal_sigusr1_handler); - pqsignal(SIGUSR2, SIG_IGN); - pqsignal(SIGFPE, FloatExceptionHandler); + /* + * In a standalone backend, SIGQUIT can be generated from the keyboard + * easily, while SIGTERM cannot, so we make both signals do die() rather + * than quickdie(). + */ + if (IsUnderPostmaster) + pqsignal(SIGQUIT, quickdie); /* hard crash time */ + else + pqsignal(SIGQUIT, die); /* cancel current query and exit */ + pqsignal(SIGALRM, handle_sig_alarm); /* timeout conditions */ - /* - * Reset some signals that are accepted by postmaster but not by backend - */ - pqsignal(SIGCHLD, SIG_DFL); /* system() requires this on some platforms */ + /* + * Ignore failure to write to frontend. Note: if frontend closes + * connection, we will notice it and exit cleanly when control next + * returns to outer loop. This seems safer than forcing exit in the midst + * of output during who-knows-what operation... 
+ */ + pqsignal(SIGPIPE, SIG_IGN); + pqsignal(SIGUSR1, procsignal_sigusr1_handler); + pqsignal(SIGUSR2, SIG_IGN); + pqsignal(SIGFPE, FloatExceptionHandler); + + /* + * Reset some signals that are accepted by postmaster but not by backend + */ + pqsignal(SIGCHLD, SIG_DFL); /* system() requires this on some platforms */ + } pqinitmask(); @@ -3456,6 +3462,10 @@ PostgresMain(int argc, char *argv[], const char *username) if (IsUnderPostmaster && Log_disconnections) on_proc_exit(log_disconnections, 0); + /* If this is a WAL sender process, we're done with initialization. */ + if (am_walsender) + proc_exit(WalSenderMain()); + /* * process any libraries that should be preloaded at backend start (this * likewise can't be done until GUC settings are complete) diff --git a/src/backend/utils/init/postinit.c b/src/backend/utils/init/postinit.c index 8c10af7d7ca..891d8d4efd5 100644 --- a/src/backend/utils/init/postinit.c +++ b/src/backend/utils/init/postinit.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.200 2010/01/02 16:57:56 momjian Exp $ + * $PostgreSQL: pgsql/src/backend/utils/init/postinit.c,v 1.201 2010/01/15 09:19:04 heikki Exp $ * * *------------------------------------------------------------------------- @@ -36,6 +36,7 @@ #include "pgstat.h" #include "postmaster/autovacuum.h" #include "postmaster/postmaster.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -446,6 +447,7 @@ BaseInit(void) * In bootstrap mode no parameters are used. The autovacuum launcher process * doesn't use any parameters either, because it only goes far enough to be * able to read pg_database; it doesn't connect to any particular database. + * In walsender mode only username is used. * * As of PostgreSQL 8.2, we expect InitProcess() was already called, so we * already have a PGPROC struct ... but it's not completely filled in yet. 
@@ -557,10 +559,10 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, * Set up the global variables holding database id and default tablespace. * But note we won't actually try to touch the database just yet. * - * We take a shortcut in the bootstrap case, otherwise we have to look up - * the db's entry in pg_database. + * We take a shortcut in the bootstrap and walsender case, otherwise we + * have to look up the db's entry in pg_database. */ - if (bootstrap) + if (bootstrap || am_walsender) { MyDatabaseId = TemplateDbOid; MyDatabaseTableSpace = DEFAULTTABLESPACE_OID; @@ -623,7 +625,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, * AccessShareLock for such sessions and thereby not conflict against * CREATE DATABASE. */ - if (!bootstrap) + if (!bootstrap && !am_walsender) LockSharedObject(DatabaseRelationId, MyDatabaseId, 0, RowExclusiveLock); @@ -632,7 +634,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, * If there was a concurrent DROP DATABASE, this ensures we will die * cleanly without creating a mess. */ - if (!bootstrap) + if (!bootstrap && !am_walsender) { HeapTuple tuple; @@ -652,7 +654,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, */ fullpath = GetDatabasePath(MyDatabaseId, MyDatabaseTableSpace); - if (!bootstrap) + if (!bootstrap && !am_walsender) { if (access(fullpath, F_OK) == -1) { @@ -727,7 +729,7 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, * database-access infrastructure is up. (Also, it wants to know if the * user is a superuser, so the above stuff has to happen first.) 
*/ - if (!bootstrap) + if (!bootstrap && !am_walsender) CheckMyDatabase(dbname, am_superuser); /* @@ -824,6 +826,10 @@ InitPostgres(const char *in_dbname, Oid dboid, const char *username, /* initialize client encoding */ InitializeClientEncoding(); + /* reset the database for walsender */ + if (am_walsender) + MyProc->databaseId = MyDatabaseId = InvalidOid; + /* report this backend in the PgBackendStatus array */ if (!bootstrap) pgstat_bestart(); diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 38d577f87bb..274030195fb 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -10,7 +10,7 @@ * Written by Peter Eisentraut <peter_e@gmx.net>. * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.532 2010/01/07 04:53:35 tgl Exp $ + * $PostgreSQL: pgsql/src/backend/utils/misc/guc.c,v 1.533 2010/01/15 09:19:04 heikki Exp $ * *-------------------------------------------------------------------- */ @@ -55,6 +55,7 @@ #include "postmaster/postmaster.h" #include "postmaster/syslogger.h" #include "postmaster/walwriter.h" +#include "replication/walsender.h" #include "storage/bufmgr.h" #include "storage/fd.h" #include "tcop/tcopprot.h" @@ -494,6 +495,8 @@ const char *const config_group_names[] = gettext_noop("Write-Ahead Log / Settings"), /* WAL_CHECKPOINTS */ gettext_noop("Write-Ahead Log / Checkpoints"), + /* WAL_REPLICATION */ + gettext_noop("Write-Ahead Log / Replication"), /* QUERY_TUNING */ gettext_noop("Query Tuning"), /* QUERY_TUNING_METHOD */ @@ -1698,6 +1701,26 @@ static struct config_int ConfigureNamesInt[] = }, { + /* see max_connections */ + {"max_wal_senders", PGC_POSTMASTER, WAL_REPLICATION, + gettext_noop("Sets the maximum number of simultaneously running WAL sender processes."), + NULL + }, + &MaxWalSenders, + 0, 0, INT_MAX / 4, NULL, NULL + }, + + { + {"wal_sender_delay", PGC_SIGHUP, WAL_REPLICATION, + gettext_noop("WAL sender sleep time between WAL replications."), + NULL, + GUC_UNIT_MS + 
}, + &WalSndDelay, + 200, 1, 10000, NULL, NULL + }, + + { {"commit_delay", PGC_USERSET, WAL_SETTINGS, gettext_noop("Sets the delay in microseconds between transaction commit and " "flushing WAL to disk."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index c4ddeaf2bca..2b4e761096f 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -185,6 +185,12 @@ #max_standby_delay = 30 # max acceptable standby lag (s) to help queries # complete without conflict; -1 disables +# - Replication - + +#max_wal_senders = 0 # max number of walsender processes +#wal_sender_delay = 200ms # 1-10000 milliseconds + + #------------------------------------------------------------------------------ # QUERY TUNING #------------------------------------------------------------------------------ diff --git a/src/include/Makefile b/src/include/Makefile index ed88dca3942..c89960c7d62 100644 --- a/src/include/Makefile +++ b/src/include/Makefile @@ -4,7 +4,7 @@ # # 'make install' installs whole contents of src/include. 
# -# $PostgreSQL: pgsql/src/include/Makefile,v 1.30 2010/01/05 01:06:56 tgl Exp $ +# $PostgreSQL: pgsql/src/include/Makefile,v 1.31 2010/01/15 09:19:05 heikki Exp $ # #------------------------------------------------------------------------- @@ -18,8 +18,8 @@ all: pg_config.h pg_config_os.h # Subdirectories containing headers for server-side dev SUBDIRS = access bootstrap catalog commands executor foreign lib libpq mb \ - nodes optimizer parser postmaster regex rewrite storage tcop \ - snowball snowball/libstemmer tsearch tsearch/dicts utils \ + nodes optimizer parser postmaster regex replication rewrite storage \ + tcop snowball snowball/libstemmer tsearch tsearch/dicts utils \ port port/win32 port/win32_msvc port/win32_msvc/sys \ port/win32/arpa port/win32/netinet port/win32/sys \ portability diff --git a/src/include/access/xlog.h b/src/include/access/xlog.h index 1d18cb5b1b9..20083e14c54 100644 --- a/src/include/access/xlog.h +++ b/src/include/access/xlog.h @@ -6,7 +6,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.95 2010/01/02 16:58:00 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/xlog.h,v 1.96 2010/01/15 09:19:06 heikki Exp $ */ #ifndef XLOG_H #define XLOG_H @@ -188,6 +188,18 @@ extern int MaxStandbyDelay; #define XLogArchiveCommandSet() (XLogArchiveCommand[0] != '\0') #define XLogStandbyInfoActive() (XLogRequestRecoveryConnections && XLogArchiveMode) +/* + * This is in walsender.c, but declared here so that we don't need to include + * walsender.h in all files that check XLogIsNeeded() + */ +extern int MaxWalSenders; + +/* + * Is WAL-logging necessary? We need to log an XLOG record iff either + * WAL archiving is enabled or XLOG streaming is allowed. 
+ */ +#define XLogIsNeeded() (XLogArchivingActive() || (MaxWalSenders > 0)) + #ifdef WAL_DEBUG extern bool XLOG_DEBUG; #endif @@ -228,12 +240,19 @@ typedef struct CheckpointStatsData extern CheckpointStatsData CheckpointStats; +/* Read from recovery.conf, in startup process */ +extern char *TriggerFile; + extern XLogRecPtr XLogInsert(RmgrId rmid, uint8 info, XLogRecData *rdata); extern void XLogFlush(XLogRecPtr RecPtr); extern void XLogBackgroundFlush(void); extern void XLogAsyncCommitFlush(void); extern bool XLogNeedsFlush(XLogRecPtr RecPtr); +extern int XLogFileInit(uint32 log, uint32 seg, + bool *use_existent, bool use_lock); +extern int XLogFileOpen(uint32 log, uint32 seg); + extern void XLogSetAsyncCommitLSN(XLogRecPtr record); @@ -242,11 +261,14 @@ extern void RestoreBkpBlocks(XLogRecPtr lsn, XLogRecord *record, bool cleanup); extern void xlog_redo(XLogRecPtr lsn, XLogRecord *record); extern void xlog_desc(StringInfo buf, uint8 xl_info, char *rec); +extern void issue_xlog_fsync(int fd, uint32 log, uint32 seg); + extern bool RecoveryInProgress(void); extern bool XLogInsertAllowed(void); extern TimestampTz GetLatestXLogTime(void); extern void UpdateControlFile(void); +extern uint64 GetSystemIdentifier(void); extern Size XLOGShmemSize(void); extern void XLOGShmemInit(void); extern void BootStrapXLOG(void); @@ -258,8 +280,11 @@ extern bool CreateRestartPoint(int flags); extern void XLogPutNextOid(Oid nextOid); extern XLogRecPtr GetRedoRecPtr(void); extern XLogRecPtr GetInsertRecPtr(void); +extern XLogRecPtr GetWriteRecPtr(void); extern void GetNextXidAndEpoch(TransactionId *xid, uint32 *epoch); +extern TimeLineID GetRecoveryTargetTLI(void); +extern void HandleStartupProcInterrupts(void); extern void StartupProcessMain(void); #endif /* XLOG_H */ diff --git a/src/include/access/xlog_internal.h b/src/include/access/xlog_internal.h index a3614665e18..cfb7f0a4de6 100644 --- a/src/include/access/xlog_internal.h +++ b/src/include/access/xlog_internal.h @@ -11,7 +11,7 @@ 
* Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.27 2010/01/02 16:58:01 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/xlog_internal.h,v 1.28 2010/01/15 09:19:06 heikki Exp $ */ #ifndef XLOG_INTERNAL_H #define XLOG_INTERNAL_H @@ -151,6 +151,19 @@ typedef XLogLongPageHeaderData *XLogLongPageHeader; } \ } while (0) +/* Align a record pointer to next page */ +#define NextLogPage(recptr) \ + do { \ + if ((recptr).xrecoff % XLOG_BLCKSZ != 0) \ + (recptr).xrecoff += \ + (XLOG_BLCKSZ - (recptr).xrecoff % XLOG_BLCKSZ); \ + if ((recptr).xrecoff >= XLogFileSize) \ + { \ + (recptr).xlogid++; \ + (recptr).xrecoff = 0; \ + } \ + } while (0) + /* * Compute ID and segment from an XLogRecPtr. * @@ -253,6 +266,8 @@ extern Datum pg_stop_backup(PG_FUNCTION_ARGS); extern Datum pg_switch_xlog(PG_FUNCTION_ARGS); extern Datum pg_current_xlog_location(PG_FUNCTION_ARGS); extern Datum pg_current_xlog_insert_location(PG_FUNCTION_ARGS); +extern Datum pg_last_xlog_receive_location(PG_FUNCTION_ARGS); +extern Datum pg_last_xlog_replay_location(PG_FUNCTION_ARGS); extern Datum pg_xlogfile_name_offset(PG_FUNCTION_ARGS); extern Datum pg_xlogfile_name(PG_FUNCTION_ARGS); extern Datum pg_is_in_recovery(PG_FUNCTION_ARGS); diff --git a/src/include/access/xlogdefs.h b/src/include/access/xlogdefs.h index 8f524df75e2..8ecc3a21b1c 100644 --- a/src/include/access/xlogdefs.h +++ b/src/include/access/xlogdefs.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/access/xlogdefs.h,v 1.24 2010/01/02 16:58:01 momjian Exp $ + * $PostgreSQL: pgsql/src/include/access/xlogdefs.h,v 1.25 2010/01/15 09:19:06 heikki Exp $ */ #ifndef XLOG_DEFS_H #define XLOG_DEFS_H @@ -57,6 +57,22 @@ typedef struct XLogRecPtr /* + * Macro for 
advancing a record pointer by the specified number of bytes. + */ +#define XLByteAdvance(recptr, nbytes) \ + do { \ + if ((recptr).xrecoff + (nbytes) >= XLogFileSize) \ + { \ + (recptr).xlogid += 1; \ + (recptr).xrecoff \ + = (recptr).xrecoff + (nbytes) - XLogFileSize; \ + } \ + else \ + (recptr).xrecoff += (nbytes); \ + } while (0) + + +/* * TimeLineID (TLI) - identifies different database histories to prevent * confusion after restoring a prior state of a database installation. * TLI does not change in a normal stop/restart of the database (including diff --git a/src/include/bootstrap/bootstrap.h b/src/include/bootstrap/bootstrap.h index eef42018eb4..5e989eff4ec 100644 --- a/src/include/bootstrap/bootstrap.h +++ b/src/include/bootstrap/bootstrap.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/bootstrap/bootstrap.h,v 1.54 2010/01/02 16:58:01 momjian Exp $ + * $PostgreSQL: pgsql/src/include/bootstrap/bootstrap.h,v 1.55 2010/01/15 09:19:06 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -23,6 +23,7 @@ typedef enum StartupProcess, BgWriterProcess, WalWriterProcess, + WalReceiverProcess, NUM_AUXPROCTYPES /* Must be last! 
*/ } AuxProcType; diff --git a/src/include/catalog/catversion.h b/src/include/catalog/catversion.h index e981823320c..b2c92860f07 100644 --- a/src/include/catalog/catversion.h +++ b/src/include/catalog/catversion.h @@ -37,7 +37,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.572 2010/01/14 16:31:09 teodor Exp $ + * $PostgreSQL: pgsql/src/include/catalog/catversion.h,v 1.573 2010/01/15 09:19:07 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,6 @@ */ /* yyyymmddN */ -#define CATALOG_VERSION_NO 201001141 +#define CATALOG_VERSION_NO 201001151 #endif diff --git a/src/include/catalog/pg_proc.h b/src/include/catalog/pg_proc.h index 7a16e3faa73..a498cc51720 100644 --- a/src/include/catalog/pg_proc.h +++ b/src/include/catalog/pg_proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.561 2010/01/14 16:31:09 teodor Exp $ + * $PostgreSQL: pgsql/src/include/catalog/pg_proc.h,v 1.562 2010/01/15 09:19:07 heikki Exp $ * * NOTES * The script catalog/genbki.pl reads this file and generates .bki @@ -3290,6 +3290,11 @@ DESCR("xlog filename, given an xlog location"); DATA(insert OID = 3810 ( pg_is_in_recovery PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_is_in_recovery _null_ _null_ _null_ )); DESCR("true if server is in recovery"); +DATA(insert OID = 3820 ( pg_last_xlog_receive_location PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ pg_last_xlog_receive_location _null_ _null_ _null_ )); +DESCR("current xlog flush location"); +DATA(insert OID = 3821 ( pg_last_xlog_replay_location PGNSP PGUID 12 1 0 0 f f f t f v 0 0 25 "" _null_ _null_ _null_ _null_ 
pg_last_xlog_replay_location _null_ _null_ _null_ )); +DESCR("last xlog replay location"); + DATA(insert OID = 2621 ( pg_reload_conf PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_reload_conf _null_ _null_ _null_ )); DESCR("reload configuration files"); DATA(insert OID = 2622 ( pg_rotate_logfile PGNSP PGUID 12 1 0 0 f f f t f v 0 0 16 "" _null_ _null_ _null_ _null_ pg_rotate_logfile _null_ _null_ _null_ )); diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 1d358715288..6ee4714489b 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -11,7 +11,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/libpq/libpq-be.h,v 1.73 2010/01/10 14:16:08 mha Exp $ + * $PostgreSQL: pgsql/src/include/libpq/libpq-be.h,v 1.74 2010/01/15 09:19:08 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -104,6 +104,7 @@ typedef struct typedef struct Port { pgsocket sock; /* File descriptor */ + bool noblock; /* is the socket in non-blocking mode? 
*/ ProtocolVersion proto; /* FE/BE protocol version */ SockAddr laddr; /* local addr (postmaster) */ SockAddr raddr; /* remote addr (client) */ diff --git a/src/include/libpq/libpq.h b/src/include/libpq/libpq.h index c9c7e0d7640..c8fa2778824 100644 --- a/src/include/libpq/libpq.h +++ b/src/include/libpq/libpq.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/libpq/libpq.h,v 1.73 2010/01/10 14:16:08 mha Exp $ + * $PostgreSQL: pgsql/src/include/libpq/libpq.h,v 1.74 2010/01/15 09:19:08 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -57,6 +57,7 @@ extern int pq_getstring(StringInfo s); extern int pq_getmessage(StringInfo s, int maxlen); extern int pq_getbyte(void); extern int pq_peekbyte(void); +extern int pq_getbyte_if_available(unsigned char *c); extern int pq_putbytes(const char *s, size_t len); extern int pq_flush(void); extern int pq_putmessage(char msgtype, const char *s, size_t len); diff --git a/src/include/replication/walreceiver.h b/src/include/replication/walreceiver.h new file mode 100644 index 00000000000..7651a696a0f --- /dev/null +++ b/src/include/replication/walreceiver.h @@ -0,0 +1,70 @@ +/*------------------------------------------------------------------------- + * + * walreceiver.h + * Exports from replication/walreceiverfuncs.c. + * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.1 2010/01/15 09:19:09 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef _WALRECEIVER_H +#define _WALRECEIVER_H + +#include "storage/spin.h" + +/* + * MAXCONNINFO: maximum size of a connection string. + * + * XXX: Should this move to pg_config_manual.h? + */ +#define MAXCONNINFO 1024 + +/* + * Values for WalRcv->walRcvState. 
+ */ +typedef enum +{ + WALRCV_NOT_STARTED, + WALRCV_RUNNING, /* walreceiver has been started */ + WALRCV_STOPPING, /* requested to stop, but still running */ + WALRCV_STOPPED /* stopped and mustn't start up again */ +} WalRcvState; + +/* Shared memory area for management of walreceiver process */ +typedef struct +{ + /* + * connection string; is used for walreceiver to connect with + * the primary. + */ + char conninfo[MAXCONNINFO]; + + /* + * PID of currently active walreceiver process, and the current state. + */ + pid_t pid; + WalRcvState walRcvState; + + /* + * receivedUpto-1 is the last byte position that has been already + * received. When startup process starts the walreceiver, it sets this + * to the point where it wants the streaming to begin. After that, + * walreceiver updates this whenever it flushes the received WAL. + */ + XLogRecPtr receivedUpto; + + slock_t mutex; /* locks shared variables shown above */ +} WalRcvData; + +extern WalRcvData *WalRcv; + +extern Size WalRcvShmemSize(void); +extern void WalRcvShmemInit(void); +extern bool WalRcvInProgress(void); +extern XLogRecPtr WaitNextXLogAvailable(XLogRecPtr recptr, bool *finished); +extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo); +extern XLogRecPtr GetWalRcvWriteRecPtr(void); + +#endif /* _WALRECEIVER_H */ diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h new file mode 100644 index 00000000000..c9bfd12e8bc --- /dev/null +++ b/src/include/replication/walsender.h @@ -0,0 +1,49 @@ +/*------------------------------------------------------------------------- + * + * walsender.h + * Exports from replication/walsender.c. 
+ * + * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group + * + * $PostgreSQL: pgsql/src/include/replication/walsender.h,v 1.1 2010/01/15 09:19:09 heikki Exp $ + * + *------------------------------------------------------------------------- + */ +#ifndef _WALSENDER_H +#define _WALSENDER_H + +#include "access/xlog.h" +#include "storage/spin.h" + +/* + * Each walsender has a WalSnd struct in shared memory. + */ +typedef struct WalSnd +{ + pid_t pid; /* this walsender's process id, or 0 */ + XLogRecPtr sentPtr; /* WAL has been sent up to this point */ + + slock_t mutex; /* locks shared variables shown above */ +} WalSnd; + +/* There is one WalSndCtl struct for the whole database cluster */ +typedef struct +{ + WalSnd walsnds[1]; /* VARIABLE LENGTH ARRAY */ +} WalSndCtlData; + +extern WalSndCtlData *WalSndCtl; + +/* global state */ +extern bool am_walsender; + +/* user-settable parameters */ +extern int WalSndDelay; + +extern int WalSenderMain(void); +extern void WalSndSignals(void); +extern Size WalSndShmemSize(void); +extern void WalSndShmemInit(void); +extern XLogRecPtr GetOldestWALSendPointer(void); + +#endif /* _WALSENDER_H */ diff --git a/src/include/storage/pmsignal.h b/src/include/storage/pmsignal.h index 4db0f8c1db9..75ef17a5a0a 100644 --- a/src/include/storage/pmsignal.h +++ b/src/include/storage/pmsignal.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.27 2010/01/02 16:58:08 momjian Exp $ + * $PostgreSQL: pgsql/src/include/storage/pmsignal.h,v 1.28 2010/01/15 09:19:09 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -29,6 +29,8 @@ typedef enum PMSIGNAL_ROTATE_LOGFILE, /* send SIGUSR1 to syslogger to rotate logfile */ PMSIGNAL_START_AUTOVAC_LAUNCHER, /* start an autovacuum launcher */ PMSIGNAL_START_AUTOVAC_WORKER, /* start 
an autovacuum worker */ + PMSIGNAL_START_WALRECEIVER, /* start a walreceiver */ + PMSIGNAL_SHUTDOWN_WALRECEIVER, /* shut down a walreceiver */ NUM_PMSIGNALS /* Must be last value of enum! */ } PMSignalReason; @@ -45,7 +47,9 @@ extern void SendPostmasterSignal(PMSignalReason reason); extern bool CheckPostmasterSignal(PMSignalReason reason); extern int AssignPostmasterChildSlot(void); extern bool ReleasePostmasterChildSlot(int slot); +extern bool IsPostmasterChildWalSender(int slot); extern void MarkPostmasterChildActive(void); +extern void MarkPostmasterChildWalSender(void); extern void MarkPostmasterChildInactive(void); extern bool PostmasterIsAlive(bool amDirectChild); diff --git a/src/include/storage/proc.h b/src/include/storage/proc.h index ba725d8fa81..cee02c359d7 100644 --- a/src/include/storage/proc.h +++ b/src/include/storage/proc.h @@ -7,7 +7,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.116 2010/01/02 16:58:08 momjian Exp $ + * $PostgreSQL: pgsql/src/include/storage/proc.h,v 1.117 2010/01/15 09:19:09 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -152,10 +152,10 @@ typedef struct PROC_HDR * ie things that aren't full-fledged backends but need shmem access. * * Background writer and WAL writer run during normal operation. Startup - * process also consumes one slot, but WAL writer is launched only after - * startup has exited, so we only need 2 slots. + * process and WAL receiver also consume 2 slots, but WAL writer is + * launched only after startup has exited, so we only need 3 slots. 
*/ -#define NUM_AUXILIARY_PROCS 2 +#define NUM_AUXILIARY_PROCS 3 /* configurable options */ diff --git a/src/include/utils/guc_tables.h b/src/include/utils/guc_tables.h index 4815bc08703..bb96b0e546e 100644 --- a/src/include/utils/guc_tables.h +++ b/src/include/utils/guc_tables.h @@ -7,7 +7,7 @@ * * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * - * $PostgreSQL: pgsql/src/include/utils/guc_tables.h,v 1.47 2010/01/02 16:58:10 momjian Exp $ + * $PostgreSQL: pgsql/src/include/utils/guc_tables.h,v 1.48 2010/01/15 09:19:09 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -53,6 +53,7 @@ enum config_group WAL, WAL_SETTINGS, WAL_CHECKPOINTS, + WAL_REPLICATION, QUERY_TUNING, QUERY_TUNING_METHOD, QUERY_TUNING_COST, diff --git a/src/interfaces/libpq/fe-connect.c b/src/interfaces/libpq/fe-connect.c index 10233db9f5e..125d93cf0b1 100644 --- a/src/interfaces/libpq/fe-connect.c +++ b/src/interfaces/libpq/fe-connect.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/interfaces/libpq/fe-connect.c,v 1.382 2010/01/02 16:58:11 momjian Exp $ + * $PostgreSQL: pgsql/src/interfaces/libpq/fe-connect.c,v 1.383 2010/01/15 09:19:10 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -227,6 +227,9 @@ static const PQconninfoOption PQconninfoOptions[] = { "GSS-library", "", 7}, /* sizeof("gssapi") = 7 */ #endif + {"replication", NULL, NULL, NULL, + "Replication", "D", 5}, + /* Terminating entry --- MUST BE LAST */ {NULL, NULL, NULL, NULL, NULL, NULL, 0} @@ -472,6 +475,8 @@ connectOptions1(PGconn *conn, const char *conninfo) tmp = conninfo_getval(connOptions, "gsslib"); conn->gsslib = tmp ? strdup(tmp) : NULL; #endif + tmp = conninfo_getval(connOptions, "replication"); + conn->replication = tmp ? 
strdup(tmp) : NULL; /* * Free the option info - all is in conn now @@ -2136,6 +2141,8 @@ freePGconn(PGconn *conn) free(conn->fbappname); if (conn->dbName) free(conn->dbName); + if (conn->replication) + free(conn->replication); if (conn->pguser) free(conn->pguser); if (conn->pgpass) diff --git a/src/interfaces/libpq/fe-protocol3.c b/src/interfaces/libpq/fe-protocol3.c index e5df9e712d3..4cef2b4eb72 100644 --- a/src/interfaces/libpq/fe-protocol3.c +++ b/src/interfaces/libpq/fe-protocol3.c @@ -8,7 +8,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/interfaces/libpq/fe-protocol3.c,v 1.41 2010/01/02 16:58:12 momjian Exp $ + * $PostgreSQL: pgsql/src/interfaces/libpq/fe-protocol3.c,v 1.42 2010/01/15 09:19:10 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -1909,6 +1909,8 @@ build_startup_packet(const PGconn *conn, char *packet, ADD_STARTUP_OPTION("user", conn->pguser); if (conn->dbName && conn->dbName[0]) ADD_STARTUP_OPTION("database", conn->dbName); + if (conn->replication && conn->replication[0]) + ADD_STARTUP_OPTION("replication", conn->replication); if (conn->pgoptions && conn->pgoptions[0]) ADD_STARTUP_OPTION("options", conn->pgoptions); if (conn->send_appname) diff --git a/src/interfaces/libpq/libpq-int.h b/src/interfaces/libpq/libpq-int.h index 737a6253a13..b19e5266349 100644 --- a/src/interfaces/libpq/libpq-int.h +++ b/src/interfaces/libpq/libpq-int.h @@ -12,7 +12,7 @@ * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group * Portions Copyright (c) 1994, Regents of the University of California * - * $PostgreSQL: pgsql/src/interfaces/libpq/libpq-int.h,v 1.147 2010/01/02 16:58:12 momjian Exp $ + * $PostgreSQL: pgsql/src/interfaces/libpq/libpq-int.h,v 1.148 2010/01/15 09:19:10 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -298,6 +298,7 @@ struct pg_conn char *appname; /* application name */ char *fbappname; /* fallback application name */ char *dbName; 
/* database name */ + char *replication; /* connect as the replication standby? */ char *pguser; /* Postgres username and password, if any */ char *pgpass; char *sslmode; /* SSL mode (require,prefer,allow,disable) */ |