From 9261d0d32dd8b1a055afc0f30285fc1263a94368 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 2 Mar 2024 22:29:31 +0800 Subject: [PATCH] optimize bandwidth usage (#24) 1. stable stun test result 2. stable report peers result 3. do not send same packet to rip peer --- easytier-core/src/common/stun.rs | 11 ++++++++- easytier-core/src/peer_center/instance.rs | 21 +++++++++++++--- easytier-core/src/peer_center/server.rs | 2 +- easytier-core/src/peers/peer_rip_route.rs | 30 +++++++++++++++++------ 4 files changed, 51 insertions(+), 13 deletions(-) diff --git a/easytier-core/src/common/stun.rs b/easytier-core/src/common/stun.rs index 17a01ea..a8cb5db 100644 --- a/easytier-core/src/common/stun.rs +++ b/easytier-core/src/common/stun.rs @@ -449,7 +449,16 @@ impl StunInfoCollector { self.tasks.spawn(async move { loop { let detector = UdpNatTypeDetector::new(stun_servers.read().await.clone()); - let ret = detector.get_udp_nat_type(0).await; + let old_nat_type = udp_nat_type.load().0; + let mut ret = NatType::Unknown; + for _ in 1..5 { + // if nat type degrade, sleep and retry. so result can be relatively stable. + ret = detector.get_udp_nat_type(0).await; + if ret == NatType::Unknown || ret <= old_nat_type { + break; + } + tokio::time::sleep(Duration::from_secs(5)).await; + } udp_nat_type.store((ret, std::time::Instant::now())); let sleep_sec = match ret { diff --git a/easytier-core/src/peer_center/instance.rs b/easytier-core/src/peer_center/instance.rs index 42075e4..064234c 100644 --- a/easytier-core/src/peer_center/instance.rs +++ b/easytier-core/src/peer_center/instance.rs @@ -202,7 +202,7 @@ impl PeerCenterInstance { }; let Some(resp) = resp else { - return Ok(1000); + return Ok(5000); }; tracing::info!( @@ -214,7 +214,7 @@ impl PeerCenterInstance { *ctx.job_ctx.global_peer_map.write().await = resp.global_peer_map; *ctx.job_ctx.global_peer_map_digest.write().await = resp.digest; - Ok(5000) + Ok(10000) }) .await; } @@ -223,16 +223,29 @@ impl PeerCenterInstance { struct Ctx { service: PeerManagerRpcService, need_send_peers: AtomicBool, + last_report_peers: Mutex, } let ctx = Arc::new(Ctx { service: PeerManagerRpcService::new(self.peer_mgr.clone()), need_send_peers: AtomicBool::new(true), + last_report_peers: Mutex::new(PeerInfoForGlobalMap::default()), }); self.client .init_periodic_job(ctx, |client, ctx| async move { let my_node_id = ctx.peer_mgr.my_node_id(); - let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into(); + + // if peers are not same in next 10 seconds, report peers to center server + let mut peers = PeerInfoForGlobalMap::default(); + for _ in 1..10 { + peers = ctx.job_ctx.service.list_peers().await.into(); + if peers == *ctx.job_ctx.last_report_peers.lock().await { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + *ctx.job_ctx.last_report_peers.lock().await = peers.clone(); let mut hasher = DefaultHasher::new(); peers.hash(&mut hasher); @@ -262,7 +275,7 @@ impl PeerCenterInstance { } ctx.job_ctx.need_send_peers.store(false, Ordering::Relaxed); - Ok(1000) + Ok(3000) }) .await; } diff --git a/easytier-core/src/peer_center/server.rs b/easytier-core/src/peer_center/server.rs index 3d82065..18bf378 100644 --- a/easytier-core/src/peer_center/server.rs +++ b/easytier-core/src/peer_center/server.rs @@ -77,7 +77,7 @@ impl PeerCenterServer { let now = std::time::Instant::now(); let mut to_remove = Vec::new(); for kv in locked_data.peer_update_time.iter() { - if now.duration_since(*kv.value()).as_secs() > 10 { + if now.duration_since(*kv.value()).as_secs() > 20 { to_remove.push(*kv.key()); } } diff --git a/easytier-core/src/peers/peer_rip_route.rs b/easytier-core/src/peers/peer_rip_route.rs index c148064..286c467 100644 --- a/easytier-core/src/peers/peer_rip_route.rs +++ b/easytier-core/src/peers/peer_rip_route.rs @@ -32,6 +32,10 @@ use crate::{ use super::{packet::ArchivedPacketBody, peer_manager::PeerPacketFilter}; +const SEND_ROUTE_PERIOD_SEC: u64 = 60; +const SEND_ROUTE_FAST_REPLY_SEC: u64 = 5; +const ROUTE_EXPIRED_SEC: u64 = 70; + type Version = u32; #[derive(Archive, Deserialize, Serialize, Clone, Debug, PartialEq)] @@ -192,7 +196,7 @@ pub struct BasicRoute { version: RouteVersion, myself: Arc>, - last_send_time_map: Arc>, + last_send_time_map: Arc, Instant)>>, } impl BasicRoute { @@ -373,10 +377,10 @@ impl BasicRoute { let last_send_time_map_new = DashMap::new(); let peers = interface.list_peers().await; for peer in peers.iter() { - let last_send_time = last_send_time_map.get(peer).map(|v| *v).unwrap_or(Instant::now() - Duration::from_secs(3600)); + let last_send_time = last_send_time_map.get(peer).map(|v| *v).unwrap_or((0, None, Instant::now() - Duration::from_secs(3600))); let my_version_peer_saved = sync_peer_from_remote.get(&peer).and_then(|v| v.packet.peer_version); let peer_have_latest_version = my_version_peer_saved == Some(version.get()); - if peer_have_latest_version && last_send_time.elapsed().as_secs() < 60 { + if peer_have_latest_version && last_send_time.2.elapsed().as_secs() < SEND_ROUTE_PERIOD_SEC { last_send_time_map_new.insert(*peer, last_send_time); continue; } @@ -386,11 +390,13 @@ impl BasicRoute { dst_peer_id = ?peer, version = version.get(), ?my_version_peer_saved, - last_send_elapse = ?last_send_time.elapsed().as_secs(), + last_send_version = ?last_send_time.0, + last_send_peer_version = ?last_send_time.1, + last_send_elapse = ?last_send_time.2.elapsed().as_secs(), "need send route info" ); - last_send_time_map_new.insert(*peer, Instant::now()); let peer_version_we_saved = sync_peer_from_remote.get(&peer).and_then(|v| Some(v.packet.version)); + last_send_time_map_new.insert(*peer, (version.get(), peer_version_we_saved, Instant::now())); let ret = Self::send_sync_peer_request( interface, my_peer_id.clone(), @@ -457,7 +463,7 @@ impl BasicRoute { let connected_peers = interface.lock().await.as_ref().unwrap().list_peers().await; for item in sync_peer_from_remote.iter() { let (k, v) = item.pair(); - if now.duration_since(v.last_update).as_secs() > 70 + if now.duration_since(v.last_update).as_secs() > ROUTE_EXPIRED_SEC || !connected_peers.contains(k) { need_update_route = true; @@ -541,7 +547,17 @@ impl BasicRoute { if packet.need_reply { self.last_send_time_map - .remove(&packet.myself.peer_id.to_uuid()); + .entry(packet.myself.peer_id.to_uuid()) + .and_modify(|v| { + const FAST_REPLY_DURATION: u64 = + SEND_ROUTE_PERIOD_SEC - SEND_ROUTE_FAST_REPLY_SEC; + if v.0 != self.version.get() || v.1 != Some(p.version) { + v.2 = Instant::now() - Duration::from_secs(3600); + } else if v.2.elapsed().as_secs() < FAST_REPLY_DURATION { + // do not send same version route info too frequently + v.2 = Instant::now() - Duration::from_secs(FAST_REPLY_DURATION); + } + }); } if updated || packet.need_reply {