From d2ec60e10814d10058f827b2c10b0ae619fedbb0 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Tue, 7 Jan 2025 23:52:18 +0800 Subject: [PATCH] batch recv for udp proxy (#552) --- easytier/src/easytier-core.rs | 2 +- easytier/src/gateway/udp_proxy.rs | 109 ++++++++++++++++++------------ 2 files changed, 67 insertions(+), 44 deletions(-) diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index d8e158b..e03e108 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -185,7 +185,7 @@ struct Cli { #[arg( long, help = t!("core_clap.multi_thread").to_string(), - default_value = "false" + default_value = "true" )] multi_thread: bool, diff --git a/easytier/src/gateway/udp_proxy.rs b/easytier/src/gateway/udp_proxy.rs index c074704..441e6b7 100644 --- a/easytier/src/gateway/udp_proxy.rs +++ b/easytier/src/gateway/udp_proxy.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use bytes::{BufMut, BytesMut}; use cidr::Ipv4Inet; use crossbeam::atomic::AtomicCell; use dashmap::DashMap; @@ -24,11 +25,11 @@ use tokio::{ use tracing::Level; use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, + common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId}, gateway::ip_reassembler::compose_ipv4_packet, peers::{peer_manager::PeerManager, PeerPacketFilter}, tunnel::{ - common::setup_sokcet2, + common::{reserve_buf, setup_sokcet2}, packet_def::{PacketType, ZCPacket}, }, }; @@ -139,59 +140,81 @@ impl UdpNatEntry { mut packet_sender: Sender, virtual_ipv4: Ipv4Addr, ) { - let mut buf = [0u8; 65536]; - let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) }; - let mut ip_id = 1; + let (s, mut r) = tachyonix::channel(128); - loop { - let (len, src_socket) = match timeout( - Duration::from_secs(120), - self.socket.recv_from(&mut udp_body), - ) - .await - { - Ok(Ok(x)) => x, - Ok(Err(err)) => { - tracing::error!(?err, "udp nat recv failed"); + let self_clone = self.clone(); + let recv_task = ScopedTask::from(tokio::spawn(async move { + let mut cur_buf = BytesMut::new(); + loop { + if self_clone + .stopped + .load(std::sync::atomic::Ordering::Relaxed) + { break; } - Err(err) => { - tracing::error!(?err, "udp nat recv timeout"); - break; + + reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28); + assert_eq!(cur_buf.len(), 0); + unsafe { + cur_buf.advance_mut(28); } - }; - tracing::trace!(?len, ?src_socket, "udp nat packet response received"); + let (len, src_socket) = match timeout( + Duration::from_secs(120), + self_clone.socket.recv_buf_from(&mut cur_buf), + ) + .await + { + Ok(Ok(x)) => x, + Ok(Err(err)) => { + tracing::error!(?err, "udp nat recv failed"); + break; + } + Err(err) => { + tracing::error!(?err, "udp nat recv timeout"); + break; + } + }; - if self.stopped.load(std::sync::atomic::Ordering::Relaxed) { - break; + tracing::trace!(?len, ?src_socket, "udp nat packet response received"); + + let ret_buf = cur_buf.split(); + s.send((ret_buf, len, src_socket)).await.unwrap(); } + })); - let SocketAddr::V4(mut src_v4) = src_socket else { - continue; - }; + let self_clone = self.clone(); + let send_task = ScopedTask::from(tokio::spawn(async move { + let mut ip_id = 1; + while let Ok((mut packet, len, src_socket)) = r.recv().await { + let SocketAddr::V4(mut src_v4) = src_socket else { + continue; + }; - self.mark_active(); + self_clone.mark_active(); - if src_v4.ip().is_loopback() { - src_v4.set_ip(virtual_ipv4); + if src_v4.ip().is_loopback() { + src_v4.set_ip(virtual_ipv4); + } + + let Ok(_) = Self::compose_ipv4_packet( + &self_clone, + &mut packet_sender, + &mut packet, + &src_v4, + len, + 1280, + ip_id, + ) + .await + else { + break; + }; + ip_id = ip_id.wrapping_add(1); } + })); - let Ok(_) = Self::compose_ipv4_packet( - &self, - &mut packet_sender, - &mut buf, - &src_v4, - len, - 1200, - ip_id, - ) - .await - else { - break; - }; - ip_id = ip_id.wrapping_add(1); - } + let _ = tokio::join!(recv_task, send_task); self.stop(); }