diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts
index e2a7c24516..e7c99352fc 100644
--- a/cli/tests/unit/http_test.ts
+++ b/cli/tests/unit/http_test.ts
@@ -2459,6 +2459,62 @@ Deno.test(
   },
 );
 
+Deno.test(
+  { permissions: { net: true } },
+  async function httpServerRequestResponseClone() {
+    const body = "deno".repeat(64 * 1024);
+    let httpConn: Deno.HttpConn;
+    const listener = Deno.listen({ port: 4501 });
+    const promise = (async () => {
+      const conn = await listener.accept();
+      listener.close();
+      httpConn = Deno.serveHttp(conn);
+      const reqEvent = await httpConn.nextRequest();
+      assert(reqEvent);
+      const { request, respondWith } = reqEvent;
+      const clone = request.clone();
+      const reader = clone.body!.getReader();
+
+      // Read the first chunk from the cloned branch.
+      const clonedChunks = [];
+      const { value, done } = await reader.read();
+      assert(!done);
+      clonedChunks.push(value);
+
+      // Consume the original request after a single chunk has been read
+      // from the clone; readAll should correctly read the rest of the body,
+      // and the first chunk should stay in the stream's internal buffer.
+      const body1 = await request.text();
+
+      while (true) {
+        const { value, done } = await reader.read();
+        if (done) break;
+        clonedChunks.push(value);
+      }
+      let offset = 0;
+      const body2 = new Uint8Array(body.length);
+      for (const chunk of clonedChunks) {
+        body2.set(chunk, offset);
+        offset += chunk.byteLength;
+      }
+
+      assertEquals(body1, body);
+      assertEquals(body1, new TextDecoder().decode(body2));
+      await respondWith(new Response(body));
+    })();
+
+    const response = await fetch("http://localhost:4501", {
+      body,
+      method: "POST",
+    });
+    const clone = response.clone();
+    assertEquals(await response.text(), await clone.text());
+
+    await promise;
+    httpConn!.close();
+  },
+);
+
 function chunkedBodyReader(h: Headers, r: BufReader): Deno.Reader {
   // Based on https://tools.ietf.org/html/rfc2616#section-19.4.6
   const tp = new TextProtoReader(r);
diff --git a/core/01_core.js b/core/01_core.js
index 04f0eba77d..7bee019d9f 100644
--- a/core/01_core.js
+++ b/core/01_core.js
@@ -323,6 +323,7 @@
     close: (rid) => ops.op_close(rid),
     tryClose: (rid) => ops.op_try_close(rid),
     read: opAsync.bind(null, "op_read"),
+    readAll: opAsync.bind(null, "op_read_all"),
     write: opAsync.bind(null, "op_write"),
     writeAll: opAsync.bind(null, "op_write_all"),
     shutdown: opAsync.bind(null, "op_shutdown"),
diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js
index c783b95183..52488efb69 100644
--- a/ext/web/06_streams.js
+++ b/ext/web/06_streams.js
@@ -652,6 +652,9 @@
   const RESOURCE_REGISTRY = new FinalizationRegistry((rid) => {
     core.tryClose(rid);
   });
+
+  const _readAll = Symbol("[[readAll]]");
+  const _original = Symbol("[[original]]");
   /**
    * Create a new ReadableStream object that is backed by a Resource that
    * implements `Resource::read_return`. This object contains enough metadata to
@@ -681,6 +684,17 @@
       async pull(controller) {
         const v = controller.byobRequest.view;
         try {
+          if (controller[_readAll] === true) {
+            // Fast path for tee'd streams whose body is being fully consumed.
+            const chunk = await core.readAll(rid);
+            if (chunk.byteLength > 0) {
+              controller.enqueue(chunk);
+            }
+            controller.close();
+            tryClose();
+            return;
+          }
+
           const bytesRead = await core.read(rid, v);
           if (bytesRead === 0) {
             tryClose();
@@ -809,8 +823,17 @@
     /** @type {Uint8Array[]} */
     const chunks = [];
     let totalLength = 0;
+
+    // This stream is a branch of a tee'd stream.
+    if (stream[_original]) {
+      // One of the branches is consuming the stream in full; signal
+      // controller.pull that the rest can be read in a single op.
+      stream[_original][_controller][_readAll] = true;
+    }
+
     while (true) {
       const { value: chunk, done } = await reader.read();
+
       if (done) break;
       if (!ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, chunk)) {
         throw new TypeError(
@@ -3029,6 +3052,10 @@
       pull2Algorithm,
       cancel2Algorithm,
     );
+
+    branch1[_original] = stream;
+    branch2[_original] = stream;
+
     forwardReaderError(reader);
    return [branch1, branch2];
   }
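
Reviewer note (not part of the patch): `request.clone()` / `response.clone()` tee the resource-backed body stream, and before this change fully consuming one branch pulled the remaining data chunk by chunk through `op_read`. With this patch, `readAll` follows the branch's `_original` back-reference and sets the `_readAll` flag on the source stream's controller, whose next `pull` drains the rest of the resource with a single `op_read_all`. A minimal sketch of the pattern this optimizes, assuming a Deno build with `Deno.serve`; the port, URL, and payload are illustrative only:

    // clone() tees the resource-backed body stream; consuming each
    // branch via text() can now gather the remaining bytes with a
    // single op_read_all instead of one op per chunk.
    const ac = new AbortController();
    const server = Deno.serve(
      { port: 8000, signal: ac.signal },
      () => new Response("deno".repeat(64 * 1024)),
    );
    const res = await fetch("http://localhost:8000/");
    const clone = res.clone();
    const [a, b] = await Promise.all([res.text(), clone.text()]);
    console.assert(a === b);
    ac.abort();
    await server.finished;

The fast path only engages once a branch is being collected in full; incremental `reader.read()` calls on the other branch still work, and chunks it has already pulled stay buffered, which is exactly what the new `httpServerRequestResponseClone` test exercises.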