diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index b66d4dca3d..5e60f1e712 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -279,6 +279,7 @@ const _pullAlgorithm = Symbol("[[pullAlgorithm]]"); const _pulling = Symbol("[[pulling]]"); const _pullSteps = Symbol("[[PullSteps]]"); + const _releaseSteps = Symbol("[[ReleaseSteps]]"); const _queue = Symbol("[[queue]]"); const _queueTotalSize = Symbol("[[queueTotalSize]]"); const _readable = Symbol("[[readable]]"); @@ -800,12 +801,19 @@ "The BYOB request's buffer has been detached and so cannot be filled with an enqueued chunk", ); } + readableByteStreamControllerInvalidateBYOBRequest(controller); firstPendingPullInto.buffer = transferArrayBuffer( firstPendingPullInto.buffer, ); + if (firstPendingPullInto.readerType === "none") { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + firstPendingPullInto, + ); + } } - readableByteStreamControllerInvalidateBYOBRequest(controller); if (readableStreamHasDefaultReader(stream)) { + readableByteStreamControllerProcessReadRequestsUsingQueue(controller); if (readableStreamGetNumReadRequests(stream) === 0) { assert(controller[_pendingPullIntos].length === 0); readableByteStreamControllerEnqueueChunkToQueue( @@ -866,6 +874,54 @@ controller[_queueTotalSize] += byteLength; } + /** + * @param {ReadableByteStreamController} controller + * @param {ArrayBufferLike} buffer + * @param {number} byteOffset + * @param {number} byteLength + * @returns {void} + */ + function readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + buffer, + byteOffset, + byteLength, + ) { + let cloneResult; + try { + cloneResult = buffer.slice(byteOffset, byteOffset + byteLength); + } catch (e) { + readableByteStreamControllerError(controller, e); + } + readableByteStreamControllerEnqueueChunkToQueue( + controller, + cloneResult, + 0, + byteLength, + ); + } + + /** + * @param {ReadableByteStreamController} controller + * @param {PullIntoDescriptor} pullIntoDescriptor + * @returns {void} + */ + function readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + pullIntoDescriptor, + ) { + assert(pullIntoDescriptor.readerType === "none"); + if (pullIntoDescriptor.bytesFilled > 0) { + readableByteStreamControllerEnqueueClonedChunkToQueue( + controller, + pullIntoDescriptor.buffer, + pullIntoDescriptor.byteOffset, + pullIntoDescriptor.bytesFilled, + ); + } + readableByteStreamControllerShiftPendingPullInto(controller); + } + /** * @param {ReadableByteStreamController} controller * @returns {ReadableStreamBYOBRequest | null} @@ -1000,10 +1056,11 @@ readableStreamClose(stream); const reader = stream[_reader]; if (reader !== undefined && isReadableStreamBYOBReader(reader)) { - for (const readIntoRequest of reader[_readIntoRequests]) { + const readIntoRequests = reader[_readIntoRequests]; + reader[_readIntoRequests] = []; + for (const readIntoRequest of readIntoRequests) { readIntoRequest.closeSteps(undefined); } - reader[_readIntoRequests] = []; } /** @type {Promise} */ const sourceCancelPromise = stream[_controller][_cancelSteps](reason); @@ -1026,10 +1083,10 @@ if (isReadableStreamDefaultReader(reader)) { /** @type {Array>} */ const readRequests = reader[_readRequests]; + reader[_readRequests] = []; for (const readRequest of readRequests) { readRequest.closeSteps(); } - reader[_readRequests] = []; } // This promise can be double resolved. // See: https://github.com/whatwg/streams/issues/1100 @@ -1224,6 +1281,29 @@ } } + /** + * @param {ReadableStreamBYOBReader} reader + */ + function readableStreamBYOBReaderRelease(reader) { + readableStreamReaderGenericRelease(reader); + const e = new TypeError( + "There are pending read requests, so the reader cannot be released.", + ); + readableStreamBYOBReaderErrorReadIntoRequests(reader, e); + } + + /** + * @param {ReadableStreamBYOBReader} reader + * @param {any} e + */ + function readableStreamDefaultReaderErrorReadRequests(reader, e) { + const readRequests = reader[_readRequests]; + reader[_readRequests] = []; + for (const readRequest of readRequests) { + readRequest.errorSteps(e); + } + } + /** * @param {ReadableByteStreamController} controller */ @@ -1250,6 +1330,25 @@ } } } + /** + * @param {ReadableByteStreamController} controller + */ + function readableByteStreamControllerProcessReadRequestsUsingQueue( + controller, + ) { + const reader = controller[_stream][_reader]; + assert(isReadableStreamDefaultReader(reader)); + while (reader[_readRequests].length !== 0) { + if (controller[_queueTotalSize] === 0) { + return; + } + const readRequest = ArrayPrototypeShift(reader[_readRequests]); + readableByteStreamControllerFillReadRequestFromQueue( + controller, + readRequest, + ); + } + } /** * @param {ReadableByteStreamController} controller @@ -1401,6 +1500,16 @@ bytesWritten, pullIntoDescriptor, ); + if (pullIntoDescriptor.readerType === "none") { + readableByteStreamControllerEnqueueDetachedPullIntoToQueue( + controller, + pullIntoDescriptor, + ); + readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue( + controller, + ); + return; + } if (pullIntoDescriptor.bytesFilled < pullIntoDescriptor.elementSize) { return; } @@ -1410,16 +1519,11 @@ if (remainderSize > 0) { const end = pullIntoDescriptor.byteOffset + pullIntoDescriptor.bytesFilled; - // We dont have access to CloneArrayBuffer, so we use .slice(). End is non-inclusive, as the spec says. - const remainder = pullIntoDescriptor.buffer.slice( - end - remainderSize, - end, - ); - readableByteStreamControllerEnqueueChunkToQueue( + readableByteStreamControllerEnqueueClonedChunkToQueue( controller, - remainder, - 0, - remainder.byteLength, + pullIntoDescriptor.buffer, + end - remainderSize, + remainderSize, ); } pullIntoDescriptor.bytesFilled -= remainderSize; @@ -1484,6 +1588,9 @@ firstDescriptor, ) { assert(firstDescriptor.bytesFilled === 0); + if (firstDescriptor.readerType === "none") { + readableByteStreamControllerShiftPendingPullInto(controller); + } const stream = controller[_stream]; if (readableStreamHasBYOBReader(stream)) { while (readableStreamGetNumReadIntoRequests(stream) > 0) { @@ -1507,6 +1614,7 @@ pullIntoDescriptor, ) { assert(stream[_state] !== "errored"); + assert(pullIntoDescriptor.readerType !== "none"); let done = false; if (stream[_state] === "closed") { assert(pullIntoDescriptor.bytesFilled === 0); @@ -1650,6 +1758,27 @@ return ready; } + /** + * @param {ReadableByteStreamController} controller + * @param {ReadRequest} readRequest + * @returns {void} + */ + function readableByteStreamControllerFillReadRequestFromQueue( + controller, + readRequest, + ) { + assert(controller[_queueTotalSize] > 0); + const entry = ArrayPrototypeShift(controller[_queue]); + controller[_queueTotalSize] -= entry.byteLength; + readableByteStreamControllerHandleQueueDrain(controller); + const view = new Uint8Array( + entry.buffer, + entry.byteOffset, + entry.byteLength, + ); + readRequest.chunkSteps(view); + } + /** * @param {ReadableByteStreamController} controller * @param {number} size @@ -1708,6 +1837,18 @@ } } + /** + * @template R + * @param {ReadableStreamDefaultReader} reader + */ + function readableStreamDefaultReaderRelease(reader) { + readableStreamReaderGenericRelease(reader); + const e = new TypeError( + "There are pending read requests, so the reader cannot be released.", + ); + readableStreamDefaultReaderErrorReadRequests(reader, e); + } + /** * @template R * @param {ReadableStream} stream @@ -1727,18 +1868,10 @@ closedPromise.reject(e); setPromiseIsHandledToTrue(closedPromise.promise); if (isReadableStreamDefaultReader(reader)) { - /** @type {Array>} */ - const readRequests = reader[_readRequests]; - for (const readRequest of readRequests) { - readRequest.errorSteps(e); - } - reader[_readRequests] = []; + readableStreamDefaultReaderErrorReadRequests(reader, e); } else { assert(isReadableStreamBYOBReader(reader)); - for (const readIntoRequest of reader[_readIntoRequests]) { - readIntoRequest.errorSteps(e); - } - reader[_readIntoRequests] = []; + readableStreamBYOBReaderErrorReadIntoRequests(reader, e); } } @@ -2104,7 +2237,7 @@ */ function finalize(isError, error) { writableStreamDefaultWriterRelease(writer); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); if (signal !== undefined) { signal[remove](abortAlgorithm); @@ -2154,9 +2287,10 @@ * @param {ReadableStreamGenericReader | ReadableStreamBYOBReader} reader */ function readableStreamReaderGenericRelease(reader) { - assert(reader[_stream] !== undefined); - assert(reader[_stream][_reader] === reader); - if (reader[_stream][_state] === "readable") { + const stream = reader[_stream]; + assert(stream !== undefined); + assert(stream[_reader] === reader); + if (stream[_state] === "readable") { reader[_closedPromise].reject( new TypeError( "Reader was released and can no longer be used to monitor the stream's closedness.", @@ -2171,10 +2305,23 @@ ); } setPromiseIsHandledToTrue(reader[_closedPromise].promise); - reader[_stream][_reader] = undefined; + stream[_controller][_releaseSteps](); + stream[_reader] = undefined; reader[_stream] = undefined; } + /** + * @param {ReadableStreamBYOBReader} reader + * @param {any} e + */ + function readableStreamBYOBReaderErrorReadIntoRequests(reader, e) { + const readIntoRequests = reader[_readIntoRequests]; + reader[_readIntoRequests] = []; + for (const readIntoRequest of readIntoRequests) { + readIntoRequest.errorSteps(e); + } + } + /** * @template R * @param {ReadableStream} stream @@ -2381,7 +2528,7 @@ function pullWithDefaultReader() { if (isReadableStreamBYOBReader(reader)) { assert(reader[_readIntoRequests].length === 0); - readableStreamReaderGenericRelease(reader); + readableStreamBYOBReaderRelease(reader); reader = acquireReadableStreamDefaultReader(stream); forwardReaderError(reader); } @@ -2446,7 +2593,7 @@ function pullWithBYOBReader(view, forBranch2) { if (isReadableStreamDefaultReader(reader)) { assert(reader[_readRequests].length === 0); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); reader = acquireReadableStreamBYOBReader(stream); forwardReaderError(reader); } @@ -3982,11 +4129,11 @@ promise.resolve(createIteratorResult(chunk, false)); }, closeSteps() { - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); promise.resolve(createIteratorResult(undefined, true)); }, errorSteps(e) { - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); promise.reject(e); }, }; @@ -4006,11 +4153,11 @@ assert(reader[_readRequests].length === 0); if (this[_preventCancel] === false) { const result = readableStreamReaderGenericCancel(reader, arg); - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); await result; return createIteratorResult(arg, true); } - readableStreamReaderGenericRelease(reader); + readableStreamDefaultReaderRelease(reader); return createIteratorResult(undefined, true); }, }, asyncIteratorPrototype); @@ -4417,12 +4564,7 @@ if (this[_stream] === undefined) { return; } - if (this[_readRequests].length) { - throw new TypeError( - "There are pending read requests, so the reader cannot be release.", - ); - } - readableStreamReaderGenericRelease(this); + readableStreamDefaultReaderRelease(this); } get closed() { @@ -4544,12 +4686,7 @@ if (this[_stream] === undefined) { return; } - if (this[_readIntoRequests].length !== 0) { - throw new TypeError( - "There are pending read requests, so the reader cannot be released.", - ); - } - readableStreamReaderGenericRelease(this); + readableStreamBYOBReaderRelease(this); } get closed() { @@ -4794,15 +4931,7 @@ assert(readableStreamHasDefaultReader(stream)); if (this[_queueTotalSize] > 0) { assert(readableStreamGetNumReadRequests(stream) === 0); - const entry = ArrayPrototypeShift(this[_queue]); - this[_queueTotalSize] -= entry.byteLength; - readableByteStreamControllerHandleQueueDrain(this); - const view = new Uint8Array( - entry.buffer, - entry.byteOffset, - entry.byteLength, - ); - readRequest.chunkSteps(view); + readableByteStreamControllerFillReadRequestFromQueue(this, readRequest); return; } const autoAllocateChunkSize = this[_autoAllocateChunkSize]; @@ -4830,6 +4959,15 @@ readableStreamAddReadRequest(stream, readRequest); readableByteStreamControllerCallPullIfNeeded(this); } + + [_releaseSteps]() { + if (this[_pendingPullIntos].length !== 0) { + /** @type {PullIntoDescriptor} */ + const firstPendingPullInto = this[_pendingPullIntos][0]; + firstPendingPullInto.readerType = "none"; + this[_pendingPullIntos] = [firstPendingPullInto]; + } + } } webidl.configurePrototype(ReadableByteStreamController); @@ -4944,6 +5082,10 @@ readableStreamDefaultControllerCallPullIfNeeded(this); } } + + [_releaseSteps]() { + return; + } } webidl.configurePrototype(ReadableStreamDefaultController); diff --git a/ext/web/06_streams_types.d.ts b/ext/web/06_streams_types.d.ts index 2347dc0a43..bc3306b945 100644 --- a/ext/web/06_streams_types.d.ts +++ b/ext/web/06_streams_types.d.ts @@ -33,7 +33,7 @@ interface PullIntoDescriptor { elementSize: number; // deno-lint-ignore no-explicit-any viewConstructor: any; - readerType: "default" | "byob"; + readerType: "default" | "byob" | "none"; } interface ReadableByteStreamQueueEntry { diff --git a/tools/wpt/expectation.json b/tools/wpt/expectation.json index 11760c77d2..4251f75920 100644 --- a/tools/wpt/expectation.json +++ b/tools/wpt/expectation.json @@ -1430,42 +1430,12 @@ "construct-byob-request.any.html": true, "construct-byob-request.any.worker.html": true, "general.any.html": [ - "ReadableStream with byte source: releaseLock() on ReadableStreamDefaultReader must reject pending read()", - "ReadableStream with byte source: releaseLock() on ReadableStreamBYOBReader must reject pending read()", "ReadableStream with byte source: Respond to multiple pull() by separate enqueue()", - "pull() resolving should not resolve read()", - "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()" + "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request" ], "general.any.worker.html": [ - "ReadableStream with byte source: releaseLock() on ReadableStreamDefaultReader must reject pending read()", - "ReadableStream with byte source: releaseLock() on ReadableStreamBYOBReader must reject pending read()", "ReadableStream with byte source: Respond to multiple pull() by separate enqueue()", - "pull() resolving should not resolve read()", - "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()", - "ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()", - "ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)", - "ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()" + "ReadableStream with byte source: enqueue() discards auto-allocated BYOB request" ], "non-transferable-buffers.any.html": false, "non-transferable-buffers.any.worker.html": false, @@ -1487,12 +1457,8 @@ "constructor.any.worker.html": true, "count-queuing-strategy-integration.any.html": true, "count-queuing-strategy-integration.any.worker.html": true, - "default-reader.any.html": [ - "Second reader can read chunks after first reader was released with pending read requests" - ], - "default-reader.any.worker.html": [ - "Second reader can read chunks after first reader was released with pending read requests" - ], + "default-reader.any.html": true, + "default-reader.any.worker.html": true, "floating-point-total-queue-size.any.html": true, "floating-point-total-queue-size.any.worker.html": true, "garbage-collection.any.html": true, @@ -1505,12 +1471,8 @@ "reentrant-strategies.any.worker.html": true, "tee.any.html": false, "tee.any.worker.html": false, - "templated.any.html": [ - "ReadableStream (empty) reader: releasing the lock should reject all pending read requests" - ], - "templated.any.worker.html": [ - "ReadableStream (empty) reader: releasing the lock should reject all pending read requests" - ] + "templated.any.html": true, + "templated.any.worker.html": true }, "transform-streams": { "backpressure.any.html": true, @@ -4189,4 +4151,4 @@ "Pattern: [{\"pathname\":\"*//*\"}] Inputs: [{\"pathname\":\"foo/bar\"}]" ] } -} \ No newline at end of file +}