From ed680552a24b7d4b936b7c16a63b46e0f24c0e60 Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 5 Feb 2020 17:16:07 -0500 Subject: [PATCH] fix: basic web worker message passing (#3893) Removes OP_HOST_GET_WORKER_LOADED, OP_HOST_POLL_WORKER, OP_HOST_RESUME_WORKER and ready/messageBuffer in cli/js/workers.ts. --- cli/js/dispatch.ts | 3 - cli/js/workers.ts | 71 +++------------- cli/ops/worker_host.rs | 113 ++------------------------ cli/tests/integration_tests.rs | 5 ++ cli/tests/subdir/test_worker_basic.js | 17 ++++ cli/tests/workers_basic.out | 3 + cli/tests/workers_basic.ts | 11 +++ 7 files changed, 56 insertions(+), 167 deletions(-) create mode 100644 cli/tests/subdir/test_worker_basic.js create mode 100644 cli/tests/workers_basic.out create mode 100644 cli/tests/workers_basic.ts diff --git a/cli/js/dispatch.ts b/cli/js/dispatch.ts index aa6696fa2c..1a6b6528db 100644 --- a/cli/js/dispatch.ts +++ b/cli/js/dispatch.ts @@ -42,11 +42,8 @@ export let OP_QUERY_PERMISSION: number; export let OP_REVOKE_PERMISSION: number; export let OP_REQUEST_PERMISSION: number; export let OP_CREATE_WORKER: number; -export let OP_HOST_GET_WORKER_LOADED: number; export let OP_HOST_POST_MESSAGE: number; -export let OP_HOST_POLL_WORKER: number; export let OP_HOST_CLOSE_WORKER: number; -export let OP_HOST_RESUME_WORKER: number; export let OP_HOST_GET_MESSAGE: number; export let OP_WORKER_POST_MESSAGE: number; export let OP_WORKER_GET_MESSAGE: number; diff --git a/cli/js/workers.ts b/cli/js/workers.ts index 2a5d4d1909..fb63a3260b 100644 --- a/cli/js/workers.ts +++ b/cli/js/workers.ts @@ -29,7 +29,7 @@ function createWorker( hasSourceCode: boolean, sourceCode: Uint8Array, name?: string -): { id: number; loaded: boolean } { +): { id: number } { return sendSync(dispatch.OP_CREATE_WORKER, { specifier, hasSourceCode, @@ -38,22 +38,6 @@ function createWorker( }); } -async function hostGetWorkerLoaded(id: number): Promise { - return await sendAsync(dispatch.OP_HOST_GET_WORKER_LOADED, { id }); -} - -async function hostPollWorker(id: number): Promise { - return await sendAsync(dispatch.OP_HOST_POLL_WORKER, { id }); -} - -function hostCloseWorker(id: number): void { - sendSync(dispatch.OP_HOST_CLOSE_WORKER, { id }); -} - -function hostResumeWorker(id: number): void { - sendSync(dispatch.OP_HOST_RESUME_WORKER, { id }); -} - function hostPostMessage(id: number, data: any): void { const dataIntArray = encodeMessage(data); sendSync(dispatch.OP_HOST_POST_MESSAGE, { id }, dataIntArray); @@ -85,8 +69,6 @@ export interface WorkerOptions { export class WorkerImpl extends EventTarget implements Worker { private readonly id: number; private isClosing = false; - private messageBuffer: any[] = []; - private ready = false; public onerror?: (e: any) => void; public onmessage?: (data: any) => void; public onmessageerror?: () => void; @@ -125,14 +107,13 @@ export class WorkerImpl extends EventTarget implements Worker { } */ - const { id, loaded } = createWorker( + const { id } = createWorker( specifier, hasSourceCode, sourceCode, options?.name ); this.id = id; - this.ready = loaded; this.poll(); } @@ -158,27 +139,19 @@ export class WorkerImpl extends EventTarget implements Worker { } async poll(): Promise { - // If worker has not been immediately executed - // then let's await it's readiness - if (!this.ready) { - const result = await hostGetWorkerLoaded(this.id); - - if (result.error) { - if (!this.handleError(result.error)) { - throw new Error(result.error.message); - } - return; + while (!this.isClosing) { + const data = await hostGetMessage(this.id); + if (data == null) { + log("worker got null message. quitting."); + break; + } + if (this.onmessage) { + const event = { data }; + this.onmessage(event); } } - // drain messages - for (const data of this.messageBuffer) { - hostPostMessage(this.id, data); - } - this.messageBuffer = []; - this.ready = true; - this.run(); - + /* while (true) { const result = await hostPollWorker(this.id); @@ -194,32 +167,14 @@ export class WorkerImpl extends EventTarget implements Worker { break; } } + */ } postMessage(data: any): void { - if (!this.ready) { - this.messageBuffer.push(data); - return; - } - hostPostMessage(this.id, data); } terminate(): void { throw new Error("Not yet implemented"); } - - private async run(): Promise { - while (!this.isClosing) { - const data = await hostGetMessage(this.id); - if (data == null) { - log("worker got null message. quitting."); - break; - } - if (this.onmessage) { - const event = { data }; - this.onmessage(event); - } - } - } } diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index c1dcd6aaaf..f8b3edfce8 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -4,7 +4,6 @@ use crate::deno_error::bad_resource; use crate::deno_error::js_check; use crate::deno_error::DenoError; use crate::deno_error::ErrorKind; -use crate::fmt_errors::JSError; use crate::ops::dispatch_json::JsonResult; use crate::ops::json_op; use crate::startup_data; @@ -12,11 +11,8 @@ use crate::state::ThreadSafeState; use crate::web_worker::WebWorker; use deno_core::*; use futures; -use futures::channel::mpsc; use futures::future::FutureExt; use futures::future::TryFutureExt; -use futures::sink::SinkExt; -use futures::stream::StreamExt; use std; use std::convert::From; use std::sync::atomic::Ordering; @@ -26,22 +22,10 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { "create_worker", s.core_op(json_op(s.stateful_op(op_create_worker))), ); - i.register_op( - "host_get_worker_loaded", - s.core_op(json_op(s.stateful_op(op_host_get_worker_loaded))), - ); - i.register_op( - "host_poll_worker", - s.core_op(json_op(s.stateful_op(op_host_poll_worker))), - ); i.register_op( "host_close_worker", s.core_op(json_op(s.stateful_op(op_host_close_worker))), ); - i.register_op( - "host_resume_worker", - s.core_op(json_op(s.stateful_op(op_host_resume_worker))), - ); i.register_op( "host_post_message", s.core_op(json_op(s.stateful_op(op_host_post_message))), @@ -130,29 +114,21 @@ fn op_create_worker( // Has provided source code, execute immediately. if has_source_code { js_check(worker.execute(&source_code)); - load_sender - .send(Ok(json!({"id": worker_id, "loaded": true}))) - .unwrap(); + load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); return; } - let (mut sender, receiver) = mpsc::channel::>(1); - - // TODO(bartlomieju): this future should be spawned on the separate thread, - // dedicated to that worker let fut = async move { - let result = worker + let r = worker .execute_mod_async(&module_specifier, None, false) .await; - sender.send(result).await.expect("Failed to send message"); + if r.is_ok() { + let _ = (&mut *worker).await; + } } .boxed_local(); - let mut table = parent_state.loading_workers.lock().unwrap(); - table.insert(worker_id, receiver); - load_sender - .send(Ok(json!({"id": worker_id, "loaded": false}))) - .unwrap(); + load_sender.send(Ok(json!({ "id": worker_id }))).unwrap(); crate::tokio_util::run_basic(fut); }); @@ -162,67 +138,11 @@ fn op_create_worker( Ok(JsonOp::Sync(r.unwrap())) } -fn serialize_worker_result(result: Result<(), ErrBox>) -> Value { - use crate::deno_error::GetErrorKind; - - if let Err(error) = result { - match error.kind() { - ErrorKind::JSError => { - let error = error.downcast::().unwrap(); - let exception: V8Exception = error.into(); - json!({"error": { - "message": exception.message, - "fileName": exception.script_resource_name, - "lineNumber": exception.line_number, - "columnNumber": exception.start_column, - }}) - } - _ => json!({"error": { - "message": error.to_string(), - }}), - } - } else { - json!({"ok": true}) - } -} - #[derive(Deserialize)] struct WorkerArgs { id: i32, } -fn op_host_get_worker_loaded( - state: &ThreadSafeState, - args: Value, - _data: Option, -) -> Result { - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let mut table = state.loading_workers.lock().unwrap(); - let mut receiver = table.remove(&id).unwrap(); - - let op = async move { - let result = receiver.next().await.unwrap(); - Ok(serialize_worker_result(result)) - }; - - Ok(JsonOp::Async(op.boxed_local())) -} - -fn op_host_poll_worker( - _state: &ThreadSafeState, - _args: Value, - _data: Option, -) -> Result { - println!("op_host_poll_worker"); - // TOOO(ry) remove this. - todo!() - /* - let op = async { Ok(serialize_worker_result(Ok(()))) }; - Ok(JsonOp::Async(op.boxed_local())) - */ -} - fn op_host_close_worker( state: &ThreadSafeState, args: Value, @@ -246,25 +166,6 @@ fn op_host_close_worker( Ok(JsonOp::Sync(json!({}))) } -fn op_host_resume_worker( - _state: &ThreadSafeState, - _args: Value, - _data: Option, -) -> Result { - // TODO(ry) We are not on the same thread. We cannot just call worker.execute. - // We can only send messages. This needs to be reimplemented somehow. - todo!() - /* - let args: WorkerArgs = serde_json::from_value(args)?; - let id = args.id as u32; - let state = state.clone(); - let mut workers_table = state.workers.lock().unwrap(); - let worker = workers_table.get_mut(&id).unwrap(); - js_check(worker.execute("runWorkerMessageLoop()")); - Ok(JsonOp::Sync(json!({}))) - */ -} - #[derive(Deserialize)] struct HostGetMessageArgs { id: i32, @@ -284,7 +185,7 @@ fn op_host_get_message( let worker_handle = table.get_mut(&id).ok_or_else(bad_resource)?; let fut = worker_handle.get_message(); let op = async move { - let maybe_buf = fut.await.unwrap(); + let maybe_buf = fut.await; Ok(json!({ "data": maybe_buf })) }; Ok(JsonOp::Async(op.boxed_local())) diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 08fccd7357..9555f93c07 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -401,6 +401,11 @@ itest!(_026_workers { }); */ +itest!(workers_basic { + args: "run --reload workers_basic.ts", + output: "workers_basic.out", +}); + itest!(_027_redirect_typescript { args: "run --reload 027_redirect_typescript.ts", output: "027_redirect_typescript.ts.out", diff --git a/cli/tests/subdir/test_worker_basic.js b/cli/tests/subdir/test_worker_basic.js new file mode 100644 index 0000000000..db00b6d0c7 --- /dev/null +++ b/cli/tests/subdir/test_worker_basic.js @@ -0,0 +1,17 @@ +console.log("hello from test_worker_basic.js"); + +// TODO(bartlomieju): add test for throwing in web worker +if (self.name !== "jsWorker") { + throw Error(`Bad worker name: ${self.name}, expected jsWorker`); +} + +onmessage = function(e) { + console.log("jsWorker onmessage", e.data); + postMessage(e.data); + close(); +}; + +onerror = function() { + console.log("called onerror in worker"); + return false; +}; diff --git a/cli/tests/workers_basic.out b/cli/tests/workers_basic.out new file mode 100644 index 0000000000..15c5735303 --- /dev/null +++ b/cli/tests/workers_basic.out @@ -0,0 +1,3 @@ +hello from test_worker_basic.js +jsWorker onmessage msg1 +main recv: msg1 diff --git a/cli/tests/workers_basic.ts b/cli/tests/workers_basic.ts new file mode 100644 index 0000000000..64bd58fcca --- /dev/null +++ b/cli/tests/workers_basic.ts @@ -0,0 +1,11 @@ +// Tests basic postMessage, close, onmessage +const jsWorker = new Worker("./subdir/test_worker_basic.js", { + type: "module", + name: "jsWorker" +}); + +jsWorker.onmessage = (e): void => { + console.log("main recv: " + e.data); +}; + +jsWorker.postMessage("msg1");