From f418be4f57594e88e5bcc6f384f1f6ad39f3918f Mon Sep 17 00:00:00 2001 From: Divy Srivastava Date: Wed, 18 Jan 2023 07:09:54 -0800 Subject: [PATCH] Optimize http_bench_json_ops example (#16505) --- core/Cargo.toml | 2 +- .../http_bench_json_ops.js | 25 ++--- .../main.rs} | 92 +++++++++---------- 3 files changed, 57 insertions(+), 62 deletions(-) rename core/examples/{ => http_bench_json_ops}/http_bench_json_ops.js (74%) rename core/examples/{http_bench_json_ops.rs => http_bench_json_ops/main.rs} (71%) diff --git a/core/Cargo.toml b/core/Cargo.toml index 16a1d533d7..1a65998058 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -40,7 +40,7 @@ v8.workspace = true [[example]] name = "http_bench_json_ops" -path = "examples/http_bench_json_ops.rs" +path = "examples/http_bench_json_ops/main.rs" # These dependencies are only used for the 'http_bench_*_ops' examples. [dev-dependencies] diff --git a/core/examples/http_bench_json_ops.js b/core/examples/http_bench_json_ops/http_bench_json_ops.js similarity index 74% rename from core/examples/http_bench_json_ops.js rename to core/examples/http_bench_json_ops/http_bench_json_ops.js index 28c4910644..9650804c70 100644 --- a/core/examples/http_bench_json_ops.js +++ b/core/examples/http_bench_json_ops/http_bench_json_ops.js @@ -4,6 +4,8 @@ // exercise the event loop in a simple yet semi-realistic way. Deno.core.initializeAsyncOps(); +const { ops } = Deno.core; + const requestBuf = new Uint8Array(64 * 1024); const responseBuf = new Uint8Array( "HTTP/1.1 200 OK\r\nContent-Length: 12\r\n\r\nHello World\n" @@ -13,27 +15,28 @@ const responseBuf = new Uint8Array( /** Listens on 0.0.0.0:4570, returns rid. */ function listen() { - return Deno.core.ops.op_listen(); + return ops.op_listen(); } /** Accepts a connection, returns rid. */ function accept(serverRid) { - return Deno.core.ops.op_accept(serverRid); + return ops.op_accept(serverRid); +} + +function read(serverRid, buf) { + return ops.op_read_socket(serverRid, buf); } async function serve(rid) { try { while (true) { - await Deno.core.read(rid, requestBuf); - await Deno.core.writeAll(rid, responseBuf); - } - } catch (e) { - if ( - !e.message.includes("Broken pipe") && - !e.message.includes("Connection reset by peer") - ) { - throw e; + await read(rid, requestBuf); + if (!ops.op_try_write(rid, responseBuf)) { + await Deno.core.writeAll(rid, responseBuf); + } } + } catch { + // pass } Deno.core.close(rid); } diff --git a/core/examples/http_bench_json_ops.rs b/core/examples/http_bench_json_ops/main.rs similarity index 71% rename from core/examples/http_bench_json_ops.rs rename to core/examples/http_bench_json_ops/main.rs index 9a55a0823d..f0bbec0d9d 100644 --- a/core/examples/http_bench_json_ops.rs +++ b/core/examples/http_bench_json_ops/main.rs @@ -3,13 +3,11 @@ use deno_core::anyhow::Error; use deno_core::op; use deno_core::AsyncRefCell; use deno_core::AsyncResult; -use deno_core::CancelHandle; -use deno_core::CancelTryFuture; use deno_core::JsRuntime; use deno_core::OpState; -use deno_core::RcRef; use deno_core::Resource; use deno_core::ResourceId; +use deno_core::ZeroCopyBuf; use std::cell::RefCell; use std::env; use std::net::SocketAddr; @@ -22,42 +20,22 @@ use tokio::io::AsyncWriteExt; // You can remove this: use deno_core::*; -struct Logger; - -impl log::Log for Logger { - fn enabled(&self, metadata: &log::Metadata) -> bool { - metadata.level() <= log::max_level() - } - - fn log(&self, record: &log::Record) { - if self.enabled(record.metadata()) { - println!("{} - {}", record.level(), record.args()); - } - } - - fn flush(&self) {} -} - // Note: a `tokio::net::TcpListener` doesn't need to be wrapped in a cell, // because it only supports one op (`accept`) which does not require a mutable // reference to the listener. struct TcpListener { inner: tokio::net::TcpListener, - cancel: CancelHandle, } impl TcpListener { async fn accept(self: Rc) -> Result { - let cancel = RcRef::map(&self, |r| &r.cancel); - let stream = self.inner.accept().try_or_cancel(cancel).await?.0.into(); + let stream = self.inner.accept().await?.0.into(); Ok(stream) } } impl Resource for TcpListener { - fn close(self: Rc) { - self.cancel.cancel(); - } + fn close(self: Rc) {} } impl TryFrom for TcpListener { @@ -67,7 +45,6 @@ impl TryFrom for TcpListener { ) -> Result { tokio::net::TcpListener::try_from(std_listener).map(|tokio_listener| Self { inner: tokio_listener, - cancel: Default::default(), }) } } @@ -75,17 +52,12 @@ impl TryFrom for TcpListener { struct TcpStream { rd: AsyncRefCell, wr: AsyncRefCell, - // When a `TcpStream` resource is closed, all pending 'read' ops are - // canceled, while 'write' ops are allowed to complete. Therefore only - // 'read' futures are attached to this cancel handle. - cancel: CancelHandle, } impl TcpStream { async fn read(self: Rc, data: &mut [u8]) -> Result { let mut rd = RcRef::map(&self, |r| &r.rd).borrow_mut().await; - let cancel = RcRef::map(self, |r| &r.cancel); - let nread = rd.read(data).try_or_cancel(cancel).await?; + let nread = rd.read(data).await?; Ok(nread) } @@ -94,15 +66,21 @@ impl TcpStream { let nwritten = wr.write(data).await?; Ok(nwritten) } + + fn try_write(self: Rc, data: &[u8]) -> Result { + let wr = RcRef::map(self, |r| &r.wr) + .try_borrow_mut() + .ok_or_else(|| Error::msg("Failed to acquire lock on TcpStream"))?; + let nwritten = wr.try_write(data)?; + Ok(nwritten) + } } impl Resource for TcpStream { deno_core::impl_readable_byob!(); deno_core::impl_writable!(); - fn close(self: Rc) { - self.cancel.cancel() - } + fn close(self: Rc) {} } impl From for TcpStream { @@ -111,25 +89,40 @@ impl From for TcpStream { Self { rd: rd.into(), wr: wr.into(), - cancel: Default::default(), } } } fn create_js_runtime() -> JsRuntime { let ext = deno_core::Extension::builder("my_ext") - .ops(vec![op_listen::decl(), op_accept::decl()]) + .ops(vec![ + op_listen::decl(), + op_accept::decl(), + op_try_write::decl(), + op_read_socket::decl(), + ]) .build(); JsRuntime::new(deno_core::RuntimeOptions { extensions: vec![ext], + will_snapshot: false, ..Default::default() }) } +#[op] +async fn op_read_socket( + state: Rc>, + rid: ResourceId, + mut data: ZeroCopyBuf, +) -> Result { + let resource = state.borrow_mut().resource_table.get::(rid)?; + let nread = resource.read(&mut data).await?; + Ok(nread as u32) +} + #[op] fn op_listen(state: &mut OpState) -> Result { - log::debug!("listen"); let addr = "127.0.0.1:4570".parse::().unwrap(); let std_listener = std::net::TcpListener::bind(addr)?; std_listener.set_nonblocking(true)?; @@ -143,32 +136,31 @@ async fn op_accept( state: Rc>, rid: ResourceId, ) -> Result { - log::debug!("accept rid={}", rid); - let listener = state.borrow().resource_table.get::(rid)?; let stream = listener.accept().await?; let rid = state.borrow_mut().resource_table.add(stream); Ok(rid) } -fn main() { - log::set_logger(&Logger).unwrap(); - log::set_max_level( - env::args() - .find(|a| a == "-D") - .map(|_| log::LevelFilter::Debug) - .unwrap_or(log::LevelFilter::Warn), - ); +#[op(fast)] +fn op_try_write( + state: &mut OpState, + rid: u32, + value: &[u8], +) -> Result { + let stream = state.resource_table.get::(rid)?; + Ok(stream.try_write(value).is_ok()) +} +fn main() { // NOTE: `--help` arg will display V8 help and exit deno_core::v8_set_flags(env::args().collect()); let mut js_runtime = create_js_runtime(); let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() + .enable_io() .build() .unwrap(); - let future = async move { js_runtime .execute_script(