From 876d550f681eadc40a3b4cf050d5db7416d5c0fa Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 20 Jul 2025 19:15:28 +0800 Subject: [PATCH] reduce memory usage (#1133) Large memory usage comes from: Mimalloc hold large thread cache, causing abort 13M+ usage. QUIC endpoint occupy 3M when GRO is enabled. Smoltcp 64 tcp listener use 2MB. --- .github/workflows/core.yml | 13 ++-- .github/workflows/install_rust.sh | 4 +- CONTRIBUTING.md | 6 +- CONTRIBUTING_zh.md | 38 +++++++---- Cargo.lock | 78 ++++++++++----------- easytier-rpc-build/Cargo.toml | 2 +- easytier/Cargo.toml | 27 ++++---- easytier/src/connector/manual.rs | 17 ++++- easytier/src/easytier-core.rs | 14 ++-- easytier/src/gateway/tcp_proxy.rs | 109 ++++++++++++++++-------------- easytier/src/tunnel/quic.rs | 65 ++++++++++++++++-- 11 files changed, 233 insertions(+), 140 deletions(-) diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index b200482..f111f1d 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -175,14 +175,17 @@ jobs: fi if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then - cargo +nightly build -r --target $TARGET -Z build-std=std,panic_abort --package=easytier + cargo +nightly build -r --target $TARGET -Z build-std=std,panic_abort --package=easytier --features=jemalloc else if [[ $OS =~ ^windows.*$ ]]; then SUFFIX=.exe + CORE_FEATURES="--features=mimalloc" + else + CORE_FEATURES="--features=jemalloc" fi cargo build --release --target $TARGET --package=easytier-web --features=embed mv ./target/$TARGET/release/easytier-web"$SUFFIX" ./target/$TARGET/release/easytier-web-embed"$SUFFIX" - cargo build --release --target $TARGET + cargo build --release --target $TARGET $CORE_FEATURES fi # Copied and slightly modified from @lmq8267 (https://github.com/lmq8267) @@ -212,8 +215,8 @@ jobs: rustup set auto-self-update disable - rustup install 1.86 - rustup default 1.86 + rustup install 1.87 + rustup default 1.87 export CC=clang export CXX=clang++ @@ -221,7 +224,7 @@ jobs: cargo build --release --verbose --target $TARGET --package=easytier-web --features=embed mv ./target/$TARGET/release/easytier-web ./target/$TARGET/release/easytier-web-embed - cargo build --release --verbose --target $TARGET + cargo build --release --verbose --target $TARGET --features=mimalloc - name: Compress run: | diff --git a/.github/workflows/install_rust.sh b/.github/workflows/install_rust.sh index 28de5f1..c9d3397 100644 --- a/.github/workflows/install_rust.sh +++ b/.github/workflows/install_rust.sh @@ -29,8 +29,8 @@ fi # see https://github.com/rust-lang/rustup/issues/3709 rustup set auto-self-update disable -rustup install 1.86 -rustup default 1.86 +rustup install 1.87 +rustup default 1.87 # mips/mipsel cannot add target from rustup, need compile by ourselves if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 86d9471..3fb64b8 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -26,7 +26,7 @@ Thank you for your interest in contributing to EasyTier! This document provides #### Required Tools - Node.js v21 or higher - pnpm v9 or higher -- Rust toolchain (version 1.86) +- Rust toolchain (version 1.87) - LLVM and Clang - Protoc (Protocol Buffers compiler) @@ -79,8 +79,8 @@ sudo apt install -y bridge-utils 2. Install dependencies: ```bash # Install Rust toolchain - rustup install 1.86 - rustup default 1.86 + rustup install 1.87 + rustup default 1.87 # Install project dependencies pnpm -r install diff --git a/CONTRIBUTING_zh.md b/CONTRIBUTING_zh.md index 588fc0d..c2cbea0 100644 --- a/CONTRIBUTING_zh.md +++ b/CONTRIBUTING_zh.md @@ -6,18 +6,26 @@ ## 目录 -- [开发环境配置](#开发环境配置) - - [前置要求](#前置要求) - - [安装步骤](#安装步骤) -- [项目结构](#项目结构) -- [构建指南](#构建指南) - - [构建核心组件](#构建核心组件) - - [构建桌面应用](#构建桌面应用) - - [构建移动应用](#构建移动应用) -- [开发工作流](#开发工作流) -- [测试指南](#测试指南) -- [Pull Request 规范](#pull-request-规范) -- [其他资源](#其他资源) +- [EasyTier 贡献指南](#easytier-贡献指南) + - [目录](#目录) + - [开发环境配置](#开发环境配置) + - [前置要求](#前置要求) + - [必需工具](#必需工具) + - [平台特定依赖](#平台特定依赖) + - [安装步骤](#安装步骤) + - [项目结构](#项目结构) + - [构建指南](#构建指南) + - [构建核心组件](#构建核心组件) + - [构建桌面应用](#构建桌面应用) + - [构建移动应用](#构建移动应用) + - [构建注意事项](#构建注意事项) + - [开发工作流](#开发工作流) + - [测试指南](#测试指南) + - [运行测试](#运行测试) + - [测试要求](#测试要求) + - [Pull Request 规范](#pull-request-规范) + - [其他资源](#其他资源) + - [需要帮助?](#需要帮助) ## 开发环境配置 @@ -26,7 +34,7 @@ #### 必需工具 - Node.js v21 或更高版本 - pnpm v9 或更高版本 -- Rust 工具链(版本 1.86) +- Rust 工具链(版本 1.87) - LLVM 和 Clang - Protoc(Protocol Buffers 编译器) @@ -79,8 +87,8 @@ sudo apt install -y bridge-utils 2. 安装依赖: ```bash # 安装 Rust 工具链 - rustup install 1.86 - rustup default 1.86 + rustup install 1.87 + rustup default 1.87 # 安装项目依赖 pnpm -r install diff --git a/Cargo.lock b/Cargo.lock index bdc3d93..0b88d35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1992,9 +1992,6 @@ dependencies = [ "http_req", "humansize", "humantime-serde", - "jemalloc-ctl", - "jemalloc-sys", - "jemallocator", "kcp-sys", "machine-uid", "maplit", @@ -2040,6 +2037,9 @@ dependencies = [ "tachyonix", "thiserror 1.0.63", "thunk-rs", + "tikv-jemalloc-ctl", + "tikv-jemalloc-sys", + "tikv-jemallocator", "time", "timedmap", "tokio", @@ -3932,37 +3932,6 @@ dependencies = [ "system-deps", ] -[[package]] -name = "jemalloc-ctl" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c" -dependencies = [ - "jemalloc-sys", - "libc", - "paste", -] - -[[package]] -name = "jemalloc-sys" -version = "0.5.4+5.3.0-patched" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" -dependencies = [ - "cc", - "libc", -] - -[[package]] -name = "jemallocator" -version = "0.5.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" -dependencies = [ - "jemalloc-sys", - "libc", -] - [[package]] name = "jni" version = "0.21.1" @@ -4035,7 +4004,7 @@ dependencies = [ [[package]] name = "kcp-sys" version = "0.1.0" -source = "git+https://github.com/EasyTier/kcp-sys#0f0a0558391ba391c089806c23f369651f6c9eeb" +source = "git+https://github.com/EasyTier/kcp-sys?rev=0f0a0558391ba391c089806c23f369651f6c9eeb#0f0a0558391ba391c089806c23f369651f6c9eeb" dependencies = [ "anyhow", "auto_impl", @@ -4175,9 +4144,9 @@ checksum = "4ec2a862134d2a7d32d7983ddcdd1c4923530833c9f2ea1a44fc5fa473989058" [[package]] name = "libmimalloc-sys" -version = "0.1.42" +version = "0.1.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec9d6fac27761dabcd4ee73571cdb06b7022dc99089acbe5435691edffaac0f4" +checksum = "bf88cd67e9de251c1781dbe2f641a1a3ad66eaae831b8a2c38fbdc5ddae16d4d" dependencies = [ "cc", "libc", @@ -4413,9 +4382,9 @@ dependencies = [ [[package]] name = "mimalloc" -version = "0.1.46" +version = "0.1.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "995942f432bbb4822a7e9c3faa87a695185b0d09273ba85f097b54f4e458f2af" +checksum = "b1791cbe101e95af5764f06f20f6760521f7158f69dbf9d6baf941ee1bf6bc40" dependencies = [ "libmimalloc-sys", ] @@ -8531,6 +8500,37 @@ dependencies = [ "weezl", ] +[[package]] +name = "tikv-jemalloc-ctl" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f21f216790c8df74ce3ab25b534e0718da5a1916719771d3fec23315c99e468b" +dependencies = [ + "libc", + "paste", + "tikv-jemalloc-sys", +] + +[[package]] +name = "tikv-jemalloc-sys" +version = "0.6.0+5.3.0-1-ge13ca993e8ccb9ba9847cc330696e02839f328f7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd3c60906412afa9c2b5b5a48ca6a5abe5736aec9eb48ad05037a677e52e4e2d" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "tikv-jemallocator" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cec5ff18518d81584f477e9bfdf957f5bb0979b0bac3af4ca30b5b3ae2d2865" +dependencies = [ + "libc", + "tikv-jemalloc-sys", +] + [[package]] name = "time" version = "0.3.41" diff --git a/easytier-rpc-build/Cargo.toml b/easytier-rpc-build/Cargo.toml index f4ca693..d433b46 100644 --- a/easytier-rpc-build/Cargo.toml +++ b/easytier-rpc-build/Cargo.toml @@ -8,7 +8,7 @@ repository = "https://github.com/EasyTier/EasyTier" authors = ["kkrainbow"] keywords = ["vpn", "p2p", "network", "easytier"] categories = ["network-programming", "command-line-utilities"] -rust-version = "1.84.0" +rust-version = "1.87.0" license-file = "LICENSE" readme = "README.md" diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index b24bc7d..5d26709 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" authors = ["kkrainbow"] keywords = ["vpn", "p2p", "network", "easytier"] categories = ["network-programming", "command-line-utilities"] -rust-version = "1.84.0" +rust-version = "1.87.0" license-file = "LICENSE" readme = "README.md" @@ -190,7 +190,7 @@ service-manager = { git = "https://github.com/chipsenkbeil/service-manager-rs.gi zstd = { version = "0.13" } -kcp-sys = { git = "https://github.com/EasyTier/kcp-sys" } +kcp-sys = { git = "https://github.com/EasyTier/kcp-sys", rev = "0f0a0558391ba391c089806c23f369651f6c9eeb" } prost-reflect = { version = "0.14.5", default-features = false, features = [ "derive", @@ -213,14 +213,6 @@ humantime-serde = "1.1.1" multimap = "0.10.0" version-compare = "0.2.0" -jemallocator = { version = "0.5.4", optional = true } -jemalloc-ctl = { version = "0.5.4", optional = true } -jemalloc-sys = { version = "0.5.4", features = [ - "stats", - "profiling", - "unprefixed_malloc_on_supported_platforms", -], optional = true } - [target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies] machine-uid = "0.5.3" @@ -255,6 +247,15 @@ windows-sys = { version = "0.52", features = [ ]} winapi = { version = "0.3.9", features = ["impl-default"] } +[target.'cfg(not(windows))'.dependencies] +jemallocator = { package = "tikv-jemallocator", version = "0.6.0", optional = true } +jemalloc-ctl = { package = "tikv-jemalloc-ctl", version = "0.6.0", optional = true, features = [ +] } +jemalloc-sys = { package = "tikv-jemalloc-sys", version = "0.6.0", features = [ + "background_threads_runtime_support", + "background_threads", +], optional = true } + [build-dependencies] tonic-build = "0.12" globwalk = "0.8.1" @@ -289,11 +290,10 @@ tokio-socks = "0.5.2" [features] -default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun", "socks5", "quic"] +default = ["wireguard", "websocket", "smoltcp", "tun", "socks5", "quic"] full = [ "websocket", "wireguard", - "mimalloc", "aes-gcm", "smoltcp", "tun", @@ -313,4 +313,5 @@ websocket = [ ] smoltcp = ["dep:smoltcp", "dep:parking_lot"] socks5 = ["dep:smoltcp"] -jemalloc = ["dep:jemallocator", "dep:jemalloc-ctl", "dep:jemalloc-sys"] +jemalloc = ["dep:jemallocator", "dep:jemalloc-sys"] +jemalloc-prof = ["jemalloc", "dep:jemalloc-ctl", "jemalloc-ctl/stats", "jemalloc-sys/profiling", "jemalloc-sys/stats"] diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 1ece6f4..01e70f6 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -242,7 +242,7 @@ impl ManualConnectorManager { tasks.lock().unwrap().spawn(async move { let reconn_ret = Self::conn_reconnect(data_clone.clone(), dead_url.clone(), connector.clone()).await; - sender.send(reconn_ret).await.unwrap(); + let _ = sender.send(reconn_ret).await; data_clone.reconnecting.remove(&dead_url).unwrap(); data_clone.connectors.insert(dead_url.clone(), connector); @@ -373,7 +373,20 @@ impl ManualConnectorManager { if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" { ip_versions.push(IpVersion::Both); } else { - let addrs = u.socket_addrs(|| Some(1000))?; + let addrs = match u.socket_addrs(|| Some(1000)) { + Ok(addrs) => addrs, + Err(e) => { + data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( + dead_url.clone(), + format!("{:?}", IpVersion::Both), + format!("{:?}", e), + )); + return Err(Error::AnyhowError(anyhow::anyhow!( + "get ip from url failed: {:?}", + e + ))); + } + }; tracing::info!(?addrs, ?dead_url, "get ip from url done"); let mut has_ipv4 = false; let mut has_ipv6 = false; diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 4126cf4..0d80837 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -4,7 +4,10 @@ extern crate rust_i18n; use std::{ - net::{Ipv4Addr, SocketAddr}, path::PathBuf, process::ExitCode, sync::Arc + net::{Ipv4Addr, SocketAddr}, + path::PathBuf, + process::ExitCode, + sync::Arc, }; use anyhow::Context; @@ -42,7 +45,7 @@ use mimalloc::MiMalloc; #[global_allocator] static GLOBAL_MIMALLOC: MiMalloc = MiMalloc; -#[cfg(feature = "jemalloc")] +#[cfg(feature = "jemalloc-prof")] use jemalloc_ctl::{epoch, stats, Access as _, AsName as _}; #[cfg(feature = "jemalloc")] @@ -50,7 +53,7 @@ use jemalloc_ctl::{epoch, stats, Access as _, AsName as _}; static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc; fn set_prof_active(_active: bool) { - #[cfg(feature = "jemalloc")] + #[cfg(feature = "jemalloc-prof")] { const PROF_ACTIVE: &'static [u8] = b"prof.active\0"; let name = PROF_ACTIVE.name(); @@ -59,7 +62,7 @@ fn set_prof_active(_active: bool) { } fn dump_profile(_cur_allocated: usize) { - #[cfg(feature = "jemalloc")] + #[cfg(feature = "jemalloc-prof")] { const PROF_DUMP: &'static [u8] = b"prof.dump\0"; static mut PROF_DUMP_FILE_NAME: [u8; 128] = [0; 128]; @@ -811,7 +814,6 @@ impl NetworkOptions { if let Some(dev_name) = &self.dev_name { f.dev_name = dev_name.clone() } - println!("mtu: {}, {:?}", f.mtu, self.mtu); if let Some(mtu) = self.mtu { f.mtu = mtu as u32; } @@ -1094,7 +1096,7 @@ async fn run_main(cli: Cli) -> anyhow::Result<()> { } fn memory_monitor() { - #[cfg(feature = "jemalloc")] + #[cfg(feature = "jemalloc-prof")] { let mut last_peak_size = 0; let e = epoch::mib().unwrap(); diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index d56d909..37299fd 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -15,7 +15,6 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; -use tokio::select; use tokio::sync::{mpsc, Mutex}; use tokio::task::JoinSet; use tokio::time::timeout; @@ -210,67 +209,62 @@ impl ProxyTcpStream { } } +type SmolTcpAcceptResult = Result<(tokio_smoltcp::TcpStream, SocketAddr)>; #[cfg(feature = "smoltcp")] struct SmolTcpListener { - listener_task: JoinSet<()>, - listen_count: usize, + stream_tx: mpsc::UnboundedSender, + stream_rx: mpsc::UnboundedReceiver, - stream_rx: mpsc::UnboundedReceiver>, + tasks: Arc>>, } #[cfg(feature = "smoltcp")] impl SmolTcpListener { - pub async fn new(net: Arc>>, listen_count: usize) -> Self { - let mut tasks = JoinSet::new(); + pub async fn new() -> Self { + let tasks = Arc::new(std::sync::Mutex::new(JoinSet::new())); + join_joinset_background(tasks.clone(), "smoltcp listener".to_owned()); let (tx, rx) = mpsc::unbounded_channel(); - let locked_net = net.lock().await; - for _ in 0..listen_count { - let mut tcp = locked_net - .as_ref() - .unwrap() - .tcp_bind("0.0.0.0:8899".parse().unwrap()) - .await - .unwrap(); - let tx = tx.clone(); - tasks.spawn(async move { - let mut not_listening_count = 0; - loop { - select! { - _ = tokio::time::sleep(Duration::from_secs(2)) => { - if tcp.is_listening() { - not_listening_count = 0; - continue; - } - - not_listening_count += 1; - if not_listening_count >= 2 { - tracing::error!("smol tcp listener not listening"); - tcp.relisten(); - } - } - accept_ret = tcp.accept() => { - tx.send(accept_ret.map_err(|e| { - anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into() - })) - .unwrap(); - not_listening_count = 0; - } - } - } - }); - } Self { - listener_task: tasks, - listen_count, + stream_tx: tx, stream_rx: rx, + tasks, } } - pub async fn accept(&mut self) -> Result<(tokio_smoltcp::TcpStream, SocketAddr)> { + pub async fn accept(&mut self) -> SmolTcpAcceptResult { self.stream_rx.recv().await.unwrap() } + + pub fn stream_tx(&self) -> mpsc::UnboundedSender { + self.stream_tx.clone() + } + + pub async fn add_listener( + tx: mpsc::UnboundedSender, + net: Arc>>, + tasks: Arc>>, + ) { + let locked_net = net.lock().await; + let mut tcp = locked_net + .as_ref() + .unwrap() + .tcp_bind("0.0.0.0:8899".parse().unwrap()) + .await + .unwrap(); + tasks.lock().unwrap().spawn(async move { + let ret = timeout(Duration::from_secs(10), tcp.accept()).await; + if let Ok(accept_ret) = ret { + tx.send(accept_ret.map_err(|e| { + anyhow::anyhow!("smol tcp listener accept failed: {:?}", e).into() + })) + .unwrap(); + } else { + tracing::error!("smol tcp listener accept timeout"); + } + }); + } } enum ProxyTcpListener { @@ -323,6 +317,7 @@ pub struct TcpProxy { smoltcp_stack_receiver: Arc>>>, #[cfg(feature = "smoltcp")] smoltcp_net: Arc>>, + smoltcp_listener_tx: std::sync::Mutex>>, enable_smoltcp: Arc, connector: C, @@ -332,10 +327,7 @@ pub struct TcpProxy { impl PeerPacketFilter for TcpProxy { async fn try_process_packet_from_peer(&self, mut packet: ZCPacket) -> Option { if let Some(_) = self.try_handle_peer_packet(&mut packet).await { - if self - .enable_smoltcp - .load(std::sync::atomic::Ordering::Relaxed) - { + if self.is_smoltcp_enabled() { let smoltcp_stack_sender = self.smoltcp_stack_sender.as_ref().unwrap(); if let Err(e) = smoltcp_stack_sender.try_send(packet) { tracing::error!("send to smoltcp stack failed: {:?}", e); @@ -455,6 +447,7 @@ impl TcpProxy { #[cfg(feature = "smoltcp")] smoltcp_net: Arc::new(Mutex::new(None)), + smoltcp_listener_tx: std::sync::Mutex::new(None), enable_smoltcp: Arc::new(AtomicBool::new(true)), @@ -584,7 +577,11 @@ impl TcpProxy { ); net.set_any_ip(true); self.smoltcp_net.lock().await.replace(net); - let tcp = SmolTcpListener::new(self.smoltcp_net.clone(), 64).await; + let tcp = SmolTcpListener::new().await; + self.smoltcp_listener_tx + .lock() + .unwrap() + .replace(tcp.stream_tx()); self.enable_smoltcp .store(true, std::sync::atomic::Ordering::Relaxed); @@ -865,6 +862,18 @@ impl TcpProxy { .syn_map .insert(src, Arc::new(NatDstEntry::new(src, real_dst, mapped_dst))); tracing::info!(src = ?src, ?real_dst, ?mapped_dst, old_entry = ?old_val, "tcp syn received"); + + // if smoltcp is enabled, add the listener to the net + if self.is_smoltcp_enabled() { + let smoltcp_listener_tx = self.smoltcp_listener_tx.lock().unwrap().clone().unwrap(); + SmolTcpListener::add_listener( + smoltcp_listener_tx, + self.smoltcp_net.clone(), + self.tasks.clone(), + ) + .await; + tracing::info!("smol tcp listener added for src: {:?}", src); + } } else if !self.addr_conn_map.contains_key(&src) && !self.syn_map.contains_key(&src) { // if not in syn map and addr conn map, may forwarding n2n packet return None; diff --git a/easytier/src/tunnel/quic.rs b/easytier/src/tunnel/quic.rs index b7fb0ff..559f226 100644 --- a/easytier/src/tunnel/quic.rs +++ b/easytier/src/tunnel/quic.rs @@ -2,7 +2,9 @@ //! //! Checkout the `README.md` for guidance. -use std::{error::Error, net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + error::Error, io::IoSliceMut, net::SocketAddr, pin::Pin, sync::Arc, task::Poll, time::Duration, +}; use crate::tunnel::{ common::{FramedReader, FramedWriter, TunnelWrapper}, @@ -11,8 +13,8 @@ use crate::tunnel::{ use anyhow::Context; use quinn::{ - congestion::BbrConfig, crypto::rustls::QuicClientConfig, ClientConfig, Connection, Endpoint, - ServerConfig, TransportConfig, + congestion::BbrConfig, crypto::rustls::QuicClientConfig, udp::RecvMeta, AsyncUdpSocket, + ClientConfig, Connection, Endpoint, EndpointConfig, ServerConfig, TransportConfig, UdpPoller, }; use super::{ @@ -35,6 +37,48 @@ pub fn configure_client() -> ClientConfig { client_config } +#[derive(Clone, Debug)] +struct NoGroAsyncUdpSocket { + inner: Arc, +} + +impl AsyncUdpSocket for NoGroAsyncUdpSocket { + fn create_io_poller(self: Arc) -> Pin> { + self.inner.clone().create_io_poller() + } + + fn try_send(&self, transmit: &quinn::udp::Transmit) -> std::io::Result<()> { + self.inner.try_send(transmit) + } + + /// Receive UDP datagrams, or register to be woken if receiving may succeed in the future + fn poll_recv( + &self, + cx: &mut std::task::Context, + bufs: &mut [IoSliceMut<'_>], + meta: &mut [RecvMeta], + ) -> Poll> { + self.inner.poll_recv(cx, bufs, meta) + } + + /// Look up the local IP address and port used by this socket + fn local_addr(&self) -> std::io::Result { + self.inner.local_addr() + } + + fn may_fragment(&self) -> bool { + self.inner.may_fragment() + } + + fn max_transmit_segments(&self) -> usize { + self.inner.max_transmit_segments() + } + + fn max_receive_segments(&self) -> usize { + 1 + } +} + /// Constructs a QUIC endpoint configured to listen for incoming connections on a certain address /// and port. /// @@ -45,7 +89,20 @@ pub fn configure_client() -> ClientConfig { #[allow(unused)] pub fn make_server_endpoint(bind_addr: SocketAddr) -> Result<(Endpoint, Vec), Box> { let (server_config, server_cert) = configure_server()?; - let endpoint = Endpoint::server(server_config, bind_addr)?; + let socket = std::net::UdpSocket::bind(bind_addr)?; + let runtime = quinn::default_runtime() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "no async runtime found"))?; + let mut endpoint_config = EndpointConfig::default(); + endpoint_config.max_udp_payload_size(1200)?; + let socket = NoGroAsyncUdpSocket { + inner: runtime.wrap_udp_socket(socket)?, + }; + let endpoint = Endpoint::new_with_abstract_socket( + endpoint_config, + Some(server_config), + Arc::new(socket), + runtime, + )?; Ok((endpoint, server_cert)) }