diff options
Diffstat (limited to 'rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c')
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c | 770 |
1 files changed, 770 insertions, 0 deletions
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c new file mode 100644 index 0000000..abb7db3 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_output.c @@ -0,0 +1,770 @@ + +/* + * Copyright (C) Yichun Zhang (agentzh) + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_http_rds_csv_filter_module.h" +#include "ngx_http_rds_csv_output.h" +#include "ngx_http_rds_csv_util.h" +#include "resty_dbd_stream.h" + + +static u_char *ngx_http_rds_csv_request_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len); +static ngx_int_t ngx_http_rds_csv_get_buf(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx); +static u_char *ngx_http_rds_csv_get_postponed(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len); +static ngx_int_t ngx_http_rds_csv_submit_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf); +static size_t ngx_get_num_size(uint64_t i); + + +ngx_int_t +ngx_http_rds_csv_output_literal(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, + int last_buf) +{ + u_char *pos; + + pos = ngx_http_rds_csv_request_mem(r, ctx, len); + if (pos == NULL) { + return NGX_ERROR; + } + + ngx_memcpy(pos, data, len); + + dd("before output chain"); + + if (last_buf) { + ctx->seen_stream_end = 1; + + if (r != r->main) { + last_buf = 0; + } + } + + return ngx_http_rds_csv_submit_mem(r, ctx, len, (unsigned) last_buf); +} + + +ngx_int_t +ngx_http_rds_csv_output_bufs(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_int_t rc; + ngx_chain_t *cl; + + dd("entered output chain"); + + if (ctx->seen_stream_end) { + ctx->seen_stream_end = 0; + + if (ctx->avail_out) { + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + + ctx->avail_out = 0; + } + } + + dd_dump_chain_size(); + + for ( ;; ) { + if (ctx->out == NULL) { + /* fprintf(stderr, "\n"); */ + return NGX_OK; + } + + /* fprintf(stderr, "XXX Relooping..."); */ + + rc = ngx_http_rds_csv_next_body_filter(r, ctx->out); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + +#if defined(nginx_version) && nginx_version >= 1001004 + ngx_chain_update_chains(r->pool, &ctx->free_bufs, &ctx->busy_bufs, + &ctx->out, ctx->tag); +#else + ngx_chain_update_chains(&ctx->free_bufs, &ctx->busy_bufs, &ctx->out, + ctx->tag); +#endif + + ctx->last_out = &ctx->out; + } + + /* impossible to reach here */ + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_output_header(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, ngx_http_rds_header_t *header) +{ + u_char *pos, *last; + size_t size; + uintptr_t escape; + unsigned last_buf = 0; + unsigned need_quotes = 0; + u_char sep; + + ngx_http_rds_csv_loc_conf_t *conf; + + /* calculate the buffer size */ + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (conf->field_name_header) { + size = sizeof("errcode,errstr,insert_id,affected_rows") - 1 + + conf->row_term.len; + + } else { + size = 0; + } + + sep = (u_char) conf->field_sep; + + size += 3 /* field seperators */ + conf->row_term.len; + + size += ngx_get_num_size(header->std_errcode); + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, header->errstr.data, + header->errstr.len, + &need_quotes); + + if (need_quotes) { + size += sizeof("\"\"") - 1; + } + + size += header->errstr.len + escape + + ngx_get_num_size(header->insert_id) + + ngx_get_num_size(header->affected_rows); + + /* create the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + last = ngx_sprintf(last, "errcode%cerrstr%cinsert_id%caffected_rows%V" + "%uD%c", sep, sep, sep, &conf->row_term, + (uint32_t) header->std_errcode, sep); + + if (need_quotes) { + *last++ = '"'; + } + + if (escape == 0) { + last = ngx_copy(last, header->errstr.data, header->errstr.len); + + } else { + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, + header->errstr.data, + header->errstr.len, NULL); + } + + if (need_quotes) { + *last++ = '"'; + } + + last = ngx_sprintf(last, "%c%uL%c%uL%V", sep, header->insert_id, sep, + header->affected_rows, &conf->row_term); + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output header buffer error: %uz != %uz", + (size_t) (last - pos), size); + + return NGX_ERROR; + } + + if (r == r->main) { + last_buf = 1; + } + + ctx->seen_stream_end = 1; + + return ngx_http_rds_csv_submit_mem(r, ctx, size, last_buf); +} + + +ngx_int_t +ngx_http_rds_csv_output_field_names(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_uint_t i; + ngx_http_rds_column_t *col; + size_t size; + u_char *pos, *last; + uintptr_t escape = 0; + unsigned need_quotes; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + size = ctx->col_count - 1 /* field sep count */ + + conf->row_term.len; + + for (i = 0; i < ctx->col_count; i++) { + col = &ctx->cols[i]; + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data, + col->name.len, &need_quotes); + + dd("field escape: %d", (int) escape); + + if (need_quotes) { + size += sizeof("\"\"") - 1; + } + + size += col->name.len + escape; + } + + ctx->generated_col_names = 1; + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + for (i = 0; i < ctx->col_count; i++) { + col = &ctx->cols[i]; + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, col->name.data, + col->name.len, &need_quotes); + + if (need_quotes) { + *last++ = '"'; + } + + if (escape == 0) { + last = ngx_copy(last, col->name.data, col->name.len); + + } else { + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, + col->name.data, + col->name.len, NULL); + } + + if (need_quotes) { + *last++ = '"'; + } + + if (i != ctx->col_count - 1) { + *last++ = sep; + } + } + + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output field names buffer error: %uz != %uz", + (size_t) (last - pos), size); + + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +ngx_int_t +ngx_http_rds_csv_output_field(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len, int is_null) +{ + u_char *pos, *last; + ngx_http_rds_column_t *col; + size_t size; + uintptr_t val_escape = 0; + unsigned need_quotes = 0; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; +#if DDEBUG + u_char *p; +#endif + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + dd("reading row %llu, col %d, len %d", + (unsigned long long) ctx->row, + (int) ctx->cur_col, (int) len); + + /* calculate the buffer size */ + + if (ctx->cur_col == 0) { + size = 0; + + } else { + size = 1 /* field sep */; + } + + col = &ctx->cols[ctx->cur_col]; + + if (len == 0 && ctx->field_data_rest > 0) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: at least one octet should go with the field " + "size in one buf"); + + return NGX_ERROR; + } + + if (is_null) { + /* SQL NULL is just empty in the CSV field */ + + } else if (len == 0) { + /* empty string is also empty */ + + } else { + switch (col->std_type & 0xc000) { + case rds_rough_col_type_float: + case rds_rough_col_type_int: + case rds_rough_col_type_bool: + size += len; + break; + + default: + dd("string field found"); + + val_escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len, + &need_quotes); + + if (ctx->field_data_rest > 0 && !need_quotes) { + need_quotes = 1; + } + + if (need_quotes) { + if (ctx->field_data_rest == 0) { + size += sizeof("\"\"") - 1; + + } else { + size += sizeof("\"") - 1; + } + } + + size += len + val_escape; + break; + } + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + size += conf->row_term.len; + } + + /* allocate the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + if (ctx->cur_col != 0) { + *last++ = sep; + } + + if (is_null || len == 0) { + /* do nothing */ + + } else { + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + last = ngx_copy(last, data, len); + break; + + default: + /* string */ + if (need_quotes) { + *last++ = '"'; + } + + if (val_escape == 0) { + last = ngx_copy(last, data, len); + + } else { + dd("field: string value escape non-zero: %d", + (int) val_escape); + +#if DDEBUG + p = last; +#endif + + last = (u_char *) + ngx_http_rds_csv_escape_csv_str(sep, last, data, len, + NULL); + +#if DDEBUG + dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)", + (int) (len + val_escape), + p, (int) (len + val_escape), + (int) val_escape, + (int) ((last - p) - len)); +#endif + } + + if (need_quotes && ctx->field_data_rest == 0) { + *last++ = '"'; + } + + break; + } + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + } + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output field: buffer error (%d left)", + (int) size - (last - pos)); + + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +ngx_int_t +ngx_http_rds_csv_output_more_field_data(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, u_char *data, size_t len) +{ + u_char *pos, *last; + size_t size = 0; + ngx_http_rds_column_t *col; + uintptr_t escape = 0; +#if DDEBUG + u_char *p; +#endif + unsigned need_quotes; + u_char sep; + ngx_http_rds_csv_loc_conf_t *conf; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + sep = (u_char) conf->field_sep; + + /* calculate the buffer size */ + + col = &ctx->cols[ctx->cur_col]; + + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + size += len; + break; + + default: + /* string */ + + escape = ngx_http_rds_csv_escape_csv_str(sep, NULL, data, len, + &need_quotes); + + size = len + escape; + + if (ctx->field_data_rest == 0) { + size += sizeof("\"") - 1; + } + + break; + } + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + size += conf->row_term.len; + } + + /* allocate the buffer */ + + pos = ngx_http_rds_csv_request_mem(r, ctx, size); + if (pos == NULL) { + return NGX_ERROR; + } + + last = pos; + + /* fill up the buffer */ + + switch (col->std_type & 0xc000) { + case rds_rough_col_type_int: + case rds_rough_col_type_float: + case rds_rough_col_type_bool: + last = ngx_copy(last, data, len); + break; + + default: + /* string */ + if (escape == 0) { + last = ngx_copy(last, data, len); + + } else { + dd("more field data: string value escape non-zero: %d", + (int) escape); + +#if DDEBUG + p = last; +#endif + + last = (u_char *) ngx_http_rds_csv_escape_csv_str(sep, last, data, + len, NULL); + +#if DDEBUG + dd("escaped value \"%.*s\" (len %d, escape %d, escape2 %d)", + (int) (len + escape), + p, (int) (len + escape), + (int) escape, + (int) ((last - p) - len)); +#endif + } + + if (ctx->field_data_rest == 0) { + *last++ = '"'; + } + + break; + } /* switch */ + + if (ctx->field_data_rest == 0 && ctx->cur_col == ctx->col_count - 1) { + /* last column in the row */ + last = ngx_copy(last, conf->row_term.data, conf->row_term.len); + } + + if ((size_t) (last - pos) != size) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: output more field data: buffer error " + "(%d left)", (int) (size - (last - pos))); + return NGX_ERROR; + } + + return ngx_http_rds_csv_submit_mem(r, ctx, size, 0); +} + + +static u_char * +ngx_http_rds_csv_request_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len) +{ + ngx_int_t rc; + u_char *p; + + rc = ngx_http_rds_csv_get_buf(r, ctx); + if (rc != NGX_OK) { + return NULL; + } + + if (ctx->avail_out < len) { + p = ngx_http_rds_csv_get_postponed(r, ctx, len); + if (p == NULL) { + return NULL; + } + + ctx->postponed.pos = p; + ctx->postponed.last = p + len; + + return p; + } + + return ctx->out_buf->last; +} + + +static ngx_int_t +ngx_http_rds_csv_get_buf(ngx_http_request_t *r, ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_http_rds_csv_loc_conf_t *conf; + + dd("MEM enter"); + + if (ctx->avail_out) { + return NGX_OK; + } + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (ctx->free_bufs) { + dd("MEM reusing temp buf from free_bufs"); + + ctx->out_buf = ctx->free_bufs->buf; + ctx->free_bufs = ctx->free_bufs->next; + + } else { + dd("MEM creating temp buf with size: %d", (int) conf->buf_size); + ctx->out_buf = ngx_create_temp_buf(r->pool, conf->buf_size); + if (ctx->out_buf == NULL) { + return NGX_ERROR; + } + + ctx->out_buf->tag = (ngx_buf_tag_t) &ngx_http_rds_csv_filter_module; + ctx->out_buf->recycled = 1; + } + + ctx->avail_out = conf->buf_size; + + return NGX_OK; +} + + +static u_char * +ngx_http_rds_csv_get_postponed(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len) +{ + u_char *p; + + dd("MEM enter"); + + if (ctx->cached.start == NULL) { + goto alloc; + } + + if ((size_t) (ctx->cached.end - ctx->cached.start) < len) { + ngx_pfree(r->pool, ctx->cached.start); + goto alloc; + } + + return ctx->cached.start; + +alloc: + + p = ngx_palloc(r->pool, len); + if (p == NULL) { + return NULL; + } + + ctx->cached.start = p; + ctx->cached.end = p + len; + + return p; +} + + +static ngx_int_t +ngx_http_rds_csv_submit_mem(ngx_http_request_t *r, + ngx_http_rds_csv_ctx_t *ctx, size_t len, unsigned last_buf) +{ + ngx_chain_t *cl; + ngx_int_t rc; + + if (ctx->postponed.pos != NULL) { + dd("MEM copy postponed data over to ctx->out for len %d", (int) len); + + for ( ;; ) { + len = ctx->postponed.last - ctx->postponed.pos; + if (len > ctx->avail_out) { + len = ctx->avail_out; + } + + ctx->out_buf->last = ngx_copy(ctx->out_buf->last, + ctx->postponed.pos, len); + + ctx->avail_out -= len; + + ctx->postponed.pos += len; + + if (ctx->postponed.pos == ctx->postponed.last) { + ctx->postponed.pos = NULL; + } + + if (ctx->avail_out > 0) { + break; + } + + dd("MEM save ctx->out_buf"); + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + + if (ctx->postponed.pos == NULL) { + ctx->out_buf->last_buf = last_buf; + break; + } + + rc = ngx_http_rds_csv_get_buf(r, ctx); + if (rc != NGX_OK) { + return NGX_ERROR; + } + } + + return NGX_OK; + } + + dd("MEM consuming out_buf for %d", (int) len); + + ctx->out_buf->last += len; + ctx->avail_out -= len; + ctx->out_buf->last_buf = last_buf; + + if (ctx->avail_out == 0) { + dd("MEM save ctx->out_buf"); + + cl = ngx_alloc_chain_link(r->pool); + if (cl == NULL) { + return NGX_ERROR; + } + + cl->buf = ctx->out_buf; + cl->next = NULL; + *ctx->last_out = cl; + ctx->last_out = &cl->next; + } + + return NGX_OK; +} + + +static size_t +ngx_get_num_size(uint64_t i) +{ + size_t n = 0; + + do { + i = i / 10; + n++; + } while (i > 0); + + return n; +} |