diff --git a/Cargo.lock b/Cargo.lock index 4bb0cfb18e..a4a5e242dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1790,6 +1790,8 @@ dependencies = [ "deno_net", "deno_websocket", "flate2", + "h3", + "h3-quinn", "http 0.2.12", "http 1.1.0", "http-body-util", @@ -3685,6 +3687,34 @@ dependencies = [ "tracing", ] +[[package]] +name = "h3" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e7675a0963b47a6d12fe44c279918b4ffb19baee838ac37f48d2722ad5bc6ab" +dependencies = [ + "bytes", + "fastrand", + "futures-util", + "http 1.1.0", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "h3-quinn" +version = "0.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17c799f413fceeea505236c4d8132f084ff4b55a652288d91439ee93dc24d855" +dependencies = [ + "bytes", + "futures", + "h3", + "quinn", + "tokio", + "tokio-util", +] + [[package]] name = "halfbrown" version = "0.2.5" @@ -5968,6 +5998,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" dependencies = [ "bytes", + "futures-io", "pin-project-lite", "quinn-proto", "quinn-udp", diff --git a/Cargo.toml b/Cargo.toml index 81c750f0af..7adf4d6d2b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,6 +135,8 @@ fs3 = "0.5.0" futures = "0.3.21" glob = "0.3.1" h2 = "0.4.4" +h3 = "0.0.6" +h3-quinn = "0.0.7" hickory-resolver = { version = "0.25.0-alpha.4", features = ["tokio-runtime", "serde"] } http = "1.0" http-body = "1.0" @@ -169,6 +171,7 @@ pin-project = "1.0.11" # don't pin because they yank crates from cargo pretty_assertions = "=1.4.0" prost = "0.13" prost-build = "0.13" +quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] } rand = "=0.8.5" regex = "^1.7.0" reqwest = { version = "=0.12.5", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json", "http2"] } # pinned because of https://github.com/seanmonstar/reqwest/pull/1955 diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 446533e910..21143766a2 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -42,6 +42,7 @@ const { TypedArrayPrototypeGetSymbolToStringTag, Uint8Array, Promise, + SafePromiseAll, } = primordials; const { getAsyncContext, @@ -340,7 +341,7 @@ class InnerRequest { this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } return { - transport: "tcp", + transport, hostname: this.#methodAndUri[3], port: this.#methodAndUri[4], }; @@ -727,7 +728,7 @@ function serve(arg1, arg2) { const path = listener.addr.path; return serveHttpOnListener(listener, signal, handler, onError, () => { if (options.onListen) { - options.onListen(listener.addr); + options.onListen(listener.addr, "unix"); } else { // deno-lint-ignore no-console console.error(`Listening on ${path}`); @@ -754,6 +755,7 @@ function serve(arg1, arg2) { } let listener; + let listenerQuic; if (wantsHttps) { if (!options.cert || !options.key) { throw new TypeError( @@ -765,6 +767,17 @@ function serve(arg1, arg2) { listenOpts.alpnProtocols = ["h2", "http/1.1"]; listener = listenTls(listenOpts); listenOpts.port = listener.addr.port; + + if (Deno.QuicEndpoint) { + listenerQuic = new Deno.QuicEndpoint({ + hostname: listenOpts.hostname, + port: listenOpts.port, + }).listen({ + key: listenOpts.key, + cert: listenOpts.cert, + alpnProtocols: ["h3"], + }); + } } else { listener = listen(listenOpts); listenOpts.port = listener.addr.port; @@ -772,18 +785,56 @@ function serve(arg1, arg2) { const addr = listener.addr; - const onListen = (scheme) => { + const onListen = (scheme, transport = "tcp") => { if (options.onListen) { - options.onListen(addr); + options.onListen(addr, transport); } else { const host = formatHostName(addr.hostname); // deno-lint-ignore no-console - console.error(`Listening on ${scheme}${host}:${addr.port}/`); + console.error( + `Listening with ${transport} on ${scheme}${host}:${addr.port}/`, + ); } }; - return serveHttpOnListener(listener, signal, handler, onError, onListen); + const tcp = serveHttpOnListener(listener, signal, handler, onError, onListen); + + if (listenerQuic) { + const udp = serveHttpOnListener( + listenerQuic, + signal, + handler, + onError, + onListen, + ); + return { + addr: tcp.addr, + finished: SafePromiseAll([ + tcp.finished, + udp.finished, + ]), + async shutdown() { + await SafePromiseAll([ + tcp.shutdown(), + udp.shutdown(), + ]); + }, + ref() { + tcp.ref(); + udp.ref(); + }, + unref() { + tcp.unref(); + udp.unref(); + }, + [SymbolAsyncDispose]() { + return this.shutdown(); + }, + }; + } + + return tcp; } /** @@ -797,7 +848,7 @@ function serveHttpOnListener(listener, signal, handler, onError, onListen) { ); const callback = mapToCallback(context, handler, onError); - onListen(context.scheme); + onListen(context.scheme, context.listener.addr.transport); return serveHttpOn(context, listener.addr, callback); } diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index e7aaad2fc0..175d69556b 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -31,7 +31,10 @@ deno_core.workspace = true deno_net.workspace = true deno_websocket.workspace = true flate2.workspace = true +h3.workspace = true +h3-quinn.workspace = true http.workspace = true +http-body-util.workspace = true http_v02.workspace = true httparse.workspace = true hyper.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 7dbac6021a..1efaf0cabb 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -3,6 +3,7 @@ use crate::compressible::is_content_compressible; use crate::extract_network_stream; use crate::network_buffered_stream::NetworkStreamPrefixCheck; use crate::request_body::HttpRequestBody; +use crate::request_properties::listener_properties; use crate::request_properties::HttpConnectionProperties; use crate::request_properties::HttpListenProperties; use crate::request_properties::HttpPropertyExtractor; @@ -22,6 +23,7 @@ use crate::Options; use cache_control::CacheControl; use deno_core::external; use deno_core::futures::future::poll_fn; +use deno_core::futures::StreamExt; use deno_core::futures::TryFutureExt; use deno_core::op2; use deno_core::serde_v8::from_v8; @@ -42,8 +44,12 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_net::ops_tls::TlsStream; +use deno_net::quic::ListenerResource as QuicListener; use deno_net::raw::NetworkStream; +use deno_net::raw::NetworkStreamAddress; +use deno_net::raw::NetworkStreamType; use deno_websocket::ws_create_server_stream; +use http_body_util::BodyExt; use hyper::body::Incoming; use hyper::header::HeaderMap; use hyper::header::ACCEPT_ENCODING; @@ -157,6 +163,10 @@ pub enum HttpNextError { #[error("{0}")] Hyper(#[from] hyper::Error), #[error(transparent)] + H3(#[from] h3::Error), + #[error(transparent)] + QuinnConnectionError(#[from] h3_quinn::quinn::ConnectionError), + #[error(transparent)] JoinError(#[from] tokio::task::JoinError), #[error(transparent)] Canceled(#[from] deno_core::Canceled), @@ -905,7 +915,12 @@ fn serve_https( } = lifetime; let svc = service_fn(move |req: Request| { - handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) + handle_request( + req.map(Into::into), + request_info.clone(), + server_state.clone(), + tx.clone(), + ) }); spawn( async move { @@ -953,7 +968,12 @@ fn serve_http( } = lifetime; let svc = service_fn(move |req: Request| { - handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) + handle_request( + req.map(Into::into), + request_info.clone(), + server_state.clone(), + tx.clone(), + ) }); spawn( serve_http2_autodetect(io, svc, listen_cancel_handle, options) @@ -1062,9 +1082,19 @@ pub fn op_http_serve( where HTTP: HttpPropertyExtractor, { - let listener = - HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid) - .map_err(HttpNextError::Resource)?; + let listener = { + let mut bstate = state.borrow_mut(); + + if let Ok(endpoint) = + bstate.resource_table.take::(listener_rid) + { + drop(bstate); + return serve_on_quic(state, endpoint); + } + + HTTP::get_listener_for_rid(&mut bstate, listener_rid) + .map_err(HttpNextError::Resource)? + }; let listen_properties = HTTP::listen_properties_from_listener(&listener)?; @@ -1110,6 +1140,117 @@ where )) } +fn serve_on_quic( + state: Rc>, + listener: Rc, +) -> Result<(ResourceId, &'static str, String), HttpNextError> { + let endpoint = listener.0.clone(); + let (tx, rx) = tokio::sync::mpsc::channel(10); + let resource: Rc = Rc::new(HttpJoinHandle::new(rx)); + let lifetime = resource.lifetime(); + + let local_addr = endpoint.local_addr()?; + let properties = listener_properties( + NetworkStreamType::Tls, + NetworkStreamAddress::Ip(local_addr), + )?; + + let handle = spawn(async move { + while let Some(incoming) = endpoint + .accept() + .or_cancel(lifetime.listen_cancel_handle.clone()) + .await? + { + let lifetime = lifetime.clone(); + let tx = tx.clone(); + spawn(async move { + let conn = incoming + .accept()? + .or_cancel(lifetime.listen_cancel_handle.clone()) + .await??; + + let request_info = HttpConnectionProperties { + peer_address: Rc::from(conn.remote_address().ip().to_string()), + peer_port: Some(conn.remote_address().port()), + local_port: properties.local_port, + stream_type: properties.stream_type, + }; + + let mut h3_conn = h3::server::builder() + .send_grease(true) + .enable_datagram(true) + .enable_connect(true) + .enable_webtransport(true) + .max_webtransport_sessions(1u32.into()) + .build(h3_quinn::Connection::new(conn)) + .or_cancel(lifetime.connection_cancel_handle.clone()) + .await??; + + while let Some((req, stream)) = h3_conn + .accept() + .or_cancel(lifetime.connection_cancel_handle.clone()) + .await?? + { + let request_info = request_info.clone(); + let lifetime = lifetime.clone(); + let tx = tx.clone(); + spawn(async move { + let (mut send, recv) = stream.split(); + + let (parts, _body) = req.into_parts(); + let req = hyper::Request::from_parts(parts, recv.into()); + + match handle_request( + req, + request_info, + lifetime.server_state.clone(), + tx.clone(), + ) + .await + { + Ok(res) => { + let (parts, body) = res.into_parts(); + let res = hyper::Response::from_parts(parts, ()); + + send.send_response(res).await?; + + let mut body = body.into_data_stream(); + while let Some(v) = body.next().await { + send.send_data(v.unwrap()).await?; + } + } + Err(_) => { + send + .send_response( + hyper::Response::builder().status(500).body(()).unwrap(), + ) + .await?; + } + } + + Ok::<_, HttpNextError>(()) + }); + } + + Ok::<_, HttpNextError>(()) + }); + } + + Ok(()) + }); + + // Set the handle after we start the future + *RcRef::map(&resource, |this| &this.join_handle) + .try_borrow_mut() + .unwrap() = Some(handle); + + Ok(( + state.borrow_mut().resource_table.add_rc(resource), + properties.scheme, + properties.fallback_host, + )) +} + #[op2] #[serde] pub fn op_http_serve_on( diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs index f1c3f358ea..9187d0a233 100644 --- a/ext/http/request_body.rs +++ b/ext/http/request_body.rs @@ -1,4 +1,5 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. +use crate::service::RequestBody; use bytes::Bytes; use deno_core::futures::stream::Peekable; use deno_core::futures::Stream; @@ -10,7 +11,6 @@ use deno_core::BufView; use deno_core::RcRef; use deno_core::Resource; use hyper::body::Body; -use hyper::body::Incoming; use hyper::body::SizeHint; use std::borrow::Cow; use std::pin::Pin; @@ -19,10 +19,10 @@ use std::task::ready; use std::task::Poll; /// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. -struct ReadFuture(Incoming); +struct ReadFuture(RequestBody); impl Stream for ReadFuture { - type Item = Result; + type Item = Result; fn poll_next( self: Pin<&mut Self>, @@ -53,12 +53,15 @@ impl Stream for ReadFuture { pub struct HttpRequestBody(AsyncRefCell>, SizeHint); impl HttpRequestBody { - pub fn new(body: Incoming) -> Self { + pub fn new(body: RequestBody) -> Self { let size_hint = body.size_hint(); Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint) } - async fn read(self: Rc, limit: usize) -> Result { + async fn read( + self: Rc, + limit: usize, + ) -> Result { let peekable = RcRef::map(self, |this| &this.0); let mut peekable = peekable.borrow_mut().await; match Pin::new(&mut *peekable).peek_mut().await { diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs index 39d35a79f1..6638e1df69 100644 --- a/ext/http/request_properties.rs +++ b/ext/http/request_properties.rs @@ -196,7 +196,7 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { } } -fn listener_properties( +pub(crate) fn listener_properties( stream_type: NetworkStreamType, local_address: NetworkStreamAddress, ) -> Result { diff --git a/ext/http/service.rs b/ext/http/service.rs index ce24dea43f..e565971cc6 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -2,6 +2,7 @@ use crate::request_properties::HttpConnectionProperties; use crate::response_body::ResponseBytesInner; use crate::response_body::ResponseStreamResult; +use bytes::Buf; use deno_core::futures::ready; use deno_core::BufView; use deno_core::OpState; @@ -29,7 +30,7 @@ use std::task::Poll; use std::task::Waker; use tokio::sync::oneshot; -pub type Request = hyper::Request; +pub type Request = hyper::Request; pub type Response = hyper::Response; #[cfg(feature = "__http_tracing")] @@ -148,13 +149,54 @@ impl std::ops::Deref for HttpServerState { } } -enum RequestBodyState { +type H3Stream = h3::server::RequestStream; + +pub enum RequestBody { Incoming(Incoming), + H3(H3Stream), +} + +impl Body for RequestBody { + type Data = bytes::Bytes; + type Error = crate::HttpNextError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>> { + match self.get_mut() { + Self::Incoming(incoming) => { + Pin::new(incoming).poll_frame(cx).map_err(Into::into) + } + Self::H3(stream) => match stream.poll_recv_data(cx)? { + Poll::Ready(v) => Poll::Ready( + v.map(|mut v| Ok(Frame::data(v.copy_to_bytes(v.remaining())))), + ), + Poll::Pending => Poll::Pending, + }, + } + } +} + +impl From for RequestBody { + fn from(value: Incoming) -> Self { + Self::Incoming(value) + } +} + +impl From for RequestBody { + fn from(value: H3Stream) -> Self { + Self::H3(value) + } +} + +enum RequestBodyState { + Incoming(RequestBody), Resource(#[allow(dead_code)] HttpRequestBodyAutocloser), } -impl From for RequestBodyState { - fn from(value: Incoming) -> Self { +impl From for RequestBodyState { + fn from(value: RequestBody) -> Self { RequestBodyState::Incoming(value) } } @@ -365,7 +407,7 @@ impl HttpRecord { } /// Take the Hyper body from this record. - pub fn take_request_body(&self) -> Option { + pub fn take_request_body(&self) -> Option { let body_holder = &mut self.self_mut().request_body; let body = body_holder.take(); match body { @@ -544,13 +586,13 @@ impl Body for HttpRecordResponse { if let Some(trailers) = inner.trailers.take() { return Poll::Ready(Some(Ok(Frame::trailers(trailers)))); } - unreachable!() + return Poll::Ready(None); } ResponseBytesInner::Bytes(..) => { drop(inner); let ResponseBytesInner::Bytes(data) = record.take_response_body() else { - unreachable!(); + return Poll::Ready(None); }; return Poll::Ready(Some(Ok(Frame::data(data)))); } @@ -665,7 +707,7 @@ mod tests { }; let svc = service_fn(move |req: hyper::Request| { handle_request( - req, + req.map(Into::into), request_info.clone(), server_state.clone(), tx.clone(), diff --git a/ext/net/03_quic.js b/ext/net/03_quic.js index ffb9f9186a..a03fb0c6e8 100644 --- a/ext/net/03_quic.js +++ b/ext/net/03_quic.js @@ -29,7 +29,6 @@ import { op_quic_incoming_remote_addr, op_quic_incoming_remote_addr_validated, op_quic_listener_accept, - op_quic_listener_stop, op_quic_recv_stream_get_id, op_quic_send_stream_get_id, op_quic_send_stream_get_priority, @@ -45,9 +44,11 @@ import { } from "ext:deno_web/06_streams.js"; import { loadTlsKeyPair } from "ext:deno_net/02_tls.js"; const { + internalRidSymbol, BadResourcePrototype, } = core; const { + ObjectDefineProperty, ObjectPrototypeIsPrototypeOf, PromisePrototypeThen, SymbolAsyncIterator, @@ -84,7 +85,10 @@ class QuicEndpoint { } get addr() { - return op_quic_endpoint_get_addr(this.#endpoint); + return { + ...op_quic_endpoint_get_addr(this.#endpoint), + transport: "quic", + }; } listen(options) { @@ -117,6 +121,16 @@ class QuicListener { constructor(listener, endpoint) { this.#listener = listener; this.#endpoint = endpoint; + + ObjectDefineProperty(this, internalRidSymbol, { + __proto__: null, + enumerable: false, + value: listener, + }); + } + + get addr() { + return this.#endpoint.addr; } get endpoint() { @@ -151,7 +165,7 @@ class QuicListener { } stop() { - op_quic_listener_stop(this.#listener); + core.close(this.#listener); } } diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index eaee7bfb4b..c436813493 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -20,7 +20,7 @@ deno_tls.workspace = true hickory-proto = "0.25.0-alpha.4" hickory-resolver.workspace = true pin-project.workspace = true -quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] } +quinn.workspace = true rustls-tokio-stream.workspace = true serde.workspace = true socket2.workspace = true diff --git a/ext/net/lib.rs b/ext/net/lib.rs index 878f336a24..cc5b83f496 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -5,7 +5,7 @@ pub mod ops; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; -mod quic; +pub mod quic; pub mod raw; pub mod resolve_addr; pub mod tcp; @@ -190,7 +190,6 @@ deno_core::extension!(deno_net, quic::op_quic_incoming_remote_addr, quic::op_quic_incoming_remote_addr_validated, quic::op_quic_listener_accept, - quic::op_quic_listener_stop, quic::op_quic_recv_stream_get_id, quic::op_quic_send_stream_get_id, quic::op_quic_send_stream_get_priority, diff --git a/ext/net/quic.rs b/ext/net/quic.rs index 08e967bf8c..4d9fd2d2cc 100644 --- a/ext/net/quic.rs +++ b/ext/net/quic.rs @@ -250,24 +250,27 @@ pub(crate) fn op_quic_endpoint_close( Ok(()) } -struct ListenerResource(quinn::Endpoint, Arc); +pub struct ListenerResource(pub quinn::Endpoint, Arc); -impl Drop for ListenerResource { - fn drop(&mut self) { +impl Resource for ListenerResource { + fn name(&self) -> Cow { + "quicListener".into() + } + + fn close(self: Rc) { self.0.set_server_config(None); } } -impl GarbageCollected for ListenerResource {} - #[op2] -#[cppgc] +#[smi] pub(crate) fn op_quic_endpoint_listen( + state: Rc>, #[cppgc] endpoint: &EndpointResource, #[serde] args: ListenArgs, #[serde] transport_config: TransportConfig, #[cppgc] keys: &TlsKeysHolder, -) -> Result { +) -> Result { if !endpoint.can_listen { return Err(QuicError::CannotListen); } @@ -301,7 +304,11 @@ pub(crate) fn op_quic_endpoint_listen( endpoint.endpoint.set_server_config(Some(config)); - Ok(ListenerResource(endpoint.endpoint.clone(), server_config)) + let rid = state + .borrow_mut() + .resource_table + .add(ListenerResource(endpoint.endpoint.clone(), server_config)); + Ok(rid) } struct ConnectionResource( @@ -321,8 +328,13 @@ impl GarbageCollected for IncomingResource {} #[op2(async)] #[cppgc] pub(crate) async fn op_quic_listener_accept( - #[cppgc] resource: &ListenerResource, + state: Rc>, + #[smi] resource_id: u32, ) -> Result { + let resource = state + .borrow() + .resource_table + .get::(resource_id)?; match resource.0.accept().await { Some(incoming) => Ok(IncomingResource( RefCell::new(Some(incoming)), @@ -332,11 +344,6 @@ pub(crate) async fn op_quic_listener_accept( } } -#[op2(fast)] -pub(crate) fn op_quic_listener_stop(#[cppgc] resource: &ListenerResource) { - resource.0.set_server_config(None); -} - #[op2] #[string] pub(crate) fn op_quic_incoming_local_ip( diff --git a/runtime/errors.rs b/runtime/errors.rs index 2f7b8b1c82..b22ec63d55 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -1013,6 +1013,8 @@ fn get_http_next_error(error: &HttpNextError) -> &'static str { HttpNextError::Io(e) => get_io_error_class(e), HttpNextError::WebSocketUpgrade(e) => get_websocket_upgrade_error(e), HttpNextError::Hyper(e) => get_hyper_error_class(e), + HttpNextError::H3(_) => "Http", + HttpNextError::QuinnConnectionError(_) => "Http", HttpNextError::JoinError(_) => "Error", HttpNextError::Canceled(e) => { let io_err: io::Error = e.to_owned().into();