diff --git a/easytier-core/src/common/global_ctx.rs b/easytier-core/src/common/global_ctx.rs index 4887eca..9cf60c1 100644 --- a/easytier-core/src/common/global_ctx.rs +++ b/easytier-core/src/common/global_ctx.rs @@ -2,6 +2,8 @@ use std::{io::Write, sync::Arc}; use crate::rpc::PeerConnInfo; use crossbeam::atomic::AtomicCell; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; use super::{ config_fs::ConfigFs, @@ -12,8 +14,8 @@ use super::{ #[derive(Debug, Clone, PartialEq)] pub enum GlobalCtxEvent { - PeerAdded, - PeerRemoved, + PeerAdded(Uuid), + PeerRemoved(Uuid), PeerConnAdded(PeerConnInfo), PeerConnRemoved(PeerConnInfo), } @@ -21,11 +23,31 @@ pub enum GlobalCtxEvent { type EventBus = tokio::sync::broadcast::Sender; type EventBusSubscriber = tokio::sync::broadcast::Receiver; +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] +pub struct NetworkIdentity { + pub network_name: String, + pub network_secret: String, +} + +impl NetworkIdentity { + pub fn new(network_name: String, network_secret: String) -> Self { + NetworkIdentity { + network_name, + network_secret, + } + } + + pub fn default() -> Self { + Self::new("default".to_string(), "".to_string()) + } +} + pub struct GlobalCtx { pub inst_name: String, pub id: uuid::Uuid, pub config_fs: ConfigFs, pub net_ns: NetNS, + pub network: NetworkIdentity, event_bus: EventBus, @@ -54,11 +76,17 @@ impl std::fmt::Debug for GlobalCtx { pub type ArcGlobalCtx = std::sync::Arc; impl GlobalCtx { - pub fn new(inst_name: &str, config_fs: ConfigFs, net_ns: NetNS) -> Self { + pub fn new( + inst_name: &str, + config_fs: ConfigFs, + net_ns: NetNS, + network: Option, + ) -> Self { let id = config_fs .get_or_add_file("inst_id", || uuid::Uuid::new_v4().to_string()) .unwrap(); let id = uuid::Uuid::parse_str(&id).unwrap(); + let network = network.unwrap_or(NetworkIdentity::default()); let (event_bus, _) = tokio::sync::broadcast::channel(100); @@ -67,6 +95,8 @@ impl GlobalCtx { id, config_fs, net_ns: net_ns.clone(), + network, + event_bus, cached_ipv4: AtomicCell::new(None), cached_proxy_cidrs: AtomicCell::new(None), @@ -193,6 +223,10 @@ impl GlobalCtx { pub fn get_id(&self) -> uuid::Uuid { self.id } + + pub fn get_network_identity(&self) -> NetworkIdentity { + self.network.clone() + } } #[cfg(test)] @@ -203,18 +237,22 @@ pub mod tests { async fn test_global_ctx() { let config_fs = ConfigFs::new("/tmp/easytier"); let net_ns = NetNS::new(None); - let global_ctx = GlobalCtx::new("test", config_fs, net_ns); + let global_ctx = GlobalCtx::new("test", config_fs, net_ns, None); let mut subscriber = global_ctx.subscribe(); - global_ctx.issue_event(GlobalCtxEvent::PeerAdded); - global_ctx.issue_event(GlobalCtxEvent::PeerRemoved); + let uuid = Uuid::new_v4(); + global_ctx.issue_event(GlobalCtxEvent::PeerAdded(uuid.clone())); + global_ctx.issue_event(GlobalCtxEvent::PeerRemoved(uuid.clone())); global_ctx.issue_event(GlobalCtxEvent::PeerConnAdded(PeerConnInfo::default())); global_ctx.issue_event(GlobalCtxEvent::PeerConnRemoved(PeerConnInfo::default())); - assert_eq!(subscriber.recv().await.unwrap(), GlobalCtxEvent::PeerAdded); assert_eq!( subscriber.recv().await.unwrap(), - GlobalCtxEvent::PeerRemoved + GlobalCtxEvent::PeerAdded(uuid.clone()) + ); + assert_eq!( + subscriber.recv().await.unwrap(), + GlobalCtxEvent::PeerRemoved(uuid.clone()) ); assert_eq!( subscriber.recv().await.unwrap(), @@ -226,7 +264,9 @@ pub mod tests { ); } - pub fn get_mock_global_ctx() -> ArcGlobalCtx { + pub fn get_mock_global_ctx_with_network( + network_identy: Option, + ) -> ArcGlobalCtx { let node_id = uuid::Uuid::new_v4(); let config_fs = ConfigFs::new_with_dir(node_id.to_string().as_str(), "/tmp/easytier"); let net_ns = NetNS::new(None); @@ -234,6 +274,11 @@ pub mod tests { format!("test_{}", node_id).as_str(), config_fs, net_ns, + network_identy, )) } + + pub fn get_mock_global_ctx() -> ArcGlobalCtx { + get_mock_global_ctx_with_network(None) + } } diff --git a/easytier-core/src/connector/manual.rs b/easytier-core/src/connector/manual.rs index 8ea9808..1465397 100644 --- a/easytier-core/src/connector/manual.rs +++ b/easytier-core/src/connector/manual.rs @@ -220,8 +220,8 @@ impl ManualConnectorManager { log::warn!("peer conn removed: {:?}", conn_info); } - GlobalCtxEvent::PeerAdded => todo!(), - GlobalCtxEvent::PeerRemoved => todo!(), + GlobalCtxEvent::PeerAdded(..) => todo!(), + GlobalCtxEvent::PeerRemoved(..) => todo!(), } } diff --git a/easytier-core/src/connector/udp_hole_punch.rs b/easytier-core/src/connector/udp_hole_punch.rs index 961046c..fade1d8 100644 --- a/easytier-core/src/connector/udp_hole_punch.rs +++ b/easytier-core/src/connector/udp_hole_punch.rs @@ -499,12 +499,18 @@ pub mod tests { } } + pub fn replace_stun_info_collector(peer_mgr: Arc, udp_nat_type: NatType) { + let collector = Box::new(MockStunInfoCollector { udp_nat_type }); + peer_mgr + .get_global_ctx() + .replace_stun_info_collector(collector); + } + pub async fn create_mock_peer_manager_with_mock_stun( udp_nat_type: NatType, ) -> Arc { let p_a = create_mock_peer_manager().await; - let collector = Box::new(MockStunInfoCollector { udp_nat_type }); - p_a.get_global_ctx().replace_stun_info_collector(collector); + replace_stun_info_collector(p_a.clone(), udp_nat_type); p_a } diff --git a/easytier-core/src/instance/instance.rs b/easytier-core/src/instance/instance.rs index c452cd0..2636212 100644 --- a/easytier-core/src/instance/instance.rs +++ b/easytier-core/src/instance/instance.rs @@ -116,7 +116,7 @@ impl Instance { let (peer_packet_sender, peer_packet_receiver) = tokio::sync::mpsc::channel(100); - let global_ctx = Arc::new(GlobalCtx::new(inst_name, config, net_ns.clone())); + let global_ctx = Arc::new(GlobalCtx::new(inst_name, config, net_ns.clone(), None)); let id = global_ctx.get_id(); diff --git a/easytier-core/src/peers/foreign_network_client.rs b/easytier-core/src/peers/foreign_network_client.rs new file mode 100644 index 0000000..ae877aa --- /dev/null +++ b/easytier-core/src/peers/foreign_network_client.rs @@ -0,0 +1,184 @@ +use std::{ + sync::Arc, + time::{Duration, SystemTime}, +}; + +use dashmap::DashMap; +use tokio::{ + sync::{mpsc, Mutex}, + task::JoinSet, +}; +use tokio_util::bytes::Bytes; + +use crate::common::{ + error::Error, + global_ctx::{ArcGlobalCtx, NetworkIdentity}, +}; + +use super::{ + foreign_network_manager::{ForeignNetworkServiceClient, FOREIGN_NETWORK_SERVICE_ID}, + peer_conn::PeerConn, + peer_map::PeerMap, + peer_rpc::PeerRpcManager, + PeerId, +}; + +pub struct ForeignNetworkClient { + global_ctx: ArcGlobalCtx, + peer_rpc: Arc, + + peer_map: Arc, + + next_hop: Arc>, + tasks: Mutex>, +} + +impl ForeignNetworkClient { + pub fn new( + global_ctx: ArcGlobalCtx, + packet_sender_to_mgr: mpsc::Sender, + peer_rpc: Arc, + ) -> Self { + let peer_map = Arc::new(PeerMap::new(packet_sender_to_mgr, global_ctx.clone())); + let next_hop = Arc::new(DashMap::new()); + + Self { + global_ctx, + peer_rpc, + + peer_map, + + next_hop, + tasks: Mutex::new(JoinSet::new()), + } + } + + pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) { + tracing::warn!(peer_conn = ?peer_conn.get_conn_info(), network = ?peer_conn.get_network_identity(), "add new peer conn in foreign network client"); + self.peer_map.add_new_peer_conn(peer_conn).await + } + + async fn collect_next_hop_in_foreign_network_task( + network_identity: NetworkIdentity, + peer_map: Arc, + peer_rpc: Arc, + next_hop: Arc>, + ) { + loop { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + + peer_map.clean_peer_without_conn().await; + + let new_next_hop = Self::collect_next_hop_in_foreign_network( + network_identity.clone(), + peer_map.clone(), + peer_rpc.clone(), + ) + .await; + + next_hop.clear(); + for (k, v) in new_next_hop.into_iter() { + next_hop.insert(k, v); + } + } + } + + async fn collect_next_hop_in_foreign_network( + network_identity: NetworkIdentity, + peer_map: Arc, + peer_rpc: Arc, + ) -> DashMap { + let peers = peer_map.list_peers().await; + let mut tasks = JoinSet::new(); + if !peers.is_empty() { + tracing::warn!(?peers, "collect next hop in foreign network"); + } + for peer in peers { + let peer_rpc = peer_rpc.clone(); + let network_identity = network_identity.clone(); + tasks.spawn(async move { + let Ok(Some(peers_in_foreign)) = peer_rpc + .do_client_rpc_scoped(FOREIGN_NETWORK_SERVICE_ID, peer, |c| async { + let c = + ForeignNetworkServiceClient::new(tarpc::client::Config::default(), c) + .spawn(); + let mut rpc_ctx = tarpc::context::current(); + rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(2); + let ret = c.list_network_peers(rpc_ctx, network_identity).await; + ret + }) + .await + else { + return (peer, vec![]); + }; + + (peer, peers_in_foreign) + }); + } + + let new_next_hop = DashMap::new(); + while let Some(join_ret) = tasks.join_next().await { + let Ok((gateway, peer_ids)) = join_ret else { + tracing::error!(?join_ret, "collect next hop in foreign network failed"); + continue; + }; + for ret in peer_ids { + new_next_hop.insert(ret, gateway); + } + } + + new_next_hop + } + + pub fn has_next_hop(&self, peer_id: &PeerId) -> bool { + self.get_next_hop(peer_id).is_some() + } + + pub fn get_next_hop(&self, peer_id: &PeerId) -> Option { + if self.peer_map.has_peer(peer_id) { + return Some(peer_id.clone()); + } + self.next_hop.get(peer_id).map(|v| v.clone()) + } + + pub async fn send_msg(&self, msg: Bytes, peer_id: &PeerId) -> Result<(), Error> { + if let Some(next_hop) = self.get_next_hop(peer_id) { + return self.peer_map.send_msg_directly(msg, &next_hop).await; + } + Err(Error::RouteError("no next hop".to_string())) + } + + pub fn list_foreign_peers(&self) -> Vec { + let mut peers = vec![]; + for item in self.next_hop.iter() { + if item.key() != &self.global_ctx.get_id() { + peers.push(item.key().clone()); + } + } + peers + } + + pub async fn run(&self) { + self.tasks + .lock() + .await + .spawn(Self::collect_next_hop_in_foreign_network_task( + self.global_ctx.get_network_identity(), + self.peer_map.clone(), + self.peer_rpc.clone(), + self.next_hop.clone(), + )); + } + + pub fn get_next_hop_table(&self) -> DashMap { + let next_hop = DashMap::new(); + for item in self.next_hop.iter() { + next_hop.insert(item.key().clone(), item.value().clone()); + } + next_hop + } + + pub fn get_peer_map(&self) -> Arc { + self.peer_map.clone() + } +} diff --git a/easytier-core/src/peers/foreign_network_manager.rs b/easytier-core/src/peers/foreign_network_manager.rs new file mode 100644 index 0000000..412d529 --- /dev/null +++ b/easytier-core/src/peers/foreign_network_manager.rs @@ -0,0 +1,430 @@ +/* +foreign_network_manager is used to forward packets of other networks. currently +only forward packets of peers that directly connected to this node. + +in future, with the help wo peer center we can forward packets of peers that +connected to any node in the local network. +*/ +use std::sync::Arc; + +use dashmap::DashMap; +use tokio::{ + sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + Mutex, + }, + task::JoinSet, +}; +use tokio_util::bytes::Bytes; +use uuid::Uuid; + +use crate::common::{ + error::Error, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, +}; + +use super::{ + packet::{self, ArchivedPacketBody}, + peer_conn::PeerConn, + peer_map::PeerMap, + peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, + PeerId, +}; + +struct ForeignNetworkEntry { + network: NetworkIdentity, + peer_map: Arc, +} + +impl ForeignNetworkEntry { + fn new( + network: NetworkIdentity, + packet_sender: mpsc::Sender, + global_ctx: ArcGlobalCtx, + ) -> Self { + let peer_map = Arc::new(PeerMap::new(packet_sender, global_ctx)); + Self { network, peer_map } + } +} + +struct ForeignNetworkManagerData { + network_peer_maps: DashMap>, + peer_network_map: DashMap, +} + +impl ForeignNetworkManagerData { + async fn send_msg(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + let network_name = self + .peer_network_map + .get(dst_peer_id) + .ok_or_else(|| Error::RouteError("network not found".to_string()))? + .clone(); + let entry = self + .network_peer_maps + .get(&network_name) + .ok_or_else(|| Error::RouteError("no peer in network".to_string()))? + .clone(); + entry.peer_map.send_msg(msg, dst_peer_id).await + } + + fn get_peer_network(&self, peer_id: &uuid::Uuid) -> Option { + self.peer_network_map.get(peer_id).map(|v| v.clone()) + } + + fn get_network_entry(&self, network_name: &str) -> Option> { + self.network_peer_maps.get(network_name).map(|v| v.clone()) + } + + fn remove_peer(&self, peer_id: &uuid::Uuid) { + self.peer_network_map.remove(peer_id); + self.network_peer_maps.retain(|_, v| !v.peer_map.is_empty()); + } + + fn clear_no_conn_peer(&self) { + for item in self.network_peer_maps.iter() { + let peer_map = item.value().peer_map.clone(); + tokio::spawn(async move { + peer_map.clean_peer_without_conn().await; + }); + } + } +} + +struct RpcTransport { + my_peer_id: uuid::Uuid, + data: Arc, + + packet_recv: Mutex>, +} + +#[async_trait::async_trait] +impl PeerRpcManagerTransport for RpcTransport { + fn my_peer_id(&self) -> Uuid { + self.my_peer_id + } + + async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + self.data.send_msg(msg, dst_peer_id).await + } + + async fn recv(&self) -> Result { + if let Some(o) = self.packet_recv.lock().await.recv().await { + Ok(o) + } else { + Err(Error::Unknown) + } + } +} + +pub const FOREIGN_NETWORK_SERVICE_ID: u32 = 1; + +#[tarpc::service] +pub trait ForeignNetworkService { + async fn list_network_peers(network_identy: NetworkIdentity) -> Option>; +} + +#[tarpc::server] +impl ForeignNetworkService for Arc { + async fn list_network_peers( + self, + _: tarpc::context::Context, + network_identy: NetworkIdentity, + ) -> Option> { + let entry = self.network_peer_maps.get(&network_identy.network_name)?; + Some(entry.peer_map.list_peers().await) + } +} + +pub struct ForeignNetworkManager { + global_ctx: ArcGlobalCtx, + packet_sender_to_mgr: mpsc::Sender, + + packet_sender: mpsc::Sender, + packet_recv: Mutex>>, + + data: Arc, + rpc_mgr: Arc, + rpc_transport_sender: UnboundedSender, + + tasks: Mutex>, +} + +impl ForeignNetworkManager { + pub fn new(global_ctx: ArcGlobalCtx, packet_sender_to_mgr: mpsc::Sender) -> Self { + // recv packet from all foreign networks + let (packet_sender, packet_recv) = mpsc::channel(1000); + + let data = Arc::new(ForeignNetworkManagerData { + network_peer_maps: DashMap::new(), + peer_network_map: DashMap::new(), + }); + + // handle rpc from foreign networks + let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); + let rpc_mgr = Arc::new(PeerRpcManager::new(RpcTransport { + my_peer_id: global_ctx.get_id(), + data: data.clone(), + packet_recv: Mutex::new(peer_rpc_tspt_recv), + })); + + Self { + global_ctx, + packet_sender_to_mgr, + + packet_sender, + packet_recv: Mutex::new(Some(packet_recv)), + + data, + rpc_mgr, + rpc_transport_sender, + + tasks: Mutex::new(JoinSet::new()), + } + } + + pub async fn add_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> { + tracing::warn!(peer_conn = ?peer_conn.get_conn_info(), network = ?peer_conn.get_network_identity(), "add new peer conn in foreign network manager"); + + let entry = self + .data + .network_peer_maps + .entry(peer_conn.get_network_identity().network_name.clone()) + .or_insert_with(|| { + Arc::new(ForeignNetworkEntry::new( + peer_conn.get_network_identity(), + self.packet_sender.clone(), + self.global_ctx.clone(), + )) + }) + .clone(); + + self.data.peer_network_map.insert( + peer_conn.get_peer_id(), + peer_conn.get_network_identity().network_name.clone(), + ); + + if entry.network.network_secret != peer_conn.get_network_identity().network_secret { + return Err(anyhow::anyhow!("network secret not match").into()); + } + + Ok(entry.peer_map.add_new_peer_conn(peer_conn).await) + } + + async fn start_global_event_handler(&self) { + let data = self.data.clone(); + let mut s = self.global_ctx.subscribe(); + self.tasks.lock().await.spawn(async move { + while let Ok(e) = s.recv().await { + tracing::warn!(?e, "global event"); + if let GlobalCtxEvent::PeerRemoved(peer_id) = &e { + data.remove_peer(peer_id); + } else if let GlobalCtxEvent::PeerConnRemoved(..) = &e { + data.clear_no_conn_peer(); + } + } + }); + } + + async fn start_packet_recv(&self) { + let mut recv = self.packet_recv.lock().await.take().unwrap(); + let sender_to_mgr = self.packet_sender_to_mgr.clone(); + let my_node_id = self.global_ctx.get_id(); + let rpc_sender = self.rpc_transport_sender.clone(); + let data = self.data.clone(); + + self.tasks.lock().await.spawn(async move { + while let Some(packet_bytes) = recv.recv().await { + let packet = packet::Packet::decode(&packet_bytes); + let from_peer_uuid = packet.from_peer.to_uuid(); + let to_peer_uuid = packet.to_peer.as_ref().unwrap().to_uuid(); + if to_peer_uuid == my_node_id { + if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::TaRpc(..)) = + &packet.body + { + rpc_sender.send(packet_bytes.clone()).unwrap(); + continue; + } + if let Err(e) = sender_to_mgr.send(packet_bytes).await { + tracing::error!("send packet to mgr failed: {:?}", e); + } + } else { + let Some(from_network) = data.get_peer_network(&from_peer_uuid) else { + continue; + }; + let Some(to_network) = data.get_peer_network(&to_peer_uuid) else { + continue; + }; + if from_network != to_network { + continue; + } + + if let Some(entry) = data.get_network_entry(&from_network) { + let ret = entry.peer_map.send_msg(packet_bytes, &to_peer_uuid).await; + if ret.is_err() { + tracing::error!("forward packet to peer failed: {:?}", ret.err()); + } + } else { + tracing::error!("foreign network not found: {}", from_network); + } + } + } + }); + } + + async fn register_peer_rpc_service(&self) { + self.rpc_mgr.run(); + self.rpc_mgr + .run_service(FOREIGN_NETWORK_SERVICE_ID, self.data.clone().serve()) + } + + pub async fn run(&self) { + self.start_global_event_handler().await; + self.start_packet_recv().await; + self.register_peer_rpc_service().await; + } +} + +#[cfg(test)] +mod tests { + use crate::{ + common::global_ctx::tests::get_mock_global_ctx_with_network, + connector::udp_hole_punch::tests::{ + create_mock_peer_manager_with_mock_stun, replace_stun_info_collector, + }, + peers::{ + peer_manager::PeerManager, + tests::{connect_peer_manager, wait_route_appear}, + }, + rpc::NatType, + }; + + use super::*; + + async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { + let (s, _r) = tokio::sync::mpsc::channel(1000); + let peer_mgr = Arc::new(PeerManager::new( + get_mock_global_ctx_with_network(Some(NetworkIdentity { + network_name: network.to_string(), + network_secret: network.to_string(), + })), + s, + )); + replace_stun_info_collector(peer_mgr.clone(), NatType::Unknown); + peer_mgr.run().await.unwrap(); + peer_mgr + } + + #[tokio::test] + async fn test_foreign_network_manager() { + let pm_center = create_mock_peer_manager_with_mock_stun(crate::rpc::NatType::Unknown).await; + let pm_center2 = + create_mock_peer_manager_with_mock_stun(crate::rpc::NatType::Unknown).await; + connect_peer_manager(pm_center.clone(), pm_center2.clone()).await; + + let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pma_net1.clone(), pm_center.clone()).await; + connect_peer_manager(pmb_net1.clone(), pm_center.clone()).await; + + let now = std::time::Instant::now(); + let mut succ = false; + while now.elapsed().as_secs() < 10 { + let table = pma_net1.get_foreign_network_client().get_next_hop_table(); + if table.len() >= 1 { + succ = true; + break; + } + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + assert!(succ); + + assert_eq!( + vec![pm_center.my_node_id()], + pma_net1 + .get_foreign_network_client() + .get_peer_map() + .list_peers() + .await + ); + assert_eq!( + vec![pm_center.my_node_id()], + pmb_net1 + .get_foreign_network_client() + .get_peer_map() + .list_peers() + .await + ); + wait_route_appear(pma_net1.clone(), pmb_net1.my_node_id()) + .await + .unwrap(); + assert_eq!(1, pma_net1.list_routes().await.len()); + assert_eq!(1, pmb_net1.list_routes().await.len()); + + let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await; + connect_peer_manager(pmc_net1.clone(), pm_center.clone()).await; + wait_route_appear(pma_net1.clone(), pmc_net1.my_node_id()) + .await + .unwrap(); + wait_route_appear(pmb_net1.clone(), pmc_net1.my_node_id()) + .await + .unwrap(); + assert_eq!(2, pmc_net1.list_routes().await.len()); + + let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await; + let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await; + connect_peer_manager(pma_net2.clone(), pm_center.clone()).await; + connect_peer_manager(pmb_net2.clone(), pm_center.clone()).await; + wait_route_appear(pma_net2.clone(), pmb_net2.my_node_id()) + .await + .unwrap(); + assert_eq!(1, pma_net2.list_routes().await.len()); + assert_eq!(1, pmb_net2.list_routes().await.len()); + + assert_eq!( + 5, + pm_center + .get_foreign_network_manager() + .data + .peer_network_map + .len() + ); + + assert_eq!( + 2, + pm_center + .get_foreign_network_manager() + .data + .network_peer_maps + .len() + ); + + drop(pmb_net2); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + assert_eq!( + 4, + pm_center + .get_foreign_network_manager() + .data + .peer_network_map + .len() + ); + drop(pma_net2); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + assert_eq!( + 3, + pm_center + .get_foreign_network_manager() + .data + .peer_network_map + .len() + ); + assert_eq!( + 1, + pm_center + .get_foreign_network_manager() + .data + .network_peer_maps + .len() + ); + } +} diff --git a/easytier-core/src/peers/mod.rs b/easytier-core/src/peers/mod.rs index e3bf8ee..5f6f1bc 100644 --- a/easytier-core/src/peers/mod.rs +++ b/easytier-core/src/peers/mod.rs @@ -8,6 +8,9 @@ pub mod peer_rpc; pub mod route_trait; pub mod rpc_service; +pub mod foreign_network_client; +pub mod foreign_network_manager; + #[cfg(test)] pub mod tests; diff --git a/easytier-core/src/peers/packet.rs b/easytier-core/src/peers/packet.rs index 52a8113..6f8f7bc 100644 --- a/easytier-core/src/peers/packet.rs +++ b/easytier-core/src/peers/packet.rs @@ -1,7 +1,12 @@ +use std::fmt::Debug; + use rkyv::{Archive, Deserialize, Serialize}; use tokio_util::bytes::Bytes; -use crate::common::rkyv_util::{decode_from_bytes, encode_to_bytes}; +use crate::common::{ + global_ctx::NetworkIdentity, + rkyv_util::{decode_from_bytes, encode_to_bytes}, +}; const MAGIC: u32 = 0xd1e1a5e1; const VERSION: u32 = 1; @@ -44,6 +49,43 @@ impl From<&ArchivedUUID> for UUID { } } +#[derive(Archive, Deserialize, Serialize)] +#[archive(compare(PartialEq), check_bytes)] +// Derives can be passed through to the generated type: +pub struct NetworkIdentityForPacket(Vec); + +impl From for NetworkIdentityForPacket { + fn from(network: NetworkIdentity) -> Self { + Self(bincode::serialize(&network).unwrap()) + } +} + +impl From for NetworkIdentity { + fn from(network: NetworkIdentityForPacket) -> Self { + bincode::deserialize(&network.0).unwrap() + } +} + +impl From<&ArchivedNetworkIdentityForPacket> for NetworkIdentity { + fn from(network: &ArchivedNetworkIdentityForPacket) -> Self { + NetworkIdentityForPacket(network.0.to_vec()).into() + } +} + +impl Debug for NetworkIdentityForPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let network: NetworkIdentity = bincode::deserialize(&self.0).unwrap(); + write!(f, "{:?}", network) + } +} + +impl Debug for ArchivedNetworkIdentityForPacket { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let network: NetworkIdentity = bincode::deserialize(&self.0).unwrap(); + write!(f, "{:?}", network) + } +} + #[derive(Archive, Deserialize, Serialize, Debug)] #[archive(compare(PartialEq), check_bytes)] // Derives can be passed through to the generated type: @@ -53,7 +95,7 @@ pub struct HandShake { pub my_peer_id: UUID, pub version: u32, pub features: Vec, - // pub interfaces: Vec, + pub network_identity: NetworkIdentityForPacket, } #[derive(Archive, Deserialize, Serialize, Debug)] @@ -116,7 +158,7 @@ impl From for Bytes { } impl Packet { - pub fn new_handshake(from_peer: uuid::Uuid) -> Self { + pub fn new_handshake(from_peer: uuid::Uuid, network: &NetworkIdentity) -> Self { Packet { from_peer: from_peer.into(), to_peer: None, @@ -125,6 +167,7 @@ impl Packet { my_peer_id: from_peer.into(), version: VERSION, features: Vec::new(), + network_identity: network.clone().into(), })), } } diff --git a/easytier-core/src/peers/peer.rs b/easytier-core/src/peers/peer.rs index f14d96c..ef980de 100644 --- a/easytier-core/src/peers/peer.rs +++ b/easytier-core/src/peers/peer.rs @@ -146,6 +146,8 @@ impl Peer { impl Drop for Peer { fn drop(&mut self) { self.shutdown_notifier.notify_one(); + self.global_ctx + .issue_event(GlobalCtxEvent::PeerRemoved(self.peer_node_id)); tracing::info!("peer {} drop", self.peer_node_id); } } @@ -172,6 +174,7 @@ mod tests { "test", ConfigFs::new("/tmp/easytier-test"), NetNS::new(None), + None, )); let local_peer = Peer::new(uuid::Uuid::new_v4(), local_packet_send, global_ctx.clone()); let remote_peer = Peer::new(uuid::Uuid::new_v4(), remote_packet_send, global_ctx.clone()); diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index 687c750..a09c8ef 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -23,7 +23,7 @@ use tokio_util::{ use tracing::Instrument; use crate::{ - common::global_ctx::ArcGlobalCtx, + common::global_ctx::{ArcGlobalCtx, NetworkIdentity}, define_tunnel_filter_chain, rpc::{PeerConnInfo, PeerConnStats}, tunnels::{ @@ -82,6 +82,7 @@ pub struct PeerInfo { version: u32, pub features: Vec, pub interfaces: Vec, + pub network_identity: NetworkIdentity, } impl<'a> From<&ArchivedHandShake> for PeerInfo { @@ -92,6 +93,7 @@ impl<'a> From<&ArchivedHandShake> for PeerInfo { version: hs.version.into(), features: hs.features.iter().map(|x| x.to_string()).collect(), interfaces: Vec::new(), + network_identity: (&hs.network_identity).into(), } } } @@ -380,7 +382,7 @@ impl PeerConn { let hs_req = self .global_ctx .net_ns - .run(|| packet::Packet::new_handshake(self.my_node_id)); + .run(|| packet::Packet::new_handshake(self.my_node_id, &self.global_ctx.network)); sink.send(hs_req.into()).await?; Ok(()) @@ -393,7 +395,7 @@ impl PeerConn { let hs_req = self .global_ctx .net_ns - .run(|| packet::Packet::new_handshake(self.my_node_id)); + .run(|| packet::Packet::new_handshake(self.my_node_id, &self.global_ctx.network)); sink.send(hs_req.into()).await?; wait_response!(stream, hs_rsp, packet::ArchivedPacketBody::Ctrl(ArchivedCtrlPacketBody::HandShake(x)) => x); @@ -481,20 +483,20 @@ impl PeerConn { self.tasks.spawn( async move { tracing::info!("start recving peer conn packet"); + let mut task_ret = Ok(()); while let Some(ret) = stream.next().await { if ret.is_err() { tracing::error!(error = ?ret, "peer conn recv error"); - if let Err(close_ret) = sink.close().await { - tracing::error!(error = ?close_ret, "peer conn sink close error, ignore it"); - } - if let Err(e) = close_event_sender.send(conn_id).await { - tracing::error!(error = ?e, "peer conn close event send error"); - } - return Err(ret.err().unwrap()); + task_ret = Err(ret.err().unwrap()); + break; } match Self::get_packet_type(ret.unwrap().into()) { - PeerConnPacketType::Data(item) => sender.send(item).await.unwrap(), + PeerConnPacketType::Data(item) => { + if sender.send(item).await.is_err() { + break; + } + } PeerConnPacketType::CtrlReq(item) => { let ret = Self::handle_ctrl_req_packet(item, &conn_info).unwrap(); if let Err(e) = sink.send(ret).await { @@ -508,8 +510,17 @@ impl PeerConn { } } } + tracing::info!("end recving peer conn packet"); - Ok(()) + + if let Err(close_ret) = sink.close().await { + tracing::error!(error = ?close_ret, "peer conn sink close error, ignore it"); + } + if let Err(e) = close_event_sender.send(conn_id).await { + tracing::error!(error = ?e, "peer conn close event send error"); + } + + task_ret } .instrument( tracing::info_span!("peer conn recv loop", conn_info = ?conn_info_for_instrument), @@ -525,6 +536,10 @@ impl PeerConn { self.info.as_ref().unwrap().my_peer_id } + pub fn get_network_identity(&self) -> NetworkIdentity { + self.info.as_ref().unwrap().network_identity.clone() + } + pub fn set_close_event_sender(&mut self, sender: mpsc::Sender) { self.close_event_sender = Some(sender); } @@ -597,6 +612,7 @@ mod tests { "c", ConfigFs::new_with_dir("c", "/tmp"), NetNS::new(None), + None, )), Box::new(c), ); @@ -607,6 +623,7 @@ mod tests { "c", ConfigFs::new_with_dir("c", "/tmp"), NetNS::new(None), + None, )), Box::new(s), ); @@ -627,6 +644,8 @@ mod tests { assert_eq!(c_peer.get_peer_id(), s_uuid); assert_eq!(s_peer.get_peer_id(), c_uuid); + assert_eq!(c_peer.get_network_identity(), s_peer.get_network_identity()); + assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default()); } async fn peer_conn_pingpong_test_common(drop_start: u32, drop_end: u32, conn_closed: bool) { diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index 67e8675..23bd8df 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -27,6 +27,8 @@ use crate::{ }; use super::{ + foreign_network_client::ForeignNetworkClient, + foreign_network_manager::ForeignNetworkManager, peer_map::PeerMap, peer_rip_route::BasicRoute, peer_rpc::PeerRpcManager, @@ -37,6 +39,7 @@ use super::{ struct RpcTransport { my_peer_id: uuid::Uuid, peers: Arc, + foreign_peers: Mutex>>, packet_recv: Mutex>, peer_rpc_tspt_sender: UnboundedSender, @@ -49,6 +52,11 @@ impl PeerRpcManagerTransport for RpcTransport { } async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + if let Some(foreign_peers) = self.foreign_peers.lock().await.as_ref() { + if foreign_peers.has_peer(dst_peer_id) { + return foreign_peers.send_msg(msg, dst_peer_id).await; + } + } self.peers .send_msg(msg, dst_peer_id) .map_err(|e| e.into()) @@ -101,6 +109,9 @@ pub struct PeerManager { nic_packet_process_pipeline: Arc>>, basic_route: Arc, + + foreign_network_manager: Arc, + foreign_network_client: Arc, } impl Debug for PeerManager { @@ -123,12 +134,24 @@ impl PeerManager { let rpc_tspt = Arc::new(RpcTransport { my_peer_id: global_ctx.get_id(), peers: peers.clone(), + foreign_peers: Mutex::new(None), packet_recv: Mutex::new(peer_rpc_tspt_recv), peer_rpc_tspt_sender, }); + let peer_rpc_mgr = Arc::new(PeerRpcManager::new(rpc_tspt.clone())); let basic_route = Arc::new(BasicRoute::new(global_ctx.get_id(), global_ctx.clone())); + let foreign_network_manager = Arc::new(ForeignNetworkManager::new( + global_ctx.clone(), + packet_send.clone(), + )); + let foreign_network_client = Arc::new(ForeignNetworkClient::new( + global_ctx.clone(), + packet_send.clone(), + peer_rpc_mgr.clone(), + )); + PeerManager { my_node_id: global_ctx.get_id(), global_ctx, @@ -140,13 +163,16 @@ impl PeerManager { peers: peers.clone(), - peer_rpc_mgr: Arc::new(PeerRpcManager::new(rpc_tspt.clone())), + peer_rpc_mgr, peer_rpc_tspt: rpc_tspt, peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), basic_route, + + foreign_network_manager, + foreign_network_client, } } @@ -155,7 +181,11 @@ impl PeerManager { peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); let peer_id = peer.get_peer_id(); - self.peers.add_new_peer_conn(peer).await; + if peer.get_network_identity() == self.global_ctx.get_network_identity() { + self.peers.add_new_peer_conn(peer).await; + } else { + self.foreign_network_client.add_new_peer_conn(peer).await; + } Ok((peer_id, conn_id)) } @@ -176,7 +206,11 @@ impl PeerManager { tracing::info!("add tunnel as server start"); let mut peer = PeerConn::new(self.my_node_id, self.global_ctx.clone(), tunnel); peer.do_handshake_as_server().await?; - self.peers.add_new_peer_conn(peer).await; + if peer.get_network_identity() == self.global_ctx.get_network_identity() { + self.peers.add_new_peer_conn(peer).await; + } else { + self.foreign_network_manager.add_peer_conn(peer).await?; + } tracing::info!("add tunnel as server done"); Ok(()) } @@ -312,12 +346,15 @@ impl PeerManager { struct Interface { my_node_id: uuid::Uuid, peers: Arc, + foreign_network_client: Arc, } #[async_trait] impl RouteInterface for Interface { async fn list_peers(&self) -> Vec { - self.peers.list_peers_with_conn().await + let mut peers = self.foreign_network_client.list_foreign_peers(); + peers.extend(self.peers.list_peers_with_conn().await); + peers } async fn send_route_packet( &self, @@ -325,17 +362,18 @@ impl PeerManager { route_id: u8, dst_peer_id: &PeerId, ) -> Result<(), Error> { + let packet_bytes: Bytes = + packet::Packet::new_route_packet(self.my_node_id, *dst_peer_id, route_id, &msg) + .into(); + if self.foreign_network_client.has_next_hop(dst_peer_id) { + return self + .foreign_network_client + .send_msg(packet_bytes, &dst_peer_id) + .await; + } + self.peers - .send_msg_directly( - packet::Packet::new_route_packet( - self.my_node_id, - *dst_peer_id, - route_id, - &msg, - ) - .into(), - dst_peer_id, - ) + .send_msg_directly(packet_bytes, dst_peer_id) .await } } @@ -345,6 +383,7 @@ impl PeerManager { .open(Box::new(Interface { my_node_id, peers: self.peers.clone(), + foreign_network_client: self.foreign_network_client.clone(), })) .await .unwrap(); @@ -400,24 +439,23 @@ impl PeerManager { let peer_map = self.peers.clone(); self.tasks.lock().await.spawn(async move { loop { - let mut to_remove = vec![]; - - for peer_id in peer_map.list_peers().await { - let conns = peer_map.list_peer_conns(&peer_id).await; - if conns.is_none() || conns.as_ref().unwrap().is_empty() { - to_remove.push(peer_id); - } - } - - for peer_id in to_remove { - peer_map.close_peer(&peer_id).await.unwrap(); - } - + peer_map.clean_peer_without_conn().await; tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }); } + async fn run_foriegn_network(&self) { + self.peer_rpc_tspt + .foreign_peers + .lock() + .await + .replace(self.foreign_network_client.get_peer_map().clone()); + + self.foreign_network_manager.run().await; + self.foreign_network_client.run().await; + } + pub async fn run(&self) -> Result<(), Error> { self.add_route(self.basic_route.clone()).await; @@ -425,6 +463,8 @@ impl PeerManager { self.start_peer_recv().await; self.peer_rpc_mgr.run(); self.run_clean_peer_without_conn_routine().await; + + self.run_foriegn_network().await; Ok(()) } @@ -451,4 +491,12 @@ impl PeerManager { pub fn get_basic_route(&self) -> Arc { self.basic_route.clone() } + + pub fn get_foreign_network_manager(&self) -> Arc { + self.foreign_network_manager.clone() + } + + pub fn get_foreign_network_client(&self) -> Arc { + self.foreign_network_client.clone() + } } diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index 119a40c..87a6e53 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -51,6 +51,10 @@ impl PeerMap { self.peer_map.get(peer_id).map(|v| v.clone()) } + pub fn has_peer(&self, peer_id: &PeerId) -> bool { + self.peer_map.contains_key(peer_id) + } + pub async fn send_msg_directly( &self, msg: Bytes, @@ -97,6 +101,10 @@ impl PeerMap { } } + if gateway_peer_id.is_none() && self.has_peer(dst_peer_id) { + gateway_peer_id = Some(*dst_peer_id); + } + let Some(gateway_peer_id) = gateway_peer_id else { log::error!("no gateway for dst_peer_id: {}", dst_peer_id); return Ok(()); @@ -117,6 +125,10 @@ impl PeerMap { None } + pub fn is_empty(&self) -> bool { + self.peer_map.is_empty() + } + pub async fn list_peers(&self) -> Vec { let mut ret = Vec::new(); for item in self.peer_map.iter() { @@ -175,4 +187,19 @@ impl PeerMap { let mut routes = self.routes.write().await; routes.insert(0, route); } + + pub async fn clean_peer_without_conn(&self) { + let mut to_remove = vec![]; + + for peer_id in self.list_peers().await { + let conns = self.list_peer_conns(&peer_id).await; + if conns.is_none() || conns.as_ref().unwrap().is_empty() { + to_remove.push(peer_id); + } + } + + for peer_id in to_remove { + self.close_peer(&peer_id).await.unwrap(); + } + } } diff --git a/easytier-core/src/peers/peer_rpc.rs b/easytier-core/src/peers/peer_rpc.rs index 063b2aa..db6116e 100644 --- a/easytier-core/src/peers/peer_rpc.rs +++ b/easytier-core/src/peers/peer_rpc.rs @@ -222,7 +222,10 @@ impl PeerRpcManager { let client_resp_receivers = self.client_resp_receivers.clone(); tokio::spawn(async move { loop { - let o = tspt.recv().await.unwrap(); + let Ok(o) = tspt.recv().await else { + tracing::warn!("peer rpc transport read aborted, exiting"); + break; + }; let packet = Packet::decode(&o); let packet: Packet = packet.deserialize(&mut rkyv::Infallible).unwrap(); let info = Self::parse_rpc_packet(&packet).unwrap();