0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

fix(ext/node): worker_threads doesn't exit if there are message listeners (#22944)

Closes https://github.com/denoland/deno/issues/22934
This commit is contained in:
Bartek Iwańczuk 2024-03-15 20:38:16 +00:00 committed by GitHub
parent e40f9a5c14
commit c342cd36ba
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 91 additions and 36 deletions

View file

@ -610,7 +610,6 @@ impl CliMainWorkerFactory {
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
future: shared.enable_future_features,
close_on_idle: true,
},
extensions: custom_extensions,
startup_snapshot: crate::js::deno_isolate_init(),
@ -815,7 +814,6 @@ fn create_web_worker_callback(
disable_deprecated_api_warning: shared.disable_deprecated_api_warning,
verbose_deprecated_api_warning: shared.verbose_deprecated_api_warning,
future: false,
close_on_idle: args.close_on_idle,
},
extensions: vec![],
startup_snapshot: crate::js::deno_isolate_init(),

View file

@ -279,7 +279,10 @@ function postMessage(message, transferOrOptions = {}) {
let isClosing = false;
let globalDispatchEvent;
let closeOnIdle;
function hasMessageEventListener() {
return event.listenerCount(globalThis, "message") > 0;
}
async function pollForMessages() {
if (!globalDispatchEvent) {
@ -289,14 +292,7 @@ async function pollForMessages() {
);
}
while (!isClosing) {
const op = op_worker_recv_message();
// In a Node.js worker, unref() the op promise to prevent it from
// keeping the event loop alive. This avoids the need to explicitly
// call self.close() or worker.terminate().
if (closeOnIdle) {
core.unrefOpPromise(op);
}
const data = await op;
const data = await op_worker_recv_message();
if (data === null) break;
const v = messagePort.deserializeJsMessageData(data);
const message = v[0];
@ -813,7 +809,6 @@ function bootstrapWorkerRuntime(
7: shouldDisableDeprecatedApiWarning,
8: shouldUseVerboseDeprecatedApiWarning,
9: _future,
10: closeOnIdle_,
} = runtimeOptions;
deprecatedApiWarningDisabled = shouldDisableDeprecatedApiWarning;
@ -875,8 +870,8 @@ function bootstrapWorkerRuntime(
location.setLocationHref(location_);
closeOnIdle = closeOnIdle_;
globalThis.pollForMessages = pollForMessages;
globalThis.hasMessageEventListener = hasMessageEventListener;
// TODO(bartlomieju): deprecate --unstable
if (unstableFlag) {

View file

@ -1,6 +1,7 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use crate::inspector_server::InspectorServer;
use crate::ops;
use crate::ops::worker_host::WorkersTable;
use crate::permissions::PermissionsContainer;
use crate::shared::maybe_transpile_source;
use crate::shared::runtime;
@ -13,7 +14,6 @@ use crate::BootstrapOptions;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_cache::CreateCache;
use deno_cache::SqliteBackedCache;
use deno_core::ascii_str;
use deno_core::error::AnyError;
use deno_core::error::JsError;
use deno_core::futures::channel::mpsc;
@ -335,10 +335,12 @@ pub struct WebWorker {
pub js_runtime: JsRuntime,
pub name: String,
close_on_idle: bool,
has_executed_main_module: bool,
internal_handle: WebWorkerInternalHandle,
pub worker_type: WebWorkerType,
pub main_module: ModuleSpecifier,
poll_for_messages_fn: Option<v8::Global<v8::Value>>,
has_message_event_listener_fn: Option<v8::Global<v8::Value>>,
bootstrap_fn_global: Option<v8::Global<v8::Function>>,
// Consumed when `bootstrap_fn` is called
maybe_worker_metadata: Option<JsMessageData>,
@ -609,8 +611,10 @@ impl WebWorker {
worker_type: options.worker_type,
main_module,
poll_for_messages_fn: None,
has_message_event_listener_fn: None,
bootstrap_fn_global: Some(bootstrap_fn_global),
close_on_idle: options.close_on_idle,
has_executed_main_module: false,
maybe_worker_metadata: options.maybe_worker_metadata,
},
external_handle,
@ -646,22 +650,32 @@ impl WebWorker {
&[args, name_str, id_str, id, worker_data],
)
.unwrap();
let context = scope.get_current_context();
let global = context.global(scope);
let poll_for_messages_str =
v8::String::new_external_onebyte_static(scope, b"pollForMessages")
.unwrap();
let poll_for_messages_fn = global
.get(scope, poll_for_messages_str.into())
.expect("get globalThis.pollForMessages");
global.delete(scope, poll_for_messages_str.into());
self.poll_for_messages_fn =
Some(v8::Global::new(scope, poll_for_messages_fn));
let has_message_event_listener_str =
v8::String::new_external_onebyte_static(
scope,
b"hasMessageEventListener",
)
.unwrap();
let has_message_event_listener_fn = global
.get(scope, has_message_event_listener_str.into())
.expect("get globalThis.hasMessageEventListener");
global.delete(scope, has_message_event_listener_str.into());
self.has_message_event_listener_fn =
Some(v8::Global::new(scope, has_message_event_listener_fn));
}
// TODO(bartlomieju): this could be done using V8 API, without calling `execute_script`.
// Save a reference to function that will start polling for messages
// from a worker host; it will be called after the user code is loaded.
let script = ascii_str!(
r#"
const pollForMessages = globalThis.pollForMessages;
delete globalThis.pollForMessages;
pollForMessages
"#
);
let poll_for_messages_fn = self
.js_runtime
.execute_script(located_script_name!(), script)
.expect("Failed to execute worker bootstrap script");
self.poll_for_messages_fn = Some(poll_for_messages_fn);
}
/// See [JsRuntime::execute_script](deno_core::JsRuntime::execute_script)
@ -730,6 +744,7 @@ impl WebWorker {
maybe_result = &mut receiver => {
debug!("received worker module evaluate {:#?}", maybe_result);
self.has_executed_main_module = true;
maybe_result
}
@ -781,7 +796,22 @@ impl WebWorker {
Poll::Ready(Ok(()))
}
}
Poll::Pending => Poll::Pending,
Poll::Pending => {
// This is special code path for workers created from `node:worker_threads`
// module that have different semantics than Web workers.
// We want the worker thread to terminate automatically if we've done executing
// Top-Level await, there are no child workers spawned by that workers
// and there's no "message" event listener.
if self.close_on_idle
&& self.has_executed_main_module
&& !self.has_child_workers()
&& !self.has_message_event_listener()
{
Poll::Ready(Ok(()))
} else {
Poll::Pending
}
}
}
}
@ -803,6 +833,31 @@ impl WebWorker {
// This call may return `None` if worker is terminated.
fn_.call(scope, undefined.into(), &[]);
}
fn has_message_event_listener(&mut self) -> bool {
let has_message_event_listener_fn =
self.has_message_event_listener_fn.as_ref().unwrap();
let scope = &mut self.js_runtime.handle_scope();
let has_message_event_listener =
v8::Local::<v8::Value>::new(scope, has_message_event_listener_fn);
let fn_ =
v8::Local::<v8::Function>::try_from(has_message_event_listener).unwrap();
let undefined = v8::undefined(scope);
// This call may return `None` if worker is terminated.
match fn_.call(scope, undefined.into(), &[]) {
Some(result) => result.is_true(),
None => false,
}
}
fn has_child_workers(&mut self) -> bool {
!self
.js_runtime
.op_state()
.borrow()
.borrow::<WorkersTable>()
.is_empty()
}
}
fn print_worker_error(

View file

@ -63,7 +63,6 @@ pub struct BootstrapOptions {
pub disable_deprecated_api_warning: bool,
pub verbose_deprecated_api_warning: bool,
pub future: bool,
pub close_on_idle: bool,
}
impl Default for BootstrapOptions {
@ -95,7 +94,6 @@ impl Default for BootstrapOptions {
disable_deprecated_api_warning: false,
verbose_deprecated_api_warning: false,
future: false,
close_on_idle: false,
}
}
}
@ -131,8 +129,6 @@ struct BootstrapV8<'a>(
bool,
// future
bool,
// close_on_idle
bool,
);
impl BootstrapOptions {
@ -155,7 +151,6 @@ impl BootstrapOptions {
self.disable_deprecated_api_warning,
self.verbose_deprecated_api_warning,
self.future,
self.close_on_idle,
);
bootstrap.serialize(ser).unwrap()

View file

@ -119,6 +119,7 @@ itest!(worker_ids_are_sequential {
});
// Test for https://github.com/denoland/deno/issues/22629
// Test for https://github.com/denoland/deno/issues/22934
itest!(node_worker_auto_exits {
args: "run --quiet --allow-read workers/node_worker_auto_exits.mjs",
output: "workers/node_worker_auto_exits.mjs.out",

View file

@ -1,9 +1,19 @@
import { isMainThread, Worker } from "node:worker_threads";
import { isMainThread, parentPort, Worker } from "node:worker_threads";
function onMessageOneshot() {
console.log("Got message from main thread!");
parentPort.off("message", onMessageOneshot);
}
if (isMainThread) {
// This re-loads the current file inside a Worker instance.
const w = new Worker(import.meta.filename);
setTimeout(() => {
w.postMessage("Hello! I am from the main thread.");
}, 500);
} else {
console.log("Inside Worker!");
console.log(isMainThread); // Prints 'false'.
parentPort.on("message", onMessageOneshot);
}

View file

@ -1,2 +1,3 @@
Inside Worker!
false
Got message from main thread!