From 8b258070542a81d217226fe832b26d81cf20113d Mon Sep 17 00:00:00 2001
From: Luca Casonato
Date: Wed, 20 Apr 2022 22:53:56 +0200
Subject: [PATCH] feat(ext/http): stream auto resp body compression (#14325)

This commit adds support for auto response body compression for
streaming bodies.
---
 Cargo.lock                  |   1 +
 cli/tests/unit/http_test.ts | 161 ++++++++++++++++++++++++------
 ext/http/Cargo.toml         |   1 +
 ext/http/lib.rs             | 189 +++++++++++++++++++++++-------
 serde_v8/magic/buffer.rs    |   8 --
 5 files changed, 256 insertions(+), 104 deletions(-)
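
For orientation before the diff: the technique the patch uses can be condensed into a few lines. This is a hypothetical, self-contained sketch (the helper name and the spawning strategy are illustrative, not code from this patch), using the same crates the patch adds:

```rust
use async_compression::tokio::write::GzipEncoder;
use tokio::io::AsyncWriteExt;
use tokio_util::io::ReaderStream;

// Hypothetical helper: returns a hyper Body that yields gzip-compressed
// chunks for everything written into the pipe.
fn compressed_streaming_body() -> hyper::Body {
  // One-way pipe: keep the read half of one end and the write half of
  // the other, as op_http_write_headers does in the diff below.
  let (a, b) = tokio::io::duplex(64 * 1024);
  let (reader, _) = tokio::io::split(a);
  let (_, writer) = tokio::io::split(b);
  let mut encoder = GzipEncoder::new(writer);

  // Illustrative producer; the real patch keeps this writer in
  // HttpResponseWriter::Body and feeds it from the write ops.
  tokio::spawn(async move {
    encoder.write_all(br#"{"hello":"deno"}"#).await?;
    encoder.flush().await?;
    // shutdown() finishes the gzip stream (writes the trailer).
    encoder.shutdown().await
  });

  hyper::Body::wrap_stream(ReaderStream::new(reader))
}
```

A useful property of this shape is backpressure: the duplex buffer is bounded, so once the client stops reading the body, writes into the encoder eventually block.
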
diff --git a/Cargo.lock b/Cargo.lock
index 76f8ca0281..794c7d8758 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -964,6 +964,7 @@ dependencies = [
 name = "deno_http"
 version = "0.41.0"
 dependencies = [
+ "async-compression",
  "base64 0.13.0",
  "brotli",
  "bytes",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index f48f314db8..5fabd40fea 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -1224,26 +1224,25 @@ Deno.test(
 const decoder = new TextDecoder();
 
 Deno.test({
-  name: "http server compresses body",
+  name: "http server compresses body - check headers",
   permissions: { net: true, run: true },
   async fn() {
     const hostname = "localhost";
     const port = 4501;
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
 
     async function server() {
-      const listener = Deno.listen({ hostname, port });
       const tcpConn = await listener.accept();
       const httpConn = Deno.serveHttp(tcpConn);
       const e = await httpConn.nextRequest();
       assert(e);
       const { request, respondWith } = e;
       assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
-      const response = new Response(
-        JSON.stringify({ hello: "deno", now: "with", compressed: "body" }),
-        {
-          headers: { "content-type": "application/json" },
-        },
-      );
+      const response = new Response(JSON.stringify(data), {
+        headers: { "content-type": "application/json" },
+      });
       await respondWith(response);
       httpConn.close();
       listener.close();
     }
@@ -1274,6 +1273,60 @@ Deno.test({
   },
 });
 
+Deno.test({
+  name: "http server compresses body - check body",
+  permissions: { net: true, run: true },
+  async fn() {
+    const hostname = "localhost";
+    const port = 4501;
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
+    async function server() {
+      const tcpConn = await listener.accept();
+      const httpConn = Deno.serveHttp(tcpConn);
+      const e = await httpConn.nextRequest();
+      assert(e);
+      const { request, respondWith } = e;
+      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+      const response = new Response(JSON.stringify(data), {
+        headers: { "content-type": "application/json" },
+      });
+      await respondWith(response);
+      httpConn.close();
+      listener.close();
+    }
+
+    async function client() {
+      const url = `http://${hostname}:${port}/`;
+      const cmd = [
+        "curl",
+        "--request",
+        "GET",
+        "--url",
+        url,
+        "--header",
+        "Accept-Encoding: gzip, deflate, br",
+      ];
+      const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+      const status = await proc.status();
+      assert(status.success);
+      const stdout = proc.stdout!.readable
+        .pipeThrough(new DecompressionStream("gzip"))
+        .pipeThrough(new TextDecoderStream());
+      let body = "";
+      for await (const chunk of stdout) {
+        body += chunk;
+      }
+      assertEquals(JSON.parse(body), data);
+      proc.close();
+    }
+
+    await Promise.all([server(), client()]);
+  },
+});
+
 Deno.test({
   name: "http server doesn't compress small body",
   permissions: { net: true, run: true },
@@ -1653,15 +1706,18 @@ Deno.test({
 });
 
 Deno.test({
-  name: "http server doesn't compress streamed bodies",
+  name: "http server compresses streamed bodies - check headers",
   permissions: { net: true, run: true },
   async fn() {
     const hostname = "localhost";
     const port = 4501;
 
+    const encoder = new TextEncoder();
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
     async function server() {
-      const encoder = new TextEncoder();
-      const listener = Deno.listen({ hostname, port });
       const tcpConn = await listener.accept();
       const httpConn = Deno.serveHttp(tcpConn);
       const e = await httpConn.nextRequest();
@@ -1670,23 +1726,13 @@ Deno.test({
       assert(e);
       const { request, respondWith } = e;
       assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
       const bodyInit = new ReadableStream({
         start(controller) {
-          controller.enqueue(
-            encoder.encode(
-              JSON.stringify({
-                hello: "deno",
-                now: "with",
-                compressed: "body",
-              }),
-            ),
-          );
+          controller.enqueue(encoder.encode(JSON.stringify(data)));
           controller.close();
         },
       });
       const response = new Response(
         bodyInit,
-        {
-          headers: { "content-type": "application/json", vary: "Accept" },
-        },
+        { headers: { "content-type": "application/json" } },
       );
       await respondWith(response);
       httpConn.close();
       listener.close();
     }
@@ -1709,8 +1755,71 @@ Deno.test({
     const status = await proc.status();
     assert(status.success);
     const output = decoder.decode(await proc.output());
-    assert(output.includes("vary: Accept\r\n"));
-    assert(!output.includes("content-encoding: "));
+    assert(output.includes("vary: Accept-Encoding\r\n"));
+    assert(output.includes("content-encoding: gzip\r\n"));
     proc.close();
   }
 
   await Promise.all([server(), client()]);
 },
});
+
+Deno.test({
+  name: "http server compresses streamed bodies - check body",
+  permissions: { net: true, run: true },
+  async fn() {
+    const hostname = "localhost";
+    const port = 4501;
+
+    const encoder = new TextEncoder();
+    const listener = Deno.listen({ hostname, port });
+
+    const data = { hello: "deno", now: "with", compressed: "body" };
+
+    async function server() {
+      const tcpConn = await listener.accept();
+      const httpConn = Deno.serveHttp(tcpConn);
+      const e = await httpConn.nextRequest();
+      assert(e);
+      const { request, respondWith } = e;
+      assertEquals(request.headers.get("Accept-Encoding"), "gzip, deflate, br");
+      const bodyInit = new ReadableStream({
+        start(controller) {
+          controller.enqueue(encoder.encode(JSON.stringify(data)));
+          controller.close();
+        },
+      });
+      const response = new Response(
+        bodyInit,
+        { headers: { "content-type": "application/json" } },
+      );
+      await respondWith(response);
+      httpConn.close();
+      listener.close();
+    }
+
+    async function client() {
+      const url = `http://${hostname}:${port}/`;
+      const cmd = [
+        "curl",
+        "--request",
+        "GET",
+        "--url",
+        url,
+        "--header",
+        "Accept-Encoding: gzip, deflate, br",
+      ];
+      const proc = Deno.run({ cmd, stdout: "piped", stderr: "null" });
+      const status = await proc.status();
+      assert(status.success);
+      const stdout = proc.stdout.readable
+        .pipeThrough(new DecompressionStream("gzip"))
+        .pipeThrough(new TextDecoderStream());
+      let body = "";
+      for await (const chunk of stdout) {
+        body += chunk;
+      }
+      assertEquals(JSON.parse(body), data);
+      proc.close();
+    }
+
+    await Promise.all([server(), client()]);
+  },
+});
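
The "check body" tests above verify the round trip by piping curl's stdout through `DecompressionStream`. The same invariant on the Rust side, as a hypothetical standalone program against the `async-compression` crate this patch adds (not part of the patch):

```rust
use async_compression::tokio::bufread::GzipDecoder;
use async_compression::tokio::write::GzipEncoder;
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::main]
async fn main() -> std::io::Result<()> {
  let data = br#"{"hello":"deno","now":"with","compressed":"body"}"#;

  // Compress, making sure to shutdown() so the gzip trailer is written.
  let mut encoder = GzipEncoder::new(Vec::new());
  encoder.write_all(data).await?;
  encoder.shutdown().await?;
  let compressed = encoder.into_inner();

  // Decompress and compare, mirroring the DecompressionStream check.
  let mut decoder = GzipDecoder::new(&compressed[..]);
  let mut out = Vec::new();
  decoder.read_to_end(&mut out).await?;
  assert_eq!(&out[..], &data[..]);
  Ok(())
}
```
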
@@ -1775,8 +1884,6 @@ Deno.test({
     // Ensure the content-length header is updated.
     assert(!output.includes(`content-length: ${contentLength}\r\n`));
     assert(output.includes("content-length: 72\r\n"));
-    console.log(output);
-
     proc.close();
   }
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 2bdbfdade7..b4a2082282 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -14,6 +14,7 @@ description = "HTTP server implementation for Deno"
 path = "lib.rs"
 
 [dependencies]
+async-compression = { version = "0.3.1", features = ["tokio", "brotli", "gzip"] }
 base64 = "0.13.0"
 brotli = "3.3.3"
 bytes = "1"
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index b85dcc4736..a6f47c1c95 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -1,6 +1,7 @@
 // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
-use bytes::Bytes;
+use async_compression::tokio::write::BrotliEncoder;
+use async_compression::tokio::write::GzipEncoder;
 use cache_control::CacheControl;
 use deno_core::error::custom_error;
 use deno_core::error::AnyError;
@@ -21,7 +22,6 @@ use deno_core::futures::StreamExt;
 use deno_core::futures::TryFutureExt;
 use deno_core::include_js_files;
 use deno_core::op;
-
 use deno_core::AsyncRefCell;
 use deno_core::ByteString;
 use deno_core::CancelFuture;
@@ -60,7 +60,9 @@ use std::task::Context;
 use std::task::Poll;
 use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
+use tokio::io::AsyncWriteExt;
 use tokio::task::spawn_local;
+use tokio_util::io::ReaderStream;
 
 mod compressible;
@@ -339,7 +341,7 @@ impl Default for HttpRequestReader {
 /// The write half of an HTTP stream.
 enum HttpResponseWriter {
   Headers(oneshot::Sender<Response<Body>>),
-  Body(hyper::body::Sender),
+  Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
   Closed,
 }
@@ -546,55 +548,60 @@ async fn op_http_write_headers(
   let body: Response<Body>;
   let new_wr: HttpResponseWriter;
 
-  match data {
-    Some(data) => {
-      // Set Vary: Accept-Encoding header for direct body response.
-      // Note: we set the header irrespective of whether or not we compress the
-      // data to make sure cache services do not serve uncompressed data to
-      // clients that support compression.
-      let vary_value = if let Some(value) = vary_header {
-        if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-          if !value_str.to_lowercase().contains("accept-encoding") {
-            format!("Accept-Encoding, {}", value_str)
-          } else {
-            value_str.to_string()
-          }
-        } else {
-          // the header value wasn't valid UTF8, so it would have been a
-          // problem anyways, so sending a default header.
-          "Accept-Encoding".to_string()
-        }
-      } else {
-        "Accept-Encoding".to_string()
-      };
-      builder = builder.header("vary", &vary_value);
-
-      let accepts_compression = matches!(
-        *stream.accept_encoding.borrow(),
-        Encoding::Brotli | Encoding::Gzip
-      );
-
-      let should_compress =
-        body_compressible && data.len() > 20 && accepts_compression;
+  // Set Vary: Accept-Encoding header for direct body response.
+  // Note: we set the header irrespective of whether or not we compress the
+  // data to make sure cache services do not serve uncompressed data to
+  // clients that support compression.
+  let vary_value = if let Some(value) = vary_header {
+    if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+      if !value_str.to_lowercase().contains("accept-encoding") {
+        format!("Accept-Encoding, {}", value_str)
+      } else {
+        value_str.to_string()
+      }
+    } else {
+      // The header value wasn't valid UTF-8; it would have been a problem
+      // anyway, so send a default header instead.
+      "Accept-Encoding".to_string()
+    }
+  } else {
+    "Accept-Encoding".to_string()
+  };
+  builder = builder.header("vary", &vary_value);
+
+  let accepts_compression = matches!(
+    *stream.accept_encoding.borrow(),
+    Encoding::Brotli | Encoding::Gzip
+  );
+  let should_compress = body_compressible
+    && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
+    && accepts_compression;
+
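
Paraphrased as free functions (hypothetical helpers, for illustration only), the two decisions made above amount to:

```rust
// Merge "Accept-Encoding" into a user-supplied Vary value, mirroring
// the logic in op_http_write_headers above.
fn merge_vary(user_vary: Option<&str>) -> String {
  match user_vary {
    Some(v) if v.to_lowercase().contains("accept-encoding") => v.to_string(),
    Some(v) => format!("Accept-Encoding, {}", v),
    None => "Accept-Encoding".to_string(),
  }
}

// The compression gate: compressible content type, client accepts
// gzip/brotli, and either a streaming body (None) or a one-shot body
// larger than 20 bytes.
fn should_compress(compressible: bool, accepts: bool, data: Option<&[u8]>) -> bool {
  compressible && accepts && data.map_or(true, |d| d.len() > 20)
}

fn main() {
  assert_eq!(merge_vary(Some("Accept")), "Accept-Encoding, Accept");
  assert_eq!(merge_vary(None), "Accept-Encoding");
  assert!(should_compress(true, true, None)); // streaming bodies now qualify
  assert!(!should_compress(true, true, Some(b"tiny")));
}
```
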
+  if should_compress {
+    // If the user provided an ETag header for the uncompressed data, we
+    // need to ensure it is a weak ETag header ("W/").
+    if let Some(value) = etag_header {
+      if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
+        if !value_str.starts_with("W/") {
+          builder = builder.header("etag", format!("W/{}", value_str));
+        } else {
+          builder = builder.header("etag", value.as_slice());
+        }
+      } else {
+        builder = builder.header("etag", value.as_slice());
+      }
+    }
+  } else if let Some(value) = etag_header {
+    builder = builder.header("etag", value.as_slice());
+  }
+
+  match data {
+    Some(data) => {
       if should_compress {
         // Drop 'content-length' header. Hyper will update it using compressed body.
         if let Some(headers) = builder.headers_mut() {
           headers.remove("content-length");
         }
-        // If user provided a ETag header for uncompressed data, we need to
-        // ensure it is a Weak Etag header ("W/").
-        if let Some(value) = etag_header {
-          if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
-            if !value_str.starts_with("W/") {
-              builder = builder.header("etag", format!("W/{}", value_str));
-            } else {
-              builder = builder.header("etag", value.as_slice());
-            }
-          } else {
-            builder = builder.header("etag", value.as_slice());
-          }
-        }
 
         match *stream.accept_encoding.borrow() {
           Encoding::Brotli => {
@@ -622,9 +629,6 @@ async fn op_http_write_headers(
         }
       }
     } else {
-      if let Some(value) = etag_header {
-        builder = builder.header("etag", value.as_slice());
-      }
       // If a buffer was passed, but isn't compressible, we use it to
       // construct a response body.
       body = builder.body(data.into_bytes().into())?;
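
The ETag handling above in standalone form (hypothetical helper): compressing the body changes its bytes, so a strong validator supplied for the uncompressed representation can no longer hold byte-for-byte and is downgraded to a weak one:

```rust
// Downgrade a strong ETag to a weak one; already-weak tags pass through.
fn weaken_etag(etag: &str) -> String {
  if etag.starts_with("W/") {
    etag.to_string()
  } else {
    format!("W/{}", etag)
  }
}

fn main() {
  assert_eq!(weaken_etag("\"abc\""), "W/\"abc\"");
  assert_eq!(weaken_etag("W/\"abc\""), "W/\"abc\"");
}
```
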
@@ -634,19 +638,35 @@ async fn op_http_write_headers(
     None => {
       // If no buffer was passed, the caller will stream the response body.
 
-      // TODO(@kitsonk) had compression for streamed bodies.
+      // Create a one-way pipe that implements tokio's async io traits. We
+      // create a [tokio::io::DuplexStream] and then throw away one direction
+      // of each half, leaving a one-way pipe.
+      let (a, b) = tokio::io::duplex(64 * 1024);
+      let (reader, _) = tokio::io::split(a);
+      let (_, writer) = tokio::io::split(b);
 
-      // Set the user provided ETag & Vary headers for a streaming response
-      if let Some(value) = etag_header {
-        builder = builder.header("etag", value.as_slice());
-      }
-      if let Some(value) = vary_header {
-        builder = builder.header("vary", value.as_slice());
+      let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
+
+      if should_compress {
+        match *stream.accept_encoding.borrow() {
+          Encoding::Brotli => {
+            let writer = BrotliEncoder::new(writer);
+            writer_body = Box::pin(writer);
+            builder = builder.header("content-encoding", "br");
+          }
+          _ => {
+            assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
+            let writer = GzipEncoder::new(writer);
+            writer_body = Box::pin(writer);
+            builder = builder.header("content-encoding", "gzip");
+          }
+        }
+      } else {
+        writer_body = Box::pin(writer);
       }
 
-      let (body_tx, body_rx) = Body::channel();
-      body = builder.body(body_rx)?;
-      new_wr = HttpResponseWriter::Body(body_tx);
+      body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
+      new_wr = HttpResponseWriter::Body(writer_body);
     }
   }
 
@@ -678,8 +698,8 @@ async fn op_http_write_resource(
   let mut wr = RcRef::map(&http_stream, |r| &r.wr).borrow_mut().await;
   let resource = state.borrow().resource_table.get_any(stream)?;
   loop {
-    let body_tx = match &mut *wr {
-      HttpResponseWriter::Body(body_tx) => body_tx,
+    let body_writer = match &mut *wr {
+      HttpResponseWriter::Body(body_writer) => body_writer,
       HttpResponseWriter::Headers(_) => {
         return Err(http_error("no response headers"))
       }
@@ -694,13 +714,17 @@ async fn op_http_write_resource(
     if nread == 0 {
       break;
     }
-    let bytes = Bytes::from(buf.to_temp());
-    match body_tx.send_data(bytes).await {
+
+    let mut res = body_writer.write_all(&buf).await;
+    if res.is_ok() {
+      res = body_writer.flush().await;
+    }
+    match res {
       Ok(_) => {}
       Err(err) => {
-        // Don't return "channel closed", that's an implementation detail.
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
         // Pull up the failure associated with the transport connection instead.
-        assert!(err.is_closed());
         http_stream.conn.closed().await?;
         // If there was no connection error, drop body_tx.
         *wr = HttpResponseWriter::Closed;
       }
     }
   }
 
-  take(&mut *wr);
+  let wr = take(&mut *wr);
+  if let HttpResponseWriter::Body(mut body_writer) = wr {
+    match body_writer.shutdown().await {
+      Ok(_) => {}
+      Err(err) => {
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
+        // Pull up the failure associated with the transport connection instead.
+        http_stream.conn.closed().await?;
+      }
+    }
+  }
+
   Ok(())
 }
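
The `BrokenPipe` assertions in the error paths above rely on documented `tokio::io::duplex` behavior: once the peer end is dropped, writes fail with `ErrorKind::BrokenPipe`. A minimal demonstration (assuming only tokio):

```rust
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() {
  let (a, mut b) = tokio::io::duplex(1024);
  // Dropping the other end models the client going away mid-response.
  drop(a);
  let err = b.write_all(b"data").await.unwrap_err();
  assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
}
```
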
@@ -725,7 +761,7 @@ async fn op_http_write(
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
 
   loop {
-    let body_tx = match &mut *wr {
+    let body_writer = match &mut *wr {
       HttpResponseWriter::Body(body_tx) => body_tx,
       HttpResponseWriter::Headers(_) => {
         break Err(http_error("no response headers"))
@@ -735,13 +771,17 @@ async fn op_http_write(
       }
     };
 
-    let bytes = Bytes::copy_from_slice(&buf[..]);
-    match body_tx.send_data(bytes).await {
+    let mut res = body_writer.write_all(&buf).await;
+    if res.is_ok() {
+      res = body_writer.flush().await;
+    }
+
+    match res {
       Ok(_) => break Ok(()),
       Err(err) => {
-        // Don't return "channel closed", that's an implementation detail.
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
         // Pull up the failure associated with the transport connection instead.
-        assert!(err.is_closed());
         stream.conn.closed().await?;
         // If there was no connection error, drop body_tx.
         *wr = HttpResponseWriter::Closed;
       }
     }
@@ -763,7 +803,18 @@ async fn op_http_shutdown(
     .resource_table
     .get::<HttpStreamResource>(rid)?;
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
-  take(&mut *wr);
+  let wr = take(&mut *wr);
+  if let HttpResponseWriter::Body(mut body_writer) = wr {
+    match body_writer.shutdown().await {
+      Ok(_) => {}
+      Err(err) => {
+        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+        // Don't return "broken pipe", that's an implementation detail.
+        // Pull up the failure associated with the transport connection instead.
+        stream.conn.closed().await?;
+      }
+    }
+  }
   Ok(())
 }
diff --git a/serde_v8/magic/buffer.rs b/serde_v8/magic/buffer.rs
index 3a8c9499b8..a0a1c974bc 100644
--- a/serde_v8/magic/buffer.rs
+++ b/serde_v8/magic/buffer.rs
@@ -29,14 +29,6 @@ impl MagicBuffer {
   pub fn new_temp(vec: Vec<u8>) -> Self {
     MagicBuffer::Temp(vec)
   }
-
-  // TODO(@littledivy): Temporary, this needs a refactor.
-  pub fn to_temp(self) -> Vec<u8> {
-    match self {
-      MagicBuffer::Temp(vec) => vec,
-      _ => unreachable!(),
-    }
-  }
 }
 
 impl Clone for MagicBuffer {
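
A closing note on why `op_http_shutdown` and the tail of `op_http_write_resource` now call `shutdown()` on the body writer: for the gzip and brotli encoders this is what finalizes the compressed stream. Without it the trailer is never written and the body arrives truncated, which is exactly what the `DecompressionStream` checks in the tests would catch. A small sketch of that behavior (assuming async-compression with the tokio and gzip features):

```rust
use async_compression::tokio::write::GzipEncoder;
use tokio::io::AsyncWriteExt;

#[tokio::main]
async fn main() -> std::io::Result<()> {
  let mut encoder = GzipEncoder::new(Vec::new());
  encoder.write_all(b"hello").await?;
  encoder.flush().await?;
  let before = encoder.get_ref().len(); // data flushed so far, no trailer yet
  encoder.shutdown().await?; // emits the final block and gzip trailer
  assert!(encoder.get_ref().len() > before);
  Ok(())
}
```
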