diff options
author | kaiwu <kaiwu2004@gmail.com> | 2025-03-01 12:42:23 +0800 |
---|---|---|
committer | kaiwu <kaiwu2004@gmail.com> | 2025-03-01 12:42:23 +0800 |
commit | 3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch) | |
tree | 284c2ba95a41536ae1bff6bea710db0709a64739 /rds-csv-nginx-module-0.09/src | |
download | openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip |
openresty bundle
Diffstat (limited to 'rds-csv-nginx-module-0.09/src')
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ddebug.h | 85 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds.h | 41 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c | 519 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h | 89 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c | 770 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h | 38 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c | 476 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h | 34 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c | 105 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h | 34 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h | 205 | ||||
-rw-r--r-- | rds-csv-nginx-module-0.09/src/resty_dbd_stream.h | 59 |
12 files changed, 2455 insertions, 0 deletions
diff --git a/rds-csv-nginx-module-0.09/src/ddebug.h b/rds-csv-nginx-module-0.09/src/ddebug.h new file mode 100644 index 0000000..5c15a74 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ddebug.h @@ -0,0 +1,85 @@ +#ifndef DDEBUG_H +#define DDEBUG_H + +#include <ngx_config.h> +#include <ngx_core.h> + +#if defined(DDEBUG) && (DDEBUG) + +# define dd_dump_chain_size() { \ + int n; \ + ngx_chain_t *cl; \ + \ + for (n = 0, cl = ctx->out; cl; cl = cl->next, n++) { \ + } \ + \ + dd("chain size: %d", n); \ + } + +# if (NGX_HAVE_VARIADIC_MACROS) + +# define dd(...) fprintf(stderr, "rds-csv *** %s: ", __func__); \ + fprintf(stderr, __VA_ARGS__); \ + fprintf(stderr, " at %s line %d.\n", __FILE__, __LINE__) + +# else + +#include <stdarg.h> +#include <stdio.h> + +#include <stdarg.h> + +static ngx_inline void +dd(const char * fmt, ...) { +} + +# endif + +#else + +# define dd_dump_chain_size() + +# if (NGX_HAVE_VARIADIC_MACROS) + +# define dd(...) + +# else + +#include <stdarg.h> + +static ngx_inline void +dd(const char * fmt, ...) { +} + +# endif + +#endif + +#if defined(DDEBUG) && (DDEBUG) + +#define dd_check_read_event_handler(r) \ + dd("r->read_event_handler = %s", \ + r->read_event_handler == ngx_http_block_reading ? \ + "ngx_http_block_reading" : \ + r->read_event_handler == ngx_http_test_reading ? \ + "ngx_http_test_reading" : \ + r->read_event_handler == ngx_http_request_empty_handler ? \ + "ngx_http_request_empty_handler" : "UNKNOWN") + +#define dd_check_write_event_handler(r) \ + dd("r->write_event_handler = %s", \ + r->write_event_handler == ngx_http_handler ? \ + "ngx_http_handler" : \ + r->write_event_handler == ngx_http_core_run_phases ? \ + "ngx_http_core_run_phases" : \ + r->write_event_handler == ngx_http_request_empty_handler ? \ + "ngx_http_request_empty_handler" : "UNKNOWN") + +#else + +#define dd_check_read_event_handler(r) +#define dd_check_write_event_handler(r) + +#endif + +#endif /* DDEBUG_H */ diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds.h new file mode 100644 index 0000000..2e3d878 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds.h @@ -0,0 +1,41 @@ + +/* + * Copyright (C) agentzh + */ + +#ifndef NGX_HTTP_RDS_H +#define NGX_HTTP_RDS_H + + +#include "resty_dbd_stream.h" +#include <nginx.h> +#include <ngx_core.h> +#include <ngx_http.h> + + +typedef struct { + uint16_t std_errcode; + uint16_t drv_errcode; + ngx_str_t errstr; + + uint64_t affected_rows; + uint64_t insert_id; + uint16_t col_count; + +} ngx_http_rds_header_t; + + +typedef struct ngx_http_rds_column_s { + rds_col_type_t std_type; + uint16_t drv_type; + + ngx_str_t name; + +} ngx_http_rds_column_t; + + + + + +#endif /* NGX_HTTP_RDS_H */ + diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c new file mode 100644 index 0000000..35b6862 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.c @@ -0,0 +1,519 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_http_rds_csv_filter_module.h" +#include "ngx_http_rds_csv_util.h" +#include "ngx_http_rds_csv_processor.h" +#include "ngx_http_rds_csv_output.h" + +#include <ngx_config.h> + + +#define ngx_http_rds_csv_content_type "text/csv" +#define ngx_http_rds_csv_row_term "\r\n" + + +static volatile ngx_cycle_t *ngx_http_rds_csv_prev_cycle = NULL; + + +ngx_http_output_header_filter_pt ngx_http_rds_csv_next_header_filter; +ngx_http_output_body_filter_pt ngx_http_rds_csv_next_body_filter; + + +static void *ngx_http_rds_csv_create_loc_conf(ngx_conf_t *cf); +static char *ngx_http_rds_csv_merge_loc_conf(ngx_conf_t *cf, void *parent, + void *child); +static ngx_int_t ngx_http_rds_csv_filter_init(ngx_conf_t *cf); +static char *ngx_http_rds_csv_row_terminator(ngx_conf_t *cf, + ngx_command_t *cmd, void *conf); +static char *ngx_http_rds_csv_field_separator(ngx_conf_t *cf, + ngx_command_t *cmd, void *conf); +static char *ngx_http_rds_csv(ngx_conf_t *cf, ngx_command_t *cmd, void *conf); +static void *ngx_http_rds_csv_create_main_conf(ngx_conf_t *cf); + + +static ngx_command_t ngx_http_rds_csv_commands[] = { + + { ngx_string("rds_csv"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF + |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_FLAG, + ngx_http_rds_csv, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_rds_csv_loc_conf_t, enabled), + NULL }, + + { ngx_string("rds_csv_row_terminator"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF + |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_TAKE1, + ngx_http_rds_csv_row_terminator, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_rds_csv_loc_conf_t, row_term), + NULL }, + + { ngx_string("rds_csv_field_separator"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF + |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_TAKE1, + ngx_http_rds_csv_field_separator, + NGX_HTTP_LOC_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("rds_csv_field_name_header"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF + |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_FLAG, + ngx_conf_set_flag_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_rds_csv_loc_conf_t, field_name_header), + NULL }, + + { ngx_string("rds_csv_content_type"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF + |NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_rds_csv_loc_conf_t, content_type), + NULL }, + + { ngx_string("rds_csv_buffer_size"), + NGX_HTTP_MAIN_CONF|NGX_HTTP_SRV_CONF|NGX_HTTP_LOC_CONF|NGX_HTTP_LIF_CONF + |NGX_CONF_TAKE1, + ngx_conf_set_size_slot, + NGX_HTTP_LOC_CONF_OFFSET, + offsetof(ngx_http_rds_csv_loc_conf_t, buf_size), + NULL }, + + ngx_null_command +}; + + +static ngx_http_module_t ngx_http_rds_csv_filter_module_ctx = { + NULL, /* preconfiguration */ + ngx_http_rds_csv_filter_init, /* postconfiguration */ + + ngx_http_rds_csv_create_main_conf, /* create main configuration */ + NULL, /* init main configuration */ + + NULL, /* create server configuration */ + NULL, /* merge server configuration */ + + ngx_http_rds_csv_create_loc_conf, /* create location configuration */ + ngx_http_rds_csv_merge_loc_conf /* merge location configuration */ +}; + + +ngx_module_t ngx_http_rds_csv_filter_module = { + NGX_MODULE_V1, + &ngx_http_rds_csv_filter_module_ctx, /* module context */ + ngx_http_rds_csv_commands, /* module directives */ + NGX_HTTP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static ngx_int_t +ngx_http_rds_csv_header_filter(ngx_http_request_t *r) +{ + ngx_http_rds_csv_ctx_t *ctx; + ngx_http_rds_csv_loc_conf_t *conf; + size_t len; + u_char *p; + + /* XXX maybe we can generate stub JSON strings like + * {"errcode":403,"error":"Permission denied"} + * for HTTP error pages? */ + if ((r->headers_out.status < NGX_HTTP_OK) + || (r->headers_out.status >= NGX_HTTP_SPECIAL_RESPONSE) + || (r->headers_out.status == NGX_HTTP_NO_CONTENT) + || (r->headers_out.status == NGX_HTTP_RESET_CONTENT)) + { + ngx_http_set_ctx(r, NULL, ngx_http_rds_csv_filter_module); + + dd("status is not OK: %d, skipping", (int) r->headers_out.status); + + return ngx_http_rds_csv_next_header_filter(r); + } + + /* r->headers_out.status = 0; */ + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (!conf->enabled) { + return ngx_http_rds_csv_next_header_filter(r); + } + + if (ngx_http_rds_csv_test_content_type(r) != NGX_OK) { + return ngx_http_rds_csv_next_header_filter(r); + } + + if (conf->content_type.len == sizeof(ngx_http_rds_csv_content_type) - 1 + && ngx_strncmp(conf->content_type.data, ngx_http_rds_csv_content_type, + sizeof(ngx_http_rds_csv_content_type) - 1) == 0) + { + /* MIME type is text/csv, we process Content-Type + * according to RFC 4180 */ + + len = sizeof(ngx_http_rds_csv_content_type) - 1 + + sizeof("; header=") - 1; + + if (conf->field_name_header) { + len += sizeof("presence") - 1; + + } else { + len += sizeof("absence") - 1; + } + + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NGX_ERROR; + } + + r->headers_out.content_type.len = len; + r->headers_out.content_type_len = len; + + r->headers_out.content_type.data = p; + + p = ngx_copy(p, conf->content_type.data, conf->content_type.len); + + if (conf->field_name_header) { + p = ngx_copy_literal(p, "; header=presence"); + + } else { + p = ngx_copy_literal(p, "; header=absence"); + } + + if (p - r->headers_out.content_type.data != (ssize_t) len) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: content type buffer error: %uz != %uz", + (size_t) (p - r->headers_out.content_type.data), + len); + + return NGX_ERROR; + } + + } else { + /* custom MIME-type, we just pass it through */ + + r->headers_out.content_type = conf->content_type; + r->headers_out.content_type_len = conf->content_type.len; + } + + ctx = ngx_pcalloc(r->pool, sizeof(ngx_http_rds_csv_ctx_t)); + + if (ctx == NULL) { + return NGX_ERROR; + } + + ctx->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module; + + ctx->state = state_expect_header; + + ctx->header_sent = 0; + + ctx->last_out = &ctx->out; + + /* set by ngx_pcalloc + * ctx->out = NULL + * ctx->busy_bufs = NULL + * ctx->free_bufs = NULL + * ctx->cached = (ngx_buf_t) 0 + * ctx->postponed = (ngx_buf_t) 0 + * ctx->avail_out = 0 + * ctx->col_names = NULL + * ctx->col_count = 0 + * ctx->cur_col = 0 + * ctx->field_offset = 0 + * ctx->field_total = 0 + * ctx->field_data_rest = 0 + */ + + ngx_http_set_ctx(r, ctx, ngx_http_rds_csv_filter_module); + + ngx_http_clear_content_length(r); + + r->filter_need_in_memory = 1; + + /* we do postpone the header sending to the body filter */ + return NGX_OK; +} + + +static ngx_int_t +ngx_http_rds_csv_body_filter(ngx_http_request_t *r, ngx_chain_t *in) +{ + ngx_http_rds_csv_ctx_t *ctx; + ngx_int_t rc; + + if (in == NULL || r->header_only) { + return ngx_http_rds_csv_next_body_filter(r, in); + } + + ctx = ngx_http_get_module_ctx(r, ngx_http_rds_csv_filter_module); + + if (ctx == NULL) { + return ngx_http_rds_csv_next_body_filter(r, in); + } + + switch (ctx->state) { + + case state_expect_header: + rc = ngx_http_rds_csv_process_header(r, in, ctx); + break; + + case state_expect_col: + rc = ngx_http_rds_csv_process_col(r, in, ctx); + break; + + case state_expect_row: + rc = ngx_http_rds_csv_process_row(r, in, ctx); + break; + + case state_expect_field: + rc = ngx_http_rds_csv_process_field(r, in, ctx); + break; + + case state_expect_more_field_data: + rc = ngx_http_rds_csv_process_more_field_data(r, in, ctx); + break; + + case state_done: + + /* mark the remaining bufs as consumed */ + + dd("discarding bufs"); + + ngx_http_rds_csv_discard_bufs(r->pool, in); + + return NGX_OK; + break; + default: + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: invalid internal state: %d", + ctx->state); + + rc = NGX_ERROR; + + break; + } + + dd("body filter rc: %d", (int) rc); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + ctx->state = state_done; + + if (!ctx->header_sent) { + ctx->header_sent = 1; + + if (rc == NGX_ERROR) { + rc = NGX_HTTP_INTERNAL_SERVER_ERROR; + } + + r->headers_out.status = rc; + + dd("sending ERROR headers"); + + ngx_http_rds_csv_next_header_filter(r); + ngx_http_send_special(r, NGX_HTTP_LAST); + + return NGX_ERROR; + } + + return NGX_ERROR; + } + + dd("output bufs"); + + return ngx_http_rds_csv_output_bufs(r, ctx); +} + + +static ngx_int_t +ngx_http_rds_csv_filter_init(ngx_conf_t *cf) +{ + int multi_http_blocks; + ngx_http_rds_csv_main_conf_t *rmcf; + + rmcf = ngx_http_conf_get_module_main_conf(cf, + ngx_http_rds_csv_filter_module); + + if (ngx_http_rds_csv_prev_cycle != ngx_cycle) { + ngx_http_rds_csv_prev_cycle = ngx_cycle; + multi_http_blocks = 0; + + } else { + multi_http_blocks = 1; + } + + if (multi_http_blocks || rmcf->requires_filter) { + ngx_http_rds_csv_next_header_filter = ngx_http_top_header_filter; + ngx_http_top_header_filter = ngx_http_rds_csv_header_filter; + + ngx_http_rds_csv_next_body_filter = ngx_http_top_body_filter; + ngx_http_top_body_filter = ngx_http_rds_csv_body_filter; + } + + return NGX_OK; +} + + +static void * +ngx_http_rds_csv_create_loc_conf(ngx_conf_t *cf) +{ + ngx_http_rds_csv_loc_conf_t *conf; + + conf = ngx_pcalloc(cf->pool, sizeof(ngx_http_rds_csv_loc_conf_t)); + if (conf == NULL) { + return NULL; + } + + /* + * set by ngx_pcalloc(): + * + * conf->content_type = { 0, NULL }; + * conf->row_term = { 0, NULL }; + */ + + conf->enabled = NGX_CONF_UNSET; + conf->field_sep = NGX_CONF_UNSET_UINT; + conf->buf_size = NGX_CONF_UNSET_SIZE; + conf->field_name_header = NGX_CONF_UNSET; + + return conf; +} + + +static char * +ngx_http_rds_csv_merge_loc_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_http_rds_csv_loc_conf_t *prev = parent; + ngx_http_rds_csv_loc_conf_t *conf = child; + + ngx_conf_merge_value(conf->enabled, prev->enabled, 0); + + ngx_conf_merge_value(conf->field_name_header, prev->field_name_header, 1); + + ngx_conf_merge_uint_value(conf->field_sep, prev->field_sep, + (ngx_uint_t) ','); + + ngx_conf_merge_str_value(conf->row_term, prev->row_term, + ngx_http_rds_csv_row_term); + + ngx_conf_merge_str_value(conf->content_type, prev->content_type, + ngx_http_rds_csv_content_type); + + ngx_conf_merge_size_value(conf->buf_size, prev->buf_size, + (size_t) ngx_pagesize); + + return NGX_CONF_OK; +} + + +static char * +ngx_http_rds_csv_row_terminator(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf) +{ + ngx_http_rds_csv_loc_conf_t *rlcf = conf; + ngx_str_t *value; + ngx_str_t *term; + + if (rlcf->row_term.len != 0) { + return "is duplicate"; + } + + value = cf->args->elts; + + term = &value[1]; + + if (term->len == 0) { + return "takes empty string value"; + } + + if ((term->len == 1 && term->data[0] == '\n') + || (term->len == 2 && term->data[0] == '\r' && term->data[1] == '\n')) + { + return ngx_conf_set_str_slot(cf, cmd, conf); + } + + return "takes a value other than \"\\n\" and \"\\r\\n\""; +} + + +static char * +ngx_http_rds_csv_field_separator(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf) +{ + ngx_http_rds_csv_loc_conf_t *rlcf = conf; + ngx_str_t *value; + ngx_str_t *sep; + + if (rlcf->field_sep != NGX_CONF_UNSET_UINT) { + return "is duplicate"; + } + + value = cf->args->elts; + + sep = &value[1]; + + if (sep->len != 1) { + return "takes a string value not of length 1"; + } + + if (sep->data[0] == ',' || sep->data[0] == ';' || sep->data[0] == '\t') { + rlcf->field_sep = (ngx_uint_t) (sep->data[0]); + return NGX_CONF_OK; + } + + return "takes a value other than \",\", \";\", and \"\\t\""; +} + + +static char * +ngx_http_rds_csv(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_http_rds_csv_main_conf_t *rmcf; + + rmcf = ngx_http_conf_get_module_main_conf(cf, + ngx_http_rds_csv_filter_module); + + rmcf->requires_filter = 1; + + return ngx_conf_set_flag_slot(cf, cmd, conf); +} + + +static void * +ngx_http_rds_csv_create_main_conf(ngx_conf_t *cf) +{ + ngx_http_rds_csv_main_conf_t *rmcf; + + rmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_rds_csv_main_conf_t)); + if (rmcf == NULL) { + return NULL; + } + + /* set by ngx_pcalloc: + * rmcf->requires_filter = 0; + */ + + return rmcf; +} diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h new file mode 100644 index 0000000..ab43c4a --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_filter_module.h @@ -0,0 +1,89 @@ + +/* + * Copyright (C) agentzh + */ + + +#ifndef NGX_HTTP_RDS_CSV_FILTER_MODULE_H +#define NGX_HTTP_RDS_CSV_FILTER_MODULE_H + + +#include "ngx_http_rds.h" + +#include <ngx_core.h> +#include <ngx_http.h> +#include <nginx.h> + + +#ifndef NGX_HTTP_RESET_CONTENT +#define NGX_HTTP_RESET_CONTENT 205 +#endif + + +extern ngx_module_t ngx_http_rds_csv_filter_module; + +extern ngx_http_output_header_filter_pt ngx_http_rds_csv_next_header_filter; +extern ngx_http_output_body_filter_pt ngx_http_rds_csv_next_body_filter; + + +typedef struct { + ngx_flag_t enabled; + ngx_str_t row_term; + ngx_uint_t field_sep; + size_t buf_size; + ngx_flag_t field_name_header; + ngx_str_t content_type; +} ngx_http_rds_csv_loc_conf_t; + + +typedef struct { + ngx_int_t requires_filter; +} ngx_http_rds_csv_main_conf_t; + + +typedef enum { + state_expect_header, + state_expect_col, + state_expect_row, + state_expect_field, + state_expect_more_field_data, + state_done + +} ngx_http_rds_csv_state_t; + + +typedef struct { + ngx_http_rds_csv_state_t state; + + ngx_str_t *col_name; + ngx_uint_t col_count; + ngx_uint_t cur_col; + + ngx_http_rds_column_t *cols; + size_t row; + + uint32_t field_offset; + uint32_t field_total; + + ngx_buf_tag_t tag; + + ngx_chain_t *out; + ngx_chain_t **last_out; + ngx_chain_t *busy_bufs; + ngx_chain_t *free_bufs; + + ngx_buf_t *out_buf; + ngx_buf_t cached; + ngx_buf_t postponed; + + size_t avail_out; + + uint32_t field_data_rest; + + ngx_flag_t header_sent:1; + ngx_flag_t seen_stream_end:1; + ngx_flag_t generated_col_names:1; +} ngx_http_rds_csv_ctx_t; + + +#endif /* NGX_HTTP_RDS_CSV_FILTER_MODULE_H */ diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c new file mode 100644 index 0000000..abb7db3 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c @@ -0,0 +1,770 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_http_rds_csv_filter_module.h" +#include "ngx_http_rds_csv_output.h" +#include "ngx_http_rds_csv_util.h" +#include "resty_dbd_stream.h" + + +static u_char *ngx_http_rds_csv_request_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len); +static ngx_int_t ngx_http_rds_csv_get_buf(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx); +static u_char *ngx_http_rds_csv_get_postponed(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len); +static ngx_int_t ngx_http_rds_csv_submit_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf); +static size_t ngx_get_num_size(uint64_t i); + + +ngx_int_t +ngx_http_rds_csv_output_literal(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, + int last_buf) +{ + u_char *pos; + + pos = ngx_http_rds_csv_request_mem(r, ctx, len); + if (pos == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(pos, data, len); + + dd("before output chain"); + + if (last_buf) { + ctx->seen_stream_end = 1; + + if (r != r->main) { + last_buf = 0; + } + } + + return ngx_http_rds_csv_submit_mem(r, ctx, len, (unsigned) last_buf); +} + + +ngx_int_t +ngx_http_rds_csv_output_bufs(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_int_t rc; + ngx_chain_t *cl; + + dd("entered output chain"); + + if (ctx->seen_stream_end) { + ctx->seen_stream_end = 0; + + if (ctx->avail_out) { + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + + ctx->avail_out = 0; + } + } + + dd_dump_chain_size(); + + for ( ;; ) { + if (ctx->out == NULL) { + /* fprintf(stderr, "\n"); */ + return NGX_OK; + } + + /* fprintf(stderr, "XXX Relooping..."); */ + + rc = ngx_http_rds_csv_next_body_filter(r, ctx->out); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + +#if defined(nginx_version) && nginx_version >= 1001004 + ngx_chain_update_chains(r->pool, &ctx->free_bufs, &ctx->busy_bufs, + &ctx->out, ctx->tag); +#else + ngx_chain_update_chains(&ctx->free_bufs, &ctx->busy_bufs, &ctx->out, + ctx->tag); +#endif + + ctx->last_out = &ctx->out; + } + + /* impossible to reach here */ + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_output_header(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header) +{ + u_char *pos, *last; + size_t size; + uintptr_t escape; + unsigned last_buf = 0; + unsigned need_quotes = 0; + u_char sep; + + ngx_http_rds_csv_loc_conf_t *conf; + + /* calculate the buffer size */ + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (conf->field_name_header) { + size = sizeof("errcode,errstr,insert_id,affected_rows") - 1 + + conf->row_term.len; + + } else { + size = 0; + } + + sep = (u_char) conf->field_sep; + + size += 3 /* field seperators */ + conf->row_term.len; + + size += ngx_get_num_size(header->std_errcode); + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, header->errstr.data, + header->errstr.len, + &need_quotes); + + if (need_quotes) { + size += sizeof("\"\"") - 1; + } + + size += header->errstr.len + escape + + ngx_get_num_size(header->insert_id) + + ngx_get_num_size(header->affected_rows); + + /* create the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + last = ngx_sprintf(last, "errcode%cerrstr%cinsert_id%caffected_rows%V" + "%uD%c", sep, sep, sep, &conf->row_term, + (uint32_t) header->std_errcode, sep); + + if (need_quotes) { + *last++ = '"'; + } + + if (escape == 0) { + last = ngx_copy(last, header->errstr.data, header->errstr.len); + + } else { + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, + header->errstr.data, + header->errstr.len, NULL); + } + + if (need_quotes) { + *last++ = '"'; + } + + last = ngx_sprintf(last, "%c%uL%c%uL%V", sep, header->insert_id, sep, + header->affected_rows, &conf->row_term); + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output header buffer error: %uz != %uz", + (size_t) (last - pos), size); + + return NGX_ERROR; + } + + if (r == r->main) { + last_buf = 1; + } + + ctx->seen_stream_end = 1; + + return ngx_http_rds_csv_submit_mem(r, ctx, size, last_buf); +} + + +ngx_int_t +ngx_http_rds_csv_output_field_names(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_uint_t i; + ngx_http_rds_column_t *col; + size_t size; + u_char *pos, *last; + uintptr_t escape = 0; + unsigned need_quotes; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + size = ctx->col_count - 1 /* field sep count */ + + conf->row_term.len; + + for (i = 0; i < ctx->col_count; i++) { + col = &ctx->cols[i]; + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data, + col->name.len, &need_quotes); + + dd("field escape: %d", (int) escape); + + if (need_quotes) { + size += sizeof("\"\"") - 1; + } + + size += col->name.len + escape; + } + + ctx->generated_col_names = 1; + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + for (i = 0; i < ctx->col_count; i++) { + col = &ctx->cols[i]; + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data, + col->name.len, &need_quotes); + + if (need_quotes) { + *last++ = '"'; + } + + if (escape == 0) { + last = ngx_copy(last, col->name.data, col->name.len); + + } else { + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, + col->name.data, + col->name.len, NULL); + } + + if (need_quotes) { + *last++ = '"'; + } + + if (i != ctx->col_count - 1) { + *last++ = sep; + } + } + + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output field names buffer error: %uz != %uz", + (size_t) (last - pos), size); + + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +ngx_int_t +ngx_http_rds_csv_output_field(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null) +{ + u_char *pos, *last; + ngx_http_rds_column_t *col; + size_t size; + uintptr_t val_escape = 0; + unsigned need_quotes = 0; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; +#if DDEBUG + u_char *p; +#endif + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + dd("reading row %llu, col %d, len %d", + (unsigned long long) ctx->row, + (int) ctx->cur_col, (int) len); + + /* calculate the buffer size */ + + if (ctx->cur_col == 0) { + size = 0; + + } else { + size = 1 /* field sep */; + } + + col = &ctx->cols[ctx->cur_col]; + + if (len == 0 && ctx->field_data_rest > 0) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: at least one octet should go with the field " + "size in one buf"); + + return NGX_ERROR; + } + + if (is_null) { + /* SQL NULL is just empty in the CSV field */ + + } else if (len == 0) { + /* empty string is also empty */ + + } else { + switch (col->std_type & 0xc000) { + case rds_rough_col_type_float: + case rds_rough_col_type_int: + case rds_rough_col_type_bool: + size += len; + break; + + default: + dd("string field found"); + + val_escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len, + &need_quotes); + + if (ctx->field_data_rest > 0 && !need_quotes) { + need_quotes = 1; + } + + if (need_quotes) { + if (ctx->field_data_rest == 0) { + size += sizeof("\"\"") - 1; + + } else { + size += sizeof("\"") - 1; + } + } + + size += len + val_escape; + break; + } + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + size += conf->row_term.len; + } + + /* allocate the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + if (ctx->cur_col != 0) { + *last++ = sep; + } + + if (is_null || len == 0) { + /* do nothing */ + + } else { + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + last = ngx_copy(last, data, len); + break; + + default: + /* string */ + if (need_quotes) { + *last++ = '"'; + } + + if (val_escape == 0) { + last = ngx_copy(last, data, len); + + } else { + dd("field: string value escape non-zero: %d", + (int) val_escape); + +#if DDEBUG + p = last; +#endif + + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, data, len, + NULL); + +#if DDEBUG + dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)", + (int) (len + val_escape), + p, (int) (len + val_escape), + (int) val_escape, + (int) ((last - p) - len)); +#endif + } + + if (need_quotes && ctx->field_data_rest == 0) { + *last++ = '"'; + } + + break; + } + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + } + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output field: buffer error (%d left)", + (int) size - (last - pos)); + + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +ngx_int_t +ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len) +{ + u_char *pos, *last; + size_t size = 0; + ngx_http_rds_column_t *col; + uintptr_t escape = 0; +#if DDEBUG + u_char *p; +#endif + unsigned need_quotes; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + /* calculate the buffer size */ + + col = &ctx->cols[ctx->cur_col]; + + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + size += len; + break; + + default: + /* string */ + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len, + &need_quotes); + + size = len + escape; + + if (ctx->field_data_rest == 0) { + size += sizeof("\"") - 1; + } + + break; + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + size += conf->row_term.len; + } + + /* allocate the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + last = ngx_copy(last, data, len); + break; + + default: + /* string */ + if (escape == 0) { + last = ngx_copy(last, data, len); + + } else { + dd("more field data: string value escape non-zero: %d", + (int) escape); + +#if DDEBUG + p = last; +#endif + + last = (u_char *) ngx_http_rds_csv_escape_csv_str(sep, last, data, + len, NULL); + +#if DDEBUG + dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)", + (int) (len + escape), + p, (int) (len + escape), + (int) escape, + (int) ((last - p) - len)); +#endif + } + + if (ctx->field_data_rest == 0) { + *last++ = '"'; + } + + break; + } /* switch */ + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + } + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output more field data: buffer error " + "(%d left)", (int) (size - (last - pos))); + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +static u_char * +ngx_http_rds_csv_request_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len) +{ + ngx_int_t rc; + u_char *p; + + rc = ngx_http_rds_csv_get_buf(r, ctx); + if (rc != NGX_OK) { + return NULL; + } + + if (ctx->avail_out < len) { + p = ngx_http_rds_csv_get_postponed(r, ctx, len); + if (p == NULL) { + return NULL; + } + + ctx->postponed.pos = p; + ctx->postponed.last = p + len; + + return p; + } + + return ctx->out_buf->last; +} + + +static ngx_int_t +ngx_http_rds_csv_get_buf(ngx_http_request_t *r, ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_http_rds_csv_loc_conf_t *conf; + + dd("MEM enter"); + + if (ctx->avail_out) { + return NGX_OK; + } + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (ctx->free_bufs) { + dd("MEM reusing temp buf from free_bufs"); + + ctx->out_buf = ctx->free_bufs->buf; + ctx->free_bufs = ctx->free_bufs->next; + + } else { + dd("MEM creating temp buf with size: %d", (int) conf->buf_size); + ctx->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size); + if (ctx->out_buf == NULL) { + return NGX_ERROR; + } + + ctx->out_buf->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module; + ctx->out_buf->recycled = 1; + } + + ctx->avail_out = conf->buf_size; + + return NGX_OK; +} + + +static u_char * +ngx_http_rds_csv_get_postponed(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len) +{ + u_char *p; + + dd("MEM enter"); + + if (ctx->cached.start == NULL) { + goto alloc; + } + + if ((size_t) (ctx->cached.end - ctx->cached.start) < len) { + ngx_pfree(r->pool, ctx->cached.start); + goto alloc; + } + + return ctx->cached.start; + +alloc: + + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NULL; + } + + ctx->cached.start = p; + ctx->cached.end = p + len; + + return p; +} + + +static ngx_int_t +ngx_http_rds_csv_submit_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf) +{ + ngx_chain_t *cl; + ngx_int_t rc; + + if (ctx->postponed.pos != NULL) { + dd("MEM copy postponed data over to ctx->out for len %d", (int) len); + + for ( ;; ) { + len = ctx->postponed.last - ctx->postponed.pos; + if (len > ctx->avail_out) { + len = ctx->avail_out; + } + + ctx->out_buf->last = ngx_copy(ctx->out_buf->last, + ctx->postponed.pos, len); + + ctx->avail_out -= len; + + ctx->postponed.pos += len; + + if (ctx->postponed.pos == ctx->postponed.last) { + ctx->postponed.pos = NULL; + } + + if (ctx->avail_out > 0) { + break; + } + + dd("MEM save ctx->out_buf"); + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + + if (ctx->postponed.pos == NULL) { + ctx->out_buf->last_buf = last_buf; + break; + } + + rc = ngx_http_rds_csv_get_buf(r, ctx); + if (rc != NGX_OK) { + return NGX_ERROR; + } + } + + return NGX_OK; + } + + dd("MEM consuming out_buf for %d", (int) len); + + ctx->out_buf->last += len; + ctx->avail_out -= len; + ctx->out_buf->last_buf = last_buf; + + if (ctx->avail_out == 0) { + dd("MEM save ctx->out_buf"); + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + } + + return NGX_OK; +} + + +static size_t +ngx_get_num_size(uint64_t i) +{ + size_t n = 0; + + do { + i = i / 10; + n++; + } while (i > 0); + + return n; +} diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h new file mode 100644 index 0000000..43b955f --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.h @@ -0,0 +1,38 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef NGX_HTTP_RDS_CSV_OUTPUT_H +#define NGX_HTTP_RDS_CSV_OUTPUT_H + + +#include "ngx_http_rds_csv_filter_module.h" +#include "ngx_http_rds.h" + +#include <ngx_core.h> +#include <ngx_http.h> +#include <nginx.h> + + +ngx_int_t ngx_http_rds_csv_output_header(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header); + +ngx_int_t ngx_http_rds_csv_output_field_names(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_output_literal(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int last_buf); + +ngx_int_t ngx_http_rds_csv_output_bufs(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_output_field(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null); + +ngx_int_t ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len); + + +#endif /* NGX_HTTP_RDS_CSV_OUTPUT_H */ diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c new file mode 100644 index 0000000..4a156d1 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c @@ -0,0 +1,476 @@ +/* + * Copyright (C) agentzh + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_http_rds_csv_processor.h" +#include "ngx_http_rds_csv_util.h" +#include "ngx_http_rds_csv_output.h" +#include "ngx_http_rds.h" +#include "ngx_http_rds_utils.h" + + +#include <ngx_core.h> +#include <ngx_http.h> + + +ngx_int_t +ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_http_rds_header_t header; + ngx_int_t rc; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process header: buf from " + "upstream not in memory"); + goto invalid; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + rc = ngx_http_rds_parse_header(r, b, &header); + + if (rc != NGX_OK) { + goto invalid; + } + + dd("col count: %d", (int) header.col_count); + + if (header.col_count == 0) { + /* for empty result set, just return the JSON + * representation of the RDS header */ + + dd("col count == 0"); + + if (b->pos != b->last) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: header: there's unexpected remaining data " + "in the buf"); + + goto invalid; + } + + ctx->state = state_done; + + /* now we send the postponed response header */ + if (!ctx->header_sent) { + ctx->header_sent = 1; + + rc = ngx_http_rds_csv_next_header_filter(r); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + rc = ngx_http_rds_csv_output_header(r, ctx, &header); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + ngx_http_rds_csv_discard_bufs(r->pool, in); + + return rc; + } + + ctx->cols = ngx_palloc(r->pool, + header.col_count * sizeof(ngx_http_rds_column_t)); + + if (ctx->cols == NULL) { + goto invalid; + } + + ctx->state = state_expect_col; + ctx->cur_col = 0; + ctx->col_count = header.col_count; + + /* now we send the postponed response header */ + if (!ctx->header_sent) { + ctx->header_sent = 1; + + rc = ngx_http_rds_csv_next_header_filter(r); + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in, + ctx); + +invalid: + + dd("return 500"); + if (!ctx->header_sent) { + ctx->header_sent = 1; + + r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR; + ngx_http_send_header(r); + ngx_http_send_special(r, NGX_HTTP_LAST); + + return NGX_ERROR; + } + + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_int_t rc; + ngx_http_rds_csv_loc_conf_t *conf; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process col: buf from upstream not in " + "memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + dd("parsing rds column"); + + rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]); + + dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK); + + if (rc != NGX_OK) { + return NGX_ERROR; + } + + if (b->pos == b->last) { + dd("parse col buf consumed"); + in = in->next; + } + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("end of column list"); + + ctx->state = state_expect_row; + ctx->row = 0; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (conf->field_name_header) { + rc = ngx_http_rds_csv_output_field_names(r, ctx); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + dd("after output literal"); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + + dd("process col is entering process row..."); + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + return ngx_http_rds_csv_process_col(r, in, ctx); +} + + +ngx_int_t +ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_int_t rc; + + if (in == NULL) { + return NGX_OK; + } + + dd("process row"); + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process row: buf from " + "upstream not in memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: row flag is incomplete in the buf"); + return NGX_ERROR; + } + + dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start)); + + if (*b->pos++ == 0) { + /* end of row list */ + ctx->state = state_done; + + if (b->pos != b->last) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: row: there's unexpected remaining data " + "in the buf"); + return NGX_ERROR; + } + + rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0, + 1 /* last buf*/); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + return rc; + } + + ctx->row++; + ctx->cur_col = 0; + ctx->state = state_expect_field; + + if (b->pos == b->last) { + in = in->next; + + } else { + dd("process row: buf not consumed completely"); + } + + return ngx_http_rds_csv_process_field(r, in, ctx); +} + + +ngx_int_t +ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + size_t total, len; + ngx_buf_t *b; + ngx_int_t rc; + + for (;;) { + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + dd("buf not in memory"); + + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process field: buf from " + "upstream not in memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + dd("process field: buf size: %d", (int) ngx_buf_size(b)); + + if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: field size is incomplete in the buf: %*s " + "(len: %d)", b->last - b->pos, b->pos, + (int) (b->last - b->pos)); + + return NGX_ERROR; + } + + total = *(uint32_t *) b->pos; + + dd("total: %d", (int) total); + + b->pos += sizeof(uint32_t); + + if (total == (uint32_t) -1) { + /* SQL NULL found */ + total = 0; + len = 0; + ctx->field_data_rest = 0; + + rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len, + 1 /* is null */); + + } else { + len = (uint32_t) (b->last - b->pos); + + if (len >= total) { + len = total; + } + + ctx->field_data_rest = total - len; + + rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len, + 0 /* not null */); + } + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + b->pos += len; + + if (b->pos == b->last) { + in = in->next; + } + + if (len < total) { + dd("process field: need to read more field data"); + + ctx->state = state_expect_more_field_data; + + return ngx_http_rds_csv_process_more_field_data(r, in, ctx); + } + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("reached the end of the current row"); + + ctx->state = state_expect_row; + + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + /* continue to process the next field (if any) */ + } + + /* impossible to reach here */ + + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_int_t rc; + ngx_buf_t *b; + size_t len; + + for (;;) { + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: buf from upstream not in memory"); + return NGX_ERROR; + } + + len = b->last - b->pos; + + if (len >= ctx->field_data_rest) { + len = ctx->field_data_rest; + ctx->field_data_rest = 0; + + } else { + ctx->field_data_rest -= len; + } + + rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + b->pos += len; + + if (b->pos == b->last) { + in = in->next; + } + + if (ctx->field_data_rest) { + dd("process more field data: still some data remaining"); + continue; + } + + dd("process more field data: reached the end of the current field"); + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("process more field data: reached the end of the current row"); + + ctx->state = state_expect_row; + + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + dd("proces more field data: read the next field"); + + ctx->state = state_expect_field; + + return ngx_http_rds_csv_process_field(r, in, ctx); + } + + /* impossible to reach here */ + + return NGX_ERROR; +} diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h new file mode 100644 index 0000000..011913b --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.h @@ -0,0 +1,34 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef NGX_HTTP_RDS_CSV_PROCESSOR_H +#define NGX_HTTP_RDS_CSV_PROCESSOR_H + + +#include "ngx_http_rds_csv_filter_module.h" + +#include <ngx_core.h> +#include <ngx_http.h> +#include <nginx.h> + + +ngx_int_t ngx_http_rds_csv_process_header(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_process_col(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_process_row(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_process_field(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx); + +ngx_int_t ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx); + + +#endif /* NGX_HTTP_RDS_CSV_PROCESSOR_H */ diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c new file mode 100644 index 0000000..8ca0414 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.c @@ -0,0 +1,105 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "resty_dbd_stream.h" +#include "ngx_http_rds_csv_util.h" + + +uintptr_t +ngx_http_rds_csv_escape_csv_str(u_char field_sep, u_char *dst, u_char *src, + size_t size, unsigned *need_quotes) +{ + ngx_uint_t n; + + if (dst == NULL) { + *need_quotes = 0; + + /* find the number of characters to be escaped */ + + n = 0; + + while (size) { + switch (*src) { + case '"': + n++; + /* fallthrough */ + + case '\r': + case '\n': + *need_quotes = 1; + break; + + default: + if (*src == field_sep) { + *need_quotes = 1; + } + break; + } + + src++; + size--; + } + + return (uintptr_t) n; + } + + while (size) { + if (*src == '"') { + *dst++ = '"'; + *dst++ = '"'; + src++; + + } else { + *dst++ = *src++; + } + + size--; + } + + return (uintptr_t) dst; +} + + +ngx_int_t +ngx_http_rds_csv_test_content_type(ngx_http_request_t *r) +{ + ngx_str_t *type; + + type = &r->headers_out.content_type; + if (type->len != rds_content_type_len + || ngx_strncmp(type->data, rds_content_type, rds_content_type_len) + != 0) + { + return NGX_DECLINED; + } + + return NGX_OK; +} + + +void +ngx_http_rds_csv_discard_bufs(ngx_pool_t *pool, ngx_chain_t *in) +{ + ngx_chain_t *cl; + + for (cl = in; cl; cl = cl->next) { +#if 0 + if (cl->buf->temporary + && ngx_buf_size(cl->buf) > 0) + { + ngx_pfree(pool, cl->buf->start); + } +#endif + + cl->buf->pos = cl->buf->last; + } +} diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h new file mode 100644 index 0000000..57fe892 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_util.h @@ -0,0 +1,34 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef NGX_HTTP_RDS_CSV_UTIL_H +#define NGX_HTTP_RDS_CSV_UTIL_H + + +#include <ngx_core.h> +#include <ngx_http.h> + + +#ifndef NGX_UINT64_LEN +#define NGX_UINT64_LEN (sizeof("18446744073709551615") - 1) +#endif + +#ifndef NGX_UINT16_LEN +#define NGX_UINT16_LEN (sizeof("65535") - 1) +#endif + +#ifndef ngx_copy_literal +#define ngx_copy_literal(p, s) ngx_copy(p, s, sizeof(s) - 1) +#endif + + +uintptr_t ngx_http_rds_csv_escape_csv_str(u_char field_sep, u_char *dst, + u_char *src, size_t size, unsigned *need_quotes); +ngx_int_t ngx_http_rds_csv_test_content_type(ngx_http_request_t *r); +void ngx_http_rds_csv_discard_bufs(ngx_pool_t *pool, ngx_chain_t *in); + + +#endif /* NGX_HTTP_RDS_CSV_UTIL_H */ diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h b/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h new file mode 100644 index 0000000..4a71d4a --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_utils.h @@ -0,0 +1,205 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef NGX_HTTP_RDS_UTILS_H +#define NGX_HTTP_RDS_UTILS_H + + +#include <stdint.h> + + +static ngx_inline ngx_int_t +ngx_http_rds_parse_header(ngx_http_request_t *r, ngx_buf_t *b, + ngx_http_rds_header_t *header) +{ + ssize_t rest; + + rest = sizeof(uint8_t) /* endian type */ + + sizeof(uint32_t) /* format version */ + + sizeof(uint8_t) /* result type */ + + + sizeof(uint16_t) /* standard error code */ + + sizeof(uint16_t) /* driver-specific error code */ + + + sizeof(uint16_t) /* driver-specific errstr len */ + + 0 /* driver-specific errstr data */ + + sizeof(uint64_t) /* affected rows */ + + sizeof(uint64_t) /* insert id */ + + sizeof(uint16_t) /* column count */ + ; + + if (b->last - b->pos < rest) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: header is incomplete in the buf"); + return NGX_ERROR; + } + + /* check endian type */ + + if (*(uint8_t *) b->pos != +#if (NGX_HAVE_LITTLE_ENDIAN) + 0 +#else /* big endian */ + 1 +#endif + ) + { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: endian type in the header differ"); + return NGX_ERROR; + } + + b->pos += sizeof(uint8_t); + + /* check RDS format version number */ + + if (*(uint32_t *) b->pos != (uint32_t) resty_dbd_stream_version) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: RDS format version differ"); + return NGX_ERROR; + } + + dd("RDS format version: %d", (int) *(uint32_t *) b->pos); + + b->pos += sizeof(uint32_t); + + /* check RDS result type */ + + if (*b->pos != 0) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: RDS result type must be 0 for now"); + return NGX_ERROR; + } + + b->pos++; + + /* save the standard error code */ + + header->std_errcode = *(uint16_t *) b->pos; + + b->pos += sizeof(uint16_t); + + /* save the driver-specific error code */ + + header->drv_errcode = *(uint16_t *) b->pos; + + b->pos += sizeof(uint16_t); + + /* save the error string length */ + + header->errstr.len = *(uint16_t *) b->pos; + + b->pos += sizeof(uint16_t); + + dd("errstr len: %d", (int) header->errstr.len); + + /* check the rest data's size */ + + rest = header->errstr.len + + sizeof(uint64_t) /* affected rows */ + + sizeof(uint64_t) /* insert id */ + + sizeof(uint16_t) /* column count */ + ; + + if (b->last - b->pos < rest) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: header is incomplete in the buf"); + return NGX_ERROR; + } + + /* save the error string data */ + + header->errstr.data = b->pos; + + b->pos += header->errstr.len; + + /* save affected rows */ + + header->affected_rows = *(uint64_t *) b->pos; + + b->pos += sizeof(uint64_t); + + /* save insert id */ + + header->insert_id = *(uint64_t *)b->pos; + + b->pos += sizeof(uint64_t); + + /* save column count */ + + header->col_count = *(uint16_t *) b->pos; + + b->pos += sizeof(uint16_t); + + dd("saved column count: %d", (int) header->col_count); + + return NGX_OK; +} + + +static ngx_inline ngx_int_t +ngx_http_rds_parse_col(ngx_http_request_t *r, ngx_buf_t *b, + ngx_http_rds_column_t *col) +{ + ssize_t rest; + + rest = sizeof(uint16_t) /* std col type */ + + sizeof(uint16_t) /* driver col type */ + + sizeof(uint16_t) /* col name str len */ + ; + + if (b->last - b->pos < rest) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: column spec is incomplete in the buf"); + return NGX_ERROR; + } + + /* save standard column type */ + col->std_type = *(uint16_t *) b->pos; + b->pos += sizeof(uint16_t); + + /* save driver-specific column type */ + col->drv_type = *(uint16_t *) b->pos; + b->pos += sizeof(uint16_t); + + /* read column name string length */ + + col->name.len = *(uint16_t *) b->pos; + b->pos += sizeof(uint16_t); + + if (col->name.len == 0) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: column name empty"); + return NGX_ERROR; + } + + rest = col->name.len; + + if (b->last - b->pos < rest) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds: column name string is incomplete in the buf"); + return NGX_ERROR; + } + + /* save the column name string data */ + + col->name.data = ngx_palloc(r->pool, col->name.len); + if (col->name.data == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(col->name.data, b->pos, col->name.len); + b->pos += col->name.len; + + dd("saved column name \"%.*s\" (len %d, offset %d)", + (int) col->name.len, col->name.data, + (int) col->name.len, (int) (b->pos - b->start)); + + return NGX_OK; +} + + +#endif /* NGX_HTTP_RDS_UTILS_H */ diff --git a/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h b/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h new file mode 100644 index 0000000..ebb0cb3 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/resty_dbd_stream.h @@ -0,0 +1,59 @@ +#ifndef RESTY_DBD_STREAME_H +#define RESTY_DBD_STREAME_H + +#define resty_dbd_stream_version 3 +#define resty_dbd_stream_version_string "0.0.3" + +#define rds_content_type \ + "application/x-resty-dbd-stream" + +#define rds_content_type_len \ + (sizeof(rds_content_type) - 1) + + +typedef enum { + rds_rough_col_type_int = 0 << 14, + rds_rough_col_type_float = 1 << 14, + rds_rough_col_type_str = 2 << 14, + rds_rough_col_type_bool = 3 << 14 + +} rds_rough_col_type_t; + + +/* The following types (or spellings thereof) are specified + * by SQL: + * bigint, bit, bit varying, boolean, char, character varying, + * character, varchar, date, double precision, integer, + * interval, numeric, decimal, real, smallint, + * time (with or without time zone), + * timestamp (with or without time zone), xml */ + +typedef enum { + rds_col_type_unknown = 0 | rds_rough_col_type_str, + rds_col_type_bigint = 1 | rds_rough_col_type_int, + rds_col_type_bit = 2 | rds_rough_col_type_str, + rds_col_type_bit_varying = 3 | rds_rough_col_type_str, + + rds_col_type_bool = 4 | rds_rough_col_type_bool, + rds_col_type_char = 5 | rds_rough_col_type_str, + rds_col_type_varchar = 6 | rds_rough_col_type_str, + rds_col_type_date = 7 | rds_rough_col_type_str, + rds_col_type_double = 8 | rds_rough_col_type_float, + rds_col_type_integer = 9 | rds_rough_col_type_int, + rds_col_type_interval = 10 | rds_rough_col_type_float, + rds_col_type_decimal = 11 | rds_rough_col_type_float, + rds_col_type_real = 12 | rds_rough_col_type_float, + rds_col_type_smallint = 13 | rds_rough_col_type_int, + rds_col_type_time_with_time_zone = 14 | rds_rough_col_type_str, + rds_col_type_time = 15 | rds_rough_col_type_str, + rds_col_type_timestamp_with_time_zone = 16 | rds_rough_col_type_str, + rds_col_type_timestamp = 17 | rds_rough_col_type_str, + rds_col_type_xml = 18 | rds_rough_col_type_str, + + /* our additions */ + rds_col_type_blob = 19 | rds_rough_col_type_str + +} rds_col_type_t; + +#endif /* RESTY_DBD_STREAME_H */ + |