diff --git a/cli/worker.rs b/cli/worker.rs index 0bbc27b29f..c733f41321 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -612,6 +612,7 @@ impl CliMainWorkerFactory { serve_port: shared.options.serve_port, serve_host: shared.options.serve_host.clone(), otel_config: shared.otel_config.clone(), + close_on_idle: true, }, extensions: custom_extensions, startup_snapshot: crate::js::deno_isolate_init(), @@ -812,6 +813,7 @@ fn create_web_worker_callback( serve_port: shared.options.serve_port, serve_host: shared.options.serve_host.clone(), otel_config: shared.otel_config.clone(), + close_on_idle: args.close_on_idle, }, extensions: vec![], startup_snapshot: crate::js::deno_isolate_init(), diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 1b175fb1dd..dc844169c5 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -21,7 +21,7 @@ import { nodeWorkerThreadCloseCb, refMessagePort, serializeJsMessageData, - unrefPollForMessages, + unrefParentPort, } from "ext:deno_web/13_message_port.js"; import * as webidl from "ext:deno_webidl/00_webidl.js"; import { notImplemented } from "ext:deno_node/_utils.ts"; @@ -451,10 +451,10 @@ internals.__initWorkerThreads = ( parentPort.emit("close"); }); parentPort.unref = () => { - parentPort[unrefPollForMessages] = true; + parentPort[unrefParentPort] = true; }; parentPort.ref = () => { - parentPort[unrefPollForMessages] = false; + parentPort[unrefParentPort] = false; }; if (isWorkerThread) { diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index cf72c43e6f..79fec9de2f 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -102,8 +102,8 @@ const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); export const refMessagePort = Symbol("refMessagePort"); /** It is used by 99_main.js and worker_threads to - * unref/ref on the global pollForMessages promise. */ -export const unrefPollForMessages = Symbol("unrefPollForMessages"); + * unref/ref on the global message event handler count. */ +export const unrefParentPort = Symbol("unrefParentPort"); /** * @param {number} id diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 19432745d4..bceb1f7ddb 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -170,12 +170,14 @@ function postMessage(message, transferOrOptions = { __proto__: null }) { let isClosing = false; let globalDispatchEvent; +let closeOnIdle; function hasMessageEventListener() { // the function name is kind of a misnomer, but we want to behave // as if we have message event listeners if a node message port is explicitly // refed (and the inverse as well) - return event.listenerCount(globalThis, "message") > 0 || + return (event.listenerCount(globalThis, "message") > 0 && + !globalThis[messagePort.unrefParentPort]) || messagePort.refedMessagePortsCount > 0; } @@ -188,7 +190,10 @@ async function pollForMessages() { } while (!isClosing) { const recvMessage = op_worker_recv_message(); - if (globalThis[messagePort.unrefPollForMessages] === true) { + // In a Node.js worker, unref() the op promise to prevent it from + // keeping the event loop alive. This avoids the need to explicitly + // call self.close() or worker.terminate(). + if (closeOnIdle) { core.unrefOpPromise(recvMessage); } const data = await recvMessage; @@ -915,6 +920,7 @@ function bootstrapWorkerRuntime( 6: argv0, 7: nodeDebug, 13: otelConfig, + 14: closeOnIdle_, } = runtimeOptions; performance.setTimeOrigin(); @@ -967,6 +973,7 @@ function bootstrapWorkerRuntime( globalThis.pollForMessages = pollForMessages; globalThis.hasMessageEventListener = hasMessageEventListener; + closeOnIdle = closeOnIdle_; for (let i = 0; i <= unstableFeatures.length; i++) { const id = unstableFeatures[i]; diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index e3a69b39c0..faf4f3fc52 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -58,7 +58,6 @@ use std::task::Poll; use crate::inspector_server::InspectorServer; use crate::ops; use crate::ops::process::NpmProcessStateProviderRc; -use crate::ops::worker_host::WorkersTable; use crate::shared::maybe_transpile_source; use crate::shared::runtime; use crate::tokio_util::create_and_run_current_thread; @@ -385,7 +384,6 @@ pub struct WebWorker { pub js_runtime: JsRuntime, pub name: String, close_on_idle: bool, - has_executed_main_module: bool, internal_handle: WebWorkerInternalHandle, pub worker_type: WebWorkerType, pub main_module: ModuleSpecifier, @@ -658,7 +656,6 @@ impl WebWorker { has_message_event_listener_fn: None, bootstrap_fn_global: Some(bootstrap_fn_global), close_on_idle: options.close_on_idle, - has_executed_main_module: false, maybe_worker_metadata: options.maybe_worker_metadata, }, external_handle, @@ -799,7 +796,6 @@ impl WebWorker { maybe_result = &mut receiver => { debug!("received worker module evaluate {:#?}", maybe_result); - self.has_executed_main_module = true; maybe_result } @@ -837,6 +833,9 @@ impl WebWorker { } if self.close_on_idle { + if self.has_message_event_listener() { + return Poll::Pending; + } return Poll::Ready(Ok(())); } @@ -851,22 +850,7 @@ impl WebWorker { Poll::Ready(Ok(())) } } - Poll::Pending => { - // This is special code path for workers created from `node:worker_threads` - // module that have different semantics than Web workers. - // We want the worker thread to terminate automatically if we've done executing - // Top-Level await, there are no child workers spawned by that workers - // and there's no "message" event listener. - if self.close_on_idle - && self.has_executed_main_module - && !self.has_child_workers() - && !self.has_message_event_listener() - { - Poll::Ready(Ok(())) - } else { - Poll::Pending - } - } + Poll::Pending => Poll::Pending, } } @@ -904,15 +888,6 @@ impl WebWorker { None => false, } } - - fn has_child_workers(&mut self) -> bool { - !self - .js_runtime - .op_state() - .borrow() - .borrow::() - .is_empty() - } } fn print_worker_error( diff --git a/runtime/worker_bootstrap.rs b/runtime/worker_bootstrap.rs index 2020c2bc8d..8364fe0d2b 100644 --- a/runtime/worker_bootstrap.rs +++ b/runtime/worker_bootstrap.rs @@ -120,6 +120,7 @@ pub struct BootstrapOptions { pub serve_port: Option, pub serve_host: Option, pub otel_config: OtelConfig, + pub close_on_idle: bool, } impl Default for BootstrapOptions { @@ -155,6 +156,7 @@ impl Default for BootstrapOptions { serve_port: Default::default(), serve_host: Default::default(), otel_config: Default::default(), + close_on_idle: false, } } } @@ -198,6 +200,8 @@ struct BootstrapV8<'a>( Option, // OTEL config Box<[u8]>, + // close on idle + bool, ); impl BootstrapOptions { @@ -225,6 +229,7 @@ impl BootstrapOptions { serve_is_main, serve_worker_count, self.otel_config.as_v8(), + self.close_on_idle, ); bootstrap.serialize(ser).unwrap() diff --git a/tests/specs/permission/allow_import_worker/denied.out b/tests/specs/permission/allow_import_worker/denied.out index 6e4dcaee09..af44ae21ee 100644 --- a/tests/specs/permission/allow_import_worker/denied.out +++ b/tests/specs/permission/allow_import_worker/denied.out @@ -3,5 +3,4 @@ await import(specifier); ^ at async file:///[WILDLINE] error: Uncaught (in promise) Error: Unhandled error in child worker. - at [WILDLINE] - at [WILDLINE] \ No newline at end of file + at [WILDCARD] \ No newline at end of file diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 808fd6116e..5f38d51d4d 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -841,3 +841,26 @@ Deno.test({ assertEquals(result, true); }, }); + +Deno.test("[node/worker_threads] Worker runs async ops correctly", async () => { + const recvMessage = Promise.withResolvers(); + const timer = setTimeout(() => recvMessage.reject(), 1000); + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + setTimeout(() => { + parentPort.postMessage("Hello from worker"); + }, 10); + `, + { eval: true }, + ); + + worker.on("message", (msg) => { + assertEquals(msg, "Hello from worker"); + worker.terminate(); + recvMessage.resolve(); + clearTimeout(timer); + }); + + await recvMessage.promise; +});