mirror of
https://github.com/denoland/deno.git
synced 2025-01-22 06:09:25 -05:00
fix(ext/node): do not exit worker thread when there is pending async op (#27378)
This change fixes the premature exit of worker threads when there are still remaining pending ops. This change reuses the idea of #22647 (unref'ing `op_worker_recv_message` in worker threads if closeOnIdle specified) and uses `web_worker.has_message_event_listener` check in the opposite way as #22944. (Now we continue the worker when `has_message_event_listener` is true instead of stopping it when `has_message_event_listener` is false. closes #23061 closes #26154
This commit is contained in:
parent
1e38520e4e
commit
cc8e339c25
8 changed files with 49 additions and 38 deletions
|
@ -612,6 +612,7 @@ impl CliMainWorkerFactory {
|
|||
serve_port: shared.options.serve_port,
|
||||
serve_host: shared.options.serve_host.clone(),
|
||||
otel_config: shared.otel_config.clone(),
|
||||
close_on_idle: true,
|
||||
},
|
||||
extensions: custom_extensions,
|
||||
startup_snapshot: crate::js::deno_isolate_init(),
|
||||
|
@ -812,6 +813,7 @@ fn create_web_worker_callback(
|
|||
serve_port: shared.options.serve_port,
|
||||
serve_host: shared.options.serve_host.clone(),
|
||||
otel_config: shared.otel_config.clone(),
|
||||
close_on_idle: args.close_on_idle,
|
||||
},
|
||||
extensions: vec![],
|
||||
startup_snapshot: crate::js::deno_isolate_init(),
|
||||
|
|
|
@ -21,7 +21,7 @@ import {
|
|||
nodeWorkerThreadCloseCb,
|
||||
refMessagePort,
|
||||
serializeJsMessageData,
|
||||
unrefPollForMessages,
|
||||
unrefParentPort,
|
||||
} from "ext:deno_web/13_message_port.js";
|
||||
import * as webidl from "ext:deno_webidl/00_webidl.js";
|
||||
import { notImplemented } from "ext:deno_node/_utils.ts";
|
||||
|
@ -451,10 +451,10 @@ internals.__initWorkerThreads = (
|
|||
parentPort.emit("close");
|
||||
});
|
||||
parentPort.unref = () => {
|
||||
parentPort[unrefPollForMessages] = true;
|
||||
parentPort[unrefParentPort] = true;
|
||||
};
|
||||
parentPort.ref = () => {
|
||||
parentPort[unrefPollForMessages] = false;
|
||||
parentPort[unrefParentPort] = false;
|
||||
};
|
||||
|
||||
if (isWorkerThread) {
|
||||
|
|
|
@ -102,8 +102,8 @@ const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
|
|||
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
|
||||
export const refMessagePort = Symbol("refMessagePort");
|
||||
/** It is used by 99_main.js and worker_threads to
|
||||
* unref/ref on the global pollForMessages promise. */
|
||||
export const unrefPollForMessages = Symbol("unrefPollForMessages");
|
||||
* unref/ref on the global message event handler count. */
|
||||
export const unrefParentPort = Symbol("unrefParentPort");
|
||||
|
||||
/**
|
||||
* @param {number} id
|
||||
|
|
|
@ -170,12 +170,14 @@ function postMessage(message, transferOrOptions = { __proto__: null }) {
|
|||
|
||||
let isClosing = false;
|
||||
let globalDispatchEvent;
|
||||
let closeOnIdle;
|
||||
|
||||
function hasMessageEventListener() {
|
||||
// the function name is kind of a misnomer, but we want to behave
|
||||
// as if we have message event listeners if a node message port is explicitly
|
||||
// refed (and the inverse as well)
|
||||
return event.listenerCount(globalThis, "message") > 0 ||
|
||||
return (event.listenerCount(globalThis, "message") > 0 &&
|
||||
!globalThis[messagePort.unrefParentPort]) ||
|
||||
messagePort.refedMessagePortsCount > 0;
|
||||
}
|
||||
|
||||
|
@ -188,7 +190,10 @@ async function pollForMessages() {
|
|||
}
|
||||
while (!isClosing) {
|
||||
const recvMessage = op_worker_recv_message();
|
||||
if (globalThis[messagePort.unrefPollForMessages] === true) {
|
||||
// In a Node.js worker, unref() the op promise to prevent it from
|
||||
// keeping the event loop alive. This avoids the need to explicitly
|
||||
// call self.close() or worker.terminate().
|
||||
if (closeOnIdle) {
|
||||
core.unrefOpPromise(recvMessage);
|
||||
}
|
||||
const data = await recvMessage;
|
||||
|
@ -915,6 +920,7 @@ function bootstrapWorkerRuntime(
|
|||
6: argv0,
|
||||
7: nodeDebug,
|
||||
13: otelConfig,
|
||||
14: closeOnIdle_,
|
||||
} = runtimeOptions;
|
||||
|
||||
performance.setTimeOrigin();
|
||||
|
@ -967,6 +973,7 @@ function bootstrapWorkerRuntime(
|
|||
|
||||
globalThis.pollForMessages = pollForMessages;
|
||||
globalThis.hasMessageEventListener = hasMessageEventListener;
|
||||
closeOnIdle = closeOnIdle_;
|
||||
|
||||
for (let i = 0; i <= unstableFeatures.length; i++) {
|
||||
const id = unstableFeatures[i];
|
||||
|
|
|
@ -58,7 +58,6 @@ use std::task::Poll;
|
|||
use crate::inspector_server::InspectorServer;
|
||||
use crate::ops;
|
||||
use crate::ops::process::NpmProcessStateProviderRc;
|
||||
use crate::ops::worker_host::WorkersTable;
|
||||
use crate::shared::maybe_transpile_source;
|
||||
use crate::shared::runtime;
|
||||
use crate::tokio_util::create_and_run_current_thread;
|
||||
|
@ -385,7 +384,6 @@ pub struct WebWorker {
|
|||
pub js_runtime: JsRuntime,
|
||||
pub name: String,
|
||||
close_on_idle: bool,
|
||||
has_executed_main_module: bool,
|
||||
internal_handle: WebWorkerInternalHandle,
|
||||
pub worker_type: WebWorkerType,
|
||||
pub main_module: ModuleSpecifier,
|
||||
|
@ -658,7 +656,6 @@ impl WebWorker {
|
|||
has_message_event_listener_fn: None,
|
||||
bootstrap_fn_global: Some(bootstrap_fn_global),
|
||||
close_on_idle: options.close_on_idle,
|
||||
has_executed_main_module: false,
|
||||
maybe_worker_metadata: options.maybe_worker_metadata,
|
||||
},
|
||||
external_handle,
|
||||
|
@ -799,7 +796,6 @@ impl WebWorker {
|
|||
|
||||
maybe_result = &mut receiver => {
|
||||
debug!("received worker module evaluate {:#?}", maybe_result);
|
||||
self.has_executed_main_module = true;
|
||||
maybe_result
|
||||
}
|
||||
|
||||
|
@ -837,6 +833,9 @@ impl WebWorker {
|
|||
}
|
||||
|
||||
if self.close_on_idle {
|
||||
if self.has_message_event_listener() {
|
||||
return Poll::Pending;
|
||||
}
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
|
||||
|
@ -851,22 +850,7 @@ impl WebWorker {
|
|||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
Poll::Pending => {
|
||||
// This is special code path for workers created from `node:worker_threads`
|
||||
// module that have different semantics than Web workers.
|
||||
// We want the worker thread to terminate automatically if we've done executing
|
||||
// Top-Level await, there are no child workers spawned by that workers
|
||||
// and there's no "message" event listener.
|
||||
if self.close_on_idle
|
||||
&& self.has_executed_main_module
|
||||
&& !self.has_child_workers()
|
||||
&& !self.has_message_event_listener()
|
||||
{
|
||||
Poll::Ready(Ok(()))
|
||||
} else {
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
Poll::Pending => Poll::Pending,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -904,15 +888,6 @@ impl WebWorker {
|
|||
None => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn has_child_workers(&mut self) -> bool {
|
||||
!self
|
||||
.js_runtime
|
||||
.op_state()
|
||||
.borrow()
|
||||
.borrow::<WorkersTable>()
|
||||
.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
fn print_worker_error(
|
||||
|
|
|
@ -120,6 +120,7 @@ pub struct BootstrapOptions {
|
|||
pub serve_port: Option<u16>,
|
||||
pub serve_host: Option<String>,
|
||||
pub otel_config: OtelConfig,
|
||||
pub close_on_idle: bool,
|
||||
}
|
||||
|
||||
impl Default for BootstrapOptions {
|
||||
|
@ -155,6 +156,7 @@ impl Default for BootstrapOptions {
|
|||
serve_port: Default::default(),
|
||||
serve_host: Default::default(),
|
||||
otel_config: Default::default(),
|
||||
close_on_idle: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -198,6 +200,8 @@ struct BootstrapV8<'a>(
|
|||
Option<usize>,
|
||||
// OTEL config
|
||||
Box<[u8]>,
|
||||
// close on idle
|
||||
bool,
|
||||
);
|
||||
|
||||
impl BootstrapOptions {
|
||||
|
@ -225,6 +229,7 @@ impl BootstrapOptions {
|
|||
serve_is_main,
|
||||
serve_worker_count,
|
||||
self.otel_config.as_v8(),
|
||||
self.close_on_idle,
|
||||
);
|
||||
|
||||
bootstrap.serialize(ser).unwrap()
|
||||
|
|
|
@ -3,5 +3,4 @@ await import(specifier);
|
|||
^
|
||||
at async file:///[WILDLINE]
|
||||
error: Uncaught (in promise) Error: Unhandled error in child worker.
|
||||
at [WILDLINE]
|
||||
at [WILDLINE]
|
||||
at [WILDCARD]
|
|
@ -841,3 +841,26 @@ Deno.test({
|
|||
assertEquals(result, true);
|
||||
},
|
||||
});
|
||||
|
||||
Deno.test("[node/worker_threads] Worker runs async ops correctly", async () => {
|
||||
const recvMessage = Promise.withResolvers<void>();
|
||||
const timer = setTimeout(() => recvMessage.reject(), 1000);
|
||||
const worker = new workerThreads.Worker(
|
||||
`
|
||||
import { parentPort } from "node:worker_threads";
|
||||
setTimeout(() => {
|
||||
parentPort.postMessage("Hello from worker");
|
||||
}, 10);
|
||||
`,
|
||||
{ eval: true },
|
||||
);
|
||||
|
||||
worker.on("message", (msg) => {
|
||||
assertEquals(msg, "Hello from worker");
|
||||
worker.terminate();
|
||||
recvMessage.resolve();
|
||||
clearTimeout(timer);
|
||||
});
|
||||
|
||||
await recvMessage.promise;
|
||||
});
|
||||
|
|
Loading…
Add table
Reference in a new issue