summaryrefslogtreecommitdiff
path: root/ngx_postgres-1.0/src/ngx_postgres_upstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'ngx_postgres-1.0/src/ngx_postgres_upstream.c')
-rw-r--r--ngx_postgres-1.0/src/ngx_postgres_upstream.c598
1 files changed, 598 insertions, 0 deletions
diff --git a/ngx_postgres-1.0/src/ngx_postgres_upstream.c b/ngx_postgres-1.0/src/ngx_postgres_upstream.c
new file mode 100644
index 0000000..919029b
--- /dev/null
+++ b/ngx_postgres-1.0/src/ngx_postgres_upstream.c
@@ -0,0 +1,598 @@
+/*
+ * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
+ * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com>
+ * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+
+#include <nginx.h>
+#include "ngx_postgres_ddebug.h"
+#include "ngx_postgres_module.h"
+#include "ngx_postgres_keepalive.h"
+#include "ngx_postgres_processor.h"
+
+
+ngx_int_t
+ngx_postgres_upstream_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *uscf)
+{
+ ngx_postgres_upstream_srv_conf_t *pgscf;
+ ngx_postgres_upstream_server_t *server;
+ ngx_postgres_upstream_peers_t *peers;
+ ngx_uint_t i, j, n;
+
+ dd("entering");
+
+ uscf->peer.init = ngx_postgres_upstream_init_peer;
+
+ pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
+
+ if (pgscf->servers == NULL || pgscf->servers->nelts == 0) {
+ ngx_log_error(NGX_LOG_ERR, cf->log, 0,
+ "postgres: no \"postgres_server\" defined"
+ " in upstream \"%V\" in %s:%ui",
+ &uscf->host, uscf->file_name, uscf->line);
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ /* pgscf->servers != NULL */
+
+ server = uscf->servers->elts;
+
+ n = 0;
+
+ for (i = 0; i < uscf->servers->nelts; i++) {
+ n += server[i].naddrs;
+ }
+
+ peers = ngx_pcalloc(cf->pool, sizeof(ngx_postgres_upstream_peers_t)
+ + sizeof(ngx_postgres_upstream_peer_t) * (n - 1));
+
+ if (peers == NULL) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ peers->single = (n == 1);
+ peers->number = n;
+ peers->name = &uscf->host;
+
+ n = 0;
+
+ for (i = 0; i < uscf->servers->nelts; i++) {
+ for (j = 0; j < server[i].naddrs; j++) {
+ peers->peer[n].sockaddr = server[i].addrs[j].sockaddr;
+ peers->peer[n].socklen = server[i].addrs[j].socklen;
+ peers->peer[n].name = server[i].addrs[j].name;
+ peers->peer[n].port = server[i].port;
+ peers->peer[n].dbname = server[i].dbname;
+ peers->peer[n].user = server[i].user;
+ peers->peer[n].password = server[i].password;
+
+ peers->peer[n].host.data = ngx_pnalloc(cf->pool,
+ NGX_SOCKADDR_STRLEN);
+ if (peers->peer[n].host.data == NULL) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ peers->peer[n].host.len = ngx_sock_ntop(peers->peer[n].sockaddr,
+#if defined(nginx_version) && (nginx_version >= 1005003)
+ peers->peer[n].socklen,
+#endif
+ peers->peer[n].host.data,
+ NGX_SOCKADDR_STRLEN, 0);
+ if (peers->peer[n].host.len == 0) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ n++;
+ }
+ }
+
+ pgscf->peers = peers;
+ pgscf->active_conns = 0;
+
+ if (pgscf->max_cached) {
+ dd("returning");
+ return ngx_postgres_keepalive_init(cf->pool, pgscf);
+ }
+
+ dd("returning NGX_OK");
+ return NGX_OK;
+}
+
+ngx_int_t
+ngx_postgres_upstream_init_peer(ngx_http_request_t *r,
+ ngx_http_upstream_srv_conf_t *uscf)
+{
+ ngx_postgres_upstream_peer_data_t *pgdt;
+ ngx_postgres_upstream_srv_conf_t *pgscf;
+ ngx_postgres_loc_conf_t *pglcf;
+ ngx_postgres_ctx_t *pgctx;
+ ngx_http_core_loc_conf_t *clcf;
+ ngx_http_upstream_t *u;
+ ngx_postgres_mixed_t *query;
+ ngx_str_t sql;
+ ngx_uint_t i;
+
+ dd("entering");
+
+ pgdt = ngx_pcalloc(r->pool, sizeof(ngx_postgres_upstream_peer_data_t));
+ if (pgdt == NULL) {
+ goto failed;
+ }
+
+ u = r->upstream;
+
+ pgdt->upstream = u;
+ pgdt->request = r;
+
+ pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
+ pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
+ pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);
+
+ pgdt->srv_conf = pgscf;
+ pgdt->loc_conf = pglcf;
+
+ u->peer.data = pgdt;
+ u->peer.get = ngx_postgres_upstream_get_peer;
+ u->peer.free = ngx_postgres_upstream_free_peer;
+
+ if (pglcf->query.methods_set & r->method) {
+ /* method-specific query */
+ dd("using method-specific query");
+
+ query = pglcf->query.methods->elts;
+ for (i = 0; i < pglcf->query.methods->nelts; i++) {
+ if (query[i].key & r->method) {
+ query = &query[i];
+ break;
+ }
+ }
+
+ if (i == pglcf->query.methods->nelts) {
+ goto failed;
+ }
+ } else {
+ /* default query */
+ dd("using default query");
+
+ query = pglcf->query.def;
+ }
+
+ if (query->cv) {
+ /* complex value */
+ dd("using complex value");
+
+ if (ngx_http_complex_value(r, query->cv, &sql) != NGX_OK) {
+ goto failed;
+ }
+
+ if (sql.len == 0) {
+ clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);
+
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "postgres: empty \"postgres_query\" (was: \"%V\")"
+ " in location \"%V\"", &query->cv->value,
+ &clcf->name);
+
+ goto failed;
+ }
+
+ pgdt->query = sql;
+ } else {
+ /* simple value */
+ dd("using simple value");
+
+ pgdt->query = query->sv;
+ }
+
+ /* set $postgres_query */
+ pgctx->var_query = pgdt->query;
+
+ dd("returning NGX_OK");
+ return NGX_OK;
+
+failed:
+
+#if defined(nginx_version) && (nginx_version >= 8017)
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+#else
+ r->upstream->peer.data = NULL;
+
+ dd("returning NGX_OK (NGX_ERROR)");
+ return NGX_OK;
+#endif
+}
+
+ngx_int_t
+ngx_postgres_upstream_get_peer(ngx_peer_connection_t *pc, void *data)
+{
+ ngx_postgres_upstream_peer_data_t *pgdt = data;
+ ngx_postgres_upstream_srv_conf_t *pgscf;
+#if defined(nginx_version) && (nginx_version < 8017)
+ ngx_postgres_ctx_t *pgctx;
+#endif
+ ngx_postgres_upstream_peers_t *peers;
+ ngx_postgres_upstream_peer_t *peer;
+ ngx_connection_t *pgxc = NULL;
+ int fd;
+ ngx_event_t *rev, *wev;
+ ngx_int_t rc;
+ u_char *connstring, *last;
+ size_t len;
+
+ dd("entering");
+
+#if defined(nginx_version) && (nginx_version < 8017)
+ if (data == NULL) {
+ goto failed;
+ }
+
+ pgctx = ngx_http_get_module_ctx(pgdt->request, ngx_postgres_module);
+#endif
+
+ pgscf = pgdt->srv_conf;
+
+ pgdt->failed = 0;
+
+ if (pgscf->max_cached && pgscf->single) {
+ rc = ngx_postgres_keepalive_get_peer_single(pc, pgdt, pgscf);
+ if (rc != NGX_DECLINED) {
+ /* re-use keepalive peer */
+ dd("re-using keepalive peer (single)");
+
+ pgdt->state = state_db_send_query;
+
+ ngx_postgres_process_events(pgdt->request);
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+ }
+
+ peers = pgscf->peers;
+
+ if (pgscf->current > peers->number - 1) {
+ pgscf->current = 0;
+ }
+
+ peer = &peers->peer[pgscf->current++];
+
+ pgdt->name.len = peer->name.len;
+ pgdt->name.data = peer->name.data;
+
+ pgdt->sockaddr = *peer->sockaddr;
+
+ pc->name = &pgdt->name;
+ pc->sockaddr = &pgdt->sockaddr;
+ pc->socklen = peer->socklen;
+ pc->cached = 0;
+
+ if ((pgscf->max_cached) && (!pgscf->single)) {
+ rc = ngx_postgres_keepalive_get_peer_multi(pc, pgdt, pgscf);
+ if (rc != NGX_DECLINED) {
+ /* re-use keepalive peer */
+ dd("re-using keepalive peer (multi)");
+
+ pgdt->state = state_db_send_query;
+
+ ngx_postgres_process_events(pgdt->request);
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+ }
+
+ if ((pgscf->reject) && (pgscf->active_conns >= pgscf->max_cached)) {
+ ngx_log_error(NGX_LOG_INFO, pc->log, 0,
+ "postgres: keepalive connection pool is full,"
+ " rejecting request to upstream \"%V\"", &peer->name);
+
+ /* a bit hack-ish way to return error response (setup part) */
+ pc->connection = ngx_get_connection(0, pc->log);
+
+#if defined(nginx_version) && (nginx_version < 8017)
+ pgctx->status = NGX_HTTP_SERVICE_UNAVAILABLE;
+#endif
+
+ dd("returning NGX_AGAIN (NGX_HTTP_SERVICE_UNAVAILABLE)");
+ return NGX_AGAIN;
+ }
+
+ /* sizeof("...") - 1 + 1 (for spaces and '\0' omitted */
+ len = sizeof("hostaddr=") + peer->host.len
+ + sizeof("port=") + sizeof("65535") - 1
+ + sizeof("dbname=") + peer->dbname.len
+ + sizeof("user=") + peer->user.len
+ + sizeof("password=") + peer->password.len
+ + sizeof("sslmode=disable");
+
+ connstring = ngx_pnalloc(pgdt->request->pool, len);
+ if (connstring == NULL) {
+#if defined(nginx_version) && (nginx_version >= 8017)
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+#else
+ goto failed;
+#endif
+ }
+
+ /* TODO add unix sockets */
+ last = ngx_snprintf(connstring, len - 1,
+ "hostaddr=%V port=%d dbname=%V user=%V password=%V"
+ " sslmode=disable",
+ &peer->host, peer->port, &peer->dbname, &peer->user,
+ &peer->password);
+ *last = '\0';
+
+ dd("PostgreSQL connection string: %s", connstring);
+
+ /*
+ * internal checks in PQsetnonblocking are taking care of any
+ * PQconnectStart failures, so we don't need to check them here.
+ */
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+ "postgres: connecting");
+
+ pgdt->pgconn = PQconnectStart((const char *)connstring);
+ if (PQsetnonblocking(pgdt->pgconn, 1) == -1) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "postgres: connection failed: %s in upstream \"%V\"",
+ PQerrorMessage(pgdt->pgconn), &peer->name);
+
+ PQfinish(pgdt->pgconn);
+ pgdt->pgconn = NULL;
+
+#if defined(nginx_version) && (nginx_version >= 8017)
+ dd("returning NGX_DECLINED");
+ return NGX_DECLINED;
+#else
+ pgctx->status = NGX_HTTP_BAD_GATEWAY;
+ goto failed;
+#endif
+ }
+
+#if defined(DDEBUG) && (DDEBUG > 1)
+ PQtrace(pgdt->pgconn, stderr);
+#endif
+
+ dd("connection status:%d", (int) PQstatus(pgdt->pgconn));
+
+ /* take spot in keepalive connection pool */
+ pgscf->active_conns++;
+
+ /* add the file descriptor (fd) into an nginx connection structure */
+
+ fd = PQsocket(pgdt->pgconn);
+ if (fd == -1) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "postgres: failed to get connection fd");
+
+ goto invalid;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+ "postgres: connection fd:%d", fd);
+
+ pgxc = pc->connection = ngx_get_connection(fd, pc->log);
+
+ if (pgxc == NULL) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "postgres: failed to get a free nginx connection");
+
+ goto invalid;
+ }
+
+ pgxc->log = pc->log;
+ pgxc->log_error = pc->log_error;
+ pgxc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);
+
+ rev = pgxc->read;
+ wev = pgxc->write;
+
+ rev->log = pc->log;
+ wev->log = pc->log;
+
+ /* register the connection with postgres connection fd into the
+ * nginx event model */
+
+ if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
+ dd("NGX_USE_RTSIG_EVENT");
+ if (ngx_add_conn(pgxc) != NGX_OK) {
+ goto bad_add;
+ }
+
+ } else if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
+ dd("NGX_USE_CLEAR_EVENT");
+ if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
+ goto bad_add;
+ }
+
+ if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
+ goto bad_add;
+ }
+
+ } else {
+ dd("NGX_USE_LEVEL_EVENT");
+ if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
+ goto bad_add;
+ }
+
+ if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
+ goto bad_add;
+ }
+ }
+
+ pgxc->log->action = "connecting to PostgreSQL database";
+ pgdt->state = state_db_connect;
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+
+bad_add:
+
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "postgres: failed to add nginx connection");
+
+invalid:
+
+ ngx_postgres_upstream_free_connection(pc->log, pc->connection,
+ pgdt->pgconn, pgscf);
+
+#if defined(nginx_version) && (nginx_version >= 8017)
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+#else
+
+failed:
+
+ /* a bit hack-ish way to return error response (setup part) */
+ pc->connection = ngx_get_connection(0, pc->log);
+
+ dd("returning NGX_AGAIN (NGX_ERROR)");
+ return NGX_AGAIN;
+#endif
+}
+
+void
+ngx_postgres_upstream_free_peer(ngx_peer_connection_t *pc,
+ void *data, ngx_uint_t state)
+{
+ ngx_postgres_upstream_peer_data_t *pgdt = data;
+ ngx_postgres_upstream_srv_conf_t *pgscf;
+
+ dd("entering");
+
+#if defined(nginx_version) && (nginx_version < 8017)
+ if (data == NULL) {
+ dd("returning");
+ return;
+ }
+#endif
+
+ pgscf = pgdt->srv_conf;
+
+ if (pgscf->max_cached) {
+ ngx_postgres_keepalive_free_peer(pc, pgdt, pgscf, state);
+ }
+
+ if (pc->connection) {
+ dd("free connection to PostgreSQL database");
+
+ ngx_postgres_upstream_free_connection(pc->log, pc->connection,
+ pgdt->pgconn, pgscf);
+
+ pgdt->pgconn = NULL;
+ pc->connection = NULL;
+ }
+
+ dd("returning");
+}
+
+ngx_flag_t
+ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t *peer)
+{
+ dd("entering & returning");
+ return (peer->get == ngx_postgres_upstream_get_peer);
+}
+
+void
+ngx_postgres_upstream_free_connection(ngx_log_t *log, ngx_connection_t *c,
+ PGconn *pgconn, ngx_postgres_upstream_srv_conf_t *pgscf)
+{
+ ngx_event_t *rev, *wev;
+
+ dd("entering");
+
+ PQfinish(pgconn);
+
+ if (c) {
+ rev = c->read;
+ wev = c->write;
+
+ if (rev->timer_set) {
+ ngx_del_timer(rev);
+ }
+
+ if (wev->timer_set) {
+ ngx_del_timer(wev);
+ }
+
+ if (ngx_del_conn) {
+ ngx_del_conn(c, NGX_CLOSE_EVENT);
+ } else {
+ if (rev->active || rev->disabled) {
+ ngx_del_event(rev, NGX_READ_EVENT, NGX_CLOSE_EVENT);
+ }
+
+ if (wev->active || wev->disabled) {
+ ngx_del_event(wev, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
+ }
+ }
+
+#if defined(nginx_version) && nginx_version >= 1007005
+ if (rev->posted) {
+#else
+ if (rev->prev) {
+#endif
+ ngx_delete_posted_event(rev);
+ }
+
+#if defined(nginx_version) && nginx_version >= 1007005
+ if (wev->posted) {
+#else
+ if (wev->prev) {
+#endif
+ ngx_delete_posted_event(wev);
+ }
+
+ rev->closed = 1;
+ wev->closed = 1;
+
+#if defined(nginx_version) && (nginx_version >= 1001004)
+ if (c->pool) {
+ ngx_destroy_pool(c->pool);
+ }
+#endif
+
+ ngx_free_connection(c);
+
+ c->fd = (ngx_socket_t) -1;
+ }
+
+ /* free spot in keepalive connection pool */
+ pgscf->active_conns--;
+
+ dd("returning");
+}