diff --git a/cli/bench/testdata/deno_upgrade_http.js b/cli/bench/testdata/deno_upgrade_http.js deleted file mode 100644 index a959846ce6..0000000000 --- a/cli/bench/testdata/deno_upgrade_http.js +++ /dev/null @@ -1,12 +0,0 @@ -const { serve, upgradeHttpRaw } = Deno; -const u8 = Deno[Deno.internal].core.encode( - "HTTP/1.1 101 Switching Protocols\r\n\r\n", -); - -async function handler(req) { - const [conn, _firstPacket] = upgradeHttpRaw(req); - await conn.write(u8); - await conn.close(); -} - -serve({ hostname: "127.0.0.1", port: 9000 }, handler); diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 6158f587e6..5d5d0428f9 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -17,6 +17,11 @@ import { } from "./test_util.ts"; import { consoleSize } from "../../../runtime/js/40_tty.js"; +const { + upgradeHttpRaw, + // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol +} = Deno[Deno.internal]; + function createOnErrorCb(ac: AbortController): (err: unknown) => Response { return (err) => { console.error(err); @@ -803,6 +808,85 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { await server; }); +Deno.test( + { permissions: { net: true } }, + async function httpServerWebSocketRaw() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const server = Deno.serve({ + handler: async (request) => { + const { conn, response } = upgradeHttpRaw(request); + const buf = new Uint8Array(1024); + let read; + + // Write our fake HTTP upgrade + await conn.write( + new TextEncoder().encode( + "HTTP/1.1 101 Switching Protocols\r\nConnection: Upgraded\r\n\r\nExtra", + ), + ); + + // Upgrade data + read = await conn.read(buf); + assertEquals( + new TextDecoder().decode(buf.subarray(0, read!)), + "Upgrade data", + ); + // Read the packet to echo + read = await conn.read(buf); + // Echo + await conn.write(buf.subarray(0, read!)); + + conn.close(); + return response; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + + const conn = await Deno.connect({ port: 4501 }); + await conn.write( + new TextEncoder().encode( + "GET / HTTP/1.1\r\nConnection: Upgrade\r\nUpgrade: websocket\r\n\r\nUpgrade data", + ), + ); + const buf = new Uint8Array(1024); + let len; + + // Headers + let headers = ""; + for (let i = 0; i < 2; i++) { + len = await conn.read(buf); + headers += new TextDecoder().decode(buf.subarray(0, len!)); + if (headers.endsWith("Extra")) { + break; + } + } + assertMatch( + headers, + /HTTP\/1\.1 101 Switching Protocols[ ,.A-Za-z:0-9\r\n]*Extra/im, + ); + + // Data to echo + await conn.write(new TextEncoder().encode("buffer data")); + + // Echo + len = await conn.read(buf); + assertEquals( + new TextDecoder().decode(buf.subarray(0, len!)), + "buffer data", + ); + + conn.close(); + ac.abort(); + await server; + }, +); + Deno.test( { permissions: { net: true } }, async function httpServerWebSocketUpgradeTwice() { diff --git a/cli/tsc/dts/lib.deno.unstable.d.ts b/cli/tsc/dts/lib.deno.unstable.d.ts index dc3bfcfc01..f169e0254b 100644 --- a/cli/tsc/dts/lib.deno.unstable.d.ts +++ b/cli/tsc/dts/lib.deno.unstable.d.ts @@ -1516,25 +1516,6 @@ declare namespace Deno { request: Request, ): Promise<[Deno.Conn, Uint8Array]>; - /** **UNSTABLE**: New API, yet to be vetted. - * - * Allows "hijacking" the connection that the request is associated with. - * This can be used to implement protocols that build on top of HTTP (eg. - * {@linkcode WebSocket}). - * - * Unlike {@linkcode Deno.upgradeHttp} this function does not require that you - * respond to the request with a {@linkcode Response} object. Instead this - * function returns the underlying connection and first packet received - * immediately, and then the caller is responsible for writing the response to - * the connection. - * - * This method can only be called on requests originating the - * {@linkcode Deno.serve} server. - * - * @category HTTP Server - */ - export function upgradeHttpRaw(request: Request): [Deno.Conn, Uint8Array]; - /** **UNSTABLE**: New API, yet to be vetted. * * Open a new {@linkcode Deno.Kv} connection to persist data. diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 8518e8d621..0b2c605388 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. const core = globalThis.Deno.core; const primordials = globalThis.__bootstrap.primordials; +const internals = globalThis.__bootstrap.internals; const { BadResourcePrototype } = core; import { InnerBody } from "ext:deno_fetch/22_body.js"; @@ -10,7 +11,7 @@ import { newInnerResponse, toInnerResponse, } from "ext:deno_fetch/23_response.js"; -import { fromInnerRequest } from "ext:deno_fetch/23_request.js"; +import { fromInnerRequest, toInnerRequest } from "ext:deno_fetch/23_request.js"; import { AbortController } from "ext:deno_web/03_abort_signal.js"; import { _eventLoop, @@ -32,6 +33,7 @@ import { readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; +import { TcpConn } from "ext:deno_net/01_net.js"; const { ObjectPrototypeIsPrototypeOf, SafeSet, @@ -82,6 +84,14 @@ const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( "immutable", ); +function upgradeHttpRaw(req, conn) { + const inner = toInnerRequest(req); + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeHttpRaw", conn); + } + throw new TypeError("upgradeHttpRaw may only be used with Deno.serve"); +} + class InnerRequest { #slabId; #context; @@ -122,10 +132,26 @@ class InnerRequest { throw "upgradeHttp is unavailable in Deno.serve at this time"; } - // upgradeHttpRaw is async - // TODO(mmastrac) + // upgradeHttpRaw is sync if (upgradeType == "upgradeHttpRaw") { - throw "upgradeHttp is unavailable in Deno.serve at this time"; + const slabId = this.#slabId; + const underlyingConn = originalArgs[0]; + + this.url(); + this.headerList; + this.close(); + + this.#upgraded = () => {}; + + const upgradeRid = core.ops.op_upgrade_raw(slabId); + + const conn = new TcpConn( + upgradeRid, + underlyingConn?.remoteAddr, + underlyingConn?.localAddr, + ); + + return { response: UPGRADE_RESPONSE_SENTINEL, conn }; } // upgradeWebSocket is sync @@ -623,4 +649,6 @@ async function serve(arg1, arg2) { } } -export { serve }; +internals.upgradeHttpRaw = upgradeHttpRaw; + +export { serve, upgradeHttpRaw }; diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 95e2cee740..0048eedebb 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -64,7 +64,6 @@ const { } = primordials; const connErrorSymbol = Symbol("connError"); -const streamRid = Symbol("streamRid"); const _deferred = Symbol("upgradeHttpDeferred"); class HttpConn { @@ -482,16 +481,6 @@ function upgradeHttp(req) { return req[_deferred].promise; } -async function upgradeHttpRaw(req, tcpConn) { - const inner = toInnerRequest(req); - if (inner._wantsUpgrade) { - return inner._wantsUpgrade("upgradeHttpRaw", arguments); - } - - const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]); - return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr); -} - const spaceCharCode = StringPrototypeCharCodeAt(" ", 0); const tabCharCode = StringPrototypeCharCodeAt("\t", 0); const commaCharCode = StringPrototypeCharCodeAt(",", 0); @@ -566,4 +555,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) { internals.buildCaseInsensitiveCommaValueFinder = buildCaseInsensitiveCommaValueFinder; -export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket }; +export { _ws, HttpConn, serve, upgradeHttp, upgradeWebSocket }; diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 1c2a232e20..593a9c8166 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,11 +10,13 @@ use crate::response_body::CompletionHandle; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; use crate::response_body::V8StreamHttpResponseBody; +use crate::websocket_upgrade::WebSocketUpgrade; use crate::LocalExecutor; use deno_core::error::AnyError; use deno_core::futures::TryFutureExt; use deno_core::op; use deno_core::AsyncRefCell; +use deno_core::AsyncResult; use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; @@ -39,6 +41,7 @@ use hyper1::server::conn::http2; use hyper1::service::service_fn; use hyper1::service::HttpService; use hyper1::upgrade::OnUpgrade; + use hyper1::StatusCode; use pin_project::pin_project; use pin_project::pinned_drop; @@ -52,6 +55,10 @@ use std::net::SocketAddr; use std::net::SocketAddrV4; use std::pin::Pin; use std::rc::Rc; + +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + use tokio::task::spawn_local; use tokio::task::JoinHandle; @@ -228,7 +235,79 @@ fn slab_insert( } #[op] -pub fn op_upgrade_raw(_index: usize) {} +pub fn op_upgrade_raw( + state: &mut OpState, + index: u32, +) -> Result { + // Stage 1: extract the upgrade future + let upgrade = with_http_mut(index, |http| { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + http + .request_parts + .extensions + .remove::() + .ok_or_else(|| AnyError::msg("upgrade unavailable")) + })?; + + let (read, write) = tokio::io::duplex(1024); + let (read_rx, write_tx) = tokio::io::split(read); + let (mut write_rx, mut read_tx) = tokio::io::split(write); + + spawn_local(async move { + let mut upgrade_stream = WebSocketUpgrade::::default(); + + // Stage 2: Extract the Upgraded connection + let mut buf = [0; 1024]; + let upgraded = loop { + let read = Pin::new(&mut write_rx).read(&mut buf).await?; + match upgrade_stream.write(&buf[..read]) { + Ok(None) => continue, + Ok(Some((response, bytes))) => { + with_resp_mut(index, |resp| *resp = Some(response)); + with_promise_mut(index, |promise| promise.complete(true)); + let mut upgraded = upgrade.await?; + upgraded.write_all(&bytes).await?; + break upgraded; + } + Err(err) => return Err(err), + } + }; + + // Stage 3: Pump the data + let (mut upgraded_rx, mut upgraded_tx) = tokio::io::split(upgraded); + + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = upgraded_rx.read(&mut buf).await?; + if read == 0 { + break; + } + read_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + spawn_local(async move { + let mut buf = [0; 1024]; + loop { + let read = write_rx.read(&mut buf).await?; + if read == 0 { + break; + } + upgraded_tx.write_all(&buf[..read]).await?; + } + Ok::<_, AnyError>(()) + }); + + Ok(()) + }); + + Ok( + state + .resource_table + .add(UpgradeStream::new(read_rx, write_tx)), + ) +} #[op] pub async fn op_upgrade( @@ -825,3 +904,57 @@ pub async fn op_http_wait( Ok(u32::MAX) } + +struct UpgradeStream { + read: AsyncRefCell>, + write: AsyncRefCell>, + cancel_handle: CancelHandle, +} + +impl UpgradeStream { + pub fn new( + read: tokio::io::ReadHalf, + write: tokio::io::WriteHalf, + ) -> Self { + Self { + read: AsyncRefCell::new(read), + write: AsyncRefCell::new(write), + cancel_handle: CancelHandle::new(), + } + } + + async fn read(self: Rc, buf: &mut [u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let read = RcRef::map(self, |this| &this.read); + let mut read = read.borrow_mut().await; + Ok(Pin::new(&mut *read).read(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } + + async fn write(self: Rc, buf: &[u8]) -> Result { + let cancel_handle = RcRef::map(self.clone(), |this| &this.cancel_handle); + async { + let write = RcRef::map(self, |this| &this.write); + let mut write = write.borrow_mut().await; + Ok(Pin::new(&mut *write).write(buf).await?) + } + .try_or_cancel(cancel_handle) + .await + } +} + +impl Resource for UpgradeStream { + fn name(&self) -> Cow { + "httpRawUpgradeStream".into() + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } +} diff --git a/ext/http/lib.rs b/ext/http/lib.rs index d5404d189a..cde15af88c 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -32,7 +32,6 @@ use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; use deno_core::StringOrBuffer; -use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; @@ -67,11 +66,9 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; use tokio::io::AsyncRead; -use tokio::io::AsyncReadExt; use tokio::io::AsyncWrite; use tokio::io::AsyncWriteExt; use tokio::task::spawn_local; -use websocket_upgrade::WebSocketUpgrade; use crate::network_buffered_stream::NetworkBufferedStream; use crate::reader_stream::ExternallyAbortableReaderStream; @@ -97,7 +94,6 @@ deno_core::extension!( op_http_write_resource, op_http_shutdown, op_http_websocket_accept_header, - op_http_upgrade_early, op_http_upgrade_websocket, http_next::op_serve_http, http_next::op_serve_http_on, @@ -967,192 +963,6 @@ fn op_http_websocket_accept_header(key: String) -> Result { Ok(base64::encode(digest)) } -struct EarlyUpgradeSocket(AsyncRefCell, CancelHandle); - -enum EarlyUpgradeSocketInner { - PreResponse( - Rc, - WebSocketUpgrade, - // Readers need to block in this state, so they can wait here for the broadcast. - tokio::sync::broadcast::Sender< - Rc>>, - >, - ), - PostResponse( - Rc>>, - Rc>>, - ), -} - -impl EarlyUpgradeSocket { - /// Gets a reader without holding the lock. - async fn get_reader( - self: Rc, - ) -> Result< - Rc>>, - AnyError, - > { - let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; - let cancel = RcRef::map(self, |x| &x.1); - let inner = &mut *borrow; - match inner { - EarlyUpgradeSocketInner::PreResponse(_, _, tx) => { - let mut rx = tx.subscribe(); - // Ensure we're not borrowing self here - drop(borrow); - Ok( - rx.recv() - .map_err(AnyError::from) - .try_or_cancel(&cancel) - .await?, - ) - } - EarlyUpgradeSocketInner::PostResponse(rx, _) => Ok(rx.clone()), - } - } - - async fn read(self: Rc, data: &mut [u8]) -> Result { - let reader = self.clone().get_reader().await?; - let cancel = RcRef::map(self, |x| &x.1); - Ok( - reader - .borrow_mut() - .await - .read(data) - .try_or_cancel(&cancel) - .await?, - ) - } - - /// Write all the data provided, only holding the lock while we see if the connection needs to be - /// upgraded. - async fn write_all(self: Rc, buf: &[u8]) -> Result<(), AnyError> { - let mut borrow = RcRef::map(self.clone(), |x| &x.0).borrow_mut().await; - let cancel = RcRef::map(self, |x| &x.1); - let inner = &mut *borrow; - match inner { - EarlyUpgradeSocketInner::PreResponse(stream, upgrade, rx_tx) => { - if let Some((resp, extra)) = upgrade.write(buf)? { - let new_wr = HttpResponseWriter::Closed; - let mut old_wr = - RcRef::map(stream.clone(), |r| &r.wr).borrow_mut().await; - let response_tx = match replace(&mut *old_wr, new_wr) { - HttpResponseWriter::Headers(response_tx) => response_tx, - _ => return Err(http_error("response headers already sent")), - }; - - if response_tx.send(resp).is_err() { - stream.conn.closed().await?; - return Err(http_error("connection closed while sending response")); - }; - - let mut old_rd = - RcRef::map(stream.clone(), |r| &r.rd).borrow_mut().await; - let new_rd = HttpRequestReader::Closed; - let upgraded = match replace(&mut *old_rd, new_rd) { - HttpRequestReader::Headers(request) => { - hyper::upgrade::on(request) - .map_err(AnyError::from) - .try_or_cancel(&cancel) - .await? - } - _ => { - return Err(http_error("response already started")); - } - }; - - let (rx, tx) = tokio::io::split(upgraded); - let rx = Rc::new(AsyncRefCell::new(rx)); - let tx = Rc::new(AsyncRefCell::new(tx)); - - // Take the tx and rx lock before we allow anything else to happen because we want to control - // the order of reads and writes. - let mut tx_lock = tx.clone().borrow_mut().await; - let rx_lock = rx.clone().borrow_mut().await; - - // Allow all the pending readers to go now. We still have the lock on inner, so no more - // pending readers can show up. We intentionally ignore errors here, as there may be - // nobody waiting on a read. - _ = rx_tx.send(rx.clone()); - - // We swap out inner here, so once the lock is gone, readers will acquire rx directly. - // We also fully release our lock. - *inner = EarlyUpgradeSocketInner::PostResponse(rx, tx); - drop(borrow); - - // We've updated inner and unlocked it, reads are free to go in-order. - drop(rx_lock); - - // If we had extra data after the response, write that to the upgraded connection - if !extra.is_empty() { - tx_lock.write_all(&extra).try_or_cancel(&cancel).await?; - } - } - } - EarlyUpgradeSocketInner::PostResponse(_, tx) => { - let tx = tx.clone(); - drop(borrow); - tx.borrow_mut() - .await - .write_all(buf) - .try_or_cancel(&cancel) - .await?; - } - }; - Ok(()) - } -} - -impl Resource for EarlyUpgradeSocket { - fn name(&self) -> Cow { - "upgradedHttpConnection".into() - } - - deno_core::impl_readable_byob!(); - - fn write( - self: Rc, - buf: BufView, - ) -> AsyncResult { - Box::pin(async move { - let nwritten = buf.len(); - Self::write_all(self, &buf).await?; - Ok(WriteOutcome::Full { nwritten }) - }) - } - - fn write_all(self: Rc, buf: BufView) -> AsyncResult<()> { - Box::pin(async move { Self::write_all(self, &buf).await }) - } - - fn close(self: Rc) { - self.1.cancel() - } -} - -#[op] -async fn op_http_upgrade_early( - state: Rc>, - rid: ResourceId, -) -> Result { - let stream = state - .borrow_mut() - .resource_table - .get::(rid)?; - let resources = &mut state.borrow_mut().resource_table; - let (tx, _rx) = tokio::sync::broadcast::channel(1); - let socket = EarlyUpgradeSocketInner::PreResponse( - stream, - WebSocketUpgrade::default(), - tx, - ); - let rid = resources.add(EarlyUpgradeSocket( - AsyncRefCell::new(socket), - CancelHandle::new(), - )); - Ok(rid) -} - #[op] async fn op_http_upgrade_websocket( state: Rc>, diff --git a/ext/http/websocket_upgrade.rs b/ext/http/websocket_upgrade.rs index 042a467219..70ad785267 100644 --- a/ext/http/websocket_upgrade.rs +++ b/ext/http/websocket_upgrade.rs @@ -1,12 +1,13 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::marker::PhantomData; + use bytes::Bytes; use bytes::BytesMut; use deno_core::error::AnyError; use httparse::Status; use hyper::http::HeaderName; use hyper::http::HeaderValue; -use hyper::Body; use hyper::Response; use memmem::Searcher; use memmem::TwoWaySearcher; @@ -15,14 +16,14 @@ use once_cell::sync::OnceCell; use crate::http_error; /// Given a buffer that ends in `\n\n` or `\r\n\r\n`, returns a parsed [`Request`]. -fn parse_response( +fn parse_response( header_bytes: &[u8], -) -> Result<(usize, Response), AnyError> { +) -> Result<(usize, Response), AnyError> { let mut headers = [httparse::EMPTY_HEADER; 16]; let status = httparse::parse_headers(header_bytes, &mut headers)?; match status { Status::Complete((index, parsed)) => { - let mut resp = Response::builder().status(101).body(Body::empty())?; + let mut resp = Response::builder().status(101).body(T::default())?; for header in parsed.iter() { resp.headers_mut().append( HeaderName::from_bytes(header.name.as_bytes())?, @@ -59,12 +60,13 @@ static HEADER_SEARCHER: OnceCell = OnceCell::new(); static HEADER_SEARCHER2: OnceCell = OnceCell::new(); #[derive(Default)] -pub struct WebSocketUpgrade { +pub struct WebSocketUpgrade { state: WebSocketUpgradeState, buf: BytesMut, + _t: PhantomData, } -impl WebSocketUpgrade { +impl WebSocketUpgrade { /// Ensures that the status line starts with "HTTP/1.1 101 " which matches all of the node.js /// WebSocket libraries that are known. We don't care about the trailing status text. fn validate_status(&self, status: &[u8]) -> Result<(), AnyError> { @@ -80,7 +82,7 @@ impl WebSocketUpgrade { pub fn write( &mut self, bytes: &[u8], - ) -> Result, Bytes)>, AnyError> { + ) -> Result, Bytes)>, AnyError> { use WebSocketUpgradeState::*; match self.state { @@ -153,6 +155,7 @@ impl WebSocketUpgrade { #[cfg(test)] mod tests { use super::*; + use hyper::Body; type ExpectedResponseAndHead = Option<(Response, &'static [u8])>; diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index d8ec7650bc..785bbaab3a 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -16,7 +16,7 @@ import { Agent } from "ext:deno_node/_http_agent.mjs"; import { chunkExpression as RE_TE_CHUNKED } from "ext:deno_node/_http_common.ts"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; -import * as denoHttp from "ext:deno_http/01_http.js"; +import { upgradeHttpRaw } from "ext:deno_http/00_serve.js"; import * as httpRuntime from "ext:runtime/40_http.js"; import { connResetException } from "ext:deno_node/internal/errors.ts"; @@ -704,7 +704,7 @@ class ServerImpl extends EventEmitter { } const req = new IncomingMessageForServer(reqEvent.request, tcpConn); if (req.upgrade && this.listenerCount("upgrade") > 0) { - const conn = await denoHttp.upgradeHttpRaw( + const conn = await upgradeHttpRaw( reqEvent.request, tcpConn, ) as Deno.Conn;