From 3ffa6214cabb7a175f76e578f4e67b260441556a Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 19 Oct 2025 15:46:51 +0800 Subject: [PATCH] fix subnet proxy deadloop (#1492) * use LPM to determine subnet proxy dst. * never allow subnet proxy traffic sending to self. --- Cargo.lock | 26 ++- .../easytier-android-jni/Cargo.toml | 2 +- .../easytier-android-jni/build.sh | 1 + easytier/Cargo.toml | 3 +- easytier/src/gateway/socks5.rs | 5 +- easytier/src/gateway/tcp_proxy.rs | 5 +- easytier/src/instance/virtual_nic.rs | 44 +++- easytier/src/peers/peer_manager.rs | 25 +- easytier/src/peers/peer_ospf_route.rs | 214 +++++++++++++++--- easytier/src/tests/three_node.rs | 108 ++++++++- easytier/src/tunnel/packet_def.rs | 18 ++ easytier/src/vpn_portal/wireguard.rs | 6 +- 12 files changed, 393 insertions(+), 64 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 25c00ae..06603d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1273,9 +1273,9 @@ dependencies = [ [[package]] name = "cidr" -version = "0.2.3" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bdf600c45bd958cf2945c445264471cca8b6c8e67bc87b71affd6d7e5682621" +checksum = "bd1b64030216239a2e7c364b13cd96a2097ebf0dfe5025f2dedee14a23f2ab60" dependencies = [ "serde", ] @@ -2176,6 +2176,7 @@ dependencies = [ "petgraph 0.8.1", "pin-project-lite", "pnet", + "prefix-trie", "prost", "prost-build", "prost-reflect", @@ -6182,6 +6183,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "85cf4c7c25f1dd66c76b451e9041a8cfce26e4ca754934fa7aed8d5a59a01d20" dependencies = [ + "cidr", "ipnet", "num-traits", ] @@ -7507,10 +7509,11 @@ dependencies = [ [[package]] name = "serde" -version = "1.0.207" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5665e14a49a4ea1b91029ba7d3bca9f299e1f7cfa194388ccc20f14743e784f2" +checksum = "9a8e94ea7f378bd32cbbd37198a4a91436180c5bb472411e48b5ec2e2124ae9e" dependencies = [ + "serde_core", "serde_derive", ] @@ -7526,10 +7529,19 @@ dependencies = [ ] [[package]] -name = "serde_derive" -version = "1.0.207" +name = "serde_core" +version = "1.0.228" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6aea2634c86b0e8ef2cfdc0c340baede54ec27b1e46febd7f80dffb2aa44a00e" +checksum = "41d385c7d4ca58e59fc732af25c3983b67ac852c1a25000afe1175de458b67ad" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.228" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", diff --git a/easytier-contrib/easytier-android-jni/Cargo.toml b/easytier-contrib/easytier-android-jni/Cargo.toml index 86aa083..e2b9465 100644 --- a/easytier-contrib/easytier-android-jni/Cargo.toml +++ b/easytier-contrib/easytier-android-jni/Cargo.toml @@ -11,6 +11,6 @@ jni = "0.21" once_cell = "1.18.0" log = "0.4" android_logger = "0.13" -serde = { version = "1.0", features = ["derive"] } +serde = { version = "1.0.220", features = ["derive"] } serde_json = "1.0" easytier = { path = "../../easytier" } \ No newline at end of file diff --git a/easytier-contrib/easytier-android-jni/build.sh b/easytier-contrib/easytier-android-jni/build.sh index 4bb1ea0..46169b9 100755 --- a/easytier-contrib/easytier-android-jni/build.sh +++ b/easytier-contrib/easytier-android-jni/build.sh @@ -85,6 +85,7 @@ build_for_target() { rust_target="${TARGET_MAP[$android_target]}" 
mkdir -p "$OUTPUT_DIR/$android_target" cp "$REPO_ROOT/target/$rust_target/release/libeasytier_android_jni.so" "$OUTPUT_DIR/$android_target/" + cp "$REPO_ROOT/target/$rust_target/release/libeasytier_ffi.so" "$OUTPUT_DIR/$android_target/" echo -e "${GREEN}库文件已复制到: $OUTPUT_DIR/$android_target/${NC}" } diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 4fd23a9..de31dea 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -114,8 +114,9 @@ idna = "1.0" byteorder = "1.5.0" # for proxy -cidr = { version = "0.2.2", features = ["serde"] } +cidr = { version = "0.3.1", features = ["serde"] } socket2 = { version = "0.5.10", features = ["all"] } +prefix-trie = { version = "0.7.0", features = ["cidr"] } # for hole punching stun_codec = "0.3.4" diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index 13c1876..67a98b2 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -322,7 +322,10 @@ impl Socks5ServerNet { let dst = ipv4.get_destination(); let packet = ZCPacket::new_with_payload(&data); - if let Err(e) = peer_manager.send_msg_by_ip(packet, IpAddr::V4(dst)).await { + if let Err(e) = peer_manager + .send_msg_by_ip(packet, IpAddr::V4(dst), false) + .await + { tracing::error!("send to peer failed in smoltcp sender: {:?}", e); } } diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 819d5bd..74b2dd2 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -565,7 +565,10 @@ impl TcpProxy { let dst = ipv4.get_destination(); let packet = ZCPacket::new_with_payload(&data); - if let Err(e) = peer_mgr.send_msg_by_ip(packet, IpAddr::V4(dst)).await { + if let Err(e) = peer_mgr + .send_msg_by_ip(packet, IpAddr::V4(dst), false) + .await + { tracing::error!("send to peer failed in smoltcp sender: {:?}", e); } } diff --git a/easytier/src/instance/virtual_nic.rs b/easytier/src/instance/virtual_nic.rs index 84bdfd0..c0223c0 100644 --- a/easytier/src/instance/virtual_nic.rs +++ b/easytier/src/instance/virtual_nic.rs @@ -688,13 +688,42 @@ impl NicCtx { return; } let dst_ipv4 = ipv4.get_destination(); + let src_ipv4 = ipv4.get_source(); + let my_ipv4 = mgr.get_global_ctx().get_ipv4().map(|x| x.address()); tracing::trace!( ?ret, + ?src_ipv4, + ?dst_ipv4, "[USER_PACKET] recv new packet from tun device and forward to peers." ); - // TODO: use zero-copy - let send_ret = mgr.send_msg_by_ip(ret, IpAddr::V4(dst_ipv4)).await; + // Subnet A is proxied as 10.0.0.0/24, and Subnet B is also proxied as 10.0.0.0/24. + // + // Subnet A has received a route advertised by Subnet B. As a result, A can reach + // the physical subnet 10.0.0.0/24 directly and has also added a virtual route for + // the same subnet 10.0.0.0/24. However, the physical route has a higher priority + // (lower metric) than the virtual one. + // + // When A sends a UDP packet to a non-existent IP within this subnet, the packet + // cannot be delivered on the physical network and is instead routed to the virtual + // network interface. + // + // The virtual interface receives the packet and forwards it to itself, which triggers + // the subnet proxy logic. The subnet proxy then attempts to send another packet to + // the same destination address, causing the same process to repeat and creating an + // infinite loop. Therefore, we must avoid re-sending packets back to ourselves + // when the subnet proxy itself is the originator of the packet. 
+            //
+            // However, there is a special scenario to consider: when A acts as a gateway,
+            // packets from devices behind A may be forwarded by the OS to the ET (e.g., an
+            // eBPF or tunneling component), which happens to proxy the subnet. In this case,
+            // the packet’s source IP is not A’s own IP, and we must allow such packets to be
+            // sent to the virtual interface (i.e., "sent to ourselves") to maintain correct
+            // forwarding behavior. Thus, loop prevention should only apply when the source IP
+            // belongs to the local host.
+            let send_ret = mgr
+                .send_msg_by_ip(ret, IpAddr::V4(dst_ipv4), Some(src_ipv4) == my_ipv4)
+                .await;
             if send_ret.is_err() {
                 tracing::trace!(?send_ret, "[USER_PACKET] send_msg failed")
             }
@@ -711,20 +740,23 @@ impl NicCtx {
             }
             let src_ipv6 = ipv6.get_source();
             let dst_ipv6 = ipv6.get_destination();
+            let my_ipv6 = mgr.get_global_ctx().get_ipv6().map(|x| x.address());
             tracing::trace!(
                 ?ret,
+                ?src_ipv6,
+                ?dst_ipv6,
                 "[USER_PACKET] recv new packet from tun device and forward to peers."
             );
-            if src_ipv6.is_unicast_link_local()
-                && Some(src_ipv6) != mgr.get_global_ctx().get_ipv6().map(|x| x.address())
-            {
+            if src_ipv6.is_unicast_link_local() && Some(src_ipv6) != my_ipv6 {
                 // do not route link local packet to other nodes unless the address is assigned by user
                 return;
             }
             // TODO: use zero-copy
-            let send_ret = mgr.send_msg_by_ip(ret, IpAddr::V6(dst_ipv6)).await;
+            let send_ret = mgr
+                .send_msg_by_ip(ret, IpAddr::V6(dst_ipv6), Some(src_ipv6) == my_ipv6)
+                .await;
             if send_ret.is_err() {
                 tracing::trace!(?send_ret, "[USER_PACKET] send_msg failed")
             }
diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs
index c67c7a7..8b222f9 100644
--- a/easytier/src/peers/peer_manager.rs
+++ b/easytier/src/peers/peer_manager.rs
@@ -791,7 +791,7 @@ impl PeerManager {
 impl PeerPacketFilter for NicPacketProcessor {
     async fn try_process_packet_from_peer(&self, packet: ZCPacket) -> Option<ZCPacket> {
         let hdr = packet.peer_manager_header().unwrap();
-        if hdr.packet_type == PacketType::Data as u8 {
+        if hdr.packet_type == PacketType::Data as u8 && !hdr.is_not_send_to_tun() {
             tracing::trace!(?packet, "send packet to nic channel");
             // TODO: use a function to get the body ref directly for zero copy
             let _ = self.nic_channel.send(packet).await;
@@ -1150,7 +1150,12 @@ impl PeerManager {
         Ok(())
     }
 
-    pub async fn send_msg_by_ip(&self, mut msg: ZCPacket, ip_addr: IpAddr) -> Result<(), Error> {
+    pub async fn send_msg_by_ip(
+        &self,
+        mut msg: ZCPacket,
+        ip_addr: IpAddr,
+        not_send_to_self: bool,
+    ) -> Result<(), Error> {
         tracing::trace!(
             "do send_msg in peer manager, msg: {:?}, ip_addr: {}",
             msg,
@@ -1210,10 +1215,14 @@ impl PeerManager {
                 msg.clone().unwrap()
             };
 
-            msg.mut_peer_manager_header()
-                .unwrap()
-                .to_peer_id
-                .set(*peer_id);
+            let hdr = msg.mut_peer_manager_header().unwrap();
+            hdr.to_peer_id.set(*peer_id);
+
+            if not_send_to_self && *peer_id == self.my_peer_id {
+                // the packet may be sent to the vpn portal, so we just set flags instead of dropping it
+                hdr.set_not_send_to_tun(true);
+                hdr.set_no_proxy(true);
+            }
 
             self.self_tx_counters
                 .self_tx_bytes
@@ -1295,6 +1304,10 @@ impl PeerManager {
         self.global_ctx.clone()
     }
 
+    pub fn get_global_ctx_ref(&self) -> &ArcGlobalCtx {
+        &self.global_ctx
+    }
+
     pub fn get_nic_channel(&self) -> PacketRecvChan {
         self.nic_channel.clone()
     }
diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs
index 4756af0..8694745 100644
--- a/easytier/src/peers/peer_ospf_route.rs
+++ b/easytier/src/peers/peer_ospf_route.rs
@@ -1,9 +1,7 @@
 use std::{
-    collections::{
-        HashMap, {BTreeMap, BTreeSet},
-    },
+    collections::{BTreeMap, BTreeSet, HashMap},
     fmt::Debug,
-    net::{Ipv4Addr, Ipv6Addr},
+    net::{IpAddr, Ipv4Addr, Ipv6Addr},
     sync::{
         atomic::{AtomicBool, AtomicU32, Ordering},
         Arc, Weak,
@@ -11,6 +9,8 @@ use std::{
     time::{Duration, Instant, SystemTime},
 };
 
+use arc_swap::ArcSwap;
+use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr};
 use crossbeam::atomic::AtomicCell;
 use dashmap::DashMap;
 use petgraph::{
@@ -19,6 +19,7 @@ use petgraph::{
     visit::{EdgeRef, IntoNodeReferences},
     Directed,
 };
+use prefix_trie::PrefixMap;
 use prost::Message;
 use prost_reflect::{DynamicMessage, ReflectMessage};
 use serde::{Deserialize, Serialize};
@@ -755,7 +756,8 @@ struct RouteTable {
     next_hop_map: NextHopMap,
     ipv4_peer_id_map: DashMap<Ipv4Addr, PeerIdAndVersion>,
     ipv6_peer_id_map: DashMap<Ipv6Addr, PeerIdAndVersion>,
-    cidr_peer_id_map: DashMap<cidr::IpCidr, PeerIdAndVersion>,
+    cidr_peer_id_map: ArcSwap<PrefixMap<Ipv4Cidr, PeerIdAndVersion>>,
+    cidr_v6_peer_id_map: ArcSwap<PrefixMap<Ipv6Cidr, PeerIdAndVersion>>,
     next_hop_map_version: AtomicVersion,
 }
@@ -766,7 +768,8 @@ impl RouteTable {
             next_hop_map: DashMap::new(),
             ipv4_peer_id_map: DashMap::new(),
             ipv6_peer_id_map: DashMap::new(),
-            cidr_peer_id_map: DashMap::new(),
+            cidr_peer_id_map: ArcSwap::new(Arc::new(PrefixMap::new())),
+            cidr_v6_peer_id_map: ArcSwap::new(Arc::new(PrefixMap::new())),
             next_hop_map_version: AtomicVersion::new(),
         }
     }
@@ -867,16 +870,11 @@ impl RouteTable {
             // remove ipv6 map for peers we cannot reach.
             self.next_hop_map.contains_key(&v.peer_id)
         });
-        self.cidr_peer_id_map.retain(|_, v| {
-            // remove cidr map for peers we cannot reach.
-            self.next_hop_map.contains_key(&v.peer_id)
-        });
 
         shrink_dashmap(&self.peer_infos, None);
         shrink_dashmap(&self.next_hop_map, None);
         shrink_dashmap(&self.ipv4_peer_id_map, None);
         shrink_dashmap(&self.ipv6_peer_id_map, None);
-        shrink_dashmap(&self.cidr_peer_id_map, None);
     }
 
     fn gen_next_hop_map_with_least_hop(
@@ -975,6 +973,9 @@ impl RouteTable {
             self.gen_next_hop_map_with_least_cost(&graph, &start_node, version);
         };
 
+        let mut new_cidr_prefix_trie = PrefixMap::new();
+        let mut new_cidr_v6_prefix_trie = PrefixMap::new();
+
         // build peer_infos, ipv4_peer_id_map, cidr_peer_id_map
         // only set map for peers we can reach.
         for item in self.next_hop_map.iter() {
@@ -1030,28 +1031,69 @@ impl RouteTable {
             }
 
             for cidr in info.proxy_cidrs.iter() {
-                self.cidr_peer_id_map
-                    .entry(cidr.parse().unwrap())
-                    .and_modify(|v| {
-                        if is_new_peer_better(v) {
-                            // if the next hop is not set or the new next hop is better, update it.
-                            *v = peer_id_and_version;
-                        }
-                    })
-                    .or_insert(peer_id_and_version);
+                let cidr = cidr.parse::<IpCidr>();
+                match cidr {
+                    Ok(IpCidr::V4(cidr)) => {
+                        new_cidr_prefix_trie
+                            .entry(cidr)
+                            .and_modify(|e| {
+                                // if we ourselves have the same cidr, make sure my peer id is stored here so we can detect a potential dead loop.
+                                if *peer_id == my_peer_id || is_new_peer_better(e) {
+                                    *e = peer_id_and_version;
+                                }
+                            })
+                            .or_insert(peer_id_and_version);
+                    }
+
+                    Ok(IpCidr::V6(cidr)) => {
+                        new_cidr_v6_prefix_trie
+                            .entry(cidr)
+                            .and_modify(|e| {
+                                // if we ourselves have the same cidr, make sure my peer id is stored here so we can detect a potential dead loop.
+                                if *peer_id == my_peer_id || is_new_peer_better(e) {
+                                    *e = peer_id_and_version;
+                                }
+                            })
+                            .or_insert(peer_id_and_version);
+                    }
+
+                    _ => {
+                        tracing::warn!("invalid proxy cidr: {:?}, from peer: {:?}", cidr, peer_id);
+                    }
+                }
+                tracing::debug!(
+                    "add cidr: {:?} to peer: {:?}, my peer id: {:?}",
+                    cidr,
+                    peer_id,
+                    my_peer_id
+                );
             }
         }
+
+        self.cidr_peer_id_map.store(Arc::new(new_cidr_prefix_trie));
+        self.cidr_v6_peer_id_map
+            .store(Arc::new(new_cidr_v6_prefix_trie));
+        tracing::trace!(
+            my_peer_id = my_peer_id,
+            cidrs = ?self.cidr_peer_id_map.load(),
+            cidrs_v6 = ?self.cidr_v6_peer_id_map.load(),
+            "update peer cidr map"
+        );
     }
 
-    fn get_peer_id_for_proxy(&self, ipv4: &Ipv4Addr) -> Option<PeerId> {
-        let ipv4 = std::net::IpAddr::V4(*ipv4);
-        for item in self.cidr_peer_id_map.iter() {
-            let (k, v) = item.pair();
-            if k.contains(&ipv4) {
-                return Some(v.peer_id);
-            }
+    fn get_peer_id_for_proxy(&self, ip: &IpAddr) -> Option<PeerId> {
+        match ip {
+            IpAddr::V4(ipv4) => self
+                .cidr_peer_id_map
+                .load()
+                .get_lpm(&Ipv4Cidr::new(*ipv4, 32).unwrap())
+                .map(|x| x.1.peer_id),
+            IpAddr::V6(ipv6) => self
+                .cidr_v6_peer_id_map
+                .load()
+                .get_lpm(&Ipv6Cidr::new(*ipv6, 128).unwrap())
+                .map(|x| x.1.peer_id),
         }
-        None
     }
 }
@@ -2541,7 +2583,7 @@ impl Route for PeerRoute {
             return None;
         }
 
-        if let Some(peer_id) = route_table.get_peer_id_for_proxy(ipv4_addr) {
+        if let Some(peer_id) = route_table.get_peer_id_for_proxy(&IpAddr::V4(*ipv4_addr)) {
             return Some(peer_id);
         }
 
@@ -2555,10 +2597,18 @@ impl Route for PeerRoute {
             return Some(p.peer_id);
         }
 
-        // TODO: Add proxy support for IPv6 similar to IPv4
-        // if let Some(peer_id) = route_table.get_peer_id_for_proxy_ipv6(ipv6_addr) {
-        //     return Some(peer_id);
-        // }
+        // only get peer id for proxy when the dst ipv6 is not in the same network as us
+        if self
+            .global_ctx
+            .is_ip_in_same_network(&std::net::IpAddr::V6(*ipv6_addr))
+        {
+            tracing::trace!(?ipv6_addr, "ipv6 addr is in same network with us");
+            return None;
+        }
+
+        if let Some(peer_id) = route_table.get_peer_id_for_proxy(&IpAddr::V6(*ipv6_addr)) {
+            return Some(peer_id);
+        }
 
         tracing::debug!(?ipv6_addr, "no peer id for ipv6");
         None
@@ -2656,6 +2706,7 @@ mod tests {
     use cidr::{Ipv4Cidr, Ipv4Inet, Ipv6Inet};
     use dashmap::DashMap;
+    use prefix_trie::PrefixMap;
     use prost_reflect::{DynamicMessage, ReflectMessage};
 
     use crate::{
@@ -2664,7 +2715,7 @@
         peers::{
             create_packet_recv_chan,
             peer_manager::{PeerManager, RouteAlgoType},
-            peer_ospf_route::PeerRouteServiceImpl,
+            peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl},
             route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface},
             tests::{connect_peer_manager, create_mock_peer_manager},
         },
@@ -3179,4 +3230,99 @@ mod tests {
         p_b.get_global_ctx().config.remove_proxy_cidr(proxy);
         check_route_peer_id(p_c.clone()).await;
     }
+
+    #[tokio::test]
+    async fn test_subnet_proxy_conflict() {
+        // Create three peer managers: A, B, C
+        let p_a = create_mock_peer_manager().await;
+        let p_b = create_mock_peer_manager().await;
+        let p_c = create_mock_peer_manager().await;
+
+        // Connect A-B-C in a line topology
+        connect_peer_manager(p_a.clone(), p_b.clone()).await;
+        connect_peer_manager(p_b.clone(), p_c.clone()).await;
+
+        // Create routes for testing
+        let route_a = p_a.get_route();
+        let route_b = p_b.get_route();
+
+        // Define the proxy CIDR that will be used by both A and B
+        let proxy_cidr: Ipv4Cidr = "192.168.100.0/24".parse().unwrap();
+        let test_ip = proxy_cidr.first_address();
+
+        let mut cidr_peer_id_map: PrefixMap<Ipv4Cidr, PeerIdAndVersion> = PrefixMap::new();
+        cidr_peer_id_map.insert(
+            proxy_cidr,
+            PeerIdAndVersion {
+                peer_id: p_c.my_peer_id(),
+                version: 0,
+            },
+        );
+        assert_eq!(
+            cidr_peer_id_map
+                .get_lpm(&Ipv4Cidr::new(test_ip, 32).unwrap())
+                .map(|v| v.1.peer_id)
+                .unwrap_or(0),
+            p_c.my_peer_id(),
+        );
+
+        // First, add proxy CIDR to node C to establish a baseline route
+        p_c.get_global_ctx()
+            .config
+            .add_proxy_cidr(proxy_cidr, None)
+            .unwrap();
+
+        // Wait for route convergence - A should route to C for the proxy CIDR
+        wait_for_condition(
+            || async {
+                let peer_id_for_proxy = route_a.get_peer_id_by_ipv4(&test_ip).await;
+                peer_id_for_proxy == Some(p_c.my_peer_id())
+            },
+            Duration::from_secs(10),
+        )
+        .await;
+
+        // Now add the same proxy CIDR to node A (creating a conflict)
+        p_a.get_global_ctx()
+            .config
+            .add_proxy_cidr(proxy_cidr, None)
+            .unwrap();
+
+        // Wait for route convergence - A should now route to itself for the proxy CIDR
+        wait_for_condition(
+            || async { route_a.get_peer_id_by_ipv4(&test_ip).await == Some(p_a.my_peer_id()) },
+            Duration::from_secs(10),
+        )
+        .await;
+
+        // Also add the same proxy CIDR to node B (creating another conflict)
+        p_b.get_global_ctx()
+            .config
+            .add_proxy_cidr(proxy_cidr, None)
+            .unwrap();
+
+        // Wait for route convergence - B should route to itself for the proxy CIDR
+        wait_for_condition(
+            || async { route_b.get_peer_id_by_ipv4(&test_ip).await == Some(p_b.my_peer_id()) },
+            Duration::from_secs(5),
+        )
+        .await;
+
+        // Final verification: A should still route to itself even with multiple conflicts
+        assert_eq!(
+            route_a.get_peer_id_by_ipv4(&test_ip).await,
+            Some(p_a.my_peer_id())
+        );
+
+        // Remove the proxy on A; A should then route to B
+        p_a.get_global_ctx().config.remove_proxy_cidr(proxy_cidr);
+        wait_for_condition(
+            || async {
+                let peer_id_for_proxy = route_a.get_peer_id_by_ipv4(&test_ip).await;
+                peer_id_for_proxy == Some(p_b.my_peer_id())
+            },
+            Duration::from_secs(10),
+        )
+        .await;
+    }
 }
diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs
index 4e47c3b..7301eff 100644
--- a/easytier/src/tests/three_node.rs
+++ b/easytier/src/tests/three_node.rs
@@ -11,6 +11,8 @@ use tokio::{net::UdpSocket, task::JoinSet};
 
 use super::*;
 
+// TODO: add a unit test to make sure that when socks5 + exit node == self || proxy_cidr == 0.0.0.0/0, the exit-node capability actually works.
+
 use crate::{
     common::{
         config::{ConfigLoader, NetworkIdentity, PortForwardConfig, TomlConfigLoader},
@@ -341,6 +343,77 @@ pub async fn basic_three_node_test(
     drop_insts(insts).await;
 }
 
+#[tokio::test]
+#[serial_test::serial]
+pub async fn subnet_proxy_loop_prevention_test() {
+    // Test scenario: inst1 and inst2 both proxy the 10.1.2.0/24 subnet;
+    // inst1 pings 10.1.2.5 and no routing loop should occur.
+    let insts = init_three_node_ex(
+        "udp",
+        |cfg| {
+            if cfg.get_inst_name() == "inst1" {
+                // inst1 proxies the 10.1.2.0/24 subnet
+                cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
+                    .unwrap();
+            } else if cfg.get_inst_name() == "inst2" {
+                // inst2 also proxies the same 10.1.2.0/24 subnet
+                cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
+                    .unwrap();
+            }
+            cfg
+        },
+        false,
+    )
+    .await;
+
+    // Wait for the proxy route to appear - inst1 should see inst2's proxy route
+    wait_proxy_route_appear(
+        &insts[0].get_peer_manager(),
+        "10.144.144.2/24",
+        insts[1].peer_id(),
+        "10.1.2.0/24",
+    )
+    .await;
+
+    // Wait for the proxy route to appear - inst2 should see inst1's proxy route
+    wait_proxy_route_appear(
+        &insts[1].get_peer_manager(),
+        "10.144.144.1/24",
+        insts[0].peer_id(),
+        "10.1.2.0/24",
+    )
+    .await;
+
+    // Ping 10.1.2.5 from inst1 (net_a);
+    // this should fail and must not create a loop.
+    let now = std::time::Instant::now();
+    while now.elapsed().as_secs() < 10 {
+        ping_test("net_a", "10.1.2.5", None).await;
"10.1.2.5", None).await; + tokio::time::sleep(Duration::from_secs(1)).await; + } + + println!( + "inst0 metrics: {:?}", + insts[0] + .get_global_ctx() + .stats_manager() + .export_prometheus() + ); + + let all_metrics = insts[0].get_global_ctx().stats_manager().get_all_metrics(); + for metric in all_metrics { + if metric.name == MetricName::TrafficPacketsSelfTx { + let counter = insts[0] + .get_global_ctx() + .stats_manager() + .get_counter(metric.name, metric.labels.clone()); + assert!(counter.get() < 40); + } + } + + drop_insts(insts).await; +} + async fn subnet_proxy_test_udp(listen_ip: &str, target_ip: &str) { use crate::tunnel::{common::tests::_tunnel_pingpong_netns, udp::UdpTunnelListener}; use rand::Rng; @@ -998,7 +1071,9 @@ pub async fn wireguard_vpn_portal(#[values(true, false)] test_v6: bool) { #[rstest::rstest] #[tokio::test] #[serial_test::serial] -pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_addr: &str) { +pub async fn socks5_vpn_portal( + #[values("10.144.144.1", "10.144.144.3", "10.1.2.4")] dst_addr: &str, +) { use rand::Rng as _; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, @@ -1006,7 +1081,19 @@ pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_add }; use tokio_socks::tcp::socks5::Socks5Stream; - let _insts = init_three_node("tcp").await; + let _insts = init_three_node_ex( + "tcp", + |cfg| { + if cfg.get_inst_name() == "inst3" { + // 添加子网代理配置 + cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None) + .unwrap(); + } + cfg + }, + false, + ) + .await; let mut buf = vec![0u8; 1024]; rand::thread_rng().fill(&mut buf[..]); @@ -1016,18 +1103,23 @@ pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_add let task = tokio::spawn(async move { let net_ns = if dst_addr_clone == "10.144.144.1" { NetNS::new(Some("net_a".into())) - } else { + } else if dst_addr_clone == "10.144.144.3" { NetNS::new(Some("net_c".into())) + } else { + NetNS::new(Some("net_d".into())) }; + let _g = net_ns.guard(); let socket = TcpListener::bind("0.0.0.0:22222").await.unwrap(); let (mut st, addr) = socket.accept().await.unwrap(); - if dst_addr_clone == "10.144.144.3" { + if dst_addr_clone == "10.144.144.1" { + assert_eq!(addr.ip().to_string(), "127.0.0.1".to_string()); + } else if dst_addr_clone == "10.144.144.3" { assert_eq!(addr.ip().to_string(), "10.144.144.1".to_string()); } else { - assert_eq!(addr.ip().to_string(), "127.0.0.1".to_string()); + assert_eq!(addr.ip().to_string(), "10.1.2.3".to_string()); } let rbuf = &mut [0u8; 1024]; @@ -1035,7 +1127,11 @@ pub async fn socks5_vpn_portal(#[values("10.144.144.1", "10.144.144.3")] dst_add assert_eq!(rbuf, buf_clone.as_slice()); }); - let net_ns = NetNS::new(Some("net_a".into())); + let net_ns = if dst_addr == "10.1.2.4" { + NetNS::new(Some("net_c".into())) + } else { + NetNS::new(Some("net_a".into())) + }; let _g = net_ns.guard(); println!("connect to socks5 portal"); diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index 7c0a4bb..4be1733 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -82,6 +82,7 @@ bitflags::bitflags! 
{ const NO_PROXY = 0b0000_1000; const COMPRESSED = 0b0001_0000; const KCP_SRC_MODIFIED = 0b0010_0000; + const NOT_SEND_TO_TUN = 0b0100_0000; const _ = !0; } @@ -201,6 +202,23 @@ impl PeerManagerHeader { .unwrap() .contains(PeerManagerHeaderFlags::KCP_SRC_MODIFIED) } + + pub fn set_not_send_to_tun(&mut self, not_send_to_tun: bool) -> &mut Self { + let mut flags = PeerManagerHeaderFlags::from_bits(self.flags).unwrap(); + if not_send_to_tun { + flags.insert(PeerManagerHeaderFlags::NOT_SEND_TO_TUN); + } else { + flags.remove(PeerManagerHeaderFlags::NOT_SEND_TO_TUN); + } + self.flags = flags.bits(); + self + } + + pub fn is_not_send_to_tun(&self) -> bool { + PeerManagerHeaderFlags::from_bits(self.flags) + .unwrap() + .contains(PeerManagerHeaderFlags::NOT_SEND_TO_TUN) + } } #[repr(C, packed)] diff --git a/easytier/src/vpn_portal/wireguard.rs b/easytier/src/vpn_portal/wireguard.rs index 95ca55b..584c1ea 100644 --- a/easytier/src/vpn_portal/wireguard.rs +++ b/easytier/src/vpn_portal/wireguard.rs @@ -128,7 +128,11 @@ impl WireGuardImpl { tracing::trace!(?i, "Received from wg client"); let dst = i.get_destination(); let _ = peer_mgr - .send_msg_by_ip(ZCPacket::new_with_payload(inner.as_ref()), IpAddr::V4(dst)) + .send_msg_by_ip( + ZCPacket::new_with_payload(inner.as_ref()), + IpAddr::V4(dst), + false, + ) .await; }
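
For context on the longest-prefix-match (LPM) lookup that replaces the old linear CIDR scan, here is a minimal standalone sketch. It is not part of the patch; it only assumes the prefix-trie 0.7 and cidr 0.3 crates that the patched Cargo.toml pulls in (with prefix-trie's "cidr" feature), and a plain u32 stands in for the real PeerId/PeerIdAndVersion values.

use cidr::Ipv4Cidr;
use prefix_trie::PrefixMap;

fn main() {
    // Map proxied CIDRs to the peer id that announced them.
    let mut cidr_to_peer: PrefixMap<Ipv4Cidr, u32> = PrefixMap::new();
    cidr_to_peer.insert("10.1.0.0/16".parse().unwrap(), 100);
    cidr_to_peer.insert("10.1.2.0/24".parse().unwrap(), 200);

    // Look the destination address up as a /32 and take the longest match,
    // mirroring the `get_lpm(&Ipv4Cidr::new(*ipv4, 32).unwrap())` call in the patch.
    let dst = Ipv4Cidr::new("10.1.2.5".parse().unwrap(), 32).unwrap();
    let peer_id = cidr_to_peer.get_lpm(&dst).map(|(_, peer)| *peer);
    assert_eq!(peer_id, Some(200), "the more specific /24 wins over the /16");
}

With the old DashMap-based scan, whichever CIDR happened to be iterated first could win; the prefix trie makes the most specific announced subnet deterministic, which is what the conflict test above relies on.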
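A second minimal sketch, for the loop guard in send_msg_by_ip: when a packet that originated from our own address resolves back to our own peer id, the header is tagged instead of the packet being dropped, so the VPN portal path can still consume it while the TUN/subnet-proxy path ignores it. The Flags type and mark_self_destined helper below are hypothetical stand-ins (not the real PeerManagerHeaderFlags API), reusing only the bit values defined in packet_def.rs and assuming the bitflags 2.x crate.

bitflags::bitflags! {
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Flags: u8 {
        const NO_PROXY        = 0b0000_1000;
        const NOT_SEND_TO_TUN = 0b0100_0000;
    }
}

/// Tag (rather than drop) a packet that would otherwise loop back into the
/// local subnet-proxy path.
fn mark_self_destined(flags: &mut Flags, from_self: bool, dst_peer: u32, my_peer: u32) {
    if from_self && dst_peer == my_peer {
        flags.insert(Flags::NOT_SEND_TO_TUN | Flags::NO_PROXY);
    }
}

fn main() {
    // Packet whose source IP is our own and whose route points back to ourselves: tag it.
    let mut flags = Flags::empty();
    mark_self_destined(&mut flags, true, 7, 7);
    assert!(flags.contains(Flags::NOT_SEND_TO_TUN) && flags.contains(Flags::NO_PROXY));

    // Packet from a device behind this gateway (source IP is not ours) stays untouched,
    // matching the gateway exception described in the virtual_nic.rs comment.
    let mut gw_flags = Flags::empty();
    mark_self_destined(&mut gw_flags, false, 7, 7);
    assert!(gw_flags.is_empty());
}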