mirror of
https://github.com/denoland/deno.git
synced 2025-03-03 09:31:22 -05:00
feat(ext/net): Add Conn.setNoDelay and Conn.setKeepAlive (#13103)
This commit is contained in:
parent
b7b6b9c9e5
commit
3e566bb457
7 changed files with 240 additions and 0 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -987,6 +987,7 @@ dependencies = [
|
|||
"deno_tls",
|
||||
"log",
|
||||
"serde",
|
||||
"socket2 0.4.2",
|
||||
"tokio",
|
||||
"trust-dns-proto",
|
||||
"trust-dns-resolver",
|
||||
|
|
|
@ -238,6 +238,78 @@ Deno.test({ permissions: { net: true } }, async function netTcpDialListen() {
|
|||
conn.close();
|
||||
});
|
||||
|
||||
Deno.test({ permissions: { net: true } }, async function netTcpSetNoDelay() {
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
listener.accept().then(
|
||||
async (conn) => {
|
||||
assert(conn.remoteAddr != null);
|
||||
assert(conn.localAddr.transport === "tcp");
|
||||
assertEquals(conn.localAddr.hostname, "127.0.0.1");
|
||||
assertEquals(conn.localAddr.port, 3500);
|
||||
await conn.write(new Uint8Array([1, 2, 3]));
|
||||
conn.close();
|
||||
},
|
||||
);
|
||||
|
||||
const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
|
||||
conn.setNoDelay(true);
|
||||
assert(conn.remoteAddr.transport === "tcp");
|
||||
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
|
||||
assertEquals(conn.remoteAddr.port, 3500);
|
||||
assert(conn.localAddr != null);
|
||||
const buf = new Uint8Array(1024);
|
||||
const readResult = await conn.read(buf);
|
||||
assertEquals(3, readResult);
|
||||
assertEquals(1, buf[0]);
|
||||
assertEquals(2, buf[1]);
|
||||
assertEquals(3, buf[2]);
|
||||
assert(conn.rid > 0);
|
||||
|
||||
assert(readResult !== null);
|
||||
|
||||
const readResult2 = await conn.read(buf);
|
||||
assertEquals(readResult2, null);
|
||||
|
||||
listener.close();
|
||||
conn.close();
|
||||
});
|
||||
|
||||
Deno.test({ permissions: { net: true } }, async function netTcpSetKeepAlive() {
|
||||
const listener = Deno.listen({ port: 3500 });
|
||||
listener.accept().then(
|
||||
async (conn) => {
|
||||
assert(conn.remoteAddr != null);
|
||||
assert(conn.localAddr.transport === "tcp");
|
||||
assertEquals(conn.localAddr.hostname, "127.0.0.1");
|
||||
assertEquals(conn.localAddr.port, 3500);
|
||||
await conn.write(new Uint8Array([1, 2, 3]));
|
||||
conn.close();
|
||||
},
|
||||
);
|
||||
|
||||
const conn = await Deno.connect({ hostname: "127.0.0.1", port: 3500 });
|
||||
conn.setKeepAlive(true);
|
||||
assert(conn.remoteAddr.transport === "tcp");
|
||||
assertEquals(conn.remoteAddr.hostname, "127.0.0.1");
|
||||
assertEquals(conn.remoteAddr.port, 3500);
|
||||
assert(conn.localAddr != null);
|
||||
const buf = new Uint8Array(1024);
|
||||
const readResult = await conn.read(buf);
|
||||
assertEquals(3, readResult);
|
||||
assertEquals(1, buf[0]);
|
||||
assertEquals(2, buf[1]);
|
||||
assertEquals(3, buf[2]);
|
||||
assert(conn.rid > 0);
|
||||
|
||||
assert(readResult !== null);
|
||||
|
||||
const readResult2 = await conn.read(buf);
|
||||
assertEquals(readResult2, null);
|
||||
|
||||
listener.close();
|
||||
conn.close();
|
||||
});
|
||||
|
||||
Deno.test(
|
||||
{
|
||||
ignore: Deno.build.os === "windows",
|
||||
|
|
|
@ -95,6 +95,14 @@
|
|||
closeWrite() {
|
||||
return shutdown(this.rid);
|
||||
}
|
||||
|
||||
setNoDelay(nodelay = true) {
|
||||
return core.opSync("op_set_nodelay", this.rid, nodelay);
|
||||
}
|
||||
|
||||
setKeepAlive(keepalive = true) {
|
||||
return core.opSync("op_set_keepalive", this.rid, keepalive);
|
||||
}
|
||||
}
|
||||
|
||||
class Listener {
|
||||
|
|
|
@ -18,6 +18,7 @@ deno_core = { version = "0.117.0", path = "../../core" }
|
|||
deno_tls = { version = "0.22.0", path = "../tls" }
|
||||
log = "0.4.14"
|
||||
serde = { version = "1.0.129", features = ["derive"] }
|
||||
socket2 = "0.4.2"
|
||||
tokio = { version = "1.10.1", features = ["full"] }
|
||||
trust-dns-proto = "0.20.3"
|
||||
trust-dns-resolver = { version = "0.20.3", features = ["tokio-runtime", "serde-config"] }
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
use deno_core::error::generic_error;
|
||||
use deno_core::error::AnyError;
|
||||
use deno_core::AsyncMutFuture;
|
||||
use deno_core::AsyncRefCell;
|
||||
|
@ -9,6 +10,7 @@ use deno_core::CancelTryFuture;
|
|||
use deno_core::RcRef;
|
||||
use deno_core::Resource;
|
||||
use deno_core::ZeroCopyBuf;
|
||||
use socket2::SockRef;
|
||||
use std::borrow::Cow;
|
||||
use std::rc::Rc;
|
||||
use tokio::io::AsyncRead;
|
||||
|
@ -118,6 +120,34 @@ impl Resource for TcpStreamResource {
|
|||
}
|
||||
}
|
||||
|
||||
impl TcpStreamResource {
|
||||
pub fn set_nodelay(self: Rc<Self>, nodelay: bool) -> Result<(), AnyError> {
|
||||
self.map_socket(Box::new(move |socket| Ok(socket.set_nodelay(nodelay)?)))
|
||||
}
|
||||
|
||||
pub fn set_keepalive(
|
||||
self: Rc<Self>,
|
||||
keepalive: bool,
|
||||
) -> Result<(), AnyError> {
|
||||
self
|
||||
.map_socket(Box::new(move |socket| Ok(socket.set_keepalive(keepalive)?)))
|
||||
}
|
||||
|
||||
fn map_socket(
|
||||
self: Rc<Self>,
|
||||
map: Box<dyn FnOnce(SockRef) -> Result<(), AnyError>>,
|
||||
) -> Result<(), AnyError> {
|
||||
if let Some(wr) = RcRef::map(self, |r| &r.wr).try_borrow() {
|
||||
let stream = wr.as_ref().as_ref();
|
||||
let socket = socket2::SockRef::from(stream);
|
||||
|
||||
return map(socket);
|
||||
}
|
||||
|
||||
Err(generic_error("Unable to get resources"))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(unix)]
|
||||
pub type UnixStreamResource =
|
||||
FullDuplexResource<unix::OwnedReadHalf, unix::OwnedWriteHalf>;
|
||||
|
|
4
ext/net/lib.deno_net.d.ts
vendored
4
ext/net/lib.deno_net.d.ts
vendored
|
@ -50,6 +50,10 @@ declare namespace Deno {
|
|||
/** Shuts down (`shutdown(2)`) the write side of the connection. Most
|
||||
* callers should just use `close()`. */
|
||||
closeWrite(): Promise<void>;
|
||||
/** Enable/disable the use of Nagle's algorithm. Defaults to true */
|
||||
setNoDelay(nodelay?: boolean): void;
|
||||
/** Enable/disable keep-alive functionality */
|
||||
setKeepAlive(keepalive?: boolean): void;
|
||||
}
|
||||
|
||||
// deno-lint-ignore no-empty-interface
|
||||
|
|
124
ext/net/ops.rs
124
ext/net/ops.rs
|
@ -55,6 +55,8 @@ pub fn init<P: NetPermissions + 'static>() -> Vec<OpPair> {
|
|||
("op_dgram_recv", op_async(op_dgram_recv)),
|
||||
("op_dgram_send", op_async(op_dgram_send::<P>)),
|
||||
("op_dns_resolve", op_async(op_dns_resolve::<P>)),
|
||||
("op_set_nodelay", op_sync(op_set_nodelay::<P>)),
|
||||
("op_set_keepalive", op_sync(op_set_keepalive::<P>)),
|
||||
]
|
||||
}
|
||||
|
||||
|
@ -665,6 +667,26 @@ where
|
|||
Ok(results)
|
||||
}
|
||||
|
||||
pub fn op_set_nodelay<NP>(
|
||||
state: &mut OpState,
|
||||
rid: ResourceId,
|
||||
nodelay: bool,
|
||||
) -> Result<(), AnyError> {
|
||||
let resource: Rc<TcpStreamResource> =
|
||||
state.resource_table.get::<TcpStreamResource>(rid)?;
|
||||
resource.set_nodelay(nodelay)
|
||||
}
|
||||
|
||||
pub fn op_set_keepalive<NP>(
|
||||
state: &mut OpState,
|
||||
rid: ResourceId,
|
||||
keepalive: bool,
|
||||
) -> Result<(), AnyError> {
|
||||
let resource: Rc<TcpStreamResource> =
|
||||
state.resource_table.get::<TcpStreamResource>(rid)?;
|
||||
resource.set_keepalive(keepalive)
|
||||
}
|
||||
|
||||
fn rdata_to_return_record(
|
||||
ty: RecordType,
|
||||
) -> impl Fn(&RData) -> Option<DnsReturnRecord> {
|
||||
|
@ -717,8 +739,13 @@ fn rdata_to_return_record(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use deno_core::Extension;
|
||||
use deno_core::JsRuntime;
|
||||
use deno_core::RuntimeOptions;
|
||||
use socket2::SockRef;
|
||||
use std::net::Ipv4Addr;
|
||||
use std::net::Ipv6Addr;
|
||||
use std::path::Path;
|
||||
use trust_dns_proto::rr::rdata::mx::MX;
|
||||
use trust_dns_proto::rr::rdata::srv::SRV;
|
||||
use trust_dns_proto::rr::rdata::txt::TXT;
|
||||
|
@ -810,4 +837,101 @@ mod tests {
|
|||
]))
|
||||
);
|
||||
}
|
||||
|
||||
struct TestPermission {}
|
||||
|
||||
impl NetPermissions for TestPermission {
|
||||
fn check_net<T: AsRef<str>>(
|
||||
&mut self,
|
||||
_host: &(T, Option<u16>),
|
||||
) -> Result<(), AnyError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_read(&mut self, _p: &Path) -> Result<(), AnyError> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn check_write(&mut self, _p: &Path) -> Result<(), AnyError> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
async fn tcp_set_no_delay() {
|
||||
let set_nodelay = Box::new(|state: &mut OpState, rid| {
|
||||
op_set_nodelay::<TestPermission>(state, rid, true).unwrap();
|
||||
});
|
||||
let test_fn = Box::new(|socket: SockRef| {
|
||||
assert!(socket.nodelay().unwrap());
|
||||
assert!(!socket.keepalive().unwrap());
|
||||
});
|
||||
check_sockopt(String::from("127.0.0.1:4245"), set_nodelay, test_fn).await;
|
||||
}
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
|
||||
async fn tcp_set_keepalive() {
|
||||
let set_keepalive = Box::new(|state: &mut OpState, rid| {
|
||||
op_set_keepalive::<TestPermission>(state, rid, true).unwrap();
|
||||
});
|
||||
let test_fn = Box::new(|socket: SockRef| {
|
||||
assert!(!socket.nodelay().unwrap());
|
||||
assert!(socket.keepalive().unwrap());
|
||||
});
|
||||
check_sockopt(String::from("127.0.0.1:4246"), set_keepalive, test_fn).await;
|
||||
}
|
||||
|
||||
async fn check_sockopt(
|
||||
addr: String,
|
||||
set_sockopt_fn: Box<dyn Fn(&mut OpState, u32)>,
|
||||
test_fn: Box<dyn FnOnce(SockRef)>,
|
||||
) {
|
||||
let clone_addr = addr.clone();
|
||||
tokio::spawn(async move {
|
||||
let listener = TcpListener::bind(addr).await.unwrap();
|
||||
let _ = listener.accept().await;
|
||||
});
|
||||
let my_ext = Extension::builder()
|
||||
.state(move |state| {
|
||||
state.put(TestPermission {});
|
||||
Ok(())
|
||||
})
|
||||
.build();
|
||||
|
||||
let mut runtime = JsRuntime::new(RuntimeOptions {
|
||||
extensions: vec![my_ext],
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
let conn_state = runtime.op_state();
|
||||
|
||||
let server_addr: Vec<&str> = clone_addr.split(':').collect();
|
||||
let ip_args = IpListenArgs {
|
||||
hostname: String::from(server_addr[0]),
|
||||
port: server_addr[1].parse().unwrap(),
|
||||
};
|
||||
let connect_args = ConnectArgs {
|
||||
transport: String::from("tcp"),
|
||||
transport_args: ArgsEnum::Ip(ip_args),
|
||||
};
|
||||
|
||||
let connect_fut =
|
||||
op_net_connect::<TestPermission>(conn_state, connect_args, ());
|
||||
let conn = connect_fut.await.unwrap();
|
||||
|
||||
let rid = conn.rid;
|
||||
let state = runtime.op_state();
|
||||
set_sockopt_fn(&mut state.borrow_mut(), rid);
|
||||
|
||||
let resource = state
|
||||
.borrow_mut()
|
||||
.resource_table
|
||||
.get::<TcpStreamResource>(rid)
|
||||
.unwrap();
|
||||
|
||||
let wr = resource.wr_borrow_mut().await;
|
||||
let stream = wr.as_ref().as_ref();
|
||||
let socket = socket2::SockRef::from(stream);
|
||||
test_fn(socket);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue