1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 15:10:44 -05:00
denoland-deno/cli/rt/11_streams.js
2020-08-19 14:43:20 +02:00

3289 lines
94 KiB
JavaScript

// Copyright 2018-2020 the Deno authors. All rights reserved. MIT license.
// This code closely follows the WHATWG Stream Specification
// See: https://streams.spec.whatwg.org/
//
// There are some parts that are not fully implemented, and there are some
// comments which point to steps of the specification that are not implemented.
((window) => {
/* eslint-disable @typescript-eslint/no-explicit-any,require-await */
const { cloneValue, setFunctionName } = window.__bootstrap.webUtil;
const { assert, AssertionError } = window.__bootstrap.util;
const { customInspect, inspect } = window.__bootstrap.console;
const sym = {
abortAlgorithm: Symbol("abortAlgorithm"),
abortSteps: Symbol("abortSteps"),
asyncIteratorReader: Symbol("asyncIteratorReader"),
autoAllocateChunkSize: Symbol("autoAllocateChunkSize"),
backpressure: Symbol("backpressure"),
backpressureChangePromise: Symbol("backpressureChangePromise"),
byobRequest: Symbol("byobRequest"),
cancelAlgorithm: Symbol("cancelAlgorithm"),
cancelSteps: Symbol("cancelSteps"),
closeAlgorithm: Symbol("closeAlgorithm"),
closedPromise: Symbol("closedPromise"),
closeRequest: Symbol("closeRequest"),
closeRequested: Symbol("closeRequested"),
controlledReadableByteStream: Symbol(
"controlledReadableByteStream",
),
controlledReadableStream: Symbol("controlledReadableStream"),
controlledTransformStream: Symbol("controlledTransformStream"),
controlledWritableStream: Symbol("controlledWritableStream"),
disturbed: Symbol("disturbed"),
errorSteps: Symbol("errorSteps"),
flushAlgorithm: Symbol("flushAlgorithm"),
forAuthorCode: Symbol("forAuthorCode"),
inFlightWriteRequest: Symbol("inFlightWriteRequest"),
inFlightCloseRequest: Symbol("inFlightCloseRequest"),
isFakeDetached: Symbol("isFakeDetached"),
ownerReadableStream: Symbol("ownerReadableStream"),
ownerWritableStream: Symbol("ownerWritableStream"),
pendingAbortRequest: Symbol("pendingAbortRequest"),
preventCancel: Symbol("preventCancel"),
pullAgain: Symbol("pullAgain"),
pullAlgorithm: Symbol("pullAlgorithm"),
pulling: Symbol("pulling"),
pullSteps: Symbol("pullSteps"),
queue: Symbol("queue"),
queueTotalSize: Symbol("queueTotalSize"),
readable: Symbol("readable"),
readableStreamController: Symbol("readableStreamController"),
reader: Symbol("reader"),
readRequests: Symbol("readRequests"),
readyPromise: Symbol("readyPromise"),
started: Symbol("started"),
state: Symbol("state"),
storedError: Symbol("storedError"),
strategyHWM: Symbol("strategyHWM"),
strategySizeAlgorithm: Symbol("strategySizeAlgorithm"),
transformAlgorithm: Symbol("transformAlgorithm"),
transformStreamController: Symbol("transformStreamController"),
writableStreamController: Symbol("writableStreamController"),
writeAlgorithm: Symbol("writeAlgorithm"),
writable: Symbol("writable"),
writer: Symbol("writer"),
writeRequests: Symbol("writeRequests"),
};
class ReadableByteStreamController {
constructor() {
throw new TypeError(
"ReadableByteStreamController's constructor cannot be called.",
);
}
get byobRequest() {
return undefined;
}
get desiredSize() {
if (!isReadableByteStreamController(this)) {
throw new TypeError("Invalid ReadableByteStreamController.");
}
return readableByteStreamControllerGetDesiredSize(this);
}
close() {
if (!isReadableByteStreamController(this)) {
throw new TypeError("Invalid ReadableByteStreamController.");
}
if (this[sym.closeRequested]) {
throw new TypeError("Closed already requested.");
}
if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
throw new TypeError(
"ReadableByteStreamController's stream is not in a readable state.",
);
}
readableByteStreamControllerClose(this);
}
enqueue(chunk) {
if (!isReadableByteStreamController(this)) {
throw new TypeError("Invalid ReadableByteStreamController.");
}
if (this[sym.closeRequested]) {
throw new TypeError("Closed already requested.");
}
if (this[sym.controlledReadableByteStream][sym.state] !== "readable") {
throw new TypeError(
"ReadableByteStreamController's stream is not in a readable state.",
);
}
if (!ArrayBuffer.isView(chunk)) {
throw new TypeError(
"You can only enqueue array buffer views when using a ReadableByteStreamController",
);
}
if (isDetachedBuffer(chunk.buffer)) {
throw new TypeError(
"Cannot enqueue a view onto a detached ArrayBuffer",
);
}
readableByteStreamControllerEnqueue(this, chunk);
}
error(error) {
if (!isReadableByteStreamController(this)) {
throw new TypeError("Invalid ReadableByteStreamController.");
}
readableByteStreamControllerError(this, error);
}
[sym.cancelSteps](reason) {
// 3.11.5.1.1 If this.[[pendingPullIntos]] is not empty,
resetQueue(this);
const result = this[sym.cancelAlgorithm](reason);
readableByteStreamControllerClearAlgorithms(this);
return result;
}
[sym.pullSteps]() {
const stream = this[sym.controlledReadableByteStream];
assert(readableStreamHasDefaultReader(stream));
if (this[sym.queueTotalSize] > 0) {
assert(readableStreamGetNumReadRequests(stream) === 0);
const entry = this[sym.queue].shift();
assert(entry);
this[sym.queueTotalSize] -= entry.size;
readableByteStreamControllerHandleQueueDrain(this);
const view = new Uint8Array(entry.value, entry.offset, entry.size);
return Promise.resolve(
readableStreamCreateReadResult(
view,
false,
stream[sym.reader][sym.forAuthorCode],
),
);
}
// 3.11.5.2.5 If autoAllocateChunkSize is not undefined,
const promise = readableStreamAddReadRequest(stream);
readableByteStreamControllerCallPullIfNeeded(this);
return promise;
}
[customInspect]() {
return `${this.constructor.name} { byobRequest: ${
String(this.byobRequest)
}, desiredSize: ${String(this.desiredSize)} }`;
}
}
class ReadableStreamDefaultController {
constructor() {
throw new TypeError(
"ReadableStreamDefaultController's constructor cannot be called.",
);
}
get desiredSize() {
if (!isReadableStreamDefaultController(this)) {
throw new TypeError("Invalid ReadableStreamDefaultController.");
}
return readableStreamDefaultControllerGetDesiredSize(this);
}
close() {
if (!isReadableStreamDefaultController(this)) {
throw new TypeError("Invalid ReadableStreamDefaultController.");
}
if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw new TypeError(
"ReadableStreamDefaultController cannot close or enqueue.",
);
}
readableStreamDefaultControllerClose(this);
}
enqueue(chunk) {
if (!isReadableStreamDefaultController(this)) {
throw new TypeError("Invalid ReadableStreamDefaultController.");
}
if (!readableStreamDefaultControllerCanCloseOrEnqueue(this)) {
throw new TypeError("ReadableSteamController cannot enqueue.");
}
return readableStreamDefaultControllerEnqueue(this, chunk);
}
error(error) {
if (!isReadableStreamDefaultController(this)) {
throw new TypeError("Invalid ReadableStreamDefaultController.");
}
readableStreamDefaultControllerError(this, error);
}
[sym.cancelSteps](reason) {
resetQueue(this);
const result = this[sym.cancelAlgorithm](reason);
readableStreamDefaultControllerClearAlgorithms(this);
return result;
}
[sym.pullSteps]() {
const stream = this[sym.controlledReadableStream];
if (this[sym.queue].length) {
const chunk = dequeueValue(this);
if (this[sym.closeRequested] && this[sym.queue].length === 0) {
readableStreamDefaultControllerClearAlgorithms(this);
readableStreamClose(stream);
} else {
readableStreamDefaultControllerCallPullIfNeeded(this);
}
return Promise.resolve(
readableStreamCreateReadResult(
chunk,
false,
stream[sym.reader][sym.forAuthorCode],
),
);
}
const pendingPromise = readableStreamAddReadRequest(stream);
readableStreamDefaultControllerCallPullIfNeeded(this);
return pendingPromise;
}
[customInspect]() {
return `${this.constructor.name} { desiredSize: ${
String(this.desiredSize)
} }`;
}
}
class ReadableStreamDefaultReader {
constructor(stream) {
if (!isReadableStream(stream)) {
throw new TypeError("stream is not a ReadableStream.");
}
if (isReadableStreamLocked(stream)) {
throw new TypeError("stream is locked.");
}
readableStreamReaderGenericInitialize(this, stream);
this[sym.readRequests] = [];
}
get closed() {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(
new TypeError("Invalid ReadableStreamDefaultReader."),
);
}
return (
this[sym.closedPromise].promise ??
Promise.reject(new TypeError("Invalid reader."))
);
}
cancel(reason) {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(
new TypeError("Invalid ReadableStreamDefaultReader."),
);
}
if (!this[sym.ownerReadableStream]) {
return Promise.reject(new TypeError("Invalid reader."));
}
return readableStreamReaderGenericCancel(this, reason);
}
read() {
if (!isReadableStreamDefaultReader(this)) {
return Promise.reject(
new TypeError("Invalid ReadableStreamDefaultReader."),
);
}
if (!this[sym.ownerReadableStream]) {
return Promise.reject(new TypeError("Invalid reader."));
}
return readableStreamDefaultReaderRead(this);
}
releaseLock() {
if (!isReadableStreamDefaultReader(this)) {
throw new TypeError("Invalid ReadableStreamDefaultReader.");
}
if (this[sym.ownerReadableStream] === undefined) {
return;
}
if (this[sym.readRequests].length) {
throw new TypeError("Cannot release lock with pending read requests.");
}
readableStreamReaderGenericRelease(this);
}
[customInspect]() {
return `${this.constructor.name} { closed: Promise }`;
}
}
const AsyncIteratorPrototype = Object
.getPrototypeOf(Object.getPrototypeOf(async function* () {}).prototype);
const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
next() {
if (!isReadableStreamAsyncIterator(this)) {
return Promise.reject(
new TypeError("invalid ReadableStreamAsyncIterator."),
);
}
const reader = this[sym.asyncIteratorReader];
if (!reader[sym.ownerReadableStream]) {
return Promise.reject(
new TypeError("reader owner ReadableStream is undefined."),
);
}
return readableStreamDefaultReaderRead(reader).then((result) => {
assert(typeof result === "object");
const { done } = result;
assert(typeof done === "boolean");
if (done) {
readableStreamReaderGenericRelease(reader);
}
const { value } = result;
return readableStreamCreateReadResult(value, done, true);
});
},
return(
value,
) {
if (!isReadableStreamAsyncIterator(this)) {
return Promise.reject(
new TypeError("invalid ReadableStreamAsyncIterator."),
);
}
const reader = this[sym.asyncIteratorReader];
if (!reader[sym.ownerReadableStream]) {
return Promise.reject(
new TypeError("reader owner ReadableStream is undefined."),
);
}
if (reader[sym.readRequests].length) {
return Promise.reject(
new TypeError("reader has outstanding read requests."),
);
}
if (!this[sym.preventCancel]) {
const result = readableStreamReaderGenericCancel(reader, value);
readableStreamReaderGenericRelease(reader);
return result.then(() =>
readableStreamCreateReadResult(value, true, true)
);
}
readableStreamReaderGenericRelease(reader);
return Promise.resolve(
readableStreamCreateReadResult(value, true, true),
);
},
}, AsyncIteratorPrototype);
class ReadableStream {
constructor(
underlyingSource = {},
strategy = {},
) {
initializeReadableStream(this);
const { size } = strategy;
let { highWaterMark } = strategy;
const { type } = underlyingSource;
if (isUnderlyingByteSource(underlyingSource)) {
if (size !== undefined) {
throw new RangeError(
`When underlying source is "bytes", strategy.size must be undefined.`,
);
}
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 0);
setUpReadableByteStreamControllerFromUnderlyingSource(
this,
underlyingSource,
highWaterMark,
);
} else if (type === undefined) {
const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark ?? 1);
setUpReadableStreamDefaultControllerFromUnderlyingSource(
this,
underlyingSource,
highWaterMark,
sizeAlgorithm,
);
} else {
throw new RangeError(
`Valid values for underlyingSource are "bytes" or undefined. Received: "${type}".`,
);
}
}
get locked() {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
return isReadableStreamLocked(this);
}
cancel(reason) {
if (!isReadableStream(this)) {
return Promise.reject(new TypeError("Invalid ReadableStream."));
}
if (isReadableStreamLocked(this)) {
return Promise.reject(
new TypeError("Cannot cancel a locked ReadableStream."),
);
}
return readableStreamCancel(this, reason);
}
getIterator({
preventCancel,
} = {}) {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
const reader = acquireReadableStreamDefaultReader(this);
const iterator = Object.create(ReadableStreamAsyncIteratorPrototype);
iterator[sym.asyncIteratorReader] = reader;
iterator[sym.preventCancel] = Boolean(preventCancel);
return iterator;
}
getReader({ mode } = {}) {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
if (mode === undefined) {
return acquireReadableStreamDefaultReader(this, true);
}
mode = String(mode);
// 3.2.5.4.4 If mode is "byob", return ? AcquireReadableStreamBYOBReader(this, true).
throw new RangeError(`Unsupported mode "${mode}"`);
}
pipeThrough(
{
writable,
readable,
},
{ preventClose, preventAbort, preventCancel, signal } = {},
) {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
if (!isWritableStream(writable)) {
throw new TypeError("writable is not a valid WritableStream.");
}
if (!isReadableStream(readable)) {
throw new TypeError("readable is not a valid ReadableStream.");
}
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
if (signal && !(signal instanceof AbortSignal)) {
throw new TypeError("Invalid signal.");
}
if (isReadableStreamLocked(this)) {
throw new TypeError("ReadableStream is locked.");
}
if (isWritableStreamLocked(writable)) {
throw new TypeError("writable is locked.");
}
const promise = readableStreamPipeTo(
this,
writable,
preventClose,
preventAbort,
preventCancel,
signal,
);
setPromiseIsHandledToTrue(promise);
return readable;
}
pipeTo(
dest,
{ preventClose, preventAbort, preventCancel, signal } = {},
) {
if (!isReadableStream(this)) {
return Promise.reject(new TypeError("Invalid ReadableStream."));
}
if (!isWritableStream(dest)) {
return Promise.reject(
new TypeError("dest is not a valid WritableStream."),
);
}
preventClose = Boolean(preventClose);
preventAbort = Boolean(preventAbort);
preventCancel = Boolean(preventCancel);
if (signal && !(signal instanceof AbortSignal)) {
return Promise.reject(new TypeError("Invalid signal."));
}
if (isReadableStreamLocked(this)) {
return Promise.reject(new TypeError("ReadableStream is locked."));
}
if (isWritableStreamLocked(dest)) {
return Promise.reject(new TypeError("dest is locked."));
}
return readableStreamPipeTo(
this,
dest,
preventClose,
preventAbort,
preventCancel,
signal,
);
}
tee() {
if (!isReadableStream(this)) {
throw new TypeError("Invalid ReadableStream.");
}
return readableStreamTee(this, false);
}
[customInspect]() {
return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
[Symbol.asyncIterator](
options = {},
) {
return this.getIterator(options);
}
}
class TransformStream {
constructor(
transformer = {},
writableStrategy = {},
readableStrategy = {},
) {
const writableSizeFunction = writableStrategy.size;
let writableHighWaterMark = writableStrategy.highWaterMark;
const readableSizeFunction = readableStrategy.size;
let readableHighWaterMark = readableStrategy.highWaterMark;
const writableType = transformer.writableType;
if (writableType !== undefined) {
throw new RangeError(
`Expected transformer writableType to be undefined, received "${
String(writableType)
}"`,
);
}
const writableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
writableSizeFunction,
);
if (writableHighWaterMark === undefined) {
writableHighWaterMark = 1;
}
writableHighWaterMark = validateAndNormalizeHighWaterMark(
writableHighWaterMark,
);
const readableType = transformer.readableType;
if (readableType !== undefined) {
throw new RangeError(
`Expected transformer readableType to be undefined, received "${
String(readableType)
}"`,
);
}
const readableSizeAlgorithm = makeSizeAlgorithmFromSizeFunction(
readableSizeFunction,
);
if (readableHighWaterMark === undefined) {
readableHighWaterMark = 1;
}
readableHighWaterMark = validateAndNormalizeHighWaterMark(
readableHighWaterMark,
);
const startPromise = getDeferred();
initializeTransformStream(
this,
startPromise.promise,
writableHighWaterMark,
writableSizeAlgorithm,
readableHighWaterMark,
readableSizeAlgorithm,
);
// the brand check expects this, and the brand check occurs in the following
// but the property hasn't been defined.
Object.defineProperty(this, sym.transformStreamController, {
value: undefined,
writable: true,
configurable: true,
});
setUpTransformStreamDefaultControllerFromTransformer(this, transformer);
const startResult = invokeOrNoop(
transformer,
"start",
this[sym.transformStreamController],
);
startPromise.resolve(startResult);
}
get readable() {
if (!isTransformStream(this)) {
throw new TypeError("Invalid TransformStream.");
}
return this[sym.readable];
}
get writable() {
if (!isTransformStream(this)) {
throw new TypeError("Invalid TransformStream.");
}
return this[sym.writable];
}
[customInspect]() {
return `${this.constructor.name} {\n readable: ${
inspect(this.readable)
}\n writable: ${inspect(this.writable)}\n}`;
}
}
class TransformStreamDefaultController {
constructor() {
throw new TypeError(
"TransformStreamDefaultController's constructor cannot be called.",
);
}
get desiredSize() {
if (!isTransformStreamDefaultController(this)) {
throw new TypeError("Invalid TransformStreamDefaultController.");
}
const readableController = this[sym.controlledTransformStream][
sym.readable
][sym.readableStreamController];
return readableStreamDefaultControllerGetDesiredSize(
readableController,
);
}
enqueue(chunk) {
if (!isTransformStreamDefaultController(this)) {
throw new TypeError("Invalid TransformStreamDefaultController.");
}
transformStreamDefaultControllerEnqueue(this, chunk);
}
error(reason) {
if (!isTransformStreamDefaultController(this)) {
throw new TypeError("Invalid TransformStreamDefaultController.");
}
transformStreamDefaultControllerError(this, reason);
}
terminate() {
if (!isTransformStreamDefaultController(this)) {
throw new TypeError("Invalid TransformStreamDefaultController.");
}
transformStreamDefaultControllerTerminate(this);
}
[customInspect]() {
return `${this.constructor.name} { desiredSize: ${
String(this.desiredSize)
} }`;
}
}
class WritableStreamDefaultController {
constructor() {
throw new TypeError(
"WritableStreamDefaultController's constructor cannot be called.",
);
}
error(e) {
if (!isWritableStreamDefaultController(this)) {
throw new TypeError("Invalid WritableStreamDefaultController.");
}
const state = this[sym.controlledWritableStream][sym.state];
if (state !== "writable") {
return;
}
writableStreamDefaultControllerError(this, e);
}
[sym.abortSteps](reason) {
const result = this[sym.abortAlgorithm](reason);
writableStreamDefaultControllerClearAlgorithms(this);
return result;
}
[sym.errorSteps]() {
resetQueue(this);
}
[customInspect]() {
return `${this.constructor.name} { }`;
}
}
class WritableStreamDefaultWriter {
constructor(stream) {
if (!isWritableStream(stream)) {
throw new TypeError("Invalid stream.");
}
if (isWritableStreamLocked(stream)) {
throw new TypeError("Cannot create a writer for a locked stream.");
}
this[sym.ownerWritableStream] = stream;
stream[sym.writer] = this;
const state = stream[sym.state];
if (state === "writable") {
if (
!writableStreamCloseQueuedOrInFlight(stream) &&
stream[sym.backpressure]
) {
this[sym.readyPromise] = getDeferred();
} else {
this[sym.readyPromise] = { promise: Promise.resolve() };
}
this[sym.closedPromise] = getDeferred();
} else if (state === "erroring") {
this[sym.readyPromise] = {
promise: Promise.reject(stream[sym.storedError]),
};
setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
this[sym.closedPromise] = getDeferred();
} else if (state === "closed") {
this[sym.readyPromise] = { promise: Promise.resolve() };
this[sym.closedPromise] = { promise: Promise.resolve() };
} else {
assert(state === "errored");
const storedError = stream[sym.storedError];
this[sym.readyPromise] = { promise: Promise.reject(storedError) };
setPromiseIsHandledToTrue(this[sym.readyPromise].promise);
this[sym.closedPromise] = { promise: Promise.reject(storedError) };
setPromiseIsHandledToTrue(this[sym.closedPromise].promise);
}
}
get closed() {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter."),
);
}
return this[sym.closedPromise].promise;
}
get desiredSize() {
if (!isWritableStreamDefaultWriter(this)) {
throw new TypeError("Invalid WritableStreamDefaultWriter.");
}
if (!this[sym.ownerWritableStream]) {
throw new TypeError("WritableStreamDefaultWriter has no owner.");
}
return writableStreamDefaultWriterGetDesiredSize(this);
}
get ready() {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter."),
);
}
return this[sym.readyPromise].promise;
}
abort(reason) {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter."),
);
}
if (!this[sym.ownerWritableStream]) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner."),
);
}
return writableStreamDefaultWriterAbort(this, reason);
}
close() {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter."),
);
}
const stream = this[sym.ownerWritableStream];
if (!stream) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner."),
);
}
if (writableStreamCloseQueuedOrInFlight(stream)) {
Promise.reject(
new TypeError("Stream is in an invalid state to be closed."),
);
}
return writableStreamDefaultWriterClose(this);
}
releaseLock() {
if (!isWritableStreamDefaultWriter(this)) {
throw new TypeError("Invalid WritableStreamDefaultWriter.");
}
const stream = this[sym.ownerWritableStream];
if (!stream) {
return;
}
assert(stream[sym.writer]);
writableStreamDefaultWriterRelease(this);
}
write(chunk) {
if (!isWritableStreamDefaultWriter(this)) {
return Promise.reject(
new TypeError("Invalid WritableStreamDefaultWriter."),
);
}
if (!this[sym.ownerWritableStream]) {
Promise.reject(
new TypeError("WritableStreamDefaultWriter has no owner."),
);
}
return writableStreamDefaultWriterWrite(this, chunk);
}
[customInspect]() {
return `${this.constructor.name} { closed: Promise, desiredSize: ${
String(this.desiredSize)
}, ready: Promise }`;
}
}
class WritableStream {
constructor(
underlyingSink = {},
strategy = {},
) {
initializeWritableStream(this);
const size = strategy.size;
let highWaterMark = strategy.highWaterMark ?? 1;
const { type } = underlyingSink;
if (type !== undefined) {
throw new RangeError(`Sink type of "${String(type)}" not supported.`);
}
const sizeAlgorithm = makeSizeAlgorithmFromSizeFunction(size);
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
setUpWritableStreamDefaultControllerFromUnderlyingSink(
this,
underlyingSink,
highWaterMark,
sizeAlgorithm,
);
}
get locked() {
if (!isWritableStream(this)) {
throw new TypeError("Invalid WritableStream.");
}
return isWritableStreamLocked(this);
}
abort(reason) {
if (!isWritableStream(this)) {
return Promise.reject(new TypeError("Invalid WritableStream."));
}
if (isWritableStreamLocked(this)) {
return Promise.reject(
new TypeError("Cannot abort a locked WritableStream."),
);
}
return writableStreamAbort(this, reason);
}
close() {
if (!isWritableStream(this)) {
return Promise.reject(new TypeError("Invalid WritableStream."));
}
if (isWritableStreamLocked(this)) {
return Promise.reject(
new TypeError("Cannot abort a locked WritableStream."),
);
}
if (writableStreamCloseQueuedOrInFlight(this)) {
return Promise.reject(
new TypeError("Cannot close an already closing WritableStream."),
);
}
return writableStreamClose(this);
}
getWriter() {
if (!isWritableStream(this)) {
throw new TypeError("Invalid WritableStream.");
}
return acquireWritableStreamDefaultWriter(this);
}
[customInspect]() {
return `${this.constructor.name} { locked: ${String(this.locked)} }`;
}
}
function acquireReadableStreamDefaultReader(
stream,
forAuthorCode = false,
) {
const reader = new ReadableStreamDefaultReader(stream);
reader[sym.forAuthorCode] = forAuthorCode;
return reader;
}
function acquireWritableStreamDefaultWriter(
stream,
) {
return new WritableStreamDefaultWriter(stream);
}
function call(
fn,
v,
args,
) {
return Function.prototype.apply.call(fn, v, args);
}
function createAlgorithmFromUnderlyingMethod(
underlyingObject,
methodName,
algoArgCount,
...extraArgs
) {
const method = underlyingObject[methodName];
if (method) {
if (!isCallable(method)) {
throw new TypeError("method is not callable");
}
if (algoArgCount === 0) {
return async () => call(method, underlyingObject, extraArgs);
} else {
return async (arg) => {
const fullArgs = [arg, ...extraArgs];
return call(method, underlyingObject, fullArgs);
};
}
}
return async () => undefined;
}
function createReadableStream(
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark = 1,
sizeAlgorithm = () => 1,
) {
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
const stream = Object.create(
ReadableStream.prototype,
);
initializeReadableStream(stream);
const controller = Object.create(
ReadableStreamDefaultController.prototype,
);
setUpReadableStreamDefaultController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
sizeAlgorithm,
);
return stream;
}
function createWritableStream(
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
highWaterMark = 1,
sizeAlgorithm = () => 1,
) {
highWaterMark = validateAndNormalizeHighWaterMark(highWaterMark);
const stream = Object.create(WritableStream.prototype);
initializeWritableStream(stream);
const controller = Object.create(
WritableStreamDefaultController.prototype,
);
setUpWritableStreamDefaultController(
stream,
controller,
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
highWaterMark,
sizeAlgorithm,
);
return stream;
}
function dequeueValue(container) {
assert(sym.queue in container && sym.queueTotalSize in container);
assert(container[sym.queue].length);
const pair = container[sym.queue].shift();
container[sym.queueTotalSize] -= pair.size;
if (container[sym.queueTotalSize] <= 0) {
container[sym.queueTotalSize] = 0;
}
return pair.value;
}
function enqueueValueWithSize(
container,
value,
size,
) {
assert(sym.queue in container && sym.queueTotalSize in container);
size = Number(size);
if (!isFiniteNonNegativeNumber(size)) {
throw new RangeError("size must be a finite non-negative number.");
}
container[sym.queue].push({ value, size });
container[sym.queueTotalSize] += size;
}
/** Non-spec mechanism to "unwrap" a promise and store it to be resolved
* later. */
function getDeferred() {
let resolve;
let reject;
const promise = new Promise((res, rej) => {
resolve = res;
reject = rej;
});
return { promise, resolve: resolve, reject: reject };
}
function initializeReadableStream(
stream,
) {
stream[sym.state] = "readable";
stream[sym.reader] = stream[sym.storedError] = undefined;
stream[sym.disturbed] = false;
}
function initializeTransformStream(
stream,
startPromise,
writableHighWaterMark,
writableSizeAlgorithm,
readableHighWaterMark,
readableSizeAlgorithm,
) {
const startAlgorithm = () => startPromise;
const writeAlgorithm = (chunk) =>
transformStreamDefaultSinkWriteAlgorithm(stream, chunk);
const abortAlgorithm = (reason) =>
transformStreamDefaultSinkAbortAlgorithm(stream, reason);
const closeAlgorithm = () =>
transformStreamDefaultSinkCloseAlgorithm(stream);
stream[sym.writable] = createWritableStream(
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
writableHighWaterMark,
writableSizeAlgorithm,
);
const pullAlgorithm = () =>
transformStreamDefaultSourcePullAlgorithm(stream);
const cancelAlgorithm = (reason) => {
transformStreamErrorWritableAndUnblockWrite(stream, reason);
return Promise.resolve(undefined);
};
stream[sym.readable] = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
readableHighWaterMark,
readableSizeAlgorithm,
);
stream[sym.backpressure] = stream[sym.backpressureChangePromise] =
undefined;
transformStreamSetBackpressure(stream, true);
Object.defineProperty(stream, sym.transformStreamController, {
value: undefined,
configurable: true,
});
}
function initializeWritableStream(
stream,
) {
stream[sym.state] = "writable";
stream[sym.storedError] = stream[sym.writer] = stream[
sym.writableStreamController
] = stream[sym.inFlightWriteRequest] = stream[sym.closeRequest] = stream[
sym.inFlightCloseRequest
] = stream[sym.pendingAbortRequest] = undefined;
stream[sym.writeRequests] = [];
stream[sym.backpressure] = false;
}
function invokeOrNoop(
o,
p,
...args
) {
assert(o);
const method = o[p];
if (!method) {
return undefined;
}
return call(method, o, args);
}
function isCallable(value) {
return typeof value === "function";
}
function isDetachedBuffer(value) {
return sym.isFakeDetached in value;
}
function isFiniteNonNegativeNumber(v) {
return Number.isFinite(v) && (v) >= 0;
}
function isReadableByteStreamController(
x,
) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.controlledReadableByteStream in x)
);
}
function isReadableStream(x) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.readableStreamController in x)
);
}
function isReadableStreamAsyncIterator(
x,
) {
if (typeof x !== "object" || x === null) {
return false;
}
return sym.asyncIteratorReader in x;
}
function isReadableStreamDefaultController(
x,
) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.controlledReadableStream in x)
);
}
function isReadableStreamDefaultReader(
x,
) {
return !(typeof x !== "object" || x === null || !(sym.readRequests in x));
}
function isReadableStreamLocked(stream) {
assert(isReadableStream(stream));
return !!stream[sym.reader];
}
function isReadableStreamDisturbed(stream) {
assert(isReadableStream(stream));
return !!stream[sym.disturbed];
}
function isTransformStream(x) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.transformStreamController in x)
);
}
function isTransformStreamDefaultController(
x,
) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.controlledTransformStream in x)
);
}
function isUnderlyingByteSource(
underlyingSource,
) {
const { type } = underlyingSource;
const typeString = String(type);
return typeString === "bytes";
}
function isWritableStream(x) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.writableStreamController in x)
);
}
function isWritableStreamDefaultController(
x,
) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.controlledWritableStream in x)
);
}
function isWritableStreamDefaultWriter(
x,
) {
return !(
typeof x !== "object" ||
x === null ||
!(sym.ownerWritableStream in x)
);
}
function isWritableStreamLocked(stream) {
assert(isWritableStream(stream));
return stream[sym.writer] !== undefined;
}
function makeSizeAlgorithmFromSizeFunction(
size,
) {
if (size === undefined) {
return () => 1;
}
if (typeof size !== "function") {
throw new TypeError("size must be callable.");
}
return (chunk) => {
return size.call(undefined, chunk);
};
}
function peekQueueValue(container) {
assert(sym.queue in container && sym.queueTotalSize in container);
assert(container[sym.queue].length);
const [pair] = container[sym.queue];
return pair.value;
}
function readableByteStreamControllerShouldCallPull(
controller,
) {
const stream = controller[sym.controlledReadableByteStream];
if (
stream[sym.state] !== "readable" ||
controller[sym.closeRequested] ||
!controller[sym.started]
) {
return false;
}
if (
readableStreamHasDefaultReader(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
// 3.13.25.6 If ! ReadableStreamHasBYOBReader(stream) is true and !
// ReadableStreamGetNumReadIntoRequests(stream) > 0, return true.
const desiredSize = readableByteStreamControllerGetDesiredSize(controller);
assert(desiredSize !== null);
return desiredSize > 0;
}
function readableByteStreamControllerCallPullIfNeeded(
controller,
) {
const shouldPull = readableByteStreamControllerShouldCallPull(controller);
if (!shouldPull) {
return;
}
if (controller[sym.pulling]) {
controller[sym.pullAgain] = true;
return;
}
assert(controller[sym.pullAgain] === false);
controller[sym.pulling] = true;
const pullPromise = controller[sym.pullAlgorithm]();
setPromiseIsHandledToTrue(
pullPromise.then(
() => {
controller[sym.pulling] = false;
if (controller[sym.pullAgain]) {
controller[sym.pullAgain] = false;
readableByteStreamControllerCallPullIfNeeded(controller);
}
},
(e) => {
readableByteStreamControllerError(controller, e);
},
),
);
}
function readableByteStreamControllerClearAlgorithms(
controller,
) {
controller[sym.pullAlgorithm] = undefined;
controller[sym.cancelAlgorithm] = undefined;
}
function readableByteStreamControllerClose(
controller,
) {
const stream = controller[sym.controlledReadableByteStream];
if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
return;
}
if (controller[sym.queueTotalSize] > 0) {
controller[sym.closeRequested] = true;
return;
}
// 3.13.6.4 If controller.[[pendingPullIntos]] is not empty, (BYOB Support)
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
function readableByteStreamControllerEnqueue(
controller,
chunk,
) {
const stream = controller[sym.controlledReadableByteStream];
if (controller[sym.closeRequested] || stream[sym.state] !== "readable") {
return;
}
const { buffer, byteOffset, byteLength } = chunk;
const transferredBuffer = transferArrayBuffer(buffer);
if (readableStreamHasDefaultReader(stream)) {
if (readableStreamGetNumReadRequests(stream) === 0) {
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength,
);
} else {
assert(controller[sym.queue].length === 0);
const transferredView = new Uint8Array(
transferredBuffer,
byteOffset,
byteLength,
);
readableStreamFulfillReadRequest(stream, transferredView, false);
}
// 3.13.9.8 Otherwise, if ! ReadableStreamHasBYOBReader(stream) is true
} else {
assert(!isReadableStreamLocked(stream));
readableByteStreamControllerEnqueueChunkToQueue(
controller,
transferredBuffer,
byteOffset,
byteLength,
);
}
readableByteStreamControllerCallPullIfNeeded(controller);
}
function readableByteStreamControllerEnqueueChunkToQueue(
controller,
buffer,
byteOffset,
byteLength,
) {
controller[sym.queue].push({
value: buffer,
offset: byteOffset,
size: byteLength,
});
controller[sym.queueTotalSize] += byteLength;
}
function readableByteStreamControllerError(
controller,
e,
) {
const stream = controller[sym.controlledReadableByteStream];
if (stream[sym.state] !== "readable") {
return;
}
// 3.13.11.3 Perform ! ReadableByteStreamControllerClearPendingPullIntos(controller).
resetQueue(controller);
readableByteStreamControllerClearAlgorithms(controller);
readableStreamError(stream, e);
}
function readableByteStreamControllerGetDesiredSize(
controller,
) {
const stream = controller[sym.controlledReadableByteStream];
const state = stream[sym.state];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
}
function readableByteStreamControllerHandleQueueDrain(
controller,
) {
assert(
controller[sym.controlledReadableByteStream][sym.state] === "readable",
);
if (
controller[sym.queueTotalSize] === 0 && controller[sym.closeRequested]
) {
readableByteStreamControllerClearAlgorithms(controller);
readableStreamClose(controller[sym.controlledReadableByteStream]);
} else {
readableByteStreamControllerCallPullIfNeeded(controller);
}
}
function readableStreamAddReadRequest(
stream,
) {
assert(isReadableStreamDefaultReader(stream[sym.reader]));
assert(stream[sym.state] === "readable");
const promise = getDeferred();
stream[sym.reader][sym.readRequests].push(promise);
return promise.promise;
}
function readableStreamCancel(
stream,
reason,
) {
stream[sym.disturbed] = true;
if (stream[sym.state] === "closed") {
return Promise.resolve();
}
if (stream[sym.state] === "errored") {
return Promise.reject(stream[sym.storedError]);
}
readableStreamClose(stream);
return stream[sym.readableStreamController][sym.cancelSteps](reason).then(
() => undefined,
);
}
function readableStreamClose(stream) {
assert(stream[sym.state] === "readable");
stream[sym.state] = "closed";
const reader = stream[sym.reader];
if (!reader) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[sym.readRequests]) {
assert(readRequest.resolve);
readRequest.resolve(
readableStreamCreateReadResult(
undefined,
true,
reader[sym.forAuthorCode],
),
);
}
reader[sym.readRequests] = [];
}
const resolve = reader[sym.closedPromise].resolve;
assert(resolve);
resolve();
}
function readableStreamCreateReadResult(
value,
done,
forAuthorCode,
) {
const prototype = forAuthorCode ? Object.prototype : null;
assert(typeof done === "boolean");
const obj = Object.create(prototype);
Object.defineProperties(obj, {
value: { value, writable: true, enumerable: true, configurable: true },
done: {
value: done,
writable: true,
enumerable: true,
configurable: true,
},
});
return obj;
}
function readableStreamDefaultControllerCallPullIfNeeded(
controller,
) {
const shouldPull = readableStreamDefaultControllerShouldCallPull(
controller,
);
if (!shouldPull) {
return;
}
if (controller[sym.pulling]) {
controller[sym.pullAgain] = true;
return;
}
assert(controller[sym.pullAgain] === false);
controller[sym.pulling] = true;
const pullPromise = controller[sym.pullAlgorithm]();
pullPromise.then(
() => {
controller[sym.pulling] = false;
if (controller[sym.pullAgain]) {
controller[sym.pullAgain] = false;
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
},
(e) => {
readableStreamDefaultControllerError(controller, e);
},
);
}
function readableStreamDefaultControllerCanCloseOrEnqueue(
controller,
) {
const state = controller[sym.controlledReadableStream][sym.state];
return !controller[sym.closeRequested] && state === "readable";
}
function readableStreamDefaultControllerClearAlgorithms(
controller,
) {
controller[sym.pullAlgorithm] = undefined;
controller[sym.cancelAlgorithm] = undefined;
controller[sym.strategySizeAlgorithm] = undefined;
}
function readableStreamDefaultControllerClose(
controller,
) {
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
return;
}
const stream = controller[sym.controlledReadableStream];
controller[sym.closeRequested] = true;
if (controller[sym.queue].length === 0) {
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamClose(stream);
}
}
function readableStreamDefaultControllerEnqueue(
controller,
chunk,
) {
if (!readableStreamDefaultControllerCanCloseOrEnqueue(controller)) {
return;
}
const stream = controller[sym.controlledReadableStream];
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
readableStreamFulfillReadRequest(stream, chunk, false);
} else {
try {
const chunkSize = controller[sym.strategySizeAlgorithm](chunk);
enqueueValueWithSize(controller, chunk, chunkSize);
} catch (err) {
readableStreamDefaultControllerError(controller, err);
throw err;
}
}
readableStreamDefaultControllerCallPullIfNeeded(controller);
}
function readableStreamDefaultControllerGetDesiredSize(
controller,
) {
const stream = controller[sym.controlledReadableStream];
const state = stream[sym.state];
if (state === "errored") {
return null;
}
if (state === "closed") {
return 0;
}
return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
}
function readableStreamDefaultControllerError(
controller,
e,
) {
const stream = controller[sym.controlledReadableStream];
if (stream[sym.state] !== "readable") {
return;
}
resetQueue(controller);
readableStreamDefaultControllerClearAlgorithms(controller);
readableStreamError(stream, e);
}
function readableStreamDefaultControllerHasBackpressure(
controller,
) {
return readableStreamDefaultControllerShouldCallPull(controller);
}
function readableStreamDefaultControllerShouldCallPull(
controller,
) {
const stream = controller[sym.controlledReadableStream];
if (
!readableStreamDefaultControllerCanCloseOrEnqueue(controller) ||
controller[sym.started] === false
) {
return false;
}
if (
isReadableStreamLocked(stream) &&
readableStreamGetNumReadRequests(stream) > 0
) {
return true;
}
const desiredSize = readableStreamDefaultControllerGetDesiredSize(
controller,
);
assert(desiredSize !== null);
return desiredSize > 0;
}
function readableStreamDefaultReaderRead(
reader,
) {
const stream = reader[sym.ownerReadableStream];
assert(stream);
stream[sym.disturbed] = true;
if (stream[sym.state] === "closed") {
return Promise.resolve(
readableStreamCreateReadResult(
undefined,
true,
reader[sym.forAuthorCode],
),
);
}
if (stream[sym.state] === "errored") {
return Promise.reject(stream[sym.storedError]);
}
assert(stream[sym.state] === "readable");
return (stream[
sym.readableStreamController
])[sym.pullSteps]();
}
function readableStreamError(stream, e) {
assert(isReadableStream(stream));
assert(stream[sym.state] === "readable");
stream[sym.state] = "errored";
stream[sym.storedError] = e;
const reader = stream[sym.reader];
if (reader === undefined) {
return;
}
if (isReadableStreamDefaultReader(reader)) {
for (const readRequest of reader[sym.readRequests]) {
assert(readRequest.reject);
readRequest.reject(e);
readRequest.reject = undefined;
readRequest.resolve = undefined;
}
reader[sym.readRequests] = [];
}
// 3.5.6.8 Otherwise, support BYOB Reader
reader[sym.closedPromise].reject(e);
reader[sym.closedPromise].reject = undefined;
reader[sym.closedPromise].resolve = undefined;
setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
function readableStreamFulfillReadRequest(
stream,
chunk,
done,
) {
const reader = stream[sym.reader];
const readRequest = reader[sym.readRequests].shift();
assert(readRequest.resolve);
readRequest.resolve(
readableStreamCreateReadResult(chunk, done, reader[sym.forAuthorCode]),
);
}
function readableStreamGetNumReadRequests(
stream,
) {
return stream[sym.reader]?.[sym.readRequests].length ?? 0;
}
function readableStreamHasDefaultReader(
stream,
) {
const reader = stream[sym.reader];
return !(reader === undefined || !isReadableStreamDefaultReader(reader));
}
function readableStreamPipeTo(
source,
dest,
preventClose,
preventAbort,
preventCancel,
signal,
) {
assert(isReadableStream(source));
assert(isWritableStream(dest));
assert(
typeof preventClose === "boolean" &&
typeof preventAbort === "boolean" &&
typeof preventCancel === "boolean",
);
assert(signal === undefined || signal instanceof AbortSignal);
assert(!isReadableStreamLocked(source));
assert(!isWritableStreamLocked(dest));
const reader = acquireReadableStreamDefaultReader(source);
const writer = acquireWritableStreamDefaultWriter(dest);
source[sym.disturbed] = true;
let shuttingDown = false;
const promise = getDeferred();
let abortAlgorithm;
if (signal) {
abortAlgorithm = () => {
const error = new DOMException("Abort signal received.", "AbortSignal");
const actions = [];
if (!preventAbort) {
actions.push(() => {
if (dest[sym.state] === "writable") {
return writableStreamAbort(dest, error);
} else {
return Promise.resolve(undefined);
}
});
}
if (!preventCancel) {
actions.push(() => {
if (source[sym.state] === "readable") {
return readableStreamCancel(source, error);
} else {
return Promise.resolve(undefined);
}
});
}
shutdownWithAction(
() => Promise.all(actions.map((action) => action())),
true,
error,
);
};
if (signal.aborted) {
abortAlgorithm();
return promise.promise;
}
signal.addEventListener("abort", abortAlgorithm);
}
let currentWrite = Promise.resolve();
// At this point, the spec becomes non-specific and vague. Most of the rest
// of this code is based on the reference implementation that is part of the
// specification. This is why the functions are only scoped to this function
// to ensure they don't leak into the spec compliant parts.
function isOrBecomesClosed(
stream,
promise,
action,
) {
if (stream[sym.state] === "closed") {
action();
} else {
setPromiseIsHandledToTrue(promise.then(action));
}
}
function isOrBecomesErrored(
stream,
promise,
action,
) {
if (stream[sym.state] === "errored") {
action(stream[sym.storedError]);
} else {
setPromiseIsHandledToTrue(promise.catch((error) => action(error)));
}
}
function finalize(isError, error) {
writableStreamDefaultWriterRelease(writer);
readableStreamReaderGenericRelease(reader);
if (signal) {
signal.removeEventListener("abort", abortAlgorithm);
}
if (isError) {
promise.reject(error);
} else {
promise.resolve();
}
}
function waitForWritesToFinish() {
const oldCurrentWrite = currentWrite;
return currentWrite.then(() =>
oldCurrentWrite !== currentWrite ? waitForWritesToFinish() : undefined
);
}
function shutdownWithAction(
action,
originalIsError,
originalError,
) {
function doTheRest() {
setPromiseIsHandledToTrue(
action().then(
() => finalize(originalIsError, originalError),
(newError) => finalize(true, newError),
),
);
}
if (shuttingDown) {
return;
}
shuttingDown = true;
if (
dest[sym.state] === "writable" &&
writableStreamCloseQueuedOrInFlight(dest) === false
) {
setPromiseIsHandledToTrue(waitForWritesToFinish().then(doTheRest));
} else {
doTheRest();
}
}
function shutdown(isError, error) {
if (shuttingDown) {
return;
}
shuttingDown = true;
if (
dest[sym.state] === "writable" &&
!writableStreamCloseQueuedOrInFlight(dest)
) {
setPromiseIsHandledToTrue(
waitForWritesToFinish().then(() => finalize(isError, error)),
);
}
finalize(isError, error);
}
function pipeStep() {
if (shuttingDown) {
return Promise.resolve(true);
}
return writer[sym.readyPromise].promise.then(() => {
return readableStreamDefaultReaderRead(reader).then(
({ value, done }) => {
if (done === true) {
return true;
}
currentWrite = writableStreamDefaultWriterWrite(
writer,
value,
).then(undefined, () => {});
return false;
},
);
});
}
function pipeLoop() {
return new Promise((resolveLoop, rejectLoop) => {
function next(done) {
if (done) {
resolveLoop(undefined);
} else {
setPromiseIsHandledToTrue(pipeStep().then(next, rejectLoop));
}
}
next(false);
});
}
isOrBecomesErrored(
source,
reader[sym.closedPromise].promise,
(storedError) => {
if (!preventAbort) {
shutdownWithAction(
() => writableStreamAbort(dest, storedError),
true,
storedError,
);
} else {
shutdown(true, storedError);
}
},
);
isOrBecomesErrored(
dest,
writer[sym.closedPromise].promise,
(storedError) => {
if (!preventCancel) {
shutdownWithAction(
() => readableStreamCancel(source, storedError),
true,
storedError,
);
} else {
shutdown(true, storedError);
}
},
);
isOrBecomesClosed(source, reader[sym.closedPromise].promise, () => {
if (!preventClose) {
shutdownWithAction(() =>
writableStreamDefaultWriterCloseWithErrorPropagation(writer)
);
}
});
if (
writableStreamCloseQueuedOrInFlight(dest) ||
dest[sym.state] === "closed"
) {
const destClosed = new TypeError(
"The destination writable stream closed before all data could be piped to it.",
);
if (!preventCancel) {
shutdownWithAction(
() => readableStreamCancel(source, destClosed),
true,
destClosed,
);
} else {
shutdown(true, destClosed);
}
}
setPromiseIsHandledToTrue(pipeLoop());
return promise.promise;
}
function readableStreamReaderGenericCancel(
reader,
reason,
) {
const stream = reader[sym.ownerReadableStream];
assert(stream);
return readableStreamCancel(stream, reason);
}
function readableStreamReaderGenericInitialize(
reader,
stream,
) {
reader[sym.forAuthorCode] = true;
reader[sym.ownerReadableStream] = stream;
stream[sym.reader] = reader;
if (stream[sym.state] === "readable") {
reader[sym.closedPromise] = getDeferred();
} else if (stream[sym.state] === "closed") {
reader[sym.closedPromise] = { promise: Promise.resolve() };
} else {
assert(stream[sym.state] === "errored");
reader[sym.closedPromise] = {
promise: Promise.reject(stream[sym.storedError]),
};
setPromiseIsHandledToTrue(reader[sym.closedPromise].promise);
}
}
function readableStreamReaderGenericRelease(
reader,
) {
assert(reader[sym.ownerReadableStream]);
assert(reader[sym.ownerReadableStream][sym.reader] === reader);
const closedPromise = reader[sym.closedPromise];
if (reader[sym.ownerReadableStream][sym.state] === "readable") {
assert(closedPromise.reject);
closedPromise.reject(new TypeError("ReadableStream state is readable."));
} else {
closedPromise.promise = Promise.reject(
new TypeError("Reading is closed."),
);
delete closedPromise.reject;
delete closedPromise.resolve;
}
setPromiseIsHandledToTrue(closedPromise.promise);
reader[sym.ownerReadableStream][sym.reader] = undefined;
reader[sym.ownerReadableStream] = undefined;
}
function readableStreamTee(
stream,
cloneForBranch2,
) {
assert(isReadableStream(stream));
assert(typeof cloneForBranch2 === "boolean");
const reader = acquireReadableStreamDefaultReader(stream);
let reading = false;
let canceled1 = false;
let canceled2 = false;
let reason1 = undefined;
let reason2 = undefined;
/* eslint-disable prefer-const */
let branch1;
let branch2;
/* eslint-enable prefer-const */
const cancelPromise = getDeferred();
const pullAlgorithm = () => {
if (reading) {
return Promise.resolve();
}
reading = true;
const readPromise = readableStreamDefaultReaderRead(reader).then(
(result) => {
reading = false;
assert(typeof result === "object");
const { done } = result;
assert(typeof done === "boolean");
if (done) {
if (!canceled1) {
readableStreamDefaultControllerClose(
branch1[
sym.readableStreamController
],
);
}
if (!canceled2) {
readableStreamDefaultControllerClose(
branch2[
sym.readableStreamController
],
);
}
return;
}
const { value } = result;
const value1 = value;
let value2 = value;
if (!canceled2 && cloneForBranch2) {
value2 = cloneValue(value2);
}
if (!canceled1) {
readableStreamDefaultControllerEnqueue(
branch1[
sym.readableStreamController
],
value1,
);
}
if (!canceled2) {
readableStreamDefaultControllerEnqueue(
branch2[
sym.readableStreamController
],
value2,
);
}
},
);
setPromiseIsHandledToTrue(readPromise);
return Promise.resolve();
};
const cancel1Algorithm = (reason) => {
canceled1 = true;
reason1 = reason;
if (canceled2) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
};
const cancel2Algorithm = (reason) => {
canceled2 = true;
reason2 = reason;
if (canceled1) {
const compositeReason = [reason1, reason2];
const cancelResult = readableStreamCancel(stream, compositeReason);
cancelPromise.resolve(cancelResult);
}
return cancelPromise.promise;
};
const startAlgorithm = () => undefined;
branch1 = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancel1Algorithm,
);
branch2 = createReadableStream(
startAlgorithm,
pullAlgorithm,
cancel2Algorithm,
);
setPromiseIsHandledToTrue(
reader[sym.closedPromise].promise.catch((r) => {
readableStreamDefaultControllerError(
branch1[
sym.readableStreamController
],
r,
);
readableStreamDefaultControllerError(
branch2[
sym.readableStreamController
],
r,
);
}),
);
return [branch1, branch2];
}
function resetQueue(container) {
assert(sym.queue in container && sym.queueTotalSize in container);
container[sym.queue] = [];
container[sym.queueTotalSize] = 0;
}
/** An internal function which mimics the behavior of setting the promise to
* handled in JavaScript. In this situation, an assertion failure, which
* shouldn't happen will get thrown, instead of swallowed. */
function setPromiseIsHandledToTrue(promise) {
promise.then(undefined, (e) => {
if (e && e instanceof AssertionError) {
queueMicrotask(() => {
throw e;
});
}
});
}
function setUpReadableByteStreamController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
autoAllocateChunkSize,
) {
assert(stream[sym.readableStreamController] === undefined);
if (autoAllocateChunkSize !== undefined) {
assert(Number.isInteger(autoAllocateChunkSize));
assert(autoAllocateChunkSize >= 0);
}
controller[sym.controlledReadableByteStream] = stream;
controller[sym.pulling] = controller[sym.pullAgain] = false;
controller[sym.byobRequest] = undefined;
controller[sym.queue] = [];
controller[sym.queueTotalSize] = 0;
controller[sym.closeRequested] = controller[sym.started] = false;
controller[sym.strategyHWM] = validateAndNormalizeHighWaterMark(
highWaterMark,
);
controller[sym.pullAlgorithm] = pullAlgorithm;
controller[sym.cancelAlgorithm] = cancelAlgorithm;
controller[sym.autoAllocateChunkSize] = autoAllocateChunkSize;
// 3.13.26.12 Set controller.[[pendingPullIntos]] to a new empty List.
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
setPromiseIsHandledToTrue(
startPromise.then(
() => {
controller[sym.started] = true;
assert(!controller[sym.pulling]);
assert(!controller[sym.pullAgain]);
readableByteStreamControllerCallPullIfNeeded(controller);
},
(r) => {
readableByteStreamControllerError(controller, r);
},
),
);
}
function setUpReadableByteStreamControllerFromUnderlyingSource(
stream,
underlyingByteSource,
highWaterMark,
) {
assert(underlyingByteSource);
const controller = Object.create(
ReadableByteStreamController.prototype,
);
const startAlgorithm = () => {
return invokeOrNoop(underlyingByteSource, "start", controller);
};
const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"pull",
0,
controller,
);
setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingByteSource,
"cancel",
1,
);
setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
// 3.13.27.6 Let autoAllocateChunkSize be ? GetV(underlyingByteSource, "autoAllocateChunkSize").
const autoAllocateChunkSize = undefined;
setUpReadableByteStreamController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
autoAllocateChunkSize,
);
}
function setUpReadableStreamDefaultController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
sizeAlgorithm,
) {
assert(stream[sym.readableStreamController] === undefined);
controller[sym.controlledReadableStream] = stream;
controller[sym.queue] = [];
controller[sym.queueTotalSize] = 0;
controller[sym.started] = controller[sym.closeRequested] = controller[
sym.pullAgain
] = controller[sym.pulling] = false;
controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
controller[sym.strategyHWM] = highWaterMark;
controller[sym.pullAlgorithm] = pullAlgorithm;
controller[sym.cancelAlgorithm] = cancelAlgorithm;
stream[sym.readableStreamController] = controller;
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
setPromiseIsHandledToTrue(
startPromise.then(
() => {
controller[sym.started] = true;
assert(controller[sym.pulling] === false);
assert(controller[sym.pullAgain] === false);
readableStreamDefaultControllerCallPullIfNeeded(controller);
},
(r) => {
readableStreamDefaultControllerError(controller, r);
},
),
);
}
function setUpReadableStreamDefaultControllerFromUnderlyingSource(
stream,
underlyingSource,
highWaterMark,
sizeAlgorithm,
) {
assert(underlyingSource);
const controller = Object.create(
ReadableStreamDefaultController.prototype,
);
const startAlgorithm = () =>
invokeOrNoop(underlyingSource, "start", controller);
const pullAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSource,
"pull",
0,
controller,
);
setFunctionName(pullAlgorithm, "[[pullAlgorithm]]");
const cancelAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSource,
"cancel",
1,
);
setFunctionName(cancelAlgorithm, "[[cancelAlgorithm]]");
setUpReadableStreamDefaultController(
stream,
controller,
startAlgorithm,
pullAlgorithm,
cancelAlgorithm,
highWaterMark,
sizeAlgorithm,
);
}
function setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
) {
assert(isTransformStream(stream));
assert(stream[sym.transformStreamController] === undefined);
controller[sym.controlledTransformStream] = stream;
stream[sym.transformStreamController] = controller;
controller[sym.transformAlgorithm] = transformAlgorithm;
controller[sym.flushAlgorithm] = flushAlgorithm;
}
function setUpTransformStreamDefaultControllerFromTransformer(
stream,
transformer,
) {
assert(transformer);
const controller = Object.create(
TransformStreamDefaultController.prototype,
);
let transformAlgorithm = (chunk) => {
try {
transformStreamDefaultControllerEnqueue(
controller,
// it defaults to no tranformation, so I is assumed to be O
chunk,
);
} catch (e) {
return Promise.reject(e);
}
return Promise.resolve();
};
const transformMethod = transformer.transform;
if (transformMethod) {
if (typeof transformMethod !== "function") {
throw new TypeError("tranformer.transform must be callable.");
}
transformAlgorithm = async (chunk) =>
call(transformMethod, transformer, [chunk, controller]);
}
const flushAlgorithm = createAlgorithmFromUnderlyingMethod(
transformer,
"flush",
0,
controller,
);
setUpTransformStreamDefaultController(
stream,
controller,
transformAlgorithm,
flushAlgorithm,
);
}
function setUpWritableStreamDefaultController(
stream,
controller,
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
highWaterMark,
sizeAlgorithm,
) {
assert(isWritableStream(stream));
assert(stream[sym.writableStreamController] === undefined);
controller[sym.controlledWritableStream] = stream;
stream[sym.writableStreamController] = controller;
controller[sym.queue] = [];
controller[sym.queueTotalSize] = 0;
controller[sym.started] = false;
controller[sym.strategySizeAlgorithm] = sizeAlgorithm;
controller[sym.strategyHWM] = highWaterMark;
controller[sym.writeAlgorithm] = writeAlgorithm;
controller[sym.closeAlgorithm] = closeAlgorithm;
controller[sym.abortAlgorithm] = abortAlgorithm;
const backpressure = writableStreamDefaultControllerGetBackpressure(
controller,
);
writableStreamUpdateBackpressure(stream, backpressure);
const startResult = startAlgorithm();
const startPromise = Promise.resolve(startResult);
setPromiseIsHandledToTrue(
startPromise.then(
() => {
assert(
stream[sym.state] === "writable" ||
stream[sym.state] === "erroring",
);
controller[sym.started] = true;
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
},
(r) => {
assert(
stream[sym.state] === "writable" ||
stream[sym.state] === "erroring",
);
controller[sym.started] = true;
writableStreamDealWithRejection(stream, r);
},
),
);
}
function setUpWritableStreamDefaultControllerFromUnderlyingSink(
stream,
underlyingSink,
highWaterMark,
sizeAlgorithm,
) {
assert(underlyingSink);
const controller = Object.create(
WritableStreamDefaultController.prototype,
);
const startAlgorithm = () => {
return invokeOrNoop(underlyingSink, "start", controller);
};
const writeAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSink,
"write",
1,
controller,
);
setFunctionName(writeAlgorithm, "[[writeAlgorithm]]");
const closeAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSink,
"close",
0,
);
setFunctionName(closeAlgorithm, "[[closeAlgorithm]]");
const abortAlgorithm = createAlgorithmFromUnderlyingMethod(
underlyingSink,
"abort",
1,
);
setFunctionName(abortAlgorithm, "[[abortAlgorithm]]");
setUpWritableStreamDefaultController(
stream,
controller,
startAlgorithm,
writeAlgorithm,
closeAlgorithm,
abortAlgorithm,
highWaterMark,
sizeAlgorithm,
);
}
function transformStreamDefaultControllerClearAlgorithms(
controller,
) {
controller[sym.transformAlgorithm] = undefined;
controller[sym.flushAlgorithm] = undefined;
}
function transformStreamDefaultControllerEnqueue(
controller,
chunk,
) {
const stream = controller[sym.controlledTransformStream];
const readableController = stream[sym.readable][
sym.readableStreamController
];
if (!readableStreamDefaultControllerCanCloseOrEnqueue(readableController)) {
throw new TypeError(
"TransformStream's readable controller cannot be closed or enqueued.",
);
}
try {
readableStreamDefaultControllerEnqueue(readableController, chunk);
} catch (e) {
transformStreamErrorWritableAndUnblockWrite(stream, e);
throw stream[sym.readable][sym.storedError];
}
const backpressure = readableStreamDefaultControllerHasBackpressure(
readableController,
);
if (backpressure) {
transformStreamSetBackpressure(stream, true);
}
}
function transformStreamDefaultControllerError(
controller,
e,
) {
transformStreamError(controller[sym.controlledTransformStream], e);
}
function transformStreamDefaultControllerPerformTransform(
controller,
chunk,
) {
const transformPromise = controller[sym.transformAlgorithm](chunk);
return transformPromise.then(undefined, (r) => {
transformStreamError(controller[sym.controlledTransformStream], r);
throw r;
});
}
function transformStreamDefaultSinkAbortAlgorithm(
stream,
reason,
) {
transformStreamError(stream, reason);
return Promise.resolve(undefined);
}
function transformStreamDefaultSinkCloseAlgorithm(
stream,
) {
const readable = stream[sym.readable];
const controller = stream[sym.transformStreamController];
const flushPromise = controller[sym.flushAlgorithm]();
transformStreamDefaultControllerClearAlgorithms(controller);
return flushPromise.then(
() => {
if (readable[sym.state] === "errored") {
throw readable[sym.storedError];
}
const readableController = readable[
sym.readableStreamController
];
if (
readableStreamDefaultControllerCanCloseOrEnqueue(readableController)
) {
readableStreamDefaultControllerClose(readableController);
}
},
(r) => {
transformStreamError(stream, r);
throw readable[sym.storedError];
},
);
}
function transformStreamDefaultSinkWriteAlgorithm(
stream,
chunk,
) {
assert(stream[sym.writable][sym.state] === "writable");
const controller = stream[sym.transformStreamController];
if (stream[sym.backpressure]) {
const backpressureChangePromise = stream[sym.backpressureChangePromise];
assert(backpressureChangePromise);
return backpressureChangePromise.promise.then(() => {
const writable = stream[sym.writable];
const state = writable[sym.state];
if (state === "erroring") {
throw writable[sym.storedError];
}
assert(state === "writable");
return transformStreamDefaultControllerPerformTransform(
controller,
chunk,
);
});
}
return transformStreamDefaultControllerPerformTransform(controller, chunk);
}
function transformStreamDefaultSourcePullAlgorithm(
stream,
) {
assert(stream[sym.backpressure] === true);
assert(stream[sym.backpressureChangePromise] !== undefined);
transformStreamSetBackpressure(stream, false);
return stream[sym.backpressureChangePromise].promise;
}
function transformStreamError(
stream,
e,
) {
readableStreamDefaultControllerError(
stream[sym.readable][
sym.readableStreamController
],
e,
);
transformStreamErrorWritableAndUnblockWrite(stream, e);
}
function transformStreamDefaultControllerTerminate(
controller,
) {
const stream = controller[sym.controlledTransformStream];
const readableController = stream[sym.readable][
sym.readableStreamController
];
readableStreamDefaultControllerClose(readableController);
const error = new TypeError("TransformStream is closed.");
transformStreamErrorWritableAndUnblockWrite(stream, error);
}
function transformStreamErrorWritableAndUnblockWrite(
stream,
e,
) {
transformStreamDefaultControllerClearAlgorithms(
stream[sym.transformStreamController],
);
writableStreamDefaultControllerErrorIfNeeded(
stream[sym.writable][sym.writableStreamController],
e,
);
if (stream[sym.backpressure]) {
transformStreamSetBackpressure(stream, false);
}
}
function transformStreamSetBackpressure(
stream,
backpressure,
) {
assert(stream[sym.backpressure] !== backpressure);
if (stream[sym.backpressureChangePromise] !== undefined) {
stream[sym.backpressureChangePromise].resolve(undefined);
}
stream[sym.backpressureChangePromise] = getDeferred();
stream[sym.backpressure] = backpressure;
}
function transferArrayBuffer(buffer) {
assert(!isDetachedBuffer(buffer));
const transferredIshVersion = buffer.slice(0);
Object.defineProperty(buffer, "byteLength", {
get() {
return 0;
},
});
buffer[sym.isFakeDetached] = true;
return transferredIshVersion;
}
function validateAndNormalizeHighWaterMark(
highWaterMark,
) {
highWaterMark = Number(highWaterMark);
if (Number.isNaN(highWaterMark) || highWaterMark < 0) {
throw new RangeError(
`highWaterMark must be a positive number or Infinity. Received: ${highWaterMark}.`,
);
}
return highWaterMark;
}
function writableStreamAbort(
stream,
reason,
) {
const state = stream[sym.state];
if (state === "closed" || state === "errored") {
return Promise.resolve(undefined);
}
if (stream[sym.pendingAbortRequest]) {
return stream[sym.pendingAbortRequest].promise.promise;
}
assert(state === "writable" || state === "erroring");
let wasAlreadyErroring = false;
if (state === "erroring") {
wasAlreadyErroring = true;
reason = undefined;
}
const promise = getDeferred();
stream[sym.pendingAbortRequest] = { promise, reason, wasAlreadyErroring };
if (wasAlreadyErroring === false) {
writableStreamStartErroring(stream, reason);
}
return promise.promise;
}
function writableStreamAddWriteRequest(
stream,
) {
assert(isWritableStream(stream));
assert(stream[sym.state] === "writable");
const promise = getDeferred();
stream[sym.writeRequests].push(promise);
return promise.promise;
}
function writableStreamClose(
stream,
) {
const state = stream[sym.state];
if (state === "closed" || state === "errored") {
return Promise.reject(
new TypeError(
"Cannot close an already closed or errored WritableStream.",
),
);
}
assert(!writableStreamCloseQueuedOrInFlight(stream));
const promise = getDeferred();
stream[sym.closeRequest] = promise;
const writer = stream[sym.writer];
if (writer && stream[sym.backpressure] && state === "writable") {
writer[sym.readyPromise].resolve();
writer[sym.readyPromise].resolve = undefined;
writer[sym.readyPromise].reject = undefined;
}
writableStreamDefaultControllerClose(stream[sym.writableStreamController]);
return promise.promise;
}
function writableStreamCloseQueuedOrInFlight(
stream,
) {
return !(
stream[sym.closeRequest] === undefined &&
stream[sym.inFlightCloseRequest] === undefined
);
}
function writableStreamDealWithRejection(
stream,
error,
) {
const state = stream[sym.state];
if (state === "writable") {
writableStreamStartErroring(stream, error);
return;
}
assert(state === "erroring");
writableStreamFinishErroring(stream);
}
function writableStreamDefaultControllerAdvanceQueueIfNeeded(
controller,
) {
const stream = controller[sym.controlledWritableStream];
if (!controller[sym.started]) {
return;
}
if (stream[sym.inFlightWriteRequest]) {
return;
}
const state = stream[sym.state];
assert(state !== "closed" && state !== "errored");
if (state === "erroring") {
writableStreamFinishErroring(stream);
return;
}
if (!controller[sym.queue].length) {
return;
}
const writeRecord = peekQueueValue(controller);
if (writeRecord === "close") {
writableStreamDefaultControllerProcessClose(controller);
} else {
writableStreamDefaultControllerProcessWrite(
controller,
writeRecord.chunk,
);
}
}
function writableStreamDefaultControllerClearAlgorithms(
controller,
) {
controller[sym.writeAlgorithm] = undefined;
controller[sym.closeAlgorithm] = undefined;
controller[sym.abortAlgorithm] = undefined;
controller[sym.strategySizeAlgorithm] = undefined;
}
function writableStreamDefaultControllerClose(
controller,
) {
enqueueValueWithSize(controller, "close", 0);
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
function writableStreamDefaultControllerError(
controller,
error,
) {
const stream = controller[sym.controlledWritableStream];
assert(stream[sym.state] === "writable");
writableStreamDefaultControllerClearAlgorithms(controller);
writableStreamStartErroring(stream, error);
}
function writableStreamDefaultControllerErrorIfNeeded(
controller,
error,
) {
if (controller[sym.controlledWritableStream][sym.state] === "writable") {
writableStreamDefaultControllerError(controller, error);
}
}
function writableStreamDefaultControllerGetBackpressure(
controller,
) {
const desiredSize = writableStreamDefaultControllerGetDesiredSize(
controller,
);
return desiredSize <= 0;
}
function writableStreamDefaultControllerGetChunkSize(
controller,
chunk,
) {
let returnValue;
try {
returnValue = controller[sym.strategySizeAlgorithm](chunk);
} catch (e) {
writableStreamDefaultControllerErrorIfNeeded(controller, e);
return 1;
}
return returnValue;
}
function writableStreamDefaultControllerGetDesiredSize(
controller,
) {
return controller[sym.strategyHWM] - controller[sym.queueTotalSize];
}
function writableStreamDefaultControllerProcessClose(
controller,
) {
const stream = controller[sym.controlledWritableStream];
writableStreamMarkCloseRequestInFlight(stream);
dequeueValue(controller);
assert(controller[sym.queue].length === 0);
const sinkClosePromise = controller[sym.closeAlgorithm]();
writableStreamDefaultControllerClearAlgorithms(controller);
setPromiseIsHandledToTrue(
sinkClosePromise.then(
() => {
writableStreamFinishInFlightClose(stream);
},
(reason) => {
writableStreamFinishInFlightCloseWithError(stream, reason);
},
),
);
}
function writableStreamDefaultControllerProcessWrite(
controller,
chunk,
) {
const stream = controller[sym.controlledWritableStream];
writableStreamMarkFirstWriteRequestInFlight(stream);
const sinkWritePromise = controller[sym.writeAlgorithm](chunk);
setPromiseIsHandledToTrue(
sinkWritePromise.then(
() => {
writableStreamFinishInFlightWrite(stream);
const state = stream[sym.state];
assert(state === "writable" || state === "erroring");
dequeueValue(controller);
if (
!writableStreamCloseQueuedOrInFlight(stream) &&
state === "writable"
) {
const backpressure = writableStreamDefaultControllerGetBackpressure(
controller,
);
writableStreamUpdateBackpressure(stream, backpressure);
}
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
},
(reason) => {
if (stream[sym.state] === "writable") {
writableStreamDefaultControllerClearAlgorithms(controller);
}
writableStreamFinishInFlightWriteWithError(stream, reason);
},
),
);
}
function writableStreamDefaultControllerWrite(
controller,
chunk,
chunkSize,
) {
const writeRecord = { chunk };
try {
enqueueValueWithSize(controller, writeRecord, chunkSize);
} catch (e) {
writableStreamDefaultControllerErrorIfNeeded(controller, e);
return;
}
const stream = controller[sym.controlledWritableStream];
if (
!writableStreamCloseQueuedOrInFlight(stream) &&
stream[sym.state] === "writable"
) {
const backpressure = writableStreamDefaultControllerGetBackpressure(
controller,
);
writableStreamUpdateBackpressure(stream, backpressure);
}
writableStreamDefaultControllerAdvanceQueueIfNeeded(controller);
}
function writableStreamDefaultWriterAbort(
writer,
reason,
) {
const stream = writer[sym.ownerWritableStream];
assert(stream);
return writableStreamAbort(stream, reason);
}
function writableStreamDefaultWriterClose(
writer,
) {
const stream = writer[sym.ownerWritableStream];
assert(stream);
return writableStreamClose(stream);
}
function writableStreamDefaultWriterCloseWithErrorPropagation(
writer,
) {
const stream = writer[sym.ownerWritableStream];
assert(stream);
const state = stream[sym.state];
if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
return Promise.resolve();
}
if (state === "errored") {
return Promise.reject(stream[sym.storedError]);
}
assert(state === "writable" || state === "erroring");
return writableStreamDefaultWriterClose(writer);
}
function writableStreamDefaultWriterEnsureClosePromiseRejected(
writer,
error,
) {
if (writer[sym.closedPromise].reject) {
writer[sym.closedPromise].reject(error);
} else {
writer[sym.closedPromise] = {
promise: Promise.reject(error),
};
}
setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
}
function writableStreamDefaultWriterEnsureReadyPromiseRejected(
writer,
error,
) {
if (writer[sym.readyPromise].reject) {
writer[sym.readyPromise].reject(error);
writer[sym.readyPromise].reject = undefined;
writer[sym.readyPromise].resolve = undefined;
} else {
writer[sym.readyPromise] = {
promise: Promise.reject(error),
};
}
setPromiseIsHandledToTrue(writer[sym.readyPromise].promise);
}
function writableStreamDefaultWriterWrite(
writer,
chunk,
) {
const stream = writer[sym.ownerWritableStream];
assert(stream);
const controller = stream[sym.writableStreamController];
assert(controller);
const chunkSize = writableStreamDefaultControllerGetChunkSize(
controller,
chunk,
);
if (stream !== writer[sym.ownerWritableStream]) {
return Promise.reject("Writer has incorrect WritableStream.");
}
const state = stream[sym.state];
if (state === "errored") {
return Promise.reject(stream[sym.storedError]);
}
if (writableStreamCloseQueuedOrInFlight(stream) || state === "closed") {
return Promise.reject(new TypeError("The stream is closed or closing."));
}
if (state === "erroring") {
return Promise.reject(stream[sym.storedError]);
}
assert(state === "writable");
const promise = writableStreamAddWriteRequest(stream);
writableStreamDefaultControllerWrite(controller, chunk, chunkSize);
return promise;
}
function writableStreamDefaultWriterGetDesiredSize(
writer,
) {
const stream = writer[sym.ownerWritableStream];
const state = stream[sym.state];
if (state === "errored" || state === "erroring") {
return null;
}
if (state === "closed") {
return 0;
}
return writableStreamDefaultControllerGetDesiredSize(
stream[sym.writableStreamController],
);
}
function writableStreamDefaultWriterRelease(
writer,
) {
const stream = writer[sym.ownerWritableStream];
assert(stream);
assert(stream[sym.writer] === writer);
const releasedError = new TypeError(
"Writer was released and can no longer be used to monitor the stream's closedness.",
);
writableStreamDefaultWriterEnsureReadyPromiseRejected(
writer,
releasedError,
);
writableStreamDefaultWriterEnsureClosePromiseRejected(
writer,
releasedError,
);
stream[sym.writer] = undefined;
writer[sym.ownerWritableStream] = undefined;
}
function writableStreamFinishErroring(stream) {
assert(stream[sym.state] === "erroring");
assert(!writableStreamHasOperationMarkedInFlight(stream));
stream[sym.state] = "errored";
stream[sym.writableStreamController][sym.errorSteps]();
const storedError = stream[sym.storedError];
for (const writeRequest of stream[sym.writeRequests]) {
assert(writeRequest.reject);
writeRequest.reject(storedError);
}
stream[sym.writeRequests] = [];
if (!stream[sym.pendingAbortRequest]) {
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
const abortRequest = stream[sym.pendingAbortRequest];
assert(abortRequest);
stream[sym.pendingAbortRequest] = undefined;
if (abortRequest.wasAlreadyErroring) {
assert(abortRequest.promise.reject);
abortRequest.promise.reject(storedError);
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
return;
}
const promise = stream[sym.writableStreamController][sym.abortSteps](
abortRequest.reason,
);
setPromiseIsHandledToTrue(
promise.then(
() => {
assert(abortRequest.promise.resolve);
abortRequest.promise.resolve();
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
},
(reason) => {
assert(abortRequest.promise.reject);
abortRequest.promise.reject(reason);
writableStreamRejectCloseAndClosedPromiseIfNeeded(stream);
},
),
);
}
function writableStreamFinishInFlightClose(
stream,
) {
assert(stream[sym.inFlightCloseRequest]);
stream[sym.inFlightCloseRequest]?.resolve();
stream[sym.inFlightCloseRequest] = undefined;
const state = stream[sym.state];
assert(state === "writable" || state === "erroring");
if (state === "erroring") {
stream[sym.storedError] = undefined;
if (stream[sym.pendingAbortRequest]) {
stream[sym.pendingAbortRequest].promise.resolve();
stream[sym.pendingAbortRequest] = undefined;
}
}
stream[sym.state] = "closed";
const writer = stream[sym.writer];
if (writer) {
writer[sym.closedPromise].resolve();
}
assert(stream[sym.pendingAbortRequest] === undefined);
assert(stream[sym.storedError] === undefined);
}
function writableStreamFinishInFlightCloseWithError(
stream,
error,
) {
assert(stream[sym.inFlightCloseRequest]);
stream[sym.inFlightCloseRequest]?.reject(error);
stream[sym.inFlightCloseRequest] = undefined;
assert(
stream[sym.state] === "writable" || stream[sym.state] === "erroring",
);
if (stream[sym.pendingAbortRequest]) {
stream[sym.pendingAbortRequest]?.promise.reject(error);
stream[sym.pendingAbortRequest] = undefined;
}
writableStreamDealWithRejection(stream, error);
}
function writableStreamFinishInFlightWrite(
stream,
) {
assert(stream[sym.inFlightWriteRequest]);
stream[sym.inFlightWriteRequest].resolve();
stream[sym.inFlightWriteRequest] = undefined;
}
function writableStreamFinishInFlightWriteWithError(
stream,
error,
) {
assert(stream[sym.inFlightWriteRequest]);
stream[sym.inFlightWriteRequest].reject(error);
stream[sym.inFlightWriteRequest] = undefined;
assert(
stream[sym.state] === "writable" || stream[sym.state] === "erroring",
);
writableStreamDealWithRejection(stream, error);
}
function writableStreamHasOperationMarkedInFlight(
stream,
) {
return !(
stream[sym.inFlightWriteRequest] === undefined &&
stream[sym.inFlightCloseRequest] === undefined
);
}
function writableStreamMarkCloseRequestInFlight(
stream,
) {
assert(stream[sym.inFlightCloseRequest] === undefined);
assert(stream[sym.closeRequest] !== undefined);
stream[sym.inFlightCloseRequest] = stream[sym.closeRequest];
stream[sym.closeRequest] = undefined;
}
function writableStreamMarkFirstWriteRequestInFlight(
stream,
) {
assert(stream[sym.inFlightWriteRequest] === undefined);
assert(stream[sym.writeRequests].length);
const writeRequest = stream[sym.writeRequests].shift();
stream[sym.inFlightWriteRequest] = writeRequest;
}
function writableStreamRejectCloseAndClosedPromiseIfNeeded(
stream,
) {
assert(stream[sym.state] === "errored");
if (stream[sym.closeRequest]) {
assert(stream[sym.inFlightCloseRequest] === undefined);
stream[sym.closeRequest].reject(stream[sym.storedError]);
stream[sym.closeRequest] = undefined;
}
const writer = stream[sym.writer];
if (writer) {
writer[sym.closedPromise].reject(stream[sym.storedError]);
setPromiseIsHandledToTrue(writer[sym.closedPromise].promise);
}
}
function writableStreamStartErroring(
stream,
reason,
) {
assert(stream[sym.storedError] === undefined);
assert(stream[sym.state] === "writable");
const controller = stream[sym.writableStreamController];
assert(controller);
stream[sym.state] = "erroring";
stream[sym.storedError] = reason;
const writer = stream[sym.writer];
if (writer) {
writableStreamDefaultWriterEnsureReadyPromiseRejected(writer, reason);
}
if (
!writableStreamHasOperationMarkedInFlight(stream) &&
controller[sym.started]
) {
writableStreamFinishErroring(stream);
}
}
function writableStreamUpdateBackpressure(
stream,
backpressure,
) {
assert(stream[sym.state] === "writable");
assert(!writableStreamCloseQueuedOrInFlight(stream));
const writer = stream[sym.writer];
if (writer && backpressure !== stream[sym.backpressure]) {
if (backpressure) {
writer[sym.readyPromise] = getDeferred();
} else {
assert(backpressure === false);
writer[sym.readyPromise].resolve();
writer[sym.readyPromise].resolve = undefined;
writer[sym.readyPromise].reject = undefined;
}
}
stream[sym.backpressure] = backpressure;
}
/* eslint-enable */
window.__bootstrap.streams = {
ReadableStream,
TransformStream,
WritableStream,
isReadableStreamDisturbed,
};
})(this);