From 8fcea5966c0e723c30e00f6661477dc2e7062d11 Mon Sep 17 00:00:00 2001 From: Laurence Rowe Date: Tue, 26 Sep 2023 04:42:48 -0700 Subject: [PATCH] refactor(ext/http): use scopeguard defer to handle async drop (#20652) Use the [scopeguard](https://docs.rs/scopeguard/) defer macro to run cleanup code for `new_slab_future`. This means it can be a single async function, avoiding the need to create a struct and implement `PinnedDrop` Async cleanup in Rust is awkward because async functions may be cancelled at any await point when their Future is dropped. The scopeguard approach comes from the following articles: * [How to think about `async`/`await` in Rust](http://cliffle.com/blog/async-inversion/) * [Async Cancellation I](https://blog.yoshuawuyts.com/async-cancellation-1/) (Reddit [discussion](https://www.reddit.com/r/rust/comments/qrhg39/blog_post_async_cancellation/)) --- Cargo.lock | 1 + Cargo.toml | 1 + ext/http/Cargo.toml | 1 + ext/http/http_next.rs | 53 +------------------------------------------ ext/http/slab.rs | 22 ++++++++++++++++++ 5 files changed, 26 insertions(+), 52 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a1777c7272..798333c0c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1411,6 +1411,7 @@ dependencies = [ "pin-project", "rand 0.8.5", "ring", + "scopeguard", "serde", "slab", "smallvec", diff --git a/Cargo.toml b/Cargo.toml index afb7f87055..73e6fdbd5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -129,6 +129,7 @@ rustls-pemfile = "1.0.0" rustls-webpki = "0.101.4" rustls-native-certs = "0.6.2" webpki-roots = "0.25.2" +scopeguard = "1.2.0" serde = { version = "1.0.149", features = ["derive"] } serde_bytes = "0.11" serde_json = "1.0.85" diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index b55f80423f..4c7de14aca 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -44,6 +44,7 @@ percent-encoding.workspace = true phf = { version = "0.10", features = ["macros"] } pin-project.workspace = true ring.workspace = true +scopeguard.workspace = true serde.workspace = true slab.workspace = true smallvec.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 7ccd9ec816..880925603f 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,11 +10,9 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; -use crate::slab::http_trace; -use crate::slab::slab_drop; +use crate::slab::new_slab_future; use crate::slab::slab_get; use crate::slab::slab_init; -use crate::slab::slab_insert; use crate::slab::HttpRequestBodyAutocloser; use crate::slab::RefCount; use crate::slab::SlabId; @@ -61,8 +59,6 @@ use hyper1::service::service_fn; use hyper1::service::HttpService; use hyper1::StatusCode; use once_cell::sync::Lazy; -use pin_project::pin_project; -use pin_project::pinned_drop; use smallvec::SmallVec; use std::borrow::Cow; use std::cell::RefCell; @@ -76,7 +72,6 @@ use tokio::io::AsyncReadExt; use tokio::io::AsyncWriteExt; type Request = hyper1::Request; -type Response = hyper1::Response; static USE_WRITEV: Lazy = Lazy::new(|| { let enable = std::env::var("DENO_USE_WRITEV").ok(); @@ -706,52 +701,6 @@ pub async fn op_http_track( } } -#[pin_project(PinnedDrop)] -pub struct SlabFuture>(SlabId, #[pin] F); - -pub fn new_slab_future( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, - tx: tokio::sync::mpsc::Sender, -) -> SlabFuture> { - let index = slab_insert(request, request_info, refcount); - let rx = slab_get(index).promise(); - SlabFuture(index, async move { - if tx.send(index).await.is_ok() { - http_trace!(index, "SlabFuture await"); - // We only need to wait for completion if we aren't closed - rx.await; - http_trace!(index, "SlabFuture complete"); - } - }) -} - -impl> SlabFuture {} - -#[pinned_drop] -impl> PinnedDrop for SlabFuture { - fn drop(self: Pin<&mut Self>) { - slab_drop(self.0); - } -} - -impl> Future for SlabFuture { - type Output = Result; - - fn poll( - self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll { - let index = self.0; - self - .project() - .1 - .poll(cx) - .map(|_| Ok(slab_get(index).take_response())) - } -} - fn serve_http11_unconditional( io: impl HttpServeStream, svc: impl HttpService + 'static, diff --git a/ext/http/slab.rs b/ext/http/slab.rs index 7c48b87b64..790b4649a2 100644 --- a/ext/http/slab.rs +++ b/ext/http/slab.rs @@ -10,6 +10,7 @@ use http::HeaderMap; use hyper1::body::Incoming; use hyper1::upgrade::OnUpgrade; +use scopeguard::defer; use slab::Slab; use std::cell::RefCell; use std::cell::RefMut; @@ -52,6 +53,27 @@ impl Drop for HttpRequestBodyAutocloser { } } +pub async fn new_slab_future( + request: Request, + request_info: HttpConnectionProperties, + refcount: RefCount, + tx: tokio::sync::mpsc::Sender, +) -> Result { + let index = slab_insert(request, request_info, refcount); + defer! { + slab_drop(index); + } + let rx = slab_get(index).promise(); + if tx.send(index).await.is_ok() { + http_trace!(index, "SlabFuture await"); + // We only need to wait for completion if we aren't closed + rx.await; + http_trace!(index, "SlabFuture complete"); + } + let response = slab_get(index).take_response(); + Ok(response) +} + pub struct HttpSlabRecord { request_info: HttpConnectionProperties, request_parts: Parts,