From c0cb198114ccc2dc4fa2764d307ad985c456882a Mon Sep 17 00:00:00 2001 From: Bert Belder Date: Fri, 3 Apr 2020 19:40:11 +0200 Subject: [PATCH] Make inspector more robust, add --inspect-brk support (#4552) --- cli/compilers/ts.rs | 5 +- cli/compilers/wasm.rs | 5 +- cli/flags.rs | 55 +- cli/global_state.rs | 11 - cli/inspector.rs | 1041 +++++++++++++++++++------------- cli/lib.rs | 3 +- cli/state.rs | 17 +- cli/tests/inspector2.js | 1 + cli/tests/integration_tests.rs | 88 ++- cli/worker.rs | 47 +- 10 files changed, 785 insertions(+), 488 deletions(-) create mode 100644 cli/tests/inspector2.js diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index a74e4a0966..641f16bca3 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -255,8 +255,9 @@ impl TsCompiler { fn setup_worker(global_state: GlobalState) -> CompilerWorker { let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$ts_compiler.ts").unwrap(); - let worker_state = State::new(global_state.clone(), None, entry_point) - .expect("Unable to create worker state"); + let worker_state = + State::new(global_state.clone(), None, entry_point, DebugType::Internal) + .expect("Unable to create worker state"); // Count how many times we start the compiler worker. global_state.compiler_starts.fetch_add(1, Ordering::SeqCst); diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index bd22a2523e..ff7a04097d 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -56,8 +56,9 @@ impl WasmCompiler { let entry_point = ModuleSpecifier::resolve_url_or_path("./__$deno$wasm_compiler.ts") .unwrap(); - let worker_state = State::new(global_state.clone(), None, entry_point) - .expect("Unable to create worker state"); + let worker_state = + State::new(global_state.clone(), None, entry_point, DebugType::Internal) + .expect("Unable to create worker state"); // Count how many times we start the compiler worker. global_state.compiler_starts.fetch_add(1, Ordering::SeqCst); diff --git a/cli/flags.rs b/cli/flags.rs index db14ad4596..e6b56a94a8 100644 --- a/cli/flags.rs +++ b/cli/flags.rs @@ -7,6 +7,7 @@ use clap::ArgMatches; use clap::SubCommand; use log::Level; use std::collections::HashSet; +use std::net::SocketAddr; use std::path::{Path, PathBuf}; /// Creates vector of strings, Vec @@ -107,8 +108,8 @@ pub struct Flags { pub no_prompts: bool, pub no_remote: bool, pub cached_only: bool, - pub inspect: Option, - pub inspect_brk: Option, + pub inspect: Option, + pub inspect_brk: Option, pub seed: Option, pub v8_flags: Option>, @@ -1021,6 +1022,7 @@ fn ca_file_arg<'a, 'b>() -> Arg<'a, 'b> { .help("Load certificate authority from PEM encoded file") .takes_value(true) } + fn ca_file_arg_parse(flags: &mut Flags, matches: &clap::ArgMatches) { flags.ca_file = matches.value_of("cert").map(ToOwned::to_owned); } @@ -1035,7 +1037,8 @@ fn inspect_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> { .min_values(0) .max_values(1) .require_equals(true) - .takes_value(true), + .takes_value(true) + .validator(inspect_arg_validate), ) .arg( Arg::with_name("inspect-brk") @@ -1047,26 +1050,34 @@ fn inspect_args<'a, 'b>(app: App<'a, 'b>) -> App<'a, 'b> { .min_values(0) .max_values(1) .require_equals(true) - .takes_value(true), + .takes_value(true) + .validator(inspect_arg_validate), ) } +fn inspect_arg_validate(val: String) -> Result<(), String> { + match val.parse::() { + Ok(_) => Ok(()), + Err(e) => Err(e.to_string()), + } +} + fn inspect_arg_parse(flags: &mut Flags, matches: &clap::ArgMatches) { - const DEFAULT: &str = "127.0.0.1:9229"; + let default = || "127.0.0.1:9229".parse::().unwrap(); flags.inspect = if matches.is_present("inspect") { if let Some(host) = matches.value_of("inspect") { - Some(host.to_string()) + Some(host.parse().unwrap()) } else { - Some(DEFAULT.to_string()) + Some(default()) } } else { None }; flags.inspect_brk = if matches.is_present("inspect-brk") { if let Some(host) = matches.value_of("inspect-brk") { - Some(host.to_string()) + Some(host.parse().unwrap()) } else { - Some(DEFAULT.to_string()) + Some(default()) } } else { None @@ -2390,7 +2401,7 @@ mod tests { code: "const foo = 'bar'".to_string(), as_typescript: false, }, - inspect: Some("127.0.0.1:9229".to_string()), + inspect: Some("127.0.0.1:9229".parse().unwrap()), allow_net: true, allow_env: true, allow_run: true, @@ -2499,7 +2510,7 @@ mod tests { r.unwrap(), Flags { subcommand: DenoSubcommand::Repl {}, - inspect: Some("127.0.0.1:9229".to_string()), + inspect: Some("127.0.0.1:9229".parse().unwrap()), allow_read: true, allow_write: true, allow_net: true, @@ -2556,27 +2567,7 @@ mod tests { subcommand: DenoSubcommand::Run { script: "foo.js".to_string(), }, - inspect: Some("127.0.0.1:9229".to_string()), - ..Flags::default() - } - ); - } - - #[test] - fn inspect_custom_host() { - let r = flags_from_vec_safe(svec![ - "deno", - "run", - "--inspect=deno.land:80", - "foo.js" - ]); - assert_eq!( - r.unwrap(), - Flags { - subcommand: DenoSubcommand::Run { - script: "foo.js".to_string(), - }, - inspect: Some("deno.land:80".to_string()), + inspect: Some("127.0.0.1:9229".parse().unwrap()), ..Flags::default() } ); diff --git a/cli/global_state.rs b/cli/global_state.rs index 001c3f55fb..45a31406cb 100644 --- a/cli/global_state.rs +++ b/cli/global_state.rs @@ -9,7 +9,6 @@ use crate::deno_dir; use crate::file_fetcher::SourceFileFetcher; use crate::flags; use crate::http_cache; -use crate::inspector::InspectorServer; use crate::lockfile::Lockfile; use crate::msg; use crate::permissions::DenoPermissions; @@ -43,7 +42,6 @@ pub struct GlobalStateInner { pub wasm_compiler: WasmCompiler, pub lockfile: Option>, pub compiler_starts: AtomicUsize, - pub inspector_server: Option, compile_lock: AsyncMutex<()>, } @@ -84,16 +82,7 @@ impl GlobalState { None }; - let inspector_server = if let Some(ref host) = flags.inspect { - Some(InspectorServer::new(host, false)) - } else if let Some(ref host) = flags.inspect_brk { - Some(InspectorServer::new(host, true)) - } else { - None - }; - let inner = GlobalStateInner { - inspector_server, dir, permissions: DenoPermissions::from_flags(&flags), flags, diff --git a/cli/inspector.rs b/cli/inspector.rs index 203551f6fc..ecc6665f57 100644 --- a/cli/inspector.rs +++ b/cli/inspector.rs @@ -1,280 +1,195 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. -// The documentation for the inspector API is sparse, but these are helpful: -// https://chromedevtools.github.io/devtools-protocol/ -// https://hyperandroid.com/2020/02/12/v8-inspector-from-an-embedder-standpoint/ +//! The documentation for the inspector API is sparse, but these are helpful: +//! https://chromedevtools.github.io/devtools-protocol/ +//! https://hyperandroid.com/2020/02/12/v8-inspector-from-an-embedder-standpoint/ +use core::convert::Infallible as Never; // Alias for the future `!` type. +use deno_core; use deno_core::v8; -use futures; -use futures::executor; -use futures::future; -use futures::FutureExt; -use futures::SinkExt; -use futures::StreamExt; +use futures::channel::mpsc; +use futures::channel::mpsc::UnboundedReceiver; +use futures::channel::mpsc::UnboundedSender; +use futures::channel::oneshot; +use futures::future::Future; +use futures::prelude::*; +use futures::select; +use futures::stream::FuturesUnordered; +use futures::task; +use futures::task::Context; +use futures::task::Poll; +use std::cell::BorrowMutError; +use std::cell::RefCell; use std::collections::HashMap; use std::ffi::c_void; -use std::future::Future; +use std::mem::replace; +use std::mem::take; use std::mem::MaybeUninit; -use std::net::SocketAddrV4; +use std::net::SocketAddr; +use std::ops::Deref; +use std::ops::DerefMut; use std::pin::Pin; +use std::process; use std::ptr; -use std::sync::atomic::AtomicBool; -use std::sync::atomic::Ordering; +use std::ptr::NonNull; use std::sync::Arc; -use std::task::Context; -use std::task::Poll; -use tokio; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TryRecvError; +use std::sync::Mutex; +use std::sync::Once; +use std::thread; use uuid::Uuid; use warp; use warp::filters::ws; +use warp::filters::ws::WebSocket; use warp::Filter; -const CONTEXT_GROUP_ID: i32 = 1; - -/// Owned by GloalState, this channel end can be used by any isolate thread -/// to register it's inspector with the inspector server. -type ServerMsgTx = mpsc::UnboundedSender; -/// Owned by the inspector server thread, used to to receive information about -/// new isolates. -type ServerMsgRx = mpsc::UnboundedReceiver; -/// These messages can be sent from any thread to the server thread. -enum ServerMsg { - AddInspector(InspectorInfo), -} - -/// Owned by the web socket server. Relays incoming websocket connections and -/// messages to the isolate/inspector thread. -type FrontendToInspectorTx = mpsc::UnboundedSender; -/// Owned by the isolate/worker. Receives incoming websocket connections and -/// messages from the inspector server thread. -type FrontendToInspectorRx = mpsc::UnboundedReceiver; -/// Messages sent over the FrontendToInspectorTx/FrontendToInspectorRx channel. -pub enum FrontendToInspectorMsg { - WsConnection { - session_uuid: Uuid, - session_to_frontend_tx: SessionToFrontendTx, - }, - WsIncoming { - session_uuid: Uuid, - msg: ws::Message, - }, -} - -/// Owned by the deno inspector session, used to forward messages from the -/// inspector channel on the isolate thread to the websocket that is owned by -/// the inspector server. -type SessionToFrontendTx = mpsc::UnboundedSender; -/// Owned by the inspector server. Messages arriving on this channel, coming -/// from the inspector session on the isolate thread are forwarded over the -/// websocket to the devtools frontend. -type SessionToFrontendRx = mpsc::UnboundedReceiver; - -/// Stored in a UUID hashmap, used by WS server. Clonable. -#[derive(Clone)] -struct InspectorInfo { - uuid: Uuid, - frontend_to_inspector_tx: FrontendToInspectorTx, - inspector_handle: DenoInspectorHandle, -} - -/// Owned by GlobalState. -pub struct InspectorServer { - address: SocketAddrV4, - thread_handle: Option>, - server_msg_tx: Option, +struct InspectorServer { + host: SocketAddr, + register_inspector_tx: UnboundedSender, + _thread_handle: thread::JoinHandle<()>, } impl InspectorServer { - pub fn new(host: &str, brk: bool) -> Self { - if brk { - todo!("--inspect-brk not yet supported"); - } - let address = host.parse::().unwrap(); - let (server_msg_tx, server_msg_rx) = mpsc::unbounded_channel::(); - let thread_handle = std::thread::spawn(move || { - crate::tokio_util::run_basic(server(address, server_msg_rx)); + /// Registers an Inspector instance with the inspector server. If the server + /// is not running yet, it'll be started first. + pub fn register_inspector(info: InspectorInfo) { + let self_ = Self::global(&info.host); + self_.register_inspector_tx.unbounded_send(info).unwrap(); + } + + /// Returns the global InspectorServer instance. If the server is not yet + /// running, this function starts it. + fn global(host: &SocketAddr) -> &'static InspectorServer { + let instance = unsafe { + static mut INSTANCE: Option = None; + static INIT: Once = Once::new(); + INIT.call_once(|| { + INSTANCE.replace(Self::new(*host)); + }); + INSTANCE.as_ref().unwrap() + }; + // We only start a single server, so all inspectors must bind to the same + // host:port combination. + assert_eq!(host, &instance.host); + instance + } + + fn new(host: SocketAddr) -> Self { + let (register_inspector_tx, register_inspector_rx) = + mpsc::unbounded::(); + let thread_handle = thread::spawn(move || { + crate::tokio_util::run_basic(server(host, register_inspector_rx)) }); Self { - address, - thread_handle: Some(thread_handle), - server_msg_tx: Some(server_msg_tx), + host, + register_inspector_tx, + _thread_handle: thread_handle, } } +} - /// Each worker/isolate to be debugged should call this exactly one. - /// Called from worker's thread - pub fn add_inspector( - &self, - isolate: &mut deno_core::Isolate, - ) -> Box { - let deno_core::Isolate { - v8_isolate, - global_context, - .. - } = isolate; - let v8_isolate = v8_isolate.as_mut().unwrap(); +/// Inspector information that is sent from the isolate thread to the server +/// thread when a new inspector is created. +struct InspectorInfo { + host: SocketAddr, + uuid: Uuid, + thread_name: Option, + new_websocket_tx: UnboundedSender, + canary_rx: oneshot::Receiver, +} - let mut hs = v8::HandleScope::new(v8_isolate); - let scope = hs.enter(); - let context = global_context.get(scope).unwrap(); +impl InspectorInfo { + fn get_json_metadata(&self) -> serde_json::Value { + json!({ + "description": "deno", + "devtoolsFrontendUrl": self.get_frontend_url(), + "faviconUrl": "https://deno.land/favicon.ico", + "id": self.uuid.to_string(), + "title": self.get_title(), + "type": "deno", + // TODO(ry): "url": "file://", + "webSocketDebuggerUrl": self.get_websocket_debugger_url(), + }) + } - let server_msg_tx = self.server_msg_tx.as_ref().unwrap().clone(); - let address = self.address; - let (frontend_to_inspector_tx, frontend_to_inspector_rx) = - mpsc::unbounded_channel::(); - let uuid = Uuid::new_v4(); + fn get_websocket_debugger_url(&self) -> String { + format!("ws://{}/ws/{}", &self.host, &self.uuid) + } - let inspector = crate::inspector::DenoInspector::new( - scope, - context, - frontend_to_inspector_rx, - ); + fn get_frontend_url(&self) -> String { + format!( + "chrome-devtools://devtools/bundled/inspector.html?v8only=true&ws={}/ws/{}", + &self.host, &self.uuid + ) + } - info!( - "Debugger listening on {}", - websocket_debugger_url(address, &uuid) - ); - - server_msg_tx - .send(ServerMsg::AddInspector(InspectorInfo { - uuid, - frontend_to_inspector_tx, - inspector_handle: DenoInspectorHandle::new( - &inspector, - v8_isolate.thread_safe_handle(), - ), - })) - .unwrap_or_else(|_| { - panic!("sending message to inspector server thread failed"); - }); - - inspector + fn get_title(&self) -> String { + format!( + "[{}] deno{}", + process::id(), + self + .thread_name + .as_ref() + .map(|n| format!(" - {}", n)) + .unwrap_or_default() + ) } } -impl Drop for InspectorServer { - fn drop(&mut self) { - self.server_msg_tx.take(); - self.thread_handle.take().unwrap().join().unwrap(); - panic!("TODO: this drop is never called"); - } -} - -fn websocket_debugger_url(address: SocketAddrV4, uuid: &Uuid) -> String { - format!("ws://{}:{}/ws/{}", address.ip(), address.port(), uuid) -} - -async fn server(address: SocketAddrV4, mut server_msg_rx: ServerMsgRx) { +async fn server( + host: SocketAddr, + register_inspector_rx: UnboundedReceiver, +) { + // TODO: `inspector_map` in an Rc> instead. This is currently not + // possible because warp requires all filters to implement Send, which should + // not be necessary because we are using a single-threaded runtime. let inspector_map = HashMap::::new(); - let inspector_map = Arc::new(std::sync::Mutex::new(inspector_map)); + let inspector_map = Arc::new(Mutex::new(inspector_map)); let inspector_map_ = inspector_map.clone(); - let msg_handler = async move { - while let Some(msg) = server_msg_rx.next().await { - match msg { - ServerMsg::AddInspector(inspector_info) => { - let existing = inspector_map_ - .lock() - .unwrap() - .insert(inspector_info.uuid, inspector_info); - if existing.is_some() { - panic!("UUID already in map"); - } - } - }; - } - }; + let mut register_inspector_handler = register_inspector_rx + .map(|info| { + eprintln!( + "Debugger listening on {}", + info.get_websocket_debugger_url() + ); + let mut g = inspector_map_.lock().unwrap(); + if g.insert(info.uuid, info).is_some() { + panic!("Inspector UUID already in map"); + } + }) + .collect::<()>(); + + let inspector_map_ = inspector_map_.clone(); + let mut deregister_inspector_handler = future::poll_fn(|cx| { + let mut g = inspector_map_.lock().unwrap(); + g.retain(|_, info| info.canary_rx.poll_unpin(cx) == Poll::Pending); + Poll::::Pending + }) + .fuse(); let inspector_map_ = inspector_map.clone(); - let websocket = warp::path("ws") + let websocket_route = warp::path("ws") .and(warp::path::param()) .and(warp::ws()) - .map(move |uuid: String, ws: warp::ws::Ws| { - let inspector_map__ = inspector_map_.clone(); - ws.on_upgrade(move |socket| async move { - let inspector_info = { - if let Ok(uuid) = Uuid::parse_str(&uuid) { - let g = inspector_map__.lock().unwrap(); - if let Some(inspector_info) = g.get(&uuid) { - inspector_info.clone() - } else { - return; - } - } else { - return; - } - }; - - // send a message back so register_worker can return... - let (mut ws_tx, mut ws_rx) = socket.split(); - - let (session_to_frontend_tx, mut session_to_frontend_rx): ( - SessionToFrontendTx, - SessionToFrontendRx, - ) = mpsc::unbounded_channel(); - - // Not to be confused with the WS's uuid... - let session_uuid = Uuid::new_v4(); - - inspector_info - .frontend_to_inspector_tx - .send(FrontendToInspectorMsg::WsConnection { - session_to_frontend_tx, - session_uuid, + .and_then(move |uuid: String, ws: warp::ws::Ws| { + future::ready( + Uuid::parse_str(&uuid) + .ok() + .and_then(|uuid| { + let g = inspector_map_.lock().unwrap(); + g.get(&uuid).map(|info| info.new_websocket_tx.clone()).map( + |new_websocket_tx| { + ws.on_upgrade(move |websocket| async move { + let _ = new_websocket_tx.unbounded_send(websocket); + }) + }, + ) }) - .unwrap_or_else(|_| { - panic!("sending message to frontend_to_inspector_tx failed"); - }); - - inspector_info.inspector_handle.interrupt(); - - let pump_to_inspector = async { - while let Some(Ok(msg)) = ws_rx.next().await { - inspector_info - .frontend_to_inspector_tx - .send(FrontendToInspectorMsg::WsIncoming { msg, session_uuid }) - .unwrap_or_else(|_| { - panic!("sending message to frontend_to_inspector_tx failed"); - }); - - inspector_info.inspector_handle.interrupt(); - } - }; - - let pump_from_session = async { - while let Some(msg) = session_to_frontend_rx.next().await { - ws_tx.send(msg).await.ok(); - } - }; - - future::join(pump_to_inspector, pump_from_session).await; - }) + .ok_or_else(warp::reject::not_found), + ) }); - let inspector_map_ = inspector_map.clone(); - let json_list = - warp::path("json") - .map(move || { - let g = inspector_map_.lock().unwrap(); - let json_values: Vec = g.iter().map(|(uuid, _)| { - let url = websocket_debugger_url(address, uuid); - json!({ - "description": "deno", - "devtoolsFrontendUrl": format!("chrome-devtools://devtools/bundled/js_app.html?experiments=true&v8only=true&ws={}", url), - "faviconUrl": "https://deno.land/favicon.ico", - "id": uuid.to_string(), - "title": format!("deno[{}]", std::process::id()), - "type": "deno", - "url": "file://", - "webSocketDebuggerUrl": url, - }) - }).collect(); - warp::reply::json(&json!(json_values)) - }); - - let version = warp::path!("json" / "version").map(|| { + let json_version_route = warp::path!("json" / "version").map(|| { warp::reply::json(&json!({ "Browser": format!("Deno/{}", crate::version::DENO), "Protocol-Version": "1.3", @@ -282,239 +197,525 @@ async fn server(address: SocketAddrV4, mut server_msg_rx: ServerMsgRx) { })) }); - let routes = websocket.or(version).or(json_list); - let (_, web_handler) = warp::serve(routes) - .try_bind_ephemeral(address) - .unwrap_or_else(|e| { - eprintln!("Cannot start inspector server: {}", e); - std::process::exit(1); - }); + let inspector_map_ = inspector_map.clone(); + let json_list_route = warp::path("json").map(move || { + let g = inspector_map_.lock().unwrap(); + let json_values = g + .values() + .map(|info| info.get_json_metadata()) + .collect::>(); + warp::reply::json(&json!(json_values)) + }); - future::join(msg_handler, web_handler).await; + let server_routes = + websocket_route.or(json_version_route).or(json_list_route); + let mut server_handler = warp::serve(server_routes) + .try_bind_ephemeral(host) + .map(|(_, fut)| fut) + .unwrap_or_else(|err| { + eprintln!("Cannot start inspector server: {}.", err); + process::exit(1); + }) + .fuse(); + + select! { + _ = register_inspector_handler => (), + _ = deregister_inspector_handler => unreachable!(), + _ = server_handler => unreachable!(), + } +} + +#[derive(Clone, Copy)] +enum PollState { + Idle, + Woken, + Polling, + Parked, + Dropped, } pub struct DenoInspector { - client: v8::inspector::V8InspectorClientBase, - inspector: v8::UniqueRef, - pub sessions: HashMap>, - frontend_to_inspector_rx: FrontendToInspectorRx, - paused: bool, - interrupted: Arc, + v8_inspector_client: v8::inspector::V8InspectorClientBase, + v8_inspector: v8::UniqueRef, + sessions: RefCell, + flags: RefCell, + waker: Arc, + _canary_tx: oneshot::Sender, } -impl DenoInspector { - pub fn new

( - scope: &mut P, - context: v8::Local, - frontend_to_inspector_rx: FrontendToInspectorRx, - ) -> Box - where - P: v8::InIsolate, - { - let mut deno_inspector = new_box_with(|address| Self { - client: v8::inspector::V8InspectorClientBase::new::(), - // TODO(piscisaureus): V8Inspector::create() should require that - // the 'client' argument cannot move. - inspector: v8::inspector::V8Inspector::create(scope, unsafe { - &mut *address - }), - sessions: HashMap::new(), - frontend_to_inspector_rx, - paused: false, - interrupted: Arc::new(AtomicBool::new(false)), - }); - - let empty_view = v8::inspector::StringView::empty(); - deno_inspector.inspector.context_created( - context, - CONTEXT_GROUP_ID, - &empty_view, - ); - - deno_inspector - } - - pub fn connect( - &mut self, - session_uuid: Uuid, - session_to_frontend_tx: SessionToFrontendTx, - ) { - let session = - DenoInspectorSession::new(&mut self.inspector, session_to_frontend_tx); - self.sessions.insert(session_uuid, session); - } - - fn dispatch_frontend_to_inspector_msg( - &mut self, - msg: FrontendToInspectorMsg, - ) { - match msg { - FrontendToInspectorMsg::WsConnection { - session_uuid, - session_to_frontend_tx, - } => { - self.connect(session_uuid, session_to_frontend_tx); - } - FrontendToInspectorMsg::WsIncoming { session_uuid, msg } => { - if let Some(deno_session) = self.sessions.get_mut(&session_uuid) { - deno_session.dispatch_protocol_message(msg) - } else { - info!("Unknown inspector session {}. msg {:?}", session_uuid, msg); - } - } - }; - } - - extern "C" fn poll_interrupt( - _isolate: &mut v8::Isolate, - self_ptr: *mut c_void, - ) { - let self_ = unsafe { &mut *(self_ptr as *mut Self) }; - let _ = self_.poll_without_waker(); - } - - fn poll_without_waker(&mut self) -> Poll<::Output> { - loop { - match self.frontend_to_inspector_rx.try_recv() { - Ok(msg) => self.dispatch_frontend_to_inspector_msg(msg), - Err(TryRecvError::Closed) => break Poll::Ready(()), - Err(TryRecvError::Empty) - if self.interrupted.swap(false, Ordering::AcqRel) => {} - Err(TryRecvError::Empty) => break Poll::Pending, - } - } +impl Deref for DenoInspector { + type Target = v8::inspector::V8Inspector; + fn deref(&self) -> &Self::Target { + &self.v8_inspector } } -/// DenoInspector implements a Future so that it can poll for incoming messages -/// from the WebSocket server. Since a Worker ownes a DenoInspector, and because -/// a Worker is a Future too, Worker::poll will call this. -impl Future for DenoInspector { - type Output = (); +impl DerefMut for DenoInspector { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.v8_inspector + } +} - fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { - let self_ = self.get_mut(); - loop { - match self_.frontend_to_inspector_rx.poll_recv(cx) { - Poll::Ready(Some(msg)) => self_.dispatch_frontend_to_inspector_msg(msg), - Poll::Ready(None) => break Poll::Ready(()), - Poll::Pending if self_.interrupted.swap(false, Ordering::AcqRel) => {} - Poll::Pending => break Poll::Pending, - } - } +impl Drop for DenoInspector { + fn drop(&mut self) { + // Since the waker is cloneable, it might outlive the inspector itself. + // Set the poll state to 'dropped' so it doesn't attempt to request an + // interrupt from the isolate. + self.waker.update(|w| w.poll_state = PollState::Dropped); + // V8 automatically deletes all sessions when an Inspector instance is + // deleted, however InspectorSession also has a drop handler that cleans + // up after itself. To avoid a double free, make sure the inspector is + // dropped last. + take(&mut *self.sessions.borrow_mut()); } } impl v8::inspector::V8InspectorClientImpl for DenoInspector { fn base(&self) -> &v8::inspector::V8InspectorClientBase { - &self.client + &self.v8_inspector_client } fn base_mut(&mut self) -> &mut v8::inspector::V8InspectorClientBase { - &mut self.client + &mut self.v8_inspector_client } fn run_message_loop_on_pause(&mut self, context_group_id: i32) { - assert_eq!(context_group_id, CONTEXT_GROUP_ID); - assert!(!self.paused); - self.paused = true; - - // Creating a new executor and calling block_on generally causes a panic. - // In this case it works because the outer executor is provided by tokio - // and the one created here comes from the 'futures' crate, and they don't - // see each other. - let dispatch_messages_while_paused = - future::poll_fn(|cx| match self.poll_unpin(cx) { - Poll::Pending if self.paused => Poll::Pending, - _ => Poll::Ready(()), - }); - executor::block_on(dispatch_messages_while_paused); + assert_eq!(context_group_id, DenoInspectorSession::CONTEXT_GROUP_ID); + self.flags.borrow_mut().on_pause = true; + let _ = self.poll_sessions(None); } fn quit_message_loop_on_pause(&mut self) { - self.paused = false; + self.flags.borrow_mut().on_pause = false; } fn run_if_waiting_for_debugger(&mut self, context_group_id: i32) { - assert_eq!(context_group_id, CONTEXT_GROUP_ID); + assert_eq!(context_group_id, DenoInspectorSession::CONTEXT_GROUP_ID); + self.flags.borrow_mut().session_handshake_done = true; } } -#[derive(Clone)] -struct DenoInspectorHandle { - deno_inspector_ptr: *mut c_void, - isolate_handle: v8::IsolateHandle, - interrupted: Arc, +/// DenoInspector implements a Future so that it can poll for new incoming +/// connections and messages from the WebSocket server. The Worker that owns +/// this DenoInspector will call our poll function from Worker::poll(). +impl Future for DenoInspector { + type Output = (); + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<()> { + self.poll_sessions(Some(cx)).unwrap() + } } -impl DenoInspectorHandle { +impl DenoInspector { + const CONTEXT_GROUP_ID: i32 = 1; + pub fn new( - deno_inspector: &DenoInspector, - isolate_handle: v8::IsolateHandle, - ) -> Self { - Self { - deno_inspector_ptr: deno_inspector as *const DenoInspector - as *const c_void as *mut c_void, - isolate_handle, - interrupted: deno_inspector.interrupted.clone(), - } + isolate: &mut deno_core::Isolate, + host: SocketAddr, + wait_for_debugger: bool, + ) -> Box { + let deno_core::Isolate { + v8_isolate, + global_context, + .. + } = isolate; + + let v8_isolate = v8_isolate.as_mut().unwrap(); + let mut hs = v8::HandleScope::new(v8_isolate); + let scope = hs.enter(); + + let (new_websocket_tx, new_websocket_rx) = mpsc::unbounded::(); + let (canary_tx, canary_rx) = oneshot::channel::(); + + // Create DenoInspector instance. + let mut self_ = new_box_with(|self_ptr| { + let v8_inspector_client = + v8::inspector::V8InspectorClientBase::new::(); + let v8_inspector = + v8::inspector::V8Inspector::create(scope, unsafe { &mut *self_ptr }); + + let sessions = InspectorSessions::new(self_ptr, new_websocket_rx); + let flags = InspectorFlags::new(wait_for_debugger); + let waker = InspectorWaker::new(scope.isolate().thread_safe_handle()); + + Self { + v8_inspector_client, + v8_inspector, + sessions, + flags, + waker, + _canary_tx: canary_tx, + } + }); + + // Tell the inspector about the global context. + let context = global_context.get(scope).unwrap(); + let context_name = v8::inspector::StringView::from(&b"global context"[..]); + self_.context_created(context, Self::CONTEXT_GROUP_ID, &context_name); + + // Register this inspector with the server thread. + // Note: poll_sessions() might block if we need to wait for a + // debugger front-end to connect. Therefore the server thread must to be + // nofified *before* polling. + let info = InspectorInfo { + host, + uuid: Uuid::new_v4(), + thread_name: thread::current().name().map(|n| n.to_owned()), + new_websocket_tx, + canary_rx, + }; + InspectorServer::register_inspector(info); + + // Poll the session handler so we will get notified whenever there is + // new_incoming debugger activity. + let _ = self_.poll_sessions(None).unwrap(); + + self_ } - pub fn interrupt(&self) { - if !self.interrupted.swap(true, Ordering::AcqRel) { - self.isolate_handle.request_interrupt( - DenoInspector::poll_interrupt, - self.deno_inspector_ptr, - ); + fn poll_sessions( + &self, + mut invoker_cx: Option<&mut Context>, + ) -> Result, BorrowMutError> { + // The futures this function uses do not have re-entrant poll() functions. + // However it is can happpen that poll_sessions() gets re-entered, e.g. + // when an interrupt request is honored while the inspector future is polled + // by the task executor. We let the caller know by returning some error. + let mut sessions = self.sessions.try_borrow_mut()?; + + self.waker.update(|w| { + match w.poll_state { + PollState::Idle | PollState::Woken => w.poll_state = PollState::Polling, + _ => unreachable!(), + }; + }); + + // Create a new Context object that will make downstream futures + // use the InspectorWaker when they are ready to be polled again. + let waker_ref = task::waker_ref(&self.waker); + let cx = &mut Context::from_waker(&waker_ref); + + loop { + loop { + // Do one "handshake" with a newly connected session at a time. + if let Some(session) = &mut sessions.handshake { + let poll_result = session.poll_unpin(cx); + let handshake_done = + replace(&mut self.flags.borrow_mut().session_handshake_done, false); + match poll_result { + Poll::Pending if handshake_done => { + let mut session = sessions.handshake.take().unwrap(); + if replace( + &mut self.flags.borrow_mut().waiting_for_session, + false, + ) { + session.break_on_first_statement(); + } + sessions.established.push(session); + } + Poll::Ready(_) => sessions.handshake = None, + Poll::Pending => break, + }; + } + + // Accept new connections. + match sessions.new_incoming.poll_next_unpin(cx) { + Poll::Ready(Some(session)) => { + let prev = sessions.handshake.replace(session); + assert!(prev.is_none()); + continue; + } + Poll::Ready(None) => {} + Poll::Pending => {} + } + + // Poll established sessions. + match sessions.established.poll_next_unpin(cx) { + Poll::Ready(Some(_)) => continue, + Poll::Ready(None) => break, + Poll::Pending => break, + }; + } + + let should_block = sessions.handshake.is_some() + || self.flags.borrow().on_pause + || self.flags.borrow().waiting_for_session; + + let new_state = self.waker.update(|w| { + match w.poll_state { + PollState::Woken => { + // The inspector was woken while the session handler was being + // polled, so we poll it another time. + w.poll_state = PollState::Polling; + } + PollState::Polling if !should_block => { + // The session handler doesn't need to be polled any longer, and + // there's no reason to block (execution is not paused), so this + // function is about to return. + w.poll_state = PollState::Idle; + // Register the task waker that can be used to wake the parent + // task that will poll the inspector future. + if let Some(cx) = invoker_cx.take() { + w.task_waker.replace(cx.waker().clone()); + } + // Register the address of the inspector, which allows the waker + // to request an interrupt from the isolate. + w.inspector_ptr = NonNull::new(self as *const _ as *mut Self); + } + PollState::Polling if should_block => { + // Isolate execution has been paused but there are no more + // events to process, so this thread will be parked. Therefore, + // store the current thread handle in the waker so it knows + // which thread to unpark when new events arrive. + w.poll_state = PollState::Parked; + w.parked_thread.replace(thread::current()); + } + _ => unreachable!(), + }; + w.poll_state + }); + match new_state { + PollState::Idle => break Ok(Poll::Pending), // Yield to task. + PollState::Polling => {} // Poll the session handler again. + PollState::Parked => thread::park(), // Park the thread. + _ => unreachable!(), + }; } } } -unsafe impl Send for DenoInspectorHandle {} -unsafe impl Sync for DenoInspectorHandle {} +#[derive(Default)] +struct InspectorFlags { + waiting_for_session: bool, + session_handshake_done: bool, + on_pause: bool, +} -/// sub-class of v8::inspector::Channel -pub struct DenoInspectorSession { - channel: v8::inspector::ChannelBase, - session: v8::UniqueRef, - session_to_frontend_tx: SessionToFrontendTx, +impl InspectorFlags { + fn new(waiting_for_session: bool) -> RefCell { + let self_ = Self { + waiting_for_session, + ..Default::default() + }; + RefCell::new(self_) + } +} + +struct InspectorSessions { + new_incoming: + Pin> + 'static>>, + handshake: Option>, + established: FuturesUnordered>, +} + +impl InspectorSessions { + fn new( + inspector_ptr: *mut DenoInspector, + new_websocket_rx: UnboundedReceiver, + ) -> RefCell { + let new_incoming = new_websocket_rx + .map(move |websocket| DenoInspectorSession::new(inspector_ptr, websocket)) + .boxed_local(); + let self_ = Self { + new_incoming, + ..Default::default() + }; + RefCell::new(self_) + } +} + +impl Default for InspectorSessions { + fn default() -> Self { + Self { + new_incoming: stream::empty().boxed_local(), + handshake: None, + established: FuturesUnordered::new(), + } + } +} + +struct InspectorWakerInner { + poll_state: PollState, + task_waker: Option, + parked_thread: Option, + inspector_ptr: Option>, + isolate_handle: v8::IsolateHandle, +} + +unsafe impl Send for InspectorWakerInner {} + +struct InspectorWaker(Mutex); + +impl InspectorWaker { + fn new(isolate_handle: v8::IsolateHandle) -> Arc { + let inner = InspectorWakerInner { + poll_state: PollState::Idle, + task_waker: None, + parked_thread: None, + inspector_ptr: None, + isolate_handle, + }; + Arc::new(Self(Mutex::new(inner))) + } + + fn update(&self, update_fn: F) -> R + where + F: FnOnce(&mut InspectorWakerInner) -> R, + { + let mut g = self.0.lock().unwrap(); + update_fn(&mut g) + } +} + +impl task::ArcWake for InspectorWaker { + fn wake_by_ref(arc_self: &Arc) { + arc_self.update(|w| { + match w.poll_state { + PollState::Idle => { + // Wake the task, if any, that has polled the Inspector future last. + if let Some(waker) = w.task_waker.take() { + waker.wake() + } + // Request an interrupt from the isolate if it's running and there's + // not unhandled interrupt request in flight. + if let Some(arg) = w + .inspector_ptr + .take() + .map(|ptr| ptr.as_ptr() as *mut c_void) + { + w.isolate_handle.request_interrupt(handle_interrupt, arg); + } + extern "C" fn handle_interrupt( + _isolate: &mut v8::Isolate, + arg: *mut c_void, + ) { + let inspector = unsafe { &*(arg as *mut DenoInspector) }; + let _ = inspector.poll_sessions(None); + } + } + PollState::Parked => { + // Unpark the isolate thread. + let parked_thread = w.parked_thread.take().unwrap(); + assert_ne!(parked_thread.id(), thread::current().id()); + parked_thread.unpark(); + } + _ => {} + }; + w.poll_state = PollState::Woken; + }); + } +} + +struct DenoInspectorSession { + v8_channel: v8::inspector::ChannelBase, + v8_session: v8::UniqueRef, + message_handler: Pin + 'static>>, + // Internal channel/queue that temporarily stores messages sent by V8 to + // the front-end, before they are sent over the websocket. + outbound_queue_tx: + UnboundedSender>, +} + +impl Deref for DenoInspectorSession { + type Target = v8::inspector::V8InspectorSession; + fn deref(&self) -> &Self::Target { + &self.v8_session + } +} + +impl DerefMut for DenoInspectorSession { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.v8_session + } } impl DenoInspectorSession { + const CONTEXT_GROUP_ID: i32 = 1; + pub fn new( - inspector: &mut v8::inspector::V8Inspector, - session_to_frontend_tx: SessionToFrontendTx, + inspector_ptr: *mut DenoInspector, + websocket: WebSocket, ) -> Box { - new_box_with(|address| { + new_box_with(move |self_ptr| { + let v8_channel = v8::inspector::ChannelBase::new::(); + let empty_view = v8::inspector::StringView::empty(); + let v8_session = unsafe { &mut *inspector_ptr }.connect( + Self::CONTEXT_GROUP_ID, + // Todo(piscisaureus): V8Inspector::connect() should require that + // the 'v8_channel' argument cannot move. + unsafe { &mut *self_ptr }, + &empty_view, + ); + + let (outbound_queue_tx, outbound_queue_rx) = + mpsc::unbounded::>(); + + let message_handler = + Self::create_message_handler(self_ptr, websocket, outbound_queue_rx); + Self { - channel: v8::inspector::ChannelBase::new::(), - session: inspector.connect( - CONTEXT_GROUP_ID, - // Todo(piscisaureus): V8Inspector::connect() should require that - // the 'channel' argument cannot move. - unsafe { &mut *address }, - &empty_view, - ), - session_to_frontend_tx, + v8_channel, + v8_session, + message_handler, + outbound_queue_tx, } }) } - pub fn dispatch_protocol_message(&mut self, ws_msg: ws::Message) { - let bytes = ws_msg.as_bytes(); - let string_view = v8::inspector::StringView::from(bytes); - self.session.dispatch_protocol_message(&string_view); + fn create_message_handler( + self_ptr: *mut Self, + websocket: WebSocket, + outbound_queue_rx: UnboundedReceiver< + v8::UniquePtr, + >, + ) -> Pin + 'static>> { + let (websocket_tx, websocket_rx) = websocket.split(); + + // Receive messages from the websocket and dispatch them to the V8 session. + let inbound_pump = websocket_rx + .map_ok(move |msg| { + let msg = msg.as_bytes(); + let msg = v8::inspector::StringView::from(msg); + unsafe { &mut *self_ptr }.dispatch_protocol_message(&msg); + }) + .try_collect::<()>(); + + // Convert and forward messages from the outbound message queue to the + // websocket. + let outbound_pump = outbound_queue_rx + .map(move |msg| { + let msg = msg.unwrap().string().to_string(); + let msg = ws::Message::text(msg); + Ok(msg) + }) + .forward(websocket_tx); + + let disconnect_future = future::try_join(inbound_pump, outbound_pump); + + async move { + eprintln!("Debugger session started."); + match disconnect_future.await { + Ok(_) => eprintln!("Debugger session ended."), + Err(err) => eprintln!("Debugger session ended: {}.", err), + }; + } + .boxed_local() + } + + pub fn break_on_first_statement(&mut self) { + let reason = v8::inspector::StringView::from(&b"debugCommand"[..]); + let detail = v8::inspector::StringView::empty(); + self.schedule_pause_on_next_statement(&reason, &detail); } } impl v8::inspector::ChannelImpl for DenoInspectorSession { fn base(&self) -> &v8::inspector::ChannelBase { - &self.channel + &self.v8_channel } fn base_mut(&mut self) -> &mut v8::inspector::ChannelBase { - &mut self.channel + &mut self.v8_channel } fn send_response( @@ -522,28 +723,24 @@ impl v8::inspector::ChannelImpl for DenoInspectorSession { _call_id: i32, message: v8::UniquePtr, ) { - let ws_msg = v8_to_ws_msg(message); - self.session_to_frontend_tx.send(ws_msg).unwrap(); + let _ = self.outbound_queue_tx.unbounded_send(message); } fn send_notification( &mut self, message: v8::UniquePtr, ) { - let ws_msg = v8_to_ws_msg(message); - self.session_to_frontend_tx.send(ws_msg).unwrap(); + let _ = self.outbound_queue_tx.unbounded_send(message); } fn flush_protocol_notifications(&mut self) {} } -// TODO impl From or Into -fn v8_to_ws_msg( - message: v8::UniquePtr, -) -> ws::Message { - let mut x = message.unwrap(); - let s = x.string().to_string(); - ws::Message::text(s) +impl Future for DenoInspectorSession { + type Output = (); + fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll { + self.message_handler.poll_unpin(cx) + } } fn new_box_with(new_fn: impl FnOnce(*mut T) -> T) -> Box { diff --git a/cli/lib.rs b/cli/lib.rs index 832ff1ccb6..16c7942c66 100644 --- a/cli/lib.rs +++ b/cli/lib.rs @@ -70,6 +70,7 @@ use crate::file_fetcher::SourceFile; use crate::global_state::GlobalState; use crate::msg::MediaType; use crate::ops::io::get_stdio; +use crate::state::DebugType; use crate::state::State; use crate::worker::MainWorker; use deno_core::v8_set_flags; @@ -132,7 +133,7 @@ fn create_main_worker( global_state: GlobalState, main_module: ModuleSpecifier, ) -> Result { - let state = State::new(global_state, None, main_module)?; + let state = State::new(global_state, None, main_module, DebugType::Main)?; { let mut s = state.borrow_mut(); diff --git a/cli/state.rs b/cli/state.rs index 44ab73034e..fe5aa1b74c 100644 --- a/cli/state.rs +++ b/cli/state.rs @@ -33,6 +33,16 @@ use std::str; use std::thread::JoinHandle; use std::time::Instant; +#[derive(Copy, Clone, Eq, PartialEq)] +pub enum DebugType { + /// Can be debugged, will wait for debugger when --inspect-brk given. + Main, + /// Can be debugged, never waits for debugger. + Dependent, + /// No inspector instance is created. + Internal, +} + #[derive(Clone)] pub struct State(Rc>); @@ -59,6 +69,7 @@ pub struct StateInner { pub seeded_rng: Option, pub resource_table: ResourceTable, pub target_lib: TargetLib, + pub debug_type: DebugType, } impl State { @@ -230,6 +241,7 @@ impl State { global_state: GlobalState, shared_permissions: Option, main_module: ModuleSpecifier, + debug_type: DebugType, ) -> Result { let import_map: Option = match global_state.flags.import_map_path.as_ref() { @@ -259,9 +271,9 @@ impl State { next_worker_id: 0, start_time: Instant::now(), seeded_rng, - resource_table: ResourceTable::default(), target_lib: TargetLib::Main, + debug_type, })); Ok(Self(state)) @@ -295,9 +307,9 @@ impl State { next_worker_id: 0, start_time: Instant::now(), seeded_rng, - resource_table: ResourceTable::default(), target_lib: TargetLib::Worker, + debug_type: DebugType::Dependent, })); Ok(Self(state)) @@ -370,6 +382,7 @@ impl State { GlobalState::mock(vec!["deno".to_string()]), None, module_specifier, + DebugType::Main, ) .unwrap() } diff --git a/cli/tests/inspector2.js b/cli/tests/inspector2.js new file mode 100644 index 0000000000..34d8097d23 --- /dev/null +++ b/cli/tests/inspector2.js @@ -0,0 +1 @@ +console.log("hello from the script"); diff --git a/cli/tests/integration_tests.rs b/cli/tests/integration_tests.rs index 98c564b4d9..4f2847ee46 100644 --- a/cli/tests/integration_tests.rs +++ b/cli/tests/integration_tests.rs @@ -7,6 +7,8 @@ extern crate nix; extern crate pty; extern crate tempfile; +use futures::prelude::*; +use std::io::BufRead; use std::process::Command; use tempfile::TempDir; @@ -1999,11 +2001,9 @@ fn test_permissions_net_listen_allow_localhost() { assert!(!err.contains(util::PERMISSION_DENIED_PATTERN)); } -#[cfg(not(target_os = "linux"))] // TODO(ry) broken on github actions. fn extract_ws_url_from_stderr( stderr: &mut std::process::ChildStderr, ) -> url::Url { - use std::io::BufRead; let mut stderr = std::io::BufReader::new(stderr); let mut stderr_first_line = String::from(""); let _ = stderr.read_line(&mut stderr_first_line).unwrap(); @@ -2026,7 +2026,7 @@ async fn inspector_connect() { .arg("run") // Warning: each inspector test should be on its own port to avoid // conflicting with another inspector test. - .arg("--inspect=127.0.0.1:9229") + .arg("--inspect=127.0.0.1:9228") .arg(script) .stderr(std::process::Stdio::piped()) .spawn() @@ -2042,6 +2042,86 @@ async fn inspector_connect() { child.kill().unwrap(); } +enum TestStep { + StdOut(&'static str), + WsRecv(&'static str), + WsSend(&'static str), +} + +#[tokio::test] +async fn inspector_break_on_first_line() { + let script = deno::test_util::root_path() + .join("cli") + .join("tests") + .join("inspector2.js"); + let mut child = util::deno_cmd() + .arg("run") + // Warning: each inspector test should be on its own port to avoid + // conflicting with another inspector test. + .arg("--inspect-brk=127.0.0.1:9229") + .arg(script) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .unwrap(); + + let stderr = child.stderr.as_mut().unwrap(); + let ws_url = extract_ws_url_from_stderr(stderr); + let (socket, response) = tokio_tungstenite::connect_async(ws_url) + .await + .expect("Can't connect"); + assert_eq!(response.status(), 101); // Switching protocols. + + let (mut socket_tx, mut socket_rx) = socket.split(); + + let stdout = child.stdout.as_mut().unwrap(); + let mut stdout_lines = std::io::BufReader::new(stdout).lines(); + + use TestStep::*; + let test_steps = vec![ + WsSend(r#"{"id":1,"method":"Runtime.enable"}"#), + WsSend(r#"{"id":2,"method":"Debugger.enable"}"#), + WsRecv( + r#"{"method":"Runtime.executionContextCreated","params":{"context":{"id":1,"#, + ), + WsRecv(r#"{"id":1,"result":{}}"#), + WsRecv(r#"{"id":2,"result":{"debuggerId":"#), + WsSend(r#"{"id":3,"method":"Runtime.runIfWaitingForDebugger"}"#), + WsRecv(r#"{"id":3,"result":{}}"#), + WsRecv(r#"{"method":"Debugger.paused","#), + WsSend( + r#"{"id":5,"method":"Runtime.evaluate","params":{"expression":"Deno.core.print(\"hello from the inspector\\n\")","contextId":1,"includeCommandLineAPI":true,"silent":false,"returnByValue":true}}"#, + ), + WsRecv(r#"{"id":5,"result":{"result":{"type":"undefined"}}}"#), + StdOut("hello from the inspector"), + WsSend(r#"{"id":6,"method":"Debugger.resume"}"#), + WsRecv(r#"{"id":6,"result":{}}"#), + StdOut("hello from the script"), + ]; + + for step in test_steps { + match step { + StdOut(s) => match stdout_lines.next() { + Some(Ok(line)) => assert_eq!(line, s), + other => panic!(other), + }, + WsRecv(s) => loop { + let msg = match socket_rx.next().await { + Some(Ok(msg)) => msg.to_string(), + other => panic!(other), + }; + if !msg.starts_with(r#"{"method":"Debugger.scriptParsed","#) { + assert!(msg.starts_with(s)); + break; + } + }, + WsSend(s) => socket_tx.send(s.into()).await.unwrap(), + } + } + + child.kill().unwrap(); +} + #[cfg(not(target_os = "linux"))] // TODO(ry) broken on github actions. #[tokio::test] async fn inspector_pause() { @@ -2059,7 +2139,6 @@ async fn inspector_pause() { .spawn() .unwrap(); let ws_url = extract_ws_url_from_stderr(child.stderr.as_mut().unwrap()); - println!("ws_url {}", ws_url); // We use tokio_tungstenite as a websocket client because warp (which is // a dependency of Deno) uses it. let (mut socket, _) = tokio_tungstenite::connect_async(ws_url) @@ -2082,7 +2161,6 @@ async fn inspector_pause() { unreachable!() } - use futures::sink::SinkExt; socket .send(r#"{"id":6,"method":"Debugger.enable"}"#.into()) .await diff --git a/cli/worker.rs b/cli/worker.rs index 994f22f04a..7c102a6023 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -1,6 +1,8 @@ // Copyright 2018-2020 the Deno authors. All rights reserved. MIT license. use crate::fmt_errors::JSError; +use crate::inspector::DenoInspector; use crate::ops; +use crate::state::DebugType; use crate::state::State; use deno_core; use deno_core::Buf; @@ -97,7 +99,7 @@ pub struct Worker { pub waker: AtomicWaker, pub(crate) internal_channels: WorkerChannelsInternal, external_channels: WorkerHandle, - inspector: Option>, + inspector: Option>, } impl Worker { @@ -107,10 +109,18 @@ impl Worker { let global_state = state.borrow().global_state.clone(); - let inspector = global_state - .inspector_server - .as_ref() - .map(|s| s.add_inspector(&mut *isolate)); + let inspect = global_state.flags.inspect.as_ref(); + let inspect_brk = global_state.flags.inspect_brk.as_ref(); + let inspector = inspect + .or(inspect_brk) + .and_then(|host| match state.borrow().debug_type { + DebugType::Main if inspect_brk.is_some() => Some((host, true)), + DebugType::Main | DebugType::Dependent => Some((host, false)), + DebugType::Internal => None, + }) + .map(|(host, wait_for_debugger)| { + DenoInspector::new(&mut isolate, *host, wait_for_debugger) + }); isolate.set_js_error_create_fn(move |core_js_error| { JSError::create(core_js_error, &global_state.ts_compiler) @@ -287,8 +297,13 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let global_state = GlobalState::new(flags::Flags::default()).unwrap(); - let state = - State::new(global_state, None, module_specifier.clone()).unwrap(); + let state = State::new( + global_state, + None, + module_specifier.clone(), + DebugType::Main, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = @@ -316,8 +331,13 @@ mod tests { let module_specifier = ModuleSpecifier::resolve_url_or_path(&p.to_string_lossy()).unwrap(); let global_state = GlobalState::new(flags::Flags::default()).unwrap(); - let state = - State::new(global_state, None, module_specifier.clone()).unwrap(); + let state = State::new( + global_state, + None, + module_specifier.clone(), + DebugType::Main, + ) + .unwrap(); let state_ = state.clone(); tokio_util::run_basic(async move { let mut worker = @@ -354,8 +374,13 @@ mod tests { ..flags::Flags::default() }; let global_state = GlobalState::new(flags).unwrap(); - let state = - State::new(global_state.clone(), None, module_specifier.clone()).unwrap(); + let state = State::new( + global_state.clone(), + None, + module_specifier.clone(), + DebugType::Main, + ) + .unwrap(); let mut worker = MainWorker::new( "TEST".to_string(), startup_data::deno_isolate_init(),