0
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-03-09 21:57:40 -04:00

feat(ext/net): Add abort signal to net tcp connect

This commit is contained in:
DanieleAurilio 2024-12-01 21:10:14 +01:00
parent 28ae134835
commit 03e758a542
3 changed files with 68 additions and 29 deletions

View file

@ -579,23 +579,36 @@ function createListenDatagram(udpOpFn, unixOpFn) {
async function connect(args) { async function connect(args) {
switch (args.transport ?? "tcp") { switch (args.transport ?? "tcp") {
case "tcp": { case "tcp": {
const port = validatePort(args.port); let cancelRid;
const { 0: rid, 1: localAddr, 2: remoteAddr } = await op_net_connect_tcp( let abortHandler;
{ if (args?.signal) {
hostname: args.hostname ?? "127.0.0.1", args.signal.throwIfAborted();
port, cancelRid = createCancelHandle();
}, abortHandler = () => core.tryClose(cancelRid);
args.timeout, args.signal[abortSignal.add](abortHandler);
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
const tcpConn = new TcpConn(rid, remoteAddr, localAddr);
if (args.signal) {
args.signal.addEventListener("abort", () => tcpConn.close());
} }
const port = validatePort(args.port);
return tcpConn; try {
const { 0: rid, 1: localAddr, 2: remoteAddr } =
await op_net_connect_tcp(
{
hostname: args.hostname ?? "127.0.0.1",
port,
},
args.timeout,
cancelRid,
);
localAddr.transport = "tcp";
remoteAddr.transport = "tcp";
return new TcpConn(rid, remoteAddr, localAddr);
} finally {
if (args?.signal) {
args.signal[abortSignal.remove](abortHandler);
args.signal.throwIfAborted();
}
}
} }
case "unix": { case "unix": {
const { 0: rid, 1: localAddr, 2: remoteAddr } = await op_net_connect_unix( const { 0: rid, 1: localAddr, 2: remoteAddr } = await op_net_connect_unix(

View file

@ -373,11 +373,12 @@ pub async fn op_net_connect_tcp<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
#[serde] addr: IpAddr, #[serde] addr: IpAddr,
#[serde] timeout: Option<u64>, #[serde] timeout: Option<u64>,
#[smi] resource_abort_id: Option<ResourceId>,
) -> Result<(ResourceId, IpAddr, IpAddr), NetError> ) -> Result<(ResourceId, IpAddr, IpAddr), NetError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
{ {
op_net_connect_tcp_inner::<NP>(state, addr, timeout).await op_net_connect_tcp_inner::<NP>(state, addr, timeout, resource_abort_id).await
} }
#[inline] #[inline]
@ -385,6 +386,7 @@ pub async fn op_net_connect_tcp_inner<NP>(
state: Rc<RefCell<OpState>>, state: Rc<RefCell<OpState>>,
addr: IpAddr, addr: IpAddr,
timeout: Option<u64>, timeout: Option<u64>,
resource_abort_id: Option<ResourceId>,
) -> Result<(ResourceId, IpAddr, IpAddr), NetError> ) -> Result<(ResourceId, IpAddr, IpAddr), NetError>
where where
NP: NetPermissions + 'static, NP: NetPermissions + 'static,
@ -400,10 +402,28 @@ where
.await? .await?
.next() .next()
.ok_or_else(|| NetError::NoResolvedAddress)?; .ok_or_else(|| NetError::NoResolvedAddress)?;
let tcp_stream = match TcpStream::connect_timeout(&addr, timeout).await {
let cancel_handle = resource_abort_id.and_then(|rid| {
state
.borrow_mut()
.resource_table
.get::<CancelHandle>(rid)
.ok()
});
let tcp_stream_result = if let Some(cancel_handle) = &cancel_handle {
TcpStream::connect_timeout(&addr, timeout)
.or_cancel(cancel_handle)
.await?
} else {
TcpStream::connect_timeout(&addr, timeout).await
};
let tcp_stream = match tcp_stream_result {
Ok(tcp_stream) => tcp_stream, Ok(tcp_stream) => tcp_stream,
Err(e) => return Err(NetError::Io(e)), Err(e) => return Err(NetError::Io(e)),
}; };
let local_addr = tcp_stream.local_addr()?; let local_addr = tcp_stream.local_addr()?;
let remote_addr = tcp_stream.peer_addr()?; let remote_addr = tcp_stream.peer_addr()?;
@ -1155,9 +1175,10 @@ mod tests {
port: server_addr[1].parse().unwrap(), port: server_addr[1].parse().unwrap(),
}; };
let mut connect_fut = let mut connect_fut = op_net_connect_tcp_inner::<TestPermission>(
op_net_connect_tcp_inner::<TestPermission>(conn_state, ip_addr, None) conn_state, ip_addr, None, None,
.boxed_local(); )
.boxed_local();
let mut rid = None; let mut rid = None;
tokio::select! { tokio::select! {

View file

@ -1295,16 +1295,21 @@ Deno.test(
Deno.test( Deno.test(
{ permissions: { net: true } }, { permissions: { net: true } },
async function netTcpAbortSignal() { async function netTcpWithAbortSignal() {
const controller = new AbortController(); const controller = new AbortController();
await Deno.connect({ setTimeout(() => controller.abort(), 1000);
hostname: "deno.com", const error = await assertRejects(
port: 80, async () => {
transport: "tcp", await Deno.connect({
signal: controller.signal, hostname: "deno.com",
}); port: 50000,
controller.abort(); transport: "tcp",
assertEquals(controller.signal.aborted, true); signal: controller.signal,
});
},
);
assert(error instanceof DOMException);
assertEquals(error.name, "AbortError");
}, },
); );