From 6261c89e04b8f1a3aabc771dbc8cddad904710e9 Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Tue, 22 Jun 2021 16:30:16 +0200 Subject: [PATCH] feat: transfer MessagePort between workers (#11076) Add support for transferring `MessagePort`s between workers. --- cli/dts/lib.deno.shared_globals.d.ts | 2 +- cli/dts/lib.deno.worker.d.ts | 6 +- cli/main.rs | 4 +- cli/tests/workers/message_port.ts | 14 + cli/tests/workers/nonexistent_worker.out | 2 +- .../workers/permissions_blob_local.ts.out | 2 +- .../workers/permissions_blob_remote.ts.out | 2 +- .../workers/permissions_data_local.ts.out | 2 +- .../workers/permissions_data_remote.ts.out | 2 +- .../workers/permissions_dynamic_remote.ts.out | 2 +- .../workers/permissions_remote_remote.ts.out | 2 +- cli/tests/workers/test.ts | 53 ++++ cli/tests/workers/worker_error.ts.out | 2 +- cli/tests/workers/worker_nested_error.ts.out | 2 +- extensions/web/lib.rs | 2 + extensions/web/message_port.rs | 19 +- runtime/js/11_workers.js | 93 ++++-- runtime/js/99_main.js | 40 ++- runtime/ops/web_worker.rs | 36 +-- runtime/ops/worker_host.rs | 155 ++++----- runtime/web_worker.rs | 295 ++++++------------ 21 files changed, 382 insertions(+), 355 deletions(-) create mode 100644 cli/tests/workers/message_port.ts diff --git a/cli/dts/lib.deno.shared_globals.d.ts b/cli/dts/lib.deno.shared_globals.d.ts index 46154c64ea..be35fae014 100644 --- a/cli/dts/lib.deno.shared_globals.d.ts +++ b/cli/dts/lib.deno.shared_globals.d.ts @@ -386,7 +386,7 @@ declare class Worker extends EventTarget { specifier: string | URL, options?: WorkerOptions, ); - postMessage(message: any, transfer: ArrayBuffer[]): void; + postMessage(message: any, transfer: Transferable[]): void; postMessage(message: any, options?: PostMessageOptions): void; addEventListener( type: K, diff --git a/cli/dts/lib.deno.worker.d.ts b/cli/dts/lib.deno.worker.d.ts index eb8f6ebf10..7d8f6078b1 100644 --- a/cli/dts/lib.deno.worker.d.ts +++ b/cli/dts/lib.deno.worker.d.ts @@ -68,7 +68,8 @@ declare class DedicatedWorkerGlobalScope extends WorkerGlobalScope { | ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any) | null; close(): void; - postMessage(message: any): void; + postMessage(message: any, transfer: Transferable[]): void; + postMessage(message: any, options?: PostMessageOptions): void; addEventListener( type: K, listener: ( @@ -105,7 +106,8 @@ declare var onmessageerror: | ((this: DedicatedWorkerGlobalScope, ev: MessageEvent) => any) | null; declare function close(): void; -declare function postMessage(message: any): void; +declare function postMessage(message: any, transfer: Transferable[]): void; +declare function postMessage(message: any, options?: PostMessageOptions): void; declare var navigator: WorkerNavigator; declare var onerror: | ((this: DedicatedWorkerGlobalScope, ev: ErrorEvent) => any) diff --git a/cli/main.rs b/cli/main.rs index 2586e9b60c..29151f14b1 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -125,7 +125,7 @@ fn create_web_worker_callback( broadcast_channel: program_state.broadcast_channel.clone(), }; - let mut worker = WebWorker::from_options( + let (mut worker, external_handle) = WebWorker::from_options( args.name, args.permissions, args.main_module, @@ -151,7 +151,7 @@ fn create_web_worker_callback( } worker.bootstrap(&options); - worker + (worker, external_handle) }) } diff --git a/cli/tests/workers/message_port.ts b/cli/tests/workers/message_port.ts new file mode 100644 index 0000000000..d78304a39c --- /dev/null +++ b/cli/tests/workers/message_port.ts @@ -0,0 +1,14 @@ +const channel = new MessageChannel(); + +channel.port2.onmessage = (e) => { + channel.port2.postMessage(e.data === "2"); + channel.port2.close(); +}; + +self.postMessage("1", [channel.port1]); + +self.onmessage = (e) => { + const port1 = e.ports[0]; + port1.postMessage(e.data === "3"); + port1.close(); +}; diff --git a/cli/tests/workers/nonexistent_worker.out b/cli/tests/workers/nonexistent_worker.out index e43b81c5f6..04b9f801da 100644 --- a/cli/tests/workers/nonexistent_worker.out +++ b/cli/tests/workers/nonexistent_worker.out @@ -1,3 +1,3 @@ [WILDCARD]error: Uncaught (in worker "") Cannot resolve module "file:///[WILDCARD]cli/tests/workers/doesnt_exist.js". error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll ([WILDCARD]) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_blob_local.ts.out b/cli/tests/workers/permissions_blob_local.ts.out index a6a34e3a27..0835777ecd 100644 --- a/cli/tests/workers/permissions_blob_local.ts.out +++ b/cli/tests/workers/permissions_blob_local.ts.out @@ -1,4 +1,4 @@ error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag at blob:null/[WILDCARD]:1:0 error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_blob_remote.ts.out b/cli/tests/workers/permissions_blob_remote.ts.out index 8bd2773611..2d01458ca4 100644 --- a/cli/tests/workers/permissions_blob_remote.ts.out +++ b/cli/tests/workers/permissions_blob_remote.ts.out @@ -1,4 +1,4 @@ error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag at blob:null/[WILDCARD]:1:0 error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_data_local.ts.out b/cli/tests/workers/permissions_data_local.ts.out index 302ab99c85..2a6be2b57e 100644 --- a/cli/tests/workers/permissions_data_local.ts.out +++ b/cli/tests/workers/permissions_data_local.ts.out @@ -1,4 +1,4 @@ error: Uncaught (in worker "") Requires read access to "[WILDCARD]local_file.ts", run again with the --allow-read flag at data:application/javascript;base64,[WILDCARD]:1:0 error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_data_remote.ts.out b/cli/tests/workers/permissions_data_remote.ts.out index 9b0ae44ccf..90677892a9 100644 --- a/cli/tests/workers/permissions_data_remote.ts.out +++ b/cli/tests/workers/permissions_data_remote.ts.out @@ -1,4 +1,4 @@ error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag at data:application/javascript;base64,aW1wb3J0ICJodHRwczovL2V4YW1wbGUuY29tL3NvbWUvZmlsZS50cyI7:1:0 error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_dynamic_remote.ts.out b/cli/tests/workers/permissions_dynamic_remote.ts.out index e2c671c346..e68c96df1a 100644 --- a/cli/tests/workers/permissions_dynamic_remote.ts.out +++ b/cli/tests/workers/permissions_dynamic_remote.ts.out @@ -3,4 +3,4 @@ await import("https://example.com/some/file.ts"); ^ at async http://localhost:4545/cli/tests/workers/dynamic_remote.ts:2:1 [WILDCARD]error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/permissions_remote_remote.ts.out b/cli/tests/workers/permissions_remote_remote.ts.out index 8b8820c7d8..5656b75a1c 100644 --- a/cli/tests/workers/permissions_remote_remote.ts.out +++ b/cli/tests/workers/permissions_remote_remote.ts.out @@ -1,4 +1,4 @@ error: Uncaught (in worker "") Requires net access to "example.com", run again with the --allow-net flag at http://localhost:4545/cli/tests/workers/static_remote.ts:2:0 error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll (deno:runtime/js/11_workers.js:243:23) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/test.ts b/cli/tests/workers/test.ts index 6a572b92f1..b37b7aeb14 100644 --- a/cli/tests/workers/test.ts +++ b/cli/tests/workers/test.ts @@ -769,3 +769,56 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "worker with relative specifier", + fn: async function (): Promise { + assertEquals(location.href, "http://127.0.0.1:4545/cli/tests/"); + const promise = deferred(); + const w = new Worker( + "./workers/test_worker.ts", + { type: "module", name: "tsWorker" }, + ); + w.onmessage = (e): void => { + assertEquals(e.data, "Hello, world!"); + promise.resolve(); + }; + w.postMessage("Hello, world!"); + await promise; + w.terminate(); + }, +}); + +Deno.test({ + name: "Send MessagePorts from / to workers", + fn: async function (): Promise { + const result = deferred(); + const worker = new Worker( + new URL("message_port.ts", import.meta.url).href, + { type: "module" }, + ); + + const channel = new MessageChannel(); + + worker.onmessage = (e) => { + assertEquals(e.data, "1"); + assertEquals(e.ports.length, 1); + const port1 = e.ports[0]; + port1.onmessage = (e) => { + assertEquals(e.data, true); + port1.close(); + worker.postMessage("3", [channel.port1]); + }; + port1.postMessage("2"); + }; + + channel.port2.onmessage = (e) => { + assertEquals(e.data, true); + channel.port2.close(); + result.resolve(); + }; + + await result; + worker.terminate(); + }, +}); diff --git a/cli/tests/workers/worker_error.ts.out b/cli/tests/workers/worker_error.ts.out index 244e564175..4a8e92f00a 100644 --- a/cli/tests/workers/worker_error.ts.out +++ b/cli/tests/workers/worker_error.ts.out @@ -2,4 +2,4 @@ at foo ([WILDCARD]) at [WILDCARD] error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll ([WILDCARD]) + at Worker.#pollControl ([WILDCARD]) diff --git a/cli/tests/workers/worker_nested_error.ts.out b/cli/tests/workers/worker_nested_error.ts.out index 244e564175..4a8e92f00a 100644 --- a/cli/tests/workers/worker_nested_error.ts.out +++ b/cli/tests/workers/worker_nested_error.ts.out @@ -2,4 +2,4 @@ at foo ([WILDCARD]) at [WILDCARD] error: Uncaught (in promise) Error: Unhandled error event reached main worker. - at Worker.#poll ([WILDCARD]) + at Worker.#pollControl ([WILDCARD]) diff --git a/extensions/web/lib.rs b/extensions/web/lib.rs index d74bb619d8..6e35524762 100644 --- a/extensions/web/lib.rs +++ b/extensions/web/lib.rs @@ -2,7 +2,9 @@ mod message_port; +pub use crate::message_port::create_entangled_message_port; pub use crate::message_port::JsMessageData; +pub use crate::message_port::MessagePort; use deno_core::error::bad_resource_id; use deno_core::error::null_opbuf; diff --git a/extensions/web/message_port.rs b/extensions/web/message_port.rs index d10b455d52..f73d0486ab 100644 --- a/extensions/web/message_port.rs +++ b/extensions/web/message_port.rs @@ -23,7 +23,7 @@ type MessagePortMessage = (Vec, Vec); pub struct MessagePort { rx: RefCell>, - tx: UnboundedSender, + tx: RefCell>>, } impl MessagePort { @@ -37,7 +37,9 @@ impl MessagePort { // Swallow the failed to send error. It means the channel was disentangled, // but not cleaned up. - self.tx.send((data.data.to_vec(), transferables)).ok(); + if let Some(tx) = &*self.tx.borrow() { + tx.send((data.data.to_vec(), transferables)).ok(); + } Ok(()) } @@ -60,6 +62,13 @@ impl MessagePort { } Ok(None) } + + /// This forcefully disconnects the message port from its paired port. This + /// will wake up the `.recv` on the paired port, which will return `Ok(None)`. + pub fn disentangle(&self) { + let mut tx = self.tx.borrow_mut(); + tx.take(); + } } pub fn create_entangled_message_port() -> (MessagePort, MessagePort) { @@ -68,12 +77,12 @@ pub fn create_entangled_message_port() -> (MessagePort, MessagePort) { let port1 = MessagePort { rx: RefCell::new(port1_rx), - tx: port1_tx, + tx: RefCell::new(Some(port1_tx)), }; let port2 = MessagePort { rx: RefCell::new(port2_rx), - tx: port2_tx, + tx: RefCell::new(Some(port2_tx)), }; (port1, port2) @@ -204,5 +213,5 @@ pub async fn op_message_port_recv_message( } }; let cancel = RcRef::map(resource.clone(), |r| &r.cancel); - resource.port.recv(state.clone()).or_cancel(cancel).await? + resource.port.recv(state).or_cancel(cancel).await? } diff --git a/runtime/js/11_workers.js b/runtime/js/11_workers.js index c917a28803..7267bec38d 100644 --- a/runtime/js/11_workers.js +++ b/runtime/js/11_workers.js @@ -3,10 +3,13 @@ ((window) => { const core = window.Deno.core; + const webidl = window.__bootstrap.webidl; const { Window } = window.__bootstrap.globalInterfaces; const { getLocationHref } = window.__bootstrap.location; const { log, pathFromURL } = window.__bootstrap.util; const { defineEventHandler } = window.__bootstrap.webUtil; + const { deserializeJsMessageData, serializeJsMessageData } = + window.__bootstrap.messagePort; function createWorker( specifier, @@ -34,8 +37,12 @@ core.opSync("op_host_post_message", id, data); } - function hostGetMessage(id) { - return core.opAsync("op_host_get_message", id); + function hostRecvCtrl(id) { + return core.opAsync("op_host_recv_ctrl", id); + } + + function hostRecvMessage(id) { + return core.opAsync("op_host_recv_message", id); } /** @@ -187,18 +194,9 @@ options?.name, ); this.#id = id; - this.#poll(); + this.#pollControl(); + this.#pollMessages(); } - - #handleMessage(data) { - const msgEvent = new MessageEvent("message", { - cancelable: false, - data, - }); - - this.dispatchEvent(msgEvent); - } - #handleError(e) { const event = new ErrorEvent("error", { cancelable: true, @@ -219,9 +217,9 @@ return handled; } - #poll = async () => { + #pollControl = async () => { while (!this.#terminated) { - const [type, data] = await hostGetMessage(this.#id); + const [type, data] = await hostRecvCtrl(this.#id); // If terminate was called then we ignore all messages if (this.#terminated) { @@ -229,11 +227,6 @@ } switch (type) { - case 0: { // Message - const msg = core.deserialize(data); - this.#handleMessage(msg); - break; - } case 1: { // TerminalError this.#terminated = true; } /* falls through */ @@ -262,19 +255,57 @@ } }; - postMessage(message, transferOrOptions) { - if (transferOrOptions) { - throw new Error( - "Not yet implemented: `transfer` and `options` are not supported.", + #pollMessages = async () => { + while (!this.terminated) { + const data = await hostRecvMessage(this.#id); + if (data === null) break; + let message, transfer; + try { + const v = deserializeJsMessageData(data); + message = v[0]; + transfer = v[1]; + } catch (err) { + const event = new MessageEvent("messageerror", { + cancelable: false, + data: err, + }); + this.dispatchEvent(event); + return; + } + const event = new MessageEvent("message", { + cancelable: false, + data: message, + ports: transfer, + }); + this.dispatchEvent(event); + } + }; + + postMessage(message, transferOrOptions = {}) { + const prefix = "Failed to execute 'postMessage' on 'MessagePort'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + message = webidl.converters.any(message); + let options; + if ( + webidl.type(transferOrOptions) === "Object" && + transferOrOptions !== undefined && + transferOrOptions[Symbol.iterator] !== undefined + ) { + const transfer = webidl.converters["sequence"]( + transferOrOptions, + { prefix, context: "Argument 2" }, ); + options = { transfer }; + } else { + options = webidl.converters.PostMessageOptions(transferOrOptions, { + prefix, + context: "Argument 2", + }); } - - if (this.#terminated) { - return; - } - - const bufferMsg = core.serialize(message); - hostPostMessage(this.#id, bufferMsg); + const { transfer } = options; + const data = serializeJsMessageData(message, transfer); + if (this.#terminated) return; + hostPostMessage(this.#id, data); } terminate() { diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 91a4dcefd4..d0e86bce7c 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -42,6 +42,8 @@ delete Object.prototype.__proto__; const errors = window.__bootstrap.errors.errors; const webidl = window.__bootstrap.webidl; const { defineEventHandler } = window.__bootstrap.webUtil; + const { deserializeJsMessageData, serializeJsMessageData } = + window.__bootstrap.messagePort; let windowIsClosing = false; @@ -77,9 +79,31 @@ delete Object.prototype.__proto__; const onmessage = () => {}; const onerror = () => {}; - function postMessage(data) { - const dataIntArray = core.serialize(data); - core.opSync("op_worker_post_message", null, dataIntArray); + function postMessage(message, transferOrOptions = {}) { + const prefix = + "Failed to execute 'postMessage' on 'DedicatedWorkerGlobalScope'"; + webidl.requiredArguments(arguments.length, 1, { prefix }); + message = webidl.converters.any(message); + let options; + if ( + webidl.type(transferOrOptions) === "Object" && + transferOrOptions !== undefined && + transferOrOptions[Symbol.iterator] !== undefined + ) { + const transfer = webidl.converters["sequence"]( + transferOrOptions, + { prefix, context: "Argument 2" }, + ); + options = { transfer }; + } else { + options = webidl.converters.PostMessageOptions(transferOrOptions, { + prefix, + context: "Argument 2", + }); + } + const { transfer } = options; + const data = serializeJsMessageData(message, transfer); + core.opSync("op_worker_post_message", data); } let isClosing = false; @@ -90,12 +114,16 @@ delete Object.prototype.__proto__; globalDispatchEvent = globalThis.dispatchEvent.bind(globalThis); } while (!isClosing) { - const bufferMsg = await core.opAsync("op_worker_get_message"); - const data = core.deserialize(bufferMsg); + const data = await core.opAsync("op_worker_recv_message"); + if (data === null) break; + const v = deserializeJsMessageData(data); + const message = v[0]; + const transfer = v[1]; const msgEvent = new MessageEvent("message", { cancelable: false, - data, + data: message, + ports: transfer, }); try { diff --git a/runtime/ops/web_worker.rs b/runtime/ops/web_worker.rs index 39aa2c0a90..026e38157e 100644 --- a/runtime/ops/web_worker.rs +++ b/runtime/ops/web_worker.rs @@ -1,15 +1,15 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. use crate::web_worker::WebWorkerInternalHandle; -use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerControlEvent; use deno_core::error::generic_error; -use deno_core::error::null_opbuf; use deno_core::error::AnyError; use deno_core::op_async; use deno_core::op_sync; +use deno_core::CancelFuture; use deno_core::Extension; use deno_core::OpState; -use deno_core::ZeroCopyBuf; +use deno_web::JsMessageData; use std::cell::RefCell; use std::rc::Rc; @@ -17,7 +17,7 @@ pub fn init() -> Extension { Extension::builder() .ops(vec![ ("op_worker_post_message", op_sync(op_worker_post_message)), - ("op_worker_get_message", op_async(op_worker_get_message)), + ("op_worker_recv_message", op_async(op_worker_recv_message)), // Notify host that guest worker closes. ("op_worker_close", op_sync(op_worker_close)), // Notify host that guest worker has unhandled error. @@ -31,30 +31,28 @@ pub fn init() -> Extension { fn op_worker_post_message( state: &mut OpState, + data: JsMessageData, _: (), - buf: Option, ) -> Result<(), AnyError> { - let buf = buf.ok_or_else(null_opbuf)?; let handle = state.borrow::().clone(); - handle - .post_event(WorkerEvent::Message(buf)) - .expect("Failed to post message to host"); + handle.port.send(state, data)?; Ok(()) } -async fn op_worker_get_message( +async fn op_worker_recv_message( state: Rc>, _: (), _: (), -) -> Result { - let temp = { - let a = state.borrow(); - a.borrow::().clone() +) -> Result, AnyError> { + let handle = { + let state = state.borrow(); + state.borrow::().clone() }; - - let maybe_data = temp.get_message().await; - - Ok(maybe_data.unwrap_or_else(ZeroCopyBuf::empty)) + handle + .port + .recv(state.clone()) + .or_cancel(handle.cancel) + .await? } fn op_worker_close(state: &mut OpState, _: (), _: ()) -> Result<(), AnyError> { @@ -77,7 +75,7 @@ fn op_worker_unhandled_error( ) -> Result<(), AnyError> { let sender = state.borrow::().clone(); sender - .post_event(WorkerEvent::Error(generic_error(message))) + .post_event(WorkerControlEvent::Error(generic_error(message))) .expect("Failed to propagate error event to parent worker"); Ok(()) } diff --git a/runtime/ops/worker_host.rs b/runtime/ops/worker_host.rs index 57d3ac2b8f..162f9f4f77 100644 --- a/runtime/ops/worker_host.rs +++ b/runtime/ops/worker_host.rs @@ -12,25 +12,23 @@ use crate::permissions::UnaryPermission; use crate::permissions::UnitPermission; use crate::permissions::WriteDescriptor; use crate::web_worker::run_web_worker; +use crate::web_worker::SendableWebWorkerHandle; use crate::web_worker::WebWorker; use crate::web_worker::WebWorkerHandle; -use crate::web_worker::WorkerEvent; +use crate::web_worker::WorkerControlEvent; use crate::web_worker::WorkerId; use deno_core::error::custom_error; -use deno_core::error::null_opbuf; use deno_core::error::AnyError; -use deno_core::error::JsError; use deno_core::op_async; use deno_core::op_sync; use deno_core::serde::de; use deno_core::serde::de::SeqAccess; use deno_core::serde::Deserialize; use deno_core::serde::Deserializer; -use deno_core::serde_json::json; use deno_core::Extension; use deno_core::ModuleSpecifier; use deno_core::OpState; -use deno_core::ZeroCopyBuf; +use deno_web::JsMessageData; use log::debug; use std::cell::RefCell; use std::collections::HashMap; @@ -51,8 +49,9 @@ pub struct CreateWebWorkerArgs { pub use_deno_namespace: bool, } -pub type CreateWebWorkerCb = - dyn Fn(CreateWebWorkerArgs) -> WebWorker + Sync + Send; +pub type CreateWebWorkerCb = dyn Fn(CreateWebWorkerArgs) -> (WebWorker, SendableWebWorkerHandle) + + Sync + + Send; /// A holder for callback that is used to create a new /// WebWorker. It's a struct instead of a type alias @@ -87,7 +86,8 @@ pub fn init(create_web_worker_cb: Arc) -> Extension { op_sync(op_host_terminate_worker), ), ("op_host_post_message", op_sync(op_host_post_message)), - ("op_host_get_message", op_async(op_host_get_message)), + ("op_host_recv_ctrl", op_async(op_host_recv_ctrl)), + ("op_host_recv_message", op_async(op_host_recv_message)), ]) .build() } @@ -458,8 +458,9 @@ fn op_create_worker( let module_specifier = deno_core::resolve_url(&specifier)?; let worker_name = args_name.unwrap_or_else(|| "".to_string()); - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::>(1); + let (handle_sender, handle_receiver) = std::sync::mpsc::sync_channel::< + Result, + >(1); // Setup new thread let thread_builder = @@ -472,17 +473,18 @@ fn op_create_worker( // all action done upon it should be noops // - newly spawned thread exits - let worker = (create_module_loader.0)(CreateWebWorkerArgs { - name: worker_name, - worker_id, - parent_permissions, - permissions: worker_permissions, - main_module: module_specifier.clone(), - use_deno_namespace, - }); + let (worker, external_handle) = + (create_module_loader.0)(CreateWebWorkerArgs { + name: worker_name, + worker_id, + parent_permissions, + permissions: worker_permissions, + main_module: module_specifier.clone(), + use_deno_namespace, + }); // Send thread safe handle from newly created worker to host thread - handle_sender.send(Ok(worker.thread_safe_handle())).unwrap(); + handle_sender.send(Ok(external_handle)).unwrap(); drop(handle_sender); // At this point the only method of communication with host @@ -497,7 +499,7 @@ fn op_create_worker( let worker_thread = WorkerThread { join_handle, - worker_handle, + worker_handle: worker_handle.into(), }; // At this point all interactions with worker happen using thread @@ -514,7 +516,7 @@ fn op_host_terminate_worker( id: WorkerId, _: (), ) -> Result<(), AnyError> { - let mut worker_thread = state + let worker_thread = state .borrow_mut::() .remove(&id) .expect("No worker handle found"); @@ -527,52 +529,13 @@ fn op_host_terminate_worker( Ok(()) } -use deno_core::serde::Serialize; -use deno_core::serde::Serializer; - -impl Serialize for WorkerEvent { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let type_id = match &self { - WorkerEvent::Message(_) => 0_i32, - WorkerEvent::TerminalError(_) => 1_i32, - WorkerEvent::Error(_) => 2_i32, - WorkerEvent::Close => 3_i32, - }; - - match self { - WorkerEvent::Message(buf) => { - Serialize::serialize(&(type_id, buf), serializer) - } - WorkerEvent::TerminalError(error) | WorkerEvent::Error(error) => { - let value = match error.downcast_ref::() { - Some(js_error) => json!({ - "message": js_error.message, - "fileName": js_error.script_resource_name, - "lineNumber": js_error.line_number, - "columnNumber": js_error.start_column, - }), - None => json!({ - "message": error.to_string(), - }), - }; - - Serialize::serialize(&(type_id, value), serializer) - } - _ => Serialize::serialize(&(type_id, ()), serializer), - } - } -} - /// Try to remove worker from workers table - NOTE: `Worker.terminate()` /// might have been called already meaning that we won't find worker in /// table - in that case ignore. fn try_remove_and_close(state: Rc>, id: WorkerId) { let mut s = state.borrow_mut(); let workers = s.borrow_mut::(); - if let Some(mut worker_thread) = workers.remove(&id) { + if let Some(worker_thread) = workers.remove(&id) { worker_thread.worker_handle.terminate(); worker_thread .join_handle @@ -582,12 +545,43 @@ fn try_remove_and_close(state: Rc>, id: WorkerId) { } } -/// Get message from guest worker as host -async fn op_host_get_message( +/// Get control event from guest worker as host +async fn op_host_recv_ctrl( state: Rc>, id: WorkerId, _: (), -) -> Result { +) -> Result { + let worker_handle = { + let state = state.borrow(); + let workers_table = state.borrow::(); + let maybe_handle = workers_table.get(&id); + if let Some(handle) = maybe_handle { + handle.worker_handle.clone() + } else { + // If handle was not found it means worker has already shutdown + return Ok(WorkerControlEvent::Close); + } + }; + + let maybe_event = worker_handle.get_control_event().await?; + if let Some(event) = maybe_event { + // Terminal error means that worker should be removed from worker table. + if let WorkerControlEvent::TerminalError(_) = &event { + try_remove_and_close(state, id); + } + return Ok(event); + } + + // If there was no event from worker it means it has already been closed. + try_remove_and_close(state, id); + Ok(WorkerControlEvent::Close) +} + +async fn op_host_recv_message( + state: Rc>, + id: WorkerId, + _: (), +) -> Result, AnyError> { let worker_handle = { let s = state.borrow(); let workers_table = s.borrow::(); @@ -596,37 +590,26 @@ async fn op_host_get_message( handle.worker_handle.clone() } else { // If handle was not found it means worker has already shutdown - return Ok(WorkerEvent::Close); + return Ok(None); } }; - - let maybe_event = worker_handle.get_event().await?; - if let Some(event) = maybe_event { - // Terminal error means that worker should be removed from worker table. - if let WorkerEvent::TerminalError(_) = &event { - try_remove_and_close(state, id); - } - return Ok(event); - } - - // If there was no event from worker it means it has already been closed. - try_remove_and_close(state, id); - Ok(WorkerEvent::Close) + worker_handle.port.recv(state).await } /// Post message to guest worker as host fn op_host_post_message( state: &mut OpState, id: WorkerId, - data: Option, + data: JsMessageData, ) -> Result<(), AnyError> { - let msg = data.ok_or_else(null_opbuf)?; - debug!("post message to worker {}", id); - let worker_thread = state - .borrow::() - .get(&id) - .expect("No worker handle found"); - worker_thread.worker_handle.post_message(msg)?; + let worker_handle = { + let worker_thread = state + .borrow::() + .get(&id) + .expect("No worker handle found"); + worker_thread.worker_handle.clone() + }; + worker_handle.port.send(state, data)?; Ok(()) } diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index 753238052e..a3a062221d 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -8,6 +8,7 @@ use crate::permissions::Permissions; use crate::tokio_util::create_basic_runtime; use deno_broadcast_channel::InMemoryBroadcastChannel; use deno_core::error::AnyError; +use deno_core::error::JsError; use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; @@ -18,6 +19,7 @@ use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::v8; +use deno_core::CancelHandle; use deno_core::Extension; use deno_core::GetErrorClassFn; use deno_core::JsErrorCreateFn; @@ -26,8 +28,9 @@ use deno_core::ModuleId; use deno_core::ModuleLoader; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; -use deno_core::ZeroCopyBuf; +use deno_web::create_entangled_message_port; use deno_web::BlobUrlStore; +use deno_web::MessagePort; use log::debug; use std::cell::RefCell; use std::env; @@ -38,7 +41,6 @@ use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use tokio::sync::Mutex as AsyncMutex; #[derive( Debug, Default, Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, @@ -55,29 +57,62 @@ impl WorkerId { } } -type WorkerMessage = ZeroCopyBuf; - /// Events that are sent to host from child /// worker. -pub enum WorkerEvent { - Message(WorkerMessage), +pub enum WorkerControlEvent { Error(AnyError), TerminalError(AnyError), Close, } +use deno_core::serde::Serializer; + +impl Serialize for WorkerControlEvent { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let type_id = match &self { + WorkerControlEvent::TerminalError(_) => 1_i32, + WorkerControlEvent::Error(_) => 2_i32, + WorkerControlEvent::Close => 3_i32, + }; + + match self { + WorkerControlEvent::TerminalError(error) + | WorkerControlEvent::Error(error) => { + let value = match error.downcast_ref::() { + Some(js_error) => json!({ + "message": js_error.message, + "fileName": js_error.script_resource_name, + "lineNumber": js_error.line_number, + "columnNumber": js_error.start_column, + }), + None => json!({ + "message": error.to_string(), + }), + }; + + Serialize::serialize(&(type_id, value), serializer) + } + _ => Serialize::serialize(&(type_id, ()), serializer), + } + } +} + // Channels used for communication with worker's parent #[derive(Clone)] pub struct WebWorkerInternalHandle { - sender: mpsc::Sender, - receiver: Rc>>, + sender: mpsc::Sender, + pub port: Rc, + pub cancel: Rc, terminated: Arc, isolate_handle: v8::IsolateHandle, } impl WebWorkerInternalHandle { /// Post WorkerEvent to parent as a worker - pub fn post_event(&self, event: WorkerEvent) -> Result<(), AnyError> { + pub fn post_event(&self, event: WorkerControlEvent) -> Result<(), AnyError> { let mut sender = self.sender.clone(); // If the channel is closed, // the worker must have terminated but the termination message has not yet been received. @@ -91,13 +126,6 @@ impl WebWorkerInternalHandle { Ok(()) } - /// Get the WorkerEvent with lock - /// Panic if more than one listener tries to get event - pub async fn get_message(&self) -> Option { - let mut receiver = self.receiver.borrow_mut(); - receiver.next().await - } - /// Check if this worker is terminated or being terminated pub fn is_terminated(&self) -> bool { self.terminated.load(Ordering::SeqCst) @@ -106,6 +134,8 @@ impl WebWorkerInternalHandle { /// Terminate the worker /// This function will set terminated to true, terminate the isolate and close the message channel pub fn terminate(&mut self) { + self.cancel.cancel(); + // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. @@ -121,40 +151,52 @@ impl WebWorkerInternalHandle { } } +pub struct SendableWebWorkerHandle { + port: MessagePort, + receiver: mpsc::Receiver, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl From for WebWorkerHandle { + fn from(handle: SendableWebWorkerHandle) -> Self { + WebWorkerHandle { + receiver: Rc::new(RefCell::new(handle.receiver)), + port: Rc::new(handle.port), + terminated: handle.terminated, + isolate_handle: handle.isolate_handle, + } + } +} + +/// This is the handle to the web worker that the parent thread uses to +/// communicate with the worker. It is created from a `SendableWebWorkerHandle` +/// which is sent to the parent thread from the worker thread where it is +/// created. The reason for this seperation is that the handle first needs to be +/// `Send` when transferring between threads, and then must be `Clone` when it +/// has arrived on the parent thread. It can not be both at once without large +/// amounts of Arc and other fun stuff. #[derive(Clone)] pub struct WebWorkerHandle { - sender: mpsc::Sender, - receiver: Arc>>, + pub port: Rc, + receiver: Rc>>, terminated: Arc, isolate_handle: v8::IsolateHandle, } impl WebWorkerHandle { - /// Post WorkerMessage to worker as a host - pub fn post_message(&self, buf: WorkerMessage) -> Result<(), AnyError> { - let mut sender = self.sender.clone(); - // If the channel is closed, - // the worker must have terminated but the termination message has not yet been recieved. - // - // Therefore just treat it as if the worker has terminated and return. - if sender.is_closed() { - self.terminated.store(true, Ordering::SeqCst); - return Ok(()); - } - sender.try_send(buf)?; - Ok(()) - } - /// Get the WorkerEvent with lock /// Return error if more than one listener tries to get event - pub async fn get_event(&self) -> Result, AnyError> { - let mut receiver = self.receiver.try_lock()?; + pub async fn get_control_event( + &self, + ) -> Result, AnyError> { + let mut receiver = self.receiver.borrow_mut(); Ok(receiver.next().await) } /// Terminate the worker /// This function will set terminated to true, terminate the isolate and close the message channel - pub fn terminate(&mut self) { + pub fn terminate(self) { // This function can be called multiple times by whomever holds // the handle. However only a single "termination" should occur so // we need a guard here. @@ -165,26 +207,26 @@ impl WebWorkerHandle { self.isolate_handle.terminate_execution(); } - // Wake web worker by closing the channel - self.sender.close_channel(); + self.port.disentangle(); } } fn create_handles( isolate_handle: v8::IsolateHandle, -) -> (WebWorkerInternalHandle, WebWorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::(1); - let (out_tx, out_rx) = mpsc::channel::(1); +) -> (WebWorkerInternalHandle, SendableWebWorkerHandle) { + let (parent_port, worker_port) = create_entangled_message_port(); + let (ctrl_tx, ctrl_rx) = mpsc::channel::(1); let terminated = Arc::new(AtomicBool::new(false)); let internal_handle = WebWorkerInternalHandle { - sender: out_tx, - receiver: Rc::new(RefCell::new(in_rx)), + sender: ctrl_tx, + port: Rc::new(parent_port), terminated: terminated.clone(), isolate_handle: isolate_handle.clone(), + cancel: CancelHandle::new_rc(), }; - let external_handle = WebWorkerHandle { - sender: in_tx, - receiver: Arc::new(AsyncMutex::new(out_rx)), + let external_handle = SendableWebWorkerHandle { + receiver: ctrl_rx, + port: worker_port, terminated, isolate_handle, }; @@ -200,7 +242,6 @@ pub struct WebWorker { pub js_runtime: JsRuntime, pub name: String, internal_handle: WebWorkerInternalHandle, - external_handle: WebWorkerHandle, pub use_deno_namespace: bool, pub main_module: ModuleSpecifier, } @@ -237,7 +278,7 @@ impl WebWorker { main_module: ModuleSpecifier, worker_id: WorkerId, options: &WebWorkerOptions, - ) -> Self { + ) -> (Self, SendableWebWorkerHandle) { // Permissions: many ops depend on this let unstable = options.unstable; let perm_ext = Extension::builder() @@ -333,15 +374,17 @@ impl WebWorker { (internal_handle, external_handle) }; - Self { - id: worker_id, - js_runtime, - name, - internal_handle, + ( + Self { + id: worker_id, + js_runtime, + name, + internal_handle, + use_deno_namespace: options.use_deno_namespace, + main_module, + }, external_handle, - use_deno_namespace: options.use_deno_namespace, - main_module, - } + ) } pub fn bootstrap(&mut self, options: &WebWorkerOptions) { @@ -419,11 +462,6 @@ impl WebWorker { } } - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.external_handle.clone() - } - pub fn poll_event_loop( &mut self, cx: &mut Context, @@ -446,7 +484,7 @@ impl WebWorker { print_worker_error(e.to_string(), &self.name); let handle = self.internal_handle.clone(); handle - .post_event(WorkerEvent::Error(e)) + .post_event(WorkerControlEvent::Error(e)) .expect("Failed to post message to host"); return Poll::Pending; @@ -513,7 +551,7 @@ pub fn run_web_worker( if let Err(e) = result { print_worker_error(e.to_string(), &name); internal_handle - .post_event(WorkerEvent::TerminalError(e)) + .post_event(WorkerControlEvent::TerminalError(e)) .expect("Failed to post message to host"); // Failure to execute script is a terminal error, bye, bye. @@ -524,134 +562,3 @@ pub fn run_web_worker( debug!("Worker thread shuts down {}", &name); result } - -#[cfg(test)] -mod tests { - use super::*; - use crate::tokio_util; - - fn create_test_web_worker() -> WebWorker { - let main_module = deno_core::resolve_url_or_path("./hello.js").unwrap(); - let module_loader = Rc::new(deno_core::NoopModuleLoader); - let create_web_worker_cb = Arc::new(|_| unreachable!()); - - let options = WebWorkerOptions { - args: vec![], - apply_source_maps: false, - debug_flag: false, - unstable: false, - ca_data: None, - user_agent: "x".to_string(), - seed: None, - module_loader, - create_web_worker_cb, - js_error_create_fn: None, - use_deno_namespace: false, - maybe_inspector_server: None, - runtime_version: "x".to_string(), - ts_version: "x".to_string(), - no_color: true, - get_error_class_fn: None, - blob_url_store: BlobUrlStore::default(), - broadcast_channel: InMemoryBroadcastChannel::default(), - }; - - let mut worker = WebWorker::from_options( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - WorkerId(1), - &options, - ); - worker.bootstrap(&options); - worker - } - - #[tokio::test] - async fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute_script("a", source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop(false)); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - assert_eq!(*buf, [65, 3, 73, 2, 73, 4, 73, 6, 36, 0, 3]); - } - _ => unreachable!(), - } - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 4, 101, 120, 105, 116].into_boxed_slice(); // "exit" encoded - let r = handle.post_message(msg.into()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - join_handle.join().expect("Failed to join worker thread"); - } - - #[tokio::test] - async fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - worker - .execute_script("a", "onmessage = () => { close(); }") - .unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop(false)); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - // TODO(Inteon): use Deno.core.serialize() instead of hardcoded encoded value - let msg = vec![34, 2, 104, 105].into_boxed_slice(); // "hi" encoded - let r = handle.post_message(msg.clone().into()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - - join_handle.join().expect("Failed to join worker thread"); - } -}