0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

feat(ops): implement fast lazy async ops (#16579)

Implements fast scheduling of deferred op futures. 

```rs
#[op(fast)]
async fn op_read(
  state: Rc<RefCell<OpState>>,
  rid: ResourceId,
  buf: &mut [u8],
) -> Result<u32, Error> {
  // ...
}
```

The future is scheduled via a fast API call and polled by the event loop
after being woken up by its waker.
This commit is contained in:
Divy Srivastava 2022-11-11 05:44:53 -08:00 committed by GitHub
parent ff92febb38
commit 5b9620df7a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 355 additions and 56 deletions

View file

@ -121,7 +121,9 @@ pub mod _ops {
pub use super::error_codes::get_error_code;
pub use super::ops::to_op_result;
pub use super::ops::OpCtx;
pub use super::ops::OpResult;
pub use super::runtime::queue_async_op;
pub use super::runtime::queue_fast_async_op;
pub use super::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use super::runtime::V8_WRAPPER_TYPE_INDEX;
}

View file

@ -2202,6 +2202,22 @@ impl JsRealm {
// TODO(andreubotella): `mod_evaluate`, `load_main_module`, `load_side_module`
}
#[inline]
pub fn queue_fast_async_op(
ctx: &OpCtx,
op: impl Future<Output = (PromiseId, OpId, OpResult)> + 'static,
) {
let runtime_state = match ctx.runtime_state.upgrade() {
Some(rc_state) => rc_state,
// atleast 1 Rc is held by the JsRuntime.
None => unreachable!(),
};
let mut state = runtime_state.borrow_mut();
state.pending_ops.push(OpCall::lazy(op));
state.have_unpolled_ops = true;
}
#[inline]
pub fn queue_async_op(
ctx: &OpCtx,

View file

@ -30,7 +30,14 @@ pub(crate) fn generate(
// TODO(@littledivy): Use `let..else` on 1.65.0
let output_ty = match &optimizer.fast_result {
// Assert that the optimizer did not set a return type.
//
// @littledivy: This *could* potentially be used to optimize resolving
// promises but knowing the return type at compile time instead of
// serde_v8 serialization.
Some(_) if optimizer.is_async => &FastValue::Void,
Some(ty) => ty,
None if optimizer.is_async => &FastValue::Void,
None => {
return FastImplItems {
impl_and_fn: TokenStream::new(),
@ -131,7 +138,10 @@ pub(crate) fn generate(
.collect::<Punctuated<_, Comma>>();
// Apply *hard* optimizer hints.
if optimizer.has_fast_callback_option || optimizer.needs_opstate() {
if optimizer.has_fast_callback_option
|| optimizer.needs_opstate()
|| optimizer.is_async
{
fast_fn_inputs.push(parse_quote! {
fast_api_callback_options: *mut #core::v8::fast_api::FastApiCallbackOptions
});
@ -139,9 +149,20 @@ pub(crate) fn generate(
input_variants.push(q!({ CallbackOptions }));
}
// (recv, p_id, ...)
//
// Optimizer has already set it in the fast parameter variant list.
if optimizer.is_async {
if fast_fn_inputs.is_empty() {
fast_fn_inputs.push(parse_quote! { __promise_id: i32 });
} else {
fast_fn_inputs.insert(0, parse_quote! { __promise_id: i32 });
}
}
let mut output_transforms = q!({});
if optimizer.needs_opstate() {
if optimizer.needs_opstate() || optimizer.is_async {
// Grab the op_state identifier, the first one. ¯\_(ツ)_/¯
let op_state = match idents.first() {
Some(ident) if optimizer.has_opstate_in_parameters() => ident.clone(),
@ -155,24 +176,36 @@ pub(crate) fn generate(
// - `data` union is always initialized as the `v8::Local<v8::Value>` variant.
// - deno_core guarantees that `data` is a v8 External pointing to an OpCtx for the
// isolate's lifetime.
let prelude = q!(
Vars {
op_state: &op_state
},
{
let __opts: &mut v8::fast_api::FastApiCallbackOptions =
unsafe { &mut *fast_api_callback_options };
let __ctx = unsafe {
&*(v8::Local::<v8::External>::cast(unsafe { __opts.data.data })
.value() as *const _ops::OpCtx)
};
let op_state = &mut ::std::cell::RefCell::borrow_mut(&__ctx.state);
}
);
let prelude = q!({
let __opts: &mut v8::fast_api::FastApiCallbackOptions =
unsafe { &mut *fast_api_callback_options };
let __ctx = unsafe {
&*(v8::Local::<v8::External>::cast(unsafe { __opts.data.data }).value()
as *const _ops::OpCtx)
};
});
pre_transforms.push_tokens(&prelude);
pre_transforms.push_tokens(&match optimizer.is_async {
false => q!(
Vars {
op_state: &op_state
},
{
let op_state = &mut ::std::cell::RefCell::borrow_mut(&__ctx.state);
}
),
true => q!(
Vars {
op_state: &op_state
},
{
let op_state = __ctx.state.clone();
}
),
});
if optimizer.returns_result {
if optimizer.returns_result && !optimizer.is_async {
// Magic fallback 🪄
//
// If Result<T, E> is Ok(T), return T as fast value.
@ -196,9 +229,42 @@ pub(crate) fn generate(
}
}
if optimizer.is_async {
// Referenced variables are declared in parent block.
let track_async = q!({
let __op_id = __ctx.id;
let __state = ::std::cell::RefCell::borrow(&__ctx.state);
__state.tracker.track_async(__op_id);
});
output_transforms.push_tokens(&track_async);
let queue_future = if optimizer.returns_result {
q!({
let __get_class = __state.get_error_class_fn;
let result = _ops::queue_fast_async_op(__ctx, async move {
let result = result.await;
(
__promise_id,
__op_id,
_ops::to_op_result(__get_class, result),
)
});
})
} else {
q!({
let result = _ops::queue_fast_async_op(__ctx, async move {
let result = result.await;
(__promise_id, __op_id, _ops::OpResult::Ok(result.into()))
});
})
};
output_transforms.push_tokens(&queue_future);
}
if !optimizer.returns_result {
let default_output = q!({ result });
output_transforms.push_tokens(&default_output);
}
@ -360,7 +426,7 @@ fn exclude_lifetime_params(
#[cfg(test)]
mod tests {
use super::*;
use crate::Op;
use crate::{Attributes, Op};
use std::path::PathBuf;
#[testing_macros::fixture("optimizer_tests/**/*.rs")]
@ -371,8 +437,13 @@ mod tests {
let source =
std::fs::read_to_string(&input).expect("Failed to read test file");
let mut attrs = Attributes::default();
if source.contains("// @test-attr:fast") {
attrs.must_be_fast = true;
}
let item = syn::parse_str(&source).expect("Failed to parse test file");
let mut op = Op::new(item, Default::default());
let mut op = Op::new(item, attrs);
let mut optimizer = Optimizer::new();
if optimizer.analyze(&mut op).is_err() {
// Tested by optimizer::test tests.

View file

@ -79,7 +79,6 @@ impl Op {
Err(BailoutReason::FastUnsupportedParamType) => {
optimizer.fast_compatible = false;
}
Err(err) => return quote!(compile_error!(#err);),
};
let Self {

View file

@ -2,7 +2,6 @@
use crate::Op;
use pmutil::{q, Quote};
use proc_macro2::TokenStream;
use quote::{quote, ToTokens};
use std::collections::HashMap;
use std::fmt::Debug;
use std::fmt::Formatter;
@ -18,22 +17,6 @@ pub(crate) enum BailoutReason {
// Recoverable errors
MustBeSingleSegment,
FastUnsupportedParamType,
FastAsync,
}
impl ToTokens for BailoutReason {
fn to_tokens(&self, tokens: &mut TokenStream) {
match self {
BailoutReason::FastAsync => {
tokens.extend(quote! { "fast async calls are not supported" });
}
BailoutReason::MustBeSingleSegment
| BailoutReason::FastUnsupportedParamType => {
unreachable!("error not recovered");
}
}
}
}
#[derive(Debug, PartialEq)]
@ -197,6 +180,8 @@ pub(crate) struct Optimizer {
pub(crate) transforms: HashMap<usize, Transform>,
pub(crate) fast_compatible: bool,
pub(crate) is_async: bool,
}
impl Debug for Optimizer {
@ -213,6 +198,8 @@ impl Debug for Optimizer {
writeln!(f, "fast_result: {:?}", self.fast_result)?;
writeln!(f, "fast_parameters: {:?}", self.fast_parameters)?;
writeln!(f, "transforms: {:?}", self.transforms)?;
writeln!(f, "is_async: {}", self.is_async)?;
writeln!(f, "fast_compatible: {}", self.fast_compatible)?;
Ok(())
}
}
@ -231,16 +218,18 @@ impl Optimizer {
}
pub(crate) fn analyze(&mut self, op: &mut Op) -> Result<(), BailoutReason> {
if op.is_async && op.attrs.must_be_fast {
self.fast_compatible = false;
return Err(BailoutReason::FastAsync);
}
if op.attrs.is_v8 || op.is_async {
// Fast async ops are opt-in as they have a lazy polling behavior.
if op.is_async && !op.attrs.must_be_fast {
self.fast_compatible = false;
return Ok(());
}
if op.attrs.is_v8 {
self.fast_compatible = false;
return Ok(());
}
self.is_async = op.is_async;
self.fast_compatible = true;
let sig = &op.item.sig;
@ -253,12 +242,29 @@ impl Optimizer {
Signature {
output: ReturnType::Type(_, ty),
..
} => self.analyze_return_type(ty)?,
} if !self.is_async => self.analyze_return_type(ty)?,
// No need to error on the return type for async ops, its OK if
// it's not a fast value.
Signature {
output: ReturnType::Type(_, ty),
..
} => {
let _ = self.analyze_return_type(ty);
// Recover.
self.fast_result = None;
self.fast_compatible = true;
}
};
// The reciever, which we don't actually care about.
self.fast_parameters.push(FastValue::V8Value);
if self.is_async {
// The promise ID.
self.fast_parameters.push(FastValue::I32);
}
// Analyze parameters
for (index, param) in sig.inputs.iter().enumerate() {
self.analyze_param_type(index, param)?;
@ -406,7 +412,9 @@ impl Optimizer {
let segment = single_segment(segments)?;
match segment {
// -> Rc<RefCell<T>>
PathSegment { ident, .. } if ident == "RefCell" => {
PathSegment {
ident, arguments, ..
} if ident == "RefCell" => {
if let PathArguments::AngleBracketed(
AngleBracketedGenericArguments { args, .. },
) = arguments
@ -543,7 +551,7 @@ fn double_segment(
#[cfg(test)]
mod tests {
use super::*;
use crate::Op;
use crate::{Attributes, Op};
use std::path::PathBuf;
use syn::parse_quote;
@ -573,8 +581,13 @@ mod tests {
let expected = std::fs::read_to_string(input.with_extension("expected"))
.expect("Failed to read expected file");
let mut attrs = Attributes::default();
if source.contains("// @test-attr:fast") {
attrs.must_be_fast = true;
}
let item = syn::parse_str(&source).expect("Failed to parse test file");
let mut op = Op::new(item, Default::default());
let mut op = Op::new(item, attrs);
let mut optimizer = Optimizer::new();
if let Err(e) = optimizer.analyze(&mut op) {
let e_str = format!("{:?}", e);

View file

@ -0,0 +1,10 @@
=== Optimizer Dump ===
returns_result: false
has_ref_opstate: false
has_rc_opstate: false
has_fast_callback_option: false
fast_result: Some(Void)
fast_parameters: [V8Value, I32]
transforms: {}
is_async: true
fast_compatible: true

View file

@ -0,0 +1,44 @@
struct op_void_async_fast {
_phantom: ::std::marker::PhantomData<()>,
}
impl<'scope> deno_core::v8::fast_api::FastFunction for op_void_async_fast {
fn function(&self) -> *const ::std::ffi::c_void {
op_void_async_fast_fn as *const ::std::ffi::c_void
}
fn args(&self) -> &'static [deno_core::v8::fast_api::Type] {
use deno_core::v8::fast_api::Type::*;
use deno_core::v8::fast_api::CType;
&[V8Value, Int32, CallbackOptions]
}
fn return_type(&self) -> deno_core::v8::fast_api::CType {
deno_core::v8::fast_api::CType::Void
}
}
fn op_void_async_fast_fn<'scope>(
_: deno_core::v8::Local<deno_core::v8::Object>,
__promise_id: i32,
fast_api_callback_options: *mut deno_core::v8::fast_api::FastApiCallbackOptions,
) -> () {
use deno_core::v8;
use deno_core::_ops;
let __opts: &mut v8::fast_api::FastApiCallbackOptions = unsafe {
&mut *fast_api_callback_options
};
let __ctx = unsafe {
&*(v8::Local::<v8::External>::cast(unsafe { __opts.data.data }).value()
as *const _ops::OpCtx)
};
let op_state = __ctx.state.clone();
let result = op_void_async::call();
let __op_id = __ctx.id;
let __state = ::std::cell::RefCell::borrow(&__ctx.state);
__state.tracker.track_async(__op_id);
let result = _ops::queue_fast_async_op(
__ctx,
async move {
let result = result.await;
(__promise_id, __op_id, _ops::OpResult::Ok(result.into()))
},
);
result
}

View file

@ -0,0 +1,3 @@
async fn op_void_async() {
// @test-attr:fast
}

View file

@ -0,0 +1,10 @@
=== Optimizer Dump ===
returns_result: true
has_ref_opstate: false
has_rc_opstate: true
has_fast_callback_option: true
fast_result: None
fast_parameters: [V8Value, I32, U32, Uint8Array]
transforms: {2: Transform { kind: SliceU8(true), index: 2 }}
is_async: true
fast_compatible: true

View file

@ -0,0 +1,53 @@
struct op_read_fast {
_phantom: ::std::marker::PhantomData<()>,
}
impl<'scope> deno_core::v8::fast_api::FastFunction for op_read_fast {
fn function(&self) -> *const ::std::ffi::c_void {
op_read_fast_fn as *const ::std::ffi::c_void
}
fn args(&self) -> &'static [deno_core::v8::fast_api::Type] {
use deno_core::v8::fast_api::Type::*;
use deno_core::v8::fast_api::CType;
&[V8Value, Int32, Uint32, TypedArray(CType::Uint8), CallbackOptions]
}
fn return_type(&self) -> deno_core::v8::fast_api::CType {
deno_core::v8::fast_api::CType::Void
}
}
fn op_read_fast_fn<'scope>(
_: deno_core::v8::Local<deno_core::v8::Object>,
__promise_id: i32,
rid: ResourceId,
buf: *const deno_core::v8::fast_api::FastApiTypedArray<u8>,
fast_api_callback_options: *mut deno_core::v8::fast_api::FastApiCallbackOptions,
) -> () {
use deno_core::v8;
use deno_core::_ops;
let __opts: &mut v8::fast_api::FastApiCallbackOptions = unsafe {
&mut *fast_api_callback_options
};
let __ctx = unsafe {
&*(v8::Local::<v8::External>::cast(unsafe { __opts.data.data }).value()
as *const _ops::OpCtx)
};
let state = __ctx.state.clone();
let buf = match unsafe { &*buf }.get_storage_if_aligned() {
Some(v) => v,
None => {
unsafe { &mut *fast_api_callback_options }.fallback = true;
return Default::default();
}
};
let result = op_read::call(state, rid, buf);
let __op_id = __ctx.id;
let __state = ::std::cell::RefCell::borrow(&__ctx.state);
__state.tracker.track_async(__op_id);
let __get_class = __state.get_error_class_fn;
let result = _ops::queue_fast_async_op(
__ctx,
async move {
let result = result.await;
(__promise_id, __op_id, _ops::to_op_result(__get_class, result))
},
);
}

View file

@ -0,0 +1,7 @@
async fn op_read(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
buf: &mut [u8],
) -> Result<u32, Error> {
// @test-attr:fast
}

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(Void)
fast_parameters: [V8Value]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(Void)
fast_parameters: [V8Value, I32]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(U32)
fast_parameters: [V8Value, U32, U32]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(Void)
fast_parameters: [V8Value]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(U32)
fast_parameters: [V8Value, U32, U32]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -0,0 +1,10 @@
=== Optimizer Dump ===
returns_result: true
has_ref_opstate: true
has_rc_opstate: false
has_fast_callback_option: false
fast_result: Some(U32)
fast_parameters: [V8Value]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -0,0 +1,40 @@
struct op_listen_fast {
_phantom: ::std::marker::PhantomData<()>,
}
impl<'scope> deno_core::v8::fast_api::FastFunction for op_listen_fast {
fn function(&self) -> *const ::std::ffi::c_void {
op_listen_fast_fn as *const ::std::ffi::c_void
}
fn args(&self) -> &'static [deno_core::v8::fast_api::Type] {
use deno_core::v8::fast_api::Type::*;
use deno_core::v8::fast_api::CType;
&[V8Value, CallbackOptions]
}
fn return_type(&self) -> deno_core::v8::fast_api::CType {
deno_core::v8::fast_api::CType::Uint32
}
}
fn op_listen_fast_fn<'scope>(
_: deno_core::v8::Local<deno_core::v8::Object>,
fast_api_callback_options: *mut deno_core::v8::fast_api::FastApiCallbackOptions,
) -> u32 {
use deno_core::v8;
use deno_core::_ops;
let __opts: &mut v8::fast_api::FastApiCallbackOptions = unsafe {
&mut *fast_api_callback_options
};
let __ctx = unsafe {
&*(v8::Local::<v8::External>::cast(unsafe { __opts.data.data }).value()
as *const _ops::OpCtx)
};
let state = &mut ::std::cell::RefCell::borrow_mut(&__ctx.state);
let result = op_listen::call(state);
match result {
Ok(result) => result,
Err(err) => {
state.last_fast_op_error.replace(err);
__opts.fallback = true;
Default::default()
}
}
}

View file

@ -0,0 +1,9 @@
fn op_listen(state: &mut OpState) -> Result<ResourceId, Error> {
log::debug!("listen");
let addr = "127.0.0.1:4570".parse::<SocketAddr>().unwrap();
let std_listener = std::net::TcpListener::bind(&addr)?;
std_listener.set_nonblocking(true)?;
let listener = TcpListener::try_from(std_listener)?;
let rid = state.resource_table.add(listener);
Ok(rid)
}

View file

@ -6,3 +6,5 @@ has_fast_callback_option: true
fast_result: Some(Void)
fast_parameters: [V8Value, Uint8Array]
transforms: {1: Transform { kind: SliceU8(true), index: 1 }}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(U32)
fast_parameters: [V8Value, U32, U32, U32, U32]
transforms: {}
is_async: false
fast_compatible: true

View file

@ -6,3 +6,5 @@ has_fast_callback_option: false
fast_result: Some(Bool)
fast_parameters: [V8Value, V8Value]
transforms: {0: Transform { kind: V8Value, index: 0 }}
is_async: false
fast_compatible: true

View file

@ -1,11 +1,3 @@
error: fast async calls are not supported
--> tests/compile_fail/unsupported.rs:22:1
|
22 | #[op(fast)]
| ^^^^^^^^^^^
|
= note: this error originates in the attribute macro `op` (in Nightly builds, run with -Z macro-backtrace for more info)
error[E0277]: the trait bound `&mut FastApiCallbackOptions<'_>: Deserialize<'_>` is not satisfied
--> tests/compile_fail/unsupported.rs:17:1
|

View file

@ -120,6 +120,10 @@ async function clippy() {
"-A",
// https://github.com/rust-lang/rust-clippy/issues/407
"clippy::extra_unused_lifetimes",
"-A",
// https://github.com/rust-lang/rust-clippy/issues/7271
// False positives in core/resources.rs for lifetime elision.
"clippy::needless_lifetimes",
],
stdout: "inherit",
stderr: "inherit",