From 38f544538b337074cbce317e67859a69bb23684c Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Fri, 30 Sep 2022 00:42:33 +0200 Subject: [PATCH] fix(runtime): no FastStream for unrefable streams (#16095) --- ext/net/01_net.js | 1 - ext/web/06_streams.js | 95 +++++++++++++++++++++++++++++++++++------- runtime/js/40_spawn.js | 26 ++++++------ 3 files changed, 92 insertions(+), 30 deletions(-) diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 2c7ec0f47a..04360116b2 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -354,7 +354,6 @@ resolveDns, }; window.__bootstrap.streamUtils = { - readableStreamForRid, writableStreamForRid, }; })(this); diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index bd3b79149b..cbf781b53f 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -648,16 +648,15 @@ const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB /** - * @callback unrefCallback - * @param {Promise} promise - * @returns {undefined} - */ - /** - * @param {number} rid - * @param {unrefCallback=} unrefCallback + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This object contains enough metadata to + * allow callers to bypass the JavaScript ReadableStream implementation and + * read directly from the underlying resource if they so choose (FastStream). + * + * @param {number} rid The resource ID to read from. * @returns {ReadableStream} */ - function readableStreamForRid(rid, unrefCallback) { + function readableStreamForRid(rid) { const stream = webidl.createBranded(ReadableStream); stream[_maybeRid] = rid; const underlyingSource = { @@ -665,12 +664,7 @@ async pull(controller) { const v = controller.byobRequest.view; try { - const promise = core.read(rid, v); - - unrefCallback?.(promise); - - const bytesRead = await promise; - + const bytesRead = await core.read(rid, v); if (bytesRead === 0) { core.tryClose(rid); controller.close(); @@ -695,10 +689,78 @@ underlyingSource, 0, ); - return stream; } + const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); + const _isUnref = Symbol("isUnref"); + /** + * Create a new ReadableStream object that is backed by a Resource that + * implements `Resource::read_return`. This readable stream supports being + * refed and unrefed by calling `readableStreamForRidUnrefableRef` and + * `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not + * FastStream compatible. + * + * @param {number} rid The resource ID to read from. + * @returns {ReadableStream} + */ + function readableStreamForRidUnrefable(rid) { + const stream = webidl.createBranded(ReadableStream); + stream[promiseIdSymbol] = undefined; + stream[_isUnref] = false; + const underlyingSource = { + type: "bytes", + async pull(controller) { + const v = controller.byobRequest.view; + try { + const promise = core.read(rid, v); + const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol]; + if (stream[_isUnref]) core.unrefOp(promiseId); + const bytesRead = await promise; + stream[promiseIdSymbol] = undefined; + if (bytesRead === 0) { + core.tryClose(rid); + controller.close(); + controller.byobRequest.respond(0); + } else { + controller.byobRequest.respond(bytesRead); + } + } catch (e) { + controller.error(e); + core.tryClose(rid); + } + }, + cancel() { + core.tryClose(rid); + }, + autoAllocateChunkSize: DEFAULT_CHUNK_SIZE, + }; + initializeReadableStream(stream); + setUpReadableByteStreamControllerFromUnderlyingSource( + stream, + underlyingSource, + underlyingSource, + 0, + ); + return stream; + } + + function readableStreamForRidUnrefableRef(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = false; + if (stream[promiseIdSymbol] !== undefined) { + core.refOp(stream[promiseIdSymbol]); + } + } + + function readableStreamForRidUnrefableUnref(stream) { + if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream"); + stream[_isUnref] = true; + if (stream[promiseIdSymbol] !== undefined) { + core.unrefOp(stream[promiseIdSymbol]); + } + } + function getReadableStreamRid(stream) { return stream[_maybeRid]; } @@ -5921,6 +5983,9 @@ readableStreamClose, readableStreamDisturb, readableStreamForRid, + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, getReadableStreamRid, Deferred, // Exposed in global runtime scope diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js index daa4f8ff88..99661bf1a9 100644 --- a/runtime/js/40_spawn.js +++ b/runtime/js/40_spawn.js @@ -16,8 +16,12 @@ PromiseAll, SymbolFor, } = window.__bootstrap.primordials; - const { readableStreamForRid, writableStreamForRid } = - window.__bootstrap.streamUtils; + const { + readableStreamForRidUnrefable, + readableStreamForRidUnrefableRef, + readableStreamForRidUnrefableUnref, + } = window.__bootstrap.streams; + const { writableStreamForRid } = window.__bootstrap.streamUtils; const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); @@ -136,18 +140,12 @@ if (stdoutRid !== null) { this.#stdoutRid = stdoutRid; - this.#stdout = readableStreamForRid(stdoutRid, (promise) => { - this.#stdoutPromiseId = promise[promiseIdSymbol]; - if (this.#unrefed) core.unrefOp(this.#stdoutPromiseId); - }); + this.#stdout = readableStreamForRidUnrefable(stdoutRid); } if (stderrRid !== null) { this.#stderrRid = stderrRid; - this.#stderr = readableStreamForRid(stderrRid, (promise) => { - this.#stderrPromiseId = promise[promiseIdSymbol]; - if (this.#unrefed) core.unrefOp(this.#stderrPromiseId); - }); + this.#stderr = readableStreamForRidUnrefable(stderrRid); } const onAbort = () => this.kill("SIGTERM"); @@ -214,15 +212,15 @@ ref() { this.#unrefed = false; core.refOp(this.#waitPromiseId); - if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId); - if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId); + if (this.#stdout) readableStreamForRidUnrefableRef(this.#stdout); + if (this.#stderr) readableStreamForRidUnrefableRef(this.#stderr); } unref() { this.#unrefed = true; core.unrefOp(this.#waitPromiseId); - if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId); - if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId); + if (this.#stdout) readableStreamForRidUnrefableUnref(this.#stdout); + if (this.#stderr) readableStreamForRidUnrefableUnref(this.#stderr); } }