diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index 266d31c..0695eaf 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -134,6 +134,12 @@ core_clap: compression: en: "compression algorithm to use, support none, zstd. default is none" zh-CN: "要使用的压缩算法,支持 none、zstd。默认为 none" + mapped_listeners: + en: "manually specify the public address of the listener, other nodes can use this address to connect to this node. e.g.: tcp://123.123.123.123:11223, can specify multiple." + zh-CN: "手动指定监听器的公网地址,其他节点可以使用该地址连接到本节点。例如:tcp://123.123.123.123:11223,可以指定多个。" + bind_device: + en: "bind the connector socket to physical devices to avoid routing issues. e.g.: subnet proxy segment conflicts with a node's segment, after binding the physical device, it can communicate with the node normally." + zh-CN: "将连接器的套接字绑定到物理设备以避免路由问题。比如子网代理网段与某节点的网段冲突,绑定物理设备后可以与该节点正常通信。" core_app: panic_backtrace_save: diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index d80d196..e22f5d8 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -29,6 +29,7 @@ pub fn gen_default_flags() -> Flags { ipv6_listener: "udp://[::]:0".to_string(), multi_thread: true, data_compress_algo: CompressionAlgoPb::None.into(), + bind_device: true, } } @@ -72,6 +73,9 @@ pub trait ConfigLoader: Send + Sync { fn get_listeners(&self) -> Vec; fn set_listeners(&self, listeners: Vec); + fn get_mapped_listeners(&self) -> Vec; + fn set_mapped_listeners(&self, listeners: Option>); + fn get_rpc_portal(&self) -> Option; fn set_rpc_portal(&self, addr: SocketAddr); @@ -183,6 +187,7 @@ struct Config { dhcp: Option, network_identity: Option, listeners: Option>, + mapped_listeners: Option>, exit_nodes: Option>, peer: Option>, @@ -472,6 +477,19 @@ impl ConfigLoader for TomlConfigLoader { self.config.lock().unwrap().listeners = Some(listeners); } + fn get_mapped_listeners(&self) -> Vec { + self.config + .lock() + .unwrap() + .mapped_listeners + .clone() + .unwrap_or_default() + } + + fn set_mapped_listeners(&self, listeners: Option>) { + self.config.lock().unwrap().mapped_listeners = listeners; + } + fn get_rpc_portal(&self) -> Option { self.config.lock().unwrap().rpc_portal } diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index 585c1f0..7dff78c 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -1,6 +1,13 @@ // try connect peers directly, with either its public ip or lan ip -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{ + net::SocketAddr, + sync::{ + atomic::{AtomicBool, Ordering}, + Arc, + }, + time::Duration, +}; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, @@ -29,6 +36,8 @@ use super::create_connector_by_url; pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1; pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300; +static TESTING: AtomicBool = AtomicBool::new(false); + #[async_trait::async_trait] pub trait PeerManagerForDirectConnector { async fn list_peers(&self) -> Vec; @@ -182,7 +191,7 @@ impl DirectConnectorManager { // let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?; - if peer_id != dst_peer_id { + if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) { tracing::info!( "connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}", addr, @@ -279,87 +288,103 @@ impl DirectConnectorManager { let listener_host = listener.socket_addrs(|| None).unwrap().pop(); match listener_host { - Some(SocketAddr::V4(_)) => { - ip_list.interface_ipv4s.iter().for_each(|ip| { - let mut addr = (*listener).clone(); - if addr.set_host(Some(ip.to_string().as_str())).is_ok() { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?ip, - ?listener, - ?dst_peer_id, - "failed to set host for interface ipv4" - ); - } - }); + Some(SocketAddr::V4(s_addr)) => { + if s_addr.ip().is_unspecified() { + ip_list.interface_ipv4s.iter().for_each(|ip| { + let mut addr = (*listener).clone(); + if addr.set_host(Some(ip.to_string().as_str())).is_ok() { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for interface ipv4" + ); + } + }); - if let Some(public_ipv4) = ip_list.public_ipv4 { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(public_ipv4.to_string().as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?public_ipv4, - ?listener, - ?dst_peer_id, - "failed to set host for public ipv4" - ); + if let Some(public_ipv4) = ip_list.public_ipv4 { + let mut addr = (*listener).clone(); + if addr + .set_host(Some(public_ipv4.to_string().as_str())) + .is_ok() + { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?public_ipv4, + ?listener, + ?dst_peer_id, + "failed to set host for public ipv4" + ); + } } + } else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + listener.to_string(), + )); } } - Some(SocketAddr::V6(_)) => { - ip_list.interface_ipv6s.iter().for_each(|ip| { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(format!("[{}]", ip.to_string()).as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?ip, - ?listener, - ?dst_peer_id, - "failed to set host for interface ipv6" - ); - } - }); + Some(SocketAddr::V6(s_addr)) => { + if s_addr.ip().is_unspecified() { + ip_list.interface_ipv6s.iter().for_each(|ip| { + let mut addr = (*listener).clone(); + if addr + .set_host(Some(format!("[{}]", ip.to_string()).as_str())) + .is_ok() + { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?ip, + ?listener, + ?dst_peer_id, + "failed to set host for interface ipv6" + ); + } + }); - if let Some(public_ipv6) = ip_list.public_ipv6 { - let mut addr = (*listener).clone(); - if addr - .set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str())) - .is_ok() - { - tasks.spawn(Self::try_connect_to_ip( - data.clone(), - dst_peer_id.clone(), - addr.to_string(), - )); - } else { - tracing::error!( - ?public_ipv6, - ?listener, - ?dst_peer_id, - "failed to set host for public ipv6" - ); + if let Some(public_ipv6) = ip_list.public_ipv6 { + let mut addr = (*listener).clone(); + if addr + .set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str())) + .is_ok() + { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + addr.to_string(), + )); + } else { + tracing::error!( + ?public_ipv6, + ?listener, + ?dst_peer_id, + "failed to set host for public ipv6" + ); + } } + } else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) { + tasks.spawn(Self::try_connect_to_ip( + data.clone(), + dst_peer_id.clone(), + listener.to_string(), + )); } } p => { @@ -452,6 +477,49 @@ mod tests { proto::peer_rpc::GetIpListResponse, }; + use super::TESTING; + + #[tokio::test] + async fn direct_connector_mapped_listener() { + TESTING.store(true, std::sync::atomic::Ordering::Relaxed); + let p_a = create_mock_peer_manager().await; + let p_b = create_mock_peer_manager().await; + let p_c = create_mock_peer_manager().await; + let p_x = create_mock_peer_manager().await; + connect_peer_manager(p_a.clone(), p_b.clone()).await; + connect_peer_manager(p_b.clone(), p_c.clone()).await; + connect_peer_manager(p_c.clone(), p_x.clone()).await; + + wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap(); + wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap(); + + let mut f = p_a.get_global_ctx().get_flags(); + f.bind_device = false; + p_a.get_global_ctx().config.set_flags(f); + + p_c.get_global_ctx() + .config + .set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()])); + + p_x.get_global_ctx() + .config + .set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]); + let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone()); + lis_x.prepare_listeners().await.unwrap(); + lis_x.run().await.unwrap(); + + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone()); + let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone()); + dm_a.run_as_client(); + dm_c.run_as_server(); + // p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly + + wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1)) + .await + .unwrap(); + } + #[rstest::rstest] #[tokio::test] async fn direct_connector_basic_test( diff --git a/easytier/src/connector/mod.rs b/easytier/src/connector/mod.rs index d2f3d6e..a9bd18e 100644 --- a/easytier/src/connector/mod.rs +++ b/easytier/src/connector/mod.rs @@ -56,23 +56,27 @@ pub async fn create_connector_by_url( "tcp" => { let dst_addr = check_scheme_and_get_socket_addr::(&url, "tcp")?; let mut connector = TcpTunnelConnector::new(url); - set_bind_addr_for_peer_connector( - &mut connector, - dst_addr.is_ipv4(), - &global_ctx.get_ip_collector(), - ) - .await; + if global_ctx.config.get_flags().bind_device { + set_bind_addr_for_peer_connector( + &mut connector, + dst_addr.is_ipv4(), + &global_ctx.get_ip_collector(), + ) + .await; + } return Ok(Box::new(connector)); } "udp" => { let dst_addr = check_scheme_and_get_socket_addr::(&url, "udp")?; let mut connector = UdpTunnelConnector::new(url); - set_bind_addr_for_peer_connector( - &mut connector, - dst_addr.is_ipv4(), - &global_ctx.get_ip_collector(), - ) - .await; + if global_ctx.config.get_flags().bind_device { + set_bind_addr_for_peer_connector( + &mut connector, + dst_addr.is_ipv4(), + &global_ctx.get_ip_collector(), + ) + .await; + } return Ok(Box::new(connector)); } "ring" => { @@ -84,12 +88,14 @@ pub async fn create_connector_by_url( "quic" => { let dst_addr = check_scheme_and_get_socket_addr::(&url, "quic")?; let mut connector = QUICTunnelConnector::new(url); - set_bind_addr_for_peer_connector( - &mut connector, - dst_addr.is_ipv4(), - &global_ctx.get_ip_collector(), - ) - .await; + if global_ctx.config.get_flags().bind_device { + set_bind_addr_for_peer_connector( + &mut connector, + dst_addr.is_ipv4(), + &global_ctx.get_ip_collector(), + ) + .await; + } return Ok(Box::new(connector)); } #[cfg(feature = "wireguard")] @@ -101,12 +107,14 @@ pub async fn create_connector_by_url( &nid.network_secret.unwrap_or_default(), ); let mut connector = WgTunnelConnector::new(url, wg_config); - set_bind_addr_for_peer_connector( - &mut connector, - dst_addr.is_ipv4(), - &global_ctx.get_ip_collector(), - ) - .await; + if global_ctx.config.get_flags().bind_device { + set_bind_addr_for_peer_connector( + &mut connector, + dst_addr.is_ipv4(), + &global_ctx.get_ip_collector(), + ) + .await; + } return Ok(Box::new(connector)); } #[cfg(feature = "websocket")] @@ -114,12 +122,14 @@ pub async fn create_connector_by_url( use crate::tunnel::{FromUrl, IpVersion}; let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?; let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url); - set_bind_addr_for_peer_connector( - &mut connector, - dst_addr.is_ipv4(), - &global_ctx.get_ip_collector(), - ) - .await; + if global_ctx.config.get_flags().bind_device { + set_bind_addr_for_peer_connector( + &mut connector, + dst_addr.is_ipv4(), + &global_ctx.get_ip_collector(), + ) + .await; + } return Ok(Box::new(connector)); } _ => { diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index e03e108..7be00e9 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -123,6 +123,13 @@ struct Cli { )] listeners: Vec, + #[arg( + long, + help = t!("core_clap.mapped_listeners").to_string(), + num_args = 0.. + )] + mapped_listeners: Vec, + #[arg( long, help = t!("core_clap.no_listener").to_string(), @@ -300,6 +307,12 @@ struct Cli { default_value = "none", )] compression: String, + + #[arg( + long, + help = t!("core_clap.bind_device").to_string() + )] + bind_device: Option, } rust_i18n::i18n!("locales", fallback = "en"); @@ -422,6 +435,23 @@ impl TryFrom<&Cli> for TomlConfigLoader { .collect(), ); + cfg.set_mapped_listeners(Some( + cli.mapped_listeners + .iter() + .map(|s| { + s.parse() + .with_context(|| format!("mapped listener is not a valid url: {}", s)) + .unwrap() + }) + .map(|s: url::Url| { + if s.port().is_none() { + panic!("mapped listener port is missing: {}", s); + } + s + }) + .collect(), + )); + for n in cli.proxy_networks.iter() { cfg.add_proxy_cidr( n.parse() @@ -534,6 +564,9 @@ impl TryFrom<&Cli> for TomlConfigLoader { ), } .into(); + if let Some(bind_device) = cli.bind_device { + f.bind_device = bind_device; + } cfg.set_flags(f); cfg.set_exit_nodes(cli.exit_nodes.clone()); diff --git a/easytier/src/peers/peer_rpc_service.rs b/easytier/src/peers/peer_rpc_service.rs index ea7987c..35c4fc7 100644 --- a/easytier/src/peers/peer_rpc_service.rs +++ b/easytier/src/peers/peer_rpc_service.rs @@ -24,8 +24,10 @@ impl DirectConnectorRpc for DirectConnectorManagerRpcServer { let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; ret.listeners = self .global_ctx - .get_running_listeners() + .config + .get_mapped_listeners() .into_iter() + .chain(self.global_ctx.get_running_listeners().into_iter()) .map(Into::into) .collect(); Ok(ret) diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 51070b6..a4d44a7 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -21,6 +21,7 @@ message FlagsInConfig { string ipv6_listener = 14; bool multi_thread = 15; CompressionAlgoPb data_compress_algo = 16; + bool bind_device = 17; } message RpcDescriptor {