diff --git a/Cargo.lock b/Cargo.lock index 6b7f42c11b..7c98166685 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1285,6 +1285,7 @@ dependencies = [ "flate2", "fly-accept-encoding", "http", + "http-body-util", "httparse", "hyper 0.14.27", "hyper 1.0.0-rc.4", @@ -1298,7 +1299,6 @@ dependencies = [ "ring", "scopeguard", "serde", - "slab", "smallvec", "thiserror", "tokio", @@ -2716,6 +2716,19 @@ dependencies = [ "http", ] +[[package]] +name = "http-body-util" +version = "0.1.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08ef12f041acdd397010e5fb6433270c147d3b8b2d0a840cd7fff8e531dca5c8" +dependencies = [ + "bytes", + "futures-util", + "http", + "http-body 1.0.0-rc.2", + "pin-project-lite", +] + [[package]] name = "httparse" version = "1.8.0" diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index fbd2014a7c..05e0bb5c33 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -117,11 +117,11 @@ function upgradeHttpRaw(req, conn) { function addTrailers(resp, headerList) { const inner = toInnerResponse(resp); - op_http_set_response_trailers(inner.slabId, headerList); + op_http_set_response_trailers(inner.external, headerList); } class InnerRequest { - #slabId; + #external; #context; #methodAndUri; #streamRid; @@ -129,14 +129,14 @@ class InnerRequest { #upgraded; #urlValue; - constructor(slabId, context) { - this.#slabId = slabId; + constructor(external, context) { + this.#external = external; this.#context = context; this.#upgraded = false; } close() { - this.#slabId = undefined; + this.#external = null; } get [_upgraded]() { @@ -147,7 +147,7 @@ class InnerRequest { if (this.#upgraded) { throw new Deno.errors.Http("already upgraded"); } - if (this.#slabId === undefined) { + if (this.#external === null) { throw new Deno.errors.Http("already closed"); } @@ -159,7 +159,7 @@ class InnerRequest { // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { - const slabId = this.#slabId; + const external = this.#external; const underlyingConn = originalArgs[0]; this.url(); @@ -168,7 +168,7 @@ class InnerRequest { this.#upgraded = () => {}; - const upgradeRid = op_http_upgrade_raw(slabId); + const upgradeRid = op_http_upgrade_raw(external); const conn = new TcpConn( upgradeRid, @@ -184,7 +184,7 @@ class InnerRequest { const response = originalArgs[0]; const ws = originalArgs[1]; - const slabId = this.#slabId; + const external = this.#external; this.url(); this.headerList; @@ -194,15 +194,16 @@ class InnerRequest { this.#upgraded = () => { goAhead.resolve(); }; + const wsPromise = op_http_upgrade_websocket_next( + external, + response.headerList, + ); // Start the upgrade in the background. (async () => { try { // Returns the upgraded websocket connection - const wsRid = await op_http_upgrade_websocket_next( - slabId, - response.headerList, - ); + const wsRid = await wsPromise; // We have to wait for the go-ahead signal await goAhead; @@ -236,12 +237,12 @@ class InnerRequest { } if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider // splitting this up into multiple ops. - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } const path = this.#methodAndUri[2]; @@ -281,10 +282,10 @@ class InnerRequest { }; } if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } return { transport: "tcp", @@ -295,16 +296,16 @@ class InnerRequest { get method() { if (this.#methodAndUri === undefined) { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } - this.#methodAndUri = op_http_get_request_method_and_url(this.#slabId); + this.#methodAndUri = op_http_get_request_method_and_url(this.#external); } return this.#methodAndUri[0]; } get body() { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } if (this.#body !== undefined) { @@ -316,25 +317,25 @@ class InnerRequest { this.#body = null; return null; } - this.#streamRid = op_http_read_request_body(this.#slabId); + this.#streamRid = op_http_read_request_body(this.#external); this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); return this.#body; } get headerList() { - if (this.#slabId === undefined) { + if (this.#external === null) { throw new TypeError("request closed"); } const headers = []; - const reqHeaders = op_http_get_request_headers(this.#slabId); + const reqHeaders = op_http_get_request_headers(this.#external); for (let i = 0; i < reqHeaders.length; i += 2) { ArrayPrototypePush(headers, [reqHeaders[i], reqHeaders[i + 1]]); } return headers; } - get slabId() { - return this.#slabId; + get external() { + return this.#external; } } @@ -483,8 +484,8 @@ function mapToCallback(context, callback, onError) { // Did everything shut down while we were waiting? if (context.closed) { // We're shutting down, so this status shouldn't make it back to the client but "Service Unavailable" seems appropriate - op_http_set_promise_complete(req, 503); innerRequest?.close(); + op_http_set_promise_complete(req, 503); return; } @@ -498,8 +499,8 @@ function mapToCallback(context, callback, onError) { } } - fastSyncResponseOrStream(req, inner.body, status); innerRequest?.close(); + fastSyncResponseOrStream(req, inner.body, status); }; } @@ -659,7 +660,7 @@ function serveHttpOn(context, callback) { try { // Attempt to pull as many requests out of the queue as possible before awaiting. This API is // a synchronous, non-blocking API that returns u32::MAX if anything goes wrong. - while ((req = op_http_try_wait(rid)) !== -1) { + while ((req = op_http_try_wait(rid)) !== null) { PromisePrototypeCatch(callback(req), promiseErrorHandler); } currentPromise = op_http_wait(rid); @@ -677,7 +678,7 @@ function serveHttpOn(context, callback) { } throw new Deno.errors.Http(error); } - if (req === -1) { + if (req === null) { break; } PromisePrototypeCatch(callback(req), promiseErrorHandler); diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index ff44a69924..923d9e2eab 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -11,7 +11,6 @@ repository.workspace = true description = "HTTP server implementation for Deno" [features] -"__zombie_http_tracking" = [] "__http_tracing" = [] [lib] @@ -46,7 +45,6 @@ pin-project.workspace = true ring.workspace = true scopeguard.workspace = true serde.workspace = true -slab.workspace = true smallvec.workspace = true thiserror.workspace = true tokio.workspace = true @@ -54,4 +52,5 @@ tokio-util = { workspace = true, features = ["io"] } [dev-dependencies] bencher.workspace = true +http-body-util = "=0.1.0-rc.3" rand.workspace = true diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 7fc396b386..399515159e 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,12 +10,11 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; -use crate::slab::new_slab_future; -use crate::slab::slab_get; -use crate::slab::slab_init; -use crate::slab::HttpRequestBodyAutocloser; -use crate::slab::RefCount; -use crate::slab::SlabId; +use crate::service::handle_request; +use crate::service::http_trace; +use crate::service::HttpRecord; +use crate::service::HttpRequestBodyAutocloser; +use crate::service::RefCount; use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use cache_control::CacheControl; @@ -33,6 +32,7 @@ use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; +use deno_core::ExternalPointer; use deno_core::JsBuffer; use deno_core::OpState; use deno_core::RcRef; @@ -62,9 +62,11 @@ use once_cell::sync::Lazy; use smallvec::SmallVec; use std::borrow::Cow; use std::cell::RefCell; +use std::ffi::c_void; use std::future::Future; use std::io; use std::pin::Pin; +use std::ptr::null; use std::rc::Rc; use std::time::Duration; @@ -116,14 +118,66 @@ impl< { } +#[repr(transparent)] +struct RcHttpRecord(Rc); + +// Temp copy +/// Define an external type. +macro_rules! external { + ($type:ident, $name:literal) => { + impl deno_core::Externalizable for $type { + fn external_marker() -> usize { + // Use the address of a static mut as a way to get around lack of usize-sized TypeId. Because it is mutable, the + // compiler cannot collapse multiple definitions into one. + static mut DEFINITION: deno_core::ExternalDefinition = + deno_core::ExternalDefinition::new($name); + // Wash the pointer through black_box so the compiler cannot see what we're going to do with it and needs + // to assume it will be used for valid purposes. + // SAFETY: temporary while waiting on deno core bump + let ptr = std::hint::black_box(unsafe { &mut DEFINITION } as *mut _); + ptr as usize + } + + fn external_name() -> &'static str { + $name + } + } + }; +} + +// Register the [`HttpRecord`] as an external. +external!(RcHttpRecord, "http record"); + +/// Construct Rc from raw external pointer, consuming +/// refcount. You must make sure the external is deleted on the JS side. +macro_rules! take_external { + ($external:expr, $args:tt) => {{ + let ptr = ExternalPointer::::from_raw($external); + let record = ptr.unsafely_take().0; + http_trace!(record, $args); + record + }}; +} + +/// Clone Rc from raw external pointer. +macro_rules! clone_external { + ($external:expr, $args:tt) => {{ + let ptr = ExternalPointer::::from_raw($external); + ptr.unsafely_deref().0.clone() + }}; +} + #[op2(fast)] #[smi] pub fn op_http_upgrade_raw( state: &mut OpState, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> Result { + // SAFETY: external is deleted before calling this op. + let http = unsafe { take_external!(external, "op_http_upgrade_raw") }; + // Stage 1: extract the upgrade future - let upgrade = slab_get(slab_id).upgrade()?; + let upgrade = http.upgrade()?; let (read, write) = tokio::io::duplex(1024); let (read_rx, write_tx) = tokio::io::split(read); let (mut write_rx, mut read_tx) = tokio::io::split(write); @@ -137,7 +191,6 @@ pub fn op_http_upgrade_raw( match upgrade_stream.write(&buf[..read]) { Ok(None) => continue, Ok(Some((response, bytes))) => { - let mut http = slab_get(slab_id); *http.response() = response; http.complete(); let mut upgraded = TokioIo::new(upgrade.await?); @@ -188,20 +241,23 @@ pub fn op_http_upgrade_raw( #[smi] pub async fn op_http_upgrade_websocket_next( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[serde] headers: Vec<(ByteString, ByteString)>, ) -> Result { - let mut http = slab_get(slab_id); + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_upgrade_websocket_next") }; // Stage 1: set the response to 101 Switching Protocols and send it let upgrade = http.upgrade()?; - - let response = http.response(); - *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; - for (name, value) in headers { - response.headers_mut().append( - HeaderName::from_bytes(&name).unwrap(), - HeaderValue::from_bytes(&value).unwrap(), - ); + { + let mut response = http.response(); + *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } } http.complete(); @@ -214,8 +270,10 @@ pub async fn op_http_upgrade_websocket_next( } #[op2(fast)] -pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) { - let mut http = slab_get(slab_id); +pub fn op_http_set_promise_complete(external: *const c_void, status: u16) { + let http = + // SAFETY: external is deleted before calling this op. + unsafe { take_external!(external, "op_http_set_promise_complete") }; // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we // will quitely ignore invalid values. if let Ok(code) = StatusCode::from_u16(status) { @@ -227,16 +285,18 @@ pub fn op_http_set_promise_complete(#[smi] slab_id: SlabId, status: u16) { #[op2] pub fn op_http_get_request_method_and_url<'scope, HTTP>( scope: &mut v8::HandleScope<'scope>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> v8::Local<'scope, v8::Array> where HTTP: HttpPropertyExtractor, { - let http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_method_and_url") }; let request_info = http.request_info(); let request_parts = http.request_parts(); let request_properties = HTTP::request_properties( - request_info, + &request_info, &request_parts.uri, &request_parts.headers, ); @@ -291,20 +351,25 @@ where #[op2] #[serde] pub fn op_http_get_request_header( - #[smi] slab_id: SlabId, + external: *const c_void, #[string] name: String, ) -> Option { - let http = slab_get(slab_id); - let value = http.request_parts().headers.get(name); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_header") }; + let request_parts = http.request_parts(); + let value = request_parts.headers.get(name); value.map(|value| value.as_bytes().into()) } #[op2] pub fn op_http_get_request_headers<'scope>( scope: &mut v8::HandleScope<'scope>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> v8::Local<'scope, v8::Array> { - let http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_get_request_headers") }; let headers = &http.request_parts().headers; // Two slots for each header key/value pair let mut vec: SmallVec<[v8::Local; 32]> = @@ -371,9 +436,11 @@ pub fn op_http_get_request_headers<'scope>( #[smi] pub fn op_http_read_request_body( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, ) -> ResourceId { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_read_request_body") }; let rid = if let Some(incoming) = http.take_body() { let body_resource = Rc::new(HttpRequestBody::new(incoming)); state.borrow_mut().resource_table.add_rc(body_resource) @@ -388,12 +455,15 @@ pub fn op_http_read_request_body( #[op2(fast)] pub fn op_http_set_response_header( - #[smi] slab_id: SlabId, + external: *const c_void, #[string(onebyte)] name: Cow<[u8]>, #[string(onebyte)] value: Cow<[u8]>, ) { - let mut http = slab_get(slab_id); - let resp_headers = http.response().headers_mut(); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_header") }; + let mut response = http.response(); + let resp_headers = response.headers_mut(); // These are valid latin-1 strings let name = HeaderName::from_bytes(&name).unwrap(); let value = match value { @@ -409,12 +479,15 @@ pub fn op_http_set_response_header( #[op2] pub fn op_http_set_response_headers( scope: &mut v8::HandleScope, - #[smi] slab_id: SlabId, + external: *const c_void, headers: v8::Local, ) { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_headers") }; // TODO(mmastrac): Invalid headers should be handled? - let resp_headers = http.response().headers_mut(); + let mut response = http.response(); + let resp_headers = response.headers_mut(); let len = headers.length(); let header_len = len * 2; @@ -438,10 +511,12 @@ pub fn op_http_set_response_headers( #[op2] pub fn op_http_set_response_trailers( - #[smi] slab_id: SlabId, + external: *const c_void, #[serde] trailers: Vec<(ByteString, ByteString)>, ) { - let mut http = slab_get(slab_id); + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_set_response_trailers") }; let mut trailer_map: HeaderMap = HeaderMap::with_capacity(trailers.len()); for (name, value) in trailers { // These are valid latin-1 strings @@ -577,20 +652,21 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { /// Sets the appropriate response body. Use `force_instantiate_body` if you need /// to ensure that the response is cleaned up correctly (eg: for resources). fn set_response( - slab_id: SlabId, + external: *const c_void, length: Option, status: u16, force_instantiate_body: bool, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { - let mut http = slab_get(slab_id); + // SAFETY: external is deleted before calling this op. + let http = unsafe { take_external!(external, "set_response") }; // The request may have been cancelled by this point and if so, there's no need for us to // do all of this work to send the response. if !http.cancelled() { let resource = http.take_resource(); let compression = is_request_compressible(length, &http.request_parts().headers); - let response = http.response(); + let mut response = http.response(); let compression = modify_compressibility_from_response(compression, response.headers_mut()); response @@ -612,7 +688,7 @@ fn set_response( #[op2(fast)] pub fn op_http_set_response_body_resource( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[smi] stream_rid: ResourceId, auto_close: bool, status: u16, @@ -634,7 +710,7 @@ pub fn op_http_set_response_body_resource( }; set_response( - slab_id, + external, resource.size_hint().1.map(|s| s as usize), status, true, @@ -648,41 +724,42 @@ pub fn op_http_set_response_body_resource( #[op2(fast)] pub fn op_http_set_response_body_text( - #[smi] slab_id: SlabId, + external: *const c_void, #[string] text: String, status: u16, ) { if !text.is_empty() { - set_response(slab_id, Some(text.len()), status, false, |compression| { + set_response(external, Some(text.len()), status, false, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); } else { - op_http_set_promise_complete::call(slab_id, status); + op_http_set_promise_complete::call(external, status); } } #[op2(fast)] pub fn op_http_set_response_body_bytes( - #[smi] slab_id: SlabId, + external: *const c_void, #[buffer] buffer: JsBuffer, status: u16, ) { if !buffer.is_empty() { - set_response(slab_id, Some(buffer.len()), status, false, |compression| { + set_response(external, Some(buffer.len()), status, false, |compression| { ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); } else { - op_http_set_promise_complete::call(slab_id, status); + op_http_set_promise_complete::call(external, status); } } #[op2(async)] pub async fn op_http_track( state: Rc>, - #[smi] slab_id: SlabId, + external: *const c_void, #[smi] server_rid: ResourceId, ) -> Result<(), AnyError> { - let http = slab_get(slab_id); + // SAFETY: op is called with external. + let http = unsafe { clone_external!(external, "op_http_track") }; let handle = http.body_promise(); let join_handle = state @@ -764,7 +841,7 @@ fn serve_https( mut io: TlsStream, request_info: HttpConnectionProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { refcount, @@ -773,7 +850,7 @@ fn serve_https( } = lifetime; let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) }); spawn( async { @@ -801,7 +878,7 @@ fn serve_http( io: impl HttpServeStream, request_info: HttpConnectionProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> { let HttpLifetime { refcount, @@ -810,7 +887,7 @@ fn serve_http( } = lifetime; let svc = service_fn(move |req: Request| { - new_slab_future(req, request_info.clone(), refcount.clone(), tx.clone()) + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) }); spawn( serve_http2_autodetect(io, svc, listen_cancel_handle) @@ -822,7 +899,7 @@ fn serve_http_on( connection: HTTP::Connection, listen_properties: &HttpListenProperties, lifetime: HttpLifetime, - tx: tokio::sync::mpsc::Sender, + tx: tokio::sync::mpsc::Sender>, ) -> JoinHandle> where HTTP: HttpPropertyExtractor, @@ -857,12 +934,12 @@ struct HttpJoinHandle { join_handle: AsyncRefCell>>>, connection_cancel_handle: Rc, listen_cancel_handle: Rc, - rx: AsyncRefCell>, + rx: AsyncRefCell>>, refcount: RefCount, } impl HttpJoinHandle { - fn new(rx: tokio::sync::mpsc::Receiver) -> Self { + fn new(rx: tokio::sync::mpsc::Receiver>) -> Self { Self { join_handle: AsyncRefCell::new(None), connection_cancel_handle: CancelHandle::new_rc(), @@ -918,8 +995,6 @@ pub fn op_http_serve( where HTTP: HttpPropertyExtractor, { - slab_init(); - let listener = HTTP::get_listener_for_rid(&mut state.borrow_mut(), listener_rid)?; @@ -969,8 +1044,6 @@ pub fn op_http_serve_on( where HTTP: HttpPropertyExtractor, { - slab_init(); - let connection = HTTP::get_connection_for_rid(&mut state.borrow_mut(), connection_rid)?; @@ -1000,36 +1073,38 @@ where } /// Synchronous, non-blocking call to see if there are any further HTTP requests. If anything -/// goes wrong in this method we return [`SlabId::MAX`] and let the async handler pick up the real error. +/// goes wrong in this method we return null and let the async handler pick up the real error. #[op2(fast)] -#[smi] -pub fn op_http_try_wait(state: &mut OpState, #[smi] rid: ResourceId) -> SlabId { +pub fn op_http_try_wait( + state: &mut OpState, + #[smi] rid: ResourceId, +) -> *const c_void { // The resource needs to exist. let Ok(join_handle) = state.resource_table.get::(rid) else { - return SlabId::MAX; + return null(); }; // If join handle is somehow locked, just abort. let Some(mut handle) = RcRef::map(&join_handle, |this| &this.rx).try_borrow_mut() else { - return SlabId::MAX; + return null(); }; // See if there are any requests waiting on this channel. If not, return. - let Ok(id) = handle.try_recv() else { - return SlabId::MAX; + let Ok(record) = handle.try_recv() else { + return null(); }; - id + let ptr = ExternalPointer::new(RcHttpRecord(record)); + ptr.into_raw() } #[op2(async)] -#[smi] pub async fn op_http_wait( state: Rc>, #[smi] rid: ResourceId, -) -> Result { +) -> Result<*const c_void, AnyError> { // We will get the join handle initially, as we might be consuming requests still let join_handle = state .borrow_mut() @@ -1046,8 +1121,9 @@ pub async fn op_http_wait( .await; // Do we have a request? - if let Some(req) = next { - return Ok(req); + if let Some(record) = next { + let ptr = ExternalPointer::new(RcHttpRecord(record)); + return Ok(ptr.into_raw()); } // No - we're shutting down @@ -1063,14 +1139,14 @@ pub async fn op_http_wait( if let Some(err) = err.source() { if let Some(err) = err.downcast_ref::() { if err.kind() == io::ErrorKind::NotConnected { - return Ok(SlabId::MAX); + return Ok(null()); } } } return Err(err); } - Ok(SlabId::MAX) + Ok(null()) } /// Cancels the HTTP handle. diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d470111195..0460a3707f 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -86,7 +86,7 @@ mod reader_stream; mod request_body; mod request_properties; mod response_body; -mod slab; +mod service; mod websocket_upgrade; pub use request_properties::DefaultHttpPropertyExtractor; diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 4f7e3b0a5d..7d91dce6b4 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -23,7 +23,7 @@ use hyper1::body::Frame; use hyper1::body::SizeHint; use pin_project::pin_project; -use crate::slab::HttpRequestBodyAutocloser; +use crate::service::HttpRequestBodyAutocloser; /// Simplification for nested types we use for our streams. We provide a way to convert from /// this type into Hyper's body [`Frame`]. @@ -80,6 +80,7 @@ impl CompletionHandle { } } + #[allow(dead_code)] pub fn is_completed(&self) -> bool { self.inner.borrow().complete } diff --git a/ext/http/service.rs b/ext/http/service.rs new file mode 100644 index 0000000000..ea67980f3e --- /dev/null +++ b/ext/http/service.rs @@ -0,0 +1,401 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::request_properties::HttpConnectionProperties; +use crate::response_body::CompletionHandle; +use crate::response_body::ResponseBytes; +use deno_core::error::AnyError; +use deno_core::OpState; +use deno_core::ResourceId; +use http::request::Parts; +use http::HeaderMap; +use hyper1::body::Incoming; +use hyper1::upgrade::OnUpgrade; + +use scopeguard::guard; +use scopeguard::ScopeGuard; +use std::cell::Ref; +use std::cell::RefCell; +use std::cell::RefMut; +use std::future::Future; +use std::rc::Rc; + +pub type Request = hyper1::Request; +pub type Response = hyper1::Response; + +macro_rules! http_trace { + ($record:expr, $args:tt) => { + #[cfg(feature = "__http_tracing")] + { + println!( + "HTTP id={:p} strong={}: {}", + $record, + std::rc::Rc::strong_count(&$record), + format!($args), + ); + } + }; +} + +pub(crate) use http_trace; + +#[repr(transparent)] +#[derive(Clone, Default)] +pub struct RefCount(pub Rc<()>); + +enum RequestBodyState { + Incoming(Incoming), + Resource(HttpRequestBodyAutocloser), +} + +impl From for RequestBodyState { + fn from(value: Incoming) -> Self { + RequestBodyState::Incoming(value) + } +} + +/// Ensures that the request body closes itself when no longer needed. +pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); + +impl HttpRequestBodyAutocloser { + pub fn new(res: ResourceId, op_state: Rc>) -> Self { + Self(res, op_state) + } +} + +impl Drop for HttpRequestBodyAutocloser { + fn drop(&mut self) { + if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { + res.close(); + } + } +} + +pub async fn handle_request( + request: Request, + request_info: HttpConnectionProperties, + _refcount: RefCount, // Keep server alive for duration of this future. + tx: tokio::sync::mpsc::Sender>, +) -> Result { + // If the underlying TCP connection is closed, this future will be dropped + // and execution could stop at any await point. + // The HttpRecord must live until JavaScript is done processing so is wrapped + // in an Rc. The guard ensures unneeded resources are freed at cancellation. + let guarded_record = + guard(HttpRecord::new(request, request_info), HttpRecord::cancel); + + // Clone HttpRecord and send to JavaScript for processing. + // Safe to unwrap as channel receiver is never closed. + tx.send(guarded_record.clone()).await.unwrap(); + + // Wait for JavaScript handler to return request. + http_trace!(*guarded_record, "handle_request response_ready.await"); + guarded_record.response_ready().await; + + // Defuse the guard. Must not await after the point. + let record = ScopeGuard::into_inner(guarded_record); + http_trace!(record, "handle_request complete"); + assert!( + Rc::strong_count(&record) == 1, + "HTTP state error: Expected to be last strong reference (handle_request)" + ); + let response = record.take_response(); + Ok(response) +} + +struct HttpRecordInner { + request_info: HttpConnectionProperties, + request_parts: Parts, + request_body: Option, + /// The response may get taken before we tear this down + response: Option, + response_ready: bool, + response_waker: Option, + trailers: Rc>>, + been_dropped: bool, +} + +pub struct HttpRecord(RefCell); + +#[cfg(feature = "__http_tracing")] +pub static RECORD_COUNT: std::sync::atomic::AtomicUsize = + std::sync::atomic::AtomicUsize::new(0); + +#[cfg(feature = "__http_tracing")] +impl Drop for HttpRecord { + fn drop(&mut self) { + let count = RECORD_COUNT + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst) + .checked_sub(1) + .expect("Count went below zero"); + println!("HTTP count={count}: HttpRecord::drop"); + } +} + +impl HttpRecord { + fn new(request: Request, request_info: HttpConnectionProperties) -> Rc { + #[cfg(feature = "__http_tracing")] + { + RECORD_COUNT.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + } + let (request_parts, request_body) = request.into_parts(); + let body = ResponseBytes::default(); + let trailers = body.trailers(); + let request_body = Some(request_body.into()); + let inner = HttpRecordInner { + request_info, + request_parts, + request_body, + response: Some(Response::new(body)), + response_ready: false, + response_waker: None, + trailers, + been_dropped: false, + }; + #[allow(clippy::let_and_return)] + let record = Rc::new(Self(RefCell::new(inner))); + http_trace!(record, "HttpRecord::new"); + record + } + + fn self_ref(&self) -> Ref<'_, HttpRecordInner> { + self.0.borrow() + } + + fn self_mut(&self) -> RefMut<'_, HttpRecordInner> { + self.0.borrow_mut() + } + + /// Perform the Hyper upgrade on this record. + pub fn upgrade(&self) -> Result { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + self + .self_mut() + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + } + + /// Take the Hyper body from this record. + pub fn take_body(&self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Incoming(body)) => Some(body), + x => { + *body_holder = x; + None + } + } + } + + pub fn take_resource(&self) -> Option { + let body_holder = &mut self.self_mut().request_body; + let body = body_holder.take(); + match body { + Some(RequestBodyState::Resource(res)) => Some(res), + x => { + *body_holder = x; + None + } + } + } + + /// Replace the request body with a resource ID and the OpState we'll need to shut it down. + /// We cannot keep just the resource itself, as JS code might be reading from the resource ID + /// to generate the response data (requiring us to keep it in the resource table). + pub fn put_resource(&self, res: HttpRequestBodyAutocloser) { + self.self_mut().request_body = Some(RequestBodyState::Resource(res)); + } + + /// Cleanup resources not needed after the future is dropped. + fn cancel(self: Rc) { + http_trace!(self, "HttpRecord::cancel"); + let mut inner = self.0.borrow_mut(); + inner.been_dropped = true; + // The request body might include actual resources. + inner.request_body.take(); + } + + /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). + pub fn complete(self: Rc) { + http_trace!(self, "HttpRecord::complete"); + let mut inner = self.self_mut(); + assert!( + !inner.been_dropped || Rc::strong_count(&self) == 1, + "HTTP state error: Expected to be last strong reference (been_dropped)" + ); + assert!( + !inner.response_ready, + "HTTP state error: Entry has already been completed" + ); + inner.response_ready = true; + if let Some(waker) = inner.response_waker.take() { + drop(inner); + waker.wake(); + } + } + + /// Has the future for this record been dropped? ie, has the underlying TCP connection + /// been closed? + pub fn cancelled(&self) -> bool { + self.self_ref().been_dropped + } + + /// Get a mutable reference to the response. + pub fn response(&self) -> RefMut<'_, Response> { + RefMut::map(self.self_mut(), |inner| inner.response.as_mut().unwrap()) + } + + /// Get a mutable reference to the trailers. + pub fn trailers(&self) -> Ref<'_, Rc>>> { + Ref::map(self.self_ref(), |inner| &inner.trailers) + } + + /// Take the response. + fn take_response(&self) -> Response { + self.self_mut().response.take().unwrap() + } + + /// Get a reference to the connection properties. + pub fn request_info(&self) -> Ref<'_, HttpConnectionProperties> { + Ref::map(self.self_ref(), |inner| &inner.request_info) + } + + /// Get a reference to the request parts. + pub fn request_parts(&self) -> Ref<'_, Parts> { + Ref::map(self.self_ref(), |inner| &inner.request_parts) + } + + /// Get a reference to the completion handle. + fn response_ready(&self) -> impl Future + '_ { + struct HttpRecordComplete<'a>(&'a HttpRecord); + + impl<'a> Future for HttpRecordComplete<'a> { + type Output = (); + + fn poll( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut mut_self = self.0 .0.borrow_mut(); + if mut_self.response_ready { + return std::task::Poll::Ready(()); + } + mut_self.response_waker = Some(cx.waker().clone()); + std::task::Poll::Pending + } + } + + HttpRecordComplete(self) + } + + /// Get a reference to the response body completion handle. + pub fn body_promise(&self) -> CompletionHandle { + self + .self_ref() + .response + .as_ref() + .unwrap() + .body() + .completion_handle() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::hyper_util_tokioio::TokioIo; + use crate::response_body::Compression; + use crate::response_body::ResponseBytesInner; + use bytes::Buf; + use deno_net::raw::NetworkStreamType; + use hyper1::body::Body; + use hyper1::service::service_fn; + use hyper1::service::HttpService; + use std::error::Error as StdError; + + /// Execute client request on service and concurrently map the response. + async fn serve_request( + req: http::Request, + service: S, + map_response: impl FnOnce(hyper1::Response) -> F, + ) -> hyper1::Result + where + B: Body + Send + 'static, // Send bound due to DuplexStream + B::Data: Send, + B::Error: Into>, + S: HttpService, + S::Error: Into>, + S::ResBody: 'static, + ::Error: Into>, + F: std::future::Future>, + { + use hyper1::client::conn::http1::handshake; + use hyper1::server::conn::http1::Builder; + let (stream_client, stream_server) = tokio::io::duplex(16 * 1024); + let conn_server = + Builder::new().serve_connection(TokioIo::new(stream_server), service); + let (mut sender, conn_client) = + handshake(TokioIo::new(stream_client)).await?; + + let (res, _, _) = tokio::try_join!( + async move { + let res = sender.send_request(req).await?; + map_response(res).await + }, + conn_server, + conn_client, + )?; + Ok(res) + } + + #[tokio::test] + async fn test_handle_request() -> Result<(), AnyError> { + let (tx, mut rx) = tokio::sync::mpsc::channel(10); + let refcount = RefCount::default(); + let refcount_check = refcount.clone(); + let request_info = HttpConnectionProperties { + peer_address: "".into(), + peer_port: None, + local_port: None, + stream_type: NetworkStreamType::Tcp, + }; + let svc = service_fn(move |req: hyper1::Request| { + handle_request(req, request_info.clone(), refcount.clone(), tx.clone()) + }); + + let client_req = http::Request::builder().uri("/").body("".to_string())?; + + // Response produced by concurrent tasks + tokio::try_join!( + async move { + // JavaScript handler produces response + let record = rx.recv().await.unwrap(); + let resource = record.take_resource(); + record.response().body_mut().initialize( + ResponseBytesInner::from_vec( + Compression::None, + b"hello world".to_vec(), + ), + resource, + ); + record.complete(); + Ok(()) + }, + // Server connection executes service + async move { + serve_request(client_req, svc, |res| async { + // Client reads the response + use http_body_util::BodyExt; + assert_eq!(res.status(), 200); + let body = res.collect().await?.to_bytes(); + assert_eq!(body.chunk(), b"hello world"); + Ok(()) + }) + .await + }, + )?; + assert_eq!(Rc::strong_count(&refcount_check.0), 1); + Ok(()) + } +} diff --git a/ext/http/slab.rs b/ext/http/slab.rs deleted file mode 100644 index 790b4649a2..0000000000 --- a/ext/http/slab.rs +++ /dev/null @@ -1,374 +0,0 @@ -// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. -use crate::request_properties::HttpConnectionProperties; -use crate::response_body::CompletionHandle; -use crate::response_body::ResponseBytes; -use deno_core::error::AnyError; -use deno_core::OpState; -use deno_core::ResourceId; -use http::request::Parts; -use http::HeaderMap; -use hyper1::body::Incoming; -use hyper1::upgrade::OnUpgrade; - -use scopeguard::defer; -use slab::Slab; -use std::cell::RefCell; -use std::cell::RefMut; -use std::ptr::NonNull; -use std::rc::Rc; - -pub type Request = hyper1::Request; -pub type Response = hyper1::Response; -pub type SlabId = u32; - -#[repr(transparent)] -#[derive(Clone, Default)] -pub struct RefCount(pub Rc<()>); - -enum RequestBodyState { - Incoming(Incoming), - Resource(HttpRequestBodyAutocloser), -} - -impl From for RequestBodyState { - fn from(value: Incoming) -> Self { - RequestBodyState::Incoming(value) - } -} - -/// Ensures that the request body closes itself when no longer needed. -pub struct HttpRequestBodyAutocloser(ResourceId, Rc>); - -impl HttpRequestBodyAutocloser { - pub fn new(res: ResourceId, op_state: Rc>) -> Self { - Self(res, op_state) - } -} - -impl Drop for HttpRequestBodyAutocloser { - fn drop(&mut self) { - if let Ok(res) = self.1.borrow_mut().resource_table.take_any(self.0) { - res.close(); - } - } -} - -pub async fn new_slab_future( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, - tx: tokio::sync::mpsc::Sender, -) -> Result { - let index = slab_insert(request, request_info, refcount); - defer! { - slab_drop(index); - } - let rx = slab_get(index).promise(); - if tx.send(index).await.is_ok() { - http_trace!(index, "SlabFuture await"); - // We only need to wait for completion if we aren't closed - rx.await; - http_trace!(index, "SlabFuture complete"); - } - let response = slab_get(index).take_response(); - Ok(response) -} - -pub struct HttpSlabRecord { - request_info: HttpConnectionProperties, - request_parts: Parts, - request_body: Option, - /// The response may get taken before we tear this down - response: Option, - promise: CompletionHandle, - trailers: Rc>>, - been_dropped: bool, - /// Use a `Rc` to keep track of outstanding requests. We don't use this, but - /// when it drops, it decrements the refcount of the server itself. - refcount: Option, - #[cfg(feature = "__zombie_http_tracking")] - alive: bool, -} - -thread_local! { - pub(crate) static SLAB: RefCell> = const { RefCell::new(Slab::new()) }; -} - -macro_rules! http_trace { - ($index:expr, $args:tt) => { - #[cfg(feature = "__http_tracing")] - { - let total = $crate::slab::SLAB.with(|x| x.try_borrow().map(|x| x.len())); - if let Ok(total) = total { - println!("HTTP id={} total={}: {}", $index, total, format!($args)); - } else { - println!("HTTP id={} total=?: {}", $index, format!($args)); - } - } - }; -} - -pub(crate) use http_trace; - -/// Hold a lock on the slab table and a reference to one entry in the table. -pub struct SlabEntry( - NonNull, - SlabId, - RefMut<'static, Slab>, -); - -const SLAB_CAPACITY: usize = 1024; - -pub fn slab_init() { - SLAB.with(|slab: &RefCell>| { - // Note that there might already be an active HTTP server, so this may just - // end up adding room for an additional SLAB_CAPACITY items. All HTTP servers - // on a single thread share the same slab. - let mut slab = slab.borrow_mut(); - slab.reserve(SLAB_CAPACITY); - }) -} - -pub fn slab_get(index: SlabId) -> SlabEntry { - http_trace!(index, "slab_get"); - let mut lock: RefMut<'static, Slab> = SLAB.with(|x| { - // SAFETY: We're extracting a lock here and placing it into an object that is thread-local, !Send as a &'static - unsafe { std::mem::transmute(x.borrow_mut()) } - }); - let Some(entry) = lock.get_mut(index as usize) else { - panic!("HTTP state error: Attempted to access invalid request {} ({} in total available)", - index, - lock.len()) - }; - #[cfg(feature = "__zombie_http_tracking")] - { - assert!(entry.alive, "HTTP state error: Entry is not alive"); - } - let entry = NonNull::new(entry as _).unwrap(); - - SlabEntry(entry, index, lock) -} - -#[allow(clippy::let_and_return)] -fn slab_insert_raw( - request_parts: Parts, - request_body: Option, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let index = SLAB.with(|slab| { - let mut slab = slab.borrow_mut(); - let body = ResponseBytes::default(); - let trailers = body.trailers(); - let request_body = request_body.map(|r| r.into()); - slab.insert(HttpSlabRecord { - request_info, - request_parts, - request_body, - response: Some(Response::new(body)), - trailers, - been_dropped: false, - promise: CompletionHandle::default(), - refcount: Some(refcount), - #[cfg(feature = "__zombie_http_tracking")] - alive: true, - }) - }) as u32; - http_trace!(index, "slab_insert"); - index -} - -pub fn slab_insert( - request: Request, - request_info: HttpConnectionProperties, - refcount: RefCount, -) -> SlabId { - let (request_parts, request_body) = request.into_parts(); - slab_insert_raw(request_parts, Some(request_body), request_info, refcount) -} - -pub fn slab_drop(index: SlabId) { - http_trace!(index, "slab_drop"); - let mut entry = slab_get(index); - let record = entry.self_mut(); - assert!( - !record.been_dropped, - "HTTP state error: Entry has already been dropped" - ); - - // The logic here is somewhat complicated. A slab record cannot be expunged until it has been dropped by Rust AND - // the promise has been completed (indicating that JavaScript is done processing). However, if Rust has finished - // dealing with this entry, we DO want to clean up some of the associated items -- namely the request body, which - // might include actual resources, and the refcount, which is keeping the server alive. - record.been_dropped = true; - if record.promise.is_completed() { - drop(entry); - slab_expunge(index); - } else { - // Take the request body, as the future has been dropped and this will allow some resources to close - record.request_body.take(); - // Take the refcount keeping the server alive. The future is no longer alive, which means this request - // is toast. - record.refcount.take(); - } -} - -fn slab_expunge(index: SlabId) { - SLAB.with(|slab| { - #[cfg(__zombie_http_tracking)] - { - slab.borrow_mut().get_mut(index as usize).unwrap().alive = false; - } - #[cfg(not(__zombie_http_tracking))] - { - slab.borrow_mut().remove(index as usize); - } - }); - http_trace!(index, "slab_expunge"); -} - -impl SlabEntry { - fn self_ref(&self) -> &HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_ref() } - } - - fn self_mut(&mut self) -> &mut HttpSlabRecord { - // SAFETY: We have the lock and we're borrowing lifetime from self - unsafe { self.0.as_mut() } - } - - /// Perform the Hyper upgrade on this entry. - pub fn upgrade(&mut self) -> Result { - // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit - self - .self_mut() - .request_parts - .extensions - .remove::() - .ok_or_else(|| AnyError::msg("upgrade unavailable")) - } - - /// Take the Hyper body from this entry. - pub fn take_body(&mut self) -> Option { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Incoming(body)) => Some(body), - x => { - *body_holder = x; - None - } - } - } - - pub fn take_resource(&mut self) -> Option { - let body_holder = &mut self.self_mut().request_body; - let body = body_holder.take(); - match body { - Some(RequestBodyState::Resource(res)) => Some(res), - x => { - *body_holder = x; - None - } - } - } - - /// Replace the request body with a resource ID and the OpState we'll need to shut it down. - /// We cannot keep just the resource itself, as JS code might be reading from the resource ID - /// to generate the response data (requiring us to keep it in the resource table). - pub fn put_resource(&mut self, res: HttpRequestBodyAutocloser) { - self.self_mut().request_body = Some(RequestBodyState::Resource(res)); - } - - /// Complete this entry, potentially expunging it if it is fully complete (ie: dropped as well). - pub fn complete(self) { - let promise = &self.self_ref().promise; - assert!( - !promise.is_completed(), - "HTTP state error: Entry has already been completed" - ); - http_trace!(self.1, "SlabEntry::complete"); - promise.complete(true); - // If we're all done, we need to drop ourself to release the lock before we expunge this record - if self.self_ref().been_dropped { - let index = self.1; - drop(self); - slab_expunge(index); - } - } - - /// Has the future for this entry been dropped? ie, has the underlying TCP connection - /// been closed? - pub fn cancelled(&self) -> bool { - self.self_ref().been_dropped - } - - /// Get a mutable reference to the response. - pub fn response(&mut self) -> &mut Response { - self.self_mut().response.as_mut().unwrap() - } - - /// Get a mutable reference to the trailers. - pub fn trailers(&mut self) -> &RefCell> { - &self.self_mut().trailers - } - - /// Take the response. - pub fn take_response(&mut self) -> Response { - self.self_mut().response.take().unwrap() - } - - /// Get a reference to the connection properties. - pub fn request_info(&self) -> &HttpConnectionProperties { - &self.self_ref().request_info - } - - /// Get a reference to the request parts. - pub fn request_parts(&self) -> &Parts { - &self.self_ref().request_parts - } - - /// Get a reference to the completion handle. - pub fn promise(&self) -> CompletionHandle { - self.self_ref().promise.clone() - } - - /// Get a reference to the response body completion handle. - pub fn body_promise(&self) -> CompletionHandle { - self - .self_ref() - .response - .as_ref() - .unwrap() - .body() - .completion_handle() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use deno_net::raw::NetworkStreamType; - use http::Request; - - #[test] - fn test_slab() { - let req = Request::builder().body(()).unwrap(); - let (parts, _) = req.into_parts(); - let id = slab_insert_raw( - parts, - None, - HttpConnectionProperties { - peer_address: "".into(), - peer_port: None, - local_port: None, - stream_type: NetworkStreamType::Tcp, - }, - RefCount::default(), - ); - let entry = slab_get(id); - entry.complete(); - slab_drop(id); - } -}