From f356b2bd5e3d46eaf4147a38a2f7c7e7c2824fbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bartek=20Iwa=C5=84czuk?= Date: Sun, 17 Nov 2019 14:14:50 +0100 Subject: [PATCH] refactor: fixes for futures (#3363) After landing #3358 the benchmarks exploded indicating problems with workers and deno_core_http_bench. This PR dramatically fixes thread/syscall count that showed up on benchmarks. Thread count is not back to previous levels but difference went from hundreds/thousands to about ~50. --- cli/compilers/ts.rs | 4 ++-- cli/compilers/wasm.rs | 13 ++++++------- cli/ops/workers.rs | 3 ++- cli/worker.rs | 19 +++++++------------ core/examples/http_bench.rs | 22 +++++++++++++++------- 5 files changed, 32 insertions(+), 29 deletions(-) diff --git a/cli/compilers/ts.rs b/cli/compilers/ts.rs index 34bf74ab1d..13823b24f1 100644 --- a/cli/compilers/ts.rs +++ b/cli/compilers/ts.rs @@ -272,8 +272,8 @@ impl TsCompiler { let worker = TsCompiler::setup_worker(global_state.clone()); let worker_ = worker.clone(); + worker.post_message(req_msg).unwrap(); let first_msg_fut = async move { - worker.post_message(req_msg).await.unwrap(); let result = worker.await; if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. @@ -382,8 +382,8 @@ impl TsCompiler { .add("Compile", &module_url.to_string()); let global_state_ = global_state.clone(); + worker.post_message(req_msg).unwrap(); let first_msg_fut = async move { - worker.post_message(req_msg).await.unwrap(); let result = worker.await; if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. diff --git a/cli/compilers/wasm.rs b/cli/compilers/wasm.rs index 30a171db44..882e28e43b 100644 --- a/cli/compilers/wasm.rs +++ b/cli/compilers/wasm.rs @@ -86,14 +86,13 @@ impl WasmCompiler { let worker_ = worker.clone(); let url = source_file.url.clone(); + let _res = worker.post_message( + serde_json::to_string(&base64_data) + .unwrap() + .into_boxed_str() + .into_boxed_bytes(), + ); let fut = worker - .post_message( - serde_json::to_string(&base64_data) - .unwrap() - .into_boxed_str() - .into_boxed_bytes(), - ) - .then(move |_| worker) .then(move |result| { if let Err(err) = result { // TODO(ry) Need to forward the error instead of exiting. diff --git a/cli/ops/workers.rs b/cli/ops/workers.rs index ee60c68249..42f93ec573 100644 --- a/cli/ops/workers.rs +++ b/cli/ops/workers.rs @@ -271,7 +271,8 @@ fn op_host_post_message( let mut table = state.workers.lock().unwrap(); // TODO: don't return bad resource anymore let worker = table.get_mut(&id).ok_or_else(bad_resource)?; - tokio_util::block_on(worker.post_message(msg).boxed()) + worker + .post_message(msg) .map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?; Ok(JsonOp::Sync(json!({}))) } diff --git a/cli/worker.rs b/cli/worker.rs index aca822888c..d5cc801d86 100644 --- a/cli/worker.rs +++ b/cli/worker.rs @@ -144,17 +144,12 @@ impl Worker { /// Post message to worker as a host. /// /// This method blocks current thread. - pub fn post_message( - self: &Self, - buf: Buf, - ) -> impl Future> { + pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> { let channels = self.external_channels.lock().unwrap(); let mut sender = channels.sender.clone(); - async move { - let result = sender.send(buf).map_err(ErrBox::from).await; - drop(sender); - result - } + futures::executor::block_on(sender.send(buf)) + .map(|_| ()) + .map_err(ErrBox::from) } /// Get message from worker as a host. @@ -396,7 +391,7 @@ mod tests { let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = futures::executor::block_on(worker_.post_message(msg).boxed()); + let r = worker_.post_message(msg); assert!(r.is_ok()); let maybe_msg = @@ -409,7 +404,7 @@ mod tests { .to_string() .into_boxed_str() .into_boxed_bytes(); - let r = futures::executor::block_on(worker_.post_message(msg).boxed()); + let r = worker_.post_message(msg); assert!(r.is_ok()); }) } @@ -439,7 +434,7 @@ mod tests { ); let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes(); - let r = futures::executor::block_on(worker_.post_message(msg)); + let r = worker_.post_message(msg); assert!(r.is_ok()); futures::executor::block_on(worker_future).unwrap(); diff --git a/core/examples/http_bench.rs b/core/examples/http_bench.rs index 6a9213cbe4..3172f17dde 100644 --- a/core/examples/http_bench.rs +++ b/core/examples/http_bench.rs @@ -15,6 +15,7 @@ extern crate lazy_static; use deno::*; use futures::future::FutureExt; use futures::future::TryFutureExt; +use futures::stream::StreamExt; use std::env; use std::future::Future; use std::io::Error; @@ -24,6 +25,7 @@ use std::pin::Pin; use std::sync::Mutex; use std::sync::MutexGuard; use std::task::Poll; +use tokio::net::tcp::Incoming; use tokio::prelude::Async; use tokio::prelude::AsyncRead; use tokio::prelude::AsyncWrite; @@ -190,7 +192,7 @@ pub fn bad_resource() -> Error { Error::new(ErrorKind::NotFound, "bad resource id") } -struct TcpListener(tokio::net::TcpListener); +struct TcpListener(Incoming); impl Resource for TcpListener {} @@ -213,14 +215,19 @@ fn op_accept( ) -> Pin> { let rid = record.arg as u32; debug!("accept {}", rid); - let fut = futures::future::poll_fn(move |_cx| { + let fut = futures::future::poll_fn(move |cx| { let mut table = lock_resource_table(); let listener = table.get_mut::(rid).ok_or_else(bad_resource)?; - match listener.0.poll_accept() { - Err(e) => Poll::Ready(Err(e)), - Ok(Async::Ready(v)) => Poll::Ready(Ok(v)), - Ok(Async::NotReady) => Poll::Pending, + let mut listener = futures::compat::Compat01As03::new(&mut listener.0); + match listener.poll_next_unpin(cx) { + Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)), + Poll::Ready(Some(Ok(stream))) => { + let addr = stream.peer_addr().unwrap(); + Poll::Ready(Ok((stream, addr))) + } + Poll::Pending => Poll::Pending, + _ => unreachable!(), } }) .and_then(move |(stream, addr)| { @@ -240,7 +247,8 @@ fn op_listen( let addr = "127.0.0.1:4544".parse::().unwrap(); let listener = tokio::net::TcpListener::bind(&addr).unwrap(); let mut table = lock_resource_table(); - let rid = table.add("tcpListener", Box::new(TcpListener(listener))); + let rid = + table.add("tcpListener", Box::new(TcpListener(listener.incoming()))); futures::future::ok(rid as i32).boxed() }