]> git.kaiwu.me - quickjs.git/commitdiff
enabled os.Worker on Windows (bnoordhuis)
authorFabrice Bellard <fabrice@bellard.org>
Mon, 14 Apr 2025 17:13:57 +0000 (19:13 +0200)
committerFabrice Bellard <fabrice@bellard.org>
Mon, 14 Apr 2025 17:13:57 +0000 (19:13 +0200)
quickjs-libc.c

index a9fff6a0330f5e4e972665cf4f7af83ee2e2bff0..0788d8c707b8ca55f41f12225720535356718a9b 100644 (file)
@@ -64,10 +64,8 @@ typedef sig_t sighandler_t;
 
 #endif
 
-#if !defined(_WIN32)
-/* enable the os.Worker API. IT relies on POSIX threads */
+/* enable the os.Worker API. It relies on POSIX threads */
 #define USE_WORKER
-#endif
 
 #ifdef USE_WORKER
 #include <pthread.h>
@@ -114,14 +112,22 @@ typedef struct {
     size_t sab_tab_len;
 } JSWorkerMessage;
 
+typedef struct JSWaker {
+#ifdef _WIN32
+    HANDLE handle;
+#else
+    int read_fd;
+    int write_fd;
+#endif
+} JSWaker;
+
 typedef struct {
     int ref_count;
 #ifdef USE_WORKER
     pthread_mutex_t mutex;
 #endif
     struct list_head msg_queue; /* list of JSWorkerMessage.link */
-    int read_fd;
-    int write_fd;
+    JSWaker waker;
 } JSWorkerMessagePipe;
 
 typedef struct {
@@ -2138,82 +2144,81 @@ static void call_handler(JSContext *ctx, JSValueConst func)
     JS_FreeValue(ctx, ret);
 }
 
-#if defined(_WIN32)
+#ifdef USE_WORKER
 
-static int js_os_poll(JSContext *ctx)
+#ifdef _WIN32
+
+static int js_waker_init(JSWaker *w)
 {
-    JSRuntime *rt = JS_GetRuntime(ctx);
-    JSThreadState *ts = JS_GetRuntimeOpaque(rt);
-    int min_delay, console_fd;
-    int64_t cur_time, delay;
-    JSOSRWHandler *rh;
-    struct list_head *el;
+    w->handle = CreateEvent(NULL, TRUE, FALSE, NULL);
+    return w->handle ? 0 : -1;
+}
 
-    /* XXX: handle signals if useful */
+static void js_waker_signal(JSWaker *w)
+{
+    SetEvent(w->handle);
+}
 
-    if (list_empty(&ts->os_rw_handlers) && list_empty(&ts->os_timers))
-        return -1; /* no more events */
+static void js_waker_clear(JSWaker *w)
+{
+    ResetEvent(w->handle);
+}
 
-    /* XXX: only timers and basic console input are supported */
-    if (!list_empty(&ts->os_timers)) {
-        cur_time = get_time_ms();
-        min_delay = 10000;
-        list_for_each(el, &ts->os_timers) {
-            JSOSTimer *th = list_entry(el, JSOSTimer, link);
-            delay = th->timeout - cur_time;
-            if (delay <= 0) {
-                JSValue func;
-                /* the timer expired */
-                func = th->func;
-                th->func = JS_UNDEFINED;
-                free_timer(rt, th);
-                call_handler(ctx, func);
-                JS_FreeValue(ctx, func);
-                return 0;
-            } else if (delay < min_delay) {
-                min_delay = delay;
-            }
-        }
-    } else {
-        min_delay = -1;
-    }
+static void js_waker_close(JSWaker *w)
+{
+    CloseHandle(w->handle);
+    w->handle = INVALID_HANDLE_VALUE;
+}
 
-    console_fd = -1;
-    list_for_each(el, &ts->os_rw_handlers) {
-        rh = list_entry(el, JSOSRWHandler, link);
-        if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) {
-            console_fd = rh->fd;
+#else // !_WIN32
+
+static int js_waker_init(JSWaker *w)
+{
+    int fds[2];
+
+    if (pipe(fds) < 0)
+        return -1;
+    w->read_fd = fds[0];
+    w->write_fd = fds[1];
+    return 0;
+}
+
+static void js_waker_signal(JSWaker *w)
+{
+    int ret;
+
+    for(;;) {
+        ret = write(w->write_fd, "", 1);
+        if (ret == 1)
+            break;
+        if (ret < 0 && (errno != EAGAIN || errno != EINTR))
             break;
-        }
     }
+}
 
-    if (console_fd >= 0) {
-        DWORD ti, ret;
-        HANDLE handle;
-        if (min_delay == -1)
-            ti = INFINITE;
-        else
-            ti = min_delay;
-        handle = (HANDLE)_get_osfhandle(console_fd);
-        ret = WaitForSingleObject(handle, ti);
-        if (ret == WAIT_OBJECT_0) {
-            list_for_each(el, &ts->os_rw_handlers) {
-                rh = list_entry(el, JSOSRWHandler, link);
-                if (rh->fd == console_fd && !JS_IsNull(rh->rw_func[0])) {
-                    call_handler(ctx, rh->rw_func[0]);
-                    /* must stop because the list may have been modified */
-                    break;
-                }
-            }
-        }
-    } else {
-        Sleep(min_delay);
+static void js_waker_clear(JSWaker *w)
+{
+    uint8_t buf[16];
+    int ret;
+
+    for(;;) {
+        ret = read(w->read_fd, buf, sizeof(buf));
+        if (ret >= 0)
+            break;
+        if (errno != EAGAIN && errno != EINTR)
+            break;
     }
-    return 0;
 }
-#else
 
-#ifdef USE_WORKER
+static void js_waker_close(JSWaker *w)
+{
+    close(w->read_fd);
+    close(w->write_fd);
+    w->read_fd = -1;
+    w->write_fd = -1;
+}
+
+#endif // _WIN32
 
 static void js_free_message(JSWorkerMessage *msg);
 
@@ -2235,17 +2240,8 @@ static int handle_posted_message(JSRuntime *rt, JSContext *ctx,
         /* remove the message from the queue */
         list_del(&msg->link);
 
-        if (list_empty(&ps->msg_queue)) {
-            uint8_t buf[16];
-            int ret;
-            for(;;) {
-                ret = read(ps->read_fd, buf, sizeof(buf));
-                if (ret >= 0)
-                    break;
-                if (errno != EAGAIN && errno != EINTR)
-                    break;
-            }
-        }
+        if (list_empty(&ps->msg_queue))
+            js_waker_clear(&ps->waker);
 
         pthread_mutex_unlock(&ps->mutex);
 
@@ -2288,7 +2284,104 @@ static int handle_posted_message(JSRuntime *rt, JSContext *ctx,
 {
     return 0;
 }
-#endif
+#endif /* !USE_WORKER */
+
+#if defined(_WIN32)
+
+static int js_os_poll(JSContext *ctx)
+{
+    JSRuntime *rt = JS_GetRuntime(ctx);
+    JSThreadState *ts = JS_GetRuntimeOpaque(rt);
+    int min_delay, count;
+    int64_t cur_time, delay;
+    JSOSRWHandler *rh;
+    struct list_head *el;
+    HANDLE handles[MAXIMUM_WAIT_OBJECTS]; // 64
+
+    /* XXX: handle signals if useful */
+
+    if (list_empty(&ts->os_rw_handlers) && list_empty(&ts->os_timers) &&
+        list_empty(&ts->port_list)) {
+        return -1; /* no more events */
+    }
+    
+    if (!list_empty(&ts->os_timers)) {
+        cur_time = get_time_ms();
+        min_delay = 10000;
+        list_for_each(el, &ts->os_timers) {
+            JSOSTimer *th = list_entry(el, JSOSTimer, link);
+            delay = th->timeout - cur_time;
+            if (delay <= 0) {
+                JSValue func;
+                /* the timer expired */
+                func = th->func;
+                th->func = JS_UNDEFINED;
+                free_timer(rt, th);
+                call_handler(ctx, func);
+                JS_FreeValue(ctx, func);
+                return 0;
+            } else if (delay < min_delay) {
+                min_delay = delay;
+            }
+        }
+    } else {
+        min_delay = -1;
+    }
+
+    count = 0;
+    list_for_each(el, &ts->os_rw_handlers) {
+        rh = list_entry(el, JSOSRWHandler, link);
+        if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) {
+            handles[count++] = (HANDLE)_get_osfhandle(rh->fd); // stdin
+            if (count == (int)countof(handles))
+                break;
+        }
+    }
+
+    list_for_each(el, &ts->port_list) {
+        JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
+        if (JS_IsNull(port->on_message_func))
+            continue;
+        handles[count++] = port->recv_pipe->waker.handle;
+        if (count == (int)countof(handles))
+            break;
+    }
+
+    if (count > 0) {
+        DWORD ret, timeout = INFINITE;
+        if (min_delay != -1)
+            timeout = min_delay;
+        ret = WaitForMultipleObjects(count, handles, FALSE, timeout);
+
+        if (ret < count) {
+            list_for_each(el, &ts->os_rw_handlers) {
+                rh = list_entry(el, JSOSRWHandler, link);
+                if (rh->fd == 0 && !JS_IsNull(rh->rw_func[0])) {
+                    call_handler(ctx, rh->rw_func[0]);
+                    /* must stop because the list may have been modified */
+                    goto done;
+                }
+            }
+
+            list_for_each(el, &ts->port_list) {
+                JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
+                if (!JS_IsNull(port->on_message_func)) {
+                    JSWorkerMessagePipe *ps = port->recv_pipe;
+                    if (ps->waker.handle == handles[ret]) {
+                        if (handle_posted_message(rt, ctx, port))
+                            goto done;
+                    }
+                }
+            }
+        }
+    } else {
+        Sleep(min_delay);
+    }
+ done:
+    return 0;
+}
+
+#else
 
 static int js_os_poll(JSContext *ctx)
 {
@@ -2364,8 +2457,8 @@ static int js_os_poll(JSContext *ctx)
         JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
         if (!JS_IsNull(port->on_message_func)) {
             JSWorkerMessagePipe *ps = port->recv_pipe;
-            fd_max = max_int(fd_max, ps->read_fd);
-            FD_SET(ps->read_fd, &rfds);
+            fd_max = max_int(fd_max, ps->waker.read_fd);
+            FD_SET(ps->waker.read_fd, &rfds);
         }
     }
 
@@ -2391,14 +2484,14 @@ static int js_os_poll(JSContext *ctx)
             JSWorkerMessageHandler *port = list_entry(el, JSWorkerMessageHandler, link);
             if (!JS_IsNull(port->on_message_func)) {
                 JSWorkerMessagePipe *ps = port->recv_pipe;
-                if (FD_ISSET(ps->read_fd, &rfds)) {
+                if (FD_ISSET(ps->waker.read_fd, &rfds)) {
                     if (handle_posted_message(rt, ctx, port))
                         goto done;
                 }
             }
         }
     }
   done:
+ done:
     return 0;
 }
 #endif /* !_WIN32 */
@@ -3269,22 +3362,17 @@ static void js_sab_dup(void *opaque, void *ptr)
 static JSWorkerMessagePipe *js_new_message_pipe(void)
 {
     JSWorkerMessagePipe *ps;
-    int pipe_fds[2];
-
-    if (pipe(pipe_fds) < 0)
-        return NULL;
 
     ps = malloc(sizeof(*ps));
-    if (!ps) {
-        close(pipe_fds[0]);
-        close(pipe_fds[1]);
+    if (!ps)
+        return NULL;
+    if (js_waker_init(&ps->waker)) {
+        free(ps);
         return NULL;
     }
     ps->ref_count = 1;
     init_list_head(&ps->msg_queue);
     pthread_mutex_init(&ps->mutex, NULL);
-    ps->read_fd = pipe_fds[0];
-    ps->write_fd = pipe_fds[1];
     return ps;
 }
 
@@ -3323,8 +3411,7 @@ static void js_free_message_pipe(JSWorkerMessagePipe *ps)
             js_free_message(msg);
         }
         pthread_mutex_destroy(&ps->mutex);
-        close(ps->read_fd);
-        close(ps->write_fd);
+        js_waker_close(&ps->waker);
         free(ps);
     }
 }
@@ -3572,17 +3659,8 @@ static JSValue js_worker_postMessage(JSContext *ctx, JSValueConst this_val,
     ps = worker->send_pipe;
     pthread_mutex_lock(&ps->mutex);
     /* indicate that data is present */
-    if (list_empty(&ps->msg_queue)) {
-        uint8_t ch = '\0';
-        int ret;
-        for(;;) {
-            ret = write(ps->write_fd, &ch, 1);
-            if (ret == 1)
-                break;
-            if (ret < 0 && (errno != EAGAIN || errno != EINTR))
-                break;
-        }
-    }
+    if (list_empty(&ps->msg_queue))
+        js_waker_signal(&ps->waker);
     list_add_tail(&msg->link, &ps->msg_queue);
     pthread_mutex_unlock(&ps->mutex);
     return JS_UNDEFINED;