diff --git a/cli/tests/integration/node_unit_tests.rs b/cli/tests/integration/node_unit_tests.rs index f62c8761cf..363e5dfa34 100644 --- a/cli/tests/integration/node_unit_tests.rs +++ b/cli/tests/integration/node_unit_tests.rs @@ -53,6 +53,7 @@ util::unit_test_factory!( crypto_sign_test = crypto / crypto_sign_test, fs_test, http_test, + http2_test, _randomBytes_test = internal / _randomBytes_test, _randomFill_test = internal / _randomFill_test, _randomInt_test = internal / _randomInt_test, diff --git a/cli/tests/unit_node/http2_test.ts b/cli/tests/unit_node/http2_test.ts new file mode 100644 index 0000000000..543543cbdc --- /dev/null +++ b/cli/tests/unit_node/http2_test.ts @@ -0,0 +1,104 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +import * as http2 from "node:http2"; +import * as net from "node:net"; +import { deferred } from "../../../test_util/std/async/deferred.ts"; +import { assertEquals } from "https://deno.land/std@v0.42.0/testing/asserts.ts"; + +const { + HTTP2_HEADER_AUTHORITY, + HTTP2_HEADER_METHOD, + HTTP2_HEADER_PATH, + HTTP2_HEADER_STATUS, +} = http2.constants; + +Deno.test("[node/http2 client]", async () => { + // Create a server to respond to the HTTP2 requests + const portPromise = deferred(); + const reqPromise = deferred(); + const ready = deferred(); + const ac = new AbortController(); + const server = Deno.serve({ + port: 0, + signal: ac.signal, + onListen: ({ port }: { port: number }) => portPromise.resolve(port), + handler: async (req: Request) => { + reqPromise.resolve(req); + await ready; + return new Response("body", { + status: 401, + headers: { "resp-header-name": "resp-header-value" }, + }); + }, + }); + + const port = await portPromise; + + // Get a session + const sessionPromise = deferred(); + const session = http2.connect( + `localhost:${port}`, + {}, + sessionPromise.resolve.bind(sessionPromise), + ); + const session2 = await sessionPromise; + assertEquals(session, session2); + + // Write a request, including a body + const stream = session.request({ + [HTTP2_HEADER_AUTHORITY]: `localhost:${port}`, + [HTTP2_HEADER_METHOD]: "POST", + [HTTP2_HEADER_PATH]: "/path", + "req-header-name": "req-header-value", + }); + stream.write("body"); + stream.end(); + + // Check the request + const req = await reqPromise; + assertEquals(req.headers.get("req-header-name"), "req-header-value"); + assertEquals(await req.text(), "body"); + + ready.resolve(); + + // Read a response + const headerPromise = new Promise>(( + resolve, + ) => stream.on("headers", resolve)); + const headers = await headerPromise; + assertEquals(headers["resp-header-name"], "resp-header-value"); + assertEquals(headers[HTTP2_HEADER_STATUS], "401"); + + ac.abort(); + await server.finished; +}); + +Deno.test("[node/http2 server]", async () => { + const server = http2.createServer(); + server.listen(0); + const port = ( server.address()).port; + const sessionPromise = new Promise((resolve) => + server.on("session", resolve) + ); + + const responsePromise = fetch(`http://localhost:${port}/path`, { + method: "POST", + body: "body", + }); + + const session = await sessionPromise; + const stream = await new Promise((resolve) => + session.on("stream", resolve) + ); + const _headers = await new Promise((resolve) => + stream.on("headers", resolve) + ); + const _data = await new Promise((resolve) => stream.on("data", resolve)); + const _end = await new Promise((resolve) => stream.on("end", resolve)); + stream.respond(); + stream.end(); + const resp = await responsePromise; + await resp.text(); + + await new Promise((resolve) => server.close(resolve)); +}); diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js index fa55079e77..c5a5c0e189 100644 --- a/ext/http/00_serve.js +++ b/ext/http/00_serve.js @@ -748,4 +748,10 @@ internals.upgradeHttpRaw = upgradeHttpRaw; internals.serveHttpOnListener = serveHttpOnListener; internals.serveHttpOnConnection = serveHttpOnConnection; -export { serve, upgradeHttpRaw }; +export { + addTrailers, + serve, + serveHttpOnConnection, + serveHttpOnListener, + upgradeHttpRaw, +}; diff --git a/ext/node/polyfills/http2.ts b/ext/node/polyfills/http2.ts index 90b1be1a2f..a5d945efea 100644 --- a/ext/node/polyfills/http2.ts +++ b/ext/node/polyfills/http2.ts @@ -1,12 +1,21 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. // Copyright Joyent and Node contributors. All rights reserved. MIT license. -import { notImplemented } from "ext:deno_node/_utils.ts"; +import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts"; import { EventEmitter } from "ext:deno_node/events.ts"; import { Buffer } from "ext:deno_node/buffer.ts"; -import { Socket } from "ext:deno_node/net.ts"; +import { Server, Socket, TCP } from "ext:deno_node/net.ts"; import { TypedArray } from "ext:deno_node/internal/util/types.ts"; +import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts"; import { FileHandle } from "ext:deno_node/fs/promises.ts"; +import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js"; +import { type Deferred, deferred } from "ext:deno_node/_util/async.ts"; +import { nextTick } from "ext:deno_node/_next_tick.ts"; +import { TextEncoder } from "ext:deno_web/08_text_encoding.js"; + +const ENCODER = new TextEncoder(); +type Http2Headers = Record; export class Http2Session extends EventEmitter { constructor() { @@ -19,11 +28,10 @@ export class Http2Session extends EventEmitter { } close(_callback?: () => void) { - notImplemented("Http2Session.close"); + warnNotImplemented("Http2Session.close"); } get closed(): boolean { - notImplemented("Http2Session.closed"); return false; } @@ -37,7 +45,6 @@ export class Http2Session extends EventEmitter { } get destroyed(): boolean { - notImplemented("Http2Session.destroyed"); return false; } @@ -78,7 +85,7 @@ export class Http2Session extends EventEmitter { } ref() { - notImplemented("Http2Session.ref"); + warnNotImplemented("Http2Session.ref"); } get remoteSettings(): Record { @@ -90,17 +97,15 @@ export class Http2Session extends EventEmitter { notImplemented("Http2Session.setLocalWindowSize"); } - setTimeout(_msecs: number, _callback: () => void) { - notImplemented("Http2Session.setTimeout"); + setTimeout(msecs: number, callback?: () => void) { + setStreamTimeout(this, msecs, callback); } get socket(): Socket /*| TlsSocket*/ { - notImplemented("Http2Session.socket"); - return null; + return {}; } get state(): Record { - notImplemented("Http2Session.state"); return {}; } @@ -114,7 +119,7 @@ export class Http2Session extends EventEmitter { } unref() { - notImplemented("Http2Session.unref"); + warnNotImplemented("Http2Session.unref"); } } @@ -136,21 +141,131 @@ export class ServerHttp2Session extends Http2Session { } export class ClientHttp2Session extends Http2Session { - constructor() { + constructor( + _authority: string | URL, + _options: Record, + callback: (session: Http2Session) => void, + ) { super(); + if (callback) { + this.on("connect", callback); + } + nextTick(() => this.emit("connect", this)); } request( - _headers: Record, + headers: Http2Headers, _options?: Record, ): ClientHttp2Stream { - notImplemented("ClientHttp2Session.request"); - return new ClientHttp2Stream(); + const reqHeaders: string[][] = []; + const controllerPromise: Deferred< + ReadableStreamDefaultController + > = deferred(); + const body = new ReadableStream({ + start(controller) { + controllerPromise.resolve(controller); + }, + }); + const request: RequestInit = { headers: reqHeaders, body }; + let authority = null; + let path = null; + for (const [name, value] of Object.entries(headers)) { + if (name == constants.HTTP2_HEADER_PATH) { + path = String(value); + } else if (name == constants.HTTP2_HEADER_METHOD) { + request.method = String(value); + } else if (name == constants.HTTP2_HEADER_AUTHORITY) { + authority = String(value); + } else { + reqHeaders.push([name, String(value)]); + } + } + + const fetchPromise = fetch(`http://${authority}${path}`, request); + const readerPromise = deferred(); + const headersPromise = deferred(); + (async () => { + const fetch = await fetchPromise; + readerPromise.resolve(fetch.body); + + const headers: Http2Headers = {}; + for (const [key, value] of fetch.headers) { + headers[key] = value; + } + headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status); + + headersPromise.resolve(headers); + })(); + return new ClientHttp2Stream( + this, + headersPromise, + controllerPromise, + readerPromise, + ); } } -export class Http2Stream { - constructor() { +export class Http2Stream extends EventEmitter { + #session: Http2Session; + #headers: Deferred; + #controllerPromise: Deferred>; + #readerPromise: Deferred>; + #closed: boolean; + _response: Response; + + constructor( + session: Http2Session, + headers: Promise, + controllerPromise: Promise>, + readerPromise: Promise>, + ) { + super(); + this.#session = session; + this.#headers = headers; + this.#controllerPromise = controllerPromise; + this.#readerPromise = readerPromise; + this.#closed = false; + nextTick(() => { + (async () => { + const headers = await this.#headers; + this.emit("headers", headers); + })(); + (async () => { + const reader = await this.#readerPromise; + if (reader) { + for await (const data of reader) { + this.emit("data", new Buffer(data)); + } + } + this.emit("end"); + })(); + }); + } + + // TODO(mmastrac): Implement duplex + end() { + (async () => { + const controller = await this.#controllerPromise; + controller.close(); + })(); + } + + write(buffer, callback?: () => void) { + (async () => { + const controller = await this.#controllerPromise; + if (typeof buffer === "string") { + controller.enqueue(ENCODER.encode(buffer)); + } else { + controller.enqueue(buffer); + } + callback?.(); + })(); + } + + resume() { + } + + pause() { } get aborted(): boolean { @@ -164,16 +279,15 @@ export class Http2Stream { } close(_code: number, _callback: () => void) { - notImplemented("Http2Stream.close"); + this.#closed = true; + this.emit("close"); } get closed(): boolean { - notImplemented("Http2Stream.closed"); - return false; + return this.#closed; } get destroyed(): boolean { - notImplemented("Http2Stream.destroyed"); return false; } @@ -197,7 +311,7 @@ export class Http2Stream { } get rstCode(): number { - notImplemented("Http2Stream.rstCode"); + // notImplemented("Http2Stream.rstCode"); return 0; } @@ -217,12 +331,11 @@ export class Http2Stream { } get session(): Http2Session { - notImplemented("Http2Stream.session"); - return new Http2Session(); + return this.#session; } - setTimeout(_msecs: number, _callback: () => void) { - notImplemented("Http2Stream.setTimeout"); + setTimeout(msecs: number, callback?: () => void) { + setStreamTimeout(this, msecs, callback); } get state(): Record { @@ -231,28 +344,52 @@ export class Http2Stream { } sendTrailers(_headers: Record) { - notImplemented("Http2Stream.sendTrailers"); + addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]); } } export class ClientHttp2Stream extends Http2Stream { - constructor() { - super(); + constructor( + session: Http2Session, + headers: Promise, + controllerPromise: Deferred>, + readerPromise: Deferred>, + ) { + super(session, headers, controllerPromise, readerPromise); } } export class ServerHttp2Stream extends Http2Stream { - constructor() { - super(); + _promise: Deferred; + #body: ReadableStream; + #waitForTrailers: boolean; + #headersSent: boolean; + + constructor( + session: Http2Session, + headers: Promise, + controllerPromise: Promise>, + reader: ReadableStream, + body: ReadableStream, + ) { + super(session, headers, controllerPromise, Promise.resolve(reader)); + this._promise = new deferred(); + this.#body = body; } additionalHeaders(_headers: Record) { notImplemented("ServerHttp2Stream.additionalHeaders"); } + end(): void { + super.end(); + if (this.#waitForTrailers) { + this.emit("wantTrailers"); + } + } + get headersSent(): boolean { - notImplemented("ServerHttp2Stream.headersSent"); - return false; + return this.#headersSent; } get pushAllowed(): boolean { @@ -269,10 +406,26 @@ export class ServerHttp2Stream extends Http2Stream { } respond( - _headers: Record, - _options: Record, + headers: Http2Headers, + options: Record, ) { - notImplemented("ServerHttp2Stream.respond"); + this.#headersSent = true; + const response: ResponseInit = {}; + if (headers) { + for (const [name, value] of Object.entries(headers)) { + if (name == constants.HTTP2_HEADER_STATUS) { + response.status = Number(value); + } + } + } + if (options?.endStream) { + this._promise.resolve(this._response = new Response("", response)); + } else { + this.#waitForTrailers = options?.waitForTrailers; + this._promise.resolve( + this._response = new Response(this.#body, response), + ); + } } respondWithFD( @@ -292,56 +445,145 @@ export class ServerHttp2Stream extends Http2Stream { } } -export class Http2Server { - constructor() { +export class Http2Server extends Server { + #options: Record = {}; + #abortController; + #server; + timeout = 0; + + constructor( + options: Record, + requestListener: () => unknown, + ) { + super(options); + this.#abortController = new AbortController(); + this.on( + "connection", + (conn: Deno.Conn) => { + try { + const session = new ServerHttp2Session(); + this.emit("session", session); + this.#server = serveHttpOnConnection( + conn, + this.#abortController.signal, + async (req: Request) => { + try { + const controllerPromise: Deferred< + ReadableStreamDefaultController + > = deferred(); + const body = new ReadableStream({ + start(controller) { + controllerPromise.resolve(controller); + }, + }); + const headers: Http2Headers = {}; + for (const [name, value] of req.headers) { + headers[name] = value; + } + headers[constants.HTTP2_HEADER_PATH] = + new URL(req.url).pathname; + const stream = new ServerHttp2Stream( + session, + Promise.resolve(headers), + controllerPromise, + req.body, + body, + ); + session.emit("stream", stream, headers); + this.emit("stream", stream, headers); + return await stream._promise; + } catch (e) { + console.log("Error in serveHttpOnConnection", e); + } + return new Response(""); + }, + () => { + console.log("error"); + }, + () => {}, + ); + } catch (e) { + console.log("Error in Http2Server", e); + } + }, + ); + this.on( + "newListener", + (event) => console.log(`Event in newListener: ${event}`), + ); + this.#options = options; + if (typeof requestListener === "function") { + this.on("request", requestListener); + } } - close(_callback?: () => unknown) { - notImplemented("Http2Server.close"); + // Prevent the TCP server from wrapping this in a socket, since we need it to serve HTTP + _createSocket(clientHandle: TCP) { + return clientHandle[kStreamBaseField]; } - setTimeout(_msecs: number, _callback?: () => unknown) { - notImplemented("Http2Server.setTimeout"); + close(callback?: () => unknown) { + if (callback) { + this.on("close", callback); + } + this.#abortController.abort(); + super.close(); } - get timeout(): number { - notImplemented("Http2Server.timeout"); - return 0; + setTimeout(msecs: number, callback?: () => unknown) { + this.timeout = msecs; + if (callback !== undefined) { + this.on("timeout", callback); + } } - updateSettings(_settings: Record) { - notImplemented("Http2Server.updateSettings"); + updateSettings(settings: Record) { + this.#options.settings = { ...this.#options.settings, ...settings }; } } -export class Http2SecureServer { - constructor() { +export class Http2SecureServer extends Server { + #options: Record = {}; + timeout = 0; + + constructor( + options: Record, + requestListener: () => unknown, + ) { + super(options, function () { + notImplemented("connectionListener"); + }); + this.#options = options; + if (typeof requestListener === "function") { + this.on("request", requestListener); + } } close(_callback?: () => unknown) { notImplemented("Http2SecureServer.close"); } - setTimeout(_msecs: number, _callback?: () => unknown) { - notImplemented("Http2SecureServer.setTimeout"); + setTimeout(msecs: number, callback?: () => unknown) { + this.timeout = msecs; + if (callback !== undefined) { + this.on("timeout", callback); + } } - get timeout(): number { - notImplemented("Http2SecureServer.timeout"); - return 0; - } - - updateSettings(_settings: Record) { - notImplemented("Http2SecureServer.updateSettings"); + updateSettings(settings: Record) { + this.#options.settings = { ...this.#options.settings, ...settings }; } } export function createServer( - _options: Record, - _onRequestHandler: () => unknown, + options: Record, + onRequestHandler: () => unknown, ): Http2Server { - notImplemented("http2.createServer"); - return new Http2Server(); + if (typeof options === "function") { + onRequestHandler = options; + options = {}; + } + return new Http2Server(options, onRequestHandler); } export function createSecureServer( @@ -353,11 +595,11 @@ export function createSecureServer( } export function connect( - _authority: string | URL, - _options: Record, + authority: string | URL, + options: Record, + callback: (session: ClientHttp2Session) => void, ): ClientHttp2Session { - notImplemented("http2.connect"); - return new ClientHttp2Session(); + return new ClientHttp2Session(authority, options, callback); } export const constants = { @@ -681,8 +923,8 @@ export class Http2ServerRequest { return ""; } - setTimeout(_msecs: number, _callback: () => unknown) { - notImplemented("Http2ServerRequest.setTimeout"); + setTimeout(msecs: number, callback?: () => unknown) { + this.stream.setTimeout(callback, msecs); } get socket(): Socket /*| TlsSocket*/ { @@ -781,8 +1023,8 @@ export class Http2ServerResponse { notImplemented("Http2ServerResponse.setHeader"); } - setTimeout(_msecs: number, _callback: () => unknown) { - notImplemented("Http2ServerResponse.setTimeout"); + setTimeout(msecs: number, callback?: () => unknown) { + this.stream.setTimeout(msecs, callback); } get socket(): Socket /*| TlsSocket*/ { diff --git a/ext/node/polyfills/net.ts b/ext/node/polyfills/net.ts index 2c2f5f9448..79845adb2e 100644 --- a/ext/node/polyfills/net.ts +++ b/ext/node/polyfills/net.ts @@ -1834,21 +1834,8 @@ function _onconnection(this: any, err: number, clientHandle?: Handle) { return; } - const socket = new Socket({ - handle: clientHandle, - allowHalfOpen: self.allowHalfOpen, - pauseOnCreate: self.pauseOnConnect, - readable: true, - writable: true, - }); - - // TODO(@bartlomieju): implement noDelay and setKeepAlive - - self._connections++; - socket.server = self; - socket._server = self; - - DTRACE_NET_SERVER_CONNECTION(socket); + const socket = self._createSocket(clientHandle); + this._connections++; self.emit("connection", socket); if (netServerSocketChannel.hasSubscribers) { @@ -2369,6 +2356,23 @@ export class Server extends EventEmitter { return !!this._handle; } + _createSocket(clientHandle) { + const socket = new Socket({ + handle: clientHandle, + allowHalfOpen: this.allowHalfOpen, + pauseOnCreate: this.pauseOnConnect, + readable: true, + writable: true, + }); + + // TODO(@bartlomieju): implement noDelay and setKeepAlive + + socket.server = this; + socket._server = this; + + DTRACE_NET_SERVER_CONNECTION(socket); + } + _listen2 = _setupListenHandle; _emitCloseIfDrained() {