diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 670553b..5381581 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -142,6 +142,7 @@ impl GlobalCtx { let feature_flags = PeerFeatureFlag { kcp_input: !config_fs.get_flags().disable_kcp_input, no_relay_kcp: config_fs.get_flags().disable_relay_kcp, + support_conn_list_sync: true, // Enable selective peer list sync by default ..Default::default() }; diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 8694745..d294fa3 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -22,7 +22,6 @@ use petgraph::{ use prefix_trie::PrefixMap; use prost::Message; use prost_reflect::{DynamicMessage, ReflectMessage}; -use serde::{Deserialize, Serialize}; use tokio::{ select, sync::Mutex, @@ -40,8 +39,9 @@ use crate::{ common::{Ipv4Inet, NatType, StunInfo}, peer_rpc::{ route_foreign_network_infos, route_foreign_network_summary, - ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, - OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos, + sync_route_info_request::ConnInfo, ForeignNetworkRouteInfoEntry, + ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, + OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos, RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError, SyncRouteInfoRequest, SyncRouteInfoResponse, }, @@ -68,6 +68,7 @@ static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600); static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660); // the cost (latency between two peers) is i32, i32::MAX is large enough. static AVOID_RELAY_COST: usize = i32::MAX as usize; +static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(true); type Version = u32; @@ -233,49 +234,11 @@ impl From for crate::proto::api::instance::Route { } } -#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)] -struct RouteConnBitmap { - peer_ids: Vec<(PeerId, Version)>, - bitmap: Vec, -} - -impl From for crate::proto::peer_rpc::RouteConnBitmap { - fn from(val: RouteConnBitmap) -> Self { - crate::proto::peer_rpc::RouteConnBitmap { - peer_ids: val - .peer_ids - .into_iter() - .map(|x| PeerIdVersion { - peer_id: x.0, - version: x.1, - }) - .collect(), - bitmap: val.bitmap, - } - } -} - -impl From for RouteConnBitmap { - fn from(v: crate::proto::peer_rpc::RouteConnBitmap) -> Self { - RouteConnBitmap { - peer_ids: v - .peer_ids - .into_iter() - .map(|x| (x.peer_id, x.version)) - .collect(), - bitmap: v.bitmap, - } - } -} +type RouteConnBitmap = crate::proto::peer_rpc::RouteConnBitmap; +type RouteConnPeerList = crate::proto::peer_rpc::RouteConnPeerList; +type PeerConnInfo = crate::proto::peer_rpc::route_conn_peer_list::PeerConnInfo; impl RouteConnBitmap { - fn new() -> Self { - RouteConnBitmap { - peer_ids: Vec::new(), - bitmap: Vec::new(), - } - } - fn get_bit(&self, idx: usize) -> bool { let byte_idx = idx / 8; let bit_idx = idx % 8; @@ -285,9 +248,9 @@ impl RouteConnBitmap { fn get_connected_peers(&self, peer_idx: usize) -> BTreeSet { let mut connected_peers = BTreeSet::new(); - for (idx, (peer_id, _)) in self.peer_ids.iter().enumerate() { + for (idx, peer_id_version) in self.peer_ids.iter().enumerate() { if self.get_bit(peer_idx * self.peer_ids.len() + idx) { - connected_peers.insert(*peer_id); + connected_peers.insert(peer_id_version.peer_id); } } connected_peers @@ -474,26 +437,26 @@ impl SyncedRouteInfo { } fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) { - self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.0).collect()); + self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.peer_id).collect()); let mut need_inc_version = false; - for (peer_idx, (peer_id, version)) in conn_bitmap.peer_ids.iter().enumerate() { + for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() { let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); self.fill_empty_peer_info(&connceted_peers); self.conn_map - .entry(*peer_id) + .entry(peer_id_version.peer_id) .and_modify(|(old_conn_bitmap, old_version)| { - if *version > old_version.get() { + if peer_id_version.version > old_version.get() { *old_conn_bitmap = connceted_peers.clone(); need_inc_version = true; - old_version.set(*version); + old_version.set(peer_id_version.version); } }) .or_insert_with(|| { need_inc_version = true; - (connceted_peers, (*version).into()) + (connceted_peers, (peer_id_version.version).into()) }); } if need_inc_version { @@ -501,6 +464,49 @@ impl SyncedRouteInfo { } } + fn update_conn_peer_list(&self, conn_peer_list: &RouteConnPeerList) { + let mut need_inc_version = false; + + for peer_conn_info in &conn_peer_list.peer_conn_infos { + let Some(peer_id_version) = peer_conn_info.peer_id else { + continue; + }; + let connected_peers: BTreeSet = + peer_conn_info.connected_peer_ids.iter().copied().collect(); + let (peer_id, version) = (peer_id_version.peer_id, peer_id_version.version); + + self.fill_empty_peer_info(&connected_peers); + + self.conn_map + .entry(peer_id) + .and_modify(|(old_conn_bitmap, old_version)| { + if version > old_version.get() { + *old_conn_bitmap = connected_peers.clone(); + need_inc_version = true; + old_version.set(version); + } + }) + .or_insert_with(|| { + need_inc_version = true; + (connected_peers, version.into()) + }); + } + if need_inc_version { + self.version.inc(); + } + } + + fn update_conn_info(&self, conn_info: &ConnInfo) { + match conn_info { + ConnInfo::ConnBitmap(conn_bitmap) => { + self.update_conn_map(conn_bitmap); + } + ConnInfo::ConnPeerList(conn_peer_list) => { + self.update_conn_peer_list(conn_peer_list); + } + } + } + fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) { for item in foreign_network.infos.iter().map(Clone::clone) { let Some(key) = item.key else { @@ -1185,7 +1191,7 @@ struct SyncRouteSession { my_peer_id: PeerId, dst_peer_id: PeerId, dst_saved_peer_info_versions: DashMap, - dst_saved_conn_bitmap_version: DashMap, + dst_saved_conn_info_version: DashMap, dst_saved_foreign_network_versions: DashMap, my_session_id: AtomicSessionId, @@ -1209,7 +1215,7 @@ impl SyncRouteSession { my_peer_id, dst_peer_id, dst_saved_peer_info_versions: DashMap::new(), - dst_saved_conn_bitmap_version: DashMap::new(), + dst_saved_conn_info_version: DashMap::new(), dst_saved_foreign_network_versions: DashMap::new(), my_session_id: AtomicSessionId::new(rand::random()), @@ -1246,7 +1252,7 @@ impl SyncRouteSession { // never send version 0 conn bitmap to dst peer. return true; } - self.dst_saved_conn_bitmap_version + self.dst_saved_conn_info_version .get(&peer_id) .map(|v| { v.touch(); @@ -1293,15 +1299,46 @@ impl SyncRouteSession { conn_bitmap: &RouteConnBitmap, dst_peer_id: PeerId, ) { - for (peer_id, version) in conn_bitmap.peer_ids.iter() { - if *peer_id == dst_peer_id { + for peer_id_version in conn_bitmap.peer_ids.iter() { + if peer_id_version.peer_id == dst_peer_id { continue; } - self.dst_saved_conn_bitmap_version - .entry(*peer_id) + self.dst_saved_conn_info_version + .entry(peer_id_version.peer_id) .or_default() - .set_if_larger(*version); + .set_if_larger(peer_id_version.version); + } + } + + fn update_dst_saved_conn_peer_list_version( + &self, + conn_peer_list: &RouteConnPeerList, + dst_peer_id: PeerId, + ) { + for peer_conn_info in &conn_peer_list.peer_conn_infos { + let Some(peer_id_version) = peer_conn_info.peer_id else { + continue; + }; + if peer_id_version.peer_id == dst_peer_id { + continue; + } + + self.dst_saved_conn_info_version + .entry(peer_id_version.peer_id) + .or_default() + .set_if_larger(peer_id_version.version); + } + } + + fn update_dst_saved_conn_info_version(&self, conn_info: &ConnInfo, dst_peer_id: PeerId) { + match conn_info { + ConnInfo::ConnBitmap(conn_bitmap) => { + self.update_dst_saved_conn_bitmap_version(conn_bitmap, dst_peer_id); + } + ConnInfo::ConnPeerList(peer_list) => { + self.update_dst_saved_conn_peer_list_version(peer_list, dst_peer_id); + } } } @@ -1330,7 +1367,7 @@ impl SyncRouteSession { if session_id != self.dst_session_id.load(Ordering::Relaxed) { tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info."); self.dst_session_id.store(session_id, Ordering::Relaxed); - self.dst_saved_conn_bitmap_version.clear(); + self.dst_saved_conn_info_version.clear(); self.dst_saved_peer_info_versions.clear(); } } @@ -1340,9 +1377,9 @@ impl SyncRouteSession { .retain(|_, v| !v.is_expired()); self.dst_saved_peer_info_versions.shrink_to_fit(); - self.dst_saved_conn_bitmap_version + self.dst_saved_conn_info_version .retain(|_, v| !v.is_expired()); - self.dst_saved_conn_bitmap_version.shrink_to_fit(); + self.dst_saved_conn_info_version.shrink_to_fit(); self.dst_saved_foreign_network_versions .retain(|_, v| !v.is_expired()); @@ -1441,7 +1478,7 @@ impl PeerRouteServiceImpl { group_trust_map_cache: DashMap::new(), version: AtomicVersion::new(), }, - cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()), + cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()), cached_local_conn_map_version: AtomicVersion::new(), last_update_my_foreign_network: AtomicCell::new(None), @@ -1634,23 +1671,33 @@ impl PeerRouteServiceImpl { .synced_route_info .conn_map .iter() - .map(|x| (*x.key(), x.value().1.get())) + .map(|x| PeerIdVersion { + peer_id: *x.key(), + version: x.value().1.get(), + }) // do not sync conn info of peers that are not reachable from any peer. - .filter(|p| all_dst_peer_ids.contains(&p.0) || self.route_table.peer_reachable(p.0)) + .filter(|p| { + all_dst_peer_ids.contains(&p.peer_id) || self.route_table.peer_reachable(p.peer_id) + }) .collect::>(); - let mut conn_bitmap = RouteConnBitmap::new(); - conn_bitmap.bitmap = vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)]; - conn_bitmap.peer_ids = all_peer_ids; + let mut conn_bitmap = RouteConnBitmap { + bitmap: vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)], + peer_ids: all_peer_ids, + }; let all_peer_ids = &conn_bitmap.peer_ids; - for (peer_idx, (peer_id, _)) in all_peer_ids.iter().enumerate() { - let Some(connected) = self.synced_route_info.conn_map.get(peer_id) else { + for (peer_idx, peer_id_version) in all_peer_ids.iter().enumerate() { + let Some(connected) = self + .synced_route_info + .conn_map + .get(&peer_id_version.peer_id) + else { continue; }; - for (idx, (other_peer_id, _)) in all_peer_ids.iter().enumerate() { - if connected.0.contains(other_peer_id) { + for (idx, other_peer_id_version) in all_peer_ids.iter().enumerate() { + if connected.0.contains(&other_peer_id_version.peer_id) { let bit_idx = peer_idx * all_peer_ids.len() + idx; conn_bitmap.bitmap[bit_idx / 8] |= 1 << (bit_idx % 8); } @@ -1692,8 +1739,9 @@ impl PeerRouteServiceImpl { fn build_conn_bitmap(&self, session: &SyncRouteSession) -> Option { let mut need_update = false; - for (peer_id, local_version) in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() { - if session.check_saved_conn_version_update_to_date(*peer_id, *local_version) { + for peer_id_version in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() { + let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version); + if session.check_saved_conn_version_update_to_date(peer_id, local_version) { continue; } need_update = true; @@ -1706,6 +1754,46 @@ impl PeerRouteServiceImpl { Some(self.cached_local_conn_map.lock().unwrap().clone()) } + fn estimate_conn_bitmap_size(&self) -> usize { + let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); + cached_conn_map.bitmap.len() + + (cached_conn_map.peer_ids.len() * std::mem::size_of::()) + } + + fn build_conn_peer_list( + &self, + session: &SyncRouteSession, + estimated_size: &mut usize, + ) -> Option { + let mut peer_conn_infos = Vec::new(); + let cached_conn_map = self.cached_local_conn_map.lock().unwrap(); + *estimated_size = 0; + + for peer_id_version in cached_conn_map.peer_ids.iter() { + let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version); + if session.check_saved_conn_version_update_to_date(peer_id, local_version) { + continue; + } + + let Some(connected) = self.synced_route_info.conn_map.get(&peer_id) else { + continue; + }; + + peer_conn_infos.push(PeerConnInfo { + peer_id: Some(*peer_id_version), + connected_peer_ids: connected.0.iter().copied().collect(), + }); + *estimated_size += std::mem::size_of::() + + connected.0.len() * std::mem::size_of::(); + } + + if peer_conn_infos.is_empty() { + return None; + } + + Some(RouteConnPeerList { peer_conn_infos }) + } + fn build_foreign_network_info( &self, session: &SyncRouteSession, @@ -1750,16 +1838,51 @@ impl PeerRouteServiceImpl { fn build_sync_request( &self, session: &SyncRouteSession, + dst_peer_id: PeerId, ) -> ( Option>, - Option, + Option, Option, ) { let route_infos = self.build_route_info(session); - let conn_bitmap = self.build_conn_bitmap(session); + let conn_info = self.build_conn_info(session, dst_peer_id); let foreign_network = self.build_foreign_network_info(session); - (route_infos, conn_bitmap, foreign_network) + (route_infos, conn_info, foreign_network) + } + + fn build_conn_info( + &self, + session: &SyncRouteSession, + dst_peer_id: PeerId, + ) -> Option { + // Check if destination peer supports selective peer list sync + let dst_supports_peer_list = self + .synced_route_info + .peer_infos + .get(&dst_peer_id) + .and_then(|p| p.feature_flag) + .map(|x| x.support_conn_list_sync) + .unwrap_or(false); + + if !dst_supports_peer_list && !FORCE_USE_CONN_LIST.load(Ordering::Relaxed) { + // Destination peer doesn't support peer list, use bitmap format + return self.build_conn_bitmap(session).map(Into::into); + } + + // Both formats are supported, choose the more efficient one + let mut conn_list_estimated_size = 0; + let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size); + let bitmap_size = self.estimate_conn_bitmap_size(); + + if conn_list_estimated_size < bitmap_size + || FORCE_USE_CONN_LIST.load(Ordering::Relaxed) + || peer_list.is_none() + { + peer_list.map(Into::into) + } else { + self.build_conn_bitmap(session).map(Into::into) + } } fn clear_expired_peer(&self) { @@ -1848,9 +1971,10 @@ impl PeerRouteServiceImpl { let my_peer_id = self.my_peer_id; - let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session); + let (peer_infos, conn_info, foreign_network) = + self.build_sync_request(&session, dst_peer_id); if peer_infos.is_none() - && conn_bitmap.is_none() + && conn_info.is_none() && foreign_network.is_none() && !session.need_sync_initiator_info.load(Ordering::Relaxed) && !(sync_as_initiator && session.we_are_initiator.load(Ordering::Relaxed)) @@ -1858,8 +1982,8 @@ impl PeerRouteServiceImpl { return true; } - tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", - my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session); + tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_info: {:?}, synced_route_info: {:?} session: {:?}", + my_peer_id, dst_peer_id, peer_infos, conn_info, self.synced_route_info, session); session .need_sync_initiator_info @@ -1878,7 +2002,7 @@ impl PeerRouteServiceImpl { my_session_id: session.my_session_id.load(Ordering::Relaxed), is_initiator: session.we_are_initiator.load(Ordering::Relaxed), peer_infos: peer_infos.clone().map(|x| RoutePeerInfos { items: x }), - conn_bitmap: conn_bitmap.clone().map(Into::into), + conn_info: conn_info.clone(), foreign_network_infos: foreign_network.clone(), }; @@ -1934,8 +2058,9 @@ impl PeerRouteServiceImpl { session.update_dst_saved_peer_info_version(peer_infos, dst_peer_id); } - if let Some(conn_bitmap) = &conn_bitmap { - session.update_dst_saved_conn_bitmap_version(conn_bitmap, dst_peer_id); + // Update session saved versions based on the connection info format used + if let Some(conn_info) = &conn_info { + session.update_dst_saved_conn_info_version(conn_info, dst_peer_id); } if let Some(foreign_network) = &foreign_network { @@ -2029,7 +2154,7 @@ impl OspfRouteRpc for RouteSessionManager { let from_session_id = request.my_session_id; let is_initiator = request.is_initiator; let peer_infos = request.peer_infos.map(|x| x.items); - let conn_bitmap = request.conn_bitmap.map(Into::into); + let conn_info = request.conn_info; let foreign_network = request.foreign_network_infos; let raw_peer_infos = if peer_infos.is_some() { let r = get_raw_peer_infos(&mut ctrl.get_raw_input().unwrap()).unwrap(); @@ -2046,7 +2171,7 @@ impl OspfRouteRpc for RouteSessionManager { is_initiator, peer_infos, raw_peer_infos, - conn_bitmap, + conn_info, foreign_network, ) .await; @@ -2293,7 +2418,7 @@ impl RouteSessionManager { is_initiator: bool, peer_infos: Option>, raw_peer_infos: Option>, - conn_bitmap: Option, + conn_info: Option, foreign_network: Option, ) -> Result { let Some(service_impl) = self.service_impl.upgrade() else { @@ -2327,9 +2452,9 @@ impl RouteSessionManager { need_update_route_table = true; } - if let Some(conn_bitmap) = &conn_bitmap { - service_impl.synced_route_info.update_conn_map(conn_bitmap); - session.update_dst_saved_conn_bitmap_version(conn_bitmap, from_peer_id); + if let Some(conn_info) = &conn_info { + service_impl.synced_route_info.update_conn_info(conn_info); + session.update_dst_saved_conn_info_version(conn_info, from_peer_id); need_update_route_table = true; } @@ -2349,8 +2474,8 @@ impl RouteSessionManager { } tracing::debug!( - "handling sync_route_info rpc: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}", - from_peer_id, is_initiator, peer_infos, conn_bitmap, service_impl.synced_route_info, session, service_impl.route_table); + "handling sync_route_info rpc: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_info: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}", + from_peer_id, is_initiator, peer_infos, conn_info, service_impl.synced_route_info, session, service_impl.route_table); session .dst_is_initiator @@ -2715,9 +2840,9 @@ mod tests { peers::{ create_packet_recv_chan, peer_manager::{PeerManager, RouteAlgoType}, - peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl}, + peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST}, route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface}, - tests::{connect_peer_manager, create_mock_peer_manager}, + tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, }, proto::{ common::NatType, @@ -2773,8 +2898,11 @@ mod tests { assert!(rx1 <= max_rx); } + #[rstest::rstest] #[tokio::test] - async fn ospf_route_2node() { + async fn ospf_route_2node(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); + let p_a = create_mock_pmgr().await; let p_b = create_mock_pmgr().await; connect_peer_manager(p_a.clone(), p_b.clone()).await; @@ -2845,8 +2973,11 @@ mod tests { .await; } + #[rstest::rstest] #[tokio::test] - async fn ospf_route_multi_node() { + async fn ospf_route_multi_node(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); + let p_a = create_mock_pmgr().await; let p_b = create_mock_pmgr().await; let p_c = create_mock_pmgr().await; @@ -2976,8 +3107,10 @@ mod tests { } } + #[rstest::rstest] #[tokio::test] - async fn ospf_route_3node_disconnect() { + async fn ospf_route_3node_disconnect(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); let p_a = create_mock_pmgr().await; let p_b = create_mock_pmgr().await; let p_c = create_mock_pmgr().await; @@ -3019,8 +3152,10 @@ mod tests { } } + #[rstest::rstest] #[tokio::test] - async fn peer_reconnect() { + async fn peer_reconnect(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); let p_a = create_mock_pmgr().await; let p_b = create_mock_pmgr().await; let r_a = create_mock_route(p_a.clone()).await; @@ -3063,8 +3198,10 @@ mod tests { check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2); } + #[rstest::rstest] #[tokio::test] - async fn test_cost_calculator() { + async fn test_cost_calculator(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); let p_a = create_mock_pmgr().await; let p_b = create_mock_pmgr().await; let p_c = create_mock_pmgr().await; @@ -3150,8 +3287,10 @@ mod tests { .await; } + #[rstest::rstest] #[tokio::test] - async fn test_raw_peer_info() { + async fn test_raw_peer_info(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); let mut req = SyncRouteInfoRequest::default(); let raw_info_map: DashMap = DashMap::new(); @@ -3177,8 +3316,10 @@ mod tests { assert_eq!(req, req2); } + #[rstest::rstest] #[tokio::test] - async fn test_peer_id_map_override() { + async fn test_peer_id_map_override(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); let p_a = create_mock_peer_manager().await; let p_b = create_mock_peer_manager().await; let p_c = create_mock_peer_manager().await; @@ -3230,9 +3371,10 @@ mod tests { p_b.get_global_ctx().config.remove_proxy_cidr(proxy); check_route_peer_id(p_c.clone()).await; } - + #[rstest::rstest] #[tokio::test] - async fn test_subnet_proxy_conflict() { + async fn test_subnet_proxy_conflict(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); // Create three peer managers: A, B, C let p_a = create_mock_peer_manager().await; let p_b = create_mock_peer_manager().await; @@ -3325,4 +3467,21 @@ mod tests { ) .await; } + #[rstest::rstest] + #[tokio::test] + async fn test_connect_at_different_time(#[values(true, false)] enable_conn_list_sync: bool) { + FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed); + // Create three peer managers: A, B, C + let p_a = create_mock_peer_manager().await; + let p_b = create_mock_peer_manager().await; + let p_c = create_mock_peer_manager().await; + + // Connect A-B-C in a line topology + connect_peer_manager(p_a.clone(), p_b.clone()).await; + + wait_route_appear(p_a.clone(), p_b.clone()).await.unwrap(); + + connect_peer_manager(p_b.clone(), p_c.clone()).await; + wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap(); + } } diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index e59be2e..e57df32 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -203,6 +203,7 @@ message PeerFeatureFlag { bool avoid_relay_data = 2; bool kcp_input = 3; bool no_relay_kcp = 4; + bool support_conn_list_sync = 5; } enum SocketType { diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 61644a4..2aef0d7 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -39,6 +39,14 @@ message RouteConnBitmap { bytes bitmap = 2; } +message RouteConnPeerList { + message PeerConnInfo { + PeerIdVersion peer_id = 1; + repeated uint32 connected_peer_ids = 2; + } + repeated PeerConnInfo peer_conn_infos = 1; +} + message RoutePeerInfos { repeated RoutePeerInfo items = 1; } message ForeignNetworkRouteInfoKey { @@ -82,7 +90,10 @@ message SyncRouteInfoRequest { uint64 my_session_id = 2; bool is_initiator = 3; RoutePeerInfos peer_infos = 4; - RouteConnBitmap conn_bitmap = 5; + oneof conn_info { + RouteConnBitmap conn_bitmap = 5; + RouteConnPeerList conn_peer_list = 7; + } RouteForeignNetworkInfos foreign_network_infos = 6; } diff --git a/easytier/src/proto/peer_rpc.rs b/easytier/src/proto/peer_rpc.rs index 6a62715..f3203ec 100644 --- a/easytier/src/proto/peer_rpc.rs +++ b/easytier/src/proto/peer_rpc.rs @@ -38,6 +38,18 @@ impl PeerGroupInfo { } } +impl From for sync_route_info_request::ConnInfo { + fn from(val: RouteConnBitmap) -> Self { + Self::ConnBitmap(val) + } +} + +impl From for sync_route_info_request::ConnInfo { + fn from(val: RouteConnPeerList) -> Self { + Self::ConnPeerList(val) + } +} + #[cfg(test)] mod tests { use super::*;