0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-01 12:16:11 -05:00

perf(ext/http): use smi for slab IDs (#18848)

This commit is contained in:
Divy Srivastava 2023-04-26 15:38:13 +05:30 committed by GitHub
parent 18170f2326
commit fbefceeb56
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -141,11 +141,11 @@ macro_rules! with {
($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => {
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $mut<T>(key: usize, f: impl FnOnce(&mut $type) -> T) -> T {
pub(crate) fn $mut<T>(key: u32, f: impl FnOnce(&mut $type) -> T) -> T {
SLAB.with(|slab| {
let mut borrow = slab.borrow_mut();
#[allow(unused_mut)] // TODO(mmastrac): compiler issue?
let mut $http = match borrow.get_mut(key) {
let mut $http = match borrow.get_mut(key as usize) {
Some(http) => http,
None => panic!(
"Attemped to access invalid request {} ({} in total available)",
@ -163,10 +163,10 @@ macro_rules! with {
#[inline(always)]
#[allow(dead_code)]
pub(crate) fn $ref<T>(key: usize, f: impl FnOnce(&$type) -> T) -> T {
pub(crate) fn $ref<T>(key: u32, f: impl FnOnce(&$type) -> T) -> T {
SLAB.with(|slab| {
let borrow = slab.borrow();
let $http = borrow.get(key).unwrap();
let $http = borrow.get(key as usize).unwrap();
#[cfg(__zombie_http_tracking)]
if !$http.alive {
panic!("Attempted to access a dead HTTP object")
@ -211,7 +211,7 @@ with!(with_http, with_http_mut, HttpSlabRecord, http, http);
fn slab_insert(
request: Request,
request_info: HttpConnectionProperties,
) -> usize {
) -> u32 {
SLAB.with(|slab| {
let (request_parts, request_body) = request.into_parts();
slab.borrow_mut().insert(HttpSlabRecord {
@ -224,7 +224,7 @@ fn slab_insert(
#[cfg(__zombie_http_tracking)]
alive: true,
})
})
}) as u32
}
#[op]
@ -233,7 +233,7 @@ pub fn op_upgrade_raw(_index: usize) {}
#[op]
pub async fn op_upgrade(
state: Rc<RefCell<OpState>>,
index: usize,
index: u32,
headers: Vec<(ByteString, ByteString)>,
) -> Result<(ResourceId, ZeroCopyBuf), AnyError> {
// Stage 1: set the respnse to 101 Switching Protocols and send it
@ -273,8 +273,8 @@ pub async fn op_upgrade(
))
}
#[op]
pub fn op_set_promise_complete(index: usize, status: u16) {
#[op(fast)]
pub fn op_set_promise_complete(index: u32, status: u16) {
with_resp_mut(index, |resp| {
// The Javascript code will never provide a status that is invalid here (see 23_response.js)
*resp.as_mut().unwrap().status_mut() =
@ -287,7 +287,7 @@ pub fn op_set_promise_complete(index: usize, status: u16) {
#[op]
pub fn op_get_request_method_and_url(
index: usize,
index: u32,
) -> (String, Option<String>, String, String, Option<u16>) {
// TODO(mmastrac): Passing method can be optimized
with_http(index, |http| {
@ -314,7 +314,7 @@ pub fn op_get_request_method_and_url(
}
#[op]
pub fn op_get_request_header(index: usize, name: String) -> Option<ByteString> {
pub fn op_get_request_header(index: u32, name: String) -> Option<ByteString> {
with_req(index, |req| {
let value = req.headers.get(name);
value.map(|value| value.as_bytes().into())
@ -322,7 +322,7 @@ pub fn op_get_request_header(index: usize, name: String) -> Option<ByteString> {
}
#[op]
pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> {
pub fn op_get_request_headers(index: u32) -> Vec<(ByteString, ByteString)> {
with_req(index, |req| {
let headers = &req.headers;
let mut vec = Vec::with_capacity(headers.len());
@ -356,8 +356,8 @@ pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> {
})
}
#[op]
pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId {
#[op(fast)]
pub fn op_read_request_body(state: &mut OpState, index: u32) -> ResourceId {
let incoming = with_req_body_mut(index, |body| body.take().unwrap());
let body_resource = Rc::new(HttpRequestBody::new(incoming));
let res = state.resource_table.add_rc(body_resource.clone());
@ -367,24 +367,20 @@ pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId {
res
}
#[op]
pub fn op_set_response_header(
index: usize,
name: ByteString,
value: ByteString,
) {
#[op(fast)]
pub fn op_set_response_header(index: u32, name: &str, value: &str) {
with_resp_mut(index, |resp| {
let resp_headers = resp.as_mut().unwrap().headers_mut();
// These are valid latin-1 strings
let name = HeaderName::from_bytes(&name).unwrap();
let value = HeaderValue::from_bytes(&value).unwrap();
let name = HeaderName::from_bytes(name.as_bytes()).unwrap();
let value = HeaderValue::from_bytes(value.as_bytes()).unwrap();
resp_headers.append(name, value);
});
}
#[op]
pub fn op_set_response_headers(
index: usize,
index: u32,
headers: Vec<(ByteString, ByteString)>,
) {
// TODO(mmastrac): Invalid headers should be handled?
@ -400,10 +396,10 @@ pub fn op_set_response_headers(
})
}
#[op]
#[op(fast)]
pub fn op_set_response_body_resource(
state: &mut OpState,
index: usize,
index: u32,
stream_rid: ResourceId,
auto_close: bool,
) -> Result<(), AnyError> {
@ -426,10 +422,10 @@ pub fn op_set_response_body_resource(
Ok(())
}
#[op]
#[op(fast)]
pub fn op_set_response_body_stream(
state: &mut OpState,
index: usize,
index: u32,
) -> Result<ResourceId, AnyError> {
// TODO(mmastrac): what should this channel size be?
let (tx, rx) = tokio::sync::mpsc::channel(1);
@ -445,8 +441,8 @@ pub fn op_set_response_body_stream(
Ok(state.resource_table.add(tx))
}
#[op]
pub fn op_set_response_body_text(index: usize, text: String) {
#[op(fast)]
pub fn op_set_response_body_text(index: u32, text: String) {
if !text.is_empty() {
with_resp_mut(index, move |response| {
response
@ -458,15 +454,15 @@ pub fn op_set_response_body_text(index: usize, text: String) {
}
}
#[op]
pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) {
#[op(fast)]
pub fn op_set_response_body_bytes(index: u32, buffer: &[u8]) {
if !buffer.is_empty() {
with_resp_mut(index, |response| {
response
.as_mut()
.unwrap()
.body_mut()
.initialize(ResponseBytesInner::Bytes(BufView::from(buffer)))
.initialize(ResponseBytesInner::Bytes(BufView::from(buffer.to_vec())))
});
};
}
@ -474,7 +470,7 @@ pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) {
#[op]
pub async fn op_http_track(
state: Rc<RefCell<OpState>>,
index: usize,
index: u32,
server_rid: ResourceId,
) -> Result<(), AnyError> {
let handle = with_resp(index, |resp| {
@ -496,12 +492,12 @@ pub async fn op_http_track(
}
#[pin_project(PinnedDrop)]
pub struct SlabFuture<F: Future<Output = ()>>(usize, #[pin] F);
pub struct SlabFuture<F: Future<Output = ()>>(u32, #[pin] F);
pub fn new_slab_future(
request: Request,
request_info: HttpConnectionProperties,
tx: tokio::sync::mpsc::Sender<usize>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> SlabFuture<impl Future<Output = ()>> {
let index = slab_insert(request, request_info);
let rx = with_promise(index, |promise| promise.clone());
@ -521,11 +517,11 @@ impl<F: Future<Output = ()>> PinnedDrop for SlabFuture<F> {
SLAB.with(|slab| {
#[cfg(__zombie_http_tracking)]
{
slab.borrow_mut().get_mut(self.0).unwrap().alive = false;
slab.borrow_mut().get_mut(self.0 as usize).unwrap().alive = false;
}
#[cfg(not(__zombie_http_tracking))]
{
slab.borrow_mut().remove(self.0);
slab.borrow_mut().remove(self.0 as usize);
}
});
}
@ -589,7 +585,7 @@ fn serve_https(
mut io: TlsStream,
request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
@ -614,7 +610,7 @@ fn serve_http(
io: impl HttpServeStream,
request_info: HttpConnectionProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us
let svc = service_fn(move |req: Request| {
@ -627,7 +623,7 @@ fn serve_http_on(
network_stream: NetworkStream,
listen_properties: &HttpListenProperties,
cancel: RcRef<CancelHandle>,
tx: tokio::sync::mpsc::Sender<usize>,
tx: tokio::sync::mpsc::Sender<u32>,
) -> JoinHandle<Result<(), AnyError>> {
// We always want some sort of peer address. If we can't get one, just make up one.
let peer_address = network_stream.peer_address().unwrap_or_else(|_| {
@ -659,7 +655,7 @@ fn serve_http_on(
struct HttpJoinHandle(
AsyncRefCell<Option<JoinHandle<Result<(), AnyError>>>>,
CancelHandle,
AsyncRefCell<tokio::sync::mpsc::Receiver<usize>>,
AsyncRefCell<tokio::sync::mpsc::Receiver<u32>>,
);
impl HttpJoinHandle {
@ -798,7 +794,7 @@ pub async fn op_http_wait(
// Do we have a request?
if let Some(req) = next {
return Ok(req as u32);
return Ok(req);
}
// No - we're shutting down