diff --git a/easytier-core/src/instance/instance.rs b/easytier-core/src/instance/instance.rs index bd2b44b..5cc418b 100644 --- a/easytier-core/src/instance/instance.rs +++ b/easytier-core/src/instance/instance.rs @@ -21,7 +21,6 @@ use crate::connector::udp_hole_punch::UdpHolePunchConnector; use crate::gateway::icmp_proxy::IcmpProxy; use crate::gateway::tcp_proxy::TcpProxy; use crate::peers::peer_manager::PeerManager; -use crate::peers::rip_route::BasicRoute; use crate::peers::rpc_service::PeerManagerRpcService; use crate::tunnels::SinkItem; @@ -274,9 +273,6 @@ impl Instance { self.listener_manager.lock().await.run().await?; self.peer_manager.run().await?; - let route = BasicRoute::new(self.id(), self.global_ctx.clone()); - self.peer_manager.set_route(route).await; - self.run_rpc_server().unwrap(); self.tcp_proxy.start().await.unwrap(); diff --git a/easytier-core/src/peers/mod.rs b/easytier-core/src/peers/mod.rs index aa6e5f8..e3bf8ee 100644 --- a/easytier-core/src/peers/mod.rs +++ b/easytier-core/src/peers/mod.rs @@ -3,8 +3,8 @@ pub mod peer; pub mod peer_conn; pub mod peer_manager; pub mod peer_map; +pub mod peer_rip_route; pub mod peer_rpc; -pub mod rip_route; pub mod route_trait; pub mod rpc_service; diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index 39ad32b..fd1fc04 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -1,8 +1,4 @@ -use std::{ - fmt::Debug, - net::Ipv4Addr, - sync::{atomic::AtomicU8, Arc}, -}; +use std::{fmt::Debug, net::Ipv4Addr, sync::Arc}; use async_trait::async_trait; use futures::{StreamExt, TryFutureExt}; @@ -32,6 +28,7 @@ use crate::{ use super::{ peer_map::PeerMap, + peer_rip_route::BasicRoute, peer_rpc::PeerRpcManager, route_trait::{ArcRoute, Route}, PeerId, @@ -43,8 +40,6 @@ struct RpcTransport { packet_recv: Mutex>, peer_rpc_tspt_sender: UnboundedSender, - - route: Arc>>, } #[async_trait::async_trait] @@ -54,14 +49,8 @@ impl PeerRpcManagerTransport for RpcTransport { } async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { - let route = self.route.lock().await; - if route.is_none() { - log::error!("no route info when send rpc msg"); - return Err(Error::RouteError("No route info".to_string())); - } - self.peers - .send_msg(msg, dst_peer_id, route.as_ref().unwrap().clone()) + .send_msg(msg, dst_peer_id) .map_err(|e| e.into()) .await } @@ -104,14 +93,14 @@ pub struct PeerManager { packet_recv: Arc>>>, peers: Arc, - route: Arc>>, - cur_route_id: AtomicU8, peer_rpc_mgr: Arc, peer_rpc_tspt: Arc, peer_packet_process_pipeline: Arc>>, nic_packet_process_pipeline: Arc>>, + + basic_route: Arc, } impl Debug for PeerManager { @@ -120,7 +109,6 @@ impl Debug for PeerManager { .field("my_node_id", &self.my_node_id) .field("instance_name", &self.global_ctx.inst_name) .field("net_ns", &self.global_ctx.net_ns.name()) - .field("cur_route_id", &self.cur_route_id) .finish() } } @@ -137,9 +125,10 @@ impl PeerManager { peers: peers.clone(), packet_recv: Mutex::new(peer_rpc_tspt_recv), peer_rpc_tspt_sender, - route: Arc::new(Mutex::new(None)), }); + let basic_route = Arc::new(BasicRoute::new(global_ctx.get_id(), global_ctx.clone())); + PeerManager { my_node_id: global_ctx.get_id(), global_ctx, @@ -150,14 +139,14 @@ impl PeerManager { packet_recv: Arc::new(Mutex::new(Some(packet_recv))), peers: peers.clone(), - route: Arc::new(Mutex::new(None)), - cur_route_id: AtomicU8::new(0), peer_rpc_mgr: Arc::new(PeerRpcManager::new(rpc_tspt.clone())), peer_rpc_tspt: rpc_tspt, peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), + + basic_route, } } @@ -196,7 +185,6 @@ impl PeerManager { let mut recv = ReceiverStream::new(self.packet_recv.lock().await.take().unwrap()); let my_node_id = self.my_node_id; let peers = self.peers.clone(); - let arc_route = self.route.clone(); let pipe_line = self.peer_packet_process_pipeline.clone(); self.tasks.lock().await.spawn(async move { log::trace!("start_peer_recv"); @@ -206,22 +194,12 @@ impl PeerManager { let from_peer_uuid = packet.from_peer.to_uuid(); let to_peer_uuid = packet.to_peer.as_ref().unwrap().to_uuid(); if to_peer_uuid != my_node_id { - let locked_arc_route = arc_route.lock().await; - if locked_arc_route.is_none() { - log::error!("no route info after recv a packet"); - continue; - } - - let route = locked_arc_route.as_ref().unwrap().clone(); - drop(locked_arc_route); log::trace!( "need forward: to_peer_uuid: {:?}, my_uuid: {:?}", to_peer_uuid, my_node_id ); - let ret = peers - .send_msg(ret.clone(), &to_peer_uuid, route.clone()) - .await; + let ret = peers.send_msg(ret.clone(), &to_peer_uuid).await; if ret.is_err() { log::error!( "forward packet error: {:?}, dst: {:?}, from: {:?}", @@ -295,46 +273,9 @@ impl PeerManager { })) .await; - // for peer manager router packet - struct RoutePacketProcessor { - route: Arc>>, - } - #[async_trait::async_trait] - impl PeerPacketFilter for RoutePacketProcessor { - async fn try_process_packet_from_peer( - &self, - packet: &packet::ArchivedPacket, - data: &Bytes, - ) -> Option<()> { - if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::RoutePacket( - route_packet, - )) = &packet.body - { - let r = self.route.lock().await; - match r.as_ref() { - Some(x) => { - let x = x.clone(); - drop(r); - x.handle_route_packet( - packet.from_peer.to_uuid(), - extract_bytes_from_archived_vec(&data, &route_packet.body), - ) - .await; - } - None => { - log::error!("no route info when handle route packet"); - } - } - Some(()) - } else { - None - } - } - } - self.add_packet_process_pipeline(Box::new(RoutePacketProcessor { - route: self.route.clone(), - })) - .await; + // for route + self.add_packet_process_pipeline(Box::new(self.basic_route.clone())) + .await; // for peer rpc packet struct PeerRpcPacketProcessor { @@ -364,7 +305,7 @@ impl PeerManager { .await; } - pub async fn set_route(&self, route: T) + pub async fn add_route(&self, route: T) where T: Route + Send + Sync + 'static, { @@ -400,7 +341,7 @@ impl PeerManager { } let my_node_id = self.my_node_id; - let route_id = route + let _route_id = route .open(Box::new(Interface { my_node_id, peers: self.peers.clone(), @@ -408,28 +349,12 @@ impl PeerManager { .await .unwrap(); - self.cur_route_id - .store(route_id, std::sync::atomic::Ordering::Relaxed); let arc_route: ArcRoute = Arc::new(Box::new(route)); - - self.route.lock().await.replace(arc_route.clone()); - - self.peer_rpc_tspt - .route - .lock() - .await - .replace(arc_route.clone()); + self.peers.add_route(arc_route).await; } pub async fn list_routes(&self) -> Vec { - let route_info = self.route.lock().await; - if route_info.is_none() { - return Vec::new(); - } - - let route = route_info.as_ref().unwrap().clone(); - drop(route_info); - route.list_routes().await + self.basic_route.list_routes().await } async fn run_nic_packet_process_pipeline(&self, mut data: BytesMut) -> BytesMut { @@ -440,45 +365,33 @@ impl PeerManager { } pub async fn send_msg(&self, msg: Bytes, dst_peer_id: &PeerId) -> Result<(), Error> { - self.peer_rpc_tspt.send(msg, dst_peer_id).await + self.peers.send_msg(msg, dst_peer_id).await } pub async fn send_msg_ipv4(&self, msg: BytesMut, ipv4_addr: Ipv4Addr) -> Result<(), Error> { - let route_info = self.route.lock().await; - if route_info.is_none() { - log::error!("no route info"); - return Err(Error::RouteError("No route info".to_string())); - } - - let route = route_info.as_ref().unwrap().clone(); - drop(route_info); - log::trace!( "do send_msg in peer manager, msg: {:?}, ipv4_addr: {}", msg, ipv4_addr ); - match route.get_peer_id_by_ipv4(&ipv4_addr).await { - Some(peer_id) => { - let msg = self.run_nic_packet_process_pipeline(msg).await; - self.peers - .send_msg( - packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(), - &peer_id, - route.clone(), - ) - .await?; - log::trace!( - "do send_msg in peer manager done, dst_peer_id: {:?}", - peer_id - ); - } - None => { - log::trace!("no peer id for ipv4: {}", ipv4_addr); - return Ok(()); - } - } + let Some(peer_id) = self.peers.get_peer_id_by_ipv4(&ipv4_addr).await else { + log::trace!("no peer id for ipv4: {}", ipv4_addr); + return Ok(()); + }; + + let msg = self.run_nic_packet_process_pipeline(msg).await; + self.peers + .send_msg( + packet::Packet::new_data_packet(self.my_node_id, peer_id, &msg).into(), + &peer_id, + ) + .await?; + + log::trace!( + "do send_msg in peer manager done, dst_peer_id: {:?}", + peer_id + ); Ok(()) } @@ -506,6 +419,8 @@ impl PeerManager { } pub async fn run(&self) -> Result<(), Error> { + self.add_route(self.basic_route.clone()).await; + self.init_packet_process_pipeline().await; self.start_peer_recv().await; self.peer_rpc_mgr.run(); diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index c9fa745..3a7922a 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; +use std::{net::Ipv4Addr, sync::Arc}; use anyhow::Context; use dashmap::DashMap; use easytier_rpc::PeerConnInfo; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, RwLock}; use tokio_util::bytes::Bytes; use crate::{ @@ -17,6 +17,7 @@ pub struct PeerMap { global_ctx: ArcGlobalCtx, peer_map: DashMap>, packet_send: mpsc::Sender, + routes: RwLock>, } impl PeerMap { @@ -25,6 +26,7 @@ impl PeerMap { global_ctx, peer_map: DashMap::new(), packet_send, + routes: RwLock::new(Vec::new()), } } @@ -75,12 +77,7 @@ impl PeerMap { Ok(()) } - pub async fn send_msg( - &self, - msg: Bytes, - dst_peer_id: &uuid::Uuid, - route: ArcRoute, - ) -> Result<(), Error> { + pub async fn send_msg(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { if *dst_peer_id == self.global_ctx.get_id() { return Ok(self .packet_send @@ -90,17 +87,34 @@ impl PeerMap { } // get route info - let gateway_peer_id = route.get_next_hop(dst_peer_id).await; - - if gateway_peer_id.is_none() { - log::error!("no gateway for dst_peer_id: {}", dst_peer_id); - return Ok(()); + let mut gateway_peer_id = None; + for route in self.routes.read().await.iter() { + gateway_peer_id = route.get_next_hop(dst_peer_id).await; + if gateway_peer_id.is_none() { + continue; + } else { + break; + } } - let gateway_peer_id = gateway_peer_id.unwrap(); - self.send_msg_directly(msg, &gateway_peer_id).await?; + let Some(gateway_peer_id) = gateway_peer_id else { + log::error!("no gateway for dst_peer_id: {}", dst_peer_id); + return Ok(()); + }; - Ok(()) + self.send_msg_directly(msg.clone(), &gateway_peer_id) + .await?; + return Ok(()); + } + + pub async fn get_peer_id_by_ipv4(&self, ipv4: &Ipv4Addr) -> Option { + for route in self.routes.read().await.iter() { + let peer_id = route.get_peer_id_by_ipv4(ipv4).await; + if peer_id.is_some() { + return peer_id; + } + } + None } pub async fn list_peers(&self) -> Vec { @@ -156,4 +170,9 @@ impl PeerMap { ); Ok(()) } + + pub async fn add_route(&self, route: ArcRoute) { + let mut routes = self.routes.write().await; + routes.insert(0, route); + } } diff --git a/easytier-core/src/peers/rip_route.rs b/easytier-core/src/peers/peer_rip_route.rs similarity index 95% rename from easytier-core/src/peers/rip_route.rs rename to easytier-core/src/peers/peer_rip_route.rs index c0efd23..c22eaa4 100644 --- a/easytier-core/src/peers/rip_route.rs +++ b/easytier-core/src/peers/peer_rip_route.rs @@ -13,7 +13,7 @@ use crate::{ common::{ error::Error, global_ctx::ArcGlobalCtx, - rkyv_util::{decode_from_bytes, encode_to_bytes}, + rkyv_util::{decode_from_bytes, encode_to_bytes, extract_bytes_from_archived_vec}, stun::StunInfoCollectorTrait, }, peers::{ @@ -23,6 +23,8 @@ use crate::{ }, }; +use super::{packet::ArchivedPacketBody, peer_manager::PeerPacketFilter}; + #[derive(Archive, Deserialize, Serialize, Clone, Debug)] #[archive(compare(PartialEq), check_bytes)] // Derives can be passed through to the generated type: @@ -373,18 +375,6 @@ impl BasicRoute { } None } -} - -#[async_trait] -impl Route for BasicRoute { - async fn open(&self, interface: RouteInterfaceBox) -> Result { - *self.interface.lock().await = Some(interface); - self.sync_peer_periodically().await; - self.check_expired_sync_peer_from_remote().await; - Ok(1) - } - - async fn close(&self) {} #[tracing::instrument(skip(self, packet), fields(my_id = ?self.my_peer_id, ctx = ?self.global_ctx))] async fn handle_route_packet(&self, src_peer_id: uuid::Uuid, packet: Bytes) { @@ -416,20 +406,19 @@ impl Route for BasicRoute { self.need_sync_notifier.notify_one(); } } +} - async fn get_peer_id_by_ipv4(&self, ipv4_addr: &Ipv4Addr) -> Option { - if let Some(peer_id) = self.route_table.ipv4_peer_id_map.get(ipv4_addr) { - return Some(*peer_id); - } - - if let Some(peer_id) = self.get_peer_id_for_proxy(ipv4_addr) { - return Some(peer_id); - } - - log::info!("no peer id for ipv4: {}", ipv4_addr); - return None; +#[async_trait] +impl Route for BasicRoute { + async fn open(&self, interface: RouteInterfaceBox) -> Result { + *self.interface.lock().await = Some(interface); + self.sync_peer_periodically().await; + self.check_expired_sync_peer_from_remote().await; + Ok(1) } + async fn close(&self) {} + async fn get_next_hop(&self, dst_peer_id: &PeerId) -> Option { match self.route_table.route_info.get(dst_peer_id) { Some(info) => { @@ -477,4 +466,39 @@ impl Route for BasicRoute { routes } + + async fn get_peer_id_by_ipv4(&self, ipv4_addr: &Ipv4Addr) -> Option { + if let Some(peer_id) = self.route_table.ipv4_peer_id_map.get(ipv4_addr) { + return Some(*peer_id); + } + + if let Some(peer_id) = self.get_peer_id_for_proxy(ipv4_addr) { + return Some(peer_id); + } + + log::info!("no peer id for ipv4: {}", ipv4_addr); + return None; + } +} + +#[async_trait::async_trait] +impl PeerPacketFilter for BasicRoute { + async fn try_process_packet_from_peer( + &self, + packet: &packet::ArchivedPacket, + data: &Bytes, + ) -> Option<()> { + if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::RoutePacket(route_packet)) = + &packet.body + { + self.handle_route_packet( + packet.from_peer.to_uuid(), + extract_bytes_from_archived_vec(&data, &route_packet.body), + ) + .await; + Some(()) + } else { + None + } + } } diff --git a/easytier-core/src/peers/route_trait.rs b/easytier-core/src/peers/route_trait.rs index e328724..ef1ac28 100644 --- a/easytier-core/src/peers/route_trait.rs +++ b/easytier-core/src/peers/route_trait.rs @@ -21,16 +21,17 @@ pub trait RouteInterface { pub type RouteInterfaceBox = Box; #[async_trait] +#[auto_impl::auto_impl(Box, Arc)] pub trait Route { async fn open(&self, interface: RouteInterfaceBox) -> Result; async fn close(&self); - async fn get_peer_id_by_ipv4(&self, ipv4: &Ipv4Addr) -> Option; async fn get_next_hop(&self, peer_id: &PeerId) -> Option; - - async fn handle_route_packet(&self, src_peer_id: PeerId, packet: Bytes); - async fn list_routes(&self) -> Vec; + + async fn get_peer_id_by_ipv4(&self, _ipv4: &Ipv4Addr) -> Option { + None + } } pub type ArcRoute = Arc>; diff --git a/easytier-core/src/peers/tests.rs b/easytier-core/src/peers/tests.rs index dda7a83..163da2e 100644 --- a/easytier-core/src/peers/tests.rs +++ b/easytier-core/src/peers/tests.rs @@ -2,7 +2,6 @@ use std::sync::Arc; use crate::{ common::{error::Error, global_ctx::tests::get_mock_global_ctx}, - peers::rip_route::BasicRoute, tunnels::ring_tunnel::create_ring_tunnel_pair, }; @@ -11,12 +10,6 @@ use super::peer_manager::PeerManager; pub async fn create_mock_peer_manager() -> Arc { let (s, _r) = tokio::sync::mpsc::channel(1000); let peer_mgr = Arc::new(PeerManager::new(get_mock_global_ctx(), s)); - peer_mgr - .set_route(BasicRoute::new( - peer_mgr.my_node_id(), - peer_mgr.get_global_ctx(), - )) - .await; peer_mgr.run().await.unwrap(); peer_mgr }