1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

feat(core): Add ability to "ref" and "unref" pending ops (#12889)

This commit adds an ability to "ref" or "unref" pending ops.

Up to this point Deno had a notion of "async ops" and "unref async ops";
the former keep event loop alive, while the latter do not block event loop
from finishing. It was not possible to change between op types after
dispatching, one had to decide which type to use before dispatch.

Instead of storing ops in two separate "FuturesUnordered" collections,
now ops are stored in a single collection, with supplemental "HashSet"
storing ids of promises that were "unrefed".

Two APIs were added to "Deno.core":

"Deno.core.refOp(promiseId)" which allows to mark promise id
to be "refed" and keep event loop alive (the default behavior)
"Deno.core.unrefOp(promiseId)" which allows to mark promise
id as "unrefed" which won't block event loop from exiting
This commit is contained in:
Bartek Iwańczuk 2021-11-25 19:49:09 +01:00 committed by GitHub
parent 2853e37604
commit f3c0f0565b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 160 additions and 96 deletions

View file

@ -23,6 +23,7 @@
MapPrototypeSet,
PromisePrototypeThen,
ObjectAssign,
SymbolFor,
} = window.__bootstrap.primordials;
// Available on start due to bindings.
@ -43,6 +44,9 @@
const RING_SIZE = 4 * 1024;
const NO_PROMISE = null; // Alias to null is faster than plain nulls
const promiseRing = ArrayPrototypeFill(new Array(RING_SIZE), NO_PROMISE);
// TODO(bartlomieju): it future use `v8::Private` so it's not visible
// to users. Currently missing bindings.
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
function setPromise(promiseId) {
const idx = promiseId % RING_SIZE;
@ -135,7 +139,10 @@
const maybeError = opcallAsync(opsCache[opName], promiseId, arg1, arg2);
// Handle sync error (e.g: error parsing args)
if (maybeError) return unwrapOpResult(maybeError);
return PromisePrototypeThen(setPromise(promiseId), unwrapOpResult);
const p = PromisePrototypeThen(setPromise(promiseId), unwrapOpResult);
// Save the id on the promise so it can later be ref'ed or unref'ed
p[promiseIdSymbol] = promiseId;
return p;
}
function opSync(opName, arg1 = null, arg2 = null) {

View file

@ -35,6 +35,12 @@ lazy_static::lazy_static! {
v8::ExternalReference {
function: opcall_sync.map_fn_to()
},
v8::ExternalReference {
function: ref_op.map_fn_to()
},
v8::ExternalReference {
function: unref_op.map_fn_to()
},
v8::ExternalReference {
function: set_macrotask_callback.map_fn_to()
},
@ -151,6 +157,8 @@ pub fn initialize_context<'s>(
// Bind functions to Deno.core.*
set_func(scope, core_val, "opcallSync", opcall_sync);
set_func(scope, core_val, "opcallAsync", opcall_async);
set_func(scope, core_val, "refOp", ref_op);
set_func(scope, core_val, "unrefOp", unref_op);
set_func(
scope,
core_val,
@ -453,17 +461,56 @@ fn opcall_async<'s>(
state.pending_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::AsyncUnref(fut) => {
state.op_state.borrow().tracker.track_unref(op_id);
state.pending_unref_ops.push(fut);
state.have_unpolled_ops = true;
}
Op::NotFound => {
throw_type_error(scope, format!("Unknown op id: {}", op_id));
}
}
}
fn ref_op<'s>(
scope: &mut v8::HandleScope<'s>,
args: v8::FunctionCallbackArguments,
_rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map(|l| l.value() as PromiseId)
.map_err(Error::from)
{
Ok(promise_id) => promise_id,
Err(err) => {
throw_type_error(scope, format!("invalid promise id: {}", err));
return;
}
};
state.unrefed_ops.remove(&promise_id);
}
fn unref_op<'s>(
scope: &mut v8::HandleScope<'s>,
args: v8::FunctionCallbackArguments,
_rv: v8::ReturnValue,
) {
let state_rc = JsRuntime::state(scope);
let mut state = state_rc.borrow_mut();
let promise_id = match v8::Local::<v8::Integer>::try_from(args.get(0))
.map(|l| l.value() as PromiseId)
.map_err(Error::from)
{
Ok(promise_id) => promise_id,
Err(err) => {
throw_type_error(scope, format!("invalid promise id: {}", err));
return;
}
};
state.unrefed_ops.insert(promise_id);
}
fn has_tick_scheduled(
scope: &mut v8::HandleScope,
_args: v8::FunctionCallbackArguments,

View file

@ -21,6 +21,14 @@ declare namespace Deno {
b?: any,
): Promise<any>;
/** Mark following promise as "ref", ie. event loop won't exit
* until all "ref" promises are resolved. All async ops are "ref" by default. */
function refOp(promiseId: number): void;
/** Mark following promise as "unref", ie. event loop will exit
* if there are only "unref" promises left. */
function unrefOps(promiseId: number): void;
/**
* Retrieve a list of all registered ops, in the form of a map that maps op
* name to internal numerical op id.

View file

@ -83,7 +83,6 @@ pub use crate::ops_builtin::op_close;
pub use crate::ops_builtin::op_print;
pub use crate::ops_builtin::op_resources;
pub use crate::ops_json::op_async;
pub use crate::ops_json::op_async_unref;
pub use crate::ops_json::op_sync;
pub use crate::ops_json::void_op_async;
pub use crate::ops_json::void_op_sync;

View file

@ -80,7 +80,7 @@ where
}
}
pub type PromiseId = u64;
pub type PromiseId = i32;
pub type OpAsyncFuture = OpCall<(PromiseId, OpId, OpResult)>;
pub type OpFn = dyn Fn(Rc<RefCell<OpState>>, OpPayload) -> Op + 'static;
pub type OpId = usize;
@ -111,9 +111,6 @@ impl<'a, 'b, 'c> OpPayload<'a, 'b, 'c> {
pub enum Op {
Sync(OpResult),
Async(OpAsyncFuture),
/// AsyncUnref is the variation of Async, which doesn't block the program
/// exiting.
AsyncUnref(OpAsyncFuture),
NotFound,
}

View file

@ -67,7 +67,6 @@ where
/// Creates an op that passes data asynchronously using JSON.
///
/// When this op is dispatched, the runtime doesn't exit while processing it.
/// Use op_async_unref instead if you want to make the runtime exit while processing it.
///
/// The provided function `op_fn` has the following parameters:
/// * `Rc<RefCell<OpState>`: the op state, can be used to read/write resources in the runtime from an op.
@ -118,38 +117,6 @@ where
})
}
/// Creates an op that passes data asynchronously using JSON.
///
/// When this op is dispatched, the runtime still can exit while processing it.
///
/// The other usages are the same as `op_async`.
pub fn op_async_unref<F, A, B, R, RV>(op_fn: F) -> Box<OpFn>
where
F: Fn(Rc<RefCell<OpState>>, A, B) -> R + 'static,
A: DeserializeOwned,
B: DeserializeOwned,
R: Future<Output = Result<RV, Error>> + 'static,
RV: Serialize + 'static,
{
Box::new(move |state, payload| -> Op {
let op_id = payload.op_id;
let pid = payload.promise_id;
// Deserialize args, sync error on failure
let args = match payload.deserialize() {
Ok(args) => args,
Err(err) => {
return Op::Sync(serialize_op_result(Err::<(), Error>(err), state))
}
};
let (a, b) = args;
use crate::futures::FutureExt;
let fut = op_fn(state.clone(), a, b)
.map(move |result| (pid, op_id, serialize_op_result(result, state)));
Op::AsyncUnref(OpCall::eager(fut))
})
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -11,10 +11,12 @@ pub struct OpMetrics {
pub ops_dispatched: u64,
pub ops_dispatched_sync: u64,
pub ops_dispatched_async: u64,
// TODO(bartlomieju): this field is never updated
pub ops_dispatched_async_unref: u64,
pub ops_completed: u64,
pub ops_completed_sync: u64,
pub ops_completed_async: u64,
// TODO(bartlomieju): this field is never updated
pub ops_completed_async_unref: u64,
pub bytes_sent_control: u64,
pub bytes_sent_data: u64,
@ -84,16 +86,4 @@ impl OpsTracker {
metrics.ops_completed += 1;
metrics.ops_completed_async += 1;
}
pub fn track_unref(&self, id: OpId) {
let metrics = &mut self.metrics_mut(id);
metrics.ops_dispatched += 1;
metrics.ops_dispatched_async_unref += 1;
}
pub fn track_unref_completed(&self, id: OpId) {
let metrics = &mut self.metrics_mut(id);
metrics.ops_completed += 1;
metrics.ops_completed_async_unref += 1;
}
}

View file

@ -29,6 +29,7 @@ use futures::task::AtomicWaker;
use std::any::Any;
use std::cell::RefCell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::ffi::c_void;
use std::mem::forget;
use std::option::Option;
@ -135,23 +136,6 @@ pub type SharedArrayBufferStore =
pub type CompiledWasmModuleStore = CrossIsolateStore<v8::CompiledWasmModule>;
struct AsyncOpIterator<'a, 'b, 'c> {
ops: &'b mut FuturesUnordered<PendingOpFuture>,
cx: &'a mut Context<'c>,
}
impl Iterator for AsyncOpIterator<'_, '_, '_> {
type Item = (PromiseId, OpId, OpResult);
#[inline]
fn next(&mut self) -> Option<Self::Item> {
match self.ops.poll_next_unpin(self.cx) {
Poll::Ready(Some(item)) => Some(item),
_ => None,
}
}
}
/// Internal state for JsRuntime which is stored in one of v8::Isolate's
/// embedder slots.
pub(crate) struct JsRuntimeState {
@ -171,7 +155,7 @@ pub(crate) struct JsRuntimeState {
dyn_module_evaluate_idle_counter: u32,
pub(crate) js_error_create_fn: Rc<JsErrorCreateFn>,
pub(crate) pending_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) pending_unref_ops: FuturesUnordered<PendingOpFuture>,
pub(crate) unrefed_ops: HashSet<i32>,
pub(crate) have_unpolled_ops: bool,
pub(crate) op_state: Rc<RefCell<OpState>>,
pub(crate) shared_array_buffer_store: Option<SharedArrayBufferStore>,
@ -371,7 +355,7 @@ impl JsRuntime {
js_wasm_streaming_cb: None,
js_error_create_fn,
pending_ops: FuturesUnordered::new(),
pending_unref_ops: FuturesUnordered::new(),
unrefed_ops: HashSet::new(),
shared_array_buffer_store: options.shared_array_buffer_store,
compiled_wasm_module_store: options.compiled_wasm_module_store,
op_state: op_state.clone(),
@ -801,7 +785,8 @@ impl JsRuntime {
let mut state = state_rc.borrow_mut();
let module_map = module_map_rc.borrow();
let has_pending_ops = !state.pending_ops.is_empty();
let has_pending_refed_ops =
state.pending_ops.len() > state.unrefed_ops.len();
let has_pending_dyn_imports = module_map.has_pending_dynamic_imports();
let has_pending_dyn_module_evaluation =
!state.pending_dyn_mod_evaluate.is_empty();
@ -815,7 +800,7 @@ impl JsRuntime {
.map(|i| i.has_active_sessions())
.unwrap_or(false);
if !has_pending_ops
if !has_pending_refed_ops
&& !has_pending_dyn_imports
&& !has_pending_dyn_module_evaluation
&& !has_pending_module_evaluation
@ -841,7 +826,7 @@ impl JsRuntime {
}
if has_pending_module_evaluation {
if has_pending_ops
if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_dyn_module_evaluation
|| has_pending_background_tasks
@ -854,7 +839,7 @@ impl JsRuntime {
}
if has_pending_dyn_module_evaluation {
if has_pending_ops
if has_pending_refed_ops
|| has_pending_dyn_imports
|| has_pending_background_tasks
{
@ -1529,21 +1514,12 @@ impl JsRuntime {
state.have_unpolled_ops = false;
let op_state = state.op_state.clone();
let ops = AsyncOpIterator {
ops: &mut state.pending_ops,
cx,
};
for (promise_id, op_id, resp) in ops {
while let Poll::Ready(Some(item)) = state.pending_ops.poll_next_unpin(cx)
{
let (promise_id, op_id, resp) = item;
op_state.borrow().tracker.track_async_completed(op_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
let ops = AsyncOpIterator {
ops: &mut state.pending_unref_ops,
cx,
};
for (promise_id, op_id, resp) in ops {
op_state.borrow().tracker.track_unref_completed(op_id);
state.unrefed_ops.remove(&promise_id);
args.push(v8::Integer::new(scope, promise_id as i32).into());
args.push(resp.to_v8(scope).unwrap());
}
@ -1743,6 +1719,76 @@ pub mod tests {
assert_eq!(dispatch_count.load(Ordering::Relaxed), 2);
}
#[test]
fn test_op_async_promise_id() {
let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute_script(
"filename.js",
r#"
const p = Deno.core.opAsync("op_test", 42);
if (p[Symbol.for("Deno.core.internalPromiseId")] == undefined) {
throw new Error("missing id on returned promise");
}
"#,
)
.unwrap();
}
#[test]
fn test_ref_unref_ops() {
let (mut runtime, _dispatch_count) = setup(Mode::Async);
runtime
.execute_script(
"filename.js",
r#"
var promiseIdSymbol = Symbol.for("Deno.core.internalPromiseId");
var p1 = Deno.core.opAsync("op_test", 42);
var p2 = Deno.core.opAsync("op_test", 42);
"#,
)
.unwrap();
{
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 0);
}
runtime
.execute_script(
"filename.js",
r#"
Deno.core.unrefOp(p1[promiseIdSymbol]);
Deno.core.unrefOp(p2[promiseIdSymbol]);
"#,
)
.unwrap();
{
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 2);
}
runtime
.execute_script(
"filename.js",
r#"
Deno.core.refOp(p1[promiseIdSymbol]);
Deno.core.refOp(p2[promiseIdSymbol]);
"#,
)
.unwrap();
{
let isolate = runtime.v8_isolate();
let state_rc = JsRuntime::state(isolate);
let state = state_rc.borrow();
assert_eq!(state.pending_ops.len(), 2);
assert_eq!(state.unrefed_ops.len(), 0);
}
}
#[test]
fn test_dispatch_no_zero_copy_buf() {
let (mut runtime, dispatch_count) = setup(Mode::AsyncZeroCopy(false));

View file

@ -5,6 +5,7 @@
const core = window.Deno.core;
const {
Set,
SymbolFor,
TypeError,
} = window.__bootstrap.primordials;
@ -13,7 +14,9 @@
}
function pollSignal(rid) {
return core.opAsync("op_signal_poll", rid);
const promise = core.opAsync("op_signal_poll", rid);
core.unrefOp(promise[SymbolFor("Deno.core.internalPromiseId")]);
return promise;
}
function unbindSignal(rid) {

View file

@ -4,7 +4,7 @@ use deno_core::error::generic_error;
#[cfg(not(target_os = "windows"))]
use deno_core::error::type_error;
use deno_core::error::AnyError;
use deno_core::op_async_unref;
use deno_core::op_async;
use deno_core::op_sync;
use deno_core::Extension;
use deno_core::OpState;
@ -33,7 +33,7 @@ pub fn init() -> Extension {
.ops(vec![
("op_signal_bind", op_sync(op_signal_bind)),
("op_signal_unbind", op_sync(op_signal_unbind)),
("op_signal_poll", op_async_unref(op_signal_poll)),
("op_signal_poll", op_async(op_signal_poll)),
])
.build()
}