From ad87d6f4b00db22b57b54da0093b44c6c14bccdf Mon Sep 17 00:00:00 2001 From: snek Date: Mon, 6 Jan 2025 16:28:30 +0100 Subject: [PATCH] feat(unstable): WebTransport --- Cargo.lock | 15 + cli/tsc/99_main_compiler.js | 10 +- ext/net/03_quic.js | 58 +- ext/net/Cargo.toml | 3 + ext/net/lib.rs | 2 + ext/net/quic.rs | 354 +++++- ext/web/06_streams.js | 8 +- ext/web/lib.deno_web.d.ts | 218 ++++ ext/web/lib.rs | 1 + ext/web/webtransport.js | 1125 +++++++++++++++++++ ext/webidl/00_webidl.js | 10 + runtime/js/90_deno_ns.js | 5 + runtime/js/98_global_scope_shared.js | 29 + tests/integration/lsp_tests.rs | 4 +- tests/specs/run/webtransport/__test__.jsonc | 5 + tests/specs/run/webtransport/deno.json | 4 + tests/specs/run/webtransport/main.ts | 100 ++ tools/core_import_map.json | 1 + 18 files changed, 1935 insertions(+), 17 deletions(-) create mode 100644 ext/web/webtransport.js create mode 100644 tests/specs/run/webtransport/__test__.jsonc create mode 100644 tests/specs/run/webtransport/deno.json create mode 100644 tests/specs/run/webtransport/main.ts diff --git a/Cargo.lock b/Cargo.lock index 7a6108b2e7..febbfa3c5d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2013,9 +2013,12 @@ dependencies = [ "quinn", "rustls-tokio-stream", "serde", + "sha2", "socket2", "thiserror 2.0.3", "tokio", + "url", + "web-transport-proto", ] [[package]] @@ -8869,6 +8872,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-transport-proto" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3806ea43df5817f0d90618c842d28db5946bc18a5db0659b2275c2be48d472" +dependencies = [ + "bytes", + "http 1.1.0", + "thiserror 1.0.64", + "url", +] + [[package]] name = "webpki-root-certs" version = "0.26.6" diff --git a/cli/tsc/99_main_compiler.js b/cli/tsc/99_main_compiler.js index 65319211fb..ffe45ccc3d 100644 --- a/cli/tsc/99_main_compiler.js +++ b/cli/tsc/99_main_compiler.js @@ -1481,9 +1481,15 @@ delete Object.prototype.__proto__; options: SNAPSHOT_COMPILE_OPTIONS, host, }); + const errors = ts.getPreEmitDiagnostics(TS_SNAPSHOT_PROGRAM); assert( - ts.getPreEmitDiagnostics(TS_SNAPSHOT_PROGRAM).length === 0, - "lib.d.ts files have errors", + errors.length === 0, + `lib.d.ts files have errors:\n${ + ts.formatDiagnosticsWithColorAndContext( + errors, + host, + ) + }`, ); // remove this now that we don't need it anymore for warming up tsc diff --git a/ext/net/03_quic.js b/ext/net/03_quic.js index d74d356edb..a07eef8e79 100644 --- a/ext/net/03_quic.js +++ b/ext/net/03_quic.js @@ -34,6 +34,8 @@ import { op_quic_send_stream_get_id, op_quic_send_stream_get_priority, op_quic_send_stream_set_priority, + op_webtransport_accept, + op_webtransport_connect, } from "ext:core/ops"; import { getReadableStreamResourceBacking, @@ -50,6 +52,7 @@ const { const { ObjectPrototypeIsPrototypeOf, PromisePrototypeThen, + ReflectConstruct, Symbol, SymbolAsyncIterator, SafePromisePrototypeFinally, @@ -205,6 +208,9 @@ class QuicIncoming { } } +let webtransportConnect; +let webtransportAccept; + class QuicConn { #resource; #bidiStream = null; @@ -309,6 +315,43 @@ class QuicConn { close({ closeCode = 0, reason = "" } = { __proto__: null }) { op_quic_connection_close(this.#resource, closeCode, reason); } + + static { + webtransportConnect = async function webtransportConnect(conn, url) { + const { + 0: connectTxRid, + 1: connectRxRid, + 2: settingsTxRid, + 3: settingsRxRid, + } = await op_webtransport_connect(conn.#resource, url); + const connect = new QuicBidirectionalStream( + connectTxRid, + connectRxRid, + conn.closed, + ); + const settingsTx = writableStream(settingsTxRid, conn.closed); + const settingsRx = readableStream(settingsRxRid, conn.closed); + return { connect, settingsTx, settingsRx }; + }; + + webtransportAccept = async function webtransportAccept(conn) { + const { + 0: url, + 1: connectTxRid, + 2: connectRxRid, + 3: settingsTxRid, + 4: settingsRxRid, + } = await op_webtransport_accept(conn.#resource); + const connect = new QuicBidirectionalStream( + connectTxRid, + connectRxRid, + conn.closed, + ); + const settingsTx = writableStream(settingsTxRid, conn.closed); + const settingsRx = readableStream(settingsRxRid, conn.closed); + return { url, connect, settingsTx, settingsRx }; + }; + } } class QuicSendStream extends WritableStream { @@ -345,7 +388,11 @@ function readableStream(rid, closed) { SafePromisePrototypeFinally(closed, () => { core.tryClose(rid); }); - return readableStreamForRid(rid, true, QuicReceiveStream); + return readableStreamForRid( + rid, + true, + (...args) => ReflectConstruct(QuicReceiveStream, args), + ); } function writableStream(rid, closed) { @@ -353,7 +400,11 @@ function writableStream(rid, closed) { SafePromisePrototypeFinally(closed, () => { core.tryClose(rid); }); - return writableStreamForRid(rid, true, QuicSendStream); + return writableStreamForRid( + rid, + true, + (...args) => ReflectConstruct(QuicSendStream, args), + ); } class QuicBidirectionalStream { @@ -421,6 +472,7 @@ function connectQuic(options) { caCerts: options.caCerts, alpnProtocols: options.alpnProtocols, serverName: options.serverName, + serverCertificateHashes: options.serverCertificateHashes, }, transportOptions(options), keyPair, @@ -448,4 +500,6 @@ export { QuicListener, QuicReceiveStream, QuicSendStream, + webtransportAccept, + webtransportConnect, }; diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index 348d8682ae..071a15fbe0 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -24,6 +24,9 @@ pin-project.workspace = true quinn = { version = "0.11.6", default-features = false, features = ["runtime-tokio", "rustls", "ring"] } rustls-tokio-stream.workspace = true serde.workspace = true +sha2.workspace = true socket2.workspace = true thiserror.workspace = true tokio.workspace = true +url.workspace = true +web-transport-proto = "0.2.3" diff --git a/ext/net/lib.rs b/ext/net/lib.rs index b21da19f30..739d331a02 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -196,6 +196,8 @@ deno_core::extension!(deno_net, quic::op_quic_send_stream_get_id, quic::op_quic_send_stream_get_priority, quic::op_quic_send_stream_set_priority, + quic::webtransport::op_webtransport_accept, + quic::webtransport::op_webtransport_connect, ], esm = [ "01_net.js", "02_tls.js" ], lazy_loaded_esm = [ "03_quic.js" ], diff --git a/ext/net/quic.rs b/ext/net/quic.rs index af13a3f009..29176da221 100644 --- a/ext/net/quic.rs +++ b/ext/net/quic.rs @@ -93,12 +93,27 @@ pub enum QuicError { #[class("BadResource")] #[error("{0}")] ClosedStream(#[from] quinn::ClosedStream), + #[class(generic)] + #[error("{0}")] + ReadError(#[from] quinn::ReadError), + #[class(generic)] + #[error("{0}")] + WriteError(#[from] quinn::WriteError), #[class("BadResource")] #[error("Invalid {0} resource")] BadResource(&'static str), #[class(range)] #[error("Connection has reached the maximum number of concurrent outgoing {0} streams")] MaxStreams(&'static str), + #[class(generic)] + #[error("Peer does not support WebTransport")] + WebTransportPeerUnsupported, + #[class(generic)] + #[error("{0}")] + WebTransportSettingsError(#[from] web_transport_proto::SettingsError), + #[class(generic)] + #[error("{0}")] + WebTransportConnectError(#[from] web_transport_proto::ConnectError), #[class(inherit)] #[error(transparent)] Other(#[from] JsErrorBox), @@ -475,6 +490,14 @@ struct ConnectArgs { ca_certs: Option>, alpn_protocols: Option>, server_name: Option, + server_certificate_hashes: Option>, +} + +#[derive(Deserialize)] +#[serde(rename_all = "camelCase")] +struct CertificateHash { + algorithm: String, + value: JsBuffer, } #[op2] @@ -515,13 +538,28 @@ where .map(|s| s.into_bytes()) .collect::>(); - let mut tls_config = create_client_config( - root_cert_store, - ca_certs, - unsafely_ignore_certificate_errors, - key_pair.take(), - SocketUse::GeneralSsl, - )?; + let mut tls_config = if let Some(hashes) = args.server_certificate_hashes { + deno_tls::rustls::ClientConfig::builder() + .dangerous() + .with_custom_certificate_verifier(Arc::new( + webtransport::ServerFingerprints::new( + hashes + .into_iter() + .filter(|h| h.algorithm.to_lowercase() == "sha-256") + .map(|h| h.value.to_vec()) + .collect(), + ), + )) + .with_no_client_auth() + } else { + create_client_config( + root_cert_store, + ca_certs, + unsafely_ignore_certificate_errors, + key_pair.take(), + SocketUse::GeneralSsl, + )? + }; if let Some(alpn_protocols) = args.alpn_protocols { tls_config.alpn_protocols = @@ -925,3 +963,305 @@ pub(crate) fn op_quic_recv_stream_get_id( let stream_id = quinn::VarInt::from(resource.stream_id).into_inner(); Ok(stream_id) } + +pub(crate) mod webtransport { + // MIT License + // + // Copyright (c) 2023 Luke Curley + // + // Permission is hereby granted, free of charge, to any person obtaining a copy + // of this software and associated documentation files (the "Software"), to deal + // in the Software without restriction, including without limitation the rights + // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + // copies of the Software, and to permit persons to whom the Software is + // furnished to do so, subject to the following conditions: + // + // The above copyright notice and this permission notice shall be included in all + // copies or substantial portions of the Software. + // + // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + // SOFTWARE. + // + // https://github.com/kixelated/web-transport-rs + + use deno_core::futures::try_join; + use deno_tls::rustls; + use rustls::client::danger::ServerCertVerifier; + use rustls::crypto::verify_tls12_signature; + use rustls::crypto::verify_tls13_signature; + use rustls::crypto::CryptoProvider; + use sha2::Digest; + use sha2::Sha256; + + use super::*; + + async fn exchange_settings( + state: Rc>, + conn: quinn::Connection, + ) -> Result<(u32, u32), QuicError> { + use web_transport_proto::Settings; + use web_transport_proto::SettingsError; + + let settings_tx_rid = async { + let mut tx = conn.open_uni().await?; + + let mut settings = Settings::default(); + settings.enable_webtransport(1); + + let mut buf = vec![]; + settings.encode(&mut buf); + tx.write_all(&buf).await?; + + let rid = state + .borrow_mut() + .resource_table + .add(SendStreamResource::new(tx)); + + Ok(rid) + }; + + let settings_rx_rid = async { + let mut rx = conn.accept_uni().await?; + let mut buf = Vec::new(); + + loop { + let chunk = rx.read_chunk(usize::MAX, true).await?; + let chunk = chunk.ok_or(QuicError::WebTransportPeerUnsupported)?; + buf.extend_from_slice(&chunk.bytes); + + let mut limit = std::io::Cursor::new(&buf); + + let settings = match Settings::decode(&mut limit) { + Ok(settings) => settings, + Err(SettingsError::UnexpectedEnd) => continue, + Err(e) => return Err(e.into()), + }; + + if settings.supports_webtransport() == 0 { + return Err(QuicError::WebTransportPeerUnsupported); + } + + break; + } + + let rid = state + .borrow_mut() + .resource_table + .add(RecvStreamResource::new(rx)); + + Ok(rid) + }; + + let (settings_tx_rid, settings_rx_rid) = + try_join!(settings_tx_rid, settings_rx_rid)?; + + Ok((settings_tx_rid, settings_rx_rid)) + } + + #[op2(async)] + #[serde] + pub(crate) async fn op_webtransport_connect( + state: Rc>, + #[cppgc] connection_resource: &ConnectionResource, + #[string] url: String, + ) -> Result<(u32, u32, u32, u32), QuicError> { + use web_transport_proto::ConnectError; + use web_transport_proto::ConnectRequest; + use web_transport_proto::ConnectResponse; + + let conn = connection_resource.0.clone(); + let url = url::Url::parse(&url).unwrap(); + + let (settings_tx_rid, settings_rx_rid) = + exchange_settings(state.clone(), conn.clone()).await?; + + let (connect_tx_rid, connect_rx_rid) = { + let (mut tx, mut rx) = conn.open_bi().await?; + + let request = ConnectRequest { url: url.clone() }; + + let mut buf = Vec::new(); + request.encode(&mut buf); + tx.write_all(&buf).await?; + + buf.clear(); + loop { + let chunk = rx.read_chunk(usize::MAX, true).await?; + let chunk = chunk.ok_or(QuicError::WebTransportPeerUnsupported)?; + buf.extend_from_slice(&chunk.bytes); + + let mut limit = std::io::Cursor::new(&buf); + + let res = match ConnectResponse::decode(&mut limit) { + Ok(res) => res, + Err(ConnectError::UnexpectedEnd) => { + continue; + } + Err(e) => return Err(e.into()), + }; + + if res.status != 200 { + return Err(QuicError::WebTransportPeerUnsupported); + } + + break; + } + + let mut state = state.borrow_mut(); + let tx_rid = state.resource_table.add(SendStreamResource::new(tx)); + let rx_rid = state.resource_table.add(RecvStreamResource::new(rx)); + + (tx_rid, rx_rid) + }; + + Ok(( + connect_tx_rid, + connect_rx_rid, + settings_tx_rid, + settings_rx_rid, + )) + } + + #[op2(async)] + #[serde] + pub(crate) async fn op_webtransport_accept( + state: Rc>, + #[cppgc] connection_resource: &ConnectionResource, + ) -> Result<(String, u32, u32, u32, u32), QuicError> { + use web_transport_proto::ConnectError; + use web_transport_proto::ConnectRequest; + use web_transport_proto::ConnectResponse; + + let conn = connection_resource.0.clone(); + + let (settings_tx_rid, settings_rx_rid) = + exchange_settings(state.clone(), conn.clone()).await?; + + let (url, connect_tx_rid, connect_rx_rid) = { + let (mut tx, mut rx) = conn.accept_bi().await?; + + let mut buf = Vec::new(); + + let req = loop { + let chunk = rx.read_chunk(usize::MAX, true).await?; + let chunk = chunk.ok_or(QuicError::WebTransportPeerUnsupported)?; + buf.extend_from_slice(&chunk.bytes); + + let mut limit = std::io::Cursor::new(&buf); + + let req = match ConnectRequest::decode(&mut limit) { + Ok(res) => res, + Err(ConnectError::UnexpectedEnd) => { + continue; + } + Err(e) => return Err(e.into()), + }; + + break req; + }; + + buf.clear(); + let resp = ConnectResponse { + status: 200u16.try_into().unwrap(), + }; + resp.encode(&mut buf); + tx.write_all(&buf).await?; + + let mut state = state.borrow_mut(); + let tx_rid = state.resource_table.add(SendStreamResource::new(tx)); + let rx_rid = state.resource_table.add(RecvStreamResource::new(rx)); + + (req.url, tx_rid, rx_rid) + }; + + Ok(( + url.to_string(), + connect_tx_rid, + connect_rx_rid, + settings_tx_rid, + settings_rx_rid, + )) + } + + #[derive(Debug)] + pub(crate) struct ServerFingerprints { + fingerprints: Vec>, + provider: CryptoProvider, + } + + impl ServerFingerprints { + pub(crate) fn new(fingerprints: Vec>) -> ServerFingerprints { + Self { + fingerprints, + provider: rustls::crypto::ring::default_provider(), + } + } + } + + impl ServerCertVerifier for ServerFingerprints { + fn verify_server_cert( + &self, + end_entity: &rustls::pki_types::CertificateDer<'_>, + _intermediates: &[rustls::pki_types::CertificateDer<'_>], + _server_name: &rustls::pki_types::ServerName<'_>, + _ocsp_response: &[u8], + _now: rustls::pki_types::UnixTime, + ) -> Result { + let cert_hash = Sha256::digest(end_entity); + + if self + .fingerprints + .iter() + .any(|fingerprint| fingerprint == cert_hash.as_slice()) + { + return Ok(rustls::client::danger::ServerCertVerified::assertion()); + } + + Err(rustls::Error::InvalidCertificate( + rustls::CertificateError::UnknownIssuer, + )) + } + + fn verify_tls12_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result + { + verify_tls12_signature( + message, + cert, + dss, + &self.provider.signature_verification_algorithms, + ) + } + + fn verify_tls13_signature( + &self, + message: &[u8], + cert: &rustls::pki_types::CertificateDer<'_>, + dss: &rustls::DigitallySignedStruct, + ) -> Result + { + verify_tls13_signature( + message, + cert, + dss, + &self.provider.signature_verification_algorithms, + ) + } + + fn supported_verify_schemes(&self) -> Vec { + self + .provider + .signature_verification_algorithms + .supported_schemes() + } + } +} diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 950d46e829..2ad350c553 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -908,8 +908,8 @@ const _original = Symbol("[[original]]"); * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. * @returns {ReadableStream} */ -function readableStreamForRid(rid, autoClose = true, Super, onError) { - const stream = new (Super ?? ReadableStream)(_brand); +function readableStreamForRid(rid, autoClose = true, cfn, onError) { + const stream = cfn ? cfn(_brand) : new ReadableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { @@ -1134,8 +1134,8 @@ async function readableStreamCollectIntoUint8Array(stream) { * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. * @returns {ReadableStream} */ -function writableStreamForRid(rid, autoClose = true, Super) { - const stream = new (Super ?? WritableStream)(_brand); +function writableStreamForRid(rid, autoClose = true, cfn) { + const stream = cfn ? cfn(_brand) : new WritableStream(_brand); stream[_resourceBacking] = { rid, autoClose }; const tryClose = () => { diff --git a/ext/web/lib.deno_web.d.ts b/ext/web/lib.deno_web.d.ts index 1fb003b66f..f9a9d42ae7 100644 --- a/ext/web/lib.deno_web.d.ts +++ b/ext/web/lib.deno_web.d.ts @@ -1378,3 +1378,221 @@ declare var ImageData: { settings?: ImageDataSettings, ): ImageData; }; + +/** @category Platform */ +interface WebTransportCloseInfo { + closeCode?: number; + reason?: string; +} + +/** @category Platform */ +interface WebTransportErrorOptions { + source?: WebTransportErrorSource; + streamErrorCode?: number | null; +} + +/** @category Platform */ +interface WebTransportHash { + algorithm?: string; + value?: BufferSource; +} + +/** @category Platform */ +interface WebTransportOptions { + allowPooling?: boolean; + congestionControl?: WebTransportCongestionControl; + requireUnreliable?: boolean; + serverCertificateHashes?: WebTransportHash[]; +} + +/** @category Platform */ +interface WebTransportSendStreamOptions { + sendGroup?: WebTransportSendGroup; + sendOrder?: number; + waitUntilAvailable?: boolean; +} + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport) + * @category Platform + */ +interface WebTransport { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/closed) */ + readonly closed: Promise; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/datagrams) */ + readonly datagrams: WebTransportDatagramDuplexStream; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/incomingBidirectionalStreams) */ + readonly incomingBidirectionalStreams: ReadableStream< + WebTransportBidirectionalStream + >; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/incomingUnidirectionalStreams) */ + readonly incomingUnidirectionalStreams: ReadableStream< + WebTransportReceiveStream + >; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/ready) */ + readonly ready: Promise; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/close) */ + close(closeInfo?: WebTransportCloseInfo): void; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/createBidirectionalStream) */ + createBidirectionalStream( + options?: WebTransportSendStreamOptions, + ): Promise; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/createUnidirectionalStream) */ + createUnidirectionalStream( + options?: WebTransportSendStreamOptions, + ): Promise; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransport/createSendGroup) */ + createSendGroup(): WebTransportSendGroup; +} + +/** @category Platform */ +declare var WebTransport: { + prototype: WebTransport; + new (url: string | URL, options?: WebTransportOptions): WebTransport; +}; + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportBidirectionalStream) + * @category Platform + */ +interface WebTransportBidirectionalStream { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportBidirectionalStream/readable) */ + readonly readable: WebTransportReceiveStream; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportBidirectionalStream/writable) */ + readonly writable: WebTransportSendStream; +} + +/** @category Platform */ +declare var WebTransportBidirectionalStream: { + prototype: WebTransportBidirectionalStream; + new (): WebTransportBidirectionalStream; +}; + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream) + * @category Platform + */ +interface WebTransportDatagramDuplexStream { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/incomingHighWaterMark) */ + incomingHighWaterMark: number; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/incomingMaxAge) */ + incomingMaxAge: number | null; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/maxDatagramSize) */ + readonly maxDatagramSize: number; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/outgoingHighWaterMark) */ + outgoingHighWaterMark: number; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/outgoingMaxAge) */ + outgoingMaxAge: number | null; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/readable) */ + readonly readable: WebTransportReceiveStream; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportDatagramDuplexStream/writable) */ + readonly writable: WebTransportSendStream; +} + +/** @category Platform */ +declare var WebTransportDatagramDuplexStream: { + prototype: WebTransportDatagramDuplexStream; + new (): WebTransportDatagramDuplexStream; +}; + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendStream) + * @category Platform + */ +interface WebTransportSendStream extends WritableStream { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendStream/sendOrder) */ + sendOrder: number; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendStream/sendGroup) */ + sendGroup?: WebTransportSendGroup; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendStream/getStats) */ + getStats(): Promise; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendStream/getWriter) */ + getWriter(): WebTransportWriter; +} + +/** @category Platform */ +declare var WebTransportSendStream: { + prototype: WebTransportSendStream; + new (): WebTransportSendStream; +}; + +/** @category Platform */ +interface WebTransportSendStreamStats { + bytesWritten: number; + bytesSent: number; + bytesAcknowledged: number; +} + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportWriter) + * @category Platform + */ +interface WebTransportWriter extends WritableStreamDefaultWriter { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportWriter/atomicWrite) */ + atomicWrite(chunk: any): Promise; +} + +/** @category Platform */ +declare var WebTransportWriter: { + prototype: WebTransportWriter; + new (): WebTransportWriter; +}; + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportReceiveStream) + * @category Platform + */ +interface WebTransportReceiveStream extends ReadableStream { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportReceiveStream/getStats) */ + getStats(): Promise; +} + +/** @category Platform */ +declare var WebTransportReceiveStream: { + prototype: WebTransportReceiveStream; + new (): WebTransportReceiveStream; +}; + +/** @category Platform */ +interface WebTransportReceiveStreamStats { + bytesReceived: number; + bytesRead: number; +} + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendGroup) + * @category Platform + */ +interface WebTransportSendGroup { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportSendGroup/getStats) */ + getStats(): Promise; +} + +/** @category Platform */ +declare var WebTransportSendGroup: { + prototype: WebTransportSendGroup; + new (): WebTransportSendGroup; +}; + +/** + * [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportError) + * @category Platform + */ +interface WebTransportError extends DOMException { + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportError/source) */ + readonly source: WebTransportErrorSource; + /** [MDN Reference](https://developer.mozilla.org/docs/Web/API/WebTransportError/streamErrorCode) */ + readonly streamErrorCode: number | null; +} + +/** @category Platform */ +declare var WebTransportError: { + prototype: WebTransportError; + new (message?: string, options?: WebTransportErrorOptions): WebTransportError; +}; + +/** @category Platform */ +type WebTransportCongestionControl = "default" | "low-latency" | "throughput"; + +/** @category Platform */ +type WebTransportErrorSource = "session" | "stream"; diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 7d22fa3b2a..387a4d5263 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -113,6 +113,7 @@ deno_core::extension!(deno_web, "15_performance.js", "16_image_data.js", ], + lazy_loaded_esm = [ "webtransport.js" ], options = { blob_store: Arc, maybe_location: Option, diff --git a/ext/web/webtransport.js b/ext/web/webtransport.js new file mode 100644 index 0000000000..383046a408 --- /dev/null +++ b/ext/web/webtransport.js @@ -0,0 +1,1125 @@ +// Copyright 2018-2025 the Deno authors. MIT license. + +import { primordials } from "ext:core/mod.js"; +import * as webidl from "ext:deno_webidl/00_webidl.js"; +import { + connectQuic, + webtransportAccept, + webtransportConnect, +} from "ext:deno_net/03_quic.js"; +import { assert } from "ext:deno_web/00_infra.js"; +import { DOMException } from "ext:deno_web/01_dom_exception.js"; +import { + getReadableStreamResourceBacking, + getWritableStreamResourceBacking, + ReadableStream, + readableStreamForRid, + WritableStream, + WritableStreamDefaultWriter, + writableStreamForRid, +} from "ext:deno_web/06_streams.js"; +import { getLocationHref } from "ext:deno_web/12_location.js"; + +const { + ArrayBuffer, + ArrayBufferPrototype, + ArrayBufferIsView, + ArrayPrototypeConcat, + ArrayPrototypeShift, + ArrayPrototypePush, + DataView, + DataViewPrototype, + DataViewPrototypeGetUint16, + DataViewPrototypeGetUint32, + DataViewPrototypeGetBigUint64, + DataViewPrototypeSetUint16, + DataViewPrototypeSetUint32, + DataViewPrototypeSetBigUint64, + DateNow, + BigInt, + Number, + ObjectPrototypeIsPrototypeOf, + Promise, + PromiseReject, + PromiseResolve, + PromisePrototypeThen, + PromisePrototypeCatch, + RangeError, + ReflectConstruct, + Symbol, + TypedArrayPrototypeGetBuffer, + TypedArrayPrototypeGetByteLength, + TypedArrayPrototypeGetByteOffset, + TypeError, + Uint8Array, +} = primordials; + +const MAX_PRIORITY = 2_147_483_647; +const BI_WEBTRANSPORT = 0x41n; +const UNI_WEBTRANSPORT = 0x54n; +const UNI_QPACK_ENCODER = 0x02n; +const UNI_QPACK_DECODER = 0x03n; + +function encodeVarint(x) { + x = BigInt(x); + if (x < 2n ** 6n) { + return new Uint8Array([Number(x)]); + } + if (x < 2n ** 14n) { + const s = Number(0b01n << 14n | x); + const a = new Uint8Array(2); + const v = new DataView(TypedArrayPrototypeGetBuffer(a)); + DataViewPrototypeSetUint16(v, 0, s, false); + return a; + } + if (x < 2n ** 30n) { + const s = Number(0b10n << 30n | x); + const a = new Uint8Array(4); + const v = new DataView(TypedArrayPrototypeGetBuffer(a)); + DataViewPrototypeSetUint32(v, 0, s, false); + return a; + } + if (x < 2n ** 62n) { + const s = 0b11n << 62n | x; + const a = new Uint8Array(8); + const v = new DataView(TypedArrayPrototypeGetBuffer(a)); + DataViewPrototypeSetBigUint64(v, 0, s, false); + return a; + } + throw new RangeError("invalid varint"); +} + +function decodeVarint(data) { + const view = new DataView( + TypedArrayPrototypeGetBuffer(data), + TypedArrayPrototypeGetByteOffset(data), + TypedArrayPrototypeGetByteLength(data), + ); + const tag = data[0] >> 6; + switch (tag) { + case 0b00: + return BigInt(data[0]); + case 0b01: { + const v = DataViewPrototypeGetUint16(view, 0, false); + return BigInt(v) & ((2n ** 12n) - 1n); + } + case 0b10: { + const v = DataViewPrototypeGetUint32(view, 0, false); + return BigInt(v) & ((2n ** 28n) - 1n); + } + case 0b11: { + const v = DataViewPrototypeGetBigUint64(view, 0, false); + return v & ((2n ** 60n) - 1n); + } + default: + throw new RangeError(); + } +} + +async function readVarint(reader) { + const buf = new ArrayBuffer(8); + const { value: b1 } = await reader.read(new Uint8Array(buf, 0, 1)); + const size = 1 << (b1[0] >> 6); + if (size === 1) { + return decodeVarint(b1); + } + const { value: bn } = await reader.read( + new Uint8Array(TypedArrayPrototypeGetBuffer(b1), 1, size - 1), + ); + return decodeVarint( + new Uint8Array(TypedArrayPrototypeGetBuffer(bn, 0, size)), + ); +} + +function equal(a, b) { + if (a.length !== b.length) return false; + for (let i = 0; i < a.length; i += 1) { + if (a[i] !== b[i]) return false; + } + return true; +} + +function concat(a, b) { + const c = new Uint8Array(a.length + b.length); + c.set(a, 0); + c.set(b, a.length); + return c; +} + +const illegalConstructorKey = Symbol("illegalConstructorKey"); + +class WebTransport { + [webidl.brand] = webidl.brand; + #conn; + #promise; + #ready; + // deno-lint-ignore prefer-primordials + #closed = Promise.withResolvers(); + #settingsTx; + #settingsRx; + #connect; + #headerUni; + #headerBi; + #reliability = "pending"; + #congestionControl = "default"; + #anticipatedConcurrentIncomingBidirectionalStreams = null; + #anticipatedConcurrentIncomingUnidirectionalStreams = null; + #incomingBidirectionalStreams; + #incomingUnidirectionalStreams; + #datagrams; + + // endpoint may incorrectly open qpack streams. + // if so, we need to hold on to them so they + // aren't closed, even though we will never do + // anything with them :( + #qpackEncoder; + #qpackDecoder; + + constructor(url, options) { + let promise; + + if (url === illegalConstructorKey) { + promise = PromiseResolve(options); + } else { + const prefix = "Failed to construct 'WebTransport'"; + webidl.requiredArguments(arguments.length, 1, prefix); + url = webidl.converters.USVString(url, prefix, "Argument 1"); + options = webidl.converters.WebTransportOptions( + options, + prefix, + "Argument 2", + ); + + let parsedURL; + try { + parsedURL = new URL(url, getLocationHref()); + } catch (e) { + throw new DOMException(e.message, "SyntaxError"); + } + + switch (options.congestionControl) { + case "throughput": + this.#congestionControl = "throughput"; + break; + case "low-latency": + this.#congestionControl = "low-latency"; + break; + default: + this.#congestionControl = "default"; + } + this.#anticipatedConcurrentIncomingBidirectionalStreams = + options.anticipatedConcurrentIncomingBidirectionalStreams; + this.#anticipatedConcurrentIncomingUnidirectionalStreams = + options.anticipatedConcurrentIncomingUnidirectionalStreams; + + promise = PromisePrototypeThen( + connectQuic({ + hostname: parsedURL.hostname, + port: Number(parsedURL.port) || 443, + keepAliveInterval: 4e3, + maxIdleTimeout: 10e3, + congestionControl: options.congestionControl, + alpnProtocols: ["h3"], + serverCertificateHashes: options.serverCertificateHashes, + }), + async (conn) => { + const { connect, settingsTx, settingsRx } = await webtransportConnect( + conn, + // deno-lint-ignore prefer-primordials + parsedURL.toString(), + ); + + return { + conn, + connect, + settingsTx, + settingsRx, + }; + }, + ); + } + + PromisePrototypeCatch(promise, () => this.#closed.resolve()); + + promise = PromisePrototypeThen( + promise, + ({ conn, connect, settingsTx, settingsRx }) => { + this.#conn = conn; + this.#closed.resolve(conn.closed); + + const sessionId = connect.writable.id; + const sessionIdBuf = encodeVarint(sessionId); + this.#headerBi = concat(encodeVarint(BI_WEBTRANSPORT), sessionIdBuf); + this.#headerUni = concat(encodeVarint(UNI_WEBTRANSPORT), sessionIdBuf); + + this.#settingsTx = settingsTx; + this.#settingsRx = settingsRx; + this.#connect = connect; + + this.#reliability = "supports-unreliable"; + + return { conn, sessionId, sessionIdBuf }; + }, + ); + + this.#promise = promise; + this.#datagrams = new WebTransportDatagramDuplexStream( + illegalConstructorKey, + promise, + ); + this.#ready = PromisePrototypeThen(promise, () => undefined, (e) => { + throw new WebTransportError(e.message); + }); + } + + getStats() { + webidl.assertBranded(this, WebTransportPrototype); + return PromiseResolve({ + bytesSent: 0, + packetsSent: 0, + bytesLost: 0, + packetsLost: 0, + bytesReceived: 0, + packetsReceived: 0, + smoothedRtt: 0, + rttVariation: 0, + minRtt: 0, + estimatedSendRate: null, + atSendCapacity: false, + }); + } + + get ready() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#ready; + } + + get reliability() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#reliability; + } + + get congestionControl() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#congestionControl; + } + + get anticipatedConcurrentIncomingUnidirectionalStreams() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#anticipatedConcurrentIncomingUnidirectionalStreams; + } + + get anticipatedConcurrentIncomingBidirectionalStreams() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#anticipatedConcurrentIncomingBidirectionalStreams; + } + + get closed() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#closed.promise; + } + + close(closeInfo) { + webidl.assertBranded(this, WebTransportPrototype); + closeInfo = webidl.converters.WebTransportCloseInfo( + closeInfo, + "Failed to execute 'close' on 'WebTransport'", + "Argument 1", + ); + if (!this.#conn) { + throw new WebTransportError("WebTransport is not connected", { + source: "session", + }); + } + this.#conn.close({ + closeCode: closeInfo.closeCode, + reason: closeInfo.reason, + }); + } + + get datagrams() { + webidl.assertBranded(this, WebTransportPrototype); + return this.#datagrams; + } + + async createBidirectionalStream(options) { + webidl.assertBranded(this, WebTransportPrototype); + options = webidl.converters.WebTransportSendStreamOptions( + options, + "Failed to execute 'createBidirectionalStream' on 'WebTransport'", + "Argument 1", + ); + + const { conn } = await this.#promise; + const bidi = await conn.createBidirectionalStream({ + waitUntilAvailable: options.waitUntilAvailable, + }); + + bidi.writable.sendOrder = MAX_PRIORITY; + const writer = bidi.writable.getWriter(); + await writer.write(this.#headerBi); + writer.releaseLock(); + bidi.writable.sendOrder = options.sendOrder || 0; + + const wrapper = new WebTransportBidirectionalStream( + illegalConstructorKey, + bidi, + ); + if (options.sendGroup) { + wrapper.writable.sendGroup = options.sendGroup; + } + + return wrapper; + } + + get incomingBidirectionalStreams() { + webidl.assertBranded(this, WebTransportPrototype); + if (!this.#incomingBidirectionalStreams) { + const readerPromise = PromisePrototypeThen( + this.#promise, + ({ conn, sessionId }) => ({ + sessionId, + reader: conn.incomingBidirectionalStreams.getReader(), + }), + ); + this.#incomingBidirectionalStreams = new ReadableStream({ + pull: async (controller) => { + const { sessionId, reader } = await readerPromise; + const { value: bidi, done } = await reader.read(); + if (done) { + controller.close(); + } else { + const reader = bidi.readable.getReader({ mode: "byob" }); + try { + const type = await readVarint(reader); + if (type !== BI_WEBTRANSPORT) return; + const sid = await readVarint(reader); + if (sid !== sessionId) return; + controller.enqueue( + new WebTransportBidirectionalStream( + illegalConstructorKey, + bidi, + ), + ); + } finally { + reader.releaseLock(); + } + } + }, + }); + } + return this.#incomingBidirectionalStreams; + } + + async createUnidirectionalStream(options) { + webidl.assertBranded(this, WebTransportPrototype); + options = webidl.converters.WebTransportSendStreamOptions( + options, + "Failed to execute 'createUnidirectionalStream' on 'WebTransport'", + "Argument 1", + ); + + const { conn } = await this.#promise; + const stream = await conn.createUnidirectionalStream({ + waitUntilAvailable: options.waitUntilAvailable, + }); + + stream.sendOrder = MAX_PRIORITY; + const writer = stream.getWriter(); + await writer.write(this.#headerUni); + writer.releaseLock(); + stream.sendOrder = options.sendOrder || 0; + + const wrapper = writableStream(stream); + if (options.sendGroup) { + wrapper.sendGroup = options.sendGroup; + } + + return wrapper; + } + + get incomingUnidirectionalStreams() { + webidl.assertBranded(this, WebTransportPrototype); + + if (!this.#incomingUnidirectionalStreams) { + const readerPromise = PromisePrototypeThen( + this.#promise, + ({ conn, sessionId }) => ({ + sessionId, + reader: conn.incomingUnidirectionalStreams.getReader(), + }), + ); + this.#incomingUnidirectionalStreams = new ReadableStream({ + pull: async (controller) => { + const { reader, sessionId } = await readerPromise; + const { value: stream, done } = await reader.read(); + if (done) { + controller.close(); + } else { + const reader = stream.getReader({ mode: "byob" }); + try { + const type = await readVarint(reader); + switch (type) { + case UNI_QPACK_ENCODER: + this.#qpackEncoder = stream; + return; + case UNI_QPACK_DECODER: + this.#qpackDecoder = stream; + return; + case UNI_WEBTRANSPORT: { + const sid = await readVarint(reader); + if (sid === sessionId) { + controller.enqueue( + readableStream(stream), + ); + } + break; + } + default: + break; + } + } finally { + reader.releaseLock(); + } + } + }, + }); + } + + return this.#incomingUnidirectionalStreams; + } + + createSendGroup() { + webidl.assertBranded(this, WebTransportPrototype); + + return new WebTransportSendGroup(illegalConstructorKey); + } + + static get supportsReliableOnly() { + return false; + } +} +webidl.configureInterface(WebTransport); +const WebTransportPrototype = WebTransport.prototype; + +async function upgradeWebTransport(conn) { + const { url, connect, settingsTx, settingsRx } = await webtransportAccept( + conn, + ); + const wt = new WebTransport(illegalConstructorKey, { + conn, + connect, + settingsTx, + settingsRx, + }); + wt.url = url; + return wt; +} + +function readableStream(stream) { + return readableStreamForRid( + getReadableStreamResourceBacking(stream).rid, + false, // input stream already has cleanup + (...args) => + ReflectConstruct( + WebTransportReceiveStream, + ArrayPrototypeConcat(args, [illegalConstructorKey, stream]), + ), + ); +} + +function writableStream(stream) { + return writableStreamForRid( + getWritableStreamResourceBacking(stream).rid, + false, // input stream already has cleanup + (...args) => + ReflectConstruct( + WebTransportSendStream, + ArrayPrototypeConcat(args, [illegalConstructorKey, stream]), + ), + ); +} + +class WebTransportBidirectionalStream { + [webidl.brand] = webidl.brand; + #inner; + #readable; + #writable; + + constructor(key, inner) { + if (key !== illegalConstructorKey) { + webidl.illegalConstructor(); + } + this.#inner = inner; + } + + get readable() { + webidl.assertBranded(this, WebTransportBidirectionalStreamPrototype); + if (!this.#readable) { + this.#readable = readableStream(this.#inner.readable); + } + return this.#readable; + } + + get writable() { + webidl.assertBranded(this, WebTransportBidirectionalStreamPrototype); + if (!this.#writable) { + this.#writable = writableStream(this.#inner.writable); + } + return this.#writable; + } +} +webidl.configureInterface(WebTransportBidirectionalStream); +const WebTransportBidirectionalStreamPrototype = + WebTransportBidirectionalStream.prototype; + +class WebTransportSendStream extends WritableStream { + [webidl.brand] = webidl.brand; + #inner; + #sendGroup = null; + + constructor(brand, key, inner) { + if (key !== illegalConstructorKey) { + webidl.illegalConstructor(); + } + super(brand); + this.#inner = inner; + } + + get sendGroup() { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + return this.#sendGroup; + } + + set sendGroup(value) { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + value = webidl.converters.WebTransportSendGroup( + value, + "Failed to execute 'sendGroup' on 'WebTransportSendStream'", + ); + this.#sendGroup = value; + } + + get sendOrder() { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + return this.#inner.sendOrder; + } + + set sendOrder(sendOrder) { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + sendOrder = webidl.converters["long long"]( + sendOrder, + "Failed to execute 'sendOrder' on 'WebTransportSendStream'", + ); + this.#inner.sendOrder = sendOrder; + } + + getStats() { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + return PromiseResolve({ + bytesWritten: 0, + bytesSent: 0, + bytesAcknowledged: 0, + }); + } + + getWriter() { + webidl.assertBranded(this, WebTransportSendStreamPrototype); + return new WebTransportWriter(this); + } +} +webidl.configureInterface(WebTransportSendStream); +const WebTransportSendStreamPrototype = WebTransportSendStream.prototype; + +class WebTransportReceiveStream extends ReadableStream { + [webidl.brand] = webidl.brand; + #inner; + + constructor(brand, key, inner) { + if (key !== illegalConstructorKey) { + webidl.illegalConstructor(); + } + super(brand); + this.#inner = inner; + } + + getStats() { + webidl.assertBranded(this, WebTransportReceiveStreamPrototype); + return PromiseResolve({ + bytesReceived: 0, + bytesRead: 0, + }); + } +} +webidl.configureInterface(WebTransportReceiveStream); +const WebTransportReceiveStreamPrototype = WebTransportReceiveStream.prototype; + +class WebTransportWriter extends WritableStreamDefaultWriter { + [webidl.brand] = webidl.brand; + + // atomicWrite() {} +} +webidl.configureInterface(WebTransportWriter); + +class WebTransportDatagramDuplexStream { + [webidl.brand] = webidl.brand; + #promise; + #conn; + #sessionIdBuf; + #readable; + #readableController; + #writable; + #incomingMaxAge = Infinity; + #outgoingMaxAge = Infinity; + #incomingHighWaterMark = 1; + #outgoingHighWaterMark = 5; + #incomingDatagramsPullPromise = null; + #incomingDatagramsQueue = []; + #outgoingDatagramsQueue = []; + #sending = false; + + constructor(key, promise) { + if (key !== illegalConstructorKey) { + webidl.illegalConstructor(); + } + + this.#promise = promise; + PromisePrototypeThen(promise, ({ conn, sessionIdBuf }) => { + this.#conn = conn; + this.#sessionIdBuf = sessionIdBuf; + }); + + this.#receiveDatagrams(); + } + + async #receiveDatagrams() { + const { conn, sessionIdBuf } = await this.#promise; + while (true) { + const queue = this.#incomingDatagramsQueue; + const duration = this.#incomingMaxAge ?? Infinity; + + let datagram; + try { + datagram = await conn.readDatagram(); + } catch { + break; + } + if (!equal(datagram.subarray(0, sessionIdBuf.length), sessionIdBuf)) { + continue; + } + datagram = datagram.subarray(sessionIdBuf.length); + + ArrayPrototypePush(queue, { datagram, timestamp: DateNow() }); + + const toBeRemoved = queue.length - this.#incomingHighWaterMark; + while (toBeRemoved > 0) { + ArrayPrototypeShift(queue); + } + + while (queue.length > 0) { + const { timestamp } = queue[0]; + if (DateNow() - timestamp > duration) { + ArrayPrototypeShift(queue); + } else { + break; + } + } + + if (queue.length > 0 && this.#incomingDatagramsPullPromise) { + const { datagram } = ArrayPrototypeShift(queue); + const promise = this.#incomingDatagramsPullPromise; + this.#incomingDatagramsPullPromise = null; + this.#readableController.enqueue(datagram); + promise.resolve(undefined); + } + } + } + + async #sendDatagrams() { + if (this.#sending) return; + this.#sending = true; + const { conn, sessionIdBuf } = await this.#promise; + + const queue = this.#outgoingDatagramsQueue; + const duration = this.#outgoingMaxAge ?? Infinity; + while (queue.length > 0) { + const { bytes, timestamp, promise } = ArrayPrototypeShift(queue); + + if (DateNow() - timestamp > duration) { + promise.resolve(undefined); + continue; + } + + if (bytes.length <= this.maxDatagramSize) { + const datagram = concat(sessionIdBuf, bytes); + try { + await conn.sendDatagram(datagram); + } catch { + break; + } + } + + promise.resolve(undefined); + } + + this.#sending = false; + } + + get incomingMaxAge() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + return this.#incomingMaxAge; + } + + set incomingMaxAge(value) { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + value = webidl.converters["unrestricted double?"]( + value, + "Failed to execute 'incomingMaxAge' on 'WebTransportDatagramDuplexStream'", + ); + if (value < 0 || NumberIsNaN(value)) { + throw new RangeError("incomingMaxAge cannot be negative"); + } + if (value === 0) value = null; + this.#incomingMaxAge = value; + } + + get outgoingMaxAge() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + return this.#outgoingMaxAge; + } + + set outgoingMaxAge(value) { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + value = webidl.converters["unrestricted double?"]( + value, + "Failed to execute 'outgoingMaxAge' on 'WebTransportDatagramDuplexStream'", + ); + if (value < 0 || NumberIsNaN(value)) { + throw new RangeError("outgoingMaxAge cannot be negative"); + } + if (value === 0) value = null; + this.#outgoingMaxAge = value; + } + + get incomingHighWaterMark() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + return this.#incomingHighWaterMark; + } + + set incomingHighWaterMark(value) { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + value = webidl.converters["unrestricted double"]( + value, + "Failed to execute 'incomingHighWaterMark' on 'WebTransportDatagramDuplexStream'", + ); + if (value < 0 || NumberIsNaN(value)) { + throw new RangeError("incomingHighWaterMark cannot be negative"); + } + if (value < 1) value = 1; + this.#incomingHighWaterMark = value; + } + + get outgoingHighWaterMark() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + return this.#outgoingHighWaterMark; + } + + set outgoingHighWaterMark(value) { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + value = webidl.converters["unrestricted double"]( + value, + "Failed to execute 'outgoingHighWaterMark' on 'WebTransportDatagramDuplexStream'", + ); + if (value < 0 || NumberIsNaN(value)) { + throw new RangeError("outgoingHighWaterMark cannot be negative"); + } + if (value < 1) value = 1; + this.#outgoingHighWaterMark = value; + } + + get maxDatagramSize() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + if (this.#conn) { + return this.#conn.maxDatagramSize - this.#sessionIdBuf.length; + } + return 1024; + } + + get readable() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + if (!this.#readable) { + this.#readable = new ReadableStream({ + type: "bytes", + start: (controller) => { + PromisePrototypeThen( + PromisePrototypeThen(this.#promise, ({ conn }) => conn.closed), + () => { + try { + controller.close(); + } catch { + // nothing + } + }, + ); + this.#readableController = controller; + }, + pull: (controller) => { + assert(this.#incomingDatagramsPullPromise === null); + const queue = this.#incomingDatagramsQueue; + if (queue.length === 0) { + // deno-lint-ignore prefer-primordials + this.#incomingDatagramsPullPromise = Promise.withResolvers(); + return this.#incomingDatagramsPullPromise.promise; + } + const { datagram } = ArrayPrototypeShift(queue); + if (controller.byobRequest) { + const view = controller.byobRequest.view; + if ( + ObjectPrototypeIsPrototypeOf(DataViewPrototype, view) || + TypedArrayPrototypeGetLength(view) < datagram.length + ) { + return PromiseReject( + new RangeError("BYOB view is not large enough for datagram"), + ); + } + if (view.constructor.BYTES_PER_ELEMENT !== 1) { + return PromiseReject( + new TypeError("BYOB view must be Uint8Array"), + ); + } + view.set(datagram); + controller.byobRequest.respond(datagram.length); + } else { + controller.enqueue(datagram); + } + return PromiseResolve(undefined); + }, + }, { highWaterMark: 0 }); + } + return this.#readable; + } + + get writable() { + webidl.assertBranded(this, WebTransportDatagramDuplexStreamPrototype); + if (!this.#writable) { + this.#writable = new WritableStream({ + write: (data) => { + if ( + !(ObjectPrototypeIsPrototypeOf(ArrayBufferPrototype, data) || + ArrayBufferIsView(data)) + ) { + return PromiseReject(new TypeError("Invalid BYOB view")); + } + if (data.length > this.maxDatagramSize) { + return PromiseResolve(undefined); + } + return new Promise((resolve, reject) => { + const bytes = new Uint8Array(data.length); + bytes.set(data); + const chunk = { + bytes, + timestamp: DateNow(), + promise: { resolve, reject }, + }; + ArrayPrototypePush(this.#outgoingDatagramsQueue, chunk); + if ( + this.#outgoingDatagramsQueue.length < this.#outgoingHighWaterMark + ) { + resolve(undefined); + } + this.#sendDatagrams(); + }); + }, + }); + } + return this.#writable; + } +} +webidl.configureInterface(WebTransportDatagramDuplexStream); +const WebTransportDatagramDuplexStreamPrototype = + WebTransportDatagramDuplexStream.prototype; + +class WebTransportSendGroup { + constructor(key) { + if (key !== illegalConstructorKey) { + webidl.illegalConstructor(); + } + } + + getStats() { + webidl.assertBranded(this, WebTransportSendGroupPrototype); + return PromiseResolve({ + bytesWritten: 0, + bytesSent: 0, + bytesAcknowledged: 0, + }); + } +} +webidl.configureInterface(WebTransportSendGroup); +const WebTransportSendGroupPrototype = WebTransportSendGroup.prototype; + +class WebTransportError extends DOMException { + #source; + #streamErrorCode; + + constructor(message = "", init = { __proto__: null }) { + super(message, "WebTransportError"); + this[webidl.brand] = webidl.brand; + + init = webidl.converters["WebTransportErrorOptions"]( + init, + "Failed to construct 'WebTransportError'", + "Argument 2", + ); + + this.#source = init.source; + this.#streamErrorCode = init.streamErrorCode; + } + + get source() { + webidl.assertBranded(this, WebTransportErrorPrototype); + return this.#source; + } + + get streamErrorCode() { + webidl.assertBranded(this, WebTransportErrorPrototype); + return this.#streamErrorCode; + } +} +webidl.configureInterface(WebTransportError); +const WebTransportErrorPrototype = WebTransportError.prototype; + +webidl.converters.WebTransportSendGroup = webidl.createInterfaceConverter( + "WebTransportSendGroup", + WebTransportSendGroupPrototype, +); + +webidl.converters.WebTransportSendStreamOptions = webidl + .createDictionaryConverter("WebTransportSendStreamOptions", [ + { + key: "sendGroup", + converter: webidl.converters.WebTransportSendGroup, + }, + { + key: "sendOrder", + converter: webidl.converters["long long"], + defaultValue: 0, + }, + { + key: "waitUntilAvailable", + converter: webidl.converters.boolean, + defaultValue: false, + }, + ]); + +webidl.converters.WebTransportCloseInfo = webidl.createDictionaryConverter( + "WebTransportCloseInfo", + [ + { + key: "closeCode", + converter: webidl.converters["unsigned long"], + defaultValue: 0, + }, + { + key: "reason", + converter: webidl.converters.USVString, + defaultValue: "", + }, + ], +); + +webidl.converters.WebTransportHash = webidl.createDictionaryConverter( + "WebTransportHash", + [ + { + key: "algorithm", + converter: webidl.converters.DOMString, + }, + { + key: "value", + converter: webidl.converters.BufferSource, + }, + ], +); + +webidl.converters["sequence"] = webidl + .createSequenceConverter(webidl.converters.WebTransportHash); + +webidl.converters.WebTransportCongestionControl = webidl.createEnumConverter( + "WebTransportCongestionControl", + [ + "default", + "throughput", + "low-latency", + ], +); + +webidl.converters.WebTransportOptions = webidl + .createDictionaryConverter("WebTransportOptions", [ + { + key: "allowPooling", + converter: webidl.converters.boolean, + defaultValue: false, + }, + { + key: "requireUnreliable", + converter: webidl.converters.boolean, + defaultValue: false, + }, + { + key: "serverCertificateHashes", + converter: webidl.converters["sequence"], + }, + { + key: "congestionControl", + converter: webidl.converters.WebTransportCongestionControl, + defaultValue: "default", + }, + { + key: "anticipatedConcurrentIncomingUnidirectionalStreams", + converter: webidl.converters["unsigned short?"], + defaultValue: null, + }, + { + key: "anticipatedConcurrentIncomingBidirectionalStreams", + converter: webidl.converters["unsigned short?"], + defaultValue: null, + }, + { + key: "protocols", + converter: webidl.converters["sequence"], + defaultValue: [], + }, + ]); + +webidl.converters.WebTransportErrorSource = webidl.createEnumConverter( + "WebTransportErrorSource", + ["stream", "session"], +); + +webidl.converters.WebTransportErrorOptions = webidl.createDictionaryConverter( + "WebTransportErrorOptions", + [ + { + key: "source", + converter: webidl.converters.WebTransportErrorSource, + defaultValue: "stream", + }, + { + key: "streamErrorCode", + converter: webidl.converters["unsigned long?"], + defaultValue: null, + }, + ], +); + +export { + upgradeWebTransport, + WebTransport, + WebTransportBidirectionalStream, + WebTransportDatagramDuplexStream, + WebTransportError, + WebTransportReceiveStream, + WebTransportSendGroup, + WebTransportSendStream, +}; diff --git a/ext/webidl/00_webidl.js b/ext/webidl/00_webidl.js index b3c3b299f0..034c077e29 100644 --- a/ext/webidl/00_webidl.js +++ b/ext/webidl/00_webidl.js @@ -324,9 +324,15 @@ converters.short = createIntegerConversion(16, { unsigned: false }); converters["unsigned short"] = createIntegerConversion(16, { unsigned: true, }); +converters["unsigned short?"] = createNullableConverter( + converters["unsigned short"], +); converters.long = createIntegerConversion(32, { unsigned: false }); converters["unsigned long"] = createIntegerConversion(32, { unsigned: true }); +converters["unsigned long?"] = createNullableConverter( + converters["unsigned long"], +); converters["long long"] = createLongLongConversion(64, { unsigned: false }); converters["unsigned long long"] = createLongLongConversion(64, { @@ -398,6 +404,10 @@ converters["unrestricted double"] = (V, _prefix, _context, _opts) => { return x; }; +converters["unrestricted double?"] = createNullableConverter( + converters["unrestricted double"], +); + converters.DOMString = function ( V, prefix, diff --git a/runtime/js/90_deno_ns.js b/runtime/js/90_deno_ns.js index 5aaf0614dc..831779c6e9 100644 --- a/runtime/js/90_deno_ns.js +++ b/runtime/js/90_deno_ns.js @@ -34,6 +34,7 @@ import * as telemetry from "ext:deno_telemetry/telemetry.ts"; const { ObjectDefineProperties } = primordials; const loadQuic = core.createLazyLoader("ext:deno_net/03_quic.js"); +const loadWebTransport = core.createLazyLoader("ext:deno_web/webtransport.js"); const denoNs = { Process: process.Process, @@ -198,6 +199,10 @@ ObjectDefineProperties(denoNsUnstableById[unstableIds.net], { loadQuic, ), QuicIncoming: core.propWritableLazyLoaded((q) => q.QuicIncoming, loadQuic), + upgradeWebTransport: core.propWritableLazyLoaded( + (wt) => wt.upgradeWebTransport, + loadWebTransport, + ), }); // denoNsUnstableById[unstableIds.unsafeProto] = { __proto__: null } diff --git a/runtime/js/98_global_scope_shared.js b/runtime/js/98_global_scope_shared.js index 99bace7647..36543a869d 100644 --- a/runtime/js/98_global_scope_shared.js +++ b/runtime/js/98_global_scope_shared.js @@ -39,6 +39,7 @@ import * as webgpuSurface from "ext:deno_webgpu/02_surface.js"; import { unstableIds } from "ext:runtime/90_deno_ns.js"; const loadImage = core.createLazyLoader("ext:deno_canvas/01_image.js"); +const loadWebTransport = core.createLazyLoader("ext:deno_web/webtransport.js"); // https://developer.mozilla.org/en-US/docs/Web/API/WindowOrWorkerGlobalScope const windowOrWorkerGlobalScope = { @@ -298,6 +299,34 @@ unstableForWindowOrWorkerGlobalScope[unstableIds.broadcastChannel] = { unstableForWindowOrWorkerGlobalScope[unstableIds.net] = { WebSocketStream: core.propNonEnumerable(webSocketStream.WebSocketStream), WebSocketError: core.propNonEnumerable(webSocketStream.WebSocketError), + WebTransport: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransport, + loadWebTransport, + ), + WebTransportBidirectionalStream: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportBidirectionalStream, + loadWebTransport, + ), + WebTransportDatagramDuplexStream: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportDatagramDuplexStream, + loadWebTransport, + ), + WebTransportReceiveStream: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportReceiveStream, + loadWebTransport, + ), + WebTransportSendGroup: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportSendGroup, + loadWebTransport, + ), + WebTransportSendStream: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportSendStream, + loadWebTransport, + ), + WebTransportError: core.propNonEnumerableLazyLoaded( + (wt) => wt.WebTransportError, + loadWebTransport, + ), }; unstableForWindowOrWorkerGlobalScope[unstableIds.webgpu] = {}; diff --git a/tests/integration/lsp_tests.rs b/tests/integration/lsp_tests.rs index 196184f3e0..4e51734c86 100644 --- a/tests/integration/lsp_tests.rs +++ b/tests/integration/lsp_tests.rs @@ -5762,7 +5762,7 @@ fn lsp_jsr_auto_import_completion() { json!({ "triggerKind": 1 }), ); assert!(!list.is_incomplete); - assert_eq!(list.items.len(), 268); + assert_eq!(list.items.len(), 276); let item = list.items.iter().find(|i| i.label == "add").unwrap(); assert_eq!(&item.label, "add"); assert_eq!( @@ -5842,7 +5842,7 @@ fn lsp_jsr_auto_import_completion_import_map() { json!({ "triggerKind": 1 }), ); assert!(!list.is_incomplete); - assert_eq!(list.items.len(), 268); + assert_eq!(list.items.len(), 276); let item = list.items.iter().find(|i| i.label == "add").unwrap(); assert_eq!(&item.label, "add"); assert_eq!(json!(&item.label_details), json!({ "description": "add" })); diff --git a/tests/specs/run/webtransport/__test__.jsonc b/tests/specs/run/webtransport/__test__.jsonc new file mode 100644 index 0000000000..e097cf9155 --- /dev/null +++ b/tests/specs/run/webtransport/__test__.jsonc @@ -0,0 +1,5 @@ +{ + "args": "run --quiet --unstable-net -A main.ts", + "output": "", + "exitCode": 0 +} diff --git a/tests/specs/run/webtransport/deno.json b/tests/specs/run/webtransport/deno.json new file mode 100644 index 0000000000..105514e133 --- /dev/null +++ b/tests/specs/run/webtransport/deno.json @@ -0,0 +1,4 @@ +{ + "lock": false, + "importMap": "../../../../import_map.json" +} diff --git a/tests/specs/run/webtransport/main.ts b/tests/specs/run/webtransport/main.ts new file mode 100644 index 0000000000..d8b291db4e --- /dev/null +++ b/tests/specs/run/webtransport/main.ts @@ -0,0 +1,100 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { decodeBase64 } from "@std/encoding/base64"; +import { assertEquals } from "@std/assert"; + +const cert = Deno.readTextFileSync("../../../testdata/tls/localhost.crt"); +const certHash = await crypto.subtle.digest( + "SHA-256", + decodeBase64(cert.split("\n").slice(1, -2).join("")), +); + +const server = new Deno.QuicEndpoint({ + hostname: "localhost", + port: 0, +}); +const listener = server.listen({ + cert, + key: Deno.readTextFileSync("../../../testdata/tls/localhost.key"), + alpnProtocols: ["h3"], +}); + +(async () => { + for await (const conn of listener) { + const wt = await Deno.upgradeWebTransport(conn); + + assertEquals(wt.url, `https://localhost:${server.addr.port}/path`); + + wt.ready.then(() => { + (async () => { + for await (const bidi of wt.incomingBidirectionalStreams) { + bidi.readable.pipeTo(bidi.writable).catch(() => {}); + } + })(); + + (async () => { + for await (const stream of wt.incomingUnidirectionalStreams) { + const out = await wt.createUnidirectionalStream(); + stream.pipeTo(out).catch(() => {}); + } + })(); + + wt.datagrams.readable.pipeTo(wt.datagrams.writable); + }); + } +})(); + +const client = new WebTransport(`https://localhost:${server.addr.port}/path`, { + serverCertificateHashes: [{ + algorithm: "sha-256", + value: certHash, + }], +}); + +client.ready.then(async () => { + const bi = await client.createBidirectionalStream(); + + { + const writer = bi.writable.getWriter(); + await writer.write(new Uint8Array([1, 0, 1, 0])); + writer.releaseLock(); + } + + { + const reader = bi.readable.getReader(); + assertEquals(await reader.read(), { + value: new Uint8Array([1, 0, 1, 0]), + done: false, + }); + reader.releaseLock(); + } + + { + const uni = await client.createUnidirectionalStream(); + const writer = uni.getWriter(); + await writer.write(new Uint8Array([0, 2, 0, 2])); + writer.releaseLock(); + } + + { + const uni = + (await client.incomingUnidirectionalStreams.getReader().read()).value; + const reader = uni.getReader(); + assertEquals(await reader.read(), { + value: new Uint8Array([0, 2, 0, 2]), + done: false, + }); + reader.releaseLock(); + } + + await client.datagrams.writable.getWriter().write( + new Uint8Array([3, 0, 3, 0]), + ); + assertEquals(await client.datagrams.readable.getReader().read(), { + value: new Uint8Array([3, 0, 3, 0]), + done: false, + }); + + client.close(); + server.close(); +}); diff --git a/tools/core_import_map.json b/tools/core_import_map.json index 935c7179a1..4e76eae102 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -233,6 +233,7 @@ "ext:deno_web/14_compression.js": "../ext/web/14_compression.js", "ext:deno_web/15_performance.js": "../ext/web/15_performance.js", "ext:deno_web/16_image_data.js": "../ext/web/16_image_data.js", + "ext:deno_web/webtransport.js": "../ext/web/webtransport.js", "ext:deno_webidl/00_webidl.js": "../ext/webidl/00_webidl.js", "ext:deno_websocket/01_websocket.js": "../ext/websocket/01_websocket.js", "ext:deno_websocket/02_websocketstream.js": "../ext/websocket/02_websocketstream.js",