diff options
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r-- | src/backend/replication/walreceiver.c | 182 |
1 files changed, 59 insertions, 123 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index f805e673e11..4a5ba5b4263 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $ * *------------------------------------------------------------------------- */ @@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS); static void WalRcvQuickDieHandler(SIGNAL_ARGS); /* Prototypes for private functions */ -static void InitWalRcv(void); -static void WalRcvKill(int code, Datum arg); +static void WalRcvDie(int code, Datum arg); static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr); static void XLogWalRcvFlush(void); @@ -153,21 +152,57 @@ static struct void WalReceiverMain(void) { - sigjmp_buf local_sigjmp_buf; - MemoryContext walrcv_context; char conninfo[MAXCONNINFO]; XLogRecPtr startpoint; /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - /* Load the libpq-specific functions */ - load_file("libpqwalreceiver", false); - if (walrcv_connect == NULL || walrcv_receive == NULL || - walrcv_disconnect == NULL) - elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* + * WalRcv should be set up already (if we are a backend, we inherit + * this by fork() or EXEC_BACKEND mechanism from the postmaster). + */ + Assert(walrcv != NULL); + + /* + * Mark walreceiver as running in shared memory. + * + * Do this as early as possible, so that if we fail later on, we'll + * set state to STOPPED. If we die before this, the startup process + * will keep waiting for us to start up, until it times out. + */ + SpinLockAcquire(&walrcv->mutex); + Assert(walrcv->pid == 0); + switch(walrcv->walRcvState) + { + case WALRCV_STOPPING: + /* If we've already been requested to stop, don't start up. */ + walrcv->walRcvState = WALRCV_STOPPED; + /* fall through */ + + case WALRCV_STOPPED: + SpinLockRelease(&walrcv->mutex); + proc_exit(1); + break; + + case WALRCV_STARTING: + /* The usual case */ + break; + + case WALRCV_RUNNING: + /* Shouldn't happen */ + elog(PANIC, "walreceiver still running according to shared memory state"); + } + /* Advertise our PID so that the startup process can kill us */ + walrcv->pid = MyProcPid; + walrcv->walRcvState = WALRCV_RUNNING; + + /* Fetch information required to start streaming */ + strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); + startpoint = walrcv->receivedUpto; + SpinLockRelease(&walrcv->mutex); - /* Mark walreceiver in progress */ - InitWalRcv(); + /* Arrange to clean up at walreceiver exit */ + on_shmem_exit(WalRcvDie, 0); /* * If possible, make this process a group leader, so that the postmaster @@ -200,81 +235,21 @@ WalReceiverMain(void) /* We allow SIGQUIT (quickdie) at all times */ sigdelset(&BlockSig, SIGQUIT); + /* Load the libpq-specific functions */ + load_file("libpqwalreceiver", false); + if (walrcv_connect == NULL || walrcv_receive == NULL || + walrcv_disconnect == NULL) + elog(ERROR, "libpqwalreceiver didn't initialize correctly"); + /* * Create a resource owner to keep track of our resources (not clear that * we need this, but may as well have one). */ CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver"); - /* - * Create a memory context that we will do all our work in. We do this so - * that we can reset the context during error recovery and thereby avoid - * possible memory leaks. - */ - walrcv_context = AllocSetContextCreate(TopMemoryContext, - "Wal Receiver", - ALLOCSET_DEFAULT_MINSIZE, - ALLOCSET_DEFAULT_INITSIZE, - ALLOCSET_DEFAULT_MAXSIZE); - MemoryContextSwitchTo(walrcv_context); - - /* - * If an exception is encountered, processing resumes here. - * - * This code is heavily based on bgwriter.c, q.v. - */ - if (sigsetjmp(local_sigjmp_buf, 1) != 0) - { - /* Since not using PG_TRY, must reset error stack by hand */ - error_context_stack = NULL; - - /* Reset WalRcvImmediateInterruptOK */ - DisableWalRcvImmediateExit(); - - /* Prevent interrupts while cleaning up */ - HOLD_INTERRUPTS(); - - /* Report the error to the server log */ - EmitErrorReport(); - - /* Disconnect any previous connection. */ - EnableWalRcvImmediateExit(); - walrcv_disconnect(); - DisableWalRcvImmediateExit(); - - /* - * Now return to normal top-level context and clear ErrorContext for - * next time. - */ - MemoryContextSwitchTo(walrcv_context); - FlushErrorState(); - - /* Flush any leaked data in the top-level context */ - MemoryContextResetAndDeleteChildren(walrcv_context); - - /* Now we can allow interrupts again */ - RESUME_INTERRUPTS(); - - /* - * Sleep at least 1 second after any error. A write error is likely - * to be repeated, and we don't want to be filling the error logs as - * fast as we can. - */ - pg_usleep(1000000L); - } - - /* We can now handle ereport(ERROR) */ - PG_exception_stack = &local_sigjmp_buf; - /* Unblock signals (they were blocked when the postmaster forked us) */ PG_SETMASK(&UnBlockSig); - /* Fetch connection information from shared memory */ - SpinLockAcquire(&walrcv->mutex); - strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO); - startpoint = walrcv->receivedUpto; - SpinLockRelease(&walrcv->mutex); - /* Establish the connection to the primary for XLOG streaming */ EnableWalRcvImmediateExit(); walrcv_connect(conninfo, startpoint); @@ -330,63 +305,24 @@ WalReceiverMain(void) } } -/* Advertise our pid in shared memory, so that startup process can kill us. */ -static void -InitWalRcv(void) -{ - /* use volatile pointer to prevent code rearrangement */ - volatile WalRcvData *walrcv = WalRcv; - - /* - * WalRcv should be set up already (if we are a backend, we inherit - * this by fork() or EXEC_BACKEND mechanism from the postmaster). - */ - if (walrcv == NULL) - elog(PANIC, "walreceiver control data uninitialized"); - - /* If we've already been requested to stop, don't start up */ - SpinLockAcquire(&walrcv->mutex); - Assert(walrcv->pid == 0); - if (walrcv->walRcvState == WALRCV_STOPPED || - walrcv->walRcvState == WALRCV_STOPPING) - { - walrcv->walRcvState = WALRCV_STOPPED; - SpinLockRelease(&walrcv->mutex); - proc_exit(1); - } - walrcv->pid = MyProcPid; - SpinLockRelease(&walrcv->mutex); - - /* Arrange to clean up at walreceiver exit */ - on_shmem_exit(WalRcvKill, 0); -} - /* - * Clear our pid from shared memory at exit. + * Mark us as STOPPED in shared memory at exit. */ static void -WalRcvKill(int code, Datum arg) +WalRcvDie(int code, Datum arg) { /* use volatile pointer to prevent code rearrangement */ volatile WalRcvData *walrcv = WalRcv; - bool stopped = false; SpinLockAcquire(&walrcv->mutex); - if (walrcv->walRcvState == WALRCV_STOPPING || - walrcv->walRcvState == WALRCV_STOPPED) - { - walrcv->walRcvState = WALRCV_STOPPED; - stopped = true; - elog(LOG, "walreceiver stopped"); - } + Assert(walrcv->walRcvState == WALRCV_RUNNING || + walrcv->walRcvState == WALRCV_STOPPING); + walrcv->walRcvState = WALRCV_STOPPED; walrcv->pid = 0; SpinLockRelease(&walrcv->mutex); + /* Terminate the connection gracefully. */ walrcv_disconnect(); - - /* If requested to stop, tell postmaster to not restart us. */ - if (stopped) - SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER); } /* SIGHUP: set flag to re-read config file at next convenient time */ |