diff --git a/cli/tests/unit/streams_test.ts b/cli/tests/unit/streams_test.ts index 4a573c9344..c62c48469c 100644 --- a/cli/tests/unit/streams_test.ts +++ b/cli/tests/unit/streams_test.ts @@ -142,7 +142,7 @@ Deno.test(async function readableStreamClose() { const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 12); core.ops.op_close(rid); - assertEquals(await cancel, undefined); + assertEquals(await cancel, "resource closed"); }); // Close the stream without reading everything @@ -153,7 +153,7 @@ Deno.test(async function readableStreamClosePartialRead() { const nread = await core.ops.op_read(rid, buffer); assertEquals(nread, 5); core.ops.op_close(rid); - assertEquals(await cancel, undefined); + assertEquals(await cancel, "resource closed"); }); // Close the stream without reading anything @@ -161,7 +161,7 @@ Deno.test(async function readableStreamCloseWithoutRead() { const cancel = deferred(); const rid = resourceForReadableStream(helloWorldStream(false, cancel)); core.ops.op_close(rid); - assertEquals(await cancel, undefined); + assertEquals(await cancel, "resource closed"); }); Deno.test(async function readableStreamPartial() { @@ -205,7 +205,13 @@ for ( ) { Deno.test(`readableStreamError_${type}`, async function () { const rid = resourceForReadableStream(errorStream(type)); - assertEquals(12, await core.ops.op_read(rid, new Uint8Array(16))); + let nread; + try { + nread = await core.ops.op_read(rid, new Uint8Array(16)); + } catch (_) { + fail("Should not have thrown"); + } + assertEquals(12, nread); try { await core.ops.op_read(rid, new Uint8Array(1)); fail(); @@ -297,3 +303,32 @@ function createStreamTest( } }); } + +Deno.test(async function readableStreamWithAggressiveResourceClose() { + let first = true; + const reasonPromise = deferred(); + const rid = resourceForReadableStream( + new ReadableStream({ + pull(controller) { + if (first) { + // We queue this up and then immediately close the resource (not the reader) + controller.enqueue(new Uint8Array(1)); + core.close(rid); + // This doesn't throw, even though the resource is closed + controller.enqueue(new Uint8Array(1)); + first = false; + } + }, + cancel(reason) { + reasonPromise.resolve(reason); + }, + }), + ); + try { + await core.ops.op_read(rid, new Uint8Array(1)); + fail(); + } catch (e) { + assertEquals(e.message, "operation canceled"); + } + assertEquals(await reasonPromise, "resource closed"); +}); diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 0849d221d1..9dde03b7f4 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -724,7 +724,7 @@ function resourceForReadableStream(stream) { PromisePrototypeCatch( PromisePrototypeThen( op_readable_stream_resource_await_close(rid), - () => reader.cancel(), + () => reader.cancel("resource closed"), ), () => {}, ); @@ -745,17 +745,25 @@ function resourceForReadableStream(stream) { break; } } catch (err) { - const message = err.message; - if (message) { - await op_readable_stream_resource_write_error(sink, err.message); - } else { - await op_readable_stream_resource_write_error(sink, String(err)); + const message = err?.message; + const success = (message && (typeof message == "string")) + ? await op_readable_stream_resource_write_error(sink, message) + : await op_readable_stream_resource_write_error( + sink, + String(err), + ); + // We don't cancel the reader if there was an error reading. We'll let the downstream + // consumer close the resource after it receives the error. + if (!success) { + reader.cancel("resource closed"); } break; } // If the chunk has non-zero length, write it if (value.length > 0) { - await op_readable_stream_resource_write_buf(sink, value); + if (!await op_readable_stream_resource_write_buf(sink, value)) { + reader.cancel("resource closed"); + } } } } finally { diff --git a/ext/web/stream_resource.rs b/ext/web/stream_resource.rs index 4c2a756483..0c483ecccc 100644 --- a/ext/web/stream_resource.rs +++ b/ext/web/stream_resource.rs @@ -111,6 +111,10 @@ impl Resource for ReadableStreamResource { fn read(self: Rc, limit: usize) -> AsyncResult { Box::pin(ReadableStreamResource::read(self, limit)) } + + fn close(self: Rc) { + self.cancel_handle.cancel(); + } } // TODO(mmastrac): Move this to deno_core @@ -155,10 +159,6 @@ impl Future for CompletionHandle { } } -fn sender_closed() -> Error { - type_error("sender closed") -} - /// Allocate a resource that wraps a ReadableStream. #[op2(fast)] #[smi] @@ -210,15 +210,13 @@ fn drop_sender(sender: *const c_void) { pub fn op_readable_stream_resource_write_buf( sender: *const c_void, #[buffer] buffer: JsBuffer, -) -> impl Future> { +) -> impl Future { let sender = get_sender(sender); async move { - let sender = sender.ok_or_else(sender_closed)?; - sender - .send(Ok(buffer.into())) - .await - .map_err(|_| sender_closed())?; - Ok(()) + let Some(sender) = sender else { + return false; + }; + sender.send(Ok(buffer.into())).await.ok().is_some() } } @@ -226,15 +224,17 @@ pub fn op_readable_stream_resource_write_buf( pub fn op_readable_stream_resource_write_error( sender: *const c_void, #[string] error: String, -) -> impl Future> { +) -> impl Future { let sender = get_sender(sender); async move { - let sender = sender.ok_or_else(sender_closed)?; + let Some(sender) = sender else { + return false; + }; sender .send(Err(type_error(Cow::Owned(error)))) .await - .map_err(|_| sender_closed())?; - Ok(()) + .ok() + .is_some() } }