mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-12 04:37:23 +08:00
make ospf route more effiencient
This commit is contained in:
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2136,6 +2136,7 @@ dependencies = [
|
||||
"nix 0.29.0",
|
||||
"once_cell",
|
||||
"openssl",
|
||||
"ordered_hash_map",
|
||||
"parking_lot",
|
||||
"percent-encoding",
|
||||
"petgraph 0.8.1",
|
||||
@@ -5498,6 +5499,15 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ordered_hash_map"
|
||||
version = "0.5.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a6c699f8a30f345785be969deed7eee4c73a5de58c7faf61d6a3251ef798ff61"
|
||||
dependencies = [
|
||||
"hashbrown 0.15.3",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "os_info"
|
||||
version = "3.8.2"
|
||||
|
||||
@@ -143,6 +143,7 @@ network-interface = "2.0"
|
||||
# for ospf route
|
||||
petgraph = "0.8.1"
|
||||
hashbrown = "0.15.3"
|
||||
ordered_hash_map = "0.5.0"
|
||||
|
||||
# for wireguard
|
||||
boringtun = { package = "boringtun-easytier", version = "0.6.1", optional = true }
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, BTreeSet, HashMap},
|
||||
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
|
||||
fmt::Debug,
|
||||
net::{IpAddr, Ipv4Addr, Ipv6Addr},
|
||||
sync::{
|
||||
@@ -13,6 +13,8 @@ use arc_swap::ArcSwap;
|
||||
use cidr::{IpCidr, Ipv4Cidr, Ipv6Cidr};
|
||||
use crossbeam::atomic::AtomicCell;
|
||||
use dashmap::DashMap;
|
||||
use ordered_hash_map::OrderedHashMap;
|
||||
use parking_lot::{lock_api::RwLockUpgradableReadGuard, RwLock};
|
||||
use petgraph::{
|
||||
algo::dijkstra,
|
||||
graph::{Graph, NodeIndex},
|
||||
@@ -41,7 +43,7 @@ use crate::{
|
||||
route_foreign_network_infos, route_foreign_network_summary,
|
||||
sync_route_info_request::ConnInfo, ForeignNetworkRouteInfoEntry,
|
||||
ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory,
|
||||
OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos,
|
||||
OspfRouteRpcServer, PeerGroupInfo, PeerIdVersion, RouteForeignNetworkInfos,
|
||||
RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError,
|
||||
SyncRouteInfoRequest, SyncRouteInfoResponse,
|
||||
},
|
||||
@@ -70,6 +72,12 @@ static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
|
||||
static AVOID_RELAY_COST: usize = i32::MAX as usize;
|
||||
static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
// if a peer is unreachable for `REMOVE_UNREACHABLE_PEER_INFO_AFTER` time, we can remove it because
|
||||
// 1. all the ospf sessions between two zone are already destroy, new created session will resend the peer info.
|
||||
// 2. all the dst_saved_peer_info_version in all sessions already remove the peer info, the peer info will be propagated
|
||||
// in another zone when two zone restore the conneciton.
|
||||
static REMOVE_UNREACHABLE_PEER_INFO_AFTER: Duration = Duration::from_secs(90);
|
||||
|
||||
type Version = u32;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -124,7 +132,9 @@ impl RoutePeerInfo {
|
||||
proxy_cidrs: Vec::new(),
|
||||
hostname: None,
|
||||
udp_stun_info: 0,
|
||||
last_update: Some(SystemTime::now().into()),
|
||||
// ensure this is updated when the peer_infos/conn_info/foreign_network lock is acquired.
|
||||
// else we may assign a older timestamp than iterate time.
|
||||
last_update: None,
|
||||
version: 0,
|
||||
easytier_version: EASYTIER_VERSION.to_string(),
|
||||
feature_flag: None,
|
||||
@@ -136,13 +146,12 @@ impl RoutePeerInfo {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn update_self(
|
||||
&self,
|
||||
pub fn new_updated_self(
|
||||
my_peer_id: PeerId,
|
||||
peer_route_id: u64,
|
||||
global_ctx: &ArcGlobalCtx,
|
||||
) -> Self {
|
||||
let mut new = Self {
|
||||
Self {
|
||||
peer_id: my_peer_id,
|
||||
inst_id: Some(global_ctx.get_id().into()),
|
||||
cost: 0,
|
||||
@@ -160,9 +169,10 @@ impl RoutePeerInfo {
|
||||
.get_stun_info_collector()
|
||||
.get_stun_info()
|
||||
.udp_nat_type,
|
||||
// following fields do not participate in comparison.
|
||||
last_update: self.last_update,
|
||||
version: self.version,
|
||||
|
||||
// these two fields should not participate in comparison.
|
||||
last_update: None,
|
||||
version: 0,
|
||||
|
||||
easytier_version: EASYTIER_VERSION.to_string(),
|
||||
feature_flag: Some(global_ctx.get_feature_flags()),
|
||||
@@ -176,22 +186,28 @@ impl RoutePeerInfo {
|
||||
ipv6_addr: global_ctx.get_ipv6().map(|x| x.into()),
|
||||
|
||||
groups: global_ctx.get_acl_groups(my_peer_id),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
pub fn try_update_new_peer_info(old: &RoutePeerInfo, new: &mut RoutePeerInfo) -> bool {
|
||||
let need_update_periodically = if let Ok(Ok(d)) =
|
||||
SystemTime::try_from(new.last_update.unwrap_or_default()).map(|x| x.elapsed())
|
||||
SystemTime::try_from(old.last_update.unwrap_or_default()).map(|x| x.elapsed())
|
||||
{
|
||||
d > UPDATE_PEER_INFO_PERIOD
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if new != *self || need_update_periodically {
|
||||
new.last_update = Some(SystemTime::now().into());
|
||||
new.version += 1;
|
||||
}
|
||||
// these two fields should not participate in comparison.
|
||||
new.version = old.version;
|
||||
new.last_update = old.last_update;
|
||||
|
||||
new
|
||||
if *new != *old || need_update_periodically {
|
||||
new.version += 1;
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,7 +277,7 @@ type Error = SyncRouteInfoError;
|
||||
|
||||
// constructed with all infos synced from all peers.
|
||||
struct SyncedRouteInfo {
|
||||
peer_infos: DashMap<PeerId, RoutePeerInfo>,
|
||||
peer_infos: RwLock<OrderedHashMap<PeerId, RoutePeerInfo>>,
|
||||
// prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers.
|
||||
raw_peer_infos: DashMap<PeerId, DynamicMessage>,
|
||||
conn_map: DashMap<PeerId, (BTreeSet<PeerId>, AtomicVersion)>,
|
||||
@@ -293,14 +309,13 @@ impl SyncedRouteInfo {
|
||||
|
||||
fn remove_peer(&self, peer_id: PeerId) {
|
||||
tracing::warn!(?peer_id, "remove_peer from synced_route_info");
|
||||
self.peer_infos.remove(&peer_id);
|
||||
self.peer_infos.write().remove(&peer_id);
|
||||
self.raw_peer_infos.remove(&peer_id);
|
||||
self.conn_map.remove(&peer_id);
|
||||
self.foreign_network.retain(|k, _| k.peer_id != peer_id);
|
||||
self.group_trust_map.remove(&peer_id);
|
||||
self.group_trust_map_cache.remove(&peer_id);
|
||||
|
||||
shrink_dashmap(&self.peer_infos, None);
|
||||
shrink_dashmap(&self.raw_peer_infos, None);
|
||||
shrink_dashmap(&self.conn_map, None);
|
||||
shrink_dashmap(&self.foreign_network, None);
|
||||
@@ -313,10 +328,16 @@ impl SyncedRouteInfo {
|
||||
fn fill_empty_peer_info(&self, peer_ids: &BTreeSet<PeerId>) {
|
||||
let mut need_inc_version = false;
|
||||
for peer_id in peer_ids {
|
||||
self.peer_infos.entry(*peer_id).or_insert_with(|| {
|
||||
let guard = self.peer_infos.upgradable_read();
|
||||
if !guard.contains_key(peer_id) {
|
||||
let mut peer_info = RoutePeerInfo::new();
|
||||
let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
|
||||
peer_info.last_update = Some(SystemTime::now().into());
|
||||
guard.insert(*peer_id, peer_info);
|
||||
need_inc_version = true;
|
||||
RoutePeerInfo::new()
|
||||
});
|
||||
} else {
|
||||
drop(guard);
|
||||
}
|
||||
|
||||
self.conn_map.entry(*peer_id).or_insert_with(|| {
|
||||
need_inc_version = true;
|
||||
@@ -330,6 +351,7 @@ impl SyncedRouteInfo {
|
||||
|
||||
fn get_peer_info_version_with_default(&self, peer_id: PeerId) -> Version {
|
||||
self.peer_infos
|
||||
.read()
|
||||
.get(&peer_id)
|
||||
.map(|x| x.version)
|
||||
.unwrap_or(0)
|
||||
@@ -338,8 +360,9 @@ impl SyncedRouteInfo {
|
||||
fn get_avoid_relay_data(&self, peer_id: PeerId) -> bool {
|
||||
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
|
||||
self.peer_infos
|
||||
.read()
|
||||
.get(&peer_id)
|
||||
.and_then(|x| x.value().feature_flag)
|
||||
.and_then(|x| x.feature_flag)
|
||||
.map(|x| x.avoid_relay_data)
|
||||
.unwrap_or_default()
|
||||
}
|
||||
@@ -395,7 +418,10 @@ impl SyncedRouteInfo {
|
||||
my_peer_route_id,
|
||||
dst_peer_id,
|
||||
if route_info.peer_id == dst_peer_id {
|
||||
self.peer_infos.get(&dst_peer_id).map(|x| x.peer_route_id)
|
||||
self.peer_infos
|
||||
.read()
|
||||
.get(&dst_peer_id)
|
||||
.map(|x| x.peer_route_id)
|
||||
} else {
|
||||
None
|
||||
},
|
||||
@@ -409,26 +435,19 @@ impl SyncedRouteInfo {
|
||||
.unwrap();
|
||||
assert_eq!(peer_id_raw, route_info.peer_id);
|
||||
|
||||
let mut guard = self.peer_infos.write();
|
||||
// time between peers may not be synchronized, so update last_update to local now.
|
||||
// note only last_update with larger version will be updated to local saved peer info.
|
||||
route_info.last_update = Some(SystemTime::now().into());
|
||||
|
||||
self.peer_infos
|
||||
.entry(route_info.peer_id)
|
||||
.and_modify(|old_entry| {
|
||||
if route_info.version > old_entry.version {
|
||||
self.raw_peer_infos
|
||||
.insert(route_info.peer_id, raw_route_info.clone());
|
||||
*old_entry = route_info.clone();
|
||||
need_inc_version = true;
|
||||
}
|
||||
})
|
||||
.or_insert_with(|| {
|
||||
need_inc_version = true;
|
||||
self.raw_peer_infos
|
||||
.insert(route_info.peer_id, raw_route_info.clone());
|
||||
route_info.clone()
|
||||
});
|
||||
if guard
|
||||
.get_mut(&route_info.peer_id)
|
||||
.is_none_or(|old| route_info.version > old.version)
|
||||
{
|
||||
self.raw_peer_infos
|
||||
.insert(route_info.peer_id, raw_route_info.clone());
|
||||
guard.insert(route_info.peer_id, route_info);
|
||||
need_inc_version = true;
|
||||
}
|
||||
}
|
||||
if need_inc_version {
|
||||
self.version.inc();
|
||||
@@ -535,15 +554,34 @@ impl SyncedRouteInfo {
|
||||
my_peer_route_id: u64,
|
||||
global_ctx: &ArcGlobalCtx,
|
||||
) -> bool {
|
||||
let mut old = self.peer_infos.entry(my_peer_id).or_default();
|
||||
let new = old.update_self(my_peer_id, my_peer_route_id, global_ctx);
|
||||
let new_version = new.version;
|
||||
let old_version = old.version;
|
||||
*old = new;
|
||||
drop(old);
|
||||
let mut new = RoutePeerInfo::new_updated_self(my_peer_id, my_peer_route_id, global_ctx);
|
||||
let mut guard = self.peer_infos.upgradable_read();
|
||||
let old = guard.get(&my_peer_id);
|
||||
let new_version = old.map(|x| x.version).unwrap_or(0) + 1;
|
||||
let need_insert_new = if let Some(old) = old {
|
||||
RoutePeerInfo::try_update_new_peer_info(old, &mut new)
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
if need_insert_new {
|
||||
let acl_groups = if old.map(|x| x.groups != new.groups).unwrap_or(true) {
|
||||
Some(new.groups.clone())
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
guard.with_upgraded(|peer_infos| {
|
||||
new.last_update = Some(SystemTime::now().into());
|
||||
new.version = new_version;
|
||||
peer_infos.insert(my_peer_id, new)
|
||||
});
|
||||
drop(guard);
|
||||
|
||||
if let Some(acl_groups) = acl_groups {
|
||||
self.update_my_group_trusts(my_peer_id, &acl_groups);
|
||||
}
|
||||
|
||||
if new_version != old_version {
|
||||
self.update_my_group_trusts(my_peer_id);
|
||||
self.version.inc();
|
||||
true
|
||||
} else {
|
||||
@@ -634,6 +672,28 @@ impl SyncedRouteInfo {
|
||||
updated
|
||||
}
|
||||
|
||||
fn get_next_last_sync_succ_timestamp(&self) -> SystemTime {
|
||||
let _peer_info_lock = self.peer_infos.read();
|
||||
// TODO: add conn and foreign network lock
|
||||
|
||||
SystemTime::now()
|
||||
}
|
||||
|
||||
fn check_peer_info_last_update_monotonic_increasing(&self) {
|
||||
let mut last_update: Option<prost_types::Timestamp> = None;
|
||||
for peer_info in self.peer_infos.read().values() {
|
||||
if let Some(last_update) = last_update {
|
||||
let cur_last_update = peer_info.last_update.unwrap();
|
||||
assert!(
|
||||
cur_last_update.seconds > last_update.seconds
|
||||
|| cur_last_update.nanos >= last_update.nanos,
|
||||
"peer info last update not monotonic increasing"
|
||||
);
|
||||
}
|
||||
last_update = peer_info.last_update;
|
||||
}
|
||||
}
|
||||
|
||||
fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool {
|
||||
self.conn_map
|
||||
.get(&src_peer_id)
|
||||
@@ -725,13 +785,15 @@ impl SyncedRouteInfo {
|
||||
}
|
||||
}
|
||||
|
||||
fn update_my_group_trusts(&self, my_peer_id: PeerId) {
|
||||
fn update_my_group_trusts(&self, my_peer_id: PeerId, groups: &[PeerGroupInfo]) {
|
||||
let mut my_group_map = HashMap::new();
|
||||
let mut my_group_names = Vec::new();
|
||||
for group in self.peer_infos.entry(my_peer_id).or_default().groups.iter() {
|
||||
|
||||
for group in groups.iter() {
|
||||
my_group_map.insert(group.group_name.clone(), group.group_proof.clone());
|
||||
my_group_names.push(group.group_name.clone());
|
||||
}
|
||||
|
||||
self.group_trust_map.insert(my_peer_id, my_group_map);
|
||||
self.group_trust_map_cache
|
||||
.insert(my_peer_id, Arc::new(my_group_names));
|
||||
@@ -749,21 +811,16 @@ struct NextHopInfo {
|
||||
}
|
||||
// dst_peer_id -> (next_hop_peer_id, cost, path_len)
|
||||
type NextHopMap = DashMap<PeerId, NextHopInfo>;
|
||||
#[derive(Debug, Clone, Copy)]
|
||||
struct PeerIdAndVersion {
|
||||
peer_id: PeerId,
|
||||
version: Version,
|
||||
}
|
||||
|
||||
// computed with SyncedRouteInfo. used to get next hop.
|
||||
#[derive(Debug)]
|
||||
struct RouteTable {
|
||||
peer_infos: DashMap<PeerId, RoutePeerInfo>,
|
||||
next_hop_map: NextHopMap,
|
||||
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerIdAndVersion>,
|
||||
ipv6_peer_id_map: DashMap<Ipv6Addr, PeerIdAndVersion>,
|
||||
cidr_peer_id_map: ArcSwap<PrefixMap<Ipv4Cidr, PeerIdAndVersion>>,
|
||||
cidr_v6_peer_id_map: ArcSwap<PrefixMap<Ipv6Cidr, PeerIdAndVersion>>,
|
||||
ipv4_peer_id_map: DashMap<Ipv4Addr, PeerIdVersion>,
|
||||
ipv6_peer_id_map: DashMap<Ipv6Addr, PeerIdVersion>,
|
||||
cidr_peer_id_map: ArcSwap<PrefixMap<Ipv4Cidr, PeerIdVersion>>,
|
||||
cidr_v6_peer_id_map: ArcSwap<PrefixMap<Ipv6Cidr, PeerIdVersion>>,
|
||||
next_hop_map_version: AtomicVersion,
|
||||
}
|
||||
|
||||
@@ -811,18 +868,17 @@ impl RouteTable {
|
||||
|
||||
let mut start_node_idx = None;
|
||||
let peer_id_to_node_index: PeerIdToNodexIdxMap = DashMap::new();
|
||||
for item in synced_info.peer_infos.iter() {
|
||||
let peer_id = item.key();
|
||||
let info = item.value();
|
||||
for (peer_id, info) in synced_info.peer_infos.read().iter() {
|
||||
let peer_id = *peer_id;
|
||||
|
||||
if info.version == 0 {
|
||||
continue;
|
||||
}
|
||||
|
||||
let node_idx = graph.add_node(*peer_id);
|
||||
let node_idx = graph.add_node(peer_id);
|
||||
|
||||
peer_id_to_node_index.insert(*peer_id, node_idx);
|
||||
if *peer_id == my_peer_id {
|
||||
peer_id_to_node_index.insert(peer_id, node_idx);
|
||||
if peer_id == my_peer_id {
|
||||
start_node_idx = Some(node_idx);
|
||||
}
|
||||
}
|
||||
@@ -991,18 +1047,18 @@ impl RouteTable {
|
||||
}
|
||||
|
||||
let peer_id = item.key();
|
||||
let Some(info) = synced_info.peer_infos.get(peer_id) else {
|
||||
let Some(info) = synced_info.peer_infos.read().get(peer_id).cloned() else {
|
||||
continue;
|
||||
};
|
||||
|
||||
self.peer_infos.insert(*peer_id, info.clone());
|
||||
|
||||
let peer_id_and_version = PeerIdAndVersion {
|
||||
let peer_id_and_version = PeerIdVersion {
|
||||
peer_id: *peer_id,
|
||||
version,
|
||||
};
|
||||
|
||||
let is_new_peer_better = |old_peer: &PeerIdAndVersion| -> bool {
|
||||
let is_new_peer_better = |old_peer: &PeerIdVersion| -> bool {
|
||||
if peer_id_and_version.version > old_peer.version {
|
||||
return true;
|
||||
}
|
||||
@@ -1194,6 +1250,11 @@ struct SyncRouteSession {
|
||||
dst_saved_conn_info_version: DashMap<PeerId, VersionAndTouchTime>,
|
||||
dst_saved_foreign_network_versions: DashMap<ForeignNetworkRouteInfoKey, VersionAndTouchTime>,
|
||||
|
||||
// we don't want to send unreachable peer infos to peer, so we keep track of them.
|
||||
unreachable_peers: parking_lot::Mutex<VecDeque<PeerIdVersion>>,
|
||||
|
||||
last_sync_succ_timestamp: AtomicCell<Option<SystemTime>>,
|
||||
|
||||
my_session_id: AtomicSessionId,
|
||||
dst_session_id: AtomicSessionId,
|
||||
|
||||
@@ -1218,6 +1279,10 @@ impl SyncRouteSession {
|
||||
dst_saved_conn_info_version: DashMap::new(),
|
||||
dst_saved_foreign_network_versions: DashMap::new(),
|
||||
|
||||
unreachable_peers: parking_lot::Mutex::new(VecDeque::new()),
|
||||
|
||||
last_sync_succ_timestamp: AtomicCell::new(None),
|
||||
|
||||
my_session_id: AtomicSessionId::new(rand::random()),
|
||||
dst_session_id: AtomicSessionId::new(0),
|
||||
|
||||
@@ -1363,12 +1428,15 @@ impl SyncRouteSession {
|
||||
self.need_sync_initiator_info.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
// return whether session id is updated
|
||||
fn update_dst_session_id(&self, session_id: SessionId) {
|
||||
if session_id != self.dst_session_id.load(Ordering::Relaxed) {
|
||||
tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info.");
|
||||
self.dst_session_id.store(session_id, Ordering::Relaxed);
|
||||
self.dst_saved_conn_info_version.clear();
|
||||
self.dst_saved_peer_info_versions.clear();
|
||||
self.last_sync_succ_timestamp.store(None);
|
||||
self.unreachable_peers.lock().clear();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1386,6 +1454,16 @@ impl SyncRouteSession {
|
||||
self.dst_saved_foreign_network_versions.shrink_to_fit();
|
||||
}
|
||||
|
||||
fn update_last_sync_succ_timestamp(&self, next_last_sync_succ_timestamp: SystemTime) {
|
||||
let _ = self.last_sync_succ_timestamp.fetch_update(|x| {
|
||||
if x.is_none_or(|old| old < next_last_sync_succ_timestamp) {
|
||||
Some(Some(next_last_sync_succ_timestamp))
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
fn short_debug_string(&self) -> String {
|
||||
format!(
|
||||
"session_dst_peer: {:?}, my_session_id: {:?}, dst_session_id: {:?}, we_are_initiator: {:?}, dst_is_initiator: {:?}, rpc_tx_count: {:?}, rpc_rx_count: {:?}, task: {:?}",
|
||||
@@ -1470,7 +1548,7 @@ impl PeerRouteServiceImpl {
|
||||
foreign_network_my_peer_id_map: DashMap::new(),
|
||||
|
||||
synced_route_info: SyncedRouteInfo {
|
||||
peer_infos: DashMap::new(),
|
||||
peer_infos: RwLock::new(OrderedHashMap::new()),
|
||||
raw_peer_infos: DashMap::new(),
|
||||
conn_map: DashMap::new(),
|
||||
foreign_network: DashMap::new(),
|
||||
@@ -1714,22 +1792,73 @@ impl PeerRouteServiceImpl {
|
||||
}
|
||||
|
||||
fn build_route_info(&self, session: &SyncRouteSession) -> Option<Vec<RoutePeerInfo>> {
|
||||
self.synced_route_info
|
||||
.check_peer_info_last_update_monotonic_increasing();
|
||||
let mut route_infos = Vec::new();
|
||||
for item in self.synced_route_info.peer_infos.iter() {
|
||||
let peer_infos = self.synced_route_info.peer_infos.read();
|
||||
let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load();
|
||||
for (peer_id, peer_info) in peer_infos.iter().rev() {
|
||||
// stop iter if last_update of peer info is older than session.latest_scanned_peer_info_timestamp
|
||||
if let Some(last_update) = peer_info.last_update {
|
||||
let last_update = TryInto::<SystemTime>::try_into(last_update).unwrap();
|
||||
if last_sync_succ_timestamp.is_some_and(|t| last_update < t) {
|
||||
tracing::debug!(
|
||||
"ignore peer_info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, peer_infos_count: {}, my_peer_id: {:?}, session: {:?}",
|
||||
peer_info,
|
||||
last_update,
|
||||
last_sync_succ_timestamp,
|
||||
peer_infos.len(),
|
||||
self.my_peer_id,
|
||||
session
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if session.check_saved_peer_info_update_to_date(peer_info.peer_id, peer_info.version) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// do not send unreachable peer info to dst peer.
|
||||
if !self.route_table.peer_reachable(*item.key()) {
|
||||
if !self.route_table.peer_reachable(*peer_id) {
|
||||
session.unreachable_peers.lock().push_front(PeerIdVersion {
|
||||
peer_id: *peer_id,
|
||||
version: peer_info.version,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if session
|
||||
.check_saved_peer_info_update_to_date(item.value().peer_id, item.value().version)
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
route_infos.push(item.value().clone());
|
||||
route_infos.push(peer_info.clone());
|
||||
}
|
||||
|
||||
let mut unreachable_peers = session.unreachable_peers.lock();
|
||||
let cur_len = unreachable_peers.len();
|
||||
for _ in 0..cur_len {
|
||||
let peer_id_version = unreachable_peers.pop_back().unwrap();
|
||||
let peer_id = peer_id_version.peer_id;
|
||||
let version = peer_id_version.version;
|
||||
if session.check_saved_peer_info_update_to_date(peer_id, version) {
|
||||
// already up-to-date, skip
|
||||
continue;
|
||||
}
|
||||
let reachable = self.route_table.peer_reachable(peer_id);
|
||||
match self.synced_route_info.peer_infos.read().get(&peer_id) {
|
||||
Some(route_info) => {
|
||||
if reachable {
|
||||
route_infos.push(route_info.clone());
|
||||
}
|
||||
// this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map
|
||||
unreachable_peers.push_front(peer_id_version);
|
||||
}
|
||||
None => {
|
||||
// if not found in peer info map, forget this peer id.
|
||||
continue;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
unreachable_peers.shrink_to_fit();
|
||||
|
||||
if route_infos.is_empty() {
|
||||
None
|
||||
} else {
|
||||
@@ -1860,6 +1989,7 @@ impl PeerRouteServiceImpl {
|
||||
let dst_supports_peer_list = self
|
||||
.synced_route_info
|
||||
.peer_infos
|
||||
.read()
|
||||
.get(&dst_peer_id)
|
||||
.and_then(|p| p.feature_flag)
|
||||
.map(|x| x.support_conn_list_sync)
|
||||
@@ -1888,11 +2018,13 @@ impl PeerRouteServiceImpl {
|
||||
fn clear_expired_peer(&self) {
|
||||
let now = SystemTime::now();
|
||||
let mut to_remove = Vec::new();
|
||||
for item in self.synced_route_info.peer_infos.iter() {
|
||||
if let Ok(d) = now.duration_since(item.value().last_update.unwrap().try_into().unwrap())
|
||||
{
|
||||
if d > REMOVE_DEAD_PEER_INFO_AFTER {
|
||||
to_remove.push(*item.key());
|
||||
for (peer_id, peer_info) in self.synced_route_info.peer_infos.read().iter() {
|
||||
if let Ok(d) = now.duration_since(peer_info.last_update.unwrap().try_into().unwrap()) {
|
||||
if d > REMOVE_DEAD_PEER_INFO_AFTER
|
||||
|| (d > REMOVE_UNREACHABLE_PEER_INFO_AFTER
|
||||
&& !self.route_table.peer_reachable(*peer_id))
|
||||
{
|
||||
to_remove.push(*peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1971,6 +2103,8 @@ impl PeerRouteServiceImpl {
|
||||
|
||||
let my_peer_id = self.my_peer_id;
|
||||
|
||||
let next_last_sync_succ_timestamp =
|
||||
self.synced_route_info.get_next_last_sync_succ_timestamp();
|
||||
let (peer_infos, conn_info, foreign_network) =
|
||||
self.build_sync_request(&session, dst_peer_id);
|
||||
if peer_infos.is_none()
|
||||
@@ -2020,6 +2154,11 @@ impl PeerRouteServiceImpl {
|
||||
.sync_route_info(ctrl, SyncRouteInfoRequest::default())
|
||||
.await;
|
||||
|
||||
tracing::debug!(
|
||||
"sync_route_info resp: {:?}, req: {:?}, session: {:?}, my_info: {:?}, next_last_sync_succ_timestamp: {:?}",
|
||||
ret, sync_route_info_req, session, self.global_ctx.network, next_last_sync_succ_timestamp
|
||||
);
|
||||
|
||||
if let Err(e) = &ret {
|
||||
tracing::error!(
|
||||
?ret,
|
||||
@@ -2066,6 +2205,8 @@ impl PeerRouteServiceImpl {
|
||||
if let Some(foreign_network) = &foreign_network {
|
||||
session.update_dst_saved_foreign_network_version(foreign_network, dst_peer_id);
|
||||
}
|
||||
|
||||
session.update_last_sync_succ_timestamp(next_last_sync_succ_timestamp);
|
||||
}
|
||||
}
|
||||
false
|
||||
@@ -2840,7 +2981,7 @@ mod tests {
|
||||
peers::{
|
||||
create_packet_recv_chan,
|
||||
peer_manager::{PeerManager, RouteAlgoType},
|
||||
peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST},
|
||||
peer_ospf_route::{PeerIdVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST},
|
||||
route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface},
|
||||
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
|
||||
},
|
||||
@@ -2923,8 +3064,14 @@ mod tests {
|
||||
|
||||
tokio::time::sleep(Duration::from_secs(3)).await;
|
||||
|
||||
assert_eq!(2, r_a.service_impl.synced_route_info.peer_infos.len());
|
||||
assert_eq!(2, r_b.service_impl.synced_route_info.peer_infos.len());
|
||||
assert_eq!(
|
||||
2,
|
||||
r_a.service_impl.synced_route_info.peer_infos.read().len()
|
||||
);
|
||||
assert_eq!(
|
||||
2,
|
||||
r_b.service_impl.synced_route_info.peer_infos.read().len()
|
||||
);
|
||||
|
||||
for s in r_a.service_impl.sessions.iter() {
|
||||
assert!(s.value().task.is_running());
|
||||
@@ -2934,6 +3081,7 @@ mod tests {
|
||||
r_a.service_impl
|
||||
.synced_route_info
|
||||
.peer_infos
|
||||
.read()
|
||||
.get(&p_a.my_peer_id())
|
||||
.unwrap()
|
||||
.version,
|
||||
@@ -2990,7 +3138,7 @@ mod tests {
|
||||
|
||||
for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() {
|
||||
wait_for_condition(
|
||||
|| async { r.service_impl.synced_route_info.peer_infos.len() == 3 },
|
||||
|| async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
@@ -3095,7 +3243,9 @@ mod tests {
|
||||
// check peer infos
|
||||
let peer_info = synced_info
|
||||
.peer_infos
|
||||
.read()
|
||||
.get(&routable_peer.my_peer_id())
|
||||
.cloned()
|
||||
.unwrap();
|
||||
assert_eq!(peer_info.peer_id, routable_peer.my_peer_id());
|
||||
}
|
||||
@@ -3125,7 +3275,7 @@ mod tests {
|
||||
|
||||
for r in [r_a.clone(), r_b.clone(), r_c.clone()].iter() {
|
||||
wait_for_condition(
|
||||
|| async { r.service_impl.synced_route_info.peer_infos.len() == 3 },
|
||||
|| async { r.service_impl.synced_route_info.peer_infos.read().len() == 3 },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
@@ -3392,10 +3542,10 @@ mod tests {
|
||||
let proxy_cidr: Ipv4Cidr = "192.168.100.0/24".parse().unwrap();
|
||||
let test_ip = proxy_cidr.first_address();
|
||||
|
||||
let mut cidr_peer_id_map: PrefixMap<Ipv4Cidr, PeerIdAndVersion> = PrefixMap::new();
|
||||
let mut cidr_peer_id_map: PrefixMap<Ipv4Cidr, PeerIdVersion> = PrefixMap::new();
|
||||
cidr_peer_id_map.insert(
|
||||
proxy_cidr,
|
||||
PeerIdAndVersion {
|
||||
PeerIdVersion {
|
||||
peer_id: p_c.my_peer_id(),
|
||||
version: 0,
|
||||
},
|
||||
|
||||
@@ -38,8 +38,7 @@ async fn test_route_peer_info_ipv6() {
|
||||
global_ctx.set_ipv6(Some(ipv6_cidr));
|
||||
|
||||
// Create RoutePeerInfo with IPv6 support
|
||||
let peer_info = RoutePeerInfo::new();
|
||||
let updated_info = peer_info.update_self(123, 456, &global_ctx);
|
||||
let updated_info = RoutePeerInfo::new_updated_self(123, 456, &global_ctx);
|
||||
|
||||
// Verify IPv6 address is included
|
||||
assert!(updated_info.ipv6_addr.is_some());
|
||||
|
||||
Reference in New Issue
Block a user