diff --git a/cli/main.rs b/cli/main.rs index 47dd4087dc..fdbfdad773 100644 --- a/cli/main.rs +++ b/cli/main.rs @@ -45,6 +45,7 @@ mod tools; mod tsc; mod tsc_config; mod version; +mod web_worker; mod worker; use crate::file_fetcher::File; @@ -792,37 +793,31 @@ async fn test_command( Ok(()) } -pub fn main() { - #[cfg(windows)] - colors::enable_ansi(); // For Windows 10 - - let args: Vec = env::args().collect(); - let flags = flags::flags_from_vec(args); - - if let Some(ref v8_flags) = flags.v8_flags { - let v8_flags_includes_help = v8_flags - .iter() - .any(|flag| flag == "-help" || flag == "--help"); - let v8_flags = once("UNUSED_BUT_NECESSARY_ARG0".to_owned()) - .chain(v8_flags.iter().cloned()) - .collect::>(); - let unrecognized_v8_flags = v8_set_flags(v8_flags) - .into_iter() - .skip(1) - .collect::>(); - if !unrecognized_v8_flags.is_empty() { - for f in unrecognized_v8_flags { - eprintln!("error: V8 did not recognize flag '{}'", f); - } - eprintln!("\nFor a list of V8 flags, use '--v8-flags=--help'"); - std::process::exit(1); - } - if v8_flags_includes_help { - std::process::exit(0); +fn init_v8_flags(v8_flags: &[String]) { + let v8_flags_includes_help = v8_flags + .iter() + .any(|flag| flag == "-help" || flag == "--help"); + let v8_flags = once("UNUSED_BUT_NECESSARY_ARG0".to_owned()) + .chain(v8_flags.iter().cloned()) + .collect::>(); + let unrecognized_v8_flags = v8_set_flags(v8_flags) + .into_iter() + .skip(1) + .collect::>(); + if !unrecognized_v8_flags.is_empty() { + for f in unrecognized_v8_flags { + eprintln!("error: V8 did not recognize flag '{}'", f); } + eprintln!("\nFor a list of V8 flags, use '--v8-flags=--help'"); + std::process::exit(1); } + if v8_flags_includes_help { + std::process::exit(0); + } +} - let log_level = match flags.log_level { +fn init_logger(maybe_level: Option) { + let log_level = match maybe_level { Some(level) => level, None => Level::Info, // Default log level }; @@ -853,8 +848,12 @@ pub fn main() { } }) .init(); +} - let fut = match flags.clone().subcommand { +fn get_subcommand( + flags: Flags, +) -> Pin>>> { + match flags.clone().subcommand { DenoSubcommand::Bundle { source_file, out_file, @@ -914,7 +913,7 @@ pub fn main() { eprintln!("{}", e); std::process::exit(1); } - return; + std::process::exit(0); } DenoSubcommand::Types => { let types = get_types(flags.unstable); @@ -922,7 +921,7 @@ pub fn main() { eprintln!("{}", e); std::process::exit(1); } - return; + std::process::exit(0); } DenoSubcommand::Upgrade { force, @@ -934,9 +933,23 @@ pub fn main() { tools::upgrade::upgrade_command(dry_run, force, version, output, ca_file) .boxed_local() } - }; + } +} - let result = tokio_util::run_basic(fut); +pub fn main() { + #[cfg(windows)] + colors::enable_ansi(); // For Windows 10 + + let args: Vec = env::args().collect(); + let flags = flags::flags_from_vec(args); + + if let Some(ref v8_flags) = flags.v8_flags { + init_v8_flags(v8_flags); + } + init_logger(flags.log_level); + + let subcommand_future = get_subcommand(flags); + let result = tokio_util::run_basic(subcommand_future); if let Err(err) = result { eprintln!("{}: {}", colors::red_bold("error"), err.to_string()); std::process::exit(1); diff --git a/cli/ops/web_worker.rs b/cli/ops/web_worker.rs index 42b6a56ceb..d88330a048 100644 --- a/cli/ops/web_worker.rs +++ b/cli/ops/web_worker.rs @@ -1,7 +1,7 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::worker::WebWorkerHandle; -use crate::worker::WorkerEvent; +use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WorkerEvent; use deno_core::futures::channel::mpsc; use deno_core::serde_json::json; diff --git a/cli/ops/worker_host.rs b/cli/ops/worker_host.rs index f2e936ef25..c464e6df2f 100644 --- a/cli/ops/worker_host.rs +++ b/cli/ops/worker_host.rs @@ -5,9 +5,9 @@ use crate::ops::io::get_stdio; use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::tokio_util::create_basic_runtime; -use crate::worker::WebWorker; -use crate::worker::WebWorkerHandle; -use crate::worker::WorkerEvent; +use crate::web_worker::WebWorker; +use crate::web_worker::WebWorkerHandle; +use crate::web_worker::WorkerEvent; use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::error::JsError; diff --git a/cli/tools/repl.rs b/cli/tools/repl.rs index e6a2fd709d..8786f57f39 100644 --- a/cli/tools/repl.rs +++ b/cli/tools/repl.rs @@ -4,7 +4,6 @@ use crate::colors; use crate::inspector::InspectorSession; use crate::program_state::ProgramState; use crate::worker::MainWorker; -use crate::worker::Worker; use deno_core::error::AnyError; use deno_core::serde_json::json; use deno_core::serde_json::Value; @@ -280,7 +279,7 @@ impl Highlighter for LineHighlighter { } async fn post_message_and_poll( - worker: &mut Worker, + worker: &mut MainWorker, session: &mut InspectorSession, method: &str, params: Option, @@ -305,7 +304,7 @@ async fn post_message_and_poll( } async fn read_line_and_poll( - worker: &mut Worker, + worker: &mut MainWorker, session: &mut InspectorSession, message_rx: &Receiver<(String, Option)>, response_tx: &Sender>, @@ -425,7 +424,7 @@ pub async fn run( let history_file = program_state.dir.root.join("deno_history.txt"); - post_message_and_poll(&mut *worker, &mut session, "Runtime.enable", None) + post_message_and_poll(&mut worker, &mut session, "Runtime.enable", None) .await?; // Enabling the runtime domain will always send trigger one executionContextCreated for each @@ -474,7 +473,7 @@ pub async fn run( while !is_closing(&mut worker, &mut session, context_id).await? { let line = read_line_and_poll( - &mut *worker, + &mut worker, &mut session, &message_rx, &response_tx, @@ -495,7 +494,7 @@ pub async fn run( }; let evaluate_response = post_message_and_poll( - &mut *worker, + &mut worker, &mut session, "Runtime.evaluate", Some(json!({ @@ -513,7 +512,7 @@ pub async fn run( && wrapped_line != line { post_message_and_poll( - &mut *worker, + &mut worker, &mut session, "Runtime.evaluate", Some(json!({ @@ -533,7 +532,7 @@ pub async fn run( if evaluate_exception_details.is_some() { post_message_and_poll( - &mut *worker, + &mut worker, &mut session, "Runtime.callFunctionOn", Some(json!({ @@ -546,7 +545,7 @@ pub async fn run( ).await?; } else { post_message_and_poll( - &mut *worker, + &mut worker, &mut session, "Runtime.callFunctionOn", Some(json!({ @@ -564,7 +563,7 @@ pub async fn run( // Deno.inspectArgs. let inspect_response = post_message_and_poll( - &mut *worker, + &mut worker, &mut session, "Runtime.callFunctionOn", Some(json!({ diff --git a/cli/web_worker.rs b/cli/web_worker.rs new file mode 100644 index 0000000000..97db42279a --- /dev/null +++ b/cli/web_worker.rs @@ -0,0 +1,454 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +use crate::colors; +use crate::fmt_errors::PrettyJsError; +use crate::inspector::DenoInspector; +use crate::js; +use crate::metrics::Metrics; +use crate::module_loader::CliModuleLoader; +use crate::ops; +use crate::permissions::Permissions; +use crate::program_state::ProgramState; +use crate::source_maps::apply_source_map; +use deno_core::error::AnyError; +use deno_core::futures::channel::mpsc; +use deno_core::futures::future::poll_fn; +use deno_core::futures::future::FutureExt; +use deno_core::futures::stream::StreamExt; +use deno_core::futures::task::AtomicWaker; +use deno_core::url::Url; +use deno_core::v8; +use deno_core::JsRuntime; +use deno_core::ModuleSpecifier; +use deno_core::RuntimeOptions; +use std::env; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::task::Context; +use std::task::Poll; +use tokio::sync::Mutex as AsyncMutex; + +/// Events that are sent to host from child +/// worker. +pub enum WorkerEvent { + Message(Box<[u8]>), + Error(AnyError), + TerminalError(AnyError), +} + +pub struct WorkerChannelsInternal { + pub sender: mpsc::Sender, + pub receiver: mpsc::Receiver>, +} + +/// Wrapper for `WorkerHandle` that adds functionality +/// for terminating workers. +/// +/// This struct is used by host as well as worker itself. +/// +/// Host uses it to communicate with worker and terminate it, +/// while worker uses it only to finish execution on `self.close()`. +#[derive(Clone)] +pub struct WebWorkerHandle { + pub sender: mpsc::Sender>, + pub receiver: Arc>>, + terminate_tx: mpsc::Sender<()>, + terminated: Arc, + isolate_handle: v8::IsolateHandle, +} + +impl WebWorkerHandle { + /// Post message to worker as a host. + pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { + let mut sender = self.sender.clone(); + sender.try_send(buf)?; + Ok(()) + } + + /// Get the event with lock. + /// Return error if more than one listener tries to get event + pub async fn get_event(&self) -> Result, AnyError> { + let mut receiver = self.receiver.try_lock()?; + Ok(receiver.next().await) + } + + pub fn terminate(&self) { + // This function can be called multiple times by whomever holds + // the handle. However only a single "termination" should occur so + // we need a guard here. + let already_terminated = self.terminated.swap(true, Ordering::Relaxed); + + if !already_terminated { + self.isolate_handle.terminate_execution(); + let mut sender = self.terminate_tx.clone(); + // This call should be infallible hence the `expect`. + // This might change in the future. + sender.try_send(()).expect("Failed to terminate"); + } + } +} + +fn create_channels( + isolate_handle: v8::IsolateHandle, + terminate_tx: mpsc::Sender<()>, +) -> (WorkerChannelsInternal, WebWorkerHandle) { + let (in_tx, in_rx) = mpsc::channel::>(1); + let (out_tx, out_rx) = mpsc::channel::(1); + let internal_channels = WorkerChannelsInternal { + sender: out_tx, + receiver: in_rx, + }; + let external_channels = WebWorkerHandle { + sender: in_tx, + receiver: Arc::new(AsyncMutex::new(out_rx)), + terminated: Arc::new(AtomicBool::new(false)), + terminate_tx, + isolate_handle, + }; + (internal_channels, external_channels) +} + +/// This struct is an implementation of `Worker` Web API +/// +/// Each `WebWorker` is either a child of `MainWorker` or other +/// `WebWorker`. +pub struct WebWorker { + inspector: Option>, + // Following fields are pub because they are accessed + // when creating a new WebWorker instance. + pub(crate) internal_channels: WorkerChannelsInternal, + pub(crate) js_runtime: JsRuntime, + pub(crate) name: String, + waker: AtomicWaker, + event_loop_idle: bool, + terminate_rx: mpsc::Receiver<()>, + handle: WebWorkerHandle, + pub has_deno_namespace: bool, +} + +impl WebWorker { + pub fn new( + name: String, + permissions: Permissions, + main_module: ModuleSpecifier, + program_state: Arc, + has_deno_namespace: bool, + ) -> Self { + let module_loader = CliModuleLoader::new_for_worker(); + let global_state_ = program_state.clone(); + + let js_error_create_fn = Box::new(move |core_js_error| { + let source_mapped_error = + apply_source_map(&core_js_error, global_state_.clone()); + PrettyJsError::create(source_mapped_error) + }); + + let mut js_runtime = JsRuntime::new(RuntimeOptions { + module_loader: Some(module_loader), + startup_snapshot: Some(js::deno_isolate_init()), + js_error_create_fn: Some(js_error_create_fn), + get_error_class_fn: Some(&crate::errors::get_error_class_name), + ..Default::default() + }); + + let inspector = + if let Some(inspector_server) = &program_state.maybe_inspector_server { + Some(DenoInspector::new( + &mut js_runtime, + Some(inspector_server.clone()), + )) + } else if program_state.flags.coverage || program_state.flags.repl { + Some(DenoInspector::new(&mut js_runtime, None)) + } else { + None + }; + + let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); + let isolate_handle = js_runtime.v8_isolate().thread_safe_handle(); + let (internal_channels, handle) = + create_channels(isolate_handle, terminate_tx); + + let mut worker = Self { + inspector, + internal_channels, + js_runtime, + name, + waker: AtomicWaker::new(), + event_loop_idle: false, + terminate_rx, + handle, + has_deno_namespace, + }; + + { + let handle = worker.thread_safe_handle(); + let sender = worker.internal_channels.sender.clone(); + let js_runtime = &mut worker.js_runtime; + // All ops registered in this function depend on these + { + let op_state = js_runtime.op_state(); + let mut op_state = op_state.borrow_mut(); + op_state.put::(Default::default()); + op_state.put::>(program_state.clone()); + op_state.put::(permissions); + } + + ops::web_worker::init(js_runtime, sender.clone(), handle); + ops::runtime::init(js_runtime, main_module); + ops::fetch::init(js_runtime, program_state.flags.ca_file.as_deref()); + ops::timers::init(js_runtime); + ops::worker_host::init(js_runtime, Some(sender)); + ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); + ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); + ops::reg_json_sync( + js_runtime, + "op_domain_to_ascii", + deno_web::op_domain_to_ascii, + ); + ops::errors::init(js_runtime); + ops::io::init(js_runtime); + ops::websocket::init(js_runtime); + + if has_deno_namespace { + ops::fs_events::init(js_runtime); + ops::fs::init(js_runtime); + ops::net::init(js_runtime); + ops::os::init(js_runtime); + ops::permissions::init(js_runtime); + ops::plugin::init(js_runtime); + ops::process::init(js_runtime); + ops::crypto::init(js_runtime, program_state.flags.seed); + ops::runtime_compiler::init(js_runtime); + ops::signal::init(js_runtime); + ops::tls::init(js_runtime); + ops::tty::init(js_runtime); + } + } + + worker + } + + /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". + pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> { + let path = env::current_dir().unwrap().join("__anonymous__"); + let url = Url::from_file_path(path).unwrap(); + self.js_runtime.execute(url.as_str(), js_source) + } + + /// Loads, instantiates and executes specified JavaScript module. + pub async fn execute_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result<(), AnyError> { + let id = self.js_runtime.load_module(module_specifier, None).await?; + self.js_runtime.mod_evaluate(id).await + } + + /// Returns a way to communicate with the Worker from other threads. + pub fn thread_safe_handle(&self) -> WebWorkerHandle { + self.handle.clone() + } + + pub fn poll_event_loop( + &mut self, + cx: &mut Context, + ) -> Poll> { + let terminated = self.handle.terminated.load(Ordering::Relaxed); + + if terminated { + return Poll::Ready(Ok(())); + } + + if !self.event_loop_idle { + let poll_result = { + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + self.waker.register(cx.waker()); + self.js_runtime.poll_event_loop(cx) + }; + match poll_result { + Poll::Ready(r) => { + let terminated = self.handle.terminated.load(Ordering::Relaxed); + if terminated { + return Poll::Ready(Ok(())); + } + + if let Err(e) = r { + eprintln!( + "{}: Uncaught (in worker \"{}\") {}", + colors::red_bold("error"), + self.name.to_string(), + e.to_string().trim_start_matches("Uncaught "), + ); + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + self.event_loop_idle = true; + } + Poll::Pending => {} + } + } + + if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { + // terminate_rx should never be closed + assert!(r.is_some()); + return Poll::Ready(Ok(())); + } + + if let Poll::Ready(r) = self.internal_channels.receiver.poll_next_unpin(cx) + { + match r { + Some(msg) => { + let msg = String::from_utf8(msg.to_vec()).unwrap(); + let script = format!("workerMessageRecvCallback({})", msg); + + if let Err(e) = self.execute(&script) { + // If execution was terminated during message callback then + // just ignore it + if self.handle.terminated.load(Ordering::Relaxed) { + return Poll::Ready(Ok(())); + } + + // Otherwise forward error to host + let mut sender = self.internal_channels.sender.clone(); + sender + .try_send(WorkerEvent::Error(e)) + .expect("Failed to post message to host"); + } + + // Let event loop be polled again + self.event_loop_idle = false; + self.waker.wake(); + } + None => unreachable!(), + } + } + + Poll::Pending + } + + pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { + poll_fn(|cx| self.poll_event_loop(cx)).await + } +} + +impl Drop for WebWorker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::program_state::ProgramState; + use crate::tokio_util; + use deno_core::serde_json::json; + + fn create_test_web_worker() -> WebWorker { + let main_module = + ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); + let program_state = ProgramState::mock(vec!["deno".to_string()], None); + let mut worker = WebWorker::new( + "TEST".to_string(), + Permissions::allow_all(), + main_module, + program_state, + false, + ); + worker + .execute("bootstrap.workerRuntime(\"TEST\", false)") + .unwrap(); + worker + } + + #[tokio::test] + async fn test_worker_messages() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + let source = r#" + onmessage = function(e) { + console.log("msg from main script", e.data); + if (e.data == "exit") { + return close(); + } else { + console.assert(e.data === "hi"); + } + postMessage([1, 2, 3]); + console.log("after postMessage"); + } + "#; + worker.execute(source).unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker.run_event_loop()); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + + let maybe_msg = handle.get_event().await.unwrap(); + assert!(maybe_msg.is_some()); + match maybe_msg { + Some(WorkerEvent::Message(buf)) => { + assert_eq!(*buf, *b"[1,2,3]"); + } + _ => unreachable!(), + } + + let msg = json!("exit") + .to_string() + .into_boxed_str() + .into_boxed_bytes(); + let r = handle.post_message(msg); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + join_handle.join().expect("Failed to join worker thread"); + } + + #[tokio::test] + async fn removed_from_resource_table_on_close() { + let (handle_sender, handle_receiver) = + std::sync::mpsc::sync_channel::(1); + + let join_handle = std::thread::spawn(move || { + let mut worker = create_test_web_worker(); + worker.execute("onmessage = () => { close(); }").unwrap(); + let handle = worker.thread_safe_handle(); + handle_sender.send(handle).unwrap(); + let r = tokio_util::run_basic(worker.run_event_loop()); + assert!(r.is_ok()) + }); + + let mut handle = handle_receiver.recv().unwrap(); + + let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); + let r = handle.post_message(msg.clone()); + assert!(r.is_ok()); + let event = handle.get_event().await.unwrap(); + assert!(event.is_none()); + handle.sender.close_channel(); + + join_handle.join().expect("Failed to join worker thread"); + } +} diff --git a/cli/worker.rs b/cli/worker.rs index 74bee1d037..f4a919df6a 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,6 +1,5 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -use crate::colors; use crate::fmt_errors::PrettyJsError; use crate::inspector::DenoInspector; use crate::inspector::InspectorSession; @@ -13,109 +12,39 @@ use crate::permissions::Permissions; use crate::program_state::ProgramState; use crate::source_maps::apply_source_map; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; -use deno_core::futures::stream::StreamExt; -use deno_core::futures::task::AtomicWaker; use deno_core::url::Url; -use deno_core::v8; use deno_core::JsRuntime; use deno_core::ModuleId; use deno_core::ModuleSpecifier; use deno_core::RuntimeOptions; -use deno_core::Snapshot; use std::env; -use std::ops::Deref; -use std::ops::DerefMut; -use std::rc::Rc; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; use std::sync::Arc; use std::task::Context; use std::task::Poll; -use tokio::sync::Mutex as AsyncMutex; -/// Events that are sent to host from child -/// worker. -pub enum WorkerEvent { - Message(Box<[u8]>), - Error(AnyError), - TerminalError(AnyError), -} - -pub struct WorkerChannelsInternal { - pub sender: mpsc::Sender, - pub receiver: mpsc::Receiver>, -} - -#[derive(Clone)] -pub struct WorkerHandle { - pub sender: mpsc::Sender>, - pub receiver: Arc>>, -} - -impl WorkerHandle { - /// Post message to worker as a host. - pub fn post_message(&self, buf: Box<[u8]>) -> Result<(), AnyError> { - let mut sender = self.sender.clone(); - sender.try_send(buf)?; - Ok(()) - } - - /// Get the event with lock. - /// Return error if more than one listener tries to get event - pub async fn get_event(&self) -> Result, AnyError> { - let mut receiver = self.receiver.try_lock()?; - Ok(receiver.next().await) - } -} - -fn create_channels() -> (WorkerChannelsInternal, WorkerHandle) { - let (in_tx, in_rx) = mpsc::channel::>(1); - let (out_tx, out_rx) = mpsc::channel::(1); - let internal_channels = WorkerChannelsInternal { - sender: out_tx, - receiver: in_rx, - }; - let external_channels = WorkerHandle { - sender: in_tx, - receiver: Arc::new(AsyncMutex::new(out_rx)), - }; - (internal_channels, external_channels) -} - -/// Worker is a CLI wrapper for `deno_core::Isolate`. +/// This worker is created and used by almost all +/// subcommands in Deno executable. /// -/// It provides infrastructure to communicate with a worker and -/// consequently between workers. +/// It provides ops available in the `Deno` namespace. /// -/// This struct is meant to be used as a base struct for concrete -/// type of worker that registers set of ops. -/// -/// Currently there are two types of workers: -/// - `MainWorker` -/// - `WebWorker` -pub struct Worker { - external_channels: WorkerHandle, +/// All `WebWorker`s created during program execution +/// are descendants of this worker. +pub struct MainWorker { inspector: Option>, - // Following fields are pub because they are accessed - // when creating a new WebWorker instance. - pub(crate) internal_channels: WorkerChannelsInternal, - pub(crate) js_runtime: JsRuntime, - pub(crate) name: String, + js_runtime: JsRuntime, should_break_on_first_statement: bool, - waker: AtomicWaker, } -impl Worker { +impl MainWorker { pub fn new( - name: String, - startup_snapshot: Snapshot, - program_state: Arc, - module_loader: Rc, - is_main: bool, + program_state: &Arc, + main_module: ModuleSpecifier, + permissions: Permissions, ) -> Self { + let module_loader = + CliModuleLoader::new(program_state.maybe_import_map.clone()); let global_state_ = program_state.clone(); let js_error_create_fn = Box::new(move |core_js_error| { @@ -126,7 +55,7 @@ impl Worker { let mut js_runtime = JsRuntime::new(RuntimeOptions { module_loader: Some(module_loader), - startup_snapshot: Some(startup_snapshot), + startup_snapshot: Some(js::deno_isolate_init()), js_error_create_fn: Some(js_error_create_fn), get_error_class_fn: Some(&crate::errors::get_error_class_name), ..Default::default() @@ -144,126 +73,15 @@ impl Worker { None }; - let should_break_on_first_statement = inspector.is_some() - && is_main - && program_state.flags.inspect_brk.is_some(); + let should_break_on_first_statement = + inspector.is_some() && program_state.flags.inspect_brk.is_some(); - let (internal_channels, external_channels) = create_channels(); - - Self { - external_channels, + let mut worker = Self { inspector, - internal_channels, js_runtime, - name, should_break_on_first_statement, - waker: AtomicWaker::new(), - } - } + }; - /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". - pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> { - let path = env::current_dir().unwrap().join("__anonymous__"); - let url = Url::from_file_path(path).unwrap(); - self.execute2(url.as_str(), js_source) - } - - /// Executes the provided JavaScript source code. The js_filename argument is - /// provided only for debugging purposes. - pub fn execute2( - &mut self, - js_filename: &str, - js_source: &str, - ) -> Result<(), AnyError> { - self.js_runtime.execute(js_filename, js_source) - } - - /// Loads and instantiates specified JavaScript module. - pub async fn preload_module( - &mut self, - module_specifier: &ModuleSpecifier, - ) -> Result { - self.js_runtime.load_module(module_specifier, None).await - } - - /// Loads, instantiates and executes specified JavaScript module. - pub async fn execute_module( - &mut self, - module_specifier: &ModuleSpecifier, - ) -> Result<(), AnyError> { - let id = self.preload_module(module_specifier).await?; - self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await - } - - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WorkerHandle { - self.external_channels.clone() - } - - fn wait_for_inspector_session(&mut self) { - if self.should_break_on_first_statement { - self - .inspector - .as_mut() - .unwrap() - .wait_for_session_and_break_on_next_statement() - } - } - - /// Create new inspector session. This function panics if Worker - /// was not configured to create inspector. - pub fn create_inspector_session(&mut self) -> Box { - let inspector = self.inspector.as_mut().unwrap(); - - InspectorSession::new(&mut **inspector) - } - - pub fn poll_event_loop( - &mut self, - cx: &mut Context, - ) -> Poll> { - // We always poll the inspector if it exists. - let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); - self.waker.register(cx.waker()); - self.js_runtime.poll_event_loop(cx) - } - - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await - } -} - -impl Drop for Worker { - fn drop(&mut self) { - // The Isolate object must outlive the Inspector object, but this is - // currently not enforced by the type system. - self.inspector.take(); - } -} - -/// This worker is created and used by Deno executable. -/// -/// It provides ops available in the `Deno` namespace. -/// -/// All WebWorkers created during program execution are descendants of -/// this worker. -pub struct MainWorker(Worker); - -impl MainWorker { - pub fn new( - program_state: &Arc, - main_module: ModuleSpecifier, - permissions: Permissions, - ) -> Self { - let loader = CliModuleLoader::new(program_state.maybe_import_map.clone()); - let mut worker = Worker::new( - "main".to_string(), - js::deno_isolate_init(), - program_state.clone(), - loader, - true, - ); let js_runtime = &mut worker.js_runtime; { // All ops registered in this function depend on these @@ -320,266 +138,71 @@ impl MainWorker { worker .execute("bootstrap.mainRuntime()") .expect("Failed to execute bootstrap script"); - Self(worker) + worker } -} -impl Deref for MainWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.0 + /// Same as execute2() but the filename defaults to "$CWD/__anonymous__". + pub fn execute(&mut self, js_source: &str) -> Result<(), AnyError> { + let path = env::current_dir().unwrap().join("__anonymous__"); + let url = Url::from_file_path(path).unwrap(); + self.js_runtime.execute(url.as_str(), js_source) } -} -impl DerefMut for MainWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 + /// Loads and instantiates specified JavaScript module. + pub async fn preload_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result { + self.js_runtime.load_module(module_specifier, None).await } -} -/// Wrapper for `WorkerHandle` that adds functionality -/// for terminating workers. -/// -/// This struct is used by host as well as worker itself. -/// -/// Host uses it to communicate with worker and terminate it, -/// while worker uses it only to finish execution on `self.close()`. -#[derive(Clone)] -pub struct WebWorkerHandle { - worker_handle: WorkerHandle, - terminate_tx: mpsc::Sender<()>, - terminated: Arc, - isolate_handle: v8::IsolateHandle, -} - -impl Deref for WebWorkerHandle { - type Target = WorkerHandle; - fn deref(&self) -> &Self::Target { - &self.worker_handle + /// Loads, instantiates and executes specified JavaScript module. + pub async fn execute_module( + &mut self, + module_specifier: &ModuleSpecifier, + ) -> Result<(), AnyError> { + let id = self.preload_module(module_specifier).await?; + self.wait_for_inspector_session(); + self.js_runtime.mod_evaluate(id).await } -} -impl DerefMut for WebWorkerHandle { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker_handle - } -} - -impl WebWorkerHandle { - pub fn terminate(&self) { - // This function can be called multiple times by whomever holds - // the handle. However only a single "termination" should occur so - // we need a guard here. - let already_terminated = self.terminated.swap(true, Ordering::Relaxed); - - if !already_terminated { - self.isolate_handle.terminate_execution(); - let mut sender = self.terminate_tx.clone(); - // This call should be infallible hence the `expect`. - // This might change in the future. - sender.try_send(()).expect("Failed to terminate"); + fn wait_for_inspector_session(&mut self) { + if self.should_break_on_first_statement { + self + .inspector + .as_mut() + .unwrap() + .wait_for_session_and_break_on_next_statement() } } -} -/// This worker is implementation of `Worker` Web API -/// -/// At the moment this type of worker supports only -/// communication with parent and creating new workers. -/// -/// Each `WebWorker` is either a child of `MainWorker` or other -/// `WebWorker`. -pub struct WebWorker { - worker: Worker, - event_loop_idle: bool, - terminate_rx: mpsc::Receiver<()>, - handle: WebWorkerHandle, - pub has_deno_namespace: bool, -} + /// Create new inspector session. This function panics if Worker + /// was not configured to create inspector. + pub fn create_inspector_session(&mut self) -> Box { + let inspector = self.inspector.as_mut().unwrap(); -impl WebWorker { - pub fn new( - name: String, - permissions: Permissions, - main_module: ModuleSpecifier, - program_state: Arc, - has_deno_namespace: bool, - ) -> Self { - let loader = CliModuleLoader::new_for_worker(); - let mut worker = Worker::new( - name, - js::deno_isolate_init(), - program_state.clone(), - loader, - false, - ); - - let terminated = Arc::new(AtomicBool::new(false)); - let isolate_handle = worker.js_runtime.v8_isolate().thread_safe_handle(); - let (terminate_tx, terminate_rx) = mpsc::channel::<()>(1); - - let handle = WebWorkerHandle { - worker_handle: worker.thread_safe_handle(), - terminated, - isolate_handle, - terminate_tx, - }; - - let mut web_worker = Self { - worker, - event_loop_idle: false, - terminate_rx, - handle, - has_deno_namespace, - }; - - { - let handle = web_worker.thread_safe_handle(); - let sender = web_worker.worker.internal_channels.sender.clone(); - let js_runtime = &mut web_worker.js_runtime; - // All ops registered in this function depend on these - { - let op_state = js_runtime.op_state(); - let mut op_state = op_state.borrow_mut(); - op_state.put::(Default::default()); - op_state.put::>(program_state.clone()); - op_state.put::(permissions); - } - - ops::web_worker::init(js_runtime, sender.clone(), handle); - ops::runtime::init(js_runtime, main_module); - ops::fetch::init(js_runtime, program_state.flags.ca_file.as_deref()); - ops::timers::init(js_runtime); - ops::worker_host::init(js_runtime, Some(sender)); - ops::reg_json_sync(js_runtime, "op_close", deno_core::op_close); - ops::reg_json_sync(js_runtime, "op_resources", deno_core::op_resources); - ops::reg_json_sync( - js_runtime, - "op_domain_to_ascii", - deno_web::op_domain_to_ascii, - ); - ops::errors::init(js_runtime); - ops::io::init(js_runtime); - ops::websocket::init(js_runtime); - - if has_deno_namespace { - ops::fs_events::init(js_runtime); - ops::fs::init(js_runtime); - ops::net::init(js_runtime); - ops::os::init(js_runtime); - ops::permissions::init(js_runtime); - ops::plugin::init(js_runtime); - ops::process::init(js_runtime); - ops::crypto::init(js_runtime, program_state.flags.seed); - ops::runtime_compiler::init(js_runtime); - ops::signal::init(js_runtime); - ops::tls::init(js_runtime); - ops::tty::init(js_runtime); - } - } - - web_worker - } -} - -impl WebWorker { - /// Returns a way to communicate with the Worker from other threads. - pub fn thread_safe_handle(&self) -> WebWorkerHandle { - self.handle.clone() - } - - pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx)).await + InspectorSession::new(&mut **inspector) } pub fn poll_event_loop( &mut self, cx: &mut Context, ) -> Poll> { - let worker = &mut self.worker; + // We always poll the inspector if it exists. + let _ = self.inspector.as_mut().map(|i| i.poll_unpin(cx)); + self.js_runtime.poll_event_loop(cx) + } - let terminated = self.handle.terminated.load(Ordering::Relaxed); - - if terminated { - return Poll::Ready(Ok(())); - } - - if !self.event_loop_idle { - match worker.poll_event_loop(cx) { - Poll::Ready(r) => { - let terminated = self.handle.terminated.load(Ordering::Relaxed); - if terminated { - return Poll::Ready(Ok(())); - } - - if let Err(e) = r { - eprintln!( - "{}: Uncaught (in worker \"{}\") {}", - colors::red_bold("error"), - worker.name.to_string(), - e.to_string().trim_start_matches("Uncaught "), - ); - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - self.event_loop_idle = true; - } - Poll::Pending => {} - } - } - - if let Poll::Ready(r) = self.terminate_rx.poll_next_unpin(cx) { - // terminate_rx should never be closed - assert!(r.is_some()); - return Poll::Ready(Ok(())); - } - - if let Poll::Ready(r) = - worker.internal_channels.receiver.poll_next_unpin(cx) - { - match r { - Some(msg) => { - let msg = String::from_utf8(msg.to_vec()).unwrap(); - let script = format!("workerMessageRecvCallback({})", msg); - - if let Err(e) = worker.execute(&script) { - // If execution was terminated during message callback then - // just ignore it - if self.handle.terminated.load(Ordering::Relaxed) { - return Poll::Ready(Ok(())); - } - - // Otherwise forward error to host - let mut sender = worker.internal_channels.sender.clone(); - sender - .try_send(WorkerEvent::Error(e)) - .expect("Failed to post message to host"); - } - - // Let event loop be polled again - self.event_loop_idle = false; - worker.waker.wake(); - } - None => unreachable!(), - } - } - - Poll::Pending + pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { + poll_fn(|cx| self.poll_event_loop(cx)).await } } -impl Deref for WebWorker { - type Target = Worker; - fn deref(&self) -> &Self::Target { - &self.worker - } -} - -impl DerefMut for WebWorker { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.worker +impl Drop for MainWorker { + fn drop(&mut self) { + // The Isolate object must outlive the Inspector object, but this is + // currently not enforced by the type system. + self.inspector.take(); } } @@ -589,9 +212,6 @@ mod tests { use crate::flags::DenoSubcommand; use crate::flags::Flags; use crate::program_state::ProgramState; - use crate::tokio_util; - use crate::worker::WorkerEvent; - use deno_core::serde_json::json; fn create_test_worker() -> MainWorker { let main_module = @@ -687,105 +307,4 @@ mod tests { let result = worker.execute_module(&module_specifier).await; assert!(result.is_ok()); } - - fn create_test_web_worker() -> WebWorker { - let main_module = - ModuleSpecifier::resolve_url_or_path("./hello.js").unwrap(); - let program_state = ProgramState::mock(vec!["deno".to_string()], None); - let mut worker = WebWorker::new( - "TEST".to_string(), - Permissions::allow_all(), - main_module, - program_state, - false, - ); - worker - .execute("bootstrap.workerRuntime(\"TEST\", false)") - .unwrap(); - worker - } - #[tokio::test] - async fn test_worker_messages() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - let source = r#" - onmessage = function(e) { - console.log("msg from main script", e.data); - if (e.data == "exit") { - return close(); - } else { - console.assert(e.data === "hi"); - } - postMessage([1, 2, 3]); - console.log("after postMessage"); - } - "#; - worker.execute(source).unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - - let maybe_msg = handle.get_event().await.unwrap(); - assert!(maybe_msg.is_some()); - match maybe_msg { - Some(WorkerEvent::Message(buf)) => { - assert_eq!(*buf, *b"[1,2,3]"); - } - _ => unreachable!(), - } - - let msg = json!("exit") - .to_string() - .into_boxed_str() - .into_boxed_bytes(); - let r = handle.post_message(msg); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - join_handle.join().expect("Failed to join worker thread"); - } - - #[tokio::test] - async fn removed_from_resource_table_on_close() { - let (handle_sender, handle_receiver) = - std::sync::mpsc::sync_channel::(1); - - let join_handle = std::thread::spawn(move || { - let mut worker = create_test_web_worker(); - worker.execute("onmessage = () => { close(); }").unwrap(); - let handle = worker.thread_safe_handle(); - handle_sender.send(handle).unwrap(); - let r = tokio_util::run_basic(worker.run_event_loop()); - assert!(r.is_ok()) - }); - - let mut handle = handle_receiver.recv().unwrap(); - - let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = handle.post_message(msg.clone()); - assert!(r.is_ok()); - let event = handle.get_event().await.unwrap(); - assert!(event.is_none()); - handle.sender.close_channel(); - - join_handle.join().expect("Failed to join worker thread"); - } }