mirror of
synced 2025-03-03 17:34:47 -05:00
cleanup(ext/http): simpler http write ops (#14552)
Facilitates making `op_http_write_headers` sync and thus faster
This commit is contained in:
2 changed files with 187 additions and 197 deletions
@ -39,6 +39,8 @@ use flate2::write::GzEncoder;
use flate2::Compression;
use flate2::Compression;
use fly_accept_encoding::Encoding;
use fly_accept_encoding::Encoding;
use hyper::body::Bytes;
use hyper::body::Bytes;
use hyper::header::HeaderName;
use hyper::header::HeaderValue;
use hyper::server::conn::Http;
use hyper::server::conn::Http;
use hyper::service::Service;
use hyper::service::Service;
use hyper::Body;
use hyper::Body;
@ -500,177 +502,46 @@ async fn op_http_write_headers(
let mut builder = Response::builder().status(status);
let mut builder = Response::builder().status(status);
let mut body_compressible = false;
// Add headers
let mut headers_allow_compression = true;
let header_count = headers.len();
let mut vary_header = None;
let headers = headers.into_iter().filter_map(|(k, v)| {
let mut etag_header = None;
let v: Vec<u8> = v.into();
let mut content_type_header = None;
// Track supported encoding
let encoding = *stream.accept_encoding.borrow();
let hmap = builder.headers_mut().unwrap();
for (key, value) in &headers {
hmap.reserve(header_count + 2);
if key.eq_ignore_ascii_case(b"cache-control") {
if let Ok(value) = std::str::from_utf8(value) {
if let Some(cache_control) = CacheControl::from_value(value) {
// We skip compression if the cache-control header value is set to
// "no-transform"
if cache_control.no_transform {
headers_allow_compression = false;
} else {
headers_allow_compression = false;
} else if key.eq_ignore_ascii_case(b"content-range") {
// we skip compression if the `content-range` header value is set, as it
// indicates the contents of the body were negotiated based directly
// with the user code and we can't compress the response
headers_allow_compression = false;
} else if key.eq_ignore_ascii_case(b"content-type") && !value.is_empty() {
content_type_header = Some(value);
} else if key.eq_ignore_ascii_case(b"content-encoding") {
// we don't compress if a content-encoding header was provided
headers_allow_compression = false;
} else if key.eq_ignore_ascii_case(b"etag") && !value.is_empty() {
// we store the values of ETag and Vary and skip adding them for now, as
// we may need to modify or change.
etag_header = Some(value);
} else if key.eq_ignore_ascii_case(b"vary") && !value.is_empty() {
vary_header = Some(value);
builder = builder.header(key.as_slice(), value.as_slice());
if headers_allow_compression {
let accepts_compression =
body_compressible = content_type_header
matches!(encoding, Encoding::Brotli | Encoding::Gzip);
let compressing = accepts_compression
let body: Response<Body>;
let new_wr: HttpResponseWriter;
// Set Vary: Accept-Encoding header for direct body response.
// Note: we set the header irrespective of whether or not we compress the data
// to make sure cache services do not serve uncompressed data to clients that
// support compression.
let vary_value = if let Some(value) = vary_header {
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
if !value_str.to_lowercase().contains("accept-encoding") {
format!("Accept-Encoding, {}", value_str)
} else {
} else {
// the header value wasn't valid UTF8, so it would have been a
// problem anyways, so sending a default header.
} else {
builder = builder.header("vary", &vary_value);
let accepts_compression = matches!(
Encoding::Brotli | Encoding::Gzip
let should_compress = body_compressible
&& (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
&& (matches!(data, Some(ref data) if data.len() > 20) || data.is_none())
&& accepts_compression;
&& should_compress(hmap);
if should_compress {
if compressing {
// If user provided a ETag header for uncompressed data, we need to
// ensure it is a Weak Etag header ("W/").
if let Some(value) = etag_header {
if let Ok(value_str) = std::str::from_utf8(value.as_slice()) {
if !value_str.starts_with("W/") {
builder = builder.header("etag", format!("W/{}", value_str));
} else {
builder = builder.header("etag", value.as_slice());
} else {
builder = builder.header("etag", value.as_slice());
// Drop 'content-length' header. Hyper will update it using compressed body.
// Drop 'content-length' header. Hyper will update it using compressed body.
if let Some(headers) = builder.headers_mut() {
// Content-Encoding header
} else if let Some(value) = etag_header {
builder = builder.header("etag", value.as_slice());
HeaderValue::from_static(match encoding {
Encoding::Brotli => "br",
Encoding::Gzip => "gzip",
_ => unreachable!(), // Forbidden by accepts_compression
match data {
let (new_wr, body) = http_response(data, compressing, encoding)?;
Some(data) => {
let body = builder.body(body)?;
if should_compress {
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
builder = builder.header("content-encoding", "br");
// quality level 6 is based on google's nginx default value for
// on-the-fly compression
// https://github.com/google/ngx_brotli#brotli_comp_level
// lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
// (~4MB)
let mut writer =
brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
body = builder.body(writer.into_inner().into())?;
_ => {
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
builder = builder.header("content-encoding", "gzip");
// Gzip, after level 1, doesn't produce significant size difference.
// Probably the reason why nginx's default gzip compression level is
// 1.
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
body = builder.body(writer.finish()?.into())?;
} else {
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
body = builder.body(Bytes::copy_from_slice(&data).into())?;
new_wr = HttpResponseWriter::Closed;
None => {
// If no buffer was passed, the caller will stream the response body.
if should_compress {
// Create a one way pipe that implements tokio's async io traits. To do
// this we create a [tokio::io::DuplexStream], but then throw away one
// of the directions to create a one way pipe.
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, writer) = tokio::io::split(b);
let writer_body: Pin<Box<dyn tokio::io::AsyncWrite>>;
match *stream.accept_encoding.borrow() {
Encoding::Brotli => {
let writer = BrotliEncoder::new(writer);
writer_body = Box::pin(writer);
builder = builder.header("content-encoding", "br");
_ => {
assert_eq!(*stream.accept_encoding.borrow(), Encoding::Gzip);
let writer = GzipEncoder::new(writer);
writer_body = Box::pin(writer);
builder = builder.header("content-encoding", "gzip");
body = builder.body(Body::wrap_stream(ReaderStream::new(reader)))?;
new_wr = HttpResponseWriter::Body(writer_body);
} else {
let (body_tx, body_rx) = Body::channel();
body = builder.body(body_rx)?;
new_wr = HttpResponseWriter::BodyUncompressed(body_tx);
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let response_tx = match replace(&mut *old_wr, new_wr) {
let response_tx = match replace(&mut *old_wr, new_wr) {
@ -687,6 +558,122 @@ async fn op_http_write_headers(
fn http_response(
data: Option<StringOrBuffer>,
compressing: bool,
encoding: Encoding,
) -> Result<(HttpResponseWriter, hyper::Body), AnyError> {
match data {
Some(data) if compressing => match encoding {
Encoding::Brotli => {
// quality level 6 is based on google's nginx default value for
// on-the-fly compression
// https://github.com/google/ngx_brotli#brotli_comp_level
// lgwin 22 is equivalent to brotli window size of (2**22)-16 bytes
// (~4MB)
let mut writer = brotli::CompressorWriter::new(Vec::new(), 4096, 6, 22);
Ok((HttpResponseWriter::Closed, writer.into_inner().into()))
Encoding::Gzip => {
// Gzip, after level 1, doesn't produce significant size difference.
// Probably the reason why nginx's default gzip compression level is
// 1.
// https://nginx.org/en/docs/http/ngx_http_gzip_module.html#gzip_comp_level
let mut writer = GzEncoder::new(Vec::new(), Compression::new(1));
Ok((HttpResponseWriter::Closed, writer.finish()?.into()))
_ => unreachable!(), // forbidden by accepts_compression
Some(data) => {
// If a buffer was passed, but isn't compressible, we use it to
// construct a response body.
None if compressing => {
// Create a one way pipe that implements tokio's async io traits. To do
// this we create a [tokio::io::DuplexStream], but then throw away one
// of the directions to create a one way pipe.
let (a, b) = tokio::io::duplex(64 * 1024);
let (reader, _) = tokio::io::split(a);
let (_, writer) = tokio::io::split(b);
let writer: Pin<Box<dyn tokio::io::AsyncWrite>> = match encoding {
Encoding::Brotli => Box::pin(BrotliEncoder::new(writer)),
Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
_ => unreachable!(), // forbidden by accepts_compression
None => {
let (body_tx, body_rx) = Body::channel();
Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
// If user provided a ETag header for uncompressed data, we need to
// ensure it is a Weak Etag header ("W/").
fn weaken_etag(hmap: &mut hyper::HeaderMap) {
if let Some(etag) = hmap.get_mut(hyper::header::ETAG) {
if !etag.as_bytes().starts_with(b"W/") {
let mut v = Vec::with_capacity(etag.as_bytes().len() + 2);
*etag = v.try_into().unwrap();
// Set Vary: Accept-Encoding header for direct body response.
// Note: we set the header irrespective of whether or not we compress the data
// to make sure cache services do not serve uncompressed data to clients that
// support compression.
fn ensure_vary_accept_encoding(hmap: &mut hyper::HeaderMap) {
if let Some(v) = hmap.get_mut(hyper::header::VARY) {
if let Ok(s) = v.to_str() {
if !s.to_lowercase().contains("accept-encoding") {
*v = format!("Accept-Encoding, {}", s).try_into().unwrap()
fn should_compress(headers: &hyper::HeaderMap) -> bool {
// skip compression if the cache-control header value is set to "no-transform" or not utf8
fn cache_control_no_transform(headers: &hyper::HeaderMap) -> Option<bool> {
let v = headers.get(hyper::header::CACHE_CONTROL)?;
let s = match std::str::from_utf8(v.as_bytes()) {
Ok(s) => s,
Err(_) => return Some(true),
let c = CacheControl::from_value(s)?;
// we skip compression if the `content-range` header value is set, as it
// indicates the contents of the body were negotiated based directly
// with the user code and we can't compress the response
let content_range = headers.contains_key(hyper::header::CONTENT_RANGE);
&& !cache_control_no_transform(headers).unwrap_or_default()
&& headers
async fn op_http_write_resource(
async fn op_http_write_resource(
state: Rc<RefCell<OpState>>,
state: Rc<RefCell<OpState>>,
@ -757,42 +744,38 @@ async fn op_http_write(
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
loop {
match &mut *wr {
match &mut *wr {
HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
HttpResponseWriter::Headers(_) => {
HttpResponseWriter::Closed => Err(http_error("response already completed")),
break Err(http_error("no response headers"))
HttpResponseWriter::Body(body) => {
let mut result = body.write_all(&buf).await;
if result.is_ok() {
result = body.flush().await;
HttpResponseWriter::Closed => {
match result {
break Err(http_error("response already completed"))
Ok(_) => Ok(()),
Err(err) => {
HttpResponseWriter::Body(body) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
let mut result = body.write_all(&buf).await;
// Don't return "broken pipe", that's an implementation detail.
if result.is_ok() {
// Pull up the failure associated with the transport connection instead.
result = body.flush().await;
// If there was no connection error, drop body_tx.
match result {
*wr = HttpResponseWriter::Closed;
Ok(_) => break Ok(()),
Err(http_error("response already completed"))
Err(err) => {
assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
// Don't return "broken pipe", that's an implementation detail.
// Pull up the failure associated with the transport connection instead.
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
HttpResponseWriter::BodyUncompressed(body) => {
let bytes = Bytes::copy_from_slice(&buf[..]);
HttpResponseWriter::BodyUncompressed(body) => {
match body.send_data(bytes).await {
let bytes = Bytes::copy_from_slice(&buf[..]);
Ok(_) => break Ok(()),
match body.send_data(bytes).await {
Err(err) => {
Ok(_) => Ok(()),
Err(err) => {
// Pull up the failure associated with the transport connection instead.
// Pull up the failure associated with the transport connection instead.
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
// If there was no connection error, drop body_tx.
*wr = HttpResponseWriter::Closed;
Err(http_error("response already completed"))
@ -46,3 +46,10 @@ impl FromV8 for ByteString {
impl Into<Vec<u8>> for ByteString {
fn into(self) -> Vec<u8> {
Add table
Reference in a new issue