From 72673a9d52534b532d47b78e9e6afaeb2b13e718 Mon Sep 17 00:00:00 2001 From: liusen373 <52489720+liusen373@users.noreply.github.com> Date: Wed, 18 Jun 2025 12:14:57 +0800 Subject: [PATCH] Add is_hole_punched flag to PeerConn (#1001) --- easytier/src/connector/direct.rs | 2 +- easytier/src/connector/udp_hole_punch/mod.rs | 2 +- easytier/src/peers/foreign_network_manager.rs | 2 +- easytier/src/peers/peer.rs | 13 ++- easytier/src/peers/peer_conn.rs | 13 +++ easytier/src/peers/peer_manager.rs | 103 +++--------------- easytier/src/peers/peer_map.rs | 2 +- easytier/src/peers/tests.rs | 2 +- 8 files changed, 47 insertions(+), 92 deletions(-) diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 5e1e6c5..2fa0f2f 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -186,7 +186,7 @@ impl DirectConnectorManagerData { .await?; // NOTICE: must add as directly connected tunnel - self.peer_manager.add_direct_tunnel(ret).await + self.peer_manager.add_client_tunnel(ret, true).await } async fn do_try_connect_to_ip(&self, dst_peer_id: PeerId, addr: String) -> Result<(), Error> { diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index 010e9ed..a068a58 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -221,7 +221,7 @@ impl UdpHoePunchConnectorData { Ok(Some(tunnel)) => { tracing::info!(?tunnel, "hole punching get tunnel success"); - if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel).await { + if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel, false).await { tracing::warn!(?e, "add client tunnel failed"); op(true); false diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index 7494306..a3838b5 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -721,7 +721,7 @@ mod tests { let s_ret = tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await }); - pma_net1.add_client_tunnel(a_ring).await.unwrap(); + pma_net1.add_client_tunnel(a_ring, false).await.unwrap(); s_ret.await.unwrap().unwrap(); } diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 2fef827..3ef8884 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crossbeam::atomic::AtomicCell; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use tokio::{select, sync::mpsc}; @@ -200,6 +200,17 @@ impl Peer { ret } + pub fn has_directly_connected_conn(&self) -> bool { + self.conns.iter().any(|entry|!(entry.value()).is_hole_punched()) + } + + pub fn get_directly_connections(&self) -> DashSet { + self.conns.iter() + .filter(|entry| !(entry.value()).is_hole_punched()) + .map(|entry|(entry.value()).get_conn_id()) + .collect() + } + pub fn get_default_conn_id(&self) -> PeerConnId { self.default_conn_id.load() } diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index ca258c5..8989d8d 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -101,6 +101,9 @@ pub struct PeerConn { info: Option, is_client: Option, + // remote or local + is_hole_punched: bool, + close_event_notifier: Arc, ctrl_resp_sender: broadcast::Sender, @@ -152,6 +155,8 @@ impl PeerConn { info: None, is_client: None, + is_hole_punched: true, + close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)), ctrl_resp_sender: ctrl_sender, @@ -166,6 +171,14 @@ impl PeerConn { self.conn_id } + pub fn set_is_hole_punched(&mut self, is_hole_punched: bool) { + self.is_hole_punched = is_hole_punched; + } + + pub fn is_hole_punched(&self) -> bool { + self.is_hole_punched + } + async fn wait_handshake(&mut self, need_retry: &mut bool) -> Result { *need_retry = false; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 7f251a5..bbefb6d 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -23,7 +23,7 @@ use crate::{ compressor::{Compressor as _, DefaultCompressor}, constants::EASYTIER_VERSION, error::Error, - global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, + global_ctx::{ArcGlobalCtx, NetworkIdentity}, stun::StunInfoCollectorTrait, PeerId, }, @@ -142,9 +142,6 @@ pub struct PeerManager { exit_nodes: Vec, - // conns that are directly connected (which are not hole punched) - directly_connected_conn_map: Arc>>, - reserved_my_peer_id_map: DashMap, } @@ -273,8 +270,6 @@ impl PeerManager { exit_nodes, - directly_connected_conn_map: Arc::new(DashMap::new()), - reserved_my_peer_id_map: DashMap::new(), } } @@ -319,8 +314,10 @@ impl PeerManager { pub async fn add_client_tunnel( &self, tunnel: Box, + is_directly_connected: bool, ) -> Result<(PeerId, PeerConnId), Error> { let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); + peer.set_is_hole_punched(!is_directly_connected); peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); let peer_id = peer.get_peer_id(); @@ -334,72 +331,12 @@ impl PeerManager { Ok((peer_id, conn_id)) } - fn add_directly_connected_conn(&self, peer_id: PeerId, conn_id: uuid::Uuid) { - let _ = self - .directly_connected_conn_map - .entry(peer_id) - .or_insert_with(DashSet::new) - .insert(conn_id); - } - pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool { - self.directly_connected_conn_map - .get(&peer_id) - .map_or(false, |x| !x.is_empty()) - } - - async fn start_peer_conn_close_event_handler(&self) { - let dmap = self.directly_connected_conn_map.clone(); - let mut event_recv = self.global_ctx.subscribe(); - let peer_map = self.peers.clone(); - use tokio::sync::broadcast::error::RecvError; - self.tasks.lock().await.spawn(async move { - loop { - match event_recv.recv().await { - Err(RecvError::Closed) => { - tracing::error!("peer conn close event handler exit"); - break; - } - Err(RecvError::Lagged(_)) => { - tracing::warn!("peer conn close event handler lagged"); - event_recv = event_recv.resubscribe(); - let alive_conns = peer_map.get_alive_conns(); - for p in dmap.iter_mut() { - p.retain(|x| alive_conns.contains_key(&(*p.key(), *x))); - } - dmap.retain(|_, v| !v.is_empty()); - } - Ok(event) => { - if let GlobalCtxEvent::PeerConnRemoved(info) = event { - let mut need_remove = false; - if let Some(set) = dmap.get_mut(&info.peer_id) { - let conn_id = info.conn_id.parse().unwrap(); - let old = set.remove(&conn_id); - tracing::info!( - ?old, - ?info, - "try remove conn id from directly connected map" - ); - need_remove = set.is_empty(); - } - - if need_remove { - dmap.remove(&info.peer_id); - } - } - } - } - } - }); - } - - pub async fn add_direct_tunnel( - &self, - t: Box, - ) -> Result<(PeerId, PeerConnId), Error> { - let (peer_id, conn_id) = self.add_client_tunnel(t).await?; - self.add_directly_connected_conn(peer_id, conn_id); - Ok((peer_id, conn_id)) + if let Some(peer) = self.peers.get_peer_by_id(peer_id) { + peer.has_directly_connected_conn() + } else { + false + } } #[tracing::instrument] @@ -414,7 +351,7 @@ impl PeerManager { let t = ns .run_async(|| async move { connector.connect().await }) .await?; - self.add_direct_tunnel(t).await + self.add_client_tunnel(t, true).await } #[tracing::instrument(ret)] @@ -462,11 +399,8 @@ impl PeerManager { let peer_network_name = peer.get_network_identity().network_name.clone(); if peer_network_name == self.global_ctx.get_network_identity().network_name { - let (peer_id, conn_id) = (peer.get_peer_id(), peer.get_conn_id()); + peer.set_is_hole_punched(!is_directly_connected); self.add_new_peer_conn(peer).await?; - if is_directly_connected { - self.add_directly_connected_conn(peer_id, conn_id); - } } else { self.foreign_network_manager.add_peer_conn(peer).await?; } @@ -1020,11 +954,9 @@ impl PeerManager { async fn run_clean_peer_without_conn_routine(&self) { let peer_map = self.peers.clone(); - let dmap = self.directly_connected_conn_map.clone(); self.tasks.lock().await.spawn(async move { loop { peer_map.clean_peer_without_conn().await; - dmap.retain(|p, v| peer_map.has_peer(*p) && !v.is_empty()); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }); @@ -1041,8 +973,6 @@ impl PeerManager { } pub async fn run(&self) -> Result<(), Error> { - self.start_peer_conn_close_event_handler().await; - match &self.route_algo_inst { RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await, RouteAlgoInst::None => {} @@ -1135,10 +1065,11 @@ impl PeerManager { } pub fn get_directly_connections(&self, peer_id: PeerId) -> DashSet { - self.directly_connected_conn_map - .get(&peer_id) - .map(|x| x.clone()) - .unwrap_or_default() + if let Some(peer) = self.peers.get_peer_by_id(peer_id) { + return peer.get_directly_connections() + } + + DashSet::new() } pub async fn clear_resources(&self) { @@ -1227,7 +1158,7 @@ mod tests { }); server_mgr - .add_client_tunnel(server.accept().await.unwrap()) + .add_client_tunnel(server.accept().await.unwrap(), false) .await .unwrap(); } @@ -1432,7 +1363,7 @@ mod tests { let a_mgr_copy = peer_mgr_a.clone(); tokio::spawn(async move { - a_mgr_copy.add_client_tunnel(a_ring).await.unwrap(); + a_mgr_copy.add_client_tunnel(a_ring, false).await.unwrap(); }); let b_mgr_copy = peer_mgr_b.clone(); tokio::spawn(async move { diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 9ab1927..4aa0abc 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -87,7 +87,7 @@ impl PeerMap { }); } - fn get_peer_by_id(&self, peer_id: PeerId) -> Option> { + pub fn get_peer_by_id(&self, peer_id: PeerId) -> Option> { self.peer_map.get(&peer_id).map(|v| v.clone()) } diff --git a/easytier/src/peers/tests.rs b/easytier/src/peers/tests.rs index 90ffe5c..a8b984e 100644 --- a/easytier/src/peers/tests.rs +++ b/easytier/src/peers/tests.rs @@ -41,7 +41,7 @@ pub async fn connect_peer_manager(client: Arc, server: Arc