mirror of
https://github.com/denoland/deno.git
synced 2025-01-21 21:50:00 -05:00
fix(ext/http): don't panic on stream responses in cancelled requests (#20316)
When a TCP connection is force-closed (ie: browser refresh), the underlying future we pass to Hyper is dropped which may cause us to try to drop the body resource while the OpState lock is still held. Preconditions for this bug to trigger: - The body resource must have been taken - The response must return a resource (which requires us to take the OpState lock) - The TCP connection must have been dropped before this Fixes #20315 and #20298
This commit is contained in:
parent
539e5032d3
commit
9198bbd454
3 changed files with 102 additions and 21 deletions
|
@ -2403,6 +2403,68 @@ Deno.test(
|
|||
},
|
||||
);
|
||||
|
||||
for (const url of ["text", "file", "stream"]) {
|
||||
// Ensure that we don't panic when the incoming TCP request was dropped
|
||||
// https://github.com/denoland/deno/issues/20315
|
||||
Deno.test({
|
||||
permissions: { read: true, write: true, net: true },
|
||||
name: `httpServerTcpCancellation_${url}`,
|
||||
fn: async function () {
|
||||
const ac = new AbortController();
|
||||
const listeningPromise = deferred();
|
||||
const waitForAbort = deferred();
|
||||
const waitForRequest = deferred();
|
||||
const server = Deno.serve({
|
||||
port: servePort,
|
||||
signal: ac.signal,
|
||||
onListen: onListen(listeningPromise),
|
||||
handler: async (req: Request) => {
|
||||
waitForRequest.resolve();
|
||||
await waitForAbort;
|
||||
// Allocate the request body
|
||||
let _body = req.body;
|
||||
if (req.url.includes("/text")) {
|
||||
return new Response("text");
|
||||
} else if (req.url.includes("/file")) {
|
||||
return new Response((await makeTempFile(1024)).readable);
|
||||
} else if (req.url.includes("/stream")) {
|
||||
return new Response(
|
||||
new ReadableStream({
|
||||
start(controller) {
|
||||
_body = null;
|
||||
controller.enqueue(new Uint8Array([1]));
|
||||
controller.close();
|
||||
},
|
||||
}),
|
||||
);
|
||||
} else {
|
||||
fail();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
await listeningPromise;
|
||||
|
||||
// Create a POST request and drop it once the server has received it
|
||||
const conn = await Deno.connect({ port: servePort });
|
||||
const writer = conn.writable.getWriter();
|
||||
writer.write(new TextEncoder().encode(`POST /${url} HTTP/1.0\n\n`));
|
||||
await waitForRequest;
|
||||
writer.close();
|
||||
|
||||
// Give it a few milliseconds for the serve machinery to work
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
waitForAbort.resolve();
|
||||
|
||||
// Give it a few milliseconds for the serve machinery to work
|
||||
await new Promise((r) => setTimeout(r, 10));
|
||||
|
||||
ac.abort();
|
||||
await server.finished;
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
Deno.test(
|
||||
{ permissions: { read: true, net: true } },
|
||||
async function httpServerWithTls() {
|
||||
|
|
|
@ -585,39 +585,52 @@ fn set_response(
|
|||
response_fn: impl FnOnce(Compression) -> ResponseBytesInner,
|
||||
) {
|
||||
let mut http = slab_get(slab_id);
|
||||
let resource = http.take_resource();
|
||||
let compression = is_request_compressible(&http.request_parts().headers);
|
||||
let response = http.response();
|
||||
let compression = modify_compressibility_from_response(
|
||||
compression,
|
||||
length,
|
||||
response.headers_mut(),
|
||||
);
|
||||
response
|
||||
.body_mut()
|
||||
.initialize(response_fn(compression), resource);
|
||||
// The request may have been cancelled by this point and if so, there's no need for us to
|
||||
// do all of this work to send the response.
|
||||
if !http.cancelled() {
|
||||
let resource = http.take_resource();
|
||||
let compression = is_request_compressible(&http.request_parts().headers);
|
||||
let response = http.response();
|
||||
let compression = modify_compressibility_from_response(
|
||||
compression,
|
||||
length,
|
||||
response.headers_mut(),
|
||||
);
|
||||
response
|
||||
.body_mut()
|
||||
.initialize(response_fn(compression), resource);
|
||||
|
||||
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
|
||||
// will quitely ignore invalid values.
|
||||
if let Ok(code) = StatusCode::from_u16(status) {
|
||||
*response.status_mut() = code;
|
||||
// The Javascript code should never provide a status that is invalid here (see 23_response.js), so we
|
||||
// will quitely ignore invalid values.
|
||||
if let Ok(code) = StatusCode::from_u16(status) {
|
||||
*response.status_mut() = code;
|
||||
}
|
||||
}
|
||||
http.complete();
|
||||
}
|
||||
|
||||
#[op2(fast)]
|
||||
pub fn op_http_set_response_body_resource(
|
||||
state: &mut OpState,
|
||||
state: Rc<RefCell<OpState>>,
|
||||
#[smi] slab_id: SlabId,
|
||||
#[smi] stream_rid: ResourceId,
|
||||
auto_close: bool,
|
||||
status: u16,
|
||||
) -> Result<(), AnyError> {
|
||||
// IMPORTANT: We might end up requiring the OpState lock in set_response if we need to drop the request
|
||||
// body resource so we _cannot_ hold the OpState lock longer than necessary.
|
||||
|
||||
// If the stream is auto_close, we will hold the last ref to it until the response is complete.
|
||||
let resource = if auto_close {
|
||||
state.resource_table.take_any(stream_rid)?
|
||||
} else {
|
||||
state.resource_table.get_any(stream_rid)?
|
||||
// TODO(mmastrac): We should be using the same auto-close functionality rather than removing autoclose resources.
|
||||
// It's possible things could fail elsewhere if code expects the rid to continue existing after the response has been
|
||||
// returned.
|
||||
let resource = {
|
||||
let mut state = state.borrow_mut();
|
||||
if auto_close {
|
||||
state.resource_table.take_any(stream_rid)?
|
||||
} else {
|
||||
state.resource_table.get_any(stream_rid)?
|
||||
}
|
||||
};
|
||||
|
||||
set_response(
|
||||
|
|
|
@ -234,7 +234,7 @@ impl SlabEntry {
|
|||
self.self_mut().request_body = Some(RequestBodyState::Resource(res));
|
||||
}
|
||||
|
||||
/// Complete this entry, potentially expunging it if it is complete.
|
||||
/// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well).
|
||||
pub fn complete(self) {
|
||||
let promise = &self.self_ref().promise;
|
||||
assert!(
|
||||
|
@ -251,6 +251,12 @@ impl SlabEntry {
|
|||
}
|
||||
}
|
||||
|
||||
/// Has the future for this entry been dropped? ie, has the underlying TCP connection
|
||||
/// been closed?
|
||||
pub fn cancelled(&self) -> bool {
|
||||
self.self_ref().been_dropped
|
||||
}
|
||||
|
||||
/// Get a mutable reference to the response.
|
||||
pub fn response(&mut self) -> &mut Response {
|
||||
self.self_mut().response.as_mut().unwrap()
|
||||
|
|
Loading…
Add table
Reference in a new issue