From 9ed22eaf99308b14549dc8dadef2a27b91efddd0 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sat, 23 Mar 2024 16:34:48 +0800 Subject: [PATCH] improve direct connector --- easytier-core/src/common/global_ctx.rs | 14 +++++++++- easytier-core/src/common/network.rs | 7 +---- easytier-core/src/connector/direct.rs | 36 +++++++++++++++++-------- easytier-core/src/instance/listeners.rs | 1 + easytier-core/src/main.rs | 36 ++++++++++++++++++------- easytier-core/src/rpc/peer.rs | 2 ++ 6 files changed, 68 insertions(+), 28 deletions(-) diff --git a/easytier-core/src/common/global_ctx.rs b/easytier-core/src/common/global_ctx.rs index d2ee625..8f60d92 100644 --- a/easytier-core/src/common/global_ctx.rs +++ b/easytier-core/src/common/global_ctx.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use crate::rpc::PeerConnInfo; use crossbeam::atomic::AtomicCell; @@ -50,6 +50,8 @@ pub struct GlobalCtx { hotname: AtomicCell>, stun_info_collection: Box, + + running_listeners: Mutex>, } impl std::fmt::Debug for GlobalCtx { @@ -90,6 +92,8 @@ impl GlobalCtx { hotname: AtomicCell::new(None), stun_info_collection: Box::new(StunInfoCollector::new_with_default_servers()), + + running_listeners: Mutex::new(Vec::new()), } } @@ -180,6 +184,14 @@ impl GlobalCtx { std::ptr::write(ptr, collector); } } + + pub fn get_running_listeners(&self) -> Vec { + self.running_listeners.lock().unwrap().clone() + } + + pub fn add_running_listener(&self, url: url::Url) { + self.running_listeners.lock().unwrap().push(url); + } } #[cfg(test)] diff --git a/easytier-core/src/common/network.rs b/easytier-core/src/common/network.rs index 4e336db..69ea760 100644 --- a/easytier-core/src/common/network.rs +++ b/easytier-core/src/common/network.rs @@ -156,12 +156,7 @@ impl IPCollector { #[tracing::instrument(skip(net_ns))] async fn do_collect_ip_addrs(with_public: bool, net_ns: NetNS) -> GetIpListResponse { - let mut ret = crate::rpc::peer::GetIpListResponse { - public_ipv4: "".to_string(), - interface_ipv4s: vec![], - public_ipv6: "".to_string(), - interface_ipv6s: vec![], - }; + let mut ret = crate::rpc::peer::GetIpListResponse::new(); if with_public { if let Some(v4_addr) = diff --git a/easytier-core/src/connector/direct.rs b/easytier-core/src/connector/direct.rs index b598a82..4c473b6 100644 --- a/easytier-core/src/connector/direct.rs +++ b/easytier-core/src/connector/direct.rs @@ -7,7 +7,6 @@ use crate::{ constants::{self, DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC}, error::Error, global_ctx::ArcGlobalCtx, - network::IPCollector, PeerId, }, peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager}, @@ -56,21 +55,21 @@ impl PeerManagerForDirectConnector for PeerManager { #[derive(Clone)] struct DirectConnectorManagerRpcServer { // TODO: this only cache for one src peer, should make it global - ip_list_collector: Arc, + global_ctx: ArcGlobalCtx, } #[tarpc::server] impl DirectConnectorRpc for DirectConnectorManagerRpcServer { async fn get_ip_list(self, _: tarpc::context::Context) -> GetIpListResponse { - return self.ip_list_collector.collect_ip_addrs().await; + let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; + ret.listeners = self.global_ctx.get_running_listeners(); + ret } } impl DirectConnectorManagerRpcServer { - pub fn new(ip_collector: Arc) -> Self { - Self { - ip_list_collector: ip_collector, - } + pub fn new(global_ctx: ArcGlobalCtx) -> Self { + Self { global_ctx } } } @@ -119,7 +118,7 @@ impl DirectConnectorManager { pub fn run_as_server(&mut self) { self.data.peer_manager.get_peer_rpc_mgr().run_service( constants::DIRECT_CONNECTOR_SERVICE_ID, - DirectConnectorManagerRpcServer::new(self.global_ctx.get_ip_collector()).serve(), + DirectConnectorManagerRpcServer::new(self.global_ctx.clone()).serve(), ); } @@ -241,9 +240,19 @@ impl DirectConnectorManager { }) .await?; + let listener = ip_list + .listeners + .get(0) + .ok_or(anyhow::anyhow!("peer {} have no listener", dst_peer_id))?; + let mut tasks = JoinSet::new(); ip_list.interface_ipv4s.iter().for_each(|ip| { - let addr = format!("{}://{}:{}", "tcp", ip, 11010); + let addr = format!( + "{}://{}:{}", + listener.scheme(), + ip, + listener.port().unwrap_or(11010) + ); tasks.spawn(Self::try_connect_to_ip( data.clone(), dst_peer_id.clone(), @@ -251,7 +260,12 @@ impl DirectConnectorManager { )); }); - let addr = format!("{}://{}:{}", "tcp", ip_list.public_ipv4.clone(), 11010); + let addr = format!( + "{}://{}:{}", + listener.scheme(), + ip_list.public_ipv4.clone(), + listener.port().unwrap_or(11010) + ); tasks.spawn(Self::try_connect_to_ip( data.clone(), dst_peer_id.clone(), @@ -300,7 +314,7 @@ mod tests { lis_c .add_listener(TcpTunnelListener::new( - "tcp://0.0.0.0:11010".parse().unwrap(), + "tcp://0.0.0.0:11040".parse().unwrap(), )) .await .unwrap(); diff --git a/easytier-core/src/instance/listeners.rs b/easytier-core/src/instance/listeners.rs index d938a5a..70f2052 100644 --- a/easytier-core/src/instance/listeners.rs +++ b/easytier-core/src/instance/listeners.rs @@ -91,6 +91,7 @@ impl ListenerManage global_ctx: ArcGlobalCtx, ) { let mut l = listener.lock().await; + global_ctx.add_running_listener(l.local_url()); global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); while let Ok(ret) = l.accept().await { let tunnel_info = ret.info().unwrap(); diff --git a/easytier-core/src/main.rs b/easytier-core/src/main.rs index 215b501..ec19d5e 100644 --- a/easytier-core/src/main.rs +++ b/easytier-core/src/main.rs @@ -19,7 +19,7 @@ mod rpc; mod tunnels; use common::{ - config::{ConsoleLoggerConfig, FileLoggerConfig, PeerConfig}, + config::{ConsoleLoggerConfig, FileLoggerConfig, NetworkIdentity, PeerConfig}, get_logger_timer_rfc3339, }; use instance::instance::Instance; @@ -34,21 +34,18 @@ use crate::common::{ #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Cli { - /// the instance name #[arg( - short = 'm', long, - default_value = "default", - help = "instance name to identify this vpn node in same machine" + help = "network name to identify this vpn network", + default_value = "default" )] - instance_name: String, - + network_name: String, #[arg( - short = 'd', long, - help = "instance uuid to identify this vpn node in whole vpn network example: 123e4567-e89b-12d3-a456-426614174000" + help = "network secret to verify this node belongs to the vpn network", + default_value = "" )] - instance_id: Option, + network_secret: String, #[arg(short, long, help = "ipv4 address of this vpn node")] ipv4: Option, @@ -92,12 +89,31 @@ struct Cli { file_log_level: Option, #[arg(long, help = "directory to store log files")] file_log_dir: Option, + + #[arg( + short = 'm', + long, + default_value = "default", + help = "instance name to identify this vpn node in same machine" + )] + instance_name: String, + + #[arg( + short = 'd', + long, + help = "instance uuid to identify this vpn node in whole vpn network example: 123e4567-e89b-12d3-a456-426614174000" + )] + instance_id: Option, } impl From for TomlConfigLoader { fn from(cli: Cli) -> Self { let cfg = TomlConfigLoader::default(); cfg.set_inst_name(cli.instance_name.clone()); + cfg.set_network_identity(NetworkIdentity { + network_name: cli.network_name.clone(), + network_secret: cli.network_secret.clone(), + }); cfg.set_netns(cli.net_ns.clone()); if let Some(ipv4) = &cli.ipv4 { diff --git a/easytier-core/src/rpc/peer.rs b/easytier-core/src/rpc/peer.rs index 02951b3..394808d 100644 --- a/easytier-core/src/rpc/peer.rs +++ b/easytier-core/src/rpc/peer.rs @@ -6,6 +6,7 @@ pub struct GetIpListResponse { pub interface_ipv4s: Vec, pub public_ipv6: String, pub interface_ipv6s: Vec, + pub listeners: Vec, } impl GetIpListResponse { @@ -15,6 +16,7 @@ impl GetIpListResponse { interface_ipv4s: vec![], public_ipv6: "".to_string(), interface_ipv6s: vec![], + listeners: vec![], } } }