From 3e566bb457663cec57602e564f73ded817e426a8 Mon Sep 17 00:00:00 2001 From: Yosi Pramajaya Date: Mon, 31 Jan 2022 22:36:54 +0700 Subject: [PATCH] feat(ext/net): Add Conn.setNoDelay and Conn.setKeepAlive (#13103) --- Cargo.lock | 1 + cli/tests/unit/net_test.ts | 72 +++++++++++++++++++++ ext/net/01_net.js | 8 +++ ext/net/Cargo.toml | 1 + ext/net/io.rs | 30 +++++++++ ext/net/lib.deno_net.d.ts | 4 ++ ext/net/ops.rs | 124 +++++++++++++++++++++++++++++++++++++ 7 files changed, 240 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0d89411c7a..c1ad8fcbf6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -987,6 +987,7 @@ dependencies = [ "deno_tls", "log", "serde", + "socket2 0.4.2", "tokio", "trust-dns-proto", "trust-dns-resolver", diff --git a/cli/tests/unit/net_test.ts b/cli/tests/unit/net_test.ts index 79b83f8543..0522026763 100644 --- a/cli/tests/unit/net_test.ts +++ b/cli/tests/unit/net_test.ts @@ -238,6 +238,78 @@ Deno.test({ permissions: { net: true } }, async function netTcpDialListen() { conn.close(); }); +Deno.test({ permissions: { net: true } }, async function netTcpSetNoDelay() { + const listener = Deno.listen({ port: 3500 }); + listener.accept().then( + async (conn) => { + assert(conn.remoteAddr != null); + assert(conn.localAddr.transport === "tcp"); + assertEquals(conn.localAddr.hostname, "127.0.0.1"); + assertEquals(conn.localAddr.port, 3500); + await conn.write(new Uint8Array([1, 2, 3])); + conn.close(); + }, + ); + + const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 }); + conn.setNoDelay(true); + assert(conn.remoteAddr.transport === "tcp"); + assertEquals(conn.remoteAddr.hostname, "127.0.0.1"); + assertEquals(conn.remoteAddr.port, 3500); + assert(conn.localAddr != null); + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEquals(3, readResult); + assertEquals(1, buf[0]); + assertEquals(2, buf[1]); + assertEquals(3, buf[2]); + assert(conn.rid > 0); + + assert(readResult !== null); + + const readResult2 = await conn.read(buf); + assertEquals(readResult2, null); + + listener.close(); + conn.close(); +}); + +Deno.test({ permissions: { net: true } }, async function netTcpSetKeepAlive() { + const listener = Deno.listen({ port: 3500 }); + listener.accept().then( + async (conn) => { + assert(conn.remoteAddr != null); + assert(conn.localAddr.transport === "tcp"); + assertEquals(conn.localAddr.hostname, "127.0.0.1"); + assertEquals(conn.localAddr.port, 3500); + await conn.write(new Uint8Array([1, 2, 3])); + conn.close(); + }, + ); + + const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 }); + conn.setKeepAlive(true); + assert(conn.remoteAddr.transport === "tcp"); + assertEquals(conn.remoteAddr.hostname, "127.0.0.1"); + assertEquals(conn.remoteAddr.port, 3500); + assert(conn.localAddr != null); + const buf = new Uint8Array(1024); + const readResult = await conn.read(buf); + assertEquals(3, readResult); + assertEquals(1, buf[0]); + assertEquals(2, buf[1]); + assertEquals(3, buf[2]); + assert(conn.rid > 0); + + assert(readResult !== null); + + const readResult2 = await conn.read(buf); + assertEquals(readResult2, null); + + listener.close(); + conn.close(); +}); + Deno.test( { ignore: Deno.build.os === "windows", diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 4a4005954b..6f54ec9990 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -95,6 +95,14 @@ closeWrite() { return shutdown(this.rid); } + + setNoDelay(nodelay = true) { + return core.opSync("op_set_nodelay", this.rid, nodelay); + } + + setKeepAlive(keepalive = true) { + return core.opSync("op_set_keepalive", this.rid, keepalive); + } } class Listener { diff --git a/ext/net/Cargo.toml b/ext/net/Cargo.toml index fe4efa3869..b4f6434572 100644 --- a/ext/net/Cargo.toml +++ b/ext/net/Cargo.toml @@ -18,6 +18,7 @@ deno_core = { version = "0.117.0", path = "../../core" } deno_tls = { version = "0.22.0", path = "../tls" } log = "0.4.14" serde = { version = "1.0.129", features = ["derive"] } +socket2 = "0.4.2" tokio = { version = "1.10.1", features = ["full"] } trust-dns-proto = "0.20.3" trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] } diff --git a/ext/net/io.rs b/ext/net/io.rs index 9673a4a896..17b86af17e 100644 --- a/ext/net/io.rs +++ b/ext/net/io.rs @@ -1,5 +1,6 @@ // Copyright 2018-2022 the Deno authors. All rights reserved. MIT license. +use deno_core::error::generic_error; use deno_core::error::AnyError; use deno_core::AsyncMutFuture; use deno_core::AsyncRefCell; @@ -9,6 +10,7 @@ use deno_core::CancelTryFuture; use deno_core::RcRef; use deno_core::Resource; use deno_core::ZeroCopyBuf; +use socket2::SockRef; use std::borrow::Cow; use std::rc::Rc; use tokio::io::AsyncRead; @@ -118,6 +120,34 @@ impl Resource for TcpStreamResource { } } +impl TcpStreamResource { + pub fn set_nodelay(self: Rc, nodelay: bool) -> Result<(), AnyError> { + self.map_socket(Box::new(move |socket| Ok(socket.set_nodelay(nodelay)?))) + } + + pub fn set_keepalive( + self: Rc, + keepalive: bool, + ) -> Result<(), AnyError> { + self + .map_socket(Box::new(move |socket| Ok(socket.set_keepalive(keepalive)?))) + } + + fn map_socket( + self: Rc, + map: Box Result<(), AnyError>>, + ) -> Result<(), AnyError> { + if let Some(wr) = RcRef::map(self, |r| &r.wr).try_borrow() { + let stream = wr.as_ref().as_ref(); + let socket = socket2::SockRef::from(stream); + + return map(socket); + } + + Err(generic_error("Unable to get resources")) + } +} + #[cfg(unix)] pub type UnixStreamResource = FullDuplexResource; diff --git a/ext/net/lib.deno_net.d.ts b/ext/net/lib.deno_net.d.ts index c00cc1e444..ebed8ac873 100644 --- a/ext/net/lib.deno_net.d.ts +++ b/ext/net/lib.deno_net.d.ts @@ -50,6 +50,10 @@ declare namespace Deno { /** Shuts down (`shutdown(2)`) the write side of the connection. Most * callers should just use `close()`. */ closeWrite(): Promise; + /** Enable/disable the use of Nagle's algorithm. Defaults to true */ + setNoDelay(nodelay?: boolean): void; + /** Enable/disable keep-alive functionality */ + setKeepAlive(keepalive?: boolean): void; } // deno-lint-ignore no-empty-interface diff --git a/ext/net/ops.rs b/ext/net/ops.rs index 05085b4018..f64b79ba7c 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -55,6 +55,8 @@ pub fn init() -> Vec { ("op_dgram_recv", op_async(op_dgram_recv)), ("op_dgram_send", op_async(op_dgram_send::

)), ("op_dns_resolve", op_async(op_dns_resolve::

)), + ("op_set_nodelay", op_sync(op_set_nodelay::

)), + ("op_set_keepalive", op_sync(op_set_keepalive::

)), ] } @@ -665,6 +667,26 @@ where Ok(results) } +pub fn op_set_nodelay( + state: &mut OpState, + rid: ResourceId, + nodelay: bool, +) -> Result<(), AnyError> { + let resource: Rc = + state.resource_table.get::(rid)?; + resource.set_nodelay(nodelay) +} + +pub fn op_set_keepalive( + state: &mut OpState, + rid: ResourceId, + keepalive: bool, +) -> Result<(), AnyError> { + let resource: Rc = + state.resource_table.get::(rid)?; + resource.set_keepalive(keepalive) +} + fn rdata_to_return_record( ty: RecordType, ) -> impl Fn(&RData) -> Option { @@ -717,8 +739,13 @@ fn rdata_to_return_record( #[cfg(test)] mod tests { use super::*; + use deno_core::Extension; + use deno_core::JsRuntime; + use deno_core::RuntimeOptions; + use socket2::SockRef; use std::net::Ipv4Addr; use std::net::Ipv6Addr; + use std::path::Path; use trust_dns_proto::rr::rdata::mx::MX; use trust_dns_proto::rr::rdata::srv::SRV; use trust_dns_proto::rr::rdata::txt::TXT; @@ -810,4 +837,101 @@ mod tests { ])) ); } + + struct TestPermission {} + + impl NetPermissions for TestPermission { + fn check_net>( + &mut self, + _host: &(T, Option), + ) -> Result<(), AnyError> { + Ok(()) + } + + fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } + + fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> { + Ok(()) + } + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn tcp_set_no_delay() { + let set_nodelay = Box::new(|state: &mut OpState, rid| { + op_set_nodelay::(state, rid, true).unwrap(); + }); + let test_fn = Box::new(|socket: SockRef| { + assert!(socket.nodelay().unwrap()); + assert!(!socket.keepalive().unwrap()); + }); + check_sockopt(String::from("127.0.0.1:4245"), set_nodelay, test_fn).await; + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn tcp_set_keepalive() { + let set_keepalive = Box::new(|state: &mut OpState, rid| { + op_set_keepalive::(state, rid, true).unwrap(); + }); + let test_fn = Box::new(|socket: SockRef| { + assert!(!socket.nodelay().unwrap()); + assert!(socket.keepalive().unwrap()); + }); + check_sockopt(String::from("127.0.0.1:4246"), set_keepalive, test_fn).await; + } + + async fn check_sockopt( + addr: String, + set_sockopt_fn: Box, + test_fn: Box, + ) { + let clone_addr = addr.clone(); + tokio::spawn(async move { + let listener = TcpListener::bind(addr).await.unwrap(); + let _ = listener.accept().await; + }); + let my_ext = Extension::builder() + .state(move |state| { + state.put(TestPermission {}); + Ok(()) + }) + .build(); + + let mut runtime = JsRuntime::new(RuntimeOptions { + extensions: vec![my_ext], + ..Default::default() + }); + + let conn_state = runtime.op_state(); + + let server_addr: Vec<&str> = clone_addr.split(':').collect(); + let ip_args = IpListenArgs { + hostname: String::from(server_addr[0]), + port: server_addr[1].parse().unwrap(), + }; + let connect_args = ConnectArgs { + transport: String::from("tcp"), + transport_args: ArgsEnum::Ip(ip_args), + }; + + let connect_fut = + op_net_connect::(conn_state, connect_args, ()); + let conn = connect_fut.await.unwrap(); + + let rid = conn.rid; + let state = runtime.op_state(); + set_sockopt_fn(&mut state.borrow_mut(), rid); + + let resource = state + .borrow_mut() + .resource_table + .get::(rid) + .unwrap(); + + let wr = resource.wr_borrow_mut().await; + let stream = wr.as_ref().as_ref(); + let socket = socket2::SockRef::from(stream); + test_fn(socket); + } }