0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 17:34:47 -05:00

fix(ext/http): ensure that resources are closed when request is cancelled (#20641)

Builds on top of #20622 to fix #10854
This commit is contained in:
Matt Mastracci 2023-09-25 09:23:55 -06:00 committed by GitHub
parent 83f20007aa
commit a27ee8f368
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 106 additions and 7 deletions

View file

@ -2699,12 +2699,14 @@ Deno.test(
for (const url of ["text", "file", "stream"]) { for (const url of ["text", "file", "stream"]) {
// Ensure that we don't panic when the incoming TCP request was dropped // Ensure that we don't panic when the incoming TCP request was dropped
// https://github.com/denoland/deno/issues/20315 // https://github.com/denoland/deno/issues/20315 and that we correctly
// close/cancel the response
Deno.test({ Deno.test({
permissions: { read: true, write: true, net: true }, permissions: { read: true, write: true, net: true },
name: `httpServerTcpCancellation_${url}`, name: `httpServerTcpCancellation_${url}`,
fn: async function () { fn: async function () {
const ac = new AbortController(); const ac = new AbortController();
const streamCancelled = url == "stream" ? deferred() : undefined;
const listeningPromise = deferred(); const listeningPromise = deferred();
const waitForAbort = deferred(); const waitForAbort = deferred();
const waitForRequest = deferred(); const waitForRequest = deferred();
@ -2727,7 +2729,9 @@ for (const url of ["text", "file", "stream"]) {
start(controller) { start(controller) {
_body = null; _body = null;
controller.enqueue(new Uint8Array([1])); controller.enqueue(new Uint8Array([1]));
controller.close(); },
cancel(reason) {
streamCancelled!.resolve(reason);
}, },
}), }),
); );
@ -2753,14 +2757,56 @@ for (const url of ["text", "file", "stream"]) {
// Give it a few milliseconds for the serve machinery to work // Give it a few milliseconds for the serve machinery to work
await new Promise((r) => setTimeout(r, 10)); await new Promise((r) => setTimeout(r, 10));
// Wait for cancellation before we shut the server down
if (streamCancelled !== undefined) {
await streamCancelled;
}
// Since the handler has a chance of creating resources or running async ops, we need to use a // Since the handler has a chance of creating resources or running async ops, we need to use a
// graceful shutdown here to ensure they have fully drained. // graceful shutdown here to ensure they have fully drained.
await server.shutdown(); await server.shutdown();
await server.finished; await server.finished;
}, },
}); });
} }
Deno.test(
{ permissions: { net: true } },
async function httpServerCancelFetch() {
const request2 = deferred();
const request2Aborted = deferred();
const { finished, abort } = await makeServer(async (req) => {
if (req.url.endsWith("/1")) {
const fetchRecursive = await fetch(`http://localhost:${servePort}/2`);
return new Response(fetchRecursive.body);
} else if (req.url.endsWith("/2")) {
request2.resolve();
return new Response(
new ReadableStream({
start(_controller) {/* just hang */},
cancel(reason) {
request2Aborted.resolve(reason);
},
}),
);
}
fail();
});
const fetchAbort = new AbortController();
const fetchPromise = await fetch(`http://localhost:${servePort}/1`, {
signal: fetchAbort.signal,
});
await fetchPromise;
await request2;
fetchAbort.abort();
assertEquals("resource closed", await request2Aborted);
abort();
await finished;
},
);
Deno.test( Deno.test(
{ permissions: { read: true, net: true } }, { permissions: { read: true, net: true } },
async function httpServerWithTls() { async function httpServerWithTls() {

View file

@ -577,10 +577,13 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) {
); );
} }
/// Sets the appropriate response body. Use `force_instantiate_body` if you need
/// to ensure that the response is cleaned up correctly (eg: for resources).
fn set_response( fn set_response(
slab_id: SlabId, slab_id: SlabId,
length: Option<usize>, length: Option<usize>,
status: u16, status: u16,
force_instantiate_body: bool,
response_fn: impl FnOnce(Compression) -> ResponseBytesInner, response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
) { ) {
let mut http = slab_get(slab_id); let mut http = slab_get(slab_id);
@ -602,7 +605,10 @@ fn set_response(
if let Ok(code) = StatusCode::from_u16(status) { if let Ok(code) = StatusCode::from_u16(status) {
*response.status_mut() = code; *response.status_mut() = code;
} }
} else if force_instantiate_body {
response_fn(Compression::None).abort();
} }
http.complete(); http.complete();
} }
@ -634,6 +640,7 @@ pub fn op_http_set_response_body_resource(
slab_id, slab_id,
resource.size_hint().1.map(|s| s as usize), resource.size_hint().1.map(|s| s as usize),
status, status,
true,
move |compression| { move |compression| {
ResponseBytesInner::from_resource(compression, resource, auto_close) ResponseBytesInner::from_resource(compression, resource, auto_close)
}, },
@ -649,7 +656,7 @@ pub fn op_http_set_response_body_text(
status: u16, status: u16,
) { ) {
if !text.is_empty() { if !text.is_empty() {
set_response(slab_id, Some(text.len()), status, |compression| { set_response(slab_id, Some(text.len()), status, false, |compression| {
ResponseBytesInner::from_vec(compression, text.into_bytes()) ResponseBytesInner::from_vec(compression, text.into_bytes())
}); });
} else { } else {
@ -665,7 +672,7 @@ pub fn op_http_set_response_body_bytes(
status: u16, status: u16,
) { ) {
if !buffer.is_empty() { if !buffer.is_empty() {
set_response(slab_id, Some(buffer.len()), status, |compression| { set_response(slab_id, Some(buffer.len()), status, false, |compression| {
ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) ResponseBytesInner::from_bufview(compression, BufView::from(buffer))
}); });
} else { } else {

View file

@ -125,6 +125,16 @@ pub enum ResponseStream {
TestChannel(tokio::sync::mpsc::Receiver<BufView>), TestChannel(tokio::sync::mpsc::Receiver<BufView>),
} }
impl ResponseStream {
pub fn abort(self) {
match self {
ResponseStream::Resource(resource) => resource.stm.close(),
#[cfg(test)]
ResponseStream::TestChannel(..) => {}
}
}
}
#[derive(Default)] #[derive(Default)]
pub enum ResponseBytesInner { pub enum ResponseBytesInner {
/// An empty stream. /// An empty stream.
@ -192,11 +202,25 @@ impl ResponseBytes {
let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done); let current = std::mem::replace(&mut self.inner, ResponseBytesInner::Done);
self.completion_handle.complete(success); self.completion_handle.complete(success);
if success {
current current
} else {
current.abort();
ResponseBytesInner::Done
}
} }
} }
impl ResponseBytesInner { impl ResponseBytesInner {
pub fn abort(self) {
match self {
Self::Done | Self::Empty | Self::Bytes(..) => {}
Self::BrotliStream(stm) => stm.abort(),
Self::GZipStream(stm) => stm.abort(),
Self::UncompressedStream(stm) => stm.abort(),
}
}
pub fn size_hint(&self) -> SizeHint { pub fn size_hint(&self) -> SizeHint {
match self { match self {
Self::Done => SizeHint::with_exact(0), Self::Done => SizeHint::with_exact(0),
@ -463,6 +487,10 @@ impl GZipResponseStream {
underlying, underlying,
} }
} }
pub fn abort(self) {
self.underlying.abort()
}
} }
/// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide /// This is a minimal GZip header suitable for serving data from a webserver. We don't need to provide
@ -645,6 +673,10 @@ impl BrotliResponseStream {
underlying, underlying,
} }
} }
pub fn abort(self) {
self.underlying.abort()
}
} }
fn max_compressed_size(input_size: usize) -> usize { fn max_compressed_size(input_size: usize) -> usize {

View file

@ -364,6 +364,15 @@ impl ReadableStreamResource {
.read(limit) .read(limit)
.map(|buf| buf.unwrap_or_else(BufView::empty)) .map(|buf| buf.unwrap_or_else(BufView::empty))
} }
fn close_channel(&self) {
// Trigger the promise in JS to cancel the stream if necessarily
self.data.completion.complete(true);
// Cancel any outstanding read requests
self.cancel_handle.cancel();
// Close the channel to wake up anyone waiting
self.channel.close();
}
} }
impl Resource for ReadableStreamResource { impl Resource for ReadableStreamResource {
@ -376,8 +385,13 @@ impl Resource for ReadableStreamResource {
} }
fn close(self: Rc<Self>) { fn close(self: Rc<Self>) {
self.cancel_handle.cancel(); self.close_channel();
self.channel.close(); }
}
impl Drop for ReadableStreamResource {
fn drop(&mut self) {
self.close_channel();
} }
} }