1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

feat(ext/flash): An optimized http/1.1 server (#15405)

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
Co-authored-by: Ben Noordhuis <info@bnoordhuis.nl>
Co-authored-by: crowlkats <crowlkats@toaxl.com>
Co-authored-by: Ryan Dahl <ry@tinyclouds.org>
This commit is contained in:
Divy Srivastava 2022-08-18 17:35:02 +05:30 committed by GitHub
parent 0b0843e4a5
commit cd21cff299
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
39 changed files with 21284 additions and 30 deletions

28
Cargo.lock generated
View file

@ -1040,6 +1040,24 @@ dependencies = [
"winapi 0.3.9", "winapi 0.3.9",
] ]
[[package]]
name = "deno_flash"
version = "0.1.0"
dependencies = [
"deno_core",
"deno_tls",
"deno_websocket",
"http",
"httparse",
"libc",
"log 0.4.17",
"mio",
"rustls",
"rustls-pemfile 0.2.1",
"serde",
"tokio",
]
[[package]] [[package]]
name = "deno_graph" name = "deno_graph"
version = "0.30.0" version = "0.30.0"
@ -1147,6 +1165,7 @@ dependencies = [
"deno_crypto", "deno_crypto",
"deno_fetch", "deno_fetch",
"deno_ffi", "deno_ffi",
"deno_flash",
"deno_http", "deno_http",
"deno_net", "deno_net",
"deno_node", "deno_node",
@ -3606,6 +3625,15 @@ dependencies = [
"security-framework", "security-framework",
] ]
[[package]]
name = "rustls-pemfile"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5eebeaeb360c87bfb72e84abdb3447159c0eaececf1bef2aecd65a8be949d1c9"
dependencies = [
"base64 0.13.0",
]
[[package]] [[package]]
name = "rustls-pemfile" name = "rustls-pemfile"
version = "0.3.0" version = "0.3.0"

View file

@ -15,6 +15,7 @@ members = [
"ext/console", "ext/console",
"ext/crypto", "ext/crypto",
"ext/fetch", "ext/fetch",
"ext/flash",
"ext/ffi", "ext/ffi",
"ext/http", "ext/http",
"ext/net", "ext/net",
@ -134,6 +135,8 @@ opt-level = 3
opt-level = 3 opt-level = 3
[profile.release.package.deno_http] [profile.release.package.deno_http]
opt-level = 3 opt-level = 3
[profile.release.package.deno_flash]
opt-level = 3
[profile.release.package.deno_net] [profile.release.package.deno_net]
opt-level = 3 opt-level = 3
[profile.release.package.deno_web] [profile.release.package.deno_web]

View file

@ -0,0 +1,12 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const port = Bun.argv[2] || "4545";
const path = new URL("../testdata/128k.bin", import.meta.url).pathname;
Bun.serve({
fetch(_req) {
const file = Bun.file(path);
return new Response(file);
},
port: Number(port),
});

View file

@ -0,0 +1,14 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;
const path = new URL("../testdata/128k.bin", import.meta.url).pathname;
function handler() {
const file = Deno.openSync(path, { read: true });
return new Response(file.readable);
}
serve(handler, { hostname, port: Number(port) });

View file

@ -0,0 +1,14 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const { serve } = Deno;
function handler() {
return new Response("Hello World");
}
serve(handler, {
hostname,
port,
});

View file

@ -0,0 +1,37 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
// deno-lint-ignore-file
const {
core: {
opAsync,
ops: { op_flash_make_request, op_flash_serve },
encode,
},
} = Deno;
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const serverId = op_flash_serve({ hostname, port });
const serverPromise = opAsync("op_flash_drive_server", serverId);
const fastOps = op_flash_make_request();
function nextRequest() {
return fastOps.nextRequest();
}
function respond(token, response) {
return fastOps.respond(token, response, true);
}
const response = encode(
"HTTP/1.1 200 OK\r\nContent-Length: 11\r\n\r\nHello World",
);
while (true) {
let token = nextRequest();
if (token === 0) token = await opAsync("op_flash_next_async", serverId);
for (let i = 0; i < token; i++) {
respond(
i,
response,
);
}
}

View file

@ -0,0 +1,26 @@
import { renderToReadableStream } from "https://esm.run/react-dom/server";
import * as React from "https://esm.run/react";
const { serve } = Deno;
const addr = Deno.args[0] || "127.0.0.1:4500";
const [hostname, port] = addr.split(":");
const App = () => (
<html>
<body>
<h1>Hello World</h1>
</body>
</html>
);
const headers = {
headers: {
"Content-Type": "text/html",
},
};
serve(
async () => {
return new Response(await renderToReadableStream(<App />), headers);
},
{ hostname, port },
);

File diff suppressed because it is too large Load diff

23
cli/bench/testdata/bun_reactdom_ssr.jsx vendored Normal file
View file

@ -0,0 +1,23 @@
// Bun uses a custom non-portable react-dom fork.
// TODO(@littledivy): Reenable this when it stops segfaulting.
import { renderToReadableStream } from "./react-dom.js";
const headers = {
headers: {
"Content-Type": "text/html",
},
};
const App = () => (
<html>
<body>
<h1>Hello World</h1>
</body>
</html>
);
Bun.serve({
async fetch(req) {
return new Response(await renderToReadableStream(<App />), headers);
},
port: 9000,
});

13
cli/bench/testdata/deno_upgrade_http.js vendored Normal file
View file

@ -0,0 +1,13 @@
const { serve, upgradeHttp } = Deno;
const u8 = Deno.core.encode("HTTP/1.1 101 Switching Protocols\r\n\r\n");
async function handler(req) {
const [conn, _firstPacket] = upgradeHttp(req);
await conn.write(u8);
await conn.close();
}
serve(handler, {
hostname: "127.0.0.1",
port: 9000,
});

28
cli/bench/testdata/react-dom.js vendored Normal file

File diff suppressed because one or more lines are too long

View file

@ -63,6 +63,11 @@ const UNSTABLE_DENO_PROPS: &[&str] = &[
"spawnSync", "spawnSync",
"ChildStatus", "ChildStatus",
"SpawnOutput", "SpawnOutput",
"serve",
"serveTls",
"ServeInit",
"ServeTlsInit",
"Handler",
]; ];
static MSG_MISSING_PROPERTY_DENO: Lazy<Regex> = Lazy::new(|| { static MSG_MISSING_PROPERTY_DENO: Lazy<Regex> = Lazy::new(|| {

View file

@ -1211,12 +1211,144 @@ declare namespace Deno {
*/ */
export function unrefTimer(id: number): void; export function unrefTimer(id: number): void;
/** **UNSTABLE**: new API, yet to be vetted. /**
* A handler for HTTP requests. Consumes a request and returns a response.
*
* Handler allows `void` or `Promise<void>` return type to enable
* request upgrades using `Deno.upgradeHttp()` API. It is callers responsibility
* to write response manually to the returned connection. Failing to do so
* (or not returning a response without an upgrade) will cause the connection
* to hang.
*
* If a handler throws, the server calling the handler will assume the impact
* of the error is isolated to the individual request. It will catch the error
* and close the underlying connection.
*
* @category HTTP Server
*/
export type ServeHandler = (
request: Request,
) => Response | Promise<Response> | void | Promise<void>;
/**
* @category HTTP Server
*/
export interface ServeInit extends Partial<Deno.ListenOptions> {
/** An AbortSignal to close the server and all connections. */
signal?: AbortSignal;
/** The handler to invoke when route handlers throw an error. */
onError?: (error: unknown) => Response | Promise<Response>;
/** The callback which is called when the server started listening */
onListen?: (params: { hostname: string; port: number }) => void;
}
/**
* @category HTTP Server
*/
export interface ServeTlsInit extends ServeInit {
/** Server private key in PEM format */
cert: string;
/** Cert chain in PEM format */
key: string;
}
/** Serves HTTP requests with the given handler.
*
* You can specify an object with a port and hostname option, which is the
* address to listen on. The default is port 9000 on hostname "127.0.0.1".
*
* The below example serves with the port 9000.
*
* ```ts
* Deno.serve((_req) => new Response("Hello, world"));
* ```
*
* You can change the listening address by the `hostname` and `port` options.
* The below example serves with the port 3000.
*
* ```ts
* Deno.serve((_req) => new Response("Hello, world"), { port: 3000 });
* ```
*
* `Deno.serve` function prints the message `Listening on http://<hostname>:<port>/`
* on start-up by default. If you like to change this message, you can specify
* `onListen` option to override it.
*
* ```ts
* Deno.serve((_req) => new Response("Hello, world"), {
* onListen({ port, hostname }) {
* console.log(`Server started at http://${hostname}:${port}`);
* // ... more info specific to your server ..
* },
* });
* ```
*
* @param handler The handler for individual HTTP requests.
* @param options The options. See `ServeInit` documentation for details.
*
* @category HTTP Server
*/
export function serve(
handler: ServeHandler,
options?: ServeInit,
): Promise<void>;
/** Serves HTTPS requests with the given handler.
*
* You must specify `key` and `cert` options.
*
* You can specify an object with a port and hostname option, which is the
* address to listen on. The default is port 9000 on hostname "127.0.0.1".
*
* The below example serves with the default port 8443.
*
* ```ts
* const cert = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n";
* const key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n";
* Deno.serveTls((_req) => new Response("Hello, world"), { cert, key });
*
* ```
*
* `Deno.serveTls` function prints the message `Listening on https://<hostname>:<port>/`
* on start-up by default. If you like to change this message, you can specify
* `onListen` option to override it.
*
* ```ts
* const cert = "-----BEGIN CERTIFICATE-----\n...\n-----END CERTIFICATE-----\n";
* const key = "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n";
* Deno.serveTls((_req) => new Response("Hello, world"), {
* cert,
* key,
* onListen({ port, hostname }) {
* console.log(`Server started at https://${hostname}:${port}`);
* // ... more info specific to your server ..
* },
* });
* ```
*
* @param handler The handler for individual HTTPS requests.
* @param options The options. See `ServeTlsInit` documentation for details.
*
* @category HTTP Server
*/
export function serveTls(
handler: ServeHandler,
options?: ServeTlsInit,
): Promise<void>;
/** **UNSTABLE**: new API, yet to be vetter.
* *
* Allows to "hijack" a connection that the request is associated with. * Allows to "hijack" a connection that the request is associated with.
* Can be used to implement protocols that build on top of HTTP (eg. * Can be used to implement protocols that build on top of HTTP (eg.
* WebSockets). * WebSockets).
* *
* The return type depends if `request` is coming from `Deno.serve()` API
* or `Deno.serveHttp()`; for former it returns the connection and first
* packet, for latter it returns a promise.
*
* The returned promise returns underlying connection and first packet * The returned promise returns underlying connection and first packet
* received. The promise shouldn't be awaited before responding to the * received. The promise shouldn't be awaited before responding to the
* `request`, otherwise event loop might deadlock. * `request`, otherwise event loop might deadlock.
@ -1225,7 +1357,7 @@ declare namespace Deno {
*/ */
export function upgradeHttp( export function upgradeHttp(
request: Request, request: Request,
): Promise<[Deno.Conn, Uint8Array]>; ): [Deno.Conn, Uint8Array] | Promise<[Deno.Conn, Uint8Array]>;
/** @category Sub Process */ /** @category Sub Process */
export interface SpawnOptions { export interface SpawnOptions {

1981
cli/tests/unit/flash_test.ts Normal file

File diff suppressed because it is too large Load diff

View file

@ -101,6 +101,8 @@ pub use crate::runtime::JsRuntime;
pub use crate::runtime::RuntimeOptions; pub use crate::runtime::RuntimeOptions;
pub use crate::runtime::SharedArrayBufferStore; pub use crate::runtime::SharedArrayBufferStore;
pub use crate::runtime::Snapshot; pub use crate::runtime::Snapshot;
pub use crate::runtime::V8_WRAPPER_OBJECT_INDEX;
pub use crate::runtime::V8_WRAPPER_TYPE_INDEX;
pub use crate::source_map::SourceMapGetter; pub use crate::source_map::SourceMapGetter;
pub use deno_ops::op; pub use deno_ops::op;

View file

@ -64,6 +64,13 @@ pub trait Resource: Any + 'static {
/// resource specific clean-ups, such as cancelling pending futures, after a /// resource specific clean-ups, such as cancelling pending futures, after a
/// resource has been removed from the resource table. /// resource has been removed from the resource table.
fn close(self: Rc<Self>) {} fn close(self: Rc<Self>) {}
/// Resources backed by a file descriptor can let ops know to allow for
/// low-level optimizations.
#[cfg(unix)]
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
None
}
} }
impl dyn Resource { impl dyn Resource {

View file

@ -250,6 +250,9 @@ fn v8_init(
v8::V8::initialize(); v8::V8::initialize();
} }
pub const V8_WRAPPER_TYPE_INDEX: i32 = 0;
pub const V8_WRAPPER_OBJECT_INDEX: i32 = 1;
#[derive(Default)] #[derive(Default)]
pub struct RuntimeOptions { pub struct RuntimeOptions {
/// Source map reference for errors. /// Source map reference for errors.
@ -360,7 +363,12 @@ impl JsRuntime {
let mut params = options let mut params = options
.create_params .create_params
.take() .take()
.unwrap_or_else(v8::Isolate::create_params) .unwrap_or_else(|| {
v8::CreateParams::default().embedder_wrapper_type_info_offsets(
V8_WRAPPER_TYPE_INDEX,
V8_WRAPPER_OBJECT_INDEX,
)
})
.external_references(&**bindings::EXTERNAL_REFERENCES); .external_references(&**bindings::EXTERNAL_REFERENCES);
let snapshot_loaded = if let Some(snapshot) = options.startup_snapshot { let snapshot_loaded = if let Some(snapshot) = options.startup_snapshot {
params = match snapshot { params = match snapshot {

View file

@ -388,7 +388,10 @@
let source = null; let source = null;
let length = null; let length = null;
let contentType = null; let contentType = null;
if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) { if (typeof object === "string") {
source = object;
contentType = "text/plain;charset=UTF-8";
} else if (ObjectPrototypeIsPrototypeOf(BlobPrototype, object)) {
stream = object.stream(); stream = object.stream();
source = object; source = object;
length = object.size; length = object.size;
@ -424,24 +427,21 @@
// TODO(@satyarohith): not sure what primordial here. // TODO(@satyarohith): not sure what primordial here.
source = object.toString(); source = object.toString();
contentType = "application/x-www-form-urlencoded;charset=UTF-8"; contentType = "application/x-www-form-urlencoded;charset=UTF-8";
} else if (typeof object === "string") {
source = object;
contentType = "text/plain;charset=UTF-8";
} else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, object)) { } else if (ObjectPrototypeIsPrototypeOf(ReadableStreamPrototype, object)) {
stream = object; stream = object;
if (object.locked || isReadableStreamDisturbed(object)) { if (object.locked || isReadableStreamDisturbed(object)) {
throw new TypeError("ReadableStream is locked or disturbed"); throw new TypeError("ReadableStream is locked or disturbed");
} }
} }
if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) { if (typeof source === "string") {
stream = { body: source, consumed: false };
length = source.byteLength;
} else if (typeof source === "string") {
// WARNING: this deviates from spec (expects length to be set) // WARNING: this deviates from spec (expects length to be set)
// https://fetch.spec.whatwg.org/#bodyinit > 7. // https://fetch.spec.whatwg.org/#bodyinit > 7.
// no observable side-effect for users so far, but could change // no observable side-effect for users so far, but could change
stream = { body: source, consumed: false }; stream = { body: source, consumed: false };
length = null; // NOTE: string length != byte length length = null; // NOTE: string length != byte length
} else if (ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, source)) {
stream = { body: source, consumed: false };
length = source.byteLength;
} }
const body = new InnerBody(stream); const body = new InnerBody(stream);
body.source = source; body.source = source;

View file

@ -16,7 +16,7 @@
const { HTTP_TOKEN_CODE_POINT_RE, byteUpperCase } = window.__bootstrap.infra; const { HTTP_TOKEN_CODE_POINT_RE, byteUpperCase } = window.__bootstrap.infra;
const { URL } = window.__bootstrap.url; const { URL } = window.__bootstrap.url;
const { guardFromHeaders } = window.__bootstrap.headers; const { guardFromHeaders } = window.__bootstrap.headers;
const { mixinBody, extractBody } = window.__bootstrap.fetchBody; const { mixinBody, extractBody, InnerBody } = window.__bootstrap.fetchBody;
const { getLocationHref } = window.__bootstrap.location; const { getLocationHref } = window.__bootstrap.location;
const { extractMimeType } = window.__bootstrap.mimesniff; const { extractMimeType } = window.__bootstrap.mimesniff;
const { blobFromObjectUrl } = window.__bootstrap.file; const { blobFromObjectUrl } = window.__bootstrap.file;
@ -48,6 +48,9 @@
const _signal = Symbol("signal"); const _signal = Symbol("signal");
const _mimeType = Symbol("mime type"); const _mimeType = Symbol("mime type");
const _body = Symbol("body"); const _body = Symbol("body");
const _flash = Symbol("flash");
const _url = Symbol("url");
const _method = Symbol("method");
/** /**
* @param {(() => string)[]} urlList * @param {(() => string)[]} urlList
@ -266,7 +269,11 @@
return extractMimeType(values); return extractMimeType(values);
} }
get [_body]() { get [_body]() {
return this[_request].body; if (this[_flash]) {
return this[_flash].body;
} else {
return this[_request].body;
}
} }
/** /**
@ -427,12 +434,31 @@
get method() { get method() {
webidl.assertBranded(this, RequestPrototype); webidl.assertBranded(this, RequestPrototype);
return this[_request].method; if (this[_method]) {
return this[_method];
}
if (this[_flash]) {
this[_method] = this[_flash].methodCb();
return this[_method];
} else {
this[_method] = this[_request].method;
return this[_method];
}
} }
get url() { get url() {
webidl.assertBranded(this, RequestPrototype); webidl.assertBranded(this, RequestPrototype);
return this[_request].url(); if (this[_url]) {
return this[_url];
}
if (this[_flash]) {
this[_url] = this[_flash].urlCb();
return this[_url];
} else {
this[_url] = this[_request].url();
return this[_url];
}
} }
get headers() { get headers() {
@ -442,6 +468,9 @@
get redirect() { get redirect() {
webidl.assertBranded(this, RequestPrototype); webidl.assertBranded(this, RequestPrototype);
if (this[_flash]) {
return this[_flash].redirectMode;
}
return this[_request].redirectMode; return this[_request].redirectMode;
} }
@ -455,7 +484,12 @@
if (this[_body] && this[_body].unusable()) { if (this[_body] && this[_body].unusable()) {
throw new TypeError("Body is unusable."); throw new TypeError("Body is unusable.");
} }
const newReq = cloneInnerRequest(this[_request]); let newReq;
if (this[_flash]) {
newReq = cloneInnerRequest(this[_flash]);
} else {
newReq = cloneInnerRequest(this[_request]);
}
const newSignal = abortSignal.newSignal(); const newSignal = abortSignal.newSignal();
abortSignal.follow(newSignal, this[_signal]); abortSignal.follow(newSignal, this[_signal]);
return fromInnerRequest( return fromInnerRequest(
@ -549,10 +583,43 @@
return request; return request;
} }
/**
* @param {number} serverId
* @param {number} streamRid
* @param {ReadableStream} body
* @param {() => string} methodCb
* @param {() => string} urlCb
* @param {() => [string, string][]} headersCb
* @returns {Request}
*/
function fromFlashRequest(
serverId,
streamRid,
body,
methodCb,
urlCb,
headersCb,
) {
const request = webidl.createBranded(Request);
request[_flash] = {
body: body !== null ? new InnerBody(body) : null,
methodCb,
urlCb,
streamRid,
serverId,
redirectMode: "follow",
redirectCount: 0,
};
request[_getHeaders] = () => headersFromHeaderList(headersCb(), "request");
return request;
}
window.__bootstrap.fetch ??= {}; window.__bootstrap.fetch ??= {};
window.__bootstrap.fetch.Request = Request; window.__bootstrap.fetch.Request = Request;
window.__bootstrap.fetch.toInnerRequest = toInnerRequest; window.__bootstrap.fetch.toInnerRequest = toInnerRequest;
window.__bootstrap.fetch.fromFlashRequest = fromFlashRequest;
window.__bootstrap.fetch.fromInnerRequest = fromInnerRequest; window.__bootstrap.fetch.fromInnerRequest = fromInnerRequest;
window.__bootstrap.fetch.newInnerRequest = newInnerRequest; window.__bootstrap.fetch.newInnerRequest = newInnerRequest;
window.__bootstrap.fetch.processUrlList = processUrlList; window.__bootstrap.fetch.processUrlList = processUrlList;
window.__bootstrap.fetch._flash = _flash;
})(globalThis); })(globalThis);

View file

@ -15,6 +15,9 @@
const { isProxy } = Deno.core; const { isProxy } = Deno.core;
const webidl = window.__bootstrap.webidl; const webidl = window.__bootstrap.webidl;
const consoleInternal = window.__bootstrap.console; const consoleInternal = window.__bootstrap.console;
const {
byteLowerCase,
} = window.__bootstrap.infra;
const { HTTP_TAB_OR_SPACE, regexMatcher, serializeJSValueToJSONString } = const { HTTP_TAB_OR_SPACE, regexMatcher, serializeJSValueToJSONString } =
window.__bootstrap.infra; window.__bootstrap.infra;
const { extractBody, mixinBody } = window.__bootstrap.fetchBody; const { extractBody, mixinBody } = window.__bootstrap.fetchBody;
@ -185,7 +188,6 @@
// 4. // 4.
response[_response].statusMessage = init.statusText; response[_response].statusMessage = init.statusText;
// 5. // 5.
/** @type {__bootstrap.headers.Headers} */ /** @type {__bootstrap.headers.Headers} */
const headers = response[_headers]; const headers = response[_headers];
@ -200,10 +202,22 @@
"Response with null body status cannot have body", "Response with null body status cannot have body",
); );
} }
const { body, contentType } = bodyWithType; const { body, contentType } = bodyWithType;
response[_response].body = body; response[_response].body = body;
if (contentType !== null && !headers.has("content-type")) {
headers.append("Content-Type", contentType); if (contentType !== null) {
let hasContentType = false;
const list = headerListFromHeaders(headers);
for (let i = 0; i < list.length; i++) {
if (byteLowerCase(list[i][0]) === "content-type") {
hasContentType = true;
break;
}
}
if (!hasContentType) {
ArrayPrototypePush(list, ["Content-Type", contentType]);
}
} }
} }
} }

569
ext/flash/01_http.js Normal file
View file

@ -0,0 +1,569 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
"use strict";
((window) => {
const { BlobPrototype } = window.__bootstrap.file;
const { fromFlashRequest, toInnerResponse } = window.__bootstrap.fetch;
const core = window.Deno.core;
const {
ReadableStream,
ReadableStreamPrototype,
getReadableStreamRid,
readableStreamClose,
_state,
} = window.__bootstrap.streams;
const {
WebSocket,
_rid,
_readyState,
_eventLoop,
_protocol,
_server,
_idleTimeoutDuration,
_idleTimeoutTimeout,
_serverHandleIdleTimeout,
} = window.__bootstrap.webSocket;
const { _ws } = window.__bootstrap.http;
const {
ObjectPrototypeIsPrototypeOf,
TypedArrayPrototypeSubarray,
TypeError,
Uint8Array,
Uint8ArrayPrototype,
} = window.__bootstrap.primordials;
const statusCodes = {
100: "Continue",
101: "Switching Protocols",
102: "Processing",
200: "OK",
201: "Created",
202: "Accepted",
203: "Non Authoritative Information",
204: "No Content",
205: "Reset Content",
206: "Partial Content",
207: "Multi-Status",
208: "Already Reported",
226: "IM Used",
300: "Multiple Choices",
301: "Moved Permanently",
302: "Found",
303: "See Other",
304: "Not Modified",
305: "Use Proxy",
307: "Temporary Redirect",
308: "Permanent Redirect",
400: "Bad Request",
401: "Unauthorized",
402: "Payment Required",
403: "Forbidden",
404: "Not Found",
405: "Method Not Allowed",
406: "Not Acceptable",
407: "Proxy Authentication Required",
408: "Request Timeout",
409: "Conflict",
410: "Gone",
411: "Length Required",
412: "Precondition Failed",
413: "Payload Too Large",
414: "URI Too Long",
415: "Unsupported Media Type",
416: "Range Not Satisfiable",
418: "I'm a teapot",
421: "Misdirected Request",
422: "Unprocessable Entity",
423: "Locked",
424: "Failed Dependency",
426: "Upgrade Required",
428: "Precondition Required",
429: "Too Many Requests",
431: "Request Header Fields Too Large",
451: "Unavailable For Legal Reasons",
500: "Internal Server Error",
501: "Not Implemented",
502: "Bad Gateway",
503: "Service Unavailable",
504: "Gateway Timeout",
505: "HTTP Version Not Supported",
506: "Variant Also Negotiates",
507: "Insufficient Storage",
508: "Loop Detected",
510: "Not Extended",
511: "Network Authentication Required",
};
const methods = {
0: "GET",
1: "HEAD",
2: "CONNECT",
3: "PUT",
4: "DELETE",
5: "OPTIONS",
6: "TRACE",
7: "POST",
8: "PATCH",
};
let dateInterval;
let date;
let stringResources = {};
// Construct an HTTP response message.
// All HTTP/1.1 messages consist of a start-line followed by a sequence
// of octets.
//
// HTTP-message = start-line
// *( header-field CRLF )
// CRLF
// [ message-body ]
//
function http1Response(method, status, headerList, body, earlyEnd = false) {
// HTTP uses a "<major>.<minor>" numbering scheme
// HTTP-version = HTTP-name "/" DIGIT "." DIGIT
// HTTP-name = %x48.54.54.50 ; "HTTP", case-sensitive
//
// status-line = HTTP-version SP status-code SP reason-phrase CRLF
// Date header: https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.1.2
let str = `HTTP/1.1 ${status} ${statusCodes[status]}\r\nDate: ${date}\r\n`;
for (const [name, value] of headerList) {
// header-field = field-name ":" OWS field-value OWS
str += `${name}: ${value}\r\n`;
}
// https://datatracker.ietf.org/doc/html/rfc7231#section-6.3.6
if (status === 205 || status === 304) {
// MUST NOT generate a payload in a 205 response.
// indicate a zero-length body for the response by
// including a Content-Length header field with a value of 0.
str += "Content-Length: 0\r\n";
return str;
}
// MUST NOT send Content-Length or Transfer-Encoding if status code is 1xx or 204.
if (status == 204 && status <= 100) {
return str;
}
if (earlyEnd === true) {
return str;
}
// null body status is validated by inititalizeAResponse in ext/fetch
if (body !== null && body !== undefined) {
str += `Content-Length: ${body.length}\r\n\r\n`;
} else {
str += "Transfer-Encoding: chunked\r\n\r\n";
return str;
}
// A HEAD request.
if (method === 1) return str;
if (typeof body === "string") {
str += body ?? "";
} else {
const head = core.encode(str);
const response = new Uint8Array(head.byteLength + body.byteLength);
response.set(head, 0);
response.set(body, head.byteLength);
return response;
}
return str;
}
function prepareFastCalls() {
return core.opSync("op_flash_make_request");
}
function hostnameForDisplay(hostname) {
// If the hostname is "0.0.0.0", we display "localhost" in console
// because browsers in Windows don't resolve "0.0.0.0".
// See the discussion in https://github.com/denoland/deno_std/issues/1165
return hostname === "0.0.0.0" ? "localhost" : hostname;
}
function serve(handler, opts = {}) {
delete opts.key;
delete opts.cert;
return serveInner(handler, opts, false);
}
function serveTls(handler, opts = {}) {
return serveInner(handler, opts, true);
}
function serveInner(handler, opts, useTls) {
opts = { hostname: "127.0.0.1", port: 9000, useTls, ...opts };
const signal = opts.signal;
delete opts.signal;
const onError = opts.onError ?? function (error) {
console.error(error);
return new Response("Internal Server Error", { status: 500 });
};
delete opts.onError;
const onListen = opts.onListen ?? function () {
console.log(
`Listening on http://${
hostnameForDisplay(opts.hostname)
}:${opts.port}/`,
);
};
delete opts.onListen;
const serverId = core.ops.op_flash_serve(opts);
const serverPromise = core.opAsync("op_flash_drive_server", serverId);
core.opAsync("op_flash_wait_for_listening", serverId).then(() => {
onListen({ hostname: opts.hostname, port: opts.port });
});
const server = {
id: serverId,
transport: opts.cert && opts.key ? "https" : "http",
hostname: opts.hostname,
port: opts.port,
closed: false,
finished: (async () => {
return await serverPromise;
})(),
async close() {
if (server.closed) {
return;
}
server.closed = true;
await core.opAsync("op_flash_close_server", serverId);
await server.finished;
},
async serve() {
while (true) {
if (server.closed) {
break;
}
let token = nextRequestSync();
if (token === 0) {
token = await core.opAsync("op_flash_next_async", serverId);
if (server.closed) {
break;
}
}
for (let i = 0; i < token; i++) {
let body = null;
// There might be a body, but we don't expose it for GET/HEAD requests.
// It will be closed automatically once the request has been handled and
// the response has been sent.
const method = getMethodSync(i);
let hasBody = method > 2; // Not GET/HEAD/CONNECT
if (hasBody) {
body = createRequestBodyStream(serverId, i);
if (body === null) {
hasBody = false;
}
}
const req = fromFlashRequest(
serverId,
/* streamRid */
i,
body,
/* methodCb */
() => methods[method],
/* urlCb */
() => {
const path = core.ops.op_flash_path(serverId, i);
return `${server.transport}://${server.hostname}:${server.port}${path}`;
},
/* headersCb */
() => core.ops.op_flash_headers(serverId, i),
);
let resp;
try {
resp = await handler(req);
} catch (e) {
resp = await onError(e);
}
// there might've been an HTTP upgrade.
if (resp === undefined) {
continue;
}
const ws = resp[_ws];
if (!ws) {
if (hasBody && body[_state] !== "closed") {
// TODO(@littledivy): Optimize by draining in a single op.
try {
await req.arrayBuffer();
} catch { /* pass */ }
}
}
const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a
// single op, in other case a "response body" resource will be created and
// we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let respBody = null;
let isStreamingResponseBody = false;
if (innerResp.body !== null) {
if (typeof innerResp.body.streamOrStatic?.body === "string") {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
isStreamingResponseBody = false;
} else if (
ObjectPrototypeIsPrototypeOf(
ReadableStreamPrototype,
innerResp.body.streamOrStatic,
)
) {
if (innerResp.body.unusable()) {
throw new TypeError("Body is unusable.");
}
if (
innerResp.body.length === null ||
ObjectPrototypeIsPrototypeOf(
BlobPrototype,
innerResp.body.source,
)
) {
respBody = innerResp.body.stream;
} else {
const reader = innerResp.body.stream.getReader();
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
}
isStreamingResponseBody = !(
typeof respBody === "string" ||
ObjectPrototypeIsPrototypeOf(Uint8ArrayPrototype, respBody)
);
} else {
if (innerResp.body.streamOrStatic.consumed === true) {
throw new TypeError("Body is unusable.");
}
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
}
} else {
respBody = new Uint8Array(0);
}
if (isStreamingResponseBody === true) {
const resourceRid = getReadableStreamRid(respBody);
if (resourceRid) {
if (respBody.locked) {
throw new TypeError("ReadableStream is locked.");
}
const reader = respBody.getReader(); // Aquire JS lock.
try {
core.opAsync(
"op_flash_write_resource",
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
null,
true,
),
serverId,
i,
resourceRid,
).then(() => {
// Release JS lock.
readableStreamClose(respBody);
});
} catch (error) {
await reader.cancel(error);
throw error;
}
} else {
const reader = respBody.getReader();
let first = true;
a:
while (true) {
const { value, done } = await reader.read();
if (first) {
first = false;
core.ops.op_flash_respond(
serverId,
i,
http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
null,
),
value ?? new Uint8Array(),
false,
);
} else {
if (value === undefined) {
core.ops.op_flash_respond_chuncked(
serverId,
i,
undefined,
done,
);
} else {
respondChunked(
i,
value,
done,
);
}
}
if (done) break a;
}
}
} else {
const responseStr = http1Response(
method,
innerResp.status ?? 200,
innerResp.headerList,
respBody,
);
// TypedArray
if (typeof responseStr !== "string") {
respondFast(i, responseStr, !ws);
} else {
// string
const maybeResponse = stringResources[responseStr];
if (maybeResponse === undefined) {
stringResources[responseStr] = core.encode(responseStr);
core.ops.op_flash_respond(
serverId,
i,
stringResources[responseStr],
null,
!ws, // Don't close socket if there is a deferred websocket upgrade.
);
} else {
respondFast(i, maybeResponse, !ws);
}
}
}
if (ws) {
const wsRid = await core.opAsync(
"op_flash_upgrade_websocket",
serverId,
i,
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
if (ws[_idleTimeoutDuration]) {
ws.addEventListener(
"close",
() => clearTimeout(ws[_idleTimeoutTimeout]),
);
}
ws[_serverHandleIdleTimeout]();
}
}
}
await server.finished;
},
};
signal?.addEventListener("abort", () => {
clearInterval(dateInterval);
server.close().then(() => {}, () => {});
}, {
once: true,
});
const fastOp = prepareFastCalls();
let nextRequestSync = () => fastOp.nextRequest();
let getMethodSync = (token) => fastOp.getMethod(token);
let respondChunked = (token, chunk, shutdown) =>
fastOp.respondChunked(token, chunk, shutdown);
let respondFast = (token, response, shutdown) =>
fastOp.respond(token, response, shutdown);
if (serverId > 0) {
nextRequestSync = () => core.ops.op_flash_next_server(serverId);
getMethodSync = (token) => core.ops.op_flash_method(serverId, token);
respondChunked = (token, chunk, shutdown) =>
core.ops.op_flash_respond_chuncked(serverId, token, chunk, shutdown);
respondFast = (token, response, shutdown) =>
core.ops.op_flash_respond(serverId, token, response, null, shutdown);
}
if (!dateInterval) {
date = new Date().toUTCString();
dateInterval = setInterval(() => {
date = new Date().toUTCString();
stringResources = {};
}, 1000);
}
return server.serve().catch(console.error);
}
function createRequestBodyStream(serverId, token) {
// The first packet is left over bytes after parsing the request
const firstRead = core.ops.op_flash_first_packet(
serverId,
token,
);
if (!firstRead) return null;
let firstEnqueued = firstRead.byteLength == 0;
return new ReadableStream({
type: "bytes",
async pull(controller) {
try {
if (firstEnqueued === false) {
controller.enqueue(firstRead);
firstEnqueued = true;
return;
}
// This is the largest possible size for a single packet on a TLS
// stream.
const chunk = new Uint8Array(16 * 1024 + 256);
const read = await core.opAsync(
"op_flash_read_body",
serverId,
token,
chunk,
);
if (read > 0) {
// We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else {
// We have reached the end of the body, so we close the stream.
controller.close();
}
} catch (err) {
// There was an error while reading a chunk of the body, so we
// error.
controller.error(err);
controller.close();
}
},
});
}
window.__bootstrap.flash = {
serve,
serveTls,
};
})(this);

29
ext/flash/Cargo.toml Normal file
View file

@ -0,0 +1,29 @@
# Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
[package]
name = "deno_flash"
version = "0.1.0"
authors = ["the Deno authors"]
edition = "2021"
license = "MIT"
readme = "README.md"
repository = "https://github.com/denoland/deno"
description = "Fast HTTP/1 server implementation for Deno"
[lib]
path = "lib.rs"
[dependencies]
deno_core = { path = "../../core", version = "0.147.0" }
deno_tls = { version = "0.52.0", path = "../tls" }
# For HTTP/2 and websocket upgrades
deno_websocket = { version = "0.70.0", path = "../websocket" }
http = "0.2.6"
httparse = "1.7"
libc = "0.2"
log = "0.4.17"
mio = { version = "0.8.1", features = ["os-poll", "net"] }
rustls = { version = "0.20" }
rustls-pemfile = { version = "0.2.1" }
serde = { version = "1.0.136", features = ["derive"] }
tokio = { version = "1.19", features = ["full"] }

7
ext/flash/README.md Normal file
View file

@ -0,0 +1,7 @@
# flash
Flash is a fast HTTP/1.1 server implementation for Deno.
```js
serve((req) => new Response("Hello World"));
```

272
ext/flash/chunked.rs Normal file
View file

@ -0,0 +1,272 @@
// Based on https://github.com/frewsxcv/rust-chunked-transfer/blob/5c08614458580f9e7a85124021006d83ce1ed6e9/src/decoder.rs
// Copyright 2015 The tiny-http Contributors
// Copyright 2015 The rust-chunked-transfer Contributors
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::error::Error;
use std::fmt;
use std::io::Error as IoError;
use std::io::ErrorKind;
use std::io::Read;
use std::io::Result as IoResult;
pub struct Decoder<R> {
pub source: R,
// remaining size of the chunk being read
// none if we are not in a chunk
pub remaining_chunks_size: Option<usize>,
pub end: bool,
}
impl<R> Decoder<R>
where
R: Read,
{
pub fn new(source: R, remaining_chunks_size: Option<usize>) -> Decoder<R> {
Decoder {
source,
remaining_chunks_size,
end: false,
}
}
fn read_chunk_size(&mut self) -> IoResult<usize> {
let mut chunk_size_bytes = Vec::new();
let mut has_ext = false;
loop {
let byte = match self.source.by_ref().bytes().next() {
Some(b) => b?,
None => {
return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
}
};
if byte == b'\r' {
break;
}
if byte == b';' {
has_ext = true;
break;
}
chunk_size_bytes.push(byte);
}
// Ignore extensions for now
if has_ext {
loop {
let byte = match self.source.by_ref().bytes().next() {
Some(b) => b?,
None => {
return Err(IoError::new(ErrorKind::InvalidInput, DecoderError))
}
};
if byte == b'\r' {
break;
}
}
}
self.read_line_feed()?;
let chunk_size = String::from_utf8(chunk_size_bytes)
.ok()
.and_then(|c| usize::from_str_radix(c.trim(), 16).ok())
.ok_or_else(|| IoError::new(ErrorKind::InvalidInput, DecoderError))?;
Ok(chunk_size)
}
fn read_carriage_return(&mut self) -> IoResult<()> {
match self.source.by_ref().bytes().next() {
Some(Ok(b'\r')) => Ok(()),
_ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
}
}
fn read_line_feed(&mut self) -> IoResult<()> {
match self.source.by_ref().bytes().next() {
Some(Ok(b'\n')) => Ok(()),
_ => Err(IoError::new(ErrorKind::InvalidInput, DecoderError)),
}
}
}
impl<R> Read for Decoder<R>
where
R: Read,
{
fn read(&mut self, buf: &mut [u8]) -> IoResult<usize> {
let remaining_chunks_size = match self.remaining_chunks_size {
Some(c) => c,
None => {
// first possibility: we are not in a chunk, so we'll attempt to determine
// the chunks size
let chunk_size = self.read_chunk_size()?;
// if the chunk size is 0, we are at EOF
if chunk_size == 0 {
self.read_carriage_return()?;
self.read_line_feed()?;
self.end = true;
return Ok(0);
}
chunk_size
}
};
// second possibility: we continue reading from a chunk
if buf.len() < remaining_chunks_size {
let read = self.source.read(buf)?;
self.remaining_chunks_size = Some(remaining_chunks_size - read);
return Ok(read);
}
// third possibility: the read request goes further than the current chunk
// we simply read until the end of the chunk and return
let buf = &mut buf[..remaining_chunks_size];
let read = self.source.read(buf)?;
self.remaining_chunks_size = if read == remaining_chunks_size {
self.read_carriage_return()?;
self.read_line_feed()?;
None
} else {
Some(remaining_chunks_size - read)
};
Ok(read)
}
}
#[derive(Debug, Copy, Clone)]
struct DecoderError;
impl fmt::Display for DecoderError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(fmt, "Error while decoding chunks")
}
}
impl Error for DecoderError {
fn description(&self) -> &str {
"Error while decoding chunks"
}
}
#[cfg(test)]
mod test {
use super::Decoder;
use std::io;
use std::io::Read;
/// This unit test is taken from from Hyper
/// https://github.com/hyperium/hyper
/// Copyright (c) 2014 Sean McArthur
#[test]
fn test_read_chunk_size() {
fn read(s: &str, expected: usize) {
let mut decoded = Decoder::new(s.as_bytes(), None);
let actual = decoded.read_chunk_size().unwrap();
assert_eq!(expected, actual);
}
fn read_err(s: &str) {
let mut decoded = Decoder::new(s.as_bytes(), None);
let err_kind = decoded.read_chunk_size().unwrap_err().kind();
assert_eq!(err_kind, io::ErrorKind::InvalidInput);
}
read("1\r\n", 1);
read("01\r\n", 1);
read("0\r\n", 0);
read("00\r\n", 0);
read("A\r\n", 10);
read("a\r\n", 10);
read("Ff\r\n", 255);
read("Ff \r\n", 255);
// Missing LF or CRLF
read_err("F\rF");
read_err("F");
// Invalid hex digit
read_err("X\r\n");
read_err("1X\r\n");
read_err("-\r\n");
read_err("-1\r\n");
// Acceptable (if not fully valid) extensions do not influence the size
read("1;extension\r\n", 1);
read("a;ext name=value\r\n", 10);
read("1;extension;extension2\r\n", 1);
read("1;;; ;\r\n", 1);
read("2; extension...\r\n", 2);
read("3 ; extension=123\r\n", 3);
read("3 ;\r\n", 3);
read("3 ; \r\n", 3);
// Invalid extensions cause an error
read_err("1 invalid extension\r\n");
read_err("1 A\r\n");
read_err("1;no CRLF");
}
#[test]
fn test_valid_chunk_decode() {
let source = io::Cursor::new(
"3\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
decoded.read_to_string(&mut string).unwrap();
assert_eq!(string, "hello world!!!");
}
#[test]
fn test_decode_zero_length() {
let mut decoder = Decoder::new(b"0\r\n\r\n" as &[u8], None);
let mut decoded = String::new();
decoder.read_to_string(&mut decoded).unwrap();
assert_eq!(decoded, "");
}
#[test]
fn test_decode_invalid_chunk_length() {
let mut decoder = Decoder::new(b"m\r\n\r\n" as &[u8], None);
let mut decoded = String::new();
assert!(decoder.read_to_string(&mut decoded).is_err());
}
#[test]
fn invalid_input1() {
let source = io::Cursor::new(
"2\r\nhel\r\nb\r\nlo world!!!\r\n0\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
assert!(decoded.read_to_string(&mut string).is_err());
}
#[test]
fn invalid_input2() {
let source = io::Cursor::new(
"3\rhel\r\nb\r\nlo world!!!\r\n0\r\n"
.to_string()
.into_bytes(),
);
let mut decoded = Decoder::new(source, None);
let mut string = String::new();
assert!(decoded.read_to_string(&mut string).is_err());
}
}

1567
ext/flash/lib.rs Normal file

File diff suppressed because it is too large Load diff

81
ext/flash/sendfile.rs Normal file
View file

@ -0,0 +1,81 @@
// Forked from https://github.com/Thomasdezeeuw/sendfile/blob/024f82cd4dede9048392a5bd6d8afcd4d5aa83d5/src/lib.rs
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
use std::future::Future;
use std::io;
use std::os::unix::io::RawFd;
use std::pin::Pin;
use std::task::{self, Poll};
pub struct SendFile {
pub io: (RawFd, RawFd),
pub written: usize,
}
impl SendFile {
#[inline]
pub fn try_send(&mut self) -> Result<usize, std::io::Error> {
#[cfg(target_os = "linux")]
{
// This is the maximum the Linux kernel will write in a single call.
let count = 0x7ffff000;
let mut offset = self.written as libc::off_t;
// SAFETY: call to libc::sendfile()
let res =
unsafe { libc::sendfile(self.io.1, self.io.0, &mut offset, count) };
if res == -1 {
Err(io::Error::last_os_error())
} else {
self.written = offset as usize;
Ok(res as usize)
}
}
#[cfg(target_os = "macos")]
{
// Send all bytes.
let mut length = 0;
// On macOS `length` is value-result parameter. It determines the number
// of bytes to write and returns the number of bytes written also in
// case of `EAGAIN` errors.
// SAFETY: call to libc::sendfile()
let res = unsafe {
libc::sendfile(
self.io.0,
self.io.1,
self.written as libc::off_t,
&mut length,
std::ptr::null_mut(),
0,
)
};
self.written += length as usize;
if res == -1 {
Err(io::Error::last_os_error())
} else {
Ok(length as usize)
}
}
}
}
impl Future for SendFile {
type Output = Result<(), std::io::Error>;
fn poll(
mut self: Pin<&mut Self>,
_: &mut task::Context<'_>,
) -> Poll<Self::Output> {
loop {
match self.try_send() {
Ok(0) => break Poll::Ready(Ok(())),
Ok(_) => continue,
Err(ref err) if err.kind() == io::ErrorKind::WouldBlock => {
break Poll::Pending
}
Err(ref err) if err.kind() == io::ErrorKind::Interrupted => continue, // Try again.
Err(err) => break Poll::Ready(Err(err)),
}
}
}
}

View file

@ -13,6 +13,7 @@
newInnerRequest, newInnerRequest,
newInnerResponse, newInnerResponse,
fromInnerResponse, fromInnerResponse,
_flash,
} = window.__bootstrap.fetch; } = window.__bootstrap.fetch;
const core = window.Deno.core; const core = window.Deno.core;
const { BadResourcePrototype, InterruptedPrototype, ops } = core; const { BadResourcePrototype, InterruptedPrototype, ops } = core;
@ -475,6 +476,20 @@
} }
function upgradeHttp(req) { function upgradeHttp(req) {
if (req[_flash]) {
// NOTE(bartlomieju):
// Access these fields so they are cached on `req` object, otherwise
// they wouldn't be available after the connection gets upgraded.
req.url;
req.method;
req.headers;
const { serverId, streamRid } = req[_flash];
const connRid = core.ops.op_flash_upgrade_http(streamRid, serverId);
// TODO(@littledivy): return already read first packet too.
return [new TcpConn(connRid), new Uint8Array()];
}
req[_deferred] = new Deferred(); req[_deferred] = new Deferred();
return req[_deferred].promise; return req[_deferred].promise;
} }
@ -483,5 +498,6 @@
HttpConn, HttpConn,
upgradeWebSocket, upgradeWebSocket,
upgradeHttp, upgradeHttp,
_ws,
}; };
})(this); })(this);

View file

@ -874,6 +874,41 @@ fn op_http_websocket_accept_header(key: String) -> Result<String, AnyError> {
Ok(base64::encode(digest)) Ok(base64::encode(digest))
} }
struct UpgradedStream(hyper::upgrade::Upgraded);
impl tokio::io::AsyncRead for UpgradedStream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &mut tokio::io::ReadBuf,
) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
Pin::new(&mut self.get_mut().0).poll_read(cx, buf)
}
}
impl tokio::io::AsyncWrite for UpgradedStream {
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
Pin::new(&mut self.get_mut().0).poll_write(cx, buf)
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.get_mut().0).poll_flush(cx)
}
fn poll_shutdown(
self: Pin<&mut Self>,
cx: &mut Context,
) -> std::task::Poll<Result<(), std::io::Error>> {
Pin::new(&mut self.get_mut().0).poll_shutdown(cx)
}
}
impl deno_websocket::Upgraded for UpgradedStream {}
#[op] #[op]
async fn op_http_upgrade_websocket( async fn op_http_upgrade_websocket(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
@ -893,7 +928,9 @@ async fn op_http_upgrade_websocket(
}; };
let transport = hyper::upgrade::on(request).await?; let transport = hyper::upgrade::on(request).await?;
let ws_rid = ws_create_server_stream(&state, transport).await?; let ws_rid =
ws_create_server_stream(&state, Box::pin(UpgradedStream(transport)))
.await?;
Ok(ws_rid) Ok(ws_rid)
} }

View file

@ -5897,6 +5897,7 @@
window.__bootstrap.streams = { window.__bootstrap.streams = {
// Non-Public // Non-Public
_state,
isReadableStreamDisturbed, isReadableStreamDisturbed,
errorReadableStream, errorReadableStream,
createProxy, createProxy,

View file

@ -34,8 +34,11 @@ use std::cell::RefCell;
use std::convert::TryFrom; use std::convert::TryFrom;
use std::fmt; use std::fmt;
use std::path::PathBuf; use std::path::PathBuf;
use std::pin::Pin;
use std::rc::Rc; use std::rc::Rc;
use std::sync::Arc; use std::sync::Arc;
use tokio::io::AsyncRead;
use tokio::io::AsyncWrite;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore; use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::rustls::ServerName; use tokio_rustls::rustls::ServerName;
@ -67,23 +70,25 @@ pub trait WebSocketPermissions {
/// would override previously used alias. /// would override previously used alias.
pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>); pub struct UnsafelyIgnoreCertificateErrors(Option<Vec<String>>);
type WsStream = WebSocketStream<MaybeTlsStream<TcpStream>>; type ClientWsStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
type ServerWsStream = WebSocketStream<Pin<Box<dyn Upgraded>>>;
pub enum WebSocketStreamType { pub enum WebSocketStreamType {
Client { Client {
tx: AsyncRefCell<SplitSink<WsStream, Message>>, tx: AsyncRefCell<SplitSink<ClientWsStream, Message>>,
rx: AsyncRefCell<SplitStream<WsStream>>, rx: AsyncRefCell<SplitStream<ClientWsStream>>,
}, },
Server { Server {
tx: AsyncRefCell< tx: AsyncRefCell<SplitSink<ServerWsStream, Message>>,
SplitSink<WebSocketStream<hyper::upgrade::Upgraded>, Message>, rx: AsyncRefCell<SplitStream<ServerWsStream>>,
>,
rx: AsyncRefCell<SplitStream<WebSocketStream<hyper::upgrade::Upgraded>>>,
}, },
} }
pub trait Upgraded: AsyncRead + AsyncWrite + Unpin {}
pub async fn ws_create_server_stream( pub async fn ws_create_server_stream(
state: &Rc<RefCell<OpState>>, state: &Rc<RefCell<OpState>>,
transport: hyper::upgrade::Upgraded, transport: Pin<Box<dyn Upgraded>>,
) -> Result<ResourceId, AnyError> { ) -> Result<ResourceId, AnyError> {
let ws_stream = WebSocketStream::from_raw_socket( let ws_stream = WebSocketStream::from_raw_socket(
transport, transport,
@ -340,7 +345,7 @@ where
..Default::default() ..Default::default()
}), }),
); );
let (stream, response): (WsStream, Response) = let (stream, response): (ClientWsStream, Response) =
if let Some(cancel_resource) = cancel_resource { if let Some(cancel_resource) = cancel_resource {
client.or_cancel(cancel_resource.0.to_owned()).await? client.or_cancel(cancel_resource.0.to_owned()).await?
} else { } else {

View file

@ -28,6 +28,7 @@ deno_core = { version = "0.147.0", path = "../core" }
deno_crypto = { version = "0.79.0", path = "../ext/crypto" } deno_crypto = { version = "0.79.0", path = "../ext/crypto" }
deno_fetch = { version = "0.88.0", path = "../ext/fetch" } deno_fetch = { version = "0.88.0", path = "../ext/fetch" }
deno_ffi = { version = "0.52.0", path = "../ext/ffi" } deno_ffi = { version = "0.52.0", path = "../ext/ffi" }
deno_flash = { path = "../ext/flash" }
deno_http = { version = "0.59.0", path = "../ext/http" } deno_http = { version = "0.59.0", path = "../ext/http" }
deno_net = { version = "0.57.0", path = "../ext/net" } deno_net = { version = "0.57.0", path = "../ext/net" }
deno_node = { version = "0.2.0", path = "../ext/node" } deno_node = { version = "0.2.0", path = "../ext/node" }
@ -52,6 +53,7 @@ deno_core = { version = "0.147.0", path = "../core" }
deno_crypto = { version = "0.79.0", path = "../ext/crypto" } deno_crypto = { version = "0.79.0", path = "../ext/crypto" }
deno_fetch = { version = "0.88.0", path = "../ext/fetch" } deno_fetch = { version = "0.88.0", path = "../ext/fetch" }
deno_ffi = { version = "0.52.0", path = "../ext/ffi" } deno_ffi = { version = "0.52.0", path = "../ext/ffi" }
deno_flash = { path = "../ext/flash" }
deno_http = { version = "0.59.0", path = "../ext/http" } deno_http = { version = "0.59.0", path = "../ext/http" }
deno_net = { version = "0.57.0", path = "../ext/net" } deno_net = { version = "0.57.0", path = "../ext/net" }
deno_node = { version = "0.2.0", path = "../ext/node" } deno_node = { version = "0.2.0", path = "../ext/node" }

View file

@ -116,6 +116,15 @@ mod not_docs {
} }
} }
impl deno_flash::FlashPermissions for Permissions {
fn check_net<T: AsRef<str>>(
&mut self,
_host: &(T, Option<u16>),
) -> Result<(), deno_core::error::AnyError> {
unreachable!("snapshotting!")
}
}
impl deno_net::NetPermissions for Permissions { impl deno_net::NetPermissions for Permissions {
fn check_net<T: AsRef<str>>( fn check_net<T: AsRef<str>>(
&mut self, &mut self,
@ -165,6 +174,7 @@ mod not_docs {
None, None,
), ),
deno_http::init(), deno_http::init(),
deno_flash::init::<Permissions>(false), // No --unstable
]; ];
let js_runtime = JsRuntime::new(RuntimeOptions { let js_runtime = JsRuntime::new(RuntimeOptions {

View file

@ -153,5 +153,7 @@
spawnChild: __bootstrap.spawn.spawnChild, spawnChild: __bootstrap.spawn.spawnChild,
spawn: __bootstrap.spawn.spawn, spawn: __bootstrap.spawn.spawn,
spawnSync: __bootstrap.spawn.spawnSync, spawnSync: __bootstrap.spawn.spawnSync,
serve: __bootstrap.flash.serve,
serveTls: __bootstrap.flash.serveTls,
}; };
})(this); })(this);

View file

@ -27,7 +27,11 @@ use tokio::net::UnixStream;
pub fn init() -> Extension { pub fn init() -> Extension {
Extension::builder() Extension::builder()
.ops(vec![op_http_start::decl(), op_http_upgrade::decl()]) .ops(vec![
op_http_start::decl(),
op_http_upgrade::decl(),
op_flash_upgrade_http::decl(),
])
.build() .build()
} }
@ -78,6 +82,23 @@ fn op_http_start(
Err(bad_resource_id()) Err(bad_resource_id())
} }
#[op]
fn op_flash_upgrade_http(
state: &mut OpState,
token: u32,
server_id: u32,
) -> Result<deno_core::ResourceId, AnyError> {
let flash_ctx = state.borrow_mut::<deno_flash::FlashContext>();
let ctx = flash_ctx.servers.get_mut(&server_id).unwrap();
let tcp_stream = deno_flash::detach_socket(ctx, token)?;
Ok(
state
.resource_table
.add(TcpStreamResource::new(tcp_stream.into_split())),
)
}
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct HttpUpgradeResult { pub struct HttpUpgradeResult {

View file

@ -582,6 +582,16 @@ impl Resource for StdFileResource {
fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> { fn write(self: Rc<Self>, buf: ZeroCopyBuf) -> AsyncResult<usize> {
Box::pin(self.write(buf)) Box::pin(self.write(buf))
} }
#[cfg(unix)]
fn backing_fd(self: Rc<Self>) -> Option<std::os::unix::prelude::RawFd> {
use std::os::unix::io::AsRawFd;
self
.with_inner_and_metadata(move |std_file, _| {
Ok(std_file.with_file(|f| f.as_raw_fd()))
})
.ok()
}
} }
// override op_print to use the stdout and stderr in the resource table // override op_print to use the stdout and stderr in the resource table

View file

@ -1313,6 +1313,15 @@ impl Permissions {
} }
} }
impl deno_flash::FlashPermissions for Permissions {
fn check_net<T: AsRef<str>>(
&mut self,
host: &(T, Option<u16>),
) -> Result<(), AnyError> {
self.net.check(host)
}
}
impl deno_net::NetPermissions for Permissions { impl deno_net::NetPermissions for Permissions {
fn check_net<T: AsRef<str>>( fn check_net<T: AsRef<str>>(
&mut self, &mut self,

View file

@ -429,6 +429,7 @@ impl WebWorker {
ops::signal::init(), ops::signal::init(),
ops::tty::init(), ops::tty::init(),
deno_http::init(), deno_http::init(),
deno_flash::init::<Permissions>(unstable),
ops::http::init(), ops::http::init(),
// Permissions ext (worker specific state) // Permissions ext (worker specific state)
perm_ext, perm_ext,

View file

@ -170,6 +170,7 @@ impl MainWorker {
ops::signal::init(), ops::signal::init(),
ops::tty::init(), ops::tty::init(),
deno_http::init(), deno_http::init(),
deno_flash::init::<Permissions>(unstable),
ops::http::init(), ops::http::init(),
// Permissions ext (worker specific state) // Permissions ext (worker specific state)
perm_ext, perm_ext,

View file

@ -22,6 +22,7 @@ async function dlint() {
":!:cli/tests/testdata/error_008_checkjs.js", ":!:cli/tests/testdata/error_008_checkjs.js",
":!:cli/bench/http/node*.js", ":!:cli/bench/http/node*.js",
":!:cli/bench/testdata/express-router.js", ":!:cli/bench/testdata/express-router.js",
":!:cli/bench/testdata/react-dom.js",
":!:cli/compilers/wasm_wrap.js", ":!:cli/compilers/wasm_wrap.js",
":!:cli/dts/**", ":!:cli/dts/**",
":!:cli/tests/testdata/encoding/**", ":!:cli/tests/testdata/encoding/**",