0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-02-02 04:38:21 -05:00

feat(ext/node): Very basic node:http2 support (#19344)

This commit adds basic support for "node:http2" module. Not
all APIs have been yet implemented, but this change already
allows to use this module for some basic functions. 

The "grpc" package is still not working, but it's a good stepping
stone.

---------

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
This commit is contained in:
Matt Mastracci 2023-06-06 04:29:55 -06:00 committed by Bartek Iwańczuk
parent 05282308ff
commit 03ad309ccd
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
5 changed files with 444 additions and 87 deletions

View file

@ -53,6 +53,7 @@ util::unit_test_factory!(
crypto_sign_test = crypto / crypto_sign_test,
fs_test,
http_test,
http2_test,
_randomBytes_test = internal / _randomBytes_test,
_randomFill_test = internal / _randomFill_test,
_randomInt_test = internal / _randomInt_test,

View file

@ -0,0 +1,104 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import * as http2 from "node:http2";
import * as net from "node:net";
import { deferred } from "../../../test_util/std/async/deferred.ts";
import { assertEquals } from "https://deno.land/std@v0.42.0/testing/asserts.ts";
const {
HTTP2_HEADER_AUTHORITY,
HTTP2_HEADER_METHOD,
HTTP2_HEADER_PATH,
HTTP2_HEADER_STATUS,
} = http2.constants;
Deno.test("[node/http2 client]", async () => {
// Create a server to respond to the HTTP2 requests
const portPromise = deferred();
const reqPromise = deferred<Request>();
const ready = deferred();
const ac = new AbortController();
const server = Deno.serve({
port: 0,
signal: ac.signal,
onListen: ({ port }: { port: number }) => portPromise.resolve(port),
handler: async (req: Request) => {
reqPromise.resolve(req);
await ready;
return new Response("body", {
status: 401,
headers: { "resp-header-name": "resp-header-value" },
});
},
});
const port = await portPromise;
// Get a session
const sessionPromise = deferred();
const session = http2.connect(
`localhost:${port}`,
{},
sessionPromise.resolve.bind(sessionPromise),
);
const session2 = await sessionPromise;
assertEquals(session, session2);
// Write a request, including a body
const stream = session.request({
[HTTP2_HEADER_AUTHORITY]: `localhost:${port}`,
[HTTP2_HEADER_METHOD]: "POST",
[HTTP2_HEADER_PATH]: "/path",
"req-header-name": "req-header-value",
});
stream.write("body");
stream.end();
// Check the request
const req = await reqPromise;
assertEquals(req.headers.get("req-header-name"), "req-header-value");
assertEquals(await req.text(), "body");
ready.resolve();
// Read a response
const headerPromise = new Promise<Record<string, string | string[]>>((
resolve,
) => stream.on("headers", resolve));
const headers = await headerPromise;
assertEquals(headers["resp-header-name"], "resp-header-value");
assertEquals(headers[HTTP2_HEADER_STATUS], "401");
ac.abort();
await server.finished;
});
Deno.test("[node/http2 server]", async () => {
const server = http2.createServer();
server.listen(0);
const port = (<net.AddressInfo> server.address()).port;
const sessionPromise = new Promise<http2.Http2Session>((resolve) =>
server.on("session", resolve)
);
const responsePromise = fetch(`http://localhost:${port}/path`, {
method: "POST",
body: "body",
});
const session = await sessionPromise;
const stream = await new Promise<http2.ServerHttp2Stream>((resolve) =>
session.on("stream", resolve)
);
const _headers = await new Promise((resolve) =>
stream.on("headers", resolve)
);
const _data = await new Promise((resolve) => stream.on("data", resolve));
const _end = await new Promise((resolve) => stream.on("end", resolve));
stream.respond();
stream.end();
const resp = await responsePromise;
await resp.text();
await new Promise((resolve) => server.close(resolve));
});

View file

@ -748,4 +748,10 @@ internals.upgradeHttpRaw = upgradeHttpRaw;
internals.serveHttpOnListener = serveHttpOnListener;
internals.serveHttpOnConnection = serveHttpOnConnection;
export { serve, upgradeHttpRaw };
export {
addTrailers,
serve,
serveHttpOnConnection,
serveHttpOnListener,
upgradeHttpRaw,
};

View file

@ -1,12 +1,21 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
import { notImplemented } from "ext:deno_node/_utils.ts";
import { notImplemented, warnNotImplemented } from "ext:deno_node/_utils.ts";
import { EventEmitter } from "ext:deno_node/events.ts";
import { Buffer } from "ext:deno_node/buffer.ts";
import { Socket } from "ext:deno_node/net.ts";
import { Server, Socket, TCP } from "ext:deno_node/net.ts";
import { TypedArray } from "ext:deno_node/internal/util/types.ts";
import { setStreamTimeout } from "ext:deno_node/internal/stream_base_commons.ts";
import { FileHandle } from "ext:deno_node/fs/promises.ts";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { addTrailers, serveHttpOnConnection } from "ext:deno_http/00_serve.js";
import { type Deferred, deferred } from "ext:deno_node/_util/async.ts";
import { nextTick } from "ext:deno_node/_next_tick.ts";
import { TextEncoder } from "ext:deno_web/08_text_encoding.js";
const ENCODER = new TextEncoder();
type Http2Headers = Record<string, string | string[]>;
export class Http2Session extends EventEmitter {
constructor() {
@ -19,11 +28,10 @@ export class Http2Session extends EventEmitter {
}
close(_callback?: () => void) {
notImplemented("Http2Session.close");
warnNotImplemented("Http2Session.close");
}
get closed(): boolean {
notImplemented("Http2Session.closed");
return false;
}
@ -37,7 +45,6 @@ export class Http2Session extends EventEmitter {
}
get destroyed(): boolean {
notImplemented("Http2Session.destroyed");
return false;
}
@ -78,7 +85,7 @@ export class Http2Session extends EventEmitter {
}
ref() {
notImplemented("Http2Session.ref");
warnNotImplemented("Http2Session.ref");
}
get remoteSettings(): Record<string, unknown> {
@ -90,17 +97,15 @@ export class Http2Session extends EventEmitter {
notImplemented("Http2Session.setLocalWindowSize");
}
setTimeout(_msecs: number, _callback: () => void) {
notImplemented("Http2Session.setTimeout");
setTimeout(msecs: number, callback?: () => void) {
setStreamTimeout(this, msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {
notImplemented("Http2Session.socket");
return null;
return {};
}
get state(): Record<string, unknown> {
notImplemented("Http2Session.state");
return {};
}
@ -114,7 +119,7 @@ export class Http2Session extends EventEmitter {
}
unref() {
notImplemented("Http2Session.unref");
warnNotImplemented("Http2Session.unref");
}
}
@ -136,21 +141,131 @@ export class ServerHttp2Session extends Http2Session {
}
export class ClientHttp2Session extends Http2Session {
constructor() {
constructor(
_authority: string | URL,
_options: Record<string, unknown>,
callback: (session: Http2Session) => void,
) {
super();
if (callback) {
this.on("connect", callback);
}
nextTick(() => this.emit("connect", this));
}
request(
_headers: Record<string, unknown>,
headers: Http2Headers,
_options?: Record<string, unknown>,
): ClientHttp2Stream {
notImplemented("ClientHttp2Session.request");
return new ClientHttp2Stream();
const reqHeaders: string[][] = [];
const controllerPromise: Deferred<
ReadableStreamDefaultController<Uint8Array>
> = deferred();
const body = new ReadableStream({
start(controller) {
controllerPromise.resolve(controller);
},
});
const request: RequestInit = { headers: reqHeaders, body };
let authority = null;
let path = null;
for (const [name, value] of Object.entries(headers)) {
if (name == constants.HTTP2_HEADER_PATH) {
path = String(value);
} else if (name == constants.HTTP2_HEADER_METHOD) {
request.method = String(value);
} else if (name == constants.HTTP2_HEADER_AUTHORITY) {
authority = String(value);
} else {
reqHeaders.push([name, String(value)]);
}
}
const fetchPromise = fetch(`http://${authority}${path}`, request);
const readerPromise = deferred();
const headersPromise = deferred();
(async () => {
const fetch = await fetchPromise;
readerPromise.resolve(fetch.body);
const headers: Http2Headers = {};
for (const [key, value] of fetch.headers) {
headers[key] = value;
}
headers[constants.HTTP2_HEADER_STATUS] = String(fetch.status);
headersPromise.resolve(headers);
})();
return new ClientHttp2Stream(
this,
headersPromise,
controllerPromise,
readerPromise,
);
}
}
export class Http2Stream {
constructor() {
export class Http2Stream extends EventEmitter {
#session: Http2Session;
#headers: Deferred<Http2Headers>;
#controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>;
#readerPromise: Deferred<ReadableStream<Uint8Array>>;
#closed: boolean;
_response: Response;
constructor(
session: Http2Session,
headers: Promise<Http2Headers>,
controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
readerPromise: Promise<ReadableStream<Uint8Array>>,
) {
super();
this.#session = session;
this.#headers = headers;
this.#controllerPromise = controllerPromise;
this.#readerPromise = readerPromise;
this.#closed = false;
nextTick(() => {
(async () => {
const headers = await this.#headers;
this.emit("headers", headers);
})();
(async () => {
const reader = await this.#readerPromise;
if (reader) {
for await (const data of reader) {
this.emit("data", new Buffer(data));
}
}
this.emit("end");
})();
});
}
// TODO(mmastrac): Implement duplex
end() {
(async () => {
const controller = await this.#controllerPromise;
controller.close();
})();
}
write(buffer, callback?: () => void) {
(async () => {
const controller = await this.#controllerPromise;
if (typeof buffer === "string") {
controller.enqueue(ENCODER.encode(buffer));
} else {
controller.enqueue(buffer);
}
callback?.();
})();
}
resume() {
}
pause() {
}
get aborted(): boolean {
@ -164,16 +279,15 @@ export class Http2Stream {
}
close(_code: number, _callback: () => void) {
notImplemented("Http2Stream.close");
this.#closed = true;
this.emit("close");
}
get closed(): boolean {
notImplemented("Http2Stream.closed");
return false;
return this.#closed;
}
get destroyed(): boolean {
notImplemented("Http2Stream.destroyed");
return false;
}
@ -197,7 +311,7 @@ export class Http2Stream {
}
get rstCode(): number {
notImplemented("Http2Stream.rstCode");
// notImplemented("Http2Stream.rstCode");
return 0;
}
@ -217,12 +331,11 @@ export class Http2Stream {
}
get session(): Http2Session {
notImplemented("Http2Stream.session");
return new Http2Session();
return this.#session;
}
setTimeout(_msecs: number, _callback: () => void) {
notImplemented("Http2Stream.setTimeout");
setTimeout(msecs: number, callback?: () => void) {
setStreamTimeout(this, msecs, callback);
}
get state(): Record<string, unknown> {
@ -231,28 +344,52 @@ export class Http2Stream {
}
sendTrailers(_headers: Record<string, unknown>) {
notImplemented("Http2Stream.sendTrailers");
addTrailers(this._response, [["grpc-status", "0"], ["grpc-message", "OK"]]);
}
}
export class ClientHttp2Stream extends Http2Stream {
constructor() {
super();
constructor(
session: Http2Session,
headers: Promise<Http2Headers>,
controllerPromise: Deferred<ReadableStreamDefaultController<Uint8Array>>,
readerPromise: Deferred<ReadableStream<Uint8Array>>,
) {
super(session, headers, controllerPromise, readerPromise);
}
}
export class ServerHttp2Stream extends Http2Stream {
constructor() {
super();
_promise: Deferred<Response>;
#body: ReadableStream<Uint8Array>;
#waitForTrailers: boolean;
#headersSent: boolean;
constructor(
session: Http2Session,
headers: Promise<Http2Headers>,
controllerPromise: Promise<ReadableStreamDefaultController<Uint8Array>>,
reader: ReadableStream<Uint8Array>,
body: ReadableStream<Uint8Array>,
) {
super(session, headers, controllerPromise, Promise.resolve(reader));
this._promise = new deferred();
this.#body = body;
}
additionalHeaders(_headers: Record<string, unknown>) {
notImplemented("ServerHttp2Stream.additionalHeaders");
}
end(): void {
super.end();
if (this.#waitForTrailers) {
this.emit("wantTrailers");
}
}
get headersSent(): boolean {
notImplemented("ServerHttp2Stream.headersSent");
return false;
return this.#headersSent;
}
get pushAllowed(): boolean {
@ -269,10 +406,26 @@ export class ServerHttp2Stream extends Http2Stream {
}
respond(
_headers: Record<string, unknown>,
_options: Record<string, unknown>,
headers: Http2Headers,
options: Record<string, unknown>,
) {
notImplemented("ServerHttp2Stream.respond");
this.#headersSent = true;
const response: ResponseInit = {};
if (headers) {
for (const [name, value] of Object.entries(headers)) {
if (name == constants.HTTP2_HEADER_STATUS) {
response.status = Number(value);
}
}
}
if (options?.endStream) {
this._promise.resolve(this._response = new Response("", response));
} else {
this.#waitForTrailers = options?.waitForTrailers;
this._promise.resolve(
this._response = new Response(this.#body, response),
);
}
}
respondWithFD(
@ -292,56 +445,145 @@ export class ServerHttp2Stream extends Http2Stream {
}
}
export class Http2Server {
constructor() {
export class Http2Server extends Server {
#options: Record<string, unknown> = {};
#abortController;
#server;
timeout = 0;
constructor(
options: Record<string, unknown>,
requestListener: () => unknown,
) {
super(options);
this.#abortController = new AbortController();
this.on(
"connection",
(conn: Deno.Conn) => {
try {
const session = new ServerHttp2Session();
this.emit("session", session);
this.#server = serveHttpOnConnection(
conn,
this.#abortController.signal,
async (req: Request) => {
try {
const controllerPromise: Deferred<
ReadableStreamDefaultController<Uint8Array>
> = deferred();
const body = new ReadableStream({
start(controller) {
controllerPromise.resolve(controller);
},
});
const headers: Http2Headers = {};
for (const [name, value] of req.headers) {
headers[name] = value;
}
headers[constants.HTTP2_HEADER_PATH] =
new URL(req.url).pathname;
const stream = new ServerHttp2Stream(
session,
Promise.resolve(headers),
controllerPromise,
req.body,
body,
);
session.emit("stream", stream, headers);
this.emit("stream", stream, headers);
return await stream._promise;
} catch (e) {
console.log("Error in serveHttpOnConnection", e);
}
return new Response("");
},
() => {
console.log("error");
},
() => {},
);
} catch (e) {
console.log("Error in Http2Server", e);
}
},
);
this.on(
"newListener",
(event) => console.log(`Event in newListener: ${event}`),
);
this.#options = options;
if (typeof requestListener === "function") {
this.on("request", requestListener);
}
}
close(_callback?: () => unknown) {
notImplemented("Http2Server.close");
// Prevent the TCP server from wrapping this in a socket, since we need it to serve HTTP
_createSocket(clientHandle: TCP) {
return clientHandle[kStreamBaseField];
}
setTimeout(_msecs: number, _callback?: () => unknown) {
notImplemented("Http2Server.setTimeout");
close(callback?: () => unknown) {
if (callback) {
this.on("close", callback);
}
this.#abortController.abort();
super.close();
}
get timeout(): number {
notImplemented("Http2Server.timeout");
return 0;
setTimeout(msecs: number, callback?: () => unknown) {
this.timeout = msecs;
if (callback !== undefined) {
this.on("timeout", callback);
}
}
updateSettings(_settings: Record<string, unknown>) {
notImplemented("Http2Server.updateSettings");
updateSettings(settings: Record<string, unknown>) {
this.#options.settings = { ...this.#options.settings, ...settings };
}
}
export class Http2SecureServer {
constructor() {
export class Http2SecureServer extends Server {
#options: Record<string, unknown> = {};
timeout = 0;
constructor(
options: Record<string, unknown>,
requestListener: () => unknown,
) {
super(options, function () {
notImplemented("connectionListener");
});
this.#options = options;
if (typeof requestListener === "function") {
this.on("request", requestListener);
}
}
close(_callback?: () => unknown) {
notImplemented("Http2SecureServer.close");
}
setTimeout(_msecs: number, _callback?: () => unknown) {
notImplemented("Http2SecureServer.setTimeout");
setTimeout(msecs: number, callback?: () => unknown) {
this.timeout = msecs;
if (callback !== undefined) {
this.on("timeout", callback);
}
}
get timeout(): number {
notImplemented("Http2SecureServer.timeout");
return 0;
}
updateSettings(_settings: Record<string, unknown>) {
notImplemented("Http2SecureServer.updateSettings");
updateSettings(settings: Record<string, unknown>) {
this.#options.settings = { ...this.#options.settings, ...settings };
}
}
export function createServer(
_options: Record<string, unknown>,
_onRequestHandler: () => unknown,
options: Record<string, unknown>,
onRequestHandler: () => unknown,
): Http2Server {
notImplemented("http2.createServer");
return new Http2Server();
if (typeof options === "function") {
onRequestHandler = options;
options = {};
}
return new Http2Server(options, onRequestHandler);
}
export function createSecureServer(
@ -353,11 +595,11 @@ export function createSecureServer(
}
export function connect(
_authority: string | URL,
_options: Record<string, unknown>,
authority: string | URL,
options: Record<string, unknown>,
callback: (session: ClientHttp2Session) => void,
): ClientHttp2Session {
notImplemented("http2.connect");
return new ClientHttp2Session();
return new ClientHttp2Session(authority, options, callback);
}
export const constants = {
@ -681,8 +923,8 @@ export class Http2ServerRequest {
return "";
}
setTimeout(_msecs: number, _callback: () => unknown) {
notImplemented("Http2ServerRequest.setTimeout");
setTimeout(msecs: number, callback?: () => unknown) {
this.stream.setTimeout(callback, msecs);
}
get socket(): Socket /*| TlsSocket*/ {
@ -781,8 +1023,8 @@ export class Http2ServerResponse {
notImplemented("Http2ServerResponse.setHeader");
}
setTimeout(_msecs: number, _callback: () => unknown) {
notImplemented("Http2ServerResponse.setTimeout");
setTimeout(msecs: number, callback?: () => unknown) {
this.stream.setTimeout(msecs, callback);
}
get socket(): Socket /*| TlsSocket*/ {

View file

@ -1834,21 +1834,8 @@ function _onconnection(this: any, err: number, clientHandle?: Handle) {
return;
}
const socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect,
readable: true,
writable: true,
});
// TODO(@bartlomieju): implement noDelay and setKeepAlive
self._connections++;
socket.server = self;
socket._server = self;
DTRACE_NET_SERVER_CONNECTION(socket);
const socket = self._createSocket(clientHandle);
this._connections++;
self.emit("connection", socket);
if (netServerSocketChannel.hasSubscribers) {
@ -2369,6 +2356,23 @@ export class Server extends EventEmitter {
return !!this._handle;
}
_createSocket(clientHandle) {
const socket = new Socket({
handle: clientHandle,
allowHalfOpen: this.allowHalfOpen,
pauseOnCreate: this.pauseOnConnect,
readable: true,
writable: true,
});
// TODO(@bartlomieju): implement noDelay and setKeepAlive
socket.server = this;
socket._server = this;
DTRACE_NET_SERVER_CONNECTION(socket);
}
_listen2 = _setupListenHandle;
_emitCloseIfDrained() {