0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-03 17:34:47 -05:00

refactor(ext/http): rewrite hyper integration and fix bug (#12732)

Fixes: #12193
Fixes: #12251
Closes: #12714
This commit is contained in:
Bert Belder 2021-10-04 18:50:40 -07:00
parent 0cc8a9741a
commit 72a6231a61
No known key found for this signature in database
GPG key ID: 7A77887B2E2ED461
6 changed files with 693 additions and 648 deletions

View file

@ -7,6 +7,7 @@ import {
assert, assert,
assertEquals, assertEquals,
assertRejects, assertRejects,
assertStrictEquals,
assertThrows, assertThrows,
deferred, deferred,
delay, delay,
@ -386,7 +387,7 @@ unitTest(
Deno.errors.Http, Deno.errors.Http,
"connection closed", "connection closed",
); );
// The error from `op_http_request_next` reroutes to `respondWith()`. // The error from `op_http_accept` reroutes to `respondWith()`.
assertEquals(await nextRequestPromise, null); assertEquals(await nextRequestPromise, null);
listener.close(); listener.close();
})(); })();
@ -865,6 +866,7 @@ unitTest(
const writer = writable.getWriter(); const writer = writable.getWriter();
async function writeResponse() { async function writeResponse() {
await delay(50);
await writer.write( await writer.write(
new TextEncoder().encode( new TextEncoder().encode(
"written to the writable side of a TransformStream", "written to the writable side of a TransformStream",
@ -1000,6 +1002,80 @@ unitTest(
}, },
); );
// https://github.com/denoland/deno/issues/12193
unitTest(
{ permissions: { net: true } },
async function httpConnConcurrentNextRequestCalls() {
const hostname = "localhost";
const port = 4501;
async function server() {
const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const promises = new Array(10).fill(null).map(async (_, i) => {
const event = await httpConn.nextRequest();
assert(event);
const { pathname } = new URL(event.request.url);
assertStrictEquals(pathname, `/${i}`);
const response = new Response(`Response #${i}`);
await event.respondWith(response);
});
await Promise.all(promises);
httpConn.close();
listener.close();
}
async function client() {
for (let i = 0; i < 10; i++) {
const response = await fetch(`http://${hostname}:${port}/${i}`);
const body = await response.text();
assertStrictEquals(body, `Response #${i}`);
}
}
await Promise.all([server(), delay(100).then(client)]);
},
);
// https://github.com/denoland/deno/pull/12704
// https://github.com/denoland/deno/pull/12732
unitTest(
{ permissions: { net: true } },
async function httpConnAutoCloseDelayedOnUpgrade() {
const hostname = "localhost";
const port = 4501;
async function server() {
const listener = Deno.listen({ hostname, port });
const tcpConn = await listener.accept();
const httpConn = Deno.serveHttp(tcpConn);
const event1 = await httpConn.nextRequest() as Deno.RequestEvent;
const event2Promise = httpConn.nextRequest();
const { socket, response } = Deno.upgradeWebSocket(event1.request);
socket.onmessage = (event) => socket.send(event.data);
event1.respondWith(response);
const event2 = await event2Promise;
assertStrictEquals(event2, null);
listener.close();
}
async function client() {
const socket = new WebSocket(`ws://${hostname}:${port}/`);
socket.onopen = () => socket.send("bla bla");
const { data } = await new Promise((res) => socket.onmessage = res);
assertStrictEquals(data, "bla bla");
socket.close();
}
await Promise.all([server(), client()]);
},
);
unitTest( unitTest(
{ permissions: { net: true } }, { permissions: { net: true } },
async function httpServerRespondNonAsciiUint8Array() { async function httpServerRespondNonAsciiUint8Array() {

View file

@ -27,6 +27,7 @@
Set, Set,
SetPrototypeAdd, SetPrototypeAdd,
SetPrototypeDelete, SetPrototypeDelete,
SetPrototypeHas,
SetPrototypeValues, SetPrototypeValues,
StringPrototypeIncludes, StringPrototypeIncludes,
StringPrototypeToLowerCase, StringPrototypeToLowerCase,
@ -42,6 +43,8 @@
class HttpConn { class HttpConn {
#rid = 0; #rid = 0;
#closed = false;
// This set holds resource ids of resources // This set holds resource ids of resources
// that were created during lifecycle of this request. // that were created during lifecycle of this request.
// When the connection is closed these resources should be closed // When the connection is closed these resources should be closed
@ -62,10 +65,11 @@
let nextRequest; let nextRequest;
try { try {
nextRequest = await core.opAsync( nextRequest = await core.opAsync(
"op_http_request_next", "op_http_accept",
this.#rid, this.#rid,
); );
} catch (error) { } catch (error) {
this.close();
// A connection error seen here would cause disrupted responses to throw // A connection error seen here would cause disrupted responses to throw
// a generic `BadResource` error. Instead store this error and replace // a generic `BadResource` error. Instead store this error and replace
// those with it. // those with it.
@ -79,26 +83,31 @@
} }
throw error; throw error;
} }
if (nextRequest === null) return null; if (nextRequest == null) {
// Work-around for servers (deno_std/http in particular) that call
// `nextRequest()` before upgrading a previous request which has a
// `connection: upgrade` header.
await null;
this.close();
return null;
}
const [ const [
requestRid, streamRid,
responseSenderRid,
method, method,
headersList, headersList,
url, url,
] = nextRequest; ] = nextRequest;
SetPrototypeAdd(this.managedResources, streamRid);
/** @type {ReadableStream<Uint8Array> | undefined} */ /** @type {ReadableStream<Uint8Array> | undefined} */
let body = null; let body = null;
if (typeof requestRid === "number") { // There might be a body, but we don't expose it for GET/HEAD requests.
SetPrototypeAdd(this.managedResources, requestRid); // It will be closed automatically once the request has been handled and
// There might be a body, but we don't expose it for GET/HEAD requests. // the response has been sent.
// It will be closed automatically once the request has been handled and if (method !== "GET" && method !== "HEAD") {
// the response has been sent. body = createRequestBodyStream(streamRid);
if (method !== "GET" && method !== "HEAD") {
body = createRequestBodyStream(this, requestRid);
}
} }
const innerRequest = newInnerRequest( const innerRequest = newInnerRequest(
@ -111,22 +120,21 @@
const signal = abortSignal.newSignal(); const signal = abortSignal.newSignal();
const request = fromInnerRequest(innerRequest, signal, "immutable"); const request = fromInnerRequest(innerRequest, signal, "immutable");
SetPrototypeAdd(this.managedResources, responseSenderRid); const respondWith = createRespondWith(this, streamRid);
const respondWith = createRespondWith(
this,
responseSenderRid,
requestRid,
);
return { request, respondWith }; return { request, respondWith };
} }
/** @returns {void} */ /** @returns {void} */
close() { close() {
for (const rid of SetPrototypeValues(this.managedResources)) { if (!this.#closed) {
core.tryClose(rid); this.#closed = true;
core.close(this.#rid);
for (const rid of SetPrototypeValues(this.managedResources)) {
SetPrototypeDelete(this.managedResources, rid);
core.close(rid);
}
} }
core.close(this.#rid);
} }
[SymbolAsyncIterator]() { [SymbolAsyncIterator]() {
@ -136,97 +144,86 @@
async next() { async next() {
const reqEvt = await httpConn.nextRequest(); const reqEvt = await httpConn.nextRequest();
// Change with caution, current form avoids a v8 deopt // Change with caution, current form avoids a v8 deopt
return { value: reqEvt, done: reqEvt === null }; return { value: reqEvt ?? undefined, done: reqEvt === null };
}, },
}; };
} }
} }
function readRequest(requestRid, zeroCopyBuf) { function readRequest(streamRid, buf) {
return core.opAsync( return core.opAsync("op_http_read", streamRid, buf);
"op_http_request_read",
requestRid,
zeroCopyBuf,
);
} }
function createRespondWith(httpConn, responseSenderRid, requestRid) { function createRespondWith(httpConn, streamRid) {
return async function respondWith(resp) { return async function respondWith(resp) {
if (resp instanceof Promise) { try {
resp = await resp; if (resp instanceof Promise) {
} resp = await resp;
}
if (!(resp instanceof Response)) { if (!(resp instanceof Response)) {
throw new TypeError( throw new TypeError(
"First argument to respondWith must be a Response or a promise resolving to a Response.", "First argument to respondWith must be a Response or a promise resolving to a Response.",
); );
} }
const innerResp = toInnerResponse(resp); const innerResp = toInnerResponse(resp);
// If response body length is known, it will be sent synchronously in a // If response body length is known, it will be sent synchronously in a
// single op, in other case a "response body" resource will be created and // single op, in other case a "response body" resource will be created and
// we'll be streaming it. // we'll be streaming it.
/** @type {ReadableStream<Uint8Array> | Uint8Array | null} */ /** @type {ReadableStream<Uint8Array> | Uint8Array | null} */
let respBody = null; let respBody = null;
if (innerResp.body !== null) { if (innerResp.body !== null) {
if (innerResp.body.unusable()) throw new TypeError("Body is unusable."); if (innerResp.body.unusable()) {
if (innerResp.body.streamOrStatic instanceof ReadableStream) { throw new TypeError("Body is unusable.");
if ( }
innerResp.body.length === null || if (innerResp.body.streamOrStatic instanceof ReadableStream) {
innerResp.body.source instanceof Blob if (
) { innerResp.body.length === null ||
respBody = innerResp.body.stream; innerResp.body.source instanceof Blob
} else { ) {
const reader = innerResp.body.stream.getReader(); respBody = innerResp.body.stream;
const r1 = await reader.read();
if (r1.done) {
respBody = new Uint8Array(0);
} else { } else {
respBody = r1.value; const reader = innerResp.body.stream.getReader();
const r2 = await reader.read(); const r1 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable"); if (r1.done) {
respBody = new Uint8Array(0);
} else {
respBody = r1.value;
const r2 = await reader.read();
if (!r2.done) throw new TypeError("Unreachable");
}
} }
} else {
innerResp.body.streamOrStatic.consumed = true;
respBody = innerResp.body.streamOrStatic.body;
} }
} else { } else {
innerResp.body.streamOrStatic.consumed = true; respBody = new Uint8Array(0);
respBody = innerResp.body.streamOrStatic.body;
} }
} else { const isStreamingResponseBody =
respBody = new Uint8Array(0); !(typeof respBody === "string" || respBody instanceof Uint8Array);
}
SetPrototypeDelete(httpConn.managedResources, responseSenderRid); try {
let responseBodyRid; await core.opAsync("op_http_write_headers", [
try { streamRid,
responseBodyRid = await core.opAsync(
"op_http_response",
[
responseSenderRid,
innerResp.status ?? 200, innerResp.status ?? 200,
innerResp.headerList, innerResp.headerList,
], ], isStreamingResponseBody ? null : respBody);
(respBody instanceof Uint8Array || typeof respBody === "string") } catch (error) {
? respBody const connError = httpConn[connErrorSymbol];
: null, if (error instanceof BadResource && connError != null) {
); // deno-lint-ignore no-ex-assign
} catch (error) { error = new connError.constructor(connError.message);
const connError = httpConn[connErrorSymbol]; }
if (error instanceof BadResource && connError != null) { if (respBody !== null && respBody instanceof ReadableStream) {
// deno-lint-ignore no-ex-assign await respBody.cancel(error);
error = new connError.constructor(connError.message); }
throw error;
} }
if (respBody !== null && respBody instanceof ReadableStream) {
await respBody.cancel(error);
}
throw error;
}
// If `respond` returns a responseBodyRid, we should stream the body if (isStreamingResponseBody) {
// to that resource.
if (responseBodyRid !== null) {
SetPrototypeAdd(httpConn.managedResources, responseBodyRid);
try {
if (respBody === null || !(respBody instanceof ReadableStream)) { if (respBody === null || !(respBody instanceof ReadableStream)) {
throw new TypeError("Unreachable"); throw new TypeError("Unreachable");
} }
@ -239,11 +236,7 @@
break; break;
} }
try { try {
await core.opAsync( await core.opAsync("op_http_write", streamRid, value);
"op_http_response_write",
responseBodyRid,
value,
);
} catch (error) { } catch (error) {
const connError = httpConn[connErrorSymbol]; const connError = httpConn[connErrorSymbol];
if (error instanceof BadResource && connError != null) { if (error instanceof BadResource && connError != null) {
@ -254,61 +247,55 @@
throw error; throw error;
} }
} }
} finally {
// Once all chunks are sent, and the request body is closed, we can
// close the response body.
SetPrototypeDelete(httpConn.managedResources, responseBodyRid);
try { try {
await core.opAsync("op_http_response_close", responseBodyRid); await core.opAsync("op_http_shutdown", streamRid);
} catch { /* pass */ } } catch (error) {
await reader.cancel(error);
throw error;
}
} }
}
const ws = resp[_ws]; const ws = resp[_ws];
if (ws) { if (ws) {
if (typeof requestRid !== "number") { const wsRid = await core.opAsync(
throw new TypeError( "op_http_upgrade_websocket",
"This request can not be upgraded to a websocket connection.", streamRid,
); );
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
httpConn.close();
if (ws[_readyState] === WebSocket.CLOSING) {
await core.opAsync("op_ws_close", { rid: wsRid });
ws[_readyState] = WebSocket.CLOSED;
const errEvent = new ErrorEvent("error");
ws.dispatchEvent(errEvent);
const event = new CloseEvent("close");
ws.dispatchEvent(event);
core.tryClose(wsRid);
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
}
} }
} finally {
const wsRid = await core.opAsync( if (SetPrototypeHas(httpConn.managedResources, streamRid)) {
"op_http_upgrade_websocket", SetPrototypeDelete(httpConn.managedResources, streamRid);
requestRid, core.close(streamRid);
);
ws[_rid] = wsRid;
ws[_protocol] = resp.headers.get("sec-websocket-protocol");
if (ws[_readyState] === WebSocket.CLOSING) {
await core.opAsync("op_ws_close", { rid: wsRid });
ws[_readyState] = WebSocket.CLOSED;
const errEvent = new ErrorEvent("error");
ws.dispatchEvent(errEvent);
const event = new CloseEvent("close");
ws.dispatchEvent(event);
core.tryClose(wsRid);
} else {
ws[_readyState] = WebSocket.OPEN;
const event = new Event("open");
ws.dispatchEvent(event);
ws[_eventLoop]();
} }
} else if (typeof requestRid === "number") {
// Try to close "request" resource. It might have been already consumed,
// but if it hasn't been we need to close it here to avoid resource
// leak.
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.tryClose(requestRid);
} }
}; };
} }
function createRequestBodyStream(httpConn, requestRid) { function createRequestBodyStream(streamRid) {
return new ReadableStream({ return new ReadableStream({
type: "bytes", type: "bytes",
async pull(controller) { async pull(controller) {
@ -316,32 +303,21 @@
// This is the largest possible size for a single packet on a TLS // This is the largest possible size for a single packet on a TLS
// stream. // stream.
const chunk = new Uint8Array(16 * 1024 + 256); const chunk = new Uint8Array(16 * 1024 + 256);
const read = await readRequest( const read = await readRequest(streamRid, chunk);
requestRid,
chunk,
);
if (read > 0) { if (read > 0) {
// We read some data. Enqueue it onto the stream. // We read some data. Enqueue it onto the stream.
controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read)); controller.enqueue(TypedArrayPrototypeSubarray(chunk, 0, read));
} else { } else {
// We have reached the end of the body, so we close the stream. // We have reached the end of the body, so we close the stream.
controller.close(); controller.close();
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
} }
} catch (err) { } catch (err) {
// There was an error while reading a chunk of the body, so we // There was an error while reading a chunk of the body, so we
// error. // error.
controller.error(err); controller.error(err);
controller.close(); controller.close();
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
} }
}, },
cancel() {
SetPrototypeDelete(httpConn.managedResources, requestRid);
core.close(requestRid);
},
}); });
} }

File diff suppressed because it is too large Load diff

View file

@ -34,12 +34,13 @@ use std::sync::Arc;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio_rustls::rustls::RootCertStore; use tokio_rustls::rustls::RootCertStore;
use tokio_rustls::TlsConnector; use tokio_rustls::TlsConnector;
use tokio_tungstenite::client_async;
use tokio_tungstenite::tungstenite::{ use tokio_tungstenite::tungstenite::{
handshake::client::Response, protocol::frame::coding::CloseCode, handshake::client::Response, protocol::frame::coding::CloseCode,
protocol::CloseFrame, Message, protocol::CloseFrame, protocol::Role, Message,
}; };
use tokio_tungstenite::MaybeTlsStream; use tokio_tungstenite::MaybeTlsStream;
use tokio_tungstenite::{client_async, WebSocketStream}; use tokio_tungstenite::WebSocketStream;
pub use tokio_tungstenite; // Re-export tokio_tungstenite pub use tokio_tungstenite; // Re-export tokio_tungstenite
@ -72,6 +73,27 @@ pub enum WebSocketStreamType {
}, },
} }
pub async fn ws_create_server_stream(
state: &Rc<RefCell<OpState>>,
transport: hyper::upgrade::Upgraded,
) -> Result<ResourceId, AnyError> {
let ws_stream =
WebSocketStream::from_raw_socket(transport, Role::Server, None).await;
let (ws_tx, ws_rx) = ws_stream.split();
let ws_resource = WsStreamResource {
stream: WebSocketStreamType::Server {
tx: AsyncRefCell::new(ws_tx),
rx: AsyncRefCell::new(ws_rx),
},
cancel: Default::default(),
};
let resource_table = &mut state.borrow_mut().resource_table;
let rid = resource_table.add(ws_resource);
Ok(rid)
}
pub struct WsStreamResource { pub struct WsStreamResource {
pub stream: WebSocketStreamType, pub stream: WebSocketStreamType,
// When a `WsStreamResource` resource is closed, all pending 'read' ops are // When a `WsStreamResource` resource is closed, all pending 'read' ops are

View file

@ -17,6 +17,7 @@ use deno_fetch::reqwest;
use std::env; use std::env;
use std::error::Error; use std::error::Error;
use std::io; use std::io;
use std::sync::Arc;
fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str { fn get_dlopen_error_class(error: &dlopen::Error) -> &'static str {
use dlopen::Error::*; use dlopen::Error::*;
@ -163,6 +164,10 @@ pub fn get_error_class_name(e: &AnyError) -> Option<&'static str> {
.map(get_dlopen_error_class) .map(get_dlopen_error_class)
}) })
.or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class)) .or_else(|| e.downcast_ref::<hyper::Error>().map(get_hyper_error_class))
.or_else(|| {
e.downcast_ref::<Arc<hyper::Error>>()
.map(|e| get_hyper_error_class(&**e))
})
.or_else(|| { .or_else(|| {
e.downcast_ref::<deno_core::Canceled>().map(|e| { e.downcast_ref::<deno_core::Canceled>().map(|e| {
let io_err: io::Error = e.to_owned().into(); let io_err: io::Error = e.to_owned().into();

View file

@ -6,6 +6,7 @@ use deno_core::op_sync;
use deno_core::Extension; use deno_core::Extension;
use deno_core::OpState; use deno_core::OpState;
use deno_core::ResourceId; use deno_core::ResourceId;
use deno_http::http_create_conn_resource;
use deno_net::io::TcpStreamResource; use deno_net::io::TcpStreamResource;
use deno_net::ops_tls::TlsStreamResource; use deno_net::ops_tls::TlsStreamResource;
@ -29,7 +30,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner(); let (read_half, write_half) = resource.into_inner();
let tcp_stream = read_half.reunite(write_half)?; let tcp_stream = read_half.reunite(write_half)?;
let addr = tcp_stream.local_addr()?; let addr = tcp_stream.local_addr()?;
return deno_http::start_http(state, tcp_stream, addr, "http"); return http_create_conn_resource(state, tcp_stream, addr, "http");
} }
if let Ok(resource_rc) = state if let Ok(resource_rc) = state
@ -41,7 +42,7 @@ fn op_http_start(
let (read_half, write_half) = resource.into_inner(); let (read_half, write_half) = resource.into_inner();
let tls_stream = read_half.reunite(write_half); let tls_stream = read_half.reunite(write_half);
let addr = tls_stream.get_ref().0.local_addr()?; let addr = tls_stream.get_ref().0.local_addr()?;
return deno_http::start_http(state, tls_stream, addr, "https"); return http_create_conn_resource(state, tls_stream, addr, "https");
} }
Err(bad_resource_id()) Err(bad_resource_id())