diff --git a/Cargo.lock b/Cargo.lock
index dd9068f..a4d81c7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4722,9 +4722,9 @@ dependencies = [
 
 [[package]]
 name = "multimap"
-version = "0.10.0"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
+checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
 dependencies = [
  "serde",
 ]
diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml
index ebf22c7..0c5819b 100644
--- a/easytier/Cargo.toml
+++ b/easytier/Cargo.toml
@@ -213,7 +213,7 @@ hickory-client = "0.25.2"
 hickory-server = { version = "0.25.2", features = ["resolver"] }
 derive_builder = "0.20.2"
 humantime-serde = "1.1.1"
-multimap = "0.10.0"
+multimap = "0.10.1"
 version-compare = "0.2.0"
 hmac = "0.12.1"
 sha2 = "0.10.8"
diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs
index a25f0fb..1aed487 100644
--- a/easytier/src/connector/manual.rs
+++ b/easytier/src/connector/manual.rs
@@ -3,24 +3,16 @@ use std::{
     sync::{Arc, Weak},
 };
 
-use anyhow::Context;
 use dashmap::DashSet;
-use tokio::{
-    sync::{
-        broadcast::{error::RecvError, Receiver},
-        mpsc,
-    },
-    task::JoinSet,
-    time::timeout,
-};
+use tokio::{sync::mpsc, task::JoinSet, time::timeout};
 
 use crate::{
     common::{dns::socket_addrs, join_joinset_background, PeerId},
-    peers::{peer_conn::PeerConnId, peer_map::PeerMap},
+    peers::peer_conn::PeerConnId,
     proto::{
         api::instance::{
             Connector, ConnectorManageRpc, ConnectorStatus, ListConnectorRequest,
-            ListConnectorResponse, PeerConnInfo,
+            ListConnectorResponse,
         },
         rpc_types::{self, controller::BaseController},
     },
@@ -40,7 +32,7 @@ use crate::{
 
 use super::create_connector_by_url;
 
-type ConnectorMap = Arc<DashSet<String>>;
+type ConnectorMap = Arc<DashSet<url::Url>>;
 
 #[derive(Debug, Clone)]
 struct ReconnResult {
@@ -51,11 +43,11 @@ struct ReconnResult {
 
 struct ConnectorManagerData {
     connectors: ConnectorMap,
-    reconnecting: DashSet<String>,
+    reconnecting: DashSet<url::Url>,
     peer_manager: Weak<PeerManager>,
-    alive_conn_urls: Arc<DashSet<String>>,
+    alive_conn_urls: Arc<DashSet<url::Url>>,
     // user removed connector urls
-    removed_conn_urls: Arc<DashSet<String>>,
+    removed_conn_urls: Arc<DashSet<url::Url>>,
     net_ns: NetNS,
     global_ctx: ArcGlobalCtx,
 }
@@ -70,7 +62,6 @@ impl ManualConnectorManager {
     pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Self {
         let connectors = Arc::new(DashSet::new());
         let tasks = JoinSet::new();
-        let event_subscriber = global_ctx.subscribe();
 
         let mut ret = Self {
             global_ctx: global_ctx.clone(),
@@ -88,10 +79,6 @@ impl ManualConnectorManager {
         ret.tasks
             .spawn(Self::conn_mgr_reconn_routine(ret.data.clone()));
 
-        ret.tasks.spawn(Self::conn_mgr_handle_event_routine(
-            ret.data.clone(),
-            event_subscriber,
-        ));
 
         ret
     }
@@ -101,11 +88,11 @@ impl ManualConnectorManager {
         T: TunnelConnector + 'static,
     {
         tracing::info!("add_connector: {}", connector.remote_url());
-        self.data.connectors.insert(connector.remote_url().into());
+        self.data.connectors.insert(connector.remote_url());
     }
 
-    pub async fn add_connector_by_url(&self, url: &str) -> Result<(), Error> {
-        self.data.connectors.insert(url.to_owned());
+    pub async fn add_connector_by_url(&self, url: url::Url) -> Result<(), Error> {
+        self.data.connectors.insert(url);
         Ok(())
     }
@@ -120,34 +107,28 @@ impl ManualConnectorManager {
         {
             return Err(Error::NotFound);
         }
-        self.data.removed_conn_urls.insert(url.to_string());
+        self.data.removed_conn_urls.insert(url.into());
 
         Ok(())
     }
 
     pub async fn clear_connectors(&self) {
         self.list_connectors().await.iter().for_each(|x| {
             if let Some(url) = &x.url {
-                self.data.removed_conn_urls.insert(url.to_string());
+                self.data.removed_conn_urls.insert(url.clone().into());
             }
         });
     }
 
     pub async fn list_connectors(&self) -> Vec<Connector> {
-        let conn_urls: BTreeSet<String> = self
-            .data
-            .connectors
-            .iter()
-            .map(|x| x.key().clone())
-            .collect();
-
-        let dead_urls: BTreeSet<String> = Self::collect_dead_conns(self.data.clone())
+        let dead_urls: BTreeSet<url::Url> = Self::collect_dead_conns(self.data.clone())
             .await
             .into_iter()
             .collect();
 
         let mut ret = Vec::new();
-        for conn_url in conn_urls {
+        for item in self.data.connectors.iter() {
+            let conn_url = item.key().clone();
             let mut status = ConnectorStatus::Connected;
             if dead_urls.contains(&conn_url) {
                 status = ConnectorStatus::Disconnected;
@@ -155,20 +136,20 @@ impl ManualConnectorManager {
             ret.insert(
                 0,
                 Connector {
-                    url: Some(conn_url.parse().unwrap()),
+                    url: Some(conn_url.into()),
                     status: status.into(),
                 },
             );
         }
 
-        let reconnecting_urls: BTreeSet<String> =
+        let reconnecting_urls: BTreeSet<url::Url> =
            self.data.reconnecting.iter().map(|x| x.clone()).collect();
 
         for conn_url in reconnecting_urls {
             ret.insert(
                 0,
                 Connector {
-                    url: Some(conn_url.parse().unwrap()),
+                    url: Some(conn_url.into()),
                     status: ConnectorStatus::Connecting.into(),
                 },
             );
@@ -177,49 +158,6 @@ impl ManualConnectorManager {
         ret
     }
 
-    async fn conn_mgr_handle_event_routine(
-        data: Arc<ConnectorManagerData>,
-        mut event_recv: Receiver<GlobalCtxEvent>,
-    ) {
-        loop {
-            match event_recv.recv().await {
-                Ok(event) => {
-                    Self::handle_event(&event, &data).await;
-                }
-                Err(RecvError::Lagged(n)) => {
-                    tracing::warn!("event_recv lagged: {}, rebuild alive conn list", n);
-                    event_recv = event_recv.resubscribe();
-                    data.alive_conn_urls.clear();
-                    let Some(pm) = data.peer_manager.upgrade() else {
-                        tracing::warn!("peer manager is gone, exit");
-                        break;
-                    };
-                    let fill_alive_urls_with_peer_map = |peer_map: &PeerMap| {
-                        for x in peer_map.get_alive_conns().iter().map(|x| {
-                            x.tunnel
-                                .clone()
-                                .unwrap_or_default()
-                                .remote_addr
-                                .unwrap_or_default()
-                                .to_string()
-                        }) {
-                            data.alive_conn_urls.insert(x);
-                        }
-                    };
-
-                    fill_alive_urls_with_peer_map(&pm.get_peer_map());
-                    fill_alive_urls_with_peer_map(&pm.get_foreign_network_client().get_peer_map());
-
-                    continue;
-                }
-                Err(RecvError::Closed) => {
-                    tracing::warn!("event_recv closed, exit");
-                    break;
-                }
-            }
-        }
-    }
-
     async fn conn_mgr_reconn_routine(data: Arc<ConnectorManagerData>) {
         tracing::warn!("conn_mgr_routine started");
         let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis(
@@ -261,31 +199,6 @@ impl ManualConnectorManager {
         }
     }
 
-    async fn handle_event(event: &GlobalCtxEvent, data: &ConnectorManagerData) {
-        let need_add_alive = |conn_info: &PeerConnInfo| conn_info.is_client;
-        match event {
-            GlobalCtxEvent::PeerConnAdded(conn_info) => {
-                if !need_add_alive(conn_info) {
-                    return;
-                }
-                let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone();
-                data.alive_conn_urls.insert(addr.unwrap().to_string());
-                tracing::warn!("peer conn added: {:?}", conn_info);
-            }
-
-            GlobalCtxEvent::PeerConnRemoved(conn_info) => {
-                if !need_add_alive(conn_info) {
-                    return;
-                }
-                let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone();
-                data.alive_conn_urls.remove(&addr.unwrap().to_string());
-                tracing::warn!("peer conn removed: {:?}", conn_info);
-            }
-
-            _ => {}
-        }
-    }
-
     fn handle_remove_connector(data: Arc<ConnectorManagerData>) {
         let remove_later = DashSet::new();
         for it in data.removed_conn_urls.iter() {
@@ -307,12 +220,20 @@ impl ManualConnectorManager {
         }
     }
 
-    async fn collect_dead_conns(data: Arc<ConnectorManagerData>) -> BTreeSet<String> {
+    async fn collect_dead_conns(data: Arc<ConnectorManagerData>) -> BTreeSet<url::Url> {
         Self::handle_remove_connector(data.clone());
-        let all_urls: BTreeSet<String> = data.connectors.iter().map(|x| x.key().clone()).collect();
         let mut ret = BTreeSet::new();
-        for url in all_urls.iter() {
-            if !data.alive_conn_urls.contains(url) {
+        let Some(pm) = data.peer_manager.upgrade() else {
+            tracing::warn!("peer manager is gone, exit");
+            return ret;
+        };
+        for url in data.connectors.iter().map(|x| x.key().clone()) {
+            if !pm.get_peer_map().is_client_url_alive(&url)
+                && !pm
+                    .get_foreign_network_client()
+                    .get_peer_map()
+                    .is_client_url_alive(&url)
+            {
                 ret.insert(url.clone());
             }
         }
@@ -347,21 +268,19 @@ impl ManualConnectorManager {
 
     async fn conn_reconnect(
         data: Arc<ConnectorManagerData>,
-        dead_url: String,
+        dead_url: url::Url,
     ) -> Result<ReconnResult, Error> {
         tracing::info!("reconnect: {}", dead_url);
 
         let mut ip_versions = vec![];
-        let u = url::Url::parse(&dead_url)
-            .with_context(|| format!("failed to parse connector url {:?}", dead_url))?;
-        if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" {
+        if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" {
             ip_versions.push(IpVersion::Both);
         } else {
-            let addrs = match socket_addrs(&u, || Some(1000)).await {
+            let addrs = match socket_addrs(&dead_url, || Some(1000)).await {
                 Ok(addrs) => addrs,
                 Err(e) => {
                     data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
-                        dead_url.clone(),
+                        dead_url.to_string(),
                         format!("{:?}", IpVersion::Both),
                         format!("{:?}", e),
                     ));
@@ -393,13 +312,18 @@ impl ManualConnectorManager {
             "cannot get ip from url"
         )));
         for ip_version in ip_versions {
-            let use_long_timeout = dead_url.starts_with("http")
-                || dead_url.starts_with("srv")
-                || dead_url.starts_with("txt");
+            let use_long_timeout = dead_url.scheme() == "http"
+                || dead_url.scheme() == "https"
+                || dead_url.scheme() == "txt"
+                || dead_url.scheme() == "srv";
             let ret = timeout(
                 // allow http connector to wait longer
                 std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
-                Self::conn_reconnect_with_ip_version(data.clone(), dead_url.clone(), ip_version),
+                Self::conn_reconnect_with_ip_version(
+                    data.clone(),
+                    dead_url.to_string(),
+                    ip_version,
+                ),
             )
             .await;
             tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
@@ -422,7 +346,7 @@ impl ManualConnectorManager {
 
         // 发送事件(只有在未 break 时才执行)
         data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
-            dead_url.clone(),
+            dead_url.to_string(),
             format!("{:?}", ip_version),
             format!("{:?}", reconn_ret),
         ));
diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs
index 37247c0..e39a391 100644
--- a/easytier/src/instance/instance.rs
+++ b/easytier/src/instance/instance.rs
@@ -484,7 +484,7 @@ impl InstanceConfigPatcher {
             match ConfigPatchAction::try_from(connector.action) {
                 Ok(ConfigPatchAction::Add) => {
                     tracing::info!("Connector added: {}", url);
-                    conn_manager.add_connector_by_url(url.as_str()).await?;
+                    conn_manager.add_connector_by_url(url).await?;
                 }
                 Ok(ConfigPatchAction::Remove) => {
                     tracing::info!("Connector removed: {}", url);
@@ -620,7 +620,7 @@ impl Instance {
     async fn add_initial_peers(&mut self) -> Result<(), Error> {
         for peer in self.global_ctx.config.get_peers().iter() {
             self.get_conn_manager()
-                .add_connector_by_url(peer.uri.as_str())
+                .add_connector_by_url(peer.uri.clone())
                 .await?;
         }
         Ok(())
diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs
index 3b2af1d..5a33aaf 100644
--- a/easytier/src/peers/peer_map.rs
+++ b/easytier/src/peers/peer_map.rs
@@ -5,6 +5,7 @@ use std::{
 
 use anyhow::Context;
 use dashmap::{DashMap, DashSet};
+use parking_lot::Mutex;
 use tokio::sync::RwLock;
 
 use crate::{
@@ -33,7 +34,7 @@ pub struct PeerMap {
     peer_map: DashMap<PeerId, Arc<Peer>>,
     packet_send: PacketRecvChan,
     routes: RwLock<Vec<ArcRoute>>,
-    alive_conns: Arc<DashMap<(PeerId, PeerConnId), PeerConnInfo>>,
+    alive_client_urls: Arc<Mutex<multimap::MultiMap<url::Url, PeerConnId>>>,
 }
 
 impl PeerMap {
@@ -44,7 +45,7 @@ impl PeerMap {
             peer_map: DashMap::new(),
             packet_send,
             routes: RwLock::new(Vec::new()),
-            alive_conns: Arc::new(DashMap::new()),
+            alive_client_urls: Arc::new(Mutex::new(multimap::MultiMap::new())),
         }
     }
 
@@ -56,7 +57,7 @@ impl PeerMap {
     }
 
     pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) {
-        self.maintain_alive_conns(&peer_conn);
+        let _ = self.maintain_alive_client_urls(&peer_conn);
         let peer_id = peer_conn.get_peer_id();
         let no_entry = self.peer_map.get(&peer_id).is_none();
         if no_entry {
@@ -69,29 +70,48 @@ impl PeerMap {
         }
     }
 
-    fn maintain_alive_conns(&self, peer_conn: &PeerConn) {
-        let close_notifier = peer_conn.get_close_notifier();
-        let alive_conns_weak = Arc::downgrade(&self.alive_conns);
-        let conn_id = close_notifier.get_conn_id();
+    fn maintain_alive_client_urls(&self, peer_conn: &PeerConn) -> Option<()> {
         let conn_info = peer_conn.get_conn_info();
-        self.alive_conns
-            .insert((conn_info.peer_id, conn_id), conn_info.clone());
+        if !conn_info.is_client {
+            return None;
+        }
+
+        let close_notifier = peer_conn.get_close_notifier();
+        let alive_conns_weak = Arc::downgrade(&self.alive_client_urls);
+        let conn_id = close_notifier.get_conn_id();
+        let alive_client_url: url::Url = conn_info.tunnel?.remote_addr?.into();
+        self.alive_client_urls
+            .lock()
+            .insert(alive_client_url.clone(), conn_id);
+
         tokio::spawn(async move {
             if let Some(mut waiter) = close_notifier.get_waiter().await {
                 let _ = waiter.recv().await;
             }
-            let mut alive_conn_count = 0;
-            if let Some(alive_conns) = alive_conns_weak.upgrade() {
-                alive_conns.remove(&(conn_info.peer_id, conn_id)).unwrap();
-                alive_conn_count = alive_conns.len();
-                shrink_dashmap(&alive_conns, None);
-            }
+            let Some(alive_conns) = alive_conns_weak.upgrade() else {
+                return;
+            };
+            let mut guard = alive_conns.lock();
+            if let Some(mut conn_ids) = guard.remove(&alive_client_url) {
+                conn_ids.retain(|id| id != &conn_id);
+                if !conn_ids.is_empty() {
+                    guard.insert_many(alive_client_url, conn_ids);
+                }
+            };
+            let alive_conn_count = guard.len();
+            drop(guard);
             tracing::debug!(
                 ?conn_id,
                 "peer conn is closed, current alive conns: {}",
                 alive_conn_count
            );
         });
+
+        Some(())
+    }
+
+    pub fn is_client_url_alive(&self, url: &url::Url) -> bool {
+        self.alive_client_urls.lock().contains_key(url)
     }
 
     pub fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
@@ -359,13 +379,6 @@ impl PeerMap {
         Ok(!self.has_peer(gateway_id))
     }
 
-    pub fn get_alive_conns(&self) -> DashMap<(PeerId, PeerConnId), PeerConnInfo> {
-        self.alive_conns
-            .iter()
-            .map(|v| (*v.key(), v.value().clone()))
-            .collect()
-    }
-
     pub fn my_peer_id(&self) -> PeerId {
         self.my_peer_id
     }
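The core of the change: instead of mirroring GlobalCtxEvent::PeerConnAdded / PeerConnRemoved into a local alive_conn_urls set, PeerMap now records live client connections per URL in a multimap::MultiMap<url::Url, PeerConnId> behind a parking_lot::Mutex, and ManualConnectorManager::collect_dead_conns queries PeerMap::is_client_url_alive directly. Below is a minimal standalone sketch of that bookkeeping pattern; the type and function names are illustrative stand-ins (only the multimap, parking_lot and url crates touched by the patch are assumed), not EasyTier's API.

// Sketch only: MultiMap-based liveness bookkeeping as used by the patch above.
use std::sync::Arc;

use parking_lot::Mutex;

// Stand-in for the patch's PeerConnId.
type ConnId = u64;

struct AliveClientUrls {
    // One URL can have several live client connections, hence a multimap.
    inner: Arc<Mutex<multimap::MultiMap<url::Url, ConnId>>>,
}

impl AliveClientUrls {
    fn new() -> Self {
        Self {
            inner: Arc::new(Mutex::new(multimap::MultiMap::new())),
        }
    }

    // Record a newly established client connection.
    fn on_conn_open(&self, url: url::Url, conn_id: ConnId) {
        self.inner.lock().insert(url, conn_id);
    }

    // Drop one connection id; the URL stays alive while other conns remain.
    fn on_conn_close(&self, url: &url::Url, conn_id: ConnId) {
        let mut guard = self.inner.lock();
        if let Some(mut ids) = guard.remove(url) {
            ids.retain(|id| *id != conn_id);
            if !ids.is_empty() {
                guard.insert_many(url.clone(), ids);
            }
        }
    }

    // The check collect_dead_conns relies on via is_client_url_alive.
    fn is_alive(&self, url: &url::Url) -> bool {
        self.inner.lock().contains_key(url)
    }
}

fn main() {
    let alive = AliveClientUrls::new();
    let url: url::Url = "tcp://192.0.2.1:11010".parse().unwrap();

    alive.on_conn_open(url.clone(), 1);
    alive.on_conn_open(url.clone(), 2);
    assert!(alive.is_alive(&url));

    alive.on_conn_close(&url, 1);
    assert!(alive.is_alive(&url)); // a second connection is still up

    alive.on_conn_close(&url, 2);
    assert!(!alive.is_alive(&url)); // no connections left -> reconnect candidate
}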