From f85b031402d1a8fa07b56accd063ed0636cdb295 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 6 Jul 2025 09:16:13 +0800 Subject: [PATCH] handle close peer conn correctly (#1082) --- easytier/src/connector/direct.rs | 5 +- easytier/src/peers/foreign_network_manager.rs | 37 ++++- easytier/src/peers/peer_manager.rs | 157 +++++++++++++++++- 3 files changed, 188 insertions(+), 11 deletions(-) diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 2fa0f2f..2c85004 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -211,10 +211,7 @@ impl DirectConnectorManagerData { dst_peer_id, peer_id ); - self.peer_manager - .get_peer_map() - .close_peer_conn(peer_id, &conn_id) - .await?; + self.peer_manager.close_peer_conn(peer_id, &conn_id).await?; return Err(Error::InvalidUrl(addr)); } diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index c4816a9..4cc4fd9 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -10,7 +10,7 @@ use std::{ time::SystemTime, }; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use tokio::{ sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -367,14 +367,14 @@ impl Drop for ForeignNetworkEntry { struct ForeignNetworkManagerData { network_peer_maps: DashMap>, - peer_network_map: DashMap, + peer_network_map: DashMap>, network_peer_last_update: DashMap, accessor: Arc>, lock: std::sync::Mutex<()>, } impl ForeignNetworkManagerData { - fn get_peer_network(&self, peer_id: PeerId) -> Option { + fn get_peer_network(&self, peer_id: PeerId) -> Option> { self.peer_network_map.get(&peer_id).map(|v| v.clone()) } @@ -384,7 +384,10 @@ impl ForeignNetworkManagerData { fn remove_peer(&self, peer_id: PeerId, network_name: &String) { let _l = self.lock.lock().unwrap(); - self.peer_network_map.remove(&peer_id); + self.peer_network_map.remove_if(&peer_id, |_, v| { + let _ = v.remove(network_name); + v.is_empty() + }); if let Some(_) = self .network_peer_maps .remove_if(network_name, |_, v| v.peer_map.is_empty()) @@ -406,7 +409,10 @@ impl ForeignNetworkManagerData { fn remove_network(&self, network_name: &String) { let _l = self.lock.lock().unwrap(); - self.peer_network_map.retain(|_, v| v != network_name); + self.peer_network_map.iter().for_each(|v| { + v.value().remove(network_name); + }); + self.peer_network_map.retain(|_, v| !v.is_empty()); self.network_peer_maps.remove(network_name); self.network_peer_last_update.remove(network_name); } @@ -439,7 +445,9 @@ impl ForeignNetworkManagerData { .clone(); self.peer_network_map - .insert(dst_peer_id, network_identity.network_name.clone()); + .entry(dst_peer_id) + .or_insert_with(|| DashSet::new()) + .insert(network_identity.network_name.clone()); self.network_peer_last_update .insert(network_identity.network_name.clone(), SystemTime::now()); @@ -665,6 +673,23 @@ impl ForeignNetworkManager { Err(Error::RouteError(Some("network not found".to_string()))) } } + + pub async fn close_peer_conn( + &self, + peer_id: PeerId, + conn_id: &super::peer_conn::PeerConnId, + ) -> Result<(), Error> { + let network_names = self.data.get_peer_network(peer_id).unwrap_or_default(); + for network_name in network_names { + if let Some(entry) = self.data.get_network_entry(&network_name) { + let ret = entry.peer_map.close_peer_conn(peer_id, conn_id).await; + if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) { + return ret; + } + } + } + Err(Error::NotFound) + } } impl Drop for ForeignNetworkManager { diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 746a722..7937a35 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -1128,6 +1128,35 @@ impl PeerManager { self.peer_rpc_mgr.rpc_server().registry().unregister_all(); } + + pub async fn close_peer_conn( + &self, + peer_id: PeerId, + conn_id: &PeerConnId, + ) -> Result<(), Error> { + let ret = self.peers.close_peer_conn(peer_id, conn_id).await; + tracing::info!("close_peer_conn in peer map: {:?}", ret); + if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) { + return ret; + } + + let ret = self + .foreign_network_client + .get_peer_map() + .close_peer_conn(peer_id, conn_id) + .await; + tracing::info!("close_peer_conn in foreign network client: {:?}", ret); + if ret.is_ok() || !matches!(ret.as_ref().unwrap_err(), Error::NotFound) { + return ret; + } + + let ret = self + .foreign_network_manager + .close_peer_conn(peer_id, conn_id) + .await; + tracing::info!("close_peer_conn in foreign network manager done: {:?}", ret); + ret + } } #[cfg(test)] @@ -1147,7 +1176,10 @@ mod tests { peer_manager::RouteAlgoType, peer_rpc::tests::register_service, route_trait::NextHopPolicy, - tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost}, + tests::{ + connect_peer_manager, create_mock_peer_manager_with_name, wait_route_appear, + wait_route_appear_with_cost, + }, }, proto::common::{CompressionAlgoPb, NatType, PeerFeatureFlag}, tunnel::{ @@ -1427,4 +1459,127 @@ mod tests { ) .await; } + + #[tokio::test] + async fn close_conn_in_peer_map() { + let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.clone()) + .await + .unwrap(); + + let conns = peer_mgr_a + .get_peer_map() + .list_peer_conns(peer_mgr_b.my_peer_id) + .await; + assert!(conns.is_some()); + let conn_info = conns.as_ref().unwrap().first().unwrap(); + + peer_mgr_a + .close_peer_conn(peer_mgr_b.my_peer_id, &conn_info.conn_id.parse().unwrap()) + .await + .unwrap(); + + wait_for_condition( + || async { + let peers = peer_mgr_a.list_peers().await; + peers.is_empty() + }, + Duration::from_secs(10), + ) + .await; + // a is client, b is server + } + + #[tokio::test] + async fn close_conn_in_foreign_network_client() { + let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await; + let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await; + connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await; + wait_for_condition( + || async { + peer_mgr_client + .get_foreign_network_client() + .list_public_peers() + .await + .len() + == 1 + }, + Duration::from_secs(3), + ) + .await; + + let peer_id = peer_mgr_client + .foreign_network_client + .list_public_peers() + .await[0]; + let conns = peer_mgr_client + .foreign_network_client + .get_peer_map() + .list_peer_conns(peer_id) + .await; + assert!(conns.is_some()); + let conn_info = conns.as_ref().unwrap().first().unwrap(); + peer_mgr_client + .close_peer_conn(peer_id, &conn_info.conn_id.parse().unwrap()) + .await + .unwrap(); + + wait_for_condition( + || async { + peer_mgr_client + .get_foreign_network_client() + .list_public_peers() + .await + .len() + == 0 + }, + Duration::from_secs(10), + ) + .await; + } + + #[tokio::test] + async fn close_conn_in_foreign_network_manager() { + let peer_mgr_server = create_mock_peer_manager_with_name("server".to_string()).await; + let peer_mgr_client = create_mock_peer_manager_with_name("client".to_string()).await; + connect_peer_manager(peer_mgr_client.clone(), peer_mgr_server.clone()).await; + wait_for_condition( + || async { + peer_mgr_client + .get_foreign_network_client() + .list_public_peers() + .await + .len() + == 1 + }, + Duration::from_secs(3), + ) + .await; + + let conns = peer_mgr_server + .foreign_network_manager + .list_foreign_networks() + .await; + let client_info = conns.foreign_networks["client"].peers[0].clone(); + let conn_info = client_info.conns[0].clone(); + peer_mgr_server + .close_peer_conn(client_info.peer_id, &conn_info.conn_id.parse().unwrap()) + .await + .unwrap(); + + wait_for_condition( + || async { + peer_mgr_client + .get_foreign_network_client() + .list_public_peers() + .await + .len() + == 0 + }, + Duration::from_secs(10), + ) + .await; + } }