From bdffcb409fd1e257db280ab73e07cc319711256c Mon Sep 17 00:00:00 2001 From: Matt Mastracci Date: Sat, 22 Apr 2023 11:48:21 -0600 Subject: [PATCH] feat(ext/http): Rework Deno.serve using hyper 1.0-rc3 (#18619) This is a rewrite of the `Deno.serve` API to live on top of hyper 1.0-rc3. The code should be more maintainable long-term, and avoids some of the slower mpsc patterns that made the older code less efficient than it could have been. Missing features: - `upgradeHttp` and `upgradeHttpRaw` (`upgradeWebSocket` is available, however). - Automatic compression is unavailable on responses. --- Cargo.lock | 64 +- Cargo.toml | 3 +- cli/bench/http/deno_http_serve_https.js | 18 + cli/tests/unit/serve_test.ts | 347 +++++++++-- cli/tests/unit/websocket_test.ts | 16 + core/io.rs | 33 +- ext/http/00_serve.js | 534 +++++++++++++++++ ext/http/01_http.js | 257 +------- ext/http/Cargo.toml | 8 + ext/http/http_next.rs | 765 ++++++++++++++++++++++++ ext/http/lib.rs | 84 ++- ext/http/request_body.rs | 84 +++ ext/http/request_properties.rs | 249 ++++++++ ext/http/response_body.rs | 253 ++++++++ ext/net/Cargo.toml | 1 + ext/net/lib.rs | 1 + ext/net/ops_tls.rs | 29 +- ext/net/ops_unix.rs | 4 +- ext/net/raw.rs | 304 ++++++++++ ext/websocket/Cargo.toml | 4 +- ext/websocket/lib.rs | 75 ++- ext/websocket/stream.rs | 115 ++++ 22 files changed, 2912 insertions(+), 336 deletions(-) create mode 100644 cli/bench/http/deno_http_serve_https.js create mode 100644 ext/http/00_serve.js create mode 100644 ext/http/http_next.rs create mode 100644 ext/http/request_body.rs create mode 100644 ext/http/request_properties.rs create mode 100644 ext/http/response_body.rs create mode 100644 ext/net/raw.rs create mode 100644 ext/websocket/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 114a6e0e80..ac188de531 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -728,7 +728,7 @@ dependencies = [ "fwdansi", "glibc_version", "http", - "hyper", + "hyper 0.14.26", "import_map 0.15.0", "indexmap", "jsonc-parser", @@ -1022,11 +1022,14 @@ dependencies = [ "bytes", "cache_control", "deno_core", + "deno_net", "deno_websocket", "flate2", "fly-accept-encoding", + "http", "httparse", - "hyper", + "hyper 0.14.26", + "hyper 1.0.0-rc.3", "memmem", "mime", "once_cell", @@ -1035,6 +1038,8 @@ dependencies = [ "pin-project", "ring", "serde", + "slab", + "thiserror", "tokio", "tokio-util", ] @@ -1119,6 +1124,7 @@ dependencies = [ "deno_core", "deno_tls", "log", + "pin-project", "serde", "socket2", "tokio", @@ -1242,7 +1248,7 @@ dependencies = [ "fs3", "fwdansi", "http", - "hyper", + "hyper 0.14.26", "libc", "log", "netif", @@ -1345,11 +1351,13 @@ dependencies = [ name = "deno_websocket" version = "0.104.0" dependencies = [ + "bytes", "deno_core", + "deno_net", "deno_tls", "fastwebsockets", "http", - "hyper", + "hyper 0.14.26", "serde", "tokio", "tokio-rustls", @@ -1794,13 +1802,13 @@ dependencies = [ [[package]] name = "fastwebsockets" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e973e2bd2dbd77cc9e929ede2ce65984a35ac5481976afbfbd509cb40dc965" +checksum = "2fbc4aeb6c0ab927a93b5e5fc70d4c7f834260fc414021ac40c58d046ea0e394" dependencies = [ "base64 0.21.0", "cc", - "hyper", + "hyper 0.14.26", "pin-project", "rand", "sha1", @@ -2237,6 +2245,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.0-rc.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "951dfc2e32ac02d67c90c0d65bd27009a635dc9b381a2cc7d284ab01e3a0150d" +dependencies = [ + "bytes", + "http", +] + [[package]] name = "httparse" version = "1.8.0" @@ -2267,7 +2285,7 @@ dependencies = [ "futures-util", "h2", "http", - "http-body", + "http-body 0.4.5", "httparse", "httpdate", "itoa", @@ -2279,6 +2297,28 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.0.0-rc.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b75264b2003a3913f118d35c586e535293b3e22e41f074930762929d071e092" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "h2", + "http", + "http-body 1.0.0-rc.2", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "tokio", + "tracing", + "want", +] + [[package]] name = "hyper-rustls" version = "0.23.2" @@ -2286,7 +2326,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1788965e61b367cd03a62950836d5cd41560c3577d90e40e0819373194d1661c" dependencies = [ "http", - "hyper", + "hyper 0.14.26", "rustls", "tokio", "tokio-rustls", @@ -3614,8 +3654,8 @@ dependencies = [ "futures-util", "h2", "http", - "http-body", - "hyper", + "http-body 0.4.5", + "hyper 0.14.26", "hyper-rustls", "ipnet", "js-sys", @@ -4870,7 +4910,7 @@ dependencies = [ "fastwebsockets", "flate2", "futures", - "hyper", + "hyper 0.14.26", "lazy-regex", "lsp-types", "nix", diff --git a/Cargo.toml b/Cargo.toml index aa12e16295..9edd7f8357 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -91,7 +91,7 @@ data-url = "=0.2.0" dlopen = "0.1.8" encoding_rs = "=0.8.31" ecb = "=0.1.1" -fastwebsockets = "=0.2.5" +fastwebsockets = "=0.2.6" flate2 = "=1.0.24" fs3 = "0.5.0" futures = "0.3.21" @@ -126,6 +126,7 @@ serde_json = "1.0.85" serde_repr = "=0.1.9" sha2 = { version = "0.10.6", features = ["oid"] } signature = "=1.6.4" +slab = "0.4" smallvec = "1.8" socket2 = "0.4.7" tar = "=0.4.38" diff --git a/cli/bench/http/deno_http_serve_https.js b/cli/bench/http/deno_http_serve_https.js new file mode 100644 index 0000000000..cea659e093 --- /dev/null +++ b/cli/bench/http/deno_http_serve_https.js @@ -0,0 +1,18 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. + +const addr = Deno.args[0] ?? "127.0.0.1:4500"; +const [hostname, port] = addr.split(":"); +const { serve } = Deno; + +function readFileSync(file) { + return Deno.readTextFileSync(new URL(file, import.meta.url).pathname); +} + +const CERT = readFileSync("../../tests/testdata/tls/localhost.crt"); +const KEY = readFileSync("../../tests/testdata/tls/localhost.key"); + +function handler() { + return new Response("Hello World"); +} + +serve(handler, { hostname, port, reusePort: true, cert: CERT, key: KEY }); diff --git a/cli/tests/unit/serve_test.ts b/cli/tests/unit/serve_test.ts index 32d436d04f..8344f1be5e 100644 --- a/cli/tests/unit/serve_test.ts +++ b/cli/tests/unit/serve_test.ts @@ -2,6 +2,7 @@ // deno-lint-ignore-file +import { assertMatch } from "https://deno.land/std@v0.42.0/testing/asserts.ts"; import { Buffer, BufReader, BufWriter } from "../../../test_util/std/io/mod.ts"; import { TextProtoReader } from "../testdata/run/textproto.ts"; import { @@ -31,6 +32,27 @@ function onListen( }; } +Deno.test(async function httpServerShutsDownPortBeforeResolving() { + const ac = new AbortController(); + const listeningPromise = deferred(); + + const server = Deno.serve({ + handler: (_req) => new Response("ok"), + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + }); + + await listeningPromise; + assertThrows(() => Deno.listen({ port: 4501 })); + + ac.abort(); + await server; + + const listener = Deno.listen({ port: 4501 }); + listener!.close(); +}); + Deno.test(async function httpServerCanResolveHostnames() { const ac = new AbortController(); const listeningPromise = deferred(); @@ -120,6 +142,71 @@ Deno.test({ permissions: { net: true } }, async function httpServerBasic() { await server; }); +Deno.test({ permissions: { net: true } }, async function httpServerOnError() { + const ac = new AbortController(); + const promise = deferred(); + const listeningPromise = deferred(); + let requestStash: Request | null; + + const server = Deno.serve({ + handler: async (request: Request) => { + requestStash = request; + await new Promise((r) => setTimeout(r, 100)); + throw "fail"; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: () => { + return new Response("failed: " + requestStash!.url, { status: 500 }); + }, + }); + + await listeningPromise; + const resp = await fetch("http://127.0.0.1:4501/", { + headers: { "connection": "close" }, + }); + const text = await resp.text(); + ac.abort(); + await server; + + assertEquals(text, "failed: http://127.0.0.1:4501/"); +}); + +Deno.test( + { permissions: { net: true } }, + async function httpServerOnErrorFails() { + const ac = new AbortController(); + const promise = deferred(); + const listeningPromise = deferred(); + let requestStash: Request | null; + + const server = Deno.serve({ + handler: async (request: Request) => { + requestStash = request; + await new Promise((r) => setTimeout(r, 100)); + throw "fail"; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: () => { + throw "again"; + }, + }); + + await listeningPromise; + const resp = await fetch("http://127.0.0.1:4501/", { + headers: { "connection": "close" }, + }); + const text = await resp.text(); + ac.abort(); + await server; + + assertEquals(text, "Internal Server Error"); + }, +); + Deno.test({ permissions: { net: true } }, async function httpServerOverload1() { const ac = new AbortController(); const promise = deferred(); @@ -238,7 +325,7 @@ Deno.test( console.log = (msg) => { try { const match = msg.match(/Listening on http:\/\/localhost:(\d+)\//); - assert(!!match); + assert(!!match, `Didn't match ${msg}`); const port = +match[1]; assert(port > 0 && port < 65536); } finally { @@ -301,6 +388,109 @@ Deno.test( }, ); +function createUrlTest( + name: string, + methodAndPath: string, + host: string | null, + expected: string, +) { + Deno.test(`httpServerUrl${name}`, async () => { + const listeningPromise: Deferred = deferred(); + const urlPromise = deferred(); + const ac = new AbortController(); + const server = Deno.serve({ + handler: async (request: Request) => { + urlPromise.resolve(request.url); + return new Response(""); + }, + port: 0, + signal: ac.signal, + onListen: ({ port }: { port: number }) => { + listeningPromise.resolve(port); + }, + onError: createOnErrorCb(ac), + }); + + const port = await listeningPromise; + const conn = await Deno.connect({ port }); + + const encoder = new TextEncoder(); + const body = `${methodAndPath} HTTP/1.1\r\n${ + host ? ("Host: " + host + "\r\n") : "" + }Content-Length: 5\r\n\r\n12345`; + const writeResult = await conn.write(encoder.encode(body)); + assertEquals(body.length, writeResult); + + try { + const expectedResult = expected.replace("HOST", "localhost").replace( + "PORT", + `${port}`, + ); + assertEquals(await urlPromise, expectedResult); + } finally { + ac.abort(); + await server; + conn.close(); + } + }); +} + +createUrlTest("WithPath", "GET /path", null, "http://HOST:PORT/path"); +createUrlTest( + "WithPathAndHost", + "GET /path", + "deno.land", + "http://deno.land/path", +); +createUrlTest( + "WithAbsolutePath", + "GET http://localhost/path", + null, + "http://localhost/path", +); +createUrlTest( + "WithAbsolutePathAndHost", + "GET http://localhost/path", + "deno.land", + "http://localhost/path", +); +createUrlTest( + "WithPortAbsolutePath", + "GET http://localhost:1234/path", + null, + "http://localhost:1234/path", +); +createUrlTest( + "WithPortAbsolutePathAndHost", + "GET http://localhost:1234/path", + "deno.land", + "http://localhost:1234/path", +); +createUrlTest( + "WithPortAbsolutePathAndHostWithPort", + "GET http://localhost:1234/path", + "deno.land:9999", + "http://localhost:1234/path", +); + +createUrlTest("WithAsterisk", "OPTIONS *", null, "*"); +createUrlTest( + "WithAuthorityForm", + "CONNECT deno.land:80", + null, + "deno.land:80", +); + +// TODO(mmastrac): These should probably be 400 errors +createUrlTest("WithInvalidAsterisk", "GET *", null, "*"); +createUrlTest("WithInvalidNakedPath", "GET path", null, "path"); +createUrlTest( + "WithInvalidNakedAuthority", + "GET deno.land:1234", + null, + "deno.land:1234", +); + Deno.test( { permissions: { net: true } }, async function httpServerGetRequestBody() { @@ -536,7 +726,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { response, socket, } = Deno.upgradeWebSocket(request); - socket.onerror = () => fail(); + socket.onerror = (e) => { + console.error(e); + fail(); + }; socket.onmessage = (m) => { socket.send(m.data); socket.close(1001); @@ -553,7 +746,10 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { const def = deferred(); const ws = new WebSocket("ws://localhost:4501"); ws.onmessage = (m) => assertEquals(m.data, "foo"); - ws.onerror = () => fail(); + ws.onerror = (e) => { + console.error(e); + fail(); + }; ws.onclose = () => def.resolve(); ws.onopen = () => ws.send("foo"); @@ -562,6 +758,50 @@ Deno.test({ permissions: { net: true } }, async function httpServerWebSocket() { await server; }); +Deno.test( + { permissions: { net: true } }, + async function httpServerWebSocketCanAccessRequest() { + const ac = new AbortController(); + const listeningPromise = deferred(); + const server = Deno.serve({ + handler: async (request) => { + const { + response, + socket, + } = Deno.upgradeWebSocket(request); + socket.onerror = (e) => { + console.error(e); + fail(); + }; + socket.onmessage = (m) => { + socket.send(request.url.toString()); + socket.close(1001); + }; + return response; + }, + port: 4501, + signal: ac.signal, + onListen: onListen(listeningPromise), + onError: createOnErrorCb(ac), + }); + + await listeningPromise; + const def = deferred(); + const ws = new WebSocket("ws://localhost:4501"); + ws.onmessage = (m) => assertEquals(m.data, "http://localhost:4501/"); + ws.onerror = (e) => { + console.error(e); + fail(); + }; + ws.onclose = () => def.resolve(); + ws.onopen = () => ws.send("foo"); + + await def; + ac.abort(); + await server; + }, +); + Deno.test( { permissions: { net: true } }, async function httpVeryLargeRequest() { @@ -682,47 +922,46 @@ Deno.test( }, ); -// FIXME: auto request body reading is intefering with passing it as response. -// Deno.test( -// { permissions: { net: true } }, -// async function httpServerStreamDuplex() { -// const promise = deferred(); -// const ac = new AbortController(); +Deno.test( + { permissions: { net: true } }, + async function httpServerStreamDuplex() { + const promise = deferred(); + const ac = new AbortController(); -// const server = Deno.serve(request => { -// assert(request.body); + const server = Deno.serve((request) => { + assert(request.body); -// promise.resolve(); -// return new Response(request.body); -// }, { port: 2333, signal: ac.signal }); + promise.resolve(); + return new Response(request.body); + }, { port: 2333, signal: ac.signal }); -// const ts = new TransformStream(); -// const writable = ts.writable.getWriter(); + const ts = new TransformStream(); + const writable = ts.writable.getWriter(); -// const resp = await fetch("http://127.0.0.1:2333/", { -// method: "POST", -// body: ts.readable, -// }); + const resp = await fetch("http://127.0.0.1:2333/", { + method: "POST", + body: ts.readable, + }); -// await promise; -// assert(resp.body); -// const reader = resp.body.getReader(); -// await writable.write(new Uint8Array([1])); -// const chunk1 = await reader.read(); -// assert(!chunk1.done); -// assertEquals(chunk1.value, new Uint8Array([1])); -// await writable.write(new Uint8Array([2])); -// const chunk2 = await reader.read(); -// assert(!chunk2.done); -// assertEquals(chunk2.value, new Uint8Array([2])); -// await writable.close(); -// const chunk3 = await reader.read(); -// assert(chunk3.done); + await promise; + assert(resp.body); + const reader = resp.body.getReader(); + await writable.write(new Uint8Array([1])); + const chunk1 = await reader.read(); + assert(!chunk1.done); + assertEquals(chunk1.value, new Uint8Array([1])); + await writable.write(new Uint8Array([2])); + const chunk2 = await reader.read(); + assert(!chunk2.done); + assertEquals(chunk2.value, new Uint8Array([2])); + await writable.close(); + const chunk3 = await reader.read(); + assert(chunk3.done); -// ac.abort(); -// await server; -// }, -// ); + ac.abort(); + await server; + }, +); Deno.test( { permissions: { net: true } }, @@ -867,10 +1106,10 @@ Deno.test( let responseText = new TextDecoder("iso-8859-1").decode(buf); clientConn.close(); - assert(/\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/.test(responseText)); - ac.abort(); await server; + + assertMatch(responseText, /\r\n[Xx]-[Hh]eader-[Tt]est: Æ\r\n/); }, ); @@ -1355,12 +1594,11 @@ createServerLengthTest("autoResponseWithKnownLengthEmpty", { expects_con_len: true, }); -// FIXME: https://github.com/denoland/deno/issues/15892 -// createServerLengthTest("autoResponseWithUnknownLengthEmpty", { -// body: stream(""), -// expects_chunked: true, -// expects_con_len: false, -// }); +createServerLengthTest("autoResponseWithUnknownLengthEmpty", { + body: stream(""), + expects_chunked: true, + expects_con_len: false, +}); Deno.test( { permissions: { net: true } }, @@ -1841,6 +2079,7 @@ Deno.test( method: "GET", headers: { "connection": "close" }, }); + assertEquals(resp.status, 204); assertEquals(resp.headers.get("Content-Length"), null); } finally { ac.abort(); @@ -2162,11 +2401,11 @@ Deno.test( count++; return new Response(`hello world ${count}`); }, { - async onListen() { - const res1 = await fetch("http://localhost:9000/"); + async onListen({ port }: { port: number }) { + const res1 = await fetch(`http://localhost:${port}/`); assertEquals(await res1.text(), "hello world 1"); - const res2 = await fetch("http://localhost:9000/"); + const res2 = await fetch(`http://localhost:${port}/`); assertEquals(await res2.text(), "hello world 2"); promise.resolve(); @@ -2199,13 +2438,13 @@ Deno.test( return new Response("ok"); }, signal: ac.signal, - onListen: onListen(listeningPromise), + onListen: ({ port }: { port: number }) => listeningPromise.resolve(port), onError: createOnErrorCb(ac), }); try { - await listeningPromise; - const resp = await fetch("http://localhost:9000/", { + const port = await listeningPromise; + const resp = await fetch(`http://localhost:${port}/`, { headers: { connection: "close" }, method: "POST", body: '{"sus":true}', @@ -2238,8 +2477,8 @@ Deno.test( }, }), ), { - async onListen() { - const res1 = await fetch("http://localhost:9000/"); + async onListen({ port }) { + const res1 = await fetch(`http://localhost:${port}/`); assertEquals((await res1.text()).length, 40 * 50_000); promise.resolve(); diff --git a/cli/tests/unit/websocket_test.ts b/cli/tests/unit/websocket_test.ts index 997d8f0df6..999eede414 100644 --- a/cli/tests/unit/websocket_test.ts +++ b/cli/tests/unit/websocket_test.ts @@ -43,6 +43,22 @@ Deno.test(async function websocketPingPong() { ws.close(); }); +// TODO(mmastrac): This requires us to ignore bad certs +// Deno.test(async function websocketSecureConnect() { +// const promise = deferred(); +// const ws = new WebSocket("wss://localhost:4243/"); +// assertEquals(ws.url, "wss://localhost:4243/"); +// ws.onerror = (error) => { +// console.log(error); +// fail(); +// }; +// ws.onopen = () => ws.close(); +// ws.onclose = () => { +// promise.resolve(); +// }; +// await promise; +// }); + // https://github.com/denoland/deno/issues/18700 Deno.test( { sanitizeOps: false, sanitizeResources: false }, diff --git a/core/io.rs b/core/io.rs index 103fe79c1f..567d50bd48 100644 --- a/core/io.rs +++ b/core/io.rs @@ -3,6 +3,7 @@ use std::ops::Deref; use std::ops::DerefMut; +use bytes::Buf; use serde_v8::ZeroCopyBuf; /// BufView is a wrapper around an underlying contiguous chunk of bytes. It can @@ -26,11 +27,11 @@ enum BufViewInner { } impl BufView { - fn from_inner(inner: BufViewInner) -> Self { + const fn from_inner(inner: BufViewInner) -> Self { Self { inner, cursor: 0 } } - pub fn empty() -> Self { + pub const fn empty() -> Self { Self::from_inner(BufViewInner::Empty) } @@ -65,6 +66,20 @@ impl BufView { } } +impl Buf for BufView { + fn remaining(&self) -> usize { + self.len() + } + + fn chunk(&self) -> &[u8] { + self.deref() + } + + fn advance(&mut self, cnt: usize) { + self.advance_cursor(cnt) + } +} + impl Deref for BufView { type Target = [u8]; @@ -210,6 +225,20 @@ impl BufMutView { } } +impl Buf for BufMutView { + fn remaining(&self) -> usize { + self.len() + } + + fn chunk(&self) -> &[u8] { + self.deref() + } + + fn advance(&mut self, cnt: usize) { + self.advance_cursor(cnt) + } +} + impl Deref for BufMutView { type Target = [u8]; diff --git a/ext/http/00_serve.js b/ext/http/00_serve.js new file mode 100644 index 0000000000..91bd360944 --- /dev/null +++ b/ext/http/00_serve.js @@ -0,0 +1,534 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +const core = globalThis.Deno.core; +const primordials = globalThis.__bootstrap.primordials; + +const { BadResourcePrototype } = core; +import { InnerBody } from "ext:deno_fetch/22_body.js"; +import { Event } from "ext:deno_web/02_event.js"; +import { + fromInnerResponse, + newInnerResponse, + toInnerResponse, +} from "ext:deno_fetch/23_response.js"; +import { fromInnerRequest } from "ext:deno_fetch/23_request.js"; +import { AbortController } from "ext:deno_web/03_abort_signal.js"; +import { + _eventLoop, + _idleTimeoutDuration, + _idleTimeoutTimeout, + _protocol, + _readyState, + _rid, + _role, + _server, + _serverHandleIdleTimeout, + SERVER, + WebSocket, +} from "ext:deno_websocket/01_websocket.js"; +import { + Deferred, + getReadableStreamResourceBacking, + readableStreamForRid, + ReadableStreamPrototype, +} from "ext:deno_web/06_streams.js"; +const { + ObjectPrototypeIsPrototypeOf, + SafeSet, + SafeSetIterator, + SetPrototypeAdd, + SetPrototypeDelete, + Symbol, + TypeError, + Uint8ArrayPrototype, + Uint8Array, +} = primordials; + +const _upgraded = Symbol("_upgraded"); + +function internalServerError() { + // "Internal Server Error" + return new Response( + new Uint8Array([ + 73, + 110, + 116, + 101, + 114, + 110, + 97, + 108, + 32, + 83, + 101, + 114, + 118, + 101, + 114, + 32, + 69, + 114, + 114, + 111, + 114, + ]), + { status: 500 }, + ); +} + +// Used to ensure that user returns a valid response (but not a different response) from handlers that are upgraded. +const UPGRADE_RESPONSE_SENTINEL = fromInnerResponse( + newInnerResponse(101), + "immutable", +); + +class InnerRequest { + #slabId; + #context; + #methodAndUri; + #streamRid; + #body; + #upgraded; + + constructor(slabId, context) { + this.#slabId = slabId; + this.#context = context; + this.#upgraded = false; + } + + close() { + if (this.#streamRid !== undefined) { + core.close(this.#streamRid); + this.#streamRid = undefined; + } + this.#slabId = undefined; + } + + get [_upgraded]() { + return this.#upgraded; + } + + _wantsUpgrade(upgradeType, ...originalArgs) { + // upgradeHttp is async + // TODO(mmastrac) + if (upgradeType == "upgradeHttp") { + throw "upgradeHttp is unavailable in Deno.serve at this time"; + } + + // upgradeHttpRaw is async + // TODO(mmastrac) + if (upgradeType == "upgradeHttpRaw") { + throw "upgradeHttp is unavailable in Deno.serve at this time"; + } + + // upgradeWebSocket is sync + if (upgradeType == "upgradeWebSocket") { + const response = originalArgs[0]; + const ws = originalArgs[1]; + + this.url(); + this.headerList; + this.close(); + + const goAhead = new Deferred(); + this.#upgraded = () => { + goAhead.resolve(); + }; + + // Start the upgrade in the background. + (async () => { + try { + // Returns the connection and extra bytes, which we can pass directly to op_ws_server_create + const upgrade = await core.opAsync2( + "op_upgrade", + this.#slabId, + response.headerList, + ); + const wsRid = core.ops.op_ws_server_create(upgrade[0], upgrade[1]); + + // We have to wait for the go-ahead signal + await goAhead; + + ws[_rid] = wsRid; + ws[_readyState] = WebSocket.OPEN; + ws[_role] = SERVER; + const event = new Event("open"); + ws.dispatchEvent(event); + + ws[_eventLoop](); + if (ws[_idleTimeoutDuration]) { + ws.addEventListener( + "close", + () => clearTimeout(ws[_idleTimeoutTimeout]), + ); + } + ws[_serverHandleIdleTimeout](); + } catch (error) { + const event = new ErrorEvent("error", { error }); + ws.dispatchEvent(event); + } + })(); + return { response: UPGRADE_RESPONSE_SENTINEL, socket: ws }; + } + } + + url() { + if (this.#methodAndUri === undefined) { + if (this.#slabId === undefined) { + throw new TypeError("request closed"); + } + // TODO(mmastrac): This is quite slow as we're serializing a large number of values. We may want to consider + // splitting this up into multiple ops. + this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId); + } + + const path = this.#methodAndUri[2]; + + // * is valid for OPTIONS + if (path === "*") { + return "*"; + } + + // If the path is empty, return the authority (valid for CONNECT) + if (path == "") { + return this.#methodAndUri[1]; + } + + // CONNECT requires an authority + if (this.#methodAndUri[0] == "CONNECT") { + return this.#methodAndUri[1]; + } + + const hostname = this.#methodAndUri[1]; + if (hostname) { + // Construct a URL from the scheme, the hostname, and the path + return this.#context.scheme + hostname + path; + } + + // Construct a URL from the scheme, the fallback hostname, and the path + return this.#context.scheme + this.#context.fallbackHost + path; + } + + get remoteAddr() { + if (this.#methodAndUri === undefined) { + if (this.#slabId === undefined) { + throw new TypeError("request closed"); + } + this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId); + } + return { + transport: "tcp", + hostname: this.#methodAndUri[3], + port: this.#methodAndUri[4], + }; + } + + get method() { + if (this.#methodAndUri === undefined) { + if (this.#slabId === undefined) { + throw new TypeError("request closed"); + } + this.#methodAndUri = core.ops.op_get_request_method_and_url(this.#slabId); + } + return this.#methodAndUri[0]; + } + + get body() { + if (this.#slabId === undefined) { + throw new TypeError("request closed"); + } + if (this.#body !== undefined) { + return this.#body; + } + // If the method is GET or HEAD, we do not want to include a body here, even if the Rust + // side of the code is willing to provide it to us. + if (this.method == "GET" || this.method == "HEAD") { + this.#body = null; + return null; + } + this.#streamRid = core.ops.op_read_request_body(this.#slabId); + this.#body = new InnerBody(readableStreamForRid(this.#streamRid, false)); + return this.#body; + } + + get headerList() { + if (this.#slabId === undefined) { + throw new TypeError("request closed"); + } + return core.ops.op_get_request_headers(this.#slabId); + } + + get slabId() { + return this.#slabId; + } +} + +class CallbackContext { + scheme; + fallbackHost; + serverRid; + closed; + + initialize(args) { + this.serverRid = args[0]; + this.scheme = args[1]; + this.fallbackHost = args[2]; + this.closed = false; + } + + close() { + try { + this.closed = true; + core.tryClose(this.serverRid); + } catch { + // Pass + } + } +} + +function fastSyncResponseOrStream(req, respBody) { + if (respBody === null || respBody === undefined) { + // Don't set the body + return null; + } + + const stream = respBody.streamOrStatic; + const body = stream.body; + + if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, body)) { + core.ops.op_set_response_body_bytes(req, body); + return null; + } + + if (typeof body === "string") { + core.ops.op_set_response_body_text(req, body); + return null; + } + + // At this point in the response it needs to be a stream + if (!ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, stream)) { + throw TypeError("invalid response"); + } + const resourceBacking = getReadableStreamResourceBacking(stream); + if (resourceBacking) { + core.ops.op_set_response_body_resource( + req, + resourceBacking.rid, + resourceBacking.autoClose, + ); + return null; + } + + return stream; +} + +async function asyncResponse(responseBodies, req, status, stream) { + const responseRid = core.ops.op_set_response_body_stream(req); + SetPrototypeAdd(responseBodies, responseRid); + const reader = stream.getReader(); + core.ops.op_set_promise_complete(req, status); + try { + while (true) { + const { value, done } = await reader.read(); + if (done) { + break; + } + await core.writeAll(responseRid, value); + } + } catch (error) { + await reader.cancel(error); + } finally { + core.tryClose(responseRid); + SetPrototypeDelete(responseBodies, responseRid); + reader.releaseLock(); + } +} + +/** + * Maps the incoming request slab ID to a fully-fledged Request object, passes it to the user-provided + * callback, then extracts the response that was returned from that callback. The response is then pulled + * apart and handled on the Rust side. + * + * This function returns a promise that will only reject in the case of abnormal exit. + */ +function mapToCallback(responseBodies, context, signal, callback, onError) { + return async function (req) { + const innerRequest = new InnerRequest(req, context); + const request = fromInnerRequest(innerRequest, signal, "immutable"); + + // Get the response from the user-provided callback. If that fails, use onError. If that fails, return a fallback + // 500 error. + let response; + try { + response = await callback(request, { + remoteAddr: innerRequest.remoteAddr, + }); + } catch (error) { + try { + response = await onError(error); + } catch (error) { + console.error("Exception in onError while handling exception", error); + response = internalServerError(); + } + } + + const inner = toInnerResponse(response); + if (innerRequest[_upgraded]) { + // We're done here as the connection has been upgraded during the callback and no longer requires servicing. + if (response !== UPGRADE_RESPONSE_SENTINEL) { + console.error("Upgrade response was not returned from callback"); + context.close(); + } + innerRequest[_upgraded](); + return; + } + + // Did everything shut down while we were waiting? + if (context.closed) { + innerRequest.close(); + return; + } + + const status = inner.status; + const headers = inner.headerList; + if (headers && headers.length > 0) { + if (headers.length == 1) { + core.ops.op_set_response_header(req, headers[0][0], headers[0][1]); + } else { + core.ops.op_set_response_headers(req, headers); + } + } + + // Attempt to response quickly to this request, otherwise extract the stream + const stream = fastSyncResponseOrStream(req, inner.body); + if (stream !== null) { + // Handle the stream asynchronously + await asyncResponse(responseBodies, req, status, stream); + } else { + core.ops.op_set_promise_complete(req, status); + } + + innerRequest.close(); + }; +} + +async function serve(arg1, arg2) { + let options = undefined; + let handler = undefined; + if (typeof arg1 === "function") { + handler = arg1; + options = arg2; + } else if (typeof arg2 === "function") { + handler = arg2; + options = arg1; + } else { + options = arg1; + } + if (handler === undefined) { + if (options === undefined) { + throw new TypeError( + "No handler was provided, so an options bag is mandatory.", + ); + } + handler = options.handler; + } + if (typeof handler !== "function") { + throw new TypeError("A handler function must be provided."); + } + if (options === undefined) { + options = {}; + } + + const wantsHttps = options.cert || options.key; + const signal = options.signal; + const onError = options.onError ?? function (error) { + console.error(error); + return internalServerError(); + }; + const listenOpts = { + hostname: options.hostname ?? "0.0.0.0", + port: options.port ?? (wantsHttps ? 9000 : 8000), + reusePort: options.reusePort ?? false, + }; + + const abortController = new AbortController(); + + const responseBodies = new SafeSet(); + const context = new CallbackContext(); + const callback = mapToCallback( + responseBodies, + context, + abortController.signal, + handler, + onError, + ); + + if (wantsHttps) { + if (!options.cert || !options.key) { + throw new TypeError( + "Both cert and key must be provided to enable HTTPS.", + ); + } + listenOpts.cert = options.cert; + listenOpts.key = options.key; + listenOpts.alpnProtocols = ["h2", "http/1.1"]; + const listener = Deno.listenTls(listenOpts); + listenOpts.port = listener.addr.port; + context.initialize(core.ops.op_serve_http( + listener.rid, + )); + } else { + const listener = Deno.listen(listenOpts); + listenOpts.port = listener.addr.port; + context.initialize(core.ops.op_serve_http( + listener.rid, + )); + } + + signal?.addEventListener( + "abort", + () => context.close(), + { once: true }, + ); + + const onListen = options.onListen ?? function ({ port }) { + // If the hostname is "0.0.0.0", we display "localhost" in console + // because browsers in Windows don't resolve "0.0.0.0". + // See the discussion in https://github.com/denoland/deno_std/issues/1165 + const hostname = listenOpts.hostname == "0.0.0.0" + ? "localhost" + : listenOpts.hostname; + console.log(`Listening on ${context.scheme}${hostname}:${port}/`); + }; + + onListen({ port: listenOpts.port }); + + while (true) { + const rid = context.serverRid; + let req; + try { + req = await core.opAsync("op_http_wait", rid); + } catch (error) { + if (ObjectPrototypeIsPrototypeOf(BadResourcePrototype, error)) { + break; + } + throw new Deno.errors.Http(error); + } + if (req === 0xffffffff) { + break; + } + callback(req).catch((error) => { + // Abnormal exit + console.error( + "Terminating Deno.serve loop due to unexpected error", + error, + ); + context.close(); + }); + } + + for (const streamRid of new SafeSetIterator(responseBodies)) { + core.tryClose(streamRid); + } +} + +export { serve }; diff --git a/ext/http/01_http.js b/ext/http/01_http.js index 5bfa58655e..95e2cee740 100644 --- a/ext/http/01_http.js +++ b/ext/http/01_http.js @@ -32,8 +32,8 @@ import { SERVER, WebSocket, } from "ext:deno_websocket/01_websocket.js"; -import { listen, TcpConn, UnixConn } from "ext:deno_net/01_net.js"; -import { listenTls, TlsConn } from "ext:deno_net/02_tls.js"; +import { TcpConn, UnixConn } from "ext:deno_net/01_net.js"; +import { TlsConn } from "ext:deno_net/02_tls.js"; import { Deferred, getReadableStreamResourceBacking, @@ -41,18 +41,17 @@ import { readableStreamForRid, ReadableStreamPrototype, } from "ext:deno_web/06_streams.js"; +import { serve } from "ext:deno_http/00_serve.js"; const { ArrayPrototypeIncludes, ArrayPrototypeMap, ArrayPrototypePush, Error, ObjectPrototypeIsPrototypeOf, - PromisePrototypeCatch, SafeSet, SafeSetIterator, SetPrototypeAdd, SetPrototypeDelete, - SetPrototypeClear, StringPrototypeCharCodeAt, StringPrototypeIncludes, StringPrototypeToLowerCase, @@ -406,6 +405,7 @@ const websocketCvf = buildCaseInsensitiveCommaValueFinder("websocket"); const upgradeCvf = buildCaseInsensitiveCommaValueFinder("upgrade"); function upgradeWebSocket(request, options = {}) { + const inner = toInnerRequest(request); const upgrade = request.headers.get("upgrade"); const upgradeHasWebSocketOption = upgrade !== null && websocketCvf(upgrade); @@ -455,25 +455,39 @@ function upgradeWebSocket(request, options = {}) { } } - const response = fromInnerResponse(r, "immutable"); - const socket = webidl.createBranded(WebSocket); setEventTargetData(socket); socket[_server] = true; - response[_ws] = socket; socket[_idleTimeoutDuration] = options.idleTimeout ?? 120; socket[_idleTimeoutTimeout] = null; + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeWebSocket", r, socket); + } + + const response = fromInnerResponse(r, "immutable"); + + response[_ws] = socket; + return { response, socket }; } function upgradeHttp(req) { + const inner = toInnerRequest(req); + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeHttp", arguments); + } + req[_deferred] = new Deferred(); return req[_deferred].promise; } async function upgradeHttpRaw(req, tcpConn) { const inner = toInnerRequest(req); + if (inner._wantsUpgrade) { + return inner._wantsUpgrade("upgradeHttpRaw", arguments); + } + const res = await core.opAsync("op_http_upgrade_early", inner[streamRid]); return new TcpConn(res, tcpConn.remoteAddr, tcpConn.localAddr); } @@ -552,233 +566,4 @@ function buildCaseInsensitiveCommaValueFinder(checkText) { internals.buildCaseInsensitiveCommaValueFinder = buildCaseInsensitiveCommaValueFinder; -function hostnameForDisplay(hostname) { - // If the hostname is "0.0.0.0", we display "localhost" in console - // because browsers in Windows don't resolve "0.0.0.0". - // See the discussion in https://github.com/denoland/deno_std/issues/1165 - return hostname === "0.0.0.0" ? "localhost" : hostname; -} - -async function respond(handler, requestEvent, connInfo, onError) { - let response; - - try { - response = await handler(requestEvent.request, connInfo); - - if (response.bodyUsed && response.body !== null) { - throw new TypeError("Response body already consumed."); - } - } catch (e) { - // Invoke `onError` handler if the request handler throws. - response = await onError(e); - } - - try { - // Send the response. - await requestEvent.respondWith(response); - } catch { - // `respondWith()` can throw for various reasons, including downstream and - // upstream connection errors, as well as errors thrown during streaming - // of the response content. In order to avoid false negatives, we ignore - // the error here and let `serveHttp` close the connection on the - // following iteration if it is in fact a downstream connection error. - } -} - -async function serveConnection( - server, - activeHttpConnections, - handler, - httpConn, - connInfo, - onError, -) { - while (!server.closed) { - let requestEvent = null; - - try { - // Yield the new HTTP request on the connection. - requestEvent = await httpConn.nextRequest(); - } catch { - // Connection has been closed. - break; - } - - if (requestEvent === null) { - break; - } - - respond(handler, requestEvent, connInfo, onError); - } - - SetPrototypeDelete(activeHttpConnections, httpConn); - try { - httpConn.close(); - } catch { - // Connection has already been closed. - } -} - -async function serve(arg1, arg2) { - let options = undefined; - let handler = undefined; - if (typeof arg1 === "function") { - handler = arg1; - options = arg2; - } else if (typeof arg2 === "function") { - handler = arg2; - options = arg1; - } else { - options = arg1; - } - if (handler === undefined) { - if (options === undefined) { - throw new TypeError( - "No handler was provided, so an options bag is mandatory.", - ); - } - handler = options.handler; - } - if (typeof handler !== "function") { - throw new TypeError("A handler function must be provided."); - } - if (options === undefined) { - options = {}; - } - - const signal = options.signal; - const onError = options.onError ?? function (error) { - console.error(error); - return new Response("Internal Server Error", { status: 500 }); - }; - const onListen = options.onListen ?? function ({ port }) { - console.log( - `Listening on http://${hostnameForDisplay(listenOpts.hostname)}:${port}/`, - ); - }; - const listenOpts = { - hostname: options.hostname ?? "127.0.0.1", - port: options.port ?? 9000, - reusePort: options.reusePort ?? false, - }; - - if (options.cert || options.key) { - if (!options.cert || !options.key) { - throw new TypeError( - "Both cert and key must be provided to enable HTTPS.", - ); - } - listenOpts.cert = options.cert; - listenOpts.key = options.key; - } - - let listener; - if (listenOpts.cert && listenOpts.key) { - listener = listenTls({ - hostname: listenOpts.hostname, - port: listenOpts.port, - cert: listenOpts.cert, - key: listenOpts.key, - reusePort: listenOpts.reusePort, - }); - } else { - listener = listen({ - hostname: listenOpts.hostname, - port: listenOpts.port, - reusePort: listenOpts.reusePort, - }); - } - - const serverDeferred = new Deferred(); - const activeHttpConnections = new SafeSet(); - - const server = { - transport: listenOpts.cert && listenOpts.key ? "https" : "http", - hostname: listenOpts.hostname, - port: listenOpts.port, - closed: false, - - close() { - if (server.closed) { - return; - } - server.closed = true; - try { - listener.close(); - } catch { - // Might have been already closed. - } - - for (const httpConn of new SafeSetIterator(activeHttpConnections)) { - try { - httpConn.close(); - } catch { - // Might have been already closed. - } - } - - SetPrototypeClear(activeHttpConnections); - serverDeferred.resolve(); - }, - - async serve() { - while (!server.closed) { - let conn; - - try { - conn = await listener.accept(); - } catch { - // Listener has been closed. - if (!server.closed) { - console.log("Listener has closed unexpectedly"); - } - break; - } - - let httpConn; - try { - const rid = ops.op_http_start(conn.rid); - httpConn = new HttpConn(rid, conn.remoteAddr, conn.localAddr); - } catch { - // Connection has been closed; - continue; - } - - SetPrototypeAdd(activeHttpConnections, httpConn); - - const connInfo = { - localAddr: conn.localAddr, - remoteAddr: conn.remoteAddr, - }; - // Serve the HTTP connection - serveConnection( - server, - activeHttpConnections, - handler, - httpConn, - connInfo, - onError, - ); - } - await serverDeferred.promise; - }, - }; - - signal?.addEventListener( - "abort", - () => { - try { - server.close(); - } catch { - // Pass - } - }, - { once: true }, - ); - - onListen(listener.addr); - - await PromisePrototypeCatch(server.serve(), console.error); -} - export { _ws, HttpConn, serve, upgradeHttp, upgradeHttpRaw, upgradeWebSocket }; diff --git a/ext/http/Cargo.toml b/ext/http/Cargo.toml index 382fd3184f..bb965d9b25 100644 --- a/ext/http/Cargo.toml +++ b/ext/http/Cargo.toml @@ -10,6 +10,9 @@ readme = "README.md" repository.workspace = true description = "HTTP server implementation for Deno" +[features] +"__zombie_http_tracking" = [] + [lib] path = "lib.rs" @@ -24,11 +27,14 @@ brotli = "3.3.4" bytes.workspace = true cache_control.workspace = true deno_core.workspace = true +deno_net.workspace = true deno_websocket.workspace = true flate2.workspace = true fly-accept-encoding = "0.2.0" +http.workspace = true httparse.workspace = true hyper = { workspace = true, features = ["server", "stream", "http1", "http2", "runtime"] } +hyper1 = { package = "hyper", features = ["full"], version = "1.0.0-rc.3" } memmem.workspace = true mime = "0.3.16" once_cell.workspace = true @@ -37,6 +43,8 @@ phf = { version = "0.10", features = ["macros"] } pin-project.workspace = true ring.workspace = true serde.workspace = true +slab.workspace = true +thiserror.workspace = true tokio.workspace = true tokio-util = { workspace = true, features = ["io"] } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs new file mode 100644 index 0000000000..25088e1ab0 --- /dev/null +++ b/ext/http/http_next.rs @@ -0,0 +1,765 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::extract_network_stream; +use crate::request_body::HttpRequestBody; +use crate::request_properties::DefaultHttpRequestProperties; +use crate::request_properties::HttpConnectionProperties; +use crate::request_properties::HttpListenProperties; +use crate::request_properties::HttpPropertyExtractor; +use crate::response_body::CompletionHandle; +use crate::response_body::ResponseBytes; +use crate::response_body::ResponseBytesInner; +use crate::response_body::V8StreamHttpResponseBody; +use crate::LocalExecutor; +use deno_core::error::AnyError; +use deno_core::futures::TryFutureExt; +use deno_core::op; +use deno_core::AsyncRefCell; +use deno_core::BufView; +use deno_core::ByteString; +use deno_core::CancelFuture; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::OpState; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; +use deno_net::ops_tls::TlsStream; +use deno_net::raw::put_network_stream_resource; +use deno_net::raw::NetworkStream; +use deno_net::raw::NetworkStreamAddress; +use http::request::Parts; +use hyper1::body::Incoming; +use hyper1::header::COOKIE; +use hyper1::http::HeaderName; +use hyper1::http::HeaderValue; +use hyper1::server::conn::http1; +use hyper1::server::conn::http2; +use hyper1::service::service_fn; +use hyper1::upgrade::OnUpgrade; +use hyper1::StatusCode; +use pin_project::pin_project; +use pin_project::pinned_drop; +use slab::Slab; +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::io; +use std::net::Ipv4Addr; +use std::net::SocketAddr; +use std::net::SocketAddrV4; +use std::pin::Pin; +use std::rc::Rc; +use tokio::task::spawn_local; +use tokio::task::JoinHandle; + +type Request = hyper1::Request; +type Response = hyper1::Response; + +pub struct HttpSlabRecord { + request_info: HttpConnectionProperties, + request_parts: Parts, + request_body: Option, + // The response may get taken before we tear this down + response: Option, + body: Option>, + promise: CompletionHandle, + #[cfg(__zombie_http_tracking)] + alive: bool, +} + +thread_local! { + pub static SLAB: RefCell> = RefCell::new(Slab::with_capacity(1024)); +} + +/// Generates getters and setters for the [`SLAB`]. For example, +/// `with!(with_req, with_req_mut, Parts, http, http.request_parts);` expands to: +/// +/// ```ignore +/// #[inline(always)] +/// #[allow(dead_code)] +/// pub(crate) fn with_req_mut(key: usize, f: impl FnOnce(&mut Parts) -> T) -> T { +/// SLAB.with(|slab| { +/// let mut borrow = slab.borrow_mut(); +/// let mut http = borrow.get_mut(key).unwrap(); +/// #[cfg(__zombie_http_tracking)] +/// if !http.alive { +/// panic!("Attempted to access a dead HTTP object") +/// } +/// f(&mut http.expr) +/// }) +/// } + +/// #[inline(always)] +/// #[allow(dead_code)] +/// pub(crate) fn with_req(key: usize, f: impl FnOnce(&Parts) -> T) -> T { +/// SLAB.with(|slab| { +/// let mut borrow = slab.borrow(); +/// let mut http = borrow.get(key).unwrap(); +/// #[cfg(__zombie_http_tracking)] +/// if !http.alive { +/// panic!("Attempted to access a dead HTTP object") +/// } +/// f(&http.expr) +/// }) +/// } +/// ``` +macro_rules! with { + ($ref:ident, $mut:ident, $type:ty, $http:ident, $expr:expr) => { + #[inline(always)] + #[allow(dead_code)] + pub(crate) fn $mut(key: usize, f: impl FnOnce(&mut $type) -> T) -> T { + SLAB.with(|slab| { + let mut borrow = slab.borrow_mut(); + #[allow(unused_mut)] // TODO(mmastrac): compiler issue? + let mut $http = borrow.get_mut(key).unwrap(); + #[cfg(__zombie_http_tracking)] + if !$http.alive { + panic!("Attempted to access a dead HTTP object") + } + f(&mut $expr) + }) + } + + #[inline(always)] + #[allow(dead_code)] + pub(crate) fn $ref(key: usize, f: impl FnOnce(&$type) -> T) -> T { + SLAB.with(|slab| { + let borrow = slab.borrow(); + let $http = borrow.get(key).unwrap(); + #[cfg(__zombie_http_tracking)] + if !$http.alive { + panic!("Attempted to access a dead HTTP object") + } + f(&$expr) + }) + } + }; +} + +with!(with_req, with_req_mut, Parts, http, http.request_parts); +with!( + with_req_body, + with_req_body_mut, + Option, + http, + http.request_body +); +with!( + with_resp, + with_resp_mut, + Option, + http, + http.response +); +with!( + with_body, + with_body_mut, + Option>, + http, + http.body +); +with!( + with_promise, + with_promise_mut, + CompletionHandle, + http, + http.promise +); +with!(with_http, with_http_mut, HttpSlabRecord, http, http); + +fn slab_insert( + request: Request, + request_info: HttpConnectionProperties, +) -> usize { + SLAB.with(|slab| { + let (request_parts, request_body) = request.into_parts(); + slab.borrow_mut().insert(HttpSlabRecord { + request_info, + request_parts, + request_body: Some(request_body), + response: Some(Response::new(ResponseBytes::default())), + body: None, + promise: CompletionHandle::default(), + #[cfg(__zombie_http_tracking)] + alive: true, + }) + }) +} + +#[op] +pub fn op_upgrade_raw(_index: usize) {} + +#[op] +pub async fn op_upgrade( + state: Rc>, + index: usize, + headers: Vec<(ByteString, ByteString)>, +) -> Result<(ResourceId, ZeroCopyBuf), AnyError> { + // Stage 1: set the respnse to 101 Switching Protocols and send it + let upgrade = with_http_mut(index, |http| { + // Manually perform the upgrade. We're peeking into hyper's underlying machinery here a bit + let upgrade = http.request_parts.extensions.remove::().unwrap(); + + let response = http.response.as_mut().unwrap(); + *response.status_mut() = StatusCode::SWITCHING_PROTOCOLS; + for (name, value) in headers { + response.headers_mut().append( + HeaderName::from_bytes(&name).unwrap(), + HeaderValue::from_bytes(&value).unwrap(), + ); + } + http.promise.complete(true); + upgrade + }); + + // Stage 2: wait for the request to finish upgrading + let upgraded = upgrade.await?; + + // Stage 3: return the extracted raw network stream + let (stream, bytes) = extract_network_stream(upgraded); + + // We're allocating for those extra bytes, but they are probably going to be empty most of the time + Ok(( + put_network_stream_resource( + &mut state.borrow_mut().resource_table, + stream, + )?, + ZeroCopyBuf::from(bytes.to_vec()), + )) +} + +#[op] +pub fn op_set_promise_complete(index: usize, status: u16) { + with_resp_mut(index, |resp| { + // The Javascript code will never provide a status that is invalid here (see 23_response.js) + *resp.as_mut().unwrap().status_mut() = + StatusCode::from_u16(status).unwrap(); + }); + with_promise_mut(index, |promise| { + promise.complete(true); + }); +} + +#[op] +pub fn op_get_request_method_and_url( + index: usize, +) -> (String, Option, String, String, Option) { + // TODO(mmastrac): Passing method can be optimized + with_http(index, |http| { + let request_properties = DefaultHttpRequestProperties::request_properties( + &http.request_info, + &http.request_parts.uri, + &http.request_parts.headers, + ); + + // Only extract the path part - we handle authority elsewhere + let path = match &http.request_parts.uri.path_and_query() { + Some(path_and_query) => path_and_query.to_string(), + None => "".to_owned(), + }; + + ( + http.request_parts.method.as_str().to_owned(), + request_properties.authority, + path, + String::from(http.request_info.peer_address.as_ref()), + http.request_info.peer_port, + ) + }) +} + +#[op] +pub fn op_get_request_header(index: usize, name: String) -> Option { + with_req(index, |req| { + let value = req.headers.get(name); + value.map(|value| value.as_bytes().into()) + }) +} + +#[op] +pub fn op_get_request_headers(index: usize) -> Vec<(ByteString, ByteString)> { + with_req(index, |req| { + let headers = &req.headers; + let mut vec = Vec::with_capacity(headers.len()); + let mut cookies: Option> = None; + for (name, value) in headers { + if name == COOKIE { + if let Some(ref mut cookies) = cookies { + cookies.push(value.as_bytes()); + } else { + cookies = Some(vec![value.as_bytes()]); + } + } else { + let name: &[u8] = name.as_ref(); + vec.push((name.into(), value.as_bytes().into())) + } + } + + // We treat cookies specially, because we don't want them to get them + // mangled by the `Headers` object in JS. What we do is take all cookie + // headers and concat them into a single cookie header, separated by + // semicolons. + // TODO(mmastrac): This should probably happen on the JS side on-demand + if let Some(cookies) = cookies { + let cookie_sep = "; ".as_bytes(); + vec.push(( + ByteString::from(COOKIE.as_str()), + ByteString::from(cookies.join(cookie_sep)), + )); + } + vec + }) +} + +#[op] +pub fn op_read_request_body(state: &mut OpState, index: usize) -> ResourceId { + let incoming = with_req_body_mut(index, |body| body.take().unwrap()); + let body_resource = Rc::new(HttpRequestBody::new(incoming)); + let res = state.resource_table.add_rc(body_resource.clone()); + with_body_mut(index, |body| { + *body = Some(body_resource); + }); + res +} + +#[op] +pub fn op_set_response_header( + index: usize, + name: ByteString, + value: ByteString, +) { + with_resp_mut(index, |resp| { + let resp_headers = resp.as_mut().unwrap().headers_mut(); + // These are valid latin-1 strings + let name = HeaderName::from_bytes(&name).unwrap(); + let value = HeaderValue::from_bytes(&value).unwrap(); + resp_headers.append(name, value); + }); +} + +#[op] +pub fn op_set_response_headers( + index: usize, + headers: Vec<(ByteString, ByteString)>, +) { + // TODO(mmastrac): Invalid headers should be handled? + with_resp_mut(index, |resp| { + let resp_headers = resp.as_mut().unwrap().headers_mut(); + resp_headers.reserve(headers.len()); + for (name, value) in headers { + // These are valid latin-1 strings + let name = HeaderName::from_bytes(&name).unwrap(); + let value = HeaderValue::from_bytes(&value).unwrap(); + resp_headers.append(name, value); + } + }) +} + +#[op] +pub fn op_set_response_body_resource( + state: &mut OpState, + index: usize, + stream_rid: ResourceId, + auto_close: bool, +) -> Result<(), AnyError> { + // If the stream is auto_close, we will hold the last ref to it until the response is complete. + let resource = if auto_close { + state.resource_table.take_any(stream_rid)? + } else { + state.resource_table.get_any(stream_rid)? + }; + + with_resp_mut(index, move |response| { + let future = resource.clone().read(64 * 1024); + response + .as_mut() + .unwrap() + .body_mut() + .initialize(ResponseBytesInner::Resource(auto_close, resource, future)); + }); + + Ok(()) +} + +#[op] +pub fn op_set_response_body_stream( + state: &mut OpState, + index: usize, +) -> Result { + // TODO(mmastrac): what should this channel size be? + let (tx, rx) = tokio::sync::mpsc::channel(1); + let (tx, rx) = ( + V8StreamHttpResponseBody::new(tx), + ResponseBytesInner::V8Stream(rx), + ); + + with_resp_mut(index, move |response| { + response.as_mut().unwrap().body_mut().initialize(rx); + }); + + Ok(state.resource_table.add(tx)) +} + +#[op] +pub fn op_set_response_body_text(index: usize, text: String) { + if !text.is_empty() { + with_resp_mut(index, move |response| { + response + .as_mut() + .unwrap() + .body_mut() + .initialize(ResponseBytesInner::Bytes(BufView::from(text.into_bytes()))) + }); + } +} + +#[op] +pub fn op_set_response_body_bytes(index: usize, buffer: ZeroCopyBuf) { + if !buffer.is_empty() { + with_resp_mut(index, |response| { + response + .as_mut() + .unwrap() + .body_mut() + .initialize(ResponseBytesInner::Bytes(BufView::from(buffer))) + }); + }; +} + +#[op] +pub async fn op_http_track( + state: Rc>, + index: usize, + server_rid: ResourceId, +) -> Result<(), AnyError> { + let handle = with_resp(index, |resp| { + resp.as_ref().unwrap().body().completion_handle() + }); + + let join_handle = state + .borrow_mut() + .resource_table + .get::(server_rid)?; + + match handle.or_cancel(join_handle.cancel_handle()).await { + Ok(true) => Ok(()), + Ok(false) => { + Err(AnyError::msg("connection closed before message completed")) + } + Err(_e) => Ok(()), + } +} + +#[pin_project(PinnedDrop)] +pub struct SlabFuture>(usize, #[pin] F); + +pub fn new_slab_future( + request: Request, + request_info: HttpConnectionProperties, + tx: tokio::sync::mpsc::Sender, +) -> SlabFuture> { + let index = slab_insert(request, request_info); + let rx = with_promise(index, |promise| promise.clone()); + SlabFuture(index, async move { + if tx.send(index).await.is_ok() { + // We only need to wait for completion if we aren't closed + rx.await; + } + }) +} + +impl> SlabFuture {} + +#[pinned_drop] +impl> PinnedDrop for SlabFuture { + fn drop(self: Pin<&mut Self>) { + SLAB.with(|slab| { + #[cfg(__zombie_http_tracking)] + { + slab.borrow_mut().get_mut(self.0).unwrap().alive = false; + } + #[cfg(not(__zombie_http_tracking))] + { + slab.borrow_mut().remove(self.0); + } + }); + } +} + +impl> Future for SlabFuture { + type Output = Result; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let index = self.0; + self + .project() + .1 + .poll(cx) + .map(|_| Ok(with_resp_mut(index, |resp| resp.take().unwrap()))) + } +} + +fn serve_https( + mut io: TlsStream, + request_info: HttpConnectionProperties, + cancel: RcRef, + tx: tokio::sync::mpsc::Sender, +) -> JoinHandle> { + // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us + let svc = service_fn(move |req: Request| { + new_slab_future(req, request_info.clone(), tx.clone()) + }); + spawn_local(async { + io.handshake().await?; + let handshake = io.get_ref().1.alpn_protocol(); + // h2 + if handshake == Some(&[104, 50]) { + let conn = http2::Builder::new(LocalExecutor).serve_connection(io, svc); + + conn.map_err(AnyError::from).try_or_cancel(cancel).await + } else { + let conn = http1::Builder::new() + .keep_alive(true) + .serve_connection(io, svc); + + conn + .with_upgrades() + .map_err(AnyError::from) + .try_or_cancel(cancel) + .await + } + }) +} + +fn serve_http( + io: impl tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static, + request_info: HttpConnectionProperties, + cancel: RcRef, + tx: tokio::sync::mpsc::Sender, +) -> JoinHandle> { + // TODO(mmastrac): This is faster if we can use tokio::spawn but then the send bounds get us + let svc = service_fn(move |req: Request| { + new_slab_future(req, request_info.clone(), tx.clone()) + }); + spawn_local(async { + let conn = http1::Builder::new() + .keep_alive(true) + .serve_connection(io, svc); + conn + .with_upgrades() + .map_err(AnyError::from) + .try_or_cancel(cancel) + .await + }) +} + +fn serve_http_on( + network_stream: NetworkStream, + listen_properties: &HttpListenProperties, + cancel: RcRef, + tx: tokio::sync::mpsc::Sender, +) -> JoinHandle> { + // We always want some sort of peer address. If we can't get one, just make up one. + let peer_address = network_stream.peer_address().unwrap_or_else(|_| { + NetworkStreamAddress::Ip(SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(0, 0, 0, 0), + 0, + ))) + }); + let connection_properties: HttpConnectionProperties = + DefaultHttpRequestProperties::connection_properties( + listen_properties, + &peer_address, + ); + + match network_stream { + NetworkStream::Tcp(conn) => { + serve_http(conn, connection_properties, cancel, tx) + } + NetworkStream::Tls(conn) => { + serve_https(conn, connection_properties, cancel, tx) + } + #[cfg(unix)] + NetworkStream::Unix(conn) => { + serve_http(conn, connection_properties, cancel, tx) + } + } +} + +struct HttpJoinHandle( + AsyncRefCell>>>, + CancelHandle, + AsyncRefCell>, +); + +impl HttpJoinHandle { + fn cancel_handle(self: &Rc) -> RcRef { + RcRef::map(self, |this| &this.1) + } +} + +impl Resource for HttpJoinHandle { + fn name(&self) -> Cow { + "http".into() + } + + fn close(self: Rc) { + self.1.cancel() + } +} + +#[op(v8)] +pub fn op_serve_http( + state: Rc>, + listener_rid: ResourceId, +) -> Result<(ResourceId, &'static str, String), AnyError> { + let listener = + DefaultHttpRequestProperties::get_network_stream_listener_for_rid( + &mut state.borrow_mut(), + listener_rid, + )?; + + let local_address = listener.listen_address()?; + let listen_properties = DefaultHttpRequestProperties::listen_properties( + listener.stream(), + &local_address, + ); + + let (tx, rx) = tokio::sync::mpsc::channel(10); + let resource: Rc = Rc::new(HttpJoinHandle( + AsyncRefCell::new(None), + CancelHandle::new(), + AsyncRefCell::new(rx), + )); + let cancel_clone = resource.cancel_handle(); + + let listen_properties_clone = listen_properties.clone(); + let handle = spawn_local(async move { + loop { + let conn = listener + .accept() + .try_or_cancel(cancel_clone.clone()) + .await?; + serve_http_on( + conn, + &listen_properties_clone, + cancel_clone.clone(), + tx.clone(), + ); + } + #[allow(unreachable_code)] + Ok::<_, AnyError>(()) + }); + + // Set the handle after we start the future + *RcRef::map(&resource, |this| &this.0) + .try_borrow_mut() + .unwrap() = Some(handle); + + Ok(( + state.borrow_mut().resource_table.add_rc(resource), + listen_properties.scheme, + listen_properties.fallback_host, + )) +} + +#[op(v8)] +pub fn op_serve_http_on( + state: Rc>, + conn: ResourceId, +) -> Result<(ResourceId, &'static str, String), AnyError> { + let network_stream = + DefaultHttpRequestProperties::get_network_stream_for_rid( + &mut state.borrow_mut(), + conn, + )?; + + let local_address = network_stream.local_address()?; + let listen_properties = DefaultHttpRequestProperties::listen_properties( + network_stream.stream(), + &local_address, + ); + + let (tx, rx) = tokio::sync::mpsc::channel(10); + let resource: Rc = Rc::new(HttpJoinHandle( + AsyncRefCell::new(None), + CancelHandle::new(), + AsyncRefCell::new(rx), + )); + + let handle = serve_http_on( + network_stream, + &listen_properties, + resource.cancel_handle(), + tx, + ); + + // Set the handle after we start the future + *RcRef::map(&resource, |this| &this.0) + .try_borrow_mut() + .unwrap() = Some(handle); + + Ok(( + state.borrow_mut().resource_table.add_rc(resource), + listen_properties.scheme, + listen_properties.fallback_host, + )) +} + +#[op] +pub async fn op_http_wait( + state: Rc>, + rid: ResourceId, +) -> Result { + // We will get the join handle initially, as we might be consuming requests still + let join_handle = state + .borrow_mut() + .resource_table + .get::(rid)?; + + let cancel = join_handle.clone().cancel_handle(); + let next = async { + let mut recv = RcRef::map(&join_handle, |this| &this.2).borrow_mut().await; + recv.recv().await + } + .or_cancel(cancel) + .unwrap_or_else(|_| None) + .await; + + // Do we have a request? + if let Some(req) = next { + return Ok(req as u32); + } + + // No - we're shutting down + let res = RcRef::map(join_handle, |this| &this.0) + .borrow_mut() + .await + .take() + .unwrap() + .await?; + + // Drop the cancel and join handles + state + .borrow_mut() + .resource_table + .take::(rid)?; + + // Filter out shutdown (ENOTCONN) errors + if let Err(err) = res { + if let Some(err) = err.source() { + if let Some(err) = err.downcast_ref::() { + if err.kind() == io::ErrorKind::NotConnected { + return Ok(u32::MAX); + } + } + } + return Err(err); + } + + Ok(u32::MAX) +} diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 43e3c130aa..561b13885d 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -34,6 +34,7 @@ use deno_core::ResourceId; use deno_core::StringOrBuffer; use deno_core::WriteOutcome; use deno_core::ZeroCopyBuf; +use deno_net::raw::NetworkStream; use deno_websocket::ws_create_server_stream; use flate2::write::GzEncoder; use flate2::Compression; @@ -76,7 +77,11 @@ use crate::reader_stream::ExternallyAbortableReaderStream; use crate::reader_stream::ShutdownHandle; pub mod compressible; +mod http_next; mod reader_stream; +mod request_body; +mod request_properties; +mod response_body; mod websocket_upgrade; deno_core::extension!( @@ -92,8 +97,25 @@ deno_core::extension!( op_http_websocket_accept_header, op_http_upgrade_early, op_http_upgrade_websocket, + http_next::op_serve_http, + http_next::op_serve_http_on, + http_next::op_http_wait, + http_next::op_http_track, + http_next::op_set_response_header, + http_next::op_set_response_headers, + http_next::op_set_response_body_text, + http_next::op_set_promise_complete, + http_next::op_set_response_body_bytes, + http_next::op_set_response_body_resource, + http_next::op_set_response_body_stream, + http_next::op_get_request_header, + http_next::op_get_request_headers, + http_next::op_get_request_method_and_url, + http_next::op_read_request_body, + http_next::op_upgrade, + http_next::op_upgrade_raw, ], - esm = ["01_http.js"], + esm = ["00_serve.js", "01_http.js"], ); pub enum HttpSocketAddr { @@ -1147,8 +1169,10 @@ async fn op_http_upgrade_websocket( } }; - let transport = hyper::upgrade::on(request).await?; - let ws_rid = ws_create_server_stream(&state, transport).await?; + let (transport, bytes) = + extract_network_stream(hyper::upgrade::on(request).await?); + let ws_rid = + ws_create_server_stream(&mut state.borrow_mut(), transport, bytes)?; Ok(ws_rid) } @@ -1166,6 +1190,16 @@ where } } +impl hyper1::rt::Executor for LocalExecutor +where + Fut: Future + 'static, + Fut::Output: 'static, +{ + fn execute(&self, fut: Fut) { + spawn_local(fut); + } +} + fn http_error(message: &'static str) -> AnyError { custom_error("Http", message) } @@ -1192,3 +1226,47 @@ fn filter_enotconn( fn never() -> Pending { pending() } + +trait CanDowncastUpgrade: Sized { + fn downcast( + self, + ) -> Result<(T, Bytes), Self>; +} + +impl CanDowncastUpgrade for hyper1::upgrade::Upgraded { + fn downcast( + self, + ) -> Result<(T, Bytes), Self> { + let hyper1::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +impl CanDowncastUpgrade for hyper::upgrade::Upgraded { + fn downcast( + self, + ) -> Result<(T, Bytes), Self> { + let hyper::upgrade::Parts { io, read_buf, .. } = self.downcast()?; + Ok((io, read_buf)) + } +} + +fn extract_network_stream( + upgraded: U, +) -> (NetworkStream, Bytes) { + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Tcp(stream), bytes), + Err(x) => x, + }; + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Tls(stream), bytes), + Err(x) => x, + }; + #[cfg(unix)] + let upgraded = match upgraded.downcast::() { + Ok((stream, bytes)) => return (NetworkStream::Unix(stream), bytes), + Err(x) => x, + }; + drop(upgraded); + unreachable!("unexpected stream type"); +} diff --git a/ext/http/request_body.rs b/ext/http/request_body.rs new file mode 100644 index 0000000000..73908ca55d --- /dev/null +++ b/ext/http/request_body.rs @@ -0,0 +1,84 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use bytes::Bytes; +use deno_core::error::AnyError; +use deno_core::futures::stream::Peekable; +use deno_core::futures::Stream; +use deno_core::futures::StreamExt; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; +use deno_core::RcRef; +use deno_core::Resource; +use hyper1::body::Body; +use hyper1::body::Incoming; +use hyper1::body::SizeHint; +use std::borrow::Cow; +use std::pin::Pin; +use std::rc::Rc; + +/// Converts a hyper incoming body stream into a stream of [`Bytes`] that we can use to read in V8. +struct ReadFuture(Incoming); + +impl Stream for ReadFuture { + type Item = Result; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let res = Pin::new(&mut self.get_mut().0).poll_frame(cx); + match res { + std::task::Poll::Ready(Some(Ok(frame))) => { + if let Ok(data) = frame.into_data() { + // Ensure that we never yield an empty frame + if !data.is_empty() { + return std::task::Poll::Ready(Some(Ok(data))); + } + } + } + std::task::Poll::Ready(None) => return std::task::Poll::Ready(None), + _ => {} + } + std::task::Poll::Pending + } +} + +pub struct HttpRequestBody(AsyncRefCell>, SizeHint); + +impl HttpRequestBody { + pub fn new(body: Incoming) -> Self { + let size_hint = body.size_hint(); + Self(AsyncRefCell::new(ReadFuture(body).peekable()), size_hint) + } + + async fn read(self: Rc, limit: usize) -> Result { + let peekable = RcRef::map(self, |this| &this.0); + let mut peekable = peekable.borrow_mut().await; + match Pin::new(&mut *peekable).peek_mut().await { + None => Ok(BufView::empty()), + Some(Err(_)) => Err(peekable.next().await.unwrap().err().unwrap()), + Some(Ok(bytes)) => { + if bytes.len() <= limit { + // We can safely take the next item since we peeked it + return Ok(BufView::from(peekable.next().await.unwrap()?)); + } + let ret = bytes.split_to(limit); + Ok(BufView::from(ret)) + } + } + } +} + +impl Resource for HttpRequestBody { + fn name(&self) -> Cow { + "requestBody".into() + } + + fn read(self: Rc, limit: usize) -> AsyncResult { + Box::pin(HttpRequestBody::read(self, limit)) + } + + fn size_hint(&self) -> (u64, Option) { + (self.1.lower(), self.1.upper()) + } +} diff --git a/ext/http/request_properties.rs b/ext/http/request_properties.rs new file mode 100644 index 0000000000..7a7f5219c0 --- /dev/null +++ b/ext/http/request_properties.rs @@ -0,0 +1,249 @@ +use deno_core::error::AnyError; +use deno_core::OpState; +use deno_core::ResourceId; +use deno_net::raw::NetworkStream; +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use deno_net::raw::take_network_stream_listener_resource; +use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStreamAddress; +use deno_net::raw::NetworkStreamListener; +use deno_net::raw::NetworkStreamType; +use hyper::HeaderMap; +use hyper::Uri; +use hyper1::header::HOST; +use std::borrow::Cow; +use std::rc::Rc; + +// TODO(mmastrac): I don't like that we have to clone this, but it's one-time setup +#[derive(Clone)] +pub struct HttpListenProperties { + pub stream_type: NetworkStreamType, + pub scheme: &'static str, + pub fallback_host: String, + pub local_port: Option, +} + +#[derive(Clone)] +pub struct HttpConnectionProperties { + pub stream_type: NetworkStreamType, + pub peer_address: Rc, + pub peer_port: Option, + pub local_port: Option, +} + +pub struct HttpRequestProperties { + pub authority: Option, +} + +/// Pluggable trait to determine listen, connection and request properties +/// for embedders that wish to provide alternative routes for incoming HTTP. +pub trait HttpPropertyExtractor { + /// Given a listener [`ResourceId`], returns the [`NetworkStreamListener`]. + fn get_network_stream_listener_for_rid( + state: &mut OpState, + listener_rid: ResourceId, + ) -> Result; + + /// Given a connection [`ResourceId`], returns the [`NetworkStream`]. + fn get_network_stream_for_rid( + state: &mut OpState, + rid: ResourceId, + ) -> Result; + + /// Determines the listener properties. + fn listen_properties( + stream_type: NetworkStreamType, + local_address: &NetworkStreamAddress, + ) -> HttpListenProperties; + + /// Determines the connection properties. + fn connection_properties( + listen_properties: &HttpListenProperties, + peer_address: &NetworkStreamAddress, + ) -> HttpConnectionProperties; + + /// Determines the request properties. + fn request_properties( + connection_properties: &HttpConnectionProperties, + uri: &Uri, + headers: &HeaderMap, + ) -> HttpRequestProperties; +} + +pub struct DefaultHttpRequestProperties {} + +impl HttpPropertyExtractor for DefaultHttpRequestProperties { + fn get_network_stream_for_rid( + state: &mut OpState, + rid: ResourceId, + ) -> Result { + take_network_stream_resource(&mut state.resource_table, rid) + } + + fn get_network_stream_listener_for_rid( + state: &mut OpState, + listener_rid: ResourceId, + ) -> Result { + take_network_stream_listener_resource( + &mut state.resource_table, + listener_rid, + ) + } + + fn listen_properties( + stream_type: NetworkStreamType, + local_address: &NetworkStreamAddress, + ) -> HttpListenProperties { + let scheme = req_scheme_from_stream_type(stream_type); + let fallback_host = req_host_from_addr(stream_type, local_address); + let local_port: Option = match local_address { + NetworkStreamAddress::Ip(ip) => Some(ip.port()), + #[cfg(unix)] + NetworkStreamAddress::Unix(_) => None, + }; + + HttpListenProperties { + scheme, + fallback_host, + local_port, + stream_type, + } + } + + fn connection_properties( + listen_properties: &HttpListenProperties, + peer_address: &NetworkStreamAddress, + ) -> HttpConnectionProperties { + let peer_port: Option = match peer_address { + NetworkStreamAddress::Ip(ip) => Some(ip.port()), + #[cfg(unix)] + NetworkStreamAddress::Unix(_) => None, + }; + let peer_address = match peer_address { + NetworkStreamAddress::Ip(addr) => Rc::from(addr.ip().to_string()), + #[cfg(unix)] + NetworkStreamAddress::Unix(_) => Rc::from("unix"), + }; + let local_port = listen_properties.local_port; + let stream_type = listen_properties.stream_type; + + HttpConnectionProperties { + stream_type, + peer_address, + peer_port, + local_port, + } + } + + fn request_properties( + connection_properties: &HttpConnectionProperties, + uri: &Uri, + headers: &HeaderMap, + ) -> HttpRequestProperties { + let authority = req_host( + uri, + headers, + connection_properties.stream_type, + connection_properties.local_port.unwrap_or_default(), + ) + .map(|s| s.into_owned()); + + HttpRequestProperties { authority } + } +} + +/// Compute the fallback address from the [`NetworkStreamListenAddress`]. If the request has no authority/host in +/// its URI, and there is no [`HeaderName::HOST`] header, we fall back to this. +fn req_host_from_addr( + stream_type: NetworkStreamType, + addr: &NetworkStreamAddress, +) -> String { + match addr { + NetworkStreamAddress::Ip(addr) => { + if (stream_type == NetworkStreamType::Tls && addr.port() == 443) + || (stream_type == NetworkStreamType::Tcp && addr.port() == 80) + { + if addr.ip().is_loopback() || addr.ip().is_unspecified() { + return "localhost".to_owned(); + } + addr.ip().to_string() + } else { + if addr.ip().is_loopback() || addr.ip().is_unspecified() { + return format!("localhost:{}", addr.port()); + } + addr.to_string() + } + } + // There is no standard way for unix domain socket URLs + // nginx and nodejs request use http://unix:[socket_path]:/ but it is not a valid URL + // httpie uses http+unix://[percent_encoding_of_path]/ which we follow + #[cfg(unix)] + NetworkStreamAddress::Unix(unix) => percent_encoding::percent_encode( + unix + .as_pathname() + .and_then(|x| x.to_str()) + .unwrap_or_default() + .as_bytes(), + percent_encoding::NON_ALPHANUMERIC, + ) + .to_string(), + } +} + +fn req_scheme_from_stream_type(stream_type: NetworkStreamType) -> &'static str { + match stream_type { + NetworkStreamType::Tcp => "http://", + NetworkStreamType::Tls => "https://", + #[cfg(unix)] + NetworkStreamType::Unix => "http+unix://", + } +} + +fn req_host<'a>( + uri: &'a Uri, + headers: &'a HeaderMap, + addr_type: NetworkStreamType, + port: u16, +) -> Option> { + // Unix sockets always use the socket address + #[cfg(unix)] + if addr_type == NetworkStreamType::Unix { + return None; + } + + // It is rare that an authority will be passed, but if it does, it takes priority + if let Some(auth) = uri.authority() { + match addr_type { + NetworkStreamType::Tcp => { + if port == 80 { + return Some(Cow::Borrowed(auth.host())); + } + } + NetworkStreamType::Tls => { + if port == 443 { + return Some(Cow::Borrowed(auth.host())); + } + } + #[cfg(unix)] + NetworkStreamType::Unix => {} + } + return Some(Cow::Borrowed(auth.as_str())); + } + + // TODO(mmastrac): Most requests will use this path and we probably will want to optimize it in the future + if let Some(host) = headers.get(HOST) { + return Some(match host.to_str() { + Ok(host) => Cow::Borrowed(host), + Err(_) => Cow::Owned( + host + .as_bytes() + .iter() + .cloned() + .map(char::from) + .collect::(), + ), + }); + } + + None +} diff --git a/ext/http/response_body.rs b/ext/http/response_body.rs new file mode 100644 index 0000000000..0086e4d782 --- /dev/null +++ b/ext/http/response_body.rs @@ -0,0 +1,253 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use std::borrow::Cow; +use std::cell::RefCell; +use std::future::Future; +use std::pin::Pin; +use std::rc::Rc; +use std::task::Waker; + +use deno_core::error::bad_resource; +use deno_core::error::AnyError; +use deno_core::futures::FutureExt; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::BufView; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use deno_core::Resource; +use deno_core::WriteOutcome; +use hyper1::body::Body; +use hyper1::body::Frame; +use hyper1::body::SizeHint; + +#[derive(Clone, Debug, Default)] +pub struct CompletionHandle { + inner: Rc>, +} + +#[derive(Debug, Default)] +struct CompletionHandleInner { + complete: bool, + success: bool, + waker: Option, +} + +impl CompletionHandle { + pub fn complete(&self, success: bool) { + let mut mut_self = self.inner.borrow_mut(); + mut_self.complete = true; + mut_self.success = success; + if let Some(waker) = mut_self.waker.take() { + drop(mut_self); + waker.wake(); + } + } +} + +impl Future for CompletionHandle { + type Output = bool; + + fn poll( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + let mut mut_self = self.inner.borrow_mut(); + if mut_self.complete { + return std::task::Poll::Ready(mut_self.success); + } + + mut_self.waker = Some(cx.waker().clone()); + std::task::Poll::Pending + } +} + +#[derive(Default)] +pub enum ResponseBytesInner { + /// An empty stream. + #[default] + Empty, + /// A completed stream. + Done, + /// A static buffer of bytes, sent it one fell swoop. + Bytes(BufView), + /// A resource stream, piped in fast mode. + Resource(bool, Rc, AsyncResult), + /// A JS-backed stream, written in JS and transported via pipe. + V8Stream(tokio::sync::mpsc::Receiver), +} + +impl std::fmt::Debug for ResponseBytesInner { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Done => f.write_str("Done"), + Self::Empty => f.write_str("Empty"), + Self::Bytes(..) => f.write_str("Bytes"), + Self::Resource(..) => f.write_str("Resource"), + Self::V8Stream(..) => f.write_str("V8Stream"), + } + } +} + +/// This represents the union of possible response types in Deno with the stream-style [`Body`] interface +/// required by hyper. As the API requires information about request completion (including a success/fail +/// flag), we include a very lightweight [`CompletionHandle`] for interested parties to listen on. +#[derive(Debug, Default)] +pub struct ResponseBytes(ResponseBytesInner, CompletionHandle); + +impl ResponseBytes { + pub fn initialize(&mut self, inner: ResponseBytesInner) { + debug_assert!(matches!(self.0, ResponseBytesInner::Empty)); + self.0 = inner; + } + + pub fn completion_handle(&self) -> CompletionHandle { + self.1.clone() + } + + fn complete(&mut self, success: bool) -> ResponseBytesInner { + if matches!(self.0, ResponseBytesInner::Done) { + return ResponseBytesInner::Done; + } + + let current = std::mem::replace(&mut self.0, ResponseBytesInner::Done); + self.1.complete(success); + current + } +} + +impl ResponseBytesInner { + pub fn size_hint(&self) -> SizeHint { + match self { + Self::Done => SizeHint::with_exact(0), + Self::Empty => SizeHint::with_exact(0), + Self::Bytes(bytes) => SizeHint::with_exact(bytes.len() as u64), + Self::Resource(_, res, _) => { + let hint = res.size_hint(); + let mut size_hint = SizeHint::new(); + size_hint.set_lower(hint.0); + if let Some(upper) = hint.1 { + size_hint.set_upper(upper) + } + size_hint + } + Self::V8Stream(..) => SizeHint::default(), + } + } +} + +impl Body for ResponseBytes { + type Data = BufView; + type Error = AnyError; + + fn poll_frame( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + match &mut self.0 { + ResponseBytesInner::Done | ResponseBytesInner::Empty => { + unreachable!() + } + ResponseBytesInner::Bytes(..) => { + if let ResponseBytesInner::Bytes(data) = self.complete(true) { + std::task::Poll::Ready(Some(Ok(Frame::data(data)))) + } else { + unreachable!() + } + } + ResponseBytesInner::Resource(auto_close, stm, ref mut future) => { + match future.poll_unpin(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(Err(err)) => { + std::task::Poll::Ready(Some(Err(err))) + } + std::task::Poll::Ready(Ok(buf)) => { + if buf.is_empty() { + if *auto_close { + stm.clone().close(); + } + self.complete(true); + return std::task::Poll::Ready(None); + } + // Re-arm the future + *future = stm.clone().read(64 * 1024); + std::task::Poll::Ready(Some(Ok(Frame::data(buf)))) + } + } + } + ResponseBytesInner::V8Stream(stm) => match stm.poll_recv(cx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(Some(buf)) => { + std::task::Poll::Ready(Some(Ok(Frame::data(buf)))) + } + std::task::Poll::Ready(None) => { + self.complete(true); + std::task::Poll::Ready(None) + } + }, + } + } + + fn is_end_stream(&self) -> bool { + matches!(self.0, ResponseBytesInner::Done | ResponseBytesInner::Empty) + } + + fn size_hint(&self) -> SizeHint { + // The size hint currently only used in the case where it is exact bounds in hyper, but we'll pass it through + // anyways just in case hyper needs it. + self.0.size_hint() + } +} + +impl Drop for ResponseBytes { + fn drop(&mut self) { + // We won't actually poll_frame for Empty responses so this is where we return success + self.complete(matches!(self.0, ResponseBytesInner::Empty)); + } +} + +/// A response body object that can be passed to V8. This body will feed byte buffers to a channel which +/// feed's hyper's HTTP response. +pub struct V8StreamHttpResponseBody( + AsyncRefCell>>, + CancelHandle, +); + +impl V8StreamHttpResponseBody { + pub fn new(sender: tokio::sync::mpsc::Sender) -> Self { + Self(AsyncRefCell::new(Some(sender)), CancelHandle::default()) + } +} + +impl Resource for V8StreamHttpResponseBody { + fn name(&self) -> Cow { + "responseBody".into() + } + + fn write( + self: Rc, + buf: BufView, + ) -> AsyncResult { + let cancel_handle = RcRef::map(&self, |this| &this.1); + Box::pin( + async move { + let nwritten = buf.len(); + + let res = RcRef::map(self, |this| &this.0).borrow().await; + if let Some(tx) = res.as_ref() { + tx.send(buf) + .await + .map_err(|_| bad_resource("failed to write"))?; + Ok(WriteOutcome::Full { nwritten }) + } else { + Err(bad_resource("failed to write")) + } + } + .try_or_cancel(cancel_handle), + ) + } + + fn close(self: Rc) { + self.1.cancel(); + } +} diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index a7a1acff6f..6bab80cc79 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -17,6 +17,7 @@ path = "lib.rs" deno_core.workspace = true deno_tls.workspace = true log.workspace = true +pin-project.workspace = true serde.workspace = true socket2.workspace = true tokio.workspace = true diff --git a/ext/net/lib.rs b/ext/net/lib.rs index f812bf60bc..ff67186b0c 100644 --- a/ext/net/lib.rs +++ b/ext/net/lib.rs @@ -5,6 +5,7 @@ pub mod ops; pub mod ops_tls; #[cfg(unix)] pub mod ops_unix; +pub mod raw; pub mod resolve_addr; use deno_core::error::AnyError; diff --git a/ext/net/ops_tls.rs b/ext/net/ops_tls.rs index c0cfb8674f..8a77570668 100644 --- a/ext/net/ops_tls.rs +++ b/ext/net/ops_tls.rs @@ -61,6 +61,7 @@ use std::fs::File; use std::io; use std::io::BufReader; use std::io::ErrorKind; +use std::net::SocketAddr; use std::path::Path; use std::pin::Pin; use std::rc::Rc; @@ -115,6 +116,13 @@ impl TlsStream { Self::new(tcp, Connection::Client(tls)) } + pub fn new_client_side_from( + tcp: TcpStream, + connection: ClientConnection, + ) -> Self { + Self::new(tcp, Connection::Client(connection)) + } + pub fn new_server_side( tcp: TcpStream, tls_config: Arc, @@ -123,6 +131,13 @@ impl TlsStream { Self::new(tcp, Connection::Server(tls)) } + pub fn new_server_side_from( + tcp: TcpStream, + connection: ServerConnection, + ) -> Self { + Self::new(tcp, Connection::Server(connection)) + } + pub fn into_split(self) -> (ReadHalf, WriteHalf) { let shared = Shared::new(self); let rd = ReadHalf { @@ -132,6 +147,16 @@ impl TlsStream { (rd, wr) } + /// Convenience method to match [`TcpStream`]. + pub fn peer_addr(&self) -> Result { + self.0.as_ref().unwrap().tcp.peer_addr() + } + + /// Convenience method to match [`TcpStream`]. + pub fn local_addr(&self) -> Result { + self.0.as_ref().unwrap().tcp.local_addr() + } + /// Tokio-rustls compatibility: returns a reference to the underlying TCP /// stream, and a reference to the Rustls `Connection` object. pub fn get_ref(&self) -> (&TcpStream, &Connection) { @@ -954,8 +979,8 @@ fn load_private_keys_from_file( } pub struct TlsListenerResource { - tcp_listener: AsyncRefCell, - tls_config: Arc, + pub(crate) tcp_listener: AsyncRefCell, + pub(crate) tls_config: Arc, cancel_handle: CancelHandle, } diff --git a/ext/net/ops_unix.rs b/ext/net/ops_unix.rs index 1161d27592..bed923f8b4 100644 --- a/ext/net/ops_unix.rs +++ b/ext/net/ops_unix.rs @@ -32,8 +32,8 @@ pub fn into_string(s: std::ffi::OsString) -> Result { }) } -struct UnixListenerResource { - listener: AsyncRefCell, +pub(crate) struct UnixListenerResource { + pub listener: AsyncRefCell, cancel: CancelHandle, } diff --git a/ext/net/raw.rs b/ext/net/raw.rs new file mode 100644 index 0000000000..74cc10d630 --- /dev/null +++ b/ext/net/raw.rs @@ -0,0 +1,304 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use crate::io::TcpStreamResource; +#[cfg(unix)] +use crate::io::UnixStreamResource; +use crate::ops::TcpListenerResource; +use crate::ops_tls::TlsListenerResource; +use crate::ops_tls::TlsStream; +use crate::ops_tls::TlsStreamResource; +#[cfg(unix)] +use crate::ops_unix::UnixListenerResource; +use deno_core::error::bad_resource; +use deno_core::error::bad_resource_id; +use deno_core::error::AnyError; +use deno_core::ResourceId; +use deno_core::ResourceTable; +use deno_tls::rustls::ServerConfig; +use pin_project::pin_project; +use std::rc::Rc; +use std::sync::Arc; +use tokio::net::TcpStream; +#[cfg(unix)] +use tokio::net::UnixStream; + +/// A raw stream of one of the types handled by this extension. +#[pin_project(project = NetworkStreamProject)] +pub enum NetworkStream { + Tcp(#[pin] TcpStream), + Tls(#[pin] TlsStream), + #[cfg(unix)] + Unix(#[pin] UnixStream), +} + +/// A raw stream of one of the types handled by this extension. +#[derive(Copy, Clone, PartialEq, Eq)] +pub enum NetworkStreamType { + Tcp, + Tls, + #[cfg(unix)] + Unix, +} + +impl NetworkStream { + pub fn local_address(&self) -> Result { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.local_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)), + } + } + + pub fn peer_address(&self) -> Result { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.peer_addr()?)), + Self::Tls(tls) => Ok(NetworkStreamAddress::Ip(tls.peer_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.peer_addr()?)), + } + } + + pub fn stream(&self) -> NetworkStreamType { + match self { + Self::Tcp(_) => NetworkStreamType::Tcp, + Self::Tls(_) => NetworkStreamType::Tls, + #[cfg(unix)] + Self::Unix(_) => NetworkStreamType::Unix, + } + } +} + +impl tokio::io::AsyncRead for NetworkStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_read(cx, buf), + NetworkStreamProject::Tls(s) => s.poll_read(cx, buf), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_read(cx, buf), + } + } +} + +impl tokio::io::AsyncWrite for NetworkStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_write(cx, buf), + NetworkStreamProject::Tls(s) => s.poll_write(cx, buf), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_write(cx, buf), + } + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_flush(cx), + NetworkStreamProject::Tls(s) => s.poll_flush(cx), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_flush(cx), + } + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_shutdown(cx), + NetworkStreamProject::Tls(s) => s.poll_shutdown(cx), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_shutdown(cx), + } + } + + fn is_write_vectored(&self) -> bool { + match self { + Self::Tcp(s) => s.is_write_vectored(), + Self::Tls(s) => s.is_write_vectored(), + #[cfg(unix)] + Self::Unix(s) => s.is_write_vectored(), + } + } + + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + match self.project() { + NetworkStreamProject::Tcp(s) => s.poll_write_vectored(cx, bufs), + NetworkStreamProject::Tls(s) => s.poll_write_vectored(cx, bufs), + #[cfg(unix)] + NetworkStreamProject::Unix(s) => s.poll_write_vectored(cx, bufs), + } + } +} + +/// A raw stream listener of one of the types handled by this extension. +pub enum NetworkStreamListener { + Tcp(tokio::net::TcpListener), + Tls(tokio::net::TcpListener, Arc), + #[cfg(unix)] + Unix(tokio::net::UnixListener), +} + +pub enum NetworkStreamAddress { + Ip(std::net::SocketAddr), + #[cfg(unix)] + Unix(tokio::net::unix::SocketAddr), +} + +impl NetworkStreamListener { + /// Accepts a connection on this listener. + pub async fn accept(&self) -> Result { + Ok(match self { + Self::Tcp(tcp) => { + let (stream, _addr) = tcp.accept().await?; + NetworkStream::Tcp(stream) + } + Self::Tls(tcp, config) => { + let (stream, _addr) = tcp.accept().await?; + NetworkStream::Tls(TlsStream::new_server_side(stream, config.clone())) + } + #[cfg(unix)] + Self::Unix(unix) => { + let (stream, _addr) = unix.accept().await?; + NetworkStream::Unix(stream) + } + }) + } + + pub fn listen_address(&self) -> Result { + match self { + Self::Tcp(tcp) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + Self::Tls(tcp, _) => Ok(NetworkStreamAddress::Ip(tcp.local_addr()?)), + #[cfg(unix)] + Self::Unix(unix) => Ok(NetworkStreamAddress::Unix(unix.local_addr()?)), + } + } + + pub fn stream(&self) -> NetworkStreamType { + match self { + Self::Tcp(..) => NetworkStreamType::Tcp, + Self::Tls(..) => NetworkStreamType::Tls, + #[cfg(unix)] + Self::Unix(..) => NetworkStreamType::Unix, + } + } +} + +/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server). +/// This method will extract a stream from the resource table and return it, unwrapped. +pub fn take_network_stream_resource( + resource_table: &mut ResourceTable, + stream_rid: ResourceId, +) -> Result { + // The stream we're attempting to unwrap may be in use somewhere else. If that's the case, we cannot proceed + // with the process of unwrapping this connection, so we just return a bad resource error. + // See also: https://github.com/denoland/deno/pull/16242 + + if let Ok(resource_rc) = resource_table.take::(stream_rid) + { + // This TCP connection might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TCP stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let tcp_stream = read_half.reunite(write_half)?; + return Ok(NetworkStream::Tcp(tcp_stream)); + } + + if let Ok(resource_rc) = resource_table.take::(stream_rid) + { + // This TLS connection might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TLS stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let tls_stream = read_half.reunite(write_half); + return Ok(NetworkStream::Tls(tls_stream)); + } + + #[cfg(unix)] + if let Ok(resource_rc) = resource_table.take::(stream_rid) + { + // This UNIX socket might be used somewhere else. + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("UNIX stream is currently in use"))?; + let (read_half, write_half) = resource.into_inner(); + let unix_stream = read_half.reunite(write_half)?; + return Ok(NetworkStream::Unix(unix_stream)); + } + + Err(bad_resource_id()) +} + +/// Inserts a raw stream (back?) into the resource table and returns a resource ID. This can then be used to create raw connection +/// objects on the JS side. +pub fn put_network_stream_resource( + resource_table: &mut ResourceTable, + stream: NetworkStream, +) -> Result { + let res = match stream { + NetworkStream::Tcp(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(TcpStreamResource::new((r, w))) + } + NetworkStream::Tls(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(TlsStreamResource::new((r, w))) + } + #[cfg(unix)] + NetworkStream::Unix(conn) => { + let (r, w) = conn.into_split(); + resource_table.add(UnixStreamResource::new((r, w))) + } + }; + + Ok(res) +} + +/// In some cases it may be more efficient to extract the resource from the resource table and use it directly (for example, an HTTP server). +/// This method will extract a stream from the resource table and return it, unwrapped. +pub fn take_network_stream_listener_resource( + resource_table: &mut ResourceTable, + listener_rid: ResourceId, +) -> Result { + if let Ok(resource_rc) = + resource_table.take::(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TCP socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Tcp(resource.listener.into_inner())); + } + + if let Ok(resource_rc) = + resource_table.take::(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("TLS socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Tls( + resource.tcp_listener.into_inner(), + resource.tls_config, + )); + } + + #[cfg(unix)] + if let Ok(resource_rc) = + resource_table.take::(listener_rid) + { + let resource = Rc::try_unwrap(resource_rc) + .map_err(|_| bad_resource("UNIX socket listener is currently in use"))?; + return Ok(NetworkStreamListener::Unix(resource.listener.into_inner())); + } + + Err(bad_resource_id()) +} diff --git a/ext/websocket/Cargo.toml b/ext/websocket/Cargo.toml index 53e184e1e2..006c73a5f2 100644 --- a/ext/websocket/Cargo.toml +++ b/ext/websocket/Cargo.toml @@ -14,11 +14,13 @@ description = "Implementation of WebSocket API for Deno" path = "lib.rs" [dependencies] +bytes.workspace = true deno_core.workspace = true +deno_net.workspace = true deno_tls.workspace = true fastwebsockets = { workspace = true, features = ["upgrade"] } http.workspace = true -hyper.workspace = true +hyper = { workspace = true, features = ["backports"] } serde.workspace = true tokio.workspace = true tokio-rustls.workspace = true diff --git a/ext/websocket/lib.rs b/ext/websocket/lib.rs index 798856bc14..71aa66ff38 100644 --- a/ext/websocket/lib.rs +++ b/ext/websocket/lib.rs @@ -1,11 +1,10 @@ // Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. - +use crate::stream::WebSocketStream; +use bytes::Bytes; use deno_core::error::invalid_hostname; use deno_core::error::type_error; use deno_core::error::AnyError; use deno_core::op; -use deno_core::StringOrBuffer; - use deno_core::url; use deno_core::AsyncRefCell; use deno_core::ByteString; @@ -15,7 +14,10 @@ use deno_core::OpState; use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::StringOrBuffer; use deno_core::ZeroCopyBuf; +use deno_net::raw::take_network_stream_resource; +use deno_net::raw::NetworkStream; use deno_tls::create_client_config; use http::header::CONNECTION; use http::header::UPGRADE; @@ -24,9 +26,7 @@ use http::HeaderValue; use http::Method; use http::Request; use http::Uri; -use hyper::upgrade::Upgraded; use hyper::Body; -use hyper::Response; use serde::Deserialize; use serde::Serialize; use std::borrow::Cow; @@ -52,6 +52,7 @@ use fastwebsockets::Role; use fastwebsockets::WebSocket; pub use tokio_tungstenite; // Re-export tokio_tungstenite +mod stream; #[derive(Clone)] pub struct WsRootStore(pub Option); @@ -243,17 +244,21 @@ where let client = fastwebsockets::handshake::client(&LocalExecutor, request, socket); - let (stream, response): (WebSocket, Response) = - if let Some(cancel_resource) = cancel_resource { - client.or_cancel(cancel_resource.0.to_owned()).await? - } else { - client.await - } - .map_err(|err| { - DomExceptionNetworkError::new(&format!( - "failed to connect to WebSocket: {err}" - )) - })?; + let (upgraded, response) = if let Some(cancel_resource) = cancel_resource { + client.or_cancel(cancel_resource.0.to_owned()).await? + } else { + client.await + } + .map_err(|err| { + DomExceptionNetworkError::new(&format!( + "failed to connect to WebSocket: {err}" + )) + })?; + + let inner = MaybeTlsStream::Plain(upgraded.into_inner()); + let stream = + WebSocketStream::new(stream::WsStreamKind::Tungstenite(inner), None); + let stream = WebSocket::after_handshake(stream, Role::Client); if let Some(cancel_rid) = cancel_handle { state.borrow_mut().resource_table.close(cancel_rid).ok(); @@ -294,7 +299,7 @@ pub enum MessageKind { } pub struct ServerWebSocket { - ws: AsyncRefCell>, + ws: AsyncRefCell>, closed: Rc>, } @@ -320,11 +325,19 @@ impl Resource for ServerWebSocket { "serverWebSocket".into() } } -pub async fn ws_create_server_stream( - state: &Rc>, - transport: Upgraded, + +pub fn ws_create_server_stream( + state: &mut OpState, + transport: NetworkStream, + read_buf: Bytes, ) -> Result { - let mut ws = WebSocket::after_handshake(transport, Role::Server); + let mut ws = WebSocket::after_handshake( + WebSocketStream::new( + stream::WsStreamKind::Network(transport), + Some(read_buf), + ), + Role::Server, + ); ws.set_writev(true); ws.set_auto_close(true); ws.set_auto_pong(true); @@ -334,11 +347,26 @@ pub async fn ws_create_server_stream( closed: Rc::new(Cell::new(false)), }; - let resource_table = &mut state.borrow_mut().resource_table; - let rid = resource_table.add(ws_resource); + let rid = state.resource_table.add(ws_resource); Ok(rid) } +#[op] +pub fn op_ws_server_create( + state: &mut OpState, + conn: ResourceId, + extra_bytes: &[u8], +) -> Result { + let network_stream = + take_network_stream_resource(&mut state.resource_table, conn)?; + // Copying the extra bytes, but unlikely this will account for much + ws_create_server_stream( + state, + network_stream, + Bytes::from(extra_bytes.to_vec()), + ) +} + #[op] pub async fn op_ws_send_binary( state: Rc>, @@ -490,6 +518,7 @@ deno_core::extension!(deno_websocket, op_ws_next_event, op_ws_send_binary, op_ws_send_text, + op_ws_server_create, ], esm = [ "01_websocket.js", "02_websocketstream.js" ], options = { diff --git a/ext/websocket/stream.rs b/ext/websocket/stream.rs new file mode 100644 index 0000000000..69c06b7eb7 --- /dev/null +++ b/ext/websocket/stream.rs @@ -0,0 +1,115 @@ +// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license. +use bytes::Buf; +use bytes::Bytes; +use deno_net::raw::NetworkStream; +use hyper::upgrade::Upgraded; +use std::pin::Pin; +use std::task::Poll; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::ReadBuf; +use tokio_tungstenite::MaybeTlsStream; + +// TODO(bartlomieju): remove this +pub(crate) enum WsStreamKind { + Tungstenite(MaybeTlsStream), + Network(NetworkStream), +} + +pub(crate) struct WebSocketStream { + stream: WsStreamKind, + pre: Option, +} + +impl WebSocketStream { + pub fn new(stream: WsStreamKind, buffer: Option) -> Self { + Self { + stream, + pre: buffer, + } + } +} + +impl AsyncRead for WebSocketStream { + // From hyper's Rewind (https://github.com/hyperium/hyper), MIT License, Copyright (c) Sean McArthur + fn poll_read( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + if let Some(mut prefix) = self.pre.take() { + // If there are no remaining bytes, let the bytes get dropped. + if !prefix.is_empty() { + let copy_len = std::cmp::min(prefix.len(), buf.remaining()); + // TODO: There should be a way to do following two lines cleaner... + buf.put_slice(&prefix[..copy_len]); + prefix.advance(copy_len); + // Put back what's left + if !prefix.is_empty() { + self.pre = Some(prefix); + } + + return Poll::Ready(Ok(())); + } + } + match &mut self.stream { + WsStreamKind::Network(stream) => Pin::new(stream).poll_read(cx, buf), + WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_read(cx, buf), + } + } +} + +impl AsyncWrite for WebSocketStream { + fn poll_write( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + match &mut self.stream { + WsStreamKind::Network(stream) => Pin::new(stream).poll_write(cx, buf), + WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_write(cx, buf), + } + } + + fn poll_flush( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.stream { + WsStreamKind::Network(stream) => Pin::new(stream).poll_flush(cx), + WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_flush(cx), + } + } + + fn poll_shutdown( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match &mut self.stream { + WsStreamKind::Network(stream) => Pin::new(stream).poll_shutdown(cx), + WsStreamKind::Tungstenite(stream) => Pin::new(stream).poll_shutdown(cx), + } + } + + fn is_write_vectored(&self) -> bool { + match &self.stream { + WsStreamKind::Network(stream) => stream.is_write_vectored(), + WsStreamKind::Tungstenite(stream) => stream.is_write_vectored(), + } + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + match &mut self.stream { + WsStreamKind::Network(stream) => { + Pin::new(stream).poll_write_vectored(cx, bufs) + } + WsStreamKind::Tungstenite(stream) => { + Pin::new(stream).poll_write_vectored(cx, bufs) + } + } + } +}