From c171813e894f0759abb1b80413aa2a24dbad079b Mon Sep 17 00:00:00 2001 From: Ryan Dahl Date: Wed, 1 May 2019 18:22:32 -0400 Subject: [PATCH] core: express op as enum (#2255) --- cli/ops.rs | 11 ++++++-- cli/state.rs | 6 +--- core/examples/http_bench.rs | 14 ++++++---- core/isolate.rs | 55 +++++++++++++++++++++---------------- 4 files changed, 48 insertions(+), 38 deletions(-) diff --git a/cli/ops.rs b/cli/ops.rs index 5463bac4d6..c49ce517c1 100644 --- a/cli/ops.rs +++ b/cli/ops.rs @@ -83,7 +83,7 @@ pub fn dispatch_all( control: &[u8], zero_copy: Option, op_selector: OpSelector, -) -> (bool, Box) { +) -> Op { let bytes_sent_control = control.len(); let bytes_sent_zero_copy = zero_copy.as_ref().map(|b| b.len()).unwrap_or(0); let base = msg::get_root_as_base(&control); @@ -101,7 +101,7 @@ pub fn dispatch_all( let state = state.clone(); state.metrics_op_dispatched(bytes_sent_control, bytes_sent_zero_copy); - let boxed_op = Box::new( + let fut = Box::new( op.or_else(move |err: DenoError| -> Result { debug!("op err {}", err); // No matter whether we got an Err or Ok, we want a serialized message to @@ -143,7 +143,12 @@ pub fn dispatch_all( msg::enum_name_any(inner_type), base.sync() ); - (base.sync(), boxed_op) + + if base.sync() { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } pub fn op_selector_compiler(inner_type: msg::Any) -> Option { diff --git a/cli/state.rs b/cli/state.rs index 2bfc641d5a..8a4f4eaee0 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -81,11 +81,7 @@ impl Deref for ThreadSafeState { } impl ThreadSafeState { - pub fn dispatch( - &self, - control: &[u8], - zero_copy: Option, - ) -> (bool, Box) { + pub fn dispatch(&self, control: &[u8], zero_copy: Option) -> Op { ops::dispatch_all(self, control, zero_copy, self.dispatch_selector) } } diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index b355f5568a..757e9a3b71 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -111,10 +111,7 @@ fn test_record_from() { pub type HttpBenchOp = dyn Future + Send; -fn dispatch( - control: &[u8], - zero_copy_buf: Option, -) -> (bool, Box) { +fn dispatch(control: &[u8], zero_copy_buf: Option) -> Op { let record = Record::from(control); let is_sync = record.promise_id == 0; let http_bench_op = match record.op_id { @@ -147,7 +144,7 @@ fn dispatch( let mut record_a = record.clone(); let mut record_b = record.clone(); - let op = Box::new( + let fut = Box::new( http_bench_op .and_then(move |result| { record_a.result = result; @@ -161,7 +158,12 @@ fn dispatch( Ok(record.into()) }), ); - (is_sync, op) + + if is_sync { + Op::Sync(fut.wait().unwrap()) + } else { + Op::Async(fut) + } } fn main() { diff --git a/core/isolate.rs b/core/isolate.rs index 2cafb29b6e..96d9dc24bc 100644 --- a/core/isolate.rs +++ b/core/isolate.rs @@ -26,7 +26,13 @@ use std::ptr::null; use std::sync::{Arc, Mutex, Once, ONCE_INIT}; pub type Buf = Box<[u8]>; -pub type Op = dyn Future + Send; + +pub type OpAsyncFuture = Box + Send>; + +pub enum Op { + Sync(Buf), + Async(OpAsyncFuture), +} /// Stores a script used to initalize a Isolate pub struct Script<'a> { @@ -46,8 +52,7 @@ pub enum StartupData<'a> { #[derive(Default)] pub struct Config { - dispatch: - Option) -> (bool, Box) + Send + Sync>>, + dispatch: Option) -> Op>>, pub will_snapshot: bool, } @@ -57,7 +62,7 @@ impl Config { /// corresponds to the second argument of Deno.core.dispatch(). pub fn dispatch(&mut self, f: F) where - F: Fn(&[u8], Option) -> (bool, Box) + Send + Sync + 'static, + F: Fn(&[u8], Option) -> Op + Send + Sync + 'static, { self.dispatch = Some(Arc::new(f)); } @@ -69,15 +74,15 @@ impl Config { /// pending ops have completed. /// /// Ops are created in JavaScript by calling Deno.core.dispatch(), and in Rust -/// by implementing deno::Dispatch::dispatch. An Op corresponds exactly to a -/// Promise in JavaScript. +/// by implementing deno::Dispatch::dispatch. An async Op corresponds exactly to +/// a Promise in JavaScript. pub struct Isolate { libdeno_isolate: *const libdeno::isolate, shared_libdeno_isolate: Arc>>, config: Config, needs_init: bool, shared: SharedQueue, - pending_ops: FuturesUnordered>, + pending_ops: FuturesUnordered, have_unpolled_ops: bool, } @@ -175,7 +180,7 @@ impl Isolate { let isolate = unsafe { Isolate::from_raw_ptr(user_data) }; let control_shared = isolate.shared.shift(); - let (is_sync, op) = if control_argv0.len() > 0 { + let op = if control_argv0.len() > 0 { // The user called Deno.core.send(control) if let Some(ref f) = isolate.config.dispatch { f(control_argv0.as_ref(), PinnedBuf::new(zero_copy_buf)) @@ -201,16 +206,18 @@ impl Isolate { // At this point the SharedQueue should be empty. assert_eq!(isolate.shared.size(), 0); - if is_sync { - let res_record = op.wait().unwrap(); - // For sync messages, we always return the response via Deno.core.send's - // return value. - // TODO(ry) check that if JSError thrown during respond(), that it will be - // picked up. - let _ = isolate.respond(Some(&res_record)); - } else { - isolate.pending_ops.push(op); - isolate.have_unpolled_ops = true; + match op { + Op::Sync(buf) => { + // For sync messages, we always return the response via Deno.core.send's + // return value. + // TODO(ry) check that if JSError thrown during respond(), that it will be + // picked up. + let _ = isolate.respond(Some(&buf)); + } + Op::Async(fut) => { + isolate.pending_ops.push(fut); + isolate.have_unpolled_ops = true; + } } } @@ -555,19 +562,19 @@ pub mod tests { let dispatch_count_ = dispatch_count.clone(); let mut config = Config::default(); - config.dispatch(move |control, _| -> (bool, Box) { + config.dispatch(move |control, _| -> Op { dispatch_count_.fetch_add(1, Ordering::Relaxed); match mode { Mode::AsyncImmediate => { assert_eq!(control.len(), 1); assert_eq!(control[0], 42); let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowReqSync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) + Op::Sync(buf) } Mode::OverflowResSync => { assert_eq!(control.len(), 1); @@ -576,12 +583,12 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 99; let buf = vec.into_boxed_slice(); - (true, Box::new(futures::future::ok(buf))) + Op::Sync(buf) } Mode::OverflowReqAsync => { assert_eq!(control.len(), 100 * 1024 * 1024); let buf = vec![43u8].into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } Mode::OverflowResAsync => { assert_eq!(control.len(), 1); @@ -590,7 +597,7 @@ pub mod tests { vec.resize(100 * 1024 * 1024, 0); vec[0] = 4; let buf = vec.into_boxed_slice(); - (false, Box::new(futures::future::ok(buf))) + Op::Async(Box::new(futures::future::ok(buf))) } } });