From 8e947bb674725195a4d2a754445ee71029108f61 Mon Sep 17 00:00:00 2001
From: Luca Casonato <hello@lcas.dev>
Date: Tue, 20 Dec 2022 09:46:45 +0100
Subject: [PATCH] fix(ext/http): close stream on resp body error (#17126)

Previously, errored streaming response bodies did not cause the HTTP
stream to be aborted. It instead caused the stream to be closed gracefully,
which had the result that the client could not detect the difference
between a successful response and an errored response.

This commit fixes the issue by aborting the stream on error.
---
 Cargo.lock                  |   1 +
 cli/tests/unit/http_test.ts | 123 ++++++++++++++++++++++++++++
 ext/http/01_http.js         |  15 ++--
 ext/http/Cargo.toml         |   1 +
 ext/http/lib.rs             | 100 +++++++++++++++++------
 ext/http/reader_stream.rs   | 157 ++++++++++++++++++++++++++++++++++++
 6 files changed, 369 insertions(+), 28 deletions(-)
 create mode 100644 ext/http/reader_stream.rs

diff --git a/Cargo.lock b/Cargo.lock
index 6ae7e6b81f..665a0901ef 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1095,6 +1095,7 @@ dependencies = [
  "mime",
  "percent-encoding",
  "phf",
+ "pin-project",
  "ring",
  "serde",
  "tokio",
diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index 3475513620..73bf07b68c 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -2614,6 +2614,129 @@ Deno.test({
   },
 });
 
+async function httpServerWithErrorBody(
+  listener: Deno.Listener,
+  compression: boolean,
+): Promise<Deno.HttpConn> {
+  const conn = await listener.accept();
+  listener.close();
+  const httpConn = Deno.serveHttp(conn);
+  const e = await httpConn.nextRequest();
+  assert(e);
+  const { respondWith } = e;
+  const originalErr = new Error("boom");
+  const rs = new ReadableStream({
+    async start(controller) {
+      controller.enqueue(new Uint8Array([65]));
+      await delay(1000);
+      controller.error(originalErr);
+    },
+  });
+  const init = compression ? { headers: { "content-type": "text/plain" } } : {};
+  const response = new Response(rs, init);
+  const err = await assertRejects(() => respondWith(response));
+  assert(err === originalErr);
+  return httpConn;
+}
+
+for (const compression of [true, false]) {
+  Deno.test({
+    name: `http server errors stream if response body errors (http/1.1${
+      compression ? " + compression" : ""
+    })`,
+    permissions: { net: true },
+    async fn() {
+      const hostname = "localhost";
+      const port = 4501;
+
+      const listener = Deno.listen({ hostname, port });
+      const server = httpServerWithErrorBody(listener, compression);
+
+      const conn = await Deno.connect({ hostname, port });
+      const msg = new TextEncoder().encode(
+        `GET / HTTP/1.1\r\nHost: ${hostname}:${port}\r\n\r\n`,
+      );
+      const nwritten = await conn.write(msg);
+      assertEquals(nwritten, msg.byteLength);
+
+      const buf = new Uint8Array(1024);
+      const nread = await conn.read(buf);
+      assert(nread);
+      const data = new TextDecoder().decode(buf.subarray(0, nread));
+      assert(data.endsWith("1\r\nA\r\n"));
+      const nread2 = await conn.read(buf); // connection should be closed now because the stream errored
+      assertEquals(nread2, null);
+      conn.close();
+
+      const httpConn = await server;
+      httpConn.close();
+    },
+  });
+
+  Deno.test({
+    name: `http server errors stream if response body errors (http/1.1 + fetch${
+      compression ? " + compression" : ""
+    })`,
+    permissions: { net: true },
+    async fn() {
+      const hostname = "localhost";
+      const port = 4501;
+
+      const listener = Deno.listen({ hostname, port });
+      const server = httpServerWithErrorBody(listener, compression);
+
+      const resp = await fetch(`http://${hostname}:${port}/`);
+      assert(resp.body);
+      const reader = resp.body.getReader();
+      const result = await reader.read();
+      assert(!result.done);
+      assertEquals(result.value, new Uint8Array([65]));
+      const err = await assertRejects(() => reader.read());
+      assert(err instanceof TypeError);
+      assert(err.message.includes("unexpected EOF"));
+
+      const httpConn = await server;
+      httpConn.close();
+    },
+  });
+
+  Deno.test({
+    name: `http server errors stream if response body errors (http/2 + fetch${
+      compression ? " + compression" : ""
+    }))`,
+    permissions: { net: true, read: true },
+    async fn() {
+      const hostname = "localhost";
+      const port = 4501;
+
+      const listener = Deno.listenTls({
+        hostname,
+        port,
+        certFile: "cli/tests/testdata/tls/localhost.crt",
+        keyFile: "cli/tests/testdata/tls/localhost.key",
+        alpnProtocols: ["h2"],
+      });
+      const server = httpServerWithErrorBody(listener, compression);
+
+      const caCert = Deno.readTextFileSync("cli/tests/testdata/tls/RootCA.pem");
+      const client = Deno.createHttpClient({ caCerts: [caCert] });
+      const resp = await fetch(`https://${hostname}:${port}/`, { client });
+      client.close();
+      assert(resp.body);
+      const reader = resp.body.getReader();
+      const result = await reader.read();
+      assert(!result.done);
+      assertEquals(result.value, new Uint8Array([65]));
+      const err = await assertRejects(() => reader.read());
+      assert(err instanceof TypeError);
+      assert(err.message.includes("unexpected internal error encountered"));
+
+      const httpConn = await server;
+      httpConn.close();
+    },
+  });
+}
+
 function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
   // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
   const tp = new TextProtoReader(r);
diff --git a/ext/http/01_http.js b/ext/http/01_http.js
index bd740b600c..dfb0f206cf 100644
--- a/ext/http/01_http.js
+++ b/ext/http/01_http.js
@@ -263,6 +263,7 @@
         }
 
         if (isStreamingResponseBody) {
+          let success = false;
           if (
             respBody === null ||
             !ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, respBody)
@@ -284,6 +285,7 @@
               );
               if (resourceBacking.autoClose) core.tryClose(resourceBacking.rid);
               readableStreamClose(respBody); // Release JS lock.
+              success = true;
             } catch (error) {
               const connError = httpConn[connErrorSymbol];
               if (
@@ -320,13 +322,16 @@
                 throw error;
               }
             }
+            success = true;
           }
 
-          try {
-            await core.opAsync("op_http_shutdown", streamRid);
-          } catch (error) {
-            await reader.cancel(error);
-            throw error;
+          if (success) {
+            try {
+              await core.opAsync("op_http_shutdown", streamRid);
+            } catch (error) {
+              await reader.cancel(error);
+              throw error;
+            }
           }
         }
 
diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml
index 2f4ae31e64..65cd4ccfef 100644
--- a/ext/http/Cargo.toml
+++ b/ext/http/Cargo.toml
@@ -31,6 +31,7 @@ hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "r
 mime = "0.3.16"
 percent-encoding.workspace = true
 phf = { version = "0.10", features = ["macros"] }
+pin-project.workspace = true
 ring.workspace = true
 serde.workspace = true
 tokio.workspace = true
diff --git a/ext/http/lib.rs b/ext/http/lib.rs
index af117d3f92..812394d94b 100644
--- a/ext/http/lib.rs
+++ b/ext/http/lib.rs
@@ -70,9 +70,12 @@ use tokio::io::AsyncRead;
 use tokio::io::AsyncWrite;
 use tokio::io::AsyncWriteExt;
 use tokio::task::spawn_local;
-use tokio_util::io::ReaderStream;
+
+use crate::reader_stream::ExternallyAbortableReaderStream;
+use crate::reader_stream::ShutdownHandle;
 
 pub mod compressible;
+mod reader_stream;
 
 pub fn init() -> Extension {
   Extension::builder()
@@ -414,8 +417,11 @@ impl Default for HttpRequestReader {
 /// The write half of an HTTP stream.
 enum HttpResponseWriter {
   Headers(oneshot::Sender<Response<Body>>),
-  Body(Pin<Box<dyn tokio::io::AsyncWrite>>),
-  BodyUncompressed(hyper::body::Sender),
+  Body {
+    writer: Pin<Box<dyn tokio::io::AsyncWrite>>,
+    shutdown_handle: ShutdownHandle,
+  },
+  BodyUncompressed(BodyUncompressedSender),
   Closed,
 }
 
@@ -425,6 +431,36 @@ impl Default for HttpResponseWriter {
   }
 }
 
+struct BodyUncompressedSender(Option<hyper::body::Sender>);
+
+impl BodyUncompressedSender {
+  fn sender(&mut self) -> &mut hyper::body::Sender {
+    // This is safe because we only ever take the sender out of the option
+    // inside of the shutdown method.
+    self.0.as_mut().unwrap()
+  }
+
+  fn shutdown(mut self) {
+    // take the sender out of self so that when self is dropped at the end of
+    // this block, it doesn't get aborted
+    self.0.take();
+  }
+}
+
+impl From<hyper::body::Sender> for BodyUncompressedSender {
+  fn from(sender: hyper::body::Sender) -> Self {
+    BodyUncompressedSender(Some(sender))
+  }
+}
+
+impl Drop for BodyUncompressedSender {
+  fn drop(&mut self) {
+    if let Some(sender) = self.0.take() {
+      sender.abort();
+    }
+  }
+}
+
 // We use a tuple instead of struct to avoid serialization overhead of the keys.
 #[derive(Serialize)]
 #[serde(rename_all = "camelCase")]
@@ -668,14 +704,22 @@ fn http_response(
         Encoding::Gzip => Box::pin(GzipEncoder::new(writer)),
         _ => unreachable!(), // forbidden by accepts_compression
       };
+      let (stream, shutdown_handle) =
+        ExternallyAbortableReaderStream::new(reader);
       Ok((
-        HttpResponseWriter::Body(writer),
-        Body::wrap_stream(ReaderStream::new(reader)),
+        HttpResponseWriter::Body {
+          writer,
+          shutdown_handle,
+        },
+        Body::wrap_stream(stream),
       ))
     }
     None => {
       let (body_tx, body_rx) = Body::channel();
-      Ok((HttpResponseWriter::BodyUncompressed(body_tx), body_rx))
+      Ok((
+        HttpResponseWriter::BodyUncompressed(body_tx.into()),
+        body_rx,
+      ))
     }
   }
 }
@@ -768,10 +812,10 @@ async fn op_http_write_resource(
     }
 
     match &mut *wr {
-      HttpResponseWriter::Body(body) => {
-        let mut result = body.write_all(&view).await;
+      HttpResponseWriter::Body { writer, .. } => {
+        let mut result = writer.write_all(&view).await;
         if result.is_ok() {
-          result = body.flush().await;
+          result = writer.flush().await;
         }
         if let Err(err) = result {
           assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
@@ -784,7 +828,7 @@ async fn op_http_write_resource(
       }
       HttpResponseWriter::BodyUncompressed(body) => {
         let bytes = Bytes::from(view);
-        if let Err(err) = body.send_data(bytes).await {
+        if let Err(err) = body.sender().send_data(bytes).await {
           assert!(err.is_closed());
           // Pull up the failure associated with the transport connection instead.
           http_stream.conn.closed().await?;
@@ -813,10 +857,10 @@ async fn op_http_write(
   match &mut *wr {
     HttpResponseWriter::Headers(_) => Err(http_error("no response headers")),
     HttpResponseWriter::Closed => Err(http_error("response already completed")),
-    HttpResponseWriter::Body(body) => {
-      let mut result = body.write_all(&buf).await;
+    HttpResponseWriter::Body { writer, .. } => {
+      let mut result = writer.write_all(&buf).await;
       if result.is_ok() {
-        result = body.flush().await;
+        result = writer.flush().await;
       }
       match result {
         Ok(_) => Ok(()),
@@ -833,7 +877,7 @@ async fn op_http_write(
     }
     HttpResponseWriter::BodyUncompressed(body) => {
       let bytes = Bytes::from(buf);
-      match body.send_data(bytes).await {
+      match body.sender().send_data(bytes).await {
         Ok(_) => Ok(()),
         Err(err) => {
           assert!(err.is_closed());
@@ -862,17 +906,27 @@ async fn op_http_shutdown(
     .get::<HttpStreamResource>(rid)?;
   let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await;
   let wr = take(&mut *wr);
-  if let HttpResponseWriter::Body(mut body_writer) = wr {
-    match body_writer.shutdown().await {
-      Ok(_) => {}
-      Err(err) => {
-        assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
-        // Don't return "broken pipe", that's an implementation detail.
-        // Pull up the failure associated with the transport connection instead.
-        stream.conn.closed().await?;
+  match wr {
+    HttpResponseWriter::Body {
+      mut writer,
+      shutdown_handle,
+    } => {
+      shutdown_handle.shutdown();
+      match writer.shutdown().await {
+        Ok(_) => {}
+        Err(err) => {
+          assert_eq!(err.kind(), std::io::ErrorKind::BrokenPipe);
+          // Don't return "broken pipe", that's an implementation detail.
+          // Pull up the failure associated with the transport connection instead.
+          stream.conn.closed().await?;
+        }
       }
     }
-  }
+    HttpResponseWriter::BodyUncompressed(body) => {
+      body.shutdown();
+    }
+    _ => {}
+  };
   Ok(())
 }
 
diff --git a/ext/http/reader_stream.rs b/ext/http/reader_stream.rs
new file mode 100644
index 0000000000..388b8db814
--- /dev/null
+++ b/ext/http/reader_stream.rs
@@ -0,0 +1,157 @@
+// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
+
+use std::pin::Pin;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::task::Context;
+use std::task::Poll;
+
+use bytes::Bytes;
+use deno_core::futures::Stream;
+use pin_project::pin_project;
+use tokio::io::AsyncRead;
+use tokio_util::io::ReaderStream;
+
+/// [ExternallyAbortableByteStream] adapts a [tokio::AsyncRead] into a [Stream].
+/// It is used to bridge between the HTTP response body resource, and
+/// `hyper::Body`. The stream has the special property that it errors if the
+/// underlying reader is closed before an explicit EOF is sent (in the form of
+/// setting the `shutdown` flag to true).
+#[pin_project]
+pub struct ExternallyAbortableReaderStream<R: AsyncRead> {
+  #[pin]
+  inner: ReaderStream<R>,
+  done: Arc<AtomicBool>,
+}
+
+pub struct ShutdownHandle(Arc<AtomicBool>);
+
+impl ShutdownHandle {
+  pub fn shutdown(&self) {
+    self.0.store(true, std::sync::atomic::Ordering::SeqCst);
+  }
+}
+
+impl<R: AsyncRead> ExternallyAbortableReaderStream<R> {
+  pub fn new(reader: R) -> (Self, ShutdownHandle) {
+    let done = Arc::new(AtomicBool::new(false));
+    let this = Self {
+      inner: ReaderStream::new(reader),
+      done: done.clone(),
+    };
+    (this, ShutdownHandle(done))
+  }
+}
+
+impl<R: AsyncRead> Stream for ExternallyAbortableReaderStream<R> {
+  type Item = std::io::Result<Bytes>;
+
+  fn poll_next(
+    self: Pin<&mut Self>,
+    cx: &mut Context<'_>,
+  ) -> Poll<Option<Self::Item>> {
+    let this = self.project();
+    let val = std::task::ready!(this.inner.poll_next(cx));
+    match val {
+      None if this.done.load(Ordering::SeqCst) => Poll::Ready(None),
+      None => Poll::Ready(Some(Err(std::io::Error::new(
+        std::io::ErrorKind::UnexpectedEof,
+        "stream reader has shut down",
+      )))),
+      Some(val) => Poll::Ready(Some(val)),
+    }
+  }
+}
+
+#[cfg(test)]
+mod tests {
+  use super::*;
+  use bytes::Bytes;
+  use deno_core::futures::StreamExt;
+  use tokio::io::AsyncWriteExt;
+
+  #[tokio::test]
+  async fn success() {
+    let (a, b) = tokio::io::duplex(64 * 1024);
+    let (reader, _) = tokio::io::split(a);
+    let (_, mut writer) = tokio::io::split(b);
+
+    let (mut stream, shutdown_handle) =
+      ExternallyAbortableReaderStream::new(reader);
+
+    writer.write_all(b"hello").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+    writer.write_all(b"world").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+    shutdown_handle.shutdown();
+    writer.shutdown().await.unwrap();
+    drop(writer);
+    assert!(stream.next().await.is_none());
+  }
+
+  #[tokio::test]
+  async fn error() {
+    let (a, b) = tokio::io::duplex(64 * 1024);
+    let (reader, _) = tokio::io::split(a);
+    let (_, mut writer) = tokio::io::split(b);
+
+    let (mut stream, _shutdown_handle) =
+      ExternallyAbortableReaderStream::new(reader);
+
+    writer.write_all(b"hello").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+    drop(writer);
+    assert_eq!(
+      stream.next().await.unwrap().unwrap_err().kind(),
+      std::io::ErrorKind::UnexpectedEof
+    );
+  }
+
+  #[tokio::test]
+  async fn error2() {
+    let (a, b) = tokio::io::duplex(64 * 1024);
+    let (reader, _) = tokio::io::split(a);
+    let (_, mut writer) = tokio::io::split(b);
+
+    let (mut stream, _shutdown_handle) =
+      ExternallyAbortableReaderStream::new(reader);
+
+    writer.write_all(b"hello").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+    writer.shutdown().await.unwrap();
+    drop(writer);
+    assert_eq!(
+      stream.next().await.unwrap().unwrap_err().kind(),
+      std::io::ErrorKind::UnexpectedEof
+    );
+  }
+
+  #[tokio::test]
+  async fn write_after_shutdown() {
+    let (a, b) = tokio::io::duplex(64 * 1024);
+    let (reader, _) = tokio::io::split(a);
+    let (_, mut writer) = tokio::io::split(b);
+
+    let (mut stream, shutdown_handle) =
+      ExternallyAbortableReaderStream::new(reader);
+
+    writer.write_all(b"hello").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("hello"));
+
+    writer.write_all(b"world").await.unwrap();
+    assert_eq!(stream.next().await.unwrap().unwrap(), Bytes::from("world"));
+
+    shutdown_handle.shutdown();
+    writer.shutdown().await.unwrap();
+
+    assert!(writer.write_all(b"!").await.is_err());
+
+    drop(writer);
+    assert!(stream.next().await.is_none());
+  }
+}