diff --git a/cli/tools/repl.rs b/cli/tools/repl.rs index 957d60b66b..6c6d262406 100644 --- a/cli/tools/repl.rs +++ b/cli/tools/repl.rs @@ -25,24 +25,25 @@ use rustyline::Context; use rustyline::Editor; use rustyline_derive::{Helper, Hinter}; use std::borrow::Cow; +use std::cell::RefCell; use std::path::PathBuf; -use std::sync::mpsc::channel; -use std::sync::mpsc::sync_channel; -use std::sync::mpsc::Receiver; -use std::sync::mpsc::Sender; -use std::sync::mpsc::SyncSender; use std::sync::Arc; use std::sync::Mutex; use swc_ecmascript::parser::token::{Token, Word}; -use tokio::pin; +use tokio::sync::mpsc::channel; +use tokio::sync::mpsc::unbounded_channel; +use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::Sender; +use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::UnboundedSender; // Provides helpers to the editor like validation for multi-line edits, completion candidates for // tab completion. #[derive(Helper, Hinter)] struct EditorHelper { context_id: u64, - message_tx: SyncSender<(String, Option)>, - response_rx: Receiver>, + message_tx: Sender<(String, Option)>, + response_rx: RefCell>>, } impl EditorHelper { @@ -51,8 +52,10 @@ impl EditorHelper { method: &str, params: Option, ) -> Result { - self.message_tx.send((method.to_string(), params))?; - self.response_rx.recv()? + self + .message_tx + .blocking_send((method.to_string(), params))?; + self.response_rx.borrow_mut().blocking_recv().unwrap() } fn get_global_lexical_scope_names(&self) -> Vec { @@ -444,7 +447,7 @@ impl ReplSession { } pub async fn run_event_loop(&mut self) -> Result<(), AnyError> { - self.worker.run_event_loop(false).await + self.worker.run_event_loop(true).await } pub async fn evaluate_line_and_get_output( @@ -622,39 +625,31 @@ impl ReplSession { async fn read_line_and_poll( repl_session: &mut ReplSession, - message_rx: &Receiver<(String, Option)>, - response_tx: &Sender>, + message_rx: &mut Receiver<(String, Option)>, + response_tx: &UnboundedSender>, editor: ReplEditor, ) -> Result { - let mut line = tokio::task::spawn_blocking(move || editor.readline()); - + let mut line_fut = tokio::task::spawn_blocking(move || editor.readline()); let mut poll_worker = true; loop { - for (method, params) in message_rx.try_iter() { - let result = repl_session - .post_message_with_event_loop(&method, params) - .await; - response_tx.send(result).unwrap(); - } - - // Because an inspector websocket client may choose to connect at anytime when we have an - // inspector server we need to keep polling the worker to pick up new connections. - // TODO(piscisaureus): the above comment is a red herring; figure out if/why - // the event loop isn't woken by a waker when a websocket client connects. - let timeout = tokio::time::sleep(tokio::time::Duration::from_millis(100)); - pin!(timeout); - tokio::select! { - result = &mut line => { + result = &mut line_fut => { return result.unwrap(); } + result = message_rx.recv() => { + if let Some((method, params)) = result { + let result = repl_session + .post_message_with_event_loop(&method, params) + .await; + response_tx.send(result).unwrap(); + } + + poll_worker = true; + }, _ = repl_session.run_event_loop(), if poll_worker => { poll_worker = false; } - _ = timeout => { - poll_worker = true - } } } } @@ -664,13 +659,13 @@ pub async fn run( worker: MainWorker, ) -> Result<(), AnyError> { let mut repl_session = ReplSession::initialize(worker).await?; - let (message_tx, message_rx) = sync_channel(1); - let (response_tx, response_rx) = channel(); + let (message_tx, mut message_rx) = channel(1); + let (response_tx, response_rx) = unbounded_channel(); let helper = EditorHelper { context_id: repl_session.context_id, message_tx, - response_rx, + response_rx: RefCell::new(response_rx), }; let history_file_path = program_state.dir.root.join("deno_history.txt"); @@ -682,7 +677,7 @@ pub async fn run( loop { let line = read_line_and_poll( &mut repl_session, - &message_rx, + &mut message_rx, &response_tx, editor.clone(), ) diff --git a/runtime/worker.rs b/runtime/worker.rs index 567e752536..091b971193 100644 --- a/runtime/worker.rs +++ b/runtime/worker.rs @@ -7,7 +7,6 @@ use crate::ops; use crate::permissions::Permissions; use deno_broadcast_channel::InMemoryBroadcastChannel; use deno_core::error::AnyError; -use deno_core::futures::future::poll_fn; use deno_core::futures::stream::StreamExt; use deno_core::futures::Future; use deno_core::located_script_name; @@ -247,7 +246,7 @@ impl MainWorker { &mut self, wait_for_inspector: bool, ) -> Result<(), AnyError> { - poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await + self.js_runtime.run_event_loop(wait_for_inspector).await } /// A utility function that runs provided future concurrently with the event loop.