fix no relay not work in local network (#476)

This commit is contained in:
Sijie.Sun
2024-11-16 14:36:17 +08:00
committed by GitHub
parent 6cdea38284
commit 15ad92aef2
11 changed files with 298 additions and 44 deletions

View File

@@ -453,26 +453,14 @@ impl ForeignNetworkManager {
}
}
fn check_network_in_whitelist(&self, network_name: &str) -> Result<(), Error> {
if self
.global_ctx
.get_flags()
.foreign_network_whitelist
.split(" ")
.map(wildmatch::WildMatch::new)
.any(|wl| wl.matches(network_name))
{
Ok(())
} else {
Err(anyhow::anyhow!("network {} not in whitelist", network_name).into())
}
}
pub async fn add_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> {
tracing::info!(peer_conn = ?peer_conn.get_conn_info(), network = ?peer_conn.get_network_identity(), "add new peer conn in foreign network manager");
let relay_peer_rpc = self.global_ctx.get_flags().relay_all_peer_rpc;
let ret = self.check_network_in_whitelist(&peer_conn.get_network_identity().network_name);
let ret = self
.global_ctx
.check_network_in_whitelist(&peer_conn.get_network_identity().network_name)
.map_err(Into::into);
if ret.is_err() && !relay_peer_rpc {
return ret;
}
@@ -686,7 +674,7 @@ mod tests {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());
let mut flag = pm_center.get_global_ctx().get_flags();
flag.foreign_network_whitelist = vec!["net1".to_string(), "net2*".to_string()].join(" ");
flag.relay_network_whitelist = vec!["net1".to_string(), "net2*".to_string()].join(" ");
pm_center.get_global_ctx().config.set_flags(flag);
let pma_net1 = create_mock_peer_manager_for_foreign_network(name.as_str()).await;
@@ -711,7 +699,7 @@ mod tests {
async fn only_relay_peer_rpc() {
let pm_center = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let mut flag = pm_center.get_global_ctx().get_flags();
flag.foreign_network_whitelist = "".to_string();
flag.relay_network_whitelist = "".to_string();
flag.relay_all_peer_rpc = true;
pm_center.get_global_ctx().config.set_flags(flag);
tracing::debug!("pm_center: {:?}", pm_center.my_peer_id());

View File

@@ -181,6 +181,16 @@ impl PeerManager {
}
}
if global_ctx
.check_network_in_whitelist(&global_ctx.get_network_name())
.is_err()
{
// if local network is not in whitelist, avoid relay data when exist any other route path
let mut f = global_ctx.get_feature_flags();
f.avoid_relay_data = true;
global_ctx.set_feature_flags(f);
}
// TODO: remove these because we have impl pipeline processor.
let (peer_rpc_tspt_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let rpc_tspt = Arc::new(RpcTransport {
@@ -919,9 +929,10 @@ mod tests {
peers::{
peer_manager::RouteAlgoType,
peer_rpc::tests::register_service,
tests::{connect_peer_manager, wait_route_appear},
route_trait::NextHopPolicy,
tests::{connect_peer_manager, wait_route_appear, wait_route_appear_with_cost},
},
proto::common::{CompressionAlgoPb, NatType},
proto::common::{CompressionAlgoPb, NatType, PeerFeatureFlag},
tunnel::{common::tests::wait_for_condition, TunnelConnector, TunnelListener},
};
@@ -1088,4 +1099,70 @@ mod tests {
connect_peer_manager(peer_mgr_b.clone(), mgr_d.clone()).await;
wait_route_appear(mgr_d, peer_mgr_b).await.unwrap();
}
#[tokio::test]
async fn test_avoid_relay_data() {
// a->b->c
// a->d->e->c
let peer_mgr_a = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_b = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_c = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_d = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
let peer_mgr_e = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await;
connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await;
connect_peer_manager(peer_mgr_a.clone(), peer_mgr_d.clone()).await;
connect_peer_manager(peer_mgr_d.clone(), peer_mgr_e.clone()).await;
connect_peer_manager(peer_mgr_e.clone(), peer_mgr_c.clone()).await;
// when b's avoid_relay_data is false, a->c should route through b and cost is 2
wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(2))
.await
.unwrap();
let ret = peer_mgr_a
.get_route()
.get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost)
.await;
assert_eq!(ret, Some(peer_mgr_b.my_peer_id));
// when b's avoid_relay_data is true, a->c should route through d and e, cost is 3
peer_mgr_b
.get_global_ctx()
.set_feature_flags(PeerFeatureFlag {
avoid_relay_data: true,
..Default::default()
});
tokio::time::sleep(Duration::from_secs(2)).await;
wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(3))
.await
.expect(
format!(
"route not appear, a route table: {}, table: {:#?}",
peer_mgr_a.get_route().dump().await,
peer_mgr_a.get_route().list_routes().await
)
.as_str(),
);
let ret = peer_mgr_a
.get_route()
.get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost)
.await;
assert_eq!(ret, Some(peer_mgr_d.my_peer_id));
println!("route table: {:#?}", peer_mgr_a.list_routes().await);
// drop e, path should go back to through b
drop(peer_mgr_e);
wait_route_appear_with_cost(peer_mgr_a.clone(), peer_mgr_c.my_peer_id, Some(2))
.await
.unwrap();
let ret = peer_mgr_a
.get_route()
.get_next_hop_with_policy(peer_mgr_c.my_peer_id, NextHopPolicy::LeastCost)
.await;
assert_eq!(ret, Some(peer_mgr_b.my_peer_id));
}
}

View File

@@ -57,6 +57,7 @@ use super::{
static SERVICE_ID: u32 = 7;
static UPDATE_PEER_INFO_PERIOD: Duration = Duration::from_secs(3600);
static REMOVE_DEAD_PEER_INFO_AFTER: Duration = Duration::from_secs(3660);
static AVOID_RELAY_COST: i32 = i32::MAX / 512;
type Version = u32;
@@ -192,8 +193,9 @@ impl Into<crate::proto::cli::Route> for RoutePeerInfo {
} else {
None
},
next_hop_peer_id: 0,
cost: self.cost as i32,
next_hop_peer_id: 0, // next_hop_peer_id is calculated in RouteTable.
cost: 0, // cost is calculated in RouteTable.
path_latency: 0, // path_latency is calculated in RouteTable.
proxy_cidrs: self.proxy_cidrs.clone(),
hostname: self.hostname.unwrap_or_default(),
stun_info: {
@@ -206,6 +208,10 @@ impl Into<crate::proto::cli::Route> for RoutePeerInfo {
inst_id: self.inst_id.map(|x| x.to_string()).unwrap_or_default(),
version: self.easytier_version,
feature_flag: self.feature_flag,
next_hop_peer_id_latency_first: None,
cost_latency_first: None,
path_latency_latency_first: None,
}
}
}
@@ -314,6 +320,15 @@ impl SyncedRouteInfo {
.unwrap_or(0)
}
fn get_avoid_relay_data(&self, peer_id: PeerId) -> bool {
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
self.peer_infos
.get(&peer_id)
.and_then(|x| x.value().feature_flag)
.map(|x| x.avoid_relay_data)
.unwrap_or_default()
}
fn check_duplicate_peer_id(
&self,
my_peer_id: PeerId,
@@ -538,7 +553,14 @@ impl SyncedRouteInfo {
type PeerGraph = Graph<PeerId, i32, Directed>;
type PeerIdToNodexIdxMap = DashMap<PeerId, NodeIndex>;
type NextHopMap = DashMap<PeerId, (PeerId, i32)>;
#[derive(Debug, Clone, Copy)]
struct NextHopInfo {
next_hop_peer_id: PeerId,
path_latency: i32,
path_len: usize, // path includes src and dst.
}
// dst_peer_id -> (next_hop_peer_id, cost, path_len)
type NextHopMap = DashMap<PeerId, NextHopInfo>;
// computed with SyncedRouteInfo. used to get next hop.
#[derive(Debug)]
@@ -559,7 +581,7 @@ impl RouteTable {
}
}
fn get_next_hop(&self, dst_peer_id: PeerId) -> Option<(PeerId, i32)> {
fn get_next_hop(&self, dst_peer_id: PeerId) -> Option<NextHopInfo> {
self.next_hop_map.get(&dst_peer_id).map(|x| *x)
}
@@ -588,6 +610,10 @@ impl RouteTable {
let connected_peers = synced_info
.get_connected_peers(*peer_id)
.unwrap_or(BTreeSet::new());
// if avoid relay, just set all outgoing edges to a large value: AVOID_RELAY_COST.
let peer_avoid_relay_data = synced_info.get_avoid_relay_data(*peer_id);
for dst_peer_id in connected_peers.iter() {
let Some(dst_idx) = peer_id_to_node_index.get(dst_peer_id) else {
continue;
@@ -596,7 +622,11 @@ impl RouteTable {
graph.add_edge(
*peer_id_to_node_index.get(&peer_id).unwrap(),
*dst_idx,
cost_calc.calculate_cost(*peer_id, *dst_peer_id),
if peer_avoid_relay_data {
AVOID_RELAY_COST
} else {
cost_calc.calculate_cost(*peer_id, *dst_peer_id)
},
);
}
}
@@ -616,36 +646,56 @@ impl RouteTable {
if *cost == 0 {
continue;
}
let all_paths = all_simple_paths::<Vec<_>, _>(
let mut all_paths = all_simple_paths::<Vec<_>, _>(
graph,
*idx_map.get(&my_peer_id).unwrap(),
*node_idx,
*cost - 1,
Some(*cost - 1),
Some(*cost + 1), // considering having avoid relay, the max cost could be a bit larger.
)
.collect::<Vec<_>>();
assert!(!all_paths.is_empty());
all_paths.sort_by(|a, b| a.len().cmp(&b.len()));
// find a path with least cost.
let mut min_cost = i32::MAX;
let mut min_path_len = usize::MAX;
let mut min_path = Vec::new();
for path in all_paths.iter() {
if min_path_len < path.len() && min_cost < AVOID_RELAY_COST {
// the min path does not contain avoid relay node.
break;
}
let mut cost = 0;
for i in 0..path.len() - 1 {
let src_peer_id = *graph.node_weight(path[i]).unwrap();
let dst_peer_id = *graph.node_weight(path[i + 1]).unwrap();
cost += cost_calc.calculate_cost(src_peer_id, dst_peer_id);
let edge_weight = *graph
.edge_weight(graph.find_edge(path[i], path[i + 1]).unwrap())
.unwrap();
if edge_weight != 1 {
// means avoid relay.
cost += edge_weight;
} else {
cost += cost_calc.calculate_cost(src_peer_id, dst_peer_id);
}
}
if cost <= min_cost {
min_cost = cost;
min_path = path.clone();
min_path_len = path.len();
}
}
next_hop_map.insert(
*graph.node_weight(*node_idx).unwrap(),
(*graph.node_weight(min_path[1]).unwrap(), *cost as i32),
NextHopInfo {
next_hop_peer_id: *graph.node_weight(min_path[1]).unwrap(),
path_latency: min_cost,
path_len: min_path_len,
},
);
}
@@ -675,7 +725,14 @@ impl RouteTable {
continue;
};
next_hop_map.insert(*item.key(), (*graph.node_weight(path[1]).unwrap(), cost));
next_hop_map.insert(
*item.key(),
NextHopInfo {
next_hop_peer_id: *graph.node_weight(path[1]).unwrap(),
path_latency: cost,
path_len: path.len(),
},
);
}
next_hop_map
@@ -707,7 +764,14 @@ impl RouteTable {
// build next hop map
self.next_hop_map.clear();
self.next_hop_map.insert(my_peer_id, (my_peer_id, 0));
self.next_hop_map.insert(
my_peer_id,
NextHopInfo {
next_hop_peer_id: my_peer_id,
path_latency: 0,
path_len: 1,
},
);
let (graph, idx_map) = Self::build_peer_graph_from_synced_info(
self.peer_infos.iter().map(|x| *x.key()).collect(),
&synced_info,
@@ -1937,7 +2001,9 @@ impl Route for PeerRoute {
async fn get_next_hop(&self, dst_peer_id: PeerId) -> Option<PeerId> {
let route_table = &self.service_impl.route_table;
route_table.get_next_hop(dst_peer_id).map(|x| x.0)
route_table
.get_next_hop(dst_peer_id)
.map(|x| x.next_hop_peer_id)
}
async fn get_next_hop_with_policy(
@@ -1950,11 +2016,14 @@ impl Route for PeerRoute {
} else {
&self.service_impl.route_table
};
route_table.get_next_hop(dst_peer_id).map(|x| x.0)
route_table
.get_next_hop(dst_peer_id)
.map(|x| x.next_hop_peer_id)
}
async fn list_routes(&self) -> Vec<crate::proto::cli::Route> {
let route_table = &self.service_impl.route_table;
let route_table_with_cost = &self.service_impl.route_table_with_cost;
let mut routes = Vec::new();
for item in route_table.peer_infos.iter() {
if *item.key() == self.my_peer_id {
@@ -1963,9 +2032,17 @@ impl Route for PeerRoute {
let Some(next_hop_peer) = route_table.get_next_hop(*item.key()) else {
continue;
};
let next_hop_peer_latency_first = route_table_with_cost.get_next_hop(*item.key());
let mut route: crate::proto::cli::Route = item.value().clone().into();
route.next_hop_peer_id = next_hop_peer.0;
route.cost = next_hop_peer.1;
route.next_hop_peer_id = next_hop_peer.next_hop_peer_id;
route.cost = (next_hop_peer.path_len - 1) as i32;
route.path_latency = next_hop_peer.path_latency;
route.next_hop_peer_id_latency_first =
next_hop_peer_latency_first.map(|x| x.next_hop_peer_id);
route.cost_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency);
route.path_latency_latency_first = next_hop_peer_latency_first.map(|x| x.path_latency);
routes.push(route);
}
routes