From 1ab3691b091e34ffa5a0b8f2cd18a87da8c4930c Mon Sep 17 00:00:00 2001 From: Luca Casonato Date: Mon, 10 Oct 2022 10:28:35 +0200 Subject: [PATCH] feat(core): add Deno.core.writeAll(rid, chunk) (#16228) This commit adds a new op_write_all to core that allows writing an entire chunk in a single async op call. Internally this calls `Resource::write_all`. The `writableStreamForRid` has been moved to `06_streams.js` now, and uses this new op. Various other code paths now also use this new op. Closes #16227 --- core/01_core.js | 1 + core/examples/http_bench_json_ops.js | 2 +- core/lib.deno_core.d.ts | 5 +++ core/ops_builtin.rs | 13 +++++++ ext/cache/01_cache.js | 2 +- ext/fetch/26_fetch.js | 2 +- ext/net/01_net.js | 39 +------------------ ext/web/06_streams.js | 58 ++++++++++++++++++++++++++++ runtime/js/40_files.js | 4 +- runtime/js/40_spawn.js | 2 +- 10 files changed, 85 insertions(+), 43 deletions(-) diff --git a/core/01_core.js b/core/01_core.js index 655b4219e0..b98e54160b 100644 --- a/core/01_core.js +++ b/core/01_core.js @@ -329,6 +329,7 @@ tryClose: (rid) => ops.op_try_close(rid), read: opAsync.bind(null, "op_read"), write: opAsync.bind(null, "op_write"), + writeAll: opAsync.bind(null, "op_write_all"), shutdown: opAsync.bind(null, "op_shutdown"), print: (msg, isErr) => ops.op_print(msg, isErr), setMacrotaskCallback: (fn) => ops.op_set_macrotask_callback(fn), diff --git a/core/examples/http_bench_json_ops.js b/core/examples/http_bench_json_ops.js index cea3449878..98b2f4ef86 100644 --- a/core/examples/http_bench_json_ops.js +++ b/core/examples/http_bench_json_ops.js @@ -23,7 +23,7 @@ async function serve(rid) { try { while (true) { await Deno.core.read(rid, requestBuf); - await Deno.core.write(rid, responseBuf); + await Deno.core.writeAll(rid, responseBuf); } } catch (e) { if ( diff --git a/core/lib.deno_core.d.ts b/core/lib.deno_core.d.ts index 7e46d0f141..2a37647301 100644 --- a/core/lib.deno_core.d.ts +++ b/core/lib.deno_core.d.ts @@ -61,6 +61,11 @@ declare namespace Deno { */ function write(rid: number, buf: Uint8Array): Promise; + /** + * Write to a (stream) resource that implements write() + */ + function writeAll(rid: number, buf: Uint8Array): Promise; + /** * Print a message to stdout or stderr */ diff --git a/core/ops_builtin.rs b/core/ops_builtin.rs index 41741bf287..3fc9d62d6e 100644 --- a/core/ops_builtin.rs +++ b/core/ops_builtin.rs @@ -38,6 +38,7 @@ pub(crate) fn init_builtins() -> Extension { op_read::decl(), op_read_all::decl(), op_write::decl(), + op_write_all::decl(), op_shutdown::decl(), op_metrics::decl(), op_format_file_name::decl(), @@ -253,6 +254,18 @@ async fn op_write( Ok(resp.nwritten() as u32) } +#[op] +async fn op_write_all( + state: Rc>, + rid: ResourceId, + buf: ZeroCopyBuf, +) -> Result<(), Error> { + let resource = state.borrow().resource_table.get_any(rid)?; + let view = BufView::from(buf); + resource.write_all(view).await?; + Ok(()) +} + #[op] async fn op_shutdown( state: Rc>, diff --git a/ext/cache/01_cache.js b/ext/cache/01_cache.js index fa0b680378..afde7f7894 100644 --- a/ext/cache/01_cache.js +++ b/ext/cache/01_cache.js @@ -148,7 +148,7 @@ await core.shutdown(rid); break; } - await core.write(rid, value); + await core.writeAll(rid, value); } } finally { core.close(rid); diff --git a/ext/fetch/26_fetch.js b/ext/fetch/26_fetch.js index 169db2bbf2..5c824898de 100644 --- a/ext/fetch/26_fetch.js +++ b/ext/fetch/26_fetch.js @@ -225,7 +225,7 @@ } try { await PromisePrototypeCatch( - core.write(requestBodyRid, value), + core.writeAll(requestBodyRid, value), (err) => { if (terminator.aborted) return; throw err; diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 04360116b2..d7a093ba6c 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -4,7 +4,8 @@ ((window) => { const core = window.Deno.core; const { BadResourcePrototype, InterruptedPrototype, ops } = core; - const { WritableStream, readableStreamForRid } = window.__bootstrap.streams; + const { readableStreamForRid, writableStreamForRid } = + window.__bootstrap.streams; const { Error, ObjectPrototypeIsPrototypeOf, @@ -65,39 +66,6 @@ return core.opAsync("op_dns_resolve", { query, recordType, options }); } - function tryClose(rid) { - try { - core.close(rid); - } catch { - // Ignore errors - } - } - - function writableStreamForRid(rid) { - return new WritableStream({ - async write(chunk, controller) { - try { - let nwritten = 0; - while (nwritten < chunk.length) { - nwritten += await write( - rid, - TypedArrayPrototypeSubarray(chunk, nwritten), - ); - } - } catch (e) { - controller.error(e); - tryClose(rid); - } - }, - close() { - tryClose(rid); - }, - abort() { - tryClose(rid); - }, - }); - } - class Conn { #rid = 0; #remoteAddr = null; @@ -353,7 +321,4 @@ Datagram, resolveDns, }; - window.__bootstrap.streamUtils = { - writableStreamForRid, - }; })(this); diff --git a/ext/web/06_streams.js b/ext/web/06_streams.js index 76e31503f7..09e5b74148 100644 --- a/ext/web/06_streams.js +++ b/ext/web/06_streams.js @@ -826,6 +826,62 @@ return finalBuffer; } + /** + * Create a new Writable object that is backed by a Resource that implements + * `Resource::write` / `Resource::write_all`. This object contains enough + * metadata to allow callers to bypass the JavaScript WritableStream + * implementation and write directly to the underlying resource if they so + * choose (FastStream). + * + * @param {number} rid The resource ID to write to. + * @param {boolean=} autoClose If the resource should be auto-closed when the stream closes. Defaults to true. + * @returns {ReadableStream} + */ + function writableStreamForRid(rid, autoClose = true) { + const stream = webidl.createBranded(WritableStream); + stream[_resourceBacking] = { rid, autoClose }; + + const tryClose = () => { + if (!autoClose) return; + RESOURCE_REGISTRY.unregister(stream); + core.tryClose(rid); + }; + + if (autoClose) { + RESOURCE_REGISTRY.register(stream, rid, stream); + } + + const underlyingSink = { + async write(chunk, controller) { + try { + await core.writeAll(rid, chunk); + } catch (e) { + controller.error(e); + tryClose(); + } + }, + close() { + tryClose(); + }, + abort() { + tryClose(); + }, + }; + initializeWritableStream(stream); + setUpWritableStreamDefaultControllerFromUnderlyingSink( + stream, + underlyingSink, + underlyingSink, + 1, + () => 1, + ); + return stream; + } + + function getWritableStreamResourceBacking(stream) { + return stream[_resourceBacking]; + } + /* * @param {ReadableStream} stream */ @@ -6059,6 +6115,8 @@ readableStreamForRidUnrefableUnref, readableStreamThrowIfErrored, getReadableStreamResourceBacking, + writableStreamForRid, + getWritableStreamResourceBacking, Deferred, // Exposed in global runtime scope ByteLengthQueuingStrategy, diff --git a/runtime/js/40_files.js b/runtime/js/40_files.js index c864d39707..226abb33e6 100644 --- a/runtime/js/40_files.js +++ b/runtime/js/40_files.js @@ -7,8 +7,8 @@ const { read, readSync, write, writeSync } = window.__bootstrap.io; const { ftruncate, ftruncateSync, fstat, fstatSync } = window.__bootstrap.fs; const { pathFromURL } = window.__bootstrap.util; - const { writableStreamForRid } = window.__bootstrap.streamUtils; - const { readableStreamForRid } = window.__bootstrap.streams; + const { readableStreamForRid, writableStreamForRid } = + window.__bootstrap.streams; const { ArrayPrototypeFilter, Error, diff --git a/runtime/js/40_spawn.js b/runtime/js/40_spawn.js index 99661bf1a9..a0283f0ffb 100644 --- a/runtime/js/40_spawn.js +++ b/runtime/js/40_spawn.js @@ -20,8 +20,8 @@ readableStreamForRidUnrefable, readableStreamForRidUnrefableRef, readableStreamForRidUnrefableUnref, + writableStreamForRid, } = window.__bootstrap.streams; - const { writableStreamForRid } = window.__bootstrap.streamUtils; const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");