From 24af5a2c4c45eb1c2e04825d7a3671a1a64738c1 Mon Sep 17 00:00:00 2001 From: Florian Schwalm <68847951+egfx-notifications@users.noreply.github.com> Date: Tue, 13 Feb 2024 22:45:23 +0100 Subject: [PATCH] fix(ext/web): Prevent (De-)CompressionStream resource leak on stream cancellation (#21199) Based on #21074 and #20741 I was looking for further potential use cases of `TransformStream` `cancel()` method, so here go `CompressionStream` and `DecompressionStream`. Fixes #14212 --- ext/web/14_compression.js | 10 +++++-- ext/web/compression.rs | 26 +++++++++++------ tests/unit/streams_test.ts | 59 +++++++++++++++++++++++++++++++++++++- 3 files changed, 84 insertions(+), 11 deletions(-) diff --git a/ext/web/14_compression.js b/ext/web/14_compression.js index 1ec9e54628..0d3a8ebd0d 100644 --- a/ext/web/14_compression.js +++ b/ext/web/14_compression.js @@ -50,9 +50,12 @@ class CompressionStream { maybeEnqueue(controller, output); }, flush(controller) { - const output = op_compression_finish(rid); + const output = op_compression_finish(rid, true); maybeEnqueue(controller, output); }, + cancel: (_reason) => { + op_compression_finish(rid, false); + }, }); this[webidl.brand] = webidl.brand; @@ -109,9 +112,12 @@ class DecompressionStream { maybeEnqueue(controller, output); }, flush(controller) { - const output = op_compression_finish(rid); + const output = op_compression_finish(rid, true); maybeEnqueue(controller, output); }, + cancel: (_reason) => { + op_compression_finish(rid, false); + }, }); this[webidl.brand] = webidl.brand; diff --git a/ext/web/compression.rs b/ext/web/compression.rs index 0829af8e57..4f4d90cbfc 100644 --- a/ext/web/compression.rs +++ b/ext/web/compression.rs @@ -116,25 +116,35 @@ pub fn op_compression_write( pub fn op_compression_finish( state: &mut OpState, #[smi] rid: ResourceId, + report_errors: bool, ) -> Result<ToJsBuffer, AnyError> { let resource = state.resource_table.take::<CompressionResource>(rid)?; let resource = Rc::try_unwrap(resource).unwrap(); let inner = resource.0.into_inner(); - 
let out: Vec<u8> = match inner { + let out = match inner { Inner::DeflateDecoder(d) => { - d.finish().map_err(|e| type_error(e.to_string()))? + d.finish().map_err(|e| type_error(e.to_string())) } Inner::DeflateEncoder(d) => { - d.finish().map_err(|e| type_error(e.to_string()))? + d.finish().map_err(|e| type_error(e.to_string())) } Inner::DeflateRawDecoder(d) => { - d.finish().map_err(|e| type_error(e.to_string()))? + d.finish().map_err(|e| type_error(e.to_string())) } Inner::DeflateRawEncoder(d) => { - d.finish().map_err(|e| type_error(e.to_string()))? + d.finish().map_err(|e| type_error(e.to_string())) } - Inner::GzDecoder(d) => d.finish().map_err(|e| type_error(e.to_string()))?, - Inner::GzEncoder(d) => d.finish().map_err(|e| type_error(e.to_string()))?, + Inner::GzDecoder(d) => d.finish().map_err(|e| type_error(e.to_string())), + Inner::GzEncoder(d) => d.finish().map_err(|e| type_error(e.to_string())), }; - Ok(out.into()) + match out { + Err(err) => { + if report_errors { + Err(err) + } else { + Ok(Vec::with_capacity(0).into()) + } + } + Ok(out) => Ok(out.into()), + } } diff --git a/tests/unit/streams_test.ts b/tests/unit/streams_test.ts index 6db9f666cc..80b45e6024 100644 --- a/tests/unit/streams_test.ts +++ b/tests/unit/streams_test.ts @@ -1,5 +1,5 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. 
-import { assertEquals, fail } from "./test_util.ts"; +import { assertEquals, assertRejects, fail } from "./test_util.ts"; const { core, @@ -476,3 +476,60 @@ for (const packetCount of [1, 1024]) { assertEquals(await promise, "resource closed"); }); } + +Deno.test(async function compressionStreamWritableMayBeAborted() { + await Promise.all([ + new CompressionStream("gzip").writable.getWriter().abort(), + new CompressionStream("deflate").writable.getWriter().abort(), + new CompressionStream("deflate-raw").writable.getWriter().abort(), + ]); +}); + +Deno.test(async function compressionStreamReadableMayBeCancelled() { + await Promise.all([ + new CompressionStream("gzip").readable.getReader().cancel(), + new CompressionStream("deflate").readable.getReader().cancel(), + new CompressionStream("deflate-raw").readable.getReader().cancel(), + ]); +}); + +Deno.test(async function decompressionStreamWritableMayBeAborted() { + await Promise.all([ + new DecompressionStream("gzip").writable.getWriter().abort(), + new DecompressionStream("deflate").writable.getWriter().abort(), + new DecompressionStream("deflate-raw").writable.getWriter().abort(), + ]); +}); + +Deno.test(async function decompressionStreamReadableMayBeCancelled() { + await Promise.all([ + new DecompressionStream("gzip").readable.getReader().cancel(), + new DecompressionStream("deflate").readable.getReader().cancel(), + new DecompressionStream("deflate-raw").readable.getReader().cancel(), + ]); +}); + +Deno.test(async function decompressionStreamValidGzipDoesNotThrow() { + const cs = new CompressionStream("gzip"); + const ds = new DecompressionStream("gzip"); + cs.readable.pipeThrough(ds); + const writer = cs.writable.getWriter(); + await writer.write(new Uint8Array([1])); + writer.releaseLock(); + await cs.writable.close(); + let result = new Uint8Array(); + for await (const chunk of ds.readable.values()) { + result = new Uint8Array([...result, ...chunk]); + } + assertEquals(result, new Uint8Array([1])); +}); + 
+Deno.test(async function decompressionStreamInvalidGzipStillReported() { + await assertRejects( + async () => { + await new DecompressionStream("gzip").writable.close(); + }, + TypeError, + "corrupt gzip stream does not have a matching checksum", + ); +});