From f3c0f0565bbf43b4cc31979b05e729d4f4a1538f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Thu, 25 Nov 2021 19:49:09 +0100 Subject: [PATCH] feat(core): Add ability to "ref" and "unref" pending ops (#12889) This commit adds an ability to "ref" or "unref" pending ops. Up to this point Deno had a notion of "async ops" and "unref async ops"; the former keep event loop alive, while the latter do not block event loop from finishing. It was not possible to change between op types after dispatching, one had to decide which type to use before dispatch. Instead of storing ops in two separate "FuturesUnordered" collections, now ops are stored in a single collection, with supplemental "HashSet" storing ids of promises that were "unrefed". Two APIs were added to "Deno.core": "Deno.core.refOp(promiseId)" which allows to mark promise id to be "refed" and keep event loop alive (the default behavior) "Deno.core.unrefOp(promiseId)" which allows to mark promise id as "unrefed" which won't block event loop from exiting --- core/01_core.js | 9 ++- core/bindings.rs | 57 +++++++++++++++++-- core/lib.deno_core.d.ts | 8 +++ core/lib.rs | 1 - core/ops.rs | 5 +- core/ops_json.rs | 33 ----------- core/ops_metrics.rs | 14 +---- core/runtime.rs | 120 +++++++++++++++++++++++++++------------ runtime/js/40_signals.js | 5 +- runtime/ops/signal.rs | 4 +- 10 files changed, 160 insertions(+), 96 deletions(-) diff --git a/core/01_core.js b/core/01_core.js index 75bfc884f4..c3fd7cf9d9 100644 --- a/core/01_core.js +++ b/core/01_core.js @@ -23,6 +23,7 @@ MapPrototypeSet, PromisePrototypeThen, ObjectAssign, + SymbolFor, } = window.__bootstrap.primordials; // Available on start due to bindings. @@ -43,6 +44,9 @@ const RING_SIZE = 4 * 1024; const NO_PROMISE = null; // Alias to null is faster than plain nulls const promiseRing = ArrayPrototypeFill(new Array(RING_SIZE), NO_PROMISE); + // TODO(bartlomieju): it future use `v8::Private` so it's not visible + // to users. Currently missing bindings. + const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); function setPromise(promiseId) { const idx = promiseId % RING_SIZE; @@ -135,7 +139,10 @@ const maybeError = opcallAsync(opsCache[opName], promiseId, arg1, arg2); // Handle sync error (e.g: error parsing args) if (maybeError) return unwrapOpResult(maybeError); - return PromisePrototypeThen(setPromise(promiseId), unwrapOpResult); + const p = PromisePrototypeThen(setPromise(promiseId), unwrapOpResult); + // Save the id on the promise so it can later be ref'ed or unref'ed + p[promiseIdSymbol] = promiseId; + return p; } function opSync(opName, arg1 = null, arg2 = null) { diff --git a/core/bindings.rs b/core/bindings.rs index f5eb1705ee..b18d24c795 100644 --- a/core/bindings.rs +++ b/core/bindings.rs @@ -35,6 +35,12 @@ lazy_static::lazy_static! { v8::ExternalReference { function: opcall_sync.map_fn_to() }, + v8::ExternalReference { + function: ref_op.map_fn_to() + }, + v8::ExternalReference { + function: unref_op.map_fn_to() + }, v8::ExternalReference { function: set_macrotask_callback.map_fn_to() }, @@ -151,6 +157,8 @@ pub fn initialize_context<'s>( // Bind functions to Deno.core.* set_func(scope, core_val, "opcallSync", opcall_sync); set_func(scope, core_val, "opcallAsync", opcall_async); + set_func(scope, core_val, "refOp", ref_op); + set_func(scope, core_val, "unrefOp", unref_op); set_func( scope, core_val, @@ -453,17 +461,56 @@ fn opcall_async<'s>( state.pending_ops.push(fut); state.have_unpolled_ops = true; } - Op::AsyncUnref(fut) => { - state.op_state.borrow().tracker.track_unref(op_id); - state.pending_unref_ops.push(fut); - state.have_unpolled_ops = true; - } Op::NotFound => { throw_type_error(scope, format!("Unknown op id: {}", op_id)); } } } +fn ref_op<'s>( + scope: &mut v8::HandleScope<'s>, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + let state_rc = JsRuntime::state(scope); + let mut state = state_rc.borrow_mut(); + + let promise_id = match v8::Local::::try_from(args.get(0)) + .map(|l| l.value() as PromiseId) + .map_err(Error::from) + { + Ok(promise_id) => promise_id, + Err(err) => { + throw_type_error(scope, format!("invalid promise id: {}", err)); + return; + } + }; + + state.unrefed_ops.remove(&promise_id); +} + +fn unref_op<'s>( + scope: &mut v8::HandleScope<'s>, + args: v8::FunctionCallbackArguments, + _rv: v8::ReturnValue, +) { + let state_rc = JsRuntime::state(scope); + let mut state = state_rc.borrow_mut(); + + let promise_id = match v8::Local::::try_from(args.get(0)) + .map(|l| l.value() as PromiseId) + .map_err(Error::from) + { + Ok(promise_id) => promise_id, + Err(err) => { + throw_type_error(scope, format!("invalid promise id: {}", err)); + return; + } + }; + + state.unrefed_ops.insert(promise_id); +} + fn has_tick_scheduled( scope: &mut v8::HandleScope, _args: v8::FunctionCallbackArguments, diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts index f33f6164a9..59b2df542e 100644 --- a/core/lib.deno_core.d.ts +++ b/core/lib.deno_core.d.ts @@ -21,6 +21,14 @@ declare namespace Deno { b?: any, ): Promise; + /** Mark following promise as "ref", ie. event loop won't exit + * until all "ref" promises are resolved. All async ops are "ref" by default. */ + function refOp(promiseId: number): void; + + /** Mark following promise as "unref", ie. event loop will exit + * if there are only "unref" promises left. */ + function unrefOps(promiseId: number): void; + /** * Retrieve a list of all registered ops, in the form of a map that maps op * name to internal numerical op id. diff --git a/core/lib.rs b/core/lib.rs index 87994720f4..f47db7f2e4 100644 --- a/core/lib.rs +++ b/core/lib.rs @@ -83,7 +83,6 @@ pub use crate::ops_builtin::op_close; pub use crate::ops_builtin::op_print; pub use crate::ops_builtin::op_resources; pub use crate::ops_json::op_async; -pub use crate::ops_json::op_async_unref; pub use crate::ops_json::op_sync; pub use crate::ops_json::void_op_async; pub use crate::ops_json::void_op_sync; diff --git a/core/ops.rs b/core/ops.rs index 13f0011461..6b2c06397f 100644 --- a/core/ops.rs +++ b/core/ops.rs @@ -80,7 +80,7 @@ where } } -pub type PromiseId = u64; +pub type PromiseId = i32; pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>; pub type OpFn = dyn Fn(Rc>, OpPayload) -> Op + 'static; pub type OpId = usize; @@ -111,9 +111,6 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> { pub enum Op { Sync(OpResult), Async(OpAsyncFuture), - /// AsyncUnref is the variation of Async, which doesn't block the program - /// exiting. - AsyncUnref(OpAsyncFuture), NotFound, } diff --git a/core/ops_json.rs b/core/ops_json.rs index b3153763e1..ad4aeeb470 100644 --- a/core/ops_json.rs +++ b/core/ops_json.rs @@ -67,7 +67,6 @@ where /// Creates an op that passes data asynchronously using JSON. /// /// When this op is dispatched, the runtime doesn't exit while processing it. -/// Use op_async_unref instead if you want to make the runtime exit while processing it. /// /// The provided function `op_fn` has the following parameters: /// * `Rc`: the op state, can be used to read/write resources in the runtime from an op. @@ -118,38 +117,6 @@ where }) } -/// Creates an op that passes data asynchronously using JSON. -/// -/// When this op is dispatched, the runtime still can exit while processing it. -/// -/// The other usages are the same as `op_async`. -pub fn op_async_unref(op_fn: F) -> Box -where - F: Fn(Rc>, A, B) -> R + 'static, - A: DeserializeOwned, - B: DeserializeOwned, - R: Future> + 'static, - RV: Serialize + 'static, -{ - Box::new(move |state, payload| -> Op { - let op_id = payload.op_id; - let pid = payload.promise_id; - // Deserialize args, sync error on failure - let args = match payload.deserialize() { - Ok(args) => args, - Err(err) => { - return Op::Sync(serialize_op_result(Err::<(), Error>(err), state)) - } - }; - let (a, b) = args; - - use crate::futures::FutureExt; - let fut = op_fn(state.clone(), a, b) - .map(move |result| (pid, op_id, serialize_op_result(result, state))); - Op::AsyncUnref(OpCall::eager(fut)) - }) -} - #[cfg(test)] mod tests { use super::*; diff --git a/core/ops_metrics.rs b/core/ops_metrics.rs index 5c40a47ca3..f2e9bc255c 100644 --- a/core/ops_metrics.rs +++ b/core/ops_metrics.rs @@ -11,10 +11,12 @@ pub struct OpMetrics { pub ops_dispatched: u64, pub ops_dispatched_sync: u64, pub ops_dispatched_async: u64, + // TODO(bartlomieju): this field is never updated pub ops_dispatched_async_unref: u64, pub ops_completed: u64, pub ops_completed_sync: u64, pub ops_completed_async: u64, + // TODO(bartlomieju): this field is never updated pub ops_completed_async_unref: u64, pub bytes_sent_control: u64, pub bytes_sent_data: u64, @@ -84,16 +86,4 @@ impl OpsTracker { metrics.ops_completed += 1; metrics.ops_completed_async += 1; } - - pub fn track_unref(&self, id: OpId) { - let metrics = &mut self.metrics_mut(id); - metrics.ops_dispatched += 1; - metrics.ops_dispatched_async_unref += 1; - } - - pub fn track_unref_completed(&self, id: OpId) { - let metrics = &mut self.metrics_mut(id); - metrics.ops_completed += 1; - metrics.ops_completed_async_unref += 1; - } } diff --git a/core/runtime.rs b/core/runtime.rs index 3af090a1c8..ad7f168865 100644 --- a/core/runtime.rs +++ b/core/runtime.rs @@ -29,6 +29,7 @@ use futures::task::AtomicWaker; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; +use std::collections::HashSet; use std::ffi::c_void; use std::mem::forget; use std::option::Option; @@ -135,23 +136,6 @@ pub type SharedArrayBufferStore = pub type CompiledWasmModuleStore = CrossIsolateStore; -struct AsyncOpIterator<'a, 'b, 'c> { - ops: &'b mut FuturesUnordered, - cx: &'a mut Context<'c>, -} - -impl Iterator for AsyncOpIterator<'_, '_, '_> { - type Item = (PromiseId, OpId, OpResult); - - #[inline] - fn next(&mut self) -> Option { - match self.ops.poll_next_unpin(self.cx) { - Poll::Ready(Some(item)) => Some(item), - _ => None, - } - } -} - /// Internal state for JsRuntime which is stored in one of v8::Isolate's /// embedder slots. pub(crate) struct JsRuntimeState { @@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState { dyn_module_evaluate_idle_counter: u32, pub(crate) js_error_create_fn: Rc, pub(crate) pending_ops: FuturesUnordered, - pub(crate) pending_unref_ops: FuturesUnordered, + pub(crate) unrefed_ops: HashSet, pub(crate) have_unpolled_ops: bool, pub(crate) op_state: Rc>, pub(crate) shared_array_buffer_store: Option, @@ -371,7 +355,7 @@ impl JsRuntime { js_wasm_streaming_cb: None, js_error_create_fn, pending_ops: FuturesUnordered::new(), - pending_unref_ops: FuturesUnordered::new(), + unrefed_ops: HashSet::new(), shared_array_buffer_store: options.shared_array_buffer_store, compiled_wasm_module_store: options.compiled_wasm_module_store, op_state: op_state.clone(), @@ -801,7 +785,8 @@ impl JsRuntime { let mut state = state_rc.borrow_mut(); let module_map = module_map_rc.borrow(); - let has_pending_ops = !state.pending_ops.is_empty(); + let has_pending_refed_ops = + state.pending_ops.len() > state.unrefed_ops.len(); let has_pending_dyn_imports = module_map.has_pending_dynamic_imports(); let has_pending_dyn_module_evaluation = !state.pending_dyn_mod_evaluate.is_empty(); @@ -815,7 +800,7 @@ impl JsRuntime { .map(|i| i.has_active_sessions()) .unwrap_or(false); - if !has_pending_ops + if !has_pending_refed_ops && !has_pending_dyn_imports && !has_pending_dyn_module_evaluation && !has_pending_module_evaluation @@ -841,7 +826,7 @@ impl JsRuntime { } if has_pending_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_dyn_module_evaluation || has_pending_background_tasks @@ -854,7 +839,7 @@ impl JsRuntime { } if has_pending_dyn_module_evaluation { - if has_pending_ops + if has_pending_refed_ops || has_pending_dyn_imports || has_pending_background_tasks { @@ -1529,21 +1514,12 @@ impl JsRuntime { state.have_unpolled_ops = false; let op_state = state.op_state.clone(); - let ops = AsyncOpIterator { - ops: &mut state.pending_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { + + while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx) + { + let (promise_id, op_id, resp) = item; op_state.borrow().tracker.track_async_completed(op_id); - args.push(v8::Integer::new(scope, promise_id as i32).into()); - args.push(resp.to_v8(scope).unwrap()); - } - let ops = AsyncOpIterator { - ops: &mut state.pending_unref_ops, - cx, - }; - for (promise_id, op_id, resp) in ops { - op_state.borrow().tracker.track_unref_completed(op_id); + state.unrefed_ops.remove(&promise_id); args.push(v8::Integer::new(scope, promise_id as i32).into()); args.push(resp.to_v8(scope).unwrap()); } @@ -1743,6 +1719,76 @@ pub mod tests { assert_eq!(dispatch_count.load(Ordering::Relaxed), 2); } + #[test] + fn test_op_async_promise_id() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + const p = Deno.core.opAsync("op_test", 42); + if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) { + throw new Error("missing id on returned promise"); + } + "#, + ) + .unwrap(); + } + + #[test] + fn test_ref_unref_ops() { + let (mut runtime, _dispatch_count) = setup(Mode::Async); + runtime + .execute_script( + "filename.js", + r#" + var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId"); + var p1 = Deno.core.opAsync("op_test", 42); + var p2 = Deno.core.opAsync("op_test", 42); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.unrefOp(p1[promiseIdSymbol]); + Deno.core.unrefOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 2); + } + runtime + .execute_script( + "filename.js", + r#" + Deno.core.refOp(p1[promiseIdSymbol]); + Deno.core.refOp(p2[promiseIdSymbol]); + "#, + ) + .unwrap(); + { + let isolate = runtime.v8_isolate(); + let state_rc = JsRuntime::state(isolate); + let state = state_rc.borrow(); + assert_eq!(state.pending_ops.len(), 2); + assert_eq!(state.unrefed_ops.len(), 0); + } + } + #[test] fn test_dispatch_no_zero_copy_buf() { let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false)); diff --git a/runtime/js/40_signals.js b/runtime/js/40_signals.js index a4f3a6ccd4..2498c40d72 100644 --- a/runtime/js/40_signals.js +++ b/runtime/js/40_signals.js @@ -5,6 +5,7 @@ const core = window.Deno.core; const { Set, + SymbolFor, TypeError, } = window.__bootstrap.primordials; @@ -13,7 +14,9 @@ } function pollSignal(rid) { - return core.opAsync("op_signal_poll", rid); + const promise = core.opAsync("op_signal_poll", rid); + core.unrefOp(promise[SymbolFor("Deno.core.internalPromiseId")]); + return promise; } function unbindSignal(rid) { diff --git a/runtime/ops/signal.rs b/runtime/ops/signal.rs index aa419c6c81..db30c66d88 100644 --- a/runtime/ops/signal.rs +++ b/runtime/ops/signal.rs @@ -4,7 +4,7 @@ use deno_core::error::generic_error; #[cfg(not(target_os = "windows"))] use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::op_async_unref; +use deno_core::op_async; use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; @@ -33,7 +33,7 @@ pub fn init() -> Extension { .ops(vec![ ("op_signal_bind", op_sync(op_signal_bind)), ("op_signal_unbind", op_sync(op_signal_unbind)), - ("op_signal_poll", op_async_unref(op_signal_poll)), + ("op_signal_poll", op_async(op_signal_poll)), ]) .build() }