From ba3f36d22ba1ac276d003ee97d1ffb8fc1119537 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Fri, 25 Jul 2025 10:46:06 +0800 Subject: [PATCH] remove lock on pipelines --- easytier/src/gateway/icmp_proxy.rs | 2 +- easytier/src/gateway/kcp_proxy.rs | 8 +-- easytier/src/gateway/quic_proxy.rs | 4 +- easytier/src/gateway/socks5.rs | 2 +- easytier/src/gateway/tcp_proxy.rs | 4 +- easytier/src/gateway/udp_proxy.rs | 2 +- .../instance/dns_server/server_instance.rs | 4 +- easytier/src/peers/mod.rs | 6 +- easytier/src/peers/peer_manager.rs | 56 ++++++++++--------- easytier/src/peers/peer_ospf_route.rs | 3 - easytier/src/vpn_portal/wireguard.rs | 2 +- 11 files changed, 46 insertions(+), 47 deletions(-) diff --git a/easytier/src/gateway/icmp_proxy.rs b/easytier/src/gateway/icmp_proxy.rs index efde1de..b7c08c2 100644 --- a/easytier/src/gateway/icmp_proxy.rs +++ b/easytier/src/gateway/icmp_proxy.rs @@ -306,7 +306,7 @@ impl IcmpProxy { return Err(anyhow::anyhow!("peer manager is gone").into()); }; - pm.add_packet_process_pipeline(Box::new(self.clone())).await; + pm.add_packet_process_pipeline(self.clone()).await; Ok(()) } diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index d7dd395..cac9a7f 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -341,13 +341,13 @@ impl KcpProxySrc { pub async fn start(&self) { self.peer_manager - .add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone())) + .add_nic_packet_process_pipeline(Arc::new(self.tcp_proxy.clone())) .await; self.peer_manager - .add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone())) + .add_packet_process_pipeline(Arc::new(self.tcp_proxy.0.clone())) .await; self.peer_manager - .add_packet_process_pipeline(Box::new(KcpEndpointFilter { + .add_packet_process_pipeline(Arc::new(KcpEndpointFilter { kcp_endpoint: self.kcp_endpoint.clone(), is_src: true, })) @@ -484,7 +484,7 @@ impl KcpProxyDst { pub async fn start(&mut self) { self.run_accept_task().await; self.peer_manager - .add_packet_process_pipeline(Box::new(KcpEndpointFilter { + .add_packet_process_pipeline(Arc::new(KcpEndpointFilter { kcp_endpoint: self.kcp_endpoint.clone(), is_src: false, })) diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs index e6c6f5d..a3c84b8 100644 --- a/easytier/src/gateway/quic_proxy.rs +++ b/easytier/src/gateway/quic_proxy.rs @@ -227,10 +227,10 @@ impl QUICProxySrc { pub async fn start(&self) { self.peer_manager - .add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone())) + .add_nic_packet_process_pipeline(Arc::new(self.tcp_proxy.clone())) .await; self.peer_manager - .add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone())) + .add_packet_process_pipeline(Arc::new(self.tcp_proxy.0.clone())) .await; self.tcp_proxy.0.start(false).await.unwrap(); } diff --git a/easytier/src/gateway/socks5.rs b/easytier/src/gateway/socks5.rs index 7d33350..91ed66b 100644 --- a/easytier/src/gateway/socks5.rs +++ b/easytier/src/gateway/socks5.rs @@ -621,7 +621,7 @@ impl Socks5Server { if need_start { self.peer_manager - .add_packet_process_pipeline(Box::new(self.clone())) + .add_packet_process_pipeline(self.clone()) .await; self.run_net_update_task().await; diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 37299fd..40af82b 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -476,10 +476,10 @@ impl TcpProxy { self.run_listener().await?; if add_pipeline { self.peer_manager - .add_packet_process_pipeline(Box::new(self.clone())) + .add_packet_process_pipeline(self.clone()) .await; self.peer_manager - .add_nic_packet_process_pipeline(Box::new(self.clone())) + .add_nic_packet_process_pipeline(self.clone()) .await; } join_joinset_background(self.tasks.clone(), "TcpProxy".to_owned()); diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index 5e29f0f..5bea00a 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -404,7 +404,7 @@ impl UdpProxy { pub async fn start(self: &Arc) -> Result<(), Error> { self.peer_manager - .add_packet_process_pipeline(Box::new(self.clone())) + .add_packet_process_pipeline(self.clone()) .await; // clean up nat table diff --git a/easytier/src/instance/dns_server/server_instance.rs b/easytier/src/instance/dns_server/server_instance.rs index bbce458..8e96624 100644 --- a/easytier/src/instance/dns_server/server_instance.rs +++ b/easytier/src/instance/dns_server/server_instance.rs @@ -404,9 +404,7 @@ impl MagicDnsServerInstance { .register(MagicDnsServerRpcServer::new(data.clone()), ""); rpc_server.set_hook(data.clone()); - peer_mgr - .add_nic_packet_process_pipeline(Box::new(data.clone())) - .await; + peer_mgr.add_nic_packet_process_pipeline(data.clone()).await; let data_clone = data.clone(); tokio::task::spawn_blocking(move || data_clone.do_system_config(DEFAULT_ET_DNS_ZONE)) diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index fcccfe0..d10493d 100644 --- a/easytier/src/peers/mod.rs +++ b/easytier/src/peers/mod.rs @@ -23,6 +23,8 @@ pub mod peer_task; #[cfg(test)] pub mod tests; +use std::sync::Arc; + use crate::tunnel::packet_def::ZCPacket; #[async_trait::async_trait] @@ -43,8 +45,8 @@ pub trait NicPacketFilter { } } -type BoxPeerPacketFilter = Box; -type BoxNicPacketFilter = Box; +type BoxPeerPacketFilter = Arc; +type BoxNicPacketFilter = Arc; // pub type PacketRecvChan = tachyonix::Sender; // pub type PacketRecvChanReceiver = tachyonix::Receiver; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 9507aeb..4e534d7 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -6,6 +6,7 @@ use std::{ }; use anyhow::Context; +use arc_swap::ArcSwap; use async_trait::async_trait; use dashmap::DashMap; @@ -13,7 +14,7 @@ use dashmap::DashMap; use tokio::{ sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, - Mutex, RwLock, + Mutex, }, task::JoinSet, }; @@ -131,8 +132,8 @@ pub struct PeerManager { peer_rpc_mgr: Arc, peer_rpc_tspt: Arc, - peer_packet_process_pipeline: Arc>>, - nic_packet_process_pipeline: Arc>>, + peer_packet_process_pipeline: Arc>>, + nic_packet_process_pipeline: ArcSwap>, route_algo_inst: RouteAlgoInst, @@ -261,8 +262,8 @@ impl PeerManager { peer_rpc_mgr, peer_rpc_tspt: rpc_tspt, - peer_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), - nic_packet_process_pipeline: Arc::new(RwLock::new(Vec::new())), + peer_packet_process_pipeline: Arc::new(ArcSwap::from(Arc::new(Vec::new()))), + nic_packet_process_pipeline: ArcSwap::from(Arc::new(Vec::new())), route_algo_inst, @@ -647,7 +648,7 @@ impl PeerManager { let mut processed = false; let mut zc_packet = Some(ret); let mut idx = 0; - for pipeline in pipe_line.read().await.iter().rev() { + for pipeline in pipe_line.load().iter().rev() { tracing::trace!(?zc_packet, ?idx, "try_process_packet_from_peer"); idx += 1; zc_packet = pipeline @@ -669,18 +670,20 @@ impl PeerManager { pub async fn add_packet_process_pipeline(&self, pipeline: BoxPeerPacketFilter) { // newest pipeline will be executed first + let current = self.peer_packet_process_pipeline.load(); + let mut new_pipelines = (*(*current)).iter().map(|x| x.clone()).collect::>(); + new_pipelines.push(pipeline); self.peer_packet_process_pipeline - .write() - .await - .push(pipeline); + .swap(Arc::new(new_pipelines)); } pub async fn add_nic_packet_process_pipeline(&self, pipeline: BoxNicPacketFilter) { // newest pipeline will be executed first + let current = self.nic_packet_process_pipeline.load(); + let mut new_pipelines = (*current).iter().map(|x| x.clone()).collect::>(); + new_pipelines.push(pipeline); self.nic_packet_process_pipeline - .write() - .await - .push(pipeline); + .swap(Arc::new(new_pipelines)); } async fn init_packet_process_pipeline(&self) { @@ -702,7 +705,7 @@ impl PeerManager { } } } - self.add_packet_process_pipeline(Box::new(NicPacketProcessor { + self.add_packet_process_pipeline(Arc::new(NicPacketProcessor { nic_channel: self.nic_channel.clone(), })) .await; @@ -727,7 +730,7 @@ impl PeerManager { } } } - self.add_packet_process_pipeline(Box::new(PeerRpcPacketProcessor { + self.add_packet_process_pipeline(Arc::new(PeerRpcPacketProcessor { peer_rpc_tspt_sender: self.peer_rpc_tspt.peer_rpc_tspt_sender.clone(), })) .await; @@ -735,12 +738,8 @@ impl PeerManager { pub async fn add_route(&self, route: T) where - T: Route + PeerPacketFilter + Send + Sync + Clone + 'static, + T: Route + Send + Sync + Clone + 'static, { - // for route - self.add_packet_process_pipeline(Box::new(route.clone())) - .await; - struct Interface { my_peer_id: PeerId, peers: Weak, @@ -866,15 +865,19 @@ impl PeerManager { return; } - for pipeline in self.nic_packet_process_pipeline.read().await.iter().rev() { + let pipelines = self.nic_packet_process_pipeline.load(); + for pipeline in pipelines.iter().rev() { let _ = pipeline.try_process_packet_from_nic(data).await; } } pub async fn remove_nic_packet_process_pipeline(&self, id: String) -> Result<(), Error> { - let mut pipelines = self.nic_packet_process_pipeline.write().await; - if let Some(pos) = pipelines.iter().position(|x| x.id() == id) { - pipelines.remove(pos); + let current = self.nic_packet_process_pipeline.load(); + let mut new_pipelines = (*current).iter().map(|x| x.clone()).collect::>(); + if let Some(pos) = new_pipelines.iter().position(|x| x.id() == id) { + new_pipelines.remove(pos); + self.nic_packet_process_pipeline + .swap(Arc::new(new_pipelines)); Ok(()) } else { Err(Error::NotFound) @@ -1206,10 +1209,9 @@ impl PeerManager { } pub async fn clear_resources(&self) { - let mut peer_pipeline = self.peer_packet_process_pipeline.write().await; - peer_pipeline.clear(); - let mut nic_pipeline = self.nic_packet_process_pipeline.write().await; - nic_pipeline.clear(); + self.peer_packet_process_pipeline + .store(Arc::new(Vec::new())); + self.nic_packet_process_pipeline.store(Arc::new(Vec::new())); self.peer_rpc_mgr.rpc_server().registry().unregister_all(); } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 5c0a1d6..cc62218 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -55,7 +55,6 @@ use super::{ DefaultRouteCostCalculator, ForeignNetworkRouteInfoMap, NextHopPolicy, RouteCostCalculator, RouteCostCalculatorInterface, }, - PeerPacketFilter, }; static SERVICE_ID: u32 = 7; @@ -2369,8 +2368,6 @@ impl Route for PeerRoute { } } -impl PeerPacketFilter for Arc {} - #[cfg(test)] mod tests { use std::{ diff --git a/easytier/src/vpn_portal/wireguard.rs b/easytier/src/vpn_portal/wireguard.rs index dca3ed0..83fcea7 100644 --- a/easytier/src/vpn_portal/wireguard.rs +++ b/easytier/src/vpn_portal/wireguard.rs @@ -203,7 +203,7 @@ impl WireGuardImpl { } self.peer_mgr - .add_packet_process_pipeline(Box::new(PeerPacketFilterForVpnPortal { + .add_packet_process_pipeline(Arc::new(PeerPacketFilterForVpnPortal { wg_peer_ip_table: self.wg_peer_ip_table.clone(), })) .await;