summaryrefslogtreecommitdiff
path: root/ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c
diff options
context:
space:
mode:
authorkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
committerkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
commit3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch)
tree284c2ba95a41536ae1bff6bea710db0709a64739 /ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c
downloadopenresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz
openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip
openresty bundle
Diffstat (limited to 'ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c')
-rw-r--r--ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c290
1 files changed, 290 insertions, 0 deletions
diff --git a/ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c b/ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c
new file mode 100644
index 0000000..8d906de
--- /dev/null
+++ b/ngx_stream_lua-0.0.16/src/ngx_stream_lua_uthread.c
@@ -0,0 +1,290 @@
+
+/*
+ * !!! DO NOT EDIT DIRECTLY !!!
+ * This file was automatically generated from the following template:
+ *
+ * src/subsys/ngx_subsys_lua_uthread.c.tt2
+ */
+
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_stream_lua_uthread.h"
+#include "ngx_stream_lua_coroutine.h"
+#include "ngx_stream_lua_util.h"
+#include "ngx_stream_lua_probe.h"
+
+
+#if 1
+#undef ngx_stream_lua_probe_info
+#define ngx_stream_lua_probe_info(msg)
+#endif
+
+
+static int ngx_stream_lua_uthread_spawn(lua_State *L);
+static int ngx_stream_lua_uthread_wait(lua_State *L);
+static int ngx_stream_lua_uthread_kill(lua_State *L);
+
+
+void
+ngx_stream_lua_inject_uthread_api(ngx_log_t *log, lua_State *L)
+{
+ /* new thread table */
+ lua_createtable(L, 0 /* narr */, 3 /* nrec */);
+
+ lua_pushcfunction(L, ngx_stream_lua_uthread_spawn);
+ lua_setfield(L, -2, "spawn");
+
+ lua_pushcfunction(L, ngx_stream_lua_uthread_wait);
+ lua_setfield(L, -2, "wait");
+
+ lua_pushcfunction(L, ngx_stream_lua_uthread_kill);
+ lua_setfield(L, -2, "kill");
+
+ lua_setfield(L, -2, "thread");
+}
+
+
+static int
+ngx_stream_lua_uthread_spawn(lua_State *L)
+{
+ int n;
+ ngx_stream_lua_request_t *r;
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx = NULL;
+
+ n = lua_gettop(L);
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no request ctx found");
+ }
+
+ ngx_stream_lua_coroutine_create_helper(L, r, ctx, &coctx);
+
+ /* anchor the newly created coroutine into the Lua registry */
+
+ lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
+ coroutines_key));
+ lua_rawget(L, LUA_REGISTRYINDEX);
+ lua_pushvalue(L, -2);
+ coctx->co_ref = luaL_ref(L, -2);
+ lua_pop(L, 1);
+
+ if (n > 1) {
+ lua_replace(L, 1);
+ lua_xmove(L, coctx->co, n - 1);
+ }
+
+ coctx->is_uthread = 1;
+ ctx->uthreads++;
+
+ coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
+ ctx->co_op = NGX_STREAM_LUA_USER_THREAD_RESUME;
+
+ ctx->cur_co_ctx->thread_spawn_yielded = 1;
+
+ if (ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx) != NGX_OK) {
+ return luaL_error(L, "no memory");
+ }
+
+ coctx->parent_co_ctx = ctx->cur_co_ctx;
+ ctx->cur_co_ctx = coctx;
+
+ ngx_stream_lua_attach_co_ctx_to_L(coctx->co, coctx);
+
+ ngx_stream_lua_probe_user_thread_spawn(r, L, coctx->co);
+
+ dd("yielding with arg %s, top=%d, index-1:%s", luaL_typename(L, -1),
+ (int) lua_gettop(L), luaL_typename(L, 1));
+ return lua_yield(L, 1);
+}
+
+
+static int
+ngx_stream_lua_uthread_wait(lua_State *L)
+{
+ int i, nargs, nrets;
+ lua_State *sub_co;
+ ngx_stream_lua_request_t *r;
+
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx, *sub_coctx;
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no request ctx found");
+ }
+
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_YIELDABLE);
+
+ coctx = ctx->cur_co_ctx;
+
+ nargs = lua_gettop(L);
+
+ for (i = 1; i <= nargs; i++) {
+ sub_co = lua_tothread(L, i);
+
+ luaL_argcheck(L, sub_co, i, "lua thread expected");
+
+ sub_coctx = ngx_stream_lua_get_co_ctx(sub_co, ctx);
+ if (sub_coctx == NULL) {
+ return luaL_error(L, "no co ctx found");
+ }
+
+ if (!sub_coctx->is_uthread) {
+ return luaL_error(L, "attempt to wait on a coroutine that is "
+ "not a user thread");
+ }
+
+ if (sub_coctx->parent_co_ctx != coctx) {
+ return luaL_error(L, "only the parent coroutine can wait on the "
+ "thread");
+ }
+
+ switch (sub_coctx->co_status) {
+ case NGX_STREAM_LUA_CO_ZOMBIE:
+
+ ngx_stream_lua_probe_info("found zombie child");
+
+ nrets = lua_gettop(sub_coctx->co);
+
+ dd("child retval count: %d, %s: %s", (int) nrets,
+ luaL_typename(sub_coctx->co, -1),
+ lua_tostring(sub_coctx->co, -1));
+
+ if (nrets) {
+ lua_xmove(sub_coctx->co, L, nrets);
+ }
+
+#if 1
+ ngx_stream_lua_del_thread(r, L, ctx, sub_coctx);
+ ctx->uthreads--;
+#endif
+
+ return nrets;
+
+ case NGX_STREAM_LUA_CO_DEAD:
+ dd("uthread already waited: %p (parent %p)", sub_coctx,
+ coctx);
+
+ if (i < nargs) {
+ /* just ignore it if it is not the last one */
+ continue;
+ }
+
+ /* being the last one */
+ lua_pushnil(L);
+ lua_pushliteral(L, "already waited or killed");
+ return 2;
+
+ default:
+ dd("uthread %p still alive, status: %d, parent %p", sub_coctx,
+ sub_coctx->co_status, coctx);
+ break;
+ }
+
+ ngx_stream_lua_probe_user_thread_wait(L, sub_coctx->co);
+ sub_coctx->waited_by_parent = 1;
+ }
+
+ return lua_yield(L, 0);
+}
+
+
+static int
+ngx_stream_lua_uthread_kill(lua_State *L)
+{
+ lua_State *sub_co;
+ ngx_stream_lua_request_t *r;
+
+ ngx_stream_lua_ctx_t *ctx;
+ ngx_stream_lua_co_ctx_t *coctx, *sub_coctx;
+
+ r = ngx_stream_lua_get_req(L);
+ if (r == NULL) {
+ return luaL_error(L, "no request found");
+ }
+
+ ctx = ngx_stream_lua_get_module_ctx(r, ngx_stream_lua_module);
+ if (ctx == NULL) {
+ return luaL_error(L, "no request ctx found");
+ }
+
+ ngx_stream_lua_check_context(L, ctx, NGX_STREAM_LUA_CONTEXT_CONTENT
+
+ | NGX_STREAM_LUA_CONTEXT_PREREAD
+ | NGX_STREAM_LUA_CONTEXT_SSL_CLIENT_HELLO
+ | NGX_STREAM_LUA_CONTEXT_SSL_CERT
+ | NGX_STREAM_LUA_CONTEXT_TIMER);
+
+ coctx = ctx->cur_co_ctx;
+
+ sub_co = lua_tothread(L, 1);
+ luaL_argcheck(L, sub_co, 1, "lua thread expected");
+
+ sub_coctx = ngx_stream_lua_get_co_ctx(sub_co, ctx);
+
+ if (sub_coctx == NULL) {
+ return luaL_error(L, "no co ctx found");
+ }
+
+ if (!sub_coctx->is_uthread) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "not user thread");
+ return 2;
+ }
+
+ if (sub_coctx->parent_co_ctx != coctx) {
+ lua_pushnil(L);
+ lua_pushliteral(L, "killer not parent");
+ return 2;
+ }
+
+
+ switch (sub_coctx->co_status) {
+ case NGX_STREAM_LUA_CO_ZOMBIE:
+ ngx_stream_lua_del_thread(r, L, ctx, sub_coctx);
+ ctx->uthreads--;
+
+ lua_pushnil(L);
+ lua_pushliteral(L, "already terminated");
+ return 2;
+
+ case NGX_STREAM_LUA_CO_DEAD:
+ lua_pushnil(L);
+ lua_pushliteral(L, "already waited or killed");
+ return 2;
+
+ default:
+ ngx_stream_lua_cleanup_pending_operation(sub_coctx);
+ ngx_stream_lua_del_thread(r, L, ctx, sub_coctx);
+ ctx->uthreads--;
+
+ lua_pushinteger(L, 1);
+ return 1;
+ }
+
+ /* not reachable */
+}
+
+/* vi:set ft=c ts=4 sw=4 et fdm=marker: */