batch recv for udp proxy (#552)

This commit is contained in:
Sijie.Sun
2025-01-07 23:52:18 +08:00
committed by GitHub
parent e016aeddeb
commit d2ec60e108
2 changed files with 67 additions and 44 deletions

View File

@@ -185,7 +185,7 @@ struct Cli {
#[arg(
long,
help = t!("core_clap.multi_thread").to_string(),
default_value = "false"
default_value = "true"
)]
multi_thread: bool,

View File

@@ -4,6 +4,7 @@ use std::{
time::Duration,
};
use bytes::{BufMut, BytesMut};
use cidr::Ipv4Inet;
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;
@@ -24,11 +25,11 @@ use tokio::{
use tracing::Level;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
common::{error::Error, global_ctx::ArcGlobalCtx, scoped_task::ScopedTask, PeerId},
gateway::ip_reassembler::compose_ipv4_packet,
peers::{peer_manager::PeerManager, PeerPacketFilter},
tunnel::{
common::setup_sokcet2,
common::{reserve_buf, setup_sokcet2},
packet_def::{PacketType, ZCPacket},
},
};
@@ -139,59 +140,81 @@ impl UdpNatEntry {
mut packet_sender: Sender<ZCPacket>,
virtual_ipv4: Ipv4Addr,
) {
let mut buf = [0u8; 65536];
let mut udp_body: &mut [u8] = unsafe { std::mem::transmute(&mut buf[20 + 8..]) };
let mut ip_id = 1;
let (s, mut r) = tachyonix::channel(128);
loop {
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self.socket.recv_from(&mut udp_body),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
let self_clone = self.clone();
let recv_task = ScopedTask::from(tokio::spawn(async move {
let mut cur_buf = BytesMut::new();
loop {
if self_clone
.stopped
.load(std::sync::atomic::Ordering::Relaxed)
{
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
reserve_buf(&mut cur_buf, 64 * 1024 + 28, 128 * 1024 + 28);
assert_eq!(cur_buf.len(), 0);
unsafe {
cur_buf.advance_mut(28);
}
};
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let (len, src_socket) = match timeout(
Duration::from_secs(120),
self_clone.socket.recv_buf_from(&mut cur_buf),
)
.await
{
Ok(Ok(x)) => x,
Ok(Err(err)) => {
tracing::error!(?err, "udp nat recv failed");
break;
}
Err(err) => {
tracing::error!(?err, "udp nat recv timeout");
break;
}
};
if self.stopped.load(std::sync::atomic::Ordering::Relaxed) {
break;
tracing::trace!(?len, ?src_socket, "udp nat packet response received");
let ret_buf = cur_buf.split();
s.send((ret_buf, len, src_socket)).await.unwrap();
}
}));
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
let self_clone = self.clone();
let send_task = ScopedTask::from(tokio::spawn(async move {
let mut ip_id = 1;
while let Ok((mut packet, len, src_socket)) = r.recv().await {
let SocketAddr::V4(mut src_v4) = src_socket else {
continue;
};
self.mark_active();
self_clone.mark_active();
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
if src_v4.ip().is_loopback() {
src_v4.set_ip(virtual_ipv4);
}
let Ok(_) = Self::compose_ipv4_packet(
&self_clone,
&mut packet_sender,
&mut packet,
&src_v4,
len,
1280,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
}));
let Ok(_) = Self::compose_ipv4_packet(
&self,
&mut packet_sender,
&mut buf,
&src_v4,
len,
1200,
ip_id,
)
.await
else {
break;
};
ip_id = ip_id.wrapping_add(1);
}
let _ = tokio::join!(recv_task, send_task);
self.stop();
}