From 29365c39ed0c15d9351cdc363f7d39a8098b91c0 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Sun, 12 May 2024 23:18:20 +0800 Subject: [PATCH] use latency from peer center for route --- easytier/src/common/config.rs | 2 + easytier/src/instance/instance.rs | 5 ++ easytier/src/peer_center/instance.rs | 3 +- easytier/src/peers/foreign_network_manager.rs | 11 +++- easytier/src/peers/peer_manager.rs | 64 +++++++++++++++---- easytier/src/peers/peer_map.rs | 22 +++++-- easytier/src/peers/peer_ospf_route.rs | 58 +++++++++++------ easytier/src/tunnel/packet_def.rs | 23 +++++-- 8 files changed, 145 insertions(+), 43 deletions(-) diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index 9604e37..76c1dd7 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -150,6 +150,8 @@ pub struct Flags { pub enable_ipv6: bool, #[derivative(Default(value = "1420"))] pub mtu: u16, + #[derivative(Default(value = "true"))] + pub latency_first: bool, } #[derive(Debug, Clone, Deserialize, Serialize, PartialEq)] diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 07ea912..16cb932 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -302,6 +302,11 @@ impl Instance { self.udp_hole_puncher.lock().await.run().await?; self.peer_center.init().await; + let route_calc = self.peer_center.get_cost_calculator(); + self.peer_manager + .get_route() + .set_route_cost_fn(route_calc) + .await; self.add_initial_peers().await?; diff --git a/easytier/src/peer_center/instance.rs b/easytier/src/peer_center/instance.rs index 6e25f5e..35831a0 100644 --- a/easytier/src/peer_center/instance.rs +++ b/easytier/src/peer_center/instance.rs @@ -308,7 +308,7 @@ impl PeerCenterInstance { .get(&src) .and_then(|src_peer_info| src_peer_info.direct_peers.get(&dst)) .and_then(|info| Some(info.latency_ms)); - ret.unwrap_or(i32::MAX) + ret.unwrap_or(80) } fn begin_update(&mut self) { @@ -344,7 +344,6 @@ mod tests { peers::tests::{ connect_peer_manager, create_mock_peer_manager, wait_for_condition, wait_route_appear, }, - tunnel::common::tests::enable_log, }; use super::*; diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index f0b179b..35ef2f8 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -29,6 +29,7 @@ use super::{ peer_conn::PeerConn, peer_map::PeerMap, peer_rpc::{PeerRpcManager, PeerRpcManagerTransport}, + route_trait::NextHopPolicy, PacketRecvChan, PacketRecvChanReceiver, }; @@ -66,7 +67,10 @@ impl ForeignNetworkManagerData { .get(&network_name) .ok_or_else(|| Error::RouteError(Some("no peer in network".to_string())))? .clone(); - entry.peer_map.send_msg(msg, dst_peer_id).await + entry + .peer_map + .send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop) + .await } fn get_peer_network(&self, peer_id: PeerId) -> Option { @@ -275,7 +279,10 @@ impl ForeignNetworkManager { } if let Some(entry) = data.get_network_entry(&from_network) { - let ret = entry.peer_map.send_msg(packet_bytes, to_peer_id).await; + let ret = entry + .peer_map + .send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop) + .await; if ret.is_err() { tracing::error!("forward packet to peer failed: {:?}", ret.err()); } diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index a2d7e34..e040d25 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -22,7 +22,9 @@ use tokio_util::bytes::Bytes; use crate::{ common::{error::Error, global_ctx::ArcGlobalCtx, PeerId}, peers::{ - peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, route_trait::RouteInterface, + peer_conn::PeerConn, + peer_rpc::PeerRpcManagerTransport, + route_trait::{NextHopPolicy, RouteInterface}, PeerPacketFilter, }, tunnel::{ @@ -73,7 +75,10 @@ impl PeerRpcManagerTransport for RpcTransport { .ok_or(Error::Unknown)?; let peers = self.peers.upgrade().ok_or(Error::Unknown)?; - if let Some(gateway_id) = peers.get_gateway_peer_id(dst_peer_id).await { + if let Some(gateway_id) = peers + .get_gateway_peer_id(dst_peer_id, NextHopPolicy::LeastHop) + .await + { tracing::trace!( ?dst_peer_id, ?gateway_id, @@ -320,6 +325,7 @@ impl PeerManager { let my_peer_id = self.my_peer_id; let peers = self.peers.clone(); let pipe_line = self.peer_packet_process_pipeline.clone(); + let foreign_client = self.foreign_network_client.clone(); let encryptor = self.encryptor.clone(); self.tasks.lock().await.spawn(async move { log::trace!("start_peer_recv"); @@ -332,14 +338,20 @@ impl PeerManager { let from_peer_id = hdr.from_peer_id.get(); let to_peer_id = hdr.to_peer_id.get(); if to_peer_id != my_peer_id { - if hdr.ttl <= 1 { - tracing::warn!(?hdr, "ttl is 0, drop packet"); + if hdr.forward_counter > 7 { + tracing::warn!(?hdr, "forward counter exceed, drop packet"); continue; } - hdr.ttl -= 1; + if hdr.forward_counter > 2 && hdr.is_latency_first() { + tracing::trace!(?hdr, "set_latency_first false because too many hop"); + hdr.set_latency_first(false); + } + + hdr.forward_counter += 1; tracing::trace!(?to_peer_id, ?my_peer_id, "need forward"); - let ret = peers.send_msg(ret, to_peer_id).await; + let ret = + Self::send_msg_internal(&peers, &foreign_client, ret, to_peer_id).await; if ret.is_err() { tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error"); } @@ -524,11 +536,31 @@ impl PeerManager { } } + fn get_next_hop_policy(is_first_latency: bool) -> NextHopPolicy { + if is_first_latency { + NextHopPolicy::LeastCost + } else { + NextHopPolicy::LeastHop + } + } + pub async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { - if let Some(gateway) = self.peers.get_gateway_peer_id(dst_peer_id).await { - self.peers.send_msg_directly(msg, gateway).await - } else if self.foreign_network_client.has_next_hop(dst_peer_id) { - self.foreign_network_client.send_msg(msg, dst_peer_id).await + Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, dst_peer_id).await + } + + async fn send_msg_internal( + peers: &Arc, + foreign_network_client: &Arc, + msg: ZCPacket, + dst_peer_id: PeerId, + ) -> Result<(), Error> { + let policy = + Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first()); + + if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy).await { + peers.send_msg_directly(msg, gateway).await + } else if foreign_network_client.has_next_hop(dst_peer_id) { + foreign_network_client.send_msg(msg, dst_peer_id).await } else { Err(Error::RouteError(None)) } @@ -570,6 +602,12 @@ impl PeerManager { .encrypt(&mut msg) .with_context(|| "encrypt failed")?; + let is_latency_first = self.global_ctx.get_flags().latency_first; + msg.mut_peer_manager_header() + .unwrap() + .set_latency_first(is_latency_first); + let next_hop_policy = Self::get_next_hop_policy(is_latency_first); + let mut errs: Vec = vec![]; let mut msg = Some(msg); @@ -587,7 +625,11 @@ impl PeerManager { .to_peer_id .set(*peer_id); - if let Some(gateway) = self.peers.get_gateway_peer_id(*peer_id).await { + if let Some(gateway) = self + .peers + .get_gateway_peer_id(*peer_id, next_hop_policy.clone()) + .await + { if let Err(e) = self.peers.send_msg_directly(msg, gateway).await { errs.push(e); } diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index 821c903..e0bbfa7 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -18,7 +18,7 @@ use crate::{ use super::{ peer::Peer, peer_conn::{PeerConn, PeerConnId}, - route_trait::ArcRoute, + route_trait::{ArcRoute, NextHopPolicy}, PacketRecvChan, }; @@ -94,7 +94,11 @@ impl PeerMap { Ok(()) } - pub async fn get_gateway_peer_id(&self, dst_peer_id: PeerId) -> Option { + pub async fn get_gateway_peer_id( + &self, + dst_peer_id: PeerId, + policy: NextHopPolicy, + ) -> Option { if dst_peer_id == self.my_peer_id { return Some(dst_peer_id); } @@ -105,7 +109,10 @@ impl PeerMap { // get route info for route in self.routes.read().await.iter() { - if let Some(gateway_peer_id) = route.get_next_hop(dst_peer_id).await { + if let Some(gateway_peer_id) = route + .get_next_hop_with_policy(dst_peer_id, policy.clone()) + .await + { // for foreign network, gateway_peer_id may not connect to me if self.has_peer(gateway_peer_id) { return Some(gateway_peer_id); @@ -116,8 +123,13 @@ impl PeerMap { None } - pub async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> { - let Some(gateway_peer_id) = self.get_gateway_peer_id(dst_peer_id).await else { + pub async fn send_msg( + &self, + msg: ZCPacket, + dst_peer_id: PeerId, + policy: NextHopPolicy, + ) -> Result<(), Error> { + let Some(gateway_peer_id) = self.get_gateway_peer_id(dst_peer_id, policy).await else { return Err(Error::RouteError(Some(format!( "peer map sengmsg no gateway for dst_peer_id: {}", dst_peer_id diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 614bab7..125bf4a 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -400,11 +400,42 @@ impl RouteTable { .map(|x| NatType::try_from(x.udp_stun_info as i32).unwrap()) } + fn find_path_with_least_cost( + my_peer_id: PeerId, + peer_id: PeerId, + synced_info: &SyncedRouteInfo, + cost_calc: &mut T, + ) -> Option> { + let Some((path, _cost)): Option<(Vec, i32)> = pathfinding::prelude::dijkstra( + &my_peer_id, + |src_peer| { + synced_info + .get_connected_peers(*src_peer) + .unwrap_or_else(|| BTreeSet::new()) + .into_iter() + .map(|dst_peer| { + let cost = cost_calc.calculate_cost(*src_peer, dst_peer); + (dst_peer, cost) + }) + .collect::>() + }, + |x| *x == peer_id, + ) else { + return None; + }; + + if !path.is_empty() { + Some(path) + } else { + None + } + } + fn build_from_synced_info( &self, my_peer_id: PeerId, synced_info: &SyncedRouteInfo, - cost_calc: T, + mut cost_calc: T, ) { // build peer_infos self.peer_infos.clear(); @@ -427,24 +458,11 @@ impl RouteTable { if peer_id == my_peer_id { continue; } - let Some((path, _cost)): Option<(Vec, i32)> = pathfinding::prelude::dijkstra( - &my_peer_id, - |src_peer| { - synced_info - .get_connected_peers(*src_peer) - .unwrap_or_else(|| BTreeSet::new()) - .into_iter() - .map(|dst_peer| { - let cost = cost_calc.calculate_cost(*src_peer, dst_peer); - (dst_peer, cost) - }) - .collect::>() - }, - |x| *x == peer_id, - ) else { - continue; - }; - if !path.is_empty() { + + let path = + Self::find_path_with_least_cost(my_peer_id, peer_id, synced_info, &mut cost_calc); + + if let Some(path) = path { assert!(path.len() >= 2); self.next_hop_map .insert(peer_id, (path[1], (path.len() - 1) as i32)); @@ -1226,6 +1244,7 @@ impl PeerRoute { session_mgr.maintain_sessions(service_impl).await; } + #[tracing::instrument(skip(session_mgr))] async fn update_my_peer_info_routine( service_impl: Arc, session_mgr: RouteSessionManager, @@ -1237,6 +1256,7 @@ impl PeerRoute { } if service_impl.cost_calculator_need_update() { + tracing::debug!("cost_calculator_need_update"); service_impl.update_route_table(); } diff --git a/easytier/src/tunnel/packet_def.rs b/easytier/src/tunnel/packet_def.rs index dd0a106..567bfa6 100644 --- a/easytier/src/tunnel/packet_def.rs +++ b/easytier/src/tunnel/packet_def.rs @@ -7,8 +7,6 @@ use zerocopy::FromZeroes; type DefaultEndian = LittleEndian; -pub const DEFAULT_TTL: u8 = 8; - // TCP TunnelHeader #[repr(C, packed)] #[derive(AsBytes, FromBytes, FromZeroes, Clone, Debug, Default)] @@ -61,6 +59,7 @@ pub enum PacketType { bitflags::bitflags! { struct PeerManagerHeaderFlags: u8 { const ENCRYPTED = 0b0000_0001; + const LATENCY_FIRST = 0b0000_0010; } } @@ -71,7 +70,7 @@ pub struct PeerManagerHeader { pub to_peer_id: U32, pub packet_type: u8, pub flags: u8, - pub ttl: u8, + pub forward_counter: u8, reserved: u8, pub len: U32, } @@ -93,6 +92,22 @@ impl PeerManagerHeader { } self.flags = flags.bits(); } + + pub fn is_latency_first(&self) -> bool { + PeerManagerHeaderFlags::from_bits(self.flags) + .unwrap() + .contains(PeerManagerHeaderFlags::LATENCY_FIRST) + } + + pub fn set_latency_first(&mut self, latency_first: bool) { + let mut flags = PeerManagerHeaderFlags::from_bits(self.flags).unwrap(); + if latency_first { + flags.insert(PeerManagerHeaderFlags::LATENCY_FIRST); + } else { + flags.remove(PeerManagerHeaderFlags::LATENCY_FIRST); + } + self.flags = flags.bits(); + } } // reserve the space for aes tag and nonce @@ -365,7 +380,7 @@ impl ZCPacket { hdr.to_peer_id.set(to_peer_id); hdr.packet_type = packet_type; hdr.flags = 0; - hdr.ttl = DEFAULT_TTL; + hdr.forward_counter = 1; hdr.len.set(payload_len as u32); }