mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-12 12:47:25 +08:00
support ip broadcast (#26)
This commit is contained in:
@@ -71,8 +71,8 @@ easytier-cli route
|
|||||||
- [x] NAT Traverse with relaying.
|
- [x] NAT Traverse with relaying.
|
||||||
- [x] NAT Traverse with UDP hole punching.
|
- [x] NAT Traverse with UDP hole punching.
|
||||||
|
|
||||||
- [ ] Support shared public server. So users can use it without a public server.
|
- [x] Support shared public server. So users can use it without a public server.
|
||||||
|
- [x] Broadcast & Multicast support.
|
||||||
- [ ] Encryption. With noise framework or other method.
|
- [ ] Encryption. With noise framework or other method.
|
||||||
- [ ] Support mobile platforms.
|
- [ ] Support mobile platforms.
|
||||||
- [ ] Broadcast & Multicast support.
|
|
||||||
- [ ] UI tools.
|
- [ ] UI tools.
|
||||||
|
|||||||
@@ -414,25 +414,50 @@ impl PeerManager {
|
|||||||
ipv4_addr
|
ipv4_addr
|
||||||
);
|
);
|
||||||
|
|
||||||
let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await else {
|
let mut dst_peers = vec![];
|
||||||
log::trace!("no peer id for ipv4: {}", ipv4_addr);
|
// NOTE: currently we only support ipv4 and cidr is 24
|
||||||
|
if ipv4_addr.octets()[3] == 255 {
|
||||||
|
dst_peers.extend(
|
||||||
|
self.peers
|
||||||
|
.list_routes()
|
||||||
|
.await
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.key().clone()),
|
||||||
|
);
|
||||||
|
} else if let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await {
|
||||||
|
dst_peers.push(peer_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
if dst_peers.is_empty() {
|
||||||
|
log::error!("no peer id for ipv4: {}", ipv4_addr);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
}
|
||||||
|
|
||||||
let msg = self.run_nic_packet_process_pipeline(msg).await;
|
let msg = self.run_nic_packet_process_pipeline(msg).await;
|
||||||
self.peers
|
let mut errs: Vec<Error> = vec![];
|
||||||
.send_msg(
|
|
||||||
packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(),
|
|
||||||
&peer_id,
|
|
||||||
)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
log::trace!(
|
for peer_id in dst_peers.iter() {
|
||||||
"do send_msg in peer manager done, dst_peer_id: {:?}",
|
let send_ret = self
|
||||||
peer_id
|
.peers
|
||||||
);
|
.send_msg(
|
||||||
|
packet::Packet::new_data_packet(self.my_node_id, peer_id.clone(), &msg).into(),
|
||||||
|
&peer_id,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
Ok(())
|
if let Err(send_ret) = send_ret {
|
||||||
|
errs.push(send_ret);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::trace!(?dst_peers, "do send_msg in peer manager done");
|
||||||
|
|
||||||
|
if errs.is_empty() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
tracing::error!(?errs, "send_msg has error");
|
||||||
|
Err(anyhow::anyhow!("send_msg has error: {:?}", errs).into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_clean_peer_without_conn_routine(&self) {
|
async fn run_clean_peer_without_conn_routine(&self) {
|
||||||
|
|||||||
@@ -202,4 +202,17 @@ impl PeerMap {
|
|||||||
self.close_peer(&peer_id).await.unwrap();
|
self.close_peer(&peer_id).await.unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn list_routes(&self) -> DashMap<PeerId, PeerId> {
|
||||||
|
let route_map = DashMap::new();
|
||||||
|
for route in self.routes.read().await.iter() {
|
||||||
|
for item in route.list_routes().await.iter() {
|
||||||
|
route_map.insert(
|
||||||
|
item.peer_id.parse().unwrap(),
|
||||||
|
item.next_hop_peer_id.parse().unwrap(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
route_map
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,3 +1,7 @@
|
|||||||
|
use std::sync::{atomic::AtomicU32, Arc};
|
||||||
|
|
||||||
|
use tokio::{net::UdpSocket, task::JoinSet};
|
||||||
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -259,3 +263,52 @@ pub async fn udp_proxy_three_node_test() {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
#[serial_test::serial]
|
||||||
|
pub async fn udp_broadcast_test() {
|
||||||
|
let _insts = init_three_node("tcp").await;
|
||||||
|
|
||||||
|
let udp_broadcast_responder = |net_ns: NetNS, counter: Arc<AtomicU32>| async move {
|
||||||
|
let _g = net_ns.guard();
|
||||||
|
let socket: UdpSocket = UdpSocket::bind("0.0.0.0:22111").await.unwrap();
|
||||||
|
socket.set_broadcast(true).unwrap();
|
||||||
|
|
||||||
|
println!("Awaiting responses..."); // self.recv_buff is a [u8; 8092]
|
||||||
|
let mut recv_buff = [0; 8092];
|
||||||
|
while let Ok((n, addr)) = socket.recv_from(&mut recv_buff).await {
|
||||||
|
println!("{} bytes response from {:?}", n, addr);
|
||||||
|
counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
|
||||||
|
// Remaining code not directly relevant to the question
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut tasks = JoinSet::new();
|
||||||
|
let counter = Arc::new(AtomicU32::new(0));
|
||||||
|
tasks.spawn(udp_broadcast_responder(
|
||||||
|
NetNS::new(Some("net_b".into())),
|
||||||
|
counter.clone(),
|
||||||
|
));
|
||||||
|
tasks.spawn(udp_broadcast_responder(
|
||||||
|
NetNS::new(Some("net_c".into())),
|
||||||
|
counter.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||||
|
|
||||||
|
// send broadcast
|
||||||
|
let net_ns = NetNS::new(Some("net_a".into()));
|
||||||
|
let _g = net_ns.guard();
|
||||||
|
let socket: UdpSocket = UdpSocket::bind("0.0.0.0:0").await.unwrap();
|
||||||
|
socket.set_broadcast(true).unwrap();
|
||||||
|
// socket.connect(("10.144.144.255", 22111)).await.unwrap();
|
||||||
|
let call: Vec<u8> = vec![1; 1024];
|
||||||
|
println!("Sending call, {} bytes", call.len());
|
||||||
|
match socket.send_to(&call, "10.144.144.255:22111").await {
|
||||||
|
Err(e) => panic!("Error sending call: {:?}", e),
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
|
||||||
|
assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 2);
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user