0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

chore: get rid of REPL timeout (#11175)

* Get rid of timeout
* Use tokio channel and reduce calls to run_event_loop

Co-authored-by: David Sherret <dsherret@gmail.com>
This commit is contained in:
Bert Belder 2021-06-29 20:39:28 +02:00 committed by GitHub
parent a0c0daac24
commit 5db9f627e3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 39 deletions

View file

@ -25,24 +25,25 @@ use rustyline::Context;
use rustyline::Editor;
use rustyline_derive::{Helper, Hinter};
use std::borrow::Cow;
use std::cell::RefCell;
use std::path::PathBuf;
use std::sync::mpsc::channel;
use std::sync::mpsc::sync_channel;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::Sender;
use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::sync::Mutex;
use swc_ecmascript::parser::token::{Token, Word};
use tokio::pin;
use tokio::sync::mpsc::channel;
use tokio::sync::mpsc::unbounded_channel;
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::Sender;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::UnboundedSender;
// Provides helpers to the editor like validation for multi-line edits, completion candidates for
// tab completion.
#[derive(Helper, Hinter)]
struct EditorHelper {
context_id: u64,
message_tx: SyncSender<(String, Option<Value>)>,
response_rx: Receiver<Result<Value, AnyError>>,
message_tx: Sender<(String, Option<Value>)>,
response_rx: RefCell<UnboundedReceiver<Result<Value, AnyError>>>,
}
impl EditorHelper {
@ -51,8 +52,10 @@ impl EditorHelper {
method: &str,
params: Option<Value>,
) -> Result<Value, AnyError> {
self.message_tx.send((method.to_string(), params))?;
self.response_rx.recv()?
self
.message_tx
.blocking_send((method.to_string(), params))?;
self.response_rx.borrow_mut().blocking_recv().unwrap()
}
fn get_global_lexical_scope_names(&self) -> Vec<String> {
@ -444,7 +447,7 @@ impl ReplSession {
}
pub async fn run_event_loop(&mut self) -> Result<(), AnyError> {
self.worker.run_event_loop(false).await
self.worker.run_event_loop(true).await
}
pub async fn evaluate_line_and_get_output(
@ -622,39 +625,31 @@ impl ReplSession {
async fn read_line_and_poll(
repl_session: &mut ReplSession,
message_rx: &Receiver<(String, Option<Value>)>,
response_tx: &Sender<Result<Value, AnyError>>,
message_rx: &mut Receiver<(String, Option<Value>)>,
response_tx: &UnboundedSender<Result<Value, AnyError>>,
editor: ReplEditor,
) -> Result<String, ReadlineError> {
let mut line = tokio::task::spawn_blocking(move || editor.readline());
let mut line_fut = tokio::task::spawn_blocking(move || editor.readline());
let mut poll_worker = true;
loop {
for (method, params) in message_rx.try_iter() {
let result = repl_session
.post_message_with_event_loop(&method, params)
.await;
response_tx.send(result).unwrap();
}
// Because an inspector websocket client may choose to connect at anytime when we have an
// inspector server we need to keep polling the worker to pick up new connections.
// TODO(piscisaureus): the above comment is a red herring; figure out if/why
// the event loop isn't woken by a waker when a websocket client connects.
let timeout = tokio::time::sleep(tokio::time::Duration::from_millis(100));
pin!(timeout);
tokio::select! {
result = &mut line => {
result = &mut line_fut => {
return result.unwrap();
}
result = message_rx.recv() => {
if let Some((method, params)) = result {
let result = repl_session
.post_message_with_event_loop(&method, params)
.await;
response_tx.send(result).unwrap();
}
poll_worker = true;
},
_ = repl_session.run_event_loop(), if poll_worker => {
poll_worker = false;
}
_ = timeout => {
poll_worker = true
}
}
}
}
@ -664,13 +659,13 @@ pub async fn run(
worker: MainWorker,
) -> Result<(), AnyError> {
let mut repl_session = ReplSession::initialize(worker).await?;
let (message_tx, message_rx) = sync_channel(1);
let (response_tx, response_rx) = channel();
let (message_tx, mut message_rx) = channel(1);
let (response_tx, response_rx) = unbounded_channel();
let helper = EditorHelper {
context_id: repl_session.context_id,
message_tx,
response_rx,
response_rx: RefCell::new(response_rx),
};
let history_file_path = program_state.dir.root.join("deno_history.txt");
@ -682,7 +677,7 @@ pub async fn run(
loop {
let line = read_line_and_poll(
&mut repl_session,
&message_rx,
&mut message_rx,
&response_tx,
editor.clone(),
)

View file

@ -7,7 +7,6 @@ use crate::ops;
use crate::permissions::Permissions;
use deno_broadcast_channel::InMemoryBroadcastChannel;
use deno_core::error::AnyError;
use deno_core::futures::future::poll_fn;
use deno_core::futures::stream::StreamExt;
use deno_core::futures::Future;
use deno_core::located_script_name;
@ -247,7 +246,7 @@ impl MainWorker {
&mut self,
wait_for_inspector: bool,
) -> Result<(), AnyError> {
poll_fn(|cx| self.poll_event_loop(cx, wait_for_inspector)).await
self.js_runtime.run_event_loop(wait_for_inspector).await
}
/// A utility function that runs provided future concurrently with the event loop.