From 4608bca998138b30b727ff1880b22bc522146c35 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Mon, 2 Jun 2025 20:12:27 +0800 Subject: [PATCH] improve performance of route generation (#914) this may fix the following problems: 1. cpu 100% when large number of nodes in network. 2. high cpu usage when large number of foreign networks. 3. packet loss when new node enters/exits. 4. old routes not cleaned up and shown as obsolete entries. --- Cargo.lock | 11 +- Cargo.toml | 2 +- easytier/Cargo.toml | 1 + easytier/src/peers/graph_algo.rs | 179 ++++++++++ easytier/src/peers/mod.rs | 2 + easytier/src/peers/peer_manager.rs | 6 + easytier/src/peers/peer_ospf_route.rs | 492 +++++++++++++++----------- 7 files changed, 474 insertions(+), 219 deletions(-) create mode 100644 easytier/src/peers/graph_algo.rs diff --git a/Cargo.lock b/Cargo.lock index 4e60f75..7066cf1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1950,6 +1950,7 @@ dependencies = [ "gethostname 0.5.0", "git-version", "globwalk", + "hashbrown 0.15.3", "hickory-client", "hickory-proto", "hickory-resolver", @@ -3075,9 +3076,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.15.2" +version = "0.15.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289" +checksum = "84b26c544d002229e640969970a2e74021aadf6e2f96372b9c58eff97de08eb3" dependencies = [ "allocator-api2", "equivalent", @@ -3681,7 +3682,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c9c992b02b5b4c94ea26e32fe5bccb7aa7d9f390ab5c1221ff895bc7ea8b652" dependencies = [ "equivalent", - "hashbrown 0.15.2", + "hashbrown 0.15.3", "serde", ] @@ -4788,7 +4789,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", "syn 2.0.87", @@ -5362,7 
+5363,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a98c6720655620a521dcc722d0ad66cd8afd5d86e34a89ef691c50b7b24de06" dependencies = [ "fixedbitset 0.5.7", - "hashbrown 0.15.2", + "hashbrown 0.15.3", "indexmap 2.7.1", "serde", ] diff --git a/Cargo.toml b/Cargo.toml index 1f73dc4..40a0833 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,5 +16,5 @@ panic = "unwind" panic = "abort" lto = true codegen-units = 1 -opt-level = 'z' +opt-level = 3 strip = true diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 1df24c4..52875f9 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -138,6 +138,7 @@ network-interface = "2.0" # for ospf route petgraph = "0.8.1" +hashbrown = "0.15.3" # for wireguard boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true } diff --git a/easytier/src/peers/graph_algo.rs b/easytier/src/peers/graph_algo.rs new file mode 100644 index 0000000..33a77d2 --- /dev/null +++ b/easytier/src/peers/graph_algo.rs @@ -0,0 +1,179 @@ +use core::cmp::Ordering; +use hashbrown::hash_map::{ + Entry::{Occupied, Vacant}, + HashMap, +}; +use petgraph::{ + algo::Measure, + visit::{EdgeRef as _, IntoEdges, VisitMap as _, Visitable}, +}; +use std::{collections::BinaryHeap, hash::Hash}; + +/// `MinScored` holds a score `K` and a scored object `T` in +/// a pair for use with a `BinaryHeap`. +/// +/// `MinScored` compares in reverse order by the score, so that we can +/// use `BinaryHeap` as a min-heap to extract the score-value pair with the +/// least score. +/// +/// **Note:** `MinScored` implements a total order (`Ord`), so that it is +/// possible to use float types as scores. 
+#[derive(Copy, Clone, Debug)] +pub struct MinScored(pub K, pub T); + +impl PartialEq for MinScored { + #[inline] + fn eq(&self, other: &MinScored) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for MinScored {} + +impl PartialOrd for MinScored { + #[inline] + fn partial_cmp(&self, other: &MinScored) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for MinScored { + #[inline] + fn cmp(&self, other: &MinScored) -> Ordering { + let a = &self.0; + let b = &other.0; + if a == b { + Ordering::Equal + } else if a < b { + Ordering::Greater + } else if a > b { + Ordering::Less + } else if a.ne(a) && b.ne(b) { + // these are the NaN cases + Ordering::Equal + } else if a.ne(a) { + // Order NaN less, so that it is last in the MinScore order + Ordering::Less + } else { + Ordering::Greater + } + } +} + +pub fn dijkstra_with_first_hop( + graph: G, + start: G::NodeId, + mut edge_cost: F, +) -> ( + HashMap, + HashMap, +) +where + G: IntoEdges + Visitable, + G::NodeId: Eq + Hash + Clone, + F: FnMut(G::EdgeRef) -> K, + K: Measure + Copy, +{ + let mut visited = graph.visit_map(); + let mut scores = HashMap::new(); + let mut first_hop = HashMap::new(); + let mut visit_next = BinaryHeap::new(); + let zero_score = K::default(); + scores.insert(start.clone(), zero_score); + visit_next.push(MinScored(zero_score, start.clone())); + first_hop.insert(start.clone(), (start.clone(), 0)); + + while let Some(MinScored(node_score, node)) = visit_next.pop() { + if visited.is_visited(&node) { + continue; + } + for edge in graph.edges(node.clone()) { + let next = edge.target(); + if visited.is_visited(&next) { + continue; + } + let next_score = node_score + edge_cost(edge); + match scores.entry(next.clone()) { + Occupied(mut ent) => { + if next_score < *ent.get() { + *ent.get_mut() = next_score; + visit_next.push(MinScored(next_score, next.clone())); + // 继承前驱的 first_hop,或自己就是第一跳 + let hop = if node == start { + (next.clone(), 0) + } else { + first_hop[&node].clone() + }; + 
first_hop.insert(next.clone(), (hop.0, hop.1 + 1)); + } + } + Vacant(ent) => { + ent.insert(next_score); + visit_next.push(MinScored(next_score, next.clone())); + let hop = if node == start { + (next.clone(), 0) + } else { + first_hop[&node].clone() + }; + first_hop.insert(next.clone(), (hop.0, hop.1 + 1)); + } + } + } + visited.visit(node); + } + + (scores, first_hop) +} + +#[cfg(test)] +mod tests { + use super::*; + use petgraph::graph::DiGraph; + + #[test] + fn test_dijkstra_with_first_hop_4node() { + let mut graph = DiGraph::<&str, u32>::new(); + let a = graph.add_node("a"); + let b = graph.add_node("b"); + let c = graph.add_node("c"); + let d = graph.add_node("d"); + + graph.extend_with_edges(&[(a, b, 1)]); + graph.extend_with_edges(&[(b, c, 1)]); + graph.extend_with_edges(&[(c, d, 2)]); + + let (scores, first_hop) = dijkstra_with_first_hop(&graph, a, |edge| *edge.weight()); + + assert_eq!(scores[&b], 1); + assert_eq!(scores[&c], 2); + assert_eq!(scores[&d], 4); + + assert_eq!(first_hop[&b], (b, 1)); + assert_eq!(first_hop[&c], (b, 2)); + assert_eq!(first_hop[&d], (b, 3)); + } + + #[test] + fn test_dijkstra_with_first_hop() { + let mut graph = DiGraph::<&str, u32>::new(); + let a = graph.add_node("a"); + let b = graph.add_node("b"); + let c = graph.add_node("c"); + let d = graph.add_node("d"); + let e = graph.add_node("e"); + + graph.extend_with_edges(&[(a, b, 1), (a, c, 2), (b, d, 1), (c, d, 3), (d, e, 1)]); + + let (scores, first_hop) = dijkstra_with_first_hop(&graph, a, |edge| *edge.weight()); + + assert_eq!(scores[&b], 1); + assert_eq!(scores[&c], 2); + assert_eq!(scores[&d], 2); + assert_eq!(scores[&e], 3); + + assert_eq!(first_hop[&b], (b, 1)); + assert_eq!(first_hop[&c], (c, 1)); + assert_eq!(first_hop[&d], (b, 2)); // d is reached via b + assert_eq!(first_hop[&e], (b, 3)); // e is reached via d + } +} diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index 0fbd63f..c3d9e1a 100644 --- a/easytier/src/peers/mod.rs +++ 
b/easytier/src/peers/mod.rs @@ -1,3 +1,5 @@ +mod graph_algo; + pub mod peer; // pub mod peer_conn; pub mod peer_conn; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 0fb55a9..236063e 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -1275,6 +1275,12 @@ mod tests { let peer_mgr_d = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; let peer_mgr_e = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + println!("peer_mgr_a: {}", peer_mgr_a.my_peer_id); + println!("peer_mgr_b: {}", peer_mgr_b.my_peer_id); + println!("peer_mgr_c: {}", peer_mgr_c.my_peer_id); + println!("peer_mgr_d: {}", peer_mgr_d.my_peer_id); + println!("peer_mgr_e: {}", peer_mgr_e.my_peer_id); + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 11fcb58..1e5b7ca 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -1,7 +1,6 @@ use std::{ collections::BTreeSet, fmt::Debug, - hash::RandomState, net::Ipv4Addr, sync::{ atomic::{AtomicBool, AtomicU32, Ordering}, @@ -13,9 +12,10 @@ use std::{ use crossbeam::atomic::AtomicCell; use dashmap::DashMap; use petgraph::{ - algo::{all_simple_paths, astar, dijkstra}, - graph::NodeIndex, - Directed, Graph, + algo::dijkstra, + graph::{Graph, NodeIndex}, + visit::{EdgeRef, IntoNodeReferences}, + Directed, }; use prost::Message; use prost_reflect::{DynamicMessage, ReflectMessage}; @@ -49,6 +49,7 @@ use crate::{ }; use super::{ + graph_algo::dijkstra_with_first_hop, peer_rpc::PeerRpcManager, route_trait::{ DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator, @@ -60,7 +61,8 @@ use super::{ static SERVICE_ID: u32 = 7; static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600); static 
REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660); -static AVOID_RELAY_COST: i32 = i32::MAX / 512; +// the cost (latency between two peers) is i32, i32::MAX is large enough. +static AVOID_RELAY_COST: usize = i32::MAX as usize; type Version = u32; @@ -80,14 +82,12 @@ impl AtomicVersion { self.0.store(version, Ordering::Relaxed); } - fn inc(&self) { - self.0.fetch_add(1, Ordering::Relaxed); + fn inc(&self) -> Version { + self.0.fetch_add(1, Ordering::Relaxed) + 1 } fn set_if_larger(&self, version: Version) { - if self.get() < version { - self.set(version); - } + self.0.fetch_max(version, Ordering::Relaxed); } } @@ -283,13 +283,25 @@ impl RouteConnBitmap { type Error = SyncRouteInfoError; // constructed with all infos synced from all peers. -#[derive(Debug)] struct SyncedRouteInfo { peer_infos: DashMap, // prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers. raw_peer_infos: DashMap, conn_map: DashMap, AtomicVersion)>, foreign_network: DashMap, + + version: AtomicVersion, +} + +impl Debug for SyncedRouteInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SyncedRouteInfo") + .field("peer_infos", &self.peer_infos) + .field("conn_map", &self.conn_map) + .field("foreign_network", &self.foreign_network) + .field("version", &self.version.get()) + .finish() + } } impl SyncedRouteInfo { @@ -305,17 +317,24 @@ impl SyncedRouteInfo { self.raw_peer_infos.remove(&peer_id); self.conn_map.remove(&peer_id); self.foreign_network.retain(|k, _| k.peer_id != peer_id); + self.version.inc(); } fn fill_empty_peer_info(&self, peer_ids: &BTreeSet) { + let mut need_inc_version = false; for peer_id in peer_ids { - self.peer_infos - .entry(*peer_id) - .or_insert_with(|| RoutePeerInfo::new()); + self.peer_infos.entry(*peer_id).or_insert_with(|| { + need_inc_version = true; + RoutePeerInfo::new() + }); - self.conn_map - .entry(*peer_id) - .or_insert_with(|| (BTreeSet::new(), 
AtomicVersion::new())); + self.conn_map.entry(*peer_id).or_insert_with(|| { + need_inc_version = true; + (BTreeSet::new(), AtomicVersion::new()) + }); + } + if need_inc_version { + self.version.inc(); } } @@ -377,6 +396,7 @@ impl SyncedRouteInfo { peer_infos: &Vec, raw_peer_infos: &Vec, ) -> Result<(), Error> { + let mut need_inc_version = false; for (idx, route_info) in peer_infos.iter().enumerate() { let mut route_info = route_info.clone(); let raw_route_info = &raw_peer_infos[idx]; @@ -410,20 +430,27 @@ impl SyncedRouteInfo { self.raw_peer_infos .insert(route_info.peer_id, raw_route_info.clone()); *old_entry = route_info.clone(); + need_inc_version = true; } }) .or_insert_with(|| { + need_inc_version = true; self.raw_peer_infos .insert(route_info.peer_id, raw_route_info.clone()); route_info.clone() }); } + if need_inc_version { + self.version.inc(); + } Ok(()) } fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) { self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.0).collect()); + let mut need_inc_version = false; + for (peer_idx, (peer_id, version)) in conn_bitmap.peer_ids.iter().enumerate() { assert!(self.peer_infos.contains_key(peer_id)); let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); @@ -434,19 +461,25 @@ impl SyncedRouteInfo { .and_modify(|(old_conn_bitmap, old_version)| { if *version > old_version.get() { *old_conn_bitmap = conn_bitmap.get_connected_peers(peer_idx); + need_inc_version = true; old_version.set(*version); } }) .or_insert_with(|| { + need_inc_version = true; ( conn_bitmap.get_connected_peers(peer_idx), version.clone().into(), ) }); } + if need_inc_version { + self.version.inc(); + } } fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) { + let mut need_inc_version = false; for item in foreign_network.infos.iter().map(Clone::clone) { let Some(key) = item.key else { continue; @@ -461,10 +494,14 @@ impl SyncedRouteInfo { .entry(key.clone()) .and_modify(|old_entry| { if entry.version > 
old_entry.version { + need_inc_version = true; *old_entry = entry.clone(); } }) - .or_insert_with(|| entry.clone()); + .or_insert_with(|| { + need_inc_version = true; + entry.clone() + }); } } @@ -483,7 +520,12 @@ impl SyncedRouteInfo { let old_version = old.version; *old = new; - new_version != old_version + if new_version != old_version { + self.version.inc(); + true + } else { + false + } } fn update_my_conn_info(&self, my_peer_id: PeerId, connected_peers: BTreeSet) -> bool { @@ -499,6 +541,7 @@ impl SyncedRouteInfo { } else { let _ = std::mem::replace(&mut my_conn_info.value_mut().0, connected_peers); my_conn_info.value().1.inc(); + self.version.inc(); true } } @@ -557,6 +600,10 @@ impl SyncedRouteInfo { updated = true; } + if updated { + self.version.inc(); + } + updated } @@ -573,13 +620,14 @@ impl SyncedRouteInfo { } } -type PeerGraph = Graph; +type PeerGraph = Graph; type PeerIdToNodexIdxMap = DashMap; #[derive(Debug, Clone, Copy)] struct NextHopInfo { next_hop_peer_id: PeerId, path_latency: i32, path_len: usize, // path includes src and dst. 
+ version: Version, } // dst_peer_id -> (next_hop_peer_id, cost, path_len) type NextHopMap = DashMap; @@ -591,6 +639,7 @@ struct RouteTable { next_hop_map: NextHopMap, ipv4_peer_id_map: DashMap, cidr_peer_id_map: DashMap, + next_hop_map_version: AtomicVersion, } impl RouteTable { @@ -600,15 +649,23 @@ impl RouteTable { next_hop_map: DashMap::new(), ipv4_peer_id_map: DashMap::new(), cidr_peer_id_map: DashMap::new(), + next_hop_map_version: AtomicVersion::new(), } } fn get_next_hop(&self, dst_peer_id: PeerId) -> Option { - self.next_hop_map.get(&dst_peer_id).map(|x| *x) + let cur_version = self.next_hop_map_version.get(); + self.next_hop_map.get(&dst_peer_id).and_then(|x| { + if x.version >= cur_version { + Some(*x) + } else { + None + } + }) } fn peer_reachable(&self, peer_id: PeerId) -> bool { - self.next_hop_map.contains_key(&peer_id) + self.get_next_hop(peer_id).is_some() } fn get_nat_type(&self, peer_id: PeerId) -> Option { @@ -617,158 +674,16 @@ impl RouteTable { .map(|x| NatType::try_from(x.udp_stun_info as i32).unwrap_or_default()) } + // return graph and start node index (node of my peer id). fn build_peer_graph_from_synced_info( - peers: Vec, - synced_info: &SyncedRouteInfo, - cost_calc: &mut T, - ) -> (PeerGraph, PeerIdToNodexIdxMap) { - let mut graph: PeerGraph = Graph::new(); - let peer_id_to_node_index = PeerIdToNodexIdxMap::new(); - for peer_id in peers.iter() { - peer_id_to_node_index.insert(*peer_id, graph.add_node(*peer_id)); - } - - for peer_id in peers.iter() { - let connected_peers = synced_info - .get_connected_peers(*peer_id) - .unwrap_or(BTreeSet::new()); - - // if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST. 
- let peer_avoid_relay_data = synced_info.get_avoid_relay_data(*peer_id); - - for dst_peer_id in connected_peers.iter() { - let Some(dst_idx) = peer_id_to_node_index.get(dst_peer_id) else { - continue; - }; - - graph.add_edge( - *peer_id_to_node_index.get(&peer_id).unwrap(), - *dst_idx, - if peer_avoid_relay_data { - AVOID_RELAY_COST - } else { - cost_calc.calculate_cost(*peer_id, *dst_peer_id) - }, - ); - } - } - - (graph, peer_id_to_node_index) - } - - fn gen_next_hop_map_with_least_hop( - my_peer_id: PeerId, - graph: &PeerGraph, - idx_map: &PeerIdToNodexIdxMap, - cost_calc: &mut T, - ) -> NextHopMap { - let res = dijkstra(&graph, *idx_map.get(&my_peer_id).unwrap(), None, |_| 1); - let next_hop_map = NextHopMap::new(); - for (node_idx, cost) in res.iter() { - if *cost == 0 { - continue; - } - let mut all_paths = all_simple_paths::, _, RandomState>( - graph, - *idx_map.get(&my_peer_id).unwrap(), - *node_idx, - *cost - 1, - Some(*cost + 1), // considering having avoid relay, the max cost could be a bit larger. - ) - .collect::>(); - - assert!(!all_paths.is_empty()); - all_paths.sort_by(|a, b| a.len().cmp(&b.len())); - - // find a path with least cost. - let mut min_cost = i32::MAX; - let mut min_path_len = usize::MAX; - let mut min_path = Vec::new(); - for path in all_paths.iter() { - if min_path_len < path.len() && min_cost < AVOID_RELAY_COST { - // the min path does not contain avoid relay node. - break; - } - - let mut cost = 0; - for i in 0..path.len() - 1 { - let src_peer_id = *graph.node_weight(path[i]).unwrap(); - let dst_peer_id = *graph.node_weight(path[i + 1]).unwrap(); - let edge_weight = *graph - .edge_weight(graph.find_edge(path[i], path[i + 1]).unwrap()) - .unwrap(); - if edge_weight != 1 { - // means avoid relay. 
- cost += edge_weight; - } else { - cost += cost_calc.calculate_cost(src_peer_id, dst_peer_id); - } - } - - if cost <= min_cost { - min_cost = cost; - min_path = path.clone(); - min_path_len = path.len(); - } - } - next_hop_map.insert( - *graph.node_weight(*node_idx).unwrap(), - NextHopInfo { - next_hop_peer_id: *graph.node_weight(min_path[1]).unwrap(), - path_latency: min_cost, - path_len: min_path_len, - }, - ); - } - - next_hop_map - } - - fn gen_next_hop_map_with_least_cost( - my_peer_id: PeerId, - graph: &PeerGraph, - idx_map: &PeerIdToNodexIdxMap, - ) -> NextHopMap { - let next_hop_map = NextHopMap::new(); - for item in idx_map.iter() { - if *item.key() == my_peer_id { - continue; - } - - let dst_peer_node_idx = *item.value(); - - let Some((cost, path)) = astar::astar( - graph, - *idx_map.get(&my_peer_id).unwrap(), - |node_idx| node_idx == dst_peer_node_idx, - |e| *e.weight(), - |_| 0, - ) else { - continue; - }; - - next_hop_map.insert( - *item.key(), - NextHopInfo { - next_hop_peer_id: *graph.node_weight(path[1]).unwrap(), - path_latency: cost, - path_len: path.len(), - }, - ); - } - - next_hop_map - } - - fn build_from_synced_info( - &self, my_peer_id: PeerId, synced_info: &SyncedRouteInfo, - policy: NextHopPolicy, - mut cost_calc: T, - ) { - // build peer_infos - self.peer_infos.clear(); + cost_calc: &T, + ) -> (PeerGraph, NodeIndex) { + let mut graph: PeerGraph = PeerGraph::new(); + + let mut start_node_idx = None; + let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new(); for item in synced_info.peer_infos.iter() { let peer_id = item.key(); let info = item.value(); @@ -777,49 +692,175 @@ impl RouteTable { continue; } - self.peer_infos.insert(*peer_id, info.clone()); + let node_idx = graph.add_node(*peer_id); + + peer_id_to_node_index.insert(*peer_id, node_idx); + if *peer_id == my_peer_id { + start_node_idx = Some(node_idx); + } } - if self.peer_infos.is_empty() { + if start_node_idx.is_none() { + return (graph, NodeIndex::end()); + } + + for 
item in peer_id_to_node_index.iter() { + let src_peer_id = item.key(); + let src_node_idx = item.value(); + let connected_peers = synced_info + .get_connected_peers(*src_peer_id) + .unwrap_or(BTreeSet::new()); + + // if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST. + let peer_avoid_relay_data = synced_info.get_avoid_relay_data(*src_peer_id); + + for dst_peer_id in connected_peers.iter() { + let Some(dst_node_idx) = peer_id_to_node_index.get(dst_peer_id) else { + continue; + }; + + let mut cost = cost_calc.calculate_cost(*src_peer_id, *dst_peer_id) as usize; + if peer_avoid_relay_data { + cost += AVOID_RELAY_COST; + } + + graph.add_edge(*src_node_idx, *dst_node_idx, cost); + } + } + + (graph, start_node_idx.unwrap()) + } + + fn clean_expired_route_info(&self) { + let cur_version = self.next_hop_map_version.get(); + self.next_hop_map.retain(|_, v| { + // remove next hop map for peers we cannot reach. + v.version >= cur_version + }); + self.peer_infos.retain(|k, _| { + // remove peer info for peers we cannot reach. + self.next_hop_map.contains_key(k) + }); + self.ipv4_peer_id_map.retain(|_, v| { + // remove ipv4 map for peers we cannot reach. + self.next_hop_map.contains_key(v) + }); + self.cidr_peer_id_map.retain(|_, v| { + // remove cidr map for peers we cannot reach. 
+ self.next_hop_map.contains_key(v) + }); + } + + fn gen_next_hop_map_with_least_hop( + &self, + graph: &PeerGraph, + start_node: &NodeIndex, + version: Version, + ) { + let normalize_edge_cost = |e: petgraph::graph::EdgeReference| { + if *e.weight() >= AVOID_RELAY_COST { + AVOID_RELAY_COST + 1 + } else { + 1 + } + }; + // Step 1: 第一次 Dijkstra - 计算最短跳数 + let path_len_map = dijkstra(&graph, *start_node, None, normalize_edge_cost); + + // Step 2: 构建最短跳数子图(只保留属于最短路径和 AVOID RELAY 的边) + let mut subgraph: PeerGraph = PeerGraph::new(); + let mut start_node_idx = None; + for (node_idx, peer_id) in graph.node_references() { + let new_node_idx = subgraph.add_node(*peer_id); + if node_idx == *start_node { + start_node_idx = Some(new_node_idx); + } + } + + for edge in graph.edge_references() { + let (src, tgt) = graph.edge_endpoints(edge.id()).unwrap(); + let Some(src_path_len) = path_len_map.get(&src) else { + continue; + }; + let Some(tgt_path_len) = path_len_map.get(&tgt) else { + continue; + }; + if *src_path_len + normalize_edge_cost(edge) == *tgt_path_len { + subgraph.add_edge(src, tgt, *edge.weight()); + } + } + + // Step 3: 第二次 Dijkstra - 在子图上找代价最小的路径 + self.gen_next_hop_map_with_least_cost(&subgraph, &start_node_idx.clone().unwrap(), version); + } + + fn gen_next_hop_map_with_least_cost( + &self, + graph: &PeerGraph, + start_node: &NodeIndex, + version: Version, + ) { + let (costs, next_hops) = dijkstra_with_first_hop(&graph, *start_node, |e| *e.weight()); + + for (dst, (next_hop, path_len)) in next_hops.iter() { + let info = NextHopInfo { + next_hop_peer_id: *graph.node_weight(*next_hop).unwrap(), + path_latency: (*costs.get(dst).unwrap() % AVOID_RELAY_COST) as i32, + path_len: *path_len as usize, + version, + }; + let dst_peer_id = *graph.node_weight(*dst).unwrap(); + self.next_hop_map + .entry(dst_peer_id) + .and_modify(|x| { + if x.version < version { + *x = info; + } + }) + .or_insert(info); + } + + self.next_hop_map_version.set_if_larger(version); + } + + fn 
build_from_synced_info( + &self, + my_peer_id: PeerId, + synced_info: &SyncedRouteInfo, + policy: NextHopPolicy, + cost_calc: &T, + ) { + let version = synced_info.version.get(); + + // build next hop map + let (graph, start_node) = + Self::build_peer_graph_from_synced_info(my_peer_id, &synced_info, cost_calc); + + if graph.node_count() == 0 { + tracing::warn!("no peer in graph, cannot build next hop map"); return; } - // build next hop map - self.next_hop_map.clear(); - self.next_hop_map.insert( - my_peer_id, - NextHopInfo { - next_hop_peer_id: my_peer_id, - path_latency: 0, - path_len: 1, - }, - ); - let (graph, idx_map) = Self::build_peer_graph_from_synced_info( - self.peer_infos.iter().map(|x| *x.key()).collect(), - &synced_info, - &mut cost_calc, - ); - let next_hop_map = if matches!(policy, NextHopPolicy::LeastHop) { - Self::gen_next_hop_map_with_least_hop(my_peer_id, &graph, &idx_map, &mut cost_calc) + if matches!(policy, NextHopPolicy::LeastHop) { + self.gen_next_hop_map_with_least_hop(&graph, &start_node, version); } else { - Self::gen_next_hop_map_with_least_cost(my_peer_id, &graph, &idx_map) + self.gen_next_hop_map_with_least_cost(&graph, &start_node, version); }; - for item in next_hop_map.iter() { - self.next_hop_map.insert(*item.key(), *item.value()); - } - // build graph - // build ipv4_peer_id_map, cidr_peer_id_map - self.ipv4_peer_id_map.clear(); - self.cidr_peer_id_map.clear(); - for item in self.peer_infos.iter() { - // only set ipv4 map for peers we can reach. - if !self.next_hop_map.contains_key(item.key()) { + // build peer_infos, ipv4_peer_id_map, cidr_peer_id_map + // only set map for peers we can reach. + for item in self.next_hop_map.iter() { + if item.version < version { + // skip if the next hop entry is outdated. 
(peer is unreachable) continue; } let peer_id = item.key(); - let info = item.value(); + let Some(info) = synced_info.peer_infos.get(peer_id) else { + continue; + }; + + self.peer_infos.insert(*peer_id, info.clone()); if let Some(ipv4_addr) = info.ipv4_addr { self.ipv4_peer_id_map.insert(ipv4_addr.into(), *peer_id); @@ -1022,7 +1063,7 @@ struct PeerRouteServiceImpl { interface: Mutex>, - cost_calculator: std::sync::Mutex>, + cost_calculator: std::sync::RwLock>, route_table: RouteTable, route_table_with_cost: RouteTable, foreign_network_owner_map: DashMap>, @@ -1063,7 +1104,7 @@ impl PeerRouteServiceImpl { interface: Mutex::new(None), - cost_calculator: std::sync::Mutex::new(Some(Box::new(DefaultRouteCostCalculator))), + cost_calculator: std::sync::RwLock::new(Some(Box::new(DefaultRouteCostCalculator))), route_table: RouteTable::new(), route_table_with_cost: RouteTable::new(), @@ -1074,6 +1115,7 @@ impl PeerRouteServiceImpl { raw_peer_infos: DashMap::new(), conn_map: DashMap::new(), foreign_network: DashMap::new(), + version: AtomicVersion::new(), }, cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()), @@ -1171,23 +1213,37 @@ impl PeerRouteServiceImpl { } fn update_route_table(&self) { - let mut calc_locked = self.cost_calculator.lock().unwrap(); + self.cost_calculator + .write() + .unwrap() + .as_mut() + .unwrap() + .begin_update(); + + let calc_locked = self.cost_calculator.read().unwrap(); - calc_locked.as_mut().unwrap().begin_update(); self.route_table.build_from_synced_info( self.my_peer_id, &self.synced_route_info, NextHopPolicy::LeastHop, - calc_locked.as_mut().unwrap(), + calc_locked.as_ref().unwrap(), ); self.route_table_with_cost.build_from_synced_info( self.my_peer_id, &self.synced_route_info, NextHopPolicy::LeastCost, - calc_locked.as_mut().unwrap(), + calc_locked.as_ref().unwrap(), ); - calc_locked.as_mut().unwrap().end_update(); + + drop(calc_locked); + + self.cost_calculator + .write() + .unwrap() + .as_mut() + .unwrap() + 
.end_update(); } fn update_foreign_network_owner_map(&self) { @@ -1221,7 +1277,7 @@ impl PeerRouteServiceImpl { fn cost_calculator_need_update(&self) -> bool { self.cost_calculator - .lock() + .read() .unwrap() .as_ref() .map(|x| x.need_update()) @@ -1411,6 +1467,9 @@ impl PeerRouteServiceImpl { for p in to_remove.iter() { self.synced_route_info.foreign_network.remove(p); } + + self.route_table.clean_expired_route_info(); + self.route_table_with_cost.clean_expired_route_info(); } fn build_sync_route_raw_req( @@ -2022,6 +2081,7 @@ impl PeerRoute { if service_impl.cost_calculator_need_update() { tracing::debug!("cost_calculator_need_update"); + service_impl.synced_route_info.version.inc(); service_impl.update_route_table(); } @@ -2136,7 +2196,7 @@ impl Route for PeerRoute { let next_hop_peer_latency_first = route_table_with_cost.get_next_hop(*item.key()); let mut route: crate::proto::cli::Route = item.value().clone().into(); route.next_hop_peer_id = next_hop_peer.next_hop_peer_id; - route.cost = (next_hop_peer.path_len - 1) as i32; + route.cost = next_hop_peer.path_len as i32; route.path_latency = next_hop_peer.path_latency; route.next_hop_peer_id_latency_first = @@ -2166,7 +2226,8 @@ impl Route for PeerRoute { } async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) { - *self.service_impl.cost_calculator.lock().unwrap() = Some(_cost_fn); + *self.service_impl.cost_calculator.write().unwrap() = Some(_cost_fn); + self.service_impl.synced_route_info.version.inc(); self.service_impl.update_route_table(); } @@ -2307,7 +2368,10 @@ mod tests { for r in vec![r_a.clone(), r_b.clone()].iter() { wait_for_condition( - || async { r.list_routes().await.len() == 1 }, + || async { + println!("route: {:?}", r.list_routes().await); + r.list_routes().await.len() == 1 + }, Duration::from_secs(5), ) .await; @@ -2348,6 +2412,8 @@ mod tests { assert_eq!(i_a.0, i_b.1); assert_eq!(i_b.0, i_a.1); + println!("after drop p_b, r_b"); + drop(r_b); drop(p_b);