From fbefceeb56ebde276fe2fe9e5bcb7ebbcdc9ab22 Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Wed, 26 Apr 2023 15:38:13 +0530 Subject: [PATCH] perf(ext/http): use smi for slab IDs (#18848) --- ext/http/http_next.rs | 80 ++++++++++++++++++++----------------------- 1 file changed, 38 insertions(+), 42 deletions(-) diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 71f2a32b68..1c2a232e20 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -141,11 +141,11 @@ macro_rules! with { ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => { #[inline(always)] #[allow(dead_code)] - pub(crate) fn $mut(key: usize, f: impl FnOnce(&mut $type) -> T) -> T { + pub(crate) fn $mut(key: u32, f: impl FnOnce(&mut $type) -> T) -> T { SLAB.with(|slab| { let mut borrow = slab.borrow_mut(); #[allow(unused_mut)] // TODO(mmastrac): compiler issue? - let mut $http = match borrow.get_mut(key) { + let mut $http = match borrow.get_mut(key as usize) { Some(http) => http, None => panic!( "Attemped to access invalid request {} ({} in total available)", @@ -163,10 +163,10 @@ macro_rules! with { #[inline(always)] #[allow(dead_code)] - pub(crate) fn $ref(key: usize, f: impl FnOnce(&$type) -> T) -> T { + pub(crate) fn $ref(key: u32, f: impl FnOnce(&$type) -> T) -> T { SLAB.with(|slab| { let borrow = slab.borrow(); - let $http = borrow.get(key).unwrap(); + let $http = borrow.get(key as usize).unwrap(); #[cfg(__zombie_http_tracking)] if !$http.alive { panic!("Attempted to access a dead HTTP object") @@ -211,7 +211,7 @@ with!(with_http, with_http_mut, HttpSlabRecord, http, http); fn slab_insert( request: Request, request_info: HttpConnectionProperties, -) -> usize { +) -> u32 { SLAB.with(|slab| { let (request_parts, request_body) = request.into_parts(); slab.borrow_mut().insert(HttpSlabRecord { @@ -224,7 +224,7 @@ fn slab_insert( #[cfg(__zombie_http_tracking)] alive: true, }) - }) + }) as u32 } #[op] @@ -233,7 +233,7 @@ pub fn op_upgrade_raw(_index: usize) {} #[op] pub async fn op_upgrade( state: Rc>, - index: usize, + index: u32, headers: Vec<(ByteString, ByteString)>, ) -> Result<(ResourceId, ZeroCopyBuf), AnyError> { // Stage 1: set the respnse to 101 Switching Protocols and send it @@ -273,8 +273,8 @@ pub async fn op_upgrade( )) } -#[op] -pub fn op_set_promise_complete(index: usize, status: u16) { +#[op(fast)] +pub fn op_set_promise_complete(index: u32, status: u16) { with_resp_mut(index, |resp| { // The Javascript code will never provide a status that is invalid here (see 23_response.js) *resp.as_mut().unwrap().status_mut() = @@ -287,7 +287,7 @@ pub fn op_set_promise_complete(index: usize, status: u16) { #[op] pub fn op_get_request_method_and_url( - index: usize, + index: u32, ) -> (String, Option, String, String, Option) { // TODO(mmastrac): Passing method can be optimized with_http(index, |http| { @@ -314,7 +314,7 @@ pub fn op_get_request_method_and_url( } #[op] -pub fn op_get_request_header(index: usize, name: String) -> Option { +pub fn op_get_request_header(index: u32, name: String) -> Option { with_req(index, |req| { let value = req.headers.get(name); value.map(|value| value.as_bytes().into()) @@ -322,7 +322,7 @@ pub fn op_get_request_header(index: usize, name: String) -> Option { } #[op] -pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> { +pub fn op_get_request_headers(index: u32) -> Vec<(ByteString, ByteString)> { with_req(index, |req| { let headers = &req.headers; let mut vec = Vec::with_capacity(headers.len()); @@ -356,8 +356,8 @@ pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> { }) } -#[op] -pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId { +#[op(fast)] +pub fn op_read_request_body(state: &mut OpState, index: u32) -> ResourceId { let incoming = with_req_body_mut(index, |body| body.take().unwrap()); let body_resource = Rc::new(HttpRequestBody::new(incoming)); let res = state.resource_table.add_rc(body_resource.clone()); @@ -367,24 +367,20 @@ pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId { res } -#[op] -pub fn op_set_response_header( - index: usize, - name: ByteString, - value: ByteString, -) { +#[op(fast)] +pub fn op_set_response_header(index: u32, name: &str, value: &str) { with_resp_mut(index, |resp| { let resp_headers = resp.as_mut().unwrap().headers_mut(); // These are valid latin-1 strings - let name = HeaderName::from_bytes(&name).unwrap(); - let value = HeaderValue::from_bytes(&value).unwrap(); + let name = HeaderName::from_bytes(name.as_bytes()).unwrap(); + let value = HeaderValue::from_bytes(value.as_bytes()).unwrap(); resp_headers.append(name, value); }); } #[op] pub fn op_set_response_headers( - index: usize, + index: u32, headers: Vec<(ByteString, ByteString)>, ) { // TODO(mmastrac): Invalid headers should be handled? @@ -400,10 +396,10 @@ pub fn op_set_response_headers( }) } -#[op] +#[op(fast)] pub fn op_set_response_body_resource( state: &mut OpState, - index: usize, + index: u32, stream_rid: ResourceId, auto_close: bool, ) -> Result<(), AnyError> { @@ -426,10 +422,10 @@ pub fn op_set_response_body_resource( Ok(()) } -#[op] +#[op(fast)] pub fn op_set_response_body_stream( state: &mut OpState, - index: usize, + index: u32, ) -> Result { // TODO(mmastrac): what should this channel size be? let (tx, rx) = tokio::sync::mpsc::channel(1); @@ -445,8 +441,8 @@ pub fn op_set_response_body_stream( Ok(state.resource_table.add(tx)) } -#[op] -pub fn op_set_response_body_text(index: usize, text: String) { +#[op(fast)] +pub fn op_set_response_body_text(index: u32, text: String) { if !text.is_empty() { with_resp_mut(index, move |response| { response @@ -458,15 +454,15 @@ pub fn op_set_response_body_text(index: usize, text: String) { } } -#[op] -pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) { +#[op(fast)] +pub fn op_set_response_body_bytes(index: u32, buffer: &[u8]) { if !buffer.is_empty() { with_resp_mut(index, |response| { response .as_mut() .unwrap() .body_mut() - .initialize(ResponseBytesInner::Bytes(BufView::from(buffer))) + .initialize(ResponseBytesInner::Bytes(BufView::from(buffer.to_vec()))) }); }; } @@ -474,7 +470,7 @@ pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) { #[op] pub async fn op_http_track( state: Rc>, - index: usize, + index: u32, server_rid: ResourceId, ) -> Result<(), AnyError> { let handle = with_resp(index, |resp| { @@ -496,12 +492,12 @@ pub async fn op_http_track( } #[pin_project(PinnedDrop)] -pub struct SlabFuture>(usize, #[pin] F); +pub struct SlabFuture>(u32, #[pin] F); pub fn new_slab_future( request: Request, request_info: HttpConnectionProperties, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> SlabFuture> { let index = slab_insert(request, request_info); let rx = with_promise(index, |promise| promise.clone()); @@ -521,11 +517,11 @@ impl> PinnedDrop for SlabFuture { SLAB.with(|slab| { #[cfg(__zombie_http_tracking)] { - slab.borrow_mut().get_mut(self.0).unwrap().alive = false; + slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false; } #[cfg(not(__zombie_http_tracking))] { - slab.borrow_mut().remove(self.0); + slab.borrow_mut().remove(self.0 as usize); } }); } @@ -589,7 +585,7 @@ fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, cancel: RcRef, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { @@ -614,7 +610,7 @@ fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, cancel: RcRef, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us let svc = service_fn(move |req: Request| { @@ -627,7 +623,7 @@ fn serve_http_on( network_stream: NetworkStream, listen_properties: &HttpListenProperties, cancel: RcRef, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender, ) -> JoinHandle> { // We always want some sort of peer address. If we can't get one, just make up one. let peer_address = network_stream.peer_address().unwrap_or_else(|_| { @@ -659,7 +655,7 @@ fn serve_http_on( struct HttpJoinHandle( AsyncRefCell>>>, CancelHandle, - AsyncRefCell>, + AsyncRefCell>, ); impl HttpJoinHandle { @@ -798,7 +794,7 @@ pub async fn op_http_wait( // Do we have a request? if let Some(req) = next { - return Ok(req as u32); + return Ok(req); } // No - we're shutting down