summaryrefslogtreecommitdiff
path: root/ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c
diff options
context:
space:
mode:
authorkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
committerkaiwu <kaiwu2004@gmail.com>2025-03-01 12:42:23 +0800
commit3f33461e4948bf05e60bdff35ec6c57a649c7860 (patch)
tree284c2ba95a41536ae1bff6bea710db0709a64739 /ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c
downloadopenresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.tar.gz
openresty-3f33461e4948bf05e60bdff35ec6c57a649c7860.zip
openresty bundle
Diffstat (limited to 'ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c')
-rw-r--r--ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c580
1 files changed, 580 insertions, 0 deletions
diff --git a/ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c b/ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c
new file mode 100644
index 0000000..435beaa
--- /dev/null
+++ b/ngx_lua-0.10.28/src/ngx_http_lua_semaphore.c
@@ -0,0 +1,580 @@
+
+/*
+ * Copyright (C) Yichun Zhang (agentzh)
+ * Copyright (C) cuiweixie
+ * I hereby assign copyright in this code to the lua-nginx-module project,
+ * to be licensed under the same terms as the rest of the code.
+ */
+
+
+#ifndef DDEBUG
+#define DDEBUG 0
+#endif
+#include "ddebug.h"
+
+
+#include "ngx_http_lua_util.h"
+#include "ngx_http_lua_semaphore.h"
+#include "ngx_http_lua_contentby.h"
+
+
+ngx_int_t ngx_http_lua_sema_mm_init(ngx_conf_t *cf,
+ ngx_http_lua_main_conf_t *lmcf);
+void ngx_http_lua_sema_mm_cleanup(void *data);
+static ngx_http_lua_sema_t *ngx_http_lua_alloc_sema(void);
+static void ngx_http_lua_free_sema(ngx_http_lua_sema_t *sem);
+static ngx_int_t ngx_http_lua_sema_resume(ngx_http_request_t *r);
+int ngx_http_lua_ffi_sema_new(ngx_http_lua_sema_t **psem,
+ int n, char **errmsg);
+int ngx_http_lua_ffi_sema_post(ngx_http_lua_sema_t *sem, int n);
+int ngx_http_lua_ffi_sema_wait(ngx_http_request_t *r,
+ ngx_http_lua_sema_t *sem, int wait_ms, u_char *err, size_t *errlen);
+static void ngx_http_lua_sema_cleanup(void *data);
+static void ngx_http_lua_sema_handler(ngx_event_t *ev);
+static void ngx_http_lua_sema_timeout_handler(ngx_event_t *ev);
+void ngx_http_lua_ffi_sema_gc(ngx_http_lua_sema_t *sem);
+
+
+enum {
+ SEMAPHORE_WAIT_SUCC = 0,
+ SEMAPHORE_WAIT_TIMEOUT = 1,
+};
+
+
+ngx_int_t
+ngx_http_lua_sema_mm_init(ngx_conf_t *cf, ngx_http_lua_main_conf_t *lmcf)
+{
+ ngx_http_lua_sema_mm_t *mm;
+
+ mm = ngx_palloc(cf->pool, sizeof(ngx_http_lua_sema_mm_t));
+ if (mm == NULL) {
+ return NGX_ERROR;
+ }
+
+ lmcf->sema_mm = mm;
+ mm->lmcf = lmcf;
+
+ ngx_queue_init(&mm->free_queue);
+ mm->cur_epoch = 0;
+ mm->total = 0;
+ mm->used = 0;
+
+ /* it's better to be 4096, but it needs some space for
+ * ngx_http_lua_sema_mm_block_t, one is enough, so it is 4095
+ */
+ mm->num_per_block = 4095;
+
+ return NGX_OK;
+}
+
+
+static ngx_http_lua_sema_t *
+ngx_http_lua_alloc_sema(void)
+{
+ ngx_uint_t i, n;
+ ngx_queue_t *q;
+ ngx_http_lua_sema_t *sem, *iter;
+ ngx_http_lua_sema_mm_t *mm;
+ ngx_http_lua_main_conf_t *lmcf;
+ ngx_http_lua_sema_mm_block_t *block;
+
+ ngx_http_lua_assert(ngx_cycle && ngx_cycle->conf_ctx);
+
+ lmcf = ngx_http_cycle_get_module_main_conf(ngx_cycle,
+ ngx_http_lua_module);
+
+ ngx_http_lua_assert(lmcf != NULL);
+
+ mm = lmcf->sema_mm;
+
+ if (!ngx_queue_empty(&mm->free_queue)) {
+ q = ngx_queue_head(&mm->free_queue);
+ ngx_queue_remove(q);
+
+ sem = ngx_queue_data(q, ngx_http_lua_sema_t, chain);
+
+ sem->block->used++;
+
+ ngx_memzero(&sem->sem_event, sizeof(ngx_event_t));
+
+ sem->sem_event.handler = ngx_http_lua_sema_handler;
+ sem->sem_event.data = sem;
+ sem->sem_event.log = ngx_cycle->log;
+
+ mm->used++;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "from head of free queue, alloc semaphore: %p", sem);
+
+ return sem;
+ }
+
+ /* free_queue is empty */
+
+ n = sizeof(ngx_http_lua_sema_mm_block_t)
+ + mm->num_per_block * sizeof(ngx_http_lua_sema_t);
+
+ dd("block size: %d, item size: %d",
+ (int) sizeof(ngx_http_lua_sema_mm_block_t),
+ (int) sizeof(ngx_http_lua_sema_t));
+
+ block = ngx_alloc(n, ngx_cycle->log);
+ if (block == NULL) {
+ return NULL;
+ }
+
+ mm->cur_epoch++;
+ mm->total += mm->num_per_block;
+ mm->used++;
+
+ block->mm = mm;
+ block->epoch = mm->cur_epoch;
+
+ sem = (ngx_http_lua_sema_t *) (block + 1);
+ sem->block = block;
+ sem->block->used = 1;
+
+ ngx_memzero(&sem->sem_event, sizeof(ngx_event_t));
+
+ sem->sem_event.handler = ngx_http_lua_sema_handler;
+ sem->sem_event.data = sem;
+ sem->sem_event.log = ngx_cycle->log;
+
+ for (iter = sem + 1, i = 1; i < mm->num_per_block; i++, iter++) {
+ iter->block = block;
+ ngx_queue_insert_tail(&mm->free_queue, &iter->chain);
+ }
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "new block, alloc semaphore: %p block: %p", sem, block);
+
+ return sem;
+}
+
+
+void
+ngx_http_lua_sema_mm_cleanup(void *data)
+{
+ ngx_uint_t i;
+ ngx_queue_t *q;
+ ngx_http_lua_sema_t *sem, *iter;
+ ngx_http_lua_sema_mm_t *mm;
+ ngx_http_lua_main_conf_t *lmcf;
+ ngx_http_lua_sema_mm_block_t *block;
+
+ lmcf = (ngx_http_lua_main_conf_t *) data;
+ mm = lmcf->sema_mm;
+
+ while (!ngx_queue_empty(&mm->free_queue)) {
+ q = ngx_queue_head(&mm->free_queue);
+
+ sem = ngx_queue_data(q, ngx_http_lua_sema_t, chain);
+ block = sem->block;
+
+ ngx_http_lua_assert(block != NULL);
+
+ if (block->used == 0) {
+ iter = (ngx_http_lua_sema_t *) (block + 1);
+
+ for (i = 0; i < block->mm->num_per_block; i++, iter++) {
+ ngx_queue_remove(&iter->chain);
+ }
+
+ dd("free sema block: %p at final", block);
+
+ ngx_free(block);
+
+ } else {
+ /* just return directly when some thing goes wrong */
+
+ ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, 0,
+ "lua sema mm: freeing a block %p that is still "
+ " used by someone", block);
+
+ return;
+ }
+ }
+
+ dd("lua sema mm cleanup done");
+}
+
+
+static void
+ngx_http_lua_free_sema(ngx_http_lua_sema_t *sem)
+{
+ ngx_http_lua_sema_t *iter;
+ ngx_uint_t i, mid_epoch;
+ ngx_http_lua_sema_mm_block_t *block;
+ ngx_http_lua_sema_mm_t *mm;
+
+ block = sem->block;
+ block->used--;
+
+ mm = block->mm;
+ mm->used--;
+
+ mid_epoch = mm->cur_epoch - ((mm->total / mm->num_per_block) >> 1);
+
+ if (block->epoch < mid_epoch) {
+ ngx_queue_insert_tail(&mm->free_queue, &sem->chain);
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "add to free queue tail semaphore: %p epoch: %d"
+ "mid_epoch: %d cur_epoch: %d", sem, (int) block->epoch,
+ (int) mid_epoch, (int) mm->cur_epoch);
+
+ } else {
+ ngx_queue_insert_head(&mm->free_queue, &sem->chain);
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "add to free queue head semaphore: %p epoch: %d"
+ "mid_epoch: %d cur_epoch: %d", sem, (int) block->epoch,
+ (int) mid_epoch, (int) mm->cur_epoch);
+ }
+
+ dd("used: %d", (int) block->used);
+
+ if (block->used == 0
+ && mm->used <= (mm->total >> 1)
+ && block->epoch < mid_epoch)
+ {
+ /* load <= 50% and it's on the older side */
+ iter = (ngx_http_lua_sema_t *) (block + 1);
+
+ for (i = 0; i < mm->num_per_block; i++, iter++) {
+ ngx_queue_remove(&iter->chain);
+ }
+
+ mm->total -= mm->num_per_block;
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "free semaphore block: %p", block);
+
+ ngx_free(block);
+ }
+}
+
+
+static ngx_int_t
+ngx_http_lua_sema_resume(ngx_http_request_t *r)
+{
+ lua_State *vm;
+ ngx_connection_t *c;
+ ngx_int_t rc;
+ ngx_uint_t nreqs;
+ ngx_http_lua_ctx_t *ctx;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ if (ctx == NULL) {
+ return NGX_ERROR;
+ }
+
+ ctx->resume_handler = ngx_http_lua_wev_handler;
+
+ c = r->connection;
+ vm = ngx_http_lua_get_lua_vm(r, ctx);
+ nreqs = c->requests;
+
+ if (ctx->cur_co_ctx->sem_resume_status == SEMAPHORE_WAIT_SUCC) {
+ lua_pushboolean(ctx->cur_co_ctx->co, 1);
+ lua_pushnil(ctx->cur_co_ctx->co);
+
+ } else {
+ lua_pushboolean(ctx->cur_co_ctx->co, 0);
+ lua_pushliteral(ctx->cur_co_ctx->co, "timeout");
+ }
+
+ rc = ngx_http_lua_run_thread(vm, r, ctx, 2);
+
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
+ "lua run thread returned %d", rc);
+
+ if (rc == NGX_AGAIN) {
+ return ngx_http_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ if (rc == NGX_DONE) {
+ ngx_http_lua_finalize_request(r, NGX_DONE);
+ return ngx_http_lua_run_posted_threads(c, vm, r, ctx, nreqs);
+ }
+
+ /* rc == NGX_ERROR || rc >= NGX_OK */
+
+ if (ctx->entered_content_phase) {
+ ngx_http_lua_finalize_request(r, rc);
+ return NGX_DONE;
+ }
+
+ return rc;
+}
+
+
+int
+ngx_http_lua_ffi_sema_new(ngx_http_lua_sema_t **psem,
+ int n, char **errmsg)
+{
+ ngx_http_lua_sema_t *sem;
+
+ sem = ngx_http_lua_alloc_sema();
+ if (sem == NULL) {
+ *errmsg = "no memory";
+ return NGX_ERROR;
+ }
+
+ ngx_queue_init(&sem->wait_queue);
+
+ sem->resource_count = n;
+ sem->wait_count = 0;
+ *psem = sem;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "http lua semaphore new: %p, resources: %d",
+ sem, sem->resource_count);
+
+ return NGX_OK;
+}
+
+
+int
+ngx_http_lua_ffi_sema_post(ngx_http_lua_sema_t *sem, int n)
+{
+ ngx_log_debug3(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "http lua semaphore post: %p, n: %d, resources: %d",
+ sem, n, sem->resource_count);
+
+ sem->resource_count += n;
+
+ if (!ngx_queue_empty(&sem->wait_queue)) {
+ /* we need the extra parentheses around the first argument of
+ * ngx_post_event() just to work around macro issues in nginx
+ * cores older than nginx 1.7.12 (exclusive).
+ */
+ ngx_post_event((&sem->sem_event), &ngx_posted_events);
+ }
+
+ return NGX_OK;
+}
+
+
+int
+ngx_http_lua_ffi_sema_wait(ngx_http_request_t *r,
+ ngx_http_lua_sema_t *sem, int wait_ms, u_char *err, size_t *errlen)
+{
+ ngx_http_lua_ctx_t *ctx;
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_int_t rc;
+
+ ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "http lua semaphore wait: %p, timeout: %d, "
+ "resources: %d, event posted: %d",
+ sem, wait_ms, sem->resource_count,
+#if (nginx_version >= 1007005)
+ (int) sem->sem_event.posted
+#else
+ sem->sem_event.prev ? 1 : 0
+#endif
+ );
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ if (ctx == NULL) {
+ *errlen = ngx_snprintf(err, *errlen, "no request ctx found") - err;
+ return NGX_ERROR;
+ }
+
+ rc = ngx_http_lua_ffi_check_context(ctx, NGX_HTTP_LUA_CONTEXT_YIELDABLE,
+ err, errlen);
+
+ if (rc != NGX_OK) {
+ return NGX_ERROR;
+ }
+
+ /* we keep the order, will first resume the thread waiting for the
+ * longest time in ngx_http_lua_sema_handler
+ */
+
+ if (ngx_queue_empty(&sem->wait_queue) && sem->resource_count > 0) {
+ sem->resource_count--;
+ return NGX_OK;
+ }
+
+ if (wait_ms == 0) {
+ return NGX_DECLINED;
+ }
+
+ sem->wait_count++;
+ wait_co_ctx = ctx->cur_co_ctx;
+
+ wait_co_ctx->sleep.handler = ngx_http_lua_sema_timeout_handler;
+ wait_co_ctx->sleep.data = ctx->cur_co_ctx;
+ wait_co_ctx->sleep.log = r->connection->log;
+
+ ngx_add_timer(&wait_co_ctx->sleep, (ngx_msec_t) wait_ms);
+
+ dd("ngx_http_lua_ffi_sema_wait add timer coctx:%p wait: %d(ms)",
+ wait_co_ctx, wait_ms);
+
+ ngx_queue_insert_tail(&sem->wait_queue, &wait_co_ctx->sem_wait_queue);
+
+ wait_co_ctx->data = sem;
+ wait_co_ctx->cleanup = ngx_http_lua_sema_cleanup;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "http lua semaphore wait yielding");
+
+ return NGX_AGAIN;
+}
+
+
+int
+ngx_http_lua_ffi_sema_count(ngx_http_lua_sema_t *sem)
+{
+ return sem->resource_count - sem->wait_count;
+}
+
+
+static void
+ngx_http_lua_sema_cleanup(void *data)
+{
+ ngx_http_lua_co_ctx_t *coctx = data;
+ ngx_queue_t *q;
+ ngx_http_lua_sema_t *sem;
+
+ sem = coctx->data;
+
+ ngx_log_debug0(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "http lua semaphore cleanup");
+
+ if (coctx->sleep.timer_set) {
+ ngx_del_timer(&coctx->sleep);
+ }
+
+ q = &coctx->sem_wait_queue;
+
+ ngx_queue_remove(q);
+ sem->wait_count--;
+ coctx->cleanup = NULL;
+}
+
+
+static void
+ngx_http_lua_sema_handler(ngx_event_t *ev)
+{
+ ngx_http_lua_sema_t *sem;
+ ngx_http_request_t *r;
+ ngx_http_lua_ctx_t *ctx;
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_connection_t *c;
+ ngx_queue_t *q;
+
+ sem = ev->data;
+
+ ngx_log_debug2(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "semaphore handler: wait queue: %sempty, resource count: %d",
+ ngx_queue_empty(&sem->wait_queue) ? "" : "not ",
+ sem->resource_count);
+ while (!ngx_queue_empty(&sem->wait_queue) && sem->resource_count > 0) {
+ q = ngx_queue_head(&sem->wait_queue);
+ ngx_queue_remove(q);
+
+ sem->wait_count--;
+
+ wait_co_ctx = ngx_queue_data(q, ngx_http_lua_co_ctx_t, sem_wait_queue);
+ wait_co_ctx->cleanup = NULL;
+
+ if (wait_co_ctx->sleep.timer_set) {
+ ngx_del_timer(&wait_co_ctx->sleep);
+ }
+
+ r = ngx_http_lua_get_req(wait_co_ctx->co);
+ c = r->connection;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ ngx_http_lua_assert(ctx != NULL);
+
+ sem->resource_count--;
+
+ ctx->cur_co_ctx = wait_co_ctx;
+
+ wait_co_ctx->sem_resume_status = SEMAPHORE_WAIT_SUCC;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_http_lua_sema_resume(r);
+
+ } else {
+ ctx->resume_handler = ngx_http_lua_sema_resume;
+ ngx_http_core_run_phases(r);
+ }
+
+ ngx_http_run_posted_requests(c);
+ }
+}
+
+
+static void
+ngx_http_lua_sema_timeout_handler(ngx_event_t *ev)
+{
+ ngx_http_lua_co_ctx_t *wait_co_ctx;
+ ngx_http_request_t *r;
+ ngx_http_lua_ctx_t *ctx;
+ ngx_connection_t *c;
+ ngx_http_lua_sema_t *sem;
+
+ wait_co_ctx = ev->data;
+ wait_co_ctx->cleanup = NULL;
+
+ dd("ngx_http_lua_sema_timeout_handler timeout coctx:%p", wait_co_ctx);
+
+ sem = wait_co_ctx->data;
+
+ ngx_queue_remove(&wait_co_ctx->sem_wait_queue);
+ sem->wait_count--;
+
+ r = ngx_http_lua_get_req(wait_co_ctx->co);
+ c = r->connection;
+
+ ctx = ngx_http_get_module_ctx(r, ngx_http_lua_module);
+ ngx_http_lua_assert(ctx != NULL);
+
+ ctx->cur_co_ctx = wait_co_ctx;
+
+ wait_co_ctx->sem_resume_status = SEMAPHORE_WAIT_TIMEOUT;
+
+ if (ctx->entered_content_phase) {
+ (void) ngx_http_lua_sema_resume(r);
+
+ } else {
+ ctx->resume_handler = ngx_http_lua_sema_resume;
+ ngx_http_core_run_phases(r);
+ }
+
+ ngx_http_run_posted_requests(c);
+}
+
+
+void
+ngx_http_lua_ffi_sema_gc(ngx_http_lua_sema_t *sem)
+{
+ ngx_log_debug1(NGX_LOG_DEBUG_HTTP, ngx_cycle->log, 0,
+ "in lua gc, semaphore %p", sem);
+
+ if (sem == NULL) {
+ return;
+ }
+
+ if (!ngx_terminate
+ && !ngx_quit
+ && !ngx_queue_empty(&sem->wait_queue))
+ {
+ ngx_log_error(NGX_LOG_ERR, ngx_cycle->log, 0,
+ "in lua semaphore gc wait queue is"
+ " not empty while the semaphore %p is being "
+ "destroyed", sem);
+ }
+
+ if (sem->sem_event.posted) {
+ ngx_delete_posted_event(&sem->sem_event);
+ }
+
+ ngx_http_lua_free_sema(sem);
+}
+
+
+/* vi:set ft=c ts=4 sw=4 et fdm=marker: */