mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 09:31:22 -05:00
feat(core): Event loop middlewares for Extensions (#13816)
This commit is contained in:
parent
303d691a16
commit
e166d7eed0
3 changed files with 117 additions and 0 deletions
66
core/examples/schedule_task.rs
Normal file
66
core/examples/schedule_task.rs
Normal file
|
@ -0,0 +1,66 @@
|
|||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use deno_core::anyhow::Error;
|
||||
use deno_core::Extension;
|
||||
use deno_core::JsRuntime;
|
||||
use deno_core::OpState;
|
||||
use deno_core::RuntimeOptions;
|
||||
use futures::channel::mpsc;
|
||||
use futures::stream::StreamExt;
|
||||
use std::task::Poll;
|
||||
|
||||
type Task = Box<dyn FnOnce()>;
|
||||
|
||||
fn main() {
|
||||
let my_ext = Extension::builder()
|
||||
.ops(vec![(
|
||||
"op_schedule_task",
|
||||
deno_core::op_sync(op_schedule_task),
|
||||
)])
|
||||
.event_loop_middleware(|state, cx| {
|
||||
let recv = state.borrow_mut::<mpsc::UnboundedReceiver<Task>>();
|
||||
let mut ref_loop = false;
|
||||
while let Poll::Ready(Some(call)) = recv.poll_next_unpin(cx) {
|
||||
call();
|
||||
ref_loop = true; // `call` can callback into runtime and schedule new callbacks :-)
|
||||
}
|
||||
ref_loop
|
||||
})
|
||||
.state(move |state| {
|
||||
let (tx, rx) = mpsc::unbounded::<Task>();
|
||||
state.put(tx);
|
||||
state.put(rx);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.build();
|
||||
|
||||
// Initialize a runtime instance
|
||||
let mut js_runtime = JsRuntime::new(RuntimeOptions {
|
||||
extensions: vec![my_ext],
|
||||
..Default::default()
|
||||
});
|
||||
let runtime = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
let future = async move {
|
||||
// Schedule 10 tasks.
|
||||
js_runtime
|
||||
.execute_script(
|
||||
"<usage>",
|
||||
r#"for (let i = 1; i <= 10; i++) Deno.core.opSync("op_schedule_task", i);"#
|
||||
)
|
||||
.unwrap();
|
||||
js_runtime.run_event_loop(false).await
|
||||
};
|
||||
runtime.block_on(future).unwrap();
|
||||
}
|
||||
|
||||
fn op_schedule_task(state: &mut OpState, i: u8, _: ()) -> Result<(), Error> {
|
||||
let tx = state.borrow_mut::<mpsc::UnboundedSender<Task>>();
|
||||
tx.unbounded_send(Box::new(move || println!("Hello, world! x{}", i)))
|
||||
.expect("unbounded_send failed");
|
||||
Ok(())
|
||||
}
|
|
@ -1,12 +1,14 @@
|
|||
use crate::OpFn;
|
||||
use crate::OpState;
|
||||
use anyhow::Error;
|
||||
use std::task::Context;
|
||||
|
||||
pub type SourcePair = (&'static str, Box<SourceLoadFn>);
|
||||
pub type SourceLoadFn = dyn Fn() -> Result<String, Error>;
|
||||
pub type OpPair = (&'static str, Box<OpFn>);
|
||||
pub type OpMiddlewareFn = dyn Fn(&'static str, Box<OpFn>) -> Box<OpFn>;
|
||||
pub type OpStateFn = dyn Fn(&mut OpState) -> Result<(), Error>;
|
||||
pub type OpEventLoopFn = dyn Fn(&mut OpState, &mut Context) -> bool;
|
||||
|
||||
#[derive(Default)]
|
||||
pub struct Extension {
|
||||
|
@ -14,6 +16,7 @@ pub struct Extension {
|
|||
ops: Option<Vec<OpPair>>,
|
||||
opstate_fn: Option<Box<OpStateFn>>,
|
||||
middleware_fn: Option<Box<OpMiddlewareFn>>,
|
||||
event_loop_middleware: Option<Box<OpEventLoopFn>>,
|
||||
initialized: bool,
|
||||
}
|
||||
|
||||
|
@ -56,6 +59,22 @@ impl Extension {
|
|||
pub fn init_middleware(&mut self) -> Option<Box<OpMiddlewareFn>> {
|
||||
self.middleware_fn.take()
|
||||
}
|
||||
|
||||
pub fn init_event_loop_middleware(&mut self) -> Option<Box<OpEventLoopFn>> {
|
||||
self.event_loop_middleware.take()
|
||||
}
|
||||
|
||||
pub fn run_event_loop_middleware(
|
||||
&self,
|
||||
op_state: &mut OpState,
|
||||
cx: &mut Context,
|
||||
) -> bool {
|
||||
self
|
||||
.event_loop_middleware
|
||||
.as_ref()
|
||||
.map(|f| f(op_state, cx))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
// Provides a convenient builder pattern to declare Extensions
|
||||
|
@ -65,6 +84,7 @@ pub struct ExtensionBuilder {
|
|||
ops: Vec<OpPair>,
|
||||
state: Option<Box<OpStateFn>>,
|
||||
middleware: Option<Box<OpMiddlewareFn>>,
|
||||
event_loop_middleware: Option<Box<OpEventLoopFn>>,
|
||||
}
|
||||
|
||||
impl ExtensionBuilder {
|
||||
|
@ -94,6 +114,14 @@ impl ExtensionBuilder {
|
|||
self
|
||||
}
|
||||
|
||||
pub fn event_loop_middleware<F>(&mut self, middleware_fn: F) -> &mut Self
|
||||
where
|
||||
F: Fn(&mut OpState, &mut Context) -> bool + 'static,
|
||||
{
|
||||
self.event_loop_middleware = Some(Box::new(middleware_fn));
|
||||
self
|
||||
}
|
||||
|
||||
pub fn build(&mut self) -> Extension {
|
||||
let js_files = Some(std::mem::take(&mut self.js));
|
||||
let ops = Some(std::mem::take(&mut self.ops));
|
||||
|
@ -102,6 +130,7 @@ impl ExtensionBuilder {
|
|||
ops,
|
||||
opstate_fn: self.state.take(),
|
||||
middleware_fn: self.middleware.take(),
|
||||
event_loop_middleware: self.event_loop_middleware.take(),
|
||||
initialized: false,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,6 +5,7 @@ use crate::error::attach_handle_to_error;
|
|||
use crate::error::generic_error;
|
||||
use crate::error::ErrWithV8Handle;
|
||||
use crate::error::JsError;
|
||||
use crate::extensions::OpEventLoopFn;
|
||||
use crate::inspector::JsRuntimeInspector;
|
||||
use crate::module_specifier::ModuleSpecifier;
|
||||
use crate::modules::ModuleId;
|
||||
|
@ -80,6 +81,7 @@ pub struct JsRuntime {
|
|||
has_snapshotted: bool,
|
||||
allocations: IsolateAllocations,
|
||||
extensions: Vec<Extension>,
|
||||
event_loop_middlewares: Vec<Box<OpEventLoopFn>>,
|
||||
}
|
||||
|
||||
struct DynImportModEvaluate {
|
||||
|
@ -381,6 +383,7 @@ impl JsRuntime {
|
|||
snapshot_creator: maybe_snapshot_creator,
|
||||
has_snapshotted: false,
|
||||
allocations: IsolateAllocations::default(),
|
||||
event_loop_middlewares: Vec::with_capacity(options.extensions.len()),
|
||||
extensions: options.extensions,
|
||||
};
|
||||
|
||||
|
@ -481,6 +484,10 @@ impl JsRuntime {
|
|||
for (name, opfn) in ops {
|
||||
self.register_op(name, macroware(name, opfn));
|
||||
}
|
||||
|
||||
if let Some(middleware) = e.init_event_loop_middleware() {
|
||||
self.event_loop_middlewares.push(middleware);
|
||||
}
|
||||
}
|
||||
// Restore extensions
|
||||
self.extensions = extensions;
|
||||
|
@ -788,6 +795,18 @@ impl JsRuntime {
|
|||
self.check_promise_exceptions()?;
|
||||
}
|
||||
|
||||
// Event loop middlewares
|
||||
let mut maybe_scheduling = false;
|
||||
{
|
||||
let state = state_rc.borrow();
|
||||
let op_state = state.op_state.clone();
|
||||
for f in &self.event_loop_middlewares {
|
||||
if f(&mut op_state.borrow_mut(), cx) {
|
||||
maybe_scheduling = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Top level module
|
||||
self.evaluate_pending_module();
|
||||
|
||||
|
@ -815,6 +834,7 @@ impl JsRuntime {
|
|||
&& !has_pending_module_evaluation
|
||||
&& !has_pending_background_tasks
|
||||
&& !has_tick_scheduled
|
||||
&& !maybe_scheduling
|
||||
{
|
||||
if wait_for_inspector && inspector_has_active_sessions {
|
||||
return Poll::Pending;
|
||||
|
@ -833,6 +853,7 @@ impl JsRuntime {
|
|||
if state.have_unpolled_ops
|
||||
|| has_pending_background_tasks
|
||||
|| has_tick_scheduled
|
||||
|| maybe_scheduling
|
||||
{
|
||||
state.waker.wake();
|
||||
}
|
||||
|
@ -843,6 +864,7 @@ impl JsRuntime {
|
|||
|| has_pending_dyn_module_evaluation
|
||||
|| has_pending_background_tasks
|
||||
|| has_tick_scheduled
|
||||
|| maybe_scheduling
|
||||
{
|
||||
// pass, will be polled again
|
||||
} else {
|
||||
|
|
Loading…
Add table
Reference in a new issue