From ecd1d3abb0cae9c7cbc1330cbaa035a5786e94d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Tue, 21 Jan 2020 17:50:06 +0100 Subject: [PATCH] refactor: split cli::Worker (#3735) * cli::Worker is base struct to create specialized workers * add MainWorker * add CompilerWorker * refactor WebWorker to use Worker --- cli/compilers/compiler_worker.rs | 78 +++++++++++++++ cli/compilers/mod.rs | 1 + cli/compilers/ts.rs | 14 +-- cli/compilers/wasm.rs | 9 +- cli/lib.rs | 13 ++- cli/ops/compiler.rs | 48 ---------- cli/ops/mod.rs | 1 + cli/ops/runtime_compiler.rs | 56 +++++++++++ cli/ops/web_worker.rs | 30 +----- cli/ops/worker_host.rs | 28 ++---- cli/state.rs | 9 +- cli/web_worker.rs | 135 ++++++-------------------- cli/worker.rs | 157 +++++++++++++++++++------------ 13 files changed, 299 insertions(+), 280 deletions(-) create mode 100644 cli/compilers/compiler_worker.rs create mode 100644 cli/ops/runtime_compiler.rs diff --git a/cli/compilers/compiler_worker.rs b/cli/compilers/compiler_worker.rs new file mode 100644 index 0000000000..461194c37c --- /dev/null +++ b/cli/compilers/compiler_worker.rs @@ -0,0 +1,78 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use crate::ops; +use crate::state::ThreadSafeState; +use crate::worker::Worker; +use crate::worker::WorkerChannels; +use deno_core; +use deno_core::ErrBox; +use deno_core::StartupData; +use futures::future::FutureExt; +use std::future::Future; +use std::ops::Deref; +use std::ops::DerefMut; +use std::pin::Pin; +use std::task::Context; +use std::task::Poll; + +/// This worker is used to host TypeScript and WASM compilers. +/// +/// It provides minimal set of ops that are necessary to facilitate +/// compilation. +/// +/// NOTE: This worker is considered priveleged, because it may +/// access file system without permission check. +/// +/// At the moment this worker is meant to be single-use - after +/// performing single compilation/bundling it should be destroyed. +/// +/// TODO(bartlomieju): add support to reuse the worker - or in other +/// words support stateful TS compiler +#[derive(Clone)] +pub struct CompilerWorker(Worker); + +impl CompilerWorker { + pub fn new( + name: String, + startup_data: StartupData, + state: ThreadSafeState, + external_channels: WorkerChannels, + ) -> Self { + let state_ = state.clone(); + let worker = Worker::new(name, startup_data, state_, external_channels); + { + let mut isolate = worker.isolate.try_lock().unwrap(); + ops::compiler::init(&mut isolate, &state); + ops::web_worker::init(&mut isolate, &state); + // TODO(bartlomieju): CompilerWorker should not + // depend on those ops + ops::os::init(&mut isolate, &state); + ops::files::init(&mut isolate, &state); + ops::fs::init(&mut isolate, &state); + ops::io::init(&mut isolate, &state); + } + + Self(worker) + } +} + +impl Deref for CompilerWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for CompilerWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Future for CompilerWorker { + type Output = Result<(), ErrBox>; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let inner = self.get_mut(); + inner.0.poll_unpin(cx) + } +} diff --git a/cli/compilers/mod.rs b/cli/compilers/mod.rs index 87e34ac5fc..f4aac3681d 100644 --- a/cli/compilers/mod.rs +++ b/cli/compilers/mod.rs @@ -3,6 +3,7 @@ use deno_core::ErrBox; use futures::Future; use serde_json::Value; +mod compiler_worker; mod js; mod json; mod ts; diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index f3147334f7..3adf924956 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -1,4 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::compiler_worker::CompilerWorker; use crate::compilers::CompilationResultFuture; use crate::compilers::CompiledModule; use crate::compilers::CompiledModuleFuture; @@ -13,7 +14,6 @@ use crate::source_maps::SourceMapGetter; use crate::startup_data; use crate::state::*; use crate::version; -use crate::worker::Worker; use deno_core::Buf; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -228,7 +228,7 @@ impl TsCompiler { } /// Create a new V8 worker with snapshot of TS compiler and setup compiler's runtime. - fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { + fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = ThreadSafeState::new(global_state.clone(), None, None, int) @@ -240,7 +240,7 @@ impl TsCompiler { .compiler_starts .fetch_add(1, Ordering::SeqCst); - let mut worker = Worker::new( + let mut worker = CompilerWorker::new( "TS".to_string(), startup_data::compiler_isolate_init(), worker_state, @@ -279,7 +279,7 @@ impl TsCompiler { worker.post_message(req_msg).await?; worker.await?; debug!("Sent message to worker"); - let maybe_msg = worker_.get_message().await?; + let maybe_msg = worker_.get_message().await; debug!("Received message from worker"); if let Some(msg) = maybe_msg { let json_str = std::str::from_utf8(&msg).unwrap(); @@ -378,7 +378,7 @@ impl TsCompiler { worker.post_message(req_msg).await?; worker.await?; debug!("Sent message to worker"); - let maybe_msg = worker_.get_message().await?; + let maybe_msg = worker_.get_message().await; if let Some(msg) = maybe_msg { let json_str = std::str::from_utf8(&msg).unwrap(); debug!("Message: {}", json_str); @@ -633,7 +633,7 @@ pub fn runtime_compile_async( worker.post_message(req_msg).await?; worker.await?; debug!("Sent message to worker"); - let msg = (worker_.get_message().await?).unwrap(); + let msg = (worker_.get_message().await).unwrap(); let json_str = std::str::from_utf8(&msg).unwrap(); Ok(json!(json_str)) } @@ -661,7 +661,7 @@ pub fn runtime_transpile_async( worker.post_message(req_msg).await?; worker.await?; debug!("Sent message to worker"); - let msg = (worker_.get_message().await?).unwrap(); + let msg = (worker_.get_message().await).unwrap(); let json_str = std::str::from_utf8(&msg).unwrap(); Ok(json!(json_str)) } diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index e2a293f187..637fc76878 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -1,11 +1,11 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::compiler_worker::CompilerWorker; use crate::compilers::CompiledModule; use crate::compilers::CompiledModuleFuture; use crate::file_fetcher::SourceFile; use crate::global_state::ThreadSafeGlobalState; use crate::startup_data; use crate::state::*; -use crate::worker::Worker; use futures::FutureExt; use serde_derive::Deserialize; use serde_json; @@ -42,7 +42,7 @@ pub struct WasmCompiler { impl WasmCompiler { /// Create a new V8 worker with snapshot of WASM compiler and setup compiler's runtime. - fn setup_worker(global_state: ThreadSafeGlobalState) -> Worker { + fn setup_worker(global_state: ThreadSafeGlobalState) -> CompilerWorker { let (int, ext) = ThreadSafeState::create_channels(); let worker_state = ThreadSafeState::new(global_state.clone(), None, None, int) @@ -54,7 +54,7 @@ impl WasmCompiler { .compiler_starts .fetch_add(1, Ordering::SeqCst); - let mut worker = Worker::new( + let mut worker = CompilerWorker::new( "WASM".to_string(), startup_data::compiler_isolate_init(), worker_state, @@ -100,10 +100,9 @@ impl WasmCompiler { std::process::exit(1); } debug!("Sent message to worker"); - let maybe_msg = worker_.get_message().await.expect("not handled"); + let json_msg = worker_.get_message().await.expect("not handled"); debug!("Received message from worker"); - let json_msg = maybe_msg.unwrap(); let module_info: WasmModuleInfo = serde_json::from_slice(&json_msg).unwrap(); debug!("WASM module info: {:#?}", &module_info); diff --git a/cli/lib.rs b/cli/lib.rs index c0ad84c852..53dac1ea9a 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -60,7 +60,7 @@ use crate::global_state::ThreadSafeGlobalState; use crate::ops::io::get_stdio; use crate::progress::Progress; use crate::state::ThreadSafeState; -use crate::worker::Worker; +use crate::worker::MainWorker; use deno_core::v8_set_flags; use deno_core::ErrBox; use deno_core::ModuleSpecifier; @@ -97,7 +97,7 @@ impl log::Log for Logger { fn create_worker_and_state( flags: DenoFlags, -) -> (Worker, ThreadSafeGlobalState) { +) -> (MainWorker, ThreadSafeGlobalState) { use crate::shell::Shell; use std::sync::Arc; use std::sync::Mutex; @@ -135,7 +135,7 @@ fn create_worker_and_state( resource_table.add("stderr", Box::new(stderr)); } - let worker = Worker::new( + let worker = MainWorker::new( "main".to_string(), startup_data::deno_isolate_init(), state, @@ -150,7 +150,7 @@ fn types_command() { println!("{}", content); } -fn print_cache_info(worker: Worker) { +fn print_cache_info(worker: MainWorker) { let state = &worker.state.global_state; println!( @@ -170,7 +170,10 @@ fn print_cache_info(worker: Worker) { ); } -async fn print_file_info(worker: Worker, module_specifier: ModuleSpecifier) { +async fn print_file_info( + worker: MainWorker, + module_specifier: ModuleSpecifier, +) { let global_state_ = &worker.state.global_state; let maybe_source_file = global_state_ diff --git a/cli/ops/compiler.rs b/cli/ops/compiler.rs index 5d6875fb0e..e515081df4 100644 --- a/cli/ops/compiler.rs +++ b/cli/ops/compiler.rs @@ -1,14 +1,11 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use super::dispatch_json::{Deserialize, JsonOp, Value}; -use crate::compilers::runtime_compile_async; -use crate::compilers::runtime_transpile_async; use crate::futures::future::try_join_all; use crate::msg; use crate::ops::json_op; use crate::state::ThreadSafeState; use deno_core::Loader; use deno_core::*; -use std::collections::HashMap; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("cache", s.core_op(json_op(s.stateful_op(op_cache)))); @@ -20,8 +17,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { "fetch_source_files", s.core_op(json_op(s.stateful_op(op_fetch_source_files))), ); - i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile)))); - i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile)))); } #[derive(Deserialize)] @@ -150,46 +145,3 @@ fn op_fetch_source_files( Ok(JsonOp::Async(future)) } - -#[derive(Deserialize, Debug)] -#[serde(rename_all = "camelCase")] -struct CompileArgs { - root_name: String, - sources: Option>, - bundle: bool, - options: Option, -} - -fn op_compile( - state: &ThreadSafeState, - args: Value, - _zero_copy: Option, -) -> Result { - let args: CompileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_compile_async( - state.global_state.clone(), - &args.root_name, - &args.sources, - args.bundle, - &args.options, - ))) -} - -#[derive(Deserialize, Debug)] -struct TranspileArgs { - sources: HashMap, - options: Option, -} - -fn op_transpile( - state: &ThreadSafeState, - args: Value, - _zero_copy: Option, -) -> Result { - let args: TranspileArgs = serde_json::from_value(args)?; - Ok(JsonOp::Async(runtime_transpile_async( - state.global_state.clone(), - &args.sources, - &args.options, - ))) -} diff --git a/cli/ops/mod.rs b/cli/ops/mod.rs index 203d1e17e2..81f95ffb9c 100644 --- a/cli/ops/mod.rs +++ b/cli/ops/mod.rs @@ -21,6 +21,7 @@ pub mod process; pub mod random; pub mod repl; pub mod resources; +pub mod runtime_compiler; pub mod timers; pub mod tls; pub mod web_worker; diff --git a/cli/ops/runtime_compiler.rs b/cli/ops/runtime_compiler.rs new file mode 100644 index 0000000000..4202f6b3c1 --- /dev/null +++ b/cli/ops/runtime_compiler.rs @@ -0,0 +1,56 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. +use super::dispatch_json::{Deserialize, JsonOp, Value}; +use crate::compilers::runtime_compile_async; +use crate::compilers::runtime_transpile_async; +use crate::ops::json_op; +use crate::state::ThreadSafeState; +use deno_core::*; +use std::collections::HashMap; + +pub fn init(i: &mut Isolate, s: &ThreadSafeState) { + i.register_op("compile", s.core_op(json_op(s.stateful_op(op_compile)))); + i.register_op("transpile", s.core_op(json_op(s.stateful_op(op_transpile)))); +} + +#[derive(Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +struct CompileArgs { + root_name: String, + sources: Option>, + bundle: bool, + options: Option, +} + +fn op_compile( + state: &ThreadSafeState, + args: Value, + _zero_copy: Option, +) -> Result { + let args: CompileArgs = serde_json::from_value(args)?; + Ok(JsonOp::Async(runtime_compile_async( + state.global_state.clone(), + &args.root_name, + &args.sources, + args.bundle, + &args.options, + ))) +} + +#[derive(Deserialize, Debug)] +struct TranspileArgs { + sources: HashMap, + options: Option, +} + +fn op_transpile( + state: &ThreadSafeState, + args: Value, + _zero_copy: Option, +) -> Result { + let args: TranspileArgs = serde_json::from_value(args)?; + Ok(JsonOp::Async(runtime_transpile_async( + state.global_state.clone(), + &args.sources, + &args.options, + ))) +} diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index 300a0dfd1d..be987c09fb 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -11,10 +11,6 @@ use futures::sink::SinkExt; use futures::stream::StreamExt; use std; use std::convert::From; -use std::future::Future; -use std::pin::Pin; -use std::task::Context; -use std::task::Poll; pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op( @@ -27,33 +23,16 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { ); } -struct GetMessageFuture { - state: ThreadSafeState, -} - -impl Future for GetMessageFuture { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut channels = inner.state.worker_channels.lock().unwrap(); - let receiver = &mut channels.receiver; - receiver.poll_next_unpin(cx) - } -} - /// Get message from host as guest worker fn op_worker_get_message( state: &ThreadSafeState, _args: Value, _data: Option, ) -> Result { - let op = GetMessageFuture { - state: state.clone(), - }; - + let state_ = state.clone(); let op = async move { - let maybe_buf = op.await; + let mut receiver = state_.worker_channels.receiver.lock().await; + let maybe_buf = receiver.next().await; debug!("op_worker_get_message"); Ok(json!({ "data": maybe_buf })) }; @@ -68,8 +47,7 @@ fn op_worker_post_message( data: Option, ) -> Result { let d = Vec::from(data.unwrap().as_ref()).into_boxed_slice(); - let mut channels = state.worker_channels.lock().unwrap(); - let sender = &mut channels.sender; + let mut sender = state.worker_channels.sender.clone(); futures::executor::block_on(sender.send(d)) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index 6ac48228d2..c64e86c1ce 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -57,21 +57,6 @@ pub fn init(i: &mut Isolate, s: &ThreadSafeState) { i.register_op("metrics", s.core_op(json_op(s.stateful_op(op_metrics)))); } -struct GetMessageFuture { - state: ThreadSafeState, -} - -impl Future for GetMessageFuture { - type Output = Option; - - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let inner = self.get_mut(); - let mut channels = inner.state.worker_channels.lock().unwrap(); - let receiver = &mut channels.receiver; - receiver.poll_next_unpin(cx) - } -} - #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct CreateWorkerArgs { @@ -250,9 +235,12 @@ fn op_host_close_worker( let mut workers_table = state_.workers.lock().unwrap(); let maybe_worker = workers_table.remove(&id); if let Some(worker) = maybe_worker { - let mut channels = worker.state.worker_channels.lock().unwrap(); - channels.sender.close_channel(); - channels.receiver.close(); + let channels = worker.state.worker_channels.clone(); + let mut sender = channels.sender.clone(); + sender.close_channel(); + + let mut receiver = futures::executor::block_on(channels.receiver.lock()); + receiver.close(); }; Ok(JsonOp::Sync(json!({}))) @@ -285,9 +273,9 @@ fn op_host_get_message( _data: Option, ) -> Result { let args: HostGetMessageArgs = serde_json::from_value(args)?; - + let state_ = state.clone(); let id = args.id as u32; - let mut table = state.workers.lock().unwrap(); + let mut table = state_.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; let fut = worker.get_message(); diff --git a/cli/state.rs b/cli/state.rs index 4ad8241bec..02c2582808 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -35,6 +35,7 @@ use std::sync::Arc; use std::sync::Mutex; use std::sync::MutexGuard; use std::time::Instant; +use tokio::sync::Mutex as AsyncMutex; /// Isolate cannot be passed between threads but ThreadSafeState can. /// ThreadSafeState satisfies Send and Sync. So any state that needs to be @@ -46,7 +47,7 @@ pub struct State { pub global_state: ThreadSafeGlobalState, pub permissions: Arc>, pub main_module: Option, - pub worker_channels: Mutex, + pub worker_channels: WorkerChannels, /// When flags contains a `.import_map_path` option, the content of the /// import map file will be resolved and set. pub import_map: Option, @@ -203,11 +204,11 @@ impl ThreadSafeState { let (out_tx, out_rx) = mpsc::channel::(1); let internal_channels = WorkerChannels { sender: out_tx, - receiver: in_rx, + receiver: Arc::new(AsyncMutex::new(in_rx)), }; let external_channels = WorkerChannels { sender: in_tx, - receiver: out_rx, + receiver: Arc::new(AsyncMutex::new(out_rx)), }; (internal_channels, external_channels) } @@ -241,7 +242,7 @@ impl ThreadSafeState { main_module, permissions, import_map, - worker_channels: Mutex::new(internal_channels), + worker_channels: internal_channels, metrics: Metrics::default(), global_timer: Mutex::new(GlobalTimer::new()), workers: Mutex::new(HashMap::new()), diff --git a/cli/web_worker.rs b/cli/web_worker.rs index f933cbdc47..7b21d49370 100644 --- a/cli/web_worker.rs +++ b/cli/web_worker.rs @@ -1,35 +1,28 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::fmt_errors::JSError; use crate::ops; use crate::state::ThreadSafeState; +use crate::worker::Worker; use crate::worker::WorkerChannels; -use crate::worker::WorkerReceiver; use deno_core; -use deno_core::Buf; use deno_core::ErrBox; -use deno_core::ModuleSpecifier; use deno_core::StartupData; use futures::future::FutureExt; -use futures::future::TryFutureExt; -use futures::sink::SinkExt; -use futures::task::AtomicWaker; -use std::env; use std::future::Future; +use std::ops::Deref; +use std::ops::DerefMut; use std::pin::Pin; -use std::sync::Arc; -use std::sync::Mutex; use std::task::Context; use std::task::Poll; -use tokio::sync::Mutex as AsyncMutex; -use url::Url; +/// This worker is implementation of `Worker` Web API +/// +/// At the moment this type of worker supports only +/// communication with parent and creating new workers. +/// +/// Each `WebWorker` is either a child of `MainWorker` or other +/// `WebWorker`. #[derive(Clone)] -pub struct WebWorker { - pub name: String, - pub isolate: Arc>>, - pub state: ThreadSafeState, - external_channels: Arc>, -} +pub struct WebWorker(Worker); impl WebWorker { pub fn new( @@ -38,92 +31,28 @@ impl WebWorker { state: ThreadSafeState, external_channels: WorkerChannels, ) -> Self { - let mut isolate = - deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); - - ops::web_worker::init(&mut isolate, &state); - ops::worker_host::init(&mut isolate, &state); - - let global_state_ = state.global_state.clone(); - isolate.set_js_error_create(move |v8_exception| { - JSError::from_v8_exception(v8_exception, &global_state_.ts_compiler) - }); - - Self { - name, - isolate: Arc::new(AsyncMutex::new(isolate)), - state, - external_channels: Arc::new(Mutex::new(external_channels)), - } - } - - /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". - pub fn execute(&mut self, js_source: &str) -> Result<(), ErrBox> { - let path = env::current_dir().unwrap().join("__anonymous__"); - let url = Url::from_file_path(path).unwrap(); - self.execute2(url.as_str(), js_source) - } - - /// Executes the provided JavaScript source code. The js_filename argument is - /// provided only for debugging purposes. - fn execute2( - &mut self, - js_filename: &str, - js_source: &str, - ) -> Result<(), ErrBox> { - let mut isolate = self.isolate.try_lock().unwrap(); - isolate.execute(js_filename, js_source) - } - - /// Executes the provided JavaScript module. - /// - /// Takes ownership of the isolate behind mutex. - pub async fn execute_mod_async( - &mut self, - module_specifier: &ModuleSpecifier, - maybe_code: Option, - is_prefetch: bool, - ) -> Result<(), ErrBox> { - let specifier = module_specifier.to_string(); - let worker = self.clone(); - - let mut isolate = self.isolate.lock().await; - let id = isolate.load_module(&specifier, maybe_code).await?; - worker.state.global_state.progress.done(); - - if !is_prefetch { - return isolate.mod_evaluate(id); + let state_ = state.clone(); + let worker = Worker::new(name, startup_data, state_, external_channels); + { + let mut isolate = worker.isolate.try_lock().unwrap(); + ops::web_worker::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); } - Ok(()) + Self(worker) } +} - /// Post message to worker as a host. - /// - /// This method blocks current thread. - pub fn post_message( - &self, - buf: Buf, - ) -> impl Future> { - let channels = self.external_channels.lock().unwrap(); - let mut sender = channels.sender.clone(); - async move { - let result = sender.send(buf).map_err(ErrBox::from).await; - drop(sender); - result - } +impl Deref for WebWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.0 } +} - /// Get message from worker as a host. - pub fn get_message(&self) -> WorkerReceiver { - WorkerReceiver { - channels: self.external_channels.clone(), - } - } - - pub fn clear_exception(&mut self) { - let mut isolate = self.isolate.try_lock().unwrap(); - isolate.clear_exception(); +impl DerefMut for WebWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 } } @@ -132,14 +61,6 @@ impl Future for WebWorker { fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { let inner = self.get_mut(); - let waker = AtomicWaker::new(); - waker.register(cx.waker()); - match inner.isolate.try_lock() { - Ok(mut isolate) => isolate.poll_unpin(cx), - Err(_) => { - waker.wake(); - Poll::Pending - } - } + inner.0.poll_unpin(cx) } } diff --git a/cli/worker.rs b/cli/worker.rs index 4ad79a09b2..ef72602d41 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -15,9 +15,10 @@ use futures::stream::StreamExt; use futures::task::AtomicWaker; use std::env; use std::future::Future; +use std::ops::Deref; +use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; -use std::sync::Mutex; use std::task::Context; use std::task::Poll; use tokio::sync::Mutex as AsyncMutex; @@ -26,19 +27,30 @@ use url::Url; /// Wraps mpsc channels so they can be referenced /// from ops and used to facilitate parent-child communication /// for workers. +#[derive(Clone)] pub struct WorkerChannels { pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver, + pub receiver: Arc>>, } -/// Wraps deno_core::Isolate to provide source maps, ops for the CLI, and -/// high-level module loading. +/// Worker is a CLI wrapper for `deno_core::Isolate`. +/// +/// It provides infrastructure to communicate with a worker and +/// consequently between workers. +/// +/// This struct is meant to be used as a base struct for concrete +/// type of worker that registers set of ops. +/// +/// Currently there are three types of workers: +/// - `MainWorker` +/// - `CompilerWorker` +/// - `WebWorker` #[derive(Clone)] pub struct Worker { pub name: String, pub isolate: Arc>>, pub state: ThreadSafeState, - external_channels: Arc>, + external_channels: WorkerChannels, } impl Worker { @@ -50,26 +62,6 @@ impl Worker { ) -> Self { let mut isolate = deno_core::EsIsolate::new(Box::new(state.clone()), startup_data, false); - let op_registry = isolate.op_registry.clone(); - - ops::compiler::init(&mut isolate, &state); - ops::errors::init(&mut isolate, &state); - ops::fetch::init(&mut isolate, &state); - ops::files::init(&mut isolate, &state); - ops::fs::init(&mut isolate, &state); - ops::io::init(&mut isolate, &state); - ops::plugins::init(&mut isolate, &state, op_registry); - ops::net::init(&mut isolate, &state); - ops::tls::init(&mut isolate, &state); - ops::os::init(&mut isolate, &state); - ops::permissions::init(&mut isolate, &state); - ops::process::init(&mut isolate, &state); - ops::random::init(&mut isolate, &state); - ops::repl::init(&mut isolate, &state); - ops::resources::init(&mut isolate, &state); - ops::timers::init(&mut isolate, &state); - ops::worker_host::init(&mut isolate, &state); - ops::web_worker::init(&mut isolate, &state); let global_state_ = state.global_state.clone(); isolate.set_js_error_create(move |v8_exception| { @@ -80,7 +72,7 @@ impl Worker { name, isolate: Arc::new(AsyncMutex::new(isolate)), state, - external_channels: Arc::new(Mutex::new(external_channels)), + external_channels, } } @@ -128,24 +120,24 @@ impl Worker { /// Post message to worker as a host. /// /// This method blocks current thread. - pub fn post_message( - &self, - buf: Buf, - ) -> impl Future> { - let channels = self.external_channels.lock().unwrap(); - let mut sender = channels.sender.clone(); - async move { - let result = sender.send(buf).map_err(ErrBox::from).await; - drop(sender); - result - } + pub async fn post_message(&self, buf: Buf) -> Result<(), ErrBox> { + let mut sender = self.external_channels.sender.clone(); + let result = sender.send(buf).map_err(ErrBox::from).await; + drop(sender); + result } /// Get message from worker as a host. - pub fn get_message(&self) -> WorkerReceiver { - WorkerReceiver { - channels: self.external_channels.clone(), + pub fn get_message( + &self, + ) -> Pin> + Send>> { + let receiver_mutex = self.external_channels.receiver.clone(); + + async move { + let mut receiver = receiver_mutex.lock().await; + receiver.next().await } + .boxed() } pub fn clear_exception(&mut self) { @@ -171,22 +163,71 @@ impl Future for Worker { } } -/// This structure wraps worker's resource id to implement future -/// that will return message received from worker or None -/// if worker's channel has been closed. -pub struct WorkerReceiver { - pub channels: Arc>, +/// This worker is created and used by Deno executable. +/// +/// It provides ops available in the `Deno` namespace. +/// +/// All WebWorkers created during program execution are decendants of +/// this worker. +#[derive(Clone)] +pub struct MainWorker(Worker); + +impl MainWorker { + pub fn new( + name: String, + startup_data: StartupData, + state: ThreadSafeState, + external_channels: WorkerChannels, + ) -> Self { + let state_ = state.clone(); + let worker = Worker::new(name, startup_data, state_, external_channels); + { + let mut isolate = worker.isolate.try_lock().unwrap(); + let op_registry = isolate.op_registry.clone(); + + ops::runtime_compiler::init(&mut isolate, &state); + ops::errors::init(&mut isolate, &state); + ops::fetch::init(&mut isolate, &state); + ops::files::init(&mut isolate, &state); + ops::fs::init(&mut isolate, &state); + ops::io::init(&mut isolate, &state); + ops::plugins::init(&mut isolate, &state, op_registry); + ops::net::init(&mut isolate, &state); + ops::tls::init(&mut isolate, &state); + ops::os::init(&mut isolate, &state); + ops::permissions::init(&mut isolate, &state); + ops::process::init(&mut isolate, &state); + ops::random::init(&mut isolate, &state); + ops::repl::init(&mut isolate, &state); + ops::resources::init(&mut isolate, &state); + ops::timers::init(&mut isolate, &state); + ops::worker_host::init(&mut isolate, &state); + ops::web_worker::init(&mut isolate, &state); + } + + Self(worker) + } } -impl Future for WorkerReceiver { - type Output = Result, ErrBox>; +impl Deref for MainWorker { + type Target = Worker; + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for MainWorker { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl Future for MainWorker { + type Output = Result<(), ErrBox>; fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let mut channels = self.channels.lock().unwrap(); - match channels.receiver.poll_next_unpin(cx) { - Poll::Ready(v) => Poll::Ready(Ok(v)), - Poll::Pending => Poll::Pending, - } + let inner = self.get_mut(); + inner.0.poll_unpin(cx) } } @@ -248,7 +289,7 @@ mod tests { let state_ = state.clone(); tokio_util::run(async move { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -291,7 +332,7 @@ mod tests { let state_ = state.clone(); tokio_util::run(async move { let mut worker = - Worker::new("TEST".to_string(), StartupData::None, state, ext); + MainWorker::new("TEST".to_string(), StartupData::None, state, ext); let result = worker .execute_mod_async(&module_specifier, None, false) .await; @@ -333,7 +374,7 @@ mod tests { let global_state_ = global_state; let state_ = state.clone(); tokio_util::run(async move { - let mut worker = Worker::new( + let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, @@ -359,13 +400,13 @@ mod tests { drop(http_server_guard); } - fn create_test_worker() -> Worker { + fn create_test_worker() -> MainWorker { let (int, ext) = ThreadSafeState::create_channels(); let state = ThreadSafeState::mock( vec![String::from("./deno"), String::from("hello.js")], int, ); - let mut worker = Worker::new( + let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(), state, @@ -409,7 +450,7 @@ mod tests { let r = block_on(worker_.post_message(msg)); assert!(r.is_ok()); - let maybe_msg = block_on(worker_.get_message()).unwrap(); + let maybe_msg = block_on(worker_.get_message()); assert!(maybe_msg.is_some()); // Check if message received is [1, 2, 3] in json assert_eq!(*maybe_msg.unwrap(), *b"[1,2,3]");