From 1e1959f6fac6dd0e499532772c8143285cdd81de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Mon, 14 Jun 2021 14:52:49 +0200 Subject: [PATCH] fix: hang in Deno.serveHttp() (#10923) Waiting on next request in Deno.serveHttp() API hanged when responses were using ReadableStream. This was caused by op_http_request_next op that was never woken after response was fully written. This commit adds waker field to DenoService which is called after response is finished. --- Cargo.lock | 6 +-- cli/tests/unit/http_test.ts | 77 +++++++++++++++++++++++++++++++++++++ runtime/Cargo.toml | 2 +- runtime/ops/http.rs | 20 ++++++---- 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 022a609b85..7bd3314b06 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1636,9 +1636,9 @@ checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" [[package]] name = "hyper" -version = "0.14.7" +version = "0.14.9" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e5f105c494081baa3bf9e200b279e27ec1623895cd504c7dbef8d0b080fcf54" +checksum = "07d6baa1b441335f3ce5098ac421fb6547c46dda735ca1bc6d0153c838f9dd83" dependencies = [ "bytes", "futures-channel", @@ -1650,7 +1650,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project", + "pin-project-lite", "socket2 0.4.0", "tokio", "tower-service", diff --git a/cli/tests/unit/http_test.ts b/cli/tests/unit/http_test.ts index df599c6f4f..4a362a4796 100644 --- a/cli/tests/unit/http_test.ts +++ b/cli/tests/unit/http_test.ts @@ -1,4 +1,8 @@ // Copyright 2018-2021 the Deno authors. All rights reserved. MIT license. +import { chunkedBodyReader } from "../../../test_util/std/http/_io.ts"; +import { BufReader, BufWriter } from "../../../test_util/std/io/bufio.ts"; +import { Buffer } from "../../../test_util/std/io/buffer.ts"; +import { TextProtoReader } from "../../../test_util/std/textproto/mod.ts"; import { assert, assertEquals, @@ -6,6 +10,33 @@ import { unitTest, } from "./test_util.ts"; +async function writeRequestAndReadResponse(conn: Deno.Conn): Promise { + const encoder = new TextEncoder(); + const decoder = new TextDecoder(); + + const w = new BufWriter(conn); + const r = new BufReader(conn); + const body = `GET / HTTP/1.1\r\nHost: 127.0.0.1:4501\r\n\r\n`; + const writeResult = await w.write(encoder.encode(body)); + assertEquals(body.length, writeResult); + await w.flush(); + const tpr = new TextProtoReader(r); + const statusLine = await tpr.readLine(); + assert(statusLine !== null); + const headers = await tpr.readMIMEHeader(); + assert(headers !== null); + + const chunkedReader = chunkedBodyReader(headers, r); + const buf = new Uint8Array(5); + const dest = new Buffer(); + let result: number | null; + while ((result = await chunkedReader.read(buf)) !== null) { + const len = Math.min(buf.byteLength, result); + await dest.write(buf.subarray(0, len)); + } + return decoder.decode(dest.bytes()); +} + unitTest({ perms: { net: true } }, async function httpServerBasic() { const promise = (async () => { const listener = Deno.listen({ port: 4501 }); @@ -373,3 +404,49 @@ unitTest( await delay(300); }, ); + +unitTest( + { perms: { net: true } }, + // Issue: https://github.com/denoland/deno/issues/10870 + async function httpServerHang() { + // Quick and dirty way to make a readable stream from a string. Alternatively, + // `readableStreamFromReader(file)` could be used. + function stream(s: string): ReadableStream { + return new Response(s).body!; + } + + const httpConns: Deno.HttpConn[] = []; + const promise = (async () => { + let count = 0; + const listener = Deno.listen({ port: 4501 }); + for await (const conn of listener) { + (async () => { + const httpConn = Deno.serveHttp(conn); + httpConns.push(httpConn); + for await (const { respondWith } of httpConn) { + respondWith(new Response(stream("hello"))); + + count++; + if (count >= 2) { + listener.close(); + } + } + })(); + } + })(); + + const clientConn = await Deno.connect({ port: 4501 }); + + const r1 = await writeRequestAndReadResponse(clientConn); + assertEquals(r1, "hello"); + + const r2 = await writeRequestAndReadResponse(clientConn); + assertEquals(r2, "hello"); + + clientConn.close(); + await promise; + for (const conn of httpConns) { + conn.close(); + } + }, +); diff --git a/runtime/Cargo.toml b/runtime/Cargo.toml index 0bd8d13c4c..356fb46946 100644 --- a/runtime/Cargo.toml +++ b/runtime/Cargo.toml @@ -55,7 +55,7 @@ dlopen = "0.1.8" encoding_rs = "0.8.28" filetime = "0.2.14" http = "0.2.3" -hyper = { version = "0.14.5", features = ["server", "stream", "http1", "http2", "runtime"] } +hyper = { version = "0.14.9", features = ["server", "stream", "http1", "http2", "runtime"] } indexmap = "1.6.2" lazy_static = "1.4.0" libc = "0.2.93" diff --git a/runtime/ops/http.rs b/runtime/ops/http.rs index fedcb404f5..11e83f6c77 100644 --- a/runtime/ops/http.rs +++ b/runtime/ops/http.rs @@ -66,6 +66,7 @@ struct ServiceInner { #[derive(Clone, Default)] struct Service { inner: Rc>>, + waker: Rc, } impl HyperService> for Service { @@ -160,15 +161,16 @@ async fn op_http_request_next( let cancel = RcRef::map(conn_resource.clone(), |r| &r.cancel); poll_fn(|cx| { + conn_resource.deno_service.waker.register(cx.waker()); let connection_closed = match conn_resource.poll(cx) { Poll::Pending => false, Poll::Ready(Ok(())) => { - // close ConnResource - state + // try to close ConnResource, but don't unwrap as it might + // already be closed + let _ = state .borrow_mut() .resource_table - .take::(conn_rid) - .unwrap(); + .take::(conn_rid); true } Poll::Ready(Err(e)) => { @@ -188,7 +190,6 @@ async fn op_http_request_next( } } }; - if let Some(request_resource) = conn_resource.deno_service.inner.borrow_mut().take() { @@ -409,6 +410,9 @@ async fn op_http_response( }) .await?; + if maybe_response_body_rid.is_none() { + conn_resource.deno_service.waker.wake(); + } Ok(maybe_response_body_rid) } @@ -430,11 +434,13 @@ async fn op_http_response_close( .ok_or_else(bad_resource_id)?; drop(resource); - poll_fn(|cx| match conn_resource.poll(cx) { + let r = poll_fn(|cx| match conn_resource.poll(cx) { Poll::Ready(x) => Poll::Ready(x), Poll::Pending => Poll::Ready(Ok(())), }) - .await + .await; + conn_resource.deno_service.waker.wake(); + r } async fn op_http_request_read(