diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index a6efcb2..c7c7c55 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -5,6 +5,7 @@ use std::{ }; use crate::proto::cli::PeerConnInfo; +use crate::proto::common::PeerFeatureFlag; use crossbeam::atomic::AtomicCell; use super::{ @@ -68,6 +69,8 @@ pub struct GlobalCtx { enable_exit_node: bool, no_tun: bool, + + feature_flags: AtomicCell, } impl std::fmt::Debug for GlobalCtx { @@ -119,6 +122,8 @@ impl GlobalCtx { enable_exit_node, no_tun, + + feature_flags: AtomicCell::new(PeerFeatureFlag::default()), } } @@ -246,6 +251,14 @@ impl GlobalCtx { pub fn no_tun(&self) -> bool { self.no_tun } + + pub fn get_feature_flags(&self) -> PeerFeatureFlag { + self.feature_flags.load() + } + + pub fn set_feature_flags(&self, flags: PeerFeatureFlag) { + self.feature_flags.store(flags); + } } #[cfg(test)] diff --git a/easytier/src/connector/direct.rs b/easytier/src/connector/direct.rs index b586ab8..bf7b71b 100644 --- a/easytier/src/connector/direct.rs +++ b/easytier/src/connector/direct.rs @@ -4,13 +4,16 @@ use std::{net::SocketAddr, sync::Arc}; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, - peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager}, + peers::{ + peer_manager::PeerManager, peer_rpc::PeerRpcManager, + peer_rpc_service::DirectConnectorManagerRpcServer, + }, proto::{ peer_rpc::{ DirectConnectorRpc, DirectConnectorRpcClientFactory, DirectConnectorRpcServer, GetIpListRequest, GetIpListResponse, }, - rpc_types::{self, controller::BaseController}, + rpc_types::controller::BaseController, }, }; @@ -38,7 +41,10 @@ impl PeerManagerForDirectConnector for PeerManager { let mut ret = vec![]; let routes = self.list_routes().await; - for r in routes.iter() { + for r in routes + .iter() + .filter(|r| r.feature_flag.map(|r| !r.is_public_server).unwrap_or(true)) + { ret.push(r.peer_id); } @@ -54,38 +60,6 @@ impl PeerManagerForDirectConnector for PeerManager { } } -#[derive(Clone)] -struct DirectConnectorManagerRpcServer { - // TODO: this only cache for one src peer, should make it global - global_ctx: ArcGlobalCtx, -} - -#[async_trait::async_trait] -impl DirectConnectorRpc for DirectConnectorManagerRpcServer { - type Controller = BaseController; - - async fn get_ip_list( - &self, - _: BaseController, - _: GetIpListRequest, - ) -> rpc_types::error::Result { - let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; - ret.listeners = self - .global_ctx - .get_running_listeners() - .into_iter() - .map(Into::into) - .collect(); - Ok(ret) - } -} - -impl DirectConnectorManagerRpcServer { - pub fn new(global_ctx: ArcGlobalCtx) -> Self { - Self { global_ctx } - } -} - #[derive(Hash, Eq, PartialEq, Clone)] struct DstBlackListItem(PeerId, String); diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index 5ad1f5e..d7bb0a5 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -65,7 +65,10 @@ impl PeerCenterBase { } // find peer with alphabetical smallest id. let mut min_peer = peer_mgr.my_peer_id(); - for peer in peers.iter() { + for peer in peers + .iter() + .filter(|r| r.feature_flag.map(|r| !r.is_public_server).unwrap_or(true)) + { let peer_id = peer.peer_id; if peer_id < min_peer { min_peer = peer_id; @@ -342,15 +345,22 @@ impl PeerCenterInstance { global_peer_map_update_time: Arc>, } - impl RouteCostCalculatorInterface for RouteCostCalculatorImpl { - fn calculate_cost(&self, src: PeerId, dst: PeerId) -> i32 { - let ret = self - .global_peer_map_clone + impl RouteCostCalculatorImpl { + fn directed_cost(&self, src: PeerId, dst: PeerId) -> Option { + self.global_peer_map_clone .map .get(&src) .and_then(|src_peer_info| src_peer_info.direct_peers.get(&dst)) - .and_then(|info| Some(info.latency_ms)); - ret.unwrap_or(80) + .and_then(|info| Some(info.latency_ms)) + } + } + + impl RouteCostCalculatorInterface for RouteCostCalculatorImpl { + fn calculate_cost(&self, src: PeerId, dst: PeerId) -> i32 { + if let Some(cost) = self.directed_cost(src, dst) { + return cost; + } + self.directed_cost(dst, src).unwrap_or(100) } fn begin_update(&mut self) { diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index cd4bb72..7d48f09 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -28,6 +28,7 @@ use crate::{ proto::{ cli::{ForeignNetworkEntryPb, ListForeignNetworkResponse, PeerInfo}, common::NatType, + peer_rpc::DirectConnectorRpcServer, }, tunnel::packet_def::{PacketType, ZCPacket}, }; @@ -37,6 +38,7 @@ use super::{ peer_map::PeerMap, peer_ospf_route::PeerRoute, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, + peer_rpc_service::DirectConnectorManagerRpcServer, route_trait::{ArcRoute, NextHopPolicy}, PacketRecvChan, PacketRecvChanReceiver, }; @@ -47,6 +49,7 @@ struct ForeignNetworkEntry { peer_map: Arc, relay_data: bool, route: ArcRoute, + peer_rpc: Weak, } impl ForeignNetworkEntry { @@ -65,6 +68,9 @@ impl ForeignNetworkEntry { foreign_global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector { udp_nat_type: NatType::Unknown, })); + let mut feature_flag = global_ctx.get_feature_flags(); + feature_flag.is_public_server = true; + global_ctx.set_feature_flags(feature_flag); let peer_map = Arc::new(PeerMap::new( packet_sender, @@ -72,7 +78,17 @@ impl ForeignNetworkEntry { my_peer_id, )); - let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc); + let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone()); + + for u in global_ctx.get_running_listeners().into_iter() { + foreign_global_ctx.add_running_listener(u); + } + peer_rpc.rpc_server().registry().register( + DirectConnectorRpcServer::new(DirectConnectorManagerRpcServer::new( + foreign_global_ctx.clone(), + )), + &network.network_name, + ); Self { global_ctx: foreign_global_ctx, @@ -80,6 +96,7 @@ impl ForeignNetworkEntry { peer_map, relay_data, route: Arc::new(Box::new(route)), + peer_rpc: Arc::downgrade(&peer_rpc), } } @@ -116,6 +133,17 @@ impl ForeignNetworkEntry { } } +impl Drop for ForeignNetworkEntry { + fn drop(&mut self) { + if let Some(peer_rpc) = self.peer_rpc.upgrade() { + peer_rpc + .rpc_server() + .registry() + .unregister_by_domain(&self.network.network_name); + } + } +} + struct ForeignNetworkManagerData { network_peer_maps: DashMap>, peer_network_map: DashMap, diff --git a/easytier/src/peers/mod.rs b/easytier/src/peers/mod.rs index bff75ea..2598738 100644 --- a/easytier/src/peers/mod.rs +++ b/easytier/src/peers/mod.rs @@ -6,6 +6,7 @@ pub mod peer_manager; pub mod peer_map; pub mod peer_ospf_route; pub mod peer_rpc; +pub mod peer_rpc_service; pub mod route_trait; pub mod rpc_service; diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index a1e3878..9bc7579 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -750,6 +750,7 @@ impl PeerManager { .collect(), config: self.global_ctx.config.dump(), version: EASYTIER_VERSION.to_string(), + feature_flag: Some(self.global_ctx.get_feature_flags()), } } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index ed5f190..2cab1ad 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -102,6 +102,7 @@ impl RoutePeerInfo { last_update: Some(SystemTime::now().into()), version: 0, easytier_version: EASYTIER_VERSION.to_string(), + feature_flag: None, } } @@ -127,6 +128,7 @@ impl RoutePeerInfo { version: self.version, easytier_version: EASYTIER_VERSION.to_string(), + feature_flag: Some(global_ctx.get_feature_flags()), }; let need_update_periodically = if let Ok(Ok(d)) = @@ -168,6 +170,7 @@ impl Into for RoutePeerInfo { }, inst_id: self.inst_id.map(|x| x.to_string()).unwrap_or_default(), version: self.easytier_version, + feature_flag: self.feature_flag, } } } diff --git a/easytier/src/peers/peer_rpc_service.rs b/easytier/src/peers/peer_rpc_service.rs new file mode 100644 index 0000000..ea7987c --- /dev/null +++ b/easytier/src/peers/peer_rpc_service.rs @@ -0,0 +1,39 @@ +use crate::{ + common::global_ctx::ArcGlobalCtx, + proto::{ + peer_rpc::{DirectConnectorRpc, GetIpListRequest, GetIpListResponse}, + rpc_types::{self, controller::BaseController}, + }, +}; + +#[derive(Clone)] +pub struct DirectConnectorManagerRpcServer { + // TODO: this only cache for one src peer, should make it global + global_ctx: ArcGlobalCtx, +} + +#[async_trait::async_trait] +impl DirectConnectorRpc for DirectConnectorManagerRpcServer { + type Controller = BaseController; + + async fn get_ip_list( + &self, + _: BaseController, + _: GetIpListRequest, + ) -> rpc_types::error::Result { + let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await; + ret.listeners = self + .global_ctx + .get_running_listeners() + .into_iter() + .map(Into::into) + .collect(); + Ok(ret) + } +} + +impl DirectConnectorManagerRpcServer { + pub fn new(global_ctx: ArcGlobalCtx) -> Self { + Self { global_ctx } + } +} diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index fce6e2c..301060d 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -53,6 +53,7 @@ message Route { common.StunInfo stun_info = 7; string inst_id = 8; string version = 9; + common.PeerFeatureFlag feature_flag = 10; } message NodeInfo { @@ -65,6 +66,7 @@ message NodeInfo { repeated string listeners = 7; string config = 8; string version = 9; + common.PeerFeatureFlag feature_flag = 10; } message ShowNodeInfoRequest {} diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 093bc23..883a62a 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -90,3 +90,8 @@ message StunInfo { uint32 min_port = 5; uint32 max_port = 6; } + +message PeerFeatureFlag { + bool is_public_server = 1; + bool no_relay_data = 2; +} diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 7f8bf1d..8ba7d0c 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -18,6 +18,7 @@ message RoutePeerInfo { uint32 version = 9; string easytier_version = 10; + common.PeerFeatureFlag feature_flag = 11; } message PeerIdVersion { diff --git a/easytier/src/proto/rpc_impl/service_registry.rs b/easytier/src/proto/rpc_impl/service_registry.rs index 0b7e875..4d5e200 100644 --- a/easytier/src/proto/rpc_impl/service_registry.rs +++ b/easytier/src/proto/rpc_impl/service_registry.rs @@ -84,6 +84,10 @@ impl ServiceRegistry { self.table.remove(&key).map(|_| ()) } + pub fn unregister_by_domain(&self, domain_name: &str) { + self.table.retain(|k, _| k.domain_name != domain_name); + } + pub async fn call_method( &self, rpc_desc: RpcDescriptor,