From 4f53fccd25ec5c8bd6c9595f0caad9b8abc07fdf Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Mon, 21 Jul 2025 23:18:38 +0800 Subject: [PATCH] fix bugs (#1138) 1. avoid dns query hangs the thread 2. avoid deadloop when stun query failed because of no ipv4 addr. 3. make quic input error non-fatal. 4. remove ring tunnel from connection map to avoid mem leak. 5. limit listener retry count. --- Cargo.lock | 4 +-- easytier/src/connector/direct.rs | 12 +++++--- easytier/src/connector/manual.rs | 4 +-- easytier/src/connector/udp_hole_punch/mod.rs | 8 ++++-- easytier/src/instance/instance.rs | 24 ++++++++++++---- easytier/src/instance/listeners.rs | 7 ++++- easytier/src/tunnel/ring.rs | 29 ++++++++++++++------ 7 files changed, 62 insertions(+), 26 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b88d35..30df35e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8486,8 +8486,8 @@ dependencies = [ [[package]] name = "thunk-rs" -version = "0.3.4" -source = "git+https://github.com/easytier/thunk.git#403f0d26d3d5bcfdfd76c23e36e517f19fe891e0" +version = "0.3.5" +source = "git+https://github.com/easytier/thunk.git#cbbeec75a66b7b3cf0824ae890d9d06bcfb9d1f3" [[package]] name = "tiff" diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 2c85004..b967d6c 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -12,7 +12,10 @@ use std::{ }; use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait, PeerId}, + common::{ + dns::socket_addrs, error::Error, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait, + PeerId, + }, peers::{ peer_conn::PeerConnId, peer_manager::PeerManager, @@ -281,14 +284,14 @@ impl DirectConnectorManagerData { } } - fn spawn_direct_connect_task( + async fn spawn_direct_connect_task( self: &Arc, dst_peer_id: PeerId, ip_list: &GetIpListResponse, listener: &url::Url, tasks: &mut JoinSet>, ) { - let Ok(mut addrs) = listener.socket_addrs(|| None) else { + let Ok(mut addrs) = socket_addrs(listener, || None).await else { tracing::error!(?listener, "failed to parse socket address from listener"); return; }; @@ -432,7 +435,8 @@ impl DirectConnectorManagerData { &ip_list, &listener, &mut tasks, - ); + ) + .await; listener_list.push(listener.clone().to_string()); available_listeners.pop(); diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 01e70f6..2cce883 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -15,7 +15,7 @@ use tokio::{ }; use crate::{ - common::{join_joinset_background, PeerId}, + common::{dns::socket_addrs, join_joinset_background, PeerId}, peers::peer_conn::PeerConnId, proto::{ cli::{ @@ -373,7 +373,7 @@ impl ManualConnectorManager { if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" { ip_versions.push(IpVersion::Both); } else { - let addrs = match u.socket_addrs(|| Some(1000)) { + let addrs = match socket_addrs(&u, || Some(1000)).await { Ok(addrs) => addrs, Err(e) => { data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( diff --git a/easytier/src/connector/udp_hole_punch/mod.rs b/easytier/src/connector/udp_hole_punch/mod.rs index 261820a..1bdaa15 100644 --- a/easytier/src/connector/udp_hole_punch/mod.rs +++ b/easytier/src/connector/udp_hole_punch/mod.rs @@ -270,7 +270,7 @@ impl UdpHoePunchConnectorData { #[tracing::instrument(skip(self))] async fn cone_to_cone(self: Arc, task_info: PunchTaskInfo) -> Result<(), Error> { - let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000]); + let mut backoff = BackOff::new(vec![1000, 1000, 2000, 4000, 4000, 8000, 8000, 16000]); loop { backoff.sleep_for_next_backoff().await; @@ -293,7 +293,8 @@ impl UdpHoePunchConnectorData { #[tracing::instrument(skip(self))] async fn sym_to_cone(self: Arc, task_info: PunchTaskInfo) -> Result<(), Error> { - let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000, 64000]); + let mut backoff = + BackOff::new(vec![1000, 1000, 2000, 4000, 4000, 8000, 8000, 16000, 64000]); let mut round = 0; let mut port_idx = rand::random(); @@ -338,7 +339,8 @@ impl UdpHoePunchConnectorData { #[tracing::instrument(skip(self))] async fn both_easy_sym(self: Arc, task_info: PunchTaskInfo) -> Result<(), Error> { - let mut backoff = BackOff::new(vec![0, 1000, 2000, 4000, 4000, 8000, 8000, 16000, 64000]); + let mut backoff = + BackOff::new(vec![1000, 1000, 2000, 4000, 4000, 8000, 8000, 16000, 64000]); loop { backoff.sleep_for_next_backoff().await; diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index ee1d1a9..0741876 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -526,6 +526,19 @@ impl Instance { }); } + async fn run_quic_dst(&mut self) -> Result<(), Error> { + if !self.global_ctx.get_flags().enable_quic_proxy { + return Ok(()); + } + + let quic_dst = QUICProxyDst::new(self.global_ctx.clone())?; + quic_dst.start().await?; + self.global_ctx + .set_quic_proxy_port(Some(quic_dst.local_addr()?.port())); + self.quic_proxy_dst = Some(quic_dst); + Ok(()) + } + pub async fn run(&mut self) -> Result<(), Error> { self.listener_manager .lock() @@ -588,11 +601,12 @@ impl Instance { } if !self.global_ctx.get_flags().disable_quic_input { - let quic_dst = QUICProxyDst::new(self.global_ctx.clone())?; - quic_dst.start().await?; - self.global_ctx - .set_quic_proxy_port(Some(quic_dst.local_addr()?.port())); - self.quic_proxy_dst = Some(quic_dst); + if let Err(e) = self.run_quic_dst().await { + eprintln!( + "quic input start failed: {:?} (some platforms may not support)", + e + ); + } } // run after tun device created, so listener can bind to tun device, which may be required by win 10 diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index da03f99..19a36f0 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -179,11 +179,13 @@ impl ListenerManage peer_manager: Weak, global_ctx: ArcGlobalCtx, ) { + let mut err_count = 0; loop { let mut l = (creator)(); let _g = global_ctx.net_ns.guard(); match l.listen().await { Ok(_) => { + err_count = 0; global_ctx.add_running_listener(l.local_url()); global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); } @@ -193,8 +195,11 @@ impl ListenerManage l.local_url(), format!("error: {:?}, retry listen later...", e), )); + err_count += 1; + if err_count > 5 { + return; + } tokio::time::sleep(std::time::Duration::from_secs(1)).await; - continue; } } loop { diff --git a/easytier/src/tunnel/ring.rs b/easytier/src/tunnel/ring.rs index e0dd13a..d834a09 100644 --- a/easytier/src/tunnel/ring.rs +++ b/easytier/src/tunnel/ring.rs @@ -12,10 +12,7 @@ use async_trait::async_trait; use futures::{Sink, SinkExt, Stream, StreamExt}; use once_cell::sync::Lazy; -use tokio::sync::{ - mpsc::{UnboundedReceiver, UnboundedSender}, - Mutex, -}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use uuid::Uuid; @@ -184,14 +181,17 @@ struct Connection { server: Arc, } -static CONNECTION_MAP: Lazy>>>>> = - Lazy::new(|| Arc::new(Mutex::new(HashMap::new()))); +static CONNECTION_MAP: Lazy< + Arc>>>>, +> = Lazy::new(|| Arc::new(std::sync::Mutex::new(HashMap::new()))); #[derive(Debug)] pub struct RingTunnelListener { listerner_addr: url::Url, conn_sender: UnboundedSender>, conn_receiver: UnboundedReceiver>, + + key_in_conn_map: Option, } impl RingTunnelListener { @@ -201,6 +201,7 @@ impl RingTunnelListener { listerner_addr: key, conn_sender, conn_receiver, + key_in_conn_map: None, } } } @@ -244,10 +245,12 @@ impl RingTunnelListener { impl TunnelListener for RingTunnelListener { async fn listen(&mut self) -> Result<(), TunnelError> { tracing::info!("listen new conn of key: {}", self.listerner_addr); + let addr = self.get_addr().await?; CONNECTION_MAP .lock() - .await - .insert(self.get_addr().await?, self.conn_sender.clone()); + .unwrap() + .insert(addr, self.conn_sender.clone()); + self.key_in_conn_map = Some(addr); Ok(()) } @@ -276,6 +279,14 @@ impl TunnelListener for RingTunnelListener { } } +impl Drop for RingTunnelListener { + fn drop(&mut self) { + if let Some(addr) = self.key_in_conn_map { + CONNECTION_MAP.lock().unwrap().remove(&addr); + } + } +} + pub struct RingTunnelConnector { remote_addr: url::Url, } @@ -297,7 +308,7 @@ impl TunnelConnector for RingTunnelConnector { .await?; let entry = CONNECTION_MAP .lock() - .await + .unwrap() .get(&remote_addr) .unwrap() .clone();