diff --git a/easytier/src/common/acl_processor.rs b/easytier/src/common/acl_processor.rs index 386aff9..de101d8 100644 --- a/easytier/src/common/acl_processor.rs +++ b/easytier/src/common/acl_processor.rs @@ -470,6 +470,7 @@ impl AclProcessor { let rules = match chain_type { ChainType::Inbound => &self.inbound_rules, ChainType::Outbound => &self.outbound_rules, + ChainType::Forward => &self.forward_rules, _ => { return AclResult { action: Action::Drop, diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index d7dd395..866edbb 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -20,7 +20,12 @@ use pnet::packet::{ Packet as _, }; use prost::Message; -use tokio::{io::copy_bidirectional, select, task::JoinSet}; +use tokio::{ + io::{copy_bidirectional, AsyncRead, AsyncWrite}, + select, + task::JoinSet, +}; +use tokio_util::io::InspectReader; use super::{ tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy}, @@ -28,11 +33,13 @@ use super::{ }; use crate::{ common::{ + acl_processor::PacketInfo, error::Result, global_ctx::{ArcGlobalCtx, GlobalCtx}, }, - peers::{peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter}, + peers::{acl_filter::AclFilter, peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter}, proto::{ + acl::{Action, ChainType, Protocol}, cli::{ ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, @@ -372,6 +379,50 @@ pub struct KcpProxyDst { tasks: JoinSet<()>, } +#[derive(Clone)] +pub struct ProxyAclHandler { + pub acl_filter: Arc, + pub packet_info: PacketInfo, + pub chain_type: ChainType, +} + +impl ProxyAclHandler { + pub fn handle_packet(&self, buf: &[u8]) -> Result<()> { + let mut packet_info = self.packet_info.clone(); + packet_info.packet_size = buf.len(); + let ret = self + .acl_filter + .get_processor() + .process_packet(&packet_info, self.chain_type); + self.acl_filter.handle_acl_result( + &ret, + &packet_info, + self.chain_type, + &self.acl_filter.get_processor(), + ); + if !matches!(ret.action, Action::Allow) { + return Err(anyhow::anyhow!("acl denied").into()); + } + + Ok(()) + } + + pub async fn copy_bidirection_with_acl( + &self, + src: impl AsyncRead + AsyncWrite + Unpin, + mut dst: impl AsyncRead + AsyncWrite + Unpin, + ) -> Result<()> { + let (src_reader, src_writer) = tokio::io::split(src); + let src_reader = InspectReader::new(src_reader, |buf| { + let _ = self.handle_packet(buf); + }); + let mut src = tokio::io::join(src_reader, src_writer); + + copy_bidirectional(&mut src, &mut dst).await?; + Ok(()) + } +} + impl KcpProxyDst { pub async fn new(peer_manager: Arc) -> Self { let mut kcp_endpoint = create_kcp_endpoint(); @@ -396,7 +447,7 @@ impl KcpProxyDst { #[tracing::instrument(ret)] async fn handle_one_in_stream( - mut kcp_stream: KcpStream, + kcp_stream: KcpStream, global_ctx: ArcGlobalCtx, proxy_entries: Arc>, cidr_set: Arc, @@ -411,6 +462,7 @@ impl KcpProxyDst { parsed_conn_data ))? .into(); + let src_socket: SocketAddr = parsed_conn_data.src.unwrap_or_default().into(); match dst_socket.ip() { IpAddr::V4(dst_v4_ip) => { @@ -437,17 +489,36 @@ impl KcpProxyDst { proxy_entries.remove(&conn_id); } - if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) - && global_ctx.no_tun() - { + let send_to_self = + Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())); + + if send_to_self && global_ctx.no_tun() { dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); } + let acl_handler = ProxyAclHandler { + acl_filter: global_ctx.get_acl_filter().clone(), + packet_info: PacketInfo { + src_ip: src_socket.ip(), + dst_ip: dst_socket.ip(), + src_port: Some(src_socket.port()), + dst_port: Some(dst_socket.port()), + protocol: Protocol::Tcp, + packet_size: conn_data.len(), + }, + chain_type: if send_to_self { + ChainType::Inbound + } else { + ChainType::Forward + }, + }; + acl_handler.handle_packet(&conn_data)?; + tracing::debug!("kcp connect to dst socket: {:?}", dst_socket); let _g = global_ctx.net_ns.guard(); let connector = NatDstTcpConnector {}; - let mut ret = connector + let ret = connector .connect("0.0.0.0:0".parse().unwrap(), dst_socket) .await?; @@ -455,7 +526,10 @@ impl KcpProxyDst { e.state = TcpProxyEntryState::Connected.into(); } - copy_bidirectional(&mut ret, &mut kcp_stream).await?; + acl_handler + .copy_bidirection_with_acl(kcp_stream, ret) + .await?; + Ok(()) } diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index e6c6f5d..e291b7e 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -7,19 +7,21 @@ use dashmap::DashMap; use pnet::packet::ipv4::Ipv4Packet; use prost::Message as _; use quinn::{Endpoint, Incoming}; -use tokio::io::{copy_bidirectional, AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite}; use tokio::net::TcpStream; use tokio::task::JoinSet; use tokio::time::timeout; +use crate::common::acl_processor::PacketInfo; use crate::common::error::Result; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; use crate::common::join_joinset_background; use crate::defer; -use crate::gateway::kcp_proxy::TcpProxyForKcpSrcTrait; +use crate::gateway::kcp_proxy::{ProxyAclHandler, TcpProxyForKcpSrcTrait}; use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy}; use crate::gateway::CidrSet; use crate::peers::peer_manager::PeerManager; +use crate::proto::acl::{ChainType, Protocol}; use crate::proto::cli::{ ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, @@ -322,12 +324,13 @@ impl QUICProxyDst { .await; match ret { - Ok(Ok((mut quic_stream, mut tcp_stream))) => { - let ret = copy_bidirectional(&mut quic_stream, &mut tcp_stream).await; + Ok(Ok((quic_stream, tcp_stream, acl))) => { + let remote_addr = quic_stream.connection.as_ref().map(|c| c.remote_address()); + let ret = acl.copy_bidirection_with_acl(quic_stream, tcp_stream).await; tracing::info!( "QUIC connection handled, result: {:?}, remote addr: {:?}", ret, - quic_stream.connection.as_ref().map(|c| c.remote_address()) + remote_addr, ); } Ok(Err(e)) => { @@ -345,7 +348,7 @@ impl QUICProxyDst { cidr_set: Arc, proxy_entry_key: SocketAddr, proxy_entries: Arc>, - ) -> Result<(QUICStream, TcpStream)> { + ) -> Result<(QUICStream, TcpStream, ProxyAclHandler)> { let conn = incoming.await.with_context(|| "accept failed")?; let addr = conn.remote_address(); tracing::info!("Accepted QUIC connection from {}", addr); @@ -376,7 +379,8 @@ impl QUICProxyDst { dst_socket.set_ip(real_ip); } - if Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()) && ctx.no_tun() { + let send_to_self = Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()); + if send_to_self && ctx.no_tun() { dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); } @@ -391,6 +395,24 @@ impl QUICProxyDst { }, ); + let acl_handler = ProxyAclHandler { + acl_filter: ctx.get_acl_filter().clone(), + packet_info: PacketInfo { + src_ip: addr.ip(), + dst_ip: (*dst_socket.ip()).into(), + src_port: Some(addr.port()), + dst_port: Some(dst_socket.port()), + protocol: Protocol::Tcp, + packet_size: len as usize, + }, + chain_type: if send_to_self { + ChainType::Inbound + } else { + ChainType::Forward + }, + }; + acl_handler.handle_packet(&buf)?; + let connector = NatDstTcpConnector {}; let dst_stream = { @@ -411,7 +433,7 @@ impl QUICProxyDst { receiver: r, }; - Ok((quic_stream, dst_stream)) + Ok((quic_stream, dst_stream, acl_handler)) } } diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 37299fd..173048e 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -63,7 +63,13 @@ pub struct NatDstTcpConnector; impl NatDstConnector for NatDstTcpConnector { type DstStream = TcpStream; async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result { - let socket = TcpSocket::new_v4().unwrap(); + let socket = match TcpSocket::new_v4() { + Ok(s) => s, + Err(e) => { + eprintln!("create v4 socket failed: {:?}", e); + return Err(e.into()); + } + }; if let Err(e) = socket.set_nodelay(true) { tracing::warn!("set_nodelay failed, ignore it: {:?}", e); } diff --git a/easytier/src/peers/acl_filter.rs b/easytier/src/peers/acl_filter.rs index 8687345..1599b38 100644 --- a/easytier/src/peers/acl_filter.rs +++ b/easytier/src/peers/acl_filter.rs @@ -64,7 +64,7 @@ impl AclFilter { } /// Get current processor for processing packets - fn get_processor(&self) -> Arc { + pub fn get_processor(&self) -> Arc { self.acl_processor.load_full() } @@ -160,7 +160,7 @@ impl AclFilter { } /// Process ACL result and log if needed - fn handle_acl_result( + pub fn handle_acl_result( &self, result: &AclResult, packet_info: &PacketInfo, diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index ba407ea..30283f8 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -1329,16 +1329,33 @@ async fn avoid_tunnel_loop_back_to_virtual_network() { drop_insts(insts).await; } +#[rstest::rstest] #[tokio::test] #[serial_test::serial] -pub async fn acl_rule_test_inbound() { +pub async fn acl_rule_test_inbound( + #[values(true, false)] enable_kcp_proxy: bool, + #[values(true, false)] enable_quic_proxy: bool, +) { use crate::tunnel::{ common::tests::_tunnel_pingpong_netns, tcp::{TcpTunnelConnector, TcpTunnelListener}, udp::{UdpTunnelConnector, UdpTunnelListener}, }; use rand::Rng; - let insts = init_three_node("udp").await; + let insts = init_three_node_ex( + "udp", + |cfg| { + if cfg.get_inst_name() == "inst1" { + let mut flags = cfg.get_flags(); + flags.enable_kcp_proxy = enable_kcp_proxy; + flags.enable_quic_proxy = enable_quic_proxy; + cfg.set_flags(flags); + } + cfg + }, + false, + ) + .await; // 构造 ACL 配置 use crate::proto::acl::*; @@ -1422,7 +1439,7 @@ pub async fn acl_rule_test_inbound() { .await; // 6. 8080 应该连接失败(被 ACL 拦截) - let result = tokio::time::timeout( + let result = tokio::spawn(tokio::time::timeout( std::time::Duration::from_millis(200), _tunnel_pingpong_netns( listener_8080, @@ -1431,10 +1448,13 @@ pub async fn acl_rule_test_inbound() { NetNS::new(Some("net_a".into())), buf.clone(), ), - ) + )) .await; - assert!(result.is_err(), "TCP 连接 8080 应被 ACL 拦截,不能成功"); + assert!( + result.is_err() || result.unwrap().is_err(), + "TCP 连接 8080 应被 ACL 拦截,不能成功" + ); // 7. 从 10.144.144.2 连接 8082 应该连接失败(被 ACL 拦截) let result = tokio::time::timeout( @@ -1508,3 +1528,236 @@ pub async fn acl_rule_test_inbound() { drop_insts(insts).await; } + +#[rstest::rstest] +#[tokio::test] +#[serial_test::serial] +pub async fn acl_rule_test_subnet_proxy( + #[values(true, false)] enable_kcp_proxy: bool, + #[values(true, false)] enable_quic_proxy: bool, +) { + use crate::tunnel::{ + common::tests::_tunnel_pingpong_netns, + tcp::{TcpTunnelConnector, TcpTunnelListener}, + udp::{UdpTunnelConnector, UdpTunnelListener}, + }; + use rand::Rng; + + let insts = init_three_node_ex( + "udp", + |cfg| { + if cfg.get_inst_name() == "inst1" { + let mut flags = cfg.get_flags(); + flags.enable_kcp_proxy = enable_kcp_proxy; + flags.enable_quic_proxy = enable_quic_proxy; + cfg.set_flags(flags); + } else if cfg.get_inst_name() == "inst3" { + // 添加子网代理配置 + cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None) + .unwrap(); + } + cfg + }, + false, + ) + .await; + + // 等待代理路由出现 + wait_proxy_route_appear( + &insts[0].get_peer_manager(), + "10.144.144.3/24", + insts[2].peer_id(), + "10.1.2.0/24", + ) + .await; + + // Test IPv4 connectivity + wait_for_condition( + || async { ping_test("net_a", "10.1.2.4", None).await }, + Duration::from_secs(5), + ) + .await; + + // 构造 ACL 配置 - 针对子网代理流量 + use crate::proto::acl::*; + let mut acl = Acl::default(); + let mut acl_v1 = AclV1::default(); + + let mut chain = Chain::default(); + chain.name = "test_subnet_proxy_inbound".to_string(); + chain.chain_type = ChainType::Forward as i32; + chain.enabled = true; + + // 禁止访问子网代理中的 8080 端口 + let mut deny_rule = Rule::default(); + deny_rule.name = "deny_subnet_8080".to_string(); + deny_rule.priority = 200; + deny_rule.enabled = true; + deny_rule.action = Action::Drop as i32; + deny_rule.protocol = Protocol::Any as i32; + deny_rule.ports = vec!["8080".to_string()]; + deny_rule.destination_ips = vec!["10.1.2.0/24".to_string()]; + chain.rules.push(deny_rule); + + // 禁止来自 inst1 (10.144.144.1) 访问子网代理中的 8081 端口 + let mut deny_src_rule = Rule::default(); + deny_src_rule.name = "deny_inst1_to_subnet_8081".to_string(); + deny_src_rule.priority = 200; + deny_src_rule.enabled = true; + deny_src_rule.action = Action::Drop as i32; + deny_src_rule.protocol = Protocol::Any as i32; + deny_src_rule.ports = vec!["8081".to_string()]; + deny_src_rule.source_ips = vec!["10.144.144.1/32".to_string()]; + deny_src_rule.destination_ips = vec!["10.1.2.0/24".to_string()]; + chain.rules.push(deny_src_rule); + + // 允许其他流量 + let mut allow_rule = Rule::default(); + allow_rule.name = "allow_all".to_string(); + allow_rule.priority = 100; + allow_rule.enabled = true; + allow_rule.action = Action::Allow as i32; + allow_rule.protocol = Protocol::Any as i32; + allow_rule.stateful = true; + chain.rules.push(allow_rule); + + acl_v1.chains.push(chain); + acl.acl_v1 = Some(acl_v1); + + // 在 inst3 上应用 ACL 规则 + insts[2] + .get_global_ctx() + .get_acl_filter() + .reload_rules(Some(&acl)); + + // TCP 测试部分 - 测试子网代理的 ACL 规则 + { + // 在 net_d (10.1.2.4) 上监听多个端口 + let listener_8080 = TcpTunnelListener::new("tcp://0.0.0.0:8080".parse().unwrap()); + let listener_8081 = TcpTunnelListener::new("tcp://0.0.0.0:8081".parse().unwrap()); + let listener_8082 = TcpTunnelListener::new("tcp://0.0.0.0:8082".parse().unwrap()); + + // 从 inst1 (net_a) 连接到子网代理 + let connector_8080 = TcpTunnelConnector::new("tcp://10.1.2.4:8080".parse().unwrap()); + let connector_8081 = TcpTunnelConnector::new("tcp://10.1.2.4:8081".parse().unwrap()); + let connector_8082 = TcpTunnelConnector::new("tcp://10.1.2.4:8082".parse().unwrap()); + + let mut buf = vec![0; 32]; + rand::thread_rng().fill(&mut buf[..]); + + // 8082 应该可以连接成功(不被 ACL 拦截) + _tunnel_pingpong_netns( + listener_8082, + connector_8082, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + ) + .await; + + // 8080 应该连接失败(被 ACL 拦截 - 禁止访问子网代理的 8080) + let result = tokio::spawn(tokio::time::timeout( + std::time::Duration::from_millis(200), + _tunnel_pingpong_netns( + listener_8080, + connector_8080, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + ), + )) + .await; + + assert!( + result.is_err() || result.unwrap().is_err(), + "TCP 连接子网代理 8080 应被 ACL 拦截,不能成功" + ); + + // 8081 应该连接失败(被 ACL 拦截 - 禁止 inst1 访问子网代理的 8081) + let result = tokio::spawn(tokio::time::timeout( + std::time::Duration::from_millis(200), + _tunnel_pingpong_netns( + listener_8081, + connector_8081, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + ), + )) + .await; + + assert!( + result.is_err() || result.unwrap().is_err(), + "TCP 连接子网代理 8081 应被 ACL 拦截,不能成功" + ); + + let stats = insts[2].get_global_ctx().get_acl_filter().get_stats(); + println!("ACL stats after TCP tests: {:?}", stats); + } + + // UDP 测试部分 - 测试子网代理的 ACL 规则 + { + let listener_8080 = UdpTunnelListener::new("udp://0.0.0.0:8080".parse().unwrap()); + let listener_8082 = UdpTunnelListener::new("udp://0.0.0.0:8082".parse().unwrap()); + + let connector_8080 = UdpTunnelConnector::new("udp://10.1.2.4:8080".parse().unwrap()); + let connector_8082 = UdpTunnelConnector::new("udp://10.1.2.4:8082".parse().unwrap()); + + let mut buf = vec![0; 32]; + rand::thread_rng().fill(&mut buf[..]); + + // 8082 应该可以连接成功 + _tunnel_pingpong_netns( + listener_8082, + connector_8082, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + ) + .await; + + // 8080 应该连接失败(被 ACL 拦截) + let result = tokio::time::timeout( + std::time::Duration::from_millis(200), + _tunnel_pingpong_netns( + listener_8080, + connector_8080, + NetNS::new(Some("net_d".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + ), + ) + .await; + + let stats = insts[2].get_global_ctx().get_acl_filter().get_stats(); + println!("ACL stats after UDP tests: {}", stats); + + assert!( + result.is_err(), + "UDP 连接子网代理 8080 应被 ACL 拦截,不能成功" + ); + } + + // 测试 ICMP 到子网代理(应该被拒绝,因为 Any 协议被拒绝) + tokio::spawn(wait_for_condition( + || async { ping_test("net_a", "10.1.2.4", None).await }, + Duration::from_secs(1), + )) + .await + .unwrap_err(); + + // 移除 ACL 规则 + insts[2] + .get_global_ctx() + .get_acl_filter() + .reload_rules(None); + + // 验证移除 ACL 后,ICMP 可以正常工作 + wait_for_condition( + || async { ping_test("net_a", "10.1.2.4", None).await }, + Duration::from_secs(5), + ) + .await; + + drop_insts(insts).await; +}