diff --git a/README.md b/README.md index e0135e6..277dc87 100644 --- a/README.md +++ b/README.md @@ -71,8 +71,8 @@ easytier-cli route - [x] NAT Traverse with relaying. - [x] NAT Traverse with UDP hole punching. -- [ ] Support shared public server. So users can use it without a public server. +- [x] Support shared public server. So users can use it without a public server. +- [x] Broadcast & Multicast support. - [ ] Encryption. With noise framework or other method. - [ ] Support mobile platforms. -- [ ] Broadcast & Multicast support. - [ ] UI tools. diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index 23bd8df..32539f4 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -414,25 +414,50 @@ impl PeerManager { ipv4_addr ); - let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await else { - log::trace!("no peer id for ipv4: {}", ipv4_addr); + let mut dst_peers = vec![]; + // NOTE: currently we only support ipv4 and cidr is 24 + if ipv4_addr.octets()[3] == 255 { + dst_peers.extend( + self.peers + .list_routes() + .await + .iter() + .map(|x| x.key().clone()), + ); + } else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await { + dst_peers.push(peer_id); + } + + if dst_peers.is_empty() { + log::error!("no peer id for ipv4: {}", ipv4_addr); return Ok(()); - }; + } let msg = self.run_nic_packet_process_pipeline(msg).await; - self.peers - .send_msg( - packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(), - &peer_id, - ) - .await?; + let mut errs: Vec = vec![]; - log::trace!( - "do send_msg in peer manager done, dst_peer_id: {:?}", - peer_id - ); + for peer_id in dst_peers.iter() { + let send_ret = self + .peers + .send_msg( + packet::Packet::new_data_packet(self.my_node_id, peer_id.clone(), &msg).into(), + &peer_id, + ) + .await; - Ok(()) + if let Err(send_ret) = send_ret { + errs.push(send_ret); + } + } + + tracing::trace!(?dst_peers, "do send_msg in peer manager done"); + + if errs.is_empty() { + Ok(()) + } else { + tracing::error!(?errs, "send_msg has error"); + Err(anyhow::anyhow!("send_msg has error: {:?}", errs).into()) + } } async fn run_clean_peer_without_conn_routine(&self) { diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index 87a6e53..e550921 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -202,4 +202,17 @@ impl PeerMap { self.close_peer(&peer_id).await.unwrap(); } } + + pub async fn list_routes(&self) -> DashMap { + let route_map = DashMap::new(); + for route in self.routes.read().await.iter() { + for item in route.list_routes().await.iter() { + route_map.insert( + item.peer_id.parse().unwrap(), + item.next_hop_peer_id.parse().unwrap(), + ); + } + } + route_map + } } diff --git a/easytier-core/src/tests/three_node.rs b/easytier-core/src/tests/three_node.rs index 3381706..2f89385 100644 --- a/easytier-core/src/tests/three_node.rs +++ b/easytier-core/src/tests/three_node.rs @@ -1,3 +1,7 @@ +use std::sync::{atomic::AtomicU32, Arc}; + +use tokio::{net::UdpSocket, task::JoinSet}; + use super::*; use crate::{ @@ -259,3 +263,52 @@ pub async fn udp_proxy_three_node_test() { ) .await; } + +#[tokio::test] +#[serial_test::serial] +pub async fn udp_broadcast_test() { + let _insts = init_three_node("tcp").await; + + let udp_broadcast_responder = |net_ns: NetNS, counter: Arc| async move { + let _g = net_ns.guard(); + let socket: UdpSocket = UdpSocket::bind("0.0.0.0:22111").await.unwrap(); + socket.set_broadcast(true).unwrap(); + + println!("Awaiting responses..."); // self.recv_buff is a [u8; 8092] + let mut recv_buff = [0; 8092]; + while let Ok((n, addr)) = socket.recv_from(&mut recv_buff).await { + println!("{} bytes response from {:?}", n, addr); + counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + // Remaining code not directly relevant to the question + } + }; + + let mut tasks = JoinSet::new(); + let counter = Arc::new(AtomicU32::new(0)); + tasks.spawn(udp_broadcast_responder( + NetNS::new(Some("net_b".into())), + counter.clone(), + )); + tasks.spawn(udp_broadcast_responder( + NetNS::new(Some("net_c".into())), + counter.clone(), + )); + + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + + // send broadcast + let net_ns = NetNS::new(Some("net_a".into())); + let _g = net_ns.guard(); + let socket: UdpSocket = UdpSocket::bind("0.0.0.0:0").await.unwrap(); + socket.set_broadcast(true).unwrap(); + // socket.connect(("10.144.144.255", 22111)).await.unwrap(); + let call: Vec = vec![1; 1024]; + println!("Sending call, {} bytes", call.len()); + match socket.send_to(&call, "10.144.144.255:22111").await { + Err(e) => panic!("Error sending call: {:?}", e), + _ => {} + } + + tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; + assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 2); +}