diff --git a/easytier-core/Cargo.toml b/easytier-core/Cargo.toml
index 5408d00..8827ebf 100644
--- a/easytier-core/Cargo.toml
+++ b/easytier-core/Cargo.toml
@@ -13,7 +13,11 @@ path = "src/rpc/lib.rs"
 
 [dependencies]
 tracing = { version = "0.1", features = ["log"] }
-tracing-subscriber = { version = "0.3", features = ["env-filter", "local-time", "time"] }
+tracing-subscriber = { version = "0.3", features = [
+    "env-filter",
+    "local-time",
+    "time",
+] }
 tracing-appender = "0.2.3"
 log = "0.4"
 thiserror = "1.0"
@@ -77,21 +81,11 @@ stun-format = { git = "https://github.com/KKRainbow/stun-format.git", features = [
 ] }
 rand = "0.8.5"
 
-[dependencies.serde]
-version = "1.0"
-features = ["derive"]
+serde = { version = "1.0", features = ["derive"] }
+pnet = { version = "0.34.0", features = ["serde"] }
+public-ip = { version = "0.2", features = ["default"] }
 
-[dependencies.pnet]
-version = "0.34.0"
-features = ["serde"]
-
-[dependencies.clap]
-version = "4.4"
-features = ["derive"]
-
-[dependencies.public-ip]
-version = "0.2"
-features = ["default"]
+clap = { version = "4.4", features = ["derive"] }
 
 [build-dependencies]
 tonic-build = "0.10"
diff --git a/easytier-core/build.rs b/easytier-core/build.rs
index e69e706..dd8a4fb 100644
--- a/easytier-core/build.rs
+++ b/easytier-core/build.rs
@@ -90,6 +90,10 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
     #[cfg(target_os = "windows")]
     WindowsBuild::check_for_win();
 
-    tonic_build::compile_protos("proto/cli.proto")?;
+    tonic_build::configure()
+        .type_attribute(".", "#[derive(serde::Serialize, serde::Deserialize)]")
+        .compile(&["proto/cli.proto"], &["proto/"])
+        .unwrap();
+    // tonic_build::compile_protos("proto/cli.proto")?;
     Ok(())
 }
diff --git a/easytier-core/src/main.rs b/easytier-core/src/main.rs
index 1132952..928674b 100644
--- a/easytier-core/src/main.rs
+++ b/easytier-core/src/main.rs
@@ -9,6 +9,7 @@ mod common;
 mod connector;
 mod gateway;
 mod instance;
+mod peer_center;
 mod peers;
 mod tunnels;
 
diff --git a/easytier-core/src/peer_center/instance.rs b/easytier-core/src/peer_center/instance.rs
new file mode 100644
index 0000000..ee54cd6
--- /dev/null
+++ b/easytier-core/src/peer_center/instance.rs
@@ -0,0 +1,252 @@
+use std::{
+    collections::hash_map::DefaultHasher,
+    hash::{Hash, Hasher},
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        Arc,
+    },
+    time::{Duration, SystemTime},
+};
+
+use futures::Future;
+use tokio::{sync::Mutex, task::JoinSet};
+use tracing::Instrument;
+
+use crate::peers::{peer_manager::PeerManager, rpc_service::PeerManagerRpcService, PeerId};
+
+use super::{
+    server::PeerCenterServer,
+    service::{PeerCenterService, PeerCenterServiceClient, PeerInfoForGlobalMap},
+    Digest, Error,
+};
+
+pub struct PeerCenterClient {
+    peer_mgr: Arc<PeerManager>,
+    tasks: Arc<Mutex<JoinSet<()>>>,
+}
+
+static SERVICE_ID: u32 = 5;
+
+struct PeridicJobCtx<T> {
+    peer_mgr: Arc<PeerManager>,
+    job_ctx: T,
+}
+
+impl PeerCenterClient {
+    pub async fn init(&self) -> Result<(), Error> {
+        self.peer_mgr.get_peer_rpc_mgr().run_service(
+            SERVICE_ID,
+            PeerCenterServer::new(self.peer_mgr.my_node_id()).serve(),
+        );
+
+        Ok(())
+    }
+
+    async fn select_center_peer(peer_mgr: &Arc<PeerManager>) -> Option<PeerId> {
+        let peers = peer_mgr.list_routes().await;
+        if peers.is_empty() {
+            return None;
+        }
+        // find the peer with the alphabetically smallest id.
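+        // (ids are compared as strings, so every node independently picks the same center)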
+        let mut min_peer = peer_mgr.my_node_id().to_string();
+        for peer in peers.iter() {
+            if peer.peer_id < min_peer {
+                min_peer = peer.peer_id.clone();
+            }
+        }
+        Some(min_peer.parse().unwrap())
+    }
+
+    async fn init_periodic_job<
+        T: Send + Sync + 'static + Clone,
+        Fut: Future<Output = Result<u32, tarpc::client::RpcError>> + Send + 'static,
+    >(
+        &self,
+        job_ctx: T,
+        job_fn: (impl Fn(PeerCenterServiceClient, Arc<PeridicJobCtx<T>>) -> Fut + Send + Sync + 'static),
+    ) -> () {
+        let my_node_id = self.peer_mgr.my_node_id();
+        let peer_mgr = self.peer_mgr.clone();
+        self.tasks.lock().await.spawn(
+            async move {
+                let ctx = Arc::new(PeridicJobCtx {
+                    peer_mgr: peer_mgr.clone(),
+                    job_ctx,
+                });
+                tracing::warn!(?my_node_id, "before periodic job loop");
+                loop {
+                    let Some(center_peer) = Self::select_center_peer(&peer_mgr).await else {
+                        tracing::warn!("no center peer found, sleep 1 second");
+                        tokio::time::sleep(Duration::from_secs(1)).await;
+                        continue;
+                    };
+                    tracing::warn!(?center_peer, "run periodic job");
+                    let rpc_mgr = peer_mgr.get_peer_rpc_mgr();
+                    let ret = rpc_mgr
+                        .do_client_rpc_scoped(SERVICE_ID, center_peer, |c| async {
+                            let client =
+                                PeerCenterServiceClient::new(tarpc::client::Config::default(), c)
+                                    .spawn();
+                            job_fn(client, ctx.clone()).await
+                        })
+                        .await;
+
+                    let Ok(sleep_time_ms) = ret else {
+                        tracing::error!("periodic job to center server rpc failed: {:?}", ret);
+                        tokio::time::sleep(Duration::from_secs(3)).await;
+                        continue;
+                    };
+
+                    if sleep_time_ms > 0 {
+                        tokio::time::sleep(Duration::from_millis(sleep_time_ms as u64)).await;
+                    }
+                }
+            }
+            .instrument(tracing::info_span!("periodic_job", ?my_node_id)),
+        );
+    }
+
+    pub async fn new(peer_mgr: Arc<PeerManager>) -> Self {
+        PeerCenterClient {
+            peer_mgr,
+            tasks: Arc::new(Mutex::new(JoinSet::new())),
+        }
+    }
+}
+
+struct PeerCenterInstance {
+    peer_mgr: Arc<PeerManager>,
+    client: Arc<PeerCenterClient>,
+}
+
+impl PeerCenterInstance {
+    pub async fn new(peer_mgr: Arc<PeerManager>) -> Self {
+        let client = Arc::new(PeerCenterClient::new(peer_mgr.clone()).await);
+        client.init().await.unwrap();
+
+        PeerCenterInstance { peer_mgr, client }
+    }
+
+    async fn init_get_global_info_job(&self) {
+        self.client
+            .init_periodic_job({}, |client, _ctx| async move {
+                let ret = client
+                    .get_global_peer_map(tarpc::context::current(), 0)
+                    .await?;
+
+                let Ok(global_peer_map) = ret else {
+                    tracing::error!(
+                        "get global info from center server got error result: {:?}",
+                        ret
+                    );
+                    return Ok(1000);
+                };
+
+                tracing::warn!("get global info from center server: {:?}", global_peer_map);
+
+                Ok(5000)
+            })
+            .await;
+    }
+
+    async fn init_report_peers_job(&self) {
+        struct Ctx {
+            service: PeerManagerRpcService,
+            need_send_peers: AtomicBool,
+        }
+        let ctx = Arc::new(Ctx {
+            service: PeerManagerRpcService::new(self.peer_mgr.clone()),
+            need_send_peers: AtomicBool::new(true),
+        });
+
+        self.client
+            .init_periodic_job(ctx, |client, ctx| async move {
+                let my_node_id = ctx.peer_mgr.my_node_id();
+                let peers: PeerInfoForGlobalMap = ctx.job_ctx.service.list_peers().await.into();
+                let mut hasher = DefaultHasher::new();
+                peers.hash(&mut hasher);
+
+                let peers = if ctx.job_ctx.need_send_peers.load(Ordering::Relaxed) {
+                    Some(peers)
+                } else {
+                    None
+                };
+                let mut rpc_ctx = tarpc::context::current();
+                rpc_ctx.deadline = SystemTime::now() + Duration::from_secs(3);
+
+                let ret = client
+                    .report_peers(
+                        rpc_ctx,
+                        my_node_id.clone(),
+                        peers,
+                        hasher.finish() as Digest,
+                    )
+                    .await?;
+
+                if matches!(ret.as_ref().err(), Some(Error::DigestMismatch)) {
+                    ctx.job_ctx.need_send_peers.store(true, Ordering::Relaxed);
+                    return Ok(0);
+                } else if ret.is_err() {
tracing::error!("report peers to center server got error result: {:?}", ret); + return Ok(500); + } + + ctx.job_ctx.need_send_peers.store(false, Ordering::Relaxed); + Ok(1000) + }) + .await; + } +} + +#[cfg(test)] +mod tests { + use crate::{ + peer_center::server::get_global_data, + peers::tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear}, + }; + + use super::*; + + #[tokio::test] + async fn test_peer_center_instance() { + let peer_mgr_a = create_mock_peer_manager().await; + let peer_mgr_b = create_mock_peer_manager().await; + let peer_mgr_c = create_mock_peer_manager().await; + + let peer_center_a = PeerCenterInstance::new(peer_mgr_a.clone()).await; + let peer_center_b = PeerCenterInstance::new(peer_mgr_b.clone()).await; + let peer_center_c = PeerCenterInstance::new(peer_mgr_c.clone()).await; + + peer_center_a.init_report_peers_job().await; + peer_center_b.init_report_peers_job().await; + peer_center_c.init_report_peers_job().await; + + connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; + + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + .await + .unwrap(); + + let center_peer = PeerCenterClient::select_center_peer(&peer_mgr_a) + .await + .unwrap(); + let center_data = get_global_data(center_peer); + + // wait center_data has 3 records for 10 seconds + let now = std::time::Instant::now(); + loop { + if center_data.read().await.global_peer_map.map.len() == 3 { + println!( + "center data ready, {:#?}", + center_data.read().await.global_peer_map + ); + break; + } + if now.elapsed().as_secs() > 60 { + panic!("center data not ready"); + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + } +} diff --git a/easytier-core/src/peer_center/mod.rs b/easytier-core/src/peer_center/mod.rs new file mode 100644 index 0000000..32dbd37 --- /dev/null +++ b/easytier-core/src/peer_center/mod.rs @@ -0,0 +1,20 @@ +// peer_center is used to collect peer info into one peer node. +// the center node is selected with the following rules: +// 1. has smallest peer id +// 2. TODO: has allow_to_be_center peer feature +// peer center is not guaranteed to be stable and can be changed when peer enter or leave. +// it's used to reduce the cost to exchange infos between peers. 
+
+mod instance;
+mod server;
+mod service;
+
+#[derive(thiserror::Error, Debug, serde::Deserialize, serde::Serialize)]
+pub enum Error {
+    #[error("Digest mismatch, need to provide full peer info to the center server.")]
+    DigestMismatch,
+    #[error("Not center server")]
+    NotCenterServer,
+}
+
+pub type Digest = u64;
diff --git a/easytier-core/src/peer_center/server.rs b/easytier-core/src/peer_center/server.rs
new file mode 100644
index 0000000..efd5c4e
--- /dev/null
+++ b/easytier-core/src/peer_center/server.rs
@@ -0,0 +1,114 @@
+use std::{
+    hash::{Hash, Hasher},
+    sync::Arc,
+};
+
+use dashmap::DashMap;
+use once_cell::sync::Lazy;
+use tokio::sync::RwLock;
+
+use crate::peers::PeerId;
+
+use super::{
+    service::{GlobalPeerMap, PeerCenterService, PeerInfoForGlobalMap},
+    Digest, Error,
+};
+
+pub(crate) struct PeerCenterServerGlobalData {
+    pub global_peer_map: GlobalPeerMap,
+    pub digest: Digest,
+    pub update_time: std::time::Instant,
+}
+
+impl PeerCenterServerGlobalData {
+    fn new() -> Self {
+        PeerCenterServerGlobalData {
+            global_peer_map: GlobalPeerMap::new(),
+            digest: Digest::default(),
+            update_time: std::time::Instant::now(),
+        }
+    }
+}
+
+// a globally unique instance for PeerCenterServer
+pub(crate) static GLOBAL_DATA: Lazy<DashMap<PeerId, Arc<RwLock<PeerCenterServerGlobalData>>>> =
+    Lazy::new(DashMap::new);
+
+pub(crate) fn get_global_data(node_id: PeerId) -> Arc<RwLock<PeerCenterServerGlobalData>> {
+    GLOBAL_DATA
+        .entry(node_id)
+        .or_insert_with(|| Arc::new(RwLock::new(PeerCenterServerGlobalData::new())))
+        .value()
+        .clone()
+}
+
+#[derive(Clone, Debug)]
+pub struct PeerCenterServer {
+    // every peer has its own server, so using a per-struct DashMap is fine.
+    my_node_id: PeerId,
+    digest_map: DashMap<PeerId, Digest>,
+}
+
+impl PeerCenterServer {
+    pub fn new(my_node_id: PeerId) -> Self {
+        PeerCenterServer {
+            my_node_id,
+            digest_map: DashMap::new(),
+        }
+    }
+}
+
+#[tarpc::server]
+impl PeerCenterService for PeerCenterServer {
+    #[tracing::instrument()]
+    async fn report_peers(
+        self,
+        _: tarpc::context::Context,
+        my_peer_id: PeerId,
+        peers: Option<PeerInfoForGlobalMap>,
+        digest: Digest,
+    ) -> Result<(), Error> {
+        tracing::warn!("receive report_peers");
+
+        let old_digest = self.digest_map.get(&my_peer_id);
+        // if the digests match, no need to update
+        if let Some(old_digest) = old_digest {
+            if *old_digest == digest {
+                return Ok(());
+            }
+        }
+
+        if peers.is_none() {
+            return Err(Error::DigestMismatch);
+        }
+
+        self.digest_map.insert(my_peer_id, digest);
+        let data = get_global_data(self.my_node_id);
+        let mut locked_data = data.write().await;
+        locked_data
+            .global_peer_map
+            .map
+            .insert(my_peer_id, peers.unwrap());
+
+        let mut hasher = std::collections::hash_map::DefaultHasher::new();
+        locked_data.global_peer_map.map.hash(&mut hasher);
+        locked_data.digest = hasher.finish() as Digest;
+
+        Ok(())
+    }
+
+    async fn get_global_peer_map(
+        self,
+        _: tarpc::context::Context,
+        digest: Digest,
+    ) -> Result<Option<GlobalPeerMap>, Error> {
+        let data = get_global_data(self.my_node_id);
+        if digest == data.read().await.digest {
+            return Ok(None);
+        }
+
+        let data = get_global_data(self.my_node_id);
+        let locked_data = data.read().await;
+        Ok(Some(locked_data.global_peer_map.clone()))
+    }
+}
diff --git a/easytier-core/src/peer_center/service.rs b/easytier-core/src/peer_center/service.rs
new file mode 100644
index 0000000..3c0ddf6
--- /dev/null
+++ b/easytier-core/src/peer_center/service.rs
@@ -0,0 +1,92 @@
+use std::collections::BTreeMap;
+
+use crate::peers::PeerId;
+
+use super::{Digest, Error};
+use easytier_rpc::PeerInfo;
+
+#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)]
+pub enum LatencyLevel {
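+    // coarse buckets keep the reported map (and thus its digest) stable under small latency jitter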
+    VeryLow,
+    Low,
+    Normal,
+    High,
+    VeryHigh,
+}
+
+impl LatencyLevel {
+    pub const fn from_latency_ms(lat_ms: u32) -> Self {
+        if lat_ms < 10 {
+            LatencyLevel::VeryLow
+        } else if lat_ms < 50 {
+            LatencyLevel::Low
+        } else if lat_ms < 100 {
+            LatencyLevel::Normal
+        } else if lat_ms < 200 {
+            LatencyLevel::High
+        } else {
+            LatencyLevel::VeryHigh
+        }
+    }
+}
+
+#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)]
+pub struct PeerConnInfoForGlobalMap {
+    to_peer_id: PeerId,
+    latency_level: LatencyLevel,
+}
+
+#[derive(Debug, Clone, Hash, serde::Deserialize, serde::Serialize)]
+pub struct PeerInfoForGlobalMap {
+    pub direct_peers: BTreeMap<PeerId, Vec<PeerConnInfoForGlobalMap>>,
+}
+
+impl From<Vec<PeerInfo>> for PeerInfoForGlobalMap {
+    fn from(peers: Vec<PeerInfo>) -> Self {
+        let mut peer_map = BTreeMap::new();
+        for peer in peers {
+            let mut conn_info = Vec::new();
+            for conn in peer.conns {
+                conn_info.push(PeerConnInfoForGlobalMap {
+                    to_peer_id: conn.peer_id.parse().unwrap(),
+                    latency_level: LatencyLevel::from_latency_ms(
+                        conn.stats.unwrap().latency_us as u32 / 1000,
+                    ),
+                });
+            }
+            // sort conn info so the hash result is stable
+            conn_info.sort_by(|a, b| a.to_peer_id.cmp(&b.to_peer_id));
+            peer_map.insert(peer.peer_id.parse().unwrap(), conn_info);
+        }
+        PeerInfoForGlobalMap {
+            direct_peers: peer_map,
+        }
+    }
+}
+
+// a global peer topology map; peers can use it to find the optimal path to other peers
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct GlobalPeerMap {
+    pub map: BTreeMap<PeerId, PeerInfoForGlobalMap>,
+}
+
+impl GlobalPeerMap {
+    pub fn new() -> Self {
+        GlobalPeerMap {
+            map: BTreeMap::new(),
+        }
+    }
+}
+
+#[tarpc::service]
+pub trait PeerCenterService {
+    // report to the center server which peers are directly connected to me.
+    // digest is a hash of the current peer map; if it does not match, the whole map must be transferred
+    async fn report_peers(
+        my_peer_id: PeerId,
+        peers: Option<PeerInfoForGlobalMap>,
+        digest: Digest,
+    ) -> Result<(), Error>;
+
+    async fn get_global_peer_map(digest: Digest) -> Result<Option<GlobalPeerMap>, Error>;
+}
diff --git a/easytier-core/src/peers/peer_manager.rs b/easytier-core/src/peers/peer_manager.rs
index b81aac1..39ad32b 100644
--- a/easytier-core/src/peers/peer_manager.rs
+++ b/easytier-core/src/peers/peer_manager.rs
@@ -128,7 +128,7 @@ impl Debug for PeerManager {
 impl PeerManager {
     pub fn new(global_ctx: ArcGlobalCtx, nic_channel: mpsc::Sender<SinkItem>) -> Self {
         let (packet_send, packet_recv) = mpsc::channel(100);
-        let peers = Arc::new(PeerMap::new(packet_send.clone()));
+        let peers = Arc::new(PeerMap::new(packet_send.clone(), global_ctx.clone()));
 
         // TODO: remove these because we have impl pipeline processor.
         let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
@@ -166,9 +166,7 @@ impl PeerManager {
         peer.do_handshake_as_client().await?;
         let conn_id = peer.get_conn_id();
         let peer_id = peer.get_peer_id();
-        self.peers
-            .add_new_peer_conn(peer, self.global_ctx.clone())
-            .await;
+        self.peers.add_new_peer_conn(peer).await;
         Ok((peer_id, conn_id))
     }
 
@@ -189,9 +187,7 @@ impl PeerManager {
         tracing::info!("add tunnel as server start");
         let mut peer = PeerConn::new(self.my_node_id, self.global_ctx.clone(), tunnel);
         peer.do_handshake_as_server().await?;
-        self.peers
-            .add_new_peer_conn(peer, self.global_ctx.clone())
-            .await;
+        self.peers.add_new_peer_conn(peer).await;
         tracing::info!("add tunnel as server done");
         Ok(())
     }
diff --git a/easytier-core/src/peers/peer_map.rs b/easytier-core/src/peers/peer_map.rs
index 7ee5c3b..c9fa745 100644
--- a/easytier-core/src/peers/peer_map.rs
+++ b/easytier-core/src/peers/peer_map.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use anyhow::Context;
 use dashmap::DashMap;
 use easytier_rpc::PeerConnInfo;
 use tokio::sync::mpsc;
@@ -13,13 +14,15 @@ use crate::{
 use super::{peer::Peer, peer_conn::PeerConn, route_trait::ArcRoute, PeerId};
 
 pub struct PeerMap {
+    global_ctx: ArcGlobalCtx,
     peer_map: DashMap<PeerId, Arc<Peer>>,
     packet_send: mpsc::Sender<Bytes>,
 }
 
 impl PeerMap {
-    pub fn new(packet_send: mpsc::Sender<Bytes>) -> Self {
+    pub fn new(packet_send: mpsc::Sender<Bytes>, global_ctx: ArcGlobalCtx) -> Self {
         PeerMap {
+            global_ctx,
             peer_map: DashMap::new(),
             packet_send,
         }
@@ -29,11 +32,11 @@ impl PeerMap {
         self.peer_map.insert(peer.peer_node_id, Arc::new(peer));
     }
 
-    pub async fn add_new_peer_conn(&self, peer_conn: PeerConn, global_ctx: ArcGlobalCtx) {
+    pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) {
         let peer_id = peer_conn.get_peer_id();
         let no_entry = self.peer_map.get(&peer_id).is_none();
         if no_entry {
-            let new_peer = Peer::new(peer_id, self.packet_send.clone(), global_ctx);
+            let new_peer = Peer::new(peer_id, self.packet_send.clone(), self.global_ctx.clone());
             new_peer.add_peer_conn(peer_conn).await;
             self.add_new_peer(new_peer).await;
         } else {
@@ -51,6 +54,14 @@
         msg: Bytes,
         dst_peer_id: &uuid::Uuid,
     ) -> Result<(), Error> {
+        if *dst_peer_id == self.global_ctx.get_id() {
+            return Ok(self
+                .packet_send
+                .send(msg)
+                .await
+                .with_context(|| "send msg to self failed")?);
+        }
+
         match self.get_peer_by_id(dst_peer_id) {
             Some(peer) => {
                 peer.send_msg(msg).await?;
@@ -70,6 +81,14 @@
         dst_peer_id: &uuid::Uuid,
         route: ArcRoute,
     ) -> Result<(), Error> {
+        if *dst_peer_id == self.global_ctx.get_id() {
+            return Ok(self
+                .packet_send
+                .send(msg)
+                .await
+                .with_context(|| "send msg to self failed")?);
+        }
+
         // get route info
         let gateway_peer_id = route.get_next_hop(dst_peer_id).await;
diff --git a/easytier-core/src/peers/rpc_service.rs b/easytier-core/src/peers/rpc_service.rs
index 5814f34..2bf4d17 100644
--- a/easytier-core/src/peers/rpc_service.rs
+++ b/easytier-core/src/peers/rpc_service.rs
@@ -1,5 +1,6 @@
 use std::sync::Arc;
 
+use easytier_rpc::cli::PeerInfo;
 use easytier_rpc::peer_manage_rpc_server::PeerManageRpc;
 use easytier_rpc::{ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse};
 use tonic::{Request, Response, Status};
@@ -14,17 +15,10 @@ impl PeerManagerRpcService {
     pub fn new(peer_manager: Arc<PeerManager>) -> Self {
         PeerManagerRpcService { peer_manager }
     }
-}
-
-#[tonic::async_trait]
-impl PeerManageRpc for PeerManagerRpcService {
-    async fn list_peer(
-        &self,
-        _request: Request<ListPeerRequest>, // Accept request of type HelloRequest
-    ) -> Result<Response<ListPeerResponse>, Status> {
-        let mut reply = ListPeerResponse::default();
 
+    pub async fn list_peers(&self) -> Vec<PeerInfo> {
         let peers = self.peer_manager.get_peer_map().list_peers().await;
+        let mut peer_infos = Vec::new();
         for peer in peers {
             let mut peer_info = easytier_rpc::PeerInfo::default();
             peer_info.peer_id = peer.to_string();
@@ -38,7 +32,24 @@ impl PeerManagerRpcService {
                 peer_info.conns = conns;
             }
 
-            reply.peer_infos.push(peer_info);
+            peer_infos.push(peer_info);
+        }
+
+        peer_infos
+    }
+}
+
+#[tonic::async_trait]
+impl PeerManageRpc for PeerManagerRpcService {
+    async fn list_peer(
+        &self,
+        _request: Request<ListPeerRequest>, // Accept request of type ListPeerRequest
+    ) -> Result<Response<ListPeerResponse>, Status> {
+        let mut reply = ListPeerResponse::default();
+
+        let peers = self.list_peers().await;
+        for peer in peers {
+            reply.peer_infos.push(peer);
         }
 
         Ok(Response::new(reply))
diff --git a/easytier-core/src/tests/mod.rs b/easytier-core/src/tests/mod.rs
index 83353f6..ed01a5e 100644
--- a/easytier-core/src/tests/mod.rs
+++ b/easytier-core/src/tests/mod.rs
@@ -116,7 +116,8 @@ pub fn enable_log() {
     let filter = tracing_subscriber::EnvFilter::builder()
         .with_default_directive(tracing::level_filters::LevelFilter::INFO.into())
        .from_env()
-        .unwrap();
+        .unwrap()
+        .add_directive("tarpc=error".parse().unwrap());
     tracing_subscriber::fmt::fmt()
         .pretty()
         .with_env_filter(filter)
diff --git a/easytier-core/src/tunnels/stats.rs b/easytier-core/src/tunnels/stats.rs
index 9765570..8e8d7a4 100644
--- a/easytier-core/src/tunnels/stats.rs
+++ b/easytier-core/src/tunnels/stats.rs
@@ -23,7 +23,7 @@ impl WindowLatency {
     pub fn record_latency(&self, latency_us: u32) {
         let index = self.latency_us_window_index.fetch_add(1, Relaxed);
 
-        if index < self.latency_us_window_size {
+        // compare the saturating sample count, not the ever-growing write index
+        if self.count.load(Relaxed) < self.latency_us_window_size {
             self.count.fetch_add(1, Relaxed);
         }