diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 05affc4f87..d1ac826967 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -2403,6 +2403,68 @@ Deno.test( }, ); +for (const url of ["text", "file", "stream"]) { + // Ensure that we don't panic when the incoming TCP request was dropped + // https://github.com/denoland/deno/issues/20315 + Deno.test({ + permissions: { read: true, write: true, net: true }, + name: `httpServerTcpCancellation_${url}`, + fn: async function () { + const ac = new AbortController(); + const listeningPromise = deferred(); + const waitForAbort = deferred(); + const waitForRequest = deferred(); + const server = Deno.serve({ + port: servePort, + signal: ac.signal, + onListen: onListen(listeningPromise), + handler: async (req: Request) => { + waitForRequest.resolve(); + await waitForAbort; + // Allocate the request body + let _body = req.body; + if (req.url.includes("/text")) { + return new Response("text"); + } else if (req.url.includes("/file")) { + return new Response((await makeTempFile(1024)).readable); + } else if (req.url.includes("/stream")) { + return new Response( + new ReadableStream({ + start(controller) { + _body = null; + controller.enqueue(new Uint8Array([1])); + controller.close(); + }, + }), + ); + } else { + fail(); + } + }, + }); + + await listeningPromise; + + // Create a POST request and drop it once the server has received it + const conn = await Deno.connect({ port: servePort }); + const writer = conn.writable.getWriter(); + writer.write(new TextEncoder().encode(`POST /${url} HTTP/1.0\n\n`)); + await waitForRequest; + writer.close(); + + // Give it a few milliseconds for the serve machinery to work + await new Promise((r) => setTimeout(r, 10)); + waitForAbort.resolve(); + + // Give it a few milliseconds for the serve machinery to work + await new Promise((r) => setTimeout(r, 10)); + + ac.abort(); + await server.finished; + }, + }); +} + Deno.test( { permissions: { read: true, net: true } }, async function httpServerWithTls() { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index e8517c9019..476a55a804 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -585,39 +585,52 @@ fn set_response( response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); - let resource = http.take_resource(); - let compression = is_request_compressible(&http.request_parts().headers); - let response = http.response(); - let compression = modify_compressibility_from_response( - compression, - length, - response.headers_mut(), - ); - response - .body_mut() - .initialize(response_fn(compression), resource); + // The request may have been cancelled by this point and if so, there's no need for us to + // do all of this work to send the response. + if !http.cancelled() { + let resource = http.take_resource(); + let compression = is_request_compressible(&http.request_parts().headers); + let response = http.response(); + let compression = modify_compressibility_from_response( + compression, + length, + response.headers_mut(), + ); + response + .body_mut() + .initialize(response_fn(compression), resource); - // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we - // will quitely ignore invalid values. - if let Ok(code) = StatusCode::from_u16(status) { - *response.status_mut() = code; + // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we + // will quitely ignore invalid values. + if let Ok(code) = StatusCode::from_u16(status) { + *response.status_mut() = code; + } } http.complete(); } #[op2(fast)] pub fn op_http_set_response_body_resource( - state: &mut OpState, + state: Rc>, #[smi] slab_id: SlabId, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, ) -> Result<(), AnyError> { + // IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request + // body resource so we _cannot_ hold the OpState lock longer than necessary. + // If the stream is auto_close, we will hold the last ref to it until the response is complete. - let resource = if auto_close { - state.resource_table.take_any(stream_rid)? - } else { - state.resource_table.get_any(stream_rid)? + // TODO(mmastrac): We should be using the same auto-close functionality rather than removing autoclose resources. + // It's possible things could fail elsewhere if code expects the rid to continue existing after the response has been + // returned. + let resource = { + let mut state = state.borrow_mut(); + if auto_close { + state.resource_table.take_any(stream_rid)? + } else { + state.resource_table.get_any(stream_rid)? + } }; set_response( diff --git a/ext/http/slab.rs b/ext/http/slab.rs index 8c285c8606..8dd562cc2d 100644 --- a/ext/http/slab.rs +++ b/ext/http/slab.rs @@ -234,7 +234,7 @@ impl SlabEntry { self.self_mut().request_body = Some(RequestBodyState::Resource(res)); } - /// Complete this entry, potentially expunging it if it is complete. + /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well). pub fn complete(self) { let promise = &self.self_ref().promise; assert!( @@ -251,6 +251,12 @@ impl SlabEntry { } } + /// Has the future for this entry been dropped? ie, has the underlying TCP connection + /// been closed? + pub fn cancelled(&self) -> bool { + self.self_ref().been_dropped + } + /// Get a mutable reference to the response. pub fn response(&mut self) -> &mut Response { self.self_mut().response.as_mut().unwrap()