0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 09:31:22 -05:00

fix(runtime): no FastStream for unrefable streams (#16095)

This commit is contained in:
Luca Casonato 2022-09-30 00:42:33 +02:00 committed by GitHub
parent 927f4e2e83
commit 38f544538b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 30 deletions

View file

@ -354,7 +354,6 @@
resolveDns,
};
window.__bootstrap.streamUtils = {
readableStreamForRid,
writableStreamForRid,
};
})(this);

View file

@ -648,16 +648,15 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
/**
* @callback unrefCallback
* @param {Promise} promise
* @returns {undefined}
*/
/**
* @param {number} rid
* @param {unrefCallback=} unrefCallback
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This object contains enough metadata to
* allow callers to bypass the JavaScript ReadableStream implementation and
* read directly from the underlying resource if they so choose (FastStream).
*
* @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRid(rid, unrefCallback) {
function readableStreamForRid(rid) {
const stream = webidl.createBranded(ReadableStream);
stream[_maybeRid] = rid;
const underlyingSource = {
@ -665,12 +664,7 @@
async pull(controller) {
const v = controller.byobRequest.view;
try {
const promise = core.read(rid, v);
unrefCallback?.(promise);
const bytesRead = await promise;
const bytesRead = await core.read(rid, v);
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
@ -695,10 +689,78 @@
underlyingSource,
0,
);
return stream;
}
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
const _isUnref = Symbol("isUnref");
/**
* Create a new ReadableStream object that is backed by a Resource that
* implements `Resource::read_return`. This readable stream supports being
* refed and unrefed by calling `readableStreamForRidUnrefableRef` and
* `readableStreamForRidUnrefableUnref` on it. Unrefable streams are not
* FastStream compatible.
*
* @param {number} rid The resource ID to read from.
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRidUnrefable(rid) {
const stream = webidl.createBranded(ReadableStream);
stream[promiseIdSymbol] = undefined;
stream[_isUnref] = false;
const underlyingSource = {
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const promise = core.read(rid, v);
const promiseId = stream[promiseIdSymbol] = promise[promiseIdSymbol];
if (stream[_isUnref]) core.unrefOp(promiseId);
const bytesRead = await promise;
stream[promiseIdSymbol] = undefined;
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();
controller.byobRequest.respond(0);
} else {
controller.byobRequest.respond(bytesRead);
}
} catch (e) {
controller.error(e);
core.tryClose(rid);
}
},
cancel() {
core.tryClose(rid);
},
autoAllocateChunkSize: DEFAULT_CHUNK_SIZE,
};
initializeReadableStream(stream);
setUpReadableByteStreamControllerFromUnderlyingSource(
stream,
underlyingSource,
underlyingSource,
0,
);
return stream;
}
function readableStreamForRidUnrefableRef(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
stream[_isUnref] = false;
if (stream[promiseIdSymbol] !== undefined) {
core.refOp(stream[promiseIdSymbol]);
}
}
function readableStreamForRidUnrefableUnref(stream) {
if (!(_isUnref in stream)) throw new TypeError("Not an unrefable stream");
stream[_isUnref] = true;
if (stream[promiseIdSymbol] !== undefined) {
core.unrefOp(stream[promiseIdSymbol]);
}
}
function getReadableStreamRid(stream) {
return stream[_maybeRid];
}
@ -5921,6 +5983,9 @@
readableStreamClose,
readableStreamDisturb,
readableStreamForRid,
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
getReadableStreamRid,
Deferred,
// Exposed in global runtime scope

View file

@ -16,8 +16,12 @@
PromiseAll,
SymbolFor,
} = window.__bootstrap.primordials;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streamUtils;
const {
readableStreamForRidUnrefable,
readableStreamForRidUnrefableRef,
readableStreamForRidUnrefableUnref,
} = window.__bootstrap.streams;
const { writableStreamForRid } = window.__bootstrap.streamUtils;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
@ -136,18 +140,12 @@
if (stdoutRid !== null) {
this.#stdoutRid = stdoutRid;
this.#stdout = readableStreamForRid(stdoutRid, (promise) => {
this.#stdoutPromiseId = promise[promiseIdSymbol];
if (this.#unrefed) core.unrefOp(this.#stdoutPromiseId);
});
this.#stdout = readableStreamForRidUnrefable(stdoutRid);
}
if (stderrRid !== null) {
this.#stderrRid = stderrRid;
this.#stderr = readableStreamForRid(stderrRid, (promise) => {
this.#stderrPromiseId = promise[promiseIdSymbol];
if (this.#unrefed) core.unrefOp(this.#stderrPromiseId);
});
this.#stderr = readableStreamForRidUnrefable(stderrRid);
}
const onAbort = () => this.kill("SIGTERM");
@ -214,15 +212,15 @@
ref() {
this.#unrefed = false;
core.refOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId);
if (this.#stdout) readableStreamForRidUnrefableRef(this.#stdout);
if (this.#stderr) readableStreamForRidUnrefableRef(this.#stderr);
}
unref() {
this.#unrefed = true;
core.unrefOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId);
if (this.#stdout) readableStreamForRidUnrefableUnref(this.#stdout);
if (this.#stderr) readableStreamForRidUnrefableUnref(this.#stderr);
}
}