diff options
author | kaiwu <kaiwu2004@gmail.com> | 2025-03-01 12:42:23 +0800 |
---|---|---|
committer | kaiwu <kaiwu2004@gmail.com> | 2025-03-01 12:42:23 +0800 |
commit | 3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch) | |
tree | 284c2ba95a41536ae1bff6bea710db0709a64739 /rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c | |
download | openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip |
openresty bundle
Diffstat (limited to 'rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c')
-rw-r--r-- | rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c | 476 |
1 files changed, 476 insertions, 0 deletions
diff --git a/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c new file mode 100644 index 0000000..4a156d1 --- /dev/null +++ b/rds-csv-nginx-module-0.09/src/ngx_http_rds_csv_processor.c @@ -0,0 +1,476 @@ +/* + * Copyright (C) agentzh + */ + + +#ifndef DDEBUG +#define DDEBUG 0 +#endif +#include "ddebug.h" + + +#include "ngx_http_rds_csv_processor.h" +#include "ngx_http_rds_csv_util.h" +#include "ngx_http_rds_csv_output.h" +#include "ngx_http_rds.h" +#include "ngx_http_rds_utils.h" + + +#include <ngx_core.h> +#include <ngx_http.h> + + +ngx_int_t +ngx_http_rds_csv_process_header(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_http_rds_header_t header; + ngx_int_t rc; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process header: buf from " + "upstream not in memory"); + goto invalid; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + rc = ngx_http_rds_parse_header(r, b, &header); + + if (rc != NGX_OK) { + goto invalid; + } + + dd("col count: %d", (int) header.col_count); + + if (header.col_count == 0) { + /* for empty result set, just return the JSON + * representation of the RDS header */ + + dd("col count == 0"); + + if (b->pos != b->last) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: header: there's unexpected remaining data " + "in the buf"); + + goto invalid; + } + + ctx->state = state_done; + + /* now we send the postponed response header */ + if (!ctx->header_sent) { + ctx->header_sent = 1; + + rc = ngx_http_rds_csv_next_header_filter(r); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + rc = ngx_http_rds_csv_output_header(r, ctx, &header); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + ngx_http_rds_csv_discard_bufs(r->pool, in); + + return rc; + } + + ctx->cols = ngx_palloc(r->pool, + header.col_count * sizeof(ngx_http_rds_column_t)); + + if (ctx->cols == NULL) { + goto invalid; + } + + ctx->state = state_expect_col; + ctx->cur_col = 0; + ctx->col_count = header.col_count; + + /* now we send the postponed response header */ + if (!ctx->header_sent) { + ctx->header_sent = 1; + + rc = ngx_http_rds_csv_next_header_filter(r); + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + return ngx_http_rds_csv_process_col(r, b->pos == b->last ? in->next : in, + ctx); + +invalid: + + dd("return 500"); + if (!ctx->header_sent) { + ctx->header_sent = 1; + + r->headers_out.status = NGX_HTTP_INTERNAL_SERVER_ERROR; + ngx_http_send_header(r); + ngx_http_send_special(r, NGX_HTTP_LAST); + + return NGX_ERROR; + } + + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_process_col(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_int_t rc; + ngx_http_rds_csv_loc_conf_t *conf; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process col: buf from upstream not in " + "memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + dd("parsing rds column"); + + rc = ngx_http_rds_parse_col(r, b, &ctx->cols[ctx->cur_col]); + + dd("parse col returns %d (%d)", (int) rc, (int) NGX_OK); + + if (rc != NGX_OK) { + return NGX_ERROR; + } + + if (b->pos == b->last) { + dd("parse col buf consumed"); + in = in->next; + } + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("end of column list"); + + ctx->state = state_expect_row; + ctx->row = 0; + + conf = ngx_http_get_module_loc_conf(r, ngx_http_rds_csv_filter_module); + + if (conf->field_name_header) { + rc = ngx_http_rds_csv_output_field_names(r, ctx); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + } + + dd("after output literal"); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + + dd("process col is entering process row..."); + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + return ngx_http_rds_csv_process_col(r, in, ctx); +} + + +ngx_int_t +ngx_http_rds_csv_process_row(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_buf_t *b; + ngx_int_t rc; + + if (in == NULL) { + return NGX_OK; + } + + dd("process row"); + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process row: buf from " + "upstream not in memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + if (b->last - b->pos < (ssize_t) sizeof(uint8_t)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: row flag is incomplete in the buf"); + return NGX_ERROR; + } + + dd("row flag: %d (offset %d)", (char) *b->pos, (int) (b->pos - b->start)); + + if (*b->pos++ == 0) { + /* end of row list */ + ctx->state = state_done; + + if (b->pos != b->last) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: row: there's unexpected remaining data " + "in the buf"); + return NGX_ERROR; + } + + rc = ngx_http_rds_csv_output_literal(r, ctx, (u_char *) "", 0, + 1 /* last buf*/); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + return rc; + } + + ctx->row++; + ctx->cur_col = 0; + ctx->state = state_expect_field; + + if (b->pos == b->last) { + in = in->next; + + } else { + dd("process row: buf not consumed completely"); + } + + return ngx_http_rds_csv_process_field(r, in, ctx); +} + + +ngx_int_t +ngx_http_rds_csv_process_field(ngx_http_request_t *r, ngx_chain_t *in, + ngx_http_rds_csv_ctx_t *ctx) +{ + size_t total, len; + ngx_buf_t *b; + ngx_int_t rc; + + for (;;) { + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + dd("buf not in memory"); + + if (!ngx_buf_special(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: process field: buf from " + "upstream not in memory"); + return NGX_ERROR; + } + + in = in->next; + + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + } + + dd("process field: buf size: %d", (int) ngx_buf_size(b)); + + if (b->last - b->pos < (ssize_t) sizeof(uint32_t)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: field size is incomplete in the buf: %*s " + "(len: %d)", b->last - b->pos, b->pos, + (int) (b->last - b->pos)); + + return NGX_ERROR; + } + + total = *(uint32_t *) b->pos; + + dd("total: %d", (int) total); + + b->pos += sizeof(uint32_t); + + if (total == (uint32_t) -1) { + /* SQL NULL found */ + total = 0; + len = 0; + ctx->field_data_rest = 0; + + rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len, + 1 /* is null */); + + } else { + len = (uint32_t) (b->last - b->pos); + + if (len >= total) { + len = total; + } + + ctx->field_data_rest = total - len; + + rc = ngx_http_rds_csv_output_field(r, ctx, b->pos, len, + 0 /* not null */); + } + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + b->pos += len; + + if (b->pos == b->last) { + in = in->next; + } + + if (len < total) { + dd("process field: need to read more field data"); + + ctx->state = state_expect_more_field_data; + + return ngx_http_rds_csv_process_more_field_data(r, in, ctx); + } + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("reached the end of the current row"); + + ctx->state = state_expect_row; + + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + /* continue to process the next field (if any) */ + } + + /* impossible to reach here */ + + return NGX_ERROR; +} + + +ngx_int_t +ngx_http_rds_csv_process_more_field_data(ngx_http_request_t *r, + ngx_chain_t *in, ngx_http_rds_csv_ctx_t *ctx) +{ + ngx_int_t rc; + ngx_buf_t *b; + size_t len; + + for (;;) { + if (in == NULL) { + return NGX_OK; + } + + b = in->buf; + + if (!ngx_buf_in_memory(b)) { + ngx_log_error(NGX_LOG_ERR, r->connection->log, 0, + "rds_csv: buf from upstream not in memory"); + return NGX_ERROR; + } + + len = b->last - b->pos; + + if (len >= ctx->field_data_rest) { + len = ctx->field_data_rest; + ctx->field_data_rest = 0; + + } else { + ctx->field_data_rest -= len; + } + + rc = ngx_http_rds_csv_output_more_field_data(r, ctx, b->pos, len); + + if (rc == NGX_ERROR || rc >= NGX_HTTP_SPECIAL_RESPONSE) { + return rc; + } + + b->pos += len; + + if (b->pos == b->last) { + in = in->next; + } + + if (ctx->field_data_rest) { + dd("process more field data: still some data remaining"); + continue; + } + + dd("process more field data: reached the end of the current field"); + + ctx->cur_col++; + + if (ctx->cur_col >= ctx->col_count) { + dd("process more field data: reached the end of the current row"); + + ctx->state = state_expect_row; + + return ngx_http_rds_csv_process_row(r, in, ctx); + } + + dd("proces more field data: read the next field"); + + ctx->state = state_expect_field; + + return ngx_http_rds_csv_process_field(r, in, ctx); + } + + /* impossible to reach here */ + + return NGX_ERROR; +} |