diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 28689654e6..e34d4b80fe 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -39,6 +39,8 @@ use flate2::write::GzEncoder; use flate2::Compression; use fly_accept_encoding::Encoding; use hyper::body::Bytes; +use hyper::header::HeaderName; +use hyper::header::HeaderValue; use hyper::server::conn::Http; use hyper::service::Service; use hyper::Body; @@ -500,177 +502,46 @@ async fn op_http_write_headers( let mut builder = Response::builder().status(status); - let mut body_compressible = false; - let mut headers_allow_compression = true; - let mut vary_header = None; - let mut etag_header = None; - let mut content_type_header = None; + // Add headers + let header_count = headers.len(); + let headers = headers.into_iter().filter_map(|(k, v)| { + let v: Vec<u8> = v.into(); + Some(( + HeaderName::try_from(k.as_slice()).ok()?, + HeaderValue::try_from(v).ok()?, + )) + }); + // Track supported encoding + let encoding = *stream.accept_encoding.borrow(); - builder.headers_mut().unwrap().reserve(headers.len()); - for (key, value) in &headers { - if key.eq_ignore_ascii_case(b"cache-control") { - if let Ok(value) = std::str::from_utf8(value) { - if let Some(cache_control) = CacheControl::from_value(value) { - // We skip compression if the cache-control header value is set to - // "no-transform" - if cache_control.no_transform { - headers_allow_compression = false; - } - } - } else { - headers_allow_compression = false; - } - } else if key.eq_ignore_ascii_case(b"content-range") { - // we skip compression if the `content-range` header value is set, as it - // indicates the contents of the body were negotiated based directly - // with the user code and we can't compress the response - headers_allow_compression = false; - } else if key.eq_ignore_ascii_case(b"content-type") && !value.is_empty() { - content_type_header = Some(value); - } else if key.eq_ignore_ascii_case(b"content-encoding") { - // we don't compress if a content-encoding 
header was provided - headers_allow_compression = false; - } else if key.eq_ignore_ascii_case(b"etag") && !value.is_empty() { - // we store the values of ETag and Vary and skip adding them for now, as - // we may need to modify or change. - etag_header = Some(value); - continue; - } else if key.eq_ignore_ascii_case(b"vary") && !value.is_empty() { - vary_header = Some(value); - continue; - } - builder = builder.header(key.as_slice(), value.as_slice()); - } + let hmap = builder.headers_mut().unwrap(); + hmap.reserve(header_count + 2); + hmap.extend(headers); + ensure_vary_accept_encoding(hmap); - if headers_allow_compression { - body_compressible = content_type_header - .map(compressible::is_content_compressible) - .unwrap_or_default(); - } - - let body: Response; - let new_wr: HttpResponseWriter; - - // Set Vary: Accept-Encoding header for direct body response. - // Note: we set the header irrespective of whether or not we compress the data - // to make sure cache services do not serve uncompressed data to clients that - // support compression. - let vary_value = if let Some(value) = vary_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.to_lowercase().contains("accept-encoding") { - format!("Accept-Encoding, {}", value_str) - } else { - value_str.to_string() - } - } else { - // the header value wasn't valid UTF8, so it would have been a - // problem anyways, so sending a default header. 
- "Accept-Encoding".to_string() - } - } else { - "Accept-Encoding".to_string() - }; - builder = builder.header("vary", &vary_value); - - let accepts_compression = matches!( - *stream.accept_encoding.borrow(), - Encoding::Brotli | Encoding::Gzip - ); - let should_compress = body_compressible + let accepts_compression = + matches!(encoding, Encoding::Brotli | Encoding::Gzip); + let compressing = accepts_compression && (matches!(data, Some(ref data) if data.len() > 20) || data.is_none()) - && accepts_compression; + && should_compress(hmap); - if should_compress { - // If user provided a ETag header for uncompressed data, we need to - // ensure it is a Weak Etag header ("W/"). - if let Some(value) = etag_header { - if let Ok(value_str) = std::str::from_utf8(value.as_slice()) { - if !value_str.starts_with("W/") { - builder = builder.header("etag", format!("W/{}", value_str)); - } else { - builder = builder.header("etag", value.as_slice()); - } - } else { - builder = builder.header("etag", value.as_slice()); - } - } + if compressing { + weaken_etag(hmap); // Drop 'content-length' header. Hyper will update it using compressed body. 
- if let Some(headers) = builder.headers_mut() { - headers.remove("content-length"); - } - } else if let Some(value) = etag_header { - builder = builder.header("etag", value.as_slice()); + hmap.remove(hyper::header::CONTENT_LENGTH); + // Content-Encoding header + hmap.insert( + hyper::header::CONTENT_ENCODING, + HeaderValue::from_static(match encoding { + Encoding::Brotli => "br", + Encoding::Gzip => "gzip", + _ => unreachable!(), // Forbidden by accepts_compression + }), + ); } - match data { - Some(data) => { - if should_compress { - match *stream.accept_encoding.borrow() { - Encoding::Brotli => { - builder = builder.header("content-encoding", "br"); - // quality level 6 is based on google's nginx default value for - // on-the-fly compression - // https://github.com/google/ngx_brotli#brotli_comp_level - // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes - // (~4MB) - let mut writer = - brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22); - writer.write_all(&data)?; - body = builder.body(writer.into_inner().into())?; - } - _ => { - assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip); - builder = builder.header("content-encoding", "gzip"); - // Gzip, after level 1, doesn't produce significant size difference. - // Probably the reason why nginx's default gzip compression level is - // 1. - // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level - let mut writer = GzEncoder::new(Vec::new(), Compression::new(1)); - writer.write_all(&data)?; - body = builder.body(writer.finish()?.into())?; - } - } - } else { - // If a buffer was passed, but isn't compressible, we use it to - // construct a response body. - body = builder.body(Bytes::copy_from_slice(&data).into())?; - } - new_wr = HttpResponseWriter::Closed; - } - None => { - // If no buffer was passed, the caller will stream the response body. - if should_compress { - // Create a one way pipe that implements tokio's async io traits. 
To do - // this we create a [tokio::io::DuplexStream], but then throw away one - // of the directions to create a one way pipe. - let (a, b) = tokio::io::duplex(64 * 1024); - let (reader, _) = tokio::io::split(a); - let (_, writer) = tokio::io::split(b); - - let writer_body: Pin>; - match *stream.accept_encoding.borrow() { - Encoding::Brotli => { - let writer = BrotliEncoder::new(writer); - writer_body = Box::pin(writer); - builder = builder.header("content-encoding", "br"); - } - _ => { - assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip); - let writer = GzipEncoder::new(writer); - writer_body = Box::pin(writer); - builder = builder.header("content-encoding", "gzip"); - } - } - - body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?; - new_wr = HttpResponseWriter::Body(writer_body); - } else { - let (body_tx, body_rx) = Body::channel(); - body = builder.body(body_rx)?; - new_wr = HttpResponseWriter::BodyUncompressed(body_tx); - } - } - } + let (new_wr, body) = http_response(data, compressing, encoding)?; + let body = builder.body(body)?; let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; let response_tx = match replace(&mut *old_wr, new_wr) { @@ -687,6 +558,122 @@ async fn op_http_write_headers( } } +fn http_response( + data: Option, + compressing: bool, + encoding: Encoding, +) -> Result<(HttpResponseWriter, hyper::Body), AnyError> { + match data { + Some(data) if compressing => match encoding { + Encoding::Brotli => { + // quality level 6 is based on google's nginx default value for + // on-the-fly compression + // https://github.com/google/ngx_brotli#brotli_comp_level + // lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes + // (~4MB) + let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22); + writer.write_all(&data)?; + Ok((HttpResponseWriter::Closed, writer.into_inner().into())) + } + Encoding::Gzip => { + // Gzip, after level 1, doesn't produce significant size difference. 
+ // Probably the reason why nginx's default gzip compression level is + // 1. + // https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level + let mut writer = GzEncoder::new(Vec::new(), Compression::new(1)); + writer.write_all(&data)?; + Ok((HttpResponseWriter::Closed, writer.finish()?.into())) + } + _ => unreachable!(), // forbidden by accepts_compression + }, + Some(data) => { + // If a buffer was passed, but isn't compressible, we use it to + // construct a response body. + Ok(( + HttpResponseWriter::Closed, + Bytes::copy_from_slice(&data).into(), + )) + } + None if compressing => { + // Create a one way pipe that implements tokio's async io traits. To do + // this we create a [tokio::io::DuplexStream], but then throw away one + // of the directions to create a one way pipe. + let (a, b) = tokio::io::duplex(64 * 1024); + let (reader, _) = tokio::io::split(a); + let (_, writer) = tokio::io::split(b); + let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding { + Encoding::Brotli => Box::pin(BrotliEncoder::new(writer)), + Encoding::Gzip => Box::pin(GzipEncoder::new(writer)), + _ => unreachable!(), // forbidden by accepts_compression + }; + Ok(( + HttpResponseWriter::Body(writer), + Body::wrap_stream(ReaderStream::new(reader)), + )) + } + None => { + let (body_tx, body_rx) = Body::channel(); + Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx)) + } + } +} + +// If user provided an ETag header for uncompressed data, we need to +// ensure it is a Weak Etag header ("W/"). +fn weaken_etag(hmap: &mut hyper::HeaderMap) { + if let Some(etag) = hmap.get_mut(hyper::header::ETAG) { + if !etag.as_bytes().starts_with(b"W/") { + let mut v = Vec::with_capacity(etag.as_bytes().len() + 2); + v.extend(b"W/"); + v.extend(etag.as_bytes()); + *etag = v.try_into().unwrap(); + } + } +} + +// Set Vary: Accept-Encoding header for direct body response. 
+// Note: we set the header irrespective of whether or not we compress the data +// to make sure cache services do not serve uncompressed data to clients that +// support compression. +fn ensure_vary_accept_encoding(hmap: &mut hyper::HeaderMap) { + if let Some(v) = hmap.get_mut(hyper::header::VARY) { + if let Ok(s) = v.to_str() { + if !s.to_lowercase().contains("accept-encoding") { + *v = format!("Accept-Encoding, {}", s).try_into().unwrap() + } + return; + } + } + hmap.insert( + hyper::header::VARY, + HeaderValue::from_static("Accept-Encoding"), + ); +} + +fn should_compress(headers: &hyper::HeaderMap) -> bool { + // skip compression if the cache-control header value is set to "no-transform" or not utf8 + fn cache_control_no_transform(headers: &hyper::HeaderMap) -> Option<bool> { + let v = headers.get(hyper::header::CACHE_CONTROL)?; + let s = match std::str::from_utf8(v.as_bytes()) { + Ok(s) => s, + Err(_) => return Some(true), + }; + let c = CacheControl::from_value(s)?; + Some(c.no_transform) + } + // we skip compression if the `content-range` header value is set, as it + // indicates the contents of the body were negotiated based directly + // with the user code and we can't compress the response + let content_range = headers.contains_key(hyper::header::CONTENT_RANGE); + + !content_range + && !cache_control_no_transform(headers).unwrap_or_default() + && headers + .get(hyper::header::CONTENT_TYPE) + .map(compressible::is_content_compressible) + .unwrap_or_default() +} + #[op] async fn op_http_write_resource( state: Rc<RefCell<OpState>>, @@ -757,42 +744,38 @@ .get::<HttpStreamResource>(rid)?; let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - loop { - match &mut *wr { - HttpResponseWriter::Headers(_) => { - break Err(http_error("no response headers")) + match &mut *wr { + HttpResponseWriter::Headers(_) => Err(http_error("no response headers")), + HttpResponseWriter::Closed => Err(http_error("response already completed")), + HttpResponseWriter::Body(body) => { + let 
mut result = body.write_all(&buf).await; + if result.is_ok() { + result = body.flush().await; } - HttpResponseWriter::Closed => { - break Err(http_error("response already completed")) - } - HttpResponseWriter::Body(body) => { - let mut result = body.write_all(&buf).await; - if result.is_ok() { - result = body.flush().await; - } - match result { - Ok(_) => break Ok(()), - Err(err) => { - assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); - // Don't return "broken pipe", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } + match result { + Ok(_) => Ok(()), + Err(err) => { + assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe); + // Don't return "broken pipe", that's an implementation detail. + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. + *wr = HttpResponseWriter::Closed; + Err(http_error("response already completed")) } } - HttpResponseWriter::BodyUncompressed(body) => { - let bytes = Bytes::copy_from_slice(&buf[..]); - match body.send_data(bytes).await { - Ok(_) => break Ok(()), - Err(err) => { - assert!(err.is_closed()); - // Pull up the failure associated with the transport connection instead. - stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } + } + HttpResponseWriter::BodyUncompressed(body) => { + let bytes = Bytes::copy_from_slice(&buf[..]); + match body.send_data(bytes).await { + Ok(_) => Ok(()), + Err(err) => { + assert!(err.is_closed()); + // Pull up the failure associated with the transport connection instead. + stream.conn.closed().await?; + // If there was no connection error, drop body_tx. 
+ *wr = HttpResponseWriter::Closed; + Err(http_error("response already completed")) } } } diff --git a/serde_v8/magic/bytestring.rs b/serde_v8/magic/bytestring.rs index a4c664b295..d6f3cb1740 100644 --- a/serde_v8/magic/bytestring.rs +++ b/serde_v8/magic/bytestring.rs @@ -46,3 +46,10 @@ impl FromV8 for ByteString { Ok(buffer.into()) } } + +#[allow(clippy::from_over_into)] +impl Into<Vec<u8>> for ByteString { + fn into(self) -> Vec<u8> { + self.0 + } +}