1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-22 23:19:55 -05:00

refactor(ext/http): Expose internal serveHttpOnListener API for HTTP2 (#19331)

For the first implementation of node:http2, we'll use the internal
version of `Deno.serve` which allows us to listen on a raw TCP
connection rather than a listener.

This is mostly a refactoring, and hooking up of `op_http_serve_on` that
was never previously exposed (but designed for this purpose).
This commit is contained in:
Matt Mastracci 2023-05-31 17:20:39 -06:00 committed by Bartek Iwańczuk
parent 34ab009e3c
commit 8f9a05f16e
No known key found for this signature in database
GPG key ID: 0C6BCDDC3B3AD750
2 changed files with 157 additions and 39 deletions

View file

@ -20,6 +20,8 @@ const servePort = 4502;
const { const {
upgradeHttpRaw, upgradeHttpRaw,
addTrailers, addTrailers,
serveHttpOnListener,
serveHttpOnConnection,
// @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol // @ts-expect-error TypeScript (as of 3.7) does not support indexing namespaces by symbol
} = Deno[Deno.internal]; } = Deno[Deno.internal];
@ -165,6 +167,98 @@ Deno.test({ permissions: { net: true } }, async function httpServerBasic() {
await server; await server;
}); });
// Test serving of HTTP on an arbitrary listener.
Deno.test(
{ permissions: { net: true } },
async function httpServerOnListener() {
const ac = new AbortController();
const promise = deferred();
const listeningPromise = deferred();
const listener = Deno.listen({ port: servePort });
const server = serveHttpOnListener(
listener,
ac.signal,
async (
request: Request,
{ remoteAddr }: { remoteAddr: { hostname: string } },
) => {
assertEquals(
new URL(request.url).href,
`http://127.0.0.1:${servePort}/`,
);
assertEquals(await request.text(), "");
assertEquals(remoteAddr.hostname, "127.0.0.1");
promise.resolve();
return new Response("Hello World", { headers: { "foo": "bar" } });
},
createOnErrorCb(ac),
onListen(listeningPromise),
);
await listeningPromise;
const resp = await fetch(`http://127.0.0.1:${servePort}/`, {
headers: { "connection": "close" },
});
await promise;
const clone = resp.clone();
const text = await resp.text();
assertEquals(text, "Hello World");
assertEquals(resp.headers.get("foo"), "bar");
const cloneText = await clone.text();
assertEquals(cloneText, "Hello World");
ac.abort();
await server;
},
);
// Test serving of HTTP on an arbitrary connection.
Deno.test(
{ permissions: { net: true } },
async function httpServerOnConnection() {
const ac = new AbortController();
const promise = deferred();
const listeningPromise = deferred();
const listener = Deno.listen({ port: servePort });
const acceptPromise = listener.accept();
const fetchPromise = fetch(`http://127.0.0.1:${servePort}/`, {
headers: { "connection": "close" },
});
const server = serveHttpOnConnection(
await acceptPromise,
ac.signal,
async (
request: Request,
{ remoteAddr }: { remoteAddr: { hostname: string } },
) => {
assertEquals(
new URL(request.url).href,
`http://127.0.0.1:${servePort}/`,
);
assertEquals(await request.text(), "");
assertEquals(remoteAddr.hostname, "127.0.0.1");
promise.resolve();
return new Response("Hello World", { headers: { "foo": "bar" } });
},
createOnErrorCb(ac),
onListen(listeningPromise),
);
const resp = await fetchPromise;
await promise;
const clone = resp.clone();
const text = await resp.text();
assertEquals(text, "Hello World");
assertEquals(resp.headers.get("foo"), "bar");
const cloneText = await clone.text();
assertEquals(cloneText, "Hello World");
// Note that we don't need to abort this server -- it closes when the connection does
// ac.abort();
await server;
listener.close();
},
);
Deno.test({ permissions: { net: true } }, async function httpServerOnError() { Deno.test({ permissions: { net: true } }, async function httpServerOnError() {
const ac = new AbortController(); const ac = new AbortController();
const listeningPromise = deferred(); const listeningPromise = deferred();

View file

@ -34,7 +34,8 @@ import {
readableStreamForRid, readableStreamForRid,
ReadableStreamPrototype, ReadableStreamPrototype,
} from "ext:deno_web/06_streams.js"; } from "ext:deno_web/06_streams.js";
import { TcpConn } from "ext:deno_net/01_net.js"; import { listen, TcpConn } from "ext:deno_net/01_net.js";
import { listenTls } from "ext:deno_net/02_tls.js";
const { const {
ObjectPrototypeIsPrototypeOf, ObjectPrototypeIsPrototypeOf,
PromisePrototypeCatch, PromisePrototypeCatch,
@ -54,6 +55,7 @@ const {
op_http_get_request_method_and_url, op_http_get_request_method_and_url,
op_http_read_request_body, op_http_read_request_body,
op_http_serve, op_http_serve,
op_http_serve_on,
op_http_set_promise_complete, op_http_set_promise_complete,
op_http_set_response_body_bytes, op_http_set_response_body_bytes,
op_http_set_response_body_resource, op_http_set_response_body_resource,
@ -71,6 +73,7 @@ const {
"op_http_get_request_method_and_url", "op_http_get_request_method_and_url",
"op_http_read_request_body", "op_http_read_request_body",
"op_http_serve", "op_http_serve",
"op_http_serve_on",
"op_http_set_promise_complete", "op_http_set_promise_complete",
"op_http_set_response_body_bytes", "op_http_set_response_body_bytes",
"op_http_set_response_body_resource", "op_http_set_response_body_resource",
@ -340,12 +343,21 @@ class InnerRequest {
} }
class CallbackContext { class CallbackContext {
abortController;
responseBodies;
scheme; scheme;
fallbackHost; fallbackHost;
serverRid; serverRid;
closed; closed;
initialize(args) { constructor(signal, args) {
signal?.addEventListener(
"abort",
() => this.close(),
{ once: true },
);
this.abortController = new AbortController();
this.responseBodies = new SafeSet();
this.serverRid = args[0]; this.serverRid = args[0];
this.scheme = args[1]; this.scheme = args[1];
this.fallbackHost = args[2]; this.fallbackHost = args[2];
@ -500,7 +512,9 @@ async function asyncResponse(responseBodies, req, status, stream) {
* *
* This function returns a promise that will only reject in the case of abnormal exit. * This function returns a promise that will only reject in the case of abnormal exit.
*/ */
function mapToCallback(responseBodies, context, signal, callback, onError) { function mapToCallback(context, callback, onError) {
const responseBodies = context.responseBodies;
const signal = context.abortController.signal;
return async function (req) { return async function (req) {
// Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback
// 500 error. // 500 error.
@ -611,18 +625,7 @@ function serve(arg1, arg2) {
reusePort: options.reusePort ?? false, reusePort: options.reusePort ?? false,
}; };
const abortController = new AbortController(); let listener;
const responseBodies = new SafeSet();
const context = new CallbackContext();
const callback = mapToCallback(
responseBodies,
context,
abortController.signal,
handler,
onError,
);
if (wantsHttps) { if (wantsHttps) {
if (!options.cert || !options.key) { if (!options.cert || !options.key) {
throw new TypeError( throw new TypeError(
@ -632,37 +635,56 @@ function serve(arg1, arg2) {
listenOpts.cert = options.cert; listenOpts.cert = options.cert;
listenOpts.key = options.key; listenOpts.key = options.key;
listenOpts.alpnProtocols = ["h2", "http/1.1"]; listenOpts.alpnProtocols = ["h2", "http/1.1"];
const listener = Deno.listenTls(listenOpts); listener = listenTls(listenOpts);
listenOpts.port = listener.addr.port; listenOpts.port = listener.addr.port;
context.initialize(op_http_serve(
listener.rid,
));
} else { } else {
const listener = Deno.listen(listenOpts); listener = listen(listenOpts);
listenOpts.port = listener.addr.port; listenOpts.port = listener.addr.port;
context.initialize(op_http_serve(
listener.rid,
));
} }
signal?.addEventListener( const onListen = (scheme) => {
"abort", const port = listenOpts.port;
() => context.close(), if (options.onListen) {
{ once: true }, options.onListen({ port });
); } else {
// If the hostname is "0.0.0.0", we display "localhost" in console
const onListen = options.onListen ?? function ({ port }) { // because browsers in Windows don't resolve "0.0.0.0".
// If the hostname is "0.0.0.0", we display "localhost" in console // See the discussion in https://github.com/denoland/deno_std/issues/1165
// because browsers in Windows don't resolve "0.0.0.0". const hostname = listenOpts.hostname == "0.0.0.0"
// See the discussion in https://github.com/denoland/deno_std/issues/1165 ? "localhost"
const hostname = listenOpts.hostname == "0.0.0.0" : listenOpts.hostname;
? "localhost" console.log(`Listening on ${scheme}${hostname}:${port}/`);
: listenOpts.hostname; }
console.log(`Listening on ${context.scheme}${hostname}:${port}/`);
}; };
onListen({ port: listenOpts.port }); return serveHttpOnListener(listener, signal, handler, onError, onListen);
}
/**
* Serve HTTP/1.1 and/or HTTP/2 on an arbitrary listener.
*/
function serveHttpOnListener(listener, signal, handler, onError, onListen) {
const context = new CallbackContext(signal, op_http_serve(listener.rid));
const callback = mapToCallback(context, handler, onError);
onListen(context.scheme);
return serveHttpOn(context, callback);
}
/**
* Serve HTTP/1.1 and/or HTTP/2 on an arbitrary connection.
*/
function serveHttpOnConnection(connection, signal, handler, onError, onListen) {
const context = new CallbackContext(signal, op_http_serve_on(connection.rid));
const callback = mapToCallback(context, handler, onError);
onListen(context.scheme);
return serveHttpOn(context, callback);
}
function serveHttpOn(context, callback) {
let ref = true; let ref = true;
let currentPromise = null; let currentPromise = null;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId"); const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
@ -710,7 +732,7 @@ function serve(arg1, arg2) {
}); });
} }
for (const streamRid of new SafeSetIterator(responseBodies)) { for (const streamRid of new SafeSetIterator(context.responseBodies)) {
core.tryClose(streamRid); core.tryClose(streamRid);
} }
})(); })();
@ -734,5 +756,7 @@ function serve(arg1, arg2) {
internals.addTrailers = addTrailers; internals.addTrailers = addTrailers;
internals.upgradeHttpRaw = upgradeHttpRaw; internals.upgradeHttpRaw = upgradeHttpRaw;
internals.serveHttpOnListener = serveHttpOnListener;
internals.serveHttpOnConnection = serveHttpOnConnection;
export { serve, upgradeHttpRaw }; export { serve, upgradeHttpRaw };