conn list only check updated peer

This commit is contained in:
sijie.sun
2025-12-07 21:49:12 +08:00
parent ae731bdb3c
commit 6ec159bfe8
2 changed files with 107 additions and 75 deletions

View File

@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
collections::{BTreeMap, BTreeSet, HashMap},
fmt::Debug,
net::{IpAddr, Ipv4Addr, Ipv6Addr},
sync::{
@@ -698,6 +698,7 @@ impl SyncedRouteInfo {
fn get_next_last_sync_succ_timestamp(&self) -> SystemTime {
let _peer_info_lock = self.peer_infos.read();
let _conn_info_lock = self.conn_map.read();
// TODO: add conn and foreign network lock
SystemTime::now()
@@ -1262,8 +1263,9 @@ struct SyncRouteSession {
dst_saved_conn_info_version: DashMap<PeerId, VersionAndTouchTime>,
dst_saved_foreign_network_versions: DashMap<ForeignNetworkRouteInfoKey, VersionAndTouchTime>,
// we don't want to send unreachable peer infos to peer, so we keep track of them.
unreachable_peers: parking_lot::Mutex<VecDeque<PeerIdVersion>>,
// we don't want to send unreachable peer infos / conn infos to peer, so we keep track of them.
unreachable_peers_for_peer_info: parking_lot::Mutex<BTreeMap<PeerId, Version>>,
unreachable_peers_for_conn_info: parking_lot::Mutex<BTreeMap<PeerId, Version>>,
last_sync_succ_timestamp: AtomicCell<Option<SystemTime>>,
@@ -1280,6 +1282,8 @@ struct SyncRouteSession {
rpc_rx_count: AtomicU32,
task: SessionTask,
lock: parking_lot::Mutex<()>,
}
impl SyncRouteSession {
@@ -1291,7 +1295,8 @@ impl SyncRouteSession {
dst_saved_conn_info_version: DashMap::new(),
dst_saved_foreign_network_versions: DashMap::new(),
unreachable_peers: parking_lot::Mutex::new(VecDeque::new()),
unreachable_peers_for_peer_info: parking_lot::Mutex::new(BTreeMap::new()),
unreachable_peers_for_conn_info: parking_lot::Mutex::new(BTreeMap::new()),
last_sync_succ_timestamp: AtomicCell::new(None),
@@ -1307,6 +1312,8 @@ impl SyncRouteSession {
rpc_rx_count: AtomicU32::new(0),
task: SessionTask::new(my_peer_id),
lock: parking_lot::Mutex::new(()),
}
}
@@ -1447,8 +1454,12 @@ impl SyncRouteSession {
self.dst_session_id.store(session_id, Ordering::Relaxed);
self.dst_saved_conn_info_version.clear();
self.dst_saved_peer_info_versions.clear();
// update_dst_session_id is always called with session lock held, so clear
// last_sync_succ_timestamp and unreachable_peers non-atomic is safe.
self.last_sync_succ_timestamp.store(None);
self.unreachable_peers.lock().clear();
self.unreachable_peers_for_peer_info.lock().clear();
self.unreachable_peers_for_conn_info.lock().clear();
}
}
@@ -1816,6 +1827,7 @@ impl PeerRouteServiceImpl {
.check_peer_info_last_update_monotonic_increasing();
let mut route_infos = Vec::new();
let peer_infos = self.synced_route_info.peer_infos.read();
let mut unreachable_peers_for_peer_info = session.unreachable_peers_for_peer_info.lock();
let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load();
for (peer_id, peer_info) in peer_infos.iter().rev() {
// stop iter if last_update of peer info is older than session.latest_scanned_peer_info_timestamp
@@ -1841,43 +1853,30 @@ impl PeerRouteServiceImpl {
// do not send unreachable peer info to dst peer.
if !self.route_table.peer_reachable(*peer_id) {
session.unreachable_peers.lock().push_front(PeerIdVersion {
peer_id: *peer_id,
version: peer_info.version,
});
unreachable_peers_for_peer_info.insert(*peer_id, peer_info.version);
continue;
}
route_infos.push(peer_info.clone());
}
let mut unreachable_peers = session.unreachable_peers.lock();
let cur_len = unreachable_peers.len();
for _ in 0..cur_len {
let peer_id_version = unreachable_peers.pop_back().unwrap();
let peer_id = peer_id_version.peer_id;
let version = peer_id_version.version;
if session.check_saved_peer_info_update_to_date(peer_id, version) {
// already up-to-date, skip
continue;
unreachable_peers_for_peer_info.retain(|peer_id, version| {
if session.check_saved_peer_info_update_to_date(*peer_id, *version) {
// if saved peer info is up-to-date, forget this peer id.
return false;
}
let reachable = self.route_table.peer_reachable(peer_id);
match self.synced_route_info.peer_infos.read().get(&peer_id) {
Some(route_info) => {
if reachable {
route_infos.push(route_info.clone());
}
// this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map
unreachable_peers.push_front(peer_id_version);
}
None => {
// if not found in peer info map, forget this peer id.
continue;
}
let Some(peer_info) = peer_infos.get(peer_id) else {
// if not found in peer info map, forget this peer id.
return false;
};
}
unreachable_peers.shrink_to_fit();
if self.route_table.peer_reachable(*peer_id) {
route_infos.push(peer_info.clone());
}
// this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map
true
});
if route_infos.is_empty() {
None
@@ -1886,53 +1885,76 @@ impl PeerRouteServiceImpl {
}
}
fn build_conn_bitmap(&self, session: &SyncRouteSession) -> Option<RouteConnBitmap> {
let mut need_update = false;
for peer_id_version in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() {
let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version);
if session.check_saved_conn_version_update_to_date(peer_id, local_version) {
continue;
}
need_update = true;
}
if !need_update {
return None;
}
Some(self.cached_local_conn_map.lock().unwrap().clone())
}
fn estimate_conn_bitmap_size(&self) -> usize {
let cached_conn_map = self.cached_local_conn_map.lock().unwrap();
cached_conn_map.bitmap.len()
+ (cached_conn_map.peer_ids.len() * std::mem::size_of::<PeerIdVersion>())
}
fn build_conn_peer_list(
&self,
session: &SyncRouteSession,
estimated_size: &mut usize,
) -> Option<RouteConnPeerList> {
let last_sync_succ_timestamp = session.last_sync_succ_timestamp.load();
let mut peer_conn_infos = Vec::new();
*estimated_size = 0;
for (peer_id, conn_info) in self.synced_route_info.conn_map.read().iter() {
if session.check_saved_conn_version_update_to_date(*peer_id, conn_info.version.get()) {
continue;
}
let conn_map = self.synced_route_info.conn_map.read();
let mut unreachable_peers_for_conn_info = session.unreachable_peers_for_conn_info.lock();
let mut add_to_conn_peer_list = |peer_id: PeerId, conn_info: &RouteConnInfo| {
peer_conn_infos.push(PeerConnInfo {
peer_id: Some(PeerIdVersion {
peer_id: *peer_id,
peer_id,
version: conn_info.version.get(),
}),
connected_peer_ids: conn_info.connected_peers.iter().copied().collect(),
});
*estimated_size += std::mem::size_of::<PeerIdVersion>()
+ conn_info.connected_peers.len() * std::mem::size_of::<PeerId>();
};
for (peer_id, conn_info) in conn_map.iter() {
// stop iter if last_update of conn info is older than session.latest_scanned_peer_info_timestamp
let last_update = TryInto::<SystemTime>::try_into(conn_info.last_update).unwrap();
if last_sync_succ_timestamp.is_some_and(|t| last_update < t) {
tracing::debug!(
"ignore conn info {:?} because last_update: {:?} is older than last_sync_succ_timestamp: {:?}, conn_map count: {}, my_peer_id: {:?}, session: {:?}",
conn_info,
last_update,
last_sync_succ_timestamp,
conn_map.len(),
self.my_peer_id,
session
);
continue;
}
if session.check_saved_conn_version_update_to_date(*peer_id, conn_info.version.get()) {
continue;
}
if !self.route_table.peer_reachable(*peer_id) {
unreachable_peers_for_conn_info.insert(*peer_id, conn_info.version.get());
continue;
}
add_to_conn_peer_list(*peer_id, conn_info);
}
unreachable_peers_for_conn_info.retain(|peer_id, version| {
if session.check_saved_conn_version_update_to_date(*peer_id, *version) {
// if saved conn info is up-to-date, forget this peer id.
return false;
}
let Some(conn_info) = conn_map.get(peer_id) else {
// if not found in peer info map, forget this peer id.
return false;
};
if self.route_table.peer_reachable(*peer_id) {
add_to_conn_peer_list(*peer_id, conn_info);
}
// this round rpc may fail, so keep it and remove the id only when it's in dst_saved_map
true
});
if peer_conn_infos.is_empty() {
return None;
}
@@ -1940,6 +1962,16 @@ impl PeerRouteServiceImpl {
Some(RouteConnPeerList { peer_conn_infos })
}
fn build_conn_bitmap(&self) -> RouteConnBitmap {
self.cached_local_conn_map.lock().unwrap().clone()
}
fn estimate_conn_bitmap_size(&self) -> usize {
let cached_conn_map = self.cached_local_conn_map.lock().unwrap();
cached_conn_map.bitmap.len()
+ (cached_conn_map.peer_ids.len() * std::mem::size_of::<PeerIdVersion>())
}
fn build_foreign_network_info(
&self,
session: &SyncRouteSession,
@@ -2010,25 +2042,18 @@ impl PeerRouteServiceImpl {
.get(&dst_peer_id)
.and_then(|p| p.feature_flag)
.map(|x| x.support_conn_list_sync)
.unwrap_or(false);
if !dst_supports_peer_list && !FORCE_USE_CONN_LIST.load(Ordering::Relaxed) {
// Destination peer doesn't support peer list, use bitmap format
return self.build_conn_bitmap(session).map(Into::into);
}
.unwrap_or(false)
|| FORCE_USE_CONN_LIST.load(Ordering::Relaxed);
// Both formats are supported, choose the more efficient one
let mut conn_list_estimated_size = 0;
let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size);
let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size)?;
let bitmap_size = self.estimate_conn_bitmap_size();
if conn_list_estimated_size < bitmap_size
|| FORCE_USE_CONN_LIST.load(Ordering::Relaxed)
|| peer_list.is_none()
{
peer_list.map(Into::into)
if conn_list_estimated_size < bitmap_size && dst_supports_peer_list {
Some(peer_list.into())
} else {
self.build_conn_bitmap(session).map(Into::into)
Some(self.build_conn_bitmap().into())
}
}
@@ -2118,6 +2143,8 @@ impl PeerRouteServiceImpl {
return true;
};
let _session_lock = session.lock.lock();
let my_peer_id = self.my_peer_id;
let next_last_sync_succ_timestamp =
@@ -2167,9 +2194,12 @@ impl PeerRouteServiceImpl {
.encode_to_vec()
.into(),
);
drop(_session_lock);
let ret = rpc_stub
.sync_route_info(ctrl, SyncRouteInfoRequest::default())
.await;
let _session_lock = session.lock.lock();
tracing::debug!(
"sync_route_info resp: {:?}, req: {:?}, session: {:?}, my_info: {:?}, next_last_sync_succ_timestamp: {:?}",
@@ -2586,6 +2616,8 @@ impl RouteSessionManager {
let my_peer_id = service_impl.my_peer_id;
let session = self.get_or_start_session(from_peer_id)?;
let _session_lock = session.lock.lock();
session.rpc_rx_count.fetch_add(1, Ordering::Relaxed);
session.update_dst_session_id(from_session_id);