From 25950baed347fa2311ecd59ae569a5d3ab4851f5 Mon Sep 17 00:00:00 2001 From: Laurence Rowe Date: Mon, 13 Nov 2023 09:32:34 -0800 Subject: [PATCH] perf(ext/http): Object pooling for HttpRecord and HeaderMap (#20809) Reuse existing existing allocations for HttpRecord and response HeaderMap where possible. At request end used allocations are returned to the pool and the pool and the pool sized to 1/8th the current number of inflight requests. For http1 hyper will reuse the response HeaderMap for the following request on the connection. Builds upon https://github.com/denoland/deno/pull/20770 --------- Co-authored-by: Matt Mastracci --- ext/http/http_next.rs | 27 ++++---- ext/http/service.rs | 141 +++++++++++++++++++++++++++++++----------- 2 files changed, 119 insertions(+), 49 deletions(-) diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 399515159e..4035fe2594 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -14,7 +14,7 @@ use crate::service::handle_request; use crate::service::http_trace; use crate::service::HttpRecord; use crate::service::HttpRequestBodyAutocloser; -use crate::service::RefCount; +use crate::service::HttpServerState; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; @@ -844,13 +844,13 @@ fn serve_https( tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { - refcount, + server_state, connection_cancel_handle, listen_cancel_handle, } = lifetime; let svc = service_fn(move |req: Request| { - handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( async { @@ -881,13 +881,13 @@ fn serve_http( tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { - refcount, + server_state, connection_cancel_handle, listen_cancel_handle, } = lifetime; let svc = service_fn(move |req: Request| { - handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), server_state.clone(), tx.clone()) }); spawn( serve_http2_autodetect(io, svc, listen_cancel_handle) @@ -927,7 +927,7 @@ where struct HttpLifetime { connection_cancel_handle: Rc, listen_cancel_handle: Rc, - refcount: RefCount, + server_state: Rc, } struct HttpJoinHandle { @@ -935,7 +935,7 @@ struct HttpJoinHandle { connection_cancel_handle: Rc, listen_cancel_handle: Rc, rx: AsyncRefCell>>, - refcount: RefCount, + server_state: Rc, } impl HttpJoinHandle { @@ -945,7 +945,7 @@ impl HttpJoinHandle { connection_cancel_handle: CancelHandle::new_rc(), listen_cancel_handle: CancelHandle::new_rc(), rx: AsyncRefCell::new(rx), - refcount: RefCount::default(), + server_state: HttpServerState::new(), } } @@ -953,7 +953,7 @@ impl HttpJoinHandle { HttpLifetime { connection_cancel_handle: self.connection_cancel_handle.clone(), listen_cancel_handle: self.listen_cancel_handle.clone(), - refcount: self.refcount.clone(), + server_state: self.server_state.clone(), } } @@ -1194,17 +1194,16 @@ pub async fn op_http_close( // In a graceful shutdown, we close the listener and allow all the remaining connections to drain join_handle.listen_cancel_handle().cancel(); + // Async spin on the server_state while we wait for everything to drain + while Rc::strong_count(&join_handle.server_state) > 1 { + tokio::time::sleep(Duration::from_millis(10)).await; + } } else { // In a forceful shutdown, we close everything join_handle.listen_cancel_handle().cancel(); join_handle.connection_cancel_handle().cancel(); } - // Async spin on the refcount while we wait for everything to drain - while Rc::strong_count(&join_handle.refcount.0) > 1 { - tokio::time::sleep(Duration::from_millis(10)).await; - } - let mut join_handle = RcRef::map(&join_handle, |this| &this.join_handle) .borrow_mut() .await; diff --git a/ext/http/service.rs b/ext/http/service.rs index ea67980f3e..87b3087557 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -22,14 +22,14 @@ pub type Request = hyper1::Request; pub type Response = hyper1::Response; macro_rules! http_trace { - ($record:expr, $args:tt) => { + ($record:expr $(, $args:expr)*) => { #[cfg(feature = "__http_tracing")] { println!( "HTTP id={:p} strong={}: {}", $record, std::rc::Rc::strong_count(&$record), - format!($args), + format!($($args),*), ); } }; @@ -37,9 +37,19 @@ macro_rules! http_trace { pub(crate) use http_trace; -#[repr(transparent)] -#[derive(Clone, Default)] -pub struct RefCount(pub Rc<()>); +struct HttpServerStateInner { + pool: Vec<(Rc, HeaderMap)>, +} + +pub struct HttpServerState(RefCell); + +impl HttpServerState { + pub fn new() -> Rc { + Rc::new(Self(RefCell::new(HttpServerStateInner { + pool: Vec::new(), + }))) + } +} enum RequestBodyState { Incoming(Incoming), @@ -72,15 +82,17 @@ impl Drop for HttpRequestBodyAutocloser { pub async fn handle_request( request: Request, request_info: HttpConnectionProperties, - _refcount: RefCount, // Keep server alive for duration of this future. + server_state: Rc, // Keep server alive for duration of this future. tx: tokio::sync::mpsc::Sender>, ) -> Result { // If the underlying TCP connection is closed, this future will be dropped // and execution could stop at any await point. // The HttpRecord must live until JavaScript is done processing so is wrapped // in an Rc. The guard ensures unneeded resources are freed at cancellation. - let guarded_record = - guard(HttpRecord::new(request, request_info), HttpRecord::cancel); + let guarded_record = guard( + HttpRecord::new(request, request_info, server_state), + HttpRecord::cancel, + ); // Clone HttpRecord and send to JavaScript for processing. // Safe to unwrap as channel receiver is never closed. @@ -93,15 +105,13 @@ pub async fn handle_request( // Defuse the guard. Must not await after the point. let record = ScopeGuard::into_inner(guarded_record); http_trace!(record, "handle_request complete"); - assert!( - Rc::strong_count(&record) == 1, - "HTTP state error: Expected to be last strong reference (handle_request)" - ); let response = record.take_response(); + record.recycle(); Ok(response) } struct HttpRecordInner { + server_state: Rc, request_info: HttpConnectionProperties, request_parts: Parts, request_body: Option, @@ -113,7 +123,7 @@ struct HttpRecordInner { been_dropped: bool, } -pub struct HttpRecord(RefCell); +pub struct HttpRecord(RefCell>); #[cfg(feature = "__http_tracing")] pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = @@ -131,37 +141,86 @@ impl Drop for HttpRecord { } impl HttpRecord { - fn new(request: Request, request_info: HttpConnectionProperties) -> Rc { - #[cfg(feature = "__http_tracing")] - { - RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); - } + fn new( + request: Request, + request_info: HttpConnectionProperties, + server_state: Rc, + ) -> Rc { let (request_parts, request_body) = request.into_parts(); let body = ResponseBytes::default(); let trailers = body.trailers(); let request_body = Some(request_body.into()); + let mut response = Response::new(body); + let reuse_record = + if let Some((record, headers)) = server_state.0.borrow_mut().pool.pop() { + *response.headers_mut() = headers; + Some(record) + } else { + None + }; let inner = HttpRecordInner { + server_state, request_info, request_parts, request_body, - response: Some(Response::new(body)), + response: Some(response), response_ready: false, response_waker: None, trailers, been_dropped: false, }; - #[allow(clippy::let_and_return)] - let record = Rc::new(Self(RefCell::new(inner))); - http_trace!(record, "HttpRecord::new"); - record + if let Some(record) = reuse_record { + *record.0.borrow_mut() = Some(inner); + http_trace!(record, "HttpRecord::reuse"); + record + } else { + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(Some(inner)))); + http_trace!(record, "HttpRecord::new"); + record + } + } + + fn recycle(self: Rc) { + assert!( + Rc::strong_count(&self) == 1, + "HTTP state error: Expected to be last strong reference" + ); + let HttpRecordInner { + server_state, + request_parts: Parts { mut headers, .. }, + .. + } = self.0.borrow_mut().take().unwrap(); + let mut server_state_mut = server_state.0.borrow_mut(); + let inflight = Rc::strong_count(&server_state); + http_trace!(self, "HttpRecord::recycle inflight={}", inflight); + + // TODO(mmastrac): we never recover the pooled memory here, and we could likely be shuttling + // the to-drop objects off to another thread. + + // Keep a buffer of allocations on hand to be reused by incoming requests. + // Estimated target size is 16 + 1/8 the number of inflight requests. + let target = 16 + (inflight >> 3); + let pool = &mut server_state_mut.pool; + if target > pool.len() { + headers.clear(); + pool.push((self, headers)); + } else if target < pool.len() - 8 { + pool.truncate(target); + } } fn self_ref(&self) -> Ref<'_, HttpRecordInner> { - self.0.borrow() + Ref::map(self.0.borrow(), |option| option.as_ref().unwrap()) } fn self_mut(&self) -> RefMut<'_, HttpRecordInner> { - self.0.borrow_mut() + RefMut::map(self.0.borrow_mut(), |option| option.as_mut().unwrap()) } /// Perform the Hyper upgrade on this record. @@ -210,7 +269,13 @@ impl HttpRecord { /// Cleanup resources not needed after the future is dropped. fn cancel(self: Rc) { http_trace!(self, "HttpRecord::cancel"); - let mut inner = self.0.borrow_mut(); + let mut inner = self.self_mut(); + if inner.response_ready { + // Future dropped between wake() and async fn resuming. + drop(inner); + self.recycle(); + return; + } inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); @@ -220,14 +285,15 @@ impl HttpRecord { pub fn complete(self: Rc) { http_trace!(self, "HttpRecord::complete"); let mut inner = self.self_mut(); - assert!( - !inner.been_dropped || Rc::strong_count(&self) == 1, - "HTTP state error: Expected to be last strong reference (been_dropped)" - ); assert!( !inner.response_ready, "HTTP state error: Entry has already been completed" ); + if inner.been_dropped { + drop(inner); + self.recycle(); + return; + } inner.response_ready = true; if let Some(waker) = inner.response_waker.take() { drop(inner); @@ -277,7 +343,7 @@ impl HttpRecord { self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>, ) -> std::task::Poll { - let mut mut_self = self.0 .0.borrow_mut(); + let mut mut_self = self.0.self_mut(); if mut_self.response_ready { return std::task::Poll::Ready(()); } @@ -352,8 +418,8 @@ mod tests { #[tokio::test] async fn test_handle_request() -> Result<(), AnyError> { let (tx, mut rx) = tokio::sync::mpsc::channel(10); - let refcount = RefCount::default(); - let refcount_check = refcount.clone(); + let server_state = HttpServerState::new(); + let server_state_check = server_state.clone(); let request_info = HttpConnectionProperties { peer_address: "".into(), peer_port: None, @@ -361,7 +427,12 @@ mod tests { stream_type: NetworkStreamType::Tcp, }; let svc = service_fn(move |req: hyper1::Request| { - handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request( + req, + request_info.clone(), + server_state.clone(), + tx.clone(), + ) }); let client_req = http::Request::builder().uri("/").body("".to_string())?; @@ -395,7 +466,7 @@ mod tests { .await }, )?; - assert_eq!(Rc::strong_count(&refcount_check.0), 1); + assert_eq!(Rc::strong_count(&server_state_check), 1); Ok(()) } }