1
0
Fork 0
mirror of https://github.com/denoland/deno.git synced 2025-01-21 21:50:00 -05:00

feat(ext/net): Add multicasting APIs to DatagramConn (#10706) (#17811)

This commit is contained in:
Sam Gwilym 2023-03-20 21:27:00 +00:00 committed by GitHub
parent cd53ab5427
commit 4c34a2f2df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 409 additions and 4 deletions

View file

@ -425,6 +425,141 @@ Deno.test(
},
);
Deno.test(
{ permissions: { net: true }, ignore: true },
async function netUdpMulticastV4() {
const listener = Deno.listenDatagram({
hostname: "0.0.0.0",
port: 5353,
transport: "udp",
reuseAddress: true,
});
const membership = await listener.joinMulticastV4(
"224.0.0.251",
"127.0.0.1",
);
membership.setLoopback(true);
membership.setLoopback(false);
membership.setTTL(50);
membership.leave();
listener.close();
},
);
Deno.test(
{ permissions: { net: true }, ignore: true },
async function netUdpMulticastV6() {
const listener = Deno.listenDatagram({
hostname: "::",
port: 5353,
transport: "udp",
reuseAddress: true,
});
const membership = await listener.joinMulticastV6(
"ff02::fb",
1,
);
membership.setLoopback(true);
membership.setLoopback(false);
membership.leave();
listener.close();
},
);
Deno.test(
{ permissions: { net: true }, ignore: true },
async function netUdpSendReceiveMulticastv4() {
const alice = Deno.listenDatagram({
hostname: "0.0.0.0",
port: 5353,
transport: "udp",
reuseAddress: true,
loopback: true,
});
const bob = Deno.listenDatagram({
hostname: "0.0.0.0",
port: 5353,
transport: "udp",
reuseAddress: true,
});
const aliceMembership = await alice.joinMulticastV4(
"224.0.0.1",
"0.0.0.0",
);
const bobMembership = await bob.joinMulticastV4("224.0.0.1", "0.0.0.0");
const sent = new Uint8Array([1, 2, 3]);
await alice.send(sent, {
hostname: "224.0.0.1",
port: 5353,
transport: "udp",
});
const [recvd, remote] = await bob.receive();
assert(remote.transport === "udp");
assertEquals(remote.port, 5353);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
assertEquals(2, recvd[1]);
assertEquals(3, recvd[2]);
aliceMembership.leave();
bobMembership.leave();
alice.close();
bob.close();
},
);
Deno.test(
{ permissions: { net: true }, ignore: true },
async function netUdpMulticastLoopbackOption() {
// Must bind sender to an address that can send to the broadcast address on MacOS.
// Macos will give us error 49 when sending the broadcast packet if we omit hostname here.
const listener = Deno.listenDatagram({
port: 5353,
transport: "udp",
hostname: "0.0.0.0",
loopback: true,
reuseAddress: true,
});
const membership = await listener.joinMulticastV4(
"224.0.0.1",
"0.0.0.0",
);
// await membership.setLoopback(true);
const sent = new Uint8Array([1, 2, 3]);
const byteLength = await listener.send(sent, {
hostname: "224.0.0.1",
port: 5353,
transport: "udp",
});
assertEquals(byteLength, 3);
const [recvd, remote] = await listener.receive();
assert(remote.transport === "udp");
assertEquals(remote.port, 5353);
assertEquals(recvd.length, 3);
assertEquals(1, recvd[0]);
assertEquals(2, recvd[1]);
assertEquals(3, recvd[2]);
membership.leave();
listener.close();
},
);
Deno.test(
{ permissions: { net: true } },
async function netUdpConcurrentSendReceive() {

View file

@ -163,7 +163,7 @@ declare namespace Deno {
*/
type ToNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? BufferSource
: ToNativeResultTypeMap[Exclude<T, NativeStructType>];
: ToNativeResultTypeMap[Exclude<T, NativeStructType>];
/** **UNSTABLE**: New API, yet to be vetted.
*
@ -225,7 +225,7 @@ declare namespace Deno {
*/
type FromNativeResultType<T extends NativeResultType = NativeResultType> =
T extends NativeStructType ? Uint8Array
: FromNativeResultTypeMap[Exclude<T, NativeStructType>];
: FromNativeResultTypeMap[Exclude<T, NativeStructType>];
/** **UNSTABLE**: New API, yet to be vetted.
*
@ -850,6 +850,34 @@ declare namespace Deno {
options: CreateHttpClientOptions,
): HttpClient;
/** **UNSTABLE**: New API, yet to be vetted.
*
* Represents membership of a IPv4 multicast group.
*
* @category Network
*/
interface MulticastV4Membership {
/** Leaves the multicast group. */
leave: () => Promise<void>;
/** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
setLoopback: (loopback: boolean) => Promise<void>;
/** Sets the time-to-live of outgoing multicast packets for this socket. */
setTTL: (ttl: number) => Promise<void>;
}
/** **UNSTABLE**: New API, yet to be vetted.
*
* Represents membership of a IPv6 multicast group.
*
* @category Network
*/
interface MulticastV6Membership {
/** Leaves the multicast group. */
leave: () => Promise<void>;
/** Sets the multicast loopback option. If enabled, multicast packets will be looped back to the local socket. */
setLoopback: (loopback: boolean) => Promise<void>;
}
/** **UNSTABLE**: New API, yet to be vetted.
*
* A generic transport listener for message-oriented protocols.
@ -857,6 +885,18 @@ declare namespace Deno {
* @category Network
*/
export interface DatagramConn extends AsyncIterable<[Uint8Array, Addr]> {
/** Joins an IPv4 multicast group. */
joinMulticastV4(
address: string,
networkInterface: string,
): Promise<MulticastV4Membership>;
/** Joins an IPv6 multicast group. */
joinMulticastV6(
address: string,
networkInterface: number,
): Promise<MulticastV6Membership>;
/** Waits for and resolves to the next message to the instance.
*
* Messages are received in the format of a tuple containing the data array
@ -918,6 +958,11 @@ declare namespace Deno {
*
* @default {false} */
reuseAddress?: boolean;
/** When `true`, sent multicast packets will be looped back to the local socket.
*
* @default {false} */
loopback?: boolean;
}
/** **UNSTABLE**: New API, yet to be vetted.

View file

@ -277,6 +277,64 @@ class Datagram {
return this.#addr;
}
async joinMulticastV4(addr, multiInterface) {
await core.opAsync(
"op_net_join_multi_v4_udp",
this.rid,
addr,
multiInterface,
);
return {
leave: () =>
core.opAsync(
"op_net_leave_multi_v4_udp",
this.rid,
addr,
multiInterface,
),
setLoopback: (loopback) =>
core.opAsync(
"op_net_set_multi_loopback_udp",
this.rid,
true,
loopback,
),
setTTL: (ttl) =>
core.opAsync(
"op_net_set_multi_ttl_udp",
this.rid,
ttl,
),
};
}
async joinMulticastV6(addr, multiInterface) {
await core.opAsync(
"op_net_join_multi_v6_udp",
this.rid,
addr,
multiInterface,
);
return {
leave: () =>
core.opAsync(
"op_net_leave_multi_v6_udp",
this.rid,
addr,
multiInterface,
),
setLoopback: (loopback) =>
core.opAsync(
"op_net_set_multi_loopback_udp",
this.rid,
false,
loopback,
),
};
}
async receive(p) {
const buf = p || new Uint8Array(this.bufSize);
let nread;
@ -383,6 +441,7 @@ function createListenDatagram(udpOpFn, unixOpFn) {
port: args.port,
},
args.reuseAddress ?? false,
args.loopback ?? false,
);
addr.transport = "udp";
return new Datagram(rid, addr);

View file

@ -86,6 +86,12 @@ deno_core::extension!(deno_net,
ops::op_node_unstable_net_listen_udp<P>,
ops::op_net_recv_udp,
ops::op_net_send_udp<P>,
ops::op_net_join_multi_v4_udp<P>,
ops::op_net_join_multi_v6_udp<P>,
ops::op_net_leave_multi_v4_udp<P>,
ops::op_net_leave_multi_v6_udp<P>,
ops::op_net_set_multi_loopback_udp<P>,
ops::op_net_set_multi_ttl_udp<P>,
ops::op_dns_resolve<P>,
ops::op_set_nodelay,
ops::op_set_keepalive,

View file

@ -28,8 +28,11 @@ use socket2::Socket;
use socket2::Type;
use std::borrow::Cow;
use std::cell::RefCell;
use std::net::Ipv4Addr;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use tokio::net::TcpListener;
use tokio::net::TcpStream;
use tokio::net::UdpSocket;
@ -155,6 +158,151 @@ where
Ok(nwritten)
}
#[op]
async fn op_net_join_multi_v4_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
address: String,
multi_interface: String,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let addr = Ipv4Addr::from_str(address.as_str())?;
let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;
socket.join_multicast_v4(addr, interface_addr)?;
Ok(())
}
#[op]
async fn op_net_join_multi_v6_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
address: String,
multi_interface: u32,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let addr = Ipv6Addr::from_str(address.as_str())?;
socket.join_multicast_v6(&addr, multi_interface)?;
Ok(())
}
#[op]
async fn op_net_leave_multi_v4_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
address: String,
multi_interface: String,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let addr = Ipv4Addr::from_str(address.as_str())?;
let interface_addr = Ipv4Addr::from_str(multi_interface.as_str())?;
socket.leave_multicast_v4(addr, interface_addr)?;
Ok(())
}
#[op]
async fn op_net_leave_multi_v6_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
address: String,
multi_interface: u32,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
let addr = Ipv6Addr::from_str(address.as_str())?;
socket.leave_multicast_v6(&addr, multi_interface)?;
Ok(())
}
#[op]
async fn op_net_set_multi_loopback_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
is_v4_membership: bool,
loopback: bool,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
if is_v4_membership {
socket.set_multicast_loop_v4(loopback)?
} else {
socket.set_multicast_loop_v6(loopback)?;
}
Ok(())
}
#[op]
async fn op_net_set_multi_ttl_udp<NP>(
state: Rc<RefCell<OpState>>,
rid: ResourceId,
ttl: u32,
) -> Result<(), AnyError>
where
NP: NetPermissions + 'static,
{
let resource = state
.borrow_mut()
.resource_table
.get::<UdpSocketResource>(rid)
.map_err(|_| bad_resource("Socket has been closed"))?;
let socket = RcRef::map(&resource, |r| &r.socket).borrow().await;
socket.set_multicast_ttl_v4(ttl)?;
Ok(())
}
#[op]
pub async fn op_net_connect_tcp<NP>(
state: Rc<RefCell<OpState>>,
@ -266,6 +414,7 @@ fn net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
@ -301,9 +450,18 @@ where
let socket_addr = socket2::SockAddr::from(addr);
socket_tmp.bind(&socket_addr)?;
socket_tmp.set_nonblocking(true)?;
// Enable messages to be sent to the broadcast address (255.255.255.255) by default
socket_tmp.set_broadcast(true)?;
if domain == Domain::IPV4 {
socket_tmp.set_multicast_loop_v4(loopback)?;
} else {
socket_tmp.set_multicast_loop_v6(loopback)?;
}
let std_socket: std::net::UdpSocket = socket_tmp.into();
let socket = UdpSocket::from_std(std_socket)?;
let local_addr = socket.local_addr()?;
let socket_resource = UdpSocketResource {
@ -320,12 +478,13 @@ fn op_net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
{
super::check_unstable(state, "Deno.listenDatagram");
net_listen_udp::<NP>(state, addr, reuse_address)
net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}
#[op]
@ -333,11 +492,12 @@ fn op_node_unstable_net_listen_udp<NP>(
state: &mut OpState,
addr: IpAddr,
reuse_address: bool,
loopback: bool,
) -> Result<(ResourceId, IpAddr), AnyError>
where
NP: NetPermissions + 'static,
{
net_listen_udp::<NP>(state, addr, reuse_address)
net_listen_udp::<NP>(state, addr, reuse_address, loopback)
}
#[derive(Serialize, Eq, PartialEq, Debug)]