1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

refactor: use hyper 1.0 in WS test server (#21698)

This commit is contained in:
Bartek Iwańczuk 2023-12-25 17:38:48 +01:00 committed by GitHub
parent 60da9d493c
commit d1f4d81dcf
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 57 additions and 49 deletions

3
Cargo.lock generated
View file

@ -5983,13 +5983,16 @@ dependencies = [
"console_static_text", "console_static_text",
"denokv_proto", "denokv_proto",
"fastwebsockets 0.5.0", "fastwebsockets 0.5.0",
"fastwebsockets 0.6.0",
"flate2", "flate2",
"futures", "futures",
"glob", "glob",
"h2 0.3.22", "h2 0.3.22",
"h2 0.4.0", "h2 0.4.0",
"http 1.0.0",
"hyper 0.14.27", "hyper 0.14.27",
"hyper 1.1.0", "hyper 1.1.0",
"hyper-util",
"lazy-regex", "lazy-regex",
"libc", "libc",
"lsp-types", "lsp-types",

View file

@ -21,12 +21,15 @@ bytes.workspace = true
console_static_text.workspace = true console_static_text.workspace = true
denokv_proto.workspace = true denokv_proto.workspace = true
fastwebsockets = { workspace = true, features = ["upgrade"] } fastwebsockets = { workspace = true, features = ["upgrade"] }
fastwebsockets_06 = { package = "fastwebsockets", version = "0.6", features = ["upgrade"] }
flate2 = { workspace = true, features = ["default"] } flate2 = { workspace = true, features = ["default"] }
futures.workspace = true futures.workspace = true
glob.workspace = true glob.workspace = true
h2.workspace = true h2.workspace = true
h2_04 = { package = "h2", version = "0.4" } h2_04 = { package = "h2", version = "0.4" }
http_1 = { package = "http", version = "1.0" }
hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] } hyper = { workspace = true, features = ["server", "http1", "http2", "runtime"] }
hyper-util.workspace = true
hyper1.workspace = true hyper1.workspace = true
lazy-regex.workspace = true lazy-regex.workspace = true
libc.workspace = true libc.workspace = true

View file

@ -2,26 +2,25 @@
use anyhow::anyhow; use anyhow::anyhow;
use bytes::Bytes; use bytes::Bytes;
use fastwebsockets::FragmentCollector; use fastwebsockets_06::FragmentCollector;
use fastwebsockets::Frame; use fastwebsockets_06::Frame;
use fastwebsockets::OpCode; use fastwebsockets_06::OpCode;
use fastwebsockets::Role; use fastwebsockets_06::Role;
use fastwebsockets::WebSocket; use fastwebsockets_06::WebSocket;
use futures::future::join3; use futures::future::join3;
use futures::future::poll_fn; use futures::future::poll_fn;
use futures::Future; use futures::Future;
use futures::StreamExt; use futures::StreamExt;
use h2::server::Handshake; use h2_04::server::Handshake;
use h2::server::SendResponse; use h2_04::server::SendResponse;
use h2::Reason; use h2_04::Reason;
use h2::RecvStream; use h2_04::RecvStream;
use hyper::service::service_fn; use hyper1::upgrade::Upgraded;
use hyper::upgrade::Upgraded; use hyper1::Method;
use hyper::Body; use hyper1::Request;
use hyper::Method; use hyper1::Response;
use hyper::Request; use hyper1::StatusCode;
use hyper::Response; use hyper_util::rt::TokioIo;
use hyper::StatusCode;
use pretty_assertions::assert_eq; use pretty_assertions::assert_eq;
use std::pin::Pin; use std::pin::Pin;
use std::result::Result; use std::result::Result;
@ -71,7 +70,7 @@ pub async fn run_wss2_server(port: u16) {
.await; .await;
while let Some(Ok(tls)) = tls.next().await { while let Some(Ok(tls)) = tls.next().await {
tokio::spawn(async move { tokio::spawn(async move {
let mut h2 = h2::server::Builder::new(); let mut h2 = h2_04::server::Builder::new();
h2.enable_connect_protocol(); h2.enable_connect_protocol();
// Using Bytes is pretty alloc-heavy but this is a test server // Using Bytes is pretty alloc-heavy but this is a test server
let server: Handshake<_, Bytes> = h2.handshake(tls); let server: Handshake<_, Bytes> = h2.handshake(tls);
@ -100,15 +99,15 @@ pub async fn run_wss2_server(port: u16) {
} }
async fn echo_websocket_handler( async fn echo_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>, ws: fastwebsockets_06::WebSocket<TokioIo<Upgraded>>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws); let mut ws = FragmentCollector::new(ws);
loop { loop {
let frame = ws.read_frame().await.unwrap(); let frame = ws.read_frame().await.unwrap();
match frame.opcode { match frame.opcode {
fastwebsockets::OpCode::Close => break, OpCode::Close => break,
fastwebsockets::OpCode::Text | fastwebsockets::OpCode::Binary => { OpCode::Text | OpCode::Binary => {
ws.write_frame(frame).await.unwrap(); ws.write_frame(frame).await.unwrap();
} }
_ => {} _ => {}
@ -120,37 +119,42 @@ async fn echo_websocket_handler(
type WsHandler = type WsHandler =
fn( fn(
fastwebsockets::WebSocket<Upgraded>, fastwebsockets_06::WebSocket<TokioIo<Upgraded>>,
) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>; ) -> Pin<Box<dyn Future<Output = Result<(), anyhow::Error>> + Send>>;
fn spawn_ws_server<S>(stream: S, handler: WsHandler) fn spawn_ws_server<S>(stream: S, handler: WsHandler)
where where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
{ {
let srv_fn = service_fn(move |mut req: Request<Body>| async move { let service = hyper1::service::service_fn(
let (response, upgrade_fut) = fastwebsockets::upgrade::upgrade(&mut req) move |mut req: http_1::Request<hyper1::body::Incoming>| async move {
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))?; let (response, upgrade_fut) =
fastwebsockets_06::upgrade::upgrade(&mut req).map_err(|e| {
anyhow!("Error upgrading websocket connection: {}", e)
})?;
tokio::spawn(async move { tokio::spawn(async move {
let ws = upgrade_fut let ws = upgrade_fut
.await .await
.map_err(|e| anyhow!("Error upgrading websocket connection: {}", e)) .map_err(|e| anyhow!("Error upgrading websocket connection: {}", e))
.unwrap(); .unwrap();
if let Err(e) = handler(ws).await { if let Err(e) = handler(ws).await {
eprintln!("Error in websocket connection: {}", e); eprintln!("Error in websocket connection: {}", e);
} }
}); });
Ok::<_, anyhow::Error>(response) Ok::<_, anyhow::Error>(response)
}); },
);
let io = TokioIo::new(stream);
tokio::spawn(async move { tokio::spawn(async move {
let conn_fut = hyper::server::conn::Http::new() let conn = hyper1::server::conn::http1::Builder::new()
.serve_connection(stream, srv_fn) .serve_connection(io, service)
.with_upgrades(); .with_upgrades();
if let Err(e) = conn_fut.await { if let Err(e) = conn.await {
eprintln!("websocket server error: {e:?}"); eprintln!("websocket server error: {e:?}");
} }
}); });
@ -159,13 +163,13 @@ where
async fn handle_wss_stream( async fn handle_wss_stream(
recv: Request<RecvStream>, recv: Request<RecvStream>,
mut send: SendResponse<Bytes>, mut send: SendResponse<Bytes>,
) -> Result<(), h2::Error> { ) -> Result<(), h2_04::Error> {
if recv.method() != Method::CONNECT { if recv.method() != Method::CONNECT {
eprintln!("wss2: refusing non-CONNECT stream"); eprintln!("wss2: refusing non-CONNECT stream");
send.send_reset(Reason::REFUSED_STREAM); send.send_reset(Reason::REFUSED_STREAM);
return Ok(()); return Ok(());
} }
let Some(protocol) = recv.extensions().get::<h2::ext::Protocol>() else { let Some(protocol) = recv.extensions().get::<h2_04::ext::Protocol>() else {
eprintln!("wss2: refusing no-:protocol stream"); eprintln!("wss2: refusing no-:protocol stream");
send.send_reset(Reason::REFUSED_STREAM); send.send_reset(Reason::REFUSED_STREAM);
return Ok(()); return Ok(());
@ -224,11 +228,11 @@ async fn handle_wss_stream(
} }
async fn close_websocket_handler( async fn close_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>, ws: fastwebsockets_06::WebSocket<TokioIo<Upgraded>>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws); let mut ws = FragmentCollector::new(ws);
ws.write_frame(fastwebsockets::Frame::close_raw(vec![].into())) ws.write_frame(Frame::close_raw(vec![].into()))
.await .await
.unwrap(); .unwrap();
@ -236,9 +240,9 @@ async fn close_websocket_handler(
} }
async fn ping_websocket_handler( async fn ping_websocket_handler(
ws: fastwebsockets::WebSocket<Upgraded>, ws: fastwebsockets_06::WebSocket<TokioIo<Upgraded>>,
) -> Result<(), anyhow::Error> { ) -> Result<(), anyhow::Error> {
let mut ws = fastwebsockets::FragmentCollector::new(ws); let mut ws = FragmentCollector::new(ws);
for i in 0..9 { for i in 0..9 {
ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![].into())) ws.write_frame(Frame::new(true, OpCode::Ping, None, vec![].into()))
@ -260,9 +264,7 @@ async fn ping_websocket_handler(
assert_eq!(frame.payload, format!("hello {}", i).as_bytes()); assert_eq!(frame.payload, format!("hello {}", i).as_bytes());
} }
ws.write_frame(fastwebsockets::Frame::close(1000, b"")) ws.write_frame(Frame::close(1000, b"")).await.unwrap();
.await
.unwrap();
Ok(()) Ok(())
} }