summaryrefslogtreecommitdiff
path: root/rds-csv-nginx-module-0.09/src
diff options
context:
space:
mode:
authorkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
committerkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
commit3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch)
tree284c2ba95a41536ae1bff6bea710db0709a64739 /rds-csv-nginx-module-0.09/src
downloadopenresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz
openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip
openresty bundle
Diffstat (limited to 'rds-csv-nginx-module-0.09/src')
-rw-r--r--rds-csv-nginx-module-0.09/src/ddebug.h85
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds.h41
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c519
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h89
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c770
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h38
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c476
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h34
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c105
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h34
-rw-r--r--rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h205
-rw-r--r--rds-csv-nginx-module-0.09/src/resty_dbd_stream.h59
12 files changed, 2455 insertions, 0 deletions
diff --git a/rds-csv-nginx-module-0.09/src/ddebug.h b/rds-csv-nginx-module-0.09/src/ddebug.h
new file mode 100644
index 0000000..5c15a74
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ddebug.h
@@ -0,0 +1,85 @@
+#ifndef DDEBUG_H
+#define DDEBUG_H
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+
+#if defined(DDEBUG) && (DDEBUG)
+
+# define dd_dump_chain_size() { \
+ int n; \
+ ngx_chain_t *cl; \
+ \
+ for (n = 0, cl = ctx->out; cl; cl = cl->next, n++) { \
+ } \
+ \
+ dd("chain size: %d", n); \
+ }
+
+# if (NGX_HAVE_VARIADIC_MACROS)
+
+# define dd(...) fprintf(stderr, "rds-csv *** %s: ", __func__); \
+ fprintf(stderr, __VA_ARGS__); \
+ fprintf(stderr, " at %s line %d.\n", __FILE__, __LINE__)
+
+# else
+
+#include <stdarg.h>
+#include <stdio.h>
+
+#include <stdarg.h>
+
+static ngx_inline void
+dd(const char * fmt, ...) {
+}
+
+# endif
+
+#else
+
+# define dd_dump_chain_size()
+
+# if (NGX_HAVE_VARIADIC_MACROS)
+
+# define dd(...)
+
+# else
+
+#include <stdarg.h>
+
+static ngx_inline void
+dd(const char * fmt, ...) {
+}
+
+# endif
+
+#endif
+
+#if defined(DDEBUG) && (DDEBUG)
+
+#define dd_check_read_event_handler(r) \
+ dd("r->read_event_handler = %s", \
+ r->read_event_handler == ngx_http_block_reading ? \
+ "ngx_http_block_reading" : \
+ r->read_event_handler == ngx_http_test_reading ? \
+ "ngx_http_test_reading" : \
+ r->read_event_handler == ngx_http_request_empty_handler ? \
+ "ngx_http_request_empty_handler" : "UNKNOWN")
+
+#define dd_check_write_event_handler(r) \
+ dd("r->write_event_handler = %s", \
+ r->write_event_handler == ngx_http_handler ? \
+ "ngx_http_handler" : \
+ r->write_event_handler == ngx_http_core_run_phases ? \
+ "ngx_http_core_run_phases" : \
+ r->write_event_handler == ngx_http_request_empty_handler ? \
+ "ngx_http_request_empty_handler" : "UNKNOWN")
+
+#else
+
+#define dd_check_read_event_handler(r)
+#define dd_check_write_event_handler(r)
+
+#endif
+
+#endif /* DDEBUG_H */
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds.h
new file mode 100644
index 0000000..2e3d878
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds.h
@@ -0,0 +1,41 @@
+
+/*
+ * Copyright (C) agentzh
+ */
+
+#ifndef NGX_HTTP_RDS_H
+#define NGX_HTTP_RDS_H
+
+
+#include "resty_dbd_stream.h"
+#include <nginx.h>
+#include <ngx_core.h>
+#include <ngx_http.h>
+
+
+typedef struct {
+ uint16_t std_errcode;
+ uint16_t drv_errcode;
+ ngx_str_t errstr;
+
+ uint64_t affected_rows;
+ uint64_t insert_id;
+ uint16_t col_count;
+
+} ngx_http_rds_header_t;
+
+
+typedef struct ngx_http_rds_column_s {
+ rds_col_type_t std_type;
+ uint16_t drv_type;
+
+ ngx_str_t name;
+
+} ngx_http_rds_column_t;
+
+
+
+
+
+#endif /* NGX_HTTP_RDS_H */
+
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c
new file mode 100644
index 0000000..35b6862
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c
@@ -0,0 +1,519 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_http_rds_csv_filter_module.h"
+#include "ngx_http_rds_csv_util.h"
+#include "ngx_http_rds_csv_processor.h"
+#include "ngx_http_rds_csv_output.h"
+
+#include <ngx_config.h>
+
+
+#define ngx_http_rds_csv_content_type "text/csv"
+#define ngx_http_rds_csv_row_term "\r\n"
+
+
+static volatile ngx_cycle_t *ngx_http_rds_csv_prev_cycle = NULL;
+
+
+ngx_http_output_header_filter_pt ngx_http_rds_csv_next_header_filter;
+ngx_http_output_body_filter_pt ngx_http_rds_csv_next_body_filter;
+
+
+static void *ngx_http_rds_csv_create_loc_conf(ngx_conf_t *cf);
+static char *ngx_http_rds_csv_merge_loc_conf(ngx_conf_t *cf, void *parent,
+ void *child);
+static ngx_int_t ngx_http_rds_csv_filter_init(ngx_conf_t *cf);
+static char *ngx_http_rds_csv_row_terminator(ngx_conf_t *cf,
+ ngx_command_t *cmd, void *conf);
+static char *ngx_http_rds_csv_field_separator(ngx_conf_t *cf,
+ ngx_command_t *cmd, void *conf);
+static char *ngx_http_rds_csv(ngx_conf_t *cf, ngx_command_t *cmd, void *conf);
+static void *ngx_http_rds_csv_create_main_conf(ngx_conf_t *cf);
+
+
+static ngx_command_t ngx_http_rds_csv_commands[] = {
+
+ { ngx_string("rds_csv"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
+ |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_FLAG,
+ ngx_http_rds_csv,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_rds_csv_loc_conf_t, enabled),
+ NULL },
+
+ { ngx_string("rds_csv_row_terminator"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
+ |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_TAKE1,
+ ngx_http_rds_csv_row_terminator,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_rds_csv_loc_conf_t, row_term),
+ NULL },
+
+ { ngx_string("rds_csv_field_separator"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
+ |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_TAKE1,
+ ngx_http_rds_csv_field_separator,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ 0,
+ NULL },
+
+ { ngx_string("rds_csv_field_name_header"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
+ |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_FLAG,
+ ngx_conf_set_flag_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_rds_csv_loc_conf_t, field_name_header),
+ NULL },
+
+ { ngx_string("rds_csv_content_type"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF
+ |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_TAKE1,
+ ngx_conf_set_str_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_rds_csv_loc_conf_t, content_type),
+ NULL },
+
+ { ngx_string("rds_csv_buffer_size"),
+ NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF
+ |NGX_CONF_TAKE1,
+ ngx_conf_set_size_slot,
+ NGX_HTTP_LOC_CONF_OFFSET,
+ offsetof(ngx_http_rds_csv_loc_conf_t, buf_size),
+ NULL },
+
+ ngx_null_command
+};
+
+
+static ngx_http_module_t ngx_http_rds_csv_filter_module_ctx = {
+ NULL, /* preconfiguration */
+ ngx_http_rds_csv_filter_init, /* postconfiguration */
+
+ ngx_http_rds_csv_create_main_conf, /* create main configuration */
+ NULL, /* init main configuration */
+
+ NULL, /* create server configuration */
+ NULL, /* merge server configuration */
+
+ ngx_http_rds_csv_create_loc_conf, /* create location configuration */
+ ngx_http_rds_csv_merge_loc_conf /* merge location configuration */
+};
+
+
+ngx_module_t ngx_http_rds_csv_filter_module = {
+ NGX_MODULE_V1,
+ &ngx_http_rds_csv_filter_module_ctx, /* module context */
+ ngx_http_rds_csv_commands, /* module directives */
+ NGX_HTTP_MODULE, /* module type */
+ NULL, /* init master */
+ NULL, /* init module */
+ NULL, /* init process */
+ NULL, /* init thread */
+ NULL, /* exit thread */
+ NULL, /* exit process */
+ NULL, /* exit master */
+ NGX_MODULE_V1_PADDING
+};
+
+
+static ngx_int_t
+ngx_http_rds_csv_header_filter(ngx_http_request_t *r)
+{
+ ngx_http_rds_csv_ctx_t *ctx;
+ ngx_http_rds_csv_loc_conf_t *conf;
+ size_t len;
+ u_char *p;
+
+ /* XXX maybe we can generate stub JSON strings like
+ * {"errcode":403,"error":"Permission denied"}
+ * for HTTP error pages? */
+ if ((r->headers_out.status < NGX_HTTP_OK)
+ || (r->headers_out.status >= NGX_HTTP_SPECIAL_RESPONSE)
+ || (r->headers_out.status == NGX_HTTP_NO_CONTENT)
+ || (r->headers_out.status == NGX_HTTP_RESET_CONTENT))
+ {
+ ngx_http_set_ctx(r, NULL, ngx_http_rds_csv_filter_module);
+
+ dd("status is not OK: %d, skipping", (int) r->headers_out.status);
+
+ return ngx_http_rds_csv_next_header_filter(r);
+ }
+
+ /* r->headers_out.status = 0; */
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ if (!conf->enabled) {
+ return ngx_http_rds_csv_next_header_filter(r);
+ }
+
+ if (ngx_http_rds_csv_test_content_type(r) != NGX_OK) {
+ return ngx_http_rds_csv_next_header_filter(r);
+ }
+
+ if (conf->content_type.len == sizeof(ngx_http_rds_csv_content_type) - 1
+ && ngx_strncmp(conf->content_type.data, ngx_http_rds_csv_content_type,
+ sizeof(ngx_http_rds_csv_content_type) - 1) == 0)
+ {
+ /* MIME type is text/csv, we process Content-Type
+ * according to RFC 4180 */
+
+ len = sizeof(ngx_http_rds_csv_content_type) - 1
+ + sizeof("; header=") - 1;
+
+ if (conf->field_name_header) {
+ len += sizeof("presence") - 1;
+
+ } else {
+ len += sizeof("absence") - 1;
+ }
+
+ p = ngx_palloc(r->pool, len);
+ if (p == NULL) {
+ return NGX_ERROR;
+ }
+
+ r->headers_out.content_type.len = len;
+ r->headers_out.content_type_len = len;
+
+ r->headers_out.content_type.data = p;
+
+ p = ngx_copy(p, conf->content_type.data, conf->content_type.len);
+
+ if (conf->field_name_header) {
+ p = ngx_copy_literal(p, "; header=presence");
+
+ } else {
+ p = ngx_copy_literal(p, "; header=absence");
+ }
+
+ if (p - r->headers_out.content_type.data != (ssize_t) len) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: content type buffer error: %uz != %uz",
+ (size_t) (p - r->headers_out.content_type.data),
+ len);
+
+ return NGX_ERROR;
+ }
+
+ } else {
+ /* custom MIME-type, we just pass it through */
+
+ r->headers_out.content_type = conf->content_type;
+ r->headers_out.content_type_len = conf->content_type.len;
+ }
+
+ ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_rds_csv_ctx_t));
+
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ ctx->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module;
+
+ ctx->state = state_expect_header;
+
+ ctx->header_sent = 0;
+
+ ctx->last_out = &ctx->out;
+
+ /* set by ngx_pcalloc
+ * ctx->out = NULL
+ * ctx->busy_bufs = NULL
+ * ctx->free_bufs = NULL
+ * ctx->cached = (ngx_buf_t) 0
+ * ctx->postponed = (ngx_buf_t) 0
+ * ctx->avail_out = 0
+ * ctx->col_names = NULL
+ * ctx->col_count = 0
+ * ctx->cur_col = 0
+ * ctx->field_offset = 0
+ * ctx->field_total = 0
+ * ctx->field_data_rest = 0
+ */
+
+ ngx_http_set_ctx(r, ctx, ngx_http_rds_csv_filter_module);
+
+ ngx_http_clear_content_length(r);
+
+ r->filter_need_in_memory = 1;
+
+ /* we do postpone the header sending to the body filter */
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_http_rds_csv_body_filter(ngx_http_request_t *r, ngx_chain_t *in)
+{
+ ngx_http_rds_csv_ctx_t *ctx;
+ ngx_int_t rc;
+
+ if (in == NULL || r->header_only) {
+ return ngx_http_rds_csv_next_body_filter(r, in);
+ }
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_rds_csv_filter_module);
+
+ if (ctx == NULL) {
+ return ngx_http_rds_csv_next_body_filter(r, in);
+ }
+
+ switch (ctx->state) {
+
+ case state_expect_header:
+ rc = ngx_http_rds_csv_process_header(r, in, ctx);
+ break;
+
+ case state_expect_col:
+ rc = ngx_http_rds_csv_process_col(r, in, ctx);
+ break;
+
+ case state_expect_row:
+ rc = ngx_http_rds_csv_process_row(r, in, ctx);
+ break;
+
+ case state_expect_field:
+ rc = ngx_http_rds_csv_process_field(r, in, ctx);
+ break;
+
+ case state_expect_more_field_data:
+ rc = ngx_http_rds_csv_process_more_field_data(r, in, ctx);
+ break;
+
+ case state_done:
+
+ /* mark the remaining bufs as consumed */
+
+ dd("discarding bufs");
+
+ ngx_http_rds_csv_discard_bufs(r->pool, in);
+
+ return NGX_OK;
+ break;
+ default:
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: invalid internal state: %d",
+ ctx->state);
+
+ rc = NGX_ERROR;
+
+ break;
+ }
+
+ dd("body filter rc: %d", (int) rc);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ ctx->state = state_done;
+
+ if (!ctx->header_sent) {
+ ctx->header_sent = 1;
+
+ if (rc == NGX_ERROR) {
+ rc = NGX_HTTP_INTERNAL_SERVER_ERROR;
+ }
+
+ r->headers_out.status = rc;
+
+ dd("sending ERROR headers");
+
+ ngx_http_rds_csv_next_header_filter(r);
+ ngx_http_send_special(r, NGX_HTTP_LAST);
+
+ return NGX_ERROR;
+ }
+
+ return NGX_ERROR;
+ }
+
+ dd("output bufs");
+
+ return ngx_http_rds_csv_output_bufs(r, ctx);
+}
+
+
+static ngx_int_t
+ngx_http_rds_csv_filter_init(ngx_conf_t *cf)
+{
+ int multi_http_blocks;
+ ngx_http_rds_csv_main_conf_t *rmcf;
+
+ rmcf = ngx_http_conf_get_module_main_conf(cf,
+ ngx_http_rds_csv_filter_module);
+
+ if (ngx_http_rds_csv_prev_cycle != ngx_cycle) {
+ ngx_http_rds_csv_prev_cycle = ngx_cycle;
+ multi_http_blocks = 0;
+
+ } else {
+ multi_http_blocks = 1;
+ }
+
+ if (multi_http_blocks || rmcf->requires_filter) {
+ ngx_http_rds_csv_next_header_filter = ngx_http_top_header_filter;
+ ngx_http_top_header_filter = ngx_http_rds_csv_header_filter;
+
+ ngx_http_rds_csv_next_body_filter = ngx_http_top_body_filter;
+ ngx_http_top_body_filter = ngx_http_rds_csv_body_filter;
+ }
+
+ return NGX_OK;
+}
+
+
+static void *
+ngx_http_rds_csv_create_loc_conf(ngx_conf_t *cf)
+{
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_rds_csv_loc_conf_t));
+ if (conf == NULL) {
+ return NULL;
+ }
+
+ /*
+ * set by ngx_pcalloc():
+ *
+ * conf->content_type = { 0, NULL };
+ * conf->row_term = { 0, NULL };
+ */
+
+ conf->enabled = NGX_CONF_UNSET;
+ conf->field_sep = NGX_CONF_UNSET_UINT;
+ conf->buf_size = NGX_CONF_UNSET_SIZE;
+ conf->field_name_header = NGX_CONF_UNSET;
+
+ return conf;
+}
+
+
+static char *
+ngx_http_rds_csv_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child)
+{
+ ngx_http_rds_csv_loc_conf_t *prev = parent;
+ ngx_http_rds_csv_loc_conf_t *conf = child;
+
+ ngx_conf_merge_value(conf->enabled, prev->enabled, 0);
+
+ ngx_conf_merge_value(conf->field_name_header, prev->field_name_header, 1);
+
+ ngx_conf_merge_uint_value(conf->field_sep, prev->field_sep,
+ (ngx_uint_t) ',');
+
+ ngx_conf_merge_str_value(conf->row_term, prev->row_term,
+ ngx_http_rds_csv_row_term);
+
+ ngx_conf_merge_str_value(conf->content_type, prev->content_type,
+ ngx_http_rds_csv_content_type);
+
+ ngx_conf_merge_size_value(conf->buf_size, prev->buf_size,
+ (size_t) ngx_pagesize);
+
+ return NGX_CONF_OK;
+}
+
+
+static char *
+ngx_http_rds_csv_row_terminator(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf)
+{
+ ngx_http_rds_csv_loc_conf_t *rlcf = conf;
+ ngx_str_t *value;
+ ngx_str_t *term;
+
+ if (rlcf->row_term.len != 0) {
+ return "is duplicate";
+ }
+
+ value = cf->args->elts;
+
+ term = &value[1];
+
+ if (term->len == 0) {
+ return "takes empty string value";
+ }
+
+ if ((term->len == 1 && term->data[0] == '\n')
+ || (term->len == 2 && term->data[0] == '\r' && term->data[1] == '\n'))
+ {
+ return ngx_conf_set_str_slot(cf, cmd, conf);
+ }
+
+ return "takes a value other than \"\\n\" and \"\\r\\n\"";
+}
+
+
+static char *
+ngx_http_rds_csv_field_separator(ngx_conf_t *cf, ngx_command_t *cmd,
+ void *conf)
+{
+ ngx_http_rds_csv_loc_conf_t *rlcf = conf;
+ ngx_str_t *value;
+ ngx_str_t *sep;
+
+ if (rlcf->field_sep != NGX_CONF_UNSET_UINT) {
+ return "is duplicate";
+ }
+
+ value = cf->args->elts;
+
+ sep = &value[1];
+
+ if (sep->len != 1) {
+ return "takes a string value not of length 1";
+ }
+
+ if (sep->data[0] == ',' || sep->data[0] == ';' || sep->data[0] == '\t') {
+ rlcf->field_sep = (ngx_uint_t) (sep->data[0]);
+ return NGX_CONF_OK;
+ }
+
+ return "takes a value other than \",\", \";\", and \"\\t\"";
+}
+
+
+static char *
+ngx_http_rds_csv(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
+{
+ ngx_http_rds_csv_main_conf_t *rmcf;
+
+ rmcf = ngx_http_conf_get_module_main_conf(cf,
+ ngx_http_rds_csv_filter_module);
+
+ rmcf->requires_filter = 1;
+
+ return ngx_conf_set_flag_slot(cf, cmd, conf);
+}
+
+
+static void *
+ngx_http_rds_csv_create_main_conf(ngx_conf_t *cf)
+{
+ ngx_http_rds_csv_main_conf_t *rmcf;
+
+ rmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_rds_csv_main_conf_t));
+ if (rmcf == NULL) {
+ return NULL;
+ }
+
+ /* set by ngx_pcalloc:
+ * rmcf->requires_filter = 0;
+ */
+
+ return rmcf;
+}
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h
new file mode 100644
index 0000000..ab43c4a
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h
@@ -0,0 +1,89 @@
+
+/*
+ * Copyright (C) agentzh
+ */
+
+
+#ifndef NGX_HTTP_RDS_CSV_FILTER_MODULE_H
+#define NGX_HTTP_RDS_CSV_FILTER_MODULE_H
+
+
+#include "ngx_http_rds.h"
+
+#include <ngx_core.h>
+#include <ngx_http.h>
+#include <nginx.h>
+
+
+#ifndef NGX_HTTP_RESET_CONTENT
+#define NGX_HTTP_RESET_CONTENT 205
+#endif
+
+
+extern ngx_module_t ngx_http_rds_csv_filter_module;
+
+extern ngx_http_output_header_filter_pt ngx_http_rds_csv_next_header_filter;
+extern ngx_http_output_body_filter_pt ngx_http_rds_csv_next_body_filter;
+
+
+typedef struct {
+ ngx_flag_t enabled;
+ ngx_str_t row_term;
+ ngx_uint_t field_sep;
+ size_t buf_size;
+ ngx_flag_t field_name_header;
+ ngx_str_t content_type;
+} ngx_http_rds_csv_loc_conf_t;
+
+
+typedef struct {
+ ngx_int_t requires_filter;
+} ngx_http_rds_csv_main_conf_t;
+
+
+typedef enum {
+ state_expect_header,
+ state_expect_col,
+ state_expect_row,
+ state_expect_field,
+ state_expect_more_field_data,
+ state_done
+
+} ngx_http_rds_csv_state_t;
+
+
+typedef struct {
+ ngx_http_rds_csv_state_t state;
+
+ ngx_str_t *col_name;
+ ngx_uint_t col_count;
+ ngx_uint_t cur_col;
+
+ ngx_http_rds_column_t *cols;
+ size_t row;
+
+ uint32_t field_offset;
+ uint32_t field_total;
+
+ ngx_buf_tag_t tag;
+
+ ngx_chain_t *out;
+ ngx_chain_t **last_out;
+ ngx_chain_t *busy_bufs;
+ ngx_chain_t *free_bufs;
+
+ ngx_buf_t *out_buf;
+ ngx_buf_t cached;
+ ngx_buf_t postponed;
+
+ size_t avail_out;
+
+ uint32_t field_data_rest;
+
+ ngx_flag_t header_sent:1;
+ ngx_flag_t seen_stream_end:1;
+ ngx_flag_t generated_col_names:1;
+} ngx_http_rds_csv_ctx_t;
+
+
+#endif /* NGX_HTTP_RDS_CSV_FILTER_MODULE_H */
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c
new file mode 100644
index 0000000..abb7db3
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c
@@ -0,0 +1,770 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_http_rds_csv_filter_module.h"
+#include "ngx_http_rds_csv_output.h"
+#include "ngx_http_rds_csv_util.h"
+#include "resty_dbd_stream.h"
+
+
+static u_char *ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len);
+static ngx_int_t ngx_http_rds_csv_get_buf(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx);
+static u_char *ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len);
+static ngx_int_t ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf);
+static size_t ngx_get_num_size(uint64_t i);
+
+
+ngx_int_t
+ngx_http_rds_csv_output_literal(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len,
+ int last_buf)
+{
+ u_char *pos;
+
+ pos = ngx_http_rds_csv_request_mem(r, ctx, len);
+ if (pos == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(pos, data, len);
+
+ dd("before output chain");
+
+ if (last_buf) {
+ ctx->seen_stream_end = 1;
+
+ if (r != r->main) {
+ last_buf = 0;
+ }
+ }
+
+ return ngx_http_rds_csv_submit_mem(r, ctx, len, (unsigned) last_buf);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_output_bufs(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_int_t rc;
+ ngx_chain_t *cl;
+
+ dd("entered output chain");
+
+ if (ctx->seen_stream_end) {
+ ctx->seen_stream_end = 0;
+
+ if (ctx->avail_out) {
+ cl = ngx_alloc_chain_link(r->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = ctx->out_buf;
+ cl->next = NULL;
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+
+ ctx->avail_out = 0;
+ }
+ }
+
+ dd_dump_chain_size();
+
+ for ( ;; ) {
+ if (ctx->out == NULL) {
+ /* fprintf(stderr, "\n"); */
+ return NGX_OK;
+ }
+
+ /* fprintf(stderr, "XXX Relooping..."); */
+
+ rc = ngx_http_rds_csv_next_body_filter(r, ctx->out);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+#if defined(nginx_version) && nginx_version >= 1001004
+ ngx_chain_update_chains(r->pool, &ctx->free_bufs, &ctx->busy_bufs,
+ &ctx->out, ctx->tag);
+#else
+ ngx_chain_update_chains(&ctx->free_bufs, &ctx->busy_bufs, &ctx->out,
+ ctx->tag);
+#endif
+
+ ctx->last_out = &ctx->out;
+ }
+
+ /* impossible to reach here */
+ return NGX_ERROR;
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_output_header(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header)
+{
+ u_char *pos, *last;
+ size_t size;
+ uintptr_t escape;
+ unsigned last_buf = 0;
+ unsigned need_quotes = 0;
+ u_char sep;
+
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ /* calculate the buffer size */
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ if (conf->field_name_header) {
+ size = sizeof("errcode,errstr,insert_id,affected_rows") - 1
+ + conf->row_term.len;
+
+ } else {
+ size = 0;
+ }
+
+ sep = (u_char) conf->field_sep;
+
+ size += 3 /* field seperators */ + conf->row_term.len;
+
+ size += ngx_get_num_size(header->std_errcode);
+
+ escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, header->errstr.data,
+ header->errstr.len,
+ &need_quotes);
+
+ if (need_quotes) {
+ size += sizeof("\"\"") - 1;
+ }
+
+ size += header->errstr.len + escape
+ + ngx_get_num_size(header->insert_id)
+ + ngx_get_num_size(header->affected_rows);
+
+ /* create the buffer */
+
+ pos = ngx_http_rds_csv_request_mem(r, ctx, size);
+ if (pos == NULL) {
+ return NGX_ERROR;
+ }
+
+ last = pos;
+
+ /* fill up the buffer */
+
+ last = ngx_sprintf(last, "errcode%cerrstr%cinsert_id%caffected_rows%V"
+ "%uD%c", sep, sep, sep, &conf->row_term,
+ (uint32_t) header->std_errcode, sep);
+
+ if (need_quotes) {
+ *last++ = '"';
+ }
+
+ if (escape == 0) {
+ last = ngx_copy(last, header->errstr.data, header->errstr.len);
+
+ } else {
+ last = (u_char *)
+ ngx_http_rds_csv_escape_csv_str(sep, last,
+ header->errstr.data,
+ header->errstr.len, NULL);
+ }
+
+ if (need_quotes) {
+ *last++ = '"';
+ }
+
+ last = ngx_sprintf(last, "%c%uL%c%uL%V", sep, header->insert_id, sep,
+ header->affected_rows, &conf->row_term);
+
+ if ((size_t) (last - pos) != size) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: output header buffer error: %uz != %uz",
+ (size_t) (last - pos), size);
+
+ return NGX_ERROR;
+ }
+
+ if (r == r->main) {
+ last_buf = 1;
+ }
+
+ ctx->seen_stream_end = 1;
+
+ return ngx_http_rds_csv_submit_mem(r, ctx, size, last_buf);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_output_field_names(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_uint_t i;
+ ngx_http_rds_column_t *col;
+ size_t size;
+ u_char *pos, *last;
+ uintptr_t escape = 0;
+ unsigned need_quotes;
+ u_char sep;
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ sep = (u_char) conf->field_sep;
+
+ size = ctx->col_count - 1 /* field sep count */
+ + conf->row_term.len;
+
+ for (i = 0; i < ctx->col_count; i++) {
+ col = &ctx->cols[i];
+ escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
+ col->name.len, &need_quotes);
+
+ dd("field escape: %d", (int) escape);
+
+ if (need_quotes) {
+ size += sizeof("\"\"") - 1;
+ }
+
+ size += col->name.len + escape;
+ }
+
+ ctx->generated_col_names = 1;
+
+ pos = ngx_http_rds_csv_request_mem(r, ctx, size);
+ if (pos == NULL) {
+ return NGX_ERROR;
+ }
+
+ last = pos;
+
+ for (i = 0; i < ctx->col_count; i++) {
+ col = &ctx->cols[i];
+
+ escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data,
+ col->name.len, &need_quotes);
+
+ if (need_quotes) {
+ *last++ = '"';
+ }
+
+ if (escape == 0) {
+ last = ngx_copy(last, col->name.data, col->name.len);
+
+ } else {
+ last = (u_char *)
+ ngx_http_rds_csv_escape_csv_str(sep, last,
+ col->name.data,
+ col->name.len, NULL);
+ }
+
+ if (need_quotes) {
+ *last++ = '"';
+ }
+
+ if (i != ctx->col_count - 1) {
+ *last++ = sep;
+ }
+ }
+
+ last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
+
+ if ((size_t) (last - pos) != size) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: output field names buffer error: %uz != %uz",
+ (size_t) (last - pos), size);
+
+ return NGX_ERROR;
+ }
+
+ return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_output_field(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null)
+{
+ u_char *pos, *last;
+ ngx_http_rds_column_t *col;
+ size_t size;
+ uintptr_t val_escape = 0;
+ unsigned need_quotes = 0;
+ u_char sep;
+ ngx_http_rds_csv_loc_conf_t *conf;
+#if DDEBUG
+ u_char *p;
+#endif
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ sep = (u_char) conf->field_sep;
+
+ dd("reading row %llu, col %d, len %d",
+ (unsigned long long) ctx->row,
+ (int) ctx->cur_col, (int) len);
+
+ /* calculate the buffer size */
+
+ if (ctx->cur_col == 0) {
+ size = 0;
+
+ } else {
+ size = 1 /* field sep */;
+ }
+
+ col = &ctx->cols[ctx->cur_col];
+
+ if (len == 0 && ctx->field_data_rest > 0) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: at least one octet should go with the field "
+ "size in one buf");
+
+ return NGX_ERROR;
+ }
+
+ if (is_null) {
+ /* SQL NULL is just empty in the CSV field */
+
+ } else if (len == 0) {
+ /* empty string is also empty */
+
+ } else {
+ switch (col->std_type & 0xc000) {
+ case rds_rough_col_type_float:
+ case rds_rough_col_type_int:
+ case rds_rough_col_type_bool:
+ size += len;
+ break;
+
+ default:
+ dd("string field found");
+
+ val_escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
+ &need_quotes);
+
+ if (ctx->field_data_rest > 0 && !need_quotes) {
+ need_quotes = 1;
+ }
+
+ if (need_quotes) {
+ if (ctx->field_data_rest == 0) {
+ size += sizeof("\"\"") - 1;
+
+ } else {
+ size += sizeof("\"") - 1;
+ }
+ }
+
+ size += len + val_escape;
+ break;
+ }
+ }
+
+ if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
+ /* last column in the row */
+ size += conf->row_term.len;
+ }
+
+ /* allocate the buffer */
+
+ pos = ngx_http_rds_csv_request_mem(r, ctx, size);
+ if (pos == NULL) {
+ return NGX_ERROR;
+ }
+
+ last = pos;
+
+ /* fill up the buffer */
+
+ if (ctx->cur_col != 0) {
+ *last++ = sep;
+ }
+
+ if (is_null || len == 0) {
+ /* do nothing */
+
+ } else {
+ switch (col->std_type & 0xc000) {
+ case rds_rough_col_type_int:
+ case rds_rough_col_type_float:
+ case rds_rough_col_type_bool:
+ last = ngx_copy(last, data, len);
+ break;
+
+ default:
+ /* string */
+ if (need_quotes) {
+ *last++ = '"';
+ }
+
+ if (val_escape == 0) {
+ last = ngx_copy(last, data, len);
+
+ } else {
+ dd("field: string value escape non-zero: %d",
+ (int) val_escape);
+
+#if DDEBUG
+ p = last;
+#endif
+
+ last = (u_char *)
+ ngx_http_rds_csv_escape_csv_str(sep, last, data, len,
+ NULL);
+
+#if DDEBUG
+ dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
+ (int) (len + val_escape),
+ p, (int) (len + val_escape),
+ (int) val_escape,
+ (int) ((last - p) - len));
+#endif
+ }
+
+ if (need_quotes && ctx->field_data_rest == 0) {
+ *last++ = '"';
+ }
+
+ break;
+ }
+ }
+
+ if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
+ last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
+ }
+
+ if ((size_t) (last - pos) != size) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: output field: buffer error (%d left)",
+ (int) size - (last - pos));
+
+ return NGX_ERROR;
+ }
+
+ return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len)
+{
+ u_char *pos, *last;
+ size_t size = 0;
+ ngx_http_rds_column_t *col;
+ uintptr_t escape = 0;
+#if DDEBUG
+ u_char *p;
+#endif
+ unsigned need_quotes;
+ u_char sep;
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ sep = (u_char) conf->field_sep;
+
+ /* calculate the buffer size */
+
+ col = &ctx->cols[ctx->cur_col];
+
+ switch (col->std_type & 0xc000) {
+ case rds_rough_col_type_int:
+ case rds_rough_col_type_float:
+ case rds_rough_col_type_bool:
+ size += len;
+ break;
+
+ default:
+ /* string */
+
+ escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len,
+ &need_quotes);
+
+ size = len + escape;
+
+ if (ctx->field_data_rest == 0) {
+ size += sizeof("\"") - 1;
+ }
+
+ break;
+ }
+
+ if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
+ /* last column in the row */
+ size += conf->row_term.len;
+ }
+
+ /* allocate the buffer */
+
+ pos = ngx_http_rds_csv_request_mem(r, ctx, size);
+ if (pos == NULL) {
+ return NGX_ERROR;
+ }
+
+ last = pos;
+
+ /* fill up the buffer */
+
+ switch (col->std_type & 0xc000) {
+ case rds_rough_col_type_int:
+ case rds_rough_col_type_float:
+ case rds_rough_col_type_bool:
+ last = ngx_copy(last, data, len);
+ break;
+
+ default:
+ /* string */
+ if (escape == 0) {
+ last = ngx_copy(last, data, len);
+
+ } else {
+ dd("more field data: string value escape non-zero: %d",
+ (int) escape);
+
+#if DDEBUG
+ p = last;
+#endif
+
+ last = (u_char *) ngx_http_rds_csv_escape_csv_str(sep, last, data,
+ len, NULL);
+
+#if DDEBUG
+ dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)",
+ (int) (len + escape),
+ p, (int) (len + escape),
+ (int) escape,
+ (int) ((last - p) - len));
+#endif
+ }
+
+ if (ctx->field_data_rest == 0) {
+ *last++ = '"';
+ }
+
+ break;
+ } /* switch */
+
+ if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) {
+ /* last column in the row */
+ last = ngx_copy(last, conf->row_term.data, conf->row_term.len);
+ }
+
+ if ((size_t) (last - pos) != size) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: output more field data: buffer error "
+ "(%d left)", (int) (size - (last - pos)));
+ return NGX_ERROR;
+ }
+
+ return ngx_http_rds_csv_submit_mem(r, ctx, size, 0);
+}
+
+
+static u_char *
+ngx_http_rds_csv_request_mem(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len)
+{
+ ngx_int_t rc;
+ u_char *p;
+
+ rc = ngx_http_rds_csv_get_buf(r, ctx);
+ if (rc != NGX_OK) {
+ return NULL;
+ }
+
+ if (ctx->avail_out < len) {
+ p = ngx_http_rds_csv_get_postponed(r, ctx, len);
+ if (p == NULL) {
+ return NULL;
+ }
+
+ ctx->postponed.pos = p;
+ ctx->postponed.last = p + len;
+
+ return p;
+ }
+
+ return ctx->out_buf->last;
+}
+
+
+static ngx_int_t
+ngx_http_rds_csv_get_buf(ngx_http_request_t *r, ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ dd("MEM enter");
+
+ if (ctx->avail_out) {
+ return NGX_OK;
+ }
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ if (ctx->free_bufs) {
+ dd("MEM reusing temp buf from free_bufs");
+
+ ctx->out_buf = ctx->free_bufs->buf;
+ ctx->free_bufs = ctx->free_bufs->next;
+
+ } else {
+ dd("MEM creating temp buf with size: %d", (int) conf->buf_size);
+ ctx->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size);
+ if (ctx->out_buf == NULL) {
+ return NGX_ERROR;
+ }
+
+ ctx->out_buf->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module;
+ ctx->out_buf->recycled = 1;
+ }
+
+ ctx->avail_out = conf->buf_size;
+
+ return NGX_OK;
+}
+
+
+static u_char *
+ngx_http_rds_csv_get_postponed(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len)
+{
+ u_char *p;
+
+ dd("MEM enter");
+
+ if (ctx->cached.start == NULL) {
+ goto alloc;
+ }
+
+ if ((size_t) (ctx->cached.end - ctx->cached.start) < len) {
+ ngx_pfree(r->pool, ctx->cached.start);
+ goto alloc;
+ }
+
+ return ctx->cached.start;
+
+alloc:
+
+ p = ngx_palloc(r->pool, len);
+ if (p == NULL) {
+ return NULL;
+ }
+
+ ctx->cached.start = p;
+ ctx->cached.end = p + len;
+
+ return p;
+}
+
+
+static ngx_int_t
+ngx_http_rds_csv_submit_mem(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf)
+{
+ ngx_chain_t *cl;
+ ngx_int_t rc;
+
+ if (ctx->postponed.pos != NULL) {
+ dd("MEM copy postponed data over to ctx->out for len %d", (int) len);
+
+ for ( ;; ) {
+ len = ctx->postponed.last - ctx->postponed.pos;
+ if (len > ctx->avail_out) {
+ len = ctx->avail_out;
+ }
+
+ ctx->out_buf->last = ngx_copy(ctx->out_buf->last,
+ ctx->postponed.pos, len);
+
+ ctx->avail_out -= len;
+
+ ctx->postponed.pos += len;
+
+ if (ctx->postponed.pos == ctx->postponed.last) {
+ ctx->postponed.pos = NULL;
+ }
+
+ if (ctx->avail_out > 0) {
+ break;
+ }
+
+ dd("MEM save ctx->out_buf");
+
+ cl = ngx_alloc_chain_link(r->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = ctx->out_buf;
+ cl->next = NULL;
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+
+ if (ctx->postponed.pos == NULL) {
+ ctx->out_buf->last_buf = last_buf;
+ break;
+ }
+
+ rc = ngx_http_rds_csv_get_buf(r, ctx);
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+ }
+
+ dd("MEM consuming out_buf for %d", (int) len);
+
+ ctx->out_buf->last += len;
+ ctx->avail_out -= len;
+ ctx->out_buf->last_buf = last_buf;
+
+ if (ctx->avail_out == 0) {
+ dd("MEM save ctx->out_buf");
+
+ cl = ngx_alloc_chain_link(r->pool);
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ cl->buf = ctx->out_buf;
+ cl->next = NULL;
+ *ctx->last_out = cl;
+ ctx->last_out = &cl->next;
+ }
+
+ return NGX_OK;
+}
+
+
+static size_t
+ngx_get_num_size(uint64_t i)
+{
+ size_t n = 0;
+
+ do {
+ i = i / 10;
+ n++;
+ } while (i > 0);
+
+ return n;
+}
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h
new file mode 100644
index 0000000..43b955f
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h
@@ -0,0 +1,38 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef NGX_HTTP_RDS_CSV_OUTPUT_H
+#define NGX_HTTP_RDS_CSV_OUTPUT_H
+
+
+#include "ngx_http_rds_csv_filter_module.h"
+#include "ngx_http_rds.h"
+
+#include <ngx_core.h>
+#include <ngx_http.h>
+#include <nginx.h>
+
+
+ngx_int_t ngx_http_rds_csv_output_header(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header);
+
+ngx_int_t ngx_http_rds_csv_output_field_names(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_output_literal(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int last_buf);
+
+ngx_int_t ngx_http_rds_csv_output_bufs(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_output_field(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null);
+
+ngx_int_t ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r,
+ ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len);
+
+
+#endif /* NGX_HTTP_RDS_CSV_OUTPUT_H */
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c
new file mode 100644
index 0000000..4a156d1
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c
@@ -0,0 +1,476 @@
+/*
+ * Copyright (C) agentzh
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_http_rds_csv_processor.h"
+#include "ngx_http_rds_csv_util.h"
+#include "ngx_http_rds_csv_output.h"
+#include "ngx_http_rds.h"
+#include "ngx_http_rds_utils.h"
+
+
+#include <ngx_core.h>
+#include <ngx_http.h>
+
+
+ngx_int_t
+ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_buf_t *b;
+ ngx_http_rds_header_t header;
+ ngx_int_t rc;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+
+ if (!ngx_buf_in_memory(b)) {
+ if (!ngx_buf_special(b)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: process header: buf from "
+ "upstream not in memory");
+ goto invalid;
+ }
+
+ in = in->next;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+ }
+
+ rc = ngx_http_rds_parse_header(r, b, &header);
+
+ if (rc != NGX_OK) {
+ goto invalid;
+ }
+
+ dd("col count: %d", (int) header.col_count);
+
+ if (header.col_count == 0) {
+ /* for empty result set, just return the JSON
+ * representation of the RDS header */
+
+ dd("col count == 0");
+
+ if (b->pos != b->last) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: header: there's unexpected remaining data "
+ "in the buf");
+
+ goto invalid;
+ }
+
+ ctx->state = state_done;
+
+ /* now we send the postponed response header */
+ if (!ctx->header_sent) {
+ ctx->header_sent = 1;
+
+ rc = ngx_http_rds_csv_next_header_filter(r);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+ }
+
+ rc = ngx_http_rds_csv_output_header(r, ctx, &header);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+ ngx_http_rds_csv_discard_bufs(r->pool, in);
+
+ return rc;
+ }
+
+ ctx->cols = ngx_palloc(r->pool,
+ header.col_count * sizeof(ngx_http_rds_column_t));
+
+ if (ctx->cols == NULL) {
+ goto invalid;
+ }
+
+ ctx->state = state_expect_col;
+ ctx->cur_col = 0;
+ ctx->col_count = header.col_count;
+
+ /* now we send the postponed response header */
+ if (!ctx->header_sent) {
+ ctx->header_sent = 1;
+
+ rc = ngx_http_rds_csv_next_header_filter(r);
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+ }
+
+ return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in,
+ ctx);
+
+invalid:
+
+ dd("return 500");
+ if (!ctx->header_sent) {
+ ctx->header_sent = 1;
+
+ r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR;
+ ngx_http_send_header(r);
+ ngx_http_send_special(r, NGX_HTTP_LAST);
+
+ return NGX_ERROR;
+ }
+
+ return NGX_ERROR;
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_buf_t *b;
+ ngx_int_t rc;
+ ngx_http_rds_csv_loc_conf_t *conf;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+
+ if (!ngx_buf_in_memory(b)) {
+ if (!ngx_buf_special(b)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: process col: buf from upstream not in "
+ "memory");
+ return NGX_ERROR;
+ }
+
+ in = in->next;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+ }
+
+ dd("parsing rds column");
+
+ rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]);
+
+ dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK);
+
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ if (b->pos == b->last) {
+ dd("parse col buf consumed");
+ in = in->next;
+ }
+
+ ctx->cur_col++;
+
+ if (ctx->cur_col >= ctx->col_count) {
+ dd("end of column list");
+
+ ctx->state = state_expect_row;
+ ctx->row = 0;
+
+ conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module);
+
+ if (conf->field_name_header) {
+ rc = ngx_http_rds_csv_output_field_names(r, ctx);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+ }
+
+ dd("after output literal");
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+
+ dd("process col is entering process row...");
+ return ngx_http_rds_csv_process_row(r, in, ctx);
+ }
+
+ return ngx_http_rds_csv_process_col(r, in, ctx);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_buf_t *b;
+ ngx_int_t rc;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ dd("process row");
+
+ b = in->buf;
+
+ if (!ngx_buf_in_memory(b)) {
+ if (!ngx_buf_special(b)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: process row: buf from "
+ "upstream not in memory");
+ return NGX_ERROR;
+ }
+
+ in = in->next;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+ }
+
+ if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: row flag is incomplete in the buf");
+ return NGX_ERROR;
+ }
+
+ dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start));
+
+ if (*b->pos++ == 0) {
+ /* end of row list */
+ ctx->state = state_done;
+
+ if (b->pos != b->last) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: row: there's unexpected remaining data "
+ "in the buf");
+ return NGX_ERROR;
+ }
+
+ rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0,
+ 1 /* last buf*/);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+ return rc;
+ }
+
+ ctx->row++;
+ ctx->cur_col = 0;
+ ctx->state = state_expect_field;
+
+ if (b->pos == b->last) {
+ in = in->next;
+
+ } else {
+ dd("process row: buf not consumed completely");
+ }
+
+ return ngx_http_rds_csv_process_field(r, in, ctx);
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in,
+ ngx_http_rds_csv_ctx_t *ctx)
+{
+ size_t total, len;
+ ngx_buf_t *b;
+ ngx_int_t rc;
+
+ for (;;) {
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+
+ if (!ngx_buf_in_memory(b)) {
+ dd("buf not in memory");
+
+ if (!ngx_buf_special(b)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: process field: buf from "
+ "upstream not in memory");
+ return NGX_ERROR;
+ }
+
+ in = in->next;
+
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+ }
+
+ dd("process field: buf size: %d", (int) ngx_buf_size(b));
+
+ if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: field size is incomplete in the buf: %*s "
+ "(len: %d)", b->last - b->pos, b->pos,
+ (int) (b->last - b->pos));
+
+ return NGX_ERROR;
+ }
+
+ total = *(uint32_t *) b->pos;
+
+ dd("total: %d", (int) total);
+
+ b->pos += sizeof(uint32_t);
+
+ if (total == (uint32_t) -1) {
+ /* SQL NULL found */
+ total = 0;
+ len = 0;
+ ctx->field_data_rest = 0;
+
+ rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
+ 1 /* is null */);
+
+ } else {
+ len = (uint32_t) (b->last - b->pos);
+
+ if (len >= total) {
+ len = total;
+ }
+
+ ctx->field_data_rest = total - len;
+
+ rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len,
+ 0 /* not null */);
+ }
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+ b->pos += len;
+
+ if (b->pos == b->last) {
+ in = in->next;
+ }
+
+ if (len < total) {
+ dd("process field: need to read more field data");
+
+ ctx->state = state_expect_more_field_data;
+
+ return ngx_http_rds_csv_process_more_field_data(r, in, ctx);
+ }
+
+ ctx->cur_col++;
+
+ if (ctx->cur_col >= ctx->col_count) {
+ dd("reached the end of the current row");
+
+ ctx->state = state_expect_row;
+
+ return ngx_http_rds_csv_process_row(r, in, ctx);
+ }
+
+ /* continue to process the next field (if any) */
+ }
+
+ /* impossible to reach here */
+
+ return NGX_ERROR;
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx)
+{
+ ngx_int_t rc;
+ ngx_buf_t *b;
+ size_t len;
+
+ for (;;) {
+ if (in == NULL) {
+ return NGX_OK;
+ }
+
+ b = in->buf;
+
+ if (!ngx_buf_in_memory(b)) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: buf from upstream not in memory");
+ return NGX_ERROR;
+ }
+
+ len = b->last - b->pos;
+
+ if (len >= ctx->field_data_rest) {
+ len = ctx->field_data_rest;
+ ctx->field_data_rest = 0;
+
+ } else {
+ ctx->field_data_rest -= len;
+ }
+
+ rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len);
+
+ if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) {
+ return rc;
+ }
+
+ b->pos += len;
+
+ if (b->pos == b->last) {
+ in = in->next;
+ }
+
+ if (ctx->field_data_rest) {
+ dd("process more field data: still some data remaining");
+ continue;
+ }
+
+ dd("process more field data: reached the end of the current field");
+
+ ctx->cur_col++;
+
+ if (ctx->cur_col >= ctx->col_count) {
+ dd("process more field data: reached the end of the current row");
+
+ ctx->state = state_expect_row;
+
+ return ngx_http_rds_csv_process_row(r, in, ctx);
+ }
+
+ dd("proces more field data: read the next field");
+
+ ctx->state = state_expect_field;
+
+ return ngx_http_rds_csv_process_field(r, in, ctx);
+ }
+
+ /* impossible to reach here */
+
+ return NGX_ERROR;
+}
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h
new file mode 100644
index 0000000..011913b
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h
@@ -0,0 +1,34 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef NGX_HTTP_RDS_CSV_PROCESSOR_H
+#define NGX_HTTP_RDS_CSV_PROCESSOR_H
+
+
+#include "ngx_http_rds_csv_filter_module.h"
+
+#include <ngx_core.h>
+#include <ngx_http.h>
+#include <nginx.h>
+
+
+ngx_int_t ngx_http_rds_csv_process_header(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_process_col(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_process_row(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_process_field(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx);
+
+ngx_int_t ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r,
+ ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx);
+
+
+#endif /* NGX_HTTP_RDS_CSV_PROCESSOR_H */
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c
new file mode 100644
index 0000000..8ca0414
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c
@@ -0,0 +1,105 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "resty_dbd_stream.h"
+#include "ngx_http_rds_csv_util.h"
+
+
+uintptr_t
+ngx_http_rds_csv_escape_csv_str(u_char field_sep, u_char *dst, u_char *src,
+ size_t size, unsigned *need_quotes)
+{
+ ngx_uint_t n;
+
+ if (dst == NULL) {
+ *need_quotes = 0;
+
+ /* find the number of characters to be escaped */
+
+ n = 0;
+
+ while (size) {
+ switch (*src) {
+ case '"':
+ n++;
+ /* fallthrough */
+
+ case '\r':
+ case '\n':
+ *need_quotes = 1;
+ break;
+
+ default:
+ if (*src == field_sep) {
+ *need_quotes = 1;
+ }
+ break;
+ }
+
+ src++;
+ size--;
+ }
+
+ return (uintptr_t) n;
+ }
+
+ while (size) {
+ if (*src == '"') {
+ *dst++ = '"';
+ *dst++ = '"';
+ src++;
+
+ } else {
+ *dst++ = *src++;
+ }
+
+ size--;
+ }
+
+ return (uintptr_t) dst;
+}
+
+
+ngx_int_t
+ngx_http_rds_csv_test_content_type(ngx_http_request_t *r)
+{
+ ngx_str_t *type;
+
+ type = &r->headers_out.content_type;
+ if (type->len != rds_content_type_len
+ || ngx_strncmp(type->data, rds_content_type, rds_content_type_len)
+ != 0)
+ {
+ return NGX_DECLINED;
+ }
+
+ return NGX_OK;
+}
+
+
+void
+ngx_http_rds_csv_discard_bufs(ngx_pool_t *pool, ngx_chain_t *in)
+{
+ ngx_chain_t *cl;
+
+ for (cl = in; cl; cl = cl->next) {
+#if 0
+ if (cl->buf->temporary
+ && ngx_buf_size(cl->buf) > 0)
+ {
+ ngx_pfree(pool, cl->buf->start);
+ }
+#endif
+
+ cl->buf->pos = cl->buf->last;
+ }
+}
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h
new file mode 100644
index 0000000..57fe892
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h
@@ -0,0 +1,34 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef NGX_HTTP_RDS_CSV_UTIL_H
+#define NGX_HTTP_RDS_CSV_UTIL_H
+
+
+#include <ngx_core.h>
+#include <ngx_http.h>
+
+
+#ifndef NGX_UINT64_LEN
+#define NGX_UINT64_LEN (sizeof("18446744073709551615") - 1)
+#endif
+
+#ifndef NGX_UINT16_LEN
+#define NGX_UINT16_LEN (sizeof("65535") - 1)
+#endif
+
+#ifndef ngx_copy_literal
+#define ngx_copy_literal(p, s) ngx_copy(p, s, sizeof(s) - 1)
+#endif
+
+
+uintptr_t ngx_http_rds_csv_escape_csv_str(u_char field_sep, u_char *dst,
+ u_char *src, size_t size, unsigned *need_quotes);
+ngx_int_t ngx_http_rds_csv_test_content_type(ngx_http_request_t *r);
+void ngx_http_rds_csv_discard_bufs(ngx_pool_t *pool, ngx_chain_t *in);
+
+
+#endif /* NGX_HTTP_RDS_CSV_UTIL_H */
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h
new file mode 100644
index 0000000..4a71d4a
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h
@@ -0,0 +1,205 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef NGX_HTTP_RDS_UTILS_H
+#define NGX_HTTP_RDS_UTILS_H
+
+
+#include <stdint.h>
+
+
+static ngx_inline ngx_int_t
+ngx_http_rds_parse_header(ngx_http_request_t *r, ngx_buf_t *b,
+ ngx_http_rds_header_t *header)
+{
+ ssize_t rest;
+
+ rest = sizeof(uint8_t) /* endian type */
+ + sizeof(uint32_t) /* format version */
+ + sizeof(uint8_t) /* result type */
+
+ + sizeof(uint16_t) /* standard error code */
+ + sizeof(uint16_t) /* driver-specific error code */
+
+ + sizeof(uint16_t) /* driver-specific errstr len */
+ + 0 /* driver-specific errstr data */
+ + sizeof(uint64_t) /* affected rows */
+ + sizeof(uint64_t) /* insert id */
+ + sizeof(uint16_t) /* column count */
+ ;
+
+ if (b->last - b->pos < rest) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: header is incomplete in the buf");
+ return NGX_ERROR;
+ }
+
+ /* check endian type */
+
+ if (*(uint8_t *) b->pos !=
+#if (NGX_HAVE_LITTLE_ENDIAN)
+ 0
+#else /* big endian */
+ 1
+#endif
+ )
+ {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: endian type in the header differ");
+ return NGX_ERROR;
+ }
+
+ b->pos += sizeof(uint8_t);
+
+ /* check RDS format version number */
+
+ if (*(uint32_t *) b->pos != (uint32_t) resty_dbd_stream_version) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: RDS format version differ");
+ return NGX_ERROR;
+ }
+
+ dd("RDS format version: %d", (int) *(uint32_t *) b->pos);
+
+ b->pos += sizeof(uint32_t);
+
+ /* check RDS result type */
+
+ if (*b->pos != 0) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: RDS result type must be 0 for now");
+ return NGX_ERROR;
+ }
+
+ b->pos++;
+
+ /* save the standard error code */
+
+ header->std_errcode = *(uint16_t *) b->pos;
+
+ b->pos += sizeof(uint16_t);
+
+ /* save the driver-specific error code */
+
+ header->drv_errcode = *(uint16_t *) b->pos;
+
+ b->pos += sizeof(uint16_t);
+
+ /* save the error string length */
+
+ header->errstr.len = *(uint16_t *) b->pos;
+
+ b->pos += sizeof(uint16_t);
+
+ dd("errstr len: %d", (int) header->errstr.len);
+
+ /* check the rest data's size */
+
+ rest = header->errstr.len
+ + sizeof(uint64_t) /* affected rows */
+ + sizeof(uint64_t) /* insert id */
+ + sizeof(uint16_t) /* column count */
+ ;
+
+ if (b->last - b->pos < rest) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: header is incomplete in the buf");
+ return NGX_ERROR;
+ }
+
+ /* save the error string data */
+
+ header->errstr.data = b->pos;
+
+ b->pos += header->errstr.len;
+
+ /* save affected rows */
+
+ header->affected_rows = *(uint64_t *) b->pos;
+
+ b->pos += sizeof(uint64_t);
+
+ /* save insert id */
+
+ header->insert_id = *(uint64_t *)b->pos;
+
+ b->pos += sizeof(uint64_t);
+
+ /* save column count */
+
+ header->col_count = *(uint16_t *) b->pos;
+
+ b->pos += sizeof(uint16_t);
+
+ dd("saved column count: %d", (int) header->col_count);
+
+ return NGX_OK;
+}
+
+
+static ngx_inline ngx_int_t
+ngx_http_rds_parse_col(ngx_http_request_t *r, ngx_buf_t *b,
+ ngx_http_rds_column_t *col)
+{
+ ssize_t rest;
+
+ rest = sizeof(uint16_t) /* std col type */
+ + sizeof(uint16_t) /* driver col type */
+ + sizeof(uint16_t) /* col name str len */
+ ;
+
+ if (b->last - b->pos < rest) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: column spec is incomplete in the buf");
+ return NGX_ERROR;
+ }
+
+ /* save standard column type */
+ col->std_type = *(uint16_t *) b->pos;
+ b->pos += sizeof(uint16_t);
+
+ /* save driver-specific column type */
+ col->drv_type = *(uint16_t *) b->pos;
+ b->pos += sizeof(uint16_t);
+
+ /* read column name string length */
+
+ col->name.len = *(uint16_t *) b->pos;
+ b->pos += sizeof(uint16_t);
+
+ if (col->name.len == 0) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds_csv: column name empty");
+ return NGX_ERROR;
+ }
+
+ rest = col->name.len;
+
+ if (b->last - b->pos < rest) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "rds: column name string is incomplete in the buf");
+ return NGX_ERROR;
+ }
+
+ /* save the column name string data */
+
+ col->name.data = ngx_palloc(r->pool, col->name.len);
+ if (col->name.data == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memcpy(col->name.data, b->pos, col->name.len);
+ b->pos += col->name.len;
+
+ dd("saved column name \"%.*s\" (len %d, offset %d)",
+ (int) col->name.len, col->name.data,
+ (int) col->name.len, (int) (b->pos - b->start));
+
+ return NGX_OK;
+}
+
+
+#endif /* NGX_HTTP_RDS_UTILS_H */
diff --git a/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h b/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h
new file mode 100644
index 0000000..ebb0cb3
--- /dev/null
+++ b/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h
@@ -0,0 +1,59 @@
+#ifndef RESTY_DBD_STREAME_H
+#define RESTY_DBD_STREAME_H
+
+#define resty_dbd_stream_version 3
+#define resty_dbd_stream_version_string "0.0.3"
+
+#define rds_content_type \
+ "application/x-resty-dbd-stream"
+
+#define rds_content_type_len \
+ (sizeof(rds_content_type) - 1)
+
+
+typedef enum {
+ rds_rough_col_type_int = 0 << 14,
+ rds_rough_col_type_float = 1 << 14,
+ rds_rough_col_type_str = 2 << 14,
+ rds_rough_col_type_bool = 3 << 14
+
+} rds_rough_col_type_t;
+
+
+/* The following types (or spellings thereof) are specified
+ * by SQL:
+ * bigint, bit, bit varying, boolean, char, character varying,
+ * character, varchar, date, double precision, integer,
+ * interval, numeric, decimal, real, smallint,
+ * time (with or without time zone),
+ * timestamp (with or without time zone), xml */
+
+typedef enum {
+ rds_col_type_unknown = 0 | rds_rough_col_type_str,
+ rds_col_type_bigint = 1 | rds_rough_col_type_int,
+ rds_col_type_bit = 2 | rds_rough_col_type_str,
+ rds_col_type_bit_varying = 3 | rds_rough_col_type_str,
+
+ rds_col_type_bool = 4 | rds_rough_col_type_bool,
+ rds_col_type_char = 5 | rds_rough_col_type_str,
+ rds_col_type_varchar = 6 | rds_rough_col_type_str,
+ rds_col_type_date = 7 | rds_rough_col_type_str,
+ rds_col_type_double = 8 | rds_rough_col_type_float,
+ rds_col_type_integer = 9 | rds_rough_col_type_int,
+ rds_col_type_interval = 10 | rds_rough_col_type_float,
+ rds_col_type_decimal = 11 | rds_rough_col_type_float,
+ rds_col_type_real = 12 | rds_rough_col_type_float,
+ rds_col_type_smallint = 13 | rds_rough_col_type_int,
+ rds_col_type_time_with_time_zone = 14 | rds_rough_col_type_str,
+ rds_col_type_time = 15 | rds_rough_col_type_str,
+ rds_col_type_timestamp_with_time_zone = 16 | rds_rough_col_type_str,
+ rds_col_type_timestamp = 17 | rds_rough_col_type_str,
+ rds_col_type_xml = 18 | rds_rough_col_type_str,
+
+ /* our additions */
+ rds_col_type_blob = 19 | rds_rough_col_type_str
+
+} rds_col_type_t;
+
+#endif /* RESTY_DBD_STREAME_H */
+