make kcp proxy compitible with old version (#585)

* fix kcp not work with smoltcp
* check if dst kcp input is enabled
This commit is contained in:
Sijie.Sun
2025-01-26 16:22:10 +08:00
committed by GitHub
parent b69b122c8d
commit 2a5d5ea4df
9 changed files with 78 additions and 11 deletions

2
Cargo.lock generated
View File

@@ -3551,7 +3551,7 @@ dependencies = [
[[package]]
name = "kcp-sys"
version = "0.1.0"
source = "git+https://github.com/EasyTier/kcp-sys#a932f3ed394cad1ace9c56f90611b421d856e628"
source = "git+https://github.com/EasyTier/kcp-sys#9ce5c08807378ad0486291928994c4f80878c196"
dependencies = [
"anyhow",
"auto_impl",

View File

@@ -101,6 +101,10 @@ impl GlobalCtx {
let enable_exit_node = config_fs.get_flags().enable_exit_node;
let no_tun = config_fs.get_flags().no_tun;
let mut feature_flags = PeerFeatureFlag::default();
feature_flags.kcp_input = !config_fs.get_flags().disable_kcp_input;
feature_flags.no_relay_kcp = config_fs.get_flags().disable_relay_kcp;
GlobalCtx {
inst_name: config_fs.get_inst_name(),
id,
@@ -123,7 +127,7 @@ impl GlobalCtx {
enable_exit_node,
no_tun,
feature_flags: AtomicCell::new(PeerFeatureFlag::default()),
feature_flags: AtomicCell::new(feature_flags),
}
}

View File

@@ -1,5 +1,5 @@
use std::{
net::{IpAddr, SocketAddr},
net::{IpAddr, Ipv4Addr, SocketAddr},
sync::Arc,
time::Duration,
};
@@ -8,6 +8,7 @@ use anyhow::Context;
use bytes::Bytes;
use kcp_sys::{
endpoint::{KcpEndpoint, KcpPacketReceiver},
ffi_safe::KcpConfig,
packet_def::KcpPacket,
stream::KcpStream,
};
@@ -29,6 +30,16 @@ use crate::{
tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket},
};
fn create_kcp_endpoint() -> KcpEndpoint {
let mut kcp_endpoint = KcpEndpoint::new();
kcp_endpoint.set_kcp_config_factory(Box::new(|conv| {
let mut cfg = KcpConfig::new_turbo(conv);
cfg.interval = Some(5);
cfg
}));
kcp_endpoint
}
struct KcpEndpointFilter {
kcp_endpoint: Arc<KcpEndpoint>,
is_src: bool,
@@ -153,6 +164,20 @@ pub struct KcpProxySrc {
tasks: JoinSet<()>,
}
impl TcpProxyForKcpSrc {
async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool {
let peer_map: Arc<crate::peers::peer_map::PeerMap> =
self.0.get_peer_manager().get_peer_map();
let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else {
return false;
};
let Some(feature_flag) = peer_map.get_peer_feature_flag(dst_peer_id).await else {
return false;
};
feature_flag.kcp_input
}
}
#[async_trait::async_trait]
impl NicPacketFilter for TcpProxyForKcpSrc {
async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool {
@@ -161,7 +186,7 @@ impl NicPacketFilter for TcpProxyForKcpSrc {
return true;
}
let Some(my_ipv4) = self.0.get_local_ip() else {
let Some(my_ipv4) = self.0.get_global_ctx().get_ipv4() else {
return false;
};
@@ -169,8 +194,9 @@ impl NicPacketFilter for TcpProxyForKcpSrc {
let ip_packet = Ipv4Packet::new(data).unwrap();
if ip_packet.get_version() != 4
// TODO: how to support net to net kcp proxy?
|| ip_packet.get_source() != my_ipv4
|| ip_packet.get_source() != my_ipv4.address()
|| ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp
|| !self.check_dst_allow_kcp_input(&ip_packet.get_destination()).await
{
return false;
}
@@ -183,7 +209,7 @@ impl NicPacketFilter for TcpProxyForKcpSrc {
impl KcpProxySrc {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = KcpEndpoint::new();
let mut kcp_endpoint = create_kcp_endpoint();
kcp_endpoint.run().await;
let output_receiver = kcp_endpoint.output_receiver().unwrap();
@@ -238,7 +264,7 @@ pub struct KcpProxyDst {
impl KcpProxyDst {
pub async fn new(peer_manager: Arc<PeerManager>) -> Self {
let mut kcp_endpoint = KcpEndpoint::new();
let mut kcp_endpoint = create_kcp_endpoint();
kcp_endpoint.run().await;
let mut tasks = JoinSet::new();

View File

@@ -700,6 +700,10 @@ impl<C: NatDstConnector> TcpProxy<C> {
}
}
pub fn get_global_ctx(&self) -> &ArcGlobalCtx {
&self.global_ctx
}
pub fn is_smoltcp_enabled(&self) -> bool {
self.enable_smoltcp
.load(std::sync::atomic::Ordering::Relaxed)
@@ -783,4 +787,8 @@ impl<C: NatDstConnector> TcpProxy<C> {
Some(())
}
pub fn get_peer_manager(&self) -> &Arc<PeerManager> {
&self.peer_manager
}
}

View File

@@ -10,7 +10,7 @@ use crate::{
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
PeerId,
},
proto::cli::PeerConnInfo,
proto::{cli::PeerConnInfo, common::PeerFeatureFlag},
tunnel::{packet_def::ZCPacket, TunnelError},
};
@@ -167,6 +167,16 @@ impl PeerMap {
None
}
pub async fn get_peer_feature_flag(&self, peer_id: PeerId) -> Option<PeerFeatureFlag> {
for route in self.routes.read().await.iter() {
let feature_flag = route.get_feature_flag(peer_id).await;
if feature_flag.is_some() {
return feature_flag;
};
}
None
}
pub fn is_empty(&self) -> bool {
self.peer_map.is_empty()
}

View File

@@ -30,7 +30,7 @@ use crate::{
},
peers::route_trait::{Route, RouteInterfaceBox},
proto::{
common::{Ipv4Inet, NatType, StunInfo},
common::{Ipv4Inet, NatType, PeerFeatureFlag, StunInfo},
peer_rpc::{
route_foreign_network_infos, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey,
OspfRouteRpc, OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion,
@@ -2042,6 +2042,8 @@ impl Route for PeerRoute {
route.cost_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency);
route.path_latency_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency);
route.feature_flag = item.feature_flag.clone();
routes.push(route);
}
routes
@@ -2101,6 +2103,14 @@ impl Route for PeerRoute {
.map(|x| x.clone())
.unwrap_or_default()
}
async fn get_feature_flag(&self, peer_id: PeerId) -> Option<PeerFeatureFlag> {
self.service_impl
.route_table
.peer_infos
.get(&peer_id)
.and_then(|x| x.feature_flag.clone())
}
}
impl PeerPacketFilter for Arc<PeerRoute> {}

View File

@@ -4,8 +4,11 @@ use dashmap::DashMap;
use crate::{
common::{global_ctx::NetworkIdentity, PeerId},
proto::peer_rpc::{
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos,
proto::{
common::PeerFeatureFlag,
peer_rpc::{
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos,
},
},
};
@@ -94,6 +97,8 @@ pub trait Route {
async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {}
async fn get_feature_flag(&self, peer_id: PeerId) -> Option<PeerFeatureFlag>;
async fn dump(&self) -> String {
"this route implementation does not support dump".to_string()
}

View File

@@ -151,4 +151,6 @@ message StunInfo {
message PeerFeatureFlag {
bool is_public_server = 1;
bool avoid_relay_data = 2;
bool kcp_input = 3;
bool no_relay_kcp = 4;
}

View File

@@ -365,6 +365,7 @@ pub async fn subnet_proxy_three_node_test(
#[values(true, false)] no_tun: bool,
#[values(true, false)] relay_by_public_server: bool,
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] disable_kcp_input: bool,
) {
let insts = init_three_node_ex(
proto,
@@ -372,6 +373,7 @@ pub async fn subnet_proxy_three_node_test(
if cfg.get_inst_name() == "inst3" {
let mut flags = cfg.get_flags();
flags.no_tun = no_tun;
flags.disable_kcp_input = disable_kcp_input;
cfg.set_flags(flags);
cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap());
}