From 85f0091056b833b3701a6577b21fa258f8cc0b23 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 19 Jul 2025 18:16:53 +0800 Subject: [PATCH] fix latency first route of public server (#1129) --- easytier/src/easytier-cli.rs | 265 ++++++++++-------- easytier/src/launcher.rs | 5 +- easytier/src/peer_center/instance.rs | 147 +++++++--- easytier/src/peers/foreign_network_manager.rs | 17 ++ easytier/src/peers/peer.rs | 13 +- easytier/src/peers/peer_manager.rs | 10 +- easytier/src/peers/peer_map.rs | 35 ++- easytier/src/peers/peer_ospf_route.rs | 2 +- easytier/src/peers/rpc_service.rs | 18 +- 9 files changed, 326 insertions(+), 186 deletions(-) diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index b896c23..45d3104 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -13,6 +13,7 @@ use anyhow::Context; use cidr::Ipv4Inet; use clap::{command, Args, CommandFactory, Parser, Subcommand}; use clap_complete::Shell; +use dashmap::DashMap; use humansize::format_size; use rust_i18n::t; use service_manager::*; @@ -28,12 +29,13 @@ use easytier::{ cli::{ list_peer_route_pair, ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest, GetVpnPortalInfoRequest, ListConnectorRequest, - ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListPeerRequest, - ListPeerResponse, ListRouteRequest, ListRouteResponse, NodeInfo, PeerManageRpc, + ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest, + ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse, + ManageMappedListenerRequest, MappedListenerManageAction, MappedListenerManageRpc, + MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc, PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc, VpnPortalRpcClientFactory, - ManageMappedListenerRequest, MappedListenerManageRpc, MappedListenerManageRpcClientFactory, ListMappedListenerRequest, MappedListenerManageAction }, common::NatType, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, @@ -92,9 +94,7 @@ enum SubCommand { #[command(about = "show tcp/kcp proxy status")] Proxy, #[command(about = t!("core_clap.generate_completions").to_string())] - GenAutocomplete{ - shell:Shell - }, + GenAutocomplete { shell: Shell }, } #[derive(clap::ValueEnum, Debug, Clone, PartialEq)] @@ -158,13 +158,9 @@ struct MappedListenerArgs { #[derive(Subcommand, Debug)] enum MappedListenerSubCommand { /// Add Mapped Listerner - Add { - url: String - }, + Add { url: String }, /// Remove Mapped Listener - Remove { - url: String - }, + Remove { url: String }, /// List Existing Mapped Listener List, } @@ -616,109 +612,61 @@ impl CommandHandler<'_> { }); let route = p.route.clone().unwrap_or_default(); - if route.cost == 1 { - items.push(RouteTableItem { - ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), - hostname: route.hostname.clone(), - proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), - - next_hop_ipv4: "DIRECT".to_string(), - next_hop_hostname: "".to_string(), - next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - path_len: route.cost, - path_latency: next_hop_pair.get_latency_ms().unwrap_or_default() as i32, - - next_hop_ipv4_lat_first: next_hop_pair_latency_first - .map(|pair| pair.route.clone().unwrap_or_default().ipv4_addr) - .unwrap_or_default() - .map(|ip| ip.to_string()) - .unwrap_or_default(), - next_hop_hostname_lat_first: next_hop_pair_latency_first - .map(|pair| pair.route.clone().unwrap_or_default().hostname) - .unwrap_or_default() - .clone(), - path_latency_lat_first: next_hop_pair_latency_first - .map(|pair| { - pair.route - .clone() - .unwrap_or_default() - .path_latency_latency_first - .unwrap_or_default() - }) - .unwrap_or_default(), - path_len_lat_first: next_hop_pair_latency_first - .map(|pair| { - pair.route - .clone() - .unwrap_or_default() - .cost_latency_first - .unwrap_or_default() - }) - .unwrap_or_default(), - - version: if route.version.is_empty() { - "unknown".to_string() - } else { - route.version.to_string() - }, - }); - } else { - items.push(RouteTableItem { - ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), - hostname: route.hostname.clone(), - proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), - next_hop_ipv4: next_hop_pair + items.push(RouteTableItem { + ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), + hostname: route.hostname.clone(), + proxy_cidrs: route.proxy_cidrs.clone().join(",").to_string(), + next_hop_ipv4: if route.cost == 1 { + "DIRECT".to_string() + } else { + next_hop_pair .route .clone() .unwrap_or_default() .ipv4_addr .map(|ip| ip.to_string()) - .unwrap_or_default(), - next_hop_hostname: next_hop_pair + .unwrap_or_default() + }, + next_hop_hostname: if route.cost == 1 { + "DIRECT".to_string() + } else { + next_hop_pair .route .clone() .unwrap_or_default() .hostname - .clone(), - next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), - path_len: route.cost, - path_latency: p.route.clone().unwrap_or_default().path_latency as i32, + .clone() + }, + next_hop_lat: next_hop_pair.get_latency_ms().unwrap_or(0.0), + path_len: route.cost, + path_latency: route.path_latency, - next_hop_ipv4_lat_first: next_hop_pair_latency_first + next_hop_ipv4_lat_first: if route.cost_latency_first.unwrap_or_default() == 1 { + "DIRECT".to_string() + } else { + next_hop_pair_latency_first .map(|pair| pair.route.clone().unwrap_or_default().ipv4_addr) .unwrap_or_default() .map(|ip| ip.to_string()) - .unwrap_or_default(), - next_hop_hostname_lat_first: next_hop_pair_latency_first + .unwrap_or_default() + }, + next_hop_hostname_lat_first: if route.cost_latency_first.unwrap_or_default() == 1 { + "DIRECT".to_string() + } else { + next_hop_pair_latency_first .map(|pair| pair.route.clone().unwrap_or_default().hostname) .unwrap_or_default() - .clone(), - path_latency_lat_first: next_hop_pair_latency_first - .map(|pair| { - pair.route - .clone() - .unwrap_or_default() - .path_latency_latency_first - .unwrap_or_default() - }) - .unwrap_or_default(), - path_len_lat_first: next_hop_pair_latency_first - .map(|pair| { - pair.route - .clone() - .unwrap_or_default() - .cost_latency_first - .unwrap_or_default() - }) - .unwrap_or_default(), + .clone() + }, + path_latency_lat_first: route.path_latency_latency_first.unwrap_or_default(), + path_len_lat_first: route.cost_latency_first.unwrap_or_default(), - version: if route.version.is_empty() { - "unknown".to_string() - } else { - route.version.to_string() - }, - }); - } + version: if route.version.is_empty() { + "unknown".to_string() + } else { + route.version.to_string() + }, + }); } print_output(&items, self.output_format)?; @@ -747,7 +695,10 @@ impl CommandHandler<'_> { .list_mapped_listener(BaseController::default(), request) .await?; if self.verbose || *self.output_format == OutputFormat::Json { - println!("{}", serde_json::to_string_pretty(&response.mappedlisteners)?); + println!( + "{}", + serde_json::to_string_pretty(&response.mappedlisteners)? + ); return Ok(()); } println!("response: {:#?}", response); @@ -759,7 +710,7 @@ impl CommandHandler<'_> { let client = self.get_mapped_listener_manager_client().await?; let request = ManageMappedListenerRequest { action: MappedListenerManageAction::MappedListenerAdd as i32, - url: Some(url.into()) + url: Some(url.into()), }; let _response = client .manage_mapped_listener(BaseController::default(), request) @@ -772,7 +723,7 @@ impl CommandHandler<'_> { let client = self.get_mapped_listener_manager_client().await?; let request = ManageMappedListenerRequest { action: MappedListenerManageAction::MappedListenerRemove as i32, - url: Some(url.into()) + url: Some(url.into()), }; let _response = client .manage_mapped_listener(BaseController::default(), request) @@ -783,9 +734,11 @@ impl CommandHandler<'_> { fn mapped_listener_validate_url(url: &String) -> Result { let url = url::Url::parse(url)?; if url.scheme() != "tcp" && url.scheme() != "udp" { - return Err(anyhow::anyhow!("Url ({url}) must start with tcp:// or udp://")) + return Err(anyhow::anyhow!( + "Url ({url}) must start with tcp:// or udp://" + )); } else if url.port().is_none() { - return Err(anyhow::anyhow!("Url ({url}) is missing port num")) + return Err(anyhow::anyhow!("Url ({url}) is missing port num")); } Ok(url) } @@ -1079,7 +1032,7 @@ async fn main() -> Result<(), Error> { let locale = sys_locale::get_locale().unwrap_or_else(|| String::from("en-US")); rust_i18n::set_locale(&locale); let cli = Cli::parse(); - + let client = RpcClient::new(TcpTunnelConnector::new( format!("tcp://{}:{}", cli.rpc_portal.ip(), cli.rpc_portal.port()) .parse() @@ -1126,19 +1079,21 @@ async fn main() -> Result<(), Error> { handler.handle_connector_list().await?; } }, - SubCommand::MappedListener(mapped_listener_args) => match mapped_listener_args.sub_command { - Some(MappedListenerSubCommand::Add { url }) => { - handler.handle_mapped_listener_add(&url).await?; - println!("add mapped listener: {url}"); + SubCommand::MappedListener(mapped_listener_args) => { + match mapped_listener_args.sub_command { + Some(MappedListenerSubCommand::Add { url }) => { + handler.handle_mapped_listener_add(&url).await?; + println!("add mapped listener: {url}"); + } + Some(MappedListenerSubCommand::Remove { url }) => { + handler.handle_mapped_listener_remove(&url).await?; + println!("remove mapped listener: {url}"); + } + Some(MappedListenerSubCommand::List) | None => { + handler.handle_mapped_listener_list().await?; + } } - Some(MappedListenerSubCommand::Remove { url }) => { - handler.handle_mapped_listener_remove(&url).await?; - println!("remove mapped listener: {url}"); - } - Some(MappedListenerSubCommand::List) | None => { - handler.handle_mapped_listener_list().await?; - } - }, + } SubCommand::Route(route_args) => match route_args.sub_command { Some(RouteSubCommand::List) | None => handler.handle_route_list().await?, Some(RouteSubCommand::Dump) => handler.handle_route_dump().await?, @@ -1173,10 +1128,53 @@ async fn main() -> Result<(), Error> { GetGlobalPeerMapRequest::default(), ) .await?; + let route_infos = handler.list_peer_route_pair().await?; + struct PeerCenterNodeInfo { + hostname: String, + ipv4: String, + } + let node_id_to_node_info = DashMap::new(); + let node_info = handler + .get_peer_manager_client() + .await? + .show_node_info(BaseController::default(), ShowNodeInfoRequest::default()) + .await? + .node_info + .ok_or(anyhow::anyhow!("node info not found"))?; + node_id_to_node_info.insert( + node_info.peer_id, + PeerCenterNodeInfo { + hostname: node_info.hostname.clone(), + ipv4: node_info.ipv4_addr.clone(), + }, + ); + for route_info in route_infos { + let Some(peer_id) = route_info.route.as_ref().map(|x| x.peer_id) else { + continue; + }; + node_id_to_node_info.insert( + peer_id, + PeerCenterNodeInfo { + hostname: route_info + .route + .as_ref() + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: route_info + .route + .as_ref() + .and_then(|x| x.ipv4_addr) + .map(|x| x.to_string()) + .unwrap_or_default(), + }, + ); + } #[derive(tabled::Tabled, serde::Serialize)] struct PeerCenterTableItem { node_id: String, + hostname: String, + ipv4: String, #[tabled(rename = "direct_peers")] #[serde(skip_serializing)] direct_peers_str: String, @@ -1187,27 +1185,50 @@ async fn main() -> Result<(), Error> { #[derive(serde::Serialize)] struct DirectPeerItem { node_id: String, + hostname: String, + ipv4: String, latency_ms: i32, } let mut table_rows = vec![]; for (k, v) in resp.global_peer_map.iter() { let node_id = k; - let direct_peers_strs = v - .direct_peers - .iter() - .map(|(k, v)| format!("{}: {:?}ms", k, v.latency_ms,)) - .collect::>(); let direct_peers: Vec<_> = v .direct_peers .iter() .map(|(k, v)| DirectPeerItem { node_id: k.to_string(), + hostname: node_id_to_node_info + .get(k) + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: node_id_to_node_info + .get(k) + .map(|x| x.ipv4.clone()) + .unwrap_or_default(), latency_ms: v.latency_ms, }) .collect(); + let direct_peers_strs = direct_peers + .iter() + .map(|x| { + format!( + "{}({}[{}]): {}ms", + x.node_id, x.hostname, x.ipv4, x.latency_ms, + ) + }) + .collect::>(); + table_rows.push(PeerCenterTableItem { node_id: node_id.to_string(), + hostname: node_id_to_node_info + .get(node_id) + .map(|x| x.hostname.clone()) + .unwrap_or_default(), + ipv4: node_id_to_node_info + .get(node_id) + .map(|x| x.ipv4.clone()) + .unwrap_or_default(), direct_peers_str: direct_peers_strs.join("\n"), direct_peers, }); diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 0398592..5f2f1ca 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -191,9 +191,8 @@ impl EasyTierLauncher { }; *data_c.my_node_info.write().unwrap() = node_info.clone(); *data_c.routes.write().unwrap() = peer_mgr_c.list_routes().await; - *data_c.peers.write().unwrap() = PeerManagerRpcService::new(peer_mgr_c.clone()) - .list_peers() - .await; + *data_c.peers.write().unwrap() = + PeerManagerRpcService::list_peers(&peer_mgr_c).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; } }); diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index d446024..d0a49a9 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -12,17 +12,19 @@ use tokio::task::JoinSet; use tracing::Instrument; use crate::{ - common::PeerId, + common::{global_ctx::GlobalCtx, PeerId}, peers::{ peer_manager::PeerManager, + peer_map::PeerMap, + peer_rpc::PeerRpcManager, route_trait::{RouteCostCalculator, RouteCostCalculatorInterface}, rpc_service::PeerManagerRpcService, }, proto::{ peer_rpc::{ - GetGlobalPeerMapRequest, GetGlobalPeerMapResponse, GlobalPeerMap, PeerCenterRpc, - PeerCenterRpcClientFactory, PeerCenterRpcServer, PeerInfoForGlobalMap, - ReportPeersRequest, ReportPeersResponse, + DirectConnectedPeerInfo, GetGlobalPeerMapRequest, GetGlobalPeerMapResponse, + GlobalPeerMap, PeerCenterRpc, PeerCenterRpcClientFactory, PeerCenterRpcServer, + PeerInfoForGlobalMap, ReportPeersRequest, ReportPeersResponse, }, rpc_types::{self, controller::BaseController}, }, @@ -30,8 +32,18 @@ use crate::{ use super::{server::PeerCenterServer, Digest, Error}; +#[async_trait::async_trait] +#[auto_impl::auto_impl(&, Arc, Box)] +pub trait PeerCenterPeerManagerTrait: Send + Sync + 'static { + async fn list_peers(&self) -> PeerInfoForGlobalMap; + fn my_peer_id(&self) -> PeerId; + fn get_global_ctx(&self) -> Arc; + fn get_rpc_mgr(&self) -> Weak; + async fn list_routes(&self) -> Vec; +} + struct PeerCenterBase { - peer_mgr: Weak, + peer_mgr: Arc, my_peer_id: PeerId, tasks: Mutex>, lock: Arc>, @@ -41,7 +53,7 @@ struct PeerCenterBase { static SERVICE_ID: u32 = 50; struct PeridicJobCtx { - peer_mgr: Weak, + peer_mgr: Arc, my_peer_id: PeerId, center_peer: AtomicCell, job_ctx: T, @@ -49,22 +61,17 @@ struct PeridicJobCtx { impl PeerCenterBase { pub async fn init(&self) -> Result<(), Error> { - let Some(peer_mgr) = self.peer_mgr.upgrade() else { + let Some(rpc_mgr) = self.peer_mgr.get_rpc_mgr().upgrade() else { return Err(Error::Shutdown); }; - - peer_mgr - .get_peer_rpc_mgr() - .rpc_server() - .registry() - .register( - PeerCenterRpcServer::new(PeerCenterServer::new(peer_mgr.my_peer_id())), - &peer_mgr.get_global_ctx().get_network_name(), - ); + rpc_mgr.rpc_server().registry().register( + PeerCenterRpcServer::new(PeerCenterServer::new(self.peer_mgr.my_peer_id())), + &self.peer_mgr.get_global_ctx().get_network_name(), + ); Ok(()) } - async fn select_center_peer(peer_mgr: &Arc) -> Option { + async fn select_center_peer(peer_mgr: &dyn PeerCenterPeerManagerTrait) -> Option { let peers = peer_mgr.list_routes().await; if peers.is_empty() { return None; @@ -109,19 +116,18 @@ impl PeerCenterBase { job_ctx, }); loop { - let Some(peer_mgr) = peer_mgr.upgrade() else { - tracing::error!("peer manager is shutdown, exit periodic job"); - return; - }; - let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else { tracing::trace!("no center peer found, sleep 1 second"); tokio::time::sleep(Duration::from_secs(1)).await; continue; }; + let Some(rpc_mgr) = peer_mgr.get_rpc_mgr().upgrade() else { + tracing::error!("rpc manager is shutdown, exit periodic job"); + return; + }; + ctx.center_peer.store(center_peer.clone()); tracing::trace!(?center_peer, "run periodic job"); - let rpc_mgr = peer_mgr.get_peer_rpc_mgr(); let _g = lock.lock().await; let stub = rpc_mgr .rpc_client() @@ -148,10 +154,11 @@ impl PeerCenterBase { ); } - pub fn new(peer_mgr: Arc) -> Self { + pub fn new(peer_mgr: Arc) -> Self { + let my_peer_id = peer_mgr.my_peer_id(); PeerCenterBase { - peer_mgr: Arc::downgrade(&peer_mgr), - my_peer_id: peer_mgr.my_peer_id(), + peer_mgr, + my_peer_id, tasks: Mutex::new(JoinSet::new()), lock: Arc::new(Mutex::new(())), } @@ -190,7 +197,7 @@ impl PeerCenterRpc for PeerCenterInstanceService { } pub struct PeerCenterInstance { - peer_mgr: Arc, + peer_mgr: Arc, client: Arc, global_peer_map: Arc>, @@ -199,7 +206,7 @@ pub struct PeerCenterInstance { } impl PeerCenterInstance { - pub fn new(peer_mgr: Arc) -> Self { + pub fn new(peer_mgr: Arc) -> Self { PeerCenterInstance { peer_mgr: peer_mgr.clone(), client: Arc::new(PeerCenterBase::new(peer_mgr.clone())), @@ -286,15 +293,14 @@ impl PeerCenterInstance { async fn init_report_peers_job(&self) { struct Ctx { - service: PeerManagerRpcService, - + peer_mgr: Arc, last_report_peers: Mutex>, last_center_peer: AtomicCell, last_report_time: AtomicCell, } let ctx = Arc::new(Ctx { - service: PeerManagerRpcService::new(self.peer_mgr.clone()), + peer_mgr: self.peer_mgr.clone(), last_report_peers: Mutex::new(BTreeSet::new()), last_center_peer: AtomicCell::new(PeerId::default()), last_report_time: AtomicCell::new(Instant::now()), @@ -303,7 +309,7 @@ impl PeerCenterInstance { self.client .init_periodic_job(ctx, |client, ctx| async move { let my_node_id = ctx.my_peer_id; - let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into(); + let peers = ctx.job_ctx.peer_mgr.list_peers().await; let peer_list = peers.direct_peers.keys().map(|k| *k).collect(); let job_ctx = &ctx.job_ctx; @@ -373,7 +379,7 @@ impl PeerCenterInstance { if let Some(cost) = self.directed_cost(src, dst) { return cost; } - self.directed_cost(dst, src).unwrap_or(100) + self.directed_cost(dst, src).unwrap_or(500) } fn begin_update(&mut self) { @@ -402,6 +408,81 @@ impl PeerCenterInstance { } } +#[async_trait::async_trait] +impl PeerCenterPeerManagerTrait for PeerManager { + async fn list_peers(&self) -> PeerInfoForGlobalMap { + PeerManagerRpcService::list_peers(self).await.into() + } + + fn my_peer_id(&self) -> PeerId { + self.get_peer_map().my_peer_id() + } + + fn get_global_ctx(&self) -> Arc { + self.get_peer_map().get_global_ctx() + } + + fn get_rpc_mgr(&self) -> Weak { + Arc::downgrade(&self.get_peer_rpc_mgr()) + } + + async fn list_routes(&self) -> Vec { + self.list_routes().await + } +} + +pub struct PeerMapWithPeerRpcManager { + pub peer_map: Arc, + pub rpc_mgr: Arc, +} + +#[async_trait::async_trait] +impl PeerCenterPeerManagerTrait for PeerMapWithPeerRpcManager { + async fn list_peers(&self) -> PeerInfoForGlobalMap { + // TODO: currently latency between public server cannot be calculated because one public-server pair + // has no connection between them. (hard to get latency from peer manager because it's hard to transfrom the peer id) + // but it's fine because we don't want to too much traffic between public servers. + let peers = self.peer_map.list_peers().await; + let mut ret = PeerInfoForGlobalMap::default(); + for peer in peers { + if let Some(conns) = self.peer_map.list_peer_conns(peer).await { + let Some(min_lat) = conns + .iter() + .map(|conn| conn.stats.as_ref().unwrap().latency_us) + .min() + else { + continue; + }; + + ret.direct_peers.insert( + peer, + DirectConnectedPeerInfo { + latency_ms: std::cmp::max(1, (min_lat as u32 / 1000) as i32), + }, + ); + } + } + + ret + } + + fn my_peer_id(&self) -> PeerId { + self.peer_map.my_peer_id() + } + + fn get_global_ctx(&self) -> Arc { + self.peer_map.get_global_ctx() + } + + fn get_rpc_mgr(&self) -> Weak { + Arc::downgrade(&self.rpc_mgr) + } + + async fn list_routes(&self) -> Vec { + self.peer_map.list_route_infos().await + } +} + #[cfg(test)] mod tests { use crate::{ diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 4cc4fd9..df69e36 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -29,6 +29,7 @@ use crate::{ token_bucket::TokenBucket, PeerId, }, + peer_center::instance::{PeerCenterInstance, PeerMapWithPeerRpcManager}, peers::route_trait::{Route, RouteInterface}, proto::{ cli::{ForeignNetworkEntryPb, ListForeignNetworkResponse, PeerInfo}, @@ -73,6 +74,8 @@ struct ForeignNetworkEntry { bps_limiter: Arc, + peer_center: Arc, + tasks: Mutex>, pub lock: Mutex<()>, @@ -116,6 +119,13 @@ impl ForeignNetworkEntry { .token_bucket_manager() .get_or_create(&network.network_name, limiter_config.into()); + let peer_center = Arc::new(PeerCenterInstance::new(Arc::new( + PeerMapWithPeerRpcManager { + peer_map: peer_map.clone(), + rpc_mgr: peer_rpc.clone(), + }, + ))); + Self { my_peer_id, @@ -134,6 +144,8 @@ impl ForeignNetworkEntry { tasks: Mutex::new(JoinSet::new()), + peer_center, + lock: Mutex::new(()), } } @@ -270,6 +282,10 @@ impl ForeignNetworkEntry { .await .unwrap(); + route + .set_route_cost_fn(self.peer_center.get_cost_calculator()) + .await; + self.peer_map.add_route(Arc::new(Box::new(route))).await; } @@ -351,6 +367,7 @@ impl ForeignNetworkEntry { self.prepare_route(accessor).await; self.start_packet_recv().await; self.peer_rpc.run(); + self.peer_center.init().await; } } diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 3ef8884..dbdd3b8 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -201,14 +201,17 @@ impl Peer { } pub fn has_directly_connected_conn(&self) -> bool { - self.conns.iter().any(|entry|!(entry.value()).is_hole_punched()) + self.conns + .iter() + .any(|entry| !(entry.value()).is_hole_punched()) } pub fn get_directly_connections(&self) -> DashSet { - self.conns.iter() - .filter(|entry| !(entry.value()).is_hole_punched()) - .map(|entry|(entry.value()).get_conn_id()) - .collect() + self.conns + .iter() + .filter(|entry| !(entry.value()).is_hole_punched()) + .map(|entry| (entry.value()).get_conn_id()) + .collect() } pub fn get_default_conn_id(&self) -> PeerConnId { diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 0062761..91cabf9 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::Context; use async_trait::async_trait; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use tokio::{ sync::{ @@ -1184,14 +1184,6 @@ impl PeerManager { } } - pub fn get_directly_connections(&self, peer_id: PeerId) -> DashSet { - if let Some(peer) = self.peers.get_peer_by_id(peer_id) { - return peer.get_directly_connections(); - } - - DashSet::new() - } - pub async fn clear_resources(&self) { let mut peer_pipeline = self.peer_packet_process_pipeline.write().await; peer_pipeline.clear(); diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 9bd37bc..eef23fd 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -1,7 +1,10 @@ -use std::{net::{Ipv4Addr, Ipv6Addr}, sync::Arc}; +use std::{ + net::{Ipv4Addr, Ipv6Addr}, + sync::Arc, +}; use anyhow::Context; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use tokio::sync::RwLock; use crate::{ @@ -10,7 +13,10 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, PeerId, }, - proto::{cli::PeerConnInfo, peer_rpc::RoutePeerInfo}, + proto::{ + cli::{self, PeerConnInfo}, + peer_rpc::RoutePeerInfo, + }, tunnel::{packet_def::ZCPacket, TunnelError}, }; @@ -91,6 +97,14 @@ impl PeerMap { self.peer_map.get(&peer_id).map(|v| v.clone()) } + pub fn get_directly_connections_by_peer_id(&self, peer_id: PeerId) -> DashSet { + if let Some(peer) = self.get_peer_by_id(peer_id) { + return peer.get_directly_connections(); + } + + DashSet::new() + } + pub fn has_peer(&self, peer_id: PeerId) -> bool { peer_id == self.my_peer_id || self.peer_map.contains_key(&peer_id) } @@ -324,6 +338,13 @@ impl PeerMap { route_map } + pub async fn list_route_infos(&self) -> Vec { + for route in self.routes.read().await.iter() { + return route.list_routes().await; + } + vec![] + } + pub async fn need_relay_by_foreign_network(&self, dst_peer_id: PeerId) -> Result { // if gateway_peer_id is not connected to me, means need relay by foreign network let gateway_id = self @@ -343,6 +364,14 @@ impl PeerMap { .map(|v| (v.key().clone(), v.value().clone())) .collect() } + + pub fn my_peer_id(&self) -> PeerId { + self.my_peer_id + } + + pub fn get_global_ctx(&self) -> ArcGlobalCtx { + self.global_ctx.clone() + } } impl Drop for PeerMap { diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index d1d99f7..5c0a1d6 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -2264,7 +2264,7 @@ impl Route for PeerRoute { route.next_hop_peer_id_latency_first = next_hop_peer_latency_first.map(|x| x.next_hop_peer_id); - route.cost_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency); + route.cost_latency_first = next_hop_peer_latency_first.map(|x| x.path_len as i32); route.path_latency_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency); route.feature_flag = item.feature_flag.clone(); diff --git a/easytier/src/peers/rpc_service.rs b/easytier/src/peers/rpc_service.rs index 14f8f19..9e58894 100644 --- a/easytier/src/peers/rpc_service.rs +++ b/easytier/src/peers/rpc_service.rs @@ -22,17 +22,17 @@ impl PeerManagerRpcService { PeerManagerRpcService { peer_manager } } - pub async fn list_peers(&self) -> Vec { - let mut peers = self.peer_manager.get_peer_map().list_peers().await; + pub async fn list_peers(peer_manager: &PeerManager) -> Vec { + let mut peers = peer_manager.get_peer_map().list_peers().await; peers.extend( - self.peer_manager + peer_manager .get_foreign_network_client() .get_peer_map() .list_peers() .await .iter(), ); - let peer_map = self.peer_manager.get_peer_map(); + let peer_map = peer_manager.get_peer_map(); let mut peer_infos = Vec::new(); for peer in peers { let mut peer_info = PeerInfo::default(); @@ -41,17 +41,15 @@ impl PeerManagerRpcService { .get_peer_default_conn_id(peer) .await .map(Into::into); - peer_info.directly_connected_conns = self - .peer_manager - .get_directly_connections(peer) + peer_info.directly_connected_conns = peer_map + .get_directly_connections_by_peer_id(peer) .into_iter() .map(Into::into) .collect(); if let Some(conns) = peer_map.list_peer_conns(peer).await { peer_info.conns = conns; - } else if let Some(conns) = self - .peer_manager + } else if let Some(conns) = peer_manager .get_foreign_network_client() .get_peer_map() .list_peer_conns(peer) @@ -77,7 +75,7 @@ impl PeerManageRpc for PeerManagerRpcService { ) -> Result { let mut reply = ListPeerResponse::default(); - let peers = self.list_peers().await; + let peers = PeerManagerRpcService::list_peers(&self.peer_manager).await; for peer in peers { reply.peer_infos.push(peer); }