0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

fix: top-level-await module execution (#7946)

This commit changes implementation of top-level-await in "deno_core".

Previously promise returned from module evaluation was not awaited,
leading to out-of-order execution of modules that have TLA. It's been
fixed by changing "JsRuntime::mod_evaluate" to be an async function
that resolves when the promise returned from module evaluation also
resolves. When waiting for promise resolution event loop is polled
repeatedly, until there are no more dynamic imports or pending
ops.
This commit is contained in:
Bartek Iwańczuk 2020-10-14 14:04:09 +02:00 committed by GitHub
parent 10654fa955
commit 135053486c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 479 additions and 40 deletions

View file

@ -153,9 +153,15 @@ fn run_worker_thread(
rt.block_on(load_future)
};
if let Err(e) = result {
let mut sender = worker.internal_channels.sender.clone();
let mut sender = worker.internal_channels.sender.clone();
// If sender is closed it means that worker has already been closed from
// within using "globalThis.close()"
if sender.is_closed() {
return;
}
if let Err(e) = result {
sender
.try_send(WorkerEvent::TerminalError(e))
.expect("Failed to post message to host");

View file

@ -2421,6 +2421,28 @@ itest!(wasm_unreachable {
exit_code: 1,
});
itest!(top_level_await_order {
args: "run --allow-read top_level_await_order.js",
output: "top_level_await_order.out",
});
itest!(top_level_await_loop {
args: "run --allow-read top_level_await_loop.js",
output: "top_level_await_loop.out",
});
itest!(top_level_await_circular {
args: "run --allow-read top_level_await_circular.js",
output: "top_level_await_circular.out",
exit_code: 1,
});
itest!(top_level_await_unresolved {
args: "run top_level_await_unresolved.js",
output: "top_level_await_unresolved.out",
exit_code: 1,
});
itest!(top_level_await {
args: "run --allow-read top_level_await.js",
output: "top_level_await.out",

3
cli/tests/tla/a.js Normal file
View file

@ -0,0 +1,3 @@
import order from "./order.js";
order.push("b");

7
cli/tests/tla/b.js Normal file
View file

@ -0,0 +1,7 @@
import order from "./order.js";
await new Promise((resolve) => {
setTimeout(resolve, 200);
});
order.push("a");

3
cli/tests/tla/c.js Normal file
View file

@ -0,0 +1,3 @@
import order from "./order.js";
order.push("c");

6
cli/tests/tla/d.js Normal file
View file

@ -0,0 +1,6 @@
import order from "./order.js";
const end = Date.now() + 500;
while (end < Date.now()) {}
order.push("d");

1
cli/tests/tla/order.js Normal file
View file

@ -0,0 +1 @@
export default ["order"];

9
cli/tests/tla/parent.js Normal file
View file

@ -0,0 +1,9 @@
import order from "./order.js";
import "./a.js";
import "./b.js";
import "./c.js";
import "./d.js";
order.push("parent");
export default order;

5
cli/tests/tla2/a.js Normal file
View file

@ -0,0 +1,5 @@
export default class Foo {
constructor(message) {
this.message = message;
}
}

5
cli/tests/tla2/b.js Normal file
View file

@ -0,0 +1,5 @@
export default class Bar {
constructor(message) {
this.message = message;
}
}

7
cli/tests/tla3/b.js Normal file
View file

@ -0,0 +1,7 @@
import { foo } from "./timeout_loop.js";
import { collection } from "../top_level_await_circular.js";
console.log("collection in b", collection);
console.log("foo in b", foo);
export const a = "a";

View file

@ -0,0 +1,23 @@
export const foo = "foo";
export function delay(ms) {
return new Promise((res) =>
setTimeout(() => {
res();
}, ms)
);
}
let i = 0;
async function timeoutLoop() {
await delay(1000);
console.log("timeout loop", i);
i++;
if (i > 5) {
return;
}
timeoutLoop();
}
timeoutLoop();

View file

@ -0,0 +1,8 @@
import { foo } from "./tla3/timeout_loop.js";
export const collection = [];
const mod = await import("./tla3/b.js");
console.log("foo in main", foo);
console.log("mod", mod);

View file

@ -0,0 +1,7 @@
timeout loop 0
timeout loop 1
timeout loop 2
timeout loop 3
timeout loop 4
timeout loop 5
error: Dynamically imported module evaluation is still pending but there are no pending ops. This situation is often caused by unresolved promise.

View file

@ -0,0 +1,18 @@
const importsDir = Deno.readDirSync(Deno.realPathSync("./tla2"));
const resolvedPaths = [];
for (const { name } of importsDir) {
const filePath = Deno.realPathSync(`./tla2/${name}`);
resolvedPaths.push(filePath);
}
resolvedPaths.sort();
for (const filePath of resolvedPaths) {
console.log("loading", filePath);
const mod = await import(`file://${filePath}`);
console.log("loaded", mod);
}
console.log("all loaded");

View file

@ -0,0 +1,5 @@
loading [WILDCARD]a.js
loaded Module { default: [Function: Foo], [Symbol(Symbol.toStringTag)]: "Module" }
loading [WILDCARD]b.js
loaded Module { default: [Function: Bar], [Symbol(Symbol.toStringTag)]: "Module" }
all loaded

View file

@ -0,0 +1,21 @@
// Ported from Node
// https://github.com/nodejs/node/blob/54746bb763ebea0dc7e99d88ff4b379bcd680964/test/es-module/test-esm-tla.mjs
const { default: order } = await import("./tla/parent.js");
console.log("order", JSON.stringify(order));
if (
!(
order[0] === "order" &&
order[1] === "b" &&
order[2] === "c" &&
order[3] === "d" &&
order[4] === "a" &&
order[5] === "parent"
)
) {
throw new Error("TLA wrong order");
}
console.log("TLA order correct");

View file

@ -0,0 +1,2 @@
order ["order","b","c","d","a","parent"]
TLA order correct

View file

@ -0,0 +1 @@
await new Promise(() => {});

View file

@ -0,0 +1 @@
error: Module evaluation is still pending but there are no pending ops or dynamic imports. This situation is often caused by unresolved promise.

View file

@ -191,7 +191,7 @@ impl Worker {
) -> Result<(), AnyError> {
let id = self.preload_module(module_specifier).await?;
self.wait_for_inspector_session();
self.js_runtime.mod_evaluate(id)
self.js_runtime.mod_evaluate(id).await
}
/// Returns a way to communicate with the Worker from other threads.

View file

@ -667,7 +667,7 @@ mod tests {
let a_id_fut = runtime.load_module(&spec, None);
let a_id = futures::executor::block_on(a_id_fut).expect("Failed to load");
runtime.mod_evaluate(a_id).unwrap();
futures::executor::block_on(runtime.mod_evaluate(a_id)).unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@ -734,7 +734,7 @@ mod tests {
let result = runtime.load_module(&spec, None).await;
assert!(result.is_ok());
let circular1_id = result.unwrap();
runtime.mod_evaluate(circular1_id).unwrap();
runtime.mod_evaluate(circular1_id).await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
@ -811,7 +811,7 @@ mod tests {
println!(">> result {:?}", result);
assert!(result.is_ok());
let redirect1_id = result.unwrap();
runtime.mod_evaluate(redirect1_id).unwrap();
runtime.mod_evaluate(redirect1_id).await.unwrap();
let l = loads.lock().unwrap();
assert_eq!(
l.to_vec(),
@ -961,7 +961,7 @@ mod tests {
let main_id =
futures::executor::block_on(main_id_fut).expect("Failed to load");
runtime.mod_evaluate(main_id).unwrap();
futures::executor::block_on(runtime.mod_evaluate(main_id)).unwrap();
let l = loads.lock().unwrap();
assert_eq!(

View file

@ -4,6 +4,7 @@ use rusty_v8 as v8;
use crate::bindings;
use crate::error::attach_handle_to_error;
use crate::error::generic_error;
use crate::error::AnyError;
use crate::error::ErrWithV8Handle;
use crate::error::JsError;
@ -23,6 +24,7 @@ use crate::shared_queue::SharedQueue;
use crate::shared_queue::RECOMMENDED_SIZE;
use crate::BufVec;
use crate::OpState;
use futures::channel::mpsc;
use futures::future::poll_fn;
use futures::stream::FuturesUnordered;
use futures::stream::StreamExt;
@ -83,6 +85,17 @@ pub struct JsRuntime {
allocations: IsolateAllocations,
}
struct DynImportModEvaluate {
module_id: ModuleId,
promise: v8::Global<v8::Promise>,
module: v8::Global<v8::Module>,
}
struct ModEvaluate {
promise: v8::Global<v8::Promise>,
sender: mpsc::Sender<Result<(), AnyError>>,
}
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@ -91,6 +104,8 @@ pub(crate) struct JsRuntimeState {
pub(crate) js_recv_cb: Option<v8::Global<v8::Function>>,
pub(crate) js_macrotask_cb: Option<v8::Global<v8::Function>>,
pub(crate) pending_promise_exceptions: HashMap<i32, v8::Global<v8::Value>>,
pending_dyn_mod_evaluate: HashMap<ModuleLoadId, DynImportModEvaluate>,
pending_mod_evaluate: Option<ModEvaluate>,
pub(crate) js_error_create_fn: Box<JsErrorCreateFn>,
pub(crate) shared: SharedQueue,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
@ -264,6 +279,8 @@ impl JsRuntime {
isolate.set_slot(Rc::new(RefCell::new(JsRuntimeState {
global_context: Some(global_context),
pending_promise_exceptions: HashMap::new(),
pending_dyn_mod_evaluate: HashMap::new(),
pending_mod_evaluate: None,
shared_ab: None,
js_recv_cb: None,
js_macrotask_cb: None,
@ -466,17 +483,6 @@ impl JsRuntime {
state.waker.register(cx.waker());
}
// Dynamic module loading - ie. modules loaded using "import()"
{
let poll_imports = self.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
let poll_imports = self.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
self.check_promise_exceptions()?;
}
// Ops
{
let overflow_response = self.poll_pending_ops(cx);
@ -485,14 +491,38 @@ impl JsRuntime {
self.check_promise_exceptions()?;
}
let state = state_rc.borrow();
let is_idle = {
state.pending_ops.is_empty()
&& state.pending_dyn_imports.is_empty()
&& state.preparing_dyn_imports.is_empty()
};
// Dynamic module loading - ie. modules loaded using "import()"
{
let poll_imports = self.prepare_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
if is_idle {
let poll_imports = self.poll_dyn_imports(cx)?;
assert!(poll_imports.is_ready());
self.evaluate_dyn_imports()?;
self.check_promise_exceptions()?;
}
// Top level module
self.evaluate_pending_module()?;
let state = state_rc.borrow();
let has_pending_ops = !state.pending_ops.is_empty();
let has_pending_dyn_imports = !{
state.preparing_dyn_imports.is_empty()
&& state.pending_dyn_imports.is_empty()
};
let has_pending_dyn_module_evaluation =
!state.pending_dyn_mod_evaluate.is_empty();
let has_pending_module_evaluation = state.pending_mod_evaluate.is_some();
if !has_pending_ops
&& !has_pending_dyn_imports
&& !has_pending_dyn_module_evaluation
&& !has_pending_module_evaluation
{
return Poll::Ready(Ok(()));
}
@ -502,6 +532,27 @@ impl JsRuntime {
state.waker.wake();
}
if has_pending_module_evaluation {
if has_pending_ops
|| has_pending_dyn_imports
|| has_pending_dyn_module_evaluation
{
// pass, will be polled again
} else {
let msg = "Module evaluation is still pending but there are no pending ops or dynamic imports. This situation is often caused by unresolved promise.";
return Poll::Ready(Err(generic_error(msg)));
}
}
if has_pending_dyn_module_evaluation {
if has_pending_ops || has_pending_dyn_imports {
// pass, will be polled again
} else {
let msg = "Dynamically imported module evaluation is still pending but there are no pending ops. This situation is often caused by unresolved promise.";
return Poll::Ready(Err(generic_error(msg)));
}
}
Poll::Pending
}
}
@ -694,7 +745,100 @@ impl JsRuntime {
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
pub fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
pub fn dyn_mod_evaluate(
&mut self,
load_id: ModuleLoadId,
id: ModuleId,
) -> Result<(), AnyError> {
self.shared_init();
let state_rc = Self::state(self.v8_isolate());
let context = self.global_context();
let context1 = self.global_context();
let module_handle = state_rc
.borrow()
.modules
.get_info(id)
.expect("ModuleInfo not found")
.handle
.clone();
let status = {
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let module = module_handle.get(scope);
module.get_status()
};
if status == v8::ModuleStatus::Instantiated {
// IMPORTANT: Top-level-await is enabled, which means that return value
// of module evaluation is a promise.
//
// Because that promise is created internally by V8, when error occurs during
// module evaluation the promise is rejected, and since the promise has no rejection
// handler it will result in call to `bindings::promise_reject_callback` adding
// the promise to pending promise rejection table - meaning JsRuntime will return
// error on next poll().
//
// This situation is not desirable as we want to manually return error at the
// end of this function to handle it further. It means we need to manually
// remove this promise from pending promise rejection table.
//
// For more details see:
// https://github.com/denoland/deno/issues/4908
// https://v8.dev/features/top-level-await#module-execution-order
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context1);
let module = v8::Local::new(scope, &module_handle);
let maybe_value = module.evaluate(scope);
// Update status after evaluating.
let status = module.get_status();
if let Some(value) = maybe_value {
assert!(
status == v8::ModuleStatus::Evaluated
|| status == v8::ModuleStatus::Errored
);
let promise = v8::Local::<v8::Promise>::try_from(value)
.expect("Expected to get promise as module evaluation result");
let promise_id = promise.get_identity_hash();
let mut state = state_rc.borrow_mut();
state.pending_promise_exceptions.remove(&promise_id);
let promise_global = v8::Global::new(scope, promise);
let module_global = v8::Global::new(scope, module);
let dyn_import_mod_evaluate = DynImportModEvaluate {
module_id: id,
promise: promise_global,
module: module_global,
};
state
.pending_dyn_mod_evaluate
.insert(load_id, dyn_import_mod_evaluate);
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
if status == v8::ModuleStatus::Evaluated {
self.dyn_import_done(load_id, id)?;
}
Ok(())
}
/// Evaluates an already instantiated ES module.
///
/// `AnyError` can be downcast to a type that exposes additional information
/// about the V8 exception. By default this type is `JsError`, however it may
/// be a different type if `RuntimeOptions::js_error_create_fn` has been set.
fn mod_evaluate_inner(
&mut self,
id: ModuleId,
) -> Result<mpsc::Receiver<Result<(), AnyError>>, AnyError> {
self.shared_init();
let state_rc = Self::state(self.v8_isolate());
@ -710,6 +854,8 @@ impl JsRuntime {
.expect("ModuleInfo not found");
let mut status = module.get_status();
let (sender, receiver) = mpsc::channel(1);
if status == v8::ModuleStatus::Instantiated {
// IMPORTANT: Top-level-await is enabled, which means that return value
// of module evaluation is a promise.
@ -742,20 +888,37 @@ impl JsRuntime {
let promise_id = promise.get_identity_hash();
let mut state = state_rc.borrow_mut();
state.pending_promise_exceptions.remove(&promise_id);
let promise_global = v8::Global::new(scope, promise);
assert!(
state.pending_mod_evaluate.is_none(),
"There is already pending top level module evaluation"
);
state.pending_mod_evaluate = Some(ModEvaluate {
promise: promise_global,
sender,
});
scope.perform_microtask_checkpoint();
} else {
assert!(status == v8::ModuleStatus::Errored);
}
}
match status {
v8::ModuleStatus::Evaluated => Ok(()),
v8::ModuleStatus::Errored => {
let exception = module.get_exception();
exception_to_err_result(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
Ok(receiver)
}
pub async fn mod_evaluate(&mut self, id: ModuleId) -> Result<(), AnyError> {
let mut receiver = self.mod_evaluate_inner(id)?;
poll_fn(|cx| {
if let Poll::Ready(result) = receiver.poll_next_unpin(cx) {
debug!("received module evaluate");
return Poll::Ready(result.unwrap());
}
other => panic!("Unexpected module status {:?}", other),
}
let _r = self.poll_event_loop(cx)?;
Poll::Pending
})
.await
}
fn dyn_import_error(
@ -916,16 +1079,132 @@ impl JsRuntime {
// Load is done.
let module_id = load.root_module_id.unwrap();
self.mod_instantiate(module_id)?;
match self.mod_evaluate(module_id) {
Ok(()) => self.dyn_import_done(dyn_import_id, module_id)?,
Err(err) => self.dyn_import_error(dyn_import_id, err)?,
};
self.dyn_mod_evaluate(dyn_import_id, module_id)?;
}
}
}
}
}
/// "deno_core" runs V8 with "--harmony-top-level-await"
/// flag on - it means that each module evaluation returns a promise
/// from V8.
///
/// This promise resolves after all dependent modules have also
/// resolved. Each dependent module may perform calls to "import()" and APIs
/// using async ops will add futures to the runtime's event loop.
/// It means that the promise returned from module evaluation will
/// resolve only after all futures in the event loop are done.
///
/// Thus during turn of event loop we need to check if V8 has
/// resolved or rejected the promise. If the promise is still pending
/// then another turn of event loop must be performed.
fn evaluate_pending_module(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
let context = self.global_context();
{
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let mut state = state_rc.borrow_mut();
if let Some(module_evaluation) = state.pending_mod_evaluate.as_ref() {
let promise = module_evaluation.promise.get(scope);
let mut sender = module_evaluation.sender.clone();
let promise_state = promise.state();
match promise_state {
v8::PromiseState::Pending => {
// pass, poll_event_loop will decide if
// runtime would be woken soon
}
v8::PromiseState::Fulfilled => {
state.pending_mod_evaluate.take();
scope.perform_microtask_checkpoint();
sender.try_send(Ok(())).unwrap();
}
v8::PromiseState::Rejected => {
let exception = promise.result(scope);
state.pending_mod_evaluate.take();
drop(state);
scope.perform_microtask_checkpoint();
let err1 = exception_to_err_result::<()>(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err();
sender.try_send(Err(err1)).unwrap();
}
}
}
};
Ok(())
}
fn evaluate_dyn_imports(&mut self) -> Result<(), AnyError> {
let state_rc = Self::state(self.v8_isolate());
loop {
let context = self.global_context();
let maybe_result = {
let scope =
&mut v8::HandleScope::with_context(self.v8_isolate(), context);
let mut state = state_rc.borrow_mut();
if let Some(&dyn_import_id) =
state.pending_dyn_mod_evaluate.keys().next()
{
let handle = state
.pending_dyn_mod_evaluate
.remove(&dyn_import_id)
.unwrap();
drop(state);
let module_id = handle.module_id;
let promise = handle.promise.get(scope);
let _module = handle.module.get(scope);
let promise_state = promise.state();
match promise_state {
v8::PromiseState::Pending => {
state_rc
.borrow_mut()
.pending_dyn_mod_evaluate
.insert(dyn_import_id, handle);
None
}
v8::PromiseState::Fulfilled => Some(Ok((dyn_import_id, module_id))),
v8::PromiseState::Rejected => {
let exception = promise.result(scope);
let err1 = exception_to_err_result::<()>(scope, exception)
.map_err(|err| attach_handle_to_error(scope, err, exception))
.unwrap_err();
Some(Err((dyn_import_id, err1)))
}
}
} else {
None
}
};
if let Some(result) = maybe_result {
match result {
Ok((dyn_import_id, module_id)) => {
self.dyn_import_done(dyn_import_id, module_id)?;
}
Err((dyn_import_id, err1)) => {
self.dyn_import_error(dyn_import_id, err1)?;
}
}
} else {
break;
}
}
Ok(())
}
fn register_during_load(
&mut self,
info: ModuleSource,
@ -1997,7 +2276,7 @@ pub mod tests {
runtime.mod_instantiate(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 0);
runtime.mod_evaluate(mod_a).unwrap();
runtime.mod_evaluate_inner(mod_a).unwrap();
assert_eq!(dispatch_count.load(Ordering::Relaxed), 1);
}
@ -2240,7 +2519,7 @@ pub mod tests {
)
.unwrap();
runtime.mod_evaluate(module_id).unwrap();
futures::executor::block_on(runtime.mod_evaluate(module_id)).unwrap();
let _snapshot = runtime.snapshot();
}