From 23f69ce6a412a61997d612183c05844107d8de15 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Mon, 17 Mar 2025 10:46:14 +0800 Subject: [PATCH] improve direct connector (#685) * support ipv6 stun * show interface and public ip in cli node info * direct conn should keep trying unless already direct connected * peer should use conn with smallest latency * deprecate ipv6_listener, use -l instead --- Cargo.lock | 18 +- easytier/Cargo.toml | 2 + easytier/src/common/config.rs | 1 - easytier/src/common/stun.rs | 210 ++++++++++++++++-- easytier/src/connector/direct.rs | 208 +++++++---------- easytier/src/connector/dns_connector.rs | 74 +++--- easytier/src/connector/manual.rs | 16 +- .../src/connector/udp_hole_punch/common.rs | 2 +- easytier/src/easytier-cli.rs | 40 +++- easytier/src/easytier-core.rs | 11 - easytier/src/instance/listeners.rs | 46 ++-- easytier/src/peers/foreign_network_manager.rs | 3 +- easytier/src/peers/peer.rs | 42 +++- easytier/src/peers/peer_manager.rs | 81 ++++++- easytier/src/peers/peer_map.rs | 5 + easytier/src/peers/rpc_service.rs | 15 +- easytier/src/peers/tests.rs | 2 +- easytier/src/proto/cli.proto | 4 + easytier/src/proto/cli.rs | 4 + easytier/src/proto/common.proto | 2 +- easytier/src/tunnel/common.rs | 8 +- easytier/src/tunnel/mod.rs | 2 +- easytier/src/tunnel/tcp.rs | 6 +- easytier/src/tunnel/udp.rs | 25 ++- 24 files changed, 558 insertions(+), 269 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ca4fa9..139d632 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -817,6 +817,15 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "bounded_join_set" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae18fd8f4a623bcf416b5bc8f1e0905534d9911597ed17cc57ab9b6eed65454d" +dependencies = [ + "tokio", +] + [[package]] name = "brotli" version = "7.0.0" @@ -1881,6 +1890,7 @@ dependencies = [ "base64 0.22.1", "bitflags 2.8.0", "boringtun-easytier", + "bounded_join_set", "bytecodec", "byteorder", "bytes", @@ -8135,9 +8145,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.39.2" +version = "1.44.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daa4fb1bc778bd6f04cbfc4bb2d06a7396a8f299dc33ea1900cedaa316f467b1" +checksum = "f382da615b842244d4b8738c82ed1275e6c5dd90c459a30941cd07080b06c91a" dependencies = [ "backtrace", "bytes", @@ -8153,9 +8163,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index 78ba3ab..f15173d 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -192,6 +192,8 @@ http_req = { git = "https://github.com/EasyTier/http_req.git", default-features # for dns connector hickory-resolver = "0.24.4" +bounded_join_set = "0.3.0" + [target.'cfg(any(target_os = "linux", target_os = "macos", target_os = "windows", target_os = "freebsd"))'.dependencies] machine-uid = "0.5.3" diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index fb89a59..d6a727a 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -27,7 +27,6 @@ pub fn gen_default_flags() -> Flags { disable_p2p: false, relay_all_peer_rpc: false, disable_udp_hole_punching: false, - ipv6_listener: "udp://[::]:0".to_string(), multi_thread: true, data_compress_algo: CompressionAlgoPb::None.into(), bind_device: true, diff --git a/easytier/src/common/stun.rs b/easytier/src/common/stun.rs index 301efcd..8ec6544 100644 --- a/easytier/src/common/stun.rs +++ b/easytier/src/common/stun.rs @@ -1,5 +1,5 @@ use std::collections::BTreeSet; -use std::net::{IpAddr, SocketAddr}; +use std::net::{IpAddr, Ipv6Addr, SocketAddr}; use std::sync::atomic::AtomicBool; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -8,6 +8,8 @@ use crate::proto::common::{NatType, StunInfo}; use anyhow::Context; use chrono::Local; use crossbeam::atomic::AtomicCell; +use hickory_resolver::config::{NameServerConfig, Protocol, ResolverConfig, ResolverOpts}; +use hickory_resolver::TokioAsyncResolver; use rand::seq::IteratorRandom; use tokio::net::{lookup_host, UdpSocket}; use tokio::sync::{broadcast, Mutex}; @@ -22,21 +24,68 @@ use crate::common::error::Error; use super::stun_codec_ext::*; +pub fn get_default_resolver_config() -> ResolverConfig { + let mut default_resolve_config = ResolverConfig::new(); + default_resolve_config.add_name_server(NameServerConfig::new( + "223.5.5.5:53".parse().unwrap(), + Protocol::Udp, + )); + default_resolve_config.add_name_server(NameServerConfig::new( + "180.184.1.1:53".parse().unwrap(), + Protocol::Udp, + )); + default_resolve_config +} + +pub async fn resolve_txt_record( + domain_name: &str, + resolver: &TokioAsyncResolver, +) -> Result { + let response = resolver.txt_lookup(domain_name).await.with_context(|| { + format!( + "txt_lookup failed, domain_name: {}", + domain_name.to_string() + ) + })?; + + let txt_record = response.iter().next().with_context(|| { + format!( + "no txt record found, domain_name: {}", + domain_name.to_string() + ) + })?; + + let txt_data = String::from_utf8_lossy(&txt_record.txt_data()[0]); + tracing::info!(?txt_data, ?domain_name, "get txt record"); + + Ok(txt_data.to_string()) +} + struct HostResolverIter { hostnames: Vec, ips: Vec, max_ip_per_domain: u32, + use_ipv6: bool, } impl HostResolverIter { - fn new(hostnames: Vec, max_ip_per_domain: u32) -> Self { + fn new(hostnames: Vec, max_ip_per_domain: u32, use_ipv6: bool) -> Self { Self { hostnames, ips: vec![], max_ip_per_domain, + use_ipv6, } } + async fn get_txt_record(domain_name: &str) -> Result, Error> { + let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap_or( + TokioAsyncResolver::tokio(get_default_resolver_config(), ResolverOpts::default()), + ); + let txt_data = resolve_txt_record(domain_name, &resolver).await?; + Ok(txt_data.split(" ").map(|x| x.to_string()).collect()) + } + #[async_recursion::async_recursion] async fn next(&mut self) -> Option { if self.ips.is_empty() { @@ -51,10 +100,35 @@ impl HostResolverIter { format!("{}:3478", host) }; + if host.starts_with("txt:") { + let domain_name = host.trim_start_matches("txt:"); + match Self::get_txt_record(domain_name).await { + Ok(hosts) => { + tracing::info!( + ?domain_name, + ?hosts, + "get txt record success when resolve stun server" + ); + // insert hosts to the head of hostnames + self.hostnames.splice(0..0, hosts.into_iter()); + } + Err(e) => { + tracing::warn!( + ?domain_name, + ?e, + "get txt record failed when resolve stun server" + ); + } + } + return self.next().await; + } + + let use_ipv6 = self.use_ipv6; + match lookup_host(&host).await { Ok(ips) => { self.ips = ips - .filter(|x| x.is_ipv4()) + .filter(|x| if use_ipv6 { x.is_ipv6() } else { x.is_ipv4() }) .choose_multiple(&mut rand::thread_rng(), self.max_ip_per_domain as usize); if self.ips.is_empty() { @@ -400,7 +474,7 @@ impl UdpNatTypeDetectResult { // find resp with distinct stun server self.stun_resps .iter() - .map(|x| x.stun_server_addr) + .map(|x| x.recv_from_addr) .collect::>() .len() } @@ -555,8 +629,11 @@ impl UdpNatTypeDetector { udp: Arc, ) -> Result { let mut stun_servers = vec![]; - let mut host_resolver = - HostResolverIter::new(self.stun_server_hosts.clone(), self.max_ip_per_domain); + let mut host_resolver = HostResolverIter::new( + self.stun_server_hosts.clone(), + self.max_ip_per_domain, + false, + ); while let Some(addr) = host_resolver.next().await { stun_servers.push(addr); } @@ -602,7 +679,9 @@ pub trait StunInfoCollectorTrait: Send + Sync { pub struct StunInfoCollector { stun_servers: Arc>>, + stun_servers_v6: Arc>>, udp_nat_test_result: Arc>>, + public_ipv6: Arc>>, nat_test_result_time: Arc>>, redetect_notify: Arc, tasks: std::sync::Mutex>, @@ -621,7 +700,12 @@ impl StunInfoCollectorTrait for StunInfoCollector { udp_nat_type: result.nat_type() as i32, tcp_nat_type: 0, last_update_time: self.nat_test_result_time.load().timestamp(), - public_ip: result.public_ips().iter().map(|x| x.to_string()).collect(), + public_ip: result + .public_ips() + .iter() + .map(|x| x.to_string()) + .chain(self.public_ipv6.load().map(|x| x.to_string())) + .collect(), min_port: result.min_port() as u32, max_port: result.max_port() as u32, } @@ -640,7 +724,7 @@ impl StunInfoCollectorTrait for StunInfoCollector { if stun_servers.is_empty() { let mut host_resolver = - HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2); + HostResolverIter::new(self.stun_servers.read().unwrap().clone(), 2, false); while let Some(addr) = host_resolver.next().await { stun_servers.push(addr); if stun_servers.len() >= 2 { @@ -680,7 +764,9 @@ impl StunInfoCollector { pub fn new(stun_servers: Vec) -> Self { Self { stun_servers: Arc::new(RwLock::new(stun_servers)), + stun_servers_v6: Arc::new(RwLock::new(Self::get_default_servers_v6())), udp_nat_test_result: Arc::new(RwLock::new(None)), + public_ipv6: Arc::new(AtomicCell::new(None)), nat_test_result_time: Arc::new(AtomicCell::new(Local::now())), redetect_notify: Arc::new(tokio::sync::Notify::new()), tasks: std::sync::Mutex::new(JoinSet::new()), @@ -696,28 +782,42 @@ impl StunInfoCollector { // NOTICE: we may need to choose stun stun server based on geo location // stun server cross nation may return a external ip address with high latency and loss rate vec![ + "txt:stun.easytier.cn", "stun.miwifi.com", "stun.chat.bilibili.com", "stun.hitv.com", - "stun.cdnbye.com", - "stun.douyucdn.cn:18000", - "fwa.lifesizecloud.com", - "global.turn.twilio.com", - "turn.cloudflare.com", - "stun.isp.net.au", - "stun.nextcloud.com", - "stun.freeswitch.org", - "stun.voip.blackberry.com", - "stunserver.stunprotocol.org", - "stun.sipnet.com", - "stun.radiojar.com", - "stun.sonetel.com", ] .iter() .map(|x| x.to_string()) .collect() } + pub fn get_default_servers_v6() -> Vec { + vec!["txt:stun-v6.easytier.cn"] + .iter() + .map(|x| x.to_string()) + .collect() + } + + async fn get_public_ipv6(servers: &Vec) -> Option { + let mut ips = HostResolverIter::new(servers.to_vec(), 10, true); + while let Some(ip) = ips.next().await { + let udp = Arc::new(UdpSocket::bind(format!("[::]:0")).await.unwrap()); + let ret = StunClientBuilder::new(udp.clone()) + .new_stun_client(ip) + .bind_request(false, false) + .await; + tracing::debug!(?ret, "finish ipv6 udp nat type detect"); + match ret.map(|x| x.mapped_socket_addr.map(|x| x.ip())) { + Ok(Some(IpAddr::V6(v6))) => { + return Some(v6); + } + _ => {} + } + } + None + } + fn start_stun_routine(&self) { if self.started.load(std::sync::atomic::Ordering::Relaxed) { return; @@ -784,6 +884,30 @@ impl StunInfoCollector { } } }); + + // for ipv6 + let stun_servers = self.stun_servers_v6.clone(); + let stored_ipv6 = self.public_ipv6.clone(); + let redetect_notify = self.redetect_notify.clone(); + self.tasks.lock().unwrap().spawn(async move { + loop { + let servers = stun_servers.read().unwrap().clone(); + Self::get_public_ipv6(&servers) + .await + .map(|x| stored_ipv6.store(Some(x))); + + let sleep_sec = if stored_ipv6.load().is_none() { + 60 + } else { + 360 + }; + + tokio::select! { + _ = redetect_notify.notified() => {} + _ = tokio::time::sleep(Duration::from_secs(sleep_sec)) => {} + } + } + }); } pub fn update_stun_info(&self) { @@ -862,6 +986,48 @@ mod tests { let detector = UdpNatTypeDetector::new(stun_servers, 1); let ret = detector.detect_nat_type(0).await; println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type()); - assert_eq!(ret.unwrap().nat_type(), NatType::PortRestricted); + assert_eq!(ret.unwrap().nat_type(), NatType::Restricted); + } + + #[tokio::test] + async fn test_txt_public_stun_server() { + let stun_servers = vec!["txt:stun.easytier.cn".to_string()]; + let detector = UdpNatTypeDetector::new(stun_servers, 1); + let ret = detector.detect_nat_type(0).await; + println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type()); + assert!(!ret.unwrap().stun_resps.is_empty()); + } + + #[tokio::test] + async fn test_v4_stun() { + let mut udp_server = UdpTunnelListener::new("udp://0.0.0.0:55355".parse().unwrap()); + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + udp_server.listen().await.unwrap(); + loop { + udp_server.accept().await.unwrap(); + } + }); + let stun_servers = vec!["127.0.0.1:55355".to_string()]; + + let detector = UdpNatTypeDetector::new(stun_servers, 1); + let ret = detector.detect_nat_type(0).await; + println!("{:#?}, {:?}", ret, ret.as_ref().unwrap().nat_type()); + assert_eq!(ret.unwrap().nat_type(), NatType::Restricted); + } + + #[tokio::test] + async fn test_v6_stun() { + let mut udp_server = UdpTunnelListener::new("udp://[::]:55355".parse().unwrap()); + let mut tasks = JoinSet::new(); + tasks.spawn(async move { + udp_server.listen().await.unwrap(); + loop { + udp_server.accept().await.unwrap(); + } + }); + let stun_servers = vec!["::1:55355".to_string()]; + let ret = StunInfoCollector::get_public_ipv6(&stun_servers).await; + println!("{:#?}", ret); } } diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 7dff78c..53db83f 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -1,7 +1,9 @@ // try connect peers directly, with either its public ip or lan ip use std::{ - net::SocketAddr, + collections::HashSet, + net::{Ipv6Addr, SocketAddr}, + str::FromStr, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -79,7 +81,6 @@ struct DstListenerUrlBlackListItem(PeerId, url::Url); struct DirectConnectorManagerData { global_ctx: ArcGlobalCtx, peer_manager: Arc, - dst_blacklist: timedmap::TimedMap, dst_listener_blacklist: timedmap::TimedMap, } @@ -88,7 +89,6 @@ impl DirectConnectorManagerData { Self { global_ctx, peer_manager, - dst_blacklist: timedmap::TimedMap::new(), dst_listener_blacklist: timedmap::TimedMap::new(), } } @@ -150,7 +150,9 @@ impl DirectConnectorManager { let peers = data.peer_manager.list_peers().await; let mut tasks = JoinSet::new(); for peer_id in peers { - if peer_id == my_peer_id { + if peer_id == my_peer_id + || data.peer_manager.has_directly_connected_conn(peer_id) + { continue; } tasks.spawn(Self::do_try_direct_connect(data.clone(), peer_id)); @@ -173,24 +175,13 @@ impl DirectConnectorManager { dst_peer_id: PeerId, addr: String, ) -> Result<(), Error> { - data.dst_blacklist.cleanup(); - if data - .dst_blacklist - .contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone())) - { - tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr); - return Err(Error::UrlInBlacklist); - } - let connector = create_connector_by_url(&addr, &data.global_ctx).await?; let (peer_id, conn_id) = timeout( - std::time::Duration::from_secs(5), - data.peer_manager.try_connect(connector), + std::time::Duration::from_secs(3), + data.peer_manager.try_direct_connect(connector), ) .await??; - // let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?; - if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) { tracing::info!( "connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}", @@ -204,6 +195,7 @@ impl DirectConnectorManager { .await?; return Err(Error::InvalidUrl(addr)); } + Ok(()) } @@ -214,7 +206,7 @@ impl DirectConnectorManager { addr: String, ) -> Result<(), Error> { let mut rand_gen = rand::rngs::OsRng::default(); - let backoff_ms = vec![1000, 2000, 4000]; + let backoff_ms = vec![1000, 2000]; let mut backoff_idx = 0; loop { @@ -237,12 +229,6 @@ impl DirectConnectorManager { backoff_idx += 1; continue; } else { - data.dst_blacklist.insert( - DstBlackListItem(dst_peer_id.clone(), addr.clone()), - (), - std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC), - ); - return ret; } } @@ -273,61 +259,43 @@ impl DirectConnectorManager { tracing::debug!(?available_listeners, "got available listeners"); - let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!( - "peer {} have no valid listener", - dst_peer_id - ))?; + if available_listeners.is_empty() { + return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into()); + } // if have default listener, use it first - listener = available_listeners + let listener = available_listeners .iter() .find(|l| l.scheme() == data.global_ctx.get_flags().default_protocol) - .unwrap_or(listener); + .unwrap_or(available_listeners.get(0).unwrap()); - let mut tasks = JoinSet::new(); + let mut tasks = bounded_join_set::JoinSet::new(2); let listener_host = listener.socket_addrs(|| None).unwrap().pop(); match listener_host { Some(SocketAddr::V4(s_addr)) => { if s_addr.ip().is_unspecified() { - ip_list.interface_ipv4s.iter().for_each(|ip| { - let mut addr = (*listener).clone(); - if addr.set_host(Some(ip.to_string().as_str())).is_ok() { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?ip, - ?listener, - ?dst_peer_id, - "failed to set host for interface ipv4" - ); - } - }); - - if let Some(public_ipv4) = ip_list.public_ipv4 { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(public_ipv4.to_string().as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?public_ipv4, - ?listener, - ?dst_peer_id, - "failed to set host for public ipv4" - ); - } - } + ip_list + .interface_ipv4s + .iter() + .chain(ip_list.public_ipv4.iter()) + .for_each(|ip| { + let mut addr = (*listener).clone(); + if addr.set_host(Some(ip.to_string().as_str())).is_ok() { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for interface ipv4" + ); + } + }); } else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) { tasks.spawn(Self::try_connect_to_ip( data.clone(), @@ -338,47 +306,42 @@ impl DirectConnectorManager { } Some(SocketAddr::V6(s_addr)) => { if s_addr.ip().is_unspecified() { - ip_list.interface_ipv6s.iter().for_each(|ip| { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(format!("[{}]", ip.to_string()).as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?ip, - ?listener, - ?dst_peer_id, - "failed to set host for interface ipv6" - ); - } - }); - - if let Some(public_ipv6) = ip_list.public_ipv6 { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?public_ipv6, - ?listener, - ?dst_peer_id, - "failed to set host for public ipv6" - ); - } - } + // for ipv6, only try public ip + ip_list + .interface_ipv6s + .iter() + .chain(ip_list.public_ipv6.iter()) + .filter_map(|x| Ipv6Addr::from_str(&x.to_string()).ok()) + .filter(|x| { + TESTING.load(Ordering::Relaxed) + || (!x.is_loopback() + && !x.is_unspecified() + && !x.is_unique_local() + && !x.is_unicast_link_local() + && !x.is_multicast()) + }) + .collect::>() + .iter() + .for_each(|ip| { + let mut addr = (*listener).clone(); + if addr + .set_host(Some(format!("[{}]", ip.to_string()).as_str())) + .is_ok() + { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for public ipv6" + ); + } + }); } else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) { tasks.spawn(Self::try_connect_to_ip( data.clone(), @@ -430,14 +393,6 @@ impl DirectConnectorManager { dst_peer_id: PeerId, ) -> Result<(), Error> { let peer_manager = data.peer_manager.clone(); - // check if we have direct connection with dst_peer_id - if let Some(c) = peer_manager.list_peer_conns(dst_peer_id).await { - // currently if we have any type of direct connection (udp or tcp), we will not try to connect - if !c.is_empty() { - return Ok(()); - } - } - tracing::debug!("try direct connect to peer: {}", dst_peer_id); let rpc_stub = peer_manager @@ -466,8 +421,7 @@ mod tests { use crate::{ connector::direct::{ - DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem, - DstListenerUrlBlackListItem, + DirectConnectorManager, DirectConnectorManagerData, DstListenerUrlBlackListItem, }, instance::listeners::ListenerManager, peers::tests::{ @@ -526,9 +480,7 @@ mod tests { #[values("tcp", "udp", "wg")] proto: &str, #[values("true", "false")] ipv6: bool, ) { - if ipv6 && proto != "udp" { - return; - } + TESTING.store(true, std::sync::atomic::Ordering::Relaxed); let p_a = create_mock_peer_manager().await; let p_b = create_mock_peer_manager().await; @@ -544,14 +496,18 @@ mod tests { dm_a.run_as_client(); dm_c.run_as_server(); + let port = if proto == "wg" { 11040 } else { 11041 }; if !ipv6 { - let port = if proto == "wg" { 11040 } else { 11041 }; p_c.get_global_ctx().config.set_listeners(vec![format!( "{}://0.0.0.0:{}", proto, port ) .parse() .unwrap()]); + } else { + p_c.get_global_ctx() + .config + .set_listeners(vec![format!("{}://[::]:{}", proto, port).parse().unwrap()]); } let mut f = p_c.get_global_ctx().config.get_flags(); f.enable_ipv6 = ipv6; @@ -592,9 +548,5 @@ mod tests { 1, "tcp://127.0.0.1:10222".parse().unwrap() ))); - - assert!(data - .dst_blacklist - .contains(&DstBlackListItem(1, ip_list.listeners[0].to_string()))); } } diff --git a/easytier/src/connector/dns_connector.rs b/easytier/src/connector/dns_connector.rs index 6d37e60..c2ac24b 100644 --- a/easytier/src/connector/dns_connector.rs +++ b/easytier/src/connector/dns_connector.rs @@ -1,13 +1,17 @@ use std::{net::SocketAddr, sync::Arc}; use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx}, - tunnel::{Tunnel, TunnelConnector, TunnelError, PROTO_PORT_OFFSET}, + common::{ + error::Error, + global_ctx::ArcGlobalCtx, + stun::{get_default_resolver_config, resolve_txt_record}, + }, + tunnel::{IpVersion, Tunnel, TunnelConnector, TunnelError, PROTO_PORT_OFFSET}, }; use anyhow::Context; use dashmap::DashSet; use hickory_resolver::{ - config::{NameServerConfig, Protocol, ResolverConfig, ResolverOpts}, + config::{ResolverConfig, ResolverOpts}, proto::rr::rdata::SRV, TokioAsyncResolver, }; @@ -38,6 +42,7 @@ pub struct DNSTunnelConnector { addr: url::Url, bind_addrs: Vec, global_ctx: ArcGlobalCtx, + ip_version: IpVersion, default_resolve_config: ResolverConfig, default_resolve_opts: ResolverOpts, @@ -45,21 +50,13 @@ pub struct DNSTunnelConnector { impl DNSTunnelConnector { pub fn new(addr: url::Url, global_ctx: ArcGlobalCtx) -> Self { - let mut default_resolve_config = ResolverConfig::new(); - default_resolve_config.add_name_server(NameServerConfig::new( - "223.5.5.5:53".parse().unwrap(), - Protocol::Udp, - )); - default_resolve_config.add_name_server(NameServerConfig::new( - "180.184.1.1:53".parse().unwrap(), - Protocol::Udp, - )); Self { addr, bind_addrs: Vec::new(), global_ctx, + ip_version: IpVersion::Both, - default_resolve_config, + default_resolve_config: get_default_resolver_config(), default_resolve_opts: ResolverOpts::default(), } } @@ -69,26 +66,14 @@ impl DNSTunnelConnector { &self, domain_name: &str, ) -> Result, Error> { - let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap_or( - TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()), - ); - - let response = resolver.txt_lookup(domain_name).await.with_context(|| { - format!( - "txt_lookup failed, domain_name: {}", - domain_name.to_string() - ) - })?; - - let txt_record = response.iter().next().with_context(|| { - format!( - "no txt record found, domain_name: {}", - domain_name.to_string() - ) - })?; - - let txt_data = String::from_utf8_lossy(&txt_record.txt_data()[0]); - tracing::info!(?txt_data, ?domain_name, "get txt record"); + let resolver = + TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio( + self.default_resolve_config.clone(), + self.default_resolve_opts.clone(), + )); + let txt_data = resolve_txt_record(domain_name, &resolver) + .await + .with_context(|| format!("resolve txt record failed, domain_name: {}", domain_name))?; let candidate_urls = txt_data .split(" ") @@ -106,9 +91,9 @@ impl DNSTunnelConnector { ) })?; - let connector = create_connector_by_url(url.as_str(), &self.global_ctx).await; - - connector + let mut connector = create_connector_by_url(url.as_str(), &self.global_ctx).await?; + connector.set_ip_version(self.ip_version); + Ok(connector) } fn handle_one_srv_record(record: &SRV, protocol: &str) -> Result<(url::Url, u64), Error> { @@ -141,9 +126,11 @@ impl DNSTunnelConnector { ) -> Result, Error> { tracing::info!("handle_srv_record: {}", domain_name); - let resolver = TokioAsyncResolver::tokio_from_system_conf().unwrap_or( - TokioAsyncResolver::tokio(ResolverConfig::default(), ResolverOpts::default()), - ); + let resolver = + TokioAsyncResolver::tokio_from_system_conf().unwrap_or(TokioAsyncResolver::tokio( + self.default_resolve_config.clone(), + self.default_resolve_opts.clone(), + )); let srv_domains = PROTO_PORT_OFFSET .iter() @@ -192,8 +179,9 @@ impl DNSTunnelConnector { ) })?; - let connector = create_connector_by_url(url.as_str(), &self.global_ctx).await; - connector + let mut connector = create_connector_by_url(url.as_str(), &self.global_ctx).await?; + connector.set_ip_version(self.ip_version); + Ok(connector) } } @@ -238,6 +226,10 @@ impl super::TunnelConnector for DNSTunnelConnector { fn set_bind_addrs(&mut self, addrs: Vec) { self.bind_addrs = addrs; } + + fn set_ip_version(&mut self, ip_version: IpVersion) { + self.ip_version = ip_version; + } } #[cfg(test)] diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index 4181cfa..ab24149 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -293,7 +293,6 @@ impl ManualConnectorManager { ip_version: IpVersion, ) -> Result { let ip_collector = data.global_ctx.get_ip_collector(); - let net_ns = data.net_ns.clone(); connector.lock().await.set_ip_version(ip_version); @@ -309,18 +308,11 @@ impl ManualConnectorManager { data.global_ctx.issue_event(GlobalCtxEvent::Connecting( connector.lock().await.remote_url().clone(), )); - - let _g = net_ns.guard(); tracing::info!("reconnect try connect... conn: {:?}", connector); - let tunnel = connector.lock().await.connect().await?; - tracing::info!("reconnect get tunnel succ: {:?}", tunnel); - assert_eq!( - dead_url, - tunnel.info().unwrap().remote_addr.unwrap().to_string(), - "info: {:?}", - tunnel.info() - ); - let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?; + let (peer_id, conn_id) = data + .peer_manager + .try_direct_connect(connector.lock().await.as_mut()) + .await?; tracing::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); Ok(ReconnResult { dead_url, diff --git a/easytier/src/connector/udp_hole_punch/common.rs b/easytier/src/connector/udp_hole_punch/common.rs index 11cbd06..ec45b4f 100644 --- a/easytier/src/connector/udp_hole_punch/common.rs +++ b/easytier/src/connector/udp_hole_punch/common.rs @@ -388,7 +388,7 @@ impl UdpHolePunchListener { tracing::warn!(?conn, "udp hole punching listener got peer connection"); let peer_mgr = peer_mgr.clone(); tokio::spawn(async move { - if let Err(e) = peer_mgr.add_tunnel_as_server(conn).await { + if let Err(e) = peer_mgr.add_tunnel_as_server(conn, false).await { tracing::error!( ?e, "failed to add tunnel as server in hole punch listener" diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 0b16a89..f908585 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -1,8 +1,14 @@ use std::{ - ffi::OsString, fmt::Write, net::SocketAddr, path::PathBuf, sync::Mutex, time::Duration, vec, + ffi::OsString, + fmt::Write, + net::{IpAddr, SocketAddr}, + path::PathBuf, + sync::Mutex, + time::Duration, + vec, }; -use anyhow::{Context, Ok}; +use anyhow::Context; use clap::{command, Args, Parser, Subcommand}; use humansize::format_size; use service_manager::*; @@ -311,7 +317,11 @@ impl CommandHandler { ipv4: route.ipv4_addr.map(|ip| ip.to_string()).unwrap_or_default(), hostname: route.hostname.clone(), cost: cost_to_str(route.cost), - lat_ms: float_to_str(p.get_latency_ms().unwrap_or(0.0), 3), + lat_ms: if route.cost == 1 { + float_to_str(p.get_latency_ms().unwrap_or(0.0), 3) + } else { + route.path_latency_latency_first().to_string() + }, loss_rate: float_to_str(p.get_loss_rate().unwrap_or(0.0), 3), rx_bytes: format_size(p.get_rx_bytes().unwrap_or(0), humansize::DECIMAL), tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL), @@ -1036,6 +1046,7 @@ async fn main() -> Result<(), Error> { match sub_cmd.sub_command { Some(NodeSubCommand::Info) | None => { let stun_info = node_info.stun_info.clone().unwrap_or_default(); + let ip_list = node_info.ip_list.clone().unwrap_or_default(); let mut builder = tabled::builder::Builder::default(); builder.push_record(vec!["Virtual IP", node_info.ipv4_addr.as_str()]); @@ -1045,11 +1056,32 @@ async fn main() -> Result<(), Error> { node_info.proxy_cidrs.join(", ").as_str(), ]); builder.push_record(vec!["Peer ID", node_info.peer_id.to_string().as_str()]); - builder.push_record(vec!["Public IP", stun_info.public_ip.join(", ").as_str()]); + stun_info.public_ip.iter().for_each(|ip| { + let Ok(ip) = ip.parse::() else { + return; + }; + if ip.is_ipv4() { + builder.push_record(vec!["Public IPv4", ip.to_string().as_str()]); + } else { + builder.push_record(vec!["Public IPv6", ip.to_string().as_str()]); + } + }); builder.push_record(vec![ "UDP Stun Type", format!("{:?}", stun_info.udp_nat_type()).as_str(), ]); + ip_list.interface_ipv4s.iter().for_each(|ip| { + builder.push_record(vec![ + "Interface IPv4", + format!("{}", ip.to_string()).as_str(), + ]); + }); + ip_list.interface_ipv6s.iter().for_each(|ip| { + builder.push_record(vec![ + "Interface IPv6", + format!("{}", ip.to_string()).as_str(), + ]); + }); for (idx, l) in node_info.listeners.iter().enumerate() { if l.starts_with("ring") { continue; diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 249000b..a1bdc0b 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -308,12 +308,6 @@ struct Cli { )] socks5: Option, - #[arg( - long, - help = t!("core_clap.ipv6_listener").to_string() - )] - ipv6_listener: Option, - #[arg( long, help = t!("core_clap.compression").to_string(), @@ -576,11 +570,6 @@ impl TryFrom<&Cli> for TomlConfigLoader { f.disable_p2p = cli.disable_p2p; f.disable_udp_hole_punching = cli.disable_udp_hole_punching; f.relay_all_peer_rpc = cli.relay_all_peer_rpc; - if let Some(ipv6_listener) = cli.ipv6_listener.as_ref() { - f.ipv6_listener = ipv6_listener - .parse() - .with_context(|| format!("failed to parse ipv6 listener: {}", ipv6_listener))? - } f.multi_thread = cli.multi_thread; f.data_compress_algo = match cli.compression.as_str() { "none" => CompressionAlgoPb::None, diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index 91aceb2..65350f9 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -1,5 +1,6 @@ use std::{fmt::Debug, sync::Arc}; +use anyhow::Context; use async_trait::async_trait; use tokio::task::JoinSet; @@ -49,6 +50,10 @@ pub fn get_listener_by_url( }) } +pub fn is_url_host_ipv6(l: &url::Url) -> bool { + l.host_str().map_or(false, |h| h.contains(':')) +} + #[async_trait] pub trait TunnelHandlerForListener { async fn handle_tunnel(&self, tunnel: Box) -> Result<(), Error>; @@ -58,7 +63,7 @@ pub trait TunnelHandlerForListener { impl TunnelHandlerForListener for PeerManager { #[tracing::instrument] async fn handle_tunnel(&self, tunnel: Box) -> Result<(), Error> { - self.add_tunnel_as_server(tunnel).await + self.add_tunnel_as_server(tunnel, true).await } } @@ -113,22 +118,26 @@ impl ListenerManage continue; }; let ctx = self.global_ctx.clone(); - self.add_listener(move || get_listener_by_url(&l, ctx.clone()).unwrap(), true) - .await?; - } - if self.global_ctx.config.get_flags().enable_ipv6 { - let ipv6_listener = self.global_ctx.config.get_flags().ipv6_listener.clone(); - let _ = self - .add_listener( - move || { - Box::new(UdpTunnelListener::new( - ipv6_listener.clone().parse().unwrap(), - )) - }, + let listener = l.clone(); + self.add_listener( + move || get_listener_by_url(&listener, ctx.clone()).unwrap(), + true, + ) + .await?; + + if self.global_ctx.config.get_flags().enable_ipv6 && !is_url_host_ipv6(&l) { + let mut ipv6_listener = l.clone(); + ipv6_listener + .set_host(Some("[::]".to_string().as_str())) + .with_context(|| format!("failed to set ipv6 host for listener: {}", l))?; + let ctx = self.global_ctx.clone(); + self.add_listener( + move || get_listener_by_url(&ipv6_listener, ctx.clone()).unwrap(), false, ) .await?; + } } Ok(()) @@ -161,11 +170,11 @@ impl ListenerManage global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); } Err(e) => { + tracing::error!(?e, ?l, "listener listen error"); global_ctx.issue_event(GlobalCtxEvent::ListenerAddFailed( l.local_url(), format!("error: {:?}, retry listen later...", e), )); - tracing::error!(?e, ?l, "listener listen error"); tokio::time::sleep(std::time::Duration::from_secs(1)).await; continue; } @@ -217,6 +226,15 @@ impl ListenerManage pub async fn run(&mut self) -> Result<(), Error> { for listener in &self.listeners { + if listener.must_succ { + // try listen once + let mut l = (listener.creator_fn)(); + let _g = self.net_ns.guard(); + l.listen() + .await + .with_context(|| format!("failed to listen on {}", l.local_url()))?; + } + self.tasks.spawn(Self::run_listener( listener.creator_fn.clone(), self.peer_manager.clone(), diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index dfc143c..df76d4f 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -695,7 +695,8 @@ mod tests { let (a_ring, b_ring) = crate::tunnel::ring::create_ring_tunnel_pair(); let b_mgr_copy = pm_center.clone(); - let s_ret = tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring).await }); + let s_ret = + tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await }); pma_net1.add_client_tunnel(a_ring).await.unwrap(); diff --git a/easytier/src/peers/peer.rs b/easytier/src/peers/peer.rs index 248bfc2..e3bb12e 100644 --- a/easytier/src/peers/peer.rs +++ b/easytier/src/peers/peer.rs @@ -11,7 +11,7 @@ use super::{ peer_conn::{PeerConn, PeerConnId}, PacketRecvChan, }; -use crate::proto::cli::PeerConnInfo; +use crate::{common::scoped_task::ScopedTask, proto::cli::PeerConnInfo}; use crate::{ common::{ error::Error, @@ -36,7 +36,8 @@ pub struct Peer { shutdown_notifier: Arc, - default_conn_id: AtomicCell, + default_conn_id: Arc>, + default_conn_id_clear_task: ScopedTask<()>, } impl Peer { @@ -88,6 +89,19 @@ impl Peer { )), ); + let default_conn_id = Arc::new(AtomicCell::new(PeerConnId::default())); + + let conns_copy = conns.clone(); + let default_conn_id_copy = default_conn_id.clone(); + let default_conn_id_clear_task = ScopedTask::from(tokio::spawn(async move { + loop { + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + if conns_copy.len() > 1 { + default_conn_id_copy.store(PeerConnId::default()); + } + } + })); + Peer { peer_node_id, conns: conns.clone(), @@ -98,7 +112,8 @@ impl Peer { close_event_listener, shutdown_notifier, - default_conn_id: AtomicCell::new(PeerConnId::default()), + default_conn_id, + default_conn_id_clear_task, } } @@ -117,14 +132,19 @@ impl Peer { return Some(conn.clone()); } - let conn = self.conns.iter().next(); - if conn.is_none() { - return None; + // find a conn with the smallest latency + let mut min_latency = std::u64::MAX; + for conn in self.conns.iter() { + let latency = conn.value().get_stats().latency_us; + if latency < min_latency { + min_latency = latency; + self.default_conn_id.store(conn.get_conn_id()); + } } - let conn = conn.unwrap().clone(); - self.default_conn_id.store(conn.get_conn_id()); - Some(conn) + self.conns + .get(&self.default_conn_id.load()) + .map(|conn| conn.clone()) } pub async fn send_msg(&self, msg: ZCPacket) -> Result<(), Error> { @@ -158,6 +178,10 @@ impl Peer { } ret } + + pub fn get_default_conn_id(&self) -> PeerConnId { + self.default_conn_id.load() + } } // pritn on drop diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index e6b7f0a..610fad9 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -8,7 +8,7 @@ use std::{ use anyhow::Context; use async_trait::async_trait; -use dashmap::DashMap; +use dashmap::{DashMap, DashSet}; use tokio::{ sync::{ @@ -23,7 +23,7 @@ use crate::{ compressor::{Compressor as _, DefaultCompressor}, constants::EASYTIER_VERSION, error::Error, - global_ctx::{ArcGlobalCtx, NetworkIdentity}, + global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, stun::StunInfoCollectorTrait, PeerId, }, @@ -141,6 +141,9 @@ pub struct PeerManager { data_compress_algo: CompressorAlgo, exit_nodes: Vec, + + // conns that are directly connected (which are not hole punched) + directly_connected_conn_map: Arc>>, } impl Debug for PeerManager { @@ -267,6 +270,8 @@ impl PeerManager { data_compress_algo, exit_nodes, + + directly_connected_conn_map: Arc::new(DashMap::new()), } } @@ -325,8 +330,48 @@ impl PeerManager { Ok((peer_id, conn_id)) } + fn add_directly_connected_conn(&self, peer_id: PeerId, conn_id: uuid::Uuid) { + let _ = self + .directly_connected_conn_map + .entry(peer_id) + .or_insert_with(DashSet::new) + .insert(conn_id); + } + + pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool { + self.directly_connected_conn_map + .get(&peer_id) + .map_or(false, |x| !x.is_empty()) + } + + async fn start_peer_conn_close_event_handler(&self) { + let dmap = self.directly_connected_conn_map.clone(); + let mut event_recv = self.global_ctx.subscribe(); + self.tasks.lock().await.spawn(async move { + while let Ok(event) = event_recv.recv().await { + match event { + GlobalCtxEvent::PeerConnRemoved(info) => { + if let Some(set) = dmap.get_mut(&info.peer_id) { + let conn_id = info.conn_id.parse().unwrap(); + let old = set.remove(&conn_id); + tracing::info!( + ?old, + ?info, + "try remove conn id from directly connected map" + ); + } + } + _ => {} + } + } + }); + } + #[tracing::instrument] - pub async fn try_connect(&self, mut connector: C) -> Result<(PeerId, PeerConnId), Error> + pub async fn try_direct_connect( + &self, + mut connector: C, + ) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { @@ -334,18 +379,28 @@ impl PeerManager { let t = ns .run_async(|| async move { connector.connect().await }) .await?; - self.add_client_tunnel(t).await + let (peer_id, conn_id) = self.add_client_tunnel(t).await?; + self.add_directly_connected_conn(peer_id, conn_id); + Ok((peer_id, conn_id)) } #[tracing::instrument] - pub async fn add_tunnel_as_server(&self, tunnel: Box) -> Result<(), Error> { + pub async fn add_tunnel_as_server( + &self, + tunnel: Box, + is_directly_connected: bool, + ) -> Result<(), Error> { tracing::info!("add tunnel as server start"); let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); peer.do_handshake_as_server().await?; if peer.get_network_identity().network_name == self.global_ctx.get_network_identity().network_name { + let (peer_id, conn_id) = (peer.get_peer_id(), peer.get_conn_id()); self.add_new_peer_conn(peer).await?; + if is_directly_connected { + self.add_directly_connected_conn(peer_id, conn_id); + } } else { self.foreign_network_manager.add_peer_conn(peer).await?; } @@ -857,9 +912,11 @@ impl PeerManager { async fn run_clean_peer_without_conn_routine(&self) { let peer_map = self.peers.clone(); + let dmap = self.directly_connected_conn_map.clone(); self.tasks.lock().await.spawn(async move { loop { peer_map.clean_peer_without_conn().await; + dmap.retain(|p, v| peer_map.has_peer(*p) && !v.is_empty()); tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }); @@ -876,6 +933,8 @@ impl PeerManager { } pub async fn run(&self) -> Result<(), Error> { + self.start_peer_conn_close_event_handler().await; + match &self.route_algo_inst { RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await, RouteAlgoInst::None => {} @@ -924,7 +983,7 @@ impl PeerManager { self.foreign_network_client.clone() } - pub fn get_my_info(&self) -> cli::NodeInfo { + pub async fn get_my_info(&self) -> cli::NodeInfo { cli::NodeInfo { peer_id: self.my_peer_id, ipv4_addr: self @@ -950,6 +1009,7 @@ impl PeerManager { config: self.global_ctx.config.dump(), version: EASYTIER_VERSION.to_string(), feature_flag: Some(self.global_ctx.get_feature_flags()), + ip_list: Some(self.global_ctx.get_ip_collector().collect_ip_addrs().await), } } @@ -958,6 +1018,13 @@ impl PeerManager { tokio::time::sleep(std::time::Duration::from_secs(1)).await; } } + + pub fn get_directly_connections(&self, peer_id: PeerId) -> DashSet { + self.directly_connected_conn_map + .get(&peer_id) + .map(|x| x.clone()) + .unwrap_or_default() + } } #[cfg(test)] @@ -1026,7 +1093,7 @@ mod tests { tokio::spawn(async move { client.set_bind_addrs(vec![]); - client_mgr.try_connect(client).await.unwrap(); + client_mgr.try_direct_connect(client).await.unwrap(); }); server_mgr diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 26aaf37..b6ae05d 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -212,6 +212,11 @@ impl PeerMap { } } + pub async fn get_peer_default_conn_id(&self, peer_id: PeerId) -> Option { + self.get_peer_by_id(peer_id) + .map(|p| p.get_default_conn_id()) + } + pub async fn close_peer_conn( &self, peer_id: PeerId, diff --git a/easytier/src/peers/rpc_service.rs b/easytier/src/peers/rpc_service.rs index 34dfac0..14f8f19 100644 --- a/easytier/src/peers/rpc_service.rs +++ b/easytier/src/peers/rpc_service.rs @@ -32,12 +32,23 @@ impl PeerManagerRpcService { .await .iter(), ); + let peer_map = self.peer_manager.get_peer_map(); let mut peer_infos = Vec::new(); for peer in peers { let mut peer_info = PeerInfo::default(); peer_info.peer_id = peer; + peer_info.default_conn_id = peer_map + .get_peer_default_conn_id(peer) + .await + .map(Into::into); + peer_info.directly_connected_conns = self + .peer_manager + .get_directly_connections(peer) + .into_iter() + .map(Into::into) + .collect(); - if let Some(conns) = self.peer_manager.get_peer_map().list_peer_conns(peer).await { + if let Some(conns) = peer_map.list_peer_conns(peer).await { peer_info.conns = conns; } else if let Some(conns) = self .peer_manager @@ -121,7 +132,7 @@ impl PeerManageRpc for PeerManagerRpcService { _request: ShowNodeInfoRequest, // Accept request of type HelloRequest ) -> Result { Ok(ShowNodeInfoResponse { - node_info: Some(self.peer_manager.get_my_info()), + node_info: Some(self.peer_manager.get_my_info().await), }) } } diff --git a/easytier/src/peers/tests.rs b/easytier/src/peers/tests.rs index 0b1d971..90ffe5c 100644 --- a/easytier/src/peers/tests.rs +++ b/easytier/src/peers/tests.rs @@ -45,7 +45,7 @@ pub async fn connect_peer_manager(client: Arc, server: Arc Option { let mut ret = u64::MAX; let p = self.peer.as_ref()?; + let default_conn_id = p.default_conn_id.map(|id| id.to_string()); for conn in p.conns.iter() { let Some(stats) = &conn.stats else { continue; }; + if default_conn_id == Some(conn.conn_id.to_string()) { + return Some(f64::from(stats.latency_us as u32) / 1000.0); + } ret = ret.min(stats.latency_us); } diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 7519860..2494bd0 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -18,7 +18,7 @@ message FlagsInConfig { bool disable_p2p = 11; bool relay_all_peer_rpc = 12; bool disable_udp_hole_punching = 13; - string ipv6_listener = 14; + // string ipv6_listener = 14; [deprecated = true]; use -l udp://[::]:12345 instead bool multi_thread = 15; CompressionAlgoPb data_compress_algo = 16; bool bind_device = 17; diff --git a/easytier/src/tunnel/common.rs b/easytier/src/tunnel/common.rs index cc5ee29..b834d15 100644 --- a/easytier/src/tunnel/common.rs +++ b/easytier/src/tunnel/common.rs @@ -360,7 +360,13 @@ pub(crate) fn setup_sokcet2_ext( socket2_socket.set_nonblocking(true)?; socket2_socket.set_reuse_address(true)?; - socket2_socket.bind(&socket2::SockAddr::from(*bind_addr))?; + if let Err(e) = socket2_socket.bind(&socket2::SockAddr::from(*bind_addr)) { + if bind_addr.is_ipv4() { + return Err(e.into()); + } else { + tracing::warn!(?e, "bind failed, do not return error for ipv6"); + } + } // #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))] // socket2_socket.set_reuse_port(true)?; diff --git a/easytier/src/tunnel/mod.rs b/easytier/src/tunnel/mod.rs index b3deb4e..958f9df 100644 --- a/easytier/src/tunnel/mod.rs +++ b/easytier/src/tunnel/mod.rs @@ -126,7 +126,7 @@ pub trait TunnelListener: Send { } #[async_trait] -#[auto_impl::auto_impl(Box)] +#[auto_impl::auto_impl(Box, &mut)] pub trait TunnelConnector: Send { async fn connect(&mut self) -> Result, TunnelError>; fn remote_url(&self) -> url::Url; diff --git a/easytier/src/tunnel/tcp.rs b/easytier/src/tunnel/tcp.rs index d5efaf7..d0d4b0d 100644 --- a/easytier/src/tunnel/tcp.rs +++ b/easytier/src/tunnel/tcp.rs @@ -150,9 +150,9 @@ impl TcpTunnelConnector { &mut self, addr: SocketAddr, ) -> Result, super::TunnelError> { - tracing::info!(addr = ?self.addr, "connect tcp start"); + tracing::info!(url = ?self.addr, ?addr, "connect tcp start, bind addrs: {:?}", self.bind_addrs); let stream = TcpStream::connect(addr).await?; - tracing::info!(addr = ?self.addr, "connect tcp succ"); + tracing::info!(url = ?self.addr, ?addr, "connect tcp succ"); return get_tunnel_with_tcp_stream(stream, self.addr.clone().into()); } @@ -190,7 +190,7 @@ impl super::TunnelConnector for TcpTunnelConnector { async fn connect(&mut self) -> Result, super::TunnelError> { let addr = check_scheme_and_get_socket_addr_ext::(&self.addr, "tcp", self.ip_version)?; - if self.bind_addrs.is_empty() || addr.is_ipv6() { + if self.bind_addrs.is_empty() { self.connect_with_default_bind(addr).await } else { self.connect_with_custom_bind(addr).await diff --git a/easytier/src/tunnel/udp.rs b/easytier/src/tunnel/udp.rs index 07c5d46..b3c808f 100644 --- a/easytier/src/tunnel/udp.rs +++ b/easytier/src/tunnel/udp.rs @@ -141,12 +141,27 @@ async fn respond_stun_packet( .encode_into_bytes(resp_msg.clone()) .map_err(|e| anyhow::anyhow!("stun encode error: {:?}", e))?; - socket - .send_to(&rsp_buf, addr.clone()) - .await - .with_context(|| "send stun response error")?; + let change_req = req_msg + .get_attribute::() + .map(|r| r.ip() || r.port()) + .unwrap_or(false); - tracing::debug!(?addr, ?req_msg, "udp respond stun packet done"); + if !change_req { + socket + .send_to(&rsp_buf, addr.clone()) + .await + .with_context(|| "send stun response error")?; + } else { + // send from a new udp socket + let socket = if addr.is_ipv4() { + UdpSocket::bind("0.0.0.0:0").await? + } else { + UdpSocket::bind("[::]:0").await? + }; + socket.send_to(&rsp_buf, addr.clone()).await?; + } + + tracing::debug!(?addr, ?req_msg, ?change_req, "udp respond stun packet done"); Ok(()) }