allow sync conn with conn list when conn bitmap is too large (#1508)

This commit is contained in:
Sijie.Sun
2025-10-23 08:11:36 +08:00
committed by GitHub
parent 7485f5f64e
commit 71679e889a
5 changed files with 289 additions and 105 deletions

View File

@@ -142,6 +142,7 @@ impl GlobalCtx {
let feature_flags = PeerFeatureFlag {
kcp_input: !config_fs.get_flags().disable_kcp_input,
no_relay_kcp: config_fs.get_flags().disable_relay_kcp,
support_conn_list_sync: true, // Enable selective peer list sync by default
..Default::default()
};

View File

@@ -22,7 +22,6 @@ use petgraph::{
use prefix_trie::PrefixMap;
use prost::Message;
use prost_reflect::{DynamicMessage, ReflectMessage};
use serde::{Deserialize, Serialize};
use tokio::{
select,
sync::Mutex,
@@ -40,8 +39,9 @@ use crate::{
common::{Ipv4Inet, NatType, StunInfo},
peer_rpc::{
route_foreign_network_infos, route_foreign_network_summary,
ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc,
OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos,
sync_route_info_request::ConnInfo, ForeignNetworkRouteInfoEntry,
ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory,
OspfRouteRpcServer, PeerIdVersion, RouteForeignNetworkInfos,
RouteForeignNetworkSummary, RoutePeerInfo, RoutePeerInfos, SyncRouteInfoError,
SyncRouteInfoRequest, SyncRouteInfoResponse,
},
@@ -68,6 +68,7 @@ static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600);
static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
// the cost (latency between two peers) is i32, i32::MAX is large enough.
static AVOID_RELAY_COST: usize = i32::MAX as usize;
static FORCE_USE_CONN_LIST: AtomicBool = AtomicBool::new(true);
type Version = u32;
@@ -233,49 +234,11 @@ impl From<RoutePeerInfo> for crate::proto::api::instance::Route {
}
}
#[derive(Deserialize, Serialize, Clone, Debug, PartialEq)]
struct RouteConnBitmap {
peer_ids: Vec<(PeerId, Version)>,
bitmap: Vec<u8>,
}
impl From<RouteConnBitmap> for crate::proto::peer_rpc::RouteConnBitmap {
fn from(val: RouteConnBitmap) -> Self {
crate::proto::peer_rpc::RouteConnBitmap {
peer_ids: val
.peer_ids
.into_iter()
.map(|x| PeerIdVersion {
peer_id: x.0,
version: x.1,
})
.collect(),
bitmap: val.bitmap,
}
}
}
impl From<crate::proto::peer_rpc::RouteConnBitmap> for RouteConnBitmap {
fn from(v: crate::proto::peer_rpc::RouteConnBitmap) -> Self {
RouteConnBitmap {
peer_ids: v
.peer_ids
.into_iter()
.map(|x| (x.peer_id, x.version))
.collect(),
bitmap: v.bitmap,
}
}
}
type RouteConnBitmap = crate::proto::peer_rpc::RouteConnBitmap;
type RouteConnPeerList = crate::proto::peer_rpc::RouteConnPeerList;
type PeerConnInfo = crate::proto::peer_rpc::route_conn_peer_list::PeerConnInfo;
impl RouteConnBitmap {
fn new() -> Self {
RouteConnBitmap {
peer_ids: Vec::new(),
bitmap: Vec::new(),
}
}
fn get_bit(&self, idx: usize) -> bool {
let byte_idx = idx / 8;
let bit_idx = idx % 8;
@@ -285,9 +248,9 @@ impl RouteConnBitmap {
fn get_connected_peers(&self, peer_idx: usize) -> BTreeSet<PeerId> {
let mut connected_peers = BTreeSet::new();
for (idx, (peer_id, _)) in self.peer_ids.iter().enumerate() {
for (idx, peer_id_version) in self.peer_ids.iter().enumerate() {
if self.get_bit(peer_idx * self.peer_ids.len() + idx) {
connected_peers.insert(*peer_id);
connected_peers.insert(peer_id_version.peer_id);
}
}
connected_peers
@@ -474,26 +437,26 @@ impl SyncedRouteInfo {
}
fn update_conn_map(&self, conn_bitmap: &RouteConnBitmap) {
self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.0).collect());
self.fill_empty_peer_info(&conn_bitmap.peer_ids.iter().map(|x| x.peer_id).collect());
let mut need_inc_version = false;
for (peer_idx, (peer_id, version)) in conn_bitmap.peer_ids.iter().enumerate() {
for (peer_idx, peer_id_version) in conn_bitmap.peer_ids.iter().enumerate() {
let connceted_peers = conn_bitmap.get_connected_peers(peer_idx);
self.fill_empty_peer_info(&connceted_peers);
self.conn_map
.entry(*peer_id)
.entry(peer_id_version.peer_id)
.and_modify(|(old_conn_bitmap, old_version)| {
if *version > old_version.get() {
if peer_id_version.version > old_version.get() {
*old_conn_bitmap = connceted_peers.clone();
need_inc_version = true;
old_version.set(*version);
old_version.set(peer_id_version.version);
}
})
.or_insert_with(|| {
need_inc_version = true;
(connceted_peers, (*version).into())
(connceted_peers, (peer_id_version.version).into())
});
}
if need_inc_version {
@@ -501,6 +464,49 @@ impl SyncedRouteInfo {
}
}
fn update_conn_peer_list(&self, conn_peer_list: &RouteConnPeerList) {
let mut need_inc_version = false;
for peer_conn_info in &conn_peer_list.peer_conn_infos {
let Some(peer_id_version) = peer_conn_info.peer_id else {
continue;
};
let connected_peers: BTreeSet<PeerId> =
peer_conn_info.connected_peer_ids.iter().copied().collect();
let (peer_id, version) = (peer_id_version.peer_id, peer_id_version.version);
self.fill_empty_peer_info(&connected_peers);
self.conn_map
.entry(peer_id)
.and_modify(|(old_conn_bitmap, old_version)| {
if version > old_version.get() {
*old_conn_bitmap = connected_peers.clone();
need_inc_version = true;
old_version.set(version);
}
})
.or_insert_with(|| {
need_inc_version = true;
(connected_peers, version.into())
});
}
if need_inc_version {
self.version.inc();
}
}
fn update_conn_info(&self, conn_info: &ConnInfo) {
match conn_info {
ConnInfo::ConnBitmap(conn_bitmap) => {
self.update_conn_map(conn_bitmap);
}
ConnInfo::ConnPeerList(conn_peer_list) => {
self.update_conn_peer_list(conn_peer_list);
}
}
}
fn update_foreign_network(&self, foreign_network: &RouteForeignNetworkInfos) {
for item in foreign_network.infos.iter().map(Clone::clone) {
let Some(key) = item.key else {
@@ -1185,7 +1191,7 @@ struct SyncRouteSession {
my_peer_id: PeerId,
dst_peer_id: PeerId,
dst_saved_peer_info_versions: DashMap<PeerId, VersionAndTouchTime>,
dst_saved_conn_bitmap_version: DashMap<PeerId, VersionAndTouchTime>,
dst_saved_conn_info_version: DashMap<PeerId, VersionAndTouchTime>,
dst_saved_foreign_network_versions: DashMap<ForeignNetworkRouteInfoKey, VersionAndTouchTime>,
my_session_id: AtomicSessionId,
@@ -1209,7 +1215,7 @@ impl SyncRouteSession {
my_peer_id,
dst_peer_id,
dst_saved_peer_info_versions: DashMap::new(),
dst_saved_conn_bitmap_version: DashMap::new(),
dst_saved_conn_info_version: DashMap::new(),
dst_saved_foreign_network_versions: DashMap::new(),
my_session_id: AtomicSessionId::new(rand::random()),
@@ -1246,7 +1252,7 @@ impl SyncRouteSession {
// never send version 0 conn bitmap to dst peer.
return true;
}
self.dst_saved_conn_bitmap_version
self.dst_saved_conn_info_version
.get(&peer_id)
.map(|v| {
v.touch();
@@ -1293,15 +1299,46 @@ impl SyncRouteSession {
conn_bitmap: &RouteConnBitmap,
dst_peer_id: PeerId,
) {
for (peer_id, version) in conn_bitmap.peer_ids.iter() {
if *peer_id == dst_peer_id {
for peer_id_version in conn_bitmap.peer_ids.iter() {
if peer_id_version.peer_id == dst_peer_id {
continue;
}
self.dst_saved_conn_bitmap_version
.entry(*peer_id)
self.dst_saved_conn_info_version
.entry(peer_id_version.peer_id)
.or_default()
.set_if_larger(*version);
.set_if_larger(peer_id_version.version);
}
}
fn update_dst_saved_conn_peer_list_version(
&self,
conn_peer_list: &RouteConnPeerList,
dst_peer_id: PeerId,
) {
for peer_conn_info in &conn_peer_list.peer_conn_infos {
let Some(peer_id_version) = peer_conn_info.peer_id else {
continue;
};
if peer_id_version.peer_id == dst_peer_id {
continue;
}
self.dst_saved_conn_info_version
.entry(peer_id_version.peer_id)
.or_default()
.set_if_larger(peer_id_version.version);
}
}
fn update_dst_saved_conn_info_version(&self, conn_info: &ConnInfo, dst_peer_id: PeerId) {
match conn_info {
ConnInfo::ConnBitmap(conn_bitmap) => {
self.update_dst_saved_conn_bitmap_version(conn_bitmap, dst_peer_id);
}
ConnInfo::ConnPeerList(peer_list) => {
self.update_dst_saved_conn_peer_list_version(peer_list, dst_peer_id);
}
}
}
@@ -1330,7 +1367,7 @@ impl SyncRouteSession {
if session_id != self.dst_session_id.load(Ordering::Relaxed) {
tracing::warn!(?self, ?session_id, "session id mismatch, clear saved info.");
self.dst_session_id.store(session_id, Ordering::Relaxed);
self.dst_saved_conn_bitmap_version.clear();
self.dst_saved_conn_info_version.clear();
self.dst_saved_peer_info_versions.clear();
}
}
@@ -1340,9 +1377,9 @@ impl SyncRouteSession {
.retain(|_, v| !v.is_expired());
self.dst_saved_peer_info_versions.shrink_to_fit();
self.dst_saved_conn_bitmap_version
self.dst_saved_conn_info_version
.retain(|_, v| !v.is_expired());
self.dst_saved_conn_bitmap_version.shrink_to_fit();
self.dst_saved_conn_info_version.shrink_to_fit();
self.dst_saved_foreign_network_versions
.retain(|_, v| !v.is_expired());
@@ -1441,7 +1478,7 @@ impl PeerRouteServiceImpl {
group_trust_map_cache: DashMap::new(),
version: AtomicVersion::new(),
},
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::new()),
cached_local_conn_map: std::sync::Mutex::new(RouteConnBitmap::default()),
cached_local_conn_map_version: AtomicVersion::new(),
last_update_my_foreign_network: AtomicCell::new(None),
@@ -1634,23 +1671,33 @@ impl PeerRouteServiceImpl {
.synced_route_info
.conn_map
.iter()
.map(|x| (*x.key(), x.value().1.get()))
.map(|x| PeerIdVersion {
peer_id: *x.key(),
version: x.value().1.get(),
})
// do not sync conn info of peers that are not reachable from any peer.
.filter(|p| all_dst_peer_ids.contains(&p.0) || self.route_table.peer_reachable(p.0))
.filter(|p| {
all_dst_peer_ids.contains(&p.peer_id) || self.route_table.peer_reachable(p.peer_id)
})
.collect::<Vec<_>>();
let mut conn_bitmap = RouteConnBitmap::new();
conn_bitmap.bitmap = vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)];
conn_bitmap.peer_ids = all_peer_ids;
let mut conn_bitmap = RouteConnBitmap {
bitmap: vec![0; (all_peer_ids.len() * all_peer_ids.len()).div_ceil(8)],
peer_ids: all_peer_ids,
};
let all_peer_ids = &conn_bitmap.peer_ids;
for (peer_idx, (peer_id, _)) in all_peer_ids.iter().enumerate() {
let Some(connected) = self.synced_route_info.conn_map.get(peer_id) else {
for (peer_idx, peer_id_version) in all_peer_ids.iter().enumerate() {
let Some(connected) = self
.synced_route_info
.conn_map
.get(&peer_id_version.peer_id)
else {
continue;
};
for (idx, (other_peer_id, _)) in all_peer_ids.iter().enumerate() {
if connected.0.contains(other_peer_id) {
for (idx, other_peer_id_version) in all_peer_ids.iter().enumerate() {
if connected.0.contains(&other_peer_id_version.peer_id) {
let bit_idx = peer_idx * all_peer_ids.len() + idx;
conn_bitmap.bitmap[bit_idx / 8] |= 1 << (bit_idx % 8);
}
@@ -1692,8 +1739,9 @@ impl PeerRouteServiceImpl {
fn build_conn_bitmap(&self, session: &SyncRouteSession) -> Option<RouteConnBitmap> {
let mut need_update = false;
for (peer_id, local_version) in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() {
if session.check_saved_conn_version_update_to_date(*peer_id, *local_version) {
for peer_id_version in self.cached_local_conn_map.lock().unwrap().peer_ids.iter() {
let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version);
if session.check_saved_conn_version_update_to_date(peer_id, local_version) {
continue;
}
need_update = true;
@@ -1706,6 +1754,46 @@ impl PeerRouteServiceImpl {
Some(self.cached_local_conn_map.lock().unwrap().clone())
}
fn estimate_conn_bitmap_size(&self) -> usize {
let cached_conn_map = self.cached_local_conn_map.lock().unwrap();
cached_conn_map.bitmap.len()
+ (cached_conn_map.peer_ids.len() * std::mem::size_of::<PeerIdVersion>())
}
fn build_conn_peer_list(
&self,
session: &SyncRouteSession,
estimated_size: &mut usize,
) -> Option<RouteConnPeerList> {
let mut peer_conn_infos = Vec::new();
let cached_conn_map = self.cached_local_conn_map.lock().unwrap();
*estimated_size = 0;
for peer_id_version in cached_conn_map.peer_ids.iter() {
let (peer_id, local_version) = (peer_id_version.peer_id, peer_id_version.version);
if session.check_saved_conn_version_update_to_date(peer_id, local_version) {
continue;
}
let Some(connected) = self.synced_route_info.conn_map.get(&peer_id) else {
continue;
};
peer_conn_infos.push(PeerConnInfo {
peer_id: Some(*peer_id_version),
connected_peer_ids: connected.0.iter().copied().collect(),
});
*estimated_size += std::mem::size_of::<PeerIdVersion>()
+ connected.0.len() * std::mem::size_of::<PeerId>();
}
if peer_conn_infos.is_empty() {
return None;
}
Some(RouteConnPeerList { peer_conn_infos })
}
fn build_foreign_network_info(
&self,
session: &SyncRouteSession,
@@ -1750,16 +1838,51 @@ impl PeerRouteServiceImpl {
fn build_sync_request(
&self,
session: &SyncRouteSession,
dst_peer_id: PeerId,
) -> (
Option<Vec<RoutePeerInfo>>,
Option<RouteConnBitmap>,
Option<crate::proto::peer_rpc::sync_route_info_request::ConnInfo>,
Option<RouteForeignNetworkInfos>,
) {
let route_infos = self.build_route_info(session);
let conn_bitmap = self.build_conn_bitmap(session);
let conn_info = self.build_conn_info(session, dst_peer_id);
let foreign_network = self.build_foreign_network_info(session);
(route_infos, conn_bitmap, foreign_network)
(route_infos, conn_info, foreign_network)
}
fn build_conn_info(
&self,
session: &SyncRouteSession,
dst_peer_id: PeerId,
) -> Option<crate::proto::peer_rpc::sync_route_info_request::ConnInfo> {
// Check if destination peer supports selective peer list sync
let dst_supports_peer_list = self
.synced_route_info
.peer_infos
.get(&dst_peer_id)
.and_then(|p| p.feature_flag)
.map(|x| x.support_conn_list_sync)
.unwrap_or(false);
if !dst_supports_peer_list && !FORCE_USE_CONN_LIST.load(Ordering::Relaxed) {
// Destination peer doesn't support peer list, use bitmap format
return self.build_conn_bitmap(session).map(Into::into);
}
// Both formats are supported, choose the more efficient one
let mut conn_list_estimated_size = 0;
let peer_list = self.build_conn_peer_list(session, &mut conn_list_estimated_size);
let bitmap_size = self.estimate_conn_bitmap_size();
if conn_list_estimated_size < bitmap_size
|| FORCE_USE_CONN_LIST.load(Ordering::Relaxed)
|| peer_list.is_none()
{
peer_list.map(Into::into)
} else {
self.build_conn_bitmap(session).map(Into::into)
}
}
fn clear_expired_peer(&self) {
@@ -1848,9 +1971,10 @@ impl PeerRouteServiceImpl {
let my_peer_id = self.my_peer_id;
let (peer_infos, conn_bitmap, foreign_network) = self.build_sync_request(&session);
let (peer_infos, conn_info, foreign_network) =
self.build_sync_request(&session, dst_peer_id);
if peer_infos.is_none()
&& conn_bitmap.is_none()
&& conn_info.is_none()
&& foreign_network.is_none()
&& !session.need_sync_initiator_info.load(Ordering::Relaxed)
&& !(sync_as_initiator && session.we_are_initiator.load(Ordering::Relaxed))
@@ -1858,8 +1982,8 @@ impl PeerRouteServiceImpl {
return true;
}
tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_bitmap, self.synced_route_info, session);
tracing::debug!(?foreign_network, "sync_route request need send to peer. my_id {:?}, pper_id: {:?}, peer_infos: {:?}, conn_info: {:?}, synced_route_info: {:?} session: {:?}",
my_peer_id, dst_peer_id, peer_infos, conn_info, self.synced_route_info, session);
session
.need_sync_initiator_info
@@ -1878,7 +2002,7 @@ impl PeerRouteServiceImpl {
my_session_id: session.my_session_id.load(Ordering::Relaxed),
is_initiator: session.we_are_initiator.load(Ordering::Relaxed),
peer_infos: peer_infos.clone().map(|x| RoutePeerInfos { items: x }),
conn_bitmap: conn_bitmap.clone().map(Into::into),
conn_info: conn_info.clone(),
foreign_network_infos: foreign_network.clone(),
};
@@ -1934,8 +2058,9 @@ impl PeerRouteServiceImpl {
session.update_dst_saved_peer_info_version(peer_infos, dst_peer_id);
}
if let Some(conn_bitmap) = &conn_bitmap {
session.update_dst_saved_conn_bitmap_version(conn_bitmap, dst_peer_id);
// Update session saved versions based on the connection info format used
if let Some(conn_info) = &conn_info {
session.update_dst_saved_conn_info_version(conn_info, dst_peer_id);
}
if let Some(foreign_network) = &foreign_network {
@@ -2029,7 +2154,7 @@ impl OspfRouteRpc for RouteSessionManager {
let from_session_id = request.my_session_id;
let is_initiator = request.is_initiator;
let peer_infos = request.peer_infos.map(|x| x.items);
let conn_bitmap = request.conn_bitmap.map(Into::into);
let conn_info = request.conn_info;
let foreign_network = request.foreign_network_infos;
let raw_peer_infos = if peer_infos.is_some() {
let r = get_raw_peer_infos(&mut ctrl.get_raw_input().unwrap()).unwrap();
@@ -2046,7 +2171,7 @@ impl OspfRouteRpc for RouteSessionManager {
is_initiator,
peer_infos,
raw_peer_infos,
conn_bitmap,
conn_info,
foreign_network,
)
.await;
@@ -2293,7 +2418,7 @@ impl RouteSessionManager {
is_initiator: bool,
peer_infos: Option<Vec<RoutePeerInfo>>,
raw_peer_infos: Option<Vec<DynamicMessage>>,
conn_bitmap: Option<RouteConnBitmap>,
conn_info: Option<crate::proto::peer_rpc::sync_route_info_request::ConnInfo>,
foreign_network: Option<RouteForeignNetworkInfos>,
) -> Result<SyncRouteInfoResponse, Error> {
let Some(service_impl) = self.service_impl.upgrade() else {
@@ -2327,9 +2452,9 @@ impl RouteSessionManager {
need_update_route_table = true;
}
if let Some(conn_bitmap) = &conn_bitmap {
service_impl.synced_route_info.update_conn_map(conn_bitmap);
session.update_dst_saved_conn_bitmap_version(conn_bitmap, from_peer_id);
if let Some(conn_info) = &conn_info {
service_impl.synced_route_info.update_conn_info(conn_info);
session.update_dst_saved_conn_info_version(conn_info, from_peer_id);
need_update_route_table = true;
}
@@ -2349,8 +2474,8 @@ impl RouteSessionManager {
}
tracing::debug!(
"handling sync_route_info rpc: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_bitmap: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}",
from_peer_id, is_initiator, peer_infos, conn_bitmap, service_impl.synced_route_info, session, service_impl.route_table);
"handling sync_route_info rpc: from_peer_id: {:?}, is_initiator: {:?}, peer_infos: {:?}, conn_info: {:?}, synced_route_info: {:?} session: {:?}, new_route_table: {:?}",
from_peer_id, is_initiator, peer_infos, conn_info, service_impl.synced_route_info, session, service_impl.route_table);
session
.dst_is_initiator
@@ -2715,9 +2840,9 @@ mod tests {
peers::{
create_packet_recv_chan,
peer_manager::{PeerManager, RouteAlgoType},
peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl},
peer_ospf_route::{PeerIdAndVersion, PeerRouteServiceImpl, FORCE_USE_CONN_LIST},
route_trait::{NextHopPolicy, Route, RouteCostCalculatorInterface},
tests::{connect_peer_manager, create_mock_peer_manager},
tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
},
proto::{
common::NatType,
@@ -2773,8 +2898,11 @@ mod tests {
assert!(rx1 <= max_rx);
}
#[rstest::rstest]
#[tokio::test]
async fn ospf_route_2node() {
async fn ospf_route_2node(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_pmgr().await;
let p_b = create_mock_pmgr().await;
connect_peer_manager(p_a.clone(), p_b.clone()).await;
@@ -2845,8 +2973,11 @@ mod tests {
.await;
}
#[rstest::rstest]
#[tokio::test]
async fn ospf_route_multi_node() {
async fn ospf_route_multi_node(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_pmgr().await;
let p_b = create_mock_pmgr().await;
let p_c = create_mock_pmgr().await;
@@ -2976,8 +3107,10 @@ mod tests {
}
}
#[rstest::rstest]
#[tokio::test]
async fn ospf_route_3node_disconnect() {
async fn ospf_route_3node_disconnect(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_pmgr().await;
let p_b = create_mock_pmgr().await;
let p_c = create_mock_pmgr().await;
@@ -3019,8 +3152,10 @@ mod tests {
}
}
#[rstest::rstest]
#[tokio::test]
async fn peer_reconnect() {
async fn peer_reconnect(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_pmgr().await;
let p_b = create_mock_pmgr().await;
let r_a = create_mock_route(p_a.clone()).await;
@@ -3063,8 +3198,10 @@ mod tests {
check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2);
}
#[rstest::rstest]
#[tokio::test]
async fn test_cost_calculator() {
async fn test_cost_calculator(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_pmgr().await;
let p_b = create_mock_pmgr().await;
let p_c = create_mock_pmgr().await;
@@ -3150,8 +3287,10 @@ mod tests {
.await;
}
#[rstest::rstest]
#[tokio::test]
async fn test_raw_peer_info() {
async fn test_raw_peer_info(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let mut req = SyncRouteInfoRequest::default();
let raw_info_map: DashMap<PeerId, DynamicMessage> = DashMap::new();
@@ -3177,8 +3316,10 @@ mod tests {
assert_eq!(req, req2);
}
#[rstest::rstest]
#[tokio::test]
async fn test_peer_id_map_override() {
async fn test_peer_id_map_override(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
@@ -3230,9 +3371,10 @@ mod tests {
p_b.get_global_ctx().config.remove_proxy_cidr(proxy);
check_route_peer_id(p_c.clone()).await;
}
#[rstest::rstest]
#[tokio::test]
async fn test_subnet_proxy_conflict() {
async fn test_subnet_proxy_conflict(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
// Create three peer managers: A, B, C
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
@@ -3325,4 +3467,21 @@ mod tests {
)
.await;
}
#[rstest::rstest]
#[tokio::test]
async fn test_connect_at_different_time(#[values(true, false)] enable_conn_list_sync: bool) {
FORCE_USE_CONN_LIST.store(enable_conn_list_sync, Ordering::Relaxed);
// Create three peer managers: A, B, C
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
// Connect A-B-C in a line topology
connect_peer_manager(p_a.clone(), p_b.clone()).await;
wait_route_appear(p_a.clone(), p_b.clone()).await.unwrap();
connect_peer_manager(p_b.clone(), p_c.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
}
}

View File

@@ -203,6 +203,7 @@ message PeerFeatureFlag {
bool avoid_relay_data = 2;
bool kcp_input = 3;
bool no_relay_kcp = 4;
bool support_conn_list_sync = 5;
}
enum SocketType {

View File

@@ -39,6 +39,14 @@ message RouteConnBitmap {
bytes bitmap = 2;
}
message RouteConnPeerList {
message PeerConnInfo {
PeerIdVersion peer_id = 1;
repeated uint32 connected_peer_ids = 2;
}
repeated PeerConnInfo peer_conn_infos = 1;
}
message RoutePeerInfos { repeated RoutePeerInfo items = 1; }
message ForeignNetworkRouteInfoKey {
@@ -82,7 +90,10 @@ message SyncRouteInfoRequest {
uint64 my_session_id = 2;
bool is_initiator = 3;
RoutePeerInfos peer_infos = 4;
oneof conn_info {
RouteConnBitmap conn_bitmap = 5;
RouteConnPeerList conn_peer_list = 7;
}
RouteForeignNetworkInfos foreign_network_infos = 6;
}

View File

@@ -38,6 +38,18 @@ impl PeerGroupInfo {
}
}
impl From<RouteConnBitmap> for sync_route_info_request::ConnInfo {
fn from(val: RouteConnBitmap) -> Self {
Self::ConnBitmap(val)
}
}
impl From<RouteConnPeerList> for sync_route_info_request::ConnInfo {
fn from(val: RouteConnPeerList) -> Self {
Self::ConnPeerList(val)
}
}
#[cfg(test)]
mod tests {
use super::*;