1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 13:00:36 -05:00

feat(ext/http): abort event when request is cancelled (#26781)

```js
Deno.serve(async (req) => {
  const { promise, resolve } = Promise.withResolvers<void>();

  req.signal.addEventListener("abort", () => {
    resolve();
  });

  await promise;

  return new Response("Ok");
});
```
This commit is contained in:
Divy Srivastava 2024-11-08 18:46:11 +05:30 committed by GitHub
parent 637b1d5508
commit b482a50299
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 65 additions and 8 deletions

View file

@ -281,11 +281,11 @@ class Request {
if (signal === undefined) {
const signal = newSignal();
this[_signalCache] = signal;
return signal;
}
if (!signal.aborted && this[_request].isCancelled) {
this[_request].onCancel?.(() => {
signal[signalAbort](signalAbortError);
});
return signal;
}
return signal;

View file

@ -11,10 +11,10 @@ import {
op_http_cancel,
op_http_close,
op_http_close_after_finish,
op_http_get_request_cancelled,
op_http_get_request_headers,
op_http_get_request_method_and_url,
op_http_read_request_body,
op_http_request_on_cancel,
op_http_serve,
op_http_serve_on,
op_http_set_promise_complete,
@ -375,11 +375,16 @@ class InnerRequest {
return this.#external;
}
get isCancelled() {
onCancel(callback) {
if (this.#external === null) {
return true;
callback();
return;
}
return op_http_get_request_cancelled(this.#external);
PromisePrototypeThen(
op_http_request_on_cancel(this.#external),
callback,
);
}
}

View file

@ -708,6 +708,19 @@ pub fn op_http_get_request_cancelled(external: *const c_void) -> bool {
http.cancelled()
}
#[op2(async)]
pub async fn op_http_request_on_cancel(external: *const c_void) {
let http =
// SAFETY: op is called with external.
unsafe { clone_external!(external, "op_http_request_on_cancel") };
let (tx, rx) = tokio::sync::oneshot::channel();
http.on_cancel(tx);
drop(http);
rx.await.ok();
}
/// Returned promise resolves when body streaming finishes.
/// Call [`op_http_close_after_finish`] when done with the external.
#[op2(async)]

View file

@ -112,6 +112,7 @@ deno_core::extension!(
http_next::op_http_close_after_finish,
http_next::op_http_get_request_header,
http_next::op_http_get_request_headers,
http_next::op_http_request_on_cancel,
http_next::op_http_get_request_method_and_url<HTTP>,
http_next::op_http_get_request_cancelled,
http_next::op_http_read_request_body,

View file

@ -27,6 +27,7 @@ use std::rc::Rc;
use std::task::Context;
use std::task::Poll;
use std::task::Waker;
use tokio::sync::oneshot;
pub type Request = hyper::Request<Incoming>;
pub type Response = hyper::Response<HttpRecordResponse>;
@ -211,6 +212,7 @@ pub struct UpgradeUnavailableError;
struct HttpRecordInner {
server_state: SignallingRc<HttpServerState>,
closed_channel: Option<oneshot::Sender<()>>,
request_info: HttpConnectionProperties,
request_parts: http::request::Parts,
request_body: Option<RequestBodyState>,
@ -276,6 +278,7 @@ impl HttpRecord {
response_body_finished: false,
response_body_waker: None,
trailers: None,
closed_channel: None,
been_dropped: false,
finished: false,
needs_close_after_finish: false,
@ -312,6 +315,10 @@ impl HttpRecord {
RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish)
}
pub fn on_cancel(&self, sender: oneshot::Sender<()>) {
self.self_mut().closed_channel = Some(sender);
}
fn recycle(self: Rc<Self>) {
assert!(
Rc::strong_count(&self) == 1,
@ -390,6 +397,9 @@ impl HttpRecord {
inner.been_dropped = true;
// The request body might include actual resources.
inner.request_body.take();
if let Some(closed_channel) = inner.closed_channel.take() {
let _ = closed_channel.send(());
}
}
/// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well).

View file

@ -4299,3 +4299,31 @@ Deno.test({
assert(cancelled);
});
Deno.test({
name: "AbortSignal event aborted when request is cancelled",
}, async () => {
const { promise, resolve } = Promise.withResolvers<void>();
const server = Deno.serve({
hostname: "0.0.0.0",
port: servePort,
onListen: () => resolve(),
}, async (request) => {
const { promise: promiseAbort, resolve: resolveAbort } = Promise
.withResolvers<void>();
request.signal.addEventListener("abort", () => resolveAbort());
assert(!request.signal.aborted);
await promiseAbort;
return new Response("Ok");
});
await promise;
await fetch(`http://localhost:${servePort}/`, {
signal: AbortSignal.timeout(100),
}).catch(() => {});
await server.shutdown();
});