diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index 84b2cdb..0611c81 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -1,6 +1,9 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::{Arc, Weak}, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, Weak, + }, time::{Duration, Instant}, }; @@ -38,7 +41,7 @@ use tokio::{ io::{AsyncRead, AsyncWrite}, net::{TcpListener, TcpSocket, UdpSocket}, select, - sync::{mpsc, Mutex}, + sync::{mpsc, Mutex, Notify}, task::JoinSet, time::timeout, }; @@ -418,12 +421,21 @@ pub struct Socks5Server { kcp_endpoint: Mutex>>, - cancel_tokens: DashMap, + socks5_enabled: Arc, + cancel_tokens: Arc>, + port_forward_list_change_notifier: Arc, } #[async_trait::async_trait] impl PeerPacketFilter for Socks5Server { async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option { + if self.cancel_tokens.is_empty() + && self.entries.is_empty() + && !self.socks5_enabled.load(Ordering::Relaxed) + { + return Some(packet); + } + let hdr = packet.peer_manager_header().unwrap(); if hdr.packet_type != PacketType::Data as u8 { return Some(packet); @@ -519,7 +531,9 @@ impl Socks5Server { kcp_endpoint: Mutex::new(None), - cancel_tokens: DashMap::new(), + socks5_enabled: Arc::new(AtomicBool::new(false)), + cancel_tokens: Arc::new(DashMap::new()), + port_forward_list_change_notifier: Arc::new(Notify::new()), }) } @@ -531,9 +545,18 @@ impl Socks5Server { let entries = self.entries.clone(); let tcp_forward_task = self.tcp_forward_task.clone(); let udp_client_map = self.udp_client_map.clone(); + let cancel_tokens = self.cancel_tokens.clone(); + let port_forward_list_change_notifier = self.port_forward_list_change_notifier.clone(); + let socks5_enabled = self.socks5_enabled.clone(); self.tasks.lock().unwrap().spawn(async move { let mut prev_ipv4 = None; loop { + if cancel_tokens.is_empty() && !socks5_enabled.load(Ordering::Relaxed) { + let _ = net.lock().await.take(); + port_forward_list_change_notifier.notified().await; + continue; + } + let mut event_recv = global_ctx.subscribe(); let cur_ipv4 = global_ctx.get_ipv4(); @@ -570,7 +593,6 @@ impl Socks5Server { kcp_endpoint: Option>, ) -> Result<(), Error> { *self.kcp_endpoint.lock().await = kcp_endpoint; - let mut need_start = false; if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() { let bind_addr = format!( "{}:{}", @@ -598,22 +620,18 @@ impl Socks5Server { } }); + self.socks5_enabled.store(true, Ordering::Relaxed); join_joinset_background(self.tasks.clone(), "socks5 server".to_string()); - - need_start = true; }; let cfgs = self.global_ctx.config.get_port_forwards(); self.reload_port_forwards(&cfgs).await?; - need_start = need_start || !cfgs.is_empty(); - if need_start { - self.peer_manager - .add_packet_process_pipeline(Box::new(self.clone())) - .await; + self.peer_manager + .add_packet_process_pipeline(Box::new(self.clone())) + .await; - self.run_net_update_task().await; - } + self.run_net_update_task().await; Ok(()) } @@ -635,6 +653,7 @@ impl Socks5Server { self.add_port_forward(cfg.clone()).await?; } } + self.port_forward_list_change_notifier.notify_one(); Ok(()) }