fix cli add port forward failed if initial forward list is empty (#1324)

This commit is contained in:
Sijie.Sun
2025-09-02 22:03:57 +08:00
committed by GitHub
parent b87a05b457
commit ef3309814d

View File

@@ -1,6 +1,9 @@
use std::{ use std::{
net::{IpAddr, Ipv4Addr, SocketAddr}, net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{Arc, Weak}, sync::{
atomic::{AtomicBool, Ordering},
Arc, Weak,
},
time::{Duration, Instant}, time::{Duration, Instant},
}; };
@@ -38,7 +41,7 @@ use tokio::{
io::{AsyncRead, AsyncWrite}, io::{AsyncRead, AsyncWrite},
net::{TcpListener, TcpSocket, UdpSocket}, net::{TcpListener, TcpSocket, UdpSocket},
select, select,
sync::{mpsc, Mutex}, sync::{mpsc, Mutex, Notify},
task::JoinSet, task::JoinSet,
time::timeout, time::timeout,
}; };
@@ -418,12 +421,21 @@ pub struct Socks5Server {
kcp_endpoint: Mutex<Option<Weak<KcpEndpoint>>>, kcp_endpoint: Mutex<Option<Weak<KcpEndpoint>>>,
cancel_tokens: DashMap<PortForwardConfig, DropGuard>, socks5_enabled: Arc<AtomicBool>,
cancel_tokens: Arc<DashMap<PortForwardConfig, DropGuard>>,
port_forward_list_change_notifier: Arc<Notify>,
} }
#[async_trait::async_trait] #[async_trait::async_trait]
impl PeerPacketFilter for Socks5Server { impl PeerPacketFilter for Socks5Server {
async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> { async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
if self.cancel_tokens.is_empty()
&& self.entries.is_empty()
&& !self.socks5_enabled.load(Ordering::Relaxed)
{
return Some(packet);
}
let hdr = packet.peer_manager_header().unwrap(); let hdr = packet.peer_manager_header().unwrap();
if hdr.packet_type != PacketType::Data as u8 { if hdr.packet_type != PacketType::Data as u8 {
return Some(packet); return Some(packet);
@@ -519,7 +531,9 @@ impl Socks5Server {
kcp_endpoint: Mutex::new(None), kcp_endpoint: Mutex::new(None),
cancel_tokens: DashMap::new(), socks5_enabled: Arc::new(AtomicBool::new(false)),
cancel_tokens: Arc::new(DashMap::new()),
port_forward_list_change_notifier: Arc::new(Notify::new()),
}) })
} }
@@ -531,9 +545,18 @@ impl Socks5Server {
let entries = self.entries.clone(); let entries = self.entries.clone();
let tcp_forward_task = self.tcp_forward_task.clone(); let tcp_forward_task = self.tcp_forward_task.clone();
let udp_client_map = self.udp_client_map.clone(); let udp_client_map = self.udp_client_map.clone();
let cancel_tokens = self.cancel_tokens.clone();
let port_forward_list_change_notifier = self.port_forward_list_change_notifier.clone();
let socks5_enabled = self.socks5_enabled.clone();
self.tasks.lock().unwrap().spawn(async move { self.tasks.lock().unwrap().spawn(async move {
let mut prev_ipv4 = None; let mut prev_ipv4 = None;
loop { loop {
if cancel_tokens.is_empty() && !socks5_enabled.load(Ordering::Relaxed) {
let _ = net.lock().await.take();
port_forward_list_change_notifier.notified().await;
continue;
}
let mut event_recv = global_ctx.subscribe(); let mut event_recv = global_ctx.subscribe();
let cur_ipv4 = global_ctx.get_ipv4(); let cur_ipv4 = global_ctx.get_ipv4();
@@ -570,7 +593,6 @@ impl Socks5Server {
kcp_endpoint: Option<Weak<KcpEndpoint>>, kcp_endpoint: Option<Weak<KcpEndpoint>>,
) -> Result<(), Error> { ) -> Result<(), Error> {
*self.kcp_endpoint.lock().await = kcp_endpoint; *self.kcp_endpoint.lock().await = kcp_endpoint;
let mut need_start = false;
if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() { if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() {
let bind_addr = format!( let bind_addr = format!(
"{}:{}", "{}:{}",
@@ -598,22 +620,18 @@ impl Socks5Server {
} }
}); });
self.socks5_enabled.store(true, Ordering::Relaxed);
join_joinset_background(self.tasks.clone(), "socks5 server".to_string()); join_joinset_background(self.tasks.clone(), "socks5 server".to_string());
need_start = true;
}; };
let cfgs = self.global_ctx.config.get_port_forwards(); let cfgs = self.global_ctx.config.get_port_forwards();
self.reload_port_forwards(&cfgs).await?; self.reload_port_forwards(&cfgs).await?;
need_start = need_start || !cfgs.is_empty();
if need_start { self.peer_manager
self.peer_manager .add_packet_process_pipeline(Box::new(self.clone()))
.add_packet_process_pipeline(Box::new(self.clone())) .await;
.await;
self.run_net_update_task().await; self.run_net_update_task().await;
}
Ok(()) Ok(())
} }
@@ -635,6 +653,7 @@ impl Socks5Server {
self.add_port_forward(cfg.clone()).await?; self.add_port_forward(cfg.clone()).await?;
} }
} }
self.port_forward_list_change_notifier.notify_one();
Ok(()) Ok(())
} }