From 671b8d5a0c01a0a910bff4366a8935ee502430d8 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Tue, 26 Aug 2025 08:37:31 +0800 Subject: [PATCH] fix quic transport (#1293) --- easytier/src/tunnel/quic.rs | 92 ++++++++++++++++++++----------------- 1 file changed, 51 insertions(+), 41 deletions(-) diff --git a/easytier/src/tunnel/quic.rs b/easytier/src/tunnel/quic.rs index 49695d9..bddfff1 100644 --- a/easytier/src/tunnel/quic.rs +++ b/easytier/src/tunnel/quic.rs @@ -7,7 +7,7 @@ use std::{ }; use crate::tunnel::{ - common::{FramedReader, FramedWriter, TunnelWrapper}, + common::{setup_sokcet2, FramedReader, FramedWriter, TunnelWrapper}, TunnelInfo, }; use anyhow::Context; @@ -89,12 +89,20 @@ impl AsyncUdpSocket for NoGroAsyncUdpSocket { #[allow(unused)] pub fn make_server_endpoint(bind_addr: SocketAddr) -> Result<(Endpoint, Vec), Box> { let (server_config, server_cert) = configure_server()?; - let socket = std::net::UdpSocket::bind(bind_addr)?; + + let socket2_socket = socket2::Socket::new( + socket2::Domain::for_address(bind_addr), + socket2::Type::DGRAM, + Some(socket2::Protocol::UDP), + )?; + setup_sokcet2(&socket2_socket, &bind_addr)?; + let socket = std::net::UdpSocket::from(socket2_socket); + let runtime = quinn::default_runtime().ok_or_else(|| std::io::Error::other("no async runtime found"))?; let mut endpoint_config = EndpointConfig::default(); endpoint_config.max_udp_payload_size(1200)?; - let socket = NoGroAsyncUdpSocket { + let socket: NoGroAsyncUdpSocket = NoGroAsyncUdpSocket { inner: runtime.wrap_udp_socket(socket)?, }; let endpoint = Endpoint::new_with_abstract_socket( @@ -147,46 +155,17 @@ impl QUICTunnelListener { server_cert: None, } } -} -#[async_trait::async_trait] -impl TunnelListener for QUICTunnelListener { - async fn listen(&mut self) -> Result<(), TunnelError> { - let addr = - check_scheme_and_get_socket_addr::(&self.addr, "quic", IpVersion::Both) - .await?; - let (endpoint, server_cert) = make_server_endpoint(addr).unwrap(); - self.endpoint = Some(endpoint); - self.server_cert = Some(server_cert); - - self.addr - .set_port(Some(self.endpoint.as_ref().unwrap().local_addr()?.port())) - .unwrap(); - - Ok(()) - } - - async fn accept(&mut self) -> Result, super::TunnelError> { + async fn do_accept(&mut self) -> Result, super::TunnelError> { // accept a single connection - let conn = loop { - let Some(incoming_conn) = self.endpoint.as_ref().unwrap().accept().await else { - tokio::time::sleep(Duration::from_millis(100)).await; - continue; - }; - match incoming_conn.await { - Ok(conn) => { - tracing::info!( - "[server] connection accepted: addr={}", - conn.remote_address() - ); - break conn; - } - Err(e) => { - tracing::error!("[server] accept connection failed: {:?}", e); - tokio::time::sleep(Duration::from_millis(100)).await; - } - } - }; + let conn = self + .endpoint + .as_ref() + .unwrap() + .accept() + .await + .ok_or_else(|| anyhow::anyhow!("accept failed, no incoming"))?; + let conn = conn.await.with_context(|| "accept connection failed")?; let remote_addr = conn.remote_address(); let (w, r) = conn.accept_bi().await.with_context(|| "accept_bi failed")?; @@ -206,6 +185,37 @@ impl TunnelListener for QUICTunnelListener { Some(info), ))) } +} + +#[async_trait::async_trait] +impl TunnelListener for QUICTunnelListener { + async fn listen(&mut self) -> Result<(), TunnelError> { + let addr = + check_scheme_and_get_socket_addr::(&self.addr, "quic", IpVersion::Both) + .await?; + let (endpoint, server_cert) = make_server_endpoint(addr) + .map_err(|e| anyhow::anyhow!("make server endpoint error: {:?}", e))?; + self.endpoint = Some(endpoint); + self.server_cert = Some(server_cert); + + self.addr + .set_port(Some(self.endpoint.as_ref().unwrap().local_addr()?.port())) + .unwrap(); + + Ok(()) + } + + async fn accept(&mut self) -> Result, super::TunnelError> { + loop { + match self.do_accept().await { + Ok(ret) => return Ok(ret), + Err(e) => { + tracing::warn!(?e, "accept fail"); + tokio::time::sleep(Duration::from_millis(1)).await; + } + } + } + } fn local_url(&self) -> url::Url { self.addr.clone()