From f03ffaf5407b5466fadce6d83d8676e2b12b980c Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Mon, 28 Aug 2023 13:29:34 -0600 Subject: [PATCH] fix(ext/http): don't panic on stream responses in cancelled requests (#20316) When a TCP connection is force-closed (ie: browser refresh), the underlying future we pass to Hyper is dropped which may cause us to try to drop the body resource while the OpState lock is still held. Preconditions for this bug to trigger: - The body resource must have been taken - The response must return a resource (which requires us to take the OpState lock) - The TCP connection must have been dropped before this Fixes #20315 and #20298 --- cli/tests/unit/serve_test.ts | 62 ++++++++++++++++++++++++++++++++++++ ext/http/http_next.rs | 53 ++++++++++++++++++------------ ext/http/slab.rs | 8 ++++- 3 files changed, 102 insertions(+), 21 deletions(-) diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 05affc4f87..d1ac826967 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -2403,6 +2403,68 @@ Deno.test( }, ); +for (const url of ["text", "file", "stream"]) { + // Ensure that we don't panic when the incoming TCP request was dropped + // https://github.com/denoland/deno/issues/20315 + Deno.test({ + permissions: { read: true, write: true, net: true }, + name: `httpServerTcpCancellation_${url}`, + fn: async function () { + const ac = new AbortController(); + const listeningPromise = deferred(); + const waitForAbort = deferred(); + const waitForRequest = deferred(); + const server = Deno.serve({ + port: servePort, + signal: ac.signal, + onListen: onListen(listeningPromise), + handler: async (req: Request) => { + waitForRequest.resolve(); + await waitForAbort; + // Allocate the request body + let _body = req.body; + if (req.url.includes("/text")) { + return new Response("text"); + } else if (req.url.includes("/file")) { + return new Response((await makeTempFile(1024)).readable); + } else if (req.url.includes("/stream")) { + return new Response( + new ReadableStream({ + start(controller) { + _body = null; + controller.enqueue(new Uint8Array([1])); + controller.close(); + }, + }), + ); + } else { + fail(); + } + }, + }); + + await listeningPromise; + + // Create a POST request and drop it once the server has received it + const conn = await Deno.connect({ port: servePort }); + const writer = conn.writable.getWriter(); + writer.write(new TextEncoder().encode(`POST /${url} HTTP/1.0\n\n`)); + await waitForRequest; + writer.close(); + + // Give it a few milliseconds for the serve machinery to work + await new Promise((r) => setTimeout(r, 10)); + waitForAbort.resolve(); + + // Give it a few milliseconds for the serve machinery to work + await new Promise((r) => setTimeout(r, 10)); + + ac.abort(); + await server.finished; + }, + }); +} + Deno.test( { permissions: { read: true, net: true } }, async function httpServerWithTls() { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index e8517c9019..476a55a804 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -585,39 +585,52 @@ fn set_response( response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); - let resource = http.take_resource(); - let compression = is_request_compressible(&http.request_parts().headers); - let response = http.response(); - let compression = modify_compressibility_from_response( - compression, - length, - response.headers_mut(), - ); - response - .body_mut() - .initialize(response_fn(compression), resource); + // The request may have been cancelled by this point and if so, there's no need for us to + // do all of this work to send the response. + if !http.cancelled() { + let resource = http.take_resource(); + let compression = is_request_compressible(&http.request_parts().headers); + let response = http.response(); + let compression = modify_compressibility_from_response( + compression, + length, + response.headers_mut(), + ); + response + .body_mut() + .initialize(response_fn(compression), resource); - // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we - // will quitely ignore invalid values. - if let Ok(code) = StatusCode::from_u16(status) { - *response.status_mut() = code; + // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we + // will quitely ignore invalid values. + if let Ok(code) = StatusCode::from_u16(status) { + *response.status_mut() = code; + } } http.complete(); } #[op2(fast)] pub fn op_http_set_response_body_resource( - state: &mut OpState, + state: Rc>, #[smi] slab_id: SlabId, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, ) -> Result<(), AnyError> { + // IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request + // body resource so we _cannot_ hold the OpState lock longer than necessary. + // If the stream is auto_close, we will hold the last ref to it until the response is complete. - let resource = if auto_close { - state.resource_table.take_any(stream_rid)? - } else { - state.resource_table.get_any(stream_rid)? + // TODO(mmastrac): We should be using the same auto-close functionality rather than removing autoclose resources. + // It's possible things could fail elsewhere if code expects the rid to continue existing after the response has been + // returned. + let resource = { + let mut state = state.borrow_mut(); + if auto_close { + state.resource_table.take_any(stream_rid)? + } else { + state.resource_table.get_any(stream_rid)? + } }; set_response( diff --git a/ext/http/slab.rs b/ext/http/slab.rs index 8c285c8606..8dd562cc2d 100644 --- a/ext/http/slab.rs +++ b/ext/http/slab.rs @@ -234,7 +234,7 @@ impl SlabEntry { self.self_mut().request_body = Some(RequestBodyState::Resource(res)); } - /// Complete this entry, potentially expunging it if it is complete. + /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well). pub fn complete(self) { let promise = &self.self_ref().promise; assert!( @@ -251,6 +251,12 @@ impl SlabEntry { } } + /// Has the future for this entry been dropped? ie, has the underlying TCP connection + /// been closed? + pub fn cancelled(&self) -> bool { + self.self_ref().been_dropped + } + /// Get a mutable reference to the response. pub fn response(&mut self) -> &mut Response { self.self_mut().response.as_mut().unwrap()