diff --git a/easytier/src/connector/udp_hole_punch/common.rs b/easytier/src/connector/udp_hole_punch/common.rs index ce94ba7..e89808d 100644 --- a/easytier/src/connector/udp_hole_punch/common.rs +++ b/easytier/src/connector/udp_hole_punch/common.rs @@ -56,8 +56,8 @@ impl From for UdpNatType { fn from(nat_type: NatType) -> Self { match nat_type { NatType::Unknown => UdpNatType::Unknown, - NatType::NoPat | NatType::OpenInternet => UdpNatType::Open(nat_type), - NatType::FullCone | NatType::Restricted | NatType::PortRestricted => { + NatType::OpenInternet => UdpNatType::Open(nat_type), + NatType::NoPat | NatType::FullCone | NatType::Restricted | NatType::PortRestricted => { UdpNatType::Cone(nat_type) } NatType::Symmetric | NatType::SymUdpFirewall => UdpNatType::HardSymmetric(nat_type), diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 823a718..fa058e4 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -24,7 +24,7 @@ use easytier::{ scoped_task::ScopedTask, stun::MockStunInfoCollector, }, - connector::{create_connector_by_url, dns_connector::DNSTunnelConnector}, + connector::create_connector_by_url, launcher, proto::{ self, @@ -1087,7 +1087,6 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> { hostname, ); tokio::signal::ctrl_c().await.unwrap(); - DNSTunnelConnector::new("".parse().unwrap(), global_ctx); return Ok(()); } diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index b34608f..2fef827 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use crossbeam::atomic::AtomicCell; use dashmap::DashMap; -use tokio::{select, sync::mpsc, task::JoinHandle}; +use tokio::{select, sync::mpsc}; use tracing::Instrument; @@ -32,7 +32,7 @@ pub struct Peer { packet_recv_chan: PacketRecvChan, close_event_sender: mpsc::Sender, - close_event_listener: JoinHandle<()>, + close_event_listener: ScopedTask<()>, shutdown_notifier: Arc, @@ -87,7 +87,8 @@ impl Peer { "peer_close_event_listener", ?peer_node_id, )), - ); + ) + .into(); let default_conn_id = Arc::new(AtomicCell::new(PeerConnId::default())); @@ -188,7 +189,13 @@ impl Peer { let mut ret = Vec::new(); for conn in conns { - ret.push(conn.get_conn_info()); + let info = conn.get_conn_info(); + if !info.is_closed { + ret.push(info); + } else { + let conn_id = info.conn_id.parse().unwrap(); + let _ = self.close_peer_conn(&conn_id).await; + } } ret } diff --git a/easytier/src/peers/peer_conn.rs b/easytier/src/peers/peer_conn.rs index 35694b4..b957980 100644 --- a/easytier/src/peers/peer_conn.rs +++ b/easytier/src/peers/peer_conn.rs @@ -432,6 +432,7 @@ impl PeerConn { loss_rate: (f64::from(self.loss_rate_stats.load(Ordering::Relaxed)) / 100.0) as f32, is_client: self.is_client.unwrap_or_default(), network_name: info.network_name.clone(), + is_closed: self.close_event_notifier.is_closed(), } } } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 0207d3a..704ad03 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -86,8 +86,9 @@ impl AtomicVersion { self.0.fetch_add(1, Ordering::Relaxed) + 1 } - fn set_if_larger(&self, version: Version) { - self.0.fetch_max(version, Ordering::Relaxed); + fn set_if_larger(&self, version: Version) -> bool { + // return true if the version is set. + self.0.fetch_max(version, Ordering::Relaxed) < version } } @@ -452,7 +453,6 @@ impl SyncedRouteInfo { let mut need_inc_version = false; for (peer_idx, (peer_id, version)) in conn_bitmap.peer_ids.iter().enumerate() { - assert!(self.peer_infos.contains_key(peer_id)); let connceted_peers = conn_bitmap.get_connected_peers(peer_idx); self.fill_empty_peer_info(&connceted_peers); @@ -460,17 +460,14 @@ impl SyncedRouteInfo { .entry(*peer_id) .and_modify(|(old_conn_bitmap, old_version)| { if *version > old_version.get() { - *old_conn_bitmap = conn_bitmap.get_connected_peers(peer_idx); + *old_conn_bitmap = connceted_peers.clone(); need_inc_version = true; old_version.set(*version); } }) .or_insert_with(|| { need_inc_version = true; - ( - conn_bitmap.get_connected_peers(peer_idx), - version.clone().into(), - ) + (connceted_peers, version.clone().into()) }); } if need_inc_version { @@ -479,7 +476,6 @@ impl SyncedRouteInfo { } fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) { - let mut need_inc_version = false; for item in foreign_network.infos.iter().map(Clone::clone) { let Some(key) = item.key else { continue; @@ -494,14 +490,10 @@ impl SyncedRouteInfo { .entry(key.clone()) .and_modify(|old_entry| { if entry.version > old_entry.version { - need_inc_version = true; *old_entry = entry.clone(); } }) - .or_insert_with(|| { - need_inc_version = true; - entry.clone() - }); + .or_insert_with(|| entry.clone()); } } @@ -1069,6 +1061,7 @@ struct PeerRouteServiceImpl { foreign_network_owner_map: DashMap>, synced_route_info: SyncedRouteInfo, cached_local_conn_map: std::sync::Mutex, + cached_local_conn_map_version: AtomicVersion, last_update_my_foreign_network: AtomicCell>, @@ -1118,6 +1111,7 @@ impl PeerRouteServiceImpl { version: AtomicVersion::new(), }, cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()), + cached_local_conn_map_version: AtomicVersion::new(), last_update_my_foreign_network: AtomicCell::new(None), @@ -1290,6 +1284,8 @@ impl PeerRouteServiceImpl { // update route table first because we want to filter out unreachable peers. self.update_route_table(); + let synced_version = self.synced_route_info.version.get(); + // the conn_bitmap should contain complete list of directly connected peers. // use union of dst peers can preserve this property. let all_dst_peer_ids = self @@ -1327,7 +1323,13 @@ impl PeerRouteServiceImpl { } } - *self.cached_local_conn_map.lock().unwrap() = conn_bitmap; + let mut locked = self.cached_local_conn_map.lock().unwrap(); + if self + .cached_local_conn_map_version + .set_if_larger(synced_version) + { + *locked = conn_bitmap; + } } fn build_route_info(&self, session: &SyncRouteSession) -> Option> { diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 74d80ba..973af45 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -30,6 +30,7 @@ message PeerConnInfo { float loss_rate = 7; bool is_client = 8; string network_name = 9; + bool is_closed = 10; } message PeerInfo { diff --git a/easytier/src/tunnel/websocket.rs b/easytier/src/tunnel/websocket.rs index 2f423d3..f81d4d8 100644 --- a/easytier/src/tunnel/websocket.rs +++ b/easytier/src/tunnel/websocket.rs @@ -183,8 +183,6 @@ impl WSTunnelConnector { ) -> Result, TunnelError> { let is_wss = is_wss(&addr)?; let socket_addr = SocketAddr::from_url(addr.clone(), ip_version).await?; - let domain = addr.domain(); - let host = socket_addr.ip(); let stream = tcp_socket.connect(socket_addr).await?; let info = TunnelInfo { @@ -208,9 +206,7 @@ impl WSTunnelConnector { let sni = "localhost"; let server_name = rustls::pki_types::ServerName::try_from(sni) .map_err(|_| TunnelError::InvalidProtocol("Invalid SNI".to_string()))?; - let stream = tls_conn - .connect(server_name, stream) - .await?; + let stream = tls_conn.connect(server_name, stream).await?; MaybeTlsStream::Rustls(stream) } else { MaybeTlsStream::Plain(stream) diff --git a/easytier/src/utils.rs b/easytier/src/utils.rs index 1694cb2..e36365c 100644 --- a/easytier/src/utils.rs +++ b/easytier/src/utils.rs @@ -205,7 +205,7 @@ pub fn setup_panic_handler() { // backtrace is risky, so use it last let backtrace = backtrace::Backtrace::force_capture(); - write_err(format!("backtrace: {:?}", backtrace)); + write_err(format!("backtrace: {:#?}", backtrace)); std::process::exit(1); }));