aboutsummaryrefslogtreecommitdiff
path: root/src/backend/replication/walreceiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/backend/replication/walreceiver.c')
-rw-r--r--src/backend/replication/walreceiver.c182
1 files changed, 59 insertions, 123 deletions
diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c
index f805e673e11..4a5ba5b4263 100644
--- a/src/backend/replication/walreceiver.c
+++ b/src/backend/replication/walreceiver.c
@@ -29,7 +29,7 @@
*
*
* IDENTIFICATION
- * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.2 2010/01/27 15:27:51 heikki Exp $
*
*-------------------------------------------------------------------------
*/
@@ -134,8 +134,7 @@ static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void InitWalRcv(void);
-static void WalRcvKill(int code, Datum arg);
+static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
static void XLogWalRcvFlush(void);
@@ -153,21 +152,57 @@ static struct
void
WalReceiverMain(void)
{
- sigjmp_buf local_sigjmp_buf;
- MemoryContext walrcv_context;
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- /* Load the libpq-specific functions */
- load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_disconnect == NULL)
- elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+ /*
+ * WalRcv should be set up already (if we are a backend, we inherit
+ * this by fork() or EXEC_BACKEND mechanism from the postmaster).
+ */
+ Assert(walrcv != NULL);
+
+ /*
+ * Mark walreceiver as running in shared memory.
+ *
+ * Do this as early as possible, so that if we fail later on, we'll
+ * set state to STOPPED. If we die before this, the startup process
+ * will keep waiting for us to start up, until it times out.
+ */
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->pid == 0);
+ switch(walrcv->walRcvState)
+ {
+ case WALRCV_STOPPING:
+ /* If we've already been requested to stop, don't start up. */
+ walrcv->walRcvState = WALRCV_STOPPED;
+ /* fall through */
+
+ case WALRCV_STOPPED:
+ SpinLockRelease(&walrcv->mutex);
+ proc_exit(1);
+ break;
+
+ case WALRCV_STARTING:
+ /* The usual case */
+ break;
+
+ case WALRCV_RUNNING:
+ /* Shouldn't happen */
+ elog(PANIC, "walreceiver still running according to shared memory state");
+ }
+ /* Advertise our PID so that the startup process can kill us */
+ walrcv->pid = MyProcPid;
+ walrcv->walRcvState = WALRCV_RUNNING;
+
+ /* Fetch information required to start streaming */
+ strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+ startpoint = walrcv->receivedUpto;
+ SpinLockRelease(&walrcv->mutex);
- /* Mark walreceiver in progress */
- InitWalRcv();
+ /* Arrange to clean up at walreceiver exit */
+ on_shmem_exit(WalRcvDie, 0);
/*
* If possible, make this process a group leader, so that the postmaster
@@ -200,81 +235,21 @@ WalReceiverMain(void)
/* We allow SIGQUIT (quickdie) at all times */
sigdelset(&BlockSig, SIGQUIT);
+ /* Load the libpq-specific functions */
+ load_file("libpqwalreceiver", false);
+ if (walrcv_connect == NULL || walrcv_receive == NULL ||
+ walrcv_disconnect == NULL)
+ elog(ERROR, "libpqwalreceiver didn't initialize correctly");
+
/*
* Create a resource owner to keep track of our resources (not clear that
* we need this, but may as well have one).
*/
CurrentResourceOwner = ResourceOwnerCreate(NULL, "Wal Receiver");
- /*
- * Create a memory context that we will do all our work in. We do this so
- * that we can reset the context during error recovery and thereby avoid
- * possible memory leaks.
- */
- walrcv_context = AllocSetContextCreate(TopMemoryContext,
- "Wal Receiver",
- ALLOCSET_DEFAULT_MINSIZE,
- ALLOCSET_DEFAULT_INITSIZE,
- ALLOCSET_DEFAULT_MAXSIZE);
- MemoryContextSwitchTo(walrcv_context);
-
- /*
- * If an exception is encountered, processing resumes here.
- *
- * This code is heavily based on bgwriter.c, q.v.
- */
- if (sigsetjmp(local_sigjmp_buf, 1) != 0)
- {
- /* Since not using PG_TRY, must reset error stack by hand */
- error_context_stack = NULL;
-
- /* Reset WalRcvImmediateInterruptOK */
- DisableWalRcvImmediateExit();
-
- /* Prevent interrupts while cleaning up */
- HOLD_INTERRUPTS();
-
- /* Report the error to the server log */
- EmitErrorReport();
-
- /* Disconnect any previous connection. */
- EnableWalRcvImmediateExit();
- walrcv_disconnect();
- DisableWalRcvImmediateExit();
-
- /*
- * Now return to normal top-level context and clear ErrorContext for
- * next time.
- */
- MemoryContextSwitchTo(walrcv_context);
- FlushErrorState();
-
- /* Flush any leaked data in the top-level context */
- MemoryContextResetAndDeleteChildren(walrcv_context);
-
- /* Now we can allow interrupts again */
- RESUME_INTERRUPTS();
-
- /*
- * Sleep at least 1 second after any error. A write error is likely
- * to be repeated, and we don't want to be filling the error logs as
- * fast as we can.
- */
- pg_usleep(1000000L);
- }
-
- /* We can now handle ereport(ERROR) */
- PG_exception_stack = &local_sigjmp_buf;
-
/* Unblock signals (they were blocked when the postmaster forked us) */
PG_SETMASK(&UnBlockSig);
- /* Fetch connection information from shared memory */
- SpinLockAcquire(&walrcv->mutex);
- strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
- startpoint = walrcv->receivedUpto;
- SpinLockRelease(&walrcv->mutex);
-
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
walrcv_connect(conninfo, startpoint);
@@ -330,63 +305,24 @@ WalReceiverMain(void)
}
}
-/* Advertise our pid in shared memory, so that startup process can kill us. */
-static void
-InitWalRcv(void)
-{
- /* use volatile pointer to prevent code rearrangement */
- volatile WalRcvData *walrcv = WalRcv;
-
- /*
- * WalRcv should be set up already (if we are a backend, we inherit
- * this by fork() or EXEC_BACKEND mechanism from the postmaster).
- */
- if (walrcv == NULL)
- elog(PANIC, "walreceiver control data uninitialized");
-
- /* If we've already been requested to stop, don't start up */
- SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->pid == 0);
- if (walrcv->walRcvState == WALRCV_STOPPED ||
- walrcv->walRcvState == WALRCV_STOPPING)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- SpinLockRelease(&walrcv->mutex);
- proc_exit(1);
- }
- walrcv->pid = MyProcPid;
- SpinLockRelease(&walrcv->mutex);
-
- /* Arrange to clean up at walreceiver exit */
- on_shmem_exit(WalRcvKill, 0);
-}
-
/*
- * Clear our pid from shared memory at exit.
+ * Mark us as STOPPED in shared memory at exit.
*/
static void
-WalRcvKill(int code, Datum arg)
+WalRcvDie(int code, Datum arg)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
- bool stopped = false;
SpinLockAcquire(&walrcv->mutex);
- if (walrcv->walRcvState == WALRCV_STOPPING ||
- walrcv->walRcvState == WALRCV_STOPPED)
- {
- walrcv->walRcvState = WALRCV_STOPPED;
- stopped = true;
- elog(LOG, "walreceiver stopped");
- }
+ Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+ walrcv->walRcvState == WALRCV_STOPPING);
+ walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
+ /* Terminate the connection gracefully. */
walrcv_disconnect();
-
- /* If requested to stop, tell postmaster to not restart us. */
- if (stopped)
- SendPostmasterSignal(PMSIGNAL_SHUTDOWN_WALRECEIVER);
}
/* SIGHUP: set flag to re-read config file at next convenient time */