1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-20 20:42:19 -05:00

feat(unstable): http3

This commit is contained in:
snek 2024-12-24 10:58:48 +01:00
parent a563265316
commit bbb929fa83
No known key found for this signature in database
13 changed files with 342 additions and 46 deletions

31
Cargo.lock generated
View file

@ -1790,6 +1790,8 @@ dependencies = [
"deno_net",
"deno_websocket",
"flate2",
"h3",
"h3-quinn",
"http 0.2.12",
"http 1.1.0",
"http-body-util",
@ -3685,6 +3687,34 @@ dependencies = [
"tracing",
]
[[package]]
name = "h3"
version = "0.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e7675a0963b47a6d12fe44c279918b4ffb19baee838ac37f48d2722ad5bc6ab"
dependencies = [
"bytes",
"fastrand",
"futures-util",
"http 1.1.0",
"pin-project-lite",
"tokio",
]
[[package]]
name = "h3-quinn"
version = "0.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17c799f413fceeea505236c4d8132f084ff4b55a652288d91439ee93dc24d855"
dependencies = [
"bytes",
"futures",
"h3",
"quinn",
"tokio",
"tokio-util",
]
[[package]]
name = "halfbrown"
version = "0.2.5"
@ -5968,6 +5998,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef"
dependencies = [
"bytes",
"futures-io",
"pin-project-lite",
"quinn-proto",
"quinn-udp",

View file

@ -135,6 +135,8 @@ fs3 = "0.5.0"
futures = "0.3.21"
glob = "0.3.1"
h2 = "0.4.4"
h3 = "0.0.6"
h3-quinn = "0.0.7"
hickory-resolver = { version = "0.25.0-alpha.4", features = ["tokio-runtime", "serde"] }
http = "1.0"
http-body = "1.0"
@ -169,6 +171,7 @@ pin-project = "1.0.11" # don't pin because they yank crates from cargo
pretty_assertions = "=1.4.0"
prost = "0.13"
prost-build = "0.13"
quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] }
rand = "=0.8.5"
regex = "^1.7.0"
reqwest = { version = "=0.12.5", default-features = false, features = ["rustls-tls", "stream", "gzip", "brotli", "socks", "json", "http2"] } # pinned because of https://github.com/seanmonstar/reqwest/pull/1955

View file

@ -42,6 +42,7 @@ const {
TypedArrayPrototypeGetSymbolToStringTag,
Uint8Array,
Promise,
SafePromiseAll,
} = primordials;
const {
getAsyncContext,
@ -340,7 +341,7 @@ class InnerRequest {
this.#methodAndUri = op_http_get_request_method_and_url(this.#external);
}
return {
transport: "tcp",
transport,
hostname: this.#methodAndUri[3],
port: this.#methodAndUri[4],
};
@ -727,7 +728,7 @@ function serve(arg1, arg2) {
const path = listener.addr.path;
return serveHttpOnListener(listener, signal, handler, onError, () => {
if (options.onListen) {
options.onListen(listener.addr);
options.onListen(listener.addr, "unix");
} else {
// deno-lint-ignore no-console
console.error(`Listening on ${path}`);
@ -754,6 +755,7 @@ function serve(arg1, arg2) {
}
let listener;
let listenerQuic;
if (wantsHttps) {
if (!options.cert || !options.key) {
throw new TypeError(
@ -765,6 +767,17 @@ function serve(arg1, arg2) {
listenOpts.alpnProtocols = ["h2", "http/1.1"];
listener = listenTls(listenOpts);
listenOpts.port = listener.addr.port;
if (Deno.QuicEndpoint) {
listenerQuic = new Deno.QuicEndpoint({
hostname: listenOpts.hostname,
port: listenOpts.port,
}).listen({
key: listenOpts.key,
cert: listenOpts.cert,
alpnProtocols: ["h3"],
});
}
} else {
listener = listen(listenOpts);
listenOpts.port = listener.addr.port;
@ -772,18 +785,56 @@ function serve(arg1, arg2) {
const addr = listener.addr;
const onListen = (scheme) => {
const onListen = (scheme, transport = "tcp") => {
if (options.onListen) {
options.onListen(addr);
options.onListen(addr, transport);
} else {
const host = formatHostName(addr.hostname);
// deno-lint-ignore no-console
console.error(`Listening on ${scheme}${host}:${addr.port}/`);
console.error(
`Listening with ${transport} on ${scheme}${host}:${addr.port}/`,
);
}
};
return serveHttpOnListener(listener, signal, handler, onError, onListen);
const tcp = serveHttpOnListener(listener, signal, handler, onError, onListen);
if (listenerQuic) {
const udp = serveHttpOnListener(
listenerQuic,
signal,
handler,
onError,
onListen,
);
return {
addr: tcp.addr,
finished: SafePromiseAll([
tcp.finished,
udp.finished,
]),
async shutdown() {
await SafePromiseAll([
tcp.shutdown(),
udp.shutdown(),
]);
},
ref() {
tcp.ref();
udp.ref();
},
unref() {
tcp.unref();
udp.unref();
},
[SymbolAsyncDispose]() {
return this.shutdown();
},
};
}
return tcp;
}
/**
@ -797,7 +848,7 @@ function serveHttpOnListener(listener, signal, handler, onError, onListen) {
);
const callback = mapToCallback(context, handler, onError);
onListen(context.scheme);
onListen(context.scheme, context.listener.addr.transport);
return serveHttpOn(context, listener.addr, callback);
}

View file

@ -31,7 +31,10 @@ deno_core.workspace = true
deno_net.workspace = true
deno_websocket.workspace = true
flate2.workspace = true
h3.workspace = true
h3-quinn.workspace = true
http.workspace = true
http-body-util.workspace = true
http_v02.workspace = true
httparse.workspace = true
hyper.workspace = true

View file

@ -3,6 +3,7 @@ use crate::compressible::is_content_compressible;
use crate::extract_network_stream;
use crate::network_buffered_stream::NetworkStreamPrefixCheck;
use crate::request_body::HttpRequestBody;
use crate::request_properties::listener_properties;
use crate::request_properties::HttpConnectionProperties;
use crate::request_properties::HttpListenProperties;
use crate::request_properties::HttpPropertyExtractor;
@ -22,6 +23,7 @@ use crate::Options;
use cache_control::CacheControl;
use deno_core::external;
use deno_core::futures::future::poll_fn;
use deno_core::futures::StreamExt;
use deno_core::futures::TryFutureExt;
use deno_core::op2;
use deno_core::serde_v8::from_v8;
@ -42,8 +44,12 @@ use deno_core::RcRef;
use deno_core::Resource;
use deno_core::ResourceId;
use deno_net::ops_tls::TlsStream;
use deno_net::quic::ListenerResource as QuicListener;
use deno_net::raw::NetworkStream;
use deno_net::raw::NetworkStreamAddress;
use deno_net::raw::NetworkStreamType;
use deno_websocket::ws_create_server_stream;
use http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::header::HeaderMap;
use hyper::header::ACCEPT_ENCODING;
@ -157,6 +163,10 @@ pub enum HttpNextError {
#[error("{0}")]
Hyper(#[from] hyper::Error),
#[error(transparent)]
H3(#[from] h3::Error),
#[error(transparent)]
QuinnConnectionError(#[from] h3_quinn::quinn::ConnectionError),
#[error(transparent)]
JoinError(#[from] tokio::task::JoinError),
#[error(transparent)]
Canceled(#[from] deno_core::Canceled),
@ -905,7 +915,12 @@ fn serve_https(
} = lifetime;
let svc = service_fn(move |req: Request| {
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
handle_request(
req.map(Into::into),
request_info.clone(),
server_state.clone(),
tx.clone(),
)
});
spawn(
async move {
@ -953,7 +968,12 @@ fn serve_http(
} = lifetime;
let svc = service_fn(move |req: Request| {
handle_request(req, request_info.clone(), server_state.clone(), tx.clone())
handle_request(
req.map(Into::into),
request_info.clone(),
server_state.clone(),
tx.clone(),
)
});
spawn(
serve_http2_autodetect(io, svc, listen_cancel_handle, options)
@ -1062,9 +1082,19 @@ pub fn op_http_serve<HTTP>(
where
HTTP: HttpPropertyExtractor,
{
let listener =
HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)
.map_err(HttpNextError::Resource)?;
let listener = {
let mut bstate = state.borrow_mut();
if let Ok(endpoint) =
bstate.resource_table.take::<QuicListener>(listener_rid)
{
drop(bstate);
return serve_on_quic(state, endpoint);
}
HTTP::get_listener_for_rid(&mut bstate, listener_rid)
.map_err(HttpNextError::Resource)?
};
let listen_properties = HTTP::listen_properties_from_listener(&listener)?;
@ -1110,6 +1140,117 @@ where
))
}
fn serve_on_quic(
state: Rc<RefCell<OpState>>,
listener: Rc<QuicListener>,
) -> Result<(ResourceId, &'static str, String), HttpNextError> {
let endpoint = listener.0.clone();
let (tx, rx) = tokio::sync::mpsc::channel(10);
let resource: Rc<HttpJoinHandle> = Rc::new(HttpJoinHandle::new(rx));
let lifetime = resource.lifetime();
let local_addr = endpoint.local_addr()?;
let properties = listener_properties(
NetworkStreamType::Tls,
NetworkStreamAddress::Ip(local_addr),
)?;
let handle = spawn(async move {
while let Some(incoming) = endpoint
.accept()
.or_cancel(lifetime.listen_cancel_handle.clone())
.await?
{
let lifetime = lifetime.clone();
let tx = tx.clone();
spawn(async move {
let conn = incoming
.accept()?
.or_cancel(lifetime.listen_cancel_handle.clone())
.await??;
let request_info = HttpConnectionProperties {
peer_address: Rc::from(conn.remote_address().ip().to_string()),
peer_port: Some(conn.remote_address().port()),
local_port: properties.local_port,
stream_type: properties.stream_type,
};
let mut h3_conn = h3::server::builder()
.send_grease(true)
.enable_datagram(true)
.enable_connect(true)
.enable_webtransport(true)
.max_webtransport_sessions(1u32.into())
.build(h3_quinn::Connection::new(conn))
.or_cancel(lifetime.connection_cancel_handle.clone())
.await??;
while let Some((req, stream)) = h3_conn
.accept()
.or_cancel(lifetime.connection_cancel_handle.clone())
.await??
{
let request_info = request_info.clone();
let lifetime = lifetime.clone();
let tx = tx.clone();
spawn(async move {
let (mut send, recv) = stream.split();
let (parts, _body) = req.into_parts();
let req = hyper::Request::from_parts(parts, recv.into());
match handle_request(
req,
request_info,
lifetime.server_state.clone(),
tx.clone(),
)
.await
{
Ok(res) => {
let (parts, body) = res.into_parts();
let res = hyper::Response::from_parts(parts, ());
send.send_response(res).await?;
let mut body = body.into_data_stream();
while let Some(v) = body.next().await {
send.send_data(v.unwrap()).await?;
}
}
Err(_) => {
send
.send_response(
hyper::Response::builder().status(500).body(()).unwrap(),
)
.await?;
}
}
Ok::<_, HttpNextError>(())
});
}
Ok::<_, HttpNextError>(())
});
}
Ok(())
});
// Set the handle after we start the future
*RcRef::map(&resource, |this| &this.join_handle)
.try_borrow_mut()
.unwrap() = Some(handle);
Ok((
state.borrow_mut().resource_table.add_rc(resource),
properties.scheme,
properties.fallback_host,
))
}
#[op2]
#[serde]
pub fn op_http_serve_on<HTTP>(

View file

@ -1,4 +1,5 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use crate::service::RequestBody;
use bytes::Bytes;
use deno_core::futures::stream::Peekable;
use deno_core::futures::Stream;
@ -10,7 +11,6 @@ use deno_core::BufView;
use deno_core::RcRef;
use deno_core::Resource;
use hyper::body::Body;
use hyper::body::Incoming;
use hyper::body::SizeHint;
use std::borrow::Cow;
use std::pin::Pin;
@ -19,10 +19,10 @@ use std::task::ready;
use std::task::Poll;
/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8.
struct ReadFuture(Incoming);
struct ReadFuture(RequestBody);
impl Stream for ReadFuture {
type Item = Result<Bytes, hyper::Error>;
type Item = Result<Bytes, crate::HttpNextError>;
fn poll_next(
self: Pin<&mut Self>,
@ -53,12 +53,15 @@ impl Stream for ReadFuture {
pub struct HttpRequestBody(AsyncRefCell<Peekable<ReadFuture>>, SizeHint);
impl HttpRequestBody {
pub fn new(body: Incoming) -> Self {
pub fn new(body: RequestBody) -> Self {
let size_hint = body.size_hint();
Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint)
}
async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, hyper::Error> {
async fn read(
self: Rc<Self>,
limit: usize,
) -> Result<BufView, crate::HttpNextError> {
let peekable = RcRef::map(self, |this| &this.0);
let mut peekable = peekable.borrow_mut().await;
match Pin::new(&mut *peekable).peek_mut().await {

View file

@ -196,7 +196,7 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor {
}
}
fn listener_properties(
pub(crate) fn listener_properties(
stream_type: NetworkStreamType,
local_address: NetworkStreamAddress,
) -> Result<HttpListenProperties, std::io::Error> {

View file

@ -2,6 +2,7 @@
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::ResponseBytesInner;
use crate::response_body::ResponseStreamResult;
use bytes::Buf;
use deno_core::futures::ready;
use deno_core::BufView;
use deno_core::OpState;
@ -29,7 +30,7 @@ use std::task::Poll;
use std::task::Waker;
use tokio::sync::oneshot;
pub type Request = hyper::Request<Incoming>;
pub type Request = hyper::Request<RequestBody>;
pub type Response = hyper::Response<HttpRecordResponse>;
#[cfg(feature = "__http_tracing")]
@ -148,13 +149,54 @@ impl std::ops::Deref for HttpServerState {
}
}
enum RequestBodyState {
type H3Stream = h3::server::RequestStream<h3_quinn::RecvStream, BufView>;
pub enum RequestBody {
Incoming(Incoming),
H3(H3Stream),
}
impl Body for RequestBody {
type Data = bytes::Bytes;
type Error = crate::HttpNextError;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.get_mut() {
Self::Incoming(incoming) => {
Pin::new(incoming).poll_frame(cx).map_err(Into::into)
}
Self::H3(stream) => match stream.poll_recv_data(cx)? {
Poll::Ready(v) => Poll::Ready(
v.map(|mut v| Ok(Frame::data(v.copy_to_bytes(v.remaining())))),
),
Poll::Pending => Poll::Pending,
},
}
}
}
impl From<Incoming> for RequestBody {
fn from(value: Incoming) -> Self {
Self::Incoming(value)
}
}
impl From<H3Stream> for RequestBody {
fn from(value: H3Stream) -> Self {
Self::H3(value)
}
}
enum RequestBodyState {
Incoming(RequestBody),
Resource(#[allow(dead_code)] HttpRequestBodyAutocloser),
}
impl From<Incoming> for RequestBodyState {
fn from(value: Incoming) -> Self {
impl From<RequestBody> for RequestBodyState {
fn from(value: RequestBody) -> Self {
RequestBodyState::Incoming(value)
}
}
@ -365,7 +407,7 @@ impl HttpRecord {
}
/// Take the Hyper body from this record.
pub fn take_request_body(&self) -> Option<Incoming> {
pub fn take_request_body(&self) -> Option<RequestBody> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
@ -544,13 +586,13 @@ impl Body for HttpRecordResponse {
if let Some(trailers) = inner.trailers.take() {
return Poll::Ready(Some(Ok(Frame::trailers(trailers))));
}
unreachable!()
return Poll::Ready(None);
}
ResponseBytesInner::Bytes(..) => {
drop(inner);
let ResponseBytesInner::Bytes(data) = record.take_response_body()
else {
unreachable!();
return Poll::Ready(None);
};
return Poll::Ready(Some(Ok(Frame::data(data))));
}
@ -665,7 +707,7 @@ mod tests {
};
let svc = service_fn(move |req: hyper::Request<Incoming>| {
handle_request(
req,
req.map(Into::into),
request_info.clone(),
server_state.clone(),
tx.clone(),

View file

@ -29,7 +29,6 @@ import {
op_quic_incoming_remote_addr,
op_quic_incoming_remote_addr_validated,
op_quic_listener_accept,
op_quic_listener_stop,
op_quic_recv_stream_get_id,
op_quic_send_stream_get_id,
op_quic_send_stream_get_priority,
@ -45,9 +44,11 @@ import {
} from "ext:deno_web/06_streams.js";
import { loadTlsKeyPair } from "ext:deno_net/02_tls.js";
const {
internalRidSymbol,
BadResourcePrototype,
} = core;
const {
ObjectDefineProperty,
ObjectPrototypeIsPrototypeOf,
PromisePrototypeThen,
SymbolAsyncIterator,
@ -84,7 +85,10 @@ class QuicEndpoint {
}
get addr() {
return op_quic_endpoint_get_addr(this.#endpoint);
return {
...op_quic_endpoint_get_addr(this.#endpoint),
transport: "quic",
};
}
listen(options) {
@ -117,6 +121,16 @@ class QuicListener {
constructor(listener, endpoint) {
this.#listener = listener;
this.#endpoint = endpoint;
ObjectDefineProperty(this, internalRidSymbol, {
__proto__: null,
enumerable: false,
value: listener,
});
}
get addr() {
return this.#endpoint.addr;
}
get endpoint() {
@ -151,7 +165,7 @@ class QuicListener {
}
stop() {
op_quic_listener_stop(this.#listener);
core.close(this.#listener);
}
}

View file

@ -20,7 +20,7 @@ deno_tls.workspace = true
hickory-proto = "0.25.0-alpha.4"
hickory-resolver.workspace = true
pin-project.workspace = true
quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] }
quinn.workspace = true
rustls-tokio-stream.workspace = true
serde.workspace = true
socket2.workspace = true

View file

@ -5,7 +5,7 @@ pub mod ops;
pub mod ops_tls;
#[cfg(unix)]
pub mod ops_unix;
mod quic;
pub mod quic;
pub mod raw;
pub mod resolve_addr;
pub mod tcp;
@ -190,7 +190,6 @@ deno_core::extension!(deno_net,
quic::op_quic_incoming_remote_addr,
quic::op_quic_incoming_remote_addr_validated,
quic::op_quic_listener_accept,
quic::op_quic_listener_stop,
quic::op_quic_recv_stream_get_id,
quic::op_quic_send_stream_get_id,
quic::op_quic_send_stream_get_priority,

View file

@ -250,24 +250,27 @@ pub(crate) fn op_quic_endpoint_close(
Ok(())
}
struct ListenerResource(quinn::Endpoint, Arc<QuicServerConfig>);
pub struct ListenerResource(pub quinn::Endpoint, Arc<QuicServerConfig>);
impl Drop for ListenerResource {
fn drop(&mut self) {
impl Resource for ListenerResource {
fn name(&self) -> Cow<str> {
"quicListener".into()
}
fn close(self: Rc<Self>) {
self.0.set_server_config(None);
}
}
impl GarbageCollected for ListenerResource {}
#[op2]
#[cppgc]
#[smi]
pub(crate) fn op_quic_endpoint_listen(
state: Rc<RefCell<OpState>>,
#[cppgc] endpoint: &EndpointResource,
#[serde] args: ListenArgs,
#[serde] transport_config: TransportConfig,
#[cppgc] keys: &TlsKeysHolder,
) -> Result<ListenerResource, QuicError> {
) -> Result<u32, QuicError> {
if !endpoint.can_listen {
return Err(QuicError::CannotListen);
}
@ -301,7 +304,11 @@ pub(crate) fn op_quic_endpoint_listen(
endpoint.endpoint.set_server_config(Some(config));
Ok(ListenerResource(endpoint.endpoint.clone(), server_config))
let rid = state
.borrow_mut()
.resource_table
.add(ListenerResource(endpoint.endpoint.clone(), server_config));
Ok(rid)
}
struct ConnectionResource(
@ -321,8 +328,13 @@ impl GarbageCollected for IncomingResource {}
#[op2(async)]
#[cppgc]
pub(crate) async fn op_quic_listener_accept(
#[cppgc] resource: &ListenerResource,
state: Rc<RefCell<OpState>>,
#[smi] resource_id: u32,
) -> Result<IncomingResource, QuicError> {
let resource = state
.borrow()
.resource_table
.get::<ListenerResource>(resource_id)?;
match resource.0.accept().await {
Some(incoming) => Ok(IncomingResource(
RefCell::new(Some(incoming)),
@ -332,11 +344,6 @@ pub(crate) async fn op_quic_listener_accept(
}
}
#[op2(fast)]
pub(crate) fn op_quic_listener_stop(#[cppgc] resource: &ListenerResource) {
resource.0.set_server_config(None);
}
#[op2]
#[string]
pub(crate) fn op_quic_incoming_local_ip(

View file

@ -1013,6 +1013,8 @@ fn get_http_next_error(error: &HttpNextError) -> &'static str {
HttpNextError::Io(e) => get_io_error_class(e),
HttpNextError::WebSocketUpgrade(e) => get_websocket_upgrade_error(e),
HttpNextError::Hyper(e) => get_hyper_error_class(e),
HttpNextError::H3(_) => "Http",
HttpNextError::QuinnConnectionError(_) => "Http",
HttpNextError::JoinError(_) => "Error",
HttpNextError::Canceled(e) => {
let io_err: io::Error = e.to_owned().into();