use latency from peer center for route

This commit is contained in:
sijie.sun
2024-05-12 23:18:20 +08:00
parent 09ebed157e
commit 29365c39ed
8 changed files with 145 additions and 43 deletions

View File

@@ -22,7 +22,9 @@ use tokio_util::bytes::Bytes;
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
peers::{
peer_conn::PeerConn, peer_rpc::PeerRpcManagerTransport, route_trait::RouteInterface,
peer_conn::PeerConn,
peer_rpc::PeerRpcManagerTransport,
route_trait::{NextHopPolicy, RouteInterface},
PeerPacketFilter,
},
tunnel::{
@@ -73,7 +75,10 @@ impl PeerRpcManagerTransport for RpcTransport {
.ok_or(Error::Unknown)?;
let peers = self.peers.upgrade().ok_or(Error::Unknown)?;
if let Some(gateway_id) = peers.get_gateway_peer_id(dst_peer_id).await {
if let Some(gateway_id) = peers
.get_gateway_peer_id(dst_peer_id, NextHopPolicy::LeastHop)
.await
{
tracing::trace!(
?dst_peer_id,
?gateway_id,
@@ -320,6 +325,7 @@ impl PeerManager {
let my_peer_id = self.my_peer_id;
let peers = self.peers.clone();
let pipe_line = self.peer_packet_process_pipeline.clone();
let foreign_client = self.foreign_network_client.clone();
let encryptor = self.encryptor.clone();
self.tasks.lock().await.spawn(async move {
log::trace!("start_peer_recv");
@@ -332,14 +338,20 @@ impl PeerManager {
let from_peer_id = hdr.from_peer_id.get();
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id != my_peer_id {
if hdr.ttl <= 1 {
tracing::warn!(?hdr, "ttl is 0, drop packet");
if hdr.forward_counter > 7 {
tracing::warn!(?hdr, "forward counter exceed, drop packet");
continue;
}
hdr.ttl -= 1;
if hdr.forward_counter > 2 && hdr.is_latency_first() {
tracing::trace!(?hdr, "set_latency_first false because too many hop");
hdr.set_latency_first(false);
}
hdr.forward_counter += 1;
tracing::trace!(?to_peer_id, ?my_peer_id, "need forward");
let ret = peers.send_msg(ret, to_peer_id).await;
let ret =
Self::send_msg_internal(&peers, &foreign_client, ret, to_peer_id).await;
if ret.is_err() {
tracing::error!(?ret, ?to_peer_id, ?from_peer_id, "forward packet error");
}
@@ -524,11 +536,31 @@ impl PeerManager {
}
}
fn get_next_hop_policy(is_first_latency: bool) -> NextHopPolicy {
if is_first_latency {
NextHopPolicy::LeastCost
} else {
NextHopPolicy::LeastHop
}
}
pub async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
if let Some(gateway) = self.peers.get_gateway_peer_id(dst_peer_id).await {
self.peers.send_msg_directly(msg, gateway).await
} else if self.foreign_network_client.has_next_hop(dst_peer_id) {
self.foreign_network_client.send_msg(msg, dst_peer_id).await
Self::send_msg_internal(&self.peers, &self.foreign_network_client, msg, dst_peer_id).await
}
async fn send_msg_internal(
peers: &Arc<PeerMap>,
foreign_network_client: &Arc<ForeignNetworkClient>,
msg: ZCPacket,
dst_peer_id: PeerId,
) -> Result<(), Error> {
let policy =
Self::get_next_hop_policy(msg.peer_manager_header().unwrap().is_latency_first());
if let Some(gateway) = peers.get_gateway_peer_id(dst_peer_id, policy).await {
peers.send_msg_directly(msg, gateway).await
} else if foreign_network_client.has_next_hop(dst_peer_id) {
foreign_network_client.send_msg(msg, dst_peer_id).await
} else {
Err(Error::RouteError(None))
}
@@ -570,6 +602,12 @@ impl PeerManager {
.encrypt(&mut msg)
.with_context(|| "encrypt failed")?;
let is_latency_first = self.global_ctx.get_flags().latency_first;
msg.mut_peer_manager_header()
.unwrap()
.set_latency_first(is_latency_first);
let next_hop_policy = Self::get_next_hop_policy(is_latency_first);
let mut errs: Vec<Error> = vec![];
let mut msg = Some(msg);
@@ -587,7 +625,11 @@ impl PeerManager {
.to_peer_id
.set(*peer_id);
if let Some(gateway) = self.peers.get_gateway_peer_id(*peer_id).await {
if let Some(gateway) = self
.peers
.get_gateway_peer_id(*peer_id, next_hop_policy.clone())
.await
{
if let Err(e) = self.peers.send_msg_directly(msg, gateway).await {
errs.push(e);
}