From be59e93220e24a2e66ae2843a136e61eab9d8ac3 Mon Sep 17 00:00:00 2001 From: Leo Kettmeir Date: Sat, 27 May 2023 15:42:20 +0200 Subject: [PATCH] refactor(node/http): don't use readablestream for writing to request (#19282) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Refactors the internal usage of a readablestream to write to the resource directly --------- Co-authored-by: Bartek IwaƄczuk --- cli/tests/node_compat/config.jsonc | 3 +- .../parallel/test-client-request-destroy.js | 20 -- ext/node/lib.rs | 15 +- ext/node/ops/http.rs | 12 +- ext/node/polyfills/_http_outgoing.ts | 50 ++-- ext/node/polyfills/http.ts | 238 +++++++----------- runtime/build.rs | 7 + runtime/permissions/mod.rs | 10 + tools/node_compat/TODO.md | 3 +- 9 files changed, 157 insertions(+), 201 deletions(-) delete mode 100644 cli/tests/node_compat/test/parallel/test-client-request-destroy.js diff --git a/cli/tests/node_compat/config.jsonc b/cli/tests/node_compat/config.jsonc index ccc83cd3de..ef2f4fccb4 100644 --- a/cli/tests/node_compat/config.jsonc +++ b/cli/tests/node_compat/config.jsonc @@ -232,7 +232,8 @@ "test-child-process-spawnsync-maxbuf.js", "test-child-process-spawnsync-validation-errors.js", "test-child-process-spawnsync.js", - "test-client-request-destroy.js", + // TODO(crowlKats): socket is not yet polyfilled + // "test-client-request-destroy.js", "test-console-async-write-error.js", "test-console-group.js", "test-console-log-stdio-broken-dest.js", diff --git a/cli/tests/node_compat/test/parallel/test-client-request-destroy.js b/cli/tests/node_compat/test/parallel/test-client-request-destroy.js deleted file mode 100644 index f7e11ae0bf..0000000000 --- a/cli/tests/node_compat/test/parallel/test-client-request-destroy.js +++ /dev/null @@ -1,20 +0,0 @@ -// deno-fmt-ignore-file -// deno-lint-ignore-file - -// Copyright Joyent and Node contributors. All rights reserved. MIT license. -// Taken from Node 18.12.1 -// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually - -'use strict'; - -// Test that http.ClientRequest,prototype.destroy() returns `this`. -require('../common'); - -const assert = require('assert'); -const http = require('http'); -const clientRequest = new http.ClientRequest({ createConnection: () => {} }); - -assert.strictEqual(clientRequest.destroyed, false); -assert.strictEqual(clientRequest.destroy(), clientRequest); -assert.strictEqual(clientRequest.destroyed, true); -assert.strictEqual(clientRequest.destroy(), clientRequest); diff --git a/ext/node/lib.rs b/ext/node/lib.rs index 4c81351a89..c8242992a0 100644 --- a/ext/node/lib.rs +++ b/ext/node/lib.rs @@ -9,6 +9,7 @@ use deno_core::error::AnyError; use deno_core::located_script_name; use deno_core::op; use deno_core::serde_json; +use deno_core::url::Url; use deno_core::JsRuntime; use deno_core::ModuleSpecifier; use deno_fs::sync::MaybeSend; @@ -41,12 +42,24 @@ pub use resolution::NodeResolutionMode; pub use resolution::NodeResolver; pub trait NodePermissions { + fn check_net_url( + &mut self, + url: &Url, + api_name: &str, + ) -> Result<(), AnyError>; fn check_read(&self, path: &Path) -> Result<(), AnyError>; } pub(crate) struct AllowAllNodePermissions; impl NodePermissions for AllowAllNodePermissions { + fn check_net_url( + &mut self, + _url: &Url, + _api_name: &str, + ) -> Result<(), AnyError> { + Ok(()) + } fn check_read(&self, _path: &Path) -> Result<(), AnyError> { Ok(()) } @@ -206,7 +219,7 @@ deno_core::extension!(deno_node, ops::zlib::op_zlib_write_async, ops::zlib::op_zlib_init, ops::zlib::op_zlib_reset, - ops::http::op_node_http_request, + ops::http::op_node_http_request

, op_node_build_os, ops::require::op_require_init_paths, ops::require::op_require_node_module_paths

, diff --git a/ext/node/ops/http.rs b/ext/node/ops/http.rs index 2039fb3885..cc7dbf5220 100644 --- a/ext/node/ops/http.rs +++ b/ext/node/ops/http.rs @@ -24,14 +24,17 @@ use reqwest::Body; use reqwest::Method; #[op] -pub fn op_node_http_request( +pub fn op_node_http_request

( state: &mut OpState, method: ByteString, url: String, headers: Vec<(ByteString, ByteString)>, client_rid: Option, has_body: bool, -) -> Result { +) -> Result +where + P: crate::NodePermissions + 'static, +{ let client = if let Some(rid) = client_rid { let r = state.resource_table.get::(rid)?; r.client.clone() @@ -42,6 +45,11 @@ pub fn op_node_http_request( let method = Method::from_bytes(&method)?; let url = Url::parse(&url)?; + { + let permissions = state.borrow_mut::

(); + permissions.check_net_url(&url, "ClientRequest")?; + } + let mut header_map = HeaderMap::new(); for (key, value) in headers { let name = HeaderName::from_bytes(&key) diff --git a/ext/node/polyfills/_http_outgoing.ts b/ext/node/polyfills/_http_outgoing.ts index c4b88ae2fa..ab6a78038d 100644 --- a/ext/node/polyfills/_http_outgoing.ts +++ b/ext/node/polyfills/_http_outgoing.ts @@ -1,6 +1,7 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. +const core = globalThis.__bootstrap.core; import { getDefaultHighWaterMark } from "ext:deno_node/internal/streams/state.mjs"; import assert from "ext:deno_node/internal/assert.mjs"; import EE from "ext:deno_node/events.ts"; @@ -137,12 +138,6 @@ export class OutgoingMessage extends Stream { this._keepAliveTimeout = 0; this._onPendingData = nop; - - this.stream = new ReadableStream({ - start: (controller) => { - this.controller = controller; - }, - }); } get writableFinished() { @@ -374,21 +369,30 @@ export class OutgoingMessage extends Stream { return headers; } - controller: ReadableStreamDefaultController; write( chunk: string | Uint8Array | Buffer, encoding: string | null, - // TODO(crowlKats): use callback - _callback: () => void, + callback: () => void, ): boolean { - if (typeof chunk === "string") { - chunk = Buffer.from(chunk, encoding); - } - if (chunk instanceof Buffer) { - chunk = new Uint8Array(chunk.buffer); - } + if ( + (typeof chunk === "string" && chunk.length > 0) || + ((chunk instanceof Buffer || chunk instanceof Uint8Array) && + chunk.buffer.byteLength > 0) + ) { + if (typeof chunk === "string") { + chunk = Buffer.from(chunk, encoding); + } + if (chunk instanceof Buffer) { + chunk = new Uint8Array(chunk.buffer); + } - this.controller.enqueue(chunk); + core.writeAll(this._bodyWriteRid, chunk).then(() => { + callback?.(); + this.emit("drain"); + }).catch((e) => { + this._requestSendError = e; + }); + } return false; } @@ -400,18 +404,8 @@ export class OutgoingMessage extends Stream { } // deno-lint-ignore no-explicit-any - end(chunk: any, encoding: any, _callback: any) { - if (typeof chunk === "function") { - callback = chunk; - chunk = null; - encoding = null; - } else if (typeof encoding === "function") { - callback = encoding; - encoding = null; - } - // TODO(crowlKats): finish - - return this; + end(_chunk: any, _encoding: any, _callback: any) { + notImplemented("OutgoingMessage.end"); } flushHeaders() { diff --git a/ext/node/polyfills/http.ts b/ext/node/polyfills/http.ts index 93c802d377..4e72b80f84 100644 --- a/ext/node/polyfills/http.ts +++ b/ext/node/polyfills/http.ts @@ -38,6 +38,7 @@ import { Agent, globalAgent } from "ext:deno_node/_http_agent.mjs"; import { urlToHttpOptions } from "ext:deno_node/internal/url.ts"; import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { constants, TCP } from "ext:deno_node/internal_binding/tcp_wrap.ts"; +import { notImplemented } from "ext:deno_node/_utils.ts"; import { connResetException, ERR_HTTP_HEADERS_SENT, @@ -500,6 +501,14 @@ class ClientRequest extends OutgoingMessage { delete optsWithoutSignal.signal; } + if (options!.createConnection) { + notImplemented("ClientRequest.options.createConnection"); + } + + if (options!.lookup) { + notImplemented("ClientRequest.options.lookup"); + } + // initiate connection // TODO(crowlKats): finish this /*if (this.agent) { @@ -547,61 +556,14 @@ class ClientRequest extends OutgoingMessage { const client = this._getClient() ?? createHttpClient({ http2: false }); this._client = client; - const req = core.ops.op_node_http_request( + this._req = core.ops.op_node_http_request( this.method, url, headers, client.rid, this.method === "POST" || this.method === "PATCH", ); - - this._req = req; - - if (req.requestBodyRid !== null) { - const reader = this.stream.getReader(); - (async () => { - let done = false; - while (!done) { - let val; - try { - const res = await reader.read(); - done = res.done; - val = res.value; - } catch (err) { - //if (terminator.aborted) break; - // TODO(lucacasonato): propagate error into response body stream - this._requestSendError = err; - this._requestSendErrorSet = true; - break; - } - if (done) break; - try { - await core.writeAll(req.requestBodyRid, val); - } catch (err) { - //if (terminator.aborted) break; - await reader.cancel(err); - // TODO(lucacasonato): propagate error into response body stream - this._requestSendError = err; - this._requestSendErrorSet = true; - break; - } - } - if (done /*&& !terminator.aborted*/) { - try { - await core.shutdown(req.requestBodyRid); - } catch (err) { - // TODO(bartlomieju): fix this conditional - // deno-lint-ignore no-constant-condition - if (true) { - this._requestSendError = err; - this._requestSendErrorSet = true; - } - } - } - //WeakMapPrototypeDelete(requestBodyReaders, req); - core.tryClose(req.requestBodyRid); - })(); - } + this._bodyWriteRid = this._req.requestBodyRid; } _getClient(): Deno.HttpClient | undefined { @@ -645,112 +607,92 @@ class ClientRequest extends OutgoingMessage { } } - // TODO(bartlomieju): use callback here // deno-lint-ignore no-explicit-any - end(chunk?: any, encoding?: any, _cb?: any): this { + end(chunk?: any, encoding?: any, cb?: any): this { this.finished = true; - - if (chunk !== undefined) { + if (chunk !== undefined && chunk !== null) { this.write(chunk, encoding); } - this.controller.close(); - core.opAsync("op_fetch_send", this._req.requestRid).then((res) => { - if (this._timeout) { - this._timeout.onabort = null; + (async () => { + try { + const [res, _] = await Promise.all([ + core.opAsync("op_fetch_send", this._req.requestRid), + (async () => { + if (this._bodyWriteRid) { + try { + await core.shutdown(this._bodyWriteRid); + } catch (err) { + this._requestSendError = err; + } + + core.tryClose(this._bodyWriteRid); + + try { + cb?.(); + } catch (_) { + // + } + } + })(), + ]); + if (this._timeout) { + this._timeout.onabort = null; + } + this._client.close(); + const incoming = new IncomingMessageForClient(this.socket); + + // TODO(@crowlKats): + // incoming.httpVersionMajor = versionMajor; + // incoming.httpVersionMinor = versionMinor; + // incoming.httpVersion = `${versionMajor}.${versionMinor}`; + // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders || + // parser.joinDuplicateHeaders; + + incoming.url = res.url; + incoming.statusCode = res.status; + incoming.statusMessage = res.statusText; + + incoming._addHeaderLines( + res.headers, + Object.entries(res.headers).flat().length, + ); + incoming._bodyRid = res.responseRid; + + if (this._req.cancelHandleRid !== null) { + core.tryClose(this._req.cancelHandleRid); + } + + this.emit("response", incoming); + } catch (err) { + if (this._req.cancelHandleRid !== null) { + core.tryClose(this._req.cancelHandleRid); + } + + if (this._requestSendError !== undefined) { + // if the request body stream errored, we want to propagate that error + // instead of the original error from opFetchSend + throw new TypeError( + "Failed to fetch: request body stream errored", + { + cause: this._requestSendError, + }, + ); + } + + if ( + err.message.includes("connection closed before message completed") + ) { + // Node.js seems ignoring this error + } else if (err.message.includes("The signal has been aborted")) { + // Remap this error + this.emit("error", connResetException("socket hang up")); + } else { + this.emit("error", err); + } } - this._client.close(); - const incoming = new IncomingMessageForClient(this.socket); - - // TODO(@crowlKats): - // incoming.httpVersionMajor = versionMajor; - // incoming.httpVersionMinor = versionMinor; - // incoming.httpVersion = `${versionMajor}.${versionMinor}`; - // incoming.joinDuplicateHeaders = socket?.server?.joinDuplicateHeaders || - // parser.joinDuplicateHeaders; - - incoming.url = res.url; - incoming.statusCode = res.status; - incoming.statusMessage = res.statusText; - - incoming._addHeaderLines( - res.headers, - Object.entries(res.headers).flat().length, - ); - incoming._bodyRid = res.responseRid; - - if (this._req.cancelHandleRid !== null) { - core.tryClose(this._req.cancelHandleRid); - } - - this.emit("response", incoming); - }).catch((err) => { - if (this._req.cancelHandleRid !== null) { - core.tryClose(this._req.cancelHandleRid); - } - - if (this._requestSendErrorSet) { - // if the request body stream errored, we want to propagate that error - // instead of the original error from opFetchSend - throw new TypeError("Failed to fetch: request body stream errored", { - cause: this._requestSendError, - }); - } - - if (err.message.includes("connection closed before message completed")) { - // Node.js seems ignoring this error - } else if (err.message.includes("The signal has been aborted")) { - // Remap this error - this.emit("error", connResetException("socket hang up")); - } else { - this.emit("error", err); - } - }); + })(); } - /* - override async _final() { - if (this.controller) { - this.controller.close(); - } - - const body = await this._createBody(this.body, this.opts); - const client = await this._createCustomClient(); - const opts = { - body, - method: this.opts.method, - client, - headers: this.opts.headers, - signal: this.opts.signal ?? undefined, - }; - const mayResponse = fetch(this._createUrlStrFromOptions(this.opts), opts) - .catch((e) => { - if (e.message.includes("connection closed before message completed")) { - // Node.js seems ignoring this error - } else if (e.message.includes("The signal has been aborted")) { - // Remap this error - this.emit("error", connResetException("socket hang up")); - } else { - this.emit("error", e); - } - return undefined; - }); - - const res = new IncomingMessageForClient( - await mayResponse, - this._createSocket(), - ); - this.emit("response", res); - if (client) { - res.on("end", () => { - client.close(); - }); - } - if (this.opts.timeout != undefined) { - clearTimeout(this.opts.timeout); - this.opts.timeout = undefined; - } - this.cb?.(res); - }*/ abort() { if (this.aborted) { diff --git a/runtime/build.rs b/runtime/build.rs index bd141d2970..334c3b11a9 100644 --- a/runtime/build.rs +++ b/runtime/build.rs @@ -122,6 +122,13 @@ mod startup_snapshot { } impl deno_node::NodePermissions for Permissions { + fn check_net_url( + &mut self, + _url: &deno_core::url::Url, + _api_name: &str, + ) -> Result<(), deno_core::error::AnyError> { + unreachable!("snapshotting!") + } fn check_read(&self, _p: &Path) -> Result<(), deno_core::error::AnyError> { unreachable!("snapshotting!") } diff --git a/runtime/permissions/mod.rs b/runtime/permissions/mod.rs index 6cce7d1e99..f1d0362cad 100644 --- a/runtime/permissions/mod.rs +++ b/runtime/permissions/mod.rs @@ -13,6 +13,7 @@ use deno_core::serde::Deserializer; use deno_core::serde::Serialize; use deno_core::serde_json; use deno_core::url; +use deno_core::url::Url; use deno_core::ModuleSpecifier; use deno_core::OpState; use log; @@ -1871,6 +1872,15 @@ impl PermissionsContainer { } impl deno_node::NodePermissions for PermissionsContainer { + #[inline(always)] + fn check_net_url( + &mut self, + url: &Url, + api_name: &str, + ) -> Result<(), AnyError> { + self.0.lock().net.check_url(url, Some(api_name)) + } + #[inline(always)] fn check_read(&self, path: &Path) -> Result<(), AnyError> { self.0.lock().read.check(path, None) diff --git a/tools/node_compat/TODO.md b/tools/node_compat/TODO.md index d29fe8f9ff..3aff62668d 100644 --- a/tools/node_compat/TODO.md +++ b/tools/node_compat/TODO.md @@ -3,7 +3,7 @@ NOTE: This file should not be manually edited. Please edit 'cli/tests/node_compat/config.json' and run 'tools/node_compat/setup.ts' instead. -Total: 2934 +Total: 2935 - [abort/test-abort-backtrace.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-backtrace.js) - [abort/test-abort-fatal-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-fatal-error.js) @@ -359,6 +359,7 @@ Total: 2934 - [parallel/test-cli-syntax-eval.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-eval.js) - [parallel/test-cli-syntax-piped-bad.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-piped-bad.js) - [parallel/test-cli-syntax-piped-good.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cli-syntax-piped-good.js) +- [parallel/test-client-request-destroy.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-client-request-destroy.js) - [parallel/test-cluster-accept-fail.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-accept-fail.js) - [parallel/test-cluster-advanced-serialization.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-advanced-serialization.js) - [parallel/test-cluster-basic.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-basic.js)