diff --git a/Cargo.lock b/Cargo.lock index 991118e..83b1b9c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3551,7 +3551,7 @@ dependencies = [ [[package]] name = "kcp-sys" version = "0.1.0" -source = "git+https://github.com/EasyTier/kcp-sys#a932f3ed394cad1ace9c56f90611b421d856e628" +source = "git+https://github.com/EasyTier/kcp-sys#9ce5c08807378ad0486291928994c4f80878c196" dependencies = [ "anyhow", "auto_impl", diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 2a2e8c1..84e6ad9 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -101,6 +101,10 @@ impl GlobalCtx { let enable_exit_node = config_fs.get_flags().enable_exit_node; let no_tun = config_fs.get_flags().no_tun; + let mut feature_flags = PeerFeatureFlag::default(); + feature_flags.kcp_input = !config_fs.get_flags().disable_kcp_input; + feature_flags.no_relay_kcp = config_fs.get_flags().disable_relay_kcp; + GlobalCtx { inst_name: config_fs.get_inst_name(), id, @@ -123,7 +127,7 @@ impl GlobalCtx { enable_exit_node, no_tun, - feature_flags: AtomicCell::new(PeerFeatureFlag::default()), + feature_flags: AtomicCell::new(feature_flags), } } diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 7627c09..d3ec032 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -1,5 +1,5 @@ use std::{ - net::{IpAddr, SocketAddr}, + net::{IpAddr, Ipv4Addr, SocketAddr}, sync::Arc, time::Duration, }; @@ -8,6 +8,7 @@ use anyhow::Context; use bytes::Bytes; use kcp_sys::{ endpoint::{KcpEndpoint, KcpPacketReceiver}, + ffi_safe::KcpConfig, packet_def::KcpPacket, stream::KcpStream, }; @@ -29,6 +30,16 @@ use crate::{ tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket}, }; +fn create_kcp_endpoint() -> KcpEndpoint { + let mut kcp_endpoint = KcpEndpoint::new(); + kcp_endpoint.set_kcp_config_factory(Box::new(|conv| { + let mut cfg = KcpConfig::new_turbo(conv); + cfg.interval = Some(5); + cfg + })); + kcp_endpoint +} + struct KcpEndpointFilter { kcp_endpoint: Arc, is_src: bool, @@ -153,6 +164,20 @@ pub struct KcpProxySrc { tasks: JoinSet<()>, } +impl TcpProxyForKcpSrc { + async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool { + let peer_map: Arc = + self.0.get_peer_manager().get_peer_map(); + let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else { + return false; + }; + let Some(feature_flag) = peer_map.get_peer_feature_flag(dst_peer_id).await else { + return false; + }; + feature_flag.kcp_input + } +} + #[async_trait::async_trait] impl NicPacketFilter for TcpProxyForKcpSrc { async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool { @@ -161,7 +186,7 @@ impl NicPacketFilter for TcpProxyForKcpSrc { return true; } - let Some(my_ipv4) = self.0.get_local_ip() else { + let Some(my_ipv4) = self.0.get_global_ctx().get_ipv4() else { return false; }; @@ -169,8 +194,9 @@ impl NicPacketFilter for TcpProxyForKcpSrc { let ip_packet = Ipv4Packet::new(data).unwrap(); if ip_packet.get_version() != 4 // TODO: how to support net to net kcp proxy? - || ip_packet.get_source() != my_ipv4 + || ip_packet.get_source() != my_ipv4.address() || ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp + || !self.check_dst_allow_kcp_input(&ip_packet.get_destination()).await { return false; } @@ -183,7 +209,7 @@ impl NicPacketFilter for TcpProxyForKcpSrc { impl KcpProxySrc { pub async fn new(peer_manager: Arc) -> Self { - let mut kcp_endpoint = KcpEndpoint::new(); + let mut kcp_endpoint = create_kcp_endpoint(); kcp_endpoint.run().await; let output_receiver = kcp_endpoint.output_receiver().unwrap(); @@ -238,7 +264,7 @@ pub struct KcpProxyDst { impl KcpProxyDst { pub async fn new(peer_manager: Arc) -> Self { - let mut kcp_endpoint = KcpEndpoint::new(); + let mut kcp_endpoint = create_kcp_endpoint(); kcp_endpoint.run().await; let mut tasks = JoinSet::new(); diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index d91fc7f..90ce931 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -700,6 +700,10 @@ impl TcpProxy { } } + pub fn get_global_ctx(&self) -> &ArcGlobalCtx { + &self.global_ctx + } + pub fn is_smoltcp_enabled(&self) -> bool { self.enable_smoltcp .load(std::sync::atomic::Ordering::Relaxed) @@ -783,4 +787,8 @@ impl TcpProxy { Some(()) } + + pub fn get_peer_manager(&self) -> &Arc { + &self.peer_manager + } } diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 4be5bbf..26aaf37 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -10,7 +10,7 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, PeerId, }, - proto::cli::PeerConnInfo, + proto::{cli::PeerConnInfo, common::PeerFeatureFlag}, tunnel::{packet_def::ZCPacket, TunnelError}, }; @@ -167,6 +167,16 @@ impl PeerMap { None } + pub async fn get_peer_feature_flag(&self, peer_id: PeerId) -> Option { + for route in self.routes.read().await.iter() { + let feature_flag = route.get_feature_flag(peer_id).await; + if feature_flag.is_some() { + return feature_flag; + }; + } + None + } + pub fn is_empty(&self) -> bool { self.peer_map.is_empty() } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index d966b26..779ea21 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -30,7 +30,7 @@ use crate::{ }, peers::route_trait::{Route, RouteInterfaceBox}, proto::{ - common::{Ipv4Inet, NatType, StunInfo}, + common::{Ipv4Inet, NatType, PeerFeatureFlag, StunInfo}, peer_rpc::{ route_foreign_network_infos, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion, @@ -2042,6 +2042,8 @@ impl Route for PeerRoute { route.cost_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency); route.path_latency_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency); + route.feature_flag = item.feature_flag.clone(); + routes.push(route); } routes @@ -2101,6 +2103,14 @@ impl Route for PeerRoute { .map(|x| x.clone()) .unwrap_or_default() } + + async fn get_feature_flag(&self, peer_id: PeerId) -> Option { + self.service_impl + .route_table + .peer_infos + .get(&peer_id) + .and_then(|x| x.feature_flag.clone()) + } } impl PeerPacketFilter for Arc {} diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index 9cf6c24..76fd1fb 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -4,8 +4,11 @@ use dashmap::DashMap; use crate::{ common::{global_ctx::NetworkIdentity, PeerId}, - proto::peer_rpc::{ - ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos, + proto::{ + common::PeerFeatureFlag, + peer_rpc::{ + ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos, + }, }, }; @@ -94,6 +97,8 @@ pub trait Route { async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {} + async fn get_feature_flag(&self, peer_id: PeerId) -> Option; + async fn dump(&self) -> String { "this route implementation does not support dump".to_string() } diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 5eb21f5..18c1701 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -151,4 +151,6 @@ message StunInfo { message PeerFeatureFlag { bool is_public_server = 1; bool avoid_relay_data = 2; + bool kcp_input = 3; + bool no_relay_kcp = 4; } diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index d1943bf..2e8bdee 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -365,6 +365,7 @@ pub async fn subnet_proxy_three_node_test( #[values(true, false)] no_tun: bool, #[values(true, false)] relay_by_public_server: bool, #[values(true, false)] enable_kcp_proxy: bool, + #[values(true, false)] disable_kcp_input: bool, ) { let insts = init_three_node_ex( proto, @@ -372,6 +373,7 @@ pub async fn subnet_proxy_three_node_test( if cfg.get_inst_name() == "inst3" { let mut flags = cfg.get_flags(); flags.no_tun = no_tun; + flags.disable_kcp_input = disable_kcp_input; cfg.set_flags(flags); cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap()); }