diff options
Diffstat (limited to 'src/backend/replication')
-rw-r--r-- | src/backend/replication/basebackup.c | 5 | ||||
-rw-r--r-- | src/backend/replication/syncrep.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 7 | ||||
-rw-r--r-- | src/backend/replication/walsender.c | 108 |
4 files changed, 100 insertions, 27 deletions
diff --git a/src/backend/replication/basebackup.c b/src/backend/replication/basebackup.c index bcde19c71b6..74d28440bf4 100644 --- a/src/backend/replication/basebackup.c +++ b/src/backend/replication/basebackup.c @@ -339,6 +339,11 @@ SendBaseBackup(BaseBackupCmd *cmd) MemoryContext old_context; basebackup_options opt; + if (am_cascading_walsender) + ereport(FATAL, + (errcode(ERRCODE_CANNOT_CONNECT_NOW), + errmsg("recovery is still in progress, can't accept WAL streaming connections for backup"))); + parse_basebackup_options(cmd->options, &opt); backup_context = AllocSetContextCreate(CurrentMemoryContext, diff --git a/src/backend/replication/syncrep.c b/src/backend/replication/syncrep.c index b73d225a8ef..32db2bc4c52 100644 --- a/src/backend/replication/syncrep.c +++ b/src/backend/replication/syncrep.c @@ -469,6 +469,13 @@ SyncRepGetStandbyPriority(void) int priority = 0; bool found = false; + /* + * Since synchronous cascade replication is not allowed, we always + * set the priority of cascading walsender to zero. + */ + if (am_cascading_walsender) + return 0; + /* Need a modifiable copy of string */ rawstring = pstrdup(SyncRepStandbyNames); diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index ea6f6cdcdaf..c24fa87394d 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -44,6 +44,7 @@ #include "miscadmin.h" #include "replication/walprotocol.h" #include "replication/walreceiver.h" +#include "replication/walsender.h" #include "storage/ipc.h" #include "storage/pmsignal.h" #include "storage/procarray.h" @@ -564,8 +565,10 @@ XLogWalRcvFlush(bool dying) } SpinLockRelease(&walrcv->mutex); - /* Signal the startup process that new WAL has arrived */ + /* Signal the startup process and walsender that new WAL has arrived */ WakeupRecovery(); + if (AllowCascadeReplication()) + WalSndWakeup(); /* Report XLOG streaming progress in PS display */ if (update_process_title) @@ -625,7 +628,7 @@ XLogWalRcvSendReply(void) /* Construct a new message */ reply_message.write = LogstreamResult.Write; reply_message.flush = LogstreamResult.Flush; - reply_message.apply = GetXLogReplayRecPtr(); + reply_message.apply = GetXLogReplayRecPtr(NULL); reply_message.sendTime = now; elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X", diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index bc5b3300d23..63a63048dbb 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -48,6 +48,7 @@ #include "replication/basebackup.h" #include "replication/replnodes.h" #include "replication/walprotocol.h" +#include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/fd.h" #include "storage/ipc.h" @@ -70,6 +71,7 @@ WalSnd *MyWalSnd = NULL; /* Global state */ bool am_walsender = false; /* Am I a walsender process ? */ +bool am_cascading_walsender = false; /* Am I cascading WAL to another standby ? */ /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ @@ -135,10 +137,7 @@ WalSenderMain(void) { MemoryContext walsnd_context; - if (RecoveryInProgress()) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("recovery is still in progress, can't accept WAL streaming connections"))); + am_cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ InitWalSnd(); @@ -165,6 +164,12 @@ WalSenderMain(void) /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); + /* + * Use the recovery target timeline ID during recovery + */ + if (am_cascading_walsender) + ThisTimeLineID = GetRecoveryTargetTLI(); + /* Tell the standby that walsender is ready for receiving commands */ ReadyForQuery(DestRemote); @@ -290,7 +295,7 @@ IdentifySystem(void) GetSystemIdentifier()); snprintf(tli, sizeof(tli), "%u", ThisTimeLineID); - logptr = GetInsertRecPtr(); + logptr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetInsertRecPtr(); snprintf(xpos, sizeof(xpos), "%X/%X", logptr.xlogid, logptr.xrecoff); @@ -364,19 +369,13 @@ StartReplication(StartReplicationCmd *cmd) SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); /* - * Check that we're logging enough information in the WAL for - * log-shipping. + * We assume here that we're logging enough information in the WAL for + * log-shipping, since this is checked in PostmasterMain(). * - * NOTE: This only checks the current value of wal_level. Even if the - * current setting is not 'minimal', there can be old WAL in the pg_xlog - * directory that was created with 'minimal'. So this is not bulletproof, - * the purpose is just to give a user-friendly error message that hints - * how to configure the system correctly. + * NOTE: wal_level can only change at shutdown, so in most cases it is + * difficult for there to be WAL data that we can still see that was written + * at wal_level='minimal'. */ - if (wal_level == WAL_LEVEL_MINIMAL) - ereport(FATAL, - (errcode(ERRCODE_CANNOT_CONNECT_NOW), - errmsg("standby connections not allowed because wal_level=minimal"))); /* * When we first start replication the standby will be behind the primary. @@ -601,7 +600,8 @@ ProcessStandbyReplyMessage(void) SpinLockRelease(&walsnd->mutex); } - SyncRepReleaseWaiters(); + if (!am_cascading_walsender) + SyncRepReleaseWaiters(); } /* @@ -764,6 +764,8 @@ WalSndLoop(void) /* * When SIGUSR2 arrives, we send any outstanding logs up to the * shutdown checkpoint record (i.e., the latest record) and exit. + * This may be a normal termination at shutdown, or a promotion, + * the walsender is not sure which. */ if (walsender_ready_to_stop && !pq_is_send_pending()) { @@ -933,7 +935,7 @@ WalSndKill(int code, Datum arg) } /* - * Read 'nbytes' bytes from WAL into 'buf', starting at location 'recptr' + * Read 'count' bytes from WAL into 'buf', starting at location 'startptr' * * XXX probably this should be improved to suck data directly from the * WAL buffers when possible. @@ -944,15 +946,21 @@ WalSndKill(int code, Datum arg) * more than one. */ void -XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) +XLogRead(char *buf, XLogRecPtr startptr, Size count) { - XLogRecPtr startRecPtr = recptr; - char path[MAXPGPATH]; + char *p; + XLogRecPtr recptr; + Size nbytes; uint32 lastRemovedLog; uint32 lastRemovedSeg; uint32 log; uint32 seg; +retry: + p = buf; + recptr = startptr; + nbytes = count; + while (nbytes > 0) { uint32 startoff; @@ -963,6 +971,8 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) if (sendFile < 0 || !XLByteInSeg(recptr, sendId, sendSeg)) { + char path[MAXPGPATH]; + /* Switch to another logfile segment */ if (sendFile >= 0) close(sendFile); @@ -1014,7 +1024,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) else segbytes = nbytes; - readbytes = read(sendFile, buf, segbytes); + readbytes = read(sendFile, p, segbytes); if (readbytes <= 0) ereport(ERROR, (errcode_for_file_access(), @@ -1027,7 +1037,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) sendOff += readbytes; nbytes -= readbytes; - buf += readbytes; + p += readbytes; } /* @@ -1038,7 +1048,7 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) * already have been overwritten with new WAL records. */ XLogGetLastRemoved(&lastRemovedLog, &lastRemovedSeg); - XLByteToSeg(startRecPtr, log, seg); + XLByteToSeg(startptr, log, seg); if (log < lastRemovedLog || (log == lastRemovedLog && seg <= lastRemovedSeg)) { @@ -1050,6 +1060,32 @@ XLogRead(char *buf, XLogRecPtr recptr, Size nbytes) errmsg("requested WAL segment %s has already been removed", filename))); } + + /* + * During recovery, the currently-open WAL file might be replaced with + * the file of the same name retrieved from archive. So we always need + * to check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + bool reload; + + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); + + if (reload && sendFile >= 0) + { + close(sendFile); + sendFile = -1; + + goto retry; + } + } } /* @@ -1082,7 +1118,7 @@ XLogSend(char *msgbuf, bool *caughtup) * subsequently crashes and restarts, slaves must not have applied any WAL * that gets lost on the master. */ - SendRqstPtr = GetFlushRecPtr(); + SendRqstPtr = am_cascading_walsender ? GetStandbyFlushRecPtr() : GetFlushRecPtr(); /* Quick exit if nothing to do */ if (XLByteLE(SendRqstPtr, sentPtr)) @@ -1187,6 +1223,28 @@ XLogSend(char *msgbuf, bool *caughtup) return; } +/* + * Request walsenders to reload the currently-open WAL file + */ +void +WalSndRqstFileReload(void) +{ + int i; + + for (i = 0; i < max_wal_senders; i++) + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = &WalSndCtl->walsnds[i]; + + if (walsnd->pid == 0) + continue; + + SpinLockAcquire(&walsnd->mutex); + walsnd->needreload = true; + SpinLockRelease(&walsnd->mutex); + } +} + /* SIGHUP: set flag to re-read config file at next convenient time */ static void WalSndSigHupHandler(SIGNAL_ARGS) |