From ae731bdb3ca1e21160abb342625d7a9e1263dd60 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sun, 2 Nov 2025 10:31:45 +0800 Subject: [PATCH] tmp --- easytier/src/peers/peer_ospf_route.rs | 227 ++++++++++++++------------ 1 file changed, 122 insertions(+), 105 deletions(-) diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 7350d26..a247c1a 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -275,12 +275,29 @@ impl RouteConnBitmap { type Error = SyncRouteInfoError; +#[derive(Debug, Clone)] +struct RouteConnInfo { + connected_peers: BTreeSet, + version: AtomicVersion, + last_update: SystemTime, +} + +impl Default for RouteConnInfo { + fn default() -> Self { + Self { + connected_peers: BTreeSet::new(), + version: AtomicVersion::new(), + last_update: SystemTime::now(), + } + } +} + // constructed with all infos synced from all peers. struct SyncedRouteInfo { peer_infos: RwLock>, // prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers. raw_peer_infos: DashMap, - conn_map: DashMap, AtomicVersion)>, + conn_map: RwLock>, foreign_network: DashMap, group_trust_map: DashMap>>, group_trust_map_cache: DashMap>>, // cache for group trust map, should sync with group_trust_map @@ -303,21 +320,21 @@ impl Debug for SyncedRouteInfo { impl SyncedRouteInfo { fn get_connected_peers>(&self, peer_id: PeerId) -> Option { self.conn_map + .read() .get(&peer_id) - .map(|x| x.0.clone().iter().copied().collect()) + .map(|x| x.connected_peers.iter().copied().collect()) } fn remove_peer(&self, peer_id: PeerId) { tracing::warn!(?peer_id, "remove_peer from synced_route_info"); self.peer_infos.write().remove(&peer_id); self.raw_peer_infos.remove(&peer_id); - self.conn_map.remove(&peer_id); + self.conn_map.write().remove(&peer_id); self.foreign_network.retain(|k, _| k.peer_id != peer_id); self.group_trust_map.remove(&peer_id); self.group_trust_map_cache.remove(&peer_id); shrink_dashmap(&self.raw_peer_infos, None); - shrink_dashmap(&self.conn_map, None); shrink_dashmap(&self.foreign_network, None); shrink_dashmap(&self.group_trust_map, None); shrink_dashmap(&self.group_trust_map_cache, None); @@ -339,10 +356,14 @@ impl SyncedRouteInfo { drop(guard); } - self.conn_map.entry(*peer_id).or_insert_with(|| { + let guard = self.conn_map.upgradable_read(); + if !guard.contains_key(peer_id) { + let mut guard = RwLockUpgradableReadGuard::upgrade(guard); + guard.insert(*peer_id, RouteConnInfo::default()); need_inc_version = true; - (BTreeSet::new(), AtomicVersion::new()) - }); + } else { + drop(guard); + } } if need_inc_version { self.version.inc(); @@ -455,7 +476,31 @@ impl SyncedRouteInfo { Ok(()) } - fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) { + fn update_conn_info_one_peer( + &self, + peer_id_version: &PeerIdVersion, + connected_peers: BTreeSet, + ) -> bool { + let mut guard = self.conn_map.write(); + if guard + .get_mut(&peer_id_version.peer_id) + .is_none_or(|old| peer_id_version.version > old.version.get()) + { + guard.insert( + peer_id_version.peer_id, + RouteConnInfo { + connected_peers, + version: peer_id_version.version.into(), + last_update: SystemTime::now(), + }, + ); + return true; + } + + false + } + + fn update_conn_info_with_bitmap(&self, conn_bitmap: &RouteConnBitmap) { self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.peer_id).collect()); let mut need_inc_version = false; @@ -463,27 +508,14 @@ impl SyncedRouteInfo { for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() { let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); self.fill_empty_peer_info(&connceted_peers); - - self.conn_map - .entry(peer_id_version.peer_id) - .and_modify(|(old_conn_bitmap, old_version)| { - if peer_id_version.version > old_version.get() { - *old_conn_bitmap = connceted_peers.clone(); - need_inc_version = true; - old_version.set(peer_id_version.version); - } - }) - .or_insert_with(|| { - need_inc_version = true; - (connceted_peers, (peer_id_version.version).into()) - }); + need_inc_version = self.update_conn_info_one_peer(peer_id_version, connceted_peers); } if need_inc_version { self.version.inc(); } } - fn update_conn_peer_list(&self, conn_peer_list: &RouteConnPeerList) { + fn update_conn_info_with_list(&self, conn_peer_list: &RouteConnPeerList) { let mut need_inc_version = false; for peer_conn_info in &conn_peer_list.peer_conn_infos { @@ -492,23 +524,9 @@ impl SyncedRouteInfo { }; let connected_peers: BTreeSet = peer_conn_info.connected_peer_ids.iter().copied().collect(); - let (peer_id, version) = (peer_id_version.peer_id, peer_id_version.version); self.fill_empty_peer_info(&connected_peers); - - self.conn_map - .entry(peer_id) - .and_modify(|(old_conn_bitmap, old_version)| { - if version > old_version.get() { - *old_conn_bitmap = connected_peers.clone(); - need_inc_version = true; - old_version.set(version); - } - }) - .or_insert_with(|| { - need_inc_version = true; - (connected_peers, version.into()) - }); + need_inc_version = self.update_conn_info_one_peer(&peer_id_version, connected_peers); } if need_inc_version { self.version.inc(); @@ -518,10 +536,10 @@ impl SyncedRouteInfo { fn update_conn_info(&self, conn_info: &ConnInfo) { match conn_info { ConnInfo::ConnBitmap(conn_bitmap) => { - self.update_conn_map(conn_bitmap); + self.update_conn_info_with_bitmap(conn_bitmap); } ConnInfo::ConnPeerList(conn_peer_list) => { - self.update_conn_peer_list(conn_peer_list); + self.update_conn_info_with_list(conn_peer_list); } } } @@ -592,18 +610,24 @@ impl SyncedRouteInfo { fn update_my_conn_info(&self, my_peer_id: PeerId, connected_peers: BTreeSet) -> bool { self.fill_empty_peer_info(&connected_peers); - let mut my_conn_info = self - .conn_map - .entry(my_peer_id) - .or_insert((BTreeSet::new(), AtomicVersion::new())); + let guard = self.conn_map.upgradable_read(); + let my_conn_info = guard.get(&my_peer_id); + let new_version = my_conn_info.map(|x| x.version.get()).unwrap_or(0) + 1; - if connected_peers == my_conn_info.value().0 { - false - } else { - let _ = std::mem::replace(&mut my_conn_info.value_mut().0, connected_peers); - my_conn_info.value().1.inc(); + if my_conn_info.is_none_or(|old| old.connected_peers != connected_peers) { + let mut guard = RwLockUpgradableReadGuard::upgrade(guard); + guard.insert( + my_peer_id, + RouteConnInfo { + connected_peers, + version: new_version.into(), + last_update: SystemTime::now(), + }, + ); self.version.inc(); true + } else { + false } } @@ -694,18 +718,6 @@ impl SyncedRouteInfo { } } - fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { - self.conn_map - .get(&src_peer_id) - .map(|x| x.0.contains(&dst_peer_id)) - .unwrap_or(false) - } - - fn is_peer_directly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { - self.is_peer_bidirectly_connected(src_peer_id, dst_peer_id) - || self.is_peer_bidirectly_connected(dst_peer_id, src_peer_id) - } - fn verify_and_update_group_trusts( &self, peer_infos: &[RoutePeerInfo], @@ -1550,7 +1562,7 @@ impl PeerRouteServiceImpl { synced_route_info: SyncedRouteInfo { peer_infos: RwLock::new(OrderedHashMap::new()), raw_peer_infos: DashMap::new(), - conn_map: DashMap::new(), + conn_map: RwLock::new(OrderedHashMap::new()), foreign_network: DashMap::new(), group_trust_map: DashMap::new(), group_trust_map_cache: DashMap::new(), @@ -1738,49 +1750,57 @@ impl PeerRouteServiceImpl { // the conn_bitmap should contain complete list of directly connected peers. // use union of dst peers can preserve this property. - let all_dst_peer_ids = self - .synced_route_info - .conn_map - .iter() - .flat_map(|x| x.value().clone().0.into_iter()) - .collect::>(); - - let all_peer_ids = self - .synced_route_info - .conn_map - .iter() - .map(|x| PeerIdVersion { - peer_id: *x.key(), - version: x.value().1.get(), - }) - // do not sync conn info of peers that are not reachable from any peer. - .filter(|p| { - all_dst_peer_ids.contains(&p.peer_id) || self.route_table.peer_reachable(p.peer_id) - }) - .collect::>(); + let mut all_peer_ids: BTreeMap = BTreeMap::new(); + let mut add_to_all_peer_ids = |peer_id: PeerId, version: Version| { + all_peer_ids + .entry(peer_id) + .and_modify(|x| { + if *x < version { + *x = version; + } + }) + .or_insert(version); + }; + for item in self.synced_route_info.conn_map.read().iter() { + let src_peer_id = *item.0; + if !self.route_table.peer_reachable(src_peer_id) { + continue; + } + add_to_all_peer_ids(src_peer_id, item.1.version.get()); + for dst_peer_id in item.1.connected_peers.iter() { + add_to_all_peer_ids(*dst_peer_id, 0); + } + } let mut conn_bitmap = RouteConnBitmap { bitmap: vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)], - peer_ids: all_peer_ids, + peer_ids: all_peer_ids + .iter() + .map(|x| PeerIdVersion { + peer_id: *x.0, + version: *x.1, + }) + .collect(), }; + let locked_conn_map = self.synced_route_info.conn_map.read(); let all_peer_ids = &conn_bitmap.peer_ids; for (peer_idx, peer_id_version) in all_peer_ids.iter().enumerate() { - let Some(connected) = self - .synced_route_info - .conn_map - .get(&peer_id_version.peer_id) - else { + let Some(connected) = locked_conn_map.get(&peer_id_version.peer_id) else { continue; }; for (idx, other_peer_id_version) in all_peer_ids.iter().enumerate() { - if connected.0.contains(&other_peer_id_version.peer_id) { + if connected + .connected_peers + .contains(&other_peer_id_version.peer_id) + { let bit_idx = peer_idx * all_peer_ids.len() + idx; conn_bitmap.bitmap[bit_idx / 8] |= 1 << (bit_idx % 8); } } } + drop(locked_conn_map); let mut locked = self.cached_local_conn_map.lock().unwrap(); if self @@ -1895,25 +1915,22 @@ impl PeerRouteServiceImpl { estimated_size: &mut usize, ) -> Option { let mut peer_conn_infos = Vec::new(); - let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); *estimated_size = 0; - for peer_id_version in cached_conn_map.peer_ids.iter() { - let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version); - if session.check_saved_conn_version_update_to_date(peer_id, local_version) { + for (peer_id, conn_info) in self.synced_route_info.conn_map.read().iter() { + if session.check_saved_conn_version_update_to_date(*peer_id, conn_info.version.get()) { continue; } - let Some(connected) = self.synced_route_info.conn_map.get(&peer_id) else { - continue; - }; - peer_conn_infos.push(PeerConnInfo { - peer_id: Some(*peer_id_version), - connected_peer_ids: connected.0.iter().copied().collect(), + peer_id: Some(PeerIdVersion { + peer_id: *peer_id, + version: conn_info.version.get(), + }), + connected_peer_ids: conn_info.connected_peers.iter().copied().collect(), }); *estimated_size += std::mem::size_of::() - + connected.0.len() * std::mem::size_of::(); + + conn_info.connected_peers.len() * std::mem::size_of::(); } if peer_conn_infos.is_empty() { @@ -3225,13 +3242,13 @@ mod tests { let synced_info = &p.service_impl.synced_route_info; for routable_peer in routable_peers.iter() { // check conn map - let conns = synced_info - .conn_map - .get(&routable_peer.my_peer_id()) - .unwrap(); + let conns = { + let guard = synced_info.conn_map.read(); + guard.get(&routable_peer.my_peer_id()).cloned().unwrap() + }; assert_eq!( - conns.0, + conns.connected_peers, routable_peer .get_peer_map() .list_peers()