diff options
Diffstat (limited to 'ngx_postgres-1.0/src/ngx_postgres_upstream.c')
-rw-r--r-- | ngx_postgres-1.0/src/ngx_postgres_upstream.c | 598 |
1 files changed, 598 insertions, 0 deletions
diff --git a/ngx_postgres-1.0/src/ngx_postgres_upstream.c b/ngx_postgres-1.0/src/ngx_postgres_upstream.c new file mode 100644 index 0000000..919029b --- /dev/null +++ b/ngx_postgres-1.0/src/ngx_postgres_upstream.c @@ -0,0 +1,598 @@ +/* + * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com> + * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com> + * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com> + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +#ifndef DDEBUG +#define DDEBUG 0 +#endif + +#include <nginx.h> +#include "ngx_postgres_ddebug.h" +#include "ngx_postgres_module.h" +#include "ngx_postgres_keepalive.h" +#include "ngx_postgres_processor.h" + + +ngx_int_t +ngx_postgres_upstream_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *uscf) +{ + ngx_postgres_upstream_srv_conf_t *pgscf; + ngx_postgres_upstream_server_t *server; + ngx_postgres_upstream_peers_t *peers; + ngx_uint_t i, j, n; + + dd("entering"); + + uscf->peer.init = ngx_postgres_upstream_init_peer; + + pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module); + + if (pgscf->servers == NULL || pgscf->servers->nelts == 0) { + ngx_log_error(NGX_LOG_ERR, cf->log, 0, + "postgres: no \"postgres_server\" defined" + " in upstream \"%V\" in %s:%ui", + &uscf->host, uscf->file_name, uscf->line); + + dd("returning NGX_ERROR"); + return NGX_ERROR; + } + + /* pgscf->servers != NULL */ + + server = uscf->servers->elts; + + n = 0; + + for (i = 0; i < uscf->servers->nelts; i++) { + n += server[i].naddrs; + } + + peers = ngx_pcalloc(cf->pool, sizeof(ngx_postgres_upstream_peers_t) + + sizeof(ngx_postgres_upstream_peer_t) * (n - 1)); + + if (peers == NULL) { + dd("returning NGX_ERROR"); + return NGX_ERROR; + } + + peers->single = (n == 1); + peers->number = n; + peers->name = &uscf->host; + + n = 0; + + for (i = 0; i < uscf->servers->nelts; i++) { + for (j = 0; j < server[i].naddrs; j++) { + peers->peer[n].sockaddr = server[i].addrs[j].sockaddr; + peers->peer[n].socklen = server[i].addrs[j].socklen; + peers->peer[n].name = server[i].addrs[j].name; + peers->peer[n].port = server[i].port; + peers->peer[n].dbname = server[i].dbname; + peers->peer[n].user = server[i].user; + peers->peer[n].password = server[i].password; + + peers->peer[n].host.data = ngx_pnalloc(cf->pool, + NGX_SOCKADDR_STRLEN); + if (peers->peer[n].host.data == NULL) { + dd("returning NGX_ERROR"); + return NGX_ERROR; + } + + peers->peer[n].host.len = ngx_sock_ntop(peers->peer[n].sockaddr, +#if defined(nginx_version) && (nginx_version >= 1005003) + peers->peer[n].socklen, +#endif + peers->peer[n].host.data, + NGX_SOCKADDR_STRLEN, 0); + if (peers->peer[n].host.len == 0) { + dd("returning NGX_ERROR"); + return NGX_ERROR; + } + + n++; + } + } + + pgscf->peers = peers; + pgscf->active_conns = 0; + + if (pgscf->max_cached) { + dd("returning"); + return ngx_postgres_keepalive_init(cf->pool, pgscf); + } + + dd("returning NGX_OK"); + return NGX_OK; +} + +ngx_int_t +ngx_postgres_upstream_init_peer(ngx_http_request_t *r, + ngx_http_upstream_srv_conf_t *uscf) +{ + ngx_postgres_upstream_peer_data_t *pgdt; + ngx_postgres_upstream_srv_conf_t *pgscf; + ngx_postgres_loc_conf_t *pglcf; + ngx_postgres_ctx_t *pgctx; + ngx_http_core_loc_conf_t *clcf; + ngx_http_upstream_t *u; + ngx_postgres_mixed_t *query; + ngx_str_t sql; + ngx_uint_t i; + + dd("entering"); + + pgdt = ngx_pcalloc(r->pool, sizeof(ngx_postgres_upstream_peer_data_t)); + if (pgdt == NULL) { + goto failed; + } + + u = r->upstream; + + pgdt->upstream = u; + pgdt->request = r; + + pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module); + pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module); + pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module); + + pgdt->srv_conf = pgscf; + pgdt->loc_conf = pglcf; + + u->peer.data = pgdt; + u->peer.get = ngx_postgres_upstream_get_peer; + u->peer.free = ngx_postgres_upstream_free_peer; + + if (pglcf->query.methods_set & r->method) { + /* method-specific query */ + dd("using method-specific query"); + + query = pglcf->query.methods->elts; + for (i = 0; i < pglcf->query.methods->nelts; i++) { + if (query[i].key & r->method) { + query = &query[i]; + break; + } + } + + if (i == pglcf->query.methods->nelts) { + goto failed; + } + } else { + /* default query */ + dd("using default query"); + + query = pglcf->query.def; + } + + if (query->cv) { + /* complex value */ + dd("using complex value"); + + if (ngx_http_complex_value(r, query->cv, &sql) != NGX_OK) { + goto failed; + } + + if (sql.len == 0) { + clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module); + + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "postgres: empty \"postgres_query\" (was: \"%V\")" + " in location \"%V\"", &query->cv->value, + &clcf->name); + + goto failed; + } + + pgdt->query = sql; + } else { + /* simple value */ + dd("using simple value"); + + pgdt->query = query->sv; + } + + /* set $postgres_query */ + pgctx->var_query = pgdt->query; + + dd("returning NGX_OK"); + return NGX_OK; + +failed: + +#if defined(nginx_version) && (nginx_version >= 8017) + dd("returning NGX_ERROR"); + return NGX_ERROR; +#else + r->upstream->peer.data = NULL; + + dd("returning NGX_OK (NGX_ERROR)"); + return NGX_OK; +#endif +} + +ngx_int_t +ngx_postgres_upstream_get_peer(ngx_peer_connection_t *pc, void *data) +{ + ngx_postgres_upstream_peer_data_t *pgdt = data; + ngx_postgres_upstream_srv_conf_t *pgscf; +#if defined(nginx_version) && (nginx_version < 8017) + ngx_postgres_ctx_t *pgctx; +#endif + ngx_postgres_upstream_peers_t *peers; + ngx_postgres_upstream_peer_t *peer; + ngx_connection_t *pgxc = NULL; + int fd; + ngx_event_t *rev, *wev; + ngx_int_t rc; + u_char *connstring, *last; + size_t len; + + dd("entering"); + +#if defined(nginx_version) && (nginx_version < 8017) + if (data == NULL) { + goto failed; + } + + pgctx = ngx_http_get_module_ctx(pgdt->request, ngx_postgres_module); +#endif + + pgscf = pgdt->srv_conf; + + pgdt->failed = 0; + + if (pgscf->max_cached && pgscf->single) { + rc = ngx_postgres_keepalive_get_peer_single(pc, pgdt, pgscf); + if (rc != NGX_DECLINED) { + /* re-use keepalive peer */ + dd("re-using keepalive peer (single)"); + + pgdt->state = state_db_send_query; + + ngx_postgres_process_events(pgdt->request); + + dd("returning NGX_AGAIN"); + return NGX_AGAIN; + } + } + + peers = pgscf->peers; + + if (pgscf->current > peers->number - 1) { + pgscf->current = 0; + } + + peer = &peers->peer[pgscf->current++]; + + pgdt->name.len = peer->name.len; + pgdt->name.data = peer->name.data; + + pgdt->sockaddr = *peer->sockaddr; + + pc->name = &pgdt->name; + pc->sockaddr = &pgdt->sockaddr; + pc->socklen = peer->socklen; + pc->cached = 0; + + if ((pgscf->max_cached) && (!pgscf->single)) { + rc = ngx_postgres_keepalive_get_peer_multi(pc, pgdt, pgscf); + if (rc != NGX_DECLINED) { + /* re-use keepalive peer */ + dd("re-using keepalive peer (multi)"); + + pgdt->state = state_db_send_query; + + ngx_postgres_process_events(pgdt->request); + + dd("returning NGX_AGAIN"); + return NGX_AGAIN; + } + } + + if ((pgscf->reject) && (pgscf->active_conns >= pgscf->max_cached)) { + ngx_log_error(NGX_LOG_INFO, pc->log, 0, + "postgres: keepalive connection pool is full," + " rejecting request to upstream \"%V\"", &peer->name); + + /* a bit hack-ish way to return error response (setup part) */ + pc->connection = ngx_get_connection(0, pc->log); + +#if defined(nginx_version) && (nginx_version < 8017) + pgctx->status = NGX_HTTP_SERVICE_UNAVAILABLE; +#endif + + dd("returning NGX_AGAIN (NGX_HTTP_SERVICE_UNAVAILABLE)"); + return NGX_AGAIN; + } + + /* sizeof("...") - 1 + 1 (for spaces and '\0' omitted */ + len = sizeof("hostaddr=") + peer->host.len + + sizeof("port=") + sizeof("65535") - 1 + + sizeof("dbname=") + peer->dbname.len + + sizeof("user=") + peer->user.len + + sizeof("password=") + peer->password.len + + sizeof("sslmode=disable"); + + connstring = ngx_pnalloc(pgdt->request->pool, len); + if (connstring == NULL) { +#if defined(nginx_version) && (nginx_version >= 8017) + dd("returning NGX_ERROR"); + return NGX_ERROR; +#else + goto failed; +#endif + } + + /* TODO add unix sockets */ + last = ngx_snprintf(connstring, len - 1, + "hostaddr=%V port=%d dbname=%V user=%V password=%V" + " sslmode=disable", + &peer->host, peer->port, &peer->dbname, &peer->user, + &peer->password); + *last = '\0'; + + dd("PostgreSQL connection string: %s", connstring); + + /* + * internal checks in PQsetnonblocking are taking care of any + * PQconnectStart failures, so we don't need to check them here. + */ + + ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "postgres: connecting"); + + pgdt->pgconn = PQconnectStart((const char *)connstring); + if (PQsetnonblocking(pgdt->pgconn, 1) == -1) { + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "postgres: connection failed: %s in upstream \"%V\"", + PQerrorMessage(pgdt->pgconn), &peer->name); + + PQfinish(pgdt->pgconn); + pgdt->pgconn = NULL; + +#if defined(nginx_version) && (nginx_version >= 8017) + dd("returning NGX_DECLINED"); + return NGX_DECLINED; +#else + pgctx->status = NGX_HTTP_BAD_GATEWAY; + goto failed; +#endif + } + +#if defined(DDEBUG) && (DDEBUG > 1) + PQtrace(pgdt->pgconn, stderr); +#endif + + dd("connection status:%d", (int) PQstatus(pgdt->pgconn)); + + /* take spot in keepalive connection pool */ + pgscf->active_conns++; + + /* add the file descriptor (fd) into an nginx connection structure */ + + fd = PQsocket(pgdt->pgconn); + if (fd == -1) { + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "postgres: failed to get connection fd"); + + goto invalid; + } + + ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0, + "postgres: connection fd:%d", fd); + + pgxc = pc->connection = ngx_get_connection(fd, pc->log); + + if (pgxc == NULL) { + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "postgres: failed to get a free nginx connection"); + + goto invalid; + } + + pgxc->log = pc->log; + pgxc->log_error = pc->log_error; + pgxc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1); + + rev = pgxc->read; + wev = pgxc->write; + + rev->log = pc->log; + wev->log = pc->log; + + /* register the connection with postgres connection fd into the + * nginx event model */ + + if (ngx_event_flags & NGX_USE_RTSIG_EVENT) { + dd("NGX_USE_RTSIG_EVENT"); + if (ngx_add_conn(pgxc) != NGX_OK) { + goto bad_add; + } + + } else if (ngx_event_flags & NGX_USE_CLEAR_EVENT) { + dd("NGX_USE_CLEAR_EVENT"); + if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) { + goto bad_add; + } + + if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) { + goto bad_add; + } + + } else { + dd("NGX_USE_LEVEL_EVENT"); + if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) != NGX_OK) { + goto bad_add; + } + + if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) != NGX_OK) { + goto bad_add; + } + } + + pgxc->log->action = "connecting to PostgreSQL database"; + pgdt->state = state_db_connect; + + dd("returning NGX_AGAIN"); + return NGX_AGAIN; + +bad_add: + + ngx_log_error(NGX_LOG_ERR, pc->log, 0, + "postgres: failed to add nginx connection"); + +invalid: + + ngx_postgres_upstream_free_connection(pc->log, pc->connection, + pgdt->pgconn, pgscf); + +#if defined(nginx_version) && (nginx_version >= 8017) + dd("returning NGX_ERROR"); + return NGX_ERROR; +#else + +failed: + + /* a bit hack-ish way to return error response (setup part) */ + pc->connection = ngx_get_connection(0, pc->log); + + dd("returning NGX_AGAIN (NGX_ERROR)"); + return NGX_AGAIN; +#endif +} + +void +ngx_postgres_upstream_free_peer(ngx_peer_connection_t *pc, + void *data, ngx_uint_t state) +{ + ngx_postgres_upstream_peer_data_t *pgdt = data; + ngx_postgres_upstream_srv_conf_t *pgscf; + + dd("entering"); + +#if defined(nginx_version) && (nginx_version < 8017) + if (data == NULL) { + dd("returning"); + return; + } +#endif + + pgscf = pgdt->srv_conf; + + if (pgscf->max_cached) { + ngx_postgres_keepalive_free_peer(pc, pgdt, pgscf, state); + } + + if (pc->connection) { + dd("free connection to PostgreSQL database"); + + ngx_postgres_upstream_free_connection(pc->log, pc->connection, + pgdt->pgconn, pgscf); + + pgdt->pgconn = NULL; + pc->connection = NULL; + } + + dd("returning"); +} + +ngx_flag_t +ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t *peer) +{ + dd("entering & returning"); + return (peer->get == ngx_postgres_upstream_get_peer); +} + +void +ngx_postgres_upstream_free_connection(ngx_log_t *log, ngx_connection_t *c, + PGconn *pgconn, ngx_postgres_upstream_srv_conf_t *pgscf) +{ + ngx_event_t *rev, *wev; + + dd("entering"); + + PQfinish(pgconn); + + if (c) { + rev = c->read; + wev = c->write; + + if (rev->timer_set) { + ngx_del_timer(rev); + } + + if (wev->timer_set) { + ngx_del_timer(wev); + } + + if (ngx_del_conn) { + ngx_del_conn(c, NGX_CLOSE_EVENT); + } else { + if (rev->active || rev->disabled) { + ngx_del_event(rev, NGX_READ_EVENT, NGX_CLOSE_EVENT); + } + + if (wev->active || wev->disabled) { + ngx_del_event(wev, NGX_WRITE_EVENT, NGX_CLOSE_EVENT); + } + } + +#if defined(nginx_version) && nginx_version >= 1007005 + if (rev->posted) { +#else + if (rev->prev) { +#endif + ngx_delete_posted_event(rev); + } + +#if defined(nginx_version) && nginx_version >= 1007005 + if (wev->posted) { +#else + if (wev->prev) { +#endif + ngx_delete_posted_event(wev); + } + + rev->closed = 1; + wev->closed = 1; + +#if defined(nginx_version) && (nginx_version >= 1001004) + if (c->pool) { + ngx_destroy_pool(c->pool); + } +#endif + + ngx_free_connection(c); + + c->fd = (ngx_socket_t) -1; + } + + /* free spot in keepalive connection pool */ + pgscf->active_conns--; + + dd("returning"); +} |