summaryrefslogtreecommitdiff
path: root/ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c
diff options
context:
space:
mode:
authorkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
committerkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
commit3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch)
tree284c2ba95a41536ae1bff6bea710db0709a64739 /ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c
downloadopenresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz
openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip
openresty bundle
Diffstat (limited to 'ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c')
-rw-r--r--ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c6242
1 files changed, 6242 insertions, 0 deletions
diff --git a/ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c b/ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c
new file mode 100644
index 0000000..c5b33df
--- /dev/null
+++ b/ngx_stream_lua-0.0.16/src/ngx_stream_lua_socket_tcp.c
@@ -0,0 +1,6242 @@
+
+/*
+ * !!! DO NOT EDIT DIRECTLY !!!
+ * This file was automatically generated from the following template:
+ *
+ * src/subsys/ngx_subsys_lua_socket_tcp.c.tt2
+ */
+
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_stream_lua_socket_tcp.h"
+#include "ngx_stream_lua_input_filters.h"
+#include "ngx_stream_lua_util.h"
+#include "ngx_stream_lua_uthread.h"
+#include "ngx_stream_lua_output.h"
+#include "ngx_stream_lua_contentby.h"
+#include "ngx_stream_lua_probe.h"
+
+
+static int ngx_stream_lua_socket_tcp(lua_State *L);
+static int ngx_stream_lua_socket_tcp_connect(lua_State *L);
+#if (NGX_STREAM_SSL)
+static int ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L);
+#endif
+static int ngx_stream_lua_socket_tcp_receive(lua_State *L);
+static int ngx_stream_lua_socket_tcp_receiveany(lua_State *L);
+static int ngx_stream_lua_socket_tcp_send(lua_State *L);
+static int ngx_stream_lua_socket_tcp_close(lua_State *L);
+static int ngx_stream_lua_socket_tcp_setoption(lua_State *L);
+static int ngx_stream_lua_socket_tcp_settimeout(lua_State *L);
+static int ngx_stream_lua_socket_tcp_settimeouts(lua_State *L);
+static void ngx_stream_lua_socket_tcp_handler(ngx_event_t *ev);
+static ngx_int_t ngx_stream_lua_socket_tcp_get_peer(ngx_peer_connection_t *pc,
+ void *data);
+static void ngx_stream_lua_socket_init_peer_connection_addr_text(
+ ngx_peer_connection_t *pc);
+static void ngx_stream_lua_socket_read_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_send_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_connected_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_tcp_cleanup(void *data);
+static void ngx_stream_lua_socket_tcp_finalize(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_tcp_finalize_read_part(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_tcp_finalize_write_part(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ int do_shutdown);
+
+static ngx_int_t ngx_stream_lua_socket_send(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static ngx_int_t ngx_stream_lua_socket_test_connect(ngx_stream_lua_request_t *r,
+ ngx_connection_t *c);
+static void ngx_stream_lua_socket_handle_conn_error(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type);
+static void ngx_stream_lua_socket_handle_read_error(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type);
+static void ngx_stream_lua_socket_handle_write_error(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ ngx_uint_t ft_type);
+static void ngx_stream_lua_socket_handle_conn_success(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_handle_read_success(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_handle_write_success(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u);
+static int ngx_stream_lua_socket_tcp_send_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static int ngx_stream_lua_socket_tcp_conn_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static void ngx_stream_lua_socket_dummy_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static int ngx_stream_lua_socket_tcp_receive_helper(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_tcp_read(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static int ngx_stream_lua_socket_tcp_receive_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_read_line(void *data, ssize_t bytes);
+static void ngx_stream_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx);
+static int ngx_stream_lua_socket_resolve_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static int ngx_stream_lua_socket_conn_error_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static int ngx_stream_lua_socket_read_error_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static int ngx_stream_lua_socket_write_error_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_read_all(void *data, ssize_t bytes);
+static ngx_int_t ngx_stream_lua_socket_read_until(void *data, ssize_t bytes);
+static ngx_int_t ngx_stream_lua_socket_read_chunk(void *data, ssize_t bytes);
+static ngx_int_t ngx_stream_lua_socket_read_any(void *data, ssize_t bytes);
+static int ngx_stream_lua_socket_tcp_receiveuntil(lua_State *L);
+static int ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
+ ngx_stream_lua_socket_compiled_pattern_t *cp, ngx_log_t *log);
+static int ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L);
+static void ngx_stream_lua_req_socket_rev_handler(ngx_stream_lua_request_t *r);
+static int ngx_stream_lua_socket_tcp_getreusedtimes(lua_State *L);
+static int ngx_stream_lua_socket_tcp_setkeepalive(lua_State *L);
+static void ngx_stream_lua_socket_tcp_create_socket_pool(lua_State *L,
+ ngx_stream_lua_request_t *r, ngx_str_t key, ngx_int_t pool_size,
+ ngx_int_t backlog, ngx_stream_lua_socket_pool_t **spool);
+static ngx_int_t ngx_stream_lua_get_keepalive_peer(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u);
+static void ngx_stream_lua_socket_keepalive_dummy_handler(ngx_event_t *ev);
+static int ngx_stream_lua_socket_tcp_connect_helper(lua_State *L,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_request_t *r,
+ ngx_stream_lua_ctx_t *ctx, u_char *host_ref, size_t host_len,
+ in_port_t port, unsigned resuming);
+static void ngx_stream_lua_socket_tcp_conn_op_timeout_handler(
+ ngx_event_t *ev);
+static int ngx_stream_lua_socket_tcp_conn_op_timeout_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static void ngx_stream_lua_socket_tcp_resume_conn_op(
+ ngx_stream_lua_socket_pool_t *spool);
+static void ngx_stream_lua_socket_tcp_conn_op_ctx_cleanup(void *data);
+static void ngx_stream_lua_socket_tcp_conn_op_resume_handler(ngx_event_t *ev);
+static ngx_int_t ngx_stream_lua_socket_keepalive_close_handler(ngx_event_t *ev);
+static void ngx_stream_lua_socket_keepalive_rev_handler(ngx_event_t *ev);
+static int ngx_stream_lua_socket_tcp_conn_op_resume_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static int ngx_stream_lua_socket_tcp_upstream_destroy(lua_State *L);
+static int ngx_stream_lua_socket_downstream_destroy(lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_push_input_data(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_ctx_t *ctx,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_add_pending_data(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ u_char *pos, size_t len, u_char *pat, int prefix, int old_state);
+static ngx_int_t ngx_stream_lua_socket_add_input_buffer(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u);
+static ngx_int_t ngx_stream_lua_socket_insert_buffer(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ u_char *pat, size_t prefix);
+static ngx_int_t ngx_stream_lua_socket_tcp_conn_op_resume(
+ ngx_stream_lua_request_t *r);
+static ngx_int_t ngx_stream_lua_socket_tcp_conn_resume(
+ ngx_stream_lua_request_t *r);
+static ngx_int_t ngx_stream_lua_socket_tcp_read_resume(
+ ngx_stream_lua_request_t *r);
+static ngx_int_t ngx_stream_lua_socket_tcp_write_resume(
+ ngx_stream_lua_request_t *r);
+static ngx_int_t ngx_stream_lua_socket_tcp_resume_helper(
+ ngx_stream_lua_request_t *r, int socket_op);
+static void ngx_stream_lua_tcp_queue_conn_op_cleanup(void *data);
+static void ngx_stream_lua_tcp_resolve_cleanup(void *data);
+static void ngx_stream_lua_coctx_cleanup(void *data);
+static void ngx_stream_lua_socket_free_pool(ngx_log_t *log,
+ ngx_stream_lua_socket_pool_t *spool);
+static int ngx_stream_lua_socket_shutdown_pool(lua_State *L);
+static void ngx_stream_lua_socket_shutdown_pool_helper(
+ ngx_stream_lua_socket_pool_t *spool);
+static int ngx_stream_lua_socket_prepare_error_retvals(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L, ngx_uint_t ft_type);
+#if (NGX_STREAM_SSL)
+static int ngx_stream_lua_ssl_handshake_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L);
+static void ngx_stream_lua_ssl_handshake_handler(ngx_connection_t *c);
+static int ngx_stream_lua_ssl_free_session(lua_State *L);
+#endif
+static void ngx_stream_lua_socket_tcp_close_connection(ngx_connection_t *c);
+
+static int ngx_stream_lua_socket_tcp_peek(lua_State *L);
+static ngx_int_t ngx_stream_lua_socket_tcp_peek_resume(ngx_stream_lua_request_t *r);
+static int ngx_stream_lua_socket_tcp_shutdown(lua_State *L);
+
+
+enum {
+ SOCKET_CTX_INDEX = 1,
+ SOCKET_KEY_INDEX = 3,
+ SOCKET_CONNECT_TIMEOUT_INDEX = 2,
+ SOCKET_SEND_TIMEOUT_INDEX = 4,
+ SOCKET_READ_TIMEOUT_INDEX = 5,
+};
+
+
+enum {
+ SOCKET_OP_CONNECT,
+ SOCKET_OP_READ,
+ SOCKET_OP_WRITE,
+ SOCKET_OP_RESUME_CONN
+};
+
+
+#define ngx_stream_lua_socket_check_busy_connecting(r, u, L) \
+ if ((u)->conn_waiting) { \
+ lua_pushnil(L); \
+ lua_pushliteral(L, "socket busy connecting"); \
+ return 2; \
+ }
+
+
+#define ngx_stream_lua_socket_check_busy_reading(r, u, L) \
+ if ((u)->read_waiting) { \
+ lua_pushnil(L); \
+ lua_pushliteral(L, "socket busy reading"); \
+ return 2; \
+ }
+
+
+#define ngx_stream_lua_socket_check_busy_writing(r, u, L) \
+ if ((u)->write_waiting) { \
+ lua_pushnil(L); \
+ lua_pushliteral(L, "socket busy writing"); \
+ return 2; \
+ } \
+ if ((u)->raw_downstream \
+ && ((r)->connection->buffered)) \
+ { \
+ lua_pushnil(L); \
+ lua_pushliteral(L, "socket busy writing"); \
+ return 2; \
+ }
+
+
+
+static char ngx_stream_lua_raw_req_socket_metatable_key;
+static char ngx_stream_lua_tcp_socket_metatable_key;
+static char ngx_stream_lua_upstream_udata_metatable_key;
+static char ngx_stream_lua_downstream_udata_metatable_key;
+static char ngx_stream_lua_pool_udata_metatable_key;
+static char ngx_stream_lua_pattern_udata_metatable_key;
+#if (NGX_STREAM_SSL)
+static char ngx_stream_lua_ssl_session_metatable_key;
+#endif
+
+
+void
+ngx_stream_lua_inject_socket_tcp_api(ngx_log_t *log, lua_State *L)
+{
+ ngx_int_t rc;
+
+ lua_createtable(L, 0, 4 /* nrec */); /* ngx.socket */
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp);
+ lua_pushvalue(L, -1);
+ lua_setfield(L, -3, "tcp");
+ lua_setfield(L, -2, "stream");
+
+ {
+ const char buf[] = "local sock = ngx.socket.tcp()"
+ " local ok, err = sock:connect(...)"
+ " if ok then return sock else return nil, err end";
+
+ rc = luaL_loadbuffer(L, buf, sizeof(buf) - 1, "=ngx.socket.connect");
+ }
+
+ if (rc != NGX_OK) {
+ ngx_log_error(NGX_LOG_CRIT, log, 0,
+ "failed to load Lua code for ngx.socket.connect(): %i",
+ rc);
+
+ } else {
+ lua_setfield(L, -2, "connect");
+ }
+
+ lua_setfield(L, -2, "socket");
+
+
+ /* {{{raw req socket object metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ raw_req_socket_metatable_key));
+ lua_createtable(L, 0 /* narr */, 9 /* nrec */);
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receive);
+ lua_setfield(L, -2, "receive");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveany);
+ lua_setfield(L, -2, "receiveany");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveuntil);
+ lua_setfield(L, -2, "receiveuntil");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_send);
+ lua_setfield(L, -2, "send");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_settimeout);
+ lua_setfield(L, -2, "settimeout"); /* ngx socket mt */
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_settimeouts);
+ lua_setfield(L, -2, "settimeouts"); /* ngx socket mt */
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_peek);
+ lua_setfield(L, -2, "peek");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
+ lua_setfield(L, -2, "shutdown");
+
+ lua_pushvalue(L, -1);
+ lua_setfield(L, -2, "__index");
+
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+ /* {{{tcp object metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ tcp_socket_metatable_key));
+ lua_createtable(L, 0 /* narr */, 14 /* nrec */);
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_connect);
+ lua_setfield(L, -2, "connect");
+
+#if (NGX_STREAM_SSL)
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_sslhandshake);
+ lua_setfield(L, -2, "sslhandshake");
+
+#endif
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receive);
+ lua_setfield(L, -2, "receive");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveuntil);
+ lua_setfield(L, -2, "receiveuntil");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_receiveany);
+ lua_setfield(L, -2, "receiveany");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_send);
+ lua_setfield(L, -2, "send");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_close);
+ lua_setfield(L, -2, "close");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_setoption);
+ lua_setfield(L, -2, "setoption");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_settimeout);
+ lua_setfield(L, -2, "settimeout"); /* ngx socket mt */
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_settimeouts);
+ lua_setfield(L, -2, "settimeouts"); /* ngx socket mt */
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_getreusedtimes);
+ lua_setfield(L, -2, "getreusedtimes");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_setkeepalive);
+ lua_setfield(L, -2, "setkeepalive");
+
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_shutdown);
+ lua_setfield(L, -2, "shutdown");
+
+ lua_pushvalue(L, -1);
+ lua_setfield(L, -2, "__index");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+ /* {{{upstream userdata metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ upstream_udata_metatable_key));
+ lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
+ lua_pushcfunction(L, ngx_stream_lua_socket_tcp_upstream_destroy);
+ lua_setfield(L, -2, "__gc");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+ /* {{{downstream userdata metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ downstream_udata_metatable_key));
+ lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
+ lua_pushcfunction(L, ngx_stream_lua_socket_downstream_destroy);
+ lua_setfield(L, -2, "__gc");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+ /* {{{socket pool userdata metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ pool_udata_metatable_key));
+ lua_createtable(L, 0, 1); /* metatable */
+ lua_pushcfunction(L, ngx_stream_lua_socket_shutdown_pool);
+ lua_setfield(L, -2, "__gc");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+ /* {{{socket compiled pattern userdata metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ pattern_udata_metatable_key));
+ lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
+ lua_pushcfunction(L, ngx_stream_lua_socket_cleanup_compiled_pattern);
+ lua_setfield(L, -2, "__gc");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+#if (NGX_STREAM_SSL)
+
+ /* {{{ssl session userdata metatable */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ ssl_session_metatable_key));
+ lua_createtable(L, 0 /* narr */, 1 /* nrec */); /* metatable */
+ lua_pushcfunction(L, ngx_stream_lua_ssl_free_session);
+ lua_setfield(L, -2, "__gc");
+ lua_rawset(L, LUA_REGISTRYINDEX);
+ /* }}} */
+
+#endif
+}
+
+
+static int
+ngx_stream_lua_socket_tcp(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_ctx_t *ctx;
+
+ if (lua_gettop(L) != 0) {
+ return luaL_error(L, "expecting zero arguments, but got %d",
+ lua_gettop(L));
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no ctx found");
+ }
+
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_YIELDABLE);
+
+ lua_createtable(L, 5 /* narr */, 1 /* nrec */);
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ tcp_socket_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+
+ dd("top: %d", lua_gettop(L));
+
+ return 1;
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_create_socket_pool(lua_State *L,
+ ngx_stream_lua_request_t *r, ngx_str_t key, ngx_int_t pool_size, ngx_int_t backlog,
+ ngx_stream_lua_socket_pool_t **spool)
+{
+ u_char *p;
+ size_t size, key_len;
+ ngx_int_t i;
+
+ ngx_stream_lua_socket_pool_t *sp;
+ ngx_stream_lua_socket_pool_item_t *items;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket connection pool size: "
+ "%i, backlog: %i",
+ pool_size, backlog);
+
+ key_len = ngx_align(key.len + 1, sizeof(void *));
+
+ size = sizeof(ngx_stream_lua_socket_pool_t) - 1 + key_len
+ + sizeof(ngx_stream_lua_socket_pool_item_t) * pool_size;
+
+ /* before calling this function, the Lua stack is:
+ * -1 key
+ * -2 pools
+ */
+ sp = lua_newuserdata(L, size);
+ if (sp == NULL) {
+ luaL_error(L, "no memory");
+ return;
+ }
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ pool_udata_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket keepalive create "
+ "connection pool for key \"%V\"", &key);
+
+ /* a new socket pool with metatable is push to the stack, so now we have:
+ * -1 sp
+ * -2 key
+ * -3 pools
+ *
+ * it is time to set pools[key] to sp.
+ */
+ lua_rawset(L, -3);
+
+ /* clean up the stack for consistency's sake */
+ lua_pop(L, 1);
+
+ sp->backlog = backlog;
+ sp->size = pool_size;
+ sp->connections = 0;
+ sp->lua_vm = ngx_stream_lua_get_lua_vm(r, NULL);
+
+ ngx_queue_init(&sp->cache_connect_op);
+ ngx_queue_init(&sp->wait_connect_op);
+ ngx_queue_init(&sp->cache);
+ ngx_queue_init(&sp->free);
+
+ p = ngx_copy(sp->key, key.data, key.len);
+ *p++ = '\0';
+
+ items = (ngx_stream_lua_socket_pool_item_t *) (sp->key + key_len);
+
+ dd("items: %p", items);
+
+ ngx_stream_lua_assert((void *) items
+ == ngx_align_ptr(items, sizeof(void *)));
+
+ for (i = 0; i < pool_size; i++) {
+ ngx_queue_insert_head(&sp->free, &items[i].queue);
+ items[i].socket_pool = sp;
+ }
+
+ *spool = sp;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_connect_helper(lua_State *L,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_stream_lua_request_t *r,
+ ngx_stream_lua_ctx_t *ctx, u_char *host_ref, size_t host_len,
+ in_port_t port, unsigned resuming)
+{
+ int n;
+ int host_size;
+ int saved_top;
+ ngx_int_t rc;
+ ngx_str_t host;
+ ngx_str_t *conn_op_host;
+ ngx_url_t url;
+ ngx_queue_t *q;
+ ngx_resolver_ctx_t *rctx, temp;
+
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_pool_t *spool;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ ngx_stream_core_srv_conf_t *clcf;
+
+ spool = u->socket_pool;
+ if (spool != NULL) {
+ rc = ngx_stream_lua_get_keepalive_peer(r, u);
+
+ if (rc == NGX_OK) {
+ lua_pushinteger(L, 1);
+ return 1;
+ }
+
+ /* rc == NGX_DECLINED */
+
+ spool->connections++;
+
+ /* check if backlog is enabled and
+ * don't queue resuming connection operation */
+ if (spool->backlog >= 0 && !resuming) {
+
+ dd("lua tcp socket %s connections %ld",
+ spool->key, spool->connections);
+
+ if (spool->connections > spool->size + spool->backlog) {
+ spool->connections--;
+ lua_pushnil(L);
+ lua_pushliteral(L, "too many waiting connect operations");
+ return 2;
+ }
+
+ if (spool->connections > spool->size) {
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, u->peer.log, 0,
+ "stream lua tcp socket queue connect "
+ "operation for connection pool \"%s\", "
+ "connections: %i",
+ spool->key, spool->connections);
+
+ host_size = sizeof(u_char) *
+ (ngx_max(host_len, NGX_INET_ADDRSTRLEN) + 1);
+
+ if (!ngx_queue_empty(&spool->cache_connect_op)) {
+ q = ngx_queue_last(&spool->cache_connect_op);
+ ngx_queue_remove(q);
+ conn_op_ctx = ngx_queue_data(
+ q, ngx_stream_lua_socket_tcp_conn_op_ctx_t, queue);
+
+ conn_op_host = &conn_op_ctx->host;
+ if (host_len > conn_op_host->len
+ && host_len > NGX_INET_ADDRSTRLEN)
+ {
+ ngx_free(conn_op_host->data);
+ conn_op_host->data = ngx_alloc(host_size,
+ ngx_cycle->log);
+ if (conn_op_host->data == NULL) {
+ ngx_free(conn_op_ctx);
+ goto no_memory_and_not_resuming;
+ }
+ }
+
+ } else {
+ conn_op_ctx = ngx_alloc(
+ sizeof(ngx_stream_lua_socket_tcp_conn_op_ctx_t),
+ ngx_cycle->log);
+ if (conn_op_ctx == NULL) {
+ goto no_memory_and_not_resuming;
+ }
+
+ conn_op_host = &conn_op_ctx->host;
+ conn_op_host->data = ngx_alloc(host_size, ngx_cycle->log);
+ if (conn_op_host->data == NULL) {
+ ngx_free(conn_op_ctx);
+ goto no_memory_and_not_resuming;
+ }
+ }
+
+ conn_op_ctx->cleanup = NULL;
+
+ ngx_memcpy(conn_op_host->data, host_ref, host_len);
+ conn_op_host->data[host_len] = '\0';
+ conn_op_host->len = host_len;
+
+ conn_op_ctx->port = port;
+
+ u->write_co_ctx = ctx->cur_co_ctx;
+
+ conn_op_ctx->u = u;
+ ctx->cur_co_ctx->cleanup =
+ ngx_stream_lua_tcp_queue_conn_op_cleanup;
+ ctx->cur_co_ctx->data = conn_op_ctx;
+
+ ngx_memzero(&conn_op_ctx->event, sizeof(ngx_event_t));
+ conn_op_ctx->event.handler =
+ ngx_stream_lua_socket_tcp_conn_op_timeout_handler;
+ conn_op_ctx->event.data = conn_op_ctx;
+ conn_op_ctx->event.log = ngx_cycle->log;
+
+ ngx_add_timer(&conn_op_ctx->event, u->connect_timeout);
+
+ ngx_queue_insert_tail(&spool->wait_connect_op,
+ &conn_op_ctx->queue);
+
+ ngx_log_debug3(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua tcp socket queued connect "
+ "operation for %d(ms), u: %p, ctx: %p",
+ u->connect_timeout, conn_op_ctx->u, conn_op_ctx);
+
+ return lua_yield(L, 0);
+ }
+ }
+
+ } /* end spool != NULL */
+
+ host.data = ngx_palloc(r->pool, host_len + 1);
+ if (host.data == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+ host.len = host_len;
+
+ ngx_memcpy(host.data, host_ref, host_len);
+ host.data[host_len] = '\0';
+
+ ngx_memzero(&url, sizeof(ngx_url_t));
+ url.url = host;
+ url.default_port = port;
+ url.no_resolve = 1;
+
+ coctx = ctx->cur_co_ctx;
+
+ if (ngx_parse_url(r->pool, &url) != NGX_OK) {
+ lua_pushnil(L);
+
+ if (url.err) {
+ lua_pushfstring(L, "failed to parse host name \"%s\": %s",
+ url.url.data, url.err);
+
+ } else {
+ lua_pushfstring(L, "failed to parse host name \"%s\"",
+ url.url.data);
+ }
+
+ goto failed;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket connect timeout: %M",
+ u->connect_timeout);
+
+ u->resolved = ngx_pcalloc(r->pool,
+ sizeof(ngx_stream_upstream_resolved_t));
+ if (u->resolved == NULL) {
+ if (resuming) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ goto failed;
+ }
+
+ goto no_memory_and_not_resuming;
+ }
+
+ if (url.addrs && url.addrs[0].sockaddr) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket network address given "
+ "directly");
+
+ u->resolved->sockaddr = url.addrs[0].sockaddr;
+ u->resolved->socklen = url.addrs[0].socklen;
+ u->resolved->naddrs = 1;
+ u->resolved->host = url.addrs[0].name;
+
+ } else {
+ u->resolved->host = host;
+ u->resolved->port = url.default_port;
+ }
+
+ if (u->resolved->sockaddr) {
+ rc = ngx_stream_lua_socket_resolve_retval_handler(r, u, L);
+ if (rc == NGX_AGAIN && !resuming) {
+ return lua_yield(L, 0);
+ }
+
+ if (rc > 1) {
+ goto failed;
+ }
+
+ return rc;
+ }
+
+ clcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_core_module);
+
+ temp.name = host;
+ rctx = ngx_resolve_start(clcf->resolver, &temp);
+ if (rctx == NULL) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_RESOLVER;
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to start the resolver");
+ goto failed;
+ }
+
+ if (rctx == NGX_NO_RESOLVER) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_RESOLVER;
+ lua_pushnil(L);
+ lua_pushfstring(L, "no resolver defined to resolve \"%s\"", host.data);
+ goto failed;
+ }
+
+ rctx->name = host;
+ rctx->handler = ngx_stream_lua_socket_resolve_handler;
+ rctx->data = u;
+ rctx->timeout = clcf->resolver_timeout;
+
+ u->resolved->ctx = rctx;
+ u->write_co_ctx = ctx->cur_co_ctx;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_tcp_resolve_cleanup;
+ coctx->data = u;
+
+ saved_top = lua_gettop(L);
+
+ if (ngx_resolve_name(rctx) != NGX_OK) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket fail to run resolver "
+ "immediately");
+
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_RESOLVER;
+
+ coctx->cleanup = NULL;
+ coctx->data = NULL;
+
+ u->resolved->ctx = NULL;
+ lua_pushnil(L);
+ lua_pushfstring(L, "%s could not be resolved", host.data);
+ goto failed;
+ }
+
+ if (u->conn_waiting) {
+ dd("resolved and already connecting");
+
+ if (resuming) {
+ return NGX_AGAIN;
+ }
+
+ return lua_yield(L, 0);
+ }
+
+ n = lua_gettop(L) - saved_top;
+ if (n) {
+ dd("errors occurred during resolving or connecting"
+ "or already connected");
+
+ if (n > 1) {
+ goto failed;
+ }
+
+ return n;
+ }
+
+ /* still resolving */
+
+ u->conn_waiting = 1;
+ u->write_prepare_retvals = ngx_stream_lua_socket_resolve_retval_handler;
+
+ dd("setting data to %p", u);
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ if (resuming) {
+ return NGX_AGAIN;
+ }
+
+ return lua_yield(L, 0);
+
+failed:
+
+ if (spool != NULL) {
+ spool->connections--;
+ ngx_stream_lua_socket_tcp_resume_conn_op(spool);
+ }
+
+ return 2;
+
+no_memory_and_not_resuming:
+
+ if (spool != NULL) {
+ spool->connections--;
+ ngx_stream_lua_socket_tcp_resume_conn_op(spool);
+ }
+
+ return luaL_error(L, "no memory");
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_connect(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_ctx_t *ctx;
+ int port;
+ int n;
+ u_char *p;
+ size_t len;
+ ngx_peer_connection_t *pc;
+ int connect_timeout, send_timeout, read_timeout;
+ unsigned custom_pool;
+ int key_index;
+ ngx_int_t backlog;
+ ngx_int_t pool_size;
+ ngx_str_t key;
+ const char *msg;
+
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_pool_t *spool;
+
+ n = lua_gettop(L);
+ if (n != 2 && n != 3 && n != 4) {
+ return luaL_error(L, "ngx.socket connect: expecting 2, 3, or 4 "
+ "arguments (including the object), but seen %d", n);
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no ctx found");
+ }
+
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_YIELDABLE);
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ p = (u_char *) luaL_checklstring(L, 2, &len);
+
+ backlog = -1;
+ key_index = 2;
+ pool_size = 0;
+ custom_pool = 0;
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (lua_type(L, n) == LUA_TTABLE) {
+
+ /* found the last optional option table */
+
+ lua_getfield(L, n, "pool_size");
+
+ if (lua_isnumber(L, -1)) {
+ pool_size = (ngx_int_t) lua_tointeger(L, -1);
+
+ if (pool_size <= 0) {
+ msg = lua_pushfstring(L, "bad \"pool_size\" option value: %i",
+ pool_size);
+ return luaL_argerror(L, n, msg);
+ }
+
+ } else if (!lua_isnil(L, -1)) {
+ msg = lua_pushfstring(L, "bad \"pool_size\" option type: %s",
+ lua_typename(L, lua_type(L, -1)));
+ return luaL_argerror(L, n, msg);
+ }
+
+ lua_pop(L, 1);
+
+ lua_getfield(L, n, "backlog");
+
+ if (lua_isnumber(L, -1)) {
+ backlog = (ngx_int_t) lua_tointeger(L, -1);
+
+ if (backlog < 0) {
+ msg = lua_pushfstring(L, "bad \"backlog\" option value: %i",
+ backlog);
+ return luaL_argerror(L, n, msg);
+ }
+
+ /* use default value for pool size if only backlog specified */
+ if (pool_size == 0) {
+ pool_size = llcf->pool_size;
+ }
+ }
+
+ lua_pop(L, 1);
+
+ lua_getfield(L, n, "pool");
+
+ switch (lua_type(L, -1)) {
+ case LUA_TNUMBER:
+ lua_tostring(L, -1);
+ /* FALLTHROUGH */
+
+ case LUA_TSTRING:
+ custom_pool = 1;
+
+ lua_pushvalue(L, -1);
+ lua_rawseti(L, 1, SOCKET_KEY_INDEX);
+
+ key_index = n + 1;
+
+ break;
+
+ case LUA_TNIL:
+ lua_pop(L, 2);
+ break;
+
+ default:
+ msg = lua_pushfstring(L, "bad \"pool\" option type: %s",
+ luaL_typename(L, -1));
+ luaL_argerror(L, n, msg);
+ break;
+ }
+
+ n--;
+ }
+
+ /* the fourth argument is not a table */
+ if (n == 4) {
+ lua_pop(L, 1);
+ n--;
+ }
+
+ if (n == 3) {
+ port = luaL_checkinteger(L, 3);
+
+ if (port < 0 || port > 65535) {
+ lua_pushnil(L);
+ lua_pushfstring(L, "bad port number: %d", port);
+ return 2;
+ }
+
+ if (!custom_pool) {
+ lua_pushliteral(L, ":");
+ lua_insert(L, 3);
+ lua_concat(L, 3);
+ }
+
+ dd("socket key: %s", lua_tostring(L, -1));
+
+ } else { /* n == 2 */
+ port = 0;
+ }
+
+ if (!custom_pool) {
+ /* the key's index is 2 */
+
+ lua_pushvalue(L, 2);
+ lua_rawseti(L, 1, SOCKET_KEY_INDEX);
+ }
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ if (u) {
+ if (u->request && u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ if (u->body_downstream || u->raw_downstream) {
+ return luaL_error(L, "attempt to re-connect a request socket");
+ }
+
+ if (u->peer.connection) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket reconnect without "
+ "shutting down");
+
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua reuse socket upstream ctx");
+
+ } else {
+ u = lua_newuserdata(L, sizeof(ngx_stream_lua_socket_tcp_upstream_t));
+ if (u == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+#if 1
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ upstream_udata_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+#endif
+
+ lua_rawseti(L, 1, SOCKET_CTX_INDEX);
+ }
+
+ ngx_memzero(u, sizeof(ngx_stream_lua_socket_tcp_upstream_t));
+
+ u->request = r; /* set the controlling request */
+
+ u->conf = llcf;
+
+ pc = &u->peer;
+
+ pc->log = r->connection->log;
+ pc->log_error = NGX_ERROR_ERR;
+
+ dd("lua peer connection log: %p", pc->log);
+
+ lua_rawgeti(L, 1, SOCKET_CONNECT_TIMEOUT_INDEX);
+ lua_rawgeti(L, 1, SOCKET_SEND_TIMEOUT_INDEX);
+ lua_rawgeti(L, 1, SOCKET_READ_TIMEOUT_INDEX);
+
+ read_timeout = (ngx_int_t) lua_tointeger(L, -1);
+ send_timeout = (ngx_int_t) lua_tointeger(L, -2);
+ connect_timeout = (ngx_int_t) lua_tointeger(L, -3);
+
+ lua_pop(L, 3);
+
+ if (connect_timeout > 0) {
+ u->connect_timeout = (ngx_msec_t) connect_timeout;
+
+ } else {
+ u->connect_timeout = u->conf->connect_timeout;
+ }
+
+ if (send_timeout > 0) {
+ u->send_timeout = (ngx_msec_t) send_timeout;
+
+ } else {
+ u->send_timeout = u->conf->send_timeout;
+ }
+
+ if (read_timeout > 0) {
+ u->read_timeout = (ngx_msec_t) read_timeout;
+
+ } else {
+ u->read_timeout = u->conf->read_timeout;
+ }
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ socket_pool_key));
+ lua_rawget(L, LUA_REGISTRYINDEX); /* table */
+ lua_pushvalue(L, key_index); /* key */
+
+ lua_rawget(L, -2);
+ spool = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ if (spool != NULL) {
+ u->socket_pool = spool;
+
+ } else if (pool_size > 0) {
+ lua_pushvalue(L, key_index);
+ key.data = (u_char *) lua_tolstring(L, -1, &key.len);
+
+ ngx_stream_lua_socket_tcp_create_socket_pool(L, r, key, pool_size,
+ backlog, &spool);
+ u->socket_pool = spool;
+ }
+
+ return ngx_stream_lua_socket_tcp_connect_helper(L, u, r, ctx, p,
+ len, port, 0);
+}
+
+
+static void
+ngx_stream_lua_socket_resolve_handler(ngx_resolver_ctx_t *ctx)
+{
+ ngx_stream_lua_request_t *r;
+#if (NGX_DEBUG)
+ ngx_connection_t *c;
+#endif
+ lua_State *L;
+ u_char *p;
+ size_t len;
+ socklen_t socklen;
+ struct sockaddr *sockaddr;
+ ngx_uint_t i;
+ unsigned waiting;
+
+ ngx_stream_upstream_resolved_t *ur;
+ ngx_stream_lua_ctx_t *lctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ u = ctx->data;
+ r = u->request;
+
+#if (NGX_DEBUG)
+
+ c = r->connection;
+
+#endif
+
+ ur = u->resolved;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua tcp socket resolve handler");
+
+ lctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (lctx == NULL) {
+ return;
+ }
+
+ lctx->cur_co_ctx = u->write_co_ctx;
+
+ u->write_co_ctx->cleanup = NULL;
+
+ L = lctx->cur_co_ctx->co;
+
+ waiting = u->conn_waiting;
+
+ if (ctx->state) {
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua tcp socket resolver error: %s "
+ "(connect waiting: %d)",
+ ngx_resolver_strerror(ctx->state), (int) waiting);
+
+ lua_pushnil(L);
+ lua_pushlstring(L, (char *) ctx->name.data, ctx->name.len);
+ lua_pushfstring(L, " could not be resolved (%d: %s)",
+ (int) ctx->state,
+ ngx_resolver_strerror(ctx->state));
+ lua_concat(L, 2);
+
+ u->write_prepare_retvals =
+ ngx_stream_lua_socket_conn_error_retval_handler;
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_RESOLVER);
+
+
+ return;
+ }
+
+ ur->naddrs = ctx->naddrs;
+ ur->addrs = ctx->addrs;
+
+#if (NGX_DEBUG)
+ {
+ u_char text[NGX_SOCKADDR_STRLEN];
+ ngx_str_t addr;
+ ngx_uint_t i;
+
+ addr.data = text;
+
+ for (i = 0; i < ctx->naddrs; i++) {
+ addr.len = ngx_sock_ntop(ur->addrs[i].sockaddr, ur->addrs[i].socklen,
+ text, NGX_SOCKADDR_STRLEN, 0);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "name was resolved to %V", &addr);
+ }
+ }
+#endif
+
+ ngx_stream_lua_assert(ur->naddrs > 0);
+
+ if (ur->naddrs == 1) {
+ i = 0;
+
+ } else {
+ i = ngx_random() % ur->naddrs;
+ }
+
+ dd("selected addr index: %d", (int) i);
+
+ socklen = ur->addrs[i].socklen;
+
+ sockaddr = ngx_palloc(r->pool, socklen);
+ if (sockaddr == NULL) {
+ goto nomem;
+ }
+
+ ngx_memcpy(sockaddr, ur->addrs[i].sockaddr, socklen);
+
+ switch (sockaddr->sa_family) {
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ ((struct sockaddr_in6 *) sockaddr)->sin6_port = htons(ur->port);
+ break;
+#endif
+ default: /* AF_INET */
+ ((struct sockaddr_in *) sockaddr)->sin_port = htons(ur->port);
+ }
+
+ p = ngx_pnalloc(r->pool, NGX_SOCKADDR_STRLEN);
+ if (p == NULL) {
+ goto nomem;
+ }
+
+ len = ngx_sock_ntop(sockaddr, socklen, p, NGX_SOCKADDR_STRLEN, 1);
+ ur->sockaddr = sockaddr;
+ ur->socklen = socklen;
+ ur->host.data = p;
+ ur->host.len = len;
+ ur->naddrs = 1;
+
+ ngx_resolve_name_done(ctx);
+ ur->ctx = NULL;
+
+ u->conn_waiting = 0;
+ u->write_co_ctx = NULL;
+
+ if (waiting) {
+ lctx->resume_handler = ngx_stream_lua_socket_tcp_conn_resume;
+ r->write_event_handler(r);
+
+
+ } else {
+ (void) ngx_stream_lua_socket_resolve_retval_handler(r, u, L);
+ }
+
+ return;
+
+nomem:
+
+ if (ur->ctx) {
+ ngx_resolve_name_done(ctx);
+ ur->ctx = NULL;
+ }
+
+ u->write_prepare_retvals = ngx_stream_lua_socket_conn_error_retval_handler;
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_NOMEM);
+
+ if (waiting) {
+ dd("run posted requests");
+
+
+ } else {
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_init_peer_connection_addr_text(ngx_peer_connection_t *pc)
+{
+ ngx_connection_t *c;
+ size_t addr_text_max_len;
+
+ c = pc->connection;
+
+ switch (pc->sockaddr->sa_family) {
+
+#if (NGX_HAVE_INET6)
+ case AF_INET6:
+ addr_text_max_len = NGX_INET6_ADDRSTRLEN;
+ break;
+#endif
+
+#if (NGX_HAVE_UNIX_DOMAIN)
+ case AF_UNIX:
+ addr_text_max_len = NGX_UNIX_ADDRSTRLEN;
+ break;
+#endif
+
+ case AF_INET:
+ addr_text_max_len = NGX_INET_ADDRSTRLEN;
+ break;
+
+ default:
+ addr_text_max_len = NGX_SOCKADDR_STRLEN;
+ break;
+ }
+
+ c->addr_text.data = ngx_pnalloc(c->pool, addr_text_max_len);
+ if (c->addr_text.data == NULL) {
+ ngx_log_error(NGX_LOG_ERR, pc->log, 0,
+ "init peer connection addr_text failed: no memory");
+ return;
+ }
+
+ c->addr_text.len = ngx_sock_ntop(pc->sockaddr, pc->socklen,
+ c->addr_text.data,
+ addr_text_max_len, 0);
+}
+
+
+static int
+ngx_stream_lua_socket_resolve_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_peer_connection_t *pc;
+ ngx_connection_t *c;
+ ngx_stream_lua_cleanup_t *cln;
+ ngx_stream_upstream_resolved_t *ur;
+ ngx_int_t rc;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket resolve retval handler");
+
+ if (u->ft_type & NGX_STREAM_LUA_SOCKET_FT_RESOLVER) {
+ return 2;
+ }
+
+ pc = &u->peer;
+
+ ur = u->resolved;
+
+ if (ur->sockaddr) {
+ pc->sockaddr = ur->sockaddr;
+ pc->socklen = ur->socklen;
+ pc->name = &ur->host;
+
+ } else {
+ lua_pushnil(L);
+ lua_pushliteral(L, "resolver not working");
+ return 2;
+ }
+
+ pc->get = ngx_stream_lua_socket_tcp_get_peer;
+
+ rc = ngx_event_connect_peer(pc);
+
+ if (rc == NGX_ERROR) {
+ u->socket_errno = ngx_socket_errno;
+ }
+
+ if (u->cleanup == NULL) {
+ cln = ngx_stream_lua_cleanup_add(r, 0);
+ if (cln == NULL) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ return 2;
+ }
+
+ cln->handler = ngx_stream_lua_socket_tcp_cleanup;
+ cln->data = u;
+ u->cleanup = &cln->handler;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket connect: %i", rc);
+
+ if (rc == NGX_ERROR) {
+ return ngx_stream_lua_socket_conn_error_retval_handler(r, u, L);
+ }
+
+ if (rc == NGX_BUSY) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ lua_pushnil(L);
+ lua_pushliteral(L, "no live connection");
+ return 2;
+ }
+
+ if (rc == NGX_DECLINED) {
+ dd("socket errno: %d", (int) ngx_socket_errno);
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ u->socket_errno = ngx_socket_errno;
+ return ngx_stream_lua_socket_conn_error_retval_handler(r, u, L);
+ }
+
+ /* rc == NGX_OK || rc == NGX_AGAIN */
+
+ c = pc->connection;
+
+ c->data = u;
+
+ c->write->handler = ngx_stream_lua_socket_tcp_handler;
+ c->read->handler = ngx_stream_lua_socket_tcp_handler;
+
+ u->write_event_handler = ngx_stream_lua_socket_connected_handler;
+ u->read_event_handler = ngx_stream_lua_socket_connected_handler;
+
+ c->sendfile &= r->connection->sendfile;
+
+ if (c->pool == NULL) {
+
+ /* we need separate pool here to be able to cache SSL connections */
+
+ c->pool = ngx_create_pool(128, r->connection->log);
+ if (c->pool == NULL) {
+ return ngx_stream_lua_socket_prepare_error_retvals(r, u, L,
+ NGX_STREAM_LUA_SOCKET_FT_NOMEM);
+ }
+ }
+
+ c->log = r->connection->log;
+ c->pool->log = c->log;
+ c->read->log = c->log;
+ c->write->log = c->log;
+
+ /* init or reinit the ngx_output_chain() and ngx_chain_writer() contexts */
+
+#if 0
+ u->writer.out = NULL;
+ u->writer.last = &u->writer.out;
+#endif
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ coctx = ctx->cur_co_ctx;
+
+ dd("setting data to %p", u);
+
+ if (rc == NGX_OK) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket connected: fd:%d", (int) c->fd);
+
+ /* We should delete the current write/read event
+ * here because the socket object may not be used immediately
+ * on the Lua land, thus causing hot spin around level triggered
+ * event poll and wasting CPU cycles. */
+
+ if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to handle write event");
+ return 2;
+ }
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to handle read event");
+ return 2;
+ }
+
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+
+ lua_pushinteger(L, 1);
+ return 1;
+ }
+
+ /* rc == NGX_AGAIN */
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ ngx_add_timer(c->write, u->connect_timeout);
+
+ u->write_co_ctx = ctx->cur_co_ctx;
+ u->conn_waiting = 1;
+ u->write_prepare_retvals = ngx_stream_lua_socket_tcp_conn_retval_handler;
+
+ dd("setting data to %p", u);
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ return NGX_AGAIN;
+}
+
+
+static int
+ngx_stream_lua_socket_conn_error_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_uint_t ft_type;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket error retval handler");
+
+ if (u->write_co_ctx) {
+ u->write_co_ctx->cleanup = NULL;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+
+ ft_type = u->ft_type;
+ u->ft_type = 0;
+ return ngx_stream_lua_socket_prepare_error_retvals(r, u, L, ft_type);
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static int
+ngx_stream_lua_socket_tcp_sslhandshake(lua_State *L)
+{
+ int n, top;
+ ngx_int_t rc;
+ ngx_str_t name = ngx_null_string;
+ ngx_connection_t *c;
+ ngx_ssl_session_t **psession;
+
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ /* Lua function arguments: self [,session] [,host] [,verify]
+ [,send_status_req] */
+
+ n = lua_gettop(L);
+ if (n < 1 || n > 5) {
+ return luaL_error(L, "ngx.socket sslhandshake: expecting 1 ~ 5 "
+ "arguments (including the object), but seen %d", n);
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket ssl handshake");
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u == NULL
+ || u->peer.connection == NULL
+ || u->read_closed
+ || u->write_closed)
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ if (u->raw_downstream || u->body_downstream) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "not supported for downstream");
+ return 2;
+ }
+
+ c = u->peer.connection;
+
+ u->ssl_session_reuse = 1;
+
+ if (c->ssl && c->ssl->handshaked) {
+ switch (lua_type(L, 2)) {
+ case LUA_TUSERDATA:
+ lua_pushvalue(L, 2);
+ break;
+
+ case LUA_TBOOLEAN:
+ if (!lua_toboolean(L, 2)) {
+ /* avoid generating the ssl session */
+ lua_pushboolean(L, 1);
+ break;
+ }
+ /* fall through */
+
+ default:
+ ngx_stream_lua_ssl_handshake_retval_handler(r, u, L);
+ break;
+ }
+
+ return 1;
+ }
+
+ if (ngx_ssl_create_connection(u->conf->ssl, c,
+ NGX_SSL_BUFFER|NGX_SSL_CLIENT)
+ != NGX_OK)
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to create ssl connection");
+ return 2;
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no ctx found");
+ }
+
+ coctx = ctx->cur_co_ctx;
+
+ c->sendfile = 0;
+
+ if (n >= 2) {
+ if (lua_type(L, 2) == LUA_TBOOLEAN) {
+ u->ssl_session_reuse = lua_toboolean(L, 2);
+
+ } else {
+ psession = lua_touserdata(L, 2);
+
+ if (psession != NULL && *psession != NULL) {
+ if (ngx_ssl_set_session(c, *psession) != NGX_OK) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "lua ssl set session failed");
+ return 2;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua ssl set session: %p", *psession);
+ }
+ }
+
+ if (n >= 3) {
+ name.data = (u_char *) lua_tolstring(L, 3, &name.len);
+
+ if (name.data) {
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua ssl server name: \"%*s\"", name.len,
+ name.data);
+
+#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
+
+ if (SSL_set_tlsext_host_name(c->ssl->connection,
+ (char *) name.data)
+ == 0)
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "SSL_set_tlsext_host_name failed");
+ return 2;
+ }
+
+#else
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "lua socket SNI disabled because the current "
+ "version of OpenSSL lacks the support");
+
+#endif
+ }
+
+ if (n >= 4) {
+ u->ssl_verify = lua_toboolean(L, 4);
+
+ if (n >= 5) {
+ if (lua_toboolean(L, 5)) {
+#ifdef NGX_STREAM_LUA_USE_OCSP
+ SSL_set_tlsext_status_type(c->ssl->connection,
+ TLSEXT_STATUSTYPE_ocsp);
+#else
+ return luaL_error(L, "no OCSP support");
+#endif
+ }
+ }
+ }
+ }
+ }
+
+ dd("found sni name: %.*s %p", (int) name.len, name.data, name.data);
+
+ if (name.len == 0) {
+ u->ssl_name.len = 0;
+
+ } else {
+ if (u->ssl_name.data) {
+ /* buffer already allocated */
+
+ if (u->ssl_name.len >= name.len) {
+ /* reuse it */
+ ngx_memcpy(u->ssl_name.data, name.data, name.len);
+ u->ssl_name.len = name.len;
+
+ } else {
+ ngx_free(u->ssl_name.data);
+ goto new_ssl_name;
+ }
+
+ } else {
+
+new_ssl_name:
+
+ u->ssl_name.data = ngx_alloc(name.len, ngx_cycle->log);
+ if (u->ssl_name.data == NULL) {
+ u->ssl_name.len = 0;
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ return 2;
+ }
+
+ ngx_memcpy(u->ssl_name.data, name.data, name.len);
+ u->ssl_name.len = name.len;
+ }
+ }
+
+ u->write_co_ctx = coctx;
+
+#if 0
+#ifdef NGX_STREAM_LUA_USE_OCSP
+ SSL_set_tlsext_status_type(c->ssl->connection, TLSEXT_STATUSTYPE_ocsp);
+#endif
+#endif
+
+ rc = ngx_ssl_handshake(c);
+
+ dd("ngx_ssl_handshake returned %d", (int) rc);
+
+ if (rc == NGX_AGAIN) {
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+ ngx_add_timer(c->read, u->connect_timeout);
+
+ u->conn_waiting = 1;
+ u->write_prepare_retvals = ngx_stream_lua_ssl_handshake_retval_handler;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ c->ssl->handler = ngx_stream_lua_ssl_handshake_handler;
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ return lua_yield(L, 0);
+ }
+
+ top = lua_gettop(L);
+ ngx_stream_lua_ssl_handshake_handler(c);
+ return lua_gettop(L) - top;
+}
+
+
+static void
+ngx_stream_lua_ssl_handshake_handler(ngx_connection_t *c)
+{
+ const char *err;
+ int waiting;
+ lua_State *L;
+ ngx_int_t rc;
+ ngx_connection_t *dc; /* downstream connection */
+ ngx_stream_lua_request_t *r;
+
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ u = c->data;
+ r = u->request;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ c->write->handler = ngx_stream_lua_socket_tcp_handler;
+ c->read->handler = ngx_stream_lua_socket_tcp_handler;
+
+ waiting = u->conn_waiting;
+
+ dc = r->connection;
+ L = u->write_co_ctx->co;
+
+ if (c->read->timedout) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "timeout");
+ goto failed;
+ }
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ if (c->ssl->handshaked) {
+
+ if (u->ssl_verify) {
+ rc = SSL_get_verify_result(c->ssl->connection);
+
+ if (rc != X509_V_OK) {
+ lua_pushnil(L);
+ err = lua_pushfstring(L, "%d: %s", (int) rc,
+ X509_verify_cert_error_string(rc));
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r,
+ ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, dc->log, 0, "stream lua ssl "
+ "certificate verify error: (%s)", err);
+ }
+
+ goto failed;
+ }
+
+#if defined(nginx_version) && nginx_version >= 1007000
+
+ if (u->ssl_name.len
+ && ngx_ssl_check_host(c, &u->ssl_name) != NGX_OK)
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "certificate host mismatch");
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r,
+ ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, dc->log, 0, "stream lua ssl "
+ "certificate does not match host \"%V\"",
+ &u->ssl_name);
+ }
+
+ goto failed;
+ }
+
+#endif
+ }
+
+ if (waiting) {
+ ngx_stream_lua_socket_handle_conn_success(r, u);
+
+ } else {
+ (void) ngx_stream_lua_ssl_handshake_retval_handler(r, u, L);
+ }
+
+
+ return;
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "handshake failed");
+
+failed:
+
+ if (waiting) {
+ u->write_prepare_retvals =
+ ngx_stream_lua_socket_conn_error_retval_handler;
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_SSL);
+
+
+ } else {
+ (void) ngx_stream_lua_socket_conn_error_retval_handler(r, u, L);
+ }
+}
+
+
+static int
+ngx_stream_lua_ssl_handshake_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_connection_t *c;
+ ngx_ssl_session_t *ssl_session, **ud;
+
+ if (!u->ssl_session_reuse) {
+ lua_pushboolean(L, 1);
+ return 1;
+ }
+
+ ud = lua_newuserdata(L, sizeof(ngx_ssl_session_t *));
+
+ c = u->peer.connection;
+
+ ssl_session = ngx_ssl_get_session(c);
+ if (ssl_session == NULL) {
+ *ud = NULL;
+
+ } else {
+ *ud = ssl_session;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua ssl save session: %p", ssl_session);
+
+ /* set up the __gc metamethod */
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ ssl_session_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+ }
+
+ return 1;
+}
+
+#endif /* NGX_STREAM_SSL */
+
+
+static int
+ngx_stream_lua_socket_read_error_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_uint_t ft_type;
+
+ if (u->read_co_ctx) {
+ u->read_co_ctx->cleanup = NULL;
+ }
+
+ ft_type = u->ft_type;
+ u->ft_type = 0;
+
+ if (u->no_close) {
+ u->no_close = 0;
+
+ } else {
+ ngx_stream_lua_socket_tcp_finalize_read_part(r, u);
+ }
+
+ return ngx_stream_lua_socket_prepare_error_retvals(r, u, L, ft_type);
+}
+
+
+static int
+ngx_stream_lua_socket_write_error_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_uint_t ft_type;
+
+ if (u->write_co_ctx) {
+ u->write_co_ctx->cleanup = NULL;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);
+
+ ft_type = u->ft_type;
+ u->ft_type = 0;
+ return ngx_stream_lua_socket_prepare_error_retvals(r, u, L, ft_type);
+}
+
+
+static int
+ngx_stream_lua_socket_prepare_error_retvals(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L, ngx_uint_t ft_type)
+{
+ u_char errstr[NGX_MAX_ERROR_STR];
+ u_char *p;
+
+ if (ft_type & (NGX_STREAM_LUA_SOCKET_FT_RESOLVER
+ | NGX_STREAM_LUA_SOCKET_FT_SSL))
+ {
+ return 2;
+ }
+
+ lua_pushnil(L);
+
+ if (ft_type & NGX_STREAM_LUA_SOCKET_FT_TIMEOUT) {
+ lua_pushliteral(L, "timeout");
+
+ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_CLOSED) {
+ lua_pushliteral(L, "closed");
+
+ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_BUFTOOSMALL) {
+ lua_pushliteral(L, "buffer too small");
+
+ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_NOMEM) {
+ lua_pushliteral(L, "no memory");
+
+ } else if (ft_type & NGX_STREAM_LUA_SOCKET_FT_CLIENTABORT) {
+ lua_pushliteral(L, "client aborted");
+
+ } else {
+
+ if (u->socket_errno) {
+ p = ngx_strerror(u->socket_errno, errstr, sizeof(errstr));
+ /* for compatibility with LuaSocket */
+ ngx_strlow(errstr, errstr, p - errstr);
+ lua_pushlstring(L, (char *) errstr, p - errstr);
+
+ } else {
+ lua_pushliteral(L, "error");
+ }
+ }
+
+ return 2;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_conn_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ if (u->ft_type) {
+ return ngx_stream_lua_socket_conn_error_retval_handler(r, u, L);
+ }
+
+ lua_pushinteger(L, 1);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_peek(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_connection_t *c;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_lua_co_ctx_t *coctx;
+ int n;
+ lua_Integer bytes;
+ size_t size;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_PREREAD);
+
+ n = lua_gettop(L);
+ if (n != 2) {
+ return luaL_error(L, "expecting 2 arguments "
+ "(including the object), but got %d", n);
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket calling peek() method");
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u == NULL) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "attempt to peek data on a closed socket: u:%p", u);
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->read_consumed) {
+ return luaL_error(L, "attempt to peek on a consumed socket");
+ }
+
+ c = u->peer.connection;
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+
+ if (!lua_isnumber(L, 2)) {
+ return luaL_error(L, "argument must be a number");
+ }
+
+ bytes = lua_tointeger(L, 2);
+ if (bytes < 0) {
+ return luaL_argerror(L, 2, "bytes can not be negative");
+ }
+
+ if (bytes == 0) {
+ lua_pushliteral(L, "");
+ return 1;
+ }
+
+ u->length = (size_t) bytes;
+
+ if (c->buffer != NULL) {
+ size = c->buffer->last - c->buffer->pos;
+
+ if (size >= u->length) {
+ lua_pushlstring(L, (char *) c->buffer->pos, u->length);
+ return 1;
+ }
+ }
+
+ /* not enough data in the preread buffer */
+
+ coctx = ctx->cur_co_ctx;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ dd("setting data to %p, coctx:%p", u, coctx);
+
+ ctx->downstream = u;
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_peek_resume;
+ ctx->peek_needs_more_data = 1;
+ u->read_co_ctx = coctx;
+ u->read_waiting = 1;
+
+ return lua_yield(L, 0);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_peek_resume(ngx_stream_lua_request_t *r)
+{
+ lua_State *vm;
+ ngx_int_t rc;
+ ngx_uint_t nreqs;
+ ngx_connection_t *c;
+ ngx_stream_lua_ctx_t *ctx;
+ size_t size;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket resuming peek");
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ u = ctx->downstream;
+ c = r->connection;
+ vm = ngx_stream_lua_get_lua_vm(r, ctx);
+ nreqs = c->requests;
+
+ size = c->buffer->last - c->buffer->pos;
+
+ if (size < u->length) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua peek does not have enough data, returning NGX_AGAIN");
+
+ return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ ctx->resume_handler = ngx_stream_lua_wev_handler;
+ /* read handler might have been changed by ngx_stream_core_preread_phase */
+ r->connection->read->handler = ngx_stream_lua_request_handler;
+
+ lua_pushlstring(u->read_co_ctx->co, (char *) c->buffer->pos, u->length);
+
+ u->read_co_ctx->cleanup = NULL;
+ ctx->cur_co_ctx = u->read_co_ctx;
+ u->read_co_ctx = NULL;
+ ctx->peek_needs_more_data = 0;
+ u->read_waiting = 0;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua tcp operation done, resuming lua thread");
+
+ rc = ngx_stream_lua_run_thread(vm, r, ctx, 1);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua run thread returned %d", rc);
+
+ if (rc == NGX_AGAIN) {
+ return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ if (rc == NGX_DONE) {
+ ngx_stream_lua_finalize_request(r, NGX_DONE);
+ return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ return rc;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_receive_helper(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_int_t rc;
+ ngx_stream_lua_ctx_t *lctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ u->input_filter_ctx = u;
+
+ lctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ if (u->bufs_in == NULL) {
+ u->bufs_in =
+ ngx_stream_lua_chain_get_free_buf(r->connection->log,
+ r->pool,
+ &lctx->free_recv_bufs,
+ u->conf->buffer_size);
+
+ if (u->bufs_in == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+ u->buf_in = u->bufs_in;
+ u->buffer = *u->buf_in->buf;
+ }
+
+ dd("tcp receive: buf_in: %p, bufs_in: %p", u->buf_in, u->bufs_in);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket read timeout: %M", u->read_timeout);
+
+ if (u->raw_downstream || u->body_downstream) {
+ r->read_event_handler = ngx_stream_lua_req_socket_rev_handler;
+ }
+
+ u->read_waiting = 0;
+ u->read_co_ctx = NULL;
+
+ rc = ngx_stream_lua_socket_tcp_read(r, u);
+
+ if (rc == NGX_ERROR) {
+ dd("read failed: %d", (int) u->ft_type);
+ rc = ngx_stream_lua_socket_tcp_receive_retval_handler(r, u, L);
+ dd("tcp receive retval returned: %d", (int) rc);
+ return rc;
+ }
+
+ if (rc == NGX_OK) {
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket receive done in a single run");
+
+ return ngx_stream_lua_socket_tcp_receive_retval_handler(r, u, L);
+ }
+
+ /* rc == NGX_AGAIN */
+
+ u->read_event_handler = ngx_stream_lua_socket_read_handler;
+
+ coctx = lctx->cur_co_ctx;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ if (lctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ u->read_co_ctx = coctx;
+ u->read_waiting = 1;
+ u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler;
+
+ dd("setting data to %p, coctx:%p", u, coctx);
+
+ if (u->raw_downstream || u->body_downstream) {
+ lctx->downstream = u;
+ }
+
+ return lua_yield(L, 0);
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_receiveany(lua_State *L)
+{
+ int n;
+ lua_Integer bytes;
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ n = lua_gettop(L);
+ if (n != 2) {
+ return luaL_error(L, "expecting 2 arguments "
+ "(including the object), but got %d", n);
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u == NULL || u->peer.connection == NULL || u->read_closed) {
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "attempt to receive data on a closed "
+ "socket: u:%p, c:%p, ft:%d eof:%d",
+ u, u ? u->peer.connection : NULL,
+ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+
+ if (!lua_isnumber(L, 2)) {
+ return luaL_argerror(L, 2, "bad max argument");
+ }
+
+ bytes = lua_tointeger(L, 2);
+ if (bytes <= 0) {
+ return luaL_argerror(L, 2, "bad max argument");
+ }
+
+ u->input_filter = ngx_stream_lua_socket_read_any;
+ u->rest = (size_t) bytes;
+ u->length = u->rest;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket calling receiveany() "
+ "method to read at most %uz bytes", u->rest);
+
+ return ngx_stream_lua_socket_tcp_receive_helper(r, u, L);
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_receive(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_loc_conf_t *llcf;
+ int n;
+ ngx_str_t pat;
+ lua_Integer bytes;
+ char *p;
+ int typ;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ n = lua_gettop(L);
+ if (n != 1 && n != 2) {
+ return luaL_error(L, "expecting 1 or 2 arguments "
+ "(including the object), but got %d", n);
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket calling receive() method");
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u == NULL || u->peer.connection == NULL || u->read_closed) {
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "stream attempt to receive data on a closed "
+ "socket: u:%p, c:%p, ft:%d eof:%d",
+ u, u ? u->peer.connection : NULL,
+ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket read timeout: %M", u->read_timeout);
+
+ if (n > 1) {
+ if (lua_isnumber(L, 2)) {
+ typ = LUA_TNUMBER;
+
+ } else {
+ typ = lua_type(L, 2);
+ }
+
+ switch (typ) {
+ case LUA_TSTRING:
+ pat.data = (u_char *) luaL_checklstring(L, 2, &pat.len);
+ if (pat.len != 2 || pat.data[0] != '*') {
+ p = (char *) lua_pushfstring(L, "bad pattern argument: %s",
+ (char *) pat.data);
+
+ return luaL_argerror(L, 2, p);
+ }
+
+ switch (pat.data[1]) {
+ case 'l':
+ u->input_filter = ngx_stream_lua_socket_read_line;
+ break;
+
+ case 'a':
+ u->input_filter = ngx_stream_lua_socket_read_all;
+ break;
+
+ default:
+ return luaL_argerror(L, 2, "bad pattern argument");
+ break;
+ }
+
+ u->length = 0;
+ u->rest = 0;
+
+ break;
+
+ case LUA_TNUMBER:
+ bytes = lua_tointeger(L, 2);
+ if (bytes < 0) {
+ return luaL_argerror(L, 2, "bad pattern argument");
+ }
+
+#if 1
+ if (bytes == 0) {
+ lua_pushliteral(L, "");
+ return 1;
+ }
+#endif
+
+ u->input_filter = ngx_stream_lua_socket_read_chunk;
+ u->length = (size_t) bytes;
+ u->rest = u->length;
+
+ break;
+
+ default:
+ return luaL_argerror(L, 2, "bad pattern argument");
+ break;
+ }
+
+ } else {
+ u->input_filter = ngx_stream_lua_socket_read_line;
+ u->length = 0;
+ u->rest = 0;
+ }
+
+ return ngx_stream_lua_socket_tcp_receive_helper(r, u, L);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_read_chunk(void *data, ssize_t bytes)
+{
+ ngx_int_t rc;
+ ngx_stream_lua_socket_tcp_upstream_t *u = data;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
+ "stream lua tcp socket read chunk %z", bytes);
+
+ rc = ngx_stream_lua_read_bytes(&u->buffer, u->buf_in, &u->rest,
+ bytes, u->request->connection->log);
+ if (rc == NGX_ERROR) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
+ return NGX_ERROR;
+ }
+
+ return rc;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_read_all(void *data, ssize_t bytes)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u = data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
+ "stream lua tcp socket read all");
+ return ngx_stream_lua_read_all(&u->buffer, u->buf_in, bytes,
+ u->request->connection->log);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_read_line(void *data, ssize_t bytes)
+{
+ ngx_int_t rc;
+ ngx_stream_lua_socket_tcp_upstream_t *u = data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
+ "stream lua tcp socket read line");
+
+ rc = ngx_stream_lua_read_line(&u->buffer, u->buf_in, bytes,
+ u->request->connection->log);
+ if (rc == NGX_ERROR) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
+ return NGX_ERROR;
+ }
+
+ return rc;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_read_any(void *data, ssize_t bytes)
+{
+ ngx_int_t rc;
+ ngx_stream_lua_socket_tcp_upstream_t *u = data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
+ "stream lua tcp socket read any");
+
+ rc = ngx_stream_lua_read_any(&u->buffer, u->buf_in, &u->rest, bytes,
+ u->request->connection->log);
+ if (rc == NGX_ERROR) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
+ return NGX_ERROR;
+ }
+
+ return rc;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_read(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_int_t rc;
+ ngx_connection_t *c;
+ ngx_buf_t *b;
+ ngx_event_t *rev;
+ size_t size;
+ ssize_t n;
+ unsigned read;
+ off_t preread = 0;
+
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ c = u->peer.connection;
+ rev = c->read;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua tcp socket read data: wait:%d",
+ (int) u->read_waiting);
+
+ b = &u->buffer;
+ read = 0;
+
+ for ( ;; ) {
+
+ size = b->last - b->pos;
+
+ if (size || u->eof) {
+
+ rc = u->input_filter(u->input_filter_ctx, size);
+
+ if (rc == NGX_OK) {
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket receive done: "
+ "wait:%d, eof:%d, ", (int) u->read_waiting,
+ (int) u->eof);
+
+
+#if 1
+ if (ngx_handle_read_event(rev, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+#endif
+
+
+ u->read_consumed = 1;
+
+ ngx_stream_lua_socket_handle_read_success(r, u);
+ return NGX_OK;
+ }
+
+ if (rc == NGX_ERROR) {
+ dd("input filter error: ft_type:%d wait:%d",
+ (int) u->ft_type, (int) u->read_waiting);
+
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ /* rc == NGX_AGAIN */
+
+
+ continue;
+ }
+
+ if (read && !rev->ready) {
+ rc = NGX_AGAIN;
+ break;
+ }
+
+ size = b->end - b->last;
+
+ if (size == 0) {
+ rc = ngx_stream_lua_socket_add_input_buffer(r, u);
+ if (rc == NGX_ERROR) {
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_NOMEM);
+
+ return NGX_ERROR;
+ }
+
+ b = &u->buffer;
+ size = (size_t) (b->end - b->last);
+ }
+
+ if (u->raw_downstream) {
+ if (r->connection->buffer != NULL) {
+ preread = ngx_buf_size(r->connection->buffer);
+ }
+
+ if (preread) {
+
+ if ((off_t) size > preread) {
+ size = (size_t) preread;
+ }
+
+ ngx_stream_lua_probe_req_socket_consume_preread(r,
+ r->connection->buffer->pos,
+ size);
+
+ b->last = ngx_copy(b->last, r->connection->buffer->pos, size);
+ r->connection->buffer->pos += size;
+ continue;
+ }
+
+ }
+
+#if 1
+ if (rev->active && !rev->ready) {
+ rc = NGX_AGAIN;
+ break;
+ }
+#endif
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket try to recv data %uz", size);
+
+ n = c->recv(c, b->last, size);
+
+ dd("read event ready: %d", (int) c->read->ready);
+
+ read = 1;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket recv returned %d", (int) n);
+
+ if (n == NGX_AGAIN) {
+ rc = NGX_AGAIN;
+ dd("socket recv busy");
+ break;
+ }
+
+ if (n == 0) {
+
+ if (u->raw_downstream || u->body_downstream) {
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r,
+ ngx_stream_lua_module);
+
+ if (llcf->check_client_abort) {
+
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_CLIENTABORT);
+ return NGX_ERROR;
+ }
+
+ /* llcf->check_client_abort == 0 */
+
+ }
+
+ u->eof = 1;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket closed");
+
+ continue;
+ }
+
+ if (n == NGX_ERROR) {
+ u->socket_errno = ngx_socket_errno;
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ b->last += n;
+
+ }
+
+#if 1
+ if (ngx_handle_read_event(rev, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+#endif
+
+ if (rev->active) {
+ ngx_add_timer(rev, u->read_timeout);
+
+ } else if (rev->timer_set) {
+ ngx_del_timer(rev);
+ }
+
+ return rc;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_send(lua_State *L)
+{
+ ngx_int_t rc;
+ ngx_stream_lua_request_t *r;
+ u_char *p;
+ size_t len;
+ ngx_chain_t *cl;
+ int type;
+ int tcp_nodelay;
+ const char *msg;
+ ngx_buf_t *b;
+ ngx_connection_t *c;
+
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ ngx_stream_core_srv_conf_t *clcf;
+
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ /* TODO: add support for the optional "i" and "j" arguments */
+
+ if (lua_gettop(L) != 2) {
+ return luaL_error(L, "expecting 2 arguments (including the object), "
+ "but got %d", lua_gettop(L));
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ dd("tcp send: u=%p, u->write_closed=%d", u, (unsigned) u->write_closed);
+
+ if (u == NULL || u->peer.connection == NULL || u->write_closed) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "attempt to send data on a closed socket: u:%p, "
+ "c:%p, ft:%d eof:%d",
+ u, u ? u->peer.connection : NULL,
+ u ? (int) u->ft_type : 0, u ? (int) u->eof : 0);
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ if (u->body_downstream) {
+ return luaL_error(L, "attempt to write to request sockets");
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket send timeout: %M", u->send_timeout);
+
+ type = lua_type(L, 2);
+ switch (type) {
+ case LUA_TNUMBER:
+ case LUA_TSTRING:
+ lua_tolstring(L, 2, &len);
+ break;
+
+ case LUA_TTABLE:
+ len = ngx_stream_lua_calc_strlen_in_table(L, 2, 2, 1 /* strict */);
+ break;
+
+ case LUA_TNIL:
+ len = sizeof("nil") - 1;
+ break;
+
+ case LUA_TBOOLEAN:
+ if (lua_toboolean(L, 2)) {
+ len = sizeof("true") - 1;
+
+ } else {
+ len = sizeof("false") - 1;
+ }
+
+ break;
+
+ default:
+ msg = lua_pushfstring(L, "string, number, boolean, nil, "
+ "or array table expected, got %s",
+ lua_typename(L, type));
+
+ return luaL_argerror(L, 2, msg);
+ }
+
+ if (len == 0) {
+ lua_pushinteger(L, 0);
+ return 1;
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
+ &ctx->free_bufs, len);
+
+ if (cl == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+ b = cl->buf;
+
+ switch (type) {
+ case LUA_TNUMBER:
+ case LUA_TSTRING:
+ p = (u_char *) lua_tolstring(L, -1, &len);
+ b->last = ngx_copy(b->last, (u_char *) p, len);
+ break;
+
+ case LUA_TTABLE:
+ b->last = ngx_stream_lua_copy_str_in_table(L, -1, b->last);
+ break;
+
+ case LUA_TNIL:
+ *b->last++ = 'n';
+ *b->last++ = 'i';
+ *b->last++ = 'l';
+ break;
+
+ case LUA_TBOOLEAN:
+ if (lua_toboolean(L, 2)) {
+ *b->last++ = 't';
+ *b->last++ = 'r';
+ *b->last++ = 'u';
+ *b->last++ = 'e';
+
+ } else {
+ *b->last++ = 'f';
+ *b->last++ = 'a';
+ *b->last++ = 'l';
+ *b->last++ = 's';
+ *b->last++ = 'e';
+ }
+
+ break;
+
+ default:
+ return luaL_error(L, "impossible to reach here");
+ }
+
+ u->request_bufs = cl;
+
+ u->request_len = len;
+
+ /* mimic ngx_http_upstream_init_request here */
+
+ clcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_core_module);
+ c = u->peer.connection;
+
+ if (clcf->tcp_nodelay && c->tcp_nodelay == NGX_TCP_NODELAY_UNSET) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua socket tcp_nodelay");
+
+ tcp_nodelay = 1;
+
+ if (setsockopt(c->fd, IPPROTO_TCP, TCP_NODELAY,
+ (const void *) &tcp_nodelay, sizeof(int))
+ == -1)
+ {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ ngx_connection_error(c, ngx_socket_errno,
+ "setsockopt(TCP_NODELAY) "
+ "failed");
+ }
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "setsocketopt tcp_nodelay failed");
+ return 2;
+ }
+
+ c->tcp_nodelay = NGX_TCP_NODELAY_SET;
+ }
+
+#if 1
+ u->write_waiting = 0;
+ u->write_co_ctx = NULL;
+#endif
+
+ ngx_stream_lua_probe_socket_tcp_send_start(r, u, b->pos, len);
+
+ rc = ngx_stream_lua_socket_send(r, u);
+
+ dd("socket send returned %d", (int) rc);
+
+ if (rc == NGX_ERROR) {
+ return ngx_stream_lua_socket_write_error_retval_handler(r, u, L);
+ }
+
+ if (rc == NGX_OK) {
+ lua_pushinteger(L, len);
+ return 1;
+ }
+
+ /* rc == NGX_AGAIN */
+
+ coctx = ctx->cur_co_ctx;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ if (u->raw_downstream) {
+ ctx->writing_raw_req_socket = 1;
+ }
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ u->write_co_ctx = coctx;
+ u->write_waiting = 1;
+ u->write_prepare_retvals = ngx_stream_lua_socket_tcp_send_retval_handler;
+
+ dd("setting data to %p", u);
+
+ return lua_yield(L, 0);
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_send_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket send return value handler");
+
+ if (u->ft_type) {
+ return ngx_stream_lua_socket_write_error_retval_handler(r, u, L);
+ }
+
+ lua_pushinteger(L, u->request_len);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_receive_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ int n;
+ ngx_int_t rc;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_event_t *ev;
+
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket receive return value handler");
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+#if 1
+ if (u->raw_downstream || u->body_downstream) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->check_client_abort) {
+
+ r->read_event_handler = ngx_stream_lua_rd_check_broken_connection;
+
+ ev = r->connection->read;
+
+ dd("rev active: %d", ev->active);
+
+ if ((ngx_event_flags & NGX_USE_LEVEL_EVENT) && !ev->active) {
+ if (ngx_add_event(ev, NGX_READ_EVENT, 0) != NGX_OK) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to add event");
+ return 2;
+ }
+ }
+
+ } else {
+ /* llcf->check_client_abort == 0 */
+ r->read_event_handler = ngx_stream_lua_block_reading;
+ }
+ }
+#endif
+
+ if (u->ft_type) {
+
+ if (u->ft_type & NGX_STREAM_LUA_SOCKET_FT_TIMEOUT) {
+ u->no_close = 1;
+ }
+
+ dd("u->bufs_in: %p", u->bufs_in);
+
+ if (u->bufs_in) {
+ rc = ngx_stream_lua_socket_push_input_data(r, ctx, u, L);
+ if (rc == NGX_ERROR) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ return 2;
+ }
+
+ (void) ngx_stream_lua_socket_read_error_retval_handler(r, u, L);
+
+ lua_pushvalue(L, -3);
+ lua_remove(L, -4);
+ return 3;
+ }
+
+ n = ngx_stream_lua_socket_read_error_retval_handler(r, u, L);
+ lua_pushliteral(L, "");
+ return n + 1;
+ }
+
+ rc = ngx_stream_lua_socket_push_input_data(r, ctx, u, L);
+ if (rc == NGX_ERROR) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ return 2;
+ }
+
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_close(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ if (lua_gettop(L) != 1) {
+ return luaL_error(L, "expecting 1 argument "
+ "(including the object) but seen %d", lua_gettop(L));
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ if (u == NULL
+ || u->peer.connection == NULL
+ || (u->read_closed && u->write_closed))
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ if (u->raw_downstream || u->body_downstream) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "attempt to close a request socket");
+ return 2;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+
+ lua_pushinteger(L, 1);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_shutdown(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_str_t direction;
+ char *p;
+ ngx_stream_lua_ctx_t *ctx;
+
+ if (lua_gettop(L) != 2) {
+ return luaL_error(L, "expecting 2 arguments "
+ "(including the object) but seen %d", lua_gettop(L));
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ if (u == NULL
+ || u->peer.connection == NULL
+ || (u->read_closed && u->write_closed))
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->write_closed) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "already shutdown");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ /*
+ * only allow shutdown on raw request in stream module in content phase.
+ * in http module, lingering close will take care of the shutdown.
+ * in stream module, it is unsafe to shutdown prior on reaching content
+ * phase as later phases may still need to write to the socket.
+ */
+
+ if (u->raw_downstream) {
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT);
+
+ if (ctx->eof) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "seen eof");
+ return 2;
+ }
+
+ /* prevent all further output attempt */
+ ctx->eof = 1;
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ /* shutdown */
+ direction.data = (u_char *) luaL_checklstring(L, 2, &direction.len);
+ if (direction.len == 0) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "pattern is empty");
+ return 2;
+ }
+
+ if (direction.len != 4 || ngx_strcmp(direction.data, "send") != 0) {
+ p = (char *) lua_pushfstring(L, "bad shutdown argument: %s",
+ (char *) direction.data);
+
+ return luaL_argerror(L, 2, p);
+ }
+
+ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 1);
+
+ lua_pushinteger(L, 1);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_setoption(lua_State *L)
+{
+ /* TODO */
+ return 0;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_settimeout(lua_State *L)
+{
+ int n;
+ ngx_int_t timeout;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ n = lua_gettop(L);
+
+ if (n != 2) {
+ return luaL_error(L, "ngx.socket settimout: expecting 2 arguments "
+ "(including the object) but seen %d", lua_gettop(L));
+ }
+
+ timeout = (ngx_int_t) lua_tonumber(L, 2);
+ if (timeout >> 31) {
+ return luaL_error(L, "bad timeout value");
+ }
+
+ lua_pushinteger(L, timeout);
+ lua_pushinteger(L, timeout);
+
+ lua_rawseti(L, 1, SOCKET_CONNECT_TIMEOUT_INDEX);
+ lua_rawseti(L, 1, SOCKET_SEND_TIMEOUT_INDEX);
+ lua_rawseti(L, 1, SOCKET_READ_TIMEOUT_INDEX);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u) {
+ if (timeout > 0) {
+ u->read_timeout = (ngx_msec_t) timeout;
+ u->send_timeout = (ngx_msec_t) timeout;
+ u->connect_timeout = (ngx_msec_t) timeout;
+
+ } else {
+ u->read_timeout = u->conf->read_timeout;
+ u->send_timeout = u->conf->send_timeout;
+ u->connect_timeout = u->conf->connect_timeout;
+ }
+ }
+
+ return 0;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_settimeouts(lua_State *L)
+{
+ int n;
+ ngx_int_t connect_timeout, send_timeout, read_timeout;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ n = lua_gettop(L);
+
+ if (n != 4) {
+ return luaL_error(L, "ngx.socket settimout: expecting 4 arguments "
+ "(including the object) but seen %d", lua_gettop(L));
+ }
+
+ connect_timeout = (ngx_int_t) lua_tonumber(L, 2);
+ if (connect_timeout >> 31) {
+ return luaL_error(L, "bad timeout value");
+ }
+
+ send_timeout = (ngx_int_t) lua_tonumber(L, 3);
+ if (send_timeout >> 31) {
+ return luaL_error(L, "bad timeout value");
+ }
+
+ read_timeout = (ngx_int_t) lua_tonumber(L, 4);
+ if (read_timeout >> 31) {
+ return luaL_error(L, "bad timeout value");
+ }
+
+ lua_rawseti(L, 1, SOCKET_READ_TIMEOUT_INDEX);
+ lua_rawseti(L, 1, SOCKET_SEND_TIMEOUT_INDEX);
+ lua_rawseti(L, 1, SOCKET_CONNECT_TIMEOUT_INDEX);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u) {
+ if (connect_timeout > 0) {
+ u->connect_timeout = (ngx_msec_t) connect_timeout;
+
+ } else {
+ u->connect_timeout = u->conf->connect_timeout;
+ }
+
+ if (send_timeout > 0) {
+ u->send_timeout = (ngx_msec_t) send_timeout;
+
+ } else {
+ u->send_timeout = u->conf->send_timeout;
+ }
+
+ if (read_timeout > 0) {
+ u->read_timeout = (ngx_msec_t) read_timeout;
+
+ } else {
+ u->read_timeout = u->conf->read_timeout;
+ }
+ }
+
+ return 0;
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_handler(ngx_event_t *ev)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_connection_t *c;
+
+ c = ev->data;
+ u = c->data;
+ r = u->request;
+ c = r->connection;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua tcp socket handler: wev %d", (int) ev->write);
+
+ if (ev->write) {
+ u->write_event_handler(r, u);
+
+ } else {
+ u->read_event_handler(r, u);
+ }
+
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_get_peer(ngx_peer_connection_t *pc, void *data)
+{
+ /* empty */
+ return NGX_OK;
+}
+
+
+static void
+ngx_stream_lua_socket_read_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_connection_t *c;
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ c = u->peer.connection;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket read handler");
+
+ if (c->read->timedout) {
+ c->read->timedout = 0;
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "stream lua tcp socket read timed out");
+ }
+
+ ngx_stream_lua_socket_handle_read_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_TIMEOUT);
+ return;
+ }
+
+#if 1
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+#endif
+
+ if (u->buffer.start != NULL) {
+ (void) ngx_stream_lua_socket_tcp_read(r, u);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_send_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_connection_t *c;
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ c = u->peer.connection;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket send handler");
+
+ if (c->write->timedout) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "stream lua tcp socket write timed out");
+ }
+
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_TIMEOUT);
+ return;
+ }
+
+ if (u->request_bufs) {
+ (void) ngx_stream_lua_socket_send(r, u);
+ }
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_send(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_int_t n;
+ ngx_connection_t *c;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_buf_t *b;
+
+ c = u->peer.connection;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket send data");
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ b = u->request_bufs->buf;
+
+ for (;;) {
+ n = c->send(c, b->pos, b->last - b->pos);
+
+ if (n >= 0) {
+ b->pos += n;
+
+ if (b->pos == b->last) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
+ "stream lua tcp socket sent all the data");
+
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+
+ ngx_chain_update_chains(r->pool,
+ &ctx->free_bufs, &ctx->busy_bufs,
+ &u->request_bufs,
+ (ngx_buf_tag_t) &ngx_stream_lua_module);
+
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+
+ if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ ngx_stream_lua_socket_handle_write_success(r, u);
+ return NGX_OK;
+ }
+
+ /* keep sending more data */
+ continue;
+ }
+
+ /* NGX_ERROR || NGX_AGAIN */
+ break;
+ }
+
+ if (n == NGX_ERROR) {
+ c->error = 1;
+ u->socket_errno = ngx_socket_errno;
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ /* n == NGX_AGAIN */
+
+ if (u->raw_downstream) {
+ ctx->writing_raw_req_socket = 1;
+ }
+
+ u->write_event_handler = ngx_stream_lua_socket_send_handler;
+
+ ngx_add_timer(c->write, u->send_timeout);
+
+ if (ngx_handle_write_event(c->write, u->conf->send_lowat) != NGX_OK) {
+ ngx_stream_lua_socket_handle_write_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return NGX_ERROR;
+ }
+
+ return NGX_AGAIN;
+}
+
+
+static void
+ngx_stream_lua_socket_handle_conn_success(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+#if 1
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+#endif
+
+ if (u->conn_waiting) {
+ u->conn_waiting = 0;
+
+ coctx = u->write_co_ctx;
+ coctx->cleanup = NULL;
+ u->write_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_conn_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current "
+ "request (conn)");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_handle_read_success(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+#if 1
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+#endif
+
+ if (u->read_waiting) {
+ u->read_waiting = 0;
+
+ coctx = u->read_co_ctx;
+ coctx->cleanup = NULL;
+ u->read_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current "
+ "request (read)");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_handle_write_success(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+#if 1
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+#endif
+
+ if (u->write_waiting) {
+ u->write_waiting = 0;
+
+ coctx = u->write_co_ctx;
+ coctx->cleanup = NULL;
+ u->write_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_write_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current "
+ "request (read)");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_handle_conn_error(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket handle connect error");
+
+ u->ft_type |= ft_type;
+
+#if 1
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+#endif
+
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+
+ dd("connection waiting: %d", (int) u->conn_waiting);
+
+ coctx = u->write_co_ctx;
+
+ if (u->conn_waiting) {
+ u->conn_waiting = 0;
+
+ coctx->cleanup = NULL;
+ u->write_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_conn_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current request");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_handle_read_error(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket handle read error");
+
+ u->ft_type |= ft_type;
+
+#if 0
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+#endif
+
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+
+ if (u->read_waiting) {
+ u->read_waiting = 0;
+
+ coctx = u->read_co_ctx;
+ coctx->cleanup = NULL;
+ u->read_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current request");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_handle_write_error(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, ngx_uint_t ft_type)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket handle write error");
+
+ u->ft_type |= ft_type;
+
+#if 0
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+#endif
+
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+
+ if (u->write_waiting) {
+ u->write_waiting = 0;
+
+ coctx = u->write_co_ctx;
+ coctx->cleanup = NULL;
+ u->write_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_write_resume;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current request");
+
+ r->write_event_handler(r);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_connected_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_int_t rc;
+ ngx_connection_t *c;
+
+ ngx_stream_lua_loc_conf_t *llcf;
+
+ c = u->peer.connection;
+
+ if (c->write->timedout) {
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_stream_lua_socket_init_peer_connection_addr_text(&u->peer);
+
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "stream lua tcp socket connect timed out,"
+ " when connecting to %V:%ud",
+ &c->addr_text, ngx_inet_get_port(u->peer.sockaddr));
+ }
+
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_TIMEOUT);
+ return;
+ }
+
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+ rc = ngx_stream_lua_socket_test_connect(r, c);
+ if (rc != NGX_OK) {
+ if (rc > 0) {
+ u->socket_errno = (ngx_err_t) rc;
+ }
+
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket connected");
+
+ /* We should delete the current write/read event
+ * here because the socket object may not be used immediately
+ * on the Lua land, thus causing hot spin around level triggered
+ * event poll and wasting CPU cycles. */
+
+ if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return;
+ }
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ ngx_stream_lua_socket_handle_conn_error(r, u,
+ NGX_STREAM_LUA_SOCKET_FT_ERROR);
+ return;
+ }
+
+ ngx_stream_lua_socket_handle_conn_success(r, u);
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_cleanup(void *data)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u = data;
+
+ ngx_stream_lua_request_t *r;
+
+ r = u->request;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "cleanup lua tcp socket request");
+
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_finalize_read_part(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_chain_t *cl;
+ ngx_chain_t **ll;
+ ngx_connection_t *c;
+ ngx_stream_lua_ctx_t *ctx;
+
+ if (u->read_closed) {
+ return;
+ }
+
+ u->read_closed = 1;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ if (ctx && u->bufs_in) {
+
+ ll = &u->bufs_in;
+ for (cl = u->bufs_in; cl; cl = cl->next) {
+ dd("bufs_in chain: %p, next %p", cl, cl->next);
+ cl->buf->pos = cl->buf->last;
+ ll = &cl->next;
+ }
+
+ dd("ctx: %p", ctx);
+ dd("free recv bufs: %p", ctx->free_recv_bufs);
+ *ll = ctx->free_recv_bufs;
+ ctx->free_recv_bufs = u->bufs_in;
+ u->bufs_in = NULL;
+ u->buf_in = NULL;
+ ngx_memzero(&u->buffer, sizeof(ngx_buf_t));
+ }
+
+ if (u->raw_downstream || u->body_downstream) {
+ if (r->connection->read->timer_set) {
+ ngx_del_timer(r->connection->read);
+ }
+ return;
+ }
+
+ c = u->peer.connection;
+
+ if (c) {
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ if (c->read->active || c->read->disabled) {
+ ngx_del_event(c->read, NGX_READ_EVENT, NGX_CLOSE_EVENT);
+ }
+
+#if defined(nginx_version) && nginx_version >= 1007005
+ if (c->read->posted) {
+#else
+ if (c->read->prev) {
+#endif
+ ngx_delete_posted_event(c->read);
+ }
+
+ c->read->closed = 1;
+
+ /* TODO: shutdown the reading part of the connection */
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_finalize_write_part(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, int do_shutdown)
+
+{
+ ngx_connection_t *c;
+ ngx_stream_lua_ctx_t *ctx;
+
+ c = u->peer.connection;
+
+ if (u->write_closed) {
+ return;
+ }
+
+ u->write_closed = 1;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ if (c && do_shutdown) {
+ if (ngx_shutdown_socket(c->fd, NGX_WRITE_SHUTDOWN) == -1) {
+ ngx_connection_error(c, ngx_socket_errno,
+ ngx_shutdown_socket_n " failed");
+ return;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua shutdown socket write direction");
+ }
+
+ if (u->raw_downstream || u->body_downstream) {
+ if (ctx && ctx->writing_raw_req_socket) {
+ ctx->writing_raw_req_socket = 0;
+ if (r->connection->write->timer_set) {
+ ngx_del_timer(r->connection->write);
+ }
+
+ r->connection->write->error = 1;
+ }
+ return;
+ }
+
+ if (c) {
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+ if (c->write->active || c->write->disabled) {
+ ngx_del_event(c->write, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
+ }
+
+#if defined(nginx_version) && nginx_version >= 1007005
+ if (c->write->posted) {
+#else
+ if (c->write->prev) {
+#endif
+ ngx_delete_posted_event(c->write);
+ }
+
+ c->write->closed = 1;
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_conn_op_timeout_handler(ngx_event_t *ev)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ conn_op_ctx = ev->data;
+ ngx_queue_remove(&conn_op_ctx->queue);
+
+ u = conn_op_ctx->u;
+ r = u->request;
+
+ coctx = u->write_co_ctx;
+ coctx->cleanup = NULL;
+ /* note that we store conn_op_ctx in coctx->data instead of u */
+ coctx->data = conn_op_ctx;
+ u->write_co_ctx = NULL;
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ if (llcf->log_socket_errors) {
+ ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
+ "stream lua tcp socket queued connect "
+ "timed out, when trying to connect to %V:%ud",
+ &conn_op_ctx->host, conn_op_ctx->port);
+ }
+
+ ngx_queue_insert_head(&u->socket_pool->cache_connect_op,
+ &conn_op_ctx->queue);
+ u->socket_pool->connections--;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return;
+ }
+
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current "
+ "request");
+
+ u->write_prepare_retvals =
+ ngx_stream_lua_socket_tcp_conn_op_timeout_retval_handler;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_stream_lua_socket_tcp_conn_op_resume(r);
+
+ } else {
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_conn_op_resume;
+ ngx_stream_lua_core_run_phases(r);
+ }
+
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_conn_op_timeout_retval_handler(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L)
+{
+ lua_pushnil(L);
+ lua_pushliteral(L, "timeout");
+ return 2;
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_resume_conn_op(
+ ngx_stream_lua_socket_pool_t *spool)
+{
+ ngx_queue_t *q;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+#if (NGX_DEBUG)
+ ngx_stream_lua_assert(spool->connections >= 0);
+
+#else
+ if (spool->connections < 0) {
+ ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
+ "stream lua tcp socket connections count "
+ "mismatched for connection pool \"%s\", connections: "
+ "%i, size: %i",
+ spool->key, spool->connections, spool->size);
+ spool->connections = 0;
+ }
+#endif
+
+ /* we manually destroy wait_connect_op before triggering connect
+ * operation resumption, so that there is no resumption happens when Nginx
+ * is exiting.
+ */
+ if (ngx_queue_empty(&spool->wait_connect_op)) {
+ return;
+ }
+
+ q = ngx_queue_head(&spool->wait_connect_op);
+ conn_op_ctx = ngx_queue_data(q,
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t,
+ queue);
+ ngx_log_debug4(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua tcp socket post connect operation "
+ "resumption u: %p, ctx: %p for connection pool \"%s\", "
+ "connections: %i",
+ conn_op_ctx->u, conn_op_ctx, spool->key, spool->connections);
+
+ if (conn_op_ctx->event.timer_set) {
+ ngx_del_timer(&conn_op_ctx->event);
+ }
+
+ conn_op_ctx->event.handler =
+ ngx_stream_lua_socket_tcp_conn_op_resume_handler;
+
+ ngx_post_event((&conn_op_ctx->event), &ngx_posted_events);
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_conn_op_ctx_cleanup(void *data)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx = data;
+
+ u = conn_op_ctx->u;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, u->request->connection->log, 0,
+ "stream cleanup lua tcp socket "
+ "conn_op_ctx: %p, u: %p",
+ conn_op_ctx, u);
+
+ ngx_queue_insert_head(&u->socket_pool->cache_connect_op,
+ &conn_op_ctx->queue);
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_conn_op_resume_handler(ngx_event_t *ev)
+{
+ ngx_queue_t *q;
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_cleanup_t *cln;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_pool_t *spool;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ conn_op_ctx = ev->data;
+ u = conn_op_ctx->u;
+ r = u->request;
+ spool = u->socket_pool;
+
+ if (ngx_queue_empty(&spool->wait_connect_op)) {
+#if (NGX_DEBUG)
+ ngx_stream_lua_assert(!(spool->backlog >= 0
+ && spool->connections > spool->size));
+
+#else
+ if (spool->backlog >= 0 && spool->connections > spool->size) {
+ ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
+ "stream lua tcp socket connections count "
+ "mismatched for connection pool \"%s\", connections: "
+ "%i, size: %i",
+ spool->key, spool->connections, spool->size);
+ spool->connections = spool->size;
+ }
+#endif
+
+ return;
+ }
+
+ q = ngx_queue_head(&spool->wait_connect_op);
+ ngx_queue_remove(q);
+
+ coctx = u->write_co_ctx;
+ coctx->cleanup = NULL;
+ /* note that we store conn_op_ctx in coctx->data instead of u */
+ coctx->data = conn_op_ctx;
+ /* clear ngx_stream_lua_tcp_queue_conn_op_cleanup */
+ u->write_co_ctx = NULL;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ ngx_queue_insert_head(&spool->cache_connect_op,
+ &conn_op_ctx->queue);
+ return;
+ }
+
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_assert(coctx && (!ngx_stream_lua_is_thread(ctx)
+ || coctx->co_ref >= 0));
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket waking up the current "
+ "request");
+
+ u->write_prepare_retvals =
+ ngx_stream_lua_socket_tcp_conn_op_resume_retval_handler;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_stream_lua_socket_tcp_conn_op_resume(r);
+
+ } else {
+ cln = ngx_stream_lua_cleanup_add(r, 0);
+ if (cln != NULL) {
+ cln->handler = ngx_stream_lua_socket_tcp_conn_op_ctx_cleanup;
+ cln->data = conn_op_ctx;
+ conn_op_ctx->cleanup = &cln->handler;
+ }
+
+ ctx->resume_handler = ngx_stream_lua_socket_tcp_conn_op_resume;
+ ngx_stream_lua_core_run_phases(r);
+ }
+
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_conn_op_resume_retval_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, lua_State *L)
+{
+ int nret;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ coctx = ctx->cur_co_ctx;
+ dd("coctx: %p", coctx);
+ conn_op_ctx = coctx->data;
+ if (conn_op_ctx->cleanup != NULL) {
+ *conn_op_ctx->cleanup = NULL;
+ ngx_stream_lua_cleanup_free(r, conn_op_ctx->cleanup);
+ conn_op_ctx->cleanup = NULL;
+ }
+
+ /* decrease pending connect operation counter */
+ u->socket_pool->connections--;
+
+ nret = ngx_stream_lua_socket_tcp_connect_helper(L, u, r, ctx,
+ conn_op_ctx->host.data,
+ conn_op_ctx->host.len,
+ conn_op_ctx->port, 1);
+ ngx_queue_insert_head(&u->socket_pool->cache_connect_op,
+ &conn_op_ctx->queue);
+
+ return nret;
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_finalize(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_connection_t *c;
+
+ ngx_stream_lua_socket_pool_t *spool;
+
+ dd("request: %p, u: %p, u->cleanup: %p", r, u, u->cleanup);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua finalize socket");
+
+ if (u->cleanup) {
+ *u->cleanup = NULL;
+ ngx_stream_lua_cleanup_free(r, u->cleanup);
+ u->cleanup = NULL;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize_read_part(r, u);
+
+ ngx_stream_lua_socket_tcp_finalize_write_part(r, u, 0);
+
+ if (u->raw_downstream || u->body_downstream) {
+ u->peer.connection = NULL;
+ return;
+ }
+
+ if (u->resolved && u->resolved->ctx) {
+ ngx_resolve_name_done(u->resolved->ctx);
+ u->resolved->ctx = NULL;
+ }
+
+ if (u->peer.free) {
+ u->peer.free(&u->peer, u->peer.data, 0);
+ }
+
+#if (NGX_STREAM_SSL)
+ if (u->ssl_name.data) {
+ ngx_free(u->ssl_name.data);
+ u->ssl_name.data = NULL;
+ u->ssl_name.len = 0;
+ }
+#endif
+
+ c = u->peer.connection;
+ if (c) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua close socket connection");
+
+ ngx_stream_lua_socket_tcp_close_connection(c);
+ u->peer.connection = NULL;
+
+ u->conn_closed = 1;
+
+ spool = u->socket_pool;
+ if (spool == NULL) {
+ return;
+ }
+
+ spool->connections--;
+
+ if (spool->connections == 0) {
+ ngx_stream_lua_socket_free_pool(r->connection->log, spool);
+ return;
+ }
+
+ ngx_stream_lua_socket_tcp_resume_conn_op(spool);
+ }
+}
+
+
+static void
+ngx_stream_lua_socket_tcp_close_connection(ngx_connection_t *c)
+{
+#if (NGX_STREAM_SSL)
+
+ if (c->ssl) {
+ c->ssl->no_wait_shutdown = 1;
+ c->ssl->no_send_shutdown = 1;
+
+ (void) ngx_ssl_shutdown(c);
+ }
+
+#endif
+
+ if (c->pool) {
+ ngx_destroy_pool(c->pool);
+ c->pool = NULL;
+ }
+
+ ngx_close_connection(c);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_test_connect(ngx_stream_lua_request_t *r,
+ ngx_connection_t *c)
+{
+ int err;
+ socklen_t len;
+
+ ngx_stream_lua_loc_conf_t *llcf;
+
+#if (NGX_HAVE_KQUEUE)
+
+ ngx_event_t *ev;
+
+ if (ngx_event_flags & NGX_USE_KQUEUE_EVENT) {
+ dd("pending eof: (%p)%d (%p)%d", c->write, c->write->pending_eof,
+ c->read, c->read->pending_eof);
+
+ if (c->write->pending_eof) {
+ ev = c->write;
+
+ } else if (c->read->pending_eof) {
+ ev = c->read;
+
+ } else {
+ ev = NULL;
+ }
+
+ if (ev) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ (void) ngx_connection_error(c, ev->kq_errno,
+ "kevent() reported that "
+ "connect() failed");
+ }
+ return ev->kq_errno;
+ }
+
+ } else
+#endif
+ {
+ err = 0;
+ len = sizeof(int);
+
+ /*
+ * BSDs and Linux return 0 and set a pending error in err
+ * Solaris returns -1 and sets errno
+ */
+
+ if (getsockopt(c->fd, SOL_SOCKET, SO_ERROR, (void *) &err, &len)
+ == -1)
+ {
+ err = ngx_errno;
+ }
+
+ if (err) {
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+ if (llcf->log_socket_errors) {
+ (void) ngx_connection_error(c, err, "connect() failed");
+ }
+ return err;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static void
+ngx_stream_lua_socket_dummy_handler(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket dummy handler");
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_receiveuntil(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ int n;
+ ngx_str_t pat;
+ ngx_int_t rc;
+ size_t size;
+ unsigned inclusive = 0;
+
+ ngx_stream_lua_socket_compiled_pattern_t *cp;
+
+ n = lua_gettop(L);
+ if (n != 2 && n != 3) {
+ return luaL_error(L, "expecting 2 or 3 arguments "
+ "(including the object), but got %d", n);
+ }
+
+ if (n == 3) {
+ /* check out the options table */
+
+ luaL_checktype(L, 3, LUA_TTABLE);
+
+ lua_getfield(L, 3, "inclusive");
+
+ switch (lua_type(L, -1)) {
+ case LUA_TNIL:
+ /* do nothing */
+ break;
+
+ case LUA_TBOOLEAN:
+ if (lua_toboolean(L, -1)) {
+ inclusive = 1;
+ }
+ break;
+
+ default:
+ return luaL_error(L, "bad \"inclusive\" option value type: %s",
+ luaL_typename(L, -1));
+
+ }
+
+ lua_pop(L, 2);
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket calling receiveuntil() method");
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ pat.data = (u_char *) luaL_checklstring(L, 2, &pat.len);
+ if (pat.len == 0) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "pattern is empty");
+ return 2;
+ }
+
+ size = sizeof(ngx_stream_lua_socket_compiled_pattern_t);
+
+ cp = lua_newuserdata(L, size);
+ if (cp == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ pattern_udata_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+
+ ngx_memzero(cp, size);
+
+ cp->inclusive = inclusive;
+
+ rc = ngx_stream_lua_socket_compile_pattern(pat.data, pat.len, cp,
+ r->connection->log);
+
+ if (rc != NGX_OK) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to compile pattern");
+ return 2;
+ }
+
+ lua_pushcclosure(L, ngx_stream_lua_socket_receiveuntil_iterator, 3);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_receiveuntil_iterator(lua_State *L)
+{
+ ngx_stream_lua_request_t *r;
+ ngx_int_t rc;
+ ngx_stream_lua_ctx_t *ctx;
+ lua_Integer bytes;
+ int n;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_compiled_pattern_t *cp;
+
+ n = lua_gettop(L);
+ if (n > 1) {
+ return luaL_error(L, "expecting 0 or 1 arguments, "
+ "but seen %d", n);
+ }
+
+ if (n >= 1) {
+ bytes = luaL_checkinteger(L, 1);
+ if (bytes < 0) {
+ bytes = 0;
+ }
+
+ } else {
+ bytes = 0;
+ }
+
+ lua_rawgeti(L, lua_upvalueindex(1), SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ if (u == NULL || u->peer.connection == NULL || u->read_closed) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket receiveuntil iterator");
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket read timeout: %M", u->read_timeout);
+
+ u->input_filter = ngx_stream_lua_socket_read_until;
+
+ cp = lua_touserdata(L, lua_upvalueindex(3));
+
+ dd("checking existing state: %d", cp->state);
+
+ if (cp->state == -1) {
+ cp->state = 0;
+
+ lua_pushnil(L);
+ lua_pushnil(L);
+ lua_pushnil(L);
+ return 3;
+ }
+
+ cp->upstream = u;
+
+ cp->pattern.data =
+ (u_char *) lua_tolstring(L, lua_upvalueindex(2),
+ &cp->pattern.len);
+
+ u->input_filter_ctx = cp;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ if (u->bufs_in == NULL) {
+ u->bufs_in =
+ ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
+ &ctx->free_recv_bufs,
+ u->conf->buffer_size);
+
+ if (u->bufs_in == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+ u->buf_in = u->bufs_in;
+ u->buffer = *u->buf_in->buf;
+ }
+
+ u->length = (size_t) bytes;
+ u->rest = u->length;
+
+ if (u->raw_downstream || u->body_downstream) {
+ r->read_event_handler = ngx_stream_lua_req_socket_rev_handler;
+ }
+
+ u->read_waiting = 0;
+ u->read_co_ctx = NULL;
+
+ rc = ngx_stream_lua_socket_tcp_read(r, u);
+
+ if (rc == NGX_ERROR) {
+ dd("read failed: %d", (int) u->ft_type);
+ rc = ngx_stream_lua_socket_tcp_receive_retval_handler(r, u, L);
+ dd("tcp receive retval returned: %d", (int) rc);
+ return rc;
+ }
+
+ if (rc == NGX_OK) {
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket receive done in a single run");
+
+ return ngx_stream_lua_socket_tcp_receive_retval_handler(r, u, L);
+ }
+
+ /* rc == NGX_AGAIN */
+
+ coctx = ctx->cur_co_ctx;
+
+ u->read_event_handler = ngx_stream_lua_socket_read_handler;
+
+ ngx_stream_lua_cleanup_pending_operation(coctx);
+ coctx->cleanup = ngx_stream_lua_coctx_cleanup;
+ coctx->data = u;
+
+ if (ctx->entered_content_phase) {
+ r->write_event_handler = ngx_stream_lua_content_wev_handler;
+
+ } else {
+ r->write_event_handler = ngx_stream_lua_core_run_phases;
+ }
+
+ u->read_co_ctx = coctx;
+ u->read_waiting = 1;
+ u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler;
+
+ dd("setting data to %p", u);
+
+ if (u->raw_downstream || u->body_downstream) {
+ ctx->downstream = u;
+ }
+
+ return lua_yield(L, 0);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_compile_pattern(u_char *data, size_t len,
+ ngx_stream_lua_socket_compiled_pattern_t *cp, ngx_log_t *log)
+{
+ size_t i;
+ size_t prefix_len;
+ size_t size;
+ unsigned found;
+ int cur_state, new_state;
+
+ ngx_stream_lua_dfa_edge_t *edge;
+ ngx_stream_lua_dfa_edge_t **last = NULL;
+
+ cp->pattern.len = len;
+
+ if (len <= 2) {
+ return NGX_OK;
+ }
+
+ for (i = 1; i < len; i++) {
+ prefix_len = 1;
+
+ while (prefix_len <= len - i - 1) {
+
+ if (ngx_memcmp(data, &data[i], prefix_len) == 0) {
+ if (data[prefix_len] == data[i + prefix_len]) {
+ prefix_len++;
+ continue;
+ }
+
+ cur_state = i + prefix_len;
+ new_state = prefix_len + 1;
+
+ if (cp->recovering == NULL) {
+ size = sizeof(void *) * (len - 2);
+ cp->recovering = ngx_alloc(size, log);
+ if (cp->recovering == NULL) {
+ return NGX_ERROR;
+ }
+
+ ngx_memzero(cp->recovering, size);
+ }
+
+ edge = cp->recovering[cur_state - 2];
+
+ found = 0;
+
+ if (edge == NULL) {
+ last = &cp->recovering[cur_state - 2];
+
+ } else {
+
+ for (; edge; edge = edge->next) {
+ last = &edge->next;
+
+ if (edge->chr == data[prefix_len]) {
+ found = 1;
+
+ if (edge->new_state < new_state) {
+ edge->new_state = new_state;
+ }
+
+ break;
+ }
+ }
+ }
+
+ if (!found) {
+ ngx_log_debug7(NGX_LOG_DEBUG_STREAM, log, 0,
+ "stream lua tcp socket read until "
+ "recovering point: on state %d (%*s), if "
+ "next is '%c', then recover to state %d "
+ "(%*s)", cur_state, (size_t) cur_state, data,
+ data[prefix_len], new_state,
+ (size_t) new_state, data);
+
+ edge = ngx_alloc(sizeof(ngx_stream_lua_dfa_edge_t), log);
+ if (edge == NULL) {
+ return NGX_ERROR;
+ }
+
+ edge->chr = data[prefix_len];
+ edge->new_state = new_state;
+ edge->next = NULL;
+
+ *last = edge;
+ }
+
+ break;
+ }
+
+ break;
+ }
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_read_until(void *data, ssize_t bytes)
+{
+ ngx_stream_lua_socket_compiled_pattern_t *cp = data;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ ngx_stream_lua_request_t *r;
+ ngx_buf_t *b;
+ u_char c;
+ u_char *pat;
+ size_t pat_len;
+ int i;
+ int state;
+ int old_state = 0; /* just to make old
+ gcc happy */
+ ngx_stream_lua_dfa_edge_t *edge;
+ unsigned matched;
+ ngx_int_t rc;
+
+ u = cp->upstream;
+ r = u->request;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket read until");
+
+ if (bytes == 0) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_CLOSED;
+ return NGX_ERROR;
+ }
+
+ b = &u->buffer;
+
+ pat = cp->pattern.data;
+ pat_len = cp->pattern.len;
+ state = cp->state;
+
+ i = 0;
+ while (i < bytes) {
+ c = b->pos[i];
+
+ dd("%d: read char %d, state: %d", i, c, state);
+
+ if (c == pat[state]) {
+ i++;
+ state++;
+
+ if (state == (int) pat_len) {
+ /* already matched the whole pattern */
+ dd("pat len: %d", (int) pat_len);
+
+ b->pos += i;
+
+ if (u->length) {
+ cp->state = -1;
+
+ } else {
+ cp->state = 0;
+ }
+
+ if (cp->inclusive) {
+ rc = ngx_stream_lua_socket_add_pending_data(r, u, b->pos, 0,
+ pat, state,
+ state);
+
+ if (rc != NGX_OK) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ return NGX_ERROR;
+ }
+ }
+
+ return NGX_OK;
+ }
+
+ continue;
+ }
+
+ if (state == 0) {
+ u->buf_in->buf->last++;
+
+ i++;
+
+ if (u->length && --u->rest == 0) {
+ cp->state = state;
+ b->pos += i;
+ return NGX_OK;
+ }
+
+ continue;
+ }
+
+ matched = 0;
+
+ if (cp->recovering && state >= 2) {
+ dd("accessing state: %d, index: %d", state, state - 2);
+ for (edge = cp->recovering[state - 2]; edge; edge = edge->next) {
+
+ if (edge->chr == c) {
+ dd("matched '%c' and jumping to state %d", c,
+ edge->new_state);
+
+ old_state = state;
+ state = edge->new_state;
+ matched = 1;
+ break;
+ }
+ }
+ }
+
+ if (!matched) {
+#if 1
+ dd("adding pending data: %.*s", state, pat);
+ rc = ngx_stream_lua_socket_add_pending_data(r, u, b->pos, i, pat,
+ state, state);
+
+ if (rc != NGX_OK) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ return NGX_ERROR;
+ }
+
+#endif
+
+ if (u->length) {
+ if (u->rest <= (size_t) state) {
+ u->rest = 0;
+ cp->state = 0;
+ b->pos += i;
+ return NGX_OK;
+
+ } else {
+ u->rest -= state;
+ }
+ }
+
+ state = 0;
+ continue;
+ }
+
+ /* matched */
+
+ dd("adding pending data: %.*s", (int) (old_state + 1 - state),
+ (char *) pat);
+
+ rc = ngx_stream_lua_socket_add_pending_data(r, u, b->pos, i, pat,
+ old_state + 1 - state,
+ old_state);
+
+ if (rc != NGX_OK) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ return NGX_ERROR;
+ }
+
+ i++;
+
+ if (u->length) {
+ if (u->rest <= (size_t) state) {
+ u->rest = 0;
+ cp->state = state;
+ b->pos += i;
+ return NGX_OK;
+
+ } else {
+ u->rest -= state;
+ }
+ }
+
+ continue;
+ }
+
+ b->pos += i;
+ cp->state = state;
+
+ return NGX_AGAIN;
+}
+
+
+static int
+ngx_stream_lua_socket_cleanup_compiled_pattern(lua_State *L)
+{
+ ngx_stream_lua_socket_compiled_pattern_t *cp;
+
+ ngx_stream_lua_dfa_edge_t *edge, *p;
+ unsigned i;
+
+ dd("cleanup compiled pattern");
+
+ cp = lua_touserdata(L, 1);
+ if (cp == NULL || cp->recovering == NULL) {
+ return 0;
+ }
+
+ dd("pattern len: %d", (int) cp->pattern.len);
+
+ for (i = 0; i < cp->pattern.len - 2; i++) {
+ edge = cp->recovering[i];
+
+ while (edge) {
+ p = edge;
+ edge = edge->next;
+
+ dd("freeing edge %p", p);
+
+ ngx_free(p);
+
+ dd("edge: %p", edge);
+ }
+ }
+
+#if 1
+ ngx_free(cp->recovering);
+ cp->recovering = NULL;
+#endif
+
+ return 0;
+}
+
+
+int
+ngx_stream_lua_req_socket_tcp(lua_State *L)
+{
+ int n, raw;
+ ngx_peer_connection_t *pc;
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_connection_t *c;
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_cleanup_t *cln;
+ ngx_stream_lua_co_ctx_t *coctx;
+
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ n = lua_gettop(L);
+
+ if (n != 0 && n != 1) {
+ return luaL_error(L, "expecting zero arguments, but got %d",
+ lua_gettop(L));
+ }
+
+ if (n == 1) {
+ lua_pop(L, 1);
+ }
+
+ raw = 1;
+
+ r = ngx_stream_lua_get_req(L);
+
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no ctx found");
+ }
+
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
+ |NGX_STREAM_LUA_CONTEXT_PREREAD);
+
+ c = r->connection;
+
+ if (raw) {
+
+ if (c->buffered) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "pending data to write");
+ return 2;
+ }
+
+
+
+ dd("ctx acquired raw req socket: %d", ctx->acquired_raw_req_socket);
+
+ if (ctx->acquired_raw_req_socket) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "duplicate call");
+ return 2;
+ }
+
+ ctx->acquired_raw_req_socket = 1;
+
+
+ } else {
+ }
+
+ lua_createtable(L, 2 /* narr */, 3 /* nrec */); /* the object */
+
+ if (raw) {
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ raw_req_socket_metatable_key));
+
+ }
+
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+
+ u = lua_newuserdata(L, sizeof(ngx_stream_lua_socket_tcp_upstream_t));
+ if (u == NULL) {
+ return luaL_error(L, "no memory");
+ }
+
+#if 1
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ downstream_udata_metatable_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_setmetatable(L, -2);
+#endif
+
+ lua_rawseti(L, 1, SOCKET_CTX_INDEX);
+
+ ngx_memzero(u, sizeof(ngx_stream_lua_socket_tcp_upstream_t));
+
+ if (raw) {
+ u->raw_downstream = 1;
+
+ } else {
+ }
+
+ coctx = ctx->cur_co_ctx;
+
+ u->request = r;
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ u->conf = llcf;
+
+ u->read_timeout = u->conf->read_timeout;
+ u->connect_timeout = u->conf->connect_timeout;
+ u->send_timeout = u->conf->send_timeout;
+
+ cln = ngx_stream_lua_cleanup_add(r, 0);
+ if (cln == NULL) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ lua_pushnil(L);
+ lua_pushliteral(L, "no memory");
+ return 2;
+ }
+
+ cln->handler = ngx_stream_lua_socket_tcp_cleanup;
+ cln->data = u;
+ u->cleanup = &cln->handler;
+
+ pc = &u->peer;
+
+ pc->log = c->log;
+ pc->log_error = NGX_ERROR_ERR;
+
+ pc->connection = c;
+
+ dd("setting data to %p", u);
+
+ coctx->data = u;
+ ctx->downstream = u;
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ if (raw) {
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+ }
+
+ lua_settop(L, 1);
+ return 1;
+}
+
+
+static void
+ngx_stream_lua_req_socket_rev_handler(ngx_stream_lua_request_t *r)
+{
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "lua request socket read event handler");
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ r->read_event_handler = ngx_stream_lua_block_reading;
+ return;
+ }
+
+ u = ctx->downstream;
+ if (u == NULL || u->peer.connection == NULL) {
+ r->read_event_handler = ngx_stream_lua_block_reading;
+ return;
+ }
+
+ u->read_event_handler(r, u);
+}
+
+static int
+ngx_stream_lua_socket_tcp_getreusedtimes(lua_State *L)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ if (lua_gettop(L) != 1) {
+ return luaL_error(L, "expecting 1 argument "
+ "(including the object), but got %d", lua_gettop(L));
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+
+ if (u == NULL
+ || u->peer.connection == NULL
+ || (u->read_closed && u->write_closed))
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ lua_pushinteger(L, u->reused);
+ return 1;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_setkeepalive(lua_State *L)
+{
+ ngx_stream_lua_loc_conf_t *llcf;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_pool_t *spool;
+ ngx_stream_lua_socket_pool_item_t *item;
+
+ ngx_connection_t *c;
+ ngx_str_t key;
+ ngx_queue_t *q;
+ ngx_peer_connection_t *pc;
+ ngx_stream_lua_request_t *r;
+ ngx_msec_t timeout;
+ ngx_int_t pool_size;
+ int n;
+ ngx_int_t rc;
+ ngx_buf_t *b;
+ const char *msg;
+
+ n = lua_gettop(L);
+
+ if (n < 1 || n > 3) {
+ return luaL_error(L, "expecting 1 to 3 arguments "
+ "(including the object), but got %d", n);
+ }
+
+ luaL_checktype(L, 1, LUA_TTABLE);
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ llcf = ngx_stream_lua_get_module_loc_conf(r, ngx_stream_lua_module);
+
+ /* luaL_checkinteger will throw error if the argument is not a number.
+ * e.g.: bad argument \#2 to '?' (number expected, got string)
+ *
+ * We should check the argument in advance; otherwise,
+ * throwing an exception in the middle can compromise data integrity.
+ * e.g.: set pc->connection to NULL without following cleanup.
+ */
+ if (n >= 2 && !lua_isnil(L, 2)) {
+ timeout = (ngx_msec_t) luaL_checkinteger(L, 2);
+
+ } else {
+ timeout = llcf->keepalive_timeout;
+ }
+
+ if (n >= 3 && !lua_isnil(L, 3)) {
+ pool_size = luaL_checkinteger(L, 3);
+
+ } else {
+ pool_size = llcf->pool_size;
+ }
+
+ lua_rawgeti(L, 1, SOCKET_CTX_INDEX);
+ u = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ if (u == NULL) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ /* stack: obj timeout? size? */
+
+ pc = &u->peer;
+ c = pc->connection;
+
+ if (c == NULL || u->read_closed || u->write_closed) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "closed");
+ return 2;
+ }
+
+ if (u->request != r) {
+ return luaL_error(L, "bad request");
+ }
+
+ ngx_stream_lua_socket_check_busy_connecting(r, u, L);
+ ngx_stream_lua_socket_check_busy_reading(r, u, L);
+ ngx_stream_lua_socket_check_busy_writing(r, u, L);
+
+ b = &u->buffer;
+
+ if (b->start && ngx_buf_size(b)) {
+ ngx_stream_lua_probe_socket_tcp_setkeepalive_buf_unread(r, u, b->pos,
+ b->last - b->pos);
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "unread data in buffer");
+ return 2;
+ }
+
+ if (c->read->eof
+ || c->read->error
+ || c->read->timedout
+ || c->write->error
+ || c->write->timedout)
+ {
+ lua_pushnil(L);
+ lua_pushliteral(L, "invalid connection");
+ return 2;
+ }
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "failed to handle read event");
+ return 2;
+ }
+
+ if (ngx_terminate || ngx_exiting) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "stream lua tcp socket set keepalive while "
+ "process exiting, closing connection %p", c);
+
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+ lua_pushinteger(L, 1);
+ return 1;
+ }
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "stream lua tcp socket set keepalive: saving "
+ "connection %p", c);
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ socket_pool_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+
+ /* stack: obj timeout? size? pools */
+
+ lua_rawgeti(L, 1, SOCKET_KEY_INDEX);
+ key.data = (u_char *) lua_tolstring(L, -1, &key.len);
+ if (key.data == NULL) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "key not found");
+ return 2;
+ }
+
+ dd("saving connection to key %s", lua_tostring(L, -1));
+
+ lua_pushvalue(L, -1);
+ lua_rawget(L, -3);
+ spool = lua_touserdata(L, -1);
+ lua_pop(L, 1);
+
+ /* stack: obj timeout? size? pools cache_key */
+
+ if (spool == NULL) {
+ /* create a new socket pool for the current peer key */
+
+ if (pool_size <= 0) {
+ msg = lua_pushfstring(L, "bad \"pool_size\" option value: %i",
+ pool_size);
+ return luaL_argerror(L, n, msg);
+ }
+
+ ngx_stream_lua_socket_tcp_create_socket_pool(L, r, key,
+ pool_size, -1,
+ &spool);
+ }
+
+ if (ngx_queue_empty(&spool->free)) {
+
+ q = ngx_queue_last(&spool->cache);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_stream_lua_socket_pool_item_t, queue);
+
+ ngx_stream_lua_socket_tcp_close_connection(item->connection);
+
+ /* only decrease the counter for connections which were counted */
+ if (u->socket_pool != NULL) {
+ u->socket_pool->connections--;
+ }
+
+ } else {
+ q = ngx_queue_head(&spool->free);
+ ngx_queue_remove(q);
+
+ item = ngx_queue_data(q, ngx_stream_lua_socket_pool_item_t, queue);
+
+ /* we should always increase connections after getting connected,
+ * and decrease connections after getting closed.
+ * however, we don't create connection pool in previous connect method.
+ * so we increase connections here for backward compatibility.
+ */
+ if (u->socket_pool == NULL) {
+ spool->connections++;
+ }
+ }
+
+ item->connection = c;
+ ngx_queue_insert_head(&spool->cache, q);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "stream lua tcp socket clear current socket connection");
+
+ pc->connection = NULL;
+
+#if 0
+ if (u->cleanup) {
+ *u->cleanup = NULL;
+ u->cleanup = NULL;
+ }
+#endif
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ if (c->write->timer_set) {
+ ngx_del_timer(c->write);
+ }
+
+#if (NGX_DEBUG)
+ if (timeout == 0) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket keepalive timeout: unlimited");
+ }
+#endif
+
+ if (timeout) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket keepalive timeout: %M ms",
+ timeout);
+
+ ngx_add_timer(c->read, timeout);
+ }
+
+ c->write->handler = ngx_stream_lua_socket_keepalive_dummy_handler;
+ c->read->handler = ngx_stream_lua_socket_keepalive_rev_handler;
+
+ c->data = item;
+ c->idle = 1;
+ c->log = ngx_cycle->log;
+ c->pool->log = ngx_cycle->log;
+ c->read->log = ngx_cycle->log;
+ c->write->log = ngx_cycle->log;
+
+ item->socklen = pc->socklen;
+ ngx_memcpy(&item->sockaddr, pc->sockaddr, pc->socklen);
+ item->reused = u->reused;
+
+ if (c->read->ready) {
+ rc = ngx_stream_lua_socket_keepalive_close_handler(c->read);
+ if (rc != NGX_OK) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "connection in dubious state");
+ return 2;
+ }
+ }
+
+#if 1
+ ngx_stream_lua_socket_tcp_finalize(r, u);
+#endif
+
+ /* since we set u->peer->connection to NULL previously, the connect
+ * operation won't be resumed in the
+ * ngx_stream_lua_socket_tcp_finalize.
+ * Therefore we need to resume it here.
+ */
+ ngx_stream_lua_socket_tcp_resume_conn_op(spool);
+
+ lua_pushinteger(L, 1);
+ return 1;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_get_keepalive_peer(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_stream_lua_socket_pool_item_t *item;
+ ngx_stream_lua_socket_pool_t *spool;
+
+ ngx_stream_lua_cleanup_t *cln;
+ ngx_queue_t *q;
+ ngx_peer_connection_t *pc;
+ ngx_connection_t *c;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket pool get keepalive peer");
+
+ pc = &u->peer;
+
+ spool = u->socket_pool;
+
+ if (!ngx_queue_empty(&spool->cache)) {
+ q = ngx_queue_head(&spool->cache);
+
+ item = ngx_queue_data(q, ngx_stream_lua_socket_pool_item_t, queue);
+ c = item->connection;
+
+ ngx_queue_remove(q);
+ ngx_queue_insert_head(&spool->free, q);
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "stream lua tcp socket get keepalive peer: "
+ "using connection %p, fd:%d", c, c->fd);
+
+ c->idle = 0;
+ c->log = pc->log;
+ c->pool->log = pc->log;
+ c->read->log = pc->log;
+ c->write->log = pc->log;
+ c->data = u;
+
+#if 1
+ c->write->handler = ngx_stream_lua_socket_tcp_handler;
+ c->read->handler = ngx_stream_lua_socket_tcp_handler;
+#endif
+
+ if (c->read->timer_set) {
+ ngx_del_timer(c->read);
+ }
+
+ pc->connection = c;
+ pc->cached = 1;
+
+ u->reused = item->reused + 1;
+
+#if 1
+ u->write_event_handler = ngx_stream_lua_socket_dummy_handler;
+ u->read_event_handler = ngx_stream_lua_socket_dummy_handler;
+#endif
+
+ if (u->cleanup == NULL) {
+ cln = ngx_stream_lua_cleanup_add(r, 0);
+ if (cln == NULL) {
+ u->ft_type |= NGX_STREAM_LUA_SOCKET_FT_ERROR;
+ return NGX_ERROR;
+ }
+
+ cln->handler = ngx_stream_lua_socket_tcp_cleanup;
+ cln->data = u;
+ u->cleanup = &cln->handler;
+ }
+
+ return NGX_OK;
+ }
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0,
+ "stream lua tcp socket keepalive: connection pool empty");
+
+ return NGX_DECLINED;
+}
+
+
+static void
+ngx_stream_lua_socket_keepalive_dummy_handler(ngx_event_t *ev)
+{
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, ev->log, 0,
+ "stream keepalive dummy handler");
+}
+
+
+static void
+ngx_stream_lua_socket_keepalive_rev_handler(ngx_event_t *ev)
+{
+ (void) ngx_stream_lua_socket_keepalive_close_handler(ev);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_keepalive_close_handler(ngx_event_t *ev)
+{
+ ngx_stream_lua_socket_pool_item_t *item;
+ ngx_stream_lua_socket_pool_t *spool;
+
+ int n;
+ unsigned char buf[1];
+ ngx_connection_t *c;
+
+ c = ev->data;
+
+ if (c->close) {
+ goto close;
+ }
+
+ if (c->read->timedout) {
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, ev->log, 0,
+ "stream lua tcp socket keepalive max idle timeout");
+
+ goto close;
+ }
+
+ dd("read event ready: %d", (int) c->read->ready);
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, ev->log, 0,
+ "stream lua tcp socket keepalive close handler "
+ "check stale events");
+
+ /* consume the possible ssl-layer data implicitly */
+ n = c->recv(c, buf, 1);
+
+ if (n == NGX_AGAIN) {
+ /* stale event */
+
+ if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
+ goto close;
+ }
+
+ return NGX_OK;
+ }
+
+close:
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, ev->log, 0,
+ "stream lua tcp socket keepalive close handler: fd:%d",
+ c->fd);
+
+ item = c->data;
+ spool = item->socket_pool;
+
+ ngx_stream_lua_socket_tcp_close_connection(c);
+
+ ngx_queue_remove(&item->queue);
+ ngx_queue_insert_head(&spool->free, &item->queue);
+ spool->connections--;
+
+ dd("keepalive: connections: %u", (unsigned) spool->connections);
+
+ if (spool->connections == 0) {
+ ngx_stream_lua_socket_free_pool(ev->log, spool);
+
+ } else {
+ ngx_stream_lua_socket_tcp_resume_conn_op(spool);
+ }
+
+ return NGX_DECLINED;
+}
+
+
+static void
+ngx_stream_lua_socket_free_pool(ngx_log_t *log,
+ ngx_stream_lua_socket_pool_t *spool)
+{
+ lua_State *L;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, log, 0,
+ "stream lua tcp socket keepalive: free "
+ "connection pool for \"%s\"", spool->key);
+
+ L = spool->lua_vm;
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ socket_pool_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_pushstring(L, (char *) spool->key);
+ lua_pushnil(L);
+ lua_rawset(L, -3);
+ lua_pop(L, 1);
+}
+
+
+static void
+ngx_stream_lua_socket_shutdown_pool_helper(
+ ngx_stream_lua_socket_pool_t *spool)
+{
+ ngx_queue_t *q;
+ ngx_connection_t *c;
+ ngx_stream_lua_socket_pool_item_t *item;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ while (!ngx_queue_empty(&spool->cache)) {
+ q = ngx_queue_head(&spool->cache);
+
+ item = ngx_queue_data(q, ngx_stream_lua_socket_pool_item_t, queue);
+ c = item->connection;
+
+ ngx_stream_lua_socket_tcp_close_connection(c);
+
+ ngx_queue_remove(q);
+ ngx_queue_insert_head(&spool->free, q);
+ }
+
+ while (!ngx_queue_empty(&spool->cache_connect_op)) {
+ q = ngx_queue_head(&spool->cache_connect_op);
+ ngx_queue_remove(q);
+ conn_op_ctx = ngx_queue_data(q, ngx_stream_lua_socket_tcp_conn_op_ctx_t,
+ queue);
+ ngx_stream_lua_socket_tcp_free_conn_op_ctx(conn_op_ctx);
+ }
+
+ while (!ngx_queue_empty(&spool->wait_connect_op)) {
+ q = ngx_queue_head(&spool->wait_connect_op);
+ ngx_queue_remove(q);
+ conn_op_ctx = ngx_queue_data(q, ngx_stream_lua_socket_tcp_conn_op_ctx_t,
+ queue);
+
+ if (conn_op_ctx->event.timer_set) {
+ ngx_del_timer(&conn_op_ctx->event);
+ }
+
+ ngx_stream_lua_socket_tcp_free_conn_op_ctx(conn_op_ctx);
+ }
+
+ /* spool->connections will be decreased down to zero in
+ * ngx_stream_lua_socket_tcp_finalize */
+}
+
+
+static int
+ngx_stream_lua_socket_shutdown_pool(lua_State *L)
+{
+ ngx_stream_lua_socket_pool_t *spool;
+
+ spool = lua_touserdata(L, 1);
+
+ if (spool != NULL) {
+ ngx_stream_lua_socket_shutdown_pool_helper(spool);
+ }
+
+ return 0;
+}
+
+
+static int
+ngx_stream_lua_socket_tcp_upstream_destroy(lua_State *L)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ dd("upstream destroy triggered by Lua GC");
+
+ u = lua_touserdata(L, 1);
+ if (u == NULL) {
+ return 0;
+ }
+
+ if (u->cleanup) {
+ ngx_stream_lua_socket_tcp_cleanup(u); /* it will clear u->cleanup */
+ }
+
+ return 0;
+}
+
+
+static int
+ngx_stream_lua_socket_downstream_destroy(lua_State *L)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+
+ dd("downstream destroy");
+
+ u = lua_touserdata(L, 1);
+ if (u == NULL) {
+ dd("u is NULL");
+ return 0;
+ }
+
+ if (u->cleanup) {
+ ngx_stream_lua_socket_tcp_cleanup(u); /* it will clear u->cleanup */
+ }
+
+ return 0;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_push_input_data(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_ctx_t *ctx, ngx_stream_lua_socket_tcp_upstream_t *u,
+ lua_State *L)
+{
+ ngx_chain_t *cl;
+ ngx_chain_t **ll;
+#if (DDEBUG) || (NGX_DTRACE)
+ size_t size = 0;
+#endif
+ size_t chunk_size;
+ ngx_buf_t *b;
+ size_t nbufs;
+ luaL_Buffer luabuf;
+
+ dd("bufs_in: %p, buf_in: %p", u->bufs_in, u->buf_in);
+
+ nbufs = 0;
+ ll = NULL;
+
+ luaL_buffinit(L, &luabuf);
+
+ for (cl = u->bufs_in; cl; cl = cl->next) {
+ b = cl->buf;
+ chunk_size = b->last - b->pos;
+
+ dd("copying input data chunk from %p: \"%.*s\"", cl,
+ (int) chunk_size, b->pos);
+
+ luaL_addlstring(&luabuf, (char *) b->pos, chunk_size);
+
+ if (cl->next) {
+ ll = &cl->next;
+ }
+
+#if (DDEBUG) || (NGX_DTRACE)
+ size += chunk_size;
+#endif
+
+ nbufs++;
+ }
+
+ luaL_pushresult(&luabuf);
+
+#if (DDEBUG)
+ dd("size: %d, nbufs: %d", (int) size, (int) nbufs);
+#endif
+
+#if (NGX_DTRACE)
+ ngx_stream_lua_probe_socket_tcp_receive_done(r, u,
+ (u_char *) lua_tostring(L, -1),
+ size);
+#endif
+
+ if (nbufs > 1 && ll) {
+ dd("recycle buffers: %d", (int) (nbufs - 1));
+
+ *ll = ctx->free_recv_bufs;
+ ctx->free_recv_bufs = u->bufs_in;
+ u->bufs_in = u->buf_in;
+ }
+
+ if (u->buffer.pos == u->buffer.last) {
+ dd("resetting u->buffer pos & last");
+ u->buffer.pos = u->buffer.start;
+ u->buffer.last = u->buffer.start;
+ }
+
+ if (u->bufs_in) {
+ u->buf_in->buf->last = u->buffer.pos;
+ u->buf_in->buf->pos = u->buffer.pos;
+ }
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_add_input_buffer(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u)
+{
+ ngx_chain_t *cl;
+ ngx_stream_lua_ctx_t *ctx;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
+ &ctx->free_recv_bufs,
+ u->conf->buffer_size);
+
+ if (cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ u->buf_in->next = cl;
+ u->buf_in = cl;
+ u->buffer = *cl->buf;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_add_pending_data(ngx_stream_lua_request_t *r,
+ ngx_stream_lua_socket_tcp_upstream_t *u, u_char *pos, size_t len,
+ u_char *pat, int prefix, int old_state)
+{
+ u_char *last;
+ ngx_buf_t *b;
+
+ dd("resuming data: %d: [%.*s]", prefix, prefix, pat);
+
+ last = &pos[len];
+
+ b = u->buf_in->buf;
+
+ if (last - b->last == old_state) {
+ b->last += prefix;
+ return NGX_OK;
+ }
+
+ dd("need more buffers because %d != %d", (int) (last - b->last),
+ (int) old_state);
+
+ if (ngx_stream_lua_socket_insert_buffer(r, u, pat, prefix) != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ b->pos = last;
+ b->last = last;
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t ngx_stream_lua_socket_insert_buffer(
+ ngx_stream_lua_request_t *r, ngx_stream_lua_socket_tcp_upstream_t *u,
+ u_char *pat, size_t prefix)
+{
+ ngx_chain_t *cl, *new_cl, **ll;
+ size_t size;
+ ngx_buf_t *b;
+
+ ngx_stream_lua_ctx_t *ctx;
+
+ if (prefix <= u->conf->buffer_size) {
+ size = u->conf->buffer_size;
+
+ } else {
+ size = prefix;
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+
+ new_cl = ngx_stream_lua_chain_get_free_buf(r->connection->log, r->pool,
+ &ctx->free_recv_bufs,
+ size);
+
+ if (new_cl == NULL) {
+ return NGX_ERROR;
+ }
+
+ b = new_cl->buf;
+
+ b->last = ngx_copy(b->last, pat, prefix);
+
+ dd("copy resumed data to %p: %d: \"%.*s\"",
+ new_cl, (int) (b->last - b->pos), (int) (b->last - b->pos), b->pos);
+
+ dd("before resuming data: bufs_in %p, buf_in %p, buf_in next %p",
+ u->bufs_in, u->buf_in, u->buf_in->next);
+
+ ll = &u->bufs_in;
+ for (cl = u->bufs_in; cl->next; cl = cl->next) {
+ ll = &cl->next;
+ }
+
+ *ll = new_cl;
+ new_cl->next = u->buf_in;
+
+ dd("after resuming data: bufs_in %p, buf_in %p, buf_in next %p",
+ u->bufs_in, u->buf_in, u->buf_in->next);
+
+#if (DDEBUG)
+ for (cl = u->bufs_in; cl; cl = cl->next) {
+ b = cl->buf;
+
+ dd("result buf after resuming data: %p: %.*s", cl,
+ (int) ngx_buf_size(b), b->pos);
+ }
+#endif
+
+ return NGX_OK;
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_conn_op_resume(ngx_stream_lua_request_t *r)
+{
+ return ngx_stream_lua_socket_tcp_resume_helper(r,
+ SOCKET_OP_RESUME_CONN);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_conn_resume(ngx_stream_lua_request_t *r)
+{
+ return ngx_stream_lua_socket_tcp_resume_helper(r, SOCKET_OP_CONNECT);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_read_resume(ngx_stream_lua_request_t *r)
+{
+ return ngx_stream_lua_socket_tcp_resume_helper(r, SOCKET_OP_READ);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_write_resume(ngx_stream_lua_request_t *r)
+{
+ return ngx_stream_lua_socket_tcp_resume_helper(r, SOCKET_OP_WRITE);
+}
+
+
+static ngx_int_t
+ngx_stream_lua_socket_tcp_resume_helper(ngx_stream_lua_request_t *r,
+ int socket_op)
+{
+ int nret;
+ lua_State *vm;
+ ngx_int_t rc;
+ ngx_uint_t nreqs;
+ ngx_connection_t *c;
+
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+ ngx_stream_lua_socket_tcp_retval_handler prepare_retvals;
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ ctx->resume_handler = ngx_stream_lua_wev_handler;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp operation done, resuming lua "
+ "thread");
+
+ coctx = ctx->cur_co_ctx;
+
+ dd("coctx: %p", coctx);
+
+ switch (socket_op) {
+
+ case SOCKET_OP_RESUME_CONN:
+ conn_op_ctx = coctx->data;
+ u = conn_op_ctx->u;
+ prepare_retvals = u->write_prepare_retvals;
+ break;
+
+ case SOCKET_OP_CONNECT:
+ case SOCKET_OP_WRITE:
+ u = coctx->data;
+ prepare_retvals = u->write_prepare_retvals;
+ break;
+
+ case SOCKET_OP_READ:
+ u = coctx->data;
+ prepare_retvals = u->read_prepare_retvals;
+ break;
+
+ default:
+ /* impossible to reach here */
+ return NGX_ERROR;
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua tcp socket calling prepare retvals handler %p, "
+ "u:%p", prepare_retvals, u);
+
+ nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);
+ if (socket_op == SOCKET_OP_CONNECT
+ && nret > 1
+ && !u->conn_closed
+ && u->socket_pool != NULL)
+ {
+ u->socket_pool->connections--;
+ ngx_stream_lua_socket_tcp_resume_conn_op(u->socket_pool);
+ }
+
+ if (nret == NGX_AGAIN) {
+ return NGX_DONE;
+ }
+
+ c = r->connection;
+ vm = ngx_stream_lua_get_lua_vm(r, ctx);
+ nreqs = c->requests;
+
+ rc = ngx_stream_lua_run_thread(vm, r, ctx, nret);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, r->connection->log, 0,
+ "stream lua run thread returned %d", rc);
+
+ if (rc == NGX_AGAIN) {
+ return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ if (rc == NGX_DONE) {
+ ngx_stream_lua_finalize_request(r, NGX_DONE);
+ return ngx_stream_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ if (ctx->entered_content_phase) {
+ ngx_stream_lua_finalize_request(r, rc);
+ return NGX_DONE;
+ }
+
+ return rc;
+}
+
+
+static void
+ngx_stream_lua_tcp_queue_conn_op_cleanup(void *data)
+{
+ ngx_stream_lua_co_ctx_t *coctx = data;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_socket_tcp_conn_op_ctx_t *conn_op_ctx;
+
+ conn_op_ctx = coctx->data;
+ u = conn_op_ctx->u;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua tcp socket abort queueing, "
+ "conn_op_ctx: %p, u: %p",
+ conn_op_ctx, u);
+
+ if (conn_op_ctx->event.posted) {
+ ngx_delete_posted_event(&conn_op_ctx->event);
+
+ } else if (conn_op_ctx->event.timer_set) {
+ ngx_del_timer(&conn_op_ctx->event);
+ }
+
+ ngx_queue_remove(&conn_op_ctx->queue);
+ ngx_queue_insert_head(&u->socket_pool->cache_connect_op,
+ &conn_op_ctx->queue);
+
+ u->socket_pool->connections--;
+ ngx_stream_lua_socket_tcp_resume_conn_op(u->socket_pool);
+}
+
+
+static void
+ngx_stream_lua_tcp_resolve_cleanup(void *data)
+{
+ ngx_resolver_ctx_t *rctx;
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_co_ctx_t *coctx = data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua tcp socket abort resolver");
+
+ u = coctx->data;
+ if (u == NULL) {
+ return;
+ }
+
+ if (u->socket_pool != NULL) {
+ u->socket_pool->connections--;
+ ngx_stream_lua_socket_tcp_resume_conn_op(u->socket_pool);
+ }
+
+ rctx = u->resolved->ctx;
+ if (rctx == NULL) {
+ return;
+ }
+
+ /* postpone free the rctx in the handler */
+ rctx->handler = ngx_resolve_name_done;
+}
+
+
+static void
+ngx_stream_lua_coctx_cleanup(void *data)
+{
+ ngx_stream_lua_socket_tcp_upstream_t *u;
+ ngx_stream_lua_co_ctx_t *coctx = data;
+
+ dd("running coctx cleanup");
+
+ u = coctx->data;
+ if (u == NULL) {
+ return;
+ }
+
+ if (u->request == NULL) {
+ return;
+ }
+
+ ngx_stream_lua_socket_tcp_finalize(u->request, u);
+}
+
+
+#if (NGX_STREAM_SSL)
+
+static int
+ngx_stream_lua_ssl_free_session(lua_State *L)
+{
+ ngx_ssl_session_t **psession;
+
+ psession = lua_touserdata(L, 1);
+ if (psession && *psession != NULL) {
+ ngx_log_debug1(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua ssl free session: %p", *psession);
+
+ ngx_ssl_free_session(*psession);
+ }
+
+ return 0;
+}
+
+#endif /* NGX_STREAM_SSL */
+
+
+void
+ngx_stream_lua_cleanup_conn_pools(lua_State *L)
+{
+ ngx_stream_lua_socket_pool_t *spool;
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ socket_pool_key));
+ lua_rawget(L, LUA_REGISTRYINDEX); /* table */
+
+ lua_pushnil(L); /* first key */
+ while (lua_next(L, -2) != 0) {
+ /* tb key val */
+ spool = lua_touserdata(L, -1);
+
+ if (spool != NULL) {
+ ngx_log_debug2(NGX_LOG_DEBUG_STREAM, ngx_cycle->log, 0,
+ "stream lua tcp socket keepalive: free "
+ "connection pool %p for \"%s\"", spool, spool->key);
+
+ ngx_stream_lua_socket_shutdown_pool_helper(spool);
+ }
+
+ lua_pop(L, 1);
+ }
+
+ lua_pop(L, 1);
+}
+
+/* vi:set ft=c ts=4 sw=4 et fdm=marker: */