diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 4593234..12cdcc9 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -160,7 +160,7 @@ jobs: # The prefix cache key, this can be changed to start a new cache manually. # default: "v0-rust" prefix-key: "" - + cache-targets: "false" - name: Setup protoc uses: arduino/setup-protoc@v3 diff --git a/Cargo.lock b/Cargo.lock index a4d81c7..ef348d6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2136,6 +2136,7 @@ dependencies = [ "nix 0.29.0", "once_cell", "openssl", + "ordered_hash_map", "parking_lot", "percent-encoding", "petgraph 0.8.1", @@ -5498,6 +5499,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "ordered_hash_map" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6c699f8a30f345785be969deed7eee4c73a5de58c7faf61d6a3251ef798ff61" +dependencies = [ + "hashbrown 0.15.3", +] + [[package]] name = "os_info" version = "3.8.2" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 0c5819b..86d8de6 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -143,6 +143,7 @@ network-interface = "2.0" # for ospf route petgraph = "0.8.1" hashbrown = "0.15.3" +ordered_hash_map = "0.5.0" # for wireguard boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index d8f024d..7c808f4 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -13,6 +13,8 @@ use arc_swap::ArcSwap; use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr}; use crossbeam::atomic::AtomicCell; use dashmap::DashMap; +use ordered_hash_map::OrderedHashMap; +use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock}; use petgraph::{ algo::dijkstra, graph::{Graph, NodeIndex}, @@ -41,7 +43,7 @@ use crate::{ route_foreign_network_infos, route_foreign_network_summary, sync_route_info_request::ConnInfo, 
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, - OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos, + OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion, RouteForeignNetworkInfos, RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError, SyncRouteInfoRequest, SyncRouteInfoResponse, }, @@ -70,6 +72,12 @@ static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660); static AVOID_RELAY_COST: usize = i32::MAX as usize; static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(false); +// if a peer is unreachable for `REMOVE_UNREACHABLE_PEER_INFO_AFTER` time, we can remove it because +// 1. all the ospf sessions between the two zones are already destroyed, newly created sessions will resend the peer info. +// 2. the dst_saved_peer_info_versions in all sessions have already removed the peer info, so the peer info will be propagated +// to the other zone when the two zones restore the connection. +static REMOVE_UNREACHABLE_PEER_INFO_AFTER: Duration = Duration::from_secs(90); + type Version = u32; #[derive(Debug, Clone)] @@ -124,7 +132,9 @@ impl RoutePeerInfo { proxy_cidrs: Vec::new(), hostname: None, udp_stun_info: 0, - last_update: Some(SystemTime::now().into()), + // ensure this is updated when the peer_infos/conn_info/foreign_network lock is acquired. + // else we may assign an older timestamp than the iteration time. + last_update: None, version: 0, easytier_version: EASYTIER_VERSION.to_string(), feature_flag: None, @@ -136,13 +146,21 @@ impl RoutePeerInfo { } } - pub fn update_self( - &self, + /// Creates a new `RoutePeerInfo` instance with updated information from the given context. + /// + /// # Parameters + /// - `my_peer_id`: The unique identifier for the peer. + /// - `peer_route_id`: The route identifier associated with the peer. + /// - `global_ctx`: Reference to the global context containing configuration and state. 
+ /// + /// # Returns + /// A new `RoutePeerInfo` instance initialized with values from the provided context and parameters. + pub fn new_updated_self( my_peer_id: PeerId, peer_route_id: u64, global_ctx: &ArcGlobalCtx, ) -> Self { - let mut new = Self { + Self { peer_id: my_peer_id, inst_id: Some(global_ctx.get_id().into()), cost: 0, @@ -160,9 +178,10 @@ impl RoutePeerInfo { .get_stun_info_collector() .get_stun_info() .udp_nat_type, - // following fields do not participate in comparison. - last_update: self.last_update, - version: self.version, + + // these two fields should not participate in comparison. + last_update: None, + version: 0, easytier_version: EASYTIER_VERSION.to_string(), feature_flag: Some(global_ctx.get_feature_flags()), @@ -176,22 +195,38 @@ impl RoutePeerInfo { ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()), groups: global_ctx.get_acl_groups(my_peer_id), - }; + } + } + /// Attempts to update the `new` RoutePeerInfo based on the `old` RoutePeerInfo. + /// + /// An update is triggered if any fields in `new` differ from `old`, or if the time since + /// `old.last_update` exceeds the `UPDATE_PEER_INFO_PERIOD`. + /// + /// If an update occurs, `new.last_update` is set to the current time and `new.version` is incremented. + /// Otherwise, `new.last_update` and `new.version` are copied from `old` without modification. + /// + /// Returns `true` if an update was performed (fields changed or periodic update required), + /// or `false` if no update was necessary. 
+ pub fn try_update_new_peer_info(old: &RoutePeerInfo, new: &mut RoutePeerInfo) -> bool { let need_update_periodically = if let Ok(Ok(d)) = - SystemTime::try_from(new.last_update.unwrap_or_default()).map(|x| x.elapsed()) + SystemTime::try_from(old.last_update.unwrap_or_default()).map(|x| x.elapsed()) { d > UPDATE_PEER_INFO_PERIOD } else { true }; - if new != *self || need_update_periodically { - new.last_update = Some(SystemTime::now().into()); - new.version += 1; - } + // these two fields should not participate in comparison. + new.version = old.version; + new.last_update = old.last_update; - new + if *new != *old || need_update_periodically { + new.version += 1; + true + } else { + false + } } } @@ -259,12 +294,29 @@ impl RouteConnBitmap { type Error = SyncRouteInfoError; +#[derive(Debug, Clone)] +struct RouteConnInfo { + connected_peers: BTreeSet, + version: AtomicVersion, + last_update: SystemTime, +} + +impl Default for RouteConnInfo { + fn default() -> Self { + Self { + connected_peers: BTreeSet::new(), + version: AtomicVersion::new(), + last_update: SystemTime::now(), + } + } +} + // constructed with all infos synced from all peers. struct SyncedRouteInfo { - peer_infos: DashMap, - // prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers. + peer_infos: RwLock>, + // prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and propagate them to other peers. 
raw_peer_infos: DashMap, - conn_map: DashMap, AtomicVersion)>, + conn_map: RwLock>, foreign_network: DashMap, group_trust_map: DashMap>>, group_trust_map_cache: DashMap>>, // cache for group trust map, should sync with group_trust_map @@ -287,22 +339,21 @@ impl Debug for SyncedRouteInfo { impl SyncedRouteInfo { fn get_connected_peers>(&self, peer_id: PeerId) -> Option { self.conn_map + .read() .get(&peer_id) - .map(|x| x.0.clone().iter().copied().collect()) + .map(|x| x.connected_peers.iter().copied().collect()) } fn remove_peer(&self, peer_id: PeerId) { tracing::warn!(?peer_id, "remove_peer from synced_route_info"); - self.peer_infos.remove(&peer_id); + self.peer_infos.write().remove(&peer_id); self.raw_peer_infos.remove(&peer_id); - self.conn_map.remove(&peer_id); + self.conn_map.write().remove(&peer_id); self.foreign_network.retain(|k, _| k.peer_id != peer_id); self.group_trust_map.remove(&peer_id); self.group_trust_map_cache.remove(&peer_id); - shrink_dashmap(&self.peer_infos, None); shrink_dashmap(&self.raw_peer_infos, None); - shrink_dashmap(&self.conn_map, None); shrink_dashmap(&self.foreign_network, None); shrink_dashmap(&self.group_trust_map, None); shrink_dashmap(&self.group_trust_map_cache, None); @@ -313,15 +364,25 @@ impl SyncedRouteInfo { fn fill_empty_peer_info(&self, peer_ids: &BTreeSet) { let mut need_inc_version = false; for peer_id in peer_ids { - self.peer_infos.entry(*peer_id).or_insert_with(|| { + let guard = self.peer_infos.upgradable_read(); + if !guard.contains_key(peer_id) { + let mut peer_info = RoutePeerInfo::new(); + let mut guard = RwLockUpgradableReadGuard::upgrade(guard); + peer_info.last_update = Some(SystemTime::now().into()); + guard.insert(*peer_id, peer_info); need_inc_version = true; - RoutePeerInfo::new() - }); + } else { + drop(guard); + } - self.conn_map.entry(*peer_id).or_insert_with(|| { + let guard = self.conn_map.upgradable_read(); + if !guard.contains_key(peer_id) { + let mut guard = 
RwLockUpgradableReadGuard::upgrade(guard); + guard.insert(*peer_id, RouteConnInfo::default()); need_inc_version = true; - (BTreeSet::new(), AtomicVersion::new()) - }); + } else { + drop(guard); + } } if need_inc_version { self.version.inc(); @@ -330,6 +391,7 @@ impl SyncedRouteInfo { fn get_peer_info_version_with_default(&self, peer_id: PeerId) -> Version { self.peer_infos + .read() .get(&peer_id) .map(|x| x.version) .unwrap_or(0) @@ -338,8 +400,9 @@ impl SyncedRouteInfo { fn get_avoid_relay_data(&self, peer_id: PeerId) -> bool { // if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST. self.peer_infos + .read() .get(&peer_id) - .and_then(|x| x.value().feature_flag) + .and_then(|x| x.feature_flag) .map(|x| x.avoid_relay_data) .unwrap_or_default() } @@ -395,7 +458,10 @@ impl SyncedRouteInfo { my_peer_route_id, dst_peer_id, if route_info.peer_id == dst_peer_id { - self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id) + self.peer_infos + .read() + .get(&dst_peer_id) + .map(|x| x.peer_route_id) } else { None }, @@ -409,26 +475,19 @@ impl SyncedRouteInfo { .unwrap(); assert_eq!(peer_id_raw, route_info.peer_id); + let mut guard = self.peer_infos.write(); // time between peers may not be synchronized, so update last_update to local now. // note only last_update with larger version will be updated to local saved peer info. 
route_info.last_update = Some(SystemTime::now().into()); - - self.peer_infos - .entry(route_info.peer_id) - .and_modify(|old_entry| { - if route_info.version > old_entry.version { - self.raw_peer_infos - .insert(route_info.peer_id, raw_route_info.clone()); - *old_entry = route_info.clone(); - need_inc_version = true; - } - }) - .or_insert_with(|| { - need_inc_version = true; - self.raw_peer_infos - .insert(route_info.peer_id, raw_route_info.clone()); - route_info.clone() - }); + if guard + .get_mut(&route_info.peer_id) + .is_none_or(|old| route_info.version > old.version) + { + self.raw_peer_infos + .insert(route_info.peer_id, raw_route_info.clone()); + guard.insert(route_info.peer_id, route_info); + need_inc_version = true; + } } if need_inc_version { self.version.inc(); @@ -436,7 +495,31 @@ impl SyncedRouteInfo { Ok(()) } - fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) { + fn update_conn_info_one_peer( + &self, + peer_id_version: &PeerIdVersion, + connected_peers: BTreeSet, + ) -> bool { + let mut guard = self.conn_map.write(); + if guard + .get_mut(&peer_id_version.peer_id) + .is_none_or(|old| peer_id_version.version > old.version.get()) + { + guard.insert( + peer_id_version.peer_id, + RouteConnInfo { + connected_peers, + version: peer_id_version.version.into(), + last_update: SystemTime::now(), + }, + ); + return true; + } + + false + } + + fn update_conn_info_with_bitmap(&self, conn_bitmap: &RouteConnBitmap) { self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.peer_id).collect()); let mut need_inc_version = false; @@ -444,27 +527,14 @@ impl SyncedRouteInfo { for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() { let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); self.fill_empty_peer_info(&connceted_peers); - - self.conn_map - .entry(peer_id_version.peer_id) - .and_modify(|(old_conn_bitmap, old_version)| { - if peer_id_version.version > old_version.get() { - *old_conn_bitmap = connceted_peers.clone(); 
- need_inc_version = true; - old_version.set(peer_id_version.version); - } - }) - .or_insert_with(|| { - need_inc_version = true; - (connceted_peers, (peer_id_version.version).into()) - }); + need_inc_version = self.update_conn_info_one_peer(peer_id_version, connceted_peers); } if need_inc_version { self.version.inc(); } } - fn update_conn_peer_list(&self, conn_peer_list: &RouteConnPeerList) { + fn update_conn_info_with_list(&self, conn_peer_list: &RouteConnPeerList) { let mut need_inc_version = false; for peer_conn_info in &conn_peer_list.peer_conn_infos { @@ -473,23 +543,9 @@ impl SyncedRouteInfo { }; let connected_peers: BTreeSet = peer_conn_info.connected_peer_ids.iter().copied().collect(); - let (peer_id, version) = (peer_id_version.peer_id, peer_id_version.version); self.fill_empty_peer_info(&connected_peers); - - self.conn_map - .entry(peer_id) - .and_modify(|(old_conn_bitmap, old_version)| { - if version > old_version.get() { - *old_conn_bitmap = connected_peers.clone(); - need_inc_version = true; - old_version.set(version); - } - }) - .or_insert_with(|| { - need_inc_version = true; - (connected_peers, version.into()) - }); + need_inc_version = self.update_conn_info_one_peer(&peer_id_version, connected_peers); } if need_inc_version { self.version.inc(); @@ -499,10 +555,10 @@ impl SyncedRouteInfo { fn update_conn_info(&self, conn_info: &ConnInfo) { match conn_info { ConnInfo::ConnBitmap(conn_bitmap) => { - self.update_conn_map(conn_bitmap); + self.update_conn_info_with_bitmap(conn_bitmap); } ConnInfo::ConnPeerList(conn_peer_list) => { - self.update_conn_peer_list(conn_peer_list); + self.update_conn_info_with_list(conn_peer_list); } } } @@ -535,15 +591,34 @@ impl SyncedRouteInfo { my_peer_route_id: u64, global_ctx: &ArcGlobalCtx, ) -> bool { - let mut old = self.peer_infos.entry(my_peer_id).or_default(); - let new = old.update_self(my_peer_id, my_peer_route_id, global_ctx); - let new_version = new.version; - let old_version = old.version; - *old = new; - 
drop(old); + let mut new = RoutePeerInfo::new_updated_self(my_peer_id, my_peer_route_id, global_ctx); + let mut guard = self.peer_infos.upgradable_read(); + let old = guard.get(&my_peer_id); + let new_version = old.map(|x| x.version).unwrap_or(0) + 1; + let need_insert_new = if let Some(old) = old { + RoutePeerInfo::try_update_new_peer_info(old, &mut new) + } else { + true + }; + + if need_insert_new { + let acl_groups = if old.map(|x| x.groups != new.groups).unwrap_or(true) { + Some(new.groups.clone()) + } else { + None + }; + + guard.with_upgraded(|peer_infos| { + new.last_update = Some(SystemTime::now().into()); + new.version = new_version; + peer_infos.insert(my_peer_id, new) + }); + drop(guard); + + if let Some(acl_groups) = acl_groups { + self.update_my_group_trusts(my_peer_id, &acl_groups); + } - if new_version != old_version { - self.update_my_group_trusts(my_peer_id); self.version.inc(); true } else { @@ -554,18 +629,24 @@ impl SyncedRouteInfo { fn update_my_conn_info(&self, my_peer_id: PeerId, connected_peers: BTreeSet) -> bool { self.fill_empty_peer_info(&connected_peers); - let mut my_conn_info = self - .conn_map - .entry(my_peer_id) - .or_insert((BTreeSet::new(), AtomicVersion::new())); + let guard = self.conn_map.upgradable_read(); + let my_conn_info = guard.get(&my_peer_id); + let new_version = my_conn_info.map(|x| x.version.get()).unwrap_or(0) + 1; - if connected_peers == my_conn_info.value().0 { - false - } else { - let _ = std::mem::replace(&mut my_conn_info.value_mut().0, connected_peers); - my_conn_info.value().1.inc(); + if my_conn_info.is_none_or(|old| old.connected_peers != connected_peers) { + let mut guard = RwLockUpgradableReadGuard::upgrade(guard); + guard.insert( + my_peer_id, + RouteConnInfo { + connected_peers, + version: new_version.into(), + last_update: SystemTime::now(), + }, + ); self.version.inc(); true + } else { + false } } @@ -634,16 +715,12 @@ impl SyncedRouteInfo { updated } - fn is_peer_bidirectly_connected(&self, 
src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { - self.conn_map - .get(&src_peer_id) - .map(|x| x.0.contains(&dst_peer_id)) - .unwrap_or(false) - } + fn get_next_last_sync_succ_timestamp(&self) -> SystemTime { + let _peer_info_lock = self.peer_infos.read(); + let _conn_info_lock = self.conn_map.read(); + // TODO: add foreign network lock - fn is_peer_directly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool { - self.is_peer_bidirectly_connected(src_peer_id, dst_peer_id) - || self.is_peer_bidirectly_connected(dst_peer_id, src_peer_id) + SystemTime::now() } fn verify_and_update_group_trusts( @@ -725,13 +802,15 @@ impl SyncedRouteInfo { } } - fn update_my_group_trusts(&self, my_peer_id: PeerId) { + fn update_my_group_trusts(&self, my_peer_id: PeerId, groups: &[PeerGroupInfo]) { let mut my_group_map = HashMap::new(); let mut my_group_names = Vec::new(); - for group in self.peer_infos.entry(my_peer_id).or_default().groups.iter() { + + for group in groups.iter() { my_group_map.insert(group.group_name.clone(), group.group_proof.clone()); my_group_names.push(group.group_name.clone()); } + self.group_trust_map.insert(my_peer_id, my_group_map); self.group_trust_map_cache .insert(my_peer_id, Arc::new(my_group_names)); @@ -749,21 +828,16 @@ struct NextHopInfo { } // dst_peer_id -> (next_hop_peer_id, cost, path_len) type NextHopMap = DashMap; -#[derive(Debug, Clone, Copy)] -struct PeerIdAndVersion { - peer_id: PeerId, - version: Version, -} // computed with SyncedRouteInfo. used to get next hop. 
#[derive(Debug)] struct RouteTable { peer_infos: DashMap, next_hop_map: NextHopMap, - ipv4_peer_id_map: DashMap, - ipv6_peer_id_map: DashMap, - cidr_peer_id_map: ArcSwap>, - cidr_v6_peer_id_map: ArcSwap>, + ipv4_peer_id_map: DashMap, + ipv6_peer_id_map: DashMap, + cidr_peer_id_map: ArcSwap>, + cidr_v6_peer_id_map: ArcSwap>, next_hop_map_version: AtomicVersion, } @@ -811,18 +885,17 @@ impl RouteTable { let mut start_node_idx = None; let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new(); - for item in synced_info.peer_infos.iter() { - let peer_id = item.key(); - let info = item.value(); + for (peer_id, info) in synced_info.peer_infos.read().iter() { + let peer_id = *peer_id; if info.version == 0 { continue; } - let node_idx = graph.add_node(*peer_id); + let node_idx = graph.add_node(peer_id); - peer_id_to_node_index.insert(*peer_id, node_idx); - if *peer_id == my_peer_id { + peer_id_to_node_index.insert(peer_id, node_idx); + if peer_id == my_peer_id { start_node_idx = Some(node_idx); } } @@ -991,18 +1064,18 @@ impl RouteTable { } let peer_id = item.key(); - let Some(info) = synced_info.peer_infos.get(peer_id) else { + let Some(info) = synced_info.peer_infos.read().get(peer_id).cloned() else { continue; }; self.peer_infos.insert(*peer_id, info.clone()); - let peer_id_and_version = PeerIdAndVersion { + let peer_id_and_version = PeerIdVersion { peer_id: *peer_id, version, }; - let is_new_peer_better = |old_peer: &PeerIdAndVersion| -> bool { + let is_new_peer_better = |old_peer: &PeerIdVersion| -> bool { if peer_id_and_version.version > old_peer.version { return true; } @@ -1194,6 +1267,12 @@ struct SyncRouteSession { dst_saved_conn_info_version: DashMap, dst_saved_foreign_network_versions: DashMap, + // we don't want to send unreachable peer infos / conn infos to peer, so we keep track of them. 
+ unreachable_peers_for_peer_info: parking_lot::Mutex>, + unreachable_peers_for_conn_info: parking_lot::Mutex>, + + last_sync_succ_timestamp: AtomicCell>, + my_session_id: AtomicSessionId, dst_session_id: AtomicSessionId, @@ -1207,6 +1286,8 @@ struct SyncRouteSession { rpc_rx_count: AtomicU32, task: SessionTask, + + lock: parking_lot::Mutex<()>, } impl SyncRouteSession { @@ -1218,6 +1299,11 @@ impl SyncRouteSession { dst_saved_conn_info_version: DashMap::new(), dst_saved_foreign_network_versions: DashMap::new(), + unreachable_peers_for_peer_info: parking_lot::Mutex::new(BTreeMap::new()), + unreachable_peers_for_conn_info: parking_lot::Mutex::new(BTreeMap::new()), + + last_sync_succ_timestamp: AtomicCell::new(None), + my_session_id: AtomicSessionId::new(rand::random()), dst_session_id: AtomicSessionId::new(0), @@ -1230,6 +1316,8 @@ impl SyncRouteSession { rpc_rx_count: AtomicU32::new(0), task: SessionTask::new(my_peer_id), + + lock: parking_lot::Mutex::new(()), } } @@ -1363,12 +1451,19 @@ impl SyncRouteSession { self.need_sync_initiator_info.store(true, Ordering::Relaxed); } + // if the dst session id has changed, store it and clear all saved per-session state fn update_dst_session_id(&self, session_id: SessionId) { if session_id != self.dst_session_id.load(Ordering::Relaxed) { tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info."); self.dst_session_id.store(session_id, Ordering::Relaxed); self.dst_saved_conn_info_version.clear(); self.dst_saved_peer_info_versions.clear(); + + // update_dst_session_id is always called with session lock held, so clearing + // last_sync_succ_timestamp and unreachable_peers non-atomically is safe. 
+ self.last_sync_succ_timestamp.store(None); + self.unreachable_peers_for_peer_info.lock().clear(); + self.unreachable_peers_for_conn_info.lock().clear(); } } @@ -1386,6 +1481,16 @@ impl SyncRouteSession { self.dst_saved_foreign_network_versions.shrink_to_fit(); } + fn update_last_sync_succ_timestamp(&self, next_last_sync_succ_timestamp: SystemTime) { + let _ = self.last_sync_succ_timestamp.fetch_update(|x| { + if x.is_none_or(|old| old < next_last_sync_succ_timestamp) { + Some(Some(next_last_sync_succ_timestamp)) + } else { + None + } + }); + } + fn short_debug_string(&self) -> String { format!( "session_dst_peer: {:?}, my_session_id: {:?}, dst_session_id: {:?}, we_are_initiator: {:?}, dst_is_initiator: {:?}, rpc_tx_count: {:?}, rpc_rx_count: {:?}, task: {:?}", @@ -1470,9 +1575,9 @@ impl PeerRouteServiceImpl { foreign_network_my_peer_id_map: DashMap::new(), synced_route_info: SyncedRouteInfo { - peer_infos: DashMap::new(), + peer_infos: RwLock::new(OrderedHashMap::new()), raw_peer_infos: DashMap::new(), - conn_map: DashMap::new(), + conn_map: RwLock::new(OrderedHashMap::new()), foreign_network: DashMap::new(), group_trust_map: DashMap::new(), group_trust_map_cache: DashMap::new(), @@ -1660,49 +1765,57 @@ impl PeerRouteServiceImpl { // the conn_bitmap should contain complete list of directly connected peers. // use union of dst peers can preserve this property. - let all_dst_peer_ids = self - .synced_route_info - .conn_map - .iter() - .flat_map(|x| x.value().clone().0.into_iter()) - .collect::>(); - - let all_peer_ids = self - .synced_route_info - .conn_map - .iter() - .map(|x| PeerIdVersion { - peer_id: *x.key(), - version: x.value().1.get(), - }) - // do not sync conn info of peers that are not reachable from any peer. 
- .filter(|p| { - all_dst_peer_ids.contains(&p.peer_id) || self.route_table.peer_reachable(p.peer_id) - }) - .collect::>(); + let mut all_peer_ids: BTreeMap = BTreeMap::new(); + let mut add_to_all_peer_ids = |peer_id: PeerId, version: Version| { + all_peer_ids + .entry(peer_id) + .and_modify(|x| { + if *x < version { + *x = version; + } + }) + .or_insert(version); + }; + for item in self.synced_route_info.conn_map.read().iter() { + let src_peer_id = *item.0; + if !self.route_table.peer_reachable(src_peer_id) { + continue; + } + add_to_all_peer_ids(src_peer_id, item.1.version.get()); + for dst_peer_id in item.1.connected_peers.iter() { + add_to_all_peer_ids(*dst_peer_id, 0); + } + } let mut conn_bitmap = RouteConnBitmap { bitmap: vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)], - peer_ids: all_peer_ids, + peer_ids: all_peer_ids + .iter() + .map(|x| PeerIdVersion { + peer_id: *x.0, + version: *x.1, + }) + .collect(), }; + let locked_conn_map = self.synced_route_info.conn_map.read(); let all_peer_ids = &conn_bitmap.peer_ids; for (peer_idx, peer_id_version) in all_peer_ids.iter().enumerate() { - let Some(connected) = self - .synced_route_info - .conn_map - .get(&peer_id_version.peer_id) - else { + let Some(connected) = locked_conn_map.get(&peer_id_version.peer_id) else { continue; }; for (idx, other_peer_id_version) in all_peer_ids.iter().enumerate() { - if connected.0.contains(&other_peer_id_version.peer_id) { + if connected + .connected_peers + .contains(&other_peer_id_version.peer_id) + { let bit_idx = peer_idx * all_peer_ids.len() + idx; conn_bitmap.bitmap[bit_idx / 8] |= 1 << (bit_idx % 8); } } } + drop(locked_conn_map); let mut locked = self.cached_local_conn_map.lock().unwrap(); if self @@ -1715,21 +1828,58 @@ impl PeerRouteServiceImpl { fn build_route_info(&self, session: &SyncRouteSession) -> Option> { let mut route_infos = Vec::new(); - for item in self.synced_route_info.peer_infos.iter() { + let peer_infos = 
self.synced_route_info.peer_infos.read(); + let mut unreachable_peers_for_peer_info = session.unreachable_peers_for_peer_info.lock(); + let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load(); + for (peer_id, peer_info) in peer_infos.iter().rev() { + // stop iter if last_update of peer info is older than session.last_sync_succ_timestamp + if let Some(last_update) = peer_info.last_update { + let last_update = TryInto::::try_into(last_update).unwrap(); + if last_sync_succ_timestamp.is_some_and(|t| last_update < t) { + tracing::debug!( + "ignore peer_info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, peer_infos_count: {}, my_peer_id: {:?}, session: {:?}", + peer_info, + last_update, + last_sync_succ_timestamp, + peer_infos.len(), + self.my_peer_id, + session + ); + break; + } + } + + if session.check_saved_peer_info_update_to_date(peer_info.peer_id, peer_info.version) { + continue; + } + // do not send unreachable peer info to dst peer. - if !self.route_table.peer_reachable(*item.key()) { + if !self.route_table.peer_reachable(*peer_id) { + unreachable_peers_for_peer_info.insert(*peer_id, peer_info.version); continue; } - if session - .check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version) - { - continue; - } - - route_infos.push(item.value().clone()); + route_infos.push(peer_info.clone()); } + unreachable_peers_for_peer_info.retain(|peer_id, version| { + if session.check_saved_peer_info_update_to_date(*peer_id, *version) { + // if saved peer info is up-to-date, forget this peer id. + return false; + } + let Some(peer_info) = peer_infos.get(peer_id) else { + // if not found in peer info map, forget this peer id. 
+ return false; + }; + + if self.route_table.peer_reachable(*peer_id) { + route_infos.push(peer_info.clone()); + } + + // this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map + true + }); + if route_infos.is_empty() { None } else { @@ -1737,55 +1887,75 @@ impl PeerRouteServiceImpl { } } - fn build_conn_bitmap(&self, session: &SyncRouteSession) -> Option { - let mut need_update = false; - for peer_id_version in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() { - let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version); - if session.check_saved_conn_version_update_to_date(peer_id, local_version) { - continue; - } - need_update = true; - } - - if !need_update { - return None; - } - - Some(self.cached_local_conn_map.lock().unwrap().clone()) - } - - fn estimate_conn_bitmap_size(&self) -> usize { - let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); - cached_conn_map.bitmap.len() - + (cached_conn_map.peer_ids.len() * std::mem::size_of::()) - } - fn build_conn_peer_list( &self, session: &SyncRouteSession, estimated_size: &mut usize, ) -> Option { + let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load(); let mut peer_conn_infos = Vec::new(); - let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); *estimated_size = 0; - for peer_id_version in cached_conn_map.peer_ids.iter() { - let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version); - if session.check_saved_conn_version_update_to_date(peer_id, local_version) { + let conn_map = self.synced_route_info.conn_map.read(); + let mut unreachable_peers_for_conn_info = session.unreachable_peers_for_conn_info.lock(); + + let mut add_to_conn_peer_list = |peer_id: PeerId, conn_info: &RouteConnInfo| { + peer_conn_infos.push(PeerConnInfo { + peer_id: Some(PeerIdVersion { + peer_id, + version: conn_info.version.get(), + }), + connected_peer_ids: conn_info.connected_peers.iter().copied().collect(), + }); 
+ *estimated_size += std::mem::size_of::() + + conn_info.connected_peers.len() * std::mem::size_of::(); + }; + + for (peer_id, conn_info) in conn_map.iter().rev() { + // stop iter if last_update of conn info is older than session.last_sync_succ_timestamp + let last_update = TryInto::::try_into(conn_info.last_update).unwrap(); + if last_sync_succ_timestamp.is_some_and(|t| last_update < t) { + tracing::debug!( + "ignore conn info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, conn_map count: {}, my_peer_id: {:?}, session: {:?}", + conn_info, + last_update, + last_sync_succ_timestamp, + conn_map.len(), + self.my_peer_id, + session + ); + break; + } + + if session.check_saved_conn_version_update_to_date(*peer_id, conn_info.version.get()) { continue; } - let Some(connected) = self.synced_route_info.conn_map.get(&peer_id) else { + if !self.route_table.peer_reachable(*peer_id) { + unreachable_peers_for_conn_info.insert(*peer_id, conn_info.version.get()); continue; + } + + add_to_conn_peer_list(*peer_id, conn_info); + } + + unreachable_peers_for_conn_info.retain(|peer_id, version| { + if session.check_saved_conn_version_update_to_date(*peer_id, *version) { + // if saved conn info is up-to-date, forget this peer id. + return false; + } + let Some(conn_info) = conn_map.get(peer_id) else { + // if not found in conn map, forget this peer id. 
+ return false; }; - peer_conn_infos.push(PeerConnInfo { - peer_id: Some(*peer_id_version), - connected_peer_ids: connected.0.iter().copied().collect(), - }); - *estimated_size += std::mem::size_of::() - + connected.0.len() * std::mem::size_of::(); - } + if self.route_table.peer_reachable(*peer_id) { + add_to_conn_peer_list(*peer_id, conn_info); + } + + // this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map + true + }); if peer_conn_infos.is_empty() { return None; @@ -1794,6 +1964,16 @@ impl PeerRouteServiceImpl { Some(RouteConnPeerList { peer_conn_infos }) } + fn build_conn_bitmap(&self) -> RouteConnBitmap { + self.cached_local_conn_map.lock().unwrap().clone() + } + + fn estimate_conn_bitmap_size(&self) -> usize { + let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); + cached_conn_map.bitmap.len() + + (cached_conn_map.peer_ids.len() * std::mem::size_of::()) + } + fn build_foreign_network_info( &self, session: &SyncRouteSession, @@ -1860,39 +2040,35 @@ impl PeerRouteServiceImpl { let dst_supports_peer_list = self .synced_route_info .peer_infos + .read() .get(&dst_peer_id) .and_then(|p| p.feature_flag) .map(|x| x.support_conn_list_sync) - .unwrap_or(false); - - if !dst_supports_peer_list && !FORCE_USE_CONN_LIST.load(Ordering::Relaxed) { - // Destination peer doesn't support peer list, use bitmap format - return self.build_conn_bitmap(session).map(Into::into); - } + .unwrap_or(false) + || FORCE_USE_CONN_LIST.load(Ordering::Relaxed); // Both formats are supported, choose the more efficient one let mut conn_list_estimated_size = 0; - let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size); + let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size)?; let bitmap_size = self.estimate_conn_bitmap_size(); - if conn_list_estimated_size < bitmap_size - || FORCE_USE_CONN_LIST.load(Ordering::Relaxed) - || peer_list.is_none() - { - peer_list.map(Into::into) + if 
conn_list_estimated_size < bitmap_size && dst_supports_peer_list { + Some(peer_list.into()) } else { - self.build_conn_bitmap(session).map(Into::into) + Some(self.build_conn_bitmap().into()) } } fn clear_expired_peer(&self) { let now = SystemTime::now(); let mut to_remove = Vec::new(); - for item in self.synced_route_info.peer_infos.iter() { - if let Ok(d) = now.duration_since(item.value().last_update.unwrap().try_into().unwrap()) - { - if d > REMOVE_DEAD_PEER_INFO_AFTER { - to_remove.push(*item.key()); + for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() { + if let Ok(d) = now.duration_since(peer_info.last_update.unwrap().try_into().unwrap()) { + if d > REMOVE_DEAD_PEER_INFO_AFTER + || (d > REMOVE_UNREACHABLE_PEER_INFO_AFTER + && !self.route_table.peer_reachable(*peer_id)) + { + to_remove.push(*peer_id); } } } @@ -1969,8 +2145,12 @@ impl PeerRouteServiceImpl { return true; }; + let _session_lock = session.lock.lock(); + let my_peer_id = self.my_peer_id; + let next_last_sync_succ_timestamp = + self.synced_route_info.get_next_last_sync_succ_timestamp(); let (peer_infos, conn_info, foreign_network) = self.build_sync_request(&session, dst_peer_id); if peer_infos.is_none() @@ -2016,9 +2196,17 @@ impl PeerRouteServiceImpl { .encode_to_vec() .into(), ); + + drop(_session_lock); let ret = rpc_stub .sync_route_info(ctrl, SyncRouteInfoRequest::default()) .await; + let _session_lock = session.lock.lock(); + + tracing::debug!( + "sync_route_info resp: {:?}, req: {:?}, session: {:?}, my_info: {:?}, next_last_sync_succ_timestamp: {:?}", + ret, sync_route_info_req, session, self.global_ctx.network, next_last_sync_succ_timestamp + ); if let Err(e) = &ret { tracing::error!( @@ -2066,6 +2254,8 @@ impl PeerRouteServiceImpl { if let Some(foreign_network) = &foreign_network { session.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id); } + + session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp); } } false @@ -2428,6 +2618,8 
@@ impl RouteSessionManager { let my_peer_id = service_impl.my_peer_id; let session = self.get_or_start_session(from_peer_id)?; + let _session_lock = session.lock.lock(); + session.rpc_rx_count.fetch_add(1, Ordering::Relaxed); session.update_dst_session_id(from_session_id); @@ -2840,7 +3032,7 @@ mod tests { peers::{ create_packet_recv_chan, peer_manager::{PeerManager, RouteAlgoType}, - peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST}, + peer_ospf_route::{PeerIdVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST}, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, }, @@ -2923,8 +3115,14 @@ mod tests { tokio::time::sleep(Duration::from_secs(3)).await; - assert_eq!(2, r_a.service_impl.synced_route_info.peer_infos.len()); - assert_eq!(2, r_b.service_impl.synced_route_info.peer_infos.len()); + assert_eq!( + 2, + r_a.service_impl.synced_route_info.peer_infos.read().len() + ); + assert_eq!( + 2, + r_b.service_impl.synced_route_info.peer_infos.read().len() + ); for s in r_a.service_impl.sessions.iter() { assert!(s.value().task.is_running()); @@ -2934,6 +3132,7 @@ mod tests { r_a.service_impl .synced_route_info .peer_infos + .read() .get(&p_a.my_peer_id()) .unwrap() .version, @@ -2990,7 +3189,7 @@ mod tests { for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() { wait_for_condition( - || async { r.service_impl.synced_route_info.peer_infos.len() == 3 }, + || async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 }, Duration::from_secs(5), ) .await; @@ -3077,13 +3276,13 @@ mod tests { let synced_info = &p.service_impl.synced_route_info; for routable_peer in routable_peers.iter() { // check conn map - let conns = synced_info - .conn_map - .get(&routable_peer.my_peer_id()) - .unwrap(); + let conns = { + let guard = synced_info.conn_map.read(); + guard.get(&routable_peer.my_peer_id()).cloned().unwrap() + }; assert_eq!( - conns.0, + 
conns.connected_peers, routable_peer .get_peer_map() .list_peers() @@ -3095,7 +3294,9 @@ mod tests { // check peer infos let peer_info = synced_info .peer_infos + .read() .get(&routable_peer.my_peer_id()) + .cloned() .unwrap(); assert_eq!(peer_info.peer_id, routable_peer.my_peer_id()); } @@ -3125,7 +3326,7 @@ mod tests { for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() { wait_for_condition( - || async { r.service_impl.synced_route_info.peer_infos.len() == 3 }, + || async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 }, Duration::from_secs(5), ) .await; @@ -3392,10 +3593,10 @@ mod tests { let proxy_cidr: Ipv4Cidr = "192.168.100.0/24".parse().unwrap(); let test_ip = proxy_cidr.first_address(); - let mut cidr_peer_id_map: PrefixMap = PrefixMap::new(); + let mut cidr_peer_id_map: PrefixMap = PrefixMap::new(); cidr_peer_id_map.insert( proxy_cidr, - PeerIdAndVersion { + PeerIdVersion { peer_id: p_c.my_peer_id(), version: 0, }, diff --git a/easytier/src/tests/ipv6_test.rs b/easytier/src/tests/ipv6_test.rs index d71dc07..07bd1b6 100644 --- a/easytier/src/tests/ipv6_test.rs +++ b/easytier/src/tests/ipv6_test.rs @@ -38,8 +38,7 @@ async fn test_route_peer_info_ipv6() { global_ctx.set_ipv6(Some(ipv6_cidr)); // Create RoutePeerInfo with IPv6 support - let peer_info = RoutePeerInfo::new(); - let updated_info = peer_info.update_self(123, 456, &global_ctx); + let updated_info = RoutePeerInfo::new_updated_self(123, 456, &global_ctx); // Verify IPv6 address is included assert!(updated_info.ipv6_addr.is_some());