From 71af3c375c229e3311e4c82350025d1955cfa123 Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Fri, 15 Sep 2023 08:08:21 -0600 Subject: [PATCH] fix(ext/http): ensure aborted bodies throw (#20503) Fixes #20502 -- ensure that Hyper errors make it through to JS. --- cli/tests/unit/serve_test.ts | 42 +++++++++++++++++++++++++++++++++++- ext/http/request_body.rs | 31 +++++++++++++++----------- 2 files changed, 60 insertions(+), 13 deletions(-) diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 3f58903a85..76433f1e3f 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -1,6 +1,9 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -import { assertMatch } from "../../../test_util/std/testing/asserts.ts"; +import { + assertMatch, + assertRejects, +} from "../../../test_util/std/testing/asserts.ts"; import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts"; import { TextProtoReader } from "../testdata/run/textproto.ts"; import { @@ -879,6 +882,43 @@ Deno.test( }, ); +Deno.test( + { permissions: { net: true } }, + async function httpServerAbortedRequestBody() { + const promise = deferred(); + const ac = new AbortController(); + const listeningPromise = deferred(); + + const server = Deno.serve({ + handler: async (request) => { + await assertRejects(async () => { + await request.text(); + }); + promise.resolve(); + // Not actually used + return new Response(); + }, + port: servePort, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + const conn = await Deno.connect({ port: servePort }); + // Send POST request with a body + content-length, but don't send it all + const encoder = new TextEncoder(); + const body = + `POST / HTTP/1.1\r\nHost: 127.0.0.1:${servePort}\r\nContent-Length: 10\r\n\r\n12345`; + const writeResult = await conn.write(encoder.encode(body)); + assertEquals(body.length, writeResult); + conn.close(); + await promise; + ac.abort(); + await server.finished; + }, +); + function createStreamTest(count: number, delay: number, action: string) { function doAction(controller: ReadableStreamDefaultController, i: number) { if (i == count) { diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs index 73908ca55d..0c3f293203 100644 --- a/ext/http/request_body.rs +++ b/ext/http/request_body.rs @@ -15,6 +15,8 @@ use hyper1::body::SizeHint; use std::borrow::Cow; use std::pin::Pin; use std::rc::Rc; +use std::task::ready; +use std::task::Poll; /// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. struct ReadFuture(Incoming); @@ -25,21 +27,26 @@ impl Stream for ReadFuture { fn poll_next( self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let res = Pin::new(&mut self.get_mut().0).poll_frame(cx); - match res { - std::task::Poll::Ready(Some(Ok(frame))) => { - if let Ok(data) = frame.into_data() { - // Ensure that we never yield an empty frame - if !data.is_empty() { - return std::task::Poll::Ready(Some(Ok(data))); + ) -> Poll> { + // Loop until we receive a non-empty frame from Hyper + let this = self.get_mut(); + loop { + let res = ready!(Pin::new(&mut this.0).poll_frame(cx)); + break match res { + Some(Ok(frame)) => { + if let Ok(data) = frame.into_data() { + // Ensure that we never yield an empty frame + if !data.is_empty() { + break Poll::Ready(Some(Ok::<_, AnyError>(data))); + } } + // Loop again so we don't lose the waker + continue; } - } - std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), - _ => {} + Some(Err(e)) => Poll::Ready(Some(Err(e.into()))), + None => Poll::Ready(None), + }; } - std::task::Poll::Pending } }