diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index da05a05..9497505 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -220,6 +220,8 @@ jobs: run: | sudo sysctl net.bridge.bridge-nf-call-iptables=0 sudo sysctl net.bridge.bridge-nf-call-ip6tables=0 + sudo sysctl net.ipv6.conf.lo.disable_ipv6=0 + sudo ip addr add 2001:db8::2/64 dev lo - name: Cargo cache uses: actions/cache@v4.0.0 with: diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index d170638..3501300 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -142,6 +142,8 @@ pub struct Flags { pub default_protocol: String, #[derivative(Default(value = "true"))] pub enable_encryption: bool, + #[derivative(Default(value = "true"))] + pub enable_ipv6: bool, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index c210b02..7de6657 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -1,6 +1,6 @@ // try connect peers directly, with either its public ip or lan ip -use std::sync::Arc; +use std::{net::SocketAddr, sync::Arc}; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, @@ -10,6 +10,7 @@ use crate::{ use crate::rpc::{peer::GetIpListResponse, PeerConnInfo}; use tokio::{task::JoinSet, time::timeout}; use tracing::Instrument; +use url::Host; use super::create_connector_by_url; @@ -230,17 +231,19 @@ impl DirectConnectorManager { dst_peer_id: PeerId, ip_list: GetIpListResponse, ) -> Result<(), Error> { + let enable_ipv6 = data.global_ctx.get_flags().enable_ipv6; let available_listeners = ip_list .listeners .iter() .filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None }) - .filter(|l| l.port().is_some()) + .filter(|l| l.port().is_some() && l.host().is_some()) .filter(|l| { !data.dst_sceme_blacklist.contains(&DstSchemeBlackListItem( dst_peer_id.clone(), l.scheme().to_string(), )) }) + .filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_))) .collect::>(); let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!( @@ -255,31 +258,58 @@ impl DirectConnectorManager { .unwrap_or(listener); let mut tasks = JoinSet::new(); - ip_list.interface_ipv4s.iter().for_each(|ip| { - let addr = format!( - "{}://{}:{}", - listener.scheme(), - ip, - listener.port().unwrap_or(11010) - ); - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr, - )); - }); - let addr = format!( - "{}://{}:{}", - listener.scheme(), - ip_list.public_ipv4.clone(), - listener.port().unwrap_or(11010) - ); - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr, - )); + let listener_host = listener.socket_addrs(|| None).unwrap().pop(); + match listener_host { + Some(SocketAddr::V4(_)) => { + ip_list.interface_ipv4s.iter().for_each(|ip| { + let mut addr = (*listener).clone(); + if addr.set_host(Some(ip.as_str())).is_ok() { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } + }); + + let mut addr = (*listener).clone(); + if addr.set_host(Some(ip_list.public_ipv4.as_str())).is_ok() { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } + } + Some(SocketAddr::V6(_)) => { + ip_list.interface_ipv6s.iter().for_each(|ip| { + let mut addr = (*listener).clone(); + if addr.set_host(Some(format!("[{}]", ip).as_str())).is_ok() { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } + }); + + let mut addr = (*listener).clone(); + if addr + .set_host(Some(format!("[{}]", ip_list.public_ipv6).as_str())) + .is_ok() + { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } + } + p => { + tracing::error!(?p, ?listener, "failed to parse ip version from listener"); + } + } let mut has_succ = false; while let Some(ret) = tasks.join_next().await { @@ -351,7 +381,14 @@ mod tests { #[rstest::rstest] #[tokio::test] - async fn direct_connector_basic_test(#[values("tcp", "udp", "wg")] proto: &str) { + async fn direct_connector_basic_test( + #[values("tcp", "udp", "wg")] proto: &str, + #[values("true", "false")] ipv6: bool, + ) { + if ipv6 && proto != "udp" { + return; + } + let p_a = create_mock_peer_manager().await; let p_b = create_mock_peer_manager().await; let p_c = create_mock_peer_manager().await; @@ -366,12 +403,18 @@ mod tests { dm_a.run_as_client(); dm_c.run_as_server(); - let port = if proto == "wg" { 11040 } else { 11041 }; - p_c.get_global_ctx() - .config - .set_listeners(vec![format!("{}://0.0.0.0:{}", proto, port) - .parse() - .unwrap()]); + if !ipv6 { + let port = if proto == "wg" { 11040 } else { 11041 }; + p_c.get_global_ctx().config.set_listeners(vec![format!( + "{}://0.0.0.0:{}", + proto, port + ) + .parse() + .unwrap()]); + } + let mut f = p_c.get_global_ctx().config.get_flags(); + f.enable_ipv6 = ipv6; + p_c.get_global_ctx().config.set_flags(f); let mut lis_c = ListenerManager::new(p_c.get_global_ctx(), p_c.clone()); lis_c.prepare_listeners().await.unwrap(); diff --git a/easytier/src/connector/manual.rs b/easytier/src/connector/manual.rs index b189a99..e5cf808 100644 --- a/easytier/src/connector/manual.rs +++ b/easytier/src/connector/manual.rs @@ -1,5 +1,6 @@ use std::{collections::BTreeSet, sync::Arc}; +use anyhow::Context; use dashmap::{DashMap, DashSet}; use tokio::{ sync::{broadcast::Receiver, mpsc, Mutex}, @@ -8,7 +9,10 @@ use tokio::{ }; use crate::{ - common::PeerId, peers::peer_conn::PeerConnId, rpc as easytier_rpc, tunnel::TunnelConnector, + common::PeerId, + peers::peer_conn::PeerConnId, + rpc as easytier_rpc, + tunnel::{IpVersion, TunnelConnector}, }; use crate::{ @@ -254,66 +258,109 @@ impl ManualConnectorManager { &all_urls - &curr_alive } + async fn conn_reconnect_with_ip_version( + data: Arc, + dead_url: String, + connector: MutexConnector, + ip_version: IpVersion, + ) -> Result { + let ip_collector = data.global_ctx.get_ip_collector(); + let net_ns = data.net_ns.clone(); + + connector.lock().await.set_ip_version(ip_version); + + set_bind_addr_for_peer_connector( + connector.lock().await.as_mut(), + ip_version == IpVersion::V4, + &ip_collector, + ) + .await; + + data.global_ctx.issue_event(GlobalCtxEvent::Connecting( + connector.lock().await.remote_url().clone(), + )); + + let _g = net_ns.guard(); + log::info!("reconnect try connect... conn: {:?}", connector); + let tunnel = connector.lock().await.connect().await?; + log::info!("reconnect get tunnel succ: {:?}", tunnel); + assert_eq!( + dead_url, + tunnel.info().unwrap().remote_addr, + "info: {:?}", + tunnel.info() + ); + let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?; + log::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url); + Ok(ReconnResult { + dead_url, + peer_id, + conn_id, + }) + } + async fn conn_reconnect( data: Arc, dead_url: String, connector: MutexConnector, ) -> Result { - let connector = Arc::new(Mutex::new(Some(connector))); - let net_ns = data.net_ns.clone(); - log::info!("reconnect: {}", dead_url); - let connector_clone = connector.clone(); - let data_clone = data.clone(); - let url_clone = dead_url.clone(); - let ip_collector = data.global_ctx.get_ip_collector(); - let reconn_task = async move { - let mut locked = connector_clone.lock().await; - let conn = locked.as_mut().unwrap(); - // TODO: should support set v6 here, use url in connector array - set_bind_addr_for_peer_connector(conn.lock().await.as_mut(), true, &ip_collector).await; - - data_clone - .global_ctx - .issue_event(GlobalCtxEvent::Connecting( - conn.lock().await.remote_url().clone(), - )); - - let _g = net_ns.guard(); - log::info!("reconnect try connect... conn: {:?}", conn); - let tunnel = conn.lock().await.connect().await?; - log::info!("reconnect get tunnel succ: {:?}", tunnel); - assert_eq!( - url_clone, - tunnel.info().unwrap().remote_addr, - "info: {:?}", - tunnel.info() - ); - let (peer_id, conn_id) = data_clone.peer_manager.add_client_tunnel(tunnel).await?; - log::info!("reconnect succ: {} {} {}", peer_id, conn_id, url_clone); - Ok(ReconnResult { - dead_url: url_clone, - peer_id, - conn_id, - }) - }; - - let ret = timeout(std::time::Duration::from_secs(1), reconn_task).await; - log::info!("reconnect: {} done, ret: {:?}", dead_url, ret); - - if ret.is_err() || ret.as_ref().unwrap().is_err() { - data.global_ctx.issue_event(GlobalCtxEvent::ConnectError( - dead_url.clone(), - format!("{:?}", ret), - )); + let mut ip_versions = vec![]; + let u = url::Url::parse(&dead_url) + .with_context(|| format!("failed to parse connector url {:?}", dead_url))?; + if u.scheme() == "ring" { + ip_versions.push(IpVersion::Both); + } else { + let addrs = u.socket_addrs(|| Some(1000))?; + let mut has_ipv4 = false; + let mut has_ipv6 = false; + for addr in addrs { + if addr.is_ipv4() { + if !has_ipv4 { + ip_versions.insert(0, IpVersion::V4); + } + has_ipv4 = true; + } else if addr.is_ipv6() { + if !has_ipv6 { + ip_versions.push(IpVersion::V6); + } + has_ipv6 = true; + } + } } - let conn = connector.lock().await.take().unwrap(); - data.reconnecting.remove(&dead_url).unwrap(); - data.connectors.insert(dead_url.clone(), conn); + let mut reconn_ret = Err(Error::AnyhowError(anyhow::anyhow!( + "cannot get ip from url" + ))); + for ip_version in ip_versions { + let ret = timeout( + std::time::Duration::from_secs(1), + Self::conn_reconnect_with_ip_version( + data.clone(), + dead_url.clone(), + connector.clone(), + ip_version, + ), + ) + .await; + log::info!("reconnect: {} done, ret: {:?}", dead_url, ret); - ret? + if ret.is_ok() && ret.as_ref().unwrap().is_ok() { + reconn_ret = ret.unwrap(); + break; + } else { + if ret.is_err() { + reconn_ret = Err(ret.unwrap_err().into()); + } else if ret.as_ref().unwrap().is_err() { + reconn_ret = Err(ret.unwrap().unwrap_err()); + } + } + } + data.reconnecting.remove(&dead_url).unwrap(); + data.connectors.insert(dead_url.clone(), connector); + + reconn_ret } } diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index f280540..30b4406 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -137,6 +137,9 @@ and the vpn client is in network of 10.14.14.0/24" default_value = "false" )] multi_thread: bool, + + #[arg(long, help = "do not use ipv6", default_value = "false")] + disable_ipv6: bool, } impl From for TomlConfigLoader { @@ -266,6 +269,7 @@ impl From for TomlConfigLoader { f.default_protocol = cli.default_protocol.as_ref().unwrap().clone(); } f.enable_encryption = !cli.disable_encryption; + f.enable_ipv6 = !cli.disable_ipv6; cfg.set_flags(f); cfg diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index afb6fdd..40730af 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -56,10 +56,16 @@ impl TunnelHandlerForListener for PeerManager { } } +#[derive(Debug, Clone)] +struct Listener { + inner: Arc>, + must_succ: bool, +} + pub struct ListenerManager { global_ctx: ArcGlobalCtx, net_ns: NetNS, - listeners: Vec>>, + listeners: Vec, peer_manager: Arc, tasks: JoinSet<()>, @@ -77,27 +83,42 @@ impl ListenerManage } pub async fn prepare_listeners(&mut self) -> Result<(), Error> { - self.add_listener(RingTunnelListener::new( - format!("ring://{}", self.global_ctx.get_id()) - .parse() - .unwrap(), - )) + self.add_listener( + RingTunnelListener::new( + format!("ring://{}", self.global_ctx.get_id()) + .parse() + .unwrap(), + ), + true, + ) .await?; for l in self.global_ctx.config.get_listener_uris().iter() { let lis = get_listener_by_url(l, self.global_ctx.clone())?; - self.add_listener(lis).await?; + self.add_listener(lis, true).await?; + } + + if self.global_ctx.config.get_flags().enable_ipv6 { + let _ = self + .add_listener( + UdpTunnelListener::new("udp://[::]:0".parse().unwrap()), + false, + ) + .await?; } Ok(()) } - pub async fn add_listener(&mut self, listener: Listener) -> Result<(), Error> + pub async fn add_listener(&mut self, listener: L, must_succ: bool) -> Result<(), Error> where - Listener: TunnelListener + 'static, + L: TunnelListener + 'static, { let listener = Arc::new(Mutex::new(listener)); - self.listeners.push(listener); + self.listeners.push(Listener { + inner: listener, + must_succ, + }); Ok(()) } @@ -136,16 +157,17 @@ impl ListenerManage pub async fn run(&mut self) -> Result<(), Error> { for listener in &self.listeners { let _guard = self.net_ns.guard(); - let addr = listener.lock().await.local_url(); + let addr = listener.inner.lock().await.local_url(); log::warn!("run listener: {:?}", listener); listener + .inner .lock() .await .listen() .await .with_context(|| format!("failed to add listener {}", addr))?; self.tasks.spawn(Self::run_listener( - listener.clone(), + listener.inner.clone(), self.peer_manager.clone(), self.global_ctx.clone(), )); @@ -190,7 +212,7 @@ mod tests { let ring_id = format!("ring://{}", uuid::Uuid::new_v4()); listener_mgr - .add_listener(RingTunnelListener::new(ring_id.parse().unwrap())) + .add_listener(RingTunnelListener::new(ring_id.parse().unwrap()), true) .await .unwrap(); listener_mgr.run().await.unwrap(); diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index 3fc979c..a5f9bd0 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -508,7 +508,7 @@ pub async fn wireguard_vpn_portal() { // ping other node in network wait_for_condition( || async { ping_test("net_d", "10.144.144.1").await }, - Duration::from_secs(5000), + Duration::from_secs(5), ) .await; wait_for_condition( diff --git a/easytier/src/tunnel/mod.rs b/easytier/src/tunnel/mod.rs index bfbb3fc..2a7b1bc 100644 --- a/easytier/src/tunnel/mod.rs +++ b/easytier/src/tunnel/mod.rs @@ -83,7 +83,7 @@ pub trait TunnelConnCounter: 'static + Send + Sync + Debug { fn get(&self) -> u32; } -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum IpVersion { V4, V6,