From 793889c3b784253459b0c6d883d7390de1f1e6f3 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 13 Sep 2025 08:48:50 +0800 Subject: [PATCH] fix ospf ipv4 map error when ipv4 conflicted and changed (#1359) --- easytier/src/peers/peer_ospf_route.rs | 117 +++++++++++++++++++++----- 1 file changed, 94 insertions(+), 23 deletions(-) diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index f71e4bf..9144262 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -720,15 +720,20 @@ struct NextHopInfo { } // dst_peer_id -> (next_hop_peer_id, cost, path_len) type NextHopMap = DashMap; +#[derive(Debug, Clone, Copy)] +struct PeerIdAndVersion { + peer_id: PeerId, + version: Version, +} // computed with SyncedRouteInfo. used to get next hop. #[derive(Debug)] struct RouteTable { peer_infos: DashMap, next_hop_map: NextHopMap, - ipv4_peer_id_map: DashMap, - ipv6_peer_id_map: DashMap, - cidr_peer_id_map: DashMap, + ipv4_peer_id_map: DashMap, + ipv6_peer_id_map: DashMap, + cidr_peer_id_map: DashMap, next_hop_map_version: AtomicVersion, } @@ -834,15 +839,15 @@ impl RouteTable { }); self.ipv4_peer_id_map.retain(|_, v| { // remove ipv4 map for peers we cannot reach. - self.next_hop_map.contains_key(v) + self.next_hop_map.contains_key(&v.peer_id) }); self.ipv6_peer_id_map.retain(|_, v| { // remove ipv6 map for peers we cannot reach. - self.next_hop_map.contains_key(v) + self.next_hop_map.contains_key(&v.peer_id) }); self.cidr_peer_id_map.retain(|_, v| { // remove cidr map for peers we cannot reach. - self.next_hop_map.contains_key(v) + self.next_hop_map.contains_key(&v.peer_id) }); } @@ -957,8 +962,19 @@ impl RouteTable { self.peer_infos.insert(*peer_id, info.clone()); - let is_new_peer_better = |old_peer_id: PeerId| -> bool { - let old_next_hop = self.get_next_hop(old_peer_id); + let peer_id_and_version = PeerIdAndVersion { + peer_id: *peer_id, + version, + }; + + let is_new_peer_better = |old_peer: &PeerIdAndVersion| -> bool { + if peer_id_and_version.version > old_peer.version { + return true; + } + if peer_id_and_version.peer_id == old_peer.peer_id { + return false; + } + let old_next_hop = self.get_next_hop(old_peer.peer_id); let new_next_hop = item.value(); old_next_hop.is_none() || new_next_hop.path_len < old_next_hop.unwrap().path_len }; @@ -967,34 +983,34 @@ impl RouteTable { self.ipv4_peer_id_map .entry(ipv4_addr.into()) .and_modify(|v| { - if *v != *peer_id && is_new_peer_better(*v) { - *v = *peer_id; + if is_new_peer_better(v) { + *v = peer_id_and_version; } }) - .or_insert(*peer_id); + .or_insert(peer_id_and_version); } if let Some(ipv6_addr) = info.ipv6_addr.and_then(|x| x.address) { self.ipv6_peer_id_map .entry(ipv6_addr.into()) .and_modify(|v| { - if *v != *peer_id && is_new_peer_better(*v) { - *v = *peer_id; + if is_new_peer_better(v) { + *v = peer_id_and_version; } }) - .or_insert(*peer_id); + .or_insert(peer_id_and_version); } for cidr in info.proxy_cidrs.iter() { self.cidr_peer_id_map .entry(cidr.parse().unwrap()) .and_modify(|v| { - if *v != *peer_id && is_new_peer_better(*v) { + if is_new_peer_better(v) { // if the next hop is not set or the new next hop is better, update it. - *v = *peer_id; + *v = peer_id_and_version; } }) - .or_insert(*peer_id); + .or_insert(peer_id_and_version); } } } @@ -1004,7 +1020,7 @@ impl RouteTable { for item in self.cidr_peer_id_map.iter() { let (k, v) = item.pair(); if k.contains(&ipv4) { - return Some(*v); + return Some(v.peer_id); } } None @@ -2376,8 +2392,8 @@ impl Route for PeerRoute { async fn get_peer_id_by_ipv4(&self, ipv4_addr: &Ipv4Addr) -> Option { let route_table = &self.service_impl.route_table; - if let Some(peer_id) = route_table.ipv4_peer_id_map.get(ipv4_addr) { - return Some(*peer_id); + if let Some(p) = route_table.ipv4_peer_id_map.get(ipv4_addr) { + return Some(p.peer_id); } if let Some(peer_id) = route_table.get_peer_id_for_proxy(ipv4_addr) { @@ -2390,8 +2406,8 @@ impl Route for PeerRoute { async fn get_peer_id_by_ipv6(&self, ipv6_addr: &Ipv6Addr) -> Option { let route_table = &self.service_impl.route_table; - if let Some(peer_id) = route_table.ipv6_peer_id_map.get(ipv6_addr) { - return Some(*peer_id); + if let Some(p) = route_table.ipv6_peer_id_map.get(ipv6_addr) { + return Some(p.peer_id); } // TODO: Add proxy support for IPv6 similar to IPv4 @@ -2493,6 +2509,7 @@ mod tests { time::Duration, }; + use cidr::{Ipv4Cidr, Ipv4Inet, Ipv6Inet}; use dashmap::DashMap; use prost_reflect::{DynamicMessage, ReflectMessage}; @@ -2504,7 +2521,7 @@ mod tests { peer_manager::{PeerManager, RouteAlgoType}, peer_ospf_route::PeerRouteServiceImpl, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, - tests::connect_peer_manager, + tests::{connect_peer_manager, create_mock_peer_manager}, }, proto::{ common::NatType, @@ -2964,4 +2981,58 @@ mod tests { assert_eq!(req, req2); } + + #[tokio::test] + async fn test_peer_id_map_override() { + let p_a = create_mock_peer_manager().await; + let p_b = create_mock_peer_manager().await; + let p_c = create_mock_peer_manager().await; + + connect_peer_manager(p_a.clone(), p_b.clone()).await; + connect_peer_manager(p_b.clone(), p_c.clone()).await; + + let ip: Ipv4Inet = "10.0.0.1/24".parse().unwrap(); + let ipv6: Ipv6Inet = "2001:db8::1/64".parse().unwrap(); + let proxy: Ipv4Cidr = "10.3.0.0/24".parse().unwrap(); + let check_route_peer_id = async |p: Arc| { + let p = p.clone(); + wait_for_condition( + || async { + p_a.get_route().get_peer_id_by_ipv4(&ip.address()).await == Some(p.my_peer_id()) + && p_a.get_route().get_peer_id_by_ipv6(&ipv6.address()).await + == Some(p.my_peer_id()) + && p_a + .get_route() + .get_peer_id_by_ipv4(&proxy.first_address()) + .await + == Some(p.my_peer_id()) + }, + Duration::from_secs(5), + ) + .await; + }; + + p_c.get_global_ctx().set_ipv4(Some(ip)); + p_c.get_global_ctx().set_ipv6(Some(ipv6)); + p_c.get_global_ctx() + .config + .add_proxy_cidr(proxy, None) + .unwrap(); + check_route_peer_id(p_c.clone()).await; + + p_b.get_global_ctx().set_ipv4(Some(ip)); + p_b.get_global_ctx().set_ipv6(Some(ipv6)); + p_b.get_global_ctx() + .config + .add_proxy_cidr(proxy, None) + .unwrap(); + check_route_peer_id(p_b.clone()).await; + + p_b.get_global_ctx() + .set_ipv4(Some("10.0.0.2/24".parse().unwrap())); + p_b.get_global_ctx() + .set_ipv6(Some("2001:db8::2/64".parse().unwrap())); + p_b.get_global_ctx().config.remove_proxy_cidr(proxy); + check_route_peer_id(p_c.clone()).await; + } }