1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

perf(ext/http): fast path for uncompressed bodies (#14366)

This commit is contained in:
Divy Srivastava 2022-04-25 08:13:22 +05:30 committed by GitHub
parent e2fba7b967
commit 6dcf3a447c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -343,6 +343,7 @@ impl Default for HttpRequestReader {
enum HttpResponseWriter { enum HttpResponseWriter {
Headers(oneshot::Sender<Response<Body>>), Headers(oneshot::Sender<Response<Body>>),
Body(Pin<Box<dyn tokio::io::AsyncWrite>>), Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
BodyUncompressed(hyper::body::Sender),
Closed, Closed,
} }
@ -638,17 +639,15 @@ async fn op_http_write_headers(
} }
None => { None => {
// If no buffer was passed, the caller will stream the response body. // If no buffer was passed, the caller will stream the response body.
// Create a one way pipe that implements tokio's async io traits. To do
// this we create a [tokio::io::DuplexStream], but then throw away one
// of the directions to create a one way pipe.
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, writer) = tokio::io::split(b);
let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
if should_compress { if should_compress {
// Create a one way pipe that implements tokio's async io traits. To do
// this we create a [tokio::io::DuplexStream], but then throw away one
// of the directions to create a one way pipe.
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, writer) = tokio::io::split(b);
let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
match *stream.accept_encoding.borrow() { match *stream.accept_encoding.borrow() {
Encoding::Brotli => { Encoding::Brotli => {
let writer = BrotliEncoder::new(writer); let writer = BrotliEncoder::new(writer);
@ -662,12 +661,14 @@ async fn op_http_write_headers(
builder = builder.header("content-encoding", "gzip"); builder = builder.header("content-encoding", "gzip");
} }
} }
} else {
writer_body = Box::pin(writer);
}
body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
new_wr = HttpResponseWriter::Body(writer_body); new_wr = HttpResponseWriter::Body(writer_body);
} else {
let (body_tx, body_rx) = Body::channel();
body = builder.body(body_rx)?;
new_wr = HttpResponseWriter::BodyUncompressed(body_tx);
}
} }
} }
@ -699,14 +700,14 @@ async fn op_http_write_resource(
let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
let resource = state.borrow().resource_table.get_any(stream)?; let resource = state.borrow().resource_table.get_any(stream)?;
loop { loop {
let body_writer = match &mut *wr { match *wr {
HttpResponseWriter::Body(body_writer) => body_writer,
HttpResponseWriter::Headers(_) => { HttpResponseWriter::Headers(_) => {
return Err(http_error("no response headers")) return Err(http_error("no response headers"))
} }
HttpResponseWriter::Closed => { HttpResponseWriter::Closed => {
return Err(http_error("response already completed")) return Err(http_error("response already completed"))
} }
_ => {}
}; };
let vec = vec![0u8; 64 * 1024]; // 64KB let vec = vec![0u8; 64 * 1024]; // 64KB
@ -715,17 +716,29 @@ async fn op_http_write_resource(
if nread == 0 { if nread == 0 {
break; break;
} }
match body_writer.write_all(&buf[..nread]).await {
Ok(_) => {} match &mut *wr {
Err(err) => { HttpResponseWriter::Body(body) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); if let Err(err) = body.write_all(&buf[..nread]).await {
// Don't return "broken pipe", that's an implementation detail. assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Pull up the failure associated with the transport connection instead. // Don't return "broken pipe", that's an implementation detail.
http_stream.conn.closed().await?; // Pull up the failure associated with the transport connection instead.
// If there was no connection error, drop body_tx. http_stream.conn.closed().await?;
*wr = HttpResponseWriter::Closed; // If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
}
} }
} HttpResponseWriter::BodyUncompressed(body) => {
if let Err(err) = body.send_data(Bytes::from(buf.to_temp())).await {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
http_stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
}
}
_ => unreachable!(),
};
} }
let wr = take(&mut *wr); let wr = take(&mut *wr);
@ -756,30 +769,42 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop { loop {
let body_writer = match &mut *wr { match &mut *wr {
HttpResponseWriter::Body(body_tx) => body_tx,
HttpResponseWriter::Headers(_) => { HttpResponseWriter::Headers(_) => {
break Err(http_error("no response headers")) break Err(http_error("no response headers"))
} }
HttpResponseWriter::Closed => { HttpResponseWriter::Closed => {
break Err(http_error("response already completed")) break Err(http_error("response already completed"))
} }
}; HttpResponseWriter::Body(body) => {
let mut result = body.write_all(&buf).await;
let mut res = body_writer.write_all(&buf).await; if result.is_ok() {
if res.is_ok() { result = body.flush().await;
res = body_writer.flush().await; }
} match result {
Ok(_) => break Ok(()),
match res { Err(err) => {
Ok(_) => break Ok(()), assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
Err(err) => { // Don't return "broken pipe", that's an implementation detail.
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); // Pull up the failure associated with the transport connection instead.
// Don't return "broken pipe", that's an implementation detail. stream.conn.closed().await?;
// Pull up the failure associated with the transport connection instead. // If there was no connection error, drop body_tx.
stream.conn.closed().await?; *wr = HttpResponseWriter::Closed;
// If there was no connection error, drop body_tx. }
*wr = HttpResponseWriter::Closed; }
}
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::copy_from_slice(&buf[..]);
match body.send_data(bytes).await {
Ok(_) => break Ok(()),
Err(err) => {
assert!(err.is_closed());
// Pull up the failure associated with the transport connection instead.
stream.conn.closed().await?;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
}
}
} }
} }
} }