diff options
Diffstat (limited to 'src/backend/replication/walsender.c')
-rw-r--r-- | src/backend/replication/walsender.c | 269 |
1 files changed, 59 insertions, 210 deletions
diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index cc27848318b..0ba2ad44140 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -78,6 +78,8 @@ bool am_walsender = false; /* Am I a walsender process ? */ bool am_cascading_walsender = false; /* Am I cascading WAL to * another standby ? */ +static bool replication_started = false; /* Started streaming yet? */ + /* User-settable parameters for walsender */ int max_wal_senders = 0; /* the maximum number of concurrent walsenders */ int replication_timeout = 60 * 1000; /* maximum time to send one @@ -113,21 +115,16 @@ static TimestampTz last_reply_timestamp; /* Flags set by signal handlers for later service in main loop */ static volatile sig_atomic_t got_SIGHUP = false; -volatile sig_atomic_t walsender_shutdown_requested = false; volatile sig_atomic_t walsender_ready_to_stop = false; /* Signal handlers */ static void WalSndSigHupHandler(SIGNAL_ARGS); -static void WalSndShutdownHandler(SIGNAL_ARGS); -static void WalSndQuickDieHandler(SIGNAL_ARGS); static void WalSndXLogSendHandler(SIGNAL_ARGS); static void WalSndLastCycleHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static bool HandleReplicationCommand(const char *cmd_string); static void WalSndLoop(void) __attribute__((noreturn)); -static void InitWalSnd(void); -static void WalSndHandshake(void); +static void InitWalSenderSlot(void); static void WalSndKill(int code, Datum arg); static void XLogSend(char *msgbuf, bool *caughtup); static void IdentifySystem(void); @@ -139,147 +136,48 @@ static void ProcessRepliesIfAny(void); static void WalSndKeepalive(char *msgbuf); -/* Main entry point for walsender process */ +/* Initialize walsender process before entering the main command loop */ void -WalSenderMain(void) +InitWalSender(void) { - MemoryContext walsnd_context; - am_cascading_walsender = RecoveryInProgress(); /* Create a per-walsender data structure in shared memory */ - 
InitWalSnd(); - - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. Formerly this code just ran in - * TopMemoryContext, but resetting that would be a really bad idea. - * - * XXX: we don't actually attempt error recovery in walsender, we just - * close the connection and exit. - */ - walsnd_context = AllocSetContextCreate(TopMemoryContext, - "Wal Sender", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walsnd_context); + InitWalSenderSlot(); /* Set up resource owner */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "walsender top-level resource owner"); - /* Unblock signals (they were blocked when the postmaster forked us) */ - PG_SETMASK(&UnBlockSig); - /* * Use the recovery target timeline ID during recovery */ if (am_cascading_walsender) ThisTimeLineID = GetRecoveryTargetTLI(); - - /* Tell the standby that walsender is ready for receiving commands */ - ReadyForQuery(DestRemote); - - /* Handle handshake messages before streaming */ - WalSndHandshake(); - - /* Initialize shared memory status */ - { - /* use volatile pointer to prevent code rearrangement */ - volatile WalSnd *walsnd = MyWalSnd; - - SpinLockAcquire(&walsnd->mutex); - walsnd->sentPtr = sentPtr; - SpinLockRelease(&walsnd->mutex); - } - - SyncRepInitConfig(); - - /* Main loop of walsender */ - WalSndLoop(); } /* - * Execute commands from walreceiver, until we enter streaming mode. + * Clean up after an error. + * + * WAL sender processes don't use transactions like regular backends do. + * This function does any cleanup required after an error in a WAL sender + * process, similar to what transaction abort does in a regular backend. 
*/ -static void -WalSndHandshake(void) +void +WalSndErrorCleanup() { - StringInfoData input_message; - bool replication_started = false; - - initStringInfo(&input_message); - - while (!replication_started) + if (sendFile >= 0) { - int firstchar; - - WalSndSetState(WALSNDSTATE_STARTUP); - set_ps_display("idle", false); - - /* Wait for a command to arrive */ - firstchar = pq_getbyte(); - - /* - * Emergency bailout if postmaster has died. This is to avoid the - * necessity for manual cleanup of all postmaster children. - */ - if (!PostmasterIsAlive()) - exit(1); - - /* - * Check for any other interesting events that happened while we - * slept. - */ - if (got_SIGHUP) - { - got_SIGHUP = false; - ProcessConfigFile(PGC_SIGHUP); - } - - if (firstchar != EOF) - { - /* - * Read the message contents. This is expected to be done without - * blocking because we've been able to get message type code. - */ - if (pq_getmessage(&input_message, 0)) - firstchar = EOF; /* suitable message already logged */ - } - - /* Handle the very limited subset of commands expected in this phase */ - switch (firstchar) - { - case 'Q': /* Query message */ - { - const char *query_string; - - query_string = pq_getmsgstring(&input_message); - pq_getmsgend(&input_message); - - if (HandleReplicationCommand(query_string)) - replication_started = true; - } - break; - - case 'X': - /* standby is closing the connection */ - proc_exit(0); - - case EOF: - /* standby disconnected unexpectedly */ - ereport(COMMERROR, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("unexpected EOF on standby connection"))); - proc_exit(0); - - default: - ereport(FATAL, - (errcode(ERRCODE_PROTOCOL_VIOLATION), - errmsg("invalid standby handshake message type %d", firstchar))); - } + close(sendFile); + sendFile = -1; } + + /* + * Don't return back to the command loop after we've started replicating. + * We've already marked us as an actively streaming WAL sender in the + * PMSignal slot, and there's currently no way to undo that. 
+ */ + if (replication_started) + proc_exit(0); } /* @@ -350,15 +248,13 @@ IdentifySystem(void) pq_sendbytes(&buf, (char *) xpos, strlen(xpos)); pq_endmessage(&buf); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ } /* - * START_REPLICATION + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. */ static void StartReplication(StartReplicationCmd *cmd) @@ -374,6 +270,7 @@ StartReplication(StartReplicationCmd *cmd) */ MarkPostmasterChildWalSender(); SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE); + replication_started = true; /* * When promoting a cascading standby, postmaster sends SIGUSR2 to any @@ -435,15 +332,29 @@ StartReplication(StartReplicationCmd *cmd) * be shipped from that position */ sentPtr = cmd->startpoint; + + /* Also update the start position status in shared memory */ + { + /* use volatile pointer to prevent code rearrangement */ + volatile WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + SyncRepInitConfig(); + + /* Main loop of walsender */ + WalSndLoop(); } /* * Execute an incoming replication command. 
*/ -static bool -HandleReplicationCommand(const char *cmd_string) +void +exec_replication_command(const char *cmd_string) { - bool replication_started = false; int parse_rc; Node *cmd_node; MemoryContext cmd_context; @@ -451,6 +362,8 @@ HandleReplicationCommand(const char *cmd_string) elog(DEBUG1, "received replication command: %s", cmd_string); + CHECK_FOR_INTERRUPTS(); + cmd_context = AllocSetContextCreate(CurrentMemoryContext, "Replication command context", ALLOCSET_DEFAULT_MINSIZE, @@ -476,18 +389,10 @@ HandleReplicationCommand(const char *cmd_string) case T_StartReplicationCmd: StartReplication((StartReplicationCmd *) cmd_node); - - /* break out of the loop */ - replication_started = true; break; case T_BaseBackupCmd: SendBaseBackup((BaseBackupCmd *) cmd_node); - - /* Send CommandComplete and ReadyForQuery messages */ - EndCommand("SELECT", DestRemote); - ReadyForQuery(DestRemote); - /* ReadyForQuery did pq_flush for us */ break; default: @@ -500,7 +405,8 @@ HandleReplicationCommand(const char *cmd_string) MemoryContextSwitchTo(old_context); MemoryContextDelete(cmd_context); - return replication_started; + /* Send CommandComplete message */ + EndCommand("SELECT", DestRemote); } /* @@ -710,7 +616,7 @@ ProcessStandbyHSFeedbackMessage(void) MyPgXact->xmin = reply.xmin; } -/* Main loop of walsender process */ +/* Main loop of walsender process that streams the WAL over Copy messages. 
*/ static void WalSndLoop(void) { @@ -754,15 +660,7 @@ WalSndLoop(void) SyncRepInitConfig(); } - /* Normal exit from the walsender is here */ - if (walsender_shutdown_requested) - { - /* Inform the standby that XLOG streaming is done */ - pq_puttextmessage('C', "COPY 0"); - pq_flush(); - - proc_exit(0); - } + CHECK_FOR_INTERRUPTS(); /* Check for input from the client */ ProcessRepliesIfAny(); @@ -813,7 +711,7 @@ WalSndLoop(void) XLogSend(output_message, &caughtup); if (caughtup && !pq_is_send_pending()) { - walsender_shutdown_requested = true; + ProcDiePending = true; continue; /* don't want to wait more */ } } @@ -854,8 +752,11 @@ WalSndLoop(void) } /* Sleep until something happens or replication timeout */ + ImmediateInterruptOK = true; + CHECK_FOR_INTERRUPTS(); WaitLatchOrSocket(&MyWalSnd->latch, wakeEvents, MyProcPort->sock, sleeptime); + ImmediateInterruptOK = false; /* * Check for replication timeout. Note we ignore the corner case @@ -892,7 +793,7 @@ WalSndLoop(void) /* Initialize a per-walsender data structure for this walsender process */ static void -InitWalSnd(void) +InitWalSenderSlot(void) { int i; @@ -1284,58 +1185,6 @@ WalSndSigHupHandler(SIGNAL_ARGS) errno = save_errno; } -/* SIGTERM: set flag to shut down */ -static void -WalSndShutdownHandler(SIGNAL_ARGS) -{ - int save_errno = errno; - - walsender_shutdown_requested = true; - if (MyWalSnd) - SetLatch(&MyWalSnd->latch); - - /* - * Set the standard (non-walsender) state as well, so that we can abort - * things like do_pg_stop_backup(). - */ - InterruptPending = true; - ProcDiePending = true; - - errno = save_errno; -} - -/* - * WalSndQuickDieHandler() occurs when signalled SIGQUIT by the postmaster. - * - * Some backend has bought the farm, - * so we need to stop what we're doing and exit. 
- */ -static void -WalSndQuickDieHandler(SIGNAL_ARGS) -{ - PG_SETMASK(&BlockSig); - - /* - * We DO NOT want to run proc_exit() callbacks -- we're here because - * shared memory may be corrupted, so we don't want to try to clean up our - * transaction. Just nail the windows shut and get out of town. Now that - * there's an atexit callback to prevent third-party code from breaking - * things by calling exit() directly, we have to reset the callbacks - * explicitly to make this work as intended. - */ - on_exit_reset(); - - /* - * Note we do exit(2) not exit(0). This is to force the postmaster into a - * system reset cycle if some idiot DBA sends a manual SIGQUIT to a random - * backend. This is necessary precisely because we don't clean up our - * shared memory state. (The "dead man switch" mechanism in pmsignal.c - * should ensure the postmaster sees this as a crash, too, but no harm in - * being doubly sure.) - */ - exit(2); -} - /* SIGUSR1: set flag to send WAL records */ static void WalSndXLogSendHandler(SIGNAL_ARGS) @@ -1368,8 +1217,8 @@ WalSndSignals(void) pqsignal(SIGHUP, WalSndSigHupHandler); /* set flag to read config * file */ pqsignal(SIGINT, SIG_IGN); /* not used */ - pqsignal(SIGTERM, WalSndShutdownHandler); /* request shutdown */ - pqsignal(SIGQUIT, WalSndQuickDieHandler); /* hard crash time */ + pqsignal(SIGTERM, die); /* request shutdown */ + pqsignal(SIGQUIT, quickdie); /* hard crash time */ InitializeTimeouts(); /* establishes SIGALRM handler */ pqsignal(SIGPIPE, SIG_IGN); pqsignal(SIGUSR1, WalSndXLogSendHandler); /* request WAL sending */ |