diff --git a/easytier/src/common/mod.rs b/easytier/src/common/mod.rs index 7258bcc..a5bd69b 100644 --- a/easytier/src/common/mod.rs +++ b/easytier/src/common/mod.rs @@ -4,7 +4,7 @@ use std::{ io::Write as _, sync::{Arc, Mutex}, }; -use tokio::task::JoinSet; +use tokio::{task::JoinSet, time::timeout}; use tracing::Instrument; pub mod compressor; @@ -47,16 +47,13 @@ pub fn join_joinset_background( origin: String, ) { let js = Arc::downgrade(&js); + let o = origin.clone(); tokio::spawn( async move { - loop { + while js.strong_count() > 0 { tokio::time::sleep(std::time::Duration::from_secs(1)).await; - if js.weak_count() == 0 { - tracing::info!("joinset task exit"); - break; - } - future::poll_fn(|cx| { + let fut = future::poll_fn(|cx| { let Some(js) = js.upgrade() else { return std::task::Poll::Ready(()); }; @@ -64,15 +61,24 @@ pub fn join_joinset_background( let mut js = js.lock().unwrap(); while !js.is_empty() { let ret = js.poll_join_next(cx); - if ret.is_pending() { - return std::task::Poll::Pending; + match ret { + std::task::Poll::Ready(Some(_)) => { + continue; + } + std::task::Poll::Ready(None) => { + break; + } + std::task::Poll::Pending => { + return std::task::Poll::Pending; + } } } - std::task::Poll::Ready(()) - }) - .await; + }); + + let _ = timeout(std::time::Duration::from_secs(5), fut).await; } + tracing::debug!(?o, "joinset task exit"); } .instrument(tracing::info_span!( "join_joinset_background", @@ -167,5 +173,6 @@ mod tests { drop(js); tokio::time::sleep(std::time::Duration::from_secs(2)).await; assert_eq!(weak_js.weak_count(), 0); + assert_eq!(weak_js.strong_count(), 0); } } diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index c249e7d..dfc143c 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -24,6 +24,7 @@ use crate::{ config::{ConfigLoader, TomlConfigLoader}, error::Error, global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent, NetworkIdentity}, + join_joinset_background, stun::MockStunInfoCollector, PeerId, }, @@ -181,6 +182,15 @@ impl ForeignNetworkEntry { } } + impl Drop for RpcTransport { + fn drop(&mut self) { + tracing::debug!( + "drop rpc transport for foreign network manager, my_peer_id: {:?}", + self.my_peer_id + ); + } + } + let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); let tspt = RpcTransport { my_peer_id, @@ -216,7 +226,6 @@ impl ForeignNetworkEntry { .list_global_foreign_peer(&self.network_identity) .await; let local = peer_map.list_peers_with_conn().await; - tracing::debug!(?global, ?local, ?self.my_peer_id, "list peers in foreign network manager"); global.extend(local.iter().cloned()); global .into_iter() @@ -426,7 +435,7 @@ pub struct ForeignNetworkManager { data: Arc, - tasks: Mutex>, + tasks: Arc>>, } impl ForeignNetworkManager { @@ -444,6 +453,9 @@ impl ForeignNetworkManager { lock: std::sync::Mutex::new(()), }); + let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); + join_joinset_background(tasks.clone(), "ForeignNetworkManager".to_string()); + Self { my_peer_id, global_ctx, @@ -451,7 +463,7 @@ impl ForeignNetworkManager { data, - tasks: Mutex::new(JoinSet::new()), + tasks, } } @@ -503,7 +515,7 @@ impl ForeignNetworkManager { let data = self.data.clone(); let network_name = entry.network.network_name.clone(); let mut s = entry.global_ctx.subscribe(); - self.tasks.lock().await.spawn(async move { + self.tasks.lock().unwrap().spawn(async move { while let Ok(e) = s.recv().await { match &e { GlobalCtxEvent::PeerRemoved(peer_id) => { diff --git a/easytier/src/peers/peer_conn_ping.rs b/easytier/src/peers/peer_conn_ping.rs index 0e5aeb1..93e0afb 100644 --- a/easytier/src/peers/peer_conn_ping.rs +++ b/easytier/src/peers/peer_conn_ping.rs @@ -84,9 +84,7 @@ impl PingIntervalController { self.throughput.rx_packets() > self.last_throughput.rx_packets() } - #[tracing::instrument] fn should_send_ping(&mut self) -> bool { - tracing::trace!(?self, "check should_send_ping"); if self.loss_counter.load(Ordering::Relaxed) > 0 { self.backoff_idx = 0; } else if self.tx_increase() && !self.rx_increase() { @@ -253,6 +251,13 @@ impl PeerConnPinger { continue; } + tracing::debug!( + "pingpong controller send pingpong task, seq: {}, node_id: {}, controller: {:?}", + req_seq, + my_node_id, + controller + ); + let mut sink = sink.clone(); let receiver = ctrl_resp_sender.subscribe(); let ping_res_sender = ping_res_sender.clone(); diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index f72e86a..19a418c 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -1450,9 +1450,6 @@ impl PeerRouteServiceImpl { let my_peer_id = self.my_peer_id; let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session); - tracing::trace!(?foreign_network, "building sync_route request. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", - my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session); - if peer_infos.is_none() && conn_bitmap.is_none() && foreign_network.is_none() @@ -1462,6 +1459,9 @@ impl PeerRouteServiceImpl { return true; } + tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}", + my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session); + session .need_sync_initiator_info .store(false, Ordering::Relaxed); @@ -1728,7 +1728,6 @@ impl RouteSessionManager { Ok(session) } - #[tracing::instrument(skip(self))] async fn maintain_sessions(&self, service_impl: Arc) -> bool { let mut cur_dst_peer_id_to_initiate = None; let mut next_sleep_ms = 0; @@ -1764,8 +1763,6 @@ impl RouteSessionManager { .map(|x| *x) .collect::>(); - tracing::trace!(?service_impl.my_peer_id, ?peers, ?session_peers, ?initiator_candidates, "maintain_sessions begin"); - if initiator_candidates.is_empty() { next_sleep_ms = 1000; continue; diff --git a/easytier/src/peers/peer_task.rs b/easytier/src/peers/peer_task.rs index 0fcb89e..aa172b1 100644 --- a/easytier/src/peers/peer_task.rs +++ b/easytier/src/peers/peer_task.rs @@ -83,12 +83,6 @@ where } } - tracing::debug!( - ?peers_to_connect, - ?to_remove, - "got peers to connect and remove" - ); - for key in to_remove { if let Some((_, task)) = peer_task_map.remove(&key) { task.abort(); @@ -115,7 +109,6 @@ where .insert(item.clone(), launcher.launch_task(&data, item).await.into()); } } else if peer_task_map.is_empty() { - tracing::debug!("all task done"); launcher.all_task_done(&data).await; }