This commit is contained in:
sijie.sun
2025-11-02 10:31:45 +08:00
parent cf2038b6c1
commit ae731bdb3c

View File

@@ -275,12 +275,29 @@ impl RouteConnBitmap {
type Error = SyncRouteInfoError;
#[derive(Debug, Clone)]
struct RouteConnInfo {
connected_peers: BTreeSet<PeerId>,
version: AtomicVersion,
last_update: SystemTime,
}
impl Default for RouteConnInfo {
fn default() -> Self {
Self {
connected_peers: BTreeSet::new(),
version: AtomicVersion::new(),
last_update: SystemTime::now(),
}
}
}
// constructed with all infos synced from all peers.
struct SyncedRouteInfo {
peer_infos: RwLock<OrderedHashMap<PeerId, RoutePeerInfo>>,
// prost doesn't support unknown fields, so we use DynamicMessage to store raw infos and progate them to other peers.
raw_peer_infos: DashMap<PeerId, DynamicMessage>,
conn_map: DashMap<PeerId, (BTreeSet<PeerId>, AtomicVersion)>,
conn_map: RwLock<OrderedHashMap<PeerId, RouteConnInfo>>,
foreign_network: DashMap<ForeignNetworkRouteInfoKey, ForeignNetworkRouteInfoEntry>,
group_trust_map: DashMap<PeerId, HashMap<String, Vec<u8>>>,
group_trust_map_cache: DashMap<PeerId, Arc<Vec<String>>>, // cache for group trust map, should sync with group_trust_map
@@ -303,21 +320,21 @@ impl Debug for SyncedRouteInfo {
impl SyncedRouteInfo {
fn get_connected_peers<T: FromIterator<PeerId>>(&self, peer_id: PeerId) -> Option<T> {
self.conn_map
.read()
.get(&peer_id)
.map(|x| x.0.clone().iter().copied().collect())
.map(|x| x.connected_peers.iter().copied().collect())
}
fn remove_peer(&self, peer_id: PeerId) {
tracing::warn!(?peer_id, "remove_peer from synced_route_info");
self.peer_infos.write().remove(&peer_id);
self.raw_peer_infos.remove(&peer_id);
self.conn_map.remove(&peer_id);
self.conn_map.write().remove(&peer_id);
self.foreign_network.retain(|k, _| k.peer_id != peer_id);
self.group_trust_map.remove(&peer_id);
self.group_trust_map_cache.remove(&peer_id);
shrink_dashmap(&self.raw_peer_infos, None);
shrink_dashmap(&self.conn_map, None);
shrink_dashmap(&self.foreign_network, None);
shrink_dashmap(&self.group_trust_map, None);
shrink_dashmap(&self.group_trust_map_cache, None);
@@ -339,10 +356,14 @@ impl SyncedRouteInfo {
drop(guard);
}
self.conn_map.entry(*peer_id).or_insert_with(|| {
let guard = self.conn_map.upgradable_read();
if !guard.contains_key(peer_id) {
let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
guard.insert(*peer_id, RouteConnInfo::default());
need_inc_version = true;
(BTreeSet::new(), AtomicVersion::new())
});
} else {
drop(guard);
}
}
if need_inc_version {
self.version.inc();
@@ -455,7 +476,31 @@ impl SyncedRouteInfo {
Ok(())
}
fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) {
fn update_conn_info_one_peer(
&self,
peer_id_version: &PeerIdVersion,
connected_peers: BTreeSet<PeerId>,
) -> bool {
let mut guard = self.conn_map.write();
if guard
.get_mut(&peer_id_version.peer_id)
.is_none_or(|old| peer_id_version.version > old.version.get())
{
guard.insert(
peer_id_version.peer_id,
RouteConnInfo {
connected_peers,
version: peer_id_version.version.into(),
last_update: SystemTime::now(),
},
);
return true;
}
false
}
fn update_conn_info_with_bitmap(&self, conn_bitmap: &RouteConnBitmap) {
self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.peer_id).collect());
let mut need_inc_version = false;
@@ -463,27 +508,14 @@ impl SyncedRouteInfo {
for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() {
let connceted_peers = conn_bitmap.get_connected_peers(peer_idx);
self.fill_empty_peer_info(&connceted_peers);
self.conn_map
.entry(peer_id_version.peer_id)
.and_modify(|(old_conn_bitmap, old_version)| {
if peer_id_version.version > old_version.get() {
*old_conn_bitmap = connceted_peers.clone();
need_inc_version = true;
old_version.set(peer_id_version.version);
}
})
.or_insert_with(|| {
need_inc_version = true;
(connceted_peers, (peer_id_version.version).into())
});
need_inc_version = self.update_conn_info_one_peer(peer_id_version, connceted_peers);
}
if need_inc_version {
self.version.inc();
}
}
fn update_conn_peer_list(&self, conn_peer_list: &RouteConnPeerList) {
fn update_conn_info_with_list(&self, conn_peer_list: &RouteConnPeerList) {
let mut need_inc_version = false;
for peer_conn_info in &conn_peer_list.peer_conn_infos {
@@ -492,23 +524,9 @@ impl SyncedRouteInfo {
};
let connected_peers: BTreeSet<PeerId> =
peer_conn_info.connected_peer_ids.iter().copied().collect();
let (peer_id, version) = (peer_id_version.peer_id, peer_id_version.version);
self.fill_empty_peer_info(&connected_peers);
self.conn_map
.entry(peer_id)
.and_modify(|(old_conn_bitmap, old_version)| {
if version > old_version.get() {
*old_conn_bitmap = connected_peers.clone();
need_inc_version = true;
old_version.set(version);
}
})
.or_insert_with(|| {
need_inc_version = true;
(connected_peers, version.into())
});
need_inc_version = self.update_conn_info_one_peer(&peer_id_version, connected_peers);
}
if need_inc_version {
self.version.inc();
@@ -518,10 +536,10 @@ impl SyncedRouteInfo {
fn update_conn_info(&self, conn_info: &ConnInfo) {
match conn_info {
ConnInfo::ConnBitmap(conn_bitmap) => {
self.update_conn_map(conn_bitmap);
self.update_conn_info_with_bitmap(conn_bitmap);
}
ConnInfo::ConnPeerList(conn_peer_list) => {
self.update_conn_peer_list(conn_peer_list);
self.update_conn_info_with_list(conn_peer_list);
}
}
}
@@ -592,18 +610,24 @@ impl SyncedRouteInfo {
fn update_my_conn_info(&self, my_peer_id: PeerId, connected_peers: BTreeSet<PeerId>) -> bool {
self.fill_empty_peer_info(&connected_peers);
let mut my_conn_info = self
.conn_map
.entry(my_peer_id)
.or_insert((BTreeSet::new(), AtomicVersion::new()));
let guard = self.conn_map.upgradable_read();
let my_conn_info = guard.get(&my_peer_id);
let new_version = my_conn_info.map(|x| x.version.get()).unwrap_or(0) + 1;
if connected_peers == my_conn_info.value().0 {
false
} else {
let _ = std::mem::replace(&mut my_conn_info.value_mut().0, connected_peers);
my_conn_info.value().1.inc();
if my_conn_info.is_none_or(|old| old.connected_peers != connected_peers) {
let mut guard = RwLockUpgradableReadGuard::upgrade(guard);
guard.insert(
my_peer_id,
RouteConnInfo {
connected_peers,
version: new_version.into(),
last_update: SystemTime::now(),
},
);
self.version.inc();
true
} else {
false
}
}
@@ -694,18 +718,6 @@ impl SyncedRouteInfo {
}
}
fn is_peer_bidirectly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool {
self.conn_map
.get(&src_peer_id)
.map(|x| x.0.contains(&dst_peer_id))
.unwrap_or(false)
}
fn is_peer_directly_connected(&self, src_peer_id: PeerId, dst_peer_id: PeerId) -> bool {
self.is_peer_bidirectly_connected(src_peer_id, dst_peer_id)
|| self.is_peer_bidirectly_connected(dst_peer_id, src_peer_id)
}
fn verify_and_update_group_trusts(
&self,
peer_infos: &[RoutePeerInfo],
@@ -1550,7 +1562,7 @@ impl PeerRouteServiceImpl {
synced_route_info: SyncedRouteInfo {
peer_infos: RwLock::new(OrderedHashMap::new()),
raw_peer_infos: DashMap::new(),
conn_map: DashMap::new(),
conn_map: RwLock::new(OrderedHashMap::new()),
foreign_network: DashMap::new(),
group_trust_map: DashMap::new(),
group_trust_map_cache: DashMap::new(),
@@ -1738,49 +1750,57 @@ impl PeerRouteServiceImpl {
// the conn_bitmap should contain complete list of directly connected peers.
// use union of dst peers can preserve this property.
let all_dst_peer_ids = self
.synced_route_info
.conn_map
.iter()
.flat_map(|x| x.value().clone().0.into_iter())
.collect::<BTreeSet<_>>();
let all_peer_ids = self
.synced_route_info
.conn_map
.iter()
.map(|x| PeerIdVersion {
peer_id: *x.key(),
version: x.value().1.get(),
})
// do not sync conn info of peers that are not reachable from any peer.
.filter(|p| {
all_dst_peer_ids.contains(&p.peer_id) || self.route_table.peer_reachable(p.peer_id)
})
.collect::<Vec<_>>();
let mut all_peer_ids: BTreeMap<PeerId, Version> = BTreeMap::new();
let mut add_to_all_peer_ids = |peer_id: PeerId, version: Version| {
all_peer_ids
.entry(peer_id)
.and_modify(|x| {
if *x < version {
*x = version;
}
})
.or_insert(version);
};
for item in self.synced_route_info.conn_map.read().iter() {
let src_peer_id = *item.0;
if !self.route_table.peer_reachable(src_peer_id) {
continue;
}
add_to_all_peer_ids(src_peer_id, item.1.version.get());
for dst_peer_id in item.1.connected_peers.iter() {
add_to_all_peer_ids(*dst_peer_id, 0);
}
}
let mut conn_bitmap = RouteConnBitmap {
bitmap: vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)],
peer_ids: all_peer_ids,
peer_ids: all_peer_ids
.iter()
.map(|x| PeerIdVersion {
peer_id: *x.0,
version: *x.1,
})
.collect(),
};
let locked_conn_map = self.synced_route_info.conn_map.read();
let all_peer_ids = &conn_bitmap.peer_ids;
for (peer_idx, peer_id_version) in all_peer_ids.iter().enumerate() {
let Some(connected) = self
.synced_route_info
.conn_map
.get(&peer_id_version.peer_id)
else {
let Some(connected) = locked_conn_map.get(&peer_id_version.peer_id) else {
continue;
};
for (idx, other_peer_id_version) in all_peer_ids.iter().enumerate() {
if connected.0.contains(&other_peer_id_version.peer_id) {
if connected
.connected_peers
.contains(&other_peer_id_version.peer_id)
{
let bit_idx = peer_idx * all_peer_ids.len() + idx;
conn_bitmap.bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
}
}
}
drop(locked_conn_map);
let mut locked = self.cached_local_conn_map.lock().unwrap();
if self
@@ -1895,25 +1915,22 @@ impl PeerRouteServiceImpl {
estimated_size: &mut usize,
) -> Option<RouteConnPeerList> {
let mut peer_conn_infos = Vec::new();
let cached_conn_map = self.cached_local_conn_map.lock().unwrap();
*estimated_size = 0;
for peer_id_version in cached_conn_map.peer_ids.iter() {
let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version);
if session.check_saved_conn_version_update_to_date(peer_id, local_version) {
for (peer_id, conn_info) in self.synced_route_info.conn_map.read().iter() {
if session.check_saved_conn_version_update_to_date(*peer_id, conn_info.version.get()) {
continue;
}
let Some(connected) = self.synced_route_info.conn_map.get(&peer_id) else {
continue;
};
peer_conn_infos.push(PeerConnInfo {
peer_id: Some(*peer_id_version),
connected_peer_ids: connected.0.iter().copied().collect(),
peer_id: Some(PeerIdVersion {
peer_id: *peer_id,
version: conn_info.version.get(),
}),
connected_peer_ids: conn_info.connected_peers.iter().copied().collect(),
});
*estimated_size += std::mem::size_of::<PeerIdVersion>()
+ connected.0.len() * std::mem::size_of::<PeerId>();
+ conn_info.connected_peers.len() * std::mem::size_of::<PeerId>();
}
if peer_conn_infos.is_empty() {
@@ -3225,13 +3242,13 @@ mod tests {
let synced_info = &p.service_impl.synced_route_info;
for routable_peer in routable_peers.iter() {
// check conn map
let conns = synced_info
.conn_map
.get(&routable_peer.my_peer_id())
.unwrap();
let conns = {
let guard = synced_info.conn_map.read();
guard.get(&routable_peer.my_peer_id()).cloned().unwrap()
};
assert_eq!(
conns.0,
conns.connected_peers,
routable_peer
.get_peer_map()
.list_peers()