diff options
author | Magnus Hagander <magnus@hagander.net> | 2010-04-19 14:10:45 +0000 |
---|---|---|
committer | Magnus Hagander <magnus@hagander.net> | 2010-04-19 14:10:45 +0000 |
commit | 03a571a4cf26e06ff504e5b38a9432a003008c19 (patch) | |
tree | ba38fedbaab8536b690d2eba5442925cc3f1729f /src | |
parent | 5b89ef384c7719478bb08b0c771dcbfdc51d507e (diff) | |
download | postgresql-03a571a4cf26e06ff504e5b38a9432a003008c19.tar.gz postgresql-03a571a4cf26e06ff504e5b38a9432a003008c19.zip |
Add wrapper function libpqrcv_PQexec() in the walreceiver that uses async
libpq to send queries, making the waiting for responses interruptible on
platforms where PQexec() can't normally be interrupted by signals, such
as win32.
Fujii Masao and Magnus Hagander
Diffstat (limited to 'src')
-rw-r--r-- | src/backend/replication/libpqwalreceiver/libpqwalreceiver.c | 88 | ||||
-rw-r--r-- | src/backend/replication/walreceiver.c | 6 |
2 files changed, 88 insertions, 6 deletions
diff --git a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c index 9e318e669d0..d41858e49a2 100644 --- a/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c +++ b/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c @@ -10,7 +10,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.8 2010/03/21 00:17:58 petere Exp $ + * $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -54,6 +54,7 @@ static void libpqrcv_disconnect(void); /* Prototypes for private functions */ static bool libpq_select(int timeout_ms); +static PGresult *libpqrcv_PQexec(const char *query); /* * Module load callback @@ -97,7 +98,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) * Get the system identifier and timeline ID as a DataRow message from the * primary server. */ - res = PQexec(streamConn, "IDENTIFY_SYSTEM"); + res = libpqrcv_PQexec("IDENTIFY_SYSTEM"); if (PQresultStatus(res) != PGRES_TUPLES_OK) { PQclear(res); @@ -149,11 +150,14 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint) /* Start streaming from the point requested by startup process */ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", startpoint.xlogid, startpoint.xrecoff); - res = PQexec(streamConn, cmd); + res = libpqrcv_PQexec(cmd); if (PQresultStatus(res) != PGRES_COPY_OUT) + { + PQclear(res); ereport(ERROR, (errmsg("could not start WAL streaming: %s", PQerrorMessage(streamConn)))); + } PQclear(res); justconnected = true; @@ -225,6 +229,84 @@ libpq_select(int timeout_ms) } /* + * Send a query and wait for the results by using the asynchronous libpq + * functions and the backend version of select(). + * + * We must not use the regular blocking libpq functions like PQexec() + * since they are uninterruptible by signals on some platforms, such as + * Windows. + * + * We must also not use vanilla select() here since it cannot handle the + * signal emulation layer on Windows. + * + * The function is modeled on PQexec() in libpq, but only implements + * those parts that are in use in the walreceiver. + * + * Queries are always executed on the connection in streamConn. + */ +static PGresult * +libpqrcv_PQexec(const char *query) +{ + PGresult *result = NULL; + PGresult *lastResult = NULL; + + /* + * PQexec() silently discards any prior query results on the + * connection. This is not required for walreceiver since it's + * expected that walsender won't generate any such junk results. + */ + + /* + * Submit a query. Since we don't use non-blocking mode, this also + * can block. But its risk is relatively small, so we ignore that + * for now. + */ + if (!PQsendQuery(streamConn, query)) + return NULL; + + for (;;) + { + /* + * Receive data until PQgetResult is ready to get the result + * without blocking. + */ + while (PQisBusy(streamConn)) + { + /* + * We don't need to break down the sleep into smaller increments, + * and check for interrupts after each nap, since we can just + * elog(FATAL) within SIGTERM signal handler if the signal + * arrives in the middle of establishment of replication connection. + */ + if (!libpq_select(-1)) + continue; /* interrupted */ + if (PQconsumeInput(streamConn) == 0) + return NULL; /* trouble */ + } + + /* + * Emulate the PQexec()'s behavior of returning the last result + * when there are many. + * Since walsender will never generate multiple results, we skip + * the concatenation of error messages. + */ + result = PQgetResult(streamConn); + if (result == NULL) + break; /* query is complete */ + + PQclear(lastResult); + lastResult = result; + + if (PQresultStatus(lastResult) == PGRES_COPY_IN || + PQresultStatus(lastResult) == PGRES_COPY_OUT || + PQstatus(streamConn) == CONNECTION_BAD) + break; + } + + return lastResult; +} + +/* * Disconnect connection to primary, if any. */ static void diff --git a/src/backend/replication/walreceiver.c b/src/backend/replication/walreceiver.c index 090111bb112..f2694db8733 100644 --- a/src/backend/replication/walreceiver.c +++ b/src/backend/replication/walreceiver.c @@ -29,7 +29,7 @@ * * * IDENTIFICATION - * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.8 2010/04/13 08:16:09 mha Exp $ + * $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.9 2010/04/19 14:10:45 mha Exp $ * *------------------------------------------------------------------------- */ @@ -86,8 +86,8 @@ static void DisableWalRcvImmediateExit(void); * We can't just exit(1) within SIGTERM signal handler, because the signal * might arrive in the middle of some critical operation, like while we're * holding a spinlock. We also can't just set a flag in signal handler and - * check it in the main loop, because we perform some blocking libpq - * operations like PQexec(), which can take a long time to finish. + * check it in the main loop, because we perform some blocking operations + * like libpqrcv_PQexec(), which can take a long time to finish. * * We use a combined approach: When WalRcvImmediateInterruptOK is true, it's * safe for the signal handler to elog(FATAL) immediately. Otherwise it just |