From 0053666dfb95c4ac895322cc7c29d18155fa97ad Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Wed, 13 Mar 2024 00:15:22 +0800 Subject: [PATCH] use uint32 as peer id (#29) --- easytier-cli/src/main.rs | 4 +- easytier-core/proto/cli.proto | 14 +-- easytier-core/src/common/error.rs | 4 +- easytier-core/src/common/global_ctx.rs | 18 +-- easytier-core/src/common/mod.rs | 6 + easytier-core/src/connector/direct.rs | 33 +++-- easytier-core/src/connector/manual.rs | 17 +-- easytier-core/src/connector/udp_hole_punch.rs | 12 +- easytier-core/src/gateway/icmp_proxy.rs | 15 +-- easytier-core/src/gateway/udp_proxy.rs | 11 +- easytier-core/src/instance/instance.rs | 18 ++- easytier-core/src/instance/virtual_nic.rs | 2 - easytier-core/src/peer_center/instance.rs | 24 ++-- easytier-core/src/peer_center/server.rs | 2 +- easytier-core/src/peer_center/service.rs | 2 +- .../src/peers/foreign_network_client.rs | 23 ++-- .../src/peers/foreign_network_manager.rs | 63 +++++----- easytier-core/src/peers/mod.rs | 2 - easytier-core/src/peers/packet.rs | 54 ++++---- easytier-core/src/peers/peer.rs | 20 +-- easytier-core/src/peers/peer_conn.rs | 89 +++++++------- easytier-core/src/peers/peer_manager.rs | 99 ++++++++------- easytier-core/src/peers/peer_map.rs | 61 +++++----- easytier-core/src/peers/peer_rip_route.rs | 115 +++++++++--------- easytier-core/src/peers/peer_rpc.rs | 77 ++++++------ easytier-core/src/peers/route_trait.rs | 9 +- easytier-core/src/peers/rpc_service.rs | 2 +- easytier-core/src/peers/tests.rs | 17 ++- easytier-core/src/tests/mod.rs | 10 +- easytier-core/src/tests/three_node.rs | 14 +-- 30 files changed, 431 insertions(+), 406 deletions(-) diff --git a/easytier-cli/src/main.rs b/easytier-cli/src/main.rs index 2beb20e..875ab31 100644 --- a/easytier-cli/src/main.rs +++ b/easytier-cli/src/main.rs @@ -277,7 +277,7 @@ impl CommandHandler { tx_bytes: format_size(p.get_tx_bytes().unwrap_or(0), humansize::DECIMAL), tunnel_proto: p.get_conn_protos().unwrap_or(vec![]).join(",").to_string(), nat_type: p.get_udp_nat_type(), - id: p.route.peer_id.clone(), + id: p.route.peer_id.to_string(), } } } @@ -460,7 +460,7 @@ async fn main() -> Result<(), Error> { }) .collect::>(); table_rows.push(PeerCenterTableItem { - node_id: node_id.clone(), + node_id: node_id.to_string(), direct_peers: direct_peers.join("\n"), }); } diff --git a/easytier-core/proto/cli.proto b/easytier-core/proto/cli.proto index ebbd568..f711b52 100644 --- a/easytier-core/proto/cli.proto +++ b/easytier-core/proto/cli.proto @@ -24,8 +24,8 @@ message TunnelInfo { message PeerConnInfo { string conn_id = 1; - string my_node_id = 2; - string peer_id = 3; + uint32 my_peer_id = 2; + uint32 peer_id = 3; repeated string features = 4; TunnelInfo tunnel = 5; PeerConnStats stats = 6; @@ -33,7 +33,7 @@ message PeerConnInfo { } message PeerInfo { - string peer_id = 1; + uint32 peer_id = 1; repeated PeerConnInfo conns = 2; } @@ -62,9 +62,9 @@ message StunInfo { } message Route { - string peer_id = 1; + uint32 peer_id = 1; string ipv4_addr = 2; - string next_hop_peer_id = 3; + uint32 next_hop_peer_id = 3; int32 cost = 4; repeated string proxy_cidrs = 5; string hostname = 6; @@ -129,13 +129,13 @@ message DirectConnectedPeerInfo { } message PeerInfoForGlobalMap { - map direct_peers = 1; + map direct_peers = 1; } message GetGlobalPeerMapRequest {} message GetGlobalPeerMapResponse { - map global_peer_map = 1; + map global_peer_map = 1; } service PeerCenterRpc { diff --git a/easytier-core/src/common/error.rs b/easytier-core/src/common/error.rs index c741ed3..82e89c5 100644 --- a/easytier-core/src/common/error.rs +++ b/easytier-core/src/common/error.rs @@ -4,6 +4,8 @@ use thiserror::Error; use crate::tunnels; +use super::PeerId; + #[derive(Error, Debug)] pub enum Error { #[error("io error")] @@ -13,7 +15,7 @@ pub enum Error { #[error("tunnel error {0}")] TunnelError(#[from] tunnels::TunnelError), #[error("Peer has no conn, PeerId: {0}")] - PeerNoConnectionError(uuid::Uuid), + PeerNoConnectionError(PeerId), #[error("RouteError: {0}")] RouteError(String), #[error("Not found")] diff --git a/easytier-core/src/common/global_ctx.rs b/easytier-core/src/common/global_ctx.rs index 9cf60c1..3b4557f 100644 --- a/easytier-core/src/common/global_ctx.rs +++ b/easytier-core/src/common/global_ctx.rs @@ -3,19 +3,19 @@ use std::{io::Write, sync::Arc}; use crate::rpc::PeerConnInfo; use crossbeam::atomic::AtomicCell; use serde::{Deserialize, Serialize}; -use uuid::Uuid; use super::{ config_fs::ConfigFs, netns::NetNS, network::IPCollector, stun::{StunInfoCollector, StunInfoCollectorTrait}, + PeerId, }; #[derive(Debug, Clone, PartialEq)] pub enum GlobalCtxEvent { - PeerAdded(Uuid), - PeerRemoved(Uuid), + PeerAdded(PeerId), + PeerRemoved(PeerId), PeerConnAdded(PeerConnInfo), PeerConnRemoved(PeerConnInfo), } @@ -231,6 +231,8 @@ impl GlobalCtx { #[cfg(test)] pub mod tests { + use crate::common::new_peer_id; + use super::*; #[tokio::test] @@ -240,19 +242,19 @@ pub mod tests { let global_ctx = GlobalCtx::new("test", config_fs, net_ns, None); let mut subscriber = global_ctx.subscribe(); - let uuid = Uuid::new_v4(); - global_ctx.issue_event(GlobalCtxEvent::PeerAdded(uuid.clone())); - global_ctx.issue_event(GlobalCtxEvent::PeerRemoved(uuid.clone())); + let peer_id = new_peer_id(); + global_ctx.issue_event(GlobalCtxEvent::PeerAdded(peer_id.clone())); + global_ctx.issue_event(GlobalCtxEvent::PeerRemoved(peer_id.clone())); global_ctx.issue_event(GlobalCtxEvent::PeerConnAdded(PeerConnInfo::default())); global_ctx.issue_event(GlobalCtxEvent::PeerConnRemoved(PeerConnInfo::default())); assert_eq!( subscriber.recv().await.unwrap(), - GlobalCtxEvent::PeerAdded(uuid.clone()) + GlobalCtxEvent::PeerAdded(peer_id.clone()) ); assert_eq!( subscriber.recv().await.unwrap(), - GlobalCtxEvent::PeerRemoved(uuid.clone()) + GlobalCtxEvent::PeerRemoved(peer_id.clone()) ); assert_eq!( subscriber.recv().await.unwrap(), diff --git a/easytier-core/src/common/mod.rs b/easytier-core/src/common/mod.rs index 85eb353..ca02025 100644 --- a/easytier-core/src/common/mod.rs +++ b/easytier-core/src/common/mod.rs @@ -23,3 +23,9 @@ pub fn get_logger_timer_rfc3339( ) -> tracing_subscriber::fmt::time::OffsetTime { get_logger_timer(time::format_description::well_known::Rfc3339) } + +pub type PeerId = u32; + +pub fn new_peer_id() -> PeerId { + rand::random() +} diff --git a/easytier-core/src/connector/direct.rs b/easytier-core/src/connector/direct.rs index b24dcb8..92cc625 100644 --- a/easytier-core/src/connector/direct.rs +++ b/easytier-core/src/connector/direct.rs @@ -8,8 +8,9 @@ use crate::{ error::Error, global_ctx::ArcGlobalCtx, network::IPCollector, + PeerId, }, - peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager, PeerId}, + peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager}, }; use crate::rpc::{peer::GetIpListResponse, PeerConnInfo}; @@ -37,7 +38,7 @@ impl PeerManagerForDirectConnector for PeerManager { let routes = self.list_routes().await; for r in routes.iter() { - ret.push(r.peer_id.parse().unwrap()); + ret.push(r.peer_id); } ret @@ -91,7 +92,6 @@ impl std::fmt::Debug for DirectConnectorManagerData { } pub struct DirectConnectorManager { - my_node_id: uuid::Uuid, global_ctx: ArcGlobalCtx, data: Arc, @@ -99,13 +99,8 @@ pub struct DirectConnectorManager { } impl DirectConnectorManager { - pub fn new( - my_node_id: uuid::Uuid, - global_ctx: ArcGlobalCtx, - peer_manager: Arc, - ) -> Self { + pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc) -> Self { Self { - my_node_id, global_ctx: global_ctx.clone(), data: Arc::new(DirectConnectorManagerData { global_ctx, @@ -130,14 +125,14 @@ impl DirectConnectorManager { pub fn run_as_client(&mut self) { let data = self.data.clone(); - let my_node_id = self.my_node_id.clone(); + let my_peer_id = self.data.peer_manager.my_peer_id(); self.tasks.spawn( async move { loop { let peers = data.peer_manager.list_peers().await; let mut tasks = JoinSet::new(); for peer_id in peers { - if peer_id == my_node_id { + if peer_id == my_peer_id { continue; } tasks.spawn(Self::do_try_direct_connect(data.clone(), peer_id)); @@ -149,7 +144,9 @@ impl DirectConnectorManager { tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } - .instrument(tracing::info_span!("direct_connector_client", my_id = ?self.my_node_id)), + .instrument( + tracing::info_span!("direct_connector_client", my_id = ?self.global_ctx.id), + ), ); } @@ -185,7 +182,7 @@ impl DirectConnectorManager { ); data.peer_manager .get_peer_map() - .close_peer_conn(&peer_id, &conn_id) + .close_peer_conn(peer_id, &conn_id) .await?; return Err(Error::InvalidUrl(addr)); } @@ -291,14 +288,12 @@ mod tests { connect_peer_manager(p_a.clone(), p_b.clone()).await; connect_peer_manager(p_b.clone(), p_c.clone()).await; - wait_route_appear(p_a.clone(), p_c.my_node_id()) + wait_route_appear(p_a.clone(), p_c.my_peer_id()) .await .unwrap(); - let mut dm_a = - DirectConnectorManager::new(p_a.my_node_id(), p_a.get_global_ctx(), p_a.clone()); - let mut dm_c = - DirectConnectorManager::new(p_c.my_node_id(), p_c.get_global_ctx(), p_c.clone()); + let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone()); + let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone()); dm_a.run_as_client(); dm_c.run_as_server(); @@ -318,7 +313,7 @@ mod tests { lis_c.run().await.unwrap(); - wait_route_appear_with_cost(p_a.clone(), p_c.my_node_id(), Some(1)) + wait_route_appear_with_cost(p_a.clone(), p_c.my_peer_id(), Some(1)) .await .unwrap(); } diff --git a/easytier-core/src/connector/manual.rs b/easytier-core/src/connector/manual.rs index 33b2ece..7107863 100644 --- a/easytier-core/src/connector/manual.rs +++ b/easytier-core/src/connector/manual.rs @@ -7,7 +7,7 @@ use tokio::{ time::timeout, }; -use crate::rpc as easytier_rpc; +use crate::{common::PeerId, peers::peer_conn::PeerConnId, rpc as easytier_rpc}; use crate::{ common::{ @@ -32,8 +32,8 @@ type ConnectorMap = Arc>> #[derive(Debug, Clone)] struct ReconnResult { dead_url: String, - peer_id: uuid::Uuid, - conn_id: uuid::Uuid, + peer_id: PeerId, + conn_id: PeerConnId, } struct ConnectorManagerData { @@ -48,24 +48,18 @@ struct ConnectorManagerData { } pub struct ManualConnectorManager { - my_node_id: uuid::Uuid, global_ctx: ArcGlobalCtx, data: Arc, tasks: JoinSet<()>, } impl ManualConnectorManager { - pub fn new( - my_node_id: uuid::Uuid, - global_ctx: ArcGlobalCtx, - peer_manager: Arc, - ) -> Self { + pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc) -> Self { let connectors = Arc::new(DashMap::new()); let tasks = JoinSet::new(); let event_subscriber = global_ctx.subscribe(); let mut ret = Self { - my_node_id, global_ctx: global_ctx.clone(), data: Arc::new(ConnectorManagerData { connectors, @@ -364,8 +358,7 @@ mod tests { set_global_var!(MANUAL_CONNECTOR_RECONNECT_INTERVAL_MS, 1); let peer_mgr = create_mock_peer_manager().await; - let my_node_id = uuid::Uuid::new_v4(); - let mgr = ManualConnectorManager::new(my_node_id, peer_mgr.get_global_ctx(), peer_mgr); + let mgr = ManualConnectorManager::new(peer_mgr.get_global_ctx(), peer_mgr); struct MockConnector {} #[async_trait::async_trait] diff --git a/easytier-core/src/connector/udp_hole_punch.rs b/easytier-core/src/connector/udp_hole_punch.rs index fade1d8..5e40790 100644 --- a/easytier-core/src/connector/udp_hole_punch.rs +++ b/easytier-core/src/connector/udp_hole_punch.rs @@ -9,9 +9,9 @@ use tracing::Instrument; use crate::{ common::{ constants, error::Error, global_ctx::ArcGlobalCtx, rkyv_util::encode_to_bytes, - stun::StunInfoCollectorTrait, + stun::StunInfoCollectorTrait, PeerId, }, - peers::{peer_manager::PeerManager, PeerId}, + peers::peer_manager::PeerManager, rpc::NatType, tunnels::{ common::setup_sokcet2, @@ -283,7 +283,7 @@ impl UdpHolePunchConnector { continue; }; - let peer_id: PeerId = route.peer_id.parse().unwrap(); + let peer_id: PeerId = route.peer_id; let conns = data.peer_mgr.list_peer_conns(&peer_id).await; if conns.is_some() && conns.unwrap().len() > 0 { continue; @@ -310,7 +310,7 @@ impl UdpHolePunchConnector { // if we have smae level of full cone, node with smaller peer_id will be the initiator if my_nat_type == peer_nat_type { - if data.global_ctx.id > peer_id { + if data.peer_mgr.my_peer_id() > peer_id { continue; } } else { @@ -522,7 +522,7 @@ pub mod tests { connect_peer_manager(p_a.clone(), p_b.clone()).await; connect_peer_manager(p_b.clone(), p_c.clone()).await; - wait_route_appear(p_a.clone(), p_c.my_node_id()) + wait_route_appear(p_a.clone(), p_c.my_peer_id()) .await .unwrap(); @@ -534,7 +534,7 @@ pub mod tests { hole_punching_a.run().await.unwrap(); hole_punching_c.run().await.unwrap(); - wait_route_appear_with_cost(p_a.clone(), p_c.my_node_id(), Some(1)) + wait_route_appear_with_cost(p_a.clone(), p_c.my_peer_id(), Some(1)) .await .unwrap(); println!("{:?}", p_a.list_routes().await); diff --git a/easytier-core/src/gateway/icmp_proxy.rs b/easytier-core/src/gateway/icmp_proxy.rs index ce19975..920b456 100644 --- a/easytier-core/src/gateway/icmp_proxy.rs +++ b/easytier-core/src/gateway/icmp_proxy.rs @@ -20,11 +20,10 @@ use tokio_util::bytes::Bytes; use tracing::Instrument; use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx}, + common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, peers::{ packet, peer_manager::{PeerManager, PeerPacketFilter}, - PeerId, }, }; @@ -184,13 +183,9 @@ impl PeerPacketFilter for IcmpProxy { icmp_seq, }; - if packet.to_peer.is_none() { - return None; - } - let value = IcmpNatEntry::new( - packet.from_peer.to_uuid(), - packet.to_peer.as_ref().unwrap().to_uuid(), + packet.from_peer.into(), + packet.to_peer.into(), ipv4.get_source().into(), ) .ok()?; @@ -270,8 +265,8 @@ impl IcmpProxy { self.tasks.lock().await.spawn( async move { while let Some(msg) = receiver.recv().await { - let to_peer_id: uuid::Uuid = msg.to_peer.as_ref().unwrap().clone().into(); - let ret = peer_manager.send_msg(msg.into(), &to_peer_id).await; + let to_peer_id = msg.to_peer.into(); + let ret = peer_manager.send_msg(msg.into(), to_peer_id).await; if ret.is_err() { tracing::error!("send icmp packet to peer failed: {:?}", ret); } diff --git a/easytier-core/src/gateway/udp_proxy.rs b/easytier-core/src/gateway/udp_proxy.rs index 192f658..c896d1a 100644 --- a/easytier-core/src/gateway/udp_proxy.rs +++ b/easytier-core/src/gateway/udp_proxy.rs @@ -25,11 +25,10 @@ use tokio_util::bytes::Bytes; use tracing::Level; use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx}, + common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, peers::{ packet, peer_manager::{PeerManager, PeerPacketFilter}, - PeerId, }, tunnels::common::setup_sokcet2, }; @@ -276,8 +275,8 @@ impl PeerPacketFilter for UdpProxy { tracing::info!(?packet, ?ipv4, ?udp_packet, "udp nat table entry created"); let _g = self.global_ctx.net_ns.guard(); Ok(Arc::new(UdpNatEntry::new( - packet.from_peer.to_uuid(), - packet.to_peer.as_ref().unwrap().to_uuid(), + packet.from_peer.into(), + packet.to_peer.into(), nat_key.src_socket, )?)) }) @@ -366,9 +365,9 @@ impl UdpProxy { let peer_manager = self.peer_manager.clone(); self.tasks.lock().await.spawn(async move { while let Some(msg) = receiver.recv().await { - let to_peer_id: uuid::Uuid = msg.to_peer.as_ref().unwrap().clone().into(); + let to_peer_id: PeerId = msg.to_peer.into(); tracing::trace!(?msg, ?to_peer_id, "udp nat packet response send"); - let ret = peer_manager.send_msg(msg.into(), &to_peer_id).await; + let ret = peer_manager.send_msg(msg.into(), to_peer_id).await; if ret.is_err() { tracing::error!("send icmp packet to peer failed: {:?}", ret); } diff --git a/easytier-core/src/instance/instance.rs b/easytier-core/src/instance/instance.rs index 2636212..fb9d83b 100644 --- a/easytier-core/src/instance/instance.rs +++ b/easytier-core/src/instance/instance.rs @@ -9,12 +9,12 @@ use pnet::packet::ipv4::Ipv4Packet; use tokio::{sync::Mutex, task::JoinSet}; use tokio_util::bytes::{Bytes, BytesMut}; use tonic::transport::Server; -use uuid::Uuid; use crate::common::config_fs::ConfigFs; use crate::common::error::Error; use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; use crate::common::netns::NetNS; +use crate::common::PeerId; use crate::connector::direct::DirectConnectorManager; use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager}; use crate::connector::udp_hole_punch::UdpHolePunchConnector; @@ -22,6 +22,7 @@ use crate::gateway::icmp_proxy::IcmpProxy; use crate::gateway::tcp_proxy::TcpProxy; use crate::gateway::udp_proxy::UdpProxy; use crate::peer_center::instance::PeerCenterInstance; +use crate::peers::peer_conn::PeerConnId; use crate::peers::peer_manager::PeerManager; use crate::peers::rpc_service::PeerManagerRpcService; use crate::tunnels::SinkItem; @@ -126,19 +127,18 @@ impl Instance { )); let listener_manager = Arc::new(Mutex::new(ListenerManager::new( - id, + peer_manager.my_node_id(), net_ns.clone(), peer_manager.clone(), ))); let conn_manager = Arc::new(ManualConnectorManager::new( - id, global_ctx.clone(), peer_manager.clone(), )); let mut direct_conn_manager = - DirectConnectorManager::new(id, global_ctx.clone(), peer_manager.clone()); + DirectConnectorManager::new(global_ctx.clone(), peer_manager.clone()); direct_conn_manager.run(); let udp_hole_puncher = UdpHolePunchConnector::new(global_ctx.clone(), peer_manager.clone()); @@ -302,7 +302,11 @@ impl Instance { self.peer_manager.clone() } - pub async fn close_peer_conn(&mut self, peer_id: &Uuid, conn_id: &Uuid) -> Result<(), Error> { + pub async fn close_peer_conn( + &mut self, + peer_id: PeerId, + conn_id: &PeerConnId, + ) -> Result<(), Error> { self.peer_manager .get_peer_map() .close_peer_conn(peer_id, conn_id) @@ -321,6 +325,10 @@ impl Instance { self.id } + pub fn peer_id(&self) -> PeerId { + self.peer_manager.my_peer_id() + } + fn run_rpc_server(&mut self) -> Result<(), Box> { let addr = "0.0.0.0:15888".parse()?; let peer_mgr = self.peer_manager.clone(); diff --git a/easytier-core/src/instance/virtual_nic.rs b/easytier-core/src/instance/virtual_nic.rs index 586cf82..3f92874 100644 --- a/easytier-core/src/instance/virtual_nic.rs +++ b/easytier-core/src/instance/virtual_nic.rs @@ -54,7 +54,6 @@ impl VirtualNic { let mut config = tun::Configuration::default(); let has_packet_info = cfg!(target_os = "macos"); config.layer(tun::Layer::L3); - config.name(format!("et_{}", self.global_ctx.inst_name)); #[cfg(target_os = "linux")] { @@ -62,7 +61,6 @@ impl VirtualNic { // detect protocol by ourselves for cross platform config.packet_information(false); }); - config.name(self.dev_name.clone()); } if self.queue_num != 1 { diff --git a/easytier-core/src/peer_center/instance.rs b/easytier-core/src/peer_center/instance.rs index 4096e55..d206995 100644 --- a/easytier-core/src/peer_center/instance.rs +++ b/easytier-core/src/peer_center/instance.rs @@ -16,7 +16,8 @@ use tokio::{ use tracing::Instrument; use crate::{ - peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId}, + common::PeerId, + peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService}, rpc::{GetGlobalPeerMapRequest, GetGlobalPeerMapResponse}, }; @@ -43,7 +44,7 @@ impl PeerCenterBase { pub async fn init(&self) -> Result<(), Error> { self.peer_mgr.get_peer_rpc_mgr().run_service( SERVICE_ID, - PeerCenterServer::new(self.peer_mgr.my_node_id()).serve(), + PeerCenterServer::new(self.peer_mgr.my_peer_id()).serve(), ); Ok(()) @@ -55,13 +56,14 @@ impl PeerCenterBase { return None; } // find peer with alphabetical smallest id. - let mut min_peer = peer_mgr.my_node_id().to_string(); + let mut min_peer = peer_mgr.my_peer_id(); for peer in peers.iter() { - if peer.peer_id < min_peer { - min_peer = peer.peer_id.clone(); + let peer_id = peer.peer_id; + if peer_id < min_peer { + min_peer = peer_id; } } - Some(min_peer.parse().unwrap()) + Some(min_peer) } async fn init_periodic_job< @@ -72,7 +74,7 @@ impl PeerCenterBase { job_ctx: T, job_fn: (impl Fn(PeerCenterServiceClient, Arc>) -> Fut + Send + Sync + 'static), ) -> () { - let my_node_id = self.peer_mgr.my_node_id(); + let my_peer_id = self.peer_mgr.my_peer_id(); let peer_mgr = self.peer_mgr.clone(); let lock = self.lock.clone(); self.tasks.lock().await.spawn( @@ -111,7 +113,7 @@ impl PeerCenterBase { } } } - .instrument(tracing::info_span!("periodic_job", ?my_node_id)), + .instrument(tracing::info_span!("periodic_job", ?my_peer_id)), ); } @@ -140,7 +142,7 @@ impl crate::rpc::cli::peer_center_rpc_server::PeerCenterRpc for PeerCenterInstan global_peer_map: global_peer_map .map .into_iter() - .map(|(k, v)| (k.to_string(), v)) + .map(|(k, v)| (k, v)) .collect(), })) } @@ -233,7 +235,7 @@ impl PeerCenterInstance { self.client .init_periodic_job(ctx, |client, ctx| async move { - let my_node_id = ctx.peer_mgr.my_node_id(); + let my_node_id = ctx.peer_mgr.my_peer_id(); // if peers are not same in next 10 seconds, report peers to center server let mut peers = PeerInfoForGlobalMap::default(); @@ -317,7 +319,7 @@ mod tests { connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; - wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id()) .await .unwrap(); diff --git a/easytier-core/src/peer_center/server.rs b/easytier-core/src/peer_center/server.rs index 18bf378..cf6d40e 100644 --- a/easytier-core/src/peer_center/server.rs +++ b/easytier-core/src/peer_center/server.rs @@ -7,7 +7,7 @@ use dashmap::DashMap; use once_cell::sync::Lazy; use tokio::{sync::RwLock, task::JoinSet}; -use crate::peers::PeerId; +use crate::common::PeerId; use super::{ service::{GetGlobalPeerMapResponse, GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap}, diff --git a/easytier-core/src/peer_center/service.rs b/easytier-core/src/peer_center/service.rs index 61c735c..647d3aa 100644 --- a/easytier-core/src/peer_center/service.rs +++ b/easytier-core/src/peer_center/service.rs @@ -1,6 +1,6 @@ use std::collections::BTreeMap; -use crate::{peers::PeerId, rpc::DirectConnectedPeerInfo}; +use crate::{common::PeerId, rpc::DirectConnectedPeerInfo}; use super::{Digest, Error}; use crate::rpc::PeerInfo; diff --git a/easytier-core/src/peers/foreign_network_client.rs b/easytier-core/src/peers/foreign_network_client.rs index ae877aa..bfc4a5f 100644 --- a/easytier-core/src/peers/foreign_network_client.rs +++ b/easytier-core/src/peers/foreign_network_client.rs @@ -13,6 +13,7 @@ use tokio_util::bytes::Bytes; use crate::common::{ error::Error, global_ctx::{ArcGlobalCtx, NetworkIdentity}, + PeerId, }; use super::{ @@ -20,12 +21,12 @@ use super::{ peer_conn::PeerConn, peer_map::PeerMap, peer_rpc::PeerRpcManager, - PeerId, }; pub struct ForeignNetworkClient { global_ctx: ArcGlobalCtx, peer_rpc: Arc, + my_peer_id: PeerId, peer_map: Arc, @@ -38,13 +39,19 @@ impl ForeignNetworkClient { global_ctx: ArcGlobalCtx, packet_sender_to_mgr: mpsc::Sender, peer_rpc: Arc, + my_peer_id: PeerId, ) -> Self { - let peer_map = Arc::new(PeerMap::new(packet_sender_to_mgr, global_ctx.clone())); + let peer_map = Arc::new(PeerMap::new( + packet_sender_to_mgr, + global_ctx.clone(), + my_peer_id, + )); let next_hop = Arc::new(DashMap::new()); Self { global_ctx, peer_rpc, + my_peer_id, peer_map, @@ -130,20 +137,20 @@ impl ForeignNetworkClient { new_next_hop } - pub fn has_next_hop(&self, peer_id: &PeerId) -> bool { + pub fn has_next_hop(&self, peer_id: PeerId) -> bool { self.get_next_hop(peer_id).is_some() } - pub fn get_next_hop(&self, peer_id: &PeerId) -> Option { + pub fn get_next_hop(&self, peer_id: PeerId) -> Option { if self.peer_map.has_peer(peer_id) { return Some(peer_id.clone()); } - self.next_hop.get(peer_id).map(|v| v.clone()) + self.next_hop.get(&peer_id).map(|v| v.clone()) } - pub async fn send_msg(&self, msg: Bytes, peer_id: &PeerId) -> Result<(), Error> { + pub async fn send_msg(&self, msg: Bytes, peer_id: PeerId) -> Result<(), Error> { if let Some(next_hop) = self.get_next_hop(peer_id) { - return self.peer_map.send_msg_directly(msg, &next_hop).await; + return self.peer_map.send_msg_directly(msg, next_hop).await; } Err(Error::RouteError("no next hop".to_string())) } @@ -151,7 +158,7 @@ impl ForeignNetworkClient { pub fn list_foreign_peers(&self) -> Vec { let mut peers = vec![]; for item in self.next_hop.iter() { - if item.key() != &self.global_ctx.get_id() { + if item.key() != &self.my_peer_id { peers.push(item.key().clone()); } } diff --git a/easytier-core/src/peers/foreign_network_manager.rs b/easytier-core/src/peers/foreign_network_manager.rs index 412d529..26e5810 100644 --- a/easytier-core/src/peers/foreign_network_manager.rs +++ b/easytier-core/src/peers/foreign_network_manager.rs @@ -16,11 +16,11 @@ use tokio::{ task::JoinSet, }; use tokio_util::bytes::Bytes; -use uuid::Uuid; use crate::common::{ error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, + PeerId, }; use super::{ @@ -28,7 +28,6 @@ use super::{ peer_conn::PeerConn, peer_map::PeerMap, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, - PeerId, }; struct ForeignNetworkEntry { @@ -41,8 +40,9 @@ impl ForeignNetworkEntry { network: NetworkIdentity, packet_sender: mpsc::Sender, global_ctx: ArcGlobalCtx, + my_peer_id: PeerId, ) -> Self { - let peer_map = Arc::new(PeerMap::new(packet_sender, global_ctx)); + let peer_map = Arc::new(PeerMap::new(packet_sender, global_ctx, my_peer_id)); Self { network, peer_map } } } @@ -53,10 +53,10 @@ struct ForeignNetworkManagerData { } impl ForeignNetworkManagerData { - async fn send_msg(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { let network_name = self .peer_network_map - .get(dst_peer_id) + .get(&dst_peer_id) .ok_or_else(|| Error::RouteError("network not found".to_string()))? .clone(); let entry = self @@ -67,16 +67,16 @@ impl ForeignNetworkManagerData { entry.peer_map.send_msg(msg, dst_peer_id).await } - fn get_peer_network(&self, peer_id: &uuid::Uuid) -> Option { - self.peer_network_map.get(peer_id).map(|v| v.clone()) + fn get_peer_network(&self, peer_id: PeerId) -> Option { + self.peer_network_map.get(&peer_id).map(|v| v.clone()) } fn get_network_entry(&self, network_name: &str) -> Option> { self.network_peer_maps.get(network_name).map(|v| v.clone()) } - fn remove_peer(&self, peer_id: &uuid::Uuid) { - self.peer_network_map.remove(peer_id); + fn remove_peer(&self, peer_id: PeerId) { + self.peer_network_map.remove(&peer_id); self.network_peer_maps.retain(|_, v| !v.peer_map.is_empty()); } @@ -91,7 +91,7 @@ impl ForeignNetworkManagerData { } struct RpcTransport { - my_peer_id: uuid::Uuid, + my_peer_id: PeerId, data: Arc, packet_recv: Mutex>, @@ -99,11 +99,11 @@ struct RpcTransport { #[async_trait::async_trait] impl PeerRpcManagerTransport for RpcTransport { - fn my_peer_id(&self) -> Uuid { + fn my_peer_id(&self) -> PeerId { self.my_peer_id } - async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { self.data.send_msg(msg, dst_peer_id).await } @@ -136,6 +136,7 @@ impl ForeignNetworkService for Arc { } pub struct ForeignNetworkManager { + my_peer_id: PeerId, global_ctx: ArcGlobalCtx, packet_sender_to_mgr: mpsc::Sender, @@ -150,7 +151,11 @@ pub struct ForeignNetworkManager { } impl ForeignNetworkManager { - pub fn new(global_ctx: ArcGlobalCtx, packet_sender_to_mgr: mpsc::Sender) -> Self { + pub fn new( + my_peer_id: PeerId, + global_ctx: ArcGlobalCtx, + packet_sender_to_mgr: mpsc::Sender, + ) -> Self { // recv packet from all foreign networks let (packet_sender, packet_recv) = mpsc::channel(1000); @@ -162,12 +167,13 @@ impl ForeignNetworkManager { // handle rpc from foreign networks let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); let rpc_mgr = Arc::new(PeerRpcManager::new(RpcTransport { - my_peer_id: global_ctx.get_id(), + my_peer_id, data: data.clone(), packet_recv: Mutex::new(peer_rpc_tspt_recv), })); Self { + my_peer_id, global_ctx, packet_sender_to_mgr, @@ -194,6 +200,7 @@ impl ForeignNetworkManager { peer_conn.get_network_identity(), self.packet_sender.clone(), self.global_ctx.clone(), + self.my_peer_id, )) }) .clone(); @@ -217,7 +224,7 @@ impl ForeignNetworkManager { while let Ok(e) = s.recv().await { tracing::warn!(?e, "global event"); if let GlobalCtxEvent::PeerRemoved(peer_id) = &e { - data.remove_peer(peer_id); + data.remove_peer(*peer_id); } else if let GlobalCtxEvent::PeerConnRemoved(..) = &e { data.clear_no_conn_peer(); } @@ -228,16 +235,16 @@ impl ForeignNetworkManager { async fn start_packet_recv(&self) { let mut recv = self.packet_recv.lock().await.take().unwrap(); let sender_to_mgr = self.packet_sender_to_mgr.clone(); - let my_node_id = self.global_ctx.get_id(); + let my_node_id = self.my_peer_id; let rpc_sender = self.rpc_transport_sender.clone(); let data = self.data.clone(); self.tasks.lock().await.spawn(async move { while let Some(packet_bytes) = recv.recv().await { let packet = packet::Packet::decode(&packet_bytes); - let from_peer_uuid = packet.from_peer.to_uuid(); - let to_peer_uuid = packet.to_peer.as_ref().unwrap().to_uuid(); - if to_peer_uuid == my_node_id { + let from_peer_id = packet.from_peer.into(); + let to_peer_id = packet.to_peer.into(); + if to_peer_id == my_node_id { if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::TaRpc(..)) = &packet.body { @@ -248,10 +255,10 @@ impl ForeignNetworkManager { tracing::error!("send packet to mgr failed: {:?}", e); } } else { - let Some(from_network) = data.get_peer_network(&from_peer_uuid) else { + let Some(from_network) = data.get_peer_network(from_peer_id) else { continue; }; - let Some(to_network) = data.get_peer_network(&to_peer_uuid) else { + let Some(to_network) = data.get_peer_network(to_peer_id) else { continue; }; if from_network != to_network { @@ -259,7 +266,7 @@ impl ForeignNetworkManager { } if let Some(entry) = data.get_network_entry(&from_network) { - let ret = entry.peer_map.send_msg(packet_bytes, &to_peer_uuid).await; + let ret = entry.peer_map.send_msg(packet_bytes, to_peer_id).await; if ret.is_err() { tracing::error!("forward packet to peer failed: {:?}", ret.err()); } @@ -339,7 +346,7 @@ mod tests { assert!(succ); assert_eq!( - vec![pm_center.my_node_id()], + vec![pm_center.my_peer_id()], pma_net1 .get_foreign_network_client() .get_peer_map() @@ -347,14 +354,14 @@ mod tests { .await ); assert_eq!( - vec![pm_center.my_node_id()], + vec![pm_center.my_peer_id()], pmb_net1 .get_foreign_network_client() .get_peer_map() .list_peers() .await ); - wait_route_appear(pma_net1.clone(), pmb_net1.my_node_id()) + wait_route_appear(pma_net1.clone(), pmb_net1.my_peer_id()) .await .unwrap(); assert_eq!(1, pma_net1.list_routes().await.len()); @@ -362,10 +369,10 @@ mod tests { let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await; connect_peer_manager(pmc_net1.clone(), pm_center.clone()).await; - wait_route_appear(pma_net1.clone(), pmc_net1.my_node_id()) + wait_route_appear(pma_net1.clone(), pmc_net1.my_peer_id()) .await .unwrap(); - wait_route_appear(pmb_net1.clone(), pmc_net1.my_node_id()) + wait_route_appear(pmb_net1.clone(), pmc_net1.my_peer_id()) .await .unwrap(); assert_eq!(2, pmc_net1.list_routes().await.len()); @@ -374,7 +381,7 @@ mod tests { let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await; connect_peer_manager(pma_net2.clone(), pm_center.clone()).await; connect_peer_manager(pmb_net2.clone(), pm_center.clone()).await; - wait_route_appear(pma_net2.clone(), pmb_net2.my_node_id()) + wait_route_appear(pma_net2.clone(), pmb_net2.my_peer_id()) .await .unwrap(); assert_eq!(1, pma_net2.list_routes().await.len()); diff --git a/easytier-core/src/peers/mod.rs b/easytier-core/src/peers/mod.rs index 5f6f1bc..f71b80a 100644 --- a/easytier-core/src/peers/mod.rs +++ b/easytier-core/src/peers/mod.rs @@ -13,5 +13,3 @@ pub mod foreign_network_manager; #[cfg(test)] pub mod tests; - -pub type PeerId = uuid::Uuid; diff --git a/easytier-core/src/peers/packet.rs b/easytier-core/src/peers/packet.rs index 6f8f7bc..b5e34d6 100644 --- a/easytier-core/src/peers/packet.rs +++ b/easytier-core/src/peers/packet.rs @@ -6,6 +6,7 @@ use tokio_util::bytes::Bytes; use crate::common::{ global_ctx::NetworkIdentity, rkyv_util::{decode_from_bytes, encode_to_bytes}, + PeerId, }; const MAGIC: u32 = 0xd1e1a5e1; @@ -92,7 +93,7 @@ impl Debug for ArchivedNetworkIdentityForPacket { #[archive_attr(derive(Debug))] pub struct HandShake { pub magic: u32, - pub my_peer_id: UUID, + pub my_peer_id: PeerId, pub version: u32, pub features: Vec, pub network_identity: NetworkIdentityForPacket, @@ -140,8 +141,8 @@ pub enum PacketBody { // Derives can be passed through to the generated type: #[archive_attr(derive(Debug))] pub struct Packet { - pub from_peer: UUID, - pub to_peer: Option, + pub from_peer: PeerId, + pub to_peer: PeerId, pub body: PacketBody, } @@ -158,13 +159,13 @@ impl From for Bytes { } impl Packet { - pub fn new_handshake(from_peer: uuid::Uuid, network: &NetworkIdentity) -> Self { + pub fn new_handshake(from_peer: PeerId, network: &NetworkIdentity) -> Self { Packet { from_peer: from_peer.into(), - to_peer: None, + to_peer: 0, body: PacketBody::Ctrl(CtrlPacketBody::HandShake(HandShake { magic: MAGIC, - my_peer_id: from_peer.into(), + my_peer_id: from_peer, version: VERSION, features: Vec::new(), network_identity: network.clone().into(), @@ -172,25 +173,20 @@ impl Packet { } } - pub fn new_data_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, data: &[u8]) -> Self { + pub fn new_data_packet(from_peer: PeerId, to_peer: PeerId, data: &[u8]) -> Self { Packet { - from_peer: from_peer.into(), - to_peer: Some(to_peer.into()), + from_peer, + to_peer, body: PacketBody::Data(DataPacketBody { data: data.to_vec(), }), } } - pub fn new_route_packet( - from_peer: uuid::Uuid, - to_peer: uuid::Uuid, - route_id: u8, - data: &[u8], - ) -> Self { + pub fn new_route_packet(from_peer: PeerId, to_peer: PeerId, route_id: u8, data: &[u8]) -> Self { Packet { - from_peer: from_peer.into(), - to_peer: Some(to_peer.into()), + from_peer, + to_peer, body: PacketBody::Ctrl(CtrlPacketBody::RoutePacket(RoutePacket { route_id, body: data.to_vec(), @@ -198,32 +194,32 @@ impl Packet { } } - pub fn new_ping_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self { + pub fn new_ping_packet(from_peer: PeerId, to_peer: PeerId, seq: u32) -> Self { Packet { - from_peer: from_peer.into(), - to_peer: Some(to_peer.into()), + from_peer, + to_peer, body: PacketBody::Ctrl(CtrlPacketBody::Ping(seq)), } } - pub fn new_pong_packet(from_peer: uuid::Uuid, to_peer: uuid::Uuid, seq: u32) -> Self { + pub fn new_pong_packet(from_peer: PeerId, to_peer: PeerId, seq: u32) -> Self { Packet { - from_peer: from_peer.into(), - to_peer: Some(to_peer.into()), + from_peer, + to_peer, body: PacketBody::Ctrl(CtrlPacketBody::Pong(seq)), } } pub fn new_tarpc_packet( - from_peer: uuid::Uuid, - to_peer: uuid::Uuid, + from_peer: PeerId, + to_peer: PeerId, service_id: u32, is_req: bool, body: Vec, ) -> Self { Packet { - from_peer: from_peer.into(), - to_peer: Some(to_peer.into()), + from_peer, + to_peer, body: PacketBody::Ctrl(CtrlPacketBody::TaRpc(service_id, is_req, body)), } } @@ -231,12 +227,14 @@ impl Packet { #[cfg(test)] mod tests { + use crate::common::new_peer_id; + use super::*; #[tokio::test] async fn serialize() { let a = "abcde"; - let out = Packet::new_data_packet(uuid::Uuid::new_v4(), uuid::Uuid::new_v4(), a.as_bytes()); + let out = Packet::new_data_packet(new_peer_id(), new_peer_id(), a.as_bytes()); // let out = T::new(a.as_bytes()); let out_bytes: Bytes = out.into(); println!("out str: {:?}", a.as_bytes()); diff --git a/easytier-core/src/peers/peer.rs b/easytier-core/src/peers/peer.rs index 8ed69fa..8bf0557 100644 --- a/easytier-core/src/peers/peer.rs +++ b/easytier-core/src/peers/peer.rs @@ -9,26 +9,26 @@ use tokio::{ }; use tokio_util::bytes::Bytes; use tracing::Instrument; -use uuid::Uuid; -use super::peer_conn::PeerConn; +use super::peer_conn::{PeerConn, PeerConnId}; use crate::common::{ error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, + PeerId, }; use crate::rpc::PeerConnInfo; type ArcPeerConn = Arc>; -type ConnMap = Arc>; +type ConnMap = Arc>; pub struct Peer { - pub peer_node_id: uuid::Uuid, + pub peer_node_id: PeerId, conns: ConnMap, global_ctx: ArcGlobalCtx, packet_recv_chan: mpsc::Sender, - close_event_sender: mpsc::Sender, + close_event_sender: mpsc::Sender, close_event_listener: JoinHandle<()>, shutdown_notifier: Arc, @@ -36,7 +36,7 @@ pub struct Peer { impl Peer { pub fn new( - peer_node_id: uuid::Uuid, + peer_node_id: PeerId, packet_recv_chan: mpsc::Sender, global_ctx: ArcGlobalCtx, ) -> Self { @@ -118,7 +118,7 @@ impl Peer { Ok(()) } - pub async fn close_peer_conn(&self, conn_id: &Uuid) -> Result<(), Error> { + pub async fn close_peer_conn(&self, conn_id: &PeerConnId) -> Result<(), Error> { let has_key = self.conns.contains_key(conn_id); if !has_key { return Err(Error::NotFound); @@ -157,7 +157,7 @@ mod tests { use tokio::{sync::mpsc, time::timeout}; use crate::{ - common::{config_fs::ConfigFs, global_ctx::GlobalCtx, netns::NetNS}, + common::{config_fs::ConfigFs, global_ctx::GlobalCtx, netns::NetNS, new_peer_id}, peers::peer_conn::PeerConn, tunnels::ring_tunnel::create_ring_tunnel_pair, }; @@ -174,8 +174,8 @@ mod tests { NetNS::new(None), None, )); - let local_peer = Peer::new(uuid::Uuid::new_v4(), local_packet_send, global_ctx.clone()); - let remote_peer = Peer::new(uuid::Uuid::new_v4(), remote_packet_send, global_ctx.clone()); + let local_peer = Peer::new(new_peer_id(), local_packet_send, global_ctx.clone()); + let remote_peer = Peer::new(new_peer_id(), remote_packet_send, global_ctx.clone()); let (local_tunnel, remote_tunnel) = create_ring_tunnel_pair(); let mut local_peer_conn = diff --git a/easytier-core/src/peers/peer_conn.rs b/easytier-core/src/peers/peer_conn.rs index a09c8ef..0f6afb8 100644 --- a/easytier-core/src/peers/peer_conn.rs +++ b/easytier-core/src/peers/peer_conn.rs @@ -23,7 +23,10 @@ use tokio_util::{ use tracing::Instrument; use crate::{ - common::global_ctx::{ArcGlobalCtx, NetworkIdentity}, + common::{ + global_ctx::{ArcGlobalCtx, NetworkIdentity}, + PeerId, + }, define_tunnel_filter_chain, rpc::{PeerConnInfo, PeerConnStats}, tunnels::{ @@ -37,6 +40,8 @@ use super::packet::{self, ArchivedCtrlPacketBody, ArchivedHandShake, Packet}; pub type PacketRecvChan = mpsc::Sender; +pub type PeerConnId = uuid::Uuid; + macro_rules! wait_response { ($stream: ident, $out_var:ident, $pattern:pat_param => $value:expr) => { let rsp_vec = timeout(Duration::from_secs(1), $stream.next()).await; @@ -78,7 +83,7 @@ fn build_ctrl_msg(msg: Bytes, is_req: bool) -> Bytes { pub struct PeerInfo { magic: u32, - pub my_peer_id: uuid::Uuid, + pub my_peer_id: PeerId, version: u32, pub features: Vec, pub interfaces: Vec, @@ -89,7 +94,7 @@ impl<'a> From<&ArchivedHandShake> for PeerInfo { fn from(hs: &ArchivedHandShake) -> Self { PeerInfo { magic: hs.magic.into(), - my_peer_id: hs.my_peer_id.to_uuid(), + my_peer_id: hs.my_peer_id.into(), version: hs.version.into(), features: hs.features.iter().map(|x| x.to_string()).collect(), interfaces: Vec::new(), @@ -99,8 +104,8 @@ impl<'a> From<&ArchivedHandShake> for PeerInfo { } struct PeerConnPinger { - my_node_id: uuid::Uuid, - peer_id: uuid::Uuid, + my_peer_id: PeerId, + peer_id: PeerId, sink: Arc>>>, ctrl_sender: broadcast::Sender, latency_stats: Arc, @@ -111,7 +116,7 @@ struct PeerConnPinger { impl Debug for PeerConnPinger { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PeerConnPinger") - .field("my_node_id", &self.my_node_id) + .field("my_peer_id", &self.my_peer_id) .field("peer_id", &self.peer_id) .finish() } @@ -119,15 +124,15 @@ impl Debug for PeerConnPinger { impl PeerConnPinger { pub fn new( - my_node_id: uuid::Uuid, - peer_id: uuid::Uuid, + my_peer_id: PeerId, + peer_id: PeerId, sink: Pin>, ctrl_sender: broadcast::Sender, latency_stats: Arc, loss_rate_stats: Arc, ) -> Self { Self { - my_node_id, + my_peer_id, peer_id, sink: Arc::new(Mutex::new(sink)), tasks: JoinSet::new(), @@ -137,16 +142,9 @@ impl PeerConnPinger { } } - pub async fn ping(&self) -> Result<(), TunnelError> { - let mut sink = self.sink.lock().await; - let ping_packet = Packet::new_ping_packet(uuid::Uuid::new_v4(), uuid::Uuid::new_v4(), 0); - sink.send(ping_packet.into()).await?; - Ok(()) - } - async fn do_pingpong_once( - my_node_id: uuid::Uuid, - peer_id: uuid::Uuid, + my_node_id: PeerId, + peer_id: PeerId, sink: Arc>>>, receiver: &mut broadcast::Receiver, seq: u32, @@ -207,7 +205,7 @@ impl PeerConnPinger { async fn pingpong(&mut self) { let sink = self.sink.clone(); - let my_node_id = self.my_node_id; + let my_node_id = self.my_peer_id; let peer_id = self.peer_id; let latency_stats = self.latency_stats.clone(); @@ -309,9 +307,9 @@ impl PeerConnPinger { define_tunnel_filter_chain!(PeerConnTunnel, stats = StatsRecorderTunnelFilter); pub struct PeerConn { - conn_id: uuid::Uuid, + conn_id: PeerConnId, - my_node_id: uuid::Uuid, + my_peer_id: PeerId, global_ctx: ArcGlobalCtx, sink: Pin>, @@ -321,7 +319,7 @@ pub struct PeerConn { info: Option, - close_event_sender: Option>, + close_event_sender: Option>, ctrl_resp_sender: broadcast::Sender, @@ -340,15 +338,15 @@ static CTRL_REQ_PACKET_PREFIX: &[u8] = &[0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xd static CTRL_RESP_PACKET_PREFIX: &[u8] = &[0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf1]; impl PeerConn { - pub fn new(node_id: uuid::Uuid, global_ctx: ArcGlobalCtx, tunnel: Box) -> Self { + pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx, tunnel: Box) -> Self { let (ctrl_sender, _ctrl_receiver) = broadcast::channel(100); let peer_conn_tunnel = PeerConnTunnel::new(); let tunnel = peer_conn_tunnel.wrap_tunnel(tunnel); PeerConn { - conn_id: uuid::Uuid::new_v4(), + conn_id: PeerConnId::new_v4(), - my_node_id: node_id, + my_peer_id, global_ctx, sink: tunnel.pin_sink(), @@ -367,7 +365,7 @@ impl PeerConn { } } - pub fn get_conn_id(&self) -> uuid::Uuid { + pub fn get_conn_id(&self) -> PeerConnId { self.conn_id } @@ -382,7 +380,7 @@ impl PeerConn { let hs_req = self .global_ctx .net_ns - .run(|| packet::Packet::new_handshake(self.my_node_id, &self.global_ctx.network)); + .run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network)); sink.send(hs_req.into()).await?; Ok(()) @@ -395,7 +393,7 @@ impl PeerConn { let hs_req = self .global_ctx .net_ns - .run(|| packet::Packet::new_handshake(self.my_node_id, &self.global_ctx.network)); + .run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network)); sink.send(hs_req.into()).await?; wait_response!(stream, hs_rsp, packet::ArchivedPacketBody::Ctrl(ArchivedCtrlPacketBody::HandShake(x)) => x); @@ -429,8 +427,8 @@ impl PeerConn { log::trace!("recv ping packet: {:?}", packet); Ok(build_ctrl_msg( packet::Packet::new_pong_packet( - conn_info.my_node_id.parse().unwrap(), - conn_info.peer_id.parse().unwrap(), + conn_info.my_peer_id, + conn_info.peer_id, seq.into(), ) .into(), @@ -446,7 +444,7 @@ impl PeerConn { pub fn start_pingpong(&mut self) { let mut pingpong = PeerConnPinger::new( - self.my_node_id, + self.my_peer_id, self.get_peer_id(), self.tunnel.pin_sink(), self.ctrl_resp_sender.clone(), @@ -532,7 +530,7 @@ impl PeerConn { self.sink.send(msg).await } - pub fn get_peer_id(&self) -> uuid::Uuid { + pub fn get_peer_id(&self) -> PeerId { self.info.as_ref().unwrap().my_peer_id } @@ -540,7 +538,7 @@ impl PeerConn { self.info.as_ref().unwrap().network_identity.clone() } - pub fn set_close_event_sender(&mut self, sender: mpsc::Sender) { + pub fn set_close_event_sender(&mut self, sender: mpsc::Sender) { self.close_event_sender = Some(sender); } @@ -559,8 +557,8 @@ impl PeerConn { pub fn get_conn_info(&self) -> PeerConnInfo { PeerConnInfo { conn_id: self.conn_id.to_string(), - my_node_id: self.my_node_id.to_string(), - peer_id: self.get_peer_id().to_string(), + my_peer_id: self.my_peer_id, + peer_id: self.get_peer_id(), features: self.info.as_ref().unwrap().features.clone(), tunnel: self.tunnel.info(), stats: Some(self.get_stats()), @@ -589,6 +587,7 @@ mod tests { use crate::common::global_ctx::tests::get_mock_global_ctx; use crate::common::global_ctx::GlobalCtx; use crate::common::netns::NetNS; + use crate::common::new_peer_id; use crate::tunnels::tunnel_filter::tests::DropSendTunnelFilter; use crate::tunnels::tunnel_filter::{PacketRecorderTunnelFilter, TunnelWithFilter}; @@ -603,11 +602,11 @@ mod tests { let c = TunnelWithFilter::new(c, c_recorder.clone()); let s = TunnelWithFilter::new(s, s_recorder.clone()); - let c_uuid = uuid::Uuid::new_v4(); - let s_uuid = uuid::Uuid::new_v4(); + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); let mut c_peer = PeerConn::new( - c_uuid, + c_peer_id, Arc::new(GlobalCtx::new( "c", ConfigFs::new_with_dir("c", "/tmp"), @@ -618,7 +617,7 @@ mod tests { ); let mut s_peer = PeerConn::new( - s_uuid, + s_peer_id, Arc::new(GlobalCtx::new( "c", ConfigFs::new_with_dir("c", "/tmp"), @@ -642,8 +641,8 @@ mod tests { assert_eq!(s_recorder.sent.lock().unwrap().len(), 1); assert_eq!(s_recorder.received.lock().unwrap().len(), 1); - assert_eq!(c_peer.get_peer_id(), s_uuid); - assert_eq!(s_peer.get_peer_id(), c_uuid); + assert_eq!(c_peer.get_peer_id(), s_peer_id); + assert_eq!(s_peer.get_peer_id(), c_peer_id); assert_eq!(c_peer.get_network_identity(), s_peer.get_network_identity()); assert_eq!(c_peer.get_network_identity(), NetworkIdentity::default()); } @@ -656,11 +655,11 @@ mod tests { let c_recorder = Arc::new(DropSendTunnelFilter::new(drop_start, drop_end)); let c = TunnelWithFilter::new(c, c_recorder.clone()); - let c_uuid = uuid::Uuid::new_v4(); - let s_uuid = uuid::Uuid::new_v4(); + let c_peer_id = new_peer_id(); + let s_peer_id = new_peer_id(); - let mut c_peer = PeerConn::new(c_uuid, get_mock_global_ctx(), Box::new(c)); - let mut s_peer = PeerConn::new(s_uuid, get_mock_global_ctx(), Box::new(s)); + let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c)); + let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s)); let (c_ret, s_ret) = tokio::join!( c_peer.do_handshake_as_client(), diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs index e5cb982..aafc0cd 100644 --- a/easytier-core/src/peers/peer_manager.rs +++ b/easytier-core/src/peers/peer_manager.rs @@ -13,15 +13,12 @@ use tokio::{ use tokio_stream::wrappers::ReceiverStream; use tokio_util::bytes::{Bytes, BytesMut}; -use uuid::Uuid; - use crate::{ - common::{error::Error, global_ctx::ArcGlobalCtx, rkyv_util::extract_bytes_from_archived_vec}, + common::{ + error::Error, global_ctx::ArcGlobalCtx, rkyv_util::extract_bytes_from_archived_vec, PeerId, + }, peers::{ - packet::{self}, - peer_conn::PeerConn, - peer_rpc::PeerRpcManagerTransport, - route_trait::RouteInterface, + packet, peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, route_trait::RouteInterface, }, tunnels::{SinkItem, Tunnel, TunnelConnector}, }; @@ -29,15 +26,15 @@ use crate::{ use super::{ foreign_network_client::ForeignNetworkClient, foreign_network_manager::ForeignNetworkManager, + peer_conn::PeerConnId, peer_map::PeerMap, peer_rip_route::BasicRoute, peer_rpc::PeerRpcManager, route_trait::{ArcRoute, Route}, - PeerId, }; struct RpcTransport { - my_peer_id: uuid::Uuid, + my_peer_id: PeerId, peers: Arc, foreign_peers: Mutex>>, @@ -47,11 +44,11 @@ struct RpcTransport { #[async_trait::async_trait] impl PeerRpcManagerTransport for RpcTransport { - fn my_peer_id(&self) -> Uuid { + fn my_peer_id(&self) -> PeerId { self.my_peer_id } - async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { if let Some(foreign_peers) = self.foreign_peers.lock().await.as_ref() { if foreign_peers.has_peer(dst_peer_id) { return foreign_peers.send_msg(msg, dst_peer_id).await; @@ -92,7 +89,8 @@ type BoxPeerPacketFilter = Box; type BoxNicPacketFilter = Box; pub struct PeerManager { - my_node_id: uuid::Uuid, + my_peer_id: PeerId, + global_ctx: ArcGlobalCtx, nic_channel: mpsc::Sender, @@ -117,7 +115,7 @@ pub struct PeerManager { impl Debug for PeerManager { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("PeerManager") - .field("my_node_id", &self.my_node_id) + .field("my_peer_id", &self.my_peer_id()) .field("instance_name", &self.global_ctx.inst_name) .field("net_ns", &self.global_ctx.net_ns.name()) .finish() @@ -126,13 +124,19 @@ impl Debug for PeerManager { impl PeerManager { pub fn new(global_ctx: ArcGlobalCtx, nic_channel: mpsc::Sender) -> Self { + let my_peer_id = rand::random(); + let (packet_send, packet_recv) = mpsc::channel(100); - let peers = Arc::new(PeerMap::new(packet_send.clone(), global_ctx.clone())); + let peers = Arc::new(PeerMap::new( + packet_send.clone(), + global_ctx.clone(), + my_peer_id, + )); // TODO: remove these because we have impl pipeline processor. let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel(); let rpc_tspt = Arc::new(RpcTransport { - my_peer_id: global_ctx.get_id(), + my_peer_id, peers: peers.clone(), foreign_peers: Mutex::new(None), packet_recv: Mutex::new(peer_rpc_tspt_recv), @@ -140,9 +144,10 @@ impl PeerManager { }); let peer_rpc_mgr = Arc::new(PeerRpcManager::new(rpc_tspt.clone())); - let basic_route = Arc::new(BasicRoute::new(global_ctx.get_id(), global_ctx.clone())); + let basic_route = Arc::new(BasicRoute::new(my_peer_id, global_ctx.clone())); let foreign_network_manager = Arc::new(ForeignNetworkManager::new( + my_peer_id, global_ctx.clone(), packet_send.clone(), )); @@ -150,10 +155,12 @@ impl PeerManager { global_ctx.clone(), packet_send.clone(), peer_rpc_mgr.clone(), + my_peer_id, )); PeerManager { - my_node_id: global_ctx.get_id(), + my_peer_id, + global_ctx, nic_channel, @@ -176,8 +183,11 @@ impl PeerManager { } } - pub async fn add_client_tunnel(&self, tunnel: Box) -> Result<(Uuid, Uuid), Error> { - let mut peer = PeerConn::new(self.my_node_id, self.global_ctx.clone(), tunnel); + pub async fn add_client_tunnel( + &self, + tunnel: Box, + ) -> Result<(PeerId, PeerConnId), Error> { + let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); peer.do_handshake_as_client().await?; let conn_id = peer.get_conn_id(); let peer_id = peer.get_peer_id(); @@ -190,7 +200,7 @@ impl PeerManager { } #[tracing::instrument] - pub async fn try_connect(&self, mut connector: C) -> Result<(Uuid, Uuid), Error> + pub async fn try_connect(&self, mut connector: C) -> Result<(PeerId, PeerConnId), Error> where C: TunnelConnector + Debug, { @@ -204,7 +214,7 @@ impl PeerManager { #[tracing::instrument] pub async fn add_tunnel_as_server(&self, tunnel: Box) -> Result<(), Error> { tracing::info!("add tunnel as server start"); - let mut peer = PeerConn::new(self.my_node_id, self.global_ctx.clone(), tunnel); + let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel); peer.do_handshake_as_server().await?; if peer.get_network_identity() == self.global_ctx.get_network_identity() { self.peers.add_new_peer_conn(peer).await; @@ -217,7 +227,7 @@ impl PeerManager { async fn start_peer_recv(&self) { let mut recv = ReceiverStream::new(self.packet_recv.lock().await.take().unwrap()); - let my_node_id = self.my_node_id; + let my_peer_id = self.my_peer_id; let peers = self.peers.clone(); let pipe_line = self.peer_packet_process_pipeline.clone(); self.tasks.lock().await.spawn(async move { @@ -225,21 +235,21 @@ impl PeerManager { while let Some(ret) = recv.next().await { log::trace!("peer recv a packet...: {:?}", ret); let packet = packet::Packet::decode(&ret); - let from_peer_uuid = packet.from_peer.to_uuid(); - let to_peer_uuid = packet.to_peer.as_ref().unwrap().to_uuid(); - if to_peer_uuid != my_node_id { + let from_peer_id: PeerId = packet.from_peer.into(); + let to_peer_id: PeerId = packet.to_peer.into(); + if to_peer_id != my_peer_id { log::trace!( - "need forward: to_peer_uuid: {:?}, my_uuid: {:?}", - to_peer_uuid, - my_node_id + "need forward: to_peer_id: {:?}, my_peer_id: {:?}", + to_peer_id, + my_peer_id ); - let ret = peers.send_msg(ret.clone(), &to_peer_uuid).await; + let ret = peers.send_msg(ret.clone(), to_peer_id).await; if ret.is_err() { log::error!( "forward packet error: {:?}, dst: {:?}, from: {:?}", ret, - to_peer_uuid, - from_peer_uuid + to_peer_id, + from_peer_id ); } } else { @@ -344,7 +354,7 @@ impl PeerManager { T: Route + Send + Sync + 'static, { struct Interface { - my_node_id: uuid::Uuid, + my_peer_id: PeerId, peers: Arc, foreign_network_client: Arc, } @@ -360,15 +370,15 @@ impl PeerManager { &self, msg: Bytes, route_id: u8, - dst_peer_id: &PeerId, + dst_peer_id: PeerId, ) -> Result<(), Error> { let packet_bytes: Bytes = - packet::Packet::new_route_packet(self.my_node_id, *dst_peer_id, route_id, &msg) + packet::Packet::new_route_packet(self.my_peer_id, dst_peer_id, route_id, &msg) .into(); if self.foreign_network_client.has_next_hop(dst_peer_id) { return self .foreign_network_client - .send_msg(packet_bytes, &dst_peer_id) + .send_msg(packet_bytes, dst_peer_id) .await; } @@ -376,12 +386,15 @@ impl PeerManager { .send_msg_directly(packet_bytes, dst_peer_id) .await } + fn my_peer_id(&self) -> PeerId { + self.my_peer_id + } } - let my_node_id = self.my_node_id; + let my_peer_id = self.my_peer_id; let _route_id = route .open(Box::new(Interface { - my_node_id, + my_peer_id, peers: self.peers.clone(), foreign_network_client: self.foreign_network_client.clone(), })) @@ -403,7 +416,7 @@ impl PeerManager { data } - pub async fn send_msg(&self, msg: Bytes, dst_peer_id: &PeerId) -> Result<(), Error> { + pub async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { self.peers.send_msg(msg, dst_peer_id).await } @@ -440,8 +453,8 @@ impl PeerManager { let send_ret = self .peers .send_msg( - packet::Packet::new_data_packet(self.my_node_id, peer_id.clone(), &msg).into(), - &peer_id, + packet::Packet::new_data_packet(self.my_peer_id, peer_id.clone(), &msg).into(), + *peer_id, ) .await; @@ -502,7 +515,11 @@ impl PeerManager { } pub fn my_node_id(&self) -> uuid::Uuid { - self.my_node_id + self.global_ctx.get_id() + } + + pub fn my_peer_id(&self) -> PeerId { + self.my_peer_id } pub fn get_global_ctx(&self) -> ArcGlobalCtx { diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs index 9e627de..84763dc 100644 --- a/easytier-core/src/peers/peer_map.rs +++ b/easytier-core/src/peers/peer_map.rs @@ -9,24 +9,35 @@ use crate::{ common::{ error::Error, global_ctx::{ArcGlobalCtx, GlobalCtxEvent}, + PeerId, }, rpc::PeerConnInfo, tunnels::TunnelError, }; -use super::{peer::Peer, peer_conn::PeerConn, route_trait::ArcRoute, PeerId}; +use super::{ + peer::Peer, + peer_conn::{PeerConn, PeerConnId}, + route_trait::ArcRoute, +}; pub struct PeerMap { global_ctx: ArcGlobalCtx, + my_peer_id: PeerId, peer_map: DashMap>, packet_send: mpsc::Sender, routes: RwLock>, } impl PeerMap { - pub fn new(packet_send: mpsc::Sender, global_ctx: ArcGlobalCtx) -> Self { + pub fn new( + packet_send: mpsc::Sender, + global_ctx: ArcGlobalCtx, + my_peer_id: PeerId, + ) -> Self { PeerMap { global_ctx, + my_peer_id, peer_map: DashMap::new(), packet_send, routes: RwLock::new(Vec::new()), @@ -53,20 +64,16 @@ impl PeerMap { } } - fn get_peer_by_id(&self, peer_id: &PeerId) -> Option> { - self.peer_map.get(peer_id).map(|v| v.clone()) + fn get_peer_by_id(&self, peer_id: PeerId) -> Option> { + self.peer_map.get(&peer_id).map(|v| v.clone()) } - pub fn has_peer(&self, peer_id: &PeerId) -> bool { - self.peer_map.contains_key(peer_id) + pub fn has_peer(&self, peer_id: PeerId) -> bool { + self.peer_map.contains_key(&peer_id) } - pub async fn send_msg_directly( - &self, - msg: Bytes, - dst_peer_id: &uuid::Uuid, - ) -> Result<(), Error> { - if *dst_peer_id == self.global_ctx.get_id() { + pub async fn send_msg_directly(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { + if dst_peer_id == self.my_peer_id { return Ok(self .packet_send .send(msg) @@ -87,8 +94,8 @@ impl PeerMap { Ok(()) } - pub async fn send_msg(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error> { - if *dst_peer_id == self.global_ctx.get_id() { + pub async fn send_msg(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error> { + if dst_peer_id == self.my_peer_id { return Ok(self .packet_send .send(msg) @@ -108,7 +115,7 @@ impl PeerMap { } if gateway_peer_id.is_none() && self.has_peer(dst_peer_id) { - gateway_peer_id = Some(*dst_peer_id); + gateway_peer_id = Some(dst_peer_id); } let Some(gateway_peer_id) = gateway_peer_id else { @@ -116,8 +123,7 @@ impl PeerMap { return Ok(()); }; - self.send_msg_directly(msg.clone(), &gateway_peer_id) - .await?; + self.send_msg_directly(msg.clone(), gateway_peer_id).await?; return Ok(()); } @@ -148,7 +154,7 @@ impl PeerMap { let mut ret = Vec::new(); let peers = self.list_peers().await; for peer_id in peers.iter() { - let Some(peer) = self.get_peer_by_id(peer_id) else { + let Some(peer) = self.get_peer_by_id(*peer_id) else { continue; }; if peer.list_peer_conns().await.len() > 0 { @@ -159,7 +165,7 @@ impl PeerMap { } pub async fn list_peer_conns(&self, peer_id: &PeerId) -> Option> { - if let Some(p) = self.get_peer_by_id(peer_id) { + if let Some(p) = self.get_peer_by_id(*peer_id) { Some(p.list_peer_conns().await) } else { return None; @@ -168,8 +174,8 @@ impl PeerMap { pub async fn close_peer_conn( &self, - peer_id: &PeerId, - conn_id: &uuid::Uuid, + peer_id: PeerId, + conn_id: &PeerConnId, ) -> Result<(), Error> { if let Some(p) = self.get_peer_by_id(peer_id) { p.close_peer_conn(conn_id).await @@ -178,10 +184,10 @@ impl PeerMap { } } - pub async fn close_peer(&self, peer_id: &PeerId) -> Result<(), TunnelError> { - let remove_ret = self.peer_map.remove(peer_id); + pub async fn close_peer(&self, peer_id: PeerId) -> Result<(), TunnelError> { + let remove_ret = self.peer_map.remove(&peer_id); self.global_ctx - .issue_event(GlobalCtxEvent::PeerRemoved(peer_id.clone())); + .issue_event(GlobalCtxEvent::PeerRemoved(peer_id)); tracing::info!( ?peer_id, has_old_value = ?remove_ret.is_some(), @@ -207,7 +213,7 @@ impl PeerMap { } for peer_id in to_remove { - self.close_peer(&peer_id).await.unwrap(); + self.close_peer(peer_id).await.unwrap(); } } @@ -215,10 +221,7 @@ impl PeerMap { let route_map = DashMap::new(); for route in self.routes.read().await.iter() { for item in route.list_routes().await.iter() { - route_map.insert( - item.peer_id.parse().unwrap(), - item.next_hop_peer_id.parse().unwrap(), - ); + route_map.insert(item.peer_id, item.next_hop_peer_id); } } route_map diff --git a/easytier-core/src/peers/peer_rip_route.rs b/easytier-core/src/peers/peer_rip_route.rs index 286c467..72b29e7 100644 --- a/easytier-core/src/peers/peer_rip_route.rs +++ b/easytier-core/src/peers/peer_rip_route.rs @@ -13,7 +13,6 @@ use tokio::{ }; use tokio_util::bytes::Bytes; use tracing::Instrument; -use uuid::Uuid; use crate::{ common::{ @@ -21,11 +20,11 @@ use crate::{ global_ctx::ArcGlobalCtx, rkyv_util::{decode_from_bytes, encode_to_bytes, extract_bytes_from_archived_vec}, stun::StunInfoCollectorTrait, + PeerId, }, peers::{ - packet::{self, UUID}, + packet::{self}, route_trait::{Route, RouteInterfaceBox}, - PeerId, }, rpc::{NatType, StunInfo}, }; @@ -44,7 +43,7 @@ type Version = u32; #[archive_attr(derive(Debug))] pub struct SyncPeerInfo { // means next hop in route table. - pub peer_id: UUID, + pub peer_id: PeerId, pub cost: u32, pub ipv4_addr: Option, pub proxy_cidrs: Vec, @@ -53,7 +52,7 @@ pub struct SyncPeerInfo { } impl SyncPeerInfo { - pub fn new_self(from_peer: UUID, global_ctx: &ArcGlobalCtx) -> Self { + pub fn new_self(from_peer: PeerId, global_ctx: &ArcGlobalCtx) -> Self { SyncPeerInfo { peer_id: from_peer, cost: 0, @@ -71,9 +70,9 @@ impl SyncPeerInfo { } } - pub fn clone_for_route_table(&self, next_hop: &UUID, cost: u32, from: &Self) -> Self { + pub fn clone_for_route_table(&self, next_hop: PeerId, cost: u32, from: &Self) -> Self { SyncPeerInfo { - peer_id: next_hop.clone(), + peer_id: next_hop, cost, ipv4_addr: from.ipv4_addr.clone(), proxy_cidrs: from.proxy_cidrs.clone(), @@ -100,8 +99,8 @@ pub struct SyncPeer { impl SyncPeer { pub fn new( - from_peer: UUID, - _to_peer: UUID, + from_peer: PeerId, + _to_peer: PeerId, neighbors: Vec, global_ctx: ArcGlobalCtx, version: Version, @@ -124,13 +123,13 @@ struct SyncPeerFromRemote { last_update: std::time::Instant, } -type SyncPeerFromRemoteMap = Arc>; +type SyncPeerFromRemoteMap = Arc>; #[derive(Debug)] struct RouteTable { - route_info: DashMap, - ipv4_peer_id_map: DashMap, - cidr_peer_id_map: DashMap, + route_info: DashMap, + ipv4_peer_id_map: DashMap, + cidr_peer_id_map: DashMap, } impl RouteTable { @@ -182,7 +181,7 @@ impl RouteVersion { } pub struct BasicRoute { - my_peer_id: packet::UUID, + my_peer_id: PeerId, global_ctx: ArcGlobalCtx, interface: Arc>>, @@ -200,9 +199,9 @@ pub struct BasicRoute { } impl BasicRoute { - pub fn new(my_peer_id: Uuid, global_ctx: ArcGlobalCtx) -> Self { + pub fn new(my_peer_id: PeerId, global_ctx: ArcGlobalCtx) -> Self { BasicRoute { - my_peer_id: my_peer_id.into(), + my_peer_id, global_ctx: global_ctx.clone(), interface: Arc::new(Mutex::new(None)), @@ -223,7 +222,7 @@ impl BasicRoute { } fn update_route_table( - my_id: packet::UUID, + my_id: PeerId, sync_peer_reqs: SyncPeerFromRemoteMap, route_table: Arc, ) { @@ -231,18 +230,18 @@ impl BasicRoute { let new_route_table = Arc::new(RouteTable::new()); for item in sync_peer_reqs.iter() { - Self::update_route_table_with_req( - my_id.clone(), - &item.value().packet, - new_route_table.clone(), - ); + Self::update_route_table_with_req(my_id, &item.value().packet, new_route_table.clone()); } route_table.copy_from(&new_route_table); } - async fn update_myself(myself: &Arc>, global_ctx: &ArcGlobalCtx) -> bool { - let new_myself = SyncPeerInfo::new_self(global_ctx.get_id().into(), &global_ctx); + async fn update_myself( + my_peer_id: PeerId, + myself: &Arc>, + global_ctx: &ArcGlobalCtx, + ) -> bool { + let new_myself = SyncPeerInfo::new_self(my_peer_id, &global_ctx); if *myself.read().await != new_myself { *myself.write().await = new_myself; true @@ -251,26 +250,22 @@ impl BasicRoute { } } - fn update_route_table_with_req( - my_id: packet::UUID, - packet: &SyncPeer, - route_table: Arc, - ) { + fn update_route_table_with_req(my_id: PeerId, packet: &SyncPeer, route_table: Arc) { let peer_id = packet.myself.peer_id.clone(); let update = |cost: u32, peer_info: &SyncPeerInfo| { - let node_id: uuid::Uuid = peer_info.peer_id.clone().into(); + let node_id: PeerId = peer_info.peer_id.into(); let ret = route_table .route_info .entry(node_id.clone().into()) .and_modify(|info| { if info.cost > cost { - *info = info.clone_for_route_table(&peer_id, cost, &peer_info); + *info = info.clone_for_route_table(peer_id, cost, &peer_info); } }) .or_insert( peer_info .clone() - .clone_for_route_table(&peer_id, cost, &peer_info), + .clone_for_route_table(peer_id, cost, &peer_info), ) .value() .clone(); @@ -321,7 +316,7 @@ impl BasicRoute { async fn send_sync_peer_request( interface: &RouteInterfaceBox, - my_peer_id: packet::UUID, + my_peer_id: PeerId, global_ctx: ArcGlobalCtx, peer_id: PeerId, route_table: Arc, @@ -333,11 +328,11 @@ impl BasicRoute { // copy the route info for item in route_table.route_info.iter() { let (k, v) = item.pair(); - route_info_copy.push(v.clone().clone_for_route_table(&(*k).into(), v.cost, &v)); + route_info_copy.push(v.clone().clone_for_route_table(*k, v.cost, &v)); } let msg = SyncPeer::new( my_peer_id, - peer_id.into(), + peer_id, route_info_copy, global_ctx, my_version, @@ -346,7 +341,7 @@ impl BasicRoute { ); // TODO: this may exceed the MTU of the tunnel interface - .send_route_packet(encode_to_bytes::<_, 4096>(&msg), 1, &peer_id) + .send_route_packet(encode_to_bytes::<_, 4096>(&msg), 1, peer_id) .await } @@ -363,7 +358,7 @@ impl BasicRoute { self.tasks.lock().await.spawn( async move { loop { - if Self::update_myself(&myself, &global_ctx).await { + if Self::update_myself(my_peer_id,&myself, &global_ctx).await { version.inc(); tracing::info!( my_id = ?my_peer_id, @@ -378,7 +373,7 @@ impl BasicRoute { let peers = interface.list_peers().await; for peer in peers.iter() { let last_send_time = last_send_time_map.get(peer).map(|v| *v).unwrap_or((0, None, Instant::now() - Duration::from_secs(3600))); - let my_version_peer_saved = sync_peer_from_remote.get(&peer).and_then(|v| v.packet.peer_version); + let my_version_peer_saved = sync_peer_from_remote.get(peer).and_then(|v| v.packet.peer_version); let peer_have_latest_version = my_version_peer_saved == Some(version.get()); if peer_have_latest_version && last_send_time.2.elapsed().as_secs() < SEND_ROUTE_PERIOD_SEC { last_send_time_map_new.insert(*peer, last_send_time); @@ -478,7 +473,7 @@ impl BasicRoute { if need_update_route { Self::update_route_table( - my_peer_id.clone(), + my_peer_id, sync_peer_from_remote.clone(), route_table.clone(), ); @@ -508,13 +503,13 @@ impl BasicRoute { } #[tracing::instrument(skip(self, packet), fields(my_id = ?self.my_peer_id, ctx = ?self.global_ctx))] - async fn handle_route_packet(&self, src_peer_id: uuid::Uuid, packet: Bytes) { + async fn handle_route_packet(&self, src_peer_id: PeerId, packet: Bytes) { let packet = decode_from_bytes::(&packet).unwrap(); let p: SyncPeer = packet.deserialize(&mut rkyv::Infallible).unwrap(); let mut updated = true; - assert_eq!(packet.myself.peer_id.to_uuid(), src_peer_id); + assert_eq!(packet.myself.peer_id, src_peer_id); self.sync_peer_from_remote - .entry(packet.myself.peer_id.to_uuid()) + .entry(packet.myself.peer_id.into()) .and_modify(|v| { if v.packet.myself == p.myself && v.packet.neighbors == p.neighbors { updated = false; @@ -547,7 +542,7 @@ impl BasicRoute { if packet.need_reply { self.last_send_time_map - .entry(packet.myself.peer_id.to_uuid()) + .entry(packet.myself.peer_id.into()) .and_modify(|v| { const FAST_REPLY_DURATION: u64 = SEND_ROUTE_PERIOD_SEC - SEND_ROUTE_FAST_REPLY_SEC; @@ -577,8 +572,8 @@ impl Route for BasicRoute { async fn close(&self) {} - async fn get_next_hop(&self, dst_peer_id: &PeerId) -> Option { - match self.route_table.route_info.get(dst_peer_id) { + async fn get_next_hop(&self, dst_peer_id: PeerId) -> Option { + match self.route_table.route_info.get(&dst_peer_id) { Some(info) => { return Some(info.peer_id.clone().into()); } @@ -592,15 +587,15 @@ impl Route for BasicRoute { async fn list_routes(&self) -> Vec { let mut routes = Vec::new(); - let parse_route_info = |real_peer_id: &Uuid, route_info: &SyncPeerInfo| { + let parse_route_info = |real_peer_id: PeerId, route_info: &SyncPeerInfo| { let mut route = crate::rpc::Route::default(); route.ipv4_addr = if let Some(ipv4_addr) = route_info.ipv4_addr { ipv4_addr.to_string() } else { "".to_string() }; - route.peer_id = real_peer_id.to_string(); - route.next_hop_peer_id = Uuid::from(route_info.peer_id.clone()).to_string(); + route.peer_id = real_peer_id; + route.next_hop_peer_id = route_info.peer_id; route.cost = route_info.cost as i32; route.proxy_cidrs = route_info.proxy_cidrs.clone(); route.hostname = if let Some(hostname) = &route_info.hostname { @@ -619,7 +614,7 @@ impl Route for BasicRoute { }; self.route_table.route_info.iter().for_each(|item| { - routes.push(parse_route_info(item.key(), item.value())); + routes.push(parse_route_info(*item.key(), item.value())); }); routes @@ -650,7 +645,7 @@ impl PeerPacketFilter for BasicRoute { &packet.body { self.handle_route_packet( - packet.from_peer.to_uuid(), + packet.from_peer.into(), extract_bytes_from_archived_vec(&data, &route_packet.body), ) .await; @@ -666,12 +661,12 @@ mod tests { use std::sync::Arc; use crate::{ + common::PeerId, connector::udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun, peers::{ peer_manager::PeerManager, peer_rip_route::Version, tests::{connect_peer_manager, wait_route_appear}, - PeerId, }, rpc::NatType, }; @@ -683,10 +678,10 @@ mod tests { let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; - wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id()) .await .unwrap(); - wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id()) .await .unwrap(); @@ -694,12 +689,12 @@ mod tests { tokio::time::sleep(tokio::time::Duration::from_secs(4)).await; - let check_version = |version: Version, uuid: PeerId, mgrs: &Vec>| { + let check_version = |version: Version, peer_id: PeerId, mgrs: &Vec>| { for mgr in mgrs.iter() { tracing::warn!( "check version: {:?}, {:?}, {:?}, {:?}", version, - uuid, + peer_id, mgr, mgr.get_basic_route().sync_peer_from_remote ); @@ -707,7 +702,7 @@ mod tests { version, mgr.get_basic_route() .sync_peer_from_remote - .get(&uuid) + .get(&peer_id) .unwrap() .packet .version, @@ -715,7 +710,7 @@ mod tests { assert_eq!( mgr.get_basic_route() .sync_peer_from_remote - .get(&uuid) + .get(&peer_id) .unwrap() .packet .peer_version @@ -729,19 +724,19 @@ mod tests { // check peer version in other peer mgr are correct. check_version( peer_mgr_b.get_basic_route().version.get(), - peer_mgr_b.my_node_id(), + peer_mgr_b.my_peer_id(), &vec![peer_mgr_a.clone(), peer_mgr_c.clone()], ); check_version( peer_mgr_a.get_basic_route().version.get(), - peer_mgr_a.my_node_id(), + peer_mgr_a.my_peer_id(), &vec![peer_mgr_b.clone()], ); check_version( peer_mgr_c.get_basic_route().version.get(), - peer_mgr_c.my_node_id(), + peer_mgr_c.my_peer_id(), &vec![peer_mgr_b.clone()], ); }; diff --git a/easytier-core/src/peers/peer_rpc.rs b/easytier-core/src/peers/peer_rpc.rs index db6116e..4ed01cf 100644 --- a/easytier-core/src/peers/peer_rpc.rs +++ b/easytier-core/src/peers/peer_rpc.rs @@ -11,7 +11,10 @@ use tokio::{ use tokio_util::bytes::Bytes; use tracing::Instrument; -use crate::{common::error::Error, peers::packet::Packet}; +use crate::{ + common::{error::Error, PeerId}, + peers::packet::Packet, +}; use super::packet::{CtrlPacketBody, PacketBody}; @@ -20,22 +23,22 @@ type PeerRpcServiceId = u32; #[async_trait::async_trait] #[auto_impl::auto_impl(Arc)] pub trait PeerRpcManagerTransport: Send + Sync + 'static { - fn my_peer_id(&self) -> uuid::Uuid; - async fn send(&self, msg: Bytes, dst_peer_id: &uuid::Uuid) -> Result<(), Error>; + fn my_peer_id(&self) -> PeerId; + async fn send(&self, msg: Bytes, dst_peer_id: PeerId) -> Result<(), Error>; async fn recv(&self) -> Result; } type PacketSender = UnboundedSender; struct PeerRpcEndPoint { - peer_id: uuid::Uuid, + peer_id: PeerId, packet_sender: PacketSender, tasks: JoinSet<()>, } -type PeerRpcEndPointCreator = Box PeerRpcEndPoint + Send + Sync + 'static>; +type PeerRpcEndPointCreator = Box PeerRpcEndPoint + Send + Sync + 'static>; #[derive(Hash, Eq, PartialEq, Clone)] -struct PeerRpcClientCtxKey(uuid::Uuid, PeerRpcServiceId); +struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId); // handle rpc request from one peer pub struct PeerRpcManager { @@ -44,7 +47,7 @@ pub struct PeerRpcManager { tspt: Arc>, service_registry: Arc>, - peer_rpc_endpoints: Arc>, + peer_rpc_endpoints: Arc>, client_resp_receivers: Arc>, } @@ -59,8 +62,8 @@ impl std::fmt::Debug for PeerRpcManager { #[derive(Debug)] struct TaRpcPacketInfo { - from_peer: uuid::Uuid, - to_peer: uuid::Uuid, + from_peer: PeerId, + to_peer: PeerId, service_id: PeerRpcServiceId, is_req: bool, content: Vec, @@ -89,7 +92,7 @@ impl PeerRpcManager { S::Fut: Send + 'static, { let tspt = self.tspt.clone(); - let creator = Box::new(move |peer_id: uuid::Uuid| { + let creator = Box::new(move |peer_id: PeerId| { let mut tasks = JoinSet::new(); let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel::(); let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded(); @@ -103,7 +106,7 @@ impl PeerRpcManager { let tspt = tspt.clone(); tasks.spawn(async move { - let mut cur_req_uuid = None; + let mut cur_req_peer_id = None; loop { tokio::select! { Some(resp) = client_transport.next() => { @@ -115,8 +118,8 @@ impl PeerRpcManager { } let resp = resp.unwrap(); - if cur_req_uuid.is_none() { - tracing::error!("[PEER RPC MGR] cur_req_uuid is none, ignore this resp"); + if cur_req_peer_id.is_none() { + tracing::error!("[PEER RPC MGR] cur_req_peer_id is none, ignore this resp"); continue; } @@ -128,13 +131,13 @@ impl PeerRpcManager { let msg = Packet::new_tarpc_packet( tspt.my_peer_id(), - cur_req_uuid.take().unwrap(), + cur_req_peer_id.take().unwrap(), service_id, false, serialized_resp.unwrap(), ); - if let Err(e) = tspt.send(msg.into(), &peer_id).await { + if let Err(e) = tspt.send(msg.into(), peer_id).await { tracing::error!(error = ?e, peer_id = ?peer_id, service_id = ?service_id, "send resp to peer failed"); } } @@ -152,7 +155,7 @@ impl PeerRpcManager { } assert_eq!(info.service_id, service_id); - cur_req_uuid = Some(packet.from_peer.clone().into()); + cur_req_peer_id = Some(packet.from_peer.clone().into()); tracing::trace!("recv packet from peer, packet: {:?}", packet); @@ -205,8 +208,8 @@ impl PeerRpcManager { fn parse_rpc_packet(packet: &Packet) -> Result { match &packet.body { PacketBody::Ctrl(CtrlPacketBody::TaRpc(id, is_req, body)) => Ok(TaRpcPacketInfo { - from_peer: packet.from_peer.clone().into(), - to_peer: packet.to_peer.clone().unwrap().into(), + from_peer: packet.from_peer.into(), + to_peer: packet.to_peer.into(), service_id: *id, is_req: *is_req, content: body.clone(), @@ -267,7 +270,7 @@ impl PeerRpcManager { pub async fn do_client_rpc_scoped( &self, service_id: PeerRpcServiceId, - dst_peer_id: uuid::Uuid, + dst_peer_id: PeerId, f: impl FnOnce(UnboundedChannel) -> Fut, ) -> RpcRet where @@ -304,7 +307,7 @@ impl PeerRpcManager { a.unwrap(), ); - if let Err(e) = tspt.send(a.into(), &dst_peer_id).await { + if let Err(e) = tspt.send(a.into(), dst_peer_id).await { tracing::error!(error = ?e, dst_peer_id = ?dst_peer_id, "send to peer failed"); } } @@ -343,7 +346,7 @@ impl PeerRpcManager { f(client_transport).await } - pub fn my_peer_id(&self) -> uuid::Uuid { + pub fn my_peer_id(&self) -> PeerId { self.tspt.my_peer_id() } } @@ -354,7 +357,7 @@ mod tests { use tokio_util::bytes::Bytes; use crate::{ - common::error::Error, + common::{error::Error, new_peer_id, PeerId}, peers::{ peer_rpc::PeerRpcManager, tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, @@ -385,15 +388,15 @@ mod tests { async fn peer_rpc_basic_test() { struct MockTransport { tunnel: Box, - my_peer_id: uuid::Uuid, + my_peer_id: PeerId, } #[async_trait::async_trait] impl PeerRpcManagerTransport for MockTransport { - fn my_peer_id(&self) -> uuid::Uuid { + fn my_peer_id(&self) -> PeerId { self.my_peer_id } - async fn send(&self, msg: Bytes, _dst_peer_id: &uuid::Uuid) -> Result<(), Error> { + async fn send(&self, msg: Bytes, _dst_peer_id: PeerId) -> Result<(), Error> { println!("rpc mgr send: {:?}", msg); self.tunnel.pin_sink().send(msg).await.unwrap(); Ok(()) @@ -409,7 +412,7 @@ mod tests { let server_rpc_mgr = PeerRpcManager::new(MockTransport { tunnel: st, - my_peer_id: uuid::Uuid::new_v4(), + my_peer_id: new_peer_id(), }); server_rpc_mgr.run(); let s = MockService { @@ -419,7 +422,7 @@ mod tests { let client_rpc_mgr = PeerRpcManager::new(MockTransport { tunnel: ct, - my_peer_id: uuid::Uuid::new_v4(), + my_peer_id: new_peer_id(), }); client_rpc_mgr.run(); @@ -442,23 +445,23 @@ mod tests { let peer_mgr_c = create_mock_peer_manager().await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; - wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id()) .await .unwrap(); - wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_peer_id()) .await .unwrap(); assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1); assert_eq!( peer_mgr_a.get_peer_map().list_peers().await[0], - peer_mgr_b.my_node_id() + peer_mgr_b.my_peer_id() ); assert_eq!(peer_mgr_c.get_peer_map().list_peers().await.len(), 1); assert_eq!( peer_mgr_c.get_peer_map().list_peers().await[0], - peer_mgr_b.my_node_id() + peer_mgr_b.my_peer_id() ); let s = MockService { @@ -468,7 +471,7 @@ mod tests { let ip_list = peer_mgr_a .get_peer_rpc_mgr() - .do_client_rpc_scoped(1, peer_mgr_b.my_node_id(), |c| async { + .do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async { let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await; ret @@ -479,7 +482,7 @@ mod tests { let ip_list = peer_mgr_c .get_peer_rpc_mgr() - .do_client_rpc_scoped(1, peer_mgr_b.my_node_id(), |c| async { + .do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async { let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); let ret = c.hello(tarpc::context::current(), "bcd".to_owned()).await; ret @@ -494,14 +497,14 @@ mod tests { let peer_mgr_a = create_mock_peer_manager().await; let peer_mgr_b = create_mock_peer_manager().await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; - wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id()) + wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_peer_id()) .await .unwrap(); assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1); assert_eq!( peer_mgr_a.get_peer_map().list_peers().await[0], - peer_mgr_b.my_node_id() + peer_mgr_b.my_peer_id() ); let s = MockService { @@ -515,7 +518,7 @@ mod tests { let ip_list = peer_mgr_a .get_peer_rpc_mgr() - .do_client_rpc_scoped(1, peer_mgr_b.my_node_id(), |c| async { + .do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async { let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await; ret @@ -526,7 +529,7 @@ mod tests { let ip_list = peer_mgr_a .get_peer_rpc_mgr() - .do_client_rpc_scoped(2, peer_mgr_b.my_node_id(), |c| async { + .do_client_rpc_scoped(2, peer_mgr_b.my_peer_id(), |c| async { let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); let ret = c.hello(tarpc::context::current(), "abc".to_owned()).await; ret diff --git a/easytier-core/src/peers/route_trait.rs b/easytier-core/src/peers/route_trait.rs index 3bb59b6..6636435 100644 --- a/easytier-core/src/peers/route_trait.rs +++ b/easytier-core/src/peers/route_trait.rs @@ -3,9 +3,7 @@ use std::{net::Ipv4Addr, sync::Arc}; use async_trait::async_trait; use tokio_util::bytes::Bytes; -use crate::common::error::Error; - -use super::PeerId; +use crate::common::{error::Error, PeerId}; #[async_trait] pub trait RouteInterface { @@ -14,8 +12,9 @@ pub trait RouteInterface { &self, msg: Bytes, route_id: u8, - dst_peer_id: &PeerId, + dst_peer_id: PeerId, ) -> Result<(), Error>; + fn my_peer_id(&self) -> PeerId; } pub type RouteInterfaceBox = Box; @@ -26,7 +25,7 @@ pub trait Route { async fn open(&self, interface: RouteInterfaceBox) -> Result; async fn close(&self); - async fn get_next_hop(&self, peer_id: &PeerId) -> Option; + async fn get_next_hop(&self, peer_id: PeerId) -> Option; async fn list_routes(&self) -> Vec; async fn get_peer_id_by_ipv4(&self, _ipv4: &Ipv4Addr) -> Option { diff --git a/easytier-core/src/peers/rpc_service.rs b/easytier-core/src/peers/rpc_service.rs index 4363e97..14bb009 100644 --- a/easytier-core/src/peers/rpc_service.rs +++ b/easytier-core/src/peers/rpc_service.rs @@ -23,7 +23,7 @@ impl PeerManagerRpcService { let mut peer_infos = Vec::new(); for peer in peers { let mut peer_info = PeerInfo::default(); - peer_info.peer_id = peer.to_string(); + peer_info.peer_id = peer; if let Some(conns) = self .peer_manager diff --git a/easytier-core/src/peers/tests.rs b/easytier-core/src/peers/tests.rs index 163da2e..39f70d8 100644 --- a/easytier-core/src/peers/tests.rs +++ b/easytier-core/src/peers/tests.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use crate::{ - common::{error::Error, global_ctx::tests::get_mock_global_ctx}, + common::{error::Error, global_ctx::tests::get_mock_global_ctx, PeerId}, tunnels::ring_tunnel::create_ring_tunnel_pair, }; @@ -28,16 +28,16 @@ pub async fn connect_peer_manager(client: Arc, server: Arc, - node_id: uuid::Uuid, + node_id: PeerId, cost: Option, ) -> Result<(), Error> { let now = std::time::Instant::now(); while now.elapsed().as_secs() < 5 { let route = peer_mgr.list_routes().await; - if route.iter().any(|r| { - r.peer_id.clone().parse::().unwrap() == node_id - && (cost.is_none() || r.cost == cost.unwrap()) - }) { + if route + .iter() + .any(|r| r.peer_id == node_id && (cost.is_none() || r.cost == cost.unwrap())) + { return Ok(()); } tokio::time::sleep(std::time::Duration::from_millis(50)).await; @@ -45,9 +45,6 @@ pub async fn wait_route_appear_with_cost( return Err(Error::NotFound); } -pub async fn wait_route_appear( - peer_mgr: Arc, - node_id: uuid::Uuid, -) -> Result<(), Error> { +pub async fn wait_route_appear(peer_mgr: Arc, node_id: PeerId) -> Result<(), Error> { wait_route_appear_with_cost(peer_mgr, node_id, None).await } diff --git a/easytier-core/src/tests/mod.rs b/easytier-core/src/tests/mod.rs index 984507b..1eb42bc 100644 --- a/easytier-core/src/tests/mod.rs +++ b/easytier-core/src/tests/mod.rs @@ -1,3 +1,5 @@ +use crate::common::PeerId; + mod three_node; pub fn get_guest_veth_name(net_ns: &str) -> &str { @@ -124,12 +126,12 @@ pub fn enable_log() { .init(); } -fn check_route(ipv4: &str, dst_peer_id: uuid::Uuid, routes: Vec) { +fn check_route(ipv4: &str, dst_peer_id: PeerId, routes: Vec) { let mut found = false; for r in routes.iter() { if r.ipv4_addr == ipv4.to_string() { found = true; - assert_eq!(r.peer_id, dst_peer_id.to_string(), "{:?}", routes); + assert_eq!(r.peer_id, dst_peer_id, "{:?}", routes); } } assert!(found); @@ -138,7 +140,7 @@ fn check_route(ipv4: &str, dst_peer_id: uuid::Uuid, routes: Vec, ipv4: &str, - dst_peer_id: uuid::Uuid, + dst_peer_id: PeerId, proxy_cidr: &str, ) { let now = std::time::Instant::now(); @@ -146,7 +148,7 @@ async fn wait_proxy_route_appear( for r in mgr.list_routes().await.iter() { let r = r; if r.proxy_cidrs.contains(&proxy_cidr.to_owned()) { - assert_eq!(r.peer_id, dst_peer_id.to_string()); + assert_eq!(r.peer_id, dst_peer_id); assert_eq!(r.ipv4_addr, ipv4); return; } diff --git a/easytier-core/src/tests/three_node.rs b/easytier-core/src/tests/three_node.rs index 2f89385..d5fca58 100644 --- a/easytier-core/src/tests/three_node.rs +++ b/easytier-core/src/tests/three_node.rs @@ -105,13 +105,13 @@ pub async fn basic_three_node_test_tcp() { check_route( "10.144.144.2", - insts[1].id(), + insts[1].peer_id(), insts[0].get_peer_manager().list_routes().await, ); check_route( "10.144.144.3", - insts[2].id(), + insts[2].peer_id(), insts[0].get_peer_manager().list_routes().await, ); } @@ -123,13 +123,13 @@ pub async fn basic_three_node_test_udp() { check_route( "10.144.144.2", - insts[1].id(), + insts[1].peer_id(), insts[0].get_peer_manager().list_routes().await, ); check_route( "10.144.144.3", - insts[2].id(), + insts[2].peer_id(), insts[0].get_peer_manager().list_routes().await, ); } @@ -148,7 +148,7 @@ pub async fn tcp_proxy_three_node_test() { wait_proxy_route_appear( &insts[0].get_peer_manager(), "10.144.144.3", - insts[2].id(), + insts[2].peer_id(), "10.1.2.0/24", ) .await; @@ -182,7 +182,7 @@ pub async fn icmp_proxy_three_node_test() { wait_proxy_route_appear( &insts[0].get_peer_manager(), "10.144.144.3", - insts[2].id(), + insts[2].peer_id(), "10.1.2.0/24", ) .await; @@ -244,7 +244,7 @@ pub async fn udp_proxy_three_node_test() { wait_proxy_route_appear( &insts[0].get_peer_manager(), "10.144.144.3", - insts[2].id(), + insts[2].peer_id(), "10.1.2.0/24", ) .await;