1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 06:09:25 -05:00

feat: op registration during calls (#3375)

This commit is contained in:
Andy Finch 2019-11-18 21:13:04 -05:00 committed by Ry Dahl
parent f437521afb
commit b6b813cbfc
3 changed files with 74 additions and 22 deletions

View file

@ -165,7 +165,7 @@ fn main() {
filename: "http_bench.js", filename: "http_bench.js",
}); });
let mut isolate = deno::Isolate::new(startup_data, false); let isolate = deno::Isolate::new(startup_data, false);
isolate.register_op("listen", http_op(op_listen)); isolate.register_op("listen", http_op(op_listen));
isolate.register_op("accept", http_op(op_accept)); isolate.register_op("accept", http_op(op_accept));
isolate.register_op("read", http_op(op_read)); isolate.register_op("read", http_op(op_read));

View file

@ -178,7 +178,7 @@ pub struct Isolate {
pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>, pending_dyn_imports: FuturesUnordered<StreamFuture<IntoStream<DynImport>>>,
have_unpolled_ops: bool, have_unpolled_ops: bool,
startup_script: Option<OwnedScript>, startup_script: Option<OwnedScript>,
op_registry: OpRegistry, pub op_registry: Arc<OpRegistry>,
eager_poll_count: u32, eager_poll_count: u32,
waker: AtomicWaker, waker: AtomicWaker,
} }
@ -245,7 +245,7 @@ impl Isolate {
have_unpolled_ops: false, have_unpolled_ops: false,
pending_dyn_imports: FuturesUnordered::new(), pending_dyn_imports: FuturesUnordered::new(),
startup_script, startup_script,
op_registry: OpRegistry::new(), op_registry: Arc::new(OpRegistry::new()),
eager_poll_count: 0, eager_poll_count: 0,
waker: AtomicWaker::new(), waker: AtomicWaker::new(),
} }
@ -256,7 +256,7 @@ impl Isolate {
/// corresponds to the second argument of Deno.core.dispatch(). /// corresponds to the second argument of Deno.core.dispatch().
/// ///
/// Requires runtime to explicitly ask for op ids before using any of the ops. /// Requires runtime to explicitly ask for op ids before using any of the ops.
pub fn register_op<F>(&mut self, name: &str, op: F) -> OpId pub fn register_op<F>(&self, name: &str, op: F) -> OpId
where where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{ {

View file

@ -4,6 +4,8 @@ use crate::PinnedBuf;
use futures::Future; use futures::Future;
use std::collections::HashMap; use std::collections::HashMap;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Arc;
use std::sync::RwLock;
pub type Buf = Box<[u8]>; pub type Buf = Box<[u8]>;
@ -25,17 +27,18 @@ pub type CoreError = ();
pub type CoreOp = Op<CoreError>; pub type CoreOp = Op<CoreError>;
/// Main type describing op /// Main type describing op
type OpDispatcher = dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp; type OpDispatcher =
dyn Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static;
#[derive(Default)] #[derive(Default)]
pub struct OpRegistry { pub struct OpRegistry {
dispatchers: Vec<Box<OpDispatcher>>, dispatchers: RwLock<Vec<Arc<Box<OpDispatcher>>>>,
name_to_id: HashMap<String, OpId>, name_to_id: RwLock<HashMap<String, OpId>>,
} }
impl OpRegistry { impl OpRegistry {
pub fn new() -> Self { pub fn new() -> Self {
let mut registry = Self::default(); let registry = Self::default();
let op_id = registry.register("ops", |_, _| { let op_id = registry.register("ops", |_, _| {
// ops is a special op which is handled in call. // ops is a special op which is handled in call.
unreachable!() unreachable!()
@ -44,24 +47,29 @@ impl OpRegistry {
registry registry
} }
pub fn register<F>(&mut self, name: &str, op: F) -> OpId pub fn register<F>(&self, name: &str, op: F) -> OpId
where where
F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static, F: Fn(&[u8], Option<PinnedBuf>) -> CoreOp + Send + Sync + 'static,
{ {
let op_id = self.dispatchers.len() as u32; let mut lock = self.dispatchers.write().unwrap();
let op_id = lock.len() as u32;
let existing = self.name_to_id.insert(name.to_string(), op_id); let mut name_lock = self.name_to_id.write().unwrap();
let existing = name_lock.insert(name.to_string(), op_id);
assert!( assert!(
existing.is_none(), existing.is_none(),
format!("Op already registered: {}", name) format!("Op already registered: {}", name)
); );
self.dispatchers.push(Box::new(op)); lock.push(Arc::new(Box::new(op)));
drop(name_lock);
drop(lock);
op_id op_id
} }
fn json_map(&self) -> Buf { fn json_map(&self) -> Buf {
let op_map_json = serde_json::to_string(&self.name_to_id).unwrap(); let lock = self.name_to_id.read().unwrap();
let op_map_json = serde_json::to_string(&*lock).unwrap();
op_map_json.as_bytes().to_owned().into_boxed_slice() op_map_json.as_bytes().to_owned().into_boxed_slice()
} }
@ -78,13 +86,15 @@ impl OpRegistry {
if op_id == 0 { if op_id == 0 {
return Some(Op::Sync(self.json_map())); return Some(Op::Sync(self.json_map()));
} }
let lock = self.dispatchers.read().unwrap();
let d = match self.dispatchers.get(op_id as usize) { if let Some(op) = lock.get(op_id as usize) {
Some(handler) => &*handler, let op_ = Arc::clone(&op);
None => return None, // This should allow for changes to the dispatcher list during a call.
}; drop(lock);
Some(op_(control, zero_copy_buf))
Some(d(control, zero_copy_buf)) } else {
None
}
} }
} }
@ -92,7 +102,7 @@ impl OpRegistry {
fn test_op_registry() { fn test_op_registry() {
use std::sync::atomic; use std::sync::atomic;
use std::sync::Arc; use std::sync::Arc;
let mut op_registry = OpRegistry::new(); let op_registry = OpRegistry::new();
let c = Arc::new(atomic::AtomicUsize::new(0)); let c = Arc::new(atomic::AtomicUsize::new(0));
let c_ = c.clone(); let c_ = c.clone();
@ -106,7 +116,8 @@ fn test_op_registry() {
let mut expected = HashMap::new(); let mut expected = HashMap::new();
expected.insert("ops".to_string(), 0); expected.insert("ops".to_string(), 0);
expected.insert("test".to_string(), 1); expected.insert("test".to_string(), 1);
assert_eq!(op_registry.name_to_id, expected); let name_to_id = op_registry.name_to_id.read().unwrap();
assert_eq!(*name_to_id, expected);
let res = op_registry.call(test_id, &[], None).unwrap(); let res = op_registry.call(test_id, &[], None).unwrap();
if let Op::Sync(buf) = res { if let Op::Sync(buf) = res {
@ -119,3 +130,44 @@ fn test_op_registry() {
let res = op_registry.call(100, &[], None); let res = op_registry.call(100, &[], None);
assert!(res.is_none()); assert!(res.is_none());
} }
#[test]
fn register_op_during_call() {
use std::sync::atomic;
use std::sync::Arc;
let op_registry = Arc::new(OpRegistry::new());
let c = Arc::new(atomic::AtomicUsize::new(0));
let c_ = c.clone();
let op_registry_ = op_registry.clone();
let test_id = op_registry.register("dynamic_register_op", move |_, _| {
let c__ = c_.clone();
op_registry_.register("test", move |_, _| {
c__.fetch_add(1, atomic::Ordering::SeqCst);
CoreOp::Sync(Box::new([]))
});
CoreOp::Sync(Box::new([]))
});
assert!(test_id != 0);
op_registry.call(test_id, &[], None);
let mut expected = HashMap::new();
expected.insert("ops".to_string(), 0);
expected.insert("dynamic_register_op".to_string(), 1);
expected.insert("test".to_string(), 2);
let name_to_id = op_registry.name_to_id.read().unwrap();
assert_eq!(*name_to_id, expected);
let res = op_registry.call(2, &[], None).unwrap();
if let Op::Sync(buf) = res {
assert_eq!(buf.len(), 0);
} else {
unreachable!();
}
assert_eq!(c.load(atomic::Ordering::SeqCst), 1);
let res = op_registry.call(100, &[], None);
assert!(res.is_none());
}