summaryrefslogtreecommitdiff
path: root/ngx_postgres-1.0/src/ngx_postgres_processor.c
diff options
context:
space:
mode:
Diffstat (limited to 'ngx_postgres-1.0/src/ngx_postgres_processor.c')
-rw-r--r--ngx_postgres-1.0/src/ngx_postgres_processor.c514
1 files changed, 514 insertions, 0 deletions
diff --git a/ngx_postgres-1.0/src/ngx_postgres_processor.c b/ngx_postgres-1.0/src/ngx_postgres_processor.c
new file mode 100644
index 0000000..d25c054
--- /dev/null
+++ b/ngx_postgres-1.0/src/ngx_postgres_processor.c
@@ -0,0 +1,514 @@
+/*
+ * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
+ * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com>
+ * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+
+#include "ngx_postgres_ddebug.h"
+#include "ngx_postgres_output.h"
+#include "ngx_postgres_processor.h"
+#include "ngx_postgres_util.h"
+#include "ngx_postgres_variable.h"
+
+
+void
+ngx_postgres_process_events(ngx_http_request_t *r)
+{
+ ngx_postgres_upstream_peer_data_t *pgdt;
+ ngx_connection_t *pgxc;
+ ngx_http_upstream_t *u;
+ ngx_int_t rc;
+
+ dd("entering");
+
+ u = r->upstream;
+ pgxc = u->peer.connection;
+ pgdt = u->peer.data;
+
+ if (!ngx_postgres_upstream_is_my_peer(&u->peer)) {
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: trying to connect to something that"
+ " is not PostgreSQL database");
+
+ goto failed;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: process events");
+
+ switch (pgdt->state) {
+ case state_db_connect:
+ dd("state_db_connect");
+ rc = ngx_postgres_upstream_connect(r, pgxc, pgdt);
+ break;
+ case state_db_send_query:
+ dd("state_db_send_query");
+ rc = ngx_postgres_upstream_send_query(r, pgxc, pgdt);
+ break;
+ case state_db_get_result:
+ dd("state_db_get_result");
+ rc = ngx_postgres_upstream_get_result(r, pgxc, pgdt);
+ break;
+ case state_db_get_ack:
+ dd("state_db_get_ack");
+ rc = ngx_postgres_upstream_get_ack(r, pgxc, pgdt);
+ break;
+ case state_db_idle:
+ dd("state_db_idle, re-using keepalive connection");
+ pgxc->log->action = "sending query to PostgreSQL database";
+ pgdt->state = state_db_send_query;
+ rc = ngx_postgres_upstream_send_query(r, pgxc, pgdt);
+ break;
+ default:
+ dd("unknown state:%d", pgdt->state);
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: unknown state:%d", pgdt->state);
+
+ goto failed;
+ }
+
+ if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ ngx_postgres_upstream_finalize_request(r, u, rc);
+ } else if (rc == NGX_ERROR) {
+ goto failed;
+ }
+
+ dd("returning");
+ return;
+
+failed:
+
+ ngx_postgres_upstream_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);
+
+ dd("returning");
+}
+
+ngx_int_t
+ngx_postgres_upstream_connect(ngx_http_request_t *r, ngx_connection_t *pgxc,
+ ngx_postgres_upstream_peer_data_t *pgdt)
+{
+ PostgresPollingStatusType pgrc;
+
+ dd("entering");
+
+ pgrc = PQconnectPoll(pgdt->pgconn);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: polling while connecting, rc:%d", (int) pgrc);
+
+ if (pgrc == PGRES_POLLING_READING || pgrc == PGRES_POLLING_WRITING) {
+
+ /*
+ * Fix for Linux issue found by chaoslawful (via agentzh):
+ * "According to the source of libpq (around fe-connect.c:1215), during
+ * the state switch from CONNECTION_STARTED to CONNECTION_MADE, there's
+ * no socket read/write operations (just a plain getsockopt call and a
+ * getsockname call). Therefore, for edge-triggered event model, we
+ * have to call PQconnectPoll one more time (immediately) when we see
+ * CONNECTION_MADE is returned, or we're very likely to wait for a
+ * writable event that has already appeared and will never appear
+ * again :)"
+ */
+ if (PQstatus(pgdt->pgconn) == CONNECTION_MADE && pgxc->write->ready) {
+ dd("re-polling on connection made");
+
+ pgrc = PQconnectPoll(pgdt->pgconn);
+ dd("re-polling rc:%d", (int) pgrc);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: re-polling while connecting, rc:%d",
+ (int) pgrc);
+
+ if (pgrc == PGRES_POLLING_READING || pgrc == PGRES_POLLING_WRITING)
+ {
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: busy while connecting, rc:%d",
+ (int) pgrc);
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+
+ goto done;
+ }
+
+#if defined(DDEBUG) && (DDEBUG)
+ switch (PQstatus(pgdt->pgconn)) {
+ case CONNECTION_NEEDED:
+ dd("connecting (waiting for connect()))");
+ break;
+ case CONNECTION_STARTED:
+ dd("connecting (waiting for connection to be made)");
+ break;
+ case CONNECTION_MADE:
+ dd("connecting (connection established)");
+ break;
+ case CONNECTION_AWAITING_RESPONSE:
+ dd("connecting (credentials sent, waiting for response)");
+ break;
+ case CONNECTION_AUTH_OK:
+ dd("connecting (authenticated)");
+ break;
+ case CONNECTION_SETENV:
+ dd("connecting (negotiating envinroment)");
+ break;
+ case CONNECTION_SSL_STARTUP:
+ dd("connecting (negotiating SSL)");
+ break;
+ default:
+ /*
+ * This cannot happen, PQconnectPoll would return
+ * PGRES_POLLING_FAILED in that case.
+ */
+ dd("connecting (unknown state:%d)", (int) PQstatus(pgdt->pgconn));
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+#endif /* DDEBUG */
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: busy while connecting, rc:%d", (int) pgrc);
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+
+done:
+
+ /* remove connection timeout from new connection */
+ if (pgxc->write->timer_set) {
+ ngx_del_timer(pgxc->write);
+ }
+
+ if (pgrc != PGRES_POLLING_OK) {
+ dd("connection failed");
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: connection failed: %s",
+ PQerrorMessage(pgdt->pgconn));
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ dd("connected successfully");
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: connected successfully");
+
+ pgxc->log->action = "sending query to PostgreSQL database";
+ pgdt->state = state_db_send_query;
+
+ dd("returning");
+ return ngx_postgres_upstream_send_query(r, pgxc, pgdt);
+}
+
+ngx_int_t
+ngx_postgres_upstream_send_query(ngx_http_request_t *r, ngx_connection_t *pgxc,
+ ngx_postgres_upstream_peer_data_t *pgdt)
+{
+ ngx_postgres_loc_conf_t *pglcf;
+ ngx_int_t pgrc;
+ u_char *query;
+
+ dd("entering");
+
+ pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
+
+ query = ngx_pnalloc(r->pool, pgdt->query.len + 1);
+ if (query == NULL) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ (void) ngx_cpystrn(query, pgdt->query.data, pgdt->query.len + 1);
+
+ dd("sending query: %s", query);
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: sending query: \"%s\"", query);
+
+ if (pglcf->output_binary) {
+ pgrc = PQsendQueryParams(pgdt->pgconn, (const char *) query,
+ 0, NULL, NULL, NULL, NULL, /* binary */ 1);
+ } else {
+ pgrc = PQsendQuery(pgdt->pgconn, (const char *) query);
+ }
+
+ if (pgrc == 0) {
+ dd("sending query failed");
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: sending query failed: %s",
+ PQerrorMessage(pgdt->pgconn));
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ /* set result timeout */
+ ngx_add_timer(pgxc->read, r->upstream->conf->read_timeout);
+
+ dd("query sent successfully");
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: query sent successfully");
+
+ pgxc->log->action = "waiting for result from PostgreSQL database";
+ pgdt->state = state_db_get_result;
+
+ dd("returning NGX_DONE");
+ return NGX_DONE;
+}
+
+ngx_int_t
+ngx_postgres_upstream_get_result(ngx_http_request_t *r, ngx_connection_t *pgxc,
+ ngx_postgres_upstream_peer_data_t *pgdt)
+{
+ ExecStatusType pgrc;
+ PGresult *res;
+ ngx_int_t rc;
+
+ dd("entering");
+
+ /* remove connection timeout from re-used keepalive connection */
+ if (pgxc->write->timer_set) {
+ ngx_del_timer(pgxc->write);
+ }
+
+ if (!PQconsumeInput(pgdt->pgconn)) {
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: failed to consume input: %s",
+ PQerrorMessage(pgdt->pgconn));
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ if (PQisBusy(pgdt->pgconn)) {
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: busy while receiving result");
+
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+
+ dd("receiving result");
+
+ res = PQgetResult(pgdt->pgconn);
+ if (res == NULL) {
+ dd("receiving result failed");
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: failed to receive result: %s",
+ PQerrorMessage(pgdt->pgconn));
+
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ pgrc = PQresultStatus(res);
+ if ((pgrc != PGRES_COMMAND_OK) && (pgrc != PGRES_TUPLES_OK)) {
+ dd("receiving result failed");
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: failed to receive result: %s: %s",
+ PQresStatus(pgrc),
+ PQerrorMessage(pgdt->pgconn));
+
+ PQclear(res);
+
+ dd("returning NGX_HTTP_INTERNAL_SERVER_ERROR");
+ return NGX_HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ dd("result received successfully, cols:%d rows:%d",
+ PQnfields(res), PQntuples(res));
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, pgxc->log, 0,
+ "postgres: result received successfully, cols:%d rows:%d",
+ PQnfields(res), PQntuples(res));
+
+ pgxc->log->action = "processing result from PostgreSQL database";
+ rc = ngx_postgres_process_response(r, res);
+
+ PQclear(res);
+
+ if (rc != NGX_DONE) {
+ dd("returning rc:%d", (int) rc);
+ return rc;
+ }
+
+ dd("result processed successfully");
+
+ pgxc->log->action = "waiting for ACK from PostgreSQL database";
+ pgdt->state = state_db_get_ack;
+
+ dd("returning");
+ return ngx_postgres_upstream_get_ack(r, pgxc, pgdt);
+}
+
+ngx_int_t
+ngx_postgres_process_response(ngx_http_request_t *r, PGresult *res)
+{
+ ngx_postgres_loc_conf_t *pglcf;
+ ngx_postgres_ctx_t *pgctx;
+ ngx_postgres_rewrite_conf_t *pgrcf;
+ ngx_postgres_variable_t *pgvar;
+ ngx_str_t *store;
+ char *affected;
+ size_t affected_len;
+ ngx_uint_t i;
+ ngx_int_t rc;
+
+ dd("entering");
+
+ pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
+ pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);
+
+ /* set $postgres_columns */
+ pgctx->var_cols = PQnfields(res);
+
+ /* set $postgres_rows */
+ pgctx->var_rows = PQntuples(res);
+
+ /* set $postgres_affected */
+ if (ngx_strncmp(PQcmdStatus(res), "SELECT", sizeof("SELECT") - 1)) {
+ affected = PQcmdTuples(res);
+ affected_len = ngx_strlen(affected);
+ if (affected_len) {
+ pgctx->var_affected = ngx_atoi((u_char *) affected, affected_len);
+ }
+ }
+
+ if (pglcf->rewrites) {
+ /* process rewrites */
+ pgrcf = pglcf->rewrites->elts;
+ for (i = 0; i < pglcf->rewrites->nelts; i++) {
+ rc = pgrcf[i].handler(r, &pgrcf[i]);
+ if (rc != NGX_DECLINED) {
+ if (rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ dd("returning NGX_DONE, status %d", (int) rc);
+ pgctx->status = rc;
+ return NGX_DONE;
+ }
+
+ pgctx->status = rc;
+ break;
+ }
+ }
+ }
+
+ if (pglcf->variables) {
+ /* set custom variables */
+ pgvar = pglcf->variables->elts;
+ store = pgctx->variables->elts;
+
+ for (i = 0; i < pglcf->variables->nelts; i++) {
+ store[i] = ngx_postgres_variable_set_custom(r, res, &pgvar[i]);
+ if ((store[i].len == 0) && (pgvar[i].value.required)) {
+ dd("returning NGX_DONE, status NGX_HTTP_INTERNAL_SERVER_ERROR");
+ pgctx->status = NGX_HTTP_INTERNAL_SERVER_ERROR;
+ return NGX_DONE;
+ }
+ }
+ }
+
+ if (pglcf->output_handler) {
+ /* generate output */
+ dd("returning");
+ return pglcf->output_handler(r, res);
+ }
+
+ dd("returning NGX_DONE");
+ return NGX_DONE;
+}
+
+ngx_int_t
+ngx_postgres_upstream_get_ack(ngx_http_request_t *r, ngx_connection_t *pgxc,
+ ngx_postgres_upstream_peer_data_t *pgdt)
+{
+ PGresult *res;
+
+ dd("entering");
+
+ if (!PQconsumeInput(pgdt->pgconn)) {
+ dd("returning NGX_ERROR");
+ return NGX_ERROR;
+ }
+
+ if (PQisBusy(pgdt->pgconn)) {
+ dd("returning NGX_AGAIN");
+ return NGX_AGAIN;
+ }
+
+ /* remove result timeout */
+ if (pgxc->read->timer_set) {
+ ngx_del_timer(pgxc->read);
+ }
+
+ dd("receiving ACK (ready for next query)");
+
+ res = PQgetResult(pgdt->pgconn);
+ if (res != NULL) {
+ dd("receiving ACK failed");
+ ngx_log_error(NGX_LOG_ERR, pgxc->log, 0,
+ "postgres: receiving ACK failed: multiple queries(?)");
+
+ PQclear(res);
+
+ dd("returning NGX_HTTP_INTERNAL_SERVER_ERROR");
+ return NGX_HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ dd("ACK received successfully");
+
+ pgxc->log->action = "being idle on PostgreSQL database";
+ pgdt->state = state_db_idle;
+
+ dd("returning");
+ return ngx_postgres_upstream_done(r, r->upstream, pgdt);
+}
+
+ngx_int_t
+ngx_postgres_upstream_done(ngx_http_request_t *r, ngx_http_upstream_t *u,
+ ngx_postgres_upstream_peer_data_t *pgdt)
+{
+ ngx_postgres_ctx_t *pgctx;
+
+ dd("entering");
+
+ /* flag for keepalive */
+ u->headers_in.status_n = NGX_HTTP_OK;
+
+ pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);
+
+ if (pgctx->status >= NGX_HTTP_SPECIAL_RESPONSE) {
+ ngx_postgres_upstream_finalize_request(r, u, pgctx->status);
+ } else {
+ ngx_postgres_upstream_finalize_request(r, u, NGX_OK);
+ }
+
+ dd("returning NGX_DONE");
+ return NGX_DONE;
+}