summaryrefslogtreecommitdiff
path: root/ngx_postgres-1.0/src/ngx_postgres_keepalive.c
diff options
context:
space:
mode:
Diffstat (limited to 'ngx_postgres-1.0/src/ngx_postgres_keepalive.c')
-rw-r--r--ngx_postgres-1.0/src/ngx_postgres_keepalive.c344
1 files changed, 344 insertions, 0 deletions
diff --git a/ngx_postgres-1.0/src/ngx_postgres_keepalive.c b/ngx_postgres-1.0/src/ngx_postgres_keepalive.c
new file mode 100644
index 0000000..6575338
--- /dev/null
+++ b/ngx_postgres-1.0/src/ngx_postgres_keepalive.c
@@ -0,0 +1,344 @@
+/*
+ * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
+ * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
+ * Copyright (C) 2008, Maxim Dounin
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+
+#include "ngx_postgres_ddebug.h"
+#include "ngx_postgres_keepalive.h"
+
+
+ngx_int_t
+ngx_postgres_keepalive_init(ngx_pool_t *pool,
+ ngx_postgres_upstream_srv_conf_t *pgscf)
+{
+ ngx_postgres_keepalive_cache_t *cached;
+ ngx_uint_t i;
+
+ dd("entering");
+
+ cached = ngx_pcalloc(pool,
+ sizeof(ngx_postgres_keepalive_cache_t) * pgscf->max_cached);
+ if (cached == NULL) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ ngx_queue_init(&pgscf->cache);
+ ngx_queue_init(&pgscf->free);
+
+ for (i = 0; i < pgscf->max_cached; i++) {
+ ngx_queue_insert_head(&pgscf->free, &cached[i].queue);
+ cached[i].srv_conf = pgscf;
+ }
+
+ dd("returning NGX_OK");
+ return NGX_OK;
+}
+
+ngx_int_t
+ngx_postgres_keepalive_get_peer_single(ngx_peer_connection_t *pc,
+ ngx_postgres_upstream_peer_data_t *pgp,
+ ngx_postgres_upstream_srv_conf_t *pgscf)
+{
+ ngx_postgres_keepalive_cache_t *item;
+ ngx_queue_t *q;
+ ngx_connection_t *c;
+
+ dd("entering");
+
+ if (!ngx_queue_empty(&pgscf->cache)) {
+ dd("non-empty queue");
+
+ q = ngx_queue_head(&pgscf->cache);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_postgres_keepalive_cache_t, queue);
+ c = item->connection;
+
+ ngx_queue_insert_head(&pgscf->free, q);
+
+ c->idle = 0;
+ c->log = pc->log;
+#if defined(nginx_version) && (nginx_version >= 1001004)
+ c->pool->log = pc->log;
+#endif
+ c->read->log = pc->log;
+ c->write->log = pc->log;
+
+ pgp->name.data = item->name.data;
+ pgp->name.len = item->name.len;
+
+ pgp->sockaddr = item->sockaddr;
+
+ pgp->pgconn = item->pgconn;
+
+ pc->connection = c;
+ pc->cached = 1;
+
+ pc->name = &pgp->name;
+
+ pc->sockaddr = &pgp->sockaddr;
+ pc->socklen = item->socklen;
+
+ dd("returning NGX_DONE");
+
+ return NGX_DONE;
+ }
+
+ dd("returning NGX_DECLINED");
+ return NGX_DECLINED;
+}
+
+ngx_int_t
+ngx_postgres_keepalive_get_peer_multi(ngx_peer_connection_t *pc,
+ ngx_postgres_upstream_peer_data_t *pgp,
+ ngx_postgres_upstream_srv_conf_t *pgscf)
+{
+ ngx_postgres_keepalive_cache_t *item;
+ ngx_queue_t *q, *cache;
+ ngx_connection_t *c;
+
+ dd("entering");
+
+ cache = &pgscf->cache;
+
+ for (q = ngx_queue_head(cache);
+ q != ngx_queue_sentinel(cache);
+ q = ngx_queue_next(q))
+ {
+ item = ngx_queue_data(q, ngx_postgres_keepalive_cache_t, queue);
+ c = item->connection;
+
+ if (ngx_memn2cmp((u_char *) &item->sockaddr, (u_char *) pc->sockaddr,
+ item->socklen, pc->socklen) == 0)
+ {
+ ngx_queue_remove(q);
+ ngx_queue_insert_head(&pgscf->free, q);
+
+ c->idle = 0;
+ c->log = pc->log;
+#if defined(nginx_version) && (nginx_version >= 1001004)
+ c->pool->log = pc->log;
+#endif
+ c->read->log = pc->log;
+ c->write->log = pc->log;
+
+ pc->connection = c;
+ pc->cached = 1;
+
+ /* we do not need to resume the peer name
+ * because we already take the right value outside */
+
+ pgp->pgconn = item->pgconn;
+
+ dd("returning NGX_DONE");
+ return NGX_DONE;
+ }
+ }
+
+ dd("returning NGX_DECLINED");
+ return NGX_DECLINED;
+}
+
+void
+ngx_postgres_keepalive_free_peer(ngx_peer_connection_t *pc,
+ ngx_postgres_upstream_peer_data_t *pgp,
+ ngx_postgres_upstream_srv_conf_t *pgscf, ngx_uint_t state)
+{
+ ngx_postgres_keepalive_cache_t *item;
+ ngx_queue_t *q;
+ ngx_connection_t *c;
+ ngx_http_upstream_t *u;
+
+ dd("entering");
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+ "postgres: free keepalive peer");
+
+ if (state & NGX_PEER_FAILED) {
+ pgp->failed = 1;
+ }
+
+ u = pgp->upstream;
+
+ if ((!pgp->failed) && (pc->connection != NULL)
+ && (u->headers_in.status_n == NGX_HTTP_OK))
+ {
+ c = pc->connection;
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+ if (c->write->active && (ngx_event_flags & NGX_USE_LEVEL_EVENT)) {
+ if (ngx_del_event(c->write, NGX_WRITE_EVENT, 0) != NGX_OK) {
+ return;
+ }
+ }
+
+ pc->connection = NULL;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
+ "postgres: free keepalive peer: saving connection %p",
+ c);
+
+ if (ngx_queue_empty(&pgscf->free)) {
+ /* connection pool is already full */
+
+ q = ngx_queue_last(&pgscf->cache);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_postgres_keepalive_cache_t,
+ queue);
+
+ ngx_postgres_upstream_free_connection(pc->log, item->connection,
+ item->pgconn, pgscf);
+
+ } else {
+ q = ngx_queue_head(&pgscf->free);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_postgres_keepalive_cache_t,
+ queue);
+ }
+
+ item->connection = c;
+ ngx_queue_insert_head(&pgscf->cache, q);
+
+ c->write->handler = ngx_postgres_keepalive_dummy_handler;
+ c->read->handler = ngx_postgres_keepalive_close_handler;
+
+ c->data = item;
+ c->idle = 1;
+ c->log = ngx_cycle->log;
+#if defined(nginx_version) && (nginx_version >= 1001004)
+ c->pool->log = ngx_cycle->log;
+#endif
+ c->read->log = ngx_cycle->log;
+ c->write->log = ngx_cycle->log;
+
+ item->socklen = pc->socklen;
+ ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
+
+ item->pgconn = pgp->pgconn;
+
+ item->name.data = pgp->name.data;
+ item->name.len = pgp->name.len;
+ }
+
+ dd("returning");
+}
+
+void
+ngx_postgres_keepalive_dummy_handler(ngx_event_t *ev)
+{
+ dd("entering & returning (dummy handler)");
+}
+
+void
+ngx_postgres_keepalive_close_handler(ngx_event_t *ev)
+{
+ ngx_postgres_upstream_srv_conf_t *pgscf;
+ ngx_postgres_keepalive_cache_t *item;
+ ngx_connection_t *c;
+ PGresult *res;
+
+ dd("entering");
+
+ c = ev->data;
+ item = c->data;
+
+ if (c->close) {
+ goto close;
+ }
+
+ if (PQconsumeInput(item->pgconn) && !PQisBusy(item->pgconn)) {
+ res = PQgetResult(item->pgconn);
+ if (res == NULL) {
+ dd("returning");
+ return;
+ }
+
+ PQclear(res);
+
+ dd("received result on idle keepalive connection");
+ ngx_log_error(NGX_LOG_ERR, c->log, 0,
+ "postgres: received result on idle keepalive connection");
+ }
+
+close:
+
+ pgscf = item->srv_conf;
+
+ ngx_postgres_upstream_free_connection(ev->log, c, item->pgconn, pgscf);
+
+ ngx_queue_remove(&item->queue);
+ ngx_queue_insert_head(&pgscf->free, &item->queue);
+
+ dd("returning");
+}
+
+void
+ngx_postgres_keepalive_cleanup(void *data)
+{
+ ngx_postgres_upstream_srv_conf_t *pgscf = data;
+ ngx_postgres_keepalive_cache_t *item;
+ ngx_queue_t *q;
+
+ dd("entering");
+
+ /* ngx_queue_empty is broken when used on unitialized queue */
+ if (pgscf->cache.prev == NULL) {
+ dd("returning");
+ return;
+ }
+
+ /* just to be on the safe-side */
+ pgscf->max_cached = 0;
+
+ while (!ngx_queue_empty(&pgscf->cache)) {
+ q = ngx_queue_head(&pgscf->cache);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_postgres_keepalive_cache_t,
+ queue);
+
+ dd("postgres: disconnecting %p", item->connection);
+
+ ngx_postgres_upstream_free_connection(item->connection->log,
+ item->connection,
+ item->pgconn, pgscf);
+ }
+
+ dd("returning");
+}