diff --git a/easytier/proto/cli.proto b/easytier/proto/cli.proto
index bbc08e0..675093d 100644
--- a/easytier/proto/cli.proto
+++ b/easytier/proto/cli.proto
@@ -117,16 +117,8 @@ service ConnectorManageRpc {
   rpc ManageConnector (ManageConnectorRequest) returns (ManageConnectorResponse);
 }
 
-enum LatencyLevel {
-  VeryLow = 0;
-  Low = 1;
-  Normal = 2;
-  High = 3;
-  VeryHigh = 4;
-}
-
 message DirectConnectedPeerInfo {
-  LatencyLevel latency_level = 2;
+  int32 latency_ms = 1;
 }
 
 message PeerInfoForGlobalMap {
diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs
index 44dc223..97541ba 100644
--- a/easytier/src/easytier-cli.rs
+++ b/easytier/src/easytier-cli.rs
@@ -331,13 +331,7 @@ async fn main() -> Result<(), Error> {
         let direct_peers = v
             .direct_peers
             .iter()
-            .map(|(k, v)| {
-                format!(
-                    "{}:{:?}",
-                    k,
-                    LatencyLevel::try_from(v.latency_level).unwrap()
-                )
-            })
+            .map(|(k, v)| format!("{}: {}ms", k, v.latency_ms))
             .collect::<Vec<_>>();
         table_rows.push(PeerCenterTableItem {
             node_id: node_id.to_string(),
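With raw milliseconds in the protocol, the CLI can print exact values instead of bucket names. For context, a minimal standalone sketch (thresholds copied from the deleted `from_latency_ms`; the helper name is illustrative, not part of the patch) of why the old buckets were too coarse to drive cost decisions:

```rust
// Mirrors the thresholds of the removed `LatencyLevel::from_latency_ms`.
fn latency_level(lat_ms: u32) -> &'static str {
    match lat_ms {
        0..=9 => "VeryLow",
        10..=49 => "Low",
        50..=99 => "Normal",
        100..=199 => "High",
        _ => "VeryHigh",
    }
}

fn main() {
    // A 55 ms link and a 99 ms link collapse into the same bucket, so a
    // route-cost calculator could never prefer the faster one. The raw
    // `latency_ms` field keeps the difference and compares directly.
    assert_eq!(latency_level(55), latency_level(99));
    assert!(55 < 99);
}
```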
diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs
index 50ca5a6..6e25f5e 100644
--- a/easytier/src/peer_center/instance.rs
+++ b/easytier/src/peer_center/instance.rs
@@ -1,24 +1,23 @@
 use std::{
-    collections::hash_map::DefaultHasher,
-    hash::{Hash, Hasher},
-    sync::{
-        atomic::{AtomicBool, Ordering},
-        Arc,
-    },
-    time::{Duration, SystemTime},
+    collections::BTreeSet,
+    sync::Arc,
+    time::{Duration, Instant, SystemTime},
 };
 
 use crossbeam::atomic::AtomicCell;
 use futures::Future;
-use tokio::{
-    sync::{Mutex, RwLock},
-    task::JoinSet,
-};
+use std::sync::RwLock;
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
 use tracing::Instrument;
 
 use crate::{
     common::PeerId,
-    peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService},
+    peers::{
+        peer_manager::PeerManager,
+        route_trait::{RouteCostCalculator, RouteCostCalculatorInterface},
+        rpc_service::PeerManagerRpcService,
+    },
     rpc::{GetGlobalPeerMapRequest, GetGlobalPeerMapResponse},
 };
 
@@ -34,7 +33,8 @@ struct PeerCenterBase {
     lock: Arc<Mutex<()>>,
 }
 
-static SERVICE_ID: u32 = 5;
+// the original code used SERVICE_ID 5; use a new id because the rpc interface changed incompatibly
+static SERVICE_ID: u32 = 50;
 
 struct PeridicJobCtx<T> {
     peer_mgr: Arc<PeerManager>,
@@ -132,7 +132,7 @@ impl PeerCenterBase {
 
 pub struct PeerCenterInstanceService {
     global_peer_map: Arc<RwLock<GlobalPeerMap>>,
-    global_peer_map_digest: Arc<RwLock<Digest>>,
+    global_peer_map_digest: Arc<AtomicCell<Digest>>,
 }
 
 #[tonic::async_trait]
@@ -141,7 +141,7 @@ impl crate::rpc::cli::peer_center_rpc_server::PeerCenterRpc for PeerCenterInstan
         &self,
         _request: tonic::Request<GetGlobalPeerMapRequest>,
     ) -> Result<tonic::Response<GetGlobalPeerMapResponse>, tonic::Status> {
-        let global_peer_map = self.global_peer_map.read().await.clone();
+        let global_peer_map = self.global_peer_map.read().unwrap().clone();
         Ok(tonic::Response::new(GetGlobalPeerMapResponse {
             global_peer_map: global_peer_map
                 .map
@@ -157,7 +157,8 @@ pub struct PeerCenterInstance {
     client: Arc<PeerCenterBase>,
 
     global_peer_map: Arc<RwLock<GlobalPeerMap>>,
-    global_peer_map_digest: Arc<RwLock<Digest>>,
+    global_peer_map_digest: Arc<AtomicCell<Digest>>,
+    global_peer_map_update_time: Arc<AtomicCell<Instant>>,
 }
 
 impl PeerCenterInstance {
@@ -166,7 +167,8 @@ impl PeerCenterInstance {
             peer_mgr: peer_mgr.clone(),
             client: Arc::new(PeerCenterBase::new(peer_mgr.clone())),
             global_peer_map: Arc::new(RwLock::new(GlobalPeerMap::new())),
-            global_peer_map_digest: Arc::new(RwLock::new(Digest::default())),
+            global_peer_map_digest: Arc::new(AtomicCell::new(Digest::default())),
+            global_peer_map_update_time: Arc::new(AtomicCell::new(Instant::now())),
         }
     }
 
@@ -179,12 +181,14 @@ impl PeerCenterInstance {
     async fn init_get_global_info_job(&self) {
         struct Ctx {
             global_peer_map: Arc<RwLock<GlobalPeerMap>>,
-            global_peer_map_digest: Arc<RwLock<Digest>>,
+            global_peer_map_digest: Arc<AtomicCell<Digest>>,
+            global_peer_map_update_time: Arc<AtomicCell<Instant>>,
         }
 
         let ctx = Arc::new(Ctx {
             global_peer_map: self.global_peer_map.clone(),
             global_peer_map_digest: self.global_peer_map_digest.clone(),
+            global_peer_map_update_time: self.global_peer_map_update_time.clone(),
        });
 
         self.client
@@ -193,10 +197,7 @@ impl PeerCenterInstance {
                 rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
 
                 let ret = client
-                    .get_global_peer_map(
-                        rpc_ctx,
-                        ctx.job_ctx.global_peer_map_digest.read().await.clone(),
-                    )
+                    .get_global_peer_map(rpc_ctx, ctx.job_ctx.global_peer_map_digest.load())
                     .await?;
 
                 let Ok(resp) = ret else {
@@ -217,10 +218,13 @@ impl PeerCenterInstance {
                     resp.digest
                 );
 
-                *ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map;
-                *ctx.job_ctx.global_peer_map_digest.write().await = resp.digest;
+                *ctx.job_ctx.global_peer_map.write().unwrap() = resp.global_peer_map;
+                ctx.job_ctx.global_peer_map_digest.store(resp.digest);
+                ctx.job_ctx
+                    .global_peer_map_update_time
+                    .store(Instant::now());
 
-                Ok(10000)
+                Ok(5000)
             })
             .await;
     }
@@ -228,67 +232,53 @@ impl PeerCenterInstance {
     async fn init_report_peers_job(&self) {
         struct Ctx {
             service: PeerManagerRpcService,
-            need_send_peers: AtomicBool,
-            last_report_peers: Mutex<PeerInfoForGlobalMap>,
+
+            last_report_peers: Mutex<BTreeSet<PeerId>>,
             last_center_peer: AtomicCell<PeerId>,
+            last_report_time: AtomicCell<Instant>,
         }
         let ctx = Arc::new(Ctx {
             service: PeerManagerRpcService::new(self.peer_mgr.clone()),
-            need_send_peers: AtomicBool::new(true),
-            last_report_peers: Mutex::new(PeerInfoForGlobalMap::default()),
+            last_report_peers: Mutex::new(BTreeSet::new()),
             last_center_peer: AtomicCell::new(PeerId::default()),
+            last_report_time: AtomicCell::new(Instant::now()),
         });
 
         self.client
             .init_periodic_job(ctx, |client, ctx| async move {
                 let my_node_id = ctx.peer_mgr.my_peer_id();
+                let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into();
+                let peer_list = peers.direct_peers.keys().map(|k| *k).collect();
+                let job_ctx = &ctx.job_ctx;
 
-                // if peers are not same in next 10 seconds, report peers to center server
-                let mut peers = PeerInfoForGlobalMap::default();
-                for _ in 1..10 {
-                    peers = ctx.job_ctx.service.list_peers().await.into();
-                    if ctx.center_peer.load() != ctx.job_ctx.last_center_peer.load() {
-                        // if center peer changed, report peers immediately
-                        break;
-                    }
-                    if peers == *ctx.job_ctx.last_report_peers.lock().await {
-                        return Ok(3000);
-                    }
-                    tokio::time::sleep(Duration::from_secs(2)).await;
+                // only report when one of the following holds:
+                // 1. the center peer changed
+                // 2. more than 60 seconds have passed since the last report
+                // 3. the set of direct peers changed
+                if ctx.center_peer.load() == ctx.job_ctx.last_center_peer.load()
+                    && job_ctx.last_report_time.load().elapsed().as_secs() < 60
+                    && *job_ctx.last_report_peers.lock().await == peer_list
+                {
+                    return Ok(5000);
                 }
 
-                *ctx.job_ctx.last_report_peers.lock().await = peers.clone();
-                let mut hasher = DefaultHasher::new();
-                peers.hash(&mut hasher);
-
-                let peers = if ctx.job_ctx.need_send_peers.load(Ordering::Relaxed) {
-                    Some(peers)
-                } else {
-                    None
-                };
-
                 let mut rpc_ctx = tarpc::context::current();
                 rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
                 let ret = client
-                    .report_peers(
-                        rpc_ctx,
-                        my_node_id.clone(),
-                        peers,
-                        hasher.finish() as Digest,
-                    )
+                    .report_peers(rpc_ctx, my_node_id.clone(), peers)
                     .await?;
 
-                if matches!(ret.as_ref().err(), Some(Error::DigestMismatch)) {
-                    ctx.job_ctx.need_send_peers.store(true, Ordering::Relaxed);
-                    return Ok(0);
-                } else if ret.is_err() {
+                if ret.is_ok() {
+                    ctx.job_ctx.last_center_peer.store(ctx.center_peer.load());
+                    *ctx.job_ctx.last_report_peers.lock().await = peer_list;
+                    ctx.job_ctx.last_report_time.store(Instant::now());
+                } else {
                     tracing::error!("report peers to center server got error result: {:?}", ret);
-                    return Ok(500);
                 }
 
-                Ok(3000)
+                Ok(5000)
             })
             .await;
     }
@@ -299,15 +289,62 @@ impl PeerCenterInstance {
             global_peer_map_digest: self.global_peer_map_digest.clone(),
         }
     }
+
+    pub fn get_cost_calculator(&self) -> RouteCostCalculator {
+        struct RouteCostCalculatorImpl {
+            global_peer_map: Arc<RwLock<GlobalPeerMap>>,
+
+            global_peer_map_clone: GlobalPeerMap,
+
+            last_update_time: AtomicCell<Instant>,
+            global_peer_map_update_time: Arc<AtomicCell<Instant>>,
+        }
+
+        impl RouteCostCalculatorInterface for RouteCostCalculatorImpl {
+            fn calculate_cost(&self, src: PeerId, dst: PeerId) -> i32 {
+                let ret = self
+                    .global_peer_map_clone
+                    .map
+                    .get(&src)
+                    .and_then(|src_peer_info| src_peer_info.direct_peers.get(&dst))
+                    .map(|info| info.latency_ms);
+                ret.unwrap_or(i32::MAX)
+            }
+
+            fn begin_update(&mut self) {
+                let global_peer_map = self.global_peer_map.read().unwrap();
+                self.global_peer_map_clone = global_peer_map.clone();
+            }
+
+            fn end_update(&mut self) {
+                self.last_update_time
+                    .store(self.global_peer_map_update_time.load());
+            }
+
+            fn need_update(&self) -> bool {
+                self.last_update_time.load() < self.global_peer_map_update_time.load()
+            }
+        }
+
+        Box::new(RouteCostCalculatorImpl {
+            global_peer_map: self.global_peer_map.clone(),
+            global_peer_map_clone: GlobalPeerMap::new(),
+            last_update_time: AtomicCell::new(
+                self.global_peer_map_update_time.load() - Duration::from_secs(1),
+            ),
+            global_peer_map_update_time: self.global_peer_map_update_time.clone(),
+        })
+    }
 }
 
 #[cfg(test)]
 mod tests {
-    use std::ops::Deref;
-
     use crate::{
         peer_center::server::get_global_data,
-        peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
+        peers::tests::{
+            connect_peer_manager, create_mock_peer_manager, wait_for_condition, wait_route_appear,
+        },
+        tunnel::common::tests::enable_log,
     };
 
     use super::*;
@@ -340,43 +377,64 @@ mod tests {
         let center_data = get_global_data(center_peer);
 
-        // wait center_data has 3 records for 10 seconds
-        let now = std::time::Instant::now();
-        loop {
-            if center_data.read().await.global_peer_map.map.len() == 3 {
-                println!(
-                    "center data ready, {:#?}",
-                    center_data.read().await.global_peer_map
-                );
-                break;
-            }
-            if now.elapsed().as_secs() > 60 {
-                panic!("center data not ready");
-            }
-            tokio::time::sleep(Duration::from_millis(100)).await;
-        }
+        // wait up to 10 seconds until center_data holds all 4 directed peer pairs
+        wait_for_condition(
+            || async {
+                if center_data.global_peer_map.len() == 4 {
+                    println!("center data {:#?}", center_data.global_peer_map);
+                    true
+                } else {
+                    false
+                }
+            },
+            Duration::from_secs(10),
+        )
+        .await;
 
         let mut digest = None;
         for pc in peer_centers.iter() {
             let rpc_service = pc.get_rpc_service();
-            let now = std::time::Instant::now();
-            while now.elapsed().as_secs() < 10 {
-                if rpc_service.global_peer_map.read().await.map.len() == 3 {
-                    break;
-                }
-                tokio::time::sleep(Duration::from_millis(100)).await;
-            }
-            assert_eq!(rpc_service.global_peer_map.read().await.map.len(), 3);
+            wait_for_condition(
+                || async { rpc_service.global_peer_map.read().unwrap().map.len() == 3 },
+                Duration::from_secs(10),
+            )
+            .await;
+            println!("rpc service ready, {:#?}", rpc_service.global_peer_map);
 
             if digest.is_none() {
-                digest = Some(rpc_service.global_peer_map_digest.read().await.clone());
+                digest = Some(rpc_service.global_peer_map_digest.load());
             } else {
-                let v = rpc_service.global_peer_map_digest.read().await;
-                assert_eq!(digest.as_ref().unwrap(), v.deref());
+                let v = rpc_service.global_peer_map_digest.load();
+                assert_eq!(digest.unwrap(), v);
             }
+
+            let mut route_cost = pc.get_cost_calculator();
+            assert!(route_cost.need_update());
+
+            route_cost.begin_update();
+            assert!(
+                route_cost.calculate_cost(peer_mgr_a.my_peer_id(), peer_mgr_b.my_peer_id()) < 30
+            );
+            assert!(
+                route_cost.calculate_cost(peer_mgr_b.my_peer_id(), peer_mgr_a.my_peer_id()) < 30
+            );
+            assert!(
+                route_cost.calculate_cost(peer_mgr_b.my_peer_id(), peer_mgr_c.my_peer_id()) < 30
+            );
+            assert!(
+                route_cost.calculate_cost(peer_mgr_c.my_peer_id(), peer_mgr_b.my_peer_id()) < 30
+            );
+            assert!(
+                route_cost.calculate_cost(peer_mgr_c.my_peer_id(), peer_mgr_a.my_peer_id()) > 10000
+            );
+            assert!(
+                route_cost.calculate_cost(peer_mgr_a.my_peer_id(), peer_mgr_c.my_peer_id()) > 10000
+            );
+            route_cost.end_update();
+            assert!(!route_cost.need_update());
         }
 
-        let global_digest = get_global_data(center_peer).read().await.digest.clone();
+        let global_digest = get_global_data(center_peer).digest.load();
         assert_eq!(digest.as_ref().unwrap(), &global_digest);
     }
 }
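The report job now polls every 5 seconds and sends only when one of the three trigger conditions holds. A distilled sketch of that predicate (hypothetical helper, with `u32` standing in for `PeerId`; not part of the patch):

```rust
use std::collections::BTreeSet;
use std::time::{Duration, Instant};

// Reports go out when the center peer changed, the last report is older
// than 60 seconds, or the set of directly connected peers changed.
fn should_report(
    center_changed: bool,
    last_report: Instant,
    last_peers: &BTreeSet<u32>,
    current_peers: &BTreeSet<u32>,
) -> bool {
    center_changed
        || last_report.elapsed() >= Duration::from_secs(60)
        || last_peers != current_peers
}

fn main() {
    let peers: BTreeSet<u32> = [1, 2, 3].into();
    // nothing changed and the last report is recent: skip, retry in 5s
    assert!(!should_report(false, Instant::now(), &peers, &peers));
    // a new direct peer appeared: report immediately
    let grown: BTreeSet<u32> = [1, 2, 3, 4].into();
    assert!(should_report(false, Instant::now(), &peers, &grown));
}
```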
diff --git a/easytier/src/peer_center/server.rs b/easytier/src/peer_center/server.rs
index 89a929f..efa6029 100644
--- a/easytier/src/peer_center/server.rs
+++ b/easytier/src/peer_center/server.rs
@@ -1,45 +1,48 @@
 use std::{
+    collections::BinaryHeap,
     hash::{Hash, Hasher},
     sync::Arc,
 };
 
+use crossbeam::atomic::AtomicCell;
 use dashmap::DashMap;
 use once_cell::sync::Lazy;
-use tokio::{sync::RwLock, task::JoinSet};
+use tokio::task::JoinSet;
 
-use crate::common::PeerId;
+use crate::{common::PeerId, rpc::DirectConnectedPeerInfo};
 
 use super::{
     service::{GetGlobalPeerMapResponse, GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap},
     Digest, Error,
 };
 
-pub(crate) struct PeerCenterServerGlobalData {
-    pub global_peer_map: GlobalPeerMap,
-    pub digest: Digest,
-    pub update_time: std::time::Instant,
-    pub peer_update_time: DashMap<PeerId, std::time::Instant>,
+#[derive(Debug, Clone, PartialEq, PartialOrd, Ord, Eq, Hash)]
+pub(crate) struct SrcDstPeerPair {
+    src: PeerId,
+    dst: PeerId,
 }
 
-impl PeerCenterServerGlobalData {
-    fn new() -> Self {
-        PeerCenterServerGlobalData {
-            global_peer_map: GlobalPeerMap::new(),
-            digest: Digest::default(),
-            update_time: std::time::Instant::now(),
-            peer_update_time: DashMap::new(),
-        }
-    }
+#[derive(Debug, Clone)]
+pub(crate) struct PeerCenterInfoEntry {
+    info: DirectConnectedPeerInfo,
+    update_time: std::time::Instant,
+}
+
+#[derive(Default)]
+pub(crate) struct PeerCenterServerGlobalData {
+    pub(crate) global_peer_map: DashMap<SrcDstPeerPair, PeerCenterInfoEntry>,
+    pub(crate) peer_report_time: DashMap<PeerId, std::time::Instant>,
+    pub(crate) digest: AtomicCell<Digest>,
 }
 
 // a global unique instance for PeerCenterServer
-pub(crate) static GLOBAL_DATA: Lazy<DashMap<PeerId, Arc<RwLock<PeerCenterServerGlobalData>>>> =
+pub(crate) static GLOBAL_DATA: Lazy<DashMap<PeerId, Arc<PeerCenterServerGlobalData>>> =
     Lazy::new(DashMap::new);
 
-pub(crate) fn get_global_data(node_id: PeerId) -> Arc<RwLock<PeerCenterServerGlobalData>> {
+pub(crate) fn get_global_data(node_id: PeerId) -> Arc<PeerCenterServerGlobalData> {
     GLOBAL_DATA
         .entry(node_id)
-        .or_insert_with(|| Arc::new(RwLock::new(PeerCenterServerGlobalData::new())))
+        .or_insert_with(|| Arc::new(PeerCenterServerGlobalData::default()))
         .value()
         .clone()
 }
@@ -48,8 +51,6 @@ pub(crate) fn get_global_data(node_id: PeerId) -> Arc<RwLock<PeerCenterServerG
 
 pub struct PeerCenterServer {
     my_node_id: PeerId,
-    digest_map: DashMap<PeerId, Digest>,
-    tasks: Arc<JoinSet<()>>,
 }
 
 impl PeerCenterServer {
@@ -65,26 +66,32 @@ impl PeerCenterServer {
 
         PeerCenterServer {
             my_node_id,
-            digest_map: DashMap::new(),
-            tasks: Arc::new(tasks),
         }
     }
 
     async fn clean_outdated_peer(my_node_id: PeerId) {
         let data = get_global_data(my_node_id);
-        let mut locked_data = data.write().await;
-        let now = std::time::Instant::now();
 
-        let mut to_remove = Vec::new();
-        for kv in locked_data.peer_update_time.iter() {
-            if now.duration_since(*kv.value()).as_secs() > 20 {
-                to_remove.push(*kv.key());
-            }
-        }
-
-        for peer_id in to_remove {
-            locked_data.global_peer_map.map.remove(&peer_id);
-            locked_data.peer_update_time.remove(&peer_id);
-        }
+        data.peer_report_time.retain(|_, v| {
+            std::time::Instant::now().duration_since(*v) < std::time::Duration::from_secs(180)
+        });
+        data.global_peer_map.retain(|_, v| {
+            std::time::Instant::now().duration_since(v.update_time)
+                < std::time::Duration::from_secs(180)
+        });
+    }
+
+    fn calc_global_digest(my_node_id: PeerId) -> Digest {
+        let data = get_global_data(my_node_id);
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        data.global_peer_map
+            .iter()
+            .map(|v| v.key().clone())
+            .collect::<BinaryHeap<_>>()
+            .into_sorted_vec()
+            .into_iter()
+            .for_each(|v| v.hash(&mut hasher));
+        hasher.finish()
     }
 }
@@ -95,39 +102,28 @@ impl PeerCenterService for PeerCenterServer {
         self,
         _: tarpc::context::Context,
         my_peer_id: PeerId,
-        peers: Option<PeerInfoForGlobalMap>,
-        digest: Digest,
+        peers: PeerInfoForGlobalMap,
     ) -> Result<(), Error> {
-        tracing::trace!("receive report_peers");
+        tracing::debug!("receive report_peers");
 
         let data = get_global_data(self.my_node_id);
-        let mut locked_data = data.write().await;
-        locked_data
-            .peer_update_time
+        data.peer_report_time
            .insert(my_peer_id, std::time::Instant::now());
 
-        let old_digest = self.digest_map.get(&my_peer_id);
-        // if digest match, no need to update
-        if let Some(old_digest) = old_digest {
-            if *old_digest == digest {
-                return Ok(());
-            }
+        for (peer_id, peer_info) in peers.direct_peers {
+            let pair = SrcDstPeerPair {
+                src: my_peer_id,
+                dst: peer_id,
+            };
+            let entry = PeerCenterInfoEntry {
+                info: peer_info,
+                update_time: std::time::Instant::now(),
+            };
+            data.global_peer_map.insert(pair, entry);
         }
 
-        if peers.is_none() {
-            return Err(Error::DigestMismatch);
-        }
-
-        self.digest_map.insert(my_peer_id, digest);
-        locked_data
-            .global_peer_map
-            .map
-            .insert(my_peer_id, peers.unwrap());
-
-        let mut hasher = std::collections::hash_map::DefaultHasher::new();
-        locked_data.global_peer_map.map.hash(&mut hasher);
-        locked_data.digest = hasher.finish() as Digest;
-        locked_data.update_time = std::time::Instant::now();
+        data.digest
+            .store(PeerCenterServer::calc_global_digest(self.my_node_id));
 
         Ok(())
     }
@@ -138,15 +134,26 @@
         digest: Digest,
     ) -> Result<Option<GetGlobalPeerMapResponse>, Error> {
         let data = get_global_data(self.my_node_id);
-        if digest == data.read().await.digest {
+        if digest == data.digest.load() && digest != 0 {
             return Ok(None);
         }
 
-        let data = get_global_data(self.my_node_id);
-        let locked_data = data.read().await;
+        let mut global_peer_map = GlobalPeerMap::new();
+        for item in data.global_peer_map.iter() {
+            let (pair, entry) = item.pair();
+            global_peer_map
+                .map
+                .entry(pair.src)
+                .or_insert_with(|| PeerInfoForGlobalMap {
+                    direct_peers: Default::default(),
+                })
+                .direct_peers
+                .insert(pair.dst, entry.info.clone());
+        }
+
         Ok(Some(GetGlobalPeerMapResponse {
-            global_peer_map: locked_data.global_peer_map.clone(),
-            digest: locked_data.digest,
+            global_peer_map,
+            digest: data.digest.load(),
         }))
     }
 }
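The server digest now covers only the sorted set of `(src, dst)` keys, so it changes when an edge appears or expires but not when a latency value is updated in place. A standalone sketch of that order-independent scheme (with `(u32, u32)` standing in for `SrcDstPeerPair`):

```rust
use std::collections::{hash_map::DefaultHasher, BinaryHeap};
use std::hash::{Hash, Hasher};

// Hash the *sorted* edge keys so the iteration order of the DashMap
// cannot affect the digest. Latency values are deliberately excluded.
fn digest_of_edges(edges: impl Iterator<Item = (u32, u32)>) -> u64 {
    let mut hasher = DefaultHasher::new();
    edges
        .collect::<BinaryHeap<_>>()
        .into_sorted_vec()
        .into_iter()
        .for_each(|e| e.hash(&mut hasher));
    hasher.finish()
}

fn main() {
    // insertion order must not matter
    let a = digest_of_edges([(1, 2), (2, 1), (2, 3)].into_iter());
    let b = digest_of_edges([(2, 3), (1, 2), (2, 1)].into_iter());
    assert_eq!(a, b);
}
```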
diff --git a/easytier/src/peer_center/service.rs b/easytier/src/peer_center/service.rs
index 647d3aa..d6b4d59 100644
--- a/easytier/src/peer_center/service.rs
+++ b/easytier/src/peer_center/service.rs
@@ -5,24 +5,6 @@ use crate::{common::PeerId, rpc::DirectConnectedPeerInfo};
 use super::{Digest, Error};
 use crate::rpc::PeerInfo;
 
-pub type LatencyLevel = crate::rpc::cli::LatencyLevel;
-
-impl LatencyLevel {
-    pub const fn from_latency_ms(lat_ms: u32) -> Self {
-        if lat_ms < 10 {
-            LatencyLevel::VeryLow
-        } else if lat_ms < 50 {
-            LatencyLevel::Low
-        } else if lat_ms < 100 {
-            LatencyLevel::Normal
-        } else if lat_ms < 200 {
-            LatencyLevel::High
-        } else {
-            LatencyLevel::VeryHigh
-        }
-    }
-}
-
 pub type PeerInfoForGlobalMap = crate::rpc::cli::PeerInfoForGlobalMap;
 
 impl From<Vec<PeerInfo>> for PeerInfoForGlobalMap {
@@ -34,10 +16,10 @@ impl From<Vec<PeerInfo>> for PeerInfoForGlobalMap {
                     .iter()
                     .map(|conn| conn.stats.as_ref().unwrap().latency_us)
                     .min()
-                    .unwrap_or(0);
+                    .unwrap_or(u32::MAX as u64);
 
                 let dp_info = DirectConnectedPeerInfo {
-                    latency_level: LatencyLevel::from_latency_ms(min_lat as u32 / 1000) as i32,
+                    latency_ms: std::cmp::max(1, (min_lat as u32 / 1000) as i32),
                 };
 
                 // sort conn info so hash result is stable
@@ -73,11 +55,7 @@
 pub trait PeerCenterService {
     // report center server which peer is directly connected to me
     // digest is a hash of current peer map, if digest not match, we need to transfer the whole map
-    async fn report_peers(
-        my_peer_id: PeerId,
-        peers: Option<PeerInfoForGlobalMap>,
-        digest: Digest,
-    ) -> Result<(), Error>;
+    async fn report_peers(my_peer_id: PeerId, peers: PeerInfoForGlobalMap) -> Result<(), Error>;
 
     async fn get_global_peer_map(digest: Digest) -> Result<Option<GetGlobalPeerMapResponse>, Error>;
 
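The conversion keeps the smallest per-connection latency, reduced from microseconds to whole milliseconds and floored at 1 so a live sub-millisecond link never reports a zero cost; missing stats fall back to `u32::MAX` microseconds and therefore stay prohibitively expensive. A sketch of just that mapping (hypothetical helper name, same arithmetic as the new `From<Vec<PeerInfo>>` impl):

```rust
fn latency_ms_from_us(min_lat_us: u64) -> i32 {
    std::cmp::max(1, (min_lat_us as u32 / 1000) as i32)
}

fn main() {
    assert_eq!(latency_ms_from_us(250), 1); // 0.25 ms floors to 1 ms
    assert_eq!(latency_ms_from_us(42_000), 42);
    // the u32::MAX-microsecond fallback still yields a huge cost
    assert_eq!(latency_ms_from_us(u32::MAX as u64), 4_294_967);
}
```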
diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs
index 0e9acd7..614bab7 100644
--- a/easytier/src/peers/peer_ospf_route.rs
+++ b/easytier/src/peers/peer_ospf_route.rs
@@ -683,16 +683,18 @@ impl PeerRouteServiceImpl {
             DefaultRouteCostCalculator::default(),
         );
 
-        let calc_locked = self.cost_calculator.lock().unwrap();
+        let mut calc_locked = self.cost_calculator.lock().unwrap();
         if calc_locked.is_none() {
             return;
         }
 
+        calc_locked.as_mut().unwrap().begin_update();
         self.route_table_with_cost.build_from_synced_info(
             self.my_peer_id,
             &self.synced_route_info,
-            &calc_locked.as_ref().unwrap(),
+            calc_locked.as_mut().unwrap(),
         );
+        calc_locked.as_mut().unwrap().end_update();
     }
 
     fn cost_calculator_need_update(&self) -> bool {
diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs
index d232ab4..ad64df3 100644
--- a/easytier/src/peers/route_trait.rs
+++ b/easytier/src/peers/route_trait.rs
@@ -31,8 +31,11 @@ pub trait RouteInterface {
 
 pub type RouteInterfaceBox = Box<dyn RouteInterface + Send + Sync>;
 
-#[auto_impl::auto_impl(Box, Arc, &)]
+#[auto_impl::auto_impl(Box, &mut)]
 pub trait RouteCostCalculatorInterface: Send + Sync {
+    fn begin_update(&mut self) {}
+    fn end_update(&mut self) {}
+
     fn calculate_cost(&self, _src: PeerId, _dst: PeerId) -> i32 {
         1
     }
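Because `begin_update`/`end_update` take `&mut self`, the blanket `Arc` and `&` forwarders can no longer be generated, which is why the `auto_impl` list shrinks to `Box` and `&mut`. A sketch of the snapshot lifecycle the OSPF code now drives (trait duplicated locally with assumed default bodies; `u32` stands in for `PeerId`):

```rust
// Calculators snapshot shared state in begin_update(), serve reads from
// the snapshot in calculate_cost(), and mark it consumed in end_update().
trait RouteCostCalculatorInterface: Send + Sync {
    fn begin_update(&mut self) {}
    fn end_update(&mut self) {}
    fn calculate_cost(&self, _src: u32, _dst: u32) -> i32 {
        1
    }
    fn need_update(&self) -> bool {
        false
    }
}

struct ConstCost;
impl RouteCostCalculatorInterface for ConstCost {}

fn main() {
    let mut calc: Box<dyn RouteCostCalculatorInterface> = Box::new(ConstCost);
    // mirrors update_route_table_with_cost: snapshot, query, commit
    calc.begin_update();
    let cost = calc.calculate_cost(1, 2);
    calc.end_update();
    assert_eq!(cost, 1);
}
```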