diff --git a/Cargo.lock b/Cargo.lock index 944694a6d7..6cdb4f551f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1505,6 +1505,7 @@ version = "0.146.0" dependencies = [ "async-trait", "base64-simd", + "bytes", "deno_bench_util", "deno_console", "deno_core", @@ -1512,6 +1513,7 @@ dependencies = [ "deno_webidl", "encoding_rs", "flate2", + "futures", "serde", "tokio", "uuid", diff --git a/cli/tests/integration/js_unit_tests.rs b/cli/tests/integration/js_unit_tests.rs index c8039f89c8..f54280b23c 100644 --- a/cli/tests/integration/js_unit_tests.rs +++ b/cli/tests/integration/js_unit_tests.rs @@ -78,6 +78,7 @@ util::unit_test_factory!( signal_test, stat_test, stdio_test, + streams_test, structured_clone_test, symlink_test, sync_test, diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 68d03e8463..f0a5b430be 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -693,24 +693,30 @@ function createStreamTest(count: number, delay: number, action: string) { onError: createOnErrorCb(ac), }); - await listeningPromise; - const resp = await fetch(`http://127.0.0.1:${servePort}/`); - const text = await resp.text(); + try { + await listeningPromise; + const resp = await fetch(`http://127.0.0.1:${servePort}/`); + if (action == "Throw") { + try { + await resp.text(); + fail(); + } catch (_) { + // expected + } + } else { + const text = await resp.text(); - ac.abort(); - await server.finished; - let expected = ""; - if (action == "Throw" && count < 2 && delay < 1000) { - // NOTE: This is specific to the current implementation. In some cases where a stream errors, we - // don't send the first packet. 
- expected = ""; - } else { - for (let i = 0; i < count; i++) { - expected += `a${i}`; + let expected = ""; + for (let i = 0; i < count; i++) { + expected += `a${i}`; + } + + assertEquals(text, expected); } + } finally { + ac.abort(); + await server.finished; } - - assertEquals(text, expected); }); } diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts new file mode 100644 index 0000000000..4a573c9344 --- /dev/null +++ b/cli/tests/unit/streams_test.ts @@ -0,0 +1,299 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +import { fail } from "https://deno.land/std@v0.42.0/testing/asserts.ts"; +import { assertEquals, Deferred, deferred } from "./test_util.ts"; + +const { + core, + resourceForReadableStream, + // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol +} = Deno[Deno.internal]; + +const LOREM = + "Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."; + +// Hello world, with optional close +// deno-lint-ignore no-explicit-any +function helloWorldStream(close?: boolean, completion?: Deferred) { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + if (close == true) { + controller.close(); + } + }, + cancel(reason) { + completion?.resolve(reason); + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Hello world, with optional close +function errorStream(type: "string" | "controller" | "TypeError") { + return new ReadableStream({ + start(controller) { + controller.enqueue("hello, world"); + }, + pull(controller) { + if (type == "string") { + throw "Uh oh (string)!"; + } + if (type == "TypeError") { + throw TypeError("Uh oh (TypeError)!"); + } + controller.error("Uh oh (controller)!"); + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Long stream with Lorem Ipsum text. 
+function longStream() { + return new ReadableStream({ + start(controller) { + for (let i = 0; i < 4; i++) { + setTimeout(() => { + controller.enqueue(LOREM); + if (i == 3) { + controller.close(); + } + }, i * 100); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Empty stream, closes either immediately or on a call to pull. +function emptyStream(onPull: boolean) { + return new ReadableStream({ + start(controller) { + if (!onPull) { + controller.close(); + } + }, + pull(controller) { + if (onPull) { + controller.close(); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Include an empty chunk +function emptyChunkStream() { + return new ReadableStream({ + start(controller) { + controller.enqueue(new Uint8Array([1])); + controller.enqueue(new Uint8Array([])); + controller.enqueue(new Uint8Array([2])); + controller.close(); + }, + }); +} + +// Creates a stream with the given number of packets, a configurable delay between packets, and a final +// action (either "Throw" or "Close"). 
+function makeStreamWithCount( + count: number, + delay: number, + action: "Throw" | "Close", +): ReadableStream { + function doAction(controller: ReadableStreamDefaultController, i: number) { + if (i == count) { + if (action == "Throw") { + controller.error(new Error("Expected error!")); + } else { + controller.close(); + } + } else { + controller.enqueue(String.fromCharCode("a".charCodeAt(0) + i)); + + if (delay == 0) { + doAction(controller, i + 1); + } else { + setTimeout(() => doAction(controller, i + 1), delay); + } + } + } + + return new ReadableStream({ + start(controller) { + if (delay == 0) { + doAction(controller, 0); + } else { + setTimeout(() => doAction(controller, 0), delay); + } + }, + }).pipeThrough(new TextEncoderStream()); +} + +// Normal stream operation +Deno.test(async function readableStream() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 12); + core.ops.op_close(rid); +}); + +// Close the stream after reading everything +Deno.test(async function readableStreamClose() { + const cancel = deferred(); + const rid = resourceForReadableStream(helloWorldStream(false, cancel)); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 12); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +// Close the stream without reading everything +Deno.test(async function readableStreamClosePartialRead() { + const cancel = deferred(); + const rid = resourceForReadableStream(helloWorldStream(false, cancel)); + const buffer = new Uint8Array(5); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 5); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +// Close the stream without reading anything +Deno.test(async function readableStreamCloseWithoutRead() { + const cancel = deferred(); + const rid = 
resourceForReadableStream(helloWorldStream(false, cancel)); + core.ops.op_close(rid); + assertEquals(await cancel, undefined); +}); + +Deno.test(async function readableStreamPartial() { + const rid = resourceForReadableStream(helloWorldStream()); + const buffer = new Uint8Array(5); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 5); + const buffer2 = new Uint8Array(1024); + const nread2 = await core.ops.op_read(rid, buffer2); + assertEquals(nread2, 7); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamLongReadAll() { + const rid = resourceForReadableStream(longStream()); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, LOREM.length * 4); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamLongByPiece() { + const rid = resourceForReadableStream(longStream()); + let total = 0; + for (let i = 0; i < 100; i++) { + const length = await core.ops.op_read(rid, new Uint8Array(16)); + total += length; + if (length == 0) { + break; + } + } + assertEquals(total, LOREM.length * 4); + core.ops.op_close(rid); +}); + +for ( + const type of [ + "string", + "TypeError", + "controller", + ] as ("string" | "TypeError" | "controller")[] +) { + Deno.test(`readableStreamError_${type}`, async function () { + const rid = resourceForReadableStream(errorStream(type)); + assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16))); + try { + await core.ops.op_read(rid, new Uint8Array(1)); + fail(); + } catch (e) { + assertEquals(e.message, `Uh oh (${type})!`); + } + core.ops.op_close(rid); + }); +} + +Deno.test(async function readableStreamEmptyOnStart() { + const rid = resourceForReadableStream(emptyStream(true)); + const buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamEmptyOnPull() { + const rid = resourceForReadableStream(emptyStream(false)); + const 
buffer = new Uint8Array(1024); + const nread = await core.ops.op_read(rid, buffer); + assertEquals(nread, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamEmptyReadAll() { + const rid = resourceForReadableStream(emptyStream(false)); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer.length, 0); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunk() { + const rid = resourceForReadableStream(emptyChunkStream()); + const buffer = await core.ops.op_read_all(rid); + assertEquals(buffer, new Uint8Array([1, 2])); + core.ops.op_close(rid); +}); + +Deno.test(async function readableStreamWithEmptyChunkOneByOne() { + const rid = resourceForReadableStream(emptyChunkStream()); + assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1))); + assertEquals(1, await core.ops.op_read(rid, new Uint8Array(1))); + assertEquals(0, await core.ops.op_read(rid, new Uint8Array(1))); + core.ops.op_close(rid); +}); + +for (const count of [0, 1, 2, 3]) { + for (const delay of [0, 1, 10]) { + // Creating a stream that errors in start will throw + if (delay > 0) { + createStreamTest(count, delay, "Throw"); + } + createStreamTest(count, delay, "Close"); + } +} + +function createStreamTest( + count: number, + delay: number, + action: "Throw" | "Close", +) { + Deno.test(`streamCount${count}Delay${delay}${action}`, async () => { + let rid; + try { + rid = resourceForReadableStream( + makeStreamWithCount(count, delay, action), + ); + for (let i = 0; i < count; i++) { + const buffer = new Uint8Array(1); + await core.ops.op_read(rid, buffer); + } + if (action == "Throw") { + try { + const buffer = new Uint8Array(1); + assertEquals(1, await core.ops.op_read(rid, buffer)); + fail(); + } catch (e) { + // We expect this to be thrown + assertEquals(e.message, "Expected error!"); + } + } else { + const buffer = new Uint8Array(1); + assertEquals(0, await core.ops.op_read(rid, buffer)); + } + } finally { + 
core.ops.op_close(rid); + } + }); +} diff --git a/cli/tests/unit_node/http_test.ts b/cli/tests/unit_node/http_test.ts index a361ff2571..706c672f14 100644 --- a/cli/tests/unit_node/http_test.ts +++ b/cli/tests/unit_node/http_test.ts @@ -141,8 +141,7 @@ Deno.test("[node/http] chunked response", async () => { } }); -// TODO(kt3k): This test case exercises the workaround for https://github.com/denoland/deno/issues/17194 -// This should be removed when #17194 is resolved. +// Test empty chunks: https://github.com/denoland/deno/issues/17194 Deno.test("[node/http] empty chunk in the middle of response", async () => { const promise = deferred(); diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index 3447f48e21..265b797066 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -30,9 +30,9 @@ import { import { Deferred, getReadableStreamResourceBacking, - readableStreamClose, readableStreamForRid, ReadableStreamPrototype, + resourceForReadableStream, } from "ext:deno_web/06_streams.js"; import { listen, TcpConn } from "ext:deno_net/01_net.js"; import { listenTls } from "ext:deno_net/02_tls.js"; @@ -41,10 +41,6 @@ const { Error, ObjectPrototypeIsPrototypeOf, PromisePrototypeCatch, - SafeSet, - SafeSetIterator, - SetPrototypeAdd, - SetPrototypeDelete, Symbol, SymbolFor, TypeError, @@ -61,7 +57,6 @@ const { op_http_set_promise_complete, op_http_set_response_body_bytes, op_http_set_response_body_resource, - op_http_set_response_body_stream, op_http_set_response_body_text, op_http_set_response_header, op_http_set_response_headers, @@ -339,7 +334,6 @@ class InnerRequest { class CallbackContext { abortController; - responseBodies; scheme; fallbackHost; serverRid; @@ -352,7 +346,6 @@ class CallbackContext { { once: true }, ); this.abortController = new AbortController(); - this.responseBodies = new SafeSet(); this.serverRid = args[0]; this.scheme = args[1]; this.fallbackHost = args[2]; @@ -379,23 +372,24 @@ class ServeHandlerInfo { } } -function 
fastSyncResponseOrStream(req, respBody) { +function fastSyncResponseOrStream(req, respBody, status) { if (respBody === null || respBody === undefined) { // Don't set the body - return null; + op_http_set_promise_complete(req, status); + return; } const stream = respBody.streamOrStatic; const body = stream.body; if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { - op_http_set_response_body_bytes(req, body); - return null; + op_http_set_response_body_bytes(req, body, status); + return; } if (typeof body === "string") { - op_http_set_response_body_text(req, body); - return null; + op_http_set_response_body_text(req, body, status); + return; } // At this point in the response it needs to be a stream @@ -408,115 +402,16 @@ function fastSyncResponseOrStream(req, respBody) { req, resourceBacking.rid, resourceBacking.autoClose, + status, + ); + } else { + const rid = resourceForReadableStream(stream); + op_http_set_response_body_resource( + req, + rid, + true, + status, ); - return null; - } - - return stream; -} - -async function asyncResponse(responseBodies, req, status, stream) { - const reader = stream.getReader(); - let responseRid; - let closed = false; - let timeout; - - try { - // IMPORTANT: We get a performance boost from this optimization, but V8 is very - // sensitive to the order and structure. Benchmark any changes to this code. - - // Optimize for streams that are done in zero or one packets. We will not - // have to allocate a resource in this case. - const { value: value1, done: done1 } = await reader.read(); - if (done1) { - closed = true; - // Exit 1: no response body at all, extreme fast path - // Reader will be closed by finally block - return; - } - - // The second value cannot block indefinitely, as someone may be waiting on a response - // of the first packet that may influence this packet. We set this timeout arbitrarily to 250ms - // and we race it. 
- let timeoutPromise; - timeout = setTimeout(() => { - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // TODO(mmastrac): if this promise fails before we get to the await below, it crashes - // the process with an error: - // - // 'Uncaught (in promise) BadResource: failed to write'. - // - // To avoid this, we're going to swallow errors here and allow the code later in the - // file to re-throw them in a way that doesn't appear to be an uncaught promise rejection. - timeoutPromise = PromisePrototypeCatch( - core.writeAll(responseRid, value1), - () => null, - ); - }, 250); - const { value: value2, done: done2 } = await reader.read(); - - if (timeoutPromise) { - await timeoutPromise; - if (done2) { - closed = true; - // Exit 2(a): read 2 is EOS, and timeout resolved. - // Reader will be closed by finally block - // Response stream will be closed by finally block. - return; - } - - // Timeout resolved, value1 written but read2 is not EOS. Carry value2 forward. - } else { - clearTimeout(timeout); - timeout = undefined; - - if (done2) { - // Exit 2(b): read 2 is EOS, and timeout did not resolve as we read fast enough. 
- // Reader will be closed by finally block - // No response stream - closed = true; - op_http_set_response_body_bytes(req, value1); - return; - } - - responseRid = op_http_set_response_body_stream(req); - SetPrototypeAdd(responseBodies, responseRid); - op_http_set_promise_complete(req, status); - // Write our first packet - await core.writeAll(responseRid, value1); - } - - await core.writeAll(responseRid, value2); - while (true) { - const { value, done } = await reader.read(); - if (done) { - closed = true; - break; - } - await core.writeAll(responseRid, value); - } - } catch (error) { - closed = true; - try { - await reader.cancel(error); - } catch { - // Pass - } - } finally { - if (!closed) { - readableStreamClose(reader); - } - if (timeout !== undefined) { - clearTimeout(timeout); - } - if (responseRid) { - core.tryClose(responseRid); - SetPrototypeDelete(responseBodies, responseRid); - } else { - op_http_set_promise_complete(req, status); - } } } @@ -528,7 +423,6 @@ async function asyncResponse(responseBodies, req, status, stream) { * This function returns a promise that will only reject in the case of abnormal exit. 
*/ function mapToCallback(context, callback, onError) { - const responseBodies = context.responseBodies; const signal = context.abortController.signal; const hasCallback = callback.length > 0; const hasOneCallback = callback.length === 1; @@ -591,15 +485,7 @@ function mapToCallback(context, callback, onError) { } } - // Attempt to respond quickly to this request, otherwise extract the stream - const stream = fastSyncResponseOrStream(req, inner.body); - if (stream !== null) { - // Handle the stream asynchronously - await asyncResponse(responseBodies, req, status, stream); - } else { - op_http_set_promise_complete(req, status); - } - + fastSyncResponseOrStream(req, inner.body, status); innerRequest?.close(); }; } @@ -755,10 +641,6 @@ function serveHttpOn(context, callback) { } PromisePrototypeCatch(callback(req), promiseErrorHandler); } - - for (const streamRid of new SafeSetIterator(context.responseBodies)) { - core.tryClose(streamRid); - } })(); return { diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 2e9b315ca1..60ef83b0f7 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -10,7 +10,6 @@ use crate::request_properties::HttpPropertyExtractor; use crate::response_body::Compression; use crate::response_body::ResponseBytes; use crate::response_body::ResponseBytesInner; -use crate::response_body::V8StreamHttpResponseBody; use crate::slab::slab_drop; use crate::slab::slab_get; use crate::slab::slab_init; @@ -30,6 +29,7 @@ use deno_core::task::JoinHandle; use deno_core::v8; use deno_core::AsyncRefCell; use deno_core::AsyncResult; +use deno_core::BufView; use deno_core::ByteString; use deno_core::CancelFuture; use deno_core::CancelHandle; @@ -573,6 +573,7 @@ fn ensure_vary_accept_encoding(hmap: &mut HeaderMap) { fn set_response( slab_id: SlabId, length: Option, + status: u16, response_fn: impl FnOnce(Compression) -> ResponseBytesInner, ) { let mut http = slab_get(slab_id); @@ -583,7 +584,14 @@ fn set_response( length, response.headers_mut(), 
); - response.body_mut().initialize(response_fn(compression)) + response.body_mut().initialize(response_fn(compression)); + + // The Javascript code should never provide a status that is invalid here (see 23_response.js), so we + // will quitely ignore invalid values. + if let Ok(code) = StatusCode::from_u16(status) { + *response.status_mut() = code; + } + http.complete(); } #[op2(fast)] @@ -592,6 +600,7 @@ pub fn op_http_set_response_body_resource( #[smi] slab_id: SlabId, #[smi] stream_rid: ResourceId, auto_close: bool, + status: u16, ) -> Result<(), AnyError> { // If the stream is auto_close, we will hold the last ref to it until the response is complete. let resource = if auto_close { @@ -603,6 +612,7 @@ pub fn op_http_set_response_body_resource( set_response( slab_id, resource.size_hint().1.map(|s| s as usize), + status, move |compression| { ResponseBytesInner::from_resource(compression, resource, auto_close) }, @@ -611,43 +621,35 @@ pub fn op_http_set_response_body_resource( Ok(()) } -#[op2(fast)] -#[smi] -pub fn op_http_set_response_body_stream( - state: &mut OpState, - #[smi] slab_id: SlabId, -) -> Result { - // TODO(mmastrac): what should this channel size be? - let (tx, rx) = tokio::sync::mpsc::channel(1); - set_response(slab_id, None, |compression| { - ResponseBytesInner::from_v8(compression, rx) - }); - - Ok(state.resource_table.add(V8StreamHttpResponseBody::new(tx))) -} - #[op2(fast)] pub fn op_http_set_response_body_text( #[smi] slab_id: SlabId, #[string] text: String, + status: u16, ) { if !text.is_empty() { - set_response(slab_id, Some(text.len()), |compression| { + set_response(slab_id, Some(text.len()), status, |compression| { ResponseBytesInner::from_vec(compression, text.into_bytes()) }); + } else { + op_http_set_promise_complete::call(slab_id, status); } } -#[op2(fast)] +// Skipping `fast` because we prefer an owned buffer here. 
+#[op2] pub fn op_http_set_response_body_bytes( #[smi] slab_id: SlabId, - #[buffer] buffer: &[u8], + #[buffer] buffer: JsBuffer, + status: u16, ) { if !buffer.is_empty() { - set_response(slab_id, Some(buffer.len()), |compression| { - ResponseBytesInner::from_slice(compression, buffer) + set_response(slab_id, Some(buffer.len()), status, |compression| { + ResponseBytesInner::from_bufview(compression, BufView::from(buffer)) }); - }; + } else { + op_http_set_promise_complete::call(slab_id, status); + } } #[op2(async)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 8060b5a1e8..e0c5c89d02 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -115,7 +115,6 @@ deno_core::extension!( http_next::op_http_set_promise_complete, http_next::op_http_set_response_body_bytes, http_next::op_http_set_response_body_resource, - http_next::op_http_set_response_body_stream, http_next::op_http_set_response_body_text, http_next::op_http_set_response_header, http_next::op_http_set_response_headers, diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs index 3697b2732f..bd9d6f4332 100644 --- a/ext/http/response_body.rs +++ b/ext/http/response_body.rs @@ -1,5 +1,4 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
-use std::borrow::Cow; use std::cell::RefCell; use std::future::Future; use std::io::Write; @@ -11,18 +10,12 @@ use brotli::enc::encode::BrotliEncoderParameter; use brotli::ffi::compressor::BrotliEncoderState; use bytes::Bytes; use bytes::BytesMut; -use deno_core::error::bad_resource; use deno_core::error::AnyError; use deno_core::futures::ready; use deno_core::futures::FutureExt; -use deno_core::AsyncRefCell; use deno_core::AsyncResult; use deno_core::BufView; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; -use deno_core::RcRef; use deno_core::Resource; -use deno_core::WriteOutcome; use flate2::write::GzEncoder; use http::HeaderMap; use hyper1::body::Body; @@ -126,8 +119,8 @@ pub enum Compression { pub enum ResponseStream { /// A resource stream, piped in fast mode. Resource(ResourceBodyAdapter), - /// A JS-backed stream, written in JS and transported via pipe. - V8Stream(tokio::sync::mpsc::Receiver), + #[cfg(test)] + TestChannel(tokio::sync::mpsc::Receiver), } #[derive(Default)] @@ -217,13 +210,6 @@ impl ResponseBytesInner { } } - pub fn from_v8( - compression: Compression, - rx: tokio::sync::mpsc::Receiver, - ) -> Self { - Self::from_stream(compression, ResponseStream::V8Stream(rx)) - } - pub fn from_resource( compression: Compression, stm: Rc, @@ -235,12 +221,12 @@ impl ResponseBytesInner { ) } - pub fn from_slice(compression: Compression, bytes: &[u8]) -> Self { + pub fn from_bufview(compression: Compression, buf: BufView) -> Self { match compression { Compression::GZip => { let mut writer = GzEncoder::new(Vec::new(), flate2::Compression::fast()); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); Self::Bytes(BufView::from(writer.finish().unwrap())) } Compression::Brotli => { @@ -251,11 +237,11 @@ impl ResponseBytesInner { // (~4MB) let mut writer = brotli::CompressorWriter::new(Vec::new(), 65 * 1024, 6, 22); - writer.write_all(bytes).unwrap(); + writer.write_all(&buf).unwrap(); writer.flush().unwrap(); 
Self::Bytes(BufView::from(writer.into_inner())) } - _ => Self::Bytes(BufView::from(bytes.to_vec())), + _ => Self::Bytes(buf), } } @@ -368,14 +354,16 @@ impl PollFrame for ResponseStream { ) -> std::task::Poll { match &mut *self { ResponseStream::Resource(res) => Pin::new(res).poll_frame(cx), - ResponseStream::V8Stream(res) => Pin::new(res).poll_frame(cx), + #[cfg(test)] + ResponseStream::TestChannel(rx) => Pin::new(rx).poll_frame(cx), } } fn size_hint(&self) -> SizeHint { match self { ResponseStream::Resource(res) => res.size_hint(), - ResponseStream::V8Stream(res) => res.size_hint(), + #[cfg(test)] + ResponseStream::TestChannel(_) => SizeHint::default(), } } } @@ -414,6 +402,7 @@ impl PollFrame for ResourceBodyAdapter { } } +#[cfg(test)] impl PollFrame for tokio::sync::mpsc::Receiver { fn poll_frame( mut self: Pin<&mut Self>, @@ -761,52 +750,6 @@ impl PollFrame for BrotliResponseStream { } } -/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which -/// feed's hyper's HTTP response. 
-pub struct V8StreamHttpResponseBody( - AsyncRefCell>>, - CancelHandle, -); - -impl V8StreamHttpResponseBody { - pub fn new(sender: tokio::sync::mpsc::Sender) -> Self { - Self(AsyncRefCell::new(Some(sender)), CancelHandle::default()) - } -} - -impl Resource for V8StreamHttpResponseBody { - fn name(&self) -> Cow { - "responseBody".into() - } - - fn write( - self: Rc, - buf: BufView, - ) -> AsyncResult { - let cancel_handle = RcRef::map(&self, |this| &this.1); - Box::pin( - async move { - let nwritten = buf.len(); - - let res = RcRef::map(self, |this| &this.0).borrow().await; - if let Some(tx) = res.as_ref() { - tx.send(buf) - .await - .map_err(|_| bad_resource("failed to write"))?; - Ok(WriteOutcome::Full { nwritten }) - } else { - Err(bad_resource("failed to write")) - } - } - .try_or_cancel(cancel_handle), - ) - } - - fn close(self: Rc) { - self.1.cancel(); - } -} - #[cfg(test)] mod tests { use super::*; @@ -892,7 +835,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = GZipResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { @@ -934,7 +877,7 @@ mod tests { expected.extend(v); } let (tx, rx) = tokio::sync::mpsc::channel(1); - let underlying = ResponseStream::V8Stream(rx); + let underlying = ResponseStream::TestChannel(rx); let mut resp = BrotliResponseStream::new(underlying); let handle = tokio::task::spawn(async move { for chunk in v { diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 01f84aa2cf..0849d221d1 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -1,4 +1,5 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
+// deno-lint-ignore-file camelcase // @ts-check /// @@ -7,7 +8,17 @@ /// const core = globalThis.Deno.core; -const ops = core.ops; +const internals = globalThis.__bootstrap.internals; +const { + op_arraybuffer_was_detached, + op_transfer_arraybuffer, + op_readable_stream_resource_allocate, + op_readable_stream_resource_get_sink, + op_readable_stream_resource_write_error, + op_readable_stream_resource_write_buf, + op_readable_stream_resource_close, + op_readable_stream_resource_await_close, +} = core.ensureFastOps(); import * as webidl from "ext:deno_webidl/00_webidl.js"; import { structuredClone } from "ext:deno_web/02_structured_clone.js"; import { @@ -61,6 +72,7 @@ const { SafeWeakMap, // TODO(lucacasonato): add SharedArrayBuffer to primordials // SharedArrayBufferPrototype, + String, Symbol, SymbolAsyncIterator, SymbolIterator, @@ -218,7 +230,7 @@ function isDetachedBuffer(O) { return false; } return ArrayBufferPrototypeGetByteLength(O) === 0 && - ops.op_arraybuffer_was_detached(O); + op_arraybuffer_was_detached(O); } /** @@ -244,7 +256,7 @@ function canTransferArrayBuffer(O) { * @returns {ArrayBufferLike} */ function transferArrayBuffer(O) { - return ops.op_transfer_arraybuffer(O); + return op_transfer_arraybuffer(O); } /** @@ -695,6 +707,68 @@ function isReadableStreamDisturbed(stream) { return stream[_disturbed]; } +/** + * Create a new resource that wraps a ReadableStream. The resource will support + * read operations, and those read operations will be fed by the output of the + * ReadableStream source. 
+ * @param {ReadableStream} stream + * @returns {number} + */ +function resourceForReadableStream(stream) { + const reader = acquireReadableStreamDefaultReader(stream); + + // Allocate the resource + const rid = op_readable_stream_resource_allocate(); + + // Close the Reader we get from the ReadableStream when the resource is closed, ignoring any errors + PromisePrototypeCatch( + PromisePrototypeThen( + op_readable_stream_resource_await_close(rid), + () => reader.cancel(), + ), + () => {}, + ); + + // The ops here look like op_write_all/op_close, but we're not actually writing to a + // real resource. + (async () => { + try { + // This allocation is freed in the finally block below, guaranteeing it won't leak + const sink = op_readable_stream_resource_get_sink(rid); + try { + while (true) { + let value; + try { + const read = await reader.read(); + value = read.value; + if (read.done) { + break; + } + } catch (err) { + const message = err.message; + if (message) { + await op_readable_stream_resource_write_error(sink, err.message); + } else { + await op_readable_stream_resource_write_error(sink, String(err)); + } + break; + } + // If the chunk has non-zero length, write it + if (value.length > 0) { + await op_readable_stream_resource_write_buf(sink, value); + } + } + } finally { + op_readable_stream_resource_close(sink); + } + } catch (err) { + // Something went terribly wrong with this stream -- log and continue + console.error("Unexpected internal error on stream", err); + } + })(); + return rid; +} + const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB // A finalization registry to clean up underlying resources that are GC'ed. 
@@ -6454,6 +6528,8 @@ webidl.converters.StreamPipeOptions = webidl { key: "signal", converter: webidl.converters.AbortSignal }, ]); +internals.resourceForReadableStream = resourceForReadableStream; + export { // Non-Public _state, @@ -6482,6 +6558,7 @@ export { ReadableStreamPrototype, readableStreamTee, readableStreamThrowIfErrored, + resourceForReadableStream, TransformStream, TransformStreamDefaultController, WritableStream, diff --git a/ext/web/Cargo.toml b/ext/web/Cargo.toml index dbc2df8c09..b923bc95ef 100644 --- a/ext/web/Cargo.toml +++ b/ext/web/Cargo.toml @@ -16,9 +16,11 @@ path = "lib.rs" [dependencies] async-trait.workspace = true base64-simd = "0.8" +bytes.workspace = true deno_core.workspace = true encoding_rs.workspace = true flate2.workspace = true +futures.workspace = true serde = "1.0.149" tokio.workspace = true uuid = { workspace = true, features = ["serde"] } diff --git a/ext/web/lib.rs b/ext/web/lib.rs index 374815804c..88937efb2c 100644 --- a/ext/web/lib.rs +++ b/ext/web/lib.rs @@ -4,6 +4,7 @@ mod blob; mod compression; mod hr_timer_lock; mod message_port; +mod stream_resource; mod timers; use deno_core::error::range_error; @@ -90,6 +91,12 @@ deno_core::extension!(deno_web, op_cancel_handle, op_sleep, op_transfer_arraybuffer, + stream_resource::op_readable_stream_resource_allocate, + stream_resource::op_readable_stream_resource_get_sink, + stream_resource::op_readable_stream_resource_write_error, + stream_resource::op_readable_stream_resource_write_buf, + stream_resource::op_readable_stream_resource_close, + stream_resource::op_readable_stream_resource_await_close, ], esm = [ "00_infra.js", diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs new file mode 100644 index 0000000000..4c2a756483 --- /dev/null +++ b/ext/web/stream_resource.rs @@ -0,0 +1,274 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. 
+use deno_core::anyhow::Error;
+use deno_core::error::type_error;
+use deno_core::error::AnyError;
+use deno_core::op2;
+use deno_core::AsyncRefCell;
+use deno_core::AsyncResult;
+use deno_core::BufView;
+use deno_core::CancelFuture;
+use deno_core::CancelHandle;
+use deno_core::JsBuffer;
+use deno_core::OpState;
+use deno_core::RcLike;
+use deno_core::RcRef;
+use deno_core::Resource;
+use deno_core::ResourceId;
+use futures::stream::Peekable;
+use futures::Stream;
+use futures::StreamExt;
+use std::borrow::Cow;
+use std::cell::RefCell;
+use std::ffi::c_void;
+use std::future::Future;
+use std::pin::Pin;
+use std::rc::Rc;
+use std::task::Context;
+use std::task::Poll;
+use std::task::Waker;
+use tokio::sync::mpsc::Receiver;
+use tokio::sync::mpsc::Sender;
+
+type SenderCell = RefCell<Option<Sender<Result<BufView, Error>>>>;
+
+// This indirection allows us to more easily integrate the fast streams work at a later date
+#[repr(transparent)]
+struct ChannelStreamAdapter<C>(C);
+
+impl<C> Stream for ChannelStreamAdapter<C>
+where
+  C: ChannelBytesRead,
+{
+  type Item = Result<BufView, AnyError>;
+  fn poll_next(
+    mut self: Pin<&mut Self>,
+    cx: &mut Context<'_>,
+  ) -> Poll<Option<Self::Item>> {
+    self.0.poll_recv(cx)
+  }
+}
+
+pub trait ChannelBytesRead: Unpin + 'static {
+  fn poll_recv(
+    &mut self,
+    cx: &mut Context<'_>,
+  ) -> Poll<Option<Result<BufView, AnyError>>>;
+}
+
+impl ChannelBytesRead for tokio::sync::mpsc::Receiver<Result<BufView, Error>> {
+  fn poll_recv(
+    &mut self,
+    cx: &mut Context<'_>,
+  ) -> Poll<Option<Result<BufView, AnyError>>> {
+    self.poll_recv(cx)
+  }
+}
+
+#[allow(clippy::type_complexity)]
+struct ReadableStreamResource {
+  reader: AsyncRefCell<
+    Peekable<ChannelStreamAdapter<Receiver<Result<BufView, Error>>>>,
+  >,
+  cancel_handle: CancelHandle,
+  data: ReadableStreamResourceData,
+}
+
+impl ReadableStreamResource {
+  pub fn cancel_handle(self: &Rc<Self>) -> impl RcLike<CancelHandle> {
+    RcRef::map(self, |s| &s.cancel_handle).clone()
+  }
+
+  async fn read(self: Rc<Self>, limit: usize) -> Result<BufView, AnyError> {
+    let cancel_handle = self.cancel_handle();
+    let peekable = RcRef::map(self, |this| &this.reader);
+    let mut peekable = peekable.borrow_mut().await;
+    match Pin::new(&mut *peekable)
      .peek_mut()
+      .or_cancel(cancel_handle)
+      .await?
+    {
+      None => Ok(BufView::empty()),
+      // Take the actual error since we only have a reference to it
+      Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()),
+      Some(Ok(bytes)) => {
+        if bytes.len() <= limit {
+          // We can safely take the next item since we peeked it
+          return peekable.next().await.unwrap();
+        }
+        // The remainder of the bytes after we split it is still left in the peek buffer
+        let ret = bytes.split_to(limit);
+        Ok(ret)
+      }
+    }
+  }
+}
+
+impl Resource for ReadableStreamResource {
+  fn name(&self) -> Cow<str> {
+    Cow::Borrowed("readableStream")
+  }
+
+  fn read(self: Rc<Self>, limit: usize) -> AsyncResult<BufView> {
+    Box::pin(ReadableStreamResource::read(self, limit))
+  }
+}
+
+// TODO(mmastrac): Move this to deno_core
+#[derive(Clone, Debug, Default)]
+pub struct CompletionHandle {
+  inner: Rc<RefCell<CompletionHandleInner>>,
+}
+
+#[derive(Debug, Default)]
+struct CompletionHandleInner {
+  complete: bool,
+  success: bool,
+  waker: Option<Waker>,
+}
+
+impl CompletionHandle {
+  pub fn complete(&self, success: bool) {
+    let mut mut_self = self.inner.borrow_mut();
+    mut_self.complete = true;
+    mut_self.success = success;
+    if let Some(waker) = mut_self.waker.take() {
+      drop(mut_self);
+      waker.wake();
+    }
+  }
+}
+
+impl Future for CompletionHandle {
+  type Output = bool;
+
+  fn poll(
+    self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<Self::Output> {
+    let mut mut_self = self.inner.borrow_mut();
+    if mut_self.complete {
+      return std::task::Poll::Ready(mut_self.success);
+    }
+
+    mut_self.waker = Some(cx.waker().clone());
+    std::task::Poll::Pending
+  }
+}
+
+fn sender_closed() -> Error {
+  type_error("sender closed")
+}
+
+/// Allocate a resource that wraps a ReadableStream.
+#[op2(fast)]
+#[smi]
+pub fn op_readable_stream_resource_allocate(state: &mut OpState) -> ResourceId {
+  let (tx, rx) = tokio::sync::mpsc::channel(1);
+  let tx = RefCell::new(Some(tx));
+  let completion = CompletionHandle::default();
+  let tx = Box::new(tx);
+  let resource = ReadableStreamResource {
+    cancel_handle: Default::default(),
+    reader: AsyncRefCell::new(ChannelStreamAdapter(rx).peekable()),
+    data: ReadableStreamResourceData {
+      tx: Box::into_raw(tx),
+      completion,
+    },
+  };
+  state.resource_table.add(resource)
+}
+
+#[op2(fast)]
+pub fn op_readable_stream_resource_get_sink(
+  state: &mut OpState,
+  #[smi] rid: ResourceId,
+) -> *const c_void {
+  let Ok(resource) = state.resource_table.get::<ReadableStreamResource>(rid)
+  else {
+    return std::ptr::null();
+  };
+  resource.data.tx as _
+}
+
+fn get_sender(sender: *const c_void) -> Option<Sender<Result<BufView, Error>>> {
+  // SAFETY: We know this is a valid v8::External
+  unsafe {
+    (sender as *const SenderCell)
+      .as_ref()
+      .and_then(|r| r.borrow_mut().as_ref().cloned())
+  }
+}
+
+fn drop_sender(sender: *const c_void) {
+  // SAFETY: We know this is a valid v8::External
+  unsafe {
+    assert!(!sender.is_null());
+    _ = Box::from_raw(sender as *mut SenderCell);
+  }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_buf(
+  sender: *const c_void,
+  #[buffer] buffer: JsBuffer,
+) -> impl Future<Output = Result<(), AnyError>> {
+  let sender = get_sender(sender);
+  async move {
+    let sender = sender.ok_or_else(sender_closed)?;
+    sender
+      .send(Ok(buffer.into()))
+      .await
+      .map_err(|_| sender_closed())?;
+    Ok(())
+  }
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_write_error(
+  sender: *const c_void,
+  #[string] error: String,
+) -> impl Future<Output = Result<(), AnyError>> {
+  let sender = get_sender(sender);
+  async move {
+    let sender = sender.ok_or_else(sender_closed)?;
+    sender
+      .send(Err(type_error(Cow::Owned(error))))
+      .await
+      .map_err(|_| sender_closed())?;
+    Ok(())
+  }
+}
+
+#[op2(fast)]
+pub fn op_readable_stream_resource_close(sender: *const c_void) {
+  drop_sender(sender);
+}
+
+#[op2(async)]
+pub fn op_readable_stream_resource_await_close(
+  state: &mut OpState,
+  #[smi] rid: ResourceId,
+) -> impl Future<Output = ()> {
+  let completion = state
+    .resource_table
+    .get::<ReadableStreamResource>(rid)
+    .ok()
+    .map(|r| r.data.completion.clone());
+
+  async move {
+    if let Some(completion) = completion {
+      completion.await;
+    }
+  }
+}
+
+struct ReadableStreamResourceData {
+  tx: *const SenderCell,
+  completion: CompletionHandle,
+}
+
+impl Drop for ReadableStreamResourceData {
+  fn drop(&mut self) {
+    self.completion.complete(true);
+  }
+}