From 25dcdc652aa8baf5b52561ff205796506bc3b11a Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sat, 14 Jun 2025 11:42:45 +0800 Subject: [PATCH] support mapping subnet proxy (#978) - **support mapping subproxy network cidr** - **add command line option for proxy network mapping** - **fix Instance leak in tests. --- Cargo.lock | 29 ++--- easytier/Cargo.toml | 2 +- easytier/locales/app.yml | 10 +- easytier/src/common/config.rs | 37 +++--- easytier/src/common/global_ctx.rs | 26 +---- easytier/src/common/stun.rs | 15 ++- easytier/src/connector/manual.rs | 50 +++++---- easytier/src/easytier-core.rs | 9 +- easytier/src/gateway/icmp_proxy.rs | 96 ++++++++++++---- easytier/src/gateway/kcp_proxy.rs | 35 ++++-- easytier/src/gateway/mod.rs | 35 +++++- easytier/src/gateway/socks5.rs | 5 +- easytier/src/gateway/tcp_proxy.rs | 55 +++++---- easytier/src/gateway/udp_proxy.rs | 15 ++- easytier/src/instance/instance.rs | 105 +++++++++++++----- easytier/src/instance/listeners.rs | 17 ++- easytier/src/launcher.rs | 42 +++++-- easytier/src/peer_center/instance.rs | 31 ++++-- easytier/src/peer_center/mod.rs | 2 + easytier/src/peers/peer_manager.rs | 18 ++- easytier/src/peers/peer_ospf_route.rs | 4 +- .../src/proto/rpc_impl/service_registry.rs | 4 + easytier/src/tests/three_node.rs | 95 +++++++++++++--- 23 files changed, 521 insertions(+), 216 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 78ff920..fe3eecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4118,7 +4118,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.52.6", + "windows-targets 0.48.5", ] [[package]] @@ -6092,9 +6092,9 @@ dependencies = [ [[package]] name = "quote" -version = "1.0.36" +version = "1.0.40" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0fa76aaf39101c457836aec0ce2316dbdc3ab723cdda1c6bd4e6ad4208acaca7" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" dependencies = [ "proc-macro2", ] @@ -6513,24 +6513,25 @@ dependencies = [ [[package]] name = "rstest" -version = "0.18.2" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97eeab2f3c0a199bc4be135c36c924b6590b88c377d416494288c14f2db30199" +checksum = "6fc39292f8613e913f7df8fa892b8944ceb47c247b78e1b1ae2f09e019be789d" dependencies = [ - "futures", "futures-timer", + "futures-util", "rstest_macros", "rustc_version", ] [[package]] name = "rstest_macros" -version = "0.18.2" +version = "0.25.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d428f8247852f894ee1be110b375111b586d4fa431f6c46e64ba5a0dcccbe605" +checksum = "1f168d99749d307be9de54d23fd226628d99768225ef08f6ffb52e0182a27746" dependencies = [ "cfg-if", "glob", + "proc-macro-crate 3.2.0", "proc-macro2", "quote", "regex", @@ -6658,9 +6659,9 @@ checksum = "c7fb8039b3032c191086b10f11f319a6e99e1e82889c5cc6046f515c9db1d497" [[package]] name = "rustc_version" -version = "0.4.0" +version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" dependencies = [ "semver", ] @@ -8542,9 +8543,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.44.1" +version = "1.45.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" +checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779" dependencies = [ "backtrace", "bytes", @@ -9117,9 +9118,9 @@ checksum = "08f95100a766bf4f8f28f90d77e0a5461bbdb219042e7679bebe79004fed8d75" [[package]] name = "unicode-ident" -version = "1.0.12" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3354b9ac3fae1ff6755cb6db53683adb661634f67557942dea4facebec0fee4b" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" [[package]] name = "unicode-normalization" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 522732f..dddb50b 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -270,7 +270,7 @@ thunk-rs = { git = "https://github.com/easytier/thunk.git", default-features = f [dev-dependencies] serial_test = "3.0.0" -rstest = "0.18.2" +rstest = "0.25.0" futures-util = "0.3.30" maplit = "1.0.2" diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index e5377c3..a3e985e 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -32,8 +32,14 @@ core_clap: en: "use a public shared node to discover peers" zh-CN: "使用公共共享节点来发现对等节点" proxy_networks: - en: "export local networks to other peers in the vpn" - zh-CN: "将本地网络导出到VPN中的其他对等节点" + en: |+ + export local networks to other peers in the vpn, e.g.: 10.0.0.0/24. + also support mapping proxy network to other cidr, e.g.: 10.0.0.0/24->192.168.0.0/24 + other peers can access 10.0.0.1 with ip 192.168.0.1 + zh-CN: |+ + 将本地网络导出到VPN中的其他对等节点,例如:10.0.0.0/24。 + 还支持将代理网络映射到其他CIDR,例如:10.0.0.0/24->192.168.0.0/24 + 其他对等节点可以通过 IP 192.168.0.1 来访问 10.0.0.1 rpc_portal: en: "rpc portal address to listen for management. 0 means random port, 12345 means listen on 12345 of localhost, 0.0.0.0:12345 means listen on 12345 of all interfaces. default is 0 and will try 15888 first" zh-CN: "用于管理的RPC门户地址。0表示随机端口,12345表示在localhost的12345上监听,0.0.0.0:12345表示在所有接口的12345上监听。默认是0,首先尝试15888" diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 3400430..fee9f0e 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -62,9 +62,9 @@ pub trait ConfigLoader: Send + Sync { fn get_dhcp(&self) -> bool; fn set_dhcp(&self, dhcp: bool); - fn add_proxy_cidr(&self, cidr: cidr::IpCidr); - fn remove_proxy_cidr(&self, cidr: cidr::IpCidr); - fn get_proxy_cidrs(&self) -> Vec; + fn add_proxy_cidr(&self, cidr: cidr::Ipv4Cidr, mapped_cidr: Option); + fn remove_proxy_cidr(&self, cidr: cidr::Ipv4Cidr); + fn get_proxy_cidrs(&self) -> Vec; fn get_network_identity(&self) -> NetworkIdentity; fn set_network_identity(&self, identity: NetworkIdentity); @@ -171,7 +171,8 @@ pub struct PeerConfig { #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] pub struct ProxyNetworkConfig { - pub cidr: String, + pub cidr: cidr::Ipv4Cidr, // the CIDR of the proxy network + pub mapped_cidr: Option, // allow remap the proxy CIDR to another CIDR pub allow: Option>, } @@ -418,50 +419,52 @@ impl ConfigLoader for TomlConfigLoader { self.config.lock().unwrap().dhcp = Some(dhcp); } - fn add_proxy_cidr(&self, cidr: cidr::IpCidr) { + fn add_proxy_cidr(&self, cidr: cidr::Ipv4Cidr, mapped_cidr: Option) { let mut locked_config = self.config.lock().unwrap(); if locked_config.proxy_network.is_none() { locked_config.proxy_network = Some(vec![]); } - let cidr_str = cidr.to_string(); + if let Some(mapped_cidr) = mapped_cidr.as_ref() { + assert_eq!( + cidr.network_length(), + mapped_cidr.network_length(), + "Mapped CIDR must have the same network length as the original CIDR", + ); + } // insert if no duplicate if !locked_config .proxy_network .as_ref() .unwrap() .iter() - .any(|c| c.cidr == cidr_str) + .any(|c| c.cidr == cidr) { locked_config .proxy_network .as_mut() .unwrap() .push(ProxyNetworkConfig { - cidr: cidr_str, + cidr, + mapped_cidr, allow: None, }); } } - fn remove_proxy_cidr(&self, cidr: cidr::IpCidr) { + fn remove_proxy_cidr(&self, cidr: cidr::Ipv4Cidr) { let mut locked_config = self.config.lock().unwrap(); if let Some(proxy_cidrs) = &mut locked_config.proxy_network { - let cidr_str = cidr.to_string(); - proxy_cidrs.retain(|c| c.cidr != cidr_str); + proxy_cidrs.retain(|c| c.cidr != cidr); } } - fn get_proxy_cidrs(&self) -> Vec { + fn get_proxy_cidrs(&self) -> Vec { self.config .lock() .unwrap() .proxy_network .as_ref() - .map(|v| { - v.iter() - .map(|c| c.cidr.parse().unwrap()) - .collect::>() - }) + .cloned() .unwrap_or_default() } diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index baf8e93..556d068 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -4,6 +4,7 @@ use std::{ sync::{Arc, Mutex}, }; +use crate::common::config::ProxyNetworkConfig; use crate::proto::cli::PeerConnInfo; use crate::proto::common::{PeerFeatureFlag, PortForwardConfigPb}; use crossbeam::atomic::AtomicCell; @@ -59,7 +60,7 @@ pub struct GlobalCtx { event_bus: EventBus, cached_ipv4: AtomicCell>, - cached_proxy_cidrs: AtomicCell>>, + cached_proxy_cidrs: AtomicCell>>, ip_collector: Mutex>>, @@ -182,29 +183,6 @@ impl GlobalCtx { self.cached_ipv4.store(None); } - pub fn add_proxy_cidr(&self, cidr: cidr::IpCidr) -> Result<(), std::io::Error> { - self.config.add_proxy_cidr(cidr); - self.cached_proxy_cidrs.store(None); - Ok(()) - } - - pub fn remove_proxy_cidr(&self, cidr: cidr::IpCidr) -> Result<(), std::io::Error> { - self.config.remove_proxy_cidr(cidr); - self.cached_proxy_cidrs.store(None); - Ok(()) - } - - pub fn get_proxy_cidrs(&self) -> Vec { - if let Some(proxy_cidrs) = self.cached_proxy_cidrs.take() { - self.cached_proxy_cidrs.store(Some(proxy_cidrs.clone())); - return proxy_cidrs; - } - - let ret = self.config.get_proxy_cidrs(); - self.cached_proxy_cidrs.store(Some(ret.clone())); - ret - } - pub fn get_id(&self) -> uuid::Uuid { self.config.get_id() } diff --git a/easytier/src/common/stun.rs b/easytier/src/common/stun.rs index a250ae0..f390c54 100644 --- a/easytier/src/common/stun.rs +++ b/easytier/src/common/stun.rs @@ -955,9 +955,18 @@ mod tests { async fn test_txt_public_stun_server() { let stun_servers = vec!["txt:stun.easytier.cn".to_string()]; let detector = UdpNatTypeDetector::new(stun_servers, 1); - let ret = detector.detect_nat_type(0).await; - println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type()); - assert!(!ret.unwrap().stun_resps.is_empty()); + for _ in 0..5 { + let ret = detector.detect_nat_type(0).await; + println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type()); + if ret.is_ok() { + assert!(!ret.unwrap().stun_resps.is_empty()); + return; + } + } + debug_assert!( + false, + "should not reach here, stun server should be available" + ); } #[tokio::test] diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index edd737d..1ece6f4 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -1,4 +1,7 @@ -use std::{collections::BTreeSet, sync::Arc}; +use std::{ + collections::BTreeSet, + sync::{Arc, Weak}, +}; use anyhow::Context; use dashmap::{DashMap, DashSet}; @@ -12,7 +15,7 @@ use tokio::{ }; use crate::{ - common::PeerId, + common::{join_joinset_background, PeerId}, peers::peer_conn::PeerConnId, proto::{ cli::{ @@ -53,7 +56,7 @@ struct ReconnResult { struct ConnectorManagerData { connectors: ConnectorMap, reconnecting: DashSet, - peer_manager: Arc, + peer_manager: Weak, alive_conn_urls: Arc>, // user removed connector urls removed_conn_urls: Arc>, @@ -78,7 +81,7 @@ impl ManualConnectorManager { data: Arc::new(ConnectorManagerData { connectors, reconnecting: DashSet::new(), - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), alive_conn_urls: Arc::new(DashSet::new()), removed_conn_urls: Arc::new(DashSet::new()), net_ns: global_ctx.net_ns.clone(), @@ -190,20 +193,18 @@ impl ManualConnectorManager { tracing::warn!("event_recv lagged: {}, rebuild alive conn list", n); event_recv = event_recv.resubscribe(); data.alive_conn_urls.clear(); - for x in data - .peer_manager - .get_peer_map() - .get_alive_conns() - .iter() - .map(|x| { - x.tunnel - .clone() - .unwrap_or_default() - .remote_addr - .unwrap_or_default() - .to_string() - }) - { + let Some(pm) = data.peer_manager.upgrade() else { + tracing::warn!("peer manager is gone, exit"); + break; + }; + for x in pm.get_peer_map().get_alive_conns().iter().map(|x| { + x.tunnel + .clone() + .unwrap_or_default() + .remote_addr + .unwrap_or_default() + .to_string() + }) { data.alive_conn_urls.insert(x); } continue; @@ -222,6 +223,8 @@ impl ManualConnectorManager { use_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS), )); let (reconn_result_send, mut reconn_result_recv) = mpsc::channel(100); + let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); + join_joinset_background(tasks.clone(), "connector_reconnect_tasks".to_string()); loop { tokio::select! { @@ -237,7 +240,7 @@ impl ManualConnectorManager { let insert_succ = data.reconnecting.insert(dead_url.clone()); assert!(insert_succ); - tokio::spawn(async move { + tasks.lock().unwrap().spawn(async move { let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), connector.clone()).await; sender.send(reconn_ret).await.unwrap(); @@ -340,8 +343,13 @@ impl ManualConnectorManager { connector.lock().await.remote_url().clone(), )); tracing::info!("reconnect try connect... conn: {:?}", connector); - let (peer_id, conn_id) = data - .peer_manager + let Some(pm) = data.peer_manager.upgrade() else { + return Err(Error::AnyhowError(anyhow::anyhow!( + "peer manager is gone, cannot reconnect" + ))); + }; + + let (peer_id, conn_id) = pm .try_direct_connect(connector.lock().await.as_mut()) .await?; tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index e733a32..5184126 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -25,8 +25,8 @@ use easytier::{ stun::MockStunInfoCollector, }, connector::create_connector_by_url, - launcher::ConfigSource, instance_manager::NetworkInstanceManager, + launcher::{add_proxy_network_to_config, ConfigSource}, proto::common::{CompressionAlgoPb, NatType}, tunnel::{IpVersion, PROTO_PORT_OFFSET}, utils::{init_logger, setup_panic_handler}, @@ -540,7 +540,7 @@ impl Cli { impl NetworkOptions { fn can_merge(&self, cfg: &TomlConfigLoader, config_file_count: usize) -> bool { - if config_file_count == 1{ + if config_file_count == 1 { return true; } let Some(network_name) = &self.network_name else { @@ -624,10 +624,7 @@ impl NetworkOptions { } for n in self.proxy_networks.iter() { - cfg.add_proxy_cidr( - n.parse() - .with_context(|| format!("failed to parse proxy network: {}", n))?, - ); + add_proxy_network_to_config(n, &cfg)?; } let rpc_portal = if let Some(r) = &self.rpc_portal { diff --git a/easytier/src/gateway/icmp_proxy.rs b/easytier/src/gateway/icmp_proxy.rs index 55156e2..efde1de 100644 --- a/easytier/src/gateway/icmp_proxy.rs +++ b/easytier/src/gateway/icmp_proxy.rs @@ -1,7 +1,7 @@ use std::{ mem::MaybeUninit, net::{IpAddr, Ipv4Addr, SocketAddrV4}, - sync::Arc, + sync::{Arc, Weak}, thread, time::Duration, }; @@ -34,7 +34,7 @@ use super::{ #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] struct IcmpNatKey { - dst_ip: std::net::IpAddr, + real_dst_ip: std::net::IpAddr, icmp_id: u16, icmp_seq: u16, } @@ -45,15 +45,22 @@ struct IcmpNatEntry { my_peer_id: PeerId, src_ip: IpAddr, start_time: std::time::Instant, + mapped_dst_ip: std::net::Ipv4Addr, } impl IcmpNatEntry { - fn new(src_peer_id: PeerId, my_peer_id: PeerId, src_ip: IpAddr) -> Result { + fn new( + src_peer_id: PeerId, + my_peer_id: PeerId, + src_ip: IpAddr, + mapped_dst_ip: Ipv4Addr, + ) -> Result { Ok(Self { src_peer_id, my_peer_id, src_ip, start_time: std::time::Instant::now(), + mapped_dst_ip, }) } } @@ -65,10 +72,10 @@ type NewPacketReceiver = tokio::sync::mpsc::UnboundedReceiver; #[derive(Debug)] pub struct IcmpProxy { global_ctx: ArcGlobalCtx, - peer_manager: Arc, + peer_manager: Weak, cidr_set: CidrSet, - socket: std::sync::Mutex>, + socket: std::sync::Mutex>>, nat_table: IcmpNatTable, @@ -78,7 +85,10 @@ pub struct IcmpProxy { icmp_sender: Arc>>>, } -fn socket_recv(socket: &Socket, buf: &mut [MaybeUninit]) -> Result<(usize, IpAddr), Error> { +fn socket_recv( + socket: &Socket, + buf: &mut [MaybeUninit], +) -> Result<(usize, IpAddr), std::io::Error> { let (size, addr) = socket.recv_from(buf)?; let addr = match addr.as_socket() { None => IpAddr::V4(Ipv4Addr::UNSPECIFIED), @@ -87,15 +97,32 @@ fn socket_recv(socket: &Socket, buf: &mut [MaybeUninit]) -> Result<(usize, I Ok((size, addr)) } -fn socket_recv_loop(socket: Socket, nat_table: IcmpNatTable, sender: UnboundedSender) { +fn socket_recv_loop( + socket: Arc, + nat_table: IcmpNatTable, + sender: UnboundedSender, +) { let mut buf = [0u8; 8192]; let data: &mut [MaybeUninit] = unsafe { std::mem::transmute(&mut buf[..]) }; loop { - let Ok((len, peer_ip)) = socket_recv(&socket, data) else { - continue; + let (len, peer_ip) = match socket_recv(&socket, data) { + Ok((len, peer_ip)) => (len, peer_ip), + Err(e) => { + tracing::error!("recv icmp packet failed: {:?}", e); + if sender.is_closed() { + break; + } else { + continue; + } + } }; + if len <= 0 { + tracing::error!("recv empty packet, len: {}", len); + return; + } + if !peer_ip.is_ipv4() { continue; } @@ -114,7 +141,7 @@ fn socket_recv_loop(socket: Socket, nat_table: IcmpNatTable, sender: UnboundedSe } let key = IcmpNatKey { - dst_ip: peer_ip, + real_dst_ip: peer_ip, icmp_id: icmp_packet.get_identifier(), icmp_seq: icmp_packet.get_sequence_number(), }; @@ -128,12 +155,11 @@ fn socket_recv_loop(socket: Socket, nat_table: IcmpNatTable, sender: UnboundedSe continue; }; - let src_v4 = ipv4_packet.get_source(); let payload_len = len - ipv4_packet.get_header_length() as usize * 4; let id = ipv4_packet.get_identification(); let _ = compose_ipv4_packet( &mut buf[..], - &src_v4, + &v.mapped_dst_ip, &dest_ip, IpNextHeaderProtocols::Icmp, payload_len, @@ -176,7 +202,7 @@ impl IcmpProxy { let cidr_set = CidrSet::new(global_ctx.clone()); let ret = Self { global_ctx, - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), cidr_set, socket: std::sync::Mutex::new(None), @@ -208,7 +234,7 @@ impl IcmpProxy { let socket = self.create_raw_socket(); match socket { Ok(socket) => { - self.socket.lock().unwrap().replace(socket); + self.socket.lock().unwrap().replace(Arc::new(socket)); } Err(e) => { tracing::warn!("create icmp socket failed: {:?}", e); @@ -241,7 +267,7 @@ impl IcmpProxy { let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel(); self.icmp_sender.lock().unwrap().replace(sender.clone()); if let Some(socket) = self.socket.lock().unwrap().as_ref() { - let socket = socket.try_clone()?; + let socket = socket.clone(); let nat_table = self.nat_table.clone(); thread::spawn(|| { socket_recv_loop(socket, nat_table, sender); @@ -254,7 +280,11 @@ impl IcmpProxy { while let Some(msg) = receiver.recv().await { let hdr = msg.peer_manager_header().unwrap(); let to_peer_id = hdr.to_peer_id.into(); - let ret = peer_manager.send_msg(msg, to_peer_id).await; + let Some(pm) = peer_manager.upgrade() else { + tracing::warn!("peer manager is gone, icmp proxy send loop exit"); + return; + }; + let ret = pm.send_msg(msg, to_peer_id).await; if ret.is_err() { tracing::error!("send icmp packet to peer failed: {:?}", ret); } @@ -271,9 +301,12 @@ impl IcmpProxy { } }); - self.peer_manager - .add_packet_process_pipeline(Box::new(self.clone())) - .await; + let Some(pm) = self.peer_manager.upgrade() else { + tracing::warn!("peer manager is gone, icmp proxy init failed"); + return Err(anyhow::anyhow!("peer manager is gone").into()); + }; + + pm.add_packet_process_pipeline(Box::new(self.clone())).await; Ok(()) } @@ -361,7 +394,11 @@ impl IcmpProxy { return None; } - if !self.cidr_set.contains_v4(ipv4.get_destination()) + let mut real_dst_ip = ipv4.get_destination(); + + if !self + .cidr_set + .contains_v4(ipv4.get_destination(), &mut real_dst_ip) && !is_exit_node && !(self.global_ctx.no_tun() && Some(ipv4.get_destination()) @@ -416,7 +453,7 @@ impl IcmpProxy { let icmp_seq = icmp_packet.get_sequence_number(); let key = IcmpNatKey { - dst_ip: ipv4.get_destination().into(), + real_dst_ip: real_dst_ip.into(), icmp_id, icmp_seq, }; @@ -425,6 +462,7 @@ impl IcmpProxy { hdr.from_peer_id.into(), hdr.to_peer_id.into(), ipv4.get_source().into(), + ipv4.get_destination(), ) .ok()?; @@ -432,10 +470,24 @@ impl IcmpProxy { tracing::info!("icmp nat table entry replaced: {:?}", old); } - if let Err(e) = self.send_icmp_packet(ipv4.get_destination(), &icmp_packet) { + if let Err(e) = self.send_icmp_packet(real_dst_ip, &icmp_packet) { tracing::error!("send icmp packet failed: {:?}", e); } Some(()) } } + +impl Drop for IcmpProxy { + fn drop(&mut self) { + tracing::info!( + "dropping icmp proxy, {:?}", + self.socket.lock().unwrap().as_ref() + ); + self.socket.lock().unwrap().as_ref().and_then(|s| { + tracing::info!("shutting down icmp socket"); + let _ = s.shutdown(std::net::Shutdown::Both); + Some(()) + }); + } +} diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 163b636..9d73f6a 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -107,7 +107,7 @@ async fn handle_kcp_output( #[derive(Debug, Clone)] pub struct NatDstKcpConnector { pub(crate) kcp_endpoint: Arc, - pub(crate) peer_mgr: Arc, + pub(crate) peer_mgr: Weak, } #[async_trait::async_trait] @@ -120,10 +120,14 @@ impl NatDstConnector for NatDstKcpConnector { dst: Some(nat_dst.into()), }; + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + return Err(anyhow::anyhow!("peer manager is not available").into()); + }; + let (dst_peers, _) = match nat_dst { SocketAddr::V4(addr) => { let ip = addr.ip(); - self.peer_mgr.get_msg_dst_peer(&ip).await + peer_mgr.get_msg_dst_peer(&ip).await } SocketAddr::V6(_) => return Err(anyhow::anyhow!("ipv6 is not supported").into()), }; @@ -162,7 +166,7 @@ impl NatDstConnector for NatDstKcpConnector { retry_remain -= 1; let kcp_endpoint = self.kcp_endpoint.clone(); - let peer_mgr = self.peer_mgr.clone(); + let my_peer_id = peer_mgr.my_peer_id(); let dst_peer = dst_peers[0]; let conn_data_clone = conn_data.clone(); @@ -170,7 +174,7 @@ impl NatDstConnector for NatDstKcpConnector { kcp_endpoint .connect( Duration::from_secs(10), - peer_mgr.my_peer_id(), + my_peer_id, dst_peer, Bytes::from(conn_data_clone.encode_to_vec()), ) @@ -194,6 +198,7 @@ impl NatDstConnector for NatDstKcpConnector { _global_ctx: &GlobalCtx, hdr: &PeerManagerHeader, _ipv4: &Ipv4Packet, + _real_dst_ip: &mut Ipv4Addr, ) -> bool { return hdr.from_peer_id == hdr.to_peer_id; } @@ -301,7 +306,7 @@ impl KcpProxySrc { peer_manager.clone(), NatDstKcpConnector { kcp_endpoint: kcp_endpoint.clone(), - peer_mgr: peer_manager.clone(), + peer_mgr: Arc::downgrade(&peer_manager), }, ); @@ -342,6 +347,7 @@ pub struct KcpProxyDst { kcp_endpoint: Arc, peer_manager: Arc, proxy_entries: Arc>, + cidr_set: Arc, tasks: JoinSet<()>, } @@ -357,11 +363,12 @@ impl KcpProxyDst { output_receiver, false, )); - + let cidr_set = CidrSet::new(peer_manager.get_global_ctx()); Self { kcp_endpoint: Arc::new(kcp_endpoint), peer_manager, proxy_entries: Arc::new(DashMap::new()), + cidr_set: Arc::new(cidr_set), tasks, } } @@ -371,6 +378,7 @@ impl KcpProxyDst { mut kcp_stream: KcpStream, global_ctx: ArcGlobalCtx, proxy_entries: Arc>, + cidr_set: Arc, ) -> Result<()> { let mut conn_data = kcp_stream.conn_data().clone(); let parsed_conn_data = KcpConnData::decode(&mut conn_data) @@ -383,6 +391,16 @@ impl KcpProxyDst { ))? .into(); + match dst_socket.ip() { + IpAddr::V4(dst_v4_ip) => { + let mut real_ip = dst_v4_ip; + if cidr_set.contains_v4(dst_v4_ip, &mut real_ip) { + dst_socket.set_ip(real_ip.into()); + } + } + _ => {} + }; + let conn_id = kcp_stream.conn_id(); proxy_entries.insert( conn_id, @@ -424,6 +442,7 @@ impl KcpProxyDst { let kcp_endpoint = self.kcp_endpoint.clone(); let global_ctx = self.peer_manager.get_global_ctx().clone(); let proxy_entries = self.proxy_entries.clone(); + let cidr_set = self.cidr_set.clone(); self.tasks.spawn(async move { while let Ok(conn) = kcp_endpoint.accept().await { let stream = KcpStream::new(&kcp_endpoint, conn) @@ -432,8 +451,10 @@ impl KcpProxyDst { let global_ctx = global_ctx.clone(); let proxy_entries = proxy_entries.clone(); + let cidr_set = cidr_set.clone(); tokio::spawn(async move { - let _ = Self::handle_one_in_stream(stream, global_ctx, proxy_entries).await; + let _ = Self::handle_one_in_stream(stream, global_ctx, proxy_entries, cidr_set) + .await; }); } }); diff --git a/easytier/src/gateway/mod.rs b/easytier/src/gateway/mod.rs index 030b5b3..a3dfc7b 100644 --- a/easytier/src/gateway/mod.rs +++ b/easytier/src/gateway/mod.rs @@ -1,3 +1,4 @@ +use dashmap::DashMap; use std::sync::{Arc, Mutex}; use tokio::task::JoinSet; @@ -20,8 +21,10 @@ pub mod kcp_proxy; #[derive(Debug)] pub(crate) struct CidrSet { global_ctx: ArcGlobalCtx, - cidr_set: Arc>>, + cidr_set: Arc>>, tasks: JoinSet<()>, + + mapped_to_real: Arc>, } impl CidrSet { @@ -30,6 +33,8 @@ impl CidrSet { global_ctx, cidr_set: Arc::new(Mutex::new(vec![])), tasks: JoinSet::new(), + + mapped_to_real: Arc::new(DashMap::new()), }; ret.run_cidr_updater(); ret @@ -38,15 +43,23 @@ impl CidrSet { fn run_cidr_updater(&mut self) { let global_ctx = self.global_ctx.clone(); let cidr_set = self.cidr_set.clone(); + let mapped_to_real = self.mapped_to_real.clone(); self.tasks.spawn(async move { let mut last_cidrs = vec![]; loop { - let cidrs = global_ctx.get_proxy_cidrs(); + let cidrs = global_ctx.config.get_proxy_cidrs(); if cidrs != last_cidrs { last_cidrs = cidrs.clone(); + mapped_to_real.clear(); cidr_set.lock().unwrap().clear(); for cidr in cidrs.iter() { - cidr_set.lock().unwrap().push(cidr.clone()); + let real_cidr = cidr.cidr; + let mapped = cidr.mapped_cidr.unwrap_or(real_cidr.clone()); + cidr_set.lock().unwrap().push(mapped.clone()); + + if mapped != real_cidr { + mapped_to_real.insert(mapped.clone(), real_cidr.clone()); + } } } tokio::time::sleep(std::time::Duration::from_secs(1)).await; @@ -54,11 +67,23 @@ impl CidrSet { }); } - pub fn contains_v4(&self, ip: std::net::Ipv4Addr) -> bool { - let ip = ip.into(); + pub fn contains_v4(&self, ipv4: std::net::Ipv4Addr, real_ip: &mut std::net::Ipv4Addr) -> bool { + let ip = ipv4.into(); let s = self.cidr_set.lock().unwrap(); for cidr in s.iter() { if cidr.contains(&ip) { + if let Some(real_cidr) = self.mapped_to_real.get(&cidr).map(|v| v.value().clone()) { + let origin_network_bits = real_cidr.first().address().to_bits(); + let network_mask = cidr.mask().to_bits(); + + let mut converted_ip = ipv4.to_bits(); + converted_ip &= !network_mask; + converted_ip |= origin_network_bits; + + *real_ip = std::net::Ipv4Addr::from(converted_ip); + } else { + *real_ip = ipv4; + } return true; } } diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index 0d8c99c..5c39583 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -237,12 +237,9 @@ impl AsyncTcpConnector for Socks5KcpConnector { let Some(kcp_endpoint) = self.kcp_endpoint.upgrade() else { return Err(anyhow::anyhow!("kcp endpoint is not ready").into()); }; - let Some(peer_mgr) = self.peer_mgr.upgrade() else { - return Err(anyhow::anyhow!("peer mgr is not ready").into()); - }; let c = NatDstKcpConnector { kcp_endpoint, - peer_mgr, + peer_mgr: self.peer_mgr.clone(), }; println!("connect to kcp endpoint, addr = {:?}", addr); let ret = c diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 520318e..ab10c83 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -52,6 +52,7 @@ pub(crate) trait NatDstConnector: Send + Sync + Clone + 'static { global_ctx: &GlobalCtx, hdr: &PeerManagerHeader, ipv4: &Ipv4Packet, + real_dst_ip: &mut Ipv4Addr, ) -> bool; fn transport_type(&self) -> TcpProxyEntryTransportType; } @@ -99,10 +100,11 @@ impl NatDstConnector for NatDstTcpConnector { global_ctx: &GlobalCtx, hdr: &PeerManagerHeader, ipv4: &Ipv4Packet, + real_dst_ip: &mut Ipv4Addr, ) -> bool { let is_exit_node = hdr.is_exit_node(); - if !cidr_set.contains_v4(ipv4.get_destination()) + if !cidr_set.contains_v4(ipv4.get_destination(), real_dst_ip) && !is_exit_node && !(global_ctx.no_tun() && Some(ipv4.get_destination()) @@ -125,7 +127,8 @@ type NatDstEntryState = TcpProxyEntryState; pub struct NatDstEntry { id: uuid::Uuid, src: SocketAddr, - dst: SocketAddr, + real_dst: SocketAddr, + mapped_dst: SocketAddr, start_time: Instant, start_time_local: chrono::DateTime, tasks: Mutex>, @@ -133,11 +136,12 @@ pub struct NatDstEntry { } impl NatDstEntry { - pub fn new(src: SocketAddr, dst: SocketAddr) -> Self { + pub fn new(src: SocketAddr, real_dst: SocketAddr, mapped_dst: SocketAddr) -> Self { Self { id: uuid::Uuid::new_v4(), src, - dst, + real_dst, + mapped_dst, start_time: Instant::now(), start_time_local: chrono::Local::now(), tasks: Mutex::new(JoinSet::new()), @@ -148,7 +152,7 @@ impl NatDstEntry { fn into_pb(&self, transport_type: TcpProxyEntryTransportType) -> TcpProxyEntry { TcpProxyEntry { src: Some(self.src.clone().into()), - dst: Some(self.dst.clone().into()), + dst: Some(self.real_dst.clone().into()), start_time: self.start_time_local.timestamp() as u64, state: self.state.load().into(), transport_type: transport_type.into(), @@ -396,7 +400,7 @@ impl NicPacketFilter for TcpProxy { drop(entry); assert_eq!(nat_entry.src, dst_addr); - let IpAddr::V4(ip) = nat_entry.dst.ip() else { + let IpAddr::V4(ip) = nat_entry.mapped_dst.ip() else { panic!("v4 nat entry src ip is not v4"); }; @@ -416,7 +420,7 @@ impl NicPacketFilter for TcpProxy { let dst = ip_packet.get_destination(); let mut tcp_packet = MutableTcpPacket::new(ip_packet.payload_mut()).unwrap(); - tcp_packet.set_source(nat_entry.dst.port()); + tcp_packet.set_source(nat_entry.real_dst.port()); Self::update_tcp_packet_checksum(&mut tcp_packet, &ip, &dst); drop(tcp_packet); @@ -537,7 +541,6 @@ impl TcpProxy { } } tracing::error!("smoltcp stack sink exited"); - panic!("smoltcp stack sink exited"); }); let peer_mgr = self.peer_manager.clone(); @@ -559,7 +562,6 @@ impl TcpProxy { } } tracing::error!("smoltcp stack stream exited"); - panic!("smoltcp stack stream exited"); }); let interface_config = smoltcp::iface::Config::new(smoltcp::wire::HardwareAddress::Ip); @@ -607,7 +609,7 @@ impl TcpProxy { let mut tcp_listener = self.get_proxy_listener().await?; let global_ctx = self.global_ctx.clone(); - let tasks = self.tasks.clone(); + let tasks = Arc::downgrade(&self.tasks); let syn_map = self.syn_map.clone(); let conn_map = self.conn_map.clone(); let addr_conn_map = self.addr_conn_map.clone(); @@ -644,7 +646,7 @@ impl TcpProxy { tracing::info!( ?socket_addr, "tcp connection accepted for proxy, nat dst: {:?}", - entry.dst + entry.real_dst ); assert_eq!(entry.state.load(), NatDstEntryState::SynReceived); @@ -658,6 +660,11 @@ impl TcpProxy { let old_nat_val = conn_map.insert(entry_clone.id, entry_clone.clone()); assert!(old_nat_val.is_none()); + let Some(tasks) = tasks.upgrade() else { + tracing::error!("tcp proxy tasks is dropped, exit accept loop"); + break; + }; + tasks.lock().unwrap().spawn(Self::connect_to_nat_dst( connector.clone(), global_ctx.clone(), @@ -697,14 +704,14 @@ impl TcpProxy { tracing::warn!("set_nodelay failed, ignore it: {:?}", e); } - let nat_dst = if Some(nat_entry.dst.ip()) + let nat_dst = if Some(nat_entry.real_dst.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) { - format!("127.0.0.1:{}", nat_entry.dst.port()) + format!("127.0.0.1:{}", nat_entry.real_dst.port()) .parse() .unwrap() } else { - nat_entry.dst + nat_entry.real_dst }; let _guard = global_ctx.net_ns.guard(); @@ -818,10 +825,15 @@ impl TcpProxy { return None; } - if !self - .connector - .check_packet_from_peer(&self.cidr_set, &self.global_ctx, &hdr, &ipv4) - { + let mut real_dst_ip = ipv4.get_destination(); + + if !self.connector.check_packet_from_peer( + &self.cidr_set, + &self.global_ctx, + &hdr, + &ipv4, + &mut real_dst_ip, + ) { return None; } @@ -839,12 +851,13 @@ impl TcpProxy { if is_tcp_syn && !is_tcp_ack { let dest_ip = ip_packet.get_destination(); let dest_port = tcp_packet.get_destination(); - let dst = SocketAddr::V4(SocketAddrV4::new(dest_ip, dest_port)); + let mapped_dst = SocketAddr::V4(SocketAddrV4::new(dest_ip, dest_port)); + let real_dst = SocketAddr::V4(SocketAddrV4::new(real_dst_ip, dest_port)); let old_val = self .syn_map - .insert(src, Arc::new(NatDstEntry::new(src, dst))); - tracing::info!(src = ?src, dst = ?dst, old_entry = ?old_val, "tcp syn received"); + .insert(src, Arc::new(NatDstEntry::new(src, real_dst, mapped_dst))); + tracing::info!(src = ?src, ?real_dst, ?mapped_dst, old_entry = ?old_val, "tcp syn received"); } else if !self.addr_conn_map.contains_key(&src) && !self.syn_map.contains_key(&src) { // if not in syn map and addr conn map, may forwarding n2n packet return None; diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index 441e6b7..5e29f0f 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -139,6 +139,8 @@ impl UdpNatEntry { self: Arc, mut packet_sender: Sender, virtual_ipv4: Ipv4Addr, + real_ipv4: Ipv4Addr, + mapped_ipv4: Ipv4Addr, ) { let (s, mut r) = tachyonix::channel(128); @@ -197,6 +199,10 @@ impl UdpNatEntry { src_v4.set_ip(virtual_ipv4); } + if *src_v4.ip() == real_ipv4 { + src_v4.set_ip(mapped_ipv4); + } + let Ok(_) = Self::compose_ipv4_packet( &self_clone, &mut packet_sender, @@ -266,7 +272,10 @@ impl UdpProxy { return None; } - if !self.cidr_set.contains_v4(ipv4.get_destination()) + let mut real_dst_ip = ipv4.get_destination(); + if !self + .cidr_set + .contains_v4(ipv4.get_destination(), &mut real_dst_ip) && !is_exit_node && !(self.global_ctx.no_tun() && Some(ipv4.get_destination()) @@ -322,6 +331,8 @@ impl UdpProxy { nat_entry.clone(), self.sender.clone(), self.global_ctx.get_ipv4().map(|x| x.address())?, + real_dst_ip, + ipv4.get_destination(), ))); } @@ -335,7 +346,7 @@ impl UdpProxy { .parse() .unwrap() } else { - SocketAddr::new(ipv4.get_destination().into(), udp_packet.get_destination()) + SocketAddr::new(real_dst_ip.into(), udp_packet.get_destination()) }; let send_ret = { diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index c242cb8..99cc12d 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -7,13 +7,13 @@ use std::sync::{Arc, Weak}; use anyhow::Context; use cidr::{IpCidr, Ipv4Inet}; -use tokio::task::JoinHandle; use tokio::{sync::Mutex, task::JoinSet}; use tokio_util::sync::CancellationToken; use crate::common::config::ConfigLoader; use crate::common::error::Error; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx, GlobalCtxEvent}; +use crate::common::scoped_task::ScopedTask; use crate::common::PeerId; use crate::connector::direct::DirectConnectorManager; use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager}; @@ -70,7 +70,7 @@ impl IpProxy { } async fn start(&self) -> Result<(), Error> { - if (self.global_ctx.get_proxy_cidrs().is_empty() + if (self.global_ctx.config.get_proxy_cidrs().is_empty() || self.started.load(Ordering::Relaxed)) && !self.global_ctx.enable_exit_node() && !self.global_ctx.no_tun() @@ -80,8 +80,7 @@ impl IpProxy { // Actually, if this node is enabled as an exit node, // we still can use the system stack to forward packets. - if self.global_ctx.proxy_forward_by_system() - && !self.global_ctx.no_tun() { + if self.global_ctx.proxy_forward_by_system() && !self.global_ctx.no_tun() { return Ok(()); } @@ -119,7 +118,7 @@ impl NicCtx { } struct MagicDnsContainer { - dns_runner_task: JoinHandle<()>, + dns_runner_task: ScopedTask<()>, dns_runner_cancel_token: CancellationToken, } @@ -140,7 +139,7 @@ impl NicCtxContainer { Self { nic_ctx: Some(Box::new(nic_ctx)), magic_dns: Some(MagicDnsContainer { - dns_runner_task: task, + dns_runner_task: task.into(), dns_runner_cancel_token: token, }), } @@ -400,7 +399,7 @@ impl Instance { // Warning, if there is an IP conflict in the network when using DHCP, the IP will be automatically changed. fn check_dhcp_ip_conflict(&self) { use rand::Rng; - let peer_manager_c = self.peer_manager.clone(); + let peer_manager_c = Arc::downgrade(&self.peer_manager.clone()); let global_ctx_c = self.get_global_ctx(); let nic_ctx = self.nic_ctx.clone(); let _peer_packet_receiver = self.peer_packet_receiver.clone(); @@ -411,6 +410,11 @@ impl Instance { loop { tokio::time::sleep(std::time::Duration::from_secs(next_sleep_time)).await; + let Some(peer_manager_c) = peer_manager_c.upgrade() else { + tracing::warn!("peer manager is dropped, stop dhcp check."); + return; + }; + // do not allocate ip if no peer connected let routes = peer_manager_c.list_routes().await; if routes.is_empty() { @@ -788,12 +792,56 @@ impl Instance { Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx, magic_dns_runner).await; Ok(()) } + + pub async fn clear_resources(&mut self) { + self.peer_manager.clear_resources().await; + let _ = self.nic_ctx.lock().await.take(); + if let Some(rpc_server) = self.rpc_server.take() { + rpc_server.registry().unregister_all(); + }; + } +} + +impl Drop for Instance { + fn drop(&mut self) { + let my_peer_id = self.peer_manager.my_peer_id(); + let pm = Arc::downgrade(&self.peer_manager); + let nic_ctx = self.nic_ctx.clone(); + if let Some(rpc_server) = self.rpc_server.take() { + rpc_server.registry().unregister_all(); + }; + tokio::spawn(async move { + nic_ctx.lock().await.take(); + if let Some(pm) = pm.upgrade() { + pm.clear_resources().await; + }; + + let now = std::time::Instant::now(); + while now.elapsed().as_secs() < 1 { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + if pm.strong_count() == 0 { + tracing::info!( + "Instance for peer {} dropped, all resources cleared.", + my_peer_id + ); + return; + } + } + + debug_assert!( + false, + "Instance for peer {} dropped, but resources not cleared in 1 seconds.", + my_peer_id + ); + }); + } } #[cfg(test)] mod tests { - use crate::{instance::instance::InstanceRpcServerHook, proto::rpc_impl::standalone::RpcServerHook}; - + use crate::{ + instance::instance::InstanceRpcServerHook, proto::rpc_impl::standalone::RpcServerHook, + }; #[tokio::test] async fn test_rpc_portal_whitelist() { @@ -805,7 +853,7 @@ mod tests { expected_result: bool, } - let test_cases:Vec = vec![ + let test_cases: Vec = vec![ // Test default whitelist (127.0.0.0/8, ::1/128) TestCase { remote_url: "tcp://127.0.0.1:15888".to_string(), @@ -822,7 +870,6 @@ mod tests { whitelist: None, expected_result: false, }, - // Test custom whitelist TestCase { remote_url: "tcp://192.168.1.10:15888".to_string(), @@ -848,46 +895,35 @@ mod tests { ]), expected_result: false, }, - // Test empty whitelist (should reject all connections) TestCase { remote_url: "tcp://127.0.0.1:15888".to_string(), whitelist: Some(vec![]), expected_result: false, }, - // Test broad whitelist (0.0.0.0/0 and ::/0 accept all IP addresses) TestCase { remote_url: "tcp://8.8.8.8:15888".to_string(), - whitelist: Some(vec![ - "0.0.0.0/0".parse().unwrap(), - ]), + whitelist: Some(vec!["0.0.0.0/0".parse().unwrap()]), expected_result: true, }, - // Test edge case: specific IP whitelist TestCase { remote_url: "tcp://192.168.1.5:15888".to_string(), - whitelist: Some(vec![ - "192.168.1.5/32".parse().unwrap(), - ]), + whitelist: Some(vec!["192.168.1.5/32".parse().unwrap()]), expected_result: true, }, TestCase { remote_url: "tcp://192.168.1.6:15888".to_string(), - whitelist: Some(vec![ - "192.168.1.5/32".parse().unwrap(), - ]), + whitelist: Some(vec!["192.168.1.5/32".parse().unwrap()]), expected_result: false, }, - // Test invalid URL (this case will fail during URL parsing) TestCase { remote_url: "invalid-url".to_string(), whitelist: None, expected_result: false, }, - // Test URL without IP address (this case will fail during IP parsing) TestCase { remote_url: "tcp://localhost:15888".to_string(), @@ -907,11 +943,22 @@ mod tests { let result = hook.on_new_client(tunnel_info).await; if case.expected_result { - assert!(result.is_ok(), "Expected success for remote_url:{},whitelist:{:?},but got: {:?}", case.remote_url, case.whitelist, result); + assert!( + result.is_ok(), + "Expected success for remote_url:{},whitelist:{:?},but got: {:?}", + case.remote_url, + case.whitelist, + result + ); } else { - assert!(result.is_err(), "Expected failure for remote_url:{},whitelist:{:?},but got: {:?}", case.remote_url, case.whitelist, result); + assert!( + result.is_err(), + "Expected failure for remote_url:{},whitelist:{:?},but got: {:?}", + case.remote_url, + case.whitelist, + result + ); } } - } -} \ No newline at end of file +} diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index 26a2d97..20af6ff 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -1,4 +1,9 @@ -use std::{fmt::Debug, net::IpAddr, str::FromStr, sync::Arc}; +use std::{ + fmt::Debug, + net::IpAddr, + str::FromStr, + sync::{Arc, Weak}, +}; use anyhow::Context; use async_trait::async_trait; @@ -89,7 +94,7 @@ pub struct ListenerManager { global_ctx: ArcGlobalCtx, net_ns: NetNS, listeners: Vec, - peer_manager: Arc, + peer_manager: Weak, tasks: JoinSet<()>, } @@ -100,7 +105,7 @@ impl ListenerManage global_ctx: global_ctx.clone(), net_ns: global_ctx.net_ns.clone(), listeners: Vec::new(), - peer_manager, + peer_manager: Arc::downgrade(&peer_manager), tasks: JoinSet::new(), } } @@ -169,7 +174,7 @@ impl ListenerManage #[tracing::instrument(skip(creator))] async fn run_listener( creator: Arc, - peer_manager: Arc, + peer_manager: Weak, global_ctx: ArcGlobalCtx, ) { loop { @@ -221,6 +226,10 @@ impl ListenerManage let peer_manager = peer_manager.clone(); let global_ctx = global_ctx.clone(); tokio::spawn(async move { + let Some(peer_manager) = peer_manager.upgrade() else { + tracing::error!("peer manager is gone, cannot handle tunnel"); + return; + }; let server_ret = peer_manager.handle_tunnel(ret).await; if let Err(e) = &server_ret { global_ctx.issue_event(GlobalCtxEvent::ConnectionError( diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 90e9ac0..4f4b899 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -135,8 +135,6 @@ impl EasyTierLauncher { fetch_node_info: bool, ) -> Result<(), anyhow::Error> { let mut instance = Instance::new(cfg); - let peer_mgr = instance.get_peer_manager(); - let mut tasks = JoinSet::new(); // Subscribe to global context events @@ -164,7 +162,7 @@ impl EasyTierLauncher { if fetch_node_info { let data_c = data.clone(); let global_ctx_c = instance.get_global_ctx(); - let peer_mgr_c = peer_mgr.clone(); + let peer_mgr_c = instance.get_peer_manager().clone(); let vpn_portal = instance.get_vpn_portal_inst(); tasks.spawn(async move { loop { @@ -210,6 +208,9 @@ impl EasyTierLauncher { tasks.abort_all(); drop(tasks); + instance.clear_resources().await; + drop(instance); + Ok(()) } @@ -455,6 +456,36 @@ impl NetworkInstance { } } +pub fn add_proxy_network_to_config( + proxy_network: &str, + cfg: &TomlConfigLoader, +) -> Result<(), anyhow::Error> { + let parts: Vec<&str> = proxy_network.split("->").collect(); + let real_cidr = parts[0] + .parse() + .with_context(|| format!("failed to parse proxy network: {}", parts[0]))?; + + if parts.len() > 2 { + return Err(anyhow::anyhow!( + "invalid proxy network format: {}, support format: or ->, example: + 10.0.0.0/24 or 10.0.0.0/24->192.168.0.0/24", + proxy_network + )); + } + + let mapped_cidr = if parts.len() == 2 { + Some( + parts[1] + .parse() + .with_context(|| format!("failed to parse mapped network: {}", parts[1]))?, + ) + } else { + None + }; + cfg.add_proxy_cidr(real_cidr, mapped_cidr); + Ok(()) +} + pub type NetworkingMethod = crate::proto::web::NetworkingMethod; pub type NetworkConfig = crate::proto::web::NetworkConfig; @@ -534,10 +565,7 @@ impl NetworkConfig { cfg.set_listeners(listener_urls); for n in self.proxy_cidrs.iter() { - cfg.add_proxy_cidr( - n.parse() - .with_context(|| format!("failed to parse proxy network: {}", n))?, - ); + add_proxy_network_to_config(n, &cfg)?; } cfg.set_rpc_portal( diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index 82499c2..d446024 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -1,6 +1,6 @@ use std::{ collections::BTreeSet, - sync::Arc, + sync::{Arc, Weak}, time::{Duration, Instant}, }; @@ -31,7 +31,8 @@ use crate::{ use super::{server::PeerCenterServer, Digest, Error}; struct PeerCenterBase { - peer_mgr: Arc, + peer_mgr: Weak, + my_peer_id: PeerId, tasks: Mutex>, lock: Arc>, } @@ -40,20 +41,25 @@ struct PeerCenterBase { static SERVICE_ID: u32 = 50; struct PeridicJobCtx { - peer_mgr: Arc, + peer_mgr: Weak, + my_peer_id: PeerId, center_peer: AtomicCell, job_ctx: T, } impl PeerCenterBase { pub async fn init(&self) -> Result<(), Error> { - self.peer_mgr + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + return Err(Error::Shutdown); + }; + + peer_mgr .get_peer_rpc_mgr() .rpc_server() .registry() .register( - PeerCenterRpcServer::new(PeerCenterServer::new(self.peer_mgr.my_peer_id())), - &self.peer_mgr.get_global_ctx().get_network_name(), + PeerCenterRpcServer::new(PeerCenterServer::new(peer_mgr.my_peer_id())), + &peer_mgr.get_global_ctx().get_network_name(), ); Ok(()) } @@ -91,17 +97,23 @@ impl PeerCenterBase { + Sync + 'static), ) -> () { - let my_peer_id = self.peer_mgr.my_peer_id(); + let my_peer_id = self.my_peer_id; let peer_mgr = self.peer_mgr.clone(); let lock = self.lock.clone(); self.tasks.lock().await.spawn( async move { let ctx = Arc::new(PeridicJobCtx { peer_mgr: peer_mgr.clone(), + my_peer_id, center_peer: AtomicCell::new(PeerId::default()), job_ctx, }); loop { + let Some(peer_mgr) = peer_mgr.upgrade() else { + tracing::error!("peer manager is shutdown, exit periodic job"); + return; + }; + let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else { tracing::trace!("no center peer found, sleep 1 second"); tokio::time::sleep(Duration::from_secs(1)).await; @@ -138,7 +150,8 @@ impl PeerCenterBase { pub fn new(peer_mgr: Arc) -> Self { PeerCenterBase { - peer_mgr, + peer_mgr: Arc::downgrade(&peer_mgr), + my_peer_id: peer_mgr.my_peer_id(), tasks: Mutex::new(JoinSet::new()), lock: Arc::new(Mutex::new(())), } @@ -289,7 +302,7 @@ impl PeerCenterInstance { self.client .init_periodic_job(ctx, |client, ctx| async move { - let my_node_id = ctx.peer_mgr.my_peer_id(); + let my_node_id = ctx.my_peer_id; let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into(); let peer_list = peers.direct_peers.keys().map(|k| *k).collect(); let job_ctx = &ctx.job_ctx; diff --git a/easytier/src/peer_center/mod.rs b/easytier/src/peer_center/mod.rs index 71d83cd..f1e4ec7 100644 --- a/easytier/src/peer_center/mod.rs +++ b/easytier/src/peer_center/mod.rs @@ -19,6 +19,8 @@ pub enum Error { DigestMismatch, #[error("Not center server")] NotCenterServer, + #[error("Instance shutdown")] + Shutdown, } pub type Digest = u64; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index ccfffde..7f251a5 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -1101,9 +1101,16 @@ impl PeerManager { .unwrap_or_default(), proxy_cidrs: self .global_ctx + .config .get_proxy_cidrs() .into_iter() - .map(|x| x.to_string()) + .map(|x| { + if x.mapped_cidr.is_none() { + x.cidr.to_string() + } else { + format!("{}->{}", x.cidr, x.mapped_cidr.unwrap()) + } + }) .collect(), hostname: self.global_ctx.get_hostname(), stun_info: Some(self.global_ctx.get_stun_info_collector().get_stun_info()), @@ -1133,6 +1140,15 @@ impl PeerManager { .map(|x| x.clone()) .unwrap_or_default() } + + pub async fn clear_resources(&self) { + let mut peer_pipeline = self.peer_packet_process_pipeline.write().await; + peer_pipeline.clear(); + let mut nic_pipeline = self.nic_packet_process_pipeline.write().await; + nic_pipeline.clear(); + + self.peer_rpc_mgr.rpc_server().registry().unregister_all(); + } } #[cfg(test)] diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 8a641a9..5f92aab 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -139,10 +139,12 @@ impl RoutePeerInfo { cost: 0, ipv4_addr: global_ctx.get_ipv4().map(|x| x.address().into()), proxy_cidrs: global_ctx + .config .get_proxy_cidrs() .iter() + .map(|x| x.mapped_cidr.unwrap_or(x.cidr)) + .chain(global_ctx.get_vpn_portal_cidr()) .map(|x| x.to_string()) - .chain(global_ctx.get_vpn_portal_cidr().map(|x| x.to_string())) .collect(), hostname: Some(global_ctx.get_hostname()), udp_stun_info: global_ctx diff --git a/easytier/src/proto/rpc_impl/service_registry.rs b/easytier/src/proto/rpc_impl/service_registry.rs index 1ca440d..6c970cf 100644 --- a/easytier/src/proto/rpc_impl/service_registry.rs +++ b/easytier/src/proto/rpc_impl/service_registry.rs @@ -96,6 +96,10 @@ impl ServiceRegistry { self.table.retain(|k, _| k.domain_name != domain_name); } + pub fn unregister_all(&self) { + self.table.clear(); + } + pub async fn call_method( &self, rpc_desc: RpcDescriptor, diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index dc8a110..e308999 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -188,6 +188,24 @@ pub async fn init_three_node_ex TomlConfigLoader>( vec![inst1, inst2, inst3] } +pub async fn drop_insts(insts: Vec) { + let mut set = JoinSet::new(); + for mut inst in insts { + set.spawn(async move { + inst.clear_resources().await; + let pm = Arc::downgrade(&inst.get_peer_manager()); + drop(inst); + let now = std::time::Instant::now(); + while now.elapsed().as_secs() < 5 && pm.strong_count() > 0 { + tokio::time::sleep(std::time::Duration::from_millis(50)).await; + } + + debug_assert_eq!(pm.strong_count(), 0, "PeerManager should be dropped"); + }); + } + while let Some(_) = set.join_next().await {} +} + async fn ping_test(from_netns: &str, target_ip: &str, payload_size: Option) -> bool { let _g = NetNS::new(Some(ROOT_NETNS_NAME.to_owned())).guard(); let code = tokio::process::Command::new("ip") @@ -233,14 +251,17 @@ pub async fn basic_three_node_test(#[values("tcp", "udp", "wg", "ws", "wss")] pr Duration::from_secs(5000), ) .await; + + drop_insts(insts).await; } -async fn subnet_proxy_test_udp() { +async fn subnet_proxy_test_udp(target_ip: &str) { use crate::tunnel::{common::tests::_tunnel_pingpong_netns, udp::UdpTunnelListener}; use rand::Rng; let udp_listener = UdpTunnelListener::new("udp://10.1.2.4:22233".parse().unwrap()); - let udp_connector = UdpTunnelConnector::new("udp://10.1.2.4:22233".parse().unwrap()); + let udp_connector = + UdpTunnelConnector::new(format!("udp://{}:22233", target_ip).parse().unwrap()); // NOTE: this should not excced udp tunnel max buffer size let mut buf = vec![0; 7 * 1024]; @@ -257,7 +278,8 @@ async fn subnet_proxy_test_udp() { // no fragment let udp_listener = UdpTunnelListener::new("udp://10.1.2.4:22233".parse().unwrap()); - let udp_connector = UdpTunnelConnector::new("udp://10.1.2.4:22233".parse().unwrap()); + let udp_connector = + UdpTunnelConnector::new(format!("udp://{}:22233", target_ip).parse().unwrap()); let mut buf = vec![0; 1 * 1024]; rand::thread_rng().fill(&mut buf[..]); @@ -305,12 +327,13 @@ async fn subnet_proxy_test_udp() { .await; } -async fn subnet_proxy_test_tcp() { +async fn subnet_proxy_test_tcp(target_ip: &str) { use crate::tunnel::{common::tests::_tunnel_pingpong_netns, tcp::TcpTunnelListener}; use rand::Rng; let tcp_listener = TcpTunnelListener::new("tcp://10.1.2.4:22223".parse().unwrap()); - let tcp_connector = TcpTunnelConnector::new("tcp://10.1.2.4:22223".parse().unwrap()); + let tcp_connector = + TcpTunnelConnector::new(format!("tcp://{}:22223", target_ip).parse().unwrap()); let mut buf = vec![0; 32]; rand::thread_rng().fill(&mut buf[..]); @@ -341,15 +364,15 @@ async fn subnet_proxy_test_tcp() { .await; } -async fn subnet_proxy_test_icmp() { +async fn subnet_proxy_test_icmp(target_ip: &str) { wait_for_condition( - || async { ping_test("net_a", "10.1.2.4", None).await }, + || async { ping_test("net_a", target_ip, None).await }, Duration::from_secs(5), ) .await; wait_for_condition( - || async { ping_test("net_a", "10.1.2.4", Some(5 * 1024)).await }, + || async { ping_test("net_a", target_ip, Some(5 * 1024)).await }, Duration::from_secs(5), ) .await; @@ -369,8 +392,8 @@ async fn subnet_proxy_test_icmp() { } #[rstest::rstest] -#[tokio::test] #[serial_test::serial] +#[tokio::test] pub async fn subnet_proxy_three_node_test( #[values("tcp", "udp", "wg")] proto: &str, #[values(true, false)] no_tun: bool, @@ -378,6 +401,7 @@ pub async fn subnet_proxy_three_node_test( #[values(true, false)] enable_kcp_proxy: bool, #[values(true, false)] disable_kcp_input: bool, #[values(true, false)] dst_enable_kcp_proxy: bool, + #[values(true, false)] test_mapped_cidr: bool, ) { let insts = init_three_node_ex( proto, @@ -388,7 +412,14 @@ pub async fn subnet_proxy_three_node_test( flags.disable_kcp_input = disable_kcp_input; flags.enable_kcp_proxy = dst_enable_kcp_proxy; cfg.set_flags(flags); - cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap()); + cfg.add_proxy_cidr( + "10.1.2.0/24".parse().unwrap(), + if test_mapped_cidr { + Some("10.1.3.0/24".parse().unwrap()) + } else { + None + }, + ); } if cfg.get_inst_name() == "inst2" && relay_by_public_server { @@ -410,19 +441,31 @@ pub async fn subnet_proxy_three_node_test( ) .await; - assert_eq!(insts[2].get_global_ctx().get_proxy_cidrs().len(), 1); + assert_eq!(insts[2].get_global_ctx().config.get_proxy_cidrs().len(), 1); wait_proxy_route_appear( &insts[0].get_peer_manager(), "10.144.144.3/24", insts[2].peer_id(), - "10.1.2.0/24", + if test_mapped_cidr { + "10.1.3.0/24" + } else { + "10.1.2.0/24" + }, ) .await; - subnet_proxy_test_icmp().await; - subnet_proxy_test_tcp().await; - subnet_proxy_test_udp().await; + let target_ip = if test_mapped_cidr { + "10.1.3.4" + } else { + "10.1.2.4" + }; + + subnet_proxy_test_icmp(target_ip).await; + subnet_proxy_test_tcp(target_ip).await; + subnet_proxy_test_udp(target_ip).await; + + drop_insts(insts).await; } #[rstest::rstest] @@ -464,6 +507,8 @@ pub async fn data_compress( Duration::from_secs(5), ) .await; + + drop_insts(_insts).await; } #[cfg(feature = "wireguard")] @@ -577,6 +622,8 @@ pub async fn proxy_three_node_disconnect_test(#[values("tcp", "wg")] proto: &str set_link_status("net_d", true); } + + drop_insts(insts).await; }); let (ret,) = tokio::join!(task); @@ -630,6 +677,8 @@ pub async fn udp_broadcast_test() { tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; assert_eq!(counter.load(std::sync::atomic::Ordering::Relaxed), 2); + + drop_insts(_insts).await; } #[tokio::test] @@ -678,6 +727,8 @@ pub async fn foreign_network_forward_nic_data() { Duration::from_secs(5), ) .await; + + drop_insts(vec![center_inst, inst1, inst2]).await; } use std::{net::SocketAddr, str::FromStr}; @@ -778,6 +829,8 @@ pub async fn wireguard_vpn_portal() { Duration::from_secs(5), ) .await; + + drop_insts(insts).await; } #[cfg(feature = "wireguard")] @@ -837,6 +890,8 @@ pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_add drop(conn); tokio::join!(task).0.unwrap(); + + drop_insts(_insts).await; } #[tokio::test] @@ -886,6 +941,7 @@ pub async fn foreign_network_functional_cluster() { let peer_map_inst1 = inst1.get_peer_manager(); println!("inst1 peer map: {:?}", peer_map_inst1.list_routes().await); + drop(peer_map_inst1); wait_for_condition( || async { ping_test("net_c", "10.144.145.2", None).await }, @@ -905,6 +961,8 @@ pub async fn foreign_network_functional_cluster() { Duration::from_secs(5), ) .await; + + drop_insts(vec![center_inst1, center_inst2, inst1, inst2]).await; } #[rstest::rstest] @@ -974,6 +1032,9 @@ pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) { Duration::from_secs(5), ) .await; + + drop(peer_map); + drop_insts(vec![center_inst, inst1, inst2]).await; } #[rstest::rstest] @@ -1017,7 +1078,7 @@ pub async fn port_forward_test( }, ]); } else if cfg.get_inst_name() == "inst3" { - cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap()); + cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None); } let mut flags = cfg.get_flags(); flags.no_tun = no_tun; @@ -1093,4 +1154,6 @@ pub async fn port_forward_test( buf, ) .await; + + drop_insts(_insts).await; }