diff --git a/Cargo.lock b/Cargo.lock index a172e0c1e9..42e21306f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -958,7 +958,7 @@ dependencies = [ "env_logger", "eszip", "fancy-regex", - "fastwebsockets", + "fastwebsockets 0.5.0", "flaky_test", "flate2", "fs3", @@ -1612,13 +1612,18 @@ dependencies = [ "deno_webstorage", "dlopen2", "encoding_rs", - "fastwebsockets", + "fastwebsockets 0.5.0", + "fastwebsockets 0.6.0", "filetime", "flate2", "fs3", "fwdansi", "http 0.2.11", + "http 1.0.0", + "http-body-util", "hyper 0.14.27", + "hyper 1.1.0", + "hyper-util", "libc", "log", "netif", @@ -1765,7 +1770,7 @@ dependencies = [ "deno_core", "deno_net", "deno_tls", - "fastwebsockets", + "fastwebsockets 0.5.0", "h2 0.3.22", "http 0.2.11", "hyper 0.14.27", @@ -2353,6 +2358,25 @@ dependencies = [ "utf-8", ] +[[package]] +name = "fastwebsockets" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f63dd7b57f9b33b1741fa631c9522eb35d43e96dcca4a6a91d5e4ca7c93acdc1" +dependencies = [ + "base64 0.21.5", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "pin-project", + "rand", + "sha1", + "simdutf8", + "thiserror", + "tokio", + "utf-8", +] + [[package]] name = "fd-lock" version = "4.0.1" @@ -5958,7 +5982,7 @@ dependencies = [ "bytes", "console_static_text", "denokv_proto", - "fastwebsockets", + "fastwebsockets 0.5.0", "flate2", "futures", "glob", diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 2e9fc4d7e5..a00a4f6694 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -98,10 +98,15 @@ fastwebsockets.workspace = true console_static_text.workspace = true dlopen2.workspace = true encoding_rs.workspace = true +fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade"] } filetime = "0.2.16" fs3.workspace = true http.workspace = true +http-body-util = "0.1" +http_1 = { package = "http", version = "1.0" } hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] } +hyper-util = { version = "0.1", features = ["server", "server-auto"] } +hyper1 = { package = "hyper", version = "1.0.1", features = ["server"] } libc.workspace = true log.workspace = true netif = "0.1.6" diff --git a/runtime/inspector_server.rs b/runtime/inspector_server.rs index 313b5fd514..a1864266a0 100644 --- a/runtime/inspector_server.rs +++ b/runtime/inspector_server.rs @@ -7,7 +7,6 @@ use deno_core::futures::channel::mpsc::UnboundedReceiver; use deno_core::futures::channel::mpsc::UnboundedSender; use deno_core::futures::channel::oneshot; use deno_core::futures::future; -use deno_core::futures::future::Future; use deno_core::futures::prelude::*; use deno_core::futures::select; use deno_core::futures::stream::StreamExt; @@ -20,17 +19,20 @@ use deno_core::url::Url; use deno_core::InspectorMsg; use deno_core::InspectorSessionProxy; use deno_core::JsRuntime; -use fastwebsockets::Frame; -use fastwebsockets::OpCode; -use fastwebsockets::WebSocket; +use fastwebsockets_06::Frame; +use fastwebsockets_06::OpCode; +use fastwebsockets_06::WebSocket; +use hyper::body::Bytes; +use hyper_util::rt::TokioIo; use std::cell::RefCell; use std::collections::HashMap; -use std::convert::Infallible; use std::net::SocketAddr; use std::pin::pin; use std::process; use std::rc::Rc; use std::thread; +use tokio::net::TcpListener; +use tokio::sync::broadcast; use uuid::Uuid; /// Websocket server that is used to proxy connections from @@ -38,7 +40,7 @@ use uuid::Uuid; pub struct InspectorServer { pub host: SocketAddr, register_inspector_tx: UnboundedSender, - shutdown_server_tx: Option>, + shutdown_server_tx: Option>, thread_handle: Option>, } @@ -47,7 +49,7 @@ impl InspectorServer { let (register_inspector_tx, register_inspector_rx) = mpsc::unbounded::(); - let (shutdown_server_tx, shutdown_server_rx) = oneshot::channel(); + let (shutdown_server_tx, shutdown_server_rx) = broadcast::channel(1); let thread_handle = thread::spawn(move || { let rt = crate::tokio_util::create_basic_runtime(); @@ -101,26 +103,12 @@ impl Drop for InspectorServer { } } -// Needed so hyper can use non Send futures -#[derive(Clone)] -struct LocalExecutor; - -impl hyper::rt::Executor for LocalExecutor -where - Fut: Future + 'static, - Fut::Output: 'static, -{ - fn execute(&self, fut: Fut) { - deno_core::unsync::spawn(fut); - } -} - fn handle_ws_request( - req: http::Request, + req: http_1::Request, inspector_map_rc: Rc>>, -) -> http::Result> { +) -> http_1::Result>>> { let (parts, body) = req.into_parts(); - let req = http::Request::from_parts(parts, ()); + let req = http_1::Request::from_parts(parts, ()); let maybe_uuid = req .uri() @@ -129,9 +117,9 @@ fn handle_ws_request( .and_then(|s| Uuid::parse_str(s).ok()); if maybe_uuid.is_none() { - return http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Malformed inspector UUID".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::BAD_REQUEST) + .body(Box::new(Bytes::from("Malformed inspector UUID").into())); } // run in a block to not hold borrow to `inspector_map` for too long @@ -140,34 +128,47 @@ fn handle_ws_request( let maybe_inspector_info = inspector_map.get(&maybe_uuid.unwrap()); if maybe_inspector_info.is_none() { - return http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("Invalid inspector UUID".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::NOT_FOUND) + .body(Box::new(Bytes::from("Invalid inspector UUID").into())); } let info = maybe_inspector_info.unwrap(); info.new_session_tx.clone() }; let (parts, _) = req.into_parts(); - let mut req = http::Request::from_parts(parts, body); + let mut req = http_1::Request::from_parts(parts, body); - let (resp, fut) = match fastwebsockets::upgrade::upgrade(&mut req) { - Ok(e) => e, + let (resp, fut) = match fastwebsockets_06::upgrade::upgrade(&mut req) { + Ok((resp, fut)) => { + let (parts, _body) = resp.into_parts(); + let resp = http_1::Response::from_parts( + parts, + Box::new(http_body_util::Full::new(Bytes::new())), + ); + (resp, fut) + } _ => { - return http::Response::builder() - .status(http::StatusCode::BAD_REQUEST) - .body("Not a valid Websocket Request".into()); + return http_1::Response::builder() + .status(http_1::StatusCode::BAD_REQUEST) + .body(Box::new( + Bytes::from("Not a valid Websocket Request").into(), + )); } }; // spawn a task that will wait for websocket connection and then pump messages between // the socket and inspector proxy spawn(async move { - let websocket = if let Ok(w) = fut.await { - w - } else { - eprintln!("Inspector server failed to upgrade to WS connection"); - return; + let websocket = match fut.await { + Ok(w) => w, + Err(err) => { + eprintln!( + "Inspector server failed to upgrade to WS connection: {:?}", + err + ); + return; + } }; // The 'outbound' channel carries messages sent to the websocket. @@ -191,31 +192,37 @@ fn handle_ws_request( fn handle_json_request( inspector_map: Rc>>, host: Option, -) -> http::Result> { +) -> http_1::Result>>> { let data = inspector_map .borrow() .values() .map(move |info| info.get_json_metadata(&host)) .collect::>(); - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&data).unwrap().into()) + let body: http_body_util::Full = + Bytes::from(serde_json::to_string(&data).unwrap()).into(); + http_1::Response::builder() + .status(http_1::StatusCode::OK) + .header(http_1::header::CONTENT_TYPE, "application/json") + .body(Box::new(body)) } fn handle_json_version_request( version_response: Value, -) -> http::Result> { - http::Response::builder() - .status(http::StatusCode::OK) - .header(http::header::CONTENT_TYPE, "application/json") - .body(serde_json::to_string(&version_response).unwrap().into()) +) -> http_1::Result>>> { + let body = Box::new(http_body_util::Full::from( + serde_json::to_string(&version_response).unwrap(), + )); + + http_1::Response::builder() + .status(http_1::StatusCode::OK) + .header(http_1::header::CONTENT_TYPE, "application/json") + .body(body) } async fn server( host: SocketAddr, register_inspector_rx: UnboundedReceiver, - shutdown_server_rx: oneshot::Receiver<()>, + shutdown_server_rx: broadcast::Receiver<()>, name: &str, ) { let inspector_map_ = @@ -253,61 +260,99 @@ async fn server( "V8-Version": deno_core::v8_version(), }); - let make_svc = hyper::service::make_service_fn(|_| { - let inspector_map = Rc::clone(&inspector_map_); - let json_version_response = json_version_response.clone(); - - future::ok::<_, Infallible>(hyper::service::service_fn( - move |req: http::Request| { - future::ready({ - // If the host header can make a valid URL, use it - let host = req - .headers() - .get("host") - .and_then(|host| host.to_str().ok()) - .and_then(|host| Url::parse(&format!("http://{host}")).ok()) - .and_then(|url| match (url.host(), url.port()) { - (Some(host), Some(port)) => Some(format!("{host}:{port}")), - (Some(host), None) => Some(format!("{host}")), - _ => None, - }); - match (req.method(), req.uri().path()) { - (&http::Method::GET, path) if path.starts_with("/ws/") => { - handle_ws_request(req, Rc::clone(&inspector_map)) - } - (&http::Method::GET, "/json/version") => { - handle_json_version_request(json_version_response.clone()) - } - (&http::Method::GET, "/json") => { - handle_json_request(Rc::clone(&inspector_map), host) - } - (&http::Method::GET, "/json/list") => { - handle_json_request(Rc::clone(&inspector_map), host) - } - _ => http::Response::builder() - .status(http::StatusCode::NOT_FOUND) - .body("Not Found".into()), - } - }) - }, - )) - }); - // Create the server manually so it can use the Local Executor - let mut server_handler = pin!(hyper::server::Builder::new( - hyper::server::conn::AddrIncoming::bind(&host).unwrap_or_else(|e| { - eprintln!("Cannot start inspector server: {e}."); - process::exit(1); - }), - hyper::server::conn::Http::new().with_executor(LocalExecutor), - ) - .serve(make_svc) - .with_graceful_shutdown(async { - shutdown_server_rx.await.ok(); - }) - .unwrap_or_else(|err| { - eprintln!("Cannot start inspector server: {err}."); - process::exit(1); + let listener = match TcpListener::bind(&host).await { + Ok(l) => l, + Err(err) => { + eprintln!("Cannot start inspector server: {:?}", err); + return; + } + }; + + let mut server_handler = pin!(deno_core::unsync::spawn(async move { + loop { + let mut rx = shutdown_server_rx.resubscribe(); + let mut shutdown_rx = pin!(rx.recv()); + let mut accept = pin!(listener.accept()); + + let stream = tokio::select! { + accept_result = &mut accept => { + match accept_result { + Ok((s, _)) => s, + Err(err) => { + eprintln!("Failed to accept inspector connection: {:?}", err); + continue; + } + } + }, + + _ = &mut shutdown_rx => { + break; + } + }; + let io = TokioIo::new(stream); + + let inspector_map = Rc::clone(&inspector_map_); + let json_version_response = json_version_response.clone(); + let mut shutdown_server_rx = shutdown_server_rx.resubscribe(); + + let service = hyper1::service::service_fn( + move |req: http_1::Request| { + future::ready({ + // If the host header can make a valid URL, use it + let host = req + .headers() + .get("host") + .and_then(|host| host.to_str().ok()) + .and_then(|host| Url::parse(&format!("http://{host}")).ok()) + .and_then(|url| match (url.host(), url.port()) { + (Some(host), Some(port)) => Some(format!("{host}:{port}")), + (Some(host), None) => Some(format!("{host}")), + _ => None, + }); + match (req.method(), req.uri().path()) { + (&http_1::Method::GET, path) if path.starts_with("/ws/") => { + handle_ws_request(req, Rc::clone(&inspector_map)) + } + (&http_1::Method::GET, "/json/version") => { + handle_json_version_request(json_version_response.clone()) + } + (&http_1::Method::GET, "/json") => { + handle_json_request(Rc::clone(&inspector_map), host) + } + (&http_1::Method::GET, "/json/list") => { + handle_json_request(Rc::clone(&inspector_map), host) + } + _ => http_1::Response::builder() + .status(http_1::StatusCode::NOT_FOUND) + .body(Box::new(http_body_util::Full::new(Bytes::from( + "Not Found", + )))), + } + }) + }, + ); + + deno_core::unsync::spawn(async move { + let server = hyper1::server::conn::http1::Builder::new(); + + let mut conn = + pin!(server.serve_connection(io, service).with_upgrades()); + let mut shutdown_rx = pin!(shutdown_server_rx.recv()); + + tokio::select! { + result = conn.as_mut() => { + if let Err(err) = result { + eprintln!("Failed to serve connection: {:?}", err); + } + }, + _ = &mut shutdown_rx => { + conn.as_mut().graceful_shutdown(); + let _ = conn.await; + } + } + }); + } }) .fuse()); @@ -331,7 +376,7 @@ async fn server( /// 'futures' crate, therefore they can't participate in Tokio's cooperative /// task yielding. async fn pump_websocket_messages( - mut websocket: WebSocket, + mut websocket: WebSocket>, inbound_tx: UnboundedSender, mut outbound_rx: UnboundedReceiver, ) {