enhance port forward (#1662)

This commit is contained in:
KKRainbow
2025-12-09 22:16:16 +08:00
committed by GitHub
parent fe4dff5df0
commit 7aba65ea32
2 changed files with 124 additions and 52 deletions

View File

@@ -1,4 +1,5 @@
use std::{
any::Any,
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::{
atomic::{AtomicBool, Ordering},
@@ -204,6 +205,7 @@ impl AsyncTcpConnector for SmolTcpConnector {
impl Drop for SmolTcpConnector {
fn drop(&mut self) {
if let Some(entry) = self.current_entry.lock().unwrap().take() {
tracing::debug!("drop smoltcp connector entry {:?}", entry);
self.entries.remove(&entry);
}
}
@@ -240,6 +242,73 @@ impl AsyncTcpConnector for Socks5KcpConnector {
}
}
struct Socks5AutoConnector {
kcp_endpoint: Option<Weak<KcpEndpoint>>,
peer_mgr: Weak<PeerManager>,
entries: Socks5EntrySet,
smoltcp_net: Option<Arc<Net>>,
src_addr: SocketAddr,
inner_connector: parking_lot::Mutex<Option<Box<dyn Any + Send>>>,
}
#[async_trait::async_trait]
impl AsyncTcpConnector for Socks5AutoConnector {
type S = SocksTcpStream;
async fn tcp_connect(
&self,
mut addr: SocketAddr,
timeout_s: u64,
) -> crate::gateway::fast_socks5::Result<SocksTcpStream> {
if self.inner_connector.lock().is_some() {
return Err(anyhow::anyhow!("inner connector is already set").into());
}
let Some(peer_mgr_arc) = self.peer_mgr.upgrade() else {
tracing::error!("peer manager is dropped");
return Err(anyhow::anyhow!("peer manager is dropped").into());
};
if let Some(local_addr) = self.smoltcp_net.as_ref().map(|n| n.get_address()) {
if local_addr == addr.ip() {
addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), addr.port());
}
}
if self.smoltcp_net.is_none()
|| peer_mgr_arc.get_msg_dst_peer(&addr.ip()).await.0.is_empty()
|| addr.ip().is_loopback()
{
// cannot find dst in virtual network, so try connect to dst directly
return Ok(SocksTcpStream::Tcp(
tcp_connect_with_timeout(addr, timeout_s).await?,
));
}
let dst_allow_kcp = peer_mgr_arc.check_allow_kcp_to_dst(&addr.ip()).await;
tracing::debug!("dst_allow_kcp: {:?}", dst_allow_kcp);
let connector: Box<dyn AsyncTcpConnector<S = SocksTcpStream> + Send> =
match (&self.kcp_endpoint, dst_allow_kcp) {
(Some(kcp_endpoint), true) => Box::new(Socks5KcpConnector {
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: self.peer_mgr.clone(),
src_addr: self.src_addr,
}),
(_, _) => Box::new(SmolTcpConnector {
net: self.smoltcp_net.clone().unwrap(),
entries: self.entries.clone(),
current_entry: std::sync::Mutex::new(None),
}),
};
let ret = connector.tcp_connect(addr, timeout_s).await;
self.inner_connector.lock().replace(Box::new(connector));
ret
}
}
fn bind_tcp_socket(addr: SocketAddr, net_ns: NetNS) -> Result<TcpListener, Error> {
let _g = net_ns.guard();
let socket2_socket = socket2::Socket::new(
@@ -359,23 +428,14 @@ impl Socks5ServerNet {
}
}
fn handle_tcp_stream(&self, stream: tokio::net::TcpStream) {
async fn handle_tcp_stream_task(stream: tokio::net::TcpStream, connector: Socks5AutoConnector) {
let mut config = Config::<AcceptAuthentication>::default();
config.set_request_timeout(10);
config.set_skip_auth(false);
config.set_allow_no_auth(true);
let socket = Socks5Socket::new(
stream,
Arc::new(config),
SmolTcpConnector {
net: self.smoltcp_net.clone(),
entries: self.entries.clone(),
current_entry: std::sync::Mutex::new(None),
},
);
let socket = Socks5Socket::new(stream, Arc::new(config), connector);
self.forward_tasks.lock().unwrap().spawn(async move {
match socket.upgrade_to_socks5().await {
Ok(_) => {
tracing::info!("socks5 handle success");
@@ -384,7 +444,13 @@ impl Socks5ServerNet {
tracing::error!("socks5 handshake failed: {:?}", e);
}
};
});
}
fn handle_tcp_stream(&self, stream: tokio::net::TcpStream, connector: Socks5AutoConnector) {
self.forward_tasks
.lock()
.unwrap()
.spawn(Self::handle_tcp_stream_task(stream, connector));
}
}
@@ -596,7 +662,7 @@ impl Socks5Server {
self: &Arc<Self>,
kcp_endpoint: Option<Weak<KcpEndpoint>>,
) -> Result<(), Error> {
*self.kcp_endpoint.lock().await = kcp_endpoint;
*self.kcp_endpoint.lock().await = kcp_endpoint.clone();
if let Some(proxy_url) = self.global_ctx.config.get_socks5_portal() {
let bind_addr = format!(
"{}:{}",
@@ -609,14 +675,32 @@ impl Socks5Server {
self.global_ctx.net_ns.clone(),
)?;
let entries = self.entries.clone();
let peer_manager = Arc::downgrade(&self.peer_manager);
let net = self.net.clone();
self.tasks.lock().unwrap().spawn(async move {
loop {
match listener.accept().await {
Ok((socket, _addr)) => {
Ok((socket, addr)) => {
tracing::info!("accept a new connection, {:?}", socket);
let connector = Socks5AutoConnector {
smoltcp_net: net
.lock()
.await
.as_ref()
.map(|net| net.smoltcp_net.clone()),
entries: entries.clone(),
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: peer_manager.clone(),
src_addr: addr,
inner_connector: parking_lot::Mutex::new(None),
};
if let Some(net) = net.lock().await.as_ref() {
net.handle_tcp_stream(socket);
net.handle_tcp_stream(socket, connector);
} else {
tokio::spawn(Socks5ServerNet::handle_tcp_stream_task(
socket, connector,
));
}
}
Err(err) => tracing::error!("accept error = {:?}", err),
@@ -752,32 +836,13 @@ impl Socks5Server {
dst_addr
);
let net_guard = net.lock().await;
let Some(net) = net_guard.as_ref() else {
tracing::error!("net is not ready");
continue;
};
let Some(peer_mgr_arc) = peer_mgr.upgrade() else {
tracing::error!("peer manager is dropped");
continue;
};
let dst_allow_kcp = peer_mgr_arc.check_allow_kcp_to_dst(&dst_addr.ip()).await;
tracing::debug!("dst_allow_kcp: {:?}", dst_allow_kcp);
let connector: Box<dyn AsyncTcpConnector<S = SocksTcpStream> + Send> =
match (&kcp_endpoint, dst_allow_kcp) {
(Some(kcp_endpoint), true) => Box::new(Socks5KcpConnector {
let connector = Socks5AutoConnector {
kcp_endpoint: kcp_endpoint.clone(),
peer_mgr: peer_mgr.clone(),
src_addr: addr,
}),
(_, _) => Box::new(SmolTcpConnector {
net: net.smoltcp_net.clone(),
entries: entries.clone(),
current_entry: std::sync::Mutex::new(None),
}),
smoltcp_net: net.lock().await.as_ref().map(|net| net.smoltcp_net.clone()),
src_addr: addr,
inner_connector: parking_lot::Mutex::new(None),
};
forward_tasks
@@ -785,7 +850,7 @@ impl Socks5Server {
.unwrap()
.spawn(Self::handle_port_forward_connection(
incoming_socket,
connector,
Box::new(connector),
dst_addr,
));
}

View File

@@ -1063,7 +1063,14 @@ impl PeerManager {
}
}
pub async fn get_msg_dst_peer(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
pub async fn get_msg_dst_peer(&self, addr: &IpAddr) -> (Vec<PeerId>, bool) {
match addr {
IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(ipv4_addr).await,
IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(ipv6_addr).await,
}
}
pub async fn get_msg_dst_peer_ipv4(&self, ipv4_addr: &Ipv4Addr) -> (Vec<PeerId>, bool) {
let mut is_exit_node = false;
let mut dst_peers = vec![];
let network_length = self
@@ -1189,7 +1196,7 @@ impl PeerManager {
}
let (dst_peers, is_exit_node) = match ip_addr {
IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer(&ipv4_addr).await,
IpAddr::V4(ipv4_addr) => self.get_msg_dst_peer_ipv4(&ipv4_addr).await,
IpAddr::V6(ipv6_addr) => self.get_msg_dst_peer_ipv6(&ipv6_addr).await,
};