diff --git a/cli/tests/worker_with_top_level_await.ts b/cli/tests/worker_with_top_level_await.ts new file mode 100644 index 0000000000..cf3418bf7f --- /dev/null +++ b/cli/tests/worker_with_top_level_await.ts @@ -0,0 +1,15 @@ +// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. + +import { serve } from "../../std/http/server.ts"; + +const server = serve({ port: 8080 }); + +self.onmessage = (e: MessageEvent) => { + console.log("TLA worker received message", e.data); +}; + +self.postMessage("hello"); + +for await (const _r of server) { + // pass +} diff --git a/cli/tests/workers_test.ts b/cli/tests/workers_test.ts index d907c97a9b..4f7682be27 100644 --- a/cli/tests/workers_test.ts +++ b/cli/tests/workers_test.ts @@ -357,3 +357,22 @@ Deno.test({ w.terminate(); }, }); + +Deno.test({ + name: "Worker with top-level-await", + fn: async function (): Promise { + const promise = deferred(); + const worker = new Worker( + new URL("./worker_with_top_level_await.ts", import.meta.url).href, + { deno: true, type: "module" }, + ); + worker.onmessage = (e): void => { + console.log("received from worker", e.data); + worker.postMessage("from main"); + promise.resolve(); + }; + + await promise; + worker.terminate(); + }, +}); diff --git a/core/modules.rs b/core/modules.rs index 546f2464f8..8248eb32d8 100644 --- a/core/modules.rs +++ b/core/modules.rs @@ -718,8 +718,8 @@ mod tests { let spec = ModuleSpecifier::resolve_url("file:///a.js").unwrap(); let a_id_fut = runtime.load_module(&spec, None); let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load"); - - futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap(); + runtime.mod_evaluate(a_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -786,7 +786,8 @@ mod tests { let result = runtime.load_module(&spec, None).await; assert!(result.is_ok()); let circular1_id = result.unwrap(); - runtime.mod_evaluate(circular1_id).await.unwrap(); + runtime.mod_evaluate(circular1_id); + runtime.run_event_loop().await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( @@ -863,7 +864,8 @@ mod tests { println!(">> result {:?}", result); assert!(result.is_ok()); let redirect1_id = result.unwrap(); - runtime.mod_evaluate(redirect1_id).await.unwrap(); + runtime.mod_evaluate(redirect1_id); + runtime.run_event_loop().await.unwrap(); let l = loads.lock().unwrap(); assert_eq!( l.to_vec(), @@ -1012,8 +1014,8 @@ mod tests { .boxed_local(); let main_id = futures::executor::block_on(main_id_fut).expect("Failed to load"); - - futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap(); + runtime.mod_evaluate(main_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let l = loads.lock().unwrap(); assert_eq!( diff --git a/core/runtime.rs b/core/runtime.rs index 24bdf4dc29..e9949bc8e2 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -825,12 +825,17 @@ impl JsRuntime { Ok(()) } + // TODO(bartlomieju): make it return `ModuleEvaluationFuture`? /// Evaluates an already instantiated ES module. /// + /// Returns a receiver handle that resolves when module promise resolves. + /// Implementors must manually call `run_event_loop()` to drive module + /// evaluation future. + /// /// `AnyError` can be downcast to a type that exposes additional information /// about the V8 exception. By default this type is `JsError`, however it may /// be a different type if `RuntimeOptions::js_error_create_fn` has been set. - fn mod_evaluate_inner( + pub fn mod_evaluate( &mut self, id: ModuleId, ) -> mpsc::Receiver> { @@ -902,24 +907,6 @@ impl JsRuntime { receiver } - pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> { - let mut receiver = self.mod_evaluate_inner(id); - - poll_fn(|cx| { - if let Poll::Ready(maybe_result) = receiver.poll_next_unpin(cx) { - debug!("received module evaluate {:#?}", maybe_result); - // If `None` is returned it means that runtime was destroyed before - // evaluation was complete. This can happen in Web Worker when `self.close()` - // is called at top level. - let result = maybe_result.unwrap_or(Ok(())); - return Poll::Ready(result); - } - let _r = self.poll_event_loop(cx)?; - Poll::Pending - }) - .await - } - fn dyn_import_error(&mut self, id: ModuleLoadId, err: AnyError) { let state_rc = Self::state(self.v8_isolate()); let context = self.global_context(); @@ -1110,7 +1097,8 @@ impl JsRuntime { v8::PromiseState::Fulfilled => { state.pending_mod_evaluate.take(); scope.perform_microtask_checkpoint(); - sender.try_send(Ok(())).unwrap(); + // Receiver end might have been already dropped, ignore the result + let _ = sender.try_send(Ok(())); } v8::PromiseState::Rejected => { let exception = promise.result(scope); @@ -1120,7 +1108,8 @@ impl JsRuntime { let err1 = exception_to_err_result::<()>(scope, exception, false) .map_err(|err| attach_handle_to_error(scope, err, exception)) .unwrap_err(); - sender.try_send(Err(err1)).unwrap(); + // Receiver end might have been already dropped, ignore the result + let _ = sender.try_send(Err(err1)); } } } @@ -2259,7 +2248,7 @@ pub mod tests { runtime.mod_instantiate(mod_a).unwrap(); assert_eq!(dispatch_count.load(Ordering::Relaxed), 0); - runtime.mod_evaluate_inner(mod_a); + runtime.mod_evaluate(mod_a); assert_eq!(dispatch_count.load(Ordering::Relaxed), 1); } @@ -2502,7 +2491,8 @@ pub mod tests { ) .unwrap(); - futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap(); + runtime.mod_evaluate(module_id); + futures::executor::block_on(runtime.run_event_loop()).unwrap(); let _snapshot = runtime.snapshot(); } diff --git a/runtime/web_worker.rs b/runtime/web_worker.rs index c1713f8150..235cb2c7ef 100644 --- a/runtime/web_worker.rs +++ b/runtime/web_worker.rs @@ -315,7 +315,28 @@ impl WebWorker { module_specifier: &ModuleSpecifier, ) -> Result<(), AnyError> { let id = self.js_runtime.load_module(module_specifier, None).await?; - self.js_runtime.mod_evaluate(id).await + + let mut receiver = self.js_runtime.mod_evaluate(id); + tokio::select! { + maybe_result = receiver.next() => { + debug!("received worker module evaluate {:#?}", maybe_result); + // If `None` is returned it means that runtime was destroyed before + // evaluation was complete. This can happen in Web Worker when `self.close()` + // is called at top level. + let result = maybe_result.unwrap_or(Ok(())); + return result; + } + + event_loop_result = self.run_event_loop() => { + if self.has_been_terminated() { + return Ok(()); + } + event_loop_result?; + let maybe_result = receiver.next().await; + let result = maybe_result.unwrap_or(Ok(())); + return result; + } + } } /// Returns a way to communicate with the Worker from other threads. @@ -374,6 +395,8 @@ impl WebWorker { let msg = String::from_utf8(msg.to_vec()).unwrap(); let script = format!("workerMessageRecvCallback({})", msg); + // TODO(bartlomieju): set proper script name like "deno:runtime/web_worker.js" + // so it's dimmed in stack trace instead of using "__anonymous__" if let Err(e) = self.execute(&script) { // If execution was terminated during message callback then // just ignore it diff --git a/runtime/worker.rs b/runtime/worker.rs index adb525c4c9..b01da45533 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -10,6 +10,7 @@ use crate::permissions::Permissions; use deno_core::error::AnyError; use deno_core::futures::future::poll_fn; use deno_core::futures::future::FutureExt; +use deno_core::futures::stream::StreamExt; use deno_core::serde_json; use deno_core::serde_json::json; use deno_core::url::Url; @@ -211,7 +212,21 @@ impl MainWorker { ) -> Result<(), AnyError> { let id = self.preload_module(module_specifier).await?; self.wait_for_inspector_session(); - self.js_runtime.mod_evaluate(id).await + let mut receiver = self.js_runtime.mod_evaluate(id); + tokio::select! { + maybe_result = receiver.next() => { + debug!("received module evaluate {:#?}", maybe_result); + let result = maybe_result.expect("Module evaluation result not provided."); + return result; + } + + event_loop_result = self.run_event_loop() => { + event_loop_result?; + let maybe_result = receiver.next().await; + let result = maybe_result.expect("Module evaluation result not provided."); + return result; + } + } } fn wait_for_inspector_session(&mut self) {