From cf2038b6c11486c8cc8978ecf3b47611ada7212d Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Fri, 24 Oct 2025 00:36:17 +0800 Subject: [PATCH] make ospf route more effiencient --- Cargo.lock | 10 + easytier/Cargo.toml | 1 + easytier/src/peers/peer_ospf_route.rs | 332 +++++++++++++++++++------- easytier/src/tests/ipv6_test.rs | 3 +- 4 files changed, 253 insertions(+), 93 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a4d81c7..ef348d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,7 @@ dependencies = [ "nix 0.29.0", "once_cell", "openssl", + "ordered_hash_map", "parking_lot", "percent-encoding", "petgraph 0.8.1", @@ -5498,6 +5499,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "ordered_hash_map" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6c699f8a30f345785be969deed7eee4c73a5de58c7faf61d6a3251ef798ff61" +dependencies = [ + "hashbrown 0.15.3", +] + [[package]] name = "os_info" version = "3.8.2" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 0c5819b..86d8de6 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -143,6 +143,7 @@ network-interface = "2.0" # for ospf route petgraph = "0.8.1" hashbrown = "0.15.3" +ordered_hash_map = "0.5.0" # for wireguard boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index d8f024d..7350d26 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, BTreeSet, HashMap}, + collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, fmt::Debug, net::{IpAddr, Ipv4Addr, Ipv6Addr}, sync::{ @@ -13,6 +13,8 @@ use arc_swap::ArcSwap; use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr}; use crossbeam::atomic::AtomicCell; use dashmap::DashMap; +use ordered_hash_map::OrderedHashMap; +use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock}; use petgraph::{ algo::dijkstra, graph::{Graph, NodeIndex}, @@ -41,7 +43,7 @@ use crate::{ route_foreign_network_infos, route_foreign_network_summary, sync_route_info_request::ConnInfo, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, - OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos, + OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion, RouteForeignNetworkInfos, RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError, SyncRouteInfoRequest, SyncRouteInfoResponse, }, @@ -70,6 +72,12 @@ static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660); static AVOID_RELAY_COST: usize = i32::MAX as usize; static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(false); +// if a peer is unreachable for `REMOVE_UNREACHABLE_PEER_INFO_AFTER` time, we can remove it because +// 1. all the ospf sessions between two zone are already destroy, new created session will resend the peer info. +// 2. all the dst_saved_peer_info_version in all sessions already remove the peer info, the peer info will be propagated +// in another zone when two zone restore the conneciton. +static REMOVE_UNREACHABLE_PEER_INFO_AFTER: Duration = Duration::from_secs(90); + type Version = u32; #[derive(Debug, Clone)] @@ -124,7 +132,9 @@ impl RoutePeerInfo { proxy_cidrs: Vec::new(), hostname: None, udp_stun_info: 0, - last_update: Some(SystemTime::now().into()), + // ensure this is updated when the peer_infos/conn_info/foreign_network lock is acquired. + // else we may assign a older timestamp than iterate time. + last_update: None, version: 0, easytier_version: EASYTIER_VERSION.to_string(), feature_flag: None, @@ -136,13 +146,12 @@ impl RoutePeerInfo { } } - pub fn update_self( - &self, + pub fn new_updated_self( my_peer_id: PeerId, peer_route_id: u64, global_ctx: &ArcGlobalCtx, ) -> Self { - let mut new = Self { + Self { peer_id: my_peer_id, inst_id: Some(global_ctx.get_id().into()), cost: 0, @@ -160,9 +169,10 @@ impl RoutePeerInfo { .get_stun_info_collector() .get_stun_info() .udp_nat_type, - // following fields do not participate in comparison. - last_update: self.last_update, - version: self.version, + + // these two fields should not participate in comparison. + last_update: None, + version: 0, easytier_version: EASYTIER_VERSION.to_string(), feature_flag: Some(global_ctx.get_feature_flags()), @@ -176,22 +186,28 @@ impl RoutePeerInfo { ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()), groups: global_ctx.get_acl_groups(my_peer_id), - }; + } + } + pub fn try_update_new_peer_info(old: &RoutePeerInfo, new: &mut RoutePeerInfo) -> bool { let need_update_periodically = if let Ok(Ok(d)) = - SystemTime::try_from(new.last_update.unwrap_or_default()).map(|x| x.elapsed()) + SystemTime::try_from(old.last_update.unwrap_or_default()).map(|x| x.elapsed()) { d > UPDATE_PEER_INFO_PERIOD } else { true }; - if new != *self || need_update_periodically { - new.last_update = Some(SystemTime::now().into()); - new.version += 1; - } + // these two fields should not participate in comparison. + new.version = old.version; + new.last_update = old.last_update; - new + if *new != *old || need_update_periodically { + new.version += 1; + true + } else { + false + } } } @@ -261,7 +277,7 @@ type Error = SyncRouteInfoError; // constructed with all infos synced from all peers. struct SyncedRouteInfo { - peer_infos: DashMap, + peer_infos: RwLock>, // prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers. raw_peer_infos: DashMap, conn_map: DashMap, AtomicVersion)>, @@ -293,14 +309,13 @@ impl SyncedRouteInfo { fn remove_peer(&self, peer_id: PeerId) { tracing::warn!(?peer_id, "remove_peer from synced_route_info"); - self.peer_infos.remove(&peer_id); + self.peer_infos.write().remove(&peer_id); self.raw_peer_infos.remove(&peer_id); self.conn_map.remove(&peer_id); self.foreign_network.retain(|k, _| k.peer_id != peer_id); self.group_trust_map.remove(&peer_id); self.group_trust_map_cache.remove(&peer_id); - shrink_dashmap(&self.peer_infos, None); shrink_dashmap(&self.raw_peer_infos, None); shrink_dashmap(&self.conn_map, None); shrink_dashmap(&self.foreign_network, None); @@ -313,10 +328,16 @@ impl SyncedRouteInfo { fn fill_empty_peer_info(&self, peer_ids: &BTreeSet) { let mut need_inc_version = false; for peer_id in peer_ids { - self.peer_infos.entry(*peer_id).or_insert_with(|| { + let guard = self.peer_infos.upgradable_read(); + if !guard.contains_key(peer_id) { + let mut peer_info = RoutePeerInfo::new(); + let mut guard = RwLockUpgradableReadGuard::upgrade(guard); + peer_info.last_update = Some(SystemTime::now().into()); + guard.insert(*peer_id, peer_info); need_inc_version = true; - RoutePeerInfo::new() - }); + } else { + drop(guard); + } self.conn_map.entry(*peer_id).or_insert_with(|| { need_inc_version = true; @@ -330,6 +351,7 @@ impl SyncedRouteInfo { fn get_peer_info_version_with_default(&self, peer_id: PeerId) -> Version { self.peer_infos + .read() .get(&peer_id) .map(|x| x.version) .unwrap_or(0) @@ -338,8 +360,9 @@ impl SyncedRouteInfo { fn get_avoid_relay_data(&self, peer_id: PeerId) -> bool { // if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST. self.peer_infos + .read() .get(&peer_id) - .and_then(|x| x.value().feature_flag) + .and_then(|x| x.feature_flag) .map(|x| x.avoid_relay_data) .unwrap_or_default() } @@ -395,7 +418,10 @@ impl SyncedRouteInfo { my_peer_route_id, dst_peer_id, if route_info.peer_id == dst_peer_id { - self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id) + self.peer_infos + .read() + .get(&dst_peer_id) + .map(|x| x.peer_route_id) } else { None }, @@ -409,26 +435,19 @@ impl SyncedRouteInfo { .unwrap(); assert_eq!(peer_id_raw, route_info.peer_id); + let mut guard = self.peer_infos.write(); // time between peers may not be synchronized, so update last_update to local now. // note only last_update with larger version will be updated to local saved peer info. route_info.last_update = Some(SystemTime::now().into()); - - self.peer_infos - .entry(route_info.peer_id) - .and_modify(|old_entry| { - if route_info.version > old_entry.version { - self.raw_peer_infos - .insert(route_info.peer_id, raw_route_info.clone()); - *old_entry = route_info.clone(); - need_inc_version = true; - } - }) - .or_insert_with(|| { - need_inc_version = true; - self.raw_peer_infos - .insert(route_info.peer_id, raw_route_info.clone()); - route_info.clone() - }); + if guard + .get_mut(&route_info.peer_id) + .is_none_or(|old| route_info.version > old.version) + { + self.raw_peer_infos + .insert(route_info.peer_id, raw_route_info.clone()); + guard.insert(route_info.peer_id, route_info); + need_inc_version = true; + } } if need_inc_version { self.version.inc(); @@ -535,15 +554,34 @@ impl SyncedRouteInfo { my_peer_route_id: u64, global_ctx: &ArcGlobalCtx, ) -> bool { - let mut old = self.peer_infos.entry(my_peer_id).or_default(); - let new = old.update_self(my_peer_id, my_peer_route_id, global_ctx); - let new_version = new.version; - let old_version = old.version; - *old = new; - drop(old); + let mut new = RoutePeerInfo::new_updated_self(my_peer_id, my_peer_route_id, global_ctx); + let mut guard = self.peer_infos.upgradable_read(); + let old = guard.get(&my_peer_id); + let new_version = old.map(|x| x.version).unwrap_or(0) + 1; + let need_insert_new = if let Some(old) = old { + RoutePeerInfo::try_update_new_peer_info(old, &mut new) + } else { + true + }; + + if need_insert_new { + let acl_groups = if old.map(|x| x.groups != new.groups).unwrap_or(true) { + Some(new.groups.clone()) + } else { + None + }; + + guard.with_upgraded(|peer_infos| { + new.last_update = Some(SystemTime::now().into()); + new.version = new_version; + peer_infos.insert(my_peer_id, new) + }); + drop(guard); + + if let Some(acl_groups) = acl_groups { + self.update_my_group_trusts(my_peer_id, &acl_groups); + } - if new_version != old_version { - self.update_my_group_trusts(my_peer_id); self.version.inc(); true } else { @@ -634,6 +672,28 @@ impl SyncedRouteInfo { updated } + fn get_next_last_sync_succ_timestamp(&self) -> SystemTime { + let _peer_info_lock = self.peer_infos.read(); + // TODO: add conn and foreign network lock + + SystemTime::now() + } + + fn check_peer_info_last_update_monotonic_increasing(&self) { + let mut last_update: Option = None; + for peer_info in self.peer_infos.read().values() { + if let Some(last_update) = last_update { + let cur_last_update = peer_info.last_update.unwrap(); + assert!( + cur_last_update.seconds > last_update.seconds + || cur_last_update.nanos >= last_update.nanos, + "peer info last update not monotonic increasing" + ); + } + last_update = peer_info.last_update; + } + } + fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { self.conn_map .get(&src_peer_id) @@ -725,13 +785,15 @@ impl SyncedRouteInfo { } } - fn update_my_group_trusts(&self, my_peer_id: PeerId) { + fn update_my_group_trusts(&self, my_peer_id: PeerId, groups: &[PeerGroupInfo]) { let mut my_group_map = HashMap::new(); let mut my_group_names = Vec::new(); - for group in self.peer_infos.entry(my_peer_id).or_default().groups.iter() { + + for group in groups.iter() { my_group_map.insert(group.group_name.clone(), group.group_proof.clone()); my_group_names.push(group.group_name.clone()); } + self.group_trust_map.insert(my_peer_id, my_group_map); self.group_trust_map_cache .insert(my_peer_id, Arc::new(my_group_names)); @@ -749,21 +811,16 @@ struct NextHopInfo { } // dst_peer_id -> (next_hop_peer_id, cost, path_len) type NextHopMap = DashMap; -#[derive(Debug, Clone, Copy)] -struct PeerIdAndVersion { - peer_id: PeerId, - version: Version, -} // computed with SyncedRouteInfo. used to get next hop. #[derive(Debug)] struct RouteTable { peer_infos: DashMap, next_hop_map: NextHopMap, - ipv4_peer_id_map: DashMap, - ipv6_peer_id_map: DashMap, - cidr_peer_id_map: ArcSwap>, - cidr_v6_peer_id_map: ArcSwap>, + ipv4_peer_id_map: DashMap, + ipv6_peer_id_map: DashMap, + cidr_peer_id_map: ArcSwap>, + cidr_v6_peer_id_map: ArcSwap>, next_hop_map_version: AtomicVersion, } @@ -811,18 +868,17 @@ impl RouteTable { let mut start_node_idx = None; let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new(); - for item in synced_info.peer_infos.iter() { - let peer_id = item.key(); - let info = item.value(); + for (peer_id, info) in synced_info.peer_infos.read().iter() { + let peer_id = *peer_id; if info.version == 0 { continue; } - let node_idx = graph.add_node(*peer_id); + let node_idx = graph.add_node(peer_id); - peer_id_to_node_index.insert(*peer_id, node_idx); - if *peer_id == my_peer_id { + peer_id_to_node_index.insert(peer_id, node_idx); + if peer_id == my_peer_id { start_node_idx = Some(node_idx); } } @@ -991,18 +1047,18 @@ impl RouteTable { } let peer_id = item.key(); - let Some(info) = synced_info.peer_infos.get(peer_id) else { + let Some(info) = synced_info.peer_infos.read().get(peer_id).cloned() else { continue; }; self.peer_infos.insert(*peer_id, info.clone()); - let peer_id_and_version = PeerIdAndVersion { + let peer_id_and_version = PeerIdVersion { peer_id: *peer_id, version, }; - let is_new_peer_better = |old_peer: &PeerIdAndVersion| -> bool { + let is_new_peer_better = |old_peer: &PeerIdVersion| -> bool { if peer_id_and_version.version > old_peer.version { return true; } @@ -1194,6 +1250,11 @@ struct SyncRouteSession { dst_saved_conn_info_version: DashMap, dst_saved_foreign_network_versions: DashMap, + // we don't want to send unreachable peer infos to peer, so we keep track of them. + unreachable_peers: parking_lot::Mutex>, + + last_sync_succ_timestamp: AtomicCell>, + my_session_id: AtomicSessionId, dst_session_id: AtomicSessionId, @@ -1218,6 +1279,10 @@ impl SyncRouteSession { dst_saved_conn_info_version: DashMap::new(), dst_saved_foreign_network_versions: DashMap::new(), + unreachable_peers: parking_lot::Mutex::new(VecDeque::new()), + + last_sync_succ_timestamp: AtomicCell::new(None), + my_session_id: AtomicSessionId::new(rand::random()), dst_session_id: AtomicSessionId::new(0), @@ -1363,12 +1428,15 @@ impl SyncRouteSession { self.need_sync_initiator_info.store(true, Ordering::Relaxed); } + // return whether session id is updated fn update_dst_session_id(&self, session_id: SessionId) { if session_id != self.dst_session_id.load(Ordering::Relaxed) { tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info."); self.dst_session_id.store(session_id, Ordering::Relaxed); self.dst_saved_conn_info_version.clear(); self.dst_saved_peer_info_versions.clear(); + self.last_sync_succ_timestamp.store(None); + self.unreachable_peers.lock().clear(); } } @@ -1386,6 +1454,16 @@ impl SyncRouteSession { self.dst_saved_foreign_network_versions.shrink_to_fit(); } + fn update_last_sync_succ_timestamp(&self, next_last_sync_succ_timestamp: SystemTime) { + let _ = self.last_sync_succ_timestamp.fetch_update(|x| { + if x.is_none_or(|old| old < next_last_sync_succ_timestamp) { + Some(Some(next_last_sync_succ_timestamp)) + } else { + None + } + }); + } + fn short_debug_string(&self) -> String { format!( "session_dst_peer: {:?}, my_session_id: {:?}, dst_session_id: {:?}, we_are_initiator: {:?}, dst_is_initiator: {:?}, rpc_tx_count: {:?}, rpc_rx_count: {:?}, task: {:?}", @@ -1470,7 +1548,7 @@ impl PeerRouteServiceImpl { foreign_network_my_peer_id_map: DashMap::new(), synced_route_info: SyncedRouteInfo { - peer_infos: DashMap::new(), + peer_infos: RwLock::new(OrderedHashMap::new()), raw_peer_infos: DashMap::new(), conn_map: DashMap::new(), foreign_network: DashMap::new(), @@ -1714,22 +1792,73 @@ impl PeerRouteServiceImpl { } fn build_route_info(&self, session: &SyncRouteSession) -> Option> { + self.synced_route_info + .check_peer_info_last_update_monotonic_increasing(); let mut route_infos = Vec::new(); - for item in self.synced_route_info.peer_infos.iter() { + let peer_infos = self.synced_route_info.peer_infos.read(); + let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load(); + for (peer_id, peer_info) in peer_infos.iter().rev() { + // stop iter if last_update of peer info is older than session.latest_scanned_peer_info_timestamp + if let Some(last_update) = peer_info.last_update { + let last_update = TryInto::::try_into(last_update).unwrap(); + if last_sync_succ_timestamp.is_some_and(|t| last_update < t) { + tracing::debug!( + "ignore peer_info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, peer_infos_count: {}, my_peer_id: {:?}, session: {:?}", + peer_info, + last_update, + last_sync_succ_timestamp, + peer_infos.len(), + self.my_peer_id, + session + ); + continue; + } + } + + if session.check_saved_peer_info_update_to_date(peer_info.peer_id, peer_info.version) { + continue; + } + // do not send unreachable peer info to dst peer. - if !self.route_table.peer_reachable(*item.key()) { + if !self.route_table.peer_reachable(*peer_id) { + session.unreachable_peers.lock().push_front(PeerIdVersion { + peer_id: *peer_id, + version: peer_info.version, + }); continue; } - if session - .check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version) - { - continue; - } - - route_infos.push(item.value().clone()); + route_infos.push(peer_info.clone()); } + let mut unreachable_peers = session.unreachable_peers.lock(); + let cur_len = unreachable_peers.len(); + for _ in 0..cur_len { + let peer_id_version = unreachable_peers.pop_back().unwrap(); + let peer_id = peer_id_version.peer_id; + let version = peer_id_version.version; + if session.check_saved_peer_info_update_to_date(peer_id, version) { + // already up-to-date, skip + continue; + } + let reachable = self.route_table.peer_reachable(peer_id); + match self.synced_route_info.peer_infos.read().get(&peer_id) { + Some(route_info) => { + if reachable { + route_infos.push(route_info.clone()); + } + // this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map + unreachable_peers.push_front(peer_id_version); + } + None => { + // if not found in peer info map, forget this peer id. + continue; + } + }; + } + + unreachable_peers.shrink_to_fit(); + if route_infos.is_empty() { None } else { @@ -1860,6 +1989,7 @@ impl PeerRouteServiceImpl { let dst_supports_peer_list = self .synced_route_info .peer_infos + .read() .get(&dst_peer_id) .and_then(|p| p.feature_flag) .map(|x| x.support_conn_list_sync) @@ -1888,11 +2018,13 @@ impl PeerRouteServiceImpl { fn clear_expired_peer(&self) { let now = SystemTime::now(); let mut to_remove = Vec::new(); - for item in self.synced_route_info.peer_infos.iter() { - if let Ok(d) = now.duration_since(item.value().last_update.unwrap().try_into().unwrap()) - { - if d > REMOVE_DEAD_PEER_INFO_AFTER { - to_remove.push(*item.key()); + for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() { + if let Ok(d) = now.duration_since(peer_info.last_update.unwrap().try_into().unwrap()) { + if d > REMOVE_DEAD_PEER_INFO_AFTER + || (d > REMOVE_UNREACHABLE_PEER_INFO_AFTER + && !self.route_table.peer_reachable(*peer_id)) + { + to_remove.push(*peer_id); } } } @@ -1971,6 +2103,8 @@ impl PeerRouteServiceImpl { let my_peer_id = self.my_peer_id; + let next_last_sync_succ_timestamp = + self.synced_route_info.get_next_last_sync_succ_timestamp(); let (peer_infos, conn_info, foreign_network) = self.build_sync_request(&session, dst_peer_id); if peer_infos.is_none() @@ -2020,6 +2154,11 @@ impl PeerRouteServiceImpl { .sync_route_info(ctrl, SyncRouteInfoRequest::default()) .await; + tracing::debug!( + "sync_route_info resp: {:?}, req: {:?}, session: {:?}, my_info: {:?}, next_last_sync_succ_timestamp: {:?}", + ret, sync_route_info_req, session, self.global_ctx.network, next_last_sync_succ_timestamp + ); + if let Err(e) = &ret { tracing::error!( ?ret, @@ -2066,6 +2205,8 @@ impl PeerRouteServiceImpl { if let Some(foreign_network) = &foreign_network { session.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id); } + + session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp); } } false @@ -2840,7 +2981,7 @@ mod tests { peers::{ create_packet_recv_chan, peer_manager::{PeerManager, RouteAlgoType}, - peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST}, + peer_ospf_route::{PeerIdVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST}, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, }, @@ -2923,8 +3064,14 @@ mod tests { tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!(2, r_a.service_impl.synced_route_info.peer_infos.len()); - assert_eq!(2, r_b.service_impl.synced_route_info.peer_infos.len()); + assert_eq!( + 2, + r_a.service_impl.synced_route_info.peer_infos.read().len() + ); + assert_eq!( + 2, + r_b.service_impl.synced_route_info.peer_infos.read().len() + ); for s in r_a.service_impl.sessions.iter() { assert!(s.value().task.is_running()); @@ -2934,6 +3081,7 @@ mod tests { r_a.service_impl .synced_route_info .peer_infos + .read() .get(&p_a.my_peer_id()) .unwrap() .version, @@ -2990,7 +3138,7 @@ mod tests { for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() { wait_for_condition( - || async { r.service_impl.synced_route_info.peer_infos.len() == 3 }, + || async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 }, Duration::from_secs(5), ) .await; @@ -3095,7 +3243,9 @@ mod tests { // check peer infos let peer_info = synced_info .peer_infos + .read() .get(&routable_peer.my_peer_id()) + .cloned() .unwrap(); assert_eq!(peer_info.peer_id, routable_peer.my_peer_id()); } @@ -3125,7 +3275,7 @@ mod tests { for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() { wait_for_condition( - || async { r.service_impl.synced_route_info.peer_infos.len() == 3 }, + || async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 }, Duration::from_secs(5), ) .await; @@ -3392,10 +3542,10 @@ mod tests { let proxy_cidr: Ipv4Cidr = "192.168.100.0/24".parse().unwrap(); let test_ip = proxy_cidr.first_address(); - let mut cidr_peer_id_map: PrefixMap = PrefixMap::new(); + let mut cidr_peer_id_map: PrefixMap = PrefixMap::new(); cidr_peer_id_map.insert( proxy_cidr, - PeerIdAndVersion { + PeerIdVersion { peer_id: p_c.my_peer_id(), version: 0, }, diff --git a/easytier/src/tests/ipv6_test.rs b/easytier/src/tests/ipv6_test.rs index d71dc07..07bd1b6 100644 --- a/easytier/src/tests/ipv6_test.rs +++ b/easytier/src/tests/ipv6_test.rs @@ -38,8 +38,7 @@ async fn test_route_peer_info_ipv6() { global_ctx.set_ipv6(Some(ipv6_cidr)); // Create RoutePeerInfo with IPv6 support - let peer_info = RoutePeerInfo::new(); - let updated_info = peer_info.update_self(123, 456, &global_ctx); + let updated_info = RoutePeerInfo::new_updated_self(123, 456, &global_ctx); // Verify IPv6 address is included assert!(updated_info.ipv6_addr.is_some());