From bfe93c6e814b5ba27e9bd356359910ff3c3f49bd Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 15 May 2023 16:55:47 +0200 Subject: [PATCH] refactor(ext/http): generic abstract listeners (#19132) Improve abstractions around listeners to support listener + connection network stream combinations not previously possible (for example a listener exposed as a Tcp, creating Unix network streams). --- Cargo.lock | 1 + ext/http/Cargo.toml | 1 + ext/http/http_next.rs | 42 +++------- ext/http/request_properties.rs | 144 +++++++++++++++++++++++---------- ext/net/raw.rs | 2 +- 5 files changed, 118 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4b46e4b653..6dcecdd031 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1026,6 +1026,7 @@ name = "deno_http" version = "0.99.0" dependencies = [ "async-compression", + "async-trait", "base64 0.13.1", "bencher", "brotli", diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index 8bf1d42e2b..e555d742e5 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -22,6 +22,7 @@ harness = false [dependencies] async-compression = { version = "0.3.12", features = ["tokio", "brotli", "gzip"] } +async-trait.workspace = true base64.workspace = true brotli = "3.3.4" bytes.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 8b2f91be06..eaa19a89d0 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -33,7 +33,6 @@ use deno_core::ZeroCopyBuf; use deno_net::ops_tls::TlsStream; use deno_net::raw::put_network_stream_resource; use deno_net::raw::NetworkStream; -use deno_net::raw::NetworkStreamAddress; use fly_accept_encoding::Encoding; use http::header::ACCEPT_ENCODING; use http::header::CACHE_CONTROL; @@ -61,9 +60,6 @@ use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; use std::io; -use std::net::Ipv4Addr; -use std::net::SocketAddr; -use std::net::SocketAddrV4; use std::pin::Pin; use std::rc::Rc; @@ -825,7 +821,7 @@ fn serve_http( } fn serve_http_on( - network_stream: NetworkStream, + connection: HTTP::Connection, listen_properties: &HttpListenProperties, cancel: Rc, tx: tokio::sync::mpsc::Sender, @@ -833,15 +829,10 @@ fn serve_http_on( where HTTP: HttpPropertyExtractor, { - // We always want some sort of peer address. If we can't get one, just make up one. - let peer_address = network_stream.peer_address().unwrap_or_else(|_| { - NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new( - Ipv4Addr::new(0, 0, 0, 0), - 0, - ))) - }); let connection_properties: HttpConnectionProperties = - HTTP::connection_properties(listen_properties, &peer_address); + HTTP::connection_properties(listen_properties, &connection); + + let network_stream = HTTP::to_network_stream_from_connection(connection); match network_stream { NetworkStream::Tcp(conn) => { @@ -895,14 +886,10 @@ pub fn op_http_serve( where HTTP: HttpPropertyExtractor, { - let listener = HTTP::get_network_stream_listener_for_rid( - &mut state.borrow_mut(), - listener_rid, - )?; + let listener = + HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; - let local_address = listener.listen_address()?; - let listen_properties = - HTTP::listen_properties(listener.stream(), &local_address); + let listen_properties = HTTP::listen_properties_from_listener(&listener)?; let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc = Rc::new(HttpJoinHandle( @@ -915,8 +902,7 @@ where let listen_properties_clone: HttpListenProperties = listen_properties.clone(); let handle = spawn(async move { loop { - let conn = listener - .accept() + let conn = HTTP::accept_connection_from_listener(&listener) .try_or_cancel(cancel_clone.clone()) .await?; serve_http_on::( @@ -945,17 +931,15 @@ where #[op(v8)] pub fn op_http_serve_on( state: Rc>, - conn: ResourceId, + connection_rid: ResourceId, ) -> Result<(ResourceId, &'static str, String), AnyError> where HTTP: HttpPropertyExtractor, { - let network_stream: NetworkStream = - HTTP::get_network_stream_for_rid(&mut state.borrow_mut(), conn)?; + let connection = + HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; - let local_address = network_stream.local_address()?; - let listen_properties = - HTTP::listen_properties(network_stream.stream(), &local_address); + let listen_properties = HTTP::listen_properties_from_connection(&connection)?; let (tx, rx) = tokio::sync::mpsc::channel(10); let resource: Rc = Rc::new(HttpJoinHandle( @@ -966,7 +950,7 @@ where let handle: JoinHandle> = serve_http_on::( - network_stream, + connection, &listen_properties, resource.cancel_handle(), tx, diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs index 9c0c0e8152..905139673e 100644 --- a/ext/http/request_properties.rs +++ b/ext/http/request_properties.rs @@ -1,10 +1,10 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_core::error::AnyError; use deno_core::OpState; use deno_core::ResourceId; -use deno_net::raw::NetworkStream; -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. use deno_net::raw::take_network_stream_listener_resource; use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStream; use deno_net::raw::NetworkStreamAddress; use deno_net::raw::NetworkStreamListener; use deno_net::raw::NetworkStreamType; @@ -12,23 +12,26 @@ use hyper::HeaderMap; use hyper::Uri; use hyper1::header::HOST; use std::borrow::Cow; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::net::SocketAddrV4; use std::rc::Rc; // TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup #[derive(Clone)] pub struct HttpListenProperties { - pub stream_type: NetworkStreamType, pub scheme: &'static str, pub fallback_host: String, pub local_port: Option, + pub stream_type: NetworkStreamType, } #[derive(Clone)] pub struct HttpConnectionProperties { - pub stream_type: NetworkStreamType, pub peer_address: Rc, pub peer_port: Option, pub local_port: Option, + pub stream_type: NetworkStreamType, } pub struct HttpRequestProperties { @@ -37,31 +40,49 @@ pub struct HttpRequestProperties { /// Pluggable trait to determine listen, connection and request properties /// for embedders that wish to provide alternative routes for incoming HTTP. +#[async_trait::async_trait(?Send)] pub trait HttpPropertyExtractor { - /// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`]. - fn get_network_stream_listener_for_rid( + type Listener: 'static; + type Connection; + + /// Given a listener [`ResourceId`], returns the [`HttpPropertyExtractor::Listener`]. + fn get_listener_for_rid( state: &mut OpState, listener_rid: ResourceId, - ) -> Result; + ) -> Result; - /// Given a connection [`ResourceId`], returns the [`NetworkStream`]. - fn get_network_stream_for_rid( + /// Given a connection [`ResourceId`], returns the [`HttpPropertyExtractor::Connection`]. + fn get_connection_for_rid( state: &mut OpState, - rid: ResourceId, - ) -> Result; + connection_rid: ResourceId, + ) -> Result; /// Determines the listener properties. - fn listen_properties( - stream_type: NetworkStreamType, - local_address: &NetworkStreamAddress, - ) -> HttpListenProperties; + fn listen_properties_from_listener( + listener: &Self::Listener, + ) -> Result; + + /// Determines the listener properties given a [`HttpPropertyExtractor::Connection`]. + fn listen_properties_from_connection( + connection: &Self::Connection, + ) -> Result; + + /// Accept a new [`HttpPropertyExtractor::Connection`] from the given listener [`HttpPropertyExtractor::Listener`]. + async fn accept_connection_from_listener( + listener: &Self::Listener, + ) -> Result; /// Determines the connection properties. fn connection_properties( listen_properties: &HttpListenProperties, - peer_address: &NetworkStreamAddress, + connection: &Self::Connection, ) -> HttpConnectionProperties; + /// Turn a given [`HttpPropertyExtractor::Connection`] into a [`NetworkStream`]. + fn to_network_stream_from_connection( + connection: Self::Connection, + ) -> NetworkStream; + /// Determines the request properties. fn request_properties( connection_properties: &HttpConnectionProperties, @@ -72,15 +93,13 @@ pub trait HttpPropertyExtractor { pub struct DefaultHttpPropertyExtractor {} +#[async_trait::async_trait(?Send)] impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { - fn get_network_stream_for_rid( - state: &mut OpState, - rid: ResourceId, - ) -> Result { - take_network_stream_resource(&mut state.resource_table, rid) - } + type Listener = NetworkStreamListener; - fn get_network_stream_listener_for_rid( + type Connection = NetworkStream; + + fn get_listener_for_rid( state: &mut OpState, listener_rid: ResourceId, ) -> Result { @@ -90,30 +109,52 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { ) } - fn listen_properties( - stream_type: NetworkStreamType, - local_address: &NetworkStreamAddress, - ) -> HttpListenProperties { - let scheme = req_scheme_from_stream_type(stream_type); - let fallback_host = req_host_from_addr(stream_type, local_address); - let local_port: Option = match local_address { - NetworkStreamAddress::Ip(ip) => Some(ip.port()), - #[cfg(unix)] - NetworkStreamAddress::Unix(_) => None, - }; + fn get_connection_for_rid( + state: &mut OpState, + stream_rid: ResourceId, + ) -> Result { + take_network_stream_resource(&mut state.resource_table, stream_rid) + } - HttpListenProperties { - scheme, - fallback_host, - local_port, - stream_type, - } + async fn accept_connection_from_listener( + listener: &NetworkStreamListener, + ) -> Result { + listener.accept().await.map_err(Into::into) + } + + fn listen_properties_from_listener( + listener: &NetworkStreamListener, + ) -> Result { + let stream_type = listener.stream(); + let local_address = listener.listen_address()?; + listener_properties(stream_type, local_address) + } + + fn listen_properties_from_connection( + connection: &Self::Connection, + ) -> Result { + let stream_type = connection.stream(); + let local_address = connection.local_address()?; + listener_properties(stream_type, local_address) + } + + fn to_network_stream_from_connection( + connection: Self::Connection, + ) -> NetworkStream { + connection } fn connection_properties( listen_properties: &HttpListenProperties, - peer_address: &NetworkStreamAddress, + connection: &NetworkStream, ) -> HttpConnectionProperties { + // We always want some sort of peer address. If we can't get one, just make up one. + let peer_address = connection.peer_address().unwrap_or_else(|_| { + NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 0, + ))) + }); let peer_port: Option = match peer_address { NetworkStreamAddress::Ip(ip) => Some(ip.port()), #[cfg(unix)] @@ -128,10 +169,10 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { let stream_type = listen_properties.stream_type; HttpConnectionProperties { - stream_type, peer_address, peer_port, local_port, + stream_type, } } @@ -152,6 +193,25 @@ impl HttpPropertyExtractor for DefaultHttpPropertyExtractor { } } +fn listener_properties( + stream_type: NetworkStreamType, + local_address: NetworkStreamAddress, +) -> Result { + let scheme = req_scheme_from_stream_type(stream_type); + let fallback_host = req_host_from_addr(stream_type, &local_address); + let local_port: Option = match local_address { + NetworkStreamAddress::Ip(ip) => Some(ip.port()), + #[cfg(unix)] + NetworkStreamAddress::Unix(_) => None, + }; + Ok(HttpListenProperties { + scheme, + fallback_host, + local_port, + stream_type, + }) +} + /// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in /// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this. fn req_host_from_addr( diff --git a/ext/net/raw.rs b/ext/net/raw.rs index 3b50af41e0..3f230a08ba 100644 --- a/ext/net/raw.rs +++ b/ext/net/raw.rs @@ -179,7 +179,7 @@ pub enum NetworkStreamAddress { impl NetworkStreamListener { /// Accepts a connection on this listener. - pub async fn accept(&self) -> Result { + pub async fn accept(&self) -> Result { Ok(match self { Self::Tcp(tcp) => { let (stream, _addr) = tcp.accept().await?;