diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index dd2c6fc734..d947282dbe 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -7,7 +7,6 @@ import { assert, assertEquals, assertRejects, - assertStrictEquals, assertThrows, deferred, delay, @@ -387,7 +386,7 @@ unitTest( Deno.errors.Http, "connection closed", ); - // The error from `op_http_accept` reroutes to `respondWith()`. + // The error from `op_http_request_next` reroutes to `respondWith()`. assertEquals(await nextRequestPromise, null); listener.close(); })(); @@ -866,7 +865,6 @@ unitTest( const writer = writable.getWriter(); async function writeResponse() { - await delay(50); await writer.write( new TextEncoder().encode( "written to the writable side of a TransformStream", @@ -1001,39 +999,3 @@ unitTest( await Promise.all([server(), client()]); }, ); - -// https://github.com/denoland/deno/pull/12332 -unitTest( - { permissions: { net: true } }, - async function httpConnConcurrentNextRequestCalls() { - const hostname = "localhost"; - const port = 4501; - - async function server() { - const listener = Deno.listen({ hostname, port }); - const tcpConn = await listener.accept(); - const httpConn = Deno.serveHttp(tcpConn); - const promises = new Array(10).fill(null).map(async (_, i) => { - const event = await httpConn.nextRequest(); - assert(event); - const { pathname } = new URL(event.request.url); - assertStrictEquals(pathname, `/${i}`); - const response = new Response(`Response #${i}`); - await event.respondWith(response); - }); - await Promise.all(promises); - httpConn.close(); - listener.close(); - } - - async function client() { - for (let i = 0; i < 10; i++) { - const response = await fetch(`http://${hostname}:${port}/${i}`); - const body = await response.text(); - assertStrictEquals(body, `Response #${i}`); - } - } - - await Promise.all([server(), delay(100).then(client)]); - }, -); diff --git a/ext/http/01_http.js b/ext/http/01_http.js index d06be2142e..9f05809f5a 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -27,7 +27,6 @@ Set, SetPrototypeAdd, SetPrototypeDelete, - SetPrototypeHas, SetPrototypeValues, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -43,8 +42,6 @@ class HttpConn { #rid = 0; - #closed = false; - // This set holds resource ids of resources // that were created during lifecycle of this request. // When the connection is closed these resources should be closed @@ -65,11 +62,10 @@ let nextRequest; try { nextRequest = await core.opAsync( - "op_http_accept", + "op_http_request_next", this.#rid, ); } catch (error) { - this.close(); // A connection error seen here would cause disrupted responses to throw // a generic `BadResource` error. Instead store this error and replace // those with it. @@ -83,26 +79,26 @@ } throw error; } - if (nextRequest == null) { - this.close(); - return null; - } + if (nextRequest === null) return null; const [ - streamRid, + requestRid, + responseSenderRid, method, headersList, url, ] = nextRequest; - SetPrototypeAdd(this.managedResources, streamRid); /** @type {ReadableStream | undefined} */ let body = null; - // There might be a body, but we don't expose it for GET/HEAD requests. - // It will be closed automatically once the request has been handled and - // the response has been sent. - if (method !== "GET" && method !== "HEAD") { - body = createRequestBodyStream(streamRid); + if (typeof requestRid === "number") { + SetPrototypeAdd(this.managedResources, requestRid); + // There might be a body, but we don't expose it for GET/HEAD requests. + // It will be closed automatically once the request has been handled and + // the response has been sent. + if (method !== "GET" && method !== "HEAD") { + body = createRequestBodyStream(this, requestRid); + } } const innerRequest = newInnerRequest( @@ -115,21 +111,22 @@ const signal = abortSignal.newSignal(); const request = fromInnerRequest(innerRequest, signal, "immutable"); - const respondWith = createRespondWith(this, streamRid); + SetPrototypeAdd(this.managedResources, responseSenderRid); + const respondWith = createRespondWith( + this, + responseSenderRid, + requestRid, + ); return { request, respondWith }; } /** @returns {void} */ close() { - if (!this.#closed) { - this.#closed = true; - core.close(this.#rid); - for (const rid of SetPrototypeValues(this.managedResources)) { - SetPrototypeDelete(this.managedResources, rid); - core.close(rid); - } + for (const rid of SetPrototypeValues(this.managedResources)) { + core.tryClose(rid); } + core.close(this.#rid); } [SymbolAsyncIterator]() { @@ -139,86 +136,97 @@ async next() { const reqEvt = await httpConn.nextRequest(); // Change with caution, current form avoids a v8 deopt - return { value: reqEvt ?? undefined, done: reqEvt === null }; + return { value: reqEvt, done: reqEvt === null }; }, }; } } - function readRequest(streamRid, buf) { - return core.opAsync("op_http_read", streamRid, buf); + function readRequest(requestRid, zeroCopyBuf) { + return core.opAsync( + "op_http_request_read", + requestRid, + zeroCopyBuf, + ); } - function createRespondWith(httpConn, streamRid) { + function createRespondWith(httpConn, responseSenderRid, requestRid) { return async function respondWith(resp) { - try { - if (resp instanceof Promise) { - resp = await resp; - } + if (resp instanceof Promise) { + resp = await resp; + } - if (!(resp instanceof Response)) { - throw new TypeError( - "First argument to respondWith must be a Response or a promise resolving to a Response.", - ); - } + if (!(resp instanceof Response)) { + throw new TypeError( + "First argument to respondWith must be a Response or a promise resolving to a Response.", + ); + } - const innerResp = toInnerResponse(resp); + const innerResp = toInnerResponse(resp); - // If response body length is known, it will be sent synchronously in a - // single op, in other case a "response body" resource will be created and - // we'll be streaming it. - /** @type {ReadableStream | Uint8Array | null} */ - let respBody = null; - if (innerResp.body !== null) { - if (innerResp.body.unusable()) { - throw new TypeError("Body is unusable."); - } - if (innerResp.body.streamOrStatic instanceof ReadableStream) { - if ( - innerResp.body.length === null || - innerResp.body.source instanceof Blob - ) { - respBody = innerResp.body.stream; - } else { - const reader = innerResp.body.stream.getReader(); - const r1 = await reader.read(); - if (r1.done) { - respBody = new Uint8Array(0); - } else { - respBody = r1.value; - const r2 = await reader.read(); - if (!r2.done) throw new TypeError("Unreachable"); - } - } + // If response body length is known, it will be sent synchronously in a + // single op, in other case a "response body" resource will be created and + // we'll be streaming it. + /** @type {ReadableStream | Uint8Array | null} */ + let respBody = null; + if (innerResp.body !== null) { + if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); + if (innerResp.body.streamOrStatic instanceof ReadableStream) { + if ( + innerResp.body.length === null || + innerResp.body.source instanceof Blob + ) { + respBody = innerResp.body.stream; } else { - innerResp.body.streamOrStatic.consumed = true; - respBody = innerResp.body.streamOrStatic.body; + const reader = innerResp.body.stream.getReader(); + const r1 = await reader.read(); + if (r1.done) { + respBody = new Uint8Array(0); + } else { + respBody = r1.value; + const r2 = await reader.read(); + if (!r2.done) throw new TypeError("Unreachable"); + } } } else { - respBody = new Uint8Array(0); + innerResp.body.streamOrStatic.consumed = true; + respBody = innerResp.body.streamOrStatic.body; } - const isStreamingResponseBody = - !(typeof respBody === "string" || respBody instanceof Uint8Array); + } else { + respBody = new Uint8Array(0); + } - try { - await core.opAsync("op_http_write_headers", [ - streamRid, + SetPrototypeDelete(httpConn.managedResources, responseSenderRid); + let responseBodyRid; + try { + responseBodyRid = await core.opAsync( + "op_http_response", + [ + responseSenderRid, innerResp.status ?? 200, innerResp.headerList, - ], isStreamingResponseBody ? null : respBody); - } catch (error) { - const connError = httpConn[connErrorSymbol]; - if (error instanceof BadResource && connError != null) { - // deno-lint-ignore no-ex-assign - error = new connError.constructor(connError.message); - } - if (respBody !== null && respBody instanceof ReadableStream) { - await respBody.cancel(error); - } - throw error; + ], + (respBody instanceof Uint8Array || typeof respBody === "string") + ? respBody + : null, + ); + } catch (error) { + const connError = httpConn[connErrorSymbol]; + if (error instanceof BadResource && connError != null) { + // deno-lint-ignore no-ex-assign + error = new connError.constructor(connError.message); } + if (respBody !== null && respBody instanceof ReadableStream) { + await respBody.cancel(error); + } + throw error; + } - if (isStreamingResponseBody) { + // If `respond` returns a responseBodyRid, we should stream the body + // to that resource. + if (responseBodyRid !== null) { + SetPrototypeAdd(httpConn.managedResources, responseBodyRid); + try { if (respBody === null || !(respBody instanceof ReadableStream)) { throw new TypeError("Unreachable"); } @@ -231,7 +239,11 @@ break; } try { - await core.opAsync("op_http_write", streamRid, value); + await core.opAsync( + "op_http_response_write", + responseBodyRid, + value, + ); } catch (error) { const connError = httpConn[connErrorSymbol]; if (error instanceof BadResource && connError != null) { @@ -242,55 +254,61 @@ throw error; } } + } finally { + // Once all chunks are sent, and the request body is closed, we can + // close the response body. + SetPrototypeDelete(httpConn.managedResources, responseBodyRid); try { - await core.opAsync("op_http_shutdown", streamRid); - } catch (error) { - await reader.cancel(error); - throw error; - } + await core.opAsync("op_http_response_close", responseBodyRid); + } catch { /* pass */ } } + } - const ws = resp[_ws]; - if (ws) { - const wsRid = await core.opAsync( - "op_http_upgrade_websocket", - streamRid, + const ws = resp[_ws]; + if (ws) { + if (typeof requestRid !== "number") { + throw new TypeError( + "This request can not be upgraded to a websocket connection.", ); - ws[_rid] = wsRid; - ws[_protocol] = resp.headers.get("sec-websocket-protocol"); - - httpConn.close(); - - if (ws[_readyState] === WebSocket.CLOSING) { - await core.opAsync("op_ws_close", { rid: wsRid }); - - ws[_readyState] = WebSocket.CLOSED; - - const errEvent = new ErrorEvent("error"); - ws.dispatchEvent(errEvent); - - const event = new CloseEvent("close"); - ws.dispatchEvent(event); - - core.tryClose(wsRid); - } else { - ws[_readyState] = WebSocket.OPEN; - const event = new Event("open"); - ws.dispatchEvent(event); - - ws[_eventLoop](); - } } - } finally { - if (SetPrototypeHas(httpConn.managedResources, streamRid)) { - SetPrototypeDelete(httpConn.managedResources, streamRid); - core.close(streamRid); + + const wsRid = await core.opAsync( + "op_http_upgrade_websocket", + requestRid, + ); + ws[_rid] = wsRid; + ws[_protocol] = resp.headers.get("sec-websocket-protocol"); + + if (ws[_readyState] === WebSocket.CLOSING) { + await core.opAsync("op_ws_close", { rid: wsRid }); + + ws[_readyState] = WebSocket.CLOSED; + + const errEvent = new ErrorEvent("error"); + ws.dispatchEvent(errEvent); + + const event = new CloseEvent("close"); + ws.dispatchEvent(event); + + core.tryClose(wsRid); + } else { + ws[_readyState] = WebSocket.OPEN; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); } + } else if (typeof requestRid === "number") { + // Try to close "request" resource. It might have been already consumed, + // but if it hasn't been we need to close it here to avoid resource + // leak. + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.tryClose(requestRid); } }; } - function createRequestBodyStream(streamRid) { + function createRequestBodyStream(httpConn, requestRid) { return new ReadableStream({ type: "bytes", async pull(controller) { @@ -298,21 +316,32 @@ // This is the largest possible size for a single packet on a TLS // stream. const chunk = new Uint8Array(16 * 1024 + 256); - const read = await readRequest(streamRid, chunk); + const read = await readRequest( + requestRid, + chunk, + ); if (read > 0) { // We read some data. Enqueue it onto the stream. controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); } else { // We have reached the end of the body, so we close the stream. controller.close(); + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); } } catch (err) { // There was an error while reading a chunk of the body, so we // error. controller.error(err); controller.close(); + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); } }, + cancel() { + SetPrototypeDelete(httpConn.managedResources, requestRid); + core.close(requestRid); + }, }); } diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 5a14f845f6..aae6415cb1 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -1,29 +1,17 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. -use bytes::Bytes; -use deno_core::error::custom_error; +use deno_core::error::bad_resource_id; +use deno_core::error::type_error; use deno_core::error::AnyError; -use deno_core::futures::channel::mpsc; -use deno_core::futures::channel::oneshot; -use deno_core::futures::future::pending; -use deno_core::futures::future::select; -use deno_core::futures::future::Either; -use deno_core::futures::future::Pending; -use deno_core::futures::future::RemoteHandle; -use deno_core::futures::future::Shared; -use deno_core::futures::never::Never; -use deno_core::futures::pin_mut; -use deno_core::futures::ready; -use deno_core::futures::stream::Peekable; +use deno_core::futures::future::poll_fn; use deno_core::futures::FutureExt; +use deno_core::futures::Stream; use deno_core::futures::StreamExt; -use deno_core::futures::TryFutureExt; use deno_core::include_js_files; use deno_core::op_async; use deno_core::op_sync; use deno_core::AsyncRefCell; use deno_core::ByteString; -use deno_core::CancelFuture; use deno_core::CancelHandle; use deno_core::CancelTryFuture; use deno_core::Extension; @@ -33,31 +21,33 @@ use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; -use deno_websocket::ws_create_server_stream; +use hyper::body::HttpBody; +use hyper::header::CONNECTION; +use hyper::header::SEC_WEBSOCKET_KEY; +use hyper::header::SEC_WEBSOCKET_VERSION; +use hyper::header::UPGRADE; +use hyper::http; use hyper::server::conn::Http; -use hyper::service::Service; +use hyper::service::Service as HyperService; use hyper::Body; +use hyper::Method; use hyper::Request; use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; use std::cell::RefCell; -use std::cmp::min; -use std::error::Error; use std::future::Future; -use std::io; -use std::mem::replace; -use std::mem::take; use std::net::SocketAddr; use std::pin::Pin; use std::rc::Rc; -use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; +use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; -use tokio::task::spawn_local; +use tokio::sync::oneshot; +use tokio_util::io::StreamReader; pub fn init() -> Extension { Extension::builder() @@ -66,11 +56,11 @@ pub fn init() -> Extension { "01_http.js", )) .ops(vec![ - ("op_http_accept", op_async(op_http_accept)), - ("op_http_read", op_async(op_http_read)), - ("op_http_write_headers", op_async(op_http_write_headers)), - ("op_http_write", op_async(op_http_write)), - ("op_http_shutdown", op_async(op_http_shutdown)), + ("op_http_request_next", op_async(op_http_request_next)), + ("op_http_request_read", op_async(op_http_request_read)), + ("op_http_response", op_async(op_http_response)), + ("op_http_response_write", op_async(op_http_response_write)), + ("op_http_response_close", op_async(op_http_response_close)), ( "op_http_websocket_accept_header", op_sync(op_http_websocket_accept_header), @@ -83,247 +73,86 @@ pub fn init() -> Extension { .build() } -struct HttpConnResource { - addr: SocketAddr, - scheme: &'static str, - acceptors_tx: mpsc::UnboundedSender, - closed_fut: Shared>>>, - cancel_handle: Rc, // Closes gracefully and cancels accept ops. +struct ServiceInner { + request: Request, + response_tx: oneshot::Sender>, } -impl HttpConnResource { - fn new(io: S, scheme: &'static str, addr: SocketAddr) -> Self - where - S: AsyncRead + AsyncWrite + Unpin + Send + 'static, - { - let (acceptors_tx, acceptors_rx) = mpsc::unbounded::(); - let service = HttpService::new(acceptors_rx); - - let conn_fut = Http::new() - .with_executor(LocalExecutor) - .serve_connection(io, service) - .with_upgrades(); - - // When the cancel handle is used, the connection shuts down gracefully. - // No new HTTP streams will be accepted, but existing streams will be able - // to continue operating and eventually shut down cleanly. - let cancel_handle = CancelHandle::new_rc(); - let shutdown_fut = never().or_cancel(&cancel_handle).fuse(); - - // A local task that polls the hyper connection future to completion. - let task_fut = async move { - pin_mut!(shutdown_fut); - pin_mut!(conn_fut); - let result = match select(conn_fut, shutdown_fut).await { - Either::Left((result, _)) => result, - Either::Right((_, mut conn_fut)) => { - conn_fut.as_mut().graceful_shutdown(); - conn_fut.await - } - }; - filter_enotconn(result).map_err(Arc::from) - }; - let (task_fut, closed_fut) = task_fut.remote_handle(); - let closed_fut = closed_fut.shared(); - spawn_local(task_fut); - - Self { - addr, - scheme, - acceptors_tx, - closed_fut, - cancel_handle, - } - } - - // Accepts a new incoming HTTP request. - async fn accept( - self: &Rc, - ) -> Result, AnyError> { - let fut = async { - let (request_tx, request_rx) = oneshot::channel(); - let (response_tx, response_rx) = oneshot::channel(); - - let acceptor = HttpAcceptor::new(request_tx, response_rx); - self.acceptors_tx.unbounded_send(acceptor).ok()?; - - let request = request_rx.await.ok()?; - let stream = HttpStreamResource::new(self, request, response_tx); - Some(stream) - }; - - async { - match fut.await { - Some(stream) => Ok(Some(stream)), - // Return the connection error, if any. - None => self.closed().map_ok(|_| None).await, - } - } - .try_or_cancel(&self.cancel_handle) - .await - } - - /// A future that completes when this HTTP connection is closed or errors. - async fn closed(&self) -> Result<(), AnyError> { - self.closed_fut.clone().map_err(AnyError::from).await - } - - fn scheme(&self) -> &'static str { - self.scheme - } - - fn addr(&self) -> SocketAddr { - self.addr - } +#[derive(Clone, Default)] +struct Service { + inner: Rc>>, + waker: Rc, } -impl Resource for HttpConnResource { - fn name(&self) -> Cow { - "httpConn".into() - } - - fn close(self: Rc) { - self.cancel_handle.cancel(); - } -} - -/// Creates a new HttpConn resource which uses `io` as its transport. -pub fn http_create_conn_resource( - state: &mut OpState, - io: S, - addr: SocketAddr, - scheme: &'static str, -) -> Result -where - S: AsyncRead + AsyncWrite + Unpin + Send + 'static, -{ - let conn = HttpConnResource::new(io, scheme, addr); - let rid = state.resource_table.add(conn); - Ok(rid) -} - -/// An object that implements the `hyper::Service` trait, through which Hyper -/// delivers incoming HTTP requests. -struct HttpService { - acceptors_rx: Peekable>, -} - -impl HttpService { - fn new(acceptors_rx: mpsc::UnboundedReceiver) -> Self { - let acceptors_rx = acceptors_rx.peekable(); - Self { acceptors_rx } - } -} - -impl Service> for HttpService { +impl HyperService> for Service { type Response = Response; - type Error = oneshot::Canceled; - type Future = oneshot::Receiver>; + type Error = http::Error; + #[allow(clippy::type_complexity)] + type Future = + Pin>>>; fn poll_ready( &mut self, - cx: &mut Context<'_>, + _cx: &mut Context<'_>, ) -> Poll> { - let acceptors_rx = Pin::new(&mut self.acceptors_rx); - let result = ready!(acceptors_rx.poll_peek(cx)) - .map(|_| ()) - .ok_or(oneshot::Canceled); - Poll::Ready(result) - } - - fn call(&mut self, request: Request) -> Self::Future { - let acceptor = self.acceptors_rx.next().now_or_never().flatten().unwrap(); - acceptor.call(request) - } -} - -/// A pair of one-shot channels which first transfer a HTTP request from the -/// Hyper service to the HttpConn resource, and then take the Response back to -/// the service. -struct HttpAcceptor { - request_tx: oneshot::Sender>, - response_rx: oneshot::Receiver>, -} - -impl HttpAcceptor { - fn new( - request_tx: oneshot::Sender>, - response_rx: oneshot::Receiver>, - ) -> Self { - Self { - request_tx, - response_rx, + if self.inner.borrow().is_some() { + Poll::Pending + } else { + Poll::Ready(Ok(())) } } - fn call(self, request: Request) -> oneshot::Receiver> { - let Self { - request_tx, - response_rx, - } = self; - request_tx - .send(request) - .map(|_| response_rx) - .unwrap_or_else(|_| oneshot::channel().1) // Make new canceled receiver. - } -} + fn call(&mut self, req: Request) -> Self::Future { + let (resp_tx, resp_rx) = oneshot::channel(); + self.inner.borrow_mut().replace(ServiceInner { + request: req, + response_tx: resp_tx, + }); -/// A resource representing a single HTTP request/response stream. -struct HttpStreamResource { - conn: Rc, - rd: AsyncRefCell, - wr: AsyncRefCell, - cancel_handle: CancelHandle, -} - -impl HttpStreamResource { - fn new( - conn: &Rc, - request: Request, - response_tx: oneshot::Sender>, - ) -> Self { - Self { - conn: conn.clone(), - rd: HttpRequestReader::Headers(request).into(), - wr: HttpResponseWriter::Headers(response_tx).into(), - cancel_handle: CancelHandle::new(), + async move { + resp_rx.await.or_else(|_| + // Fallback dummy response in case sender was dropped due to closed conn + Response::builder() + .status(hyper::StatusCode::INTERNAL_SERVER_ERROR) + .body(vec![].into())) } + .boxed_local() } } -impl Resource for HttpStreamResource { +type ConnFuture = Pin>>>; + +struct Conn { + scheme: &'static str, + addr: SocketAddr, + conn: Rc>, +} + +struct ConnResource { + hyper_connection: Conn, + deno_service: Service, + cancel: CancelHandle, +} + +impl ConnResource { + // TODO(ry) impl Future for ConnResource? + fn poll(&self, cx: &mut Context<'_>) -> Poll> { + self + .hyper_connection + .conn + .borrow_mut() + .poll_unpin(cx) + .map_err(AnyError::from) + } +} + +impl Resource for ConnResource { fn name(&self) -> Cow { - "httpStream".into() + "httpConnection".into() } fn close(self: Rc) { - self.cancel_handle.cancel(); - } -} - -/// The read half of an HTTP stream. -enum HttpRequestReader { - Headers(Request), - Body(Peekable), - Closed, -} - -impl Default for HttpRequestReader { - fn default() -> Self { - Self::Closed - } -} - -/// The write half of an HTTP stream. -enum HttpResponseWriter { - Headers(oneshot::Sender>), - Body(hyper::body::Sender), - Closed, -} - -impl Default for HttpResponseWriter { - fn default() -> Self { - Self::Closed + self.cancel.cancel() } } @@ -331,7 +160,9 @@ impl Default for HttpResponseWriter { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct NextRequestResponse( - // stream_rid: + // request_rid: + Option, + // response_sender_rid: ResourceId, // method: // This is a String rather than a ByteString because reqwest will only return @@ -343,40 +174,111 @@ struct NextRequestResponse( String, ); -async fn op_http_accept( +async fn op_http_request_next( state: Rc>, - rid: ResourceId, + conn_rid: ResourceId, _: (), ) -> Result, AnyError> { - let conn = state.borrow().resource_table.get::(rid)?; + let conn_resource = state + .borrow() + .resource_table + .get::(conn_rid)?; - let stream = match conn.accept().await { - Ok(Some(stream)) => Rc::new(stream), - Ok(None) => return Ok(None), - Err(err) => return Err(err), + let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); + + poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); + + // Check if conn is open/close/errored + let (conn_closed, conn_result) = match conn_resource.poll(cx) { + Poll::Pending => (false, Ok(())), + Poll::Ready(Ok(())) => (true, Ok(())), + Poll::Ready(Err(e)) => { + if should_ignore_error(&e) { + (true, Ok(())) + } else { + (true, Err(e)) + } + } + }; + // Drop conn resource if closed + if conn_closed { + // TODO(ry) close RequestResource associated with connection + // TODO(ry) close ResponseBodyResource associated with connection + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state + .borrow_mut() + .resource_table + .take::(conn_rid); + + // Fail with err if unexpected conn error, early return None otherwise + return Poll::Ready(conn_result.map(|_| None)); + } + + if let Some(inner) = conn_resource.deno_service.inner.borrow_mut().take() { + let Conn { scheme, addr, .. } = conn_resource.hyper_connection; + let mut state = state.borrow_mut(); + let next = + prepare_next_request(&mut state, conn_rid, inner, scheme, addr)?; + Poll::Ready(Ok(Some(next))) + } else { + Poll::Pending + } + }) + .try_or_cancel(cancel) + .await + .map_err(AnyError::from) +} + +fn prepare_next_request( + state: &mut OpState, + conn_rid: ResourceId, + request_resource: ServiceInner, + scheme: &'static str, + addr: SocketAddr, +) -> Result { + let tx = request_resource.response_tx; + let req = request_resource.request; + let method = req.method().to_string(); + let headers = req_headers(&req); + let url = req_url(&req, scheme, addr)?; + + let is_websocket = is_websocket_request(&req); + let can_have_body = !matches!(*req.method(), Method::GET | Method::HEAD); + let has_body = + is_websocket || (can_have_body && req.size_hint().exact() != Some(0)); + + let maybe_request_rid = if has_body { + let request_rid = state.resource_table.add(RequestResource { + conn_rid, + inner: AsyncRefCell::new(RequestOrStreamReader::Request(Some(req))), + cancel: CancelHandle::default(), + }); + Some(request_rid) + } else { + None }; - let rd = RcRef::map(&stream, |r| &r.rd).borrow().await; - let request = match &*rd { - HttpRequestReader::Headers(request) => request, - _ => unreachable!(), - }; + let response_sender_rid = state.resource_table.add(ResponseSenderResource { + sender: tx, + conn_rid, + }); - let method = request.method().to_string(); - let headers = req_headers(request); - let url = req_url(request, conn.scheme(), conn.addr()); - - let stream_rid = state.borrow_mut().resource_table.add_rc(stream); - - let r = NextRequestResponse(stream_rid, method, headers, url); - Ok(Some(r)) + Ok(NextRequestResponse( + maybe_request_rid, + response_sender_rid, + method, + headers, + url, + )) } fn req_url( req: &hyper::Request, scheme: &'static str, addr: SocketAddr, -) -> String { +) -> Result { let host: Cow = if let Some(auth) = req.uri().authority() { match addr.port() { 443 if scheme == "https" => Cow::Borrowed(auth.host()), @@ -386,22 +288,12 @@ fn req_url( } else if let Some(host) = req.uri().host() { Cow::Borrowed(host) } else if let Some(host) = req.headers().get("HOST") { - match host.to_str() { - Ok(host) => Cow::Borrowed(host), - Err(_) => Cow::Owned( - host - .as_bytes() - .iter() - .cloned() - .map(char::from) - .collect::(), - ), - } + Cow::Borrowed(host.to_str()?) } else { Cow::Owned(addr.to_string()) }; let path = req.uri().path_and_query().map_or("/", |p| p.as_str()); - [scheme, "://", &host, path].concat() + Ok([scheme, "://", &host, path].concat()) } fn req_headers( @@ -435,6 +327,68 @@ fn req_headers( headers } +fn is_websocket_request(req: &hyper::Request) -> bool { + req.version() == hyper::Version::HTTP_11 + && req.method() == hyper::Method::GET + && req.headers().contains_key(&SEC_WEBSOCKET_KEY) + && header(req.headers(), &SEC_WEBSOCKET_VERSION) == b"13" + && header(req.headers(), &UPGRADE) + .split(|c| *c == b' ' || *c == b',') + .any(|token| token.eq_ignore_ascii_case(b"websocket")) + && header(req.headers(), &CONNECTION) + .split(|c| *c == b' ' || *c == b',') + .any(|token| token.eq_ignore_ascii_case(b"upgrade")) +} + +fn header<'a>( + h: &'a hyper::http::HeaderMap, + name: &hyper::header::HeaderName, +) -> &'a [u8] { + h.get(name) + .map(hyper::header::HeaderValue::as_bytes) + .unwrap_or_default() +} + +fn should_ignore_error(e: &AnyError) -> bool { + if let Some(e) = e.downcast_ref::() { + use std::error::Error; + if let Some(std_err) = e.source() { + if let Some(io_err) = std_err.downcast_ref::() { + if io_err.kind() == std::io::ErrorKind::NotConnected { + return true; + } + } + } + } + false +} + +pub fn start_http( + state: &mut OpState, + io: IO, + addr: SocketAddr, + scheme: &'static str, +) -> Result { + let deno_service = Service::default(); + + let hyper_connection = Http::new() + .with_executor(LocalExecutor) + .serve_connection(io, deno_service.clone()) + .with_upgrades(); + let conn = Pin::new(Box::new(hyper_connection)); + let conn_resource = ConnResource { + hyper_connection: Conn { + scheme, + addr, + conn: Rc::new(RefCell::new(conn)), + }, + deno_service, + cancel: CancelHandle::default(), + }; + let rid = state.resource_table.add(conn_resource); + Ok(rid) +} + // We use a tuple instead of struct to avoid serialization overhead of the keys. #[derive(Deserialize)] struct RespondArgs( @@ -446,16 +400,27 @@ struct RespondArgs( Vec<(ByteString, ByteString)>, ); -async fn op_http_write_headers( +async fn op_http_response( state: Rc>, args: RespondArgs, data: Option, -) -> Result<(), AnyError> { +) -> Result, AnyError> { let RespondArgs(rid, status, headers) = args; - let stream = state + + let response_sender = state .borrow_mut() .resource_table - .get::(rid)?; + .take::(rid)?; + let response_sender = Rc::try_unwrap(response_sender) + .ok() + .expect("multiple op_http_respond ongoing"); + + let conn_rid = response_sender.conn_rid; + + let conn_resource = state + .borrow() + .resource_table + .get::(conn_rid)?; let mut builder = Response::builder().status(status); @@ -464,138 +429,171 @@ async fn op_http_write_headers( builder = builder.header(key.as_ref(), value.as_ref()); } - let body: Response; - let new_wr: HttpResponseWriter; + let (maybe_response_body_rid, res) = if let Some(d) = data { + // If a body is passed, we use it, and don't return a body for streaming. + (None, builder.body(d.into_bytes().into())?) + } else { + // If no body is passed, we return a writer for streaming the body. + let (sender, body) = Body::channel(); + let res = builder.body(body)?; - match data { - Some(data) => { - // If a buffer was passed, we use it to construct a response body. - body = builder.body(data.into_bytes().into())?; - new_wr = HttpResponseWriter::Closed; - } - None => { - // If no buffer was passed, the caller will stream the response body. - let (body_tx, body_rx) = Body::channel(); - body = builder.body(body_rx)?; - new_wr = HttpResponseWriter::Body(body_tx); - } - } + let response_body_rid = + state.borrow_mut().resource_table.add(ResponseBodyResource { + body: AsyncRefCell::new(sender), + conn_rid, + }); - let mut old_wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - let response_tx = match replace(&mut *old_wr, new_wr) { - HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), + (Some(response_body_rid), res) }; - match response_tx.send(body) { - Ok(_) => Ok(()), - Err(_) => { - stream.conn.closed().await?; - Err(http_error("connection closed while sending response")) + // oneshot::Sender::send(v) returns |v| on error, not an error object. + // The only failure mode is the receiver already having dropped its end + // of the channel. + if response_sender.sender.send(res).is_err() { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::(rid); } + return Err(type_error("internal communication error")); } + + let result = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => { + state.borrow_mut().resource_table.close(conn_rid).ok(); + Poll::Ready(x) + } + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + + if let Err(e) = result { + if let Some(rid) = maybe_response_body_rid { + let _ = state + .borrow_mut() + .resource_table + .take::(rid); + } + return Err(e); + } + + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } + Ok(maybe_response_body_rid) } -async fn op_http_write( - state: Rc>, - rid: ResourceId, - buf: ZeroCopyBuf, -) -> Result<(), AnyError> { - let stream = state - .borrow() - .resource_table - .get::(rid)?; - let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - - loop { - let body_tx = match &mut *wr { - HttpResponseWriter::Body(body_tx) => body_tx, - HttpResponseWriter::Headers(_) => { - break Err(http_error("no response headers")) - } - HttpResponseWriter::Closed => { - break Err(http_error("response already completed")) - } - }; - - let bytes = Bytes::copy_from_slice(&buf[..]); - match body_tx.send_data(bytes).await { - Ok(_) => break Ok(()), - Err(err) => { - // Don't return "channel closed", that's an implementation detail. - // Pull up the failure associated with the transport connection instead. - assert!(err.is_closed()); - stream.conn.closed().await?; - // If there was no connection error, drop body_tx. - *wr = HttpResponseWriter::Closed; - } - } - } -} - -/// Gracefully closes the write half of the HTTP stream. Note that this does not -/// remove the HTTP stream resource from the resource table; it still has to be -/// closed with `Deno.core.close()`. -async fn op_http_shutdown( +async fn op_http_response_close( state: Rc>, rid: ResourceId, _: (), ) -> Result<(), AnyError> { - let stream = state - .borrow() - .resource_table - .get::(rid)?; - let mut wr = RcRef::map(&stream, |r| &r.wr).borrow_mut().await; - take(&mut *wr); - Ok(()) -} - -async fn op_http_read( - state: Rc>, - rid: ResourceId, - mut buf: ZeroCopyBuf, -) -> Result { - let stream = state + let resource = state .borrow_mut() .resource_table - .get::(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + .take::(rid)?; - let body = loop { - match &mut *rd { - HttpRequestReader::Headers(_) => {} - HttpRequestReader::Body(body) => break body, - HttpRequestReader::Closed => return Ok(0), - } - match take(&mut *rd) { - HttpRequestReader::Headers(request) => { - let body = request.into_body().peekable(); - *rd = HttpRequestReader::Body(body); - } - _ => unreachable!(), - }; + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid)?; + drop(resource); + + let r = poll_fn(|cx| match conn_resource.poll(cx) { + Poll::Ready(x) => Poll::Ready(x), + Poll::Pending => Poll::Ready(Ok(())), + }) + .await; + conn_resource.deno_service.waker.wake(); + r +} + +async fn op_http_request_read( + state: Rc>, + rid: ResourceId, + mut data: ZeroCopyBuf, +) -> Result { + let resource = state + .borrow() + .resource_table + .get::(rid as u32)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid)?; + + let mut inner = RcRef::map(resource.clone(), |r| &r.inner) + .borrow_mut() + .await; + + if let RequestOrStreamReader::Request(req) = &mut *inner { + let req = req.take().unwrap(); + let stream: BytesStream = Box::pin(req.into_body().map(|r| { + r.map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) + })); + let reader = StreamReader::new(stream); + *inner = RequestOrStreamReader::StreamReader(reader); }; - let fut = async { - let mut body = Pin::new(body); - loop { - match body.as_mut().peek_mut().await { - Some(Ok(chunk)) if !chunk.is_empty() => { - let len = min(buf.len(), chunk.len()); - buf[..len].copy_from_slice(&chunk.split_to(len)); - break Ok(len); - } - Some(_) => match body.as_mut().next().await.unwrap() { - Ok(chunk) => assert!(chunk.is_empty()), - Err(err) => break Err(AnyError::from(err)), - }, - None => break Ok(0), - } - } + let reader = match &mut *inner { + RequestOrStreamReader::StreamReader(reader) => reader, + _ => unreachable!(), }; - let cancel_handle = RcRef::map(&stream, |r| &r.cancel_handle); - fut.try_or_cancel(cancel_handle).await + let cancel = RcRef::map(resource, |r| &r.cancel); + + let mut read_fut = reader.read(&mut data).try_or_cancel(cancel).boxed_local(); + + poll_fn(|cx| { + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + read_fut.poll_unpin(cx).map_err(AnyError::from) + }) + .await +} + +async fn op_http_response_write( + state: Rc>, + rid: ResourceId, + data: ZeroCopyBuf, +) -> Result<(), AnyError> { + let resource = state + .borrow() + .resource_table + .get::(rid as u32)?; + + let conn_resource = state + .borrow() + .resource_table + .get::(resource.conn_rid)?; + + let mut body = RcRef::map(&resource, |r| &r.body).borrow_mut().await; + + let mut send_data_fut = body.send_data(data.to_vec().into()).boxed_local(); + + poll_fn(|cx| { + let r = send_data_fut.poll_unpin(cx).map_err(AnyError::from); + + // Poll connection so the data is flushed + if let Poll::Ready(Err(e)) = conn_resource.poll(cx) { + // close ConnResource + // close RequestResource associated with connection + // close ResponseBodyResource associated with connection + return Poll::Ready(Err(e)); + } + + r + }) + .await?; + + Ok(()) } fn op_http_websocket_accept_header( @@ -615,22 +613,86 @@ async fn op_http_upgrade_websocket( rid: ResourceId, _: (), ) -> Result { - let stream = state + let req_resource = state .borrow_mut() .resource_table - .get::(rid)?; - let mut rd = RcRef::map(&stream, |r| &r.rd).borrow_mut().await; + .take::(rid)?; - let request = match &mut *rd { - HttpRequestReader::Headers(request) => request, - _ => { - return Err(http_error("cannot upgrade because request body was used")) - } - }; + let mut inner = RcRef::map(&req_resource, |r| &r.inner).borrow_mut().await; - let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; - Ok(ws_rid) + if let RequestOrStreamReader::Request(req) = inner.as_mut() { + let upgraded = hyper::upgrade::on(req.as_mut().unwrap()).await?; + let stream = + deno_websocket::tokio_tungstenite::WebSocketStream::from_raw_socket( + upgraded, + deno_websocket::tokio_tungstenite::tungstenite::protocol::Role::Server, + None, + ) + .await; + + let (ws_tx, ws_rx) = stream.split(); + let rid = + state + .borrow_mut() + .resource_table + .add(deno_websocket::WsStreamResource { + stream: deno_websocket::WebSocketStreamType::Server { + rx: AsyncRefCell::new(ws_rx), + tx: AsyncRefCell::new(ws_tx), + }, + cancel: Default::default(), + }); + + Ok(rid) + } else { + Err(bad_resource_id()) + } +} + +type BytesStream = + Pin> + Unpin>>; + +enum RequestOrStreamReader { + Request(Option>), + StreamReader(StreamReader), +} + +struct RequestResource { + conn_rid: ResourceId, + inner: AsyncRefCell, + cancel: CancelHandle, +} + +impl Resource for RequestResource { + fn name(&self) -> Cow { + "request".into() + } + + fn close(self: Rc) { + self.cancel.cancel() + } +} + +struct ResponseSenderResource { + sender: oneshot::Sender>, + conn_rid: ResourceId, +} + +impl Resource for ResponseSenderResource { + fn name(&self) -> Cow { + "responseSender".into() + } +} + +struct ResponseBodyResource { + body: AsyncRefCell, + conn_rid: ResourceId, +} + +impl Resource for ResponseBodyResource { + fn name(&self) -> Cow { + "responseBody".into() + } } // Needed so hyper can use non Send futures @@ -643,33 +705,6 @@ where Fut::Output: 'static, { fn execute(&self, fut: Fut) { - spawn_local(fut); + tokio::task::spawn_local(fut); } } - -fn http_error(message: &'static str) -> AnyError { - custom_error("Http", message) -} - -/// Filters out the ever-surprising 'shutdown ENOTCONN' errors. -fn filter_enotconn( - result: Result<(), hyper::Error>, -) -> Result<(), hyper::Error> { - if result - .as_ref() - .err() - .and_then(|err| err.source()) - .and_then(|err| err.downcast_ref::()) - .filter(|err| err.kind() == io::ErrorKind::NotConnected) - .is_some() - { - Ok(()) - } else { - result - } -} - -/// Create a future that is forever pending. -fn never() -> Pending { - pending() -} diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index ba626a45a4..d469b5aaf1 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -34,13 +34,12 @@ use std::sync::Arc; use tokio::net::TcpStream; use tokio_rustls::rustls::RootCertStore; use tokio_rustls::TlsConnector; -use tokio_tungstenite::client_async; use tokio_tungstenite::tungstenite::{ handshake::client::Response, protocol::frame::coding::CloseCode, - protocol::CloseFrame, protocol::Role, Message, + protocol::CloseFrame, Message, }; use tokio_tungstenite::MaybeTlsStream; -use tokio_tungstenite::WebSocketStream; +use tokio_tungstenite::{client_async, WebSocketStream}; pub use tokio_tungstenite; // Re-export tokio_tungstenite @@ -73,27 +72,6 @@ pub enum WebSocketStreamType { }, } -pub async fn ws_create_server_stream( - state: &Rc>, - transport: hyper::upgrade::Upgraded, -) -> Result { - let ws_stream = - WebSocketStream::from_raw_socket(transport, Role::Server, None).await; - let (ws_tx, ws_rx) = ws_stream.split(); - - let ws_resource = WsStreamResource { - stream: WebSocketStreamType::Server { - tx: AsyncRefCell::new(ws_tx), - rx: AsyncRefCell::new(ws_rx), - }, - cancel: Default::default(), - }; - - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); - Ok(rid) -} - pub struct WsStreamResource { pub stream: WebSocketStreamType, // When a `WsStreamResource` resource is closed, all pending 'read' ops are diff --git a/runtime/errors.rs b/runtime/errors.rs index 1491161d35..fe6e711931 100644 --- a/runtime/errors.rs +++ b/runtime/errors.rs @@ -17,7 +17,6 @@ use deno_fetch::reqwest; use std::env; use std::error::Error; use std::io; -use std::sync::Arc; fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str { use dlopen::Error::*; @@ -164,10 +163,6 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> { .map(get_dlopen_error_class) }) .or_else(|| e.downcast_ref::().map(get_hyper_error_class)) - .or_else(|| { - e.downcast_ref::>() - .map(|e| get_hyper_error_class(&**e)) - }) .or_else(|| { e.downcast_ref::().map(|e| { let io_err: io::Error = e.to_owned().into(); diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index fddac92612..683dc1a576 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -6,7 +6,6 @@ use deno_core::op_sync; use deno_core::Extension; use deno_core::OpState; use deno_core::ResourceId; -use deno_http::http_create_conn_resource; use deno_net::io::TcpStreamResource; use deno_net::ops_tls::TlsStreamResource; @@ -30,7 +29,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tcp_stream = read_half.reunite(write_half)?; let addr = tcp_stream.local_addr()?; - return http_create_conn_resource(state, tcp_stream, addr, "http"); + return deno_http::start_http(state, tcp_stream, addr, "http"); } if let Ok(resource_rc) = state @@ -42,7 +41,7 @@ fn op_http_start( let (read_half, write_half) = resource.into_inner(); let tls_stream = read_half.reunite(write_half); let addr = tls_stream.get_ref().0.local_addr()?; - return http_create_conn_resource(state, tls_stream, addr, "https"); + return deno_http::start_http(state, tls_stream, addr, "https"); } Err(bad_resource_id())