mirror of
synced 2025-03-09 13:49:37 -04:00

Use the [scopeguard](https://docs.rs/scopeguard/) defer macro to run cleanup code for `new_slab_future`. This means it can be a single async function, avoiding the need to create a struct and implement `PinnedDrop` Async cleanup in Rust is awkward because async functions may be cancelled at any await point when their Future is dropped. The scopeguard approach comes from the following articles: * [How to think about `async`/`await` in Rust](http://cliffle.com/blog/async-inversion/) * [Async Cancellation I](https://blog.yoshuawuyts.com/async-cancellation-1/) (Reddit [discussion](https://www.reddit.com/r/rust/comments/qrhg39/blog_post_async_cancellation/))
374 lines
11 KiB
374 lines
11 KiB
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
use crate::request_properties::HttpConnectionProperties;
use crate::response_body::CompletionHandle;
use crate::response_body::ResponseBytes;
use deno_core::error::AnyError;
use deno_core::OpState;
use deno_core::ResourceId;
use http::request::Parts;
use http::HeaderMap;
use hyper1::body::Incoming;
use hyper1::upgrade::OnUpgrade;
use scopeguard::defer;
use slab::Slab;
use std::cell::RefCell;
use std::cell::RefMut;
use std::ptr::NonNull;
use std::rc::Rc;
pub type Request = hyper1::Request<Incoming>;
pub type Response = hyper1::Response<ResponseBytes>;
pub type SlabId = u32;
#[derive(Clone, Default)]
pub struct RefCount(pub Rc<()>);
enum RequestBodyState {
impl From<Incoming> for RequestBodyState {
fn from(value: Incoming) -> Self {
/// Ensures that the request body closes itself when no longer needed.
pub struct HttpRequestBodyAutocloser(ResourceId, Rc<RefCell<OpState>>);
impl HttpRequestBodyAutocloser {
pub fn new(res: ResourceId, op_state: Rc<RefCell<OpState>>) -> Self {
Self(res, op_state)
impl Drop for HttpRequestBodyAutocloser {
fn drop(&mut self) {
if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) {
pub async fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
refcount: RefCount,
tx: tokio::sync::mpsc::Sender<SlabId>,
) -> Result<Response, hyper::Error> {
let index = slab_insert(request, request_info, refcount);
defer! {
let rx = slab_get(index).promise();
if tx.send(index).await.is_ok() {
http_trace!(index, "SlabFuture await");
// We only need to wait for completion if we aren't closed
http_trace!(index, "SlabFuture complete");
let response = slab_get(index).take_response();
pub struct HttpSlabRecord {
request_info: HttpConnectionProperties,
request_parts: Parts,
request_body: Option<RequestBodyState>,
/// The response may get taken before we tear this down
response: Option<Response>,
promise: CompletionHandle,
trailers: Rc<RefCell<Option<HeaderMap>>>,
been_dropped: bool,
/// Use a `Rc` to keep track of outstanding requests. We don't use this, but
/// when it drops, it decrements the refcount of the server itself.
refcount: Option<RefCount>,
#[cfg(feature = "__zombie_http_tracking")]
alive: bool,
thread_local! {
pub(crate) static SLAB: RefCell<Slab<HttpSlabRecord>> = const { RefCell::new(Slab::new()) };
macro_rules! http_trace {
($index:expr, $args:tt) => {
#[cfg(feature = "__http_tracing")]
let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len()));
if let Ok(total) = total {
println!("HTTP id={} total={}: {}", $index, total, format!($args));
} else {
println!("HTTP id={} total=?: {}", $index, format!($args));
pub(crate) use http_trace;
/// Hold a lock on the slab table and a reference to one entry in the table.
pub struct SlabEntry(
RefMut<'static, Slab<HttpSlabRecord>>,
const SLAB_CAPACITY: usize = 1024;
pub fn slab_init() {
SLAB.with(|slab: &RefCell<Slab<HttpSlabRecord>>| {
// Note that there might already be an active HTTP server, so this may just
// end up adding room for an additional SLAB_CAPACITY items. All HTTP servers
// on a single thread share the same slab.
let mut slab = slab.borrow_mut();
pub fn slab_get(index: SlabId) -> SlabEntry {
http_trace!(index, "slab_get");
let mut lock: RefMut<'static, Slab<HttpSlabRecord>> = SLAB.with(|x| {
// SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static
unsafe { std::mem::transmute(x.borrow_mut()) }
let Some(entry) = lock.get_mut(index as usize) else {
panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)",
#[cfg(feature = "__zombie_http_tracking")]
assert!(entry.alive, "HTTP state error: Entry is not alive");
let entry = NonNull::new(entry as _).unwrap();
SlabEntry(entry, index, lock)
fn slab_insert_raw(
request_parts: Parts,
request_body: Option<Incoming>,
request_info: HttpConnectionProperties,
refcount: RefCount,
) -> SlabId {
let index = SLAB.with(|slab| {
let mut slab = slab.borrow_mut();
let body = ResponseBytes::default();
let trailers = body.trailers();
let request_body = request_body.map(|r| r.into());
slab.insert(HttpSlabRecord {
response: Some(Response::new(body)),
been_dropped: false,
promise: CompletionHandle::default(),
refcount: Some(refcount),
#[cfg(feature = "__zombie_http_tracking")]
alive: true,
}) as u32;
http_trace!(index, "slab_insert");
pub fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
refcount: RefCount,
) -> SlabId {
let (request_parts, request_body) = request.into_parts();
slab_insert_raw(request_parts, Some(request_body), request_info, refcount)
pub fn slab_drop(index: SlabId) {
http_trace!(index, "slab_drop");
let mut entry = slab_get(index);
let record = entry.self_mut();
"HTTP state error: Entry has already been dropped"
// The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND
// the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished
// dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which
// might include actual resources, and the refcount, which is keeping the server alive.
record.been_dropped = true;
if record.promise.is_completed() {
} else {
// Take the request body, as the future has been dropped and this will allow some resources to close
// Take the refcount keeping the server alive. The future is no longer alive, which means this request
// is toast.
fn slab_expunge(index: SlabId) {
SLAB.with(|slab| {
slab.borrow_mut().get_mut(index as usize).unwrap().alive = false;
slab.borrow_mut().remove(index as usize);
http_trace!(index, "slab_expunge");
impl SlabEntry {
fn self_ref(&self) -> &HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_ref() }
fn self_mut(&mut self) -> &mut HttpSlabRecord {
// SAFETY: We have the lock and we're borrowing lifetime from self
unsafe { self.0.as_mut() }
/// Perform the Hyper upgrade on this entry.
pub fn upgrade(&mut self) -> Result<OnUpgrade, AnyError> {
// Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit
.ok_or_else(|| AnyError::msg("upgrade unavailable"))
/// Take the Hyper body from this entry.
pub fn take_body(&mut self) -> Option<Incoming> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
Some(RequestBodyState::Incoming(body)) => Some(body),
x => {
*body_holder = x;
pub fn take_resource(&mut self) -> Option<HttpRequestBodyAutocloser> {
let body_holder = &mut self.self_mut().request_body;
let body = body_holder.take();
match body {
Some(RequestBodyState::Resource(res)) => Some(res),
x => {
*body_holder = x;
/// Replace the request body with a resource ID and the OpState we'll need to shut it down.
/// We cannot keep just the resource itself, as JS code might be reading from the resource ID
/// to generate the response data (requiring us to keep it in the resource table).
pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) {
self.self_mut().request_body = Some(RequestBodyState::Resource(res));
/// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well).
pub fn complete(self) {
let promise = &self.self_ref().promise;
"HTTP state error: Entry has already been completed"
http_trace!(self.1, "SlabEntry::complete");
// If we're all done, we need to drop ourself to release the lock before we expunge this record
if self.self_ref().been_dropped {
let index = self.1;
/// Has the future for this entry been dropped? ie, has the underlying TCP connection
/// been closed?
pub fn cancelled(&self) -> bool {
/// Get a mutable reference to the response.
pub fn response(&mut self) -> &mut Response {
/// Get a mutable reference to the trailers.
pub fn trailers(&mut self) -> &RefCell<Option<HeaderMap>> {
/// Take the response.
pub fn take_response(&mut self) -> Response {
/// Get a reference to the connection properties.
pub fn request_info(&self) -> &HttpConnectionProperties {
/// Get a reference to the request parts.
pub fn request_parts(&self) -> &Parts {
/// Get a reference to the completion handle.
pub fn promise(&self) -> CompletionHandle {
/// Get a reference to the response body completion handle.
pub fn body_promise(&self) -> CompletionHandle {
mod tests {
use super::*;
use deno_net::raw::NetworkStreamType;
use http::Request;
fn test_slab() {
let req = Request::builder().body(()).unwrap();
let (parts, _) = req.into_parts();
let id = slab_insert_raw(
HttpConnectionProperties {
peer_address: "".into(),
peer_port: None,
local_port: None,
stream_type: NetworkStreamType::Tcp,
let entry = slab_get(id);