diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index d430894..43666ca 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -1,4 +1,5 @@ use std::{ + any::Any, net::{IpAddr, Ipv4Addr, SocketAddr}, sync::{ atomic::{AtomicBool, Ordering}, @@ -204,6 +205,7 @@ impl AsyncTcpConnector for SmolTcpConnector { impl Drop for SmolTcpConnector { fn drop(&mut self) { if let Some(entry) = self.current_entry.lock().unwrap().take() { + tracing::debug!("drop smoltcp connector entry {:?}", entry); self.entries.remove(&entry); } } @@ -240,6 +242,73 @@ impl AsyncTcpConnector for Socks5KcpConnector { } } +struct Socks5AutoConnector { + kcp_endpoint: Option>, + peer_mgr: Weak, + entries: Socks5EntrySet, + smoltcp_net: Option>, + src_addr: SocketAddr, + + inner_connector: parking_lot::Mutex>>, +} + +#[async_trait::async_trait] +impl AsyncTcpConnector for Socks5AutoConnector { + type S = SocksTcpStream; + + async fn tcp_connect( + &self, + mut addr: SocketAddr, + timeout_s: u64, + ) -> crate::gateway::fast_socks5::Result { + if self.inner_connector.lock().is_some() { + return Err(anyhow::anyhow!("inner connector is already set").into()); + } + + let Some(peer_mgr_arc) = self.peer_mgr.upgrade() else { + tracing::error!("peer manager is dropped"); + return Err(anyhow::anyhow!("peer manager is dropped").into()); + }; + + if let Some(local_addr) = self.smoltcp_net.as_ref().map(|n| n.get_address()) { + if local_addr == addr.ip() { + addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port()); + } + } + + if self.smoltcp_net.is_none() + || peer_mgr_arc.get_msg_dst_peer(&addr.ip()).await.0.is_empty() + || addr.ip().is_loopback() + { + // cannot find dst in virtual network, so try connect to dst directly + return Ok(SocksTcpStream::Tcp( + tcp_connect_with_timeout(addr, timeout_s).await?, + )); + } + + let dst_allow_kcp = peer_mgr_arc.check_allow_kcp_to_dst(&addr.ip()).await; + tracing::debug!("dst_allow_kcp: {:?}", dst_allow_kcp); + + let connector: Box + Send> = + match (&self.kcp_endpoint, dst_allow_kcp) { + (Some(kcp_endpoint), true) => Box::new(Socks5KcpConnector { + kcp_endpoint: kcp_endpoint.clone(), + peer_mgr: self.peer_mgr.clone(), + src_addr: self.src_addr, + }), + (_, _) => Box::new(SmolTcpConnector { + net: self.smoltcp_net.clone().unwrap(), + entries: self.entries.clone(), + current_entry: std::sync::Mutex::new(None), + }), + }; + + let ret = connector.tcp_connect(addr, timeout_s).await; + self.inner_connector.lock().replace(Box::new(connector)); + ret + } +} + fn bind_tcp_socket(addr: SocketAddr, net_ns: NetNS) -> Result { let _g = net_ns.guard(); let socket2_socket = socket2::Socket::new( @@ -359,32 +428,29 @@ impl Socks5ServerNet { } } - fn handle_tcp_stream(&self, stream: tokio::net::TcpStream) { + async fn handle_tcp_stream_task(stream: tokio::net::TcpStream, connector: Socks5AutoConnector) { let mut config = Config::::default(); config.set_request_timeout(10); config.set_skip_auth(false); config.set_allow_no_auth(true); - let socket = Socks5Socket::new( - stream, - Arc::new(config), - SmolTcpConnector { - net: self.smoltcp_net.clone(), - entries: self.entries.clone(), - current_entry: std::sync::Mutex::new(None), - }, - ); + let socket = Socks5Socket::new(stream, Arc::new(config), connector); - self.forward_tasks.lock().unwrap().spawn(async move { - match socket.upgrade_to_socks5().await { - Ok(_) => { - tracing::info!("socks5 handle success"); - } - Err(e) => { - tracing::error!("socks5 handshake failed: {:?}", e); - } - }; - }); + match socket.upgrade_to_socks5().await { + Ok(_) => { + tracing::info!("socks5 handle success"); + } + Err(e) => { + tracing::error!("socks5 handshake failed: {:?}", e); + } + }; + } + + fn handle_tcp_stream(&self, stream: tokio::net::TcpStream, connector: Socks5AutoConnector) { + self.forward_tasks + .lock() + .unwrap() + .spawn(Self::handle_tcp_stream_task(stream, connector)); } } @@ -596,7 +662,7 @@ impl Socks5Server { self: &Arc, kcp_endpoint: Option>, ) -> Result<(), Error> { - *self.kcp_endpoint.lock().await = kcp_endpoint; + *self.kcp_endpoint.lock().await = kcp_endpoint.clone(); if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() { let bind_addr = format!( "{}:{}", @@ -609,14 +675,32 @@ impl Socks5Server { self.global_ctx.net_ns.clone(), )?; + let entries = self.entries.clone(); + let peer_manager = Arc::downgrade(&self.peer_manager); let net = self.net.clone(); self.tasks.lock().unwrap().spawn(async move { loop { match listener.accept().await { - Ok((socket, _addr)) => { + Ok((socket, addr)) => { tracing::info!("accept a new connection, {:?}", socket); + let connector = Socks5AutoConnector { + smoltcp_net: net + .lock() + .await + .as_ref() + .map(|net| net.smoltcp_net.clone()), + entries: entries.clone(), + kcp_endpoint: kcp_endpoint.clone(), + peer_mgr: peer_manager.clone(), + src_addr: addr, + inner_connector: parking_lot::Mutex::new(None), + }; if let Some(net) = net.lock().await.as_ref() { - net.handle_tcp_stream(socket); + net.handle_tcp_stream(socket, connector); + } else { + tokio::spawn(Socks5ServerNet::handle_tcp_stream_task( + socket, connector, + )); } } Err(err) => tracing::error!("accept error = {:?}", err), @@ -752,40 +836,21 @@ impl Socks5Server { dst_addr ); - let net_guard = net.lock().await; - let Some(net) = net_guard.as_ref() else { - tracing::error!("net is not ready"); - continue; + let connector = Socks5AutoConnector { + kcp_endpoint: kcp_endpoint.clone(), + peer_mgr: peer_mgr.clone(), + entries: entries.clone(), + smoltcp_net: net.lock().await.as_ref().map(|net| net.smoltcp_net.clone()), + src_addr: addr, + inner_connector: parking_lot::Mutex::new(None), }; - let Some(peer_mgr_arc) = peer_mgr.upgrade() else { - tracing::error!("peer manager is dropped"); - continue; - }; - - let dst_allow_kcp = peer_mgr_arc.check_allow_kcp_to_dst(&dst_addr.ip()).await; - tracing::debug!("dst_allow_kcp: {:?}", dst_allow_kcp); - - let connector: Box + Send> = - match (&kcp_endpoint, dst_allow_kcp) { - (Some(kcp_endpoint), true) => Box::new(Socks5KcpConnector { - kcp_endpoint: kcp_endpoint.clone(), - peer_mgr: peer_mgr.clone(), - src_addr: addr, - }), - (_, _) => Box::new(SmolTcpConnector { - net: net.smoltcp_net.clone(), - entries: entries.clone(), - current_entry: std::sync::Mutex::new(None), - }), - }; - forward_tasks .lock() .unwrap() .spawn(Self::handle_port_forward_connection( incoming_socket, - connector, + Box::new(connector), dst_addr, )); } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 19b1b50..871b23e 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -1063,7 +1063,14 @@ impl PeerManager { } } - pub async fn get_msg_dst_peer(&self, ipv4_addr: &Ipv4Addr) -> (Vec, bool) { + pub async fn get_msg_dst_peer(&self, addr: &IpAddr) -> (Vec, bool) { + match addr { + IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(ipv4_addr).await, + IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(ipv6_addr).await, + } + } + + pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec, bool) { let mut is_exit_node = false; let mut dst_peers = vec![]; let network_length = self @@ -1189,7 +1196,7 @@ impl PeerManager { } let (dst_peers, is_exit_node) = match ip_addr { - IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer(&ipv4_addr).await, + IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(&ipv4_addr).await, IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(&ipv6_addr).await, };