From 03e758a5424ad99a007f24ae55f07b32cac9c928 Mon Sep 17 00:00:00 2001 From: DanieleAurilio Date: Sun, 1 Dec 2024 21:10:14 +0100 Subject: [PATCH] feat(ext/net): Add abort signal to net tcp connect --- ext/net/01_net.js | 43 +++++++++++++++++++++++++++--------------- ext/net/ops.rs | 31 +++++++++++++++++++++++++----- tests/unit/net_test.ts | 23 +++++++++++++--------- 3 files changed, 68 insertions(+), 29 deletions(-) diff --git a/ext/net/01_net.js b/ext/net/01_net.js index 13dbce05e3..90a0e5be4d 100644 --- a/ext/net/01_net.js +++ b/ext/net/01_net.js @@ -579,23 +579,36 @@ function createListenDatagram(udpOpFn, unixOpFn) { async function connect(args) { switch (args.transport ?? "tcp") { case "tcp": { - const port = validatePort(args.port); - const { 0: rid, 1: localAddr, 2: remoteAddr } = await op_net_connect_tcp( - { - hostname: args.hostname ?? "127.0.0.1", - port, - }, - args.timeout, - ); - localAddr.transport = "tcp"; - remoteAddr.transport = "tcp"; - - const tcpConn = new TcpConn(rid, remoteAddr, localAddr); - if (args.signal) { - args.signal.addEventListener("abort", () => tcpConn.close()); + let cancelRid; + let abortHandler; + if (args?.signal) { + args.signal.throwIfAborted(); + cancelRid = createCancelHandle(); + abortHandler = () => core.tryClose(cancelRid); + args.signal[abortSignal.add](abortHandler); } + const port = validatePort(args.port); - return tcpConn; + try { + const { 0: rid, 1: localAddr, 2: remoteAddr } = + await op_net_connect_tcp( + { + hostname: args.hostname ?? "127.0.0.1", + port, + }, + args.timeout, + cancelRid, + ); + localAddr.transport = "tcp"; + remoteAddr.transport = "tcp"; + + return new TcpConn(rid, remoteAddr, localAddr); + } finally { + if (args?.signal) { + args.signal[abortSignal.remove](abortHandler); + args.signal.throwIfAborted(); + } + } } case "unix": { const { 0: rid, 1: localAddr, 2: remoteAddr } = await op_net_connect_unix( diff --git a/ext/net/ops.rs b/ext/net/ops.rs index 71931f5d18..f7e1628d66 100644 --- a/ext/net/ops.rs +++ b/ext/net/ops.rs @@ -373,11 +373,12 @@ pub async fn op_net_connect_tcp( state: Rc>, #[serde] addr: IpAddr, #[serde] timeout: Option, + #[smi] resource_abort_id: Option, ) -> Result<(ResourceId, IpAddr, IpAddr), NetError> where NP: NetPermissions + 'static, { - op_net_connect_tcp_inner::(state, addr, timeout).await + op_net_connect_tcp_inner::(state, addr, timeout, resource_abort_id).await } #[inline] @@ -385,6 +386,7 @@ pub async fn op_net_connect_tcp_inner( state: Rc>, addr: IpAddr, timeout: Option, + resource_abort_id: Option, ) -> Result<(ResourceId, IpAddr, IpAddr), NetError> where NP: NetPermissions + 'static, @@ -400,10 +402,28 @@ where .await? .next() .ok_or_else(|| NetError::NoResolvedAddress)?; - let tcp_stream = match TcpStream::connect_timeout(&addr, timeout).await { + + let cancel_handle = resource_abort_id.and_then(|rid| { + state + .borrow_mut() + .resource_table + .get::(rid) + .ok() + }); + + let tcp_stream_result = if let Some(cancel_handle) = &cancel_handle { + TcpStream::connect_timeout(&addr, timeout) + .or_cancel(cancel_handle) + .await? + } else { + TcpStream::connect_timeout(&addr, timeout).await + }; + + let tcp_stream = match tcp_stream_result { Ok(tcp_stream) => tcp_stream, Err(e) => return Err(NetError::Io(e)), }; + let local_addr = tcp_stream.local_addr()?; let remote_addr = tcp_stream.peer_addr()?; @@ -1155,9 +1175,10 @@ mod tests { port: server_addr[1].parse().unwrap(), }; - let mut connect_fut = - op_net_connect_tcp_inner::(conn_state, ip_addr, None) - .boxed_local(); + let mut connect_fut = op_net_connect_tcp_inner::( + conn_state, ip_addr, None, None, + ) + .boxed_local(); let mut rid = None; tokio::select! { diff --git a/tests/unit/net_test.ts b/tests/unit/net_test.ts index 18c70707be..bcd55383c8 100644 --- a/tests/unit/net_test.ts +++ b/tests/unit/net_test.ts @@ -1295,16 +1295,21 @@ Deno.test( Deno.test( { permissions: { net: true } }, - async function netTcpAbortSignal() { + async function netTcpWithAbortSignal() { const controller = new AbortController(); - await Deno.connect({ - hostname: "deno.com", - port: 80, - transport: "tcp", - signal: controller.signal, - }); - controller.abort(); - assertEquals(controller.signal.aborted, true); + setTimeout(() => controller.abort(), 1000); + const error = await assertRejects( + async () => { + await Deno.connect({ + hostname: "deno.com", + port: 50000, + transport: "tcp", + signal: controller.signal, + }); + }, + ); + assert(error instanceof DOMException); + assertEquals(error.name, "AbortError"); }, );