From 92576fdcfd3e32dce63b533ab20d4974136b097d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sat, 16 Mar 2024 00:59:18 +0000 Subject: [PATCH] fix(ext/node): support MessagePort in `WorkerOptions.workerData` (#22950) This commit fixes passing `MessagePort` instances to `WorkerOptions.workerData`. Before they were not serialized and deserialized properly when spawning a worker thread. Closes https://github.com/denoland/deno/issues/22935 --- ext/web/lib.rs | 3 ++ ext/web/message_port.rs | 11 +++--- runtime/ops/worker_host.rs | 15 ++++++-- runtime/web_worker.rs | 26 +++++++++++--- tests/unit_node/worker_threads_test.ts | 49 ++++++++++++++++++++++++++ 5 files changed, 92 insertions(+), 12 deletions(-) diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 60a0cc0d72..74ed78c7e4 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -43,12 +43,15 @@ pub use crate::blob::BlobStore; pub use crate::blob::InMemoryBlobPart; pub use crate::message_port::create_entangled_message_port; +pub use crate::message_port::deserialize_js_transferables; use crate::message_port::op_message_port_create_entangled; use crate::message_port::op_message_port_post_message; use crate::message_port::op_message_port_recv_message; use crate::message_port::op_message_port_recv_message_sync; +pub use crate::message_port::serialize_transferables; pub use crate::message_port::JsMessageData; pub use crate::message_port::MessagePort; +pub use crate::message_port::Transferable; use crate::timers::op_defer; use crate::timers::op_now; diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index 18429a1795..1cd29c64d8 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -22,7 +22,7 @@ use tokio::sync::mpsc::unbounded_channel; use tokio::sync::mpsc::UnboundedReceiver; use tokio::sync::mpsc::UnboundedSender; -enum Transferable { +pub enum Transferable { MessagePort(MessagePort), ArrayBuffer(u32), } @@ -140,7 +140,7 @@ pub enum JsTransferable { ArrayBuffer(u32), } -fn deserialize_js_transferables( +pub fn deserialize_js_transferables( state: &mut OpState, js_transferables: Vec, ) -> Result, AnyError> { @@ -165,7 +165,7 @@ fn deserialize_js_transferables( Ok(transferables) } -fn serialize_transferables( +pub fn serialize_transferables( state: &mut OpState, transferables: Vec, ) -> Vec { @@ -189,8 +189,8 @@ fn serialize_transferables( #[derive(Deserialize, Serialize)] pub struct JsMessageData { - data: DetachedBuffer, - transferables: Vec, + pub data: DetachedBuffer, + pub transferables: Vec, } #[op2] @@ -208,7 +208,6 @@ pub fn op_message_port_post_message( } let resource = state.resource_table.get::(rid)?; - resource.port.send(state, data) } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 242d3bcda8..e3360b8305 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -11,6 +11,7 @@ use crate::web_worker::WebWorkerHandle; use crate::web_worker::WebWorkerType; use crate::web_worker::WorkerControlEvent; use crate::web_worker::WorkerId; +use crate::web_worker::WorkerMetadata; use crate::worker::FormatJsErrorFn; use deno_core::error::AnyError; use deno_core::op2; @@ -19,6 +20,7 @@ use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::ModuleSpecifier; use deno_core::OpState; +use deno_web::deserialize_js_transferables; use deno_web::JsMessageData; use log::debug; use std::cell::RefCell; @@ -36,7 +38,7 @@ pub struct CreateWebWorkerArgs { pub main_module: ModuleSpecifier, pub worker_type: WebWorkerType, pub close_on_idle: bool, - pub maybe_worker_metadata: Option, + pub maybe_worker_metadata: Option, } pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) @@ -175,7 +177,16 @@ fn op_create_worker( // Setup new thread let thread_builder = std::thread::Builder::new().name(format!("{worker_id}")); - + let maybe_worker_metadata = if let Some(data) = maybe_worker_metadata { + let transferables = + deserialize_js_transferables(state, data.transferables)?; + Some(WorkerMetadata { + buffer: data.data, + transferables, + }) + } else { + None + }; // Spawn it thread_builder.spawn(move || { // Any error inside this block is terminal: diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 55749ca27f..27fe633ad4 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -27,6 +27,7 @@ use deno_core::serde_json::json; use deno_core::v8; use deno_core::CancelHandle; use deno_core::CompiledWasmModuleStore; +use deno_core::DetachedBuffer; use deno_core::Extension; use deno_core::FeatureChecker; use deno_core::GetErrorClassFn; @@ -47,9 +48,11 @@ use deno_kv::dynamic::MultiBackendDbHandler; use deno_terminal::colors; use deno_tls::RootCertStoreProvider; use deno_web::create_entangled_message_port; +use deno_web::serialize_transferables; use deno_web::BlobStore; use deno_web::JsMessageData; use deno_web::MessagePort; +use deno_web::Transferable; use log::debug; use std::cell::RefCell; use std::fmt; @@ -61,6 +64,11 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; +pub struct WorkerMetadata { + pub buffer: DetachedBuffer, + pub transferables: Vec, +} + static WORKER_ID_COUNTER: AtomicU32 = AtomicU32::new(1); #[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] @@ -343,7 +351,7 @@ pub struct WebWorker { has_message_event_listener_fn: Option>, bootstrap_fn_global: Option>, // Consumed when `bootstrap_fn` is called - maybe_worker_metadata: Option, + maybe_worker_metadata: Option, } pub struct WebWorkerOptions { @@ -371,7 +379,7 @@ pub struct WebWorkerOptions { pub feature_checker: Arc, pub strace_ops: Option>, pub close_on_idle: bool, - pub maybe_worker_metadata: Option, + pub maybe_worker_metadata: Option, } impl WebWorker { @@ -622,7 +630,8 @@ impl WebWorker { } pub fn bootstrap(&mut self, options: &BootstrapOptions) { - self.js_runtime.op_state().borrow_mut().put(options.clone()); + let op_state = self.js_runtime.op_state(); + op_state.borrow_mut().put(options.clone()); // Instead of using name for log we use `worker-${id}` because // WebWorkers can have empty string as name. { @@ -633,7 +642,16 @@ impl WebWorker { let undefined = v8::undefined(scope); let mut worker_data: v8::Local = v8::undefined(scope).into(); if let Some(data) = self.maybe_worker_metadata.take() { - worker_data = deno_core::serde_v8::to_v8(scope, data).unwrap(); + let js_transferables = serialize_transferables( + &mut op_state.borrow_mut(), + data.transferables, + ); + let js_message_data = JsMessageData { + data: data.buffer, + transferables: js_transferables, + }; + worker_data = + deno_core::serde_v8::to_v8(scope, js_message_data).unwrap(); } let name_str: v8::Local = v8::String::new(scope, &self.name).unwrap().into(); diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index 1ded9a591f..f2ce00c847 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -238,3 +238,52 @@ Deno.test({ }, sanitizeResources: false, }); + +Deno.test({ + name: "[worker_threads] Worker workerData with MessagePort", + async fn() { + const { port1: mainPort, port2: workerPort } = new workerThreads + .MessageChannel(); + const deferred = Promise.withResolvers(); + const worker = new workerThreads.Worker( + ` + import { + isMainThread, + MessageChannel, + parentPort, + receiveMessageOnPort, + Worker, + workerData, + } from "node:worker_threads"; + parentPort.on("message", (msg) => { + console.log("message from main", msg); + parentPort.postMessage("Hello from worker on parentPort!"); + workerData.workerPort.postMessage("Hello from worker on workerPort!"); + }); + `, + { + eval: true, + workerData: { workerPort }, + transferList: [workerPort], + }, + ); + + worker.on("message", (data) => { + assertEquals(data, "Hello from worker on parentPort!"); + // TODO(bartlomieju): it would be better to use `mainPort.on("message")`, + // but we currently don't support it. + // https://github.com/denoland/deno/issues/22951 + // Wait a bit so the message can arrive. + setTimeout(() => { + const msg = workerThreads.receiveMessageOnPort(mainPort)!.message; + assertEquals(msg, "Hello from worker on workerPort!"); + deferred.resolve(); + }, 500); + }); + + worker.postMessage("Hello from parent"); + await deferred.promise; + await worker.terminate(); + mainPort.close(); + }, +});