mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-12 12:47:25 +08:00
fix ospf route (#970)
- **fix deadlock in ospf route introducd by #958 ** - **use random peer id for foreign network entry, because ospf route algo need peer id change after peer info version reset. this may interfere route propagation and cause node residual** - **allow multiple nodes broadcast same network ranges for subnet proxy** - **bump version to v2.3.2**
This commit is contained in:
2
.github/workflows/docker.yml
vendored
2
.github/workflows/docker.yml
vendored
@@ -11,7 +11,7 @@ on:
|
||||
image_tag:
|
||||
description: 'Tag for this image build'
|
||||
type: string
|
||||
default: 'v2.3.1'
|
||||
default: 'v2.3.2'
|
||||
required: true
|
||||
mark_latest:
|
||||
description: 'Mark this image as latest'
|
||||
|
||||
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -21,7 +21,7 @@ on:
|
||||
version:
|
||||
description: 'Version for this release'
|
||||
type: string
|
||||
default: 'v2.3.1'
|
||||
default: 'v2.3.2'
|
||||
required: true
|
||||
make_latest:
|
||||
description: 'Mark this release as latest'
|
||||
|
||||
6
Cargo.lock
generated
6
Cargo.lock
generated
@@ -1942,7 +1942,7 @@ checksum = "0d6ef0072f8a535281e4876be788938b528e9a1d43900b82c2569af7da799125"
|
||||
|
||||
[[package]]
|
||||
name = "easytier"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
dependencies = [
|
||||
"aes-gcm",
|
||||
"anyhow",
|
||||
@@ -2070,7 +2070,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "easytier-gui"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"chrono",
|
||||
@@ -2116,7 +2116,7 @@ dependencies = [
|
||||
|
||||
[[package]]
|
||||
name = "easytier-web"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"async-trait",
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
id=easytier_magisk
|
||||
name=EasyTier_Magisk
|
||||
version=v2.3.1
|
||||
version=v2.3.2
|
||||
versionCode=1
|
||||
author=EasyTier
|
||||
description=easytier magisk module @EasyTier(https://github.com/EasyTier/EasyTier)
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "easytier-gui",
|
||||
"type": "module",
|
||||
"version": "2.3.1",
|
||||
"version": "2.3.2",
|
||||
"private": true,
|
||||
"packageManager": "pnpm@9.12.1+sha512.e5a7e52a4183a02d5931057f7a0dbff9d5e9ce3161e33fa68ae392125b79282a8a8a470a51dfc8a0ed86221442eb2fb57019b0990ed24fab519bf0e1bc5ccfc4",
|
||||
"scripts": {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "easytier-gui"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
description = "EasyTier GUI"
|
||||
authors = ["you"]
|
||||
edition = "2021"
|
||||
|
||||
@@ -17,7 +17,7 @@
|
||||
"createUpdaterArtifacts": false
|
||||
},
|
||||
"productName": "easytier-gui",
|
||||
"version": "2.3.1",
|
||||
"version": "2.3.2",
|
||||
"identifier": "com.kkrainbow.easytier",
|
||||
"plugins": {},
|
||||
"app": {
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
[package]
|
||||
name = "easytier-web"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
edition = "2021"
|
||||
description = "Config server for easytier. easytier-core gets config from this and web frontend use it as restful api server."
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ name = "easytier"
|
||||
description = "A full meshed p2p VPN, connecting all your devices in one network with one command."
|
||||
homepage = "https://github.com/EasyTier/EasyTier"
|
||||
repository = "https://github.com/EasyTier/EasyTier"
|
||||
version = "2.3.1"
|
||||
version = "2.3.2"
|
||||
edition = "2021"
|
||||
authors = ["kkrainbow"]
|
||||
keywords = ["vpn", "p2p", "network", "easytier"]
|
||||
|
||||
@@ -70,13 +70,16 @@ struct ForeignNetworkEntry {
|
||||
packet_recv: Mutex<Option<PacketRecvChanReceiver>>,
|
||||
|
||||
tasks: Mutex<JoinSet<()>>,
|
||||
|
||||
pub lock: Mutex<()>,
|
||||
}
|
||||
|
||||
impl ForeignNetworkEntry {
|
||||
fn new(
|
||||
network: NetworkIdentity,
|
||||
global_ctx: ArcGlobalCtx,
|
||||
// NOTICE: ospf route need my_peer_id be changed after restart.
|
||||
my_peer_id: PeerId,
|
||||
global_ctx: ArcGlobalCtx,
|
||||
relay_data: bool,
|
||||
pm_packet_sender: PacketRecvChan,
|
||||
) -> Self {
|
||||
@@ -114,6 +117,8 @@ impl ForeignNetworkEntry {
|
||||
packet_recv: Mutex::new(Some(packet_recv)),
|
||||
|
||||
tasks: Mutex::new(JoinSet::new()),
|
||||
|
||||
lock: Mutex::new(()),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -202,11 +207,7 @@ impl ForeignNetworkEntry {
|
||||
(peer_rpc, rpc_transport_sender)
|
||||
}
|
||||
|
||||
async fn prepare_route(
|
||||
&self,
|
||||
my_peer_id: PeerId,
|
||||
accessor: Box<dyn GlobalForeignNetworkAccessor>,
|
||||
) {
|
||||
async fn prepare_route(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
|
||||
struct Interface {
|
||||
my_peer_id: PeerId,
|
||||
peer_map: Weak<PeerMap>,
|
||||
@@ -238,10 +239,14 @@ impl ForeignNetworkEntry {
|
||||
}
|
||||
}
|
||||
|
||||
let route = PeerRoute::new(my_peer_id, self.global_ctx.clone(), self.peer_rpc.clone());
|
||||
let route = PeerRoute::new(
|
||||
self.my_peer_id,
|
||||
self.global_ctx.clone(),
|
||||
self.peer_rpc.clone(),
|
||||
);
|
||||
route
|
||||
.open(Box::new(Interface {
|
||||
my_peer_id,
|
||||
my_peer_id: self.my_peer_id,
|
||||
network_identity: self.network.clone(),
|
||||
peer_map: Arc::downgrade(&self.peer_map),
|
||||
accessor,
|
||||
@@ -317,8 +322,8 @@ impl ForeignNetworkEntry {
|
||||
});
|
||||
}
|
||||
|
||||
async fn prepare(&self, my_peer_id: PeerId, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
|
||||
self.prepare_route(my_peer_id, accessor).await;
|
||||
async fn prepare(&self, accessor: Box<dyn GlobalForeignNetworkAccessor>) {
|
||||
self.prepare_route(accessor).await;
|
||||
self.start_packet_recv().await;
|
||||
self.peer_rpc.run();
|
||||
}
|
||||
@@ -400,8 +405,8 @@ impl ForeignNetworkManagerData {
|
||||
new_added = true;
|
||||
Arc::new(ForeignNetworkEntry::new(
|
||||
network_identity.clone(),
|
||||
global_ctx.clone(),
|
||||
my_peer_id,
|
||||
global_ctx.clone(),
|
||||
relay_data,
|
||||
pm_packet_sender.clone(),
|
||||
))
|
||||
@@ -417,9 +422,7 @@ impl ForeignNetworkManagerData {
|
||||
drop(l);
|
||||
|
||||
if new_added {
|
||||
entry
|
||||
.prepare(my_peer_id, Box::new(self.accessor.clone()))
|
||||
.await;
|
||||
entry.prepare(Box::new(self.accessor.clone())).await;
|
||||
}
|
||||
|
||||
(entry, new_added)
|
||||
@@ -467,6 +470,13 @@ impl ForeignNetworkManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_network_peer_id(&self, network_name: &str) -> Option<PeerId> {
|
||||
self.data
|
||||
.network_peer_maps
|
||||
.get(network_name)
|
||||
.and_then(|v| Some(v.my_peer_id))
|
||||
}
|
||||
|
||||
pub async fn add_peer_conn(&self, peer_conn: PeerConn) -> Result<(), Error> {
|
||||
tracing::info!(peer_conn = ?peer_conn.get_conn_info(), network = ?peer_conn.get_network_identity(), "add new peer conn in foreign network manager");
|
||||
|
||||
@@ -483,7 +493,7 @@ impl ForeignNetworkManager {
|
||||
.data
|
||||
.get_or_insert_entry(
|
||||
&peer_conn.get_network_identity(),
|
||||
self.my_peer_id,
|
||||
peer_conn.get_my_peer_id(),
|
||||
peer_conn.get_peer_id(),
|
||||
!ret.is_err(),
|
||||
&self.global_ctx,
|
||||
@@ -491,17 +501,30 @@ impl ForeignNetworkManager {
|
||||
)
|
||||
.await;
|
||||
|
||||
if entry.network != peer_conn.get_network_identity() {
|
||||
let _g = entry.lock.lock().await;
|
||||
|
||||
if entry.network != peer_conn.get_network_identity()
|
||||
|| entry.my_peer_id != peer_conn.get_my_peer_id()
|
||||
{
|
||||
if new_added {
|
||||
self.data
|
||||
.remove_network(&entry.network.network_name.clone());
|
||||
}
|
||||
return Err(anyhow::anyhow!(
|
||||
let err = if entry.my_peer_id != peer_conn.get_my_peer_id() {
|
||||
anyhow::anyhow!(
|
||||
"my peer id not match. exp: {:?} real: {:?}, need retry connect",
|
||||
entry.my_peer_id,
|
||||
peer_conn.get_my_peer_id()
|
||||
)
|
||||
} else {
|
||||
anyhow::anyhow!(
|
||||
"network secret not match. exp: {:?} real: {:?}",
|
||||
entry.network,
|
||||
peer_conn.get_network_identity()
|
||||
)
|
||||
.into());
|
||||
};
|
||||
tracing::error!(?err, "foreign network entry not match, disconnect peer");
|
||||
return Err(err.into());
|
||||
}
|
||||
|
||||
if new_added {
|
||||
@@ -567,7 +590,8 @@ impl ForeignNetworkManager {
|
||||
.network_secret_digest
|
||||
.unwrap_or_default()
|
||||
.to_vec(),
|
||||
..Default::default()
|
||||
my_peer_id_for_this_network: item.my_peer_id,
|
||||
peers: Default::default(),
|
||||
};
|
||||
for peer in item.peer_map.list_peers().await {
|
||||
let mut peer_info = PeerInfo::default();
|
||||
@@ -614,8 +638,6 @@ impl Drop for ForeignNetworkManager {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::{
|
||||
common::global_ctx::tests::get_mock_global_ctx_with_network,
|
||||
connector::udp_hole_punch::tests::{
|
||||
@@ -629,6 +651,7 @@ mod tests {
|
||||
set_global_var,
|
||||
tunnel::common::tests::wait_for_condition,
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
use super::*;
|
||||
|
||||
@@ -769,7 +792,10 @@ mod tests {
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(
|
||||
vec![pm_center.my_peer_id()],
|
||||
vec![pm_center
|
||||
.get_foreign_network_manager()
|
||||
.get_network_peer_id("net1")
|
||||
.unwrap()],
|
||||
pma_net1
|
||||
.get_foreign_network_client()
|
||||
.get_peer_map()
|
||||
@@ -777,7 +803,10 @@ mod tests {
|
||||
.await
|
||||
);
|
||||
assert_eq!(
|
||||
vec![pm_center.my_peer_id()],
|
||||
vec![pm_center
|
||||
.get_foreign_network_manager()
|
||||
.get_network_peer_id("net1")
|
||||
.unwrap()],
|
||||
pmb_net1
|
||||
.get_foreign_network_client()
|
||||
.get_peer_map()
|
||||
@@ -894,6 +923,75 @@ mod tests {
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_foreign_network_manager_cluster_simple() {
|
||||
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
|
||||
|
||||
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
|
||||
connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await;
|
||||
|
||||
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
|
||||
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
|
||||
connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await;
|
||||
connect_peer_manager(pmb_net1.clone(), pm_center2.clone()).await;
|
||||
|
||||
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
|
||||
let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
|
||||
connect_peer_manager(pma_net2.clone(), pm_center1.clone()).await;
|
||||
connect_peer_manager(pmb_net2.clone(), pm_center2.clone()).await;
|
||||
|
||||
wait_route_appear(pma_net2.clone(), pmb_net2.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_foreign_network_manager_cluster_multiple_hops() {
|
||||
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
|
||||
|
||||
let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
let pm_center2 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
let pm_center3 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
let pm_center4 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await;
|
||||
|
||||
connect_peer_manager(pm_center1.clone(), pm_center2.clone()).await;
|
||||
connect_peer_manager(pm_center2.clone(), pm_center3.clone()).await;
|
||||
connect_peer_manager(pm_center3.clone(), pm_center4.clone()).await;
|
||||
|
||||
let pma_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
|
||||
let pmb_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
|
||||
connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await;
|
||||
connect_peer_manager(pmb_net1.clone(), pm_center3.clone()).await;
|
||||
wait_route_appear(pma_net1.clone(), pmb_net1.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
let pmc_net1 = create_mock_peer_manager_for_foreign_network("net1").await;
|
||||
connect_peer_manager(pmc_net1.clone(), pm_center4.clone()).await;
|
||||
wait_route_appear(pma_net1.clone(), pmc_net1.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let pma_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
|
||||
let pmb_net2 = create_mock_peer_manager_for_foreign_network("net2").await;
|
||||
connect_peer_manager(pma_net2.clone(), pm_center1.clone()).await;
|
||||
connect_peer_manager(pmb_net2.clone(), pm_center4.clone()).await;
|
||||
wait_route_appear(pma_net2.clone(), pmb_net2.clone())
|
||||
.await
|
||||
.unwrap();
|
||||
drop(pmb_net2);
|
||||
wait_for_condition(
|
||||
|| async { pma_net2.list_routes().await.len() == 1 },
|
||||
Duration::from_secs(5),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_foreign_network_manager_cluster() {
|
||||
set_global_var!(OSPF_UPDATE_MY_GLOBAL_FOREIGN_NETWORK_INTERVAL_SEC, 1);
|
||||
|
||||
@@ -266,6 +266,31 @@ impl PeerConn {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(handshake_recved))]
|
||||
pub async fn do_handshake_as_server_ext<Fn>(
|
||||
&mut self,
|
||||
mut handshake_recved: Fn,
|
||||
) -> Result<(), Error>
|
||||
where
|
||||
Fn: FnMut(&mut Self, &HandshakeRequest) -> Result<(), Error> + Send,
|
||||
{
|
||||
let rsp = self.wait_handshake_loop().await?;
|
||||
|
||||
handshake_recved(self, &rsp)?;
|
||||
|
||||
tracing::info!("handshake request: {:?}", rsp);
|
||||
self.info = Some(rsp);
|
||||
self.is_client = Some(false);
|
||||
|
||||
self.send_handshake().await?;
|
||||
|
||||
if self.get_peer_id() == self.my_peer_id {
|
||||
Err(Error::WaitRespError("peer id conflict".to_owned()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
pub async fn do_handshake_as_server(&mut self) -> Result<(), Error> {
|
||||
let rsp = self.wait_handshake_loop().await?;
|
||||
@@ -435,6 +460,17 @@ impl PeerConn {
|
||||
is_closed: self.close_event_notifier.is_closed(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn set_peer_id(&mut self, peer_id: PeerId) {
|
||||
if self.info.is_some() {
|
||||
panic!("set_peer_id should only be called before handshake");
|
||||
}
|
||||
self.my_peer_id = peer_id;
|
||||
}
|
||||
|
||||
pub fn get_my_peer_id(&self) -> PeerId {
|
||||
self.my_peer_id
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for PeerConn {
|
||||
|
||||
@@ -144,6 +144,8 @@ pub struct PeerManager {
|
||||
|
||||
// conns that are directly connected (which are not hole punched)
|
||||
directly_connected_conn_map: Arc<DashMap<PeerId, DashSet<uuid::Uuid>>>,
|
||||
|
||||
reserved_my_peer_id_map: DashMap<String, PeerId>,
|
||||
}
|
||||
|
||||
impl Debug for PeerManager {
|
||||
@@ -272,6 +274,8 @@ impl PeerManager {
|
||||
exit_nodes,
|
||||
|
||||
directly_connected_conn_map: Arc::new(DashMap::new()),
|
||||
|
||||
reserved_my_peer_id_map: DashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -413,7 +417,7 @@ impl PeerManager {
|
||||
self.add_direct_tunnel(t).await
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
#[tracing::instrument(ret)]
|
||||
pub async fn add_tunnel_as_server(
|
||||
&self,
|
||||
tunnel: Box<dyn Tunnel>,
|
||||
@@ -421,18 +425,43 @@ impl PeerManager {
|
||||
) -> Result<(), Error> {
|
||||
tracing::info!("add tunnel as server start");
|
||||
let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel);
|
||||
peer.do_handshake_as_server().await?;
|
||||
if self.global_ctx.config.get_flags().private_mode
|
||||
&& peer.get_network_identity().network_name
|
||||
!= self.global_ctx.get_network_identity().network_name
|
||||
peer.do_handshake_as_server_ext(|peer, msg| {
|
||||
if msg.network_name
|
||||
== self.global_ctx.get_network_identity().network_name
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
if self.global_ctx.config.get_flags().private_mode {
|
||||
return Err(Error::SecretKeyError(
|
||||
"private mode is turned on, network identity not match".to_string(),
|
||||
));
|
||||
}
|
||||
if peer.get_network_identity().network_name
|
||||
== self.global_ctx.get_network_identity().network_name
|
||||
{
|
||||
|
||||
let mut peer_id = self
|
||||
.foreign_network_manager
|
||||
.get_network_peer_id(&msg.network_name);
|
||||
if peer_id.is_none() {
|
||||
peer_id = Some(*self.reserved_my_peer_id_map.entry(msg.network_name.clone()).or_insert_with(|| {
|
||||
rand::random::<PeerId>()
|
||||
}).value());
|
||||
}
|
||||
peer.set_peer_id(peer_id.clone().unwrap());
|
||||
|
||||
tracing::info!(
|
||||
?peer_id,
|
||||
?msg.network_name,
|
||||
"handshake as server with foreign network, new peer id: {}, peer id in foreign manager: {:?}",
|
||||
peer.get_my_peer_id(), peer_id
|
||||
);
|
||||
|
||||
Ok(())
|
||||
})
|
||||
.await?;
|
||||
|
||||
let peer_network_name = peer.get_network_identity().network_name.clone();
|
||||
|
||||
if peer_network_name == self.global_ctx.get_network_identity().network_name {
|
||||
let (peer_id, conn_id) = (peer.get_peer_id(), peer.get_conn_id());
|
||||
self.add_new_peer_conn(peer).await?;
|
||||
if is_directly_connected {
|
||||
@@ -441,12 +470,15 @@ impl PeerManager {
|
||||
} else {
|
||||
self.foreign_network_manager.add_peer_conn(peer).await?;
|
||||
}
|
||||
|
||||
self.reserved_my_peer_id_map.remove(&peer_network_name);
|
||||
|
||||
tracing::info!("add tunnel as server done");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn try_handle_foreign_network_packet(
|
||||
packet: ZCPacket,
|
||||
mut packet: ZCPacket,
|
||||
my_peer_id: PeerId,
|
||||
peer_map: &PeerMap,
|
||||
foreign_network_mgr: &ForeignNetworkManager,
|
||||
@@ -463,6 +495,10 @@ impl PeerManager {
|
||||
let foreign_network_name = foreign_hdr.get_network_name(packet.payload());
|
||||
let foreign_peer_id = foreign_hdr.get_dst_peer_id();
|
||||
|
||||
let foreign_network_my_peer_id =
|
||||
foreign_network_mgr.get_network_peer_id(&foreign_network_name);
|
||||
|
||||
// NOTICE: the to peer id is modified by the src from foreign network my peer id to the origin my peer id
|
||||
if to_peer_id == my_peer_id {
|
||||
// packet sent from other peer to me, extract the inner packet and forward it
|
||||
if let Err(e) = foreign_network_mgr
|
||||
@@ -481,7 +517,27 @@ impl PeerManager {
|
||||
);
|
||||
}
|
||||
Ok(())
|
||||
} else if from_peer_id == my_peer_id {
|
||||
} else if Some(from_peer_id) == foreign_network_my_peer_id {
|
||||
// to_peer_id is my peer id for the foreign network, need to convert to the origin my_peer_id of dst
|
||||
let Some(to_peer_id) = peer_map
|
||||
.get_origin_my_peer_id(&foreign_network_name, to_peer_id)
|
||||
.await
|
||||
else {
|
||||
tracing::debug!(
|
||||
?foreign_network_name,
|
||||
?to_peer_id,
|
||||
"cannot find origin my peer id for foreign network."
|
||||
);
|
||||
return Err(packet);
|
||||
};
|
||||
|
||||
// modify the to_peer id from foreign network my peer id to the origin my peer id
|
||||
packet
|
||||
.mut_peer_manager_header()
|
||||
.unwrap()
|
||||
.to_peer_id
|
||||
.set(to_peer_id);
|
||||
|
||||
// packet is generated from foreign network mgr and should be forward to other peer
|
||||
if let Err(e) = peer_map
|
||||
.send_msg(packet, to_peer_id, NextHopPolicy::LeastHop)
|
||||
@@ -496,7 +552,7 @@ impl PeerManager {
|
||||
|
||||
Ok(())
|
||||
} else {
|
||||
// target is not me, forward it
|
||||
// target is not me, forward it. try get origin peer id
|
||||
Err(packet)
|
||||
}
|
||||
}
|
||||
@@ -717,6 +773,7 @@ impl PeerManager {
|
||||
last_update: Some(last_update.into()),
|
||||
version: 0,
|
||||
network_secret_digest: info.network_secret_digest.clone(),
|
||||
my_peer_id_for_this_network: info.my_peer_id_for_this_network,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
@@ -204,6 +204,22 @@ impl PeerMap {
|
||||
None
|
||||
}
|
||||
|
||||
pub async fn get_origin_my_peer_id(
|
||||
&self,
|
||||
network_name: &str,
|
||||
foreign_my_peer_id: PeerId,
|
||||
) -> Option<PeerId> {
|
||||
for route in self.routes.read().await.iter() {
|
||||
let origin_peer_id = route
|
||||
.get_origin_my_peer_id(network_name, foreign_my_peer_id)
|
||||
.await;
|
||||
if origin_peer_id.is_some() {
|
||||
return origin_peer_id;
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.peer_map.is_empty()
|
||||
}
|
||||
|
||||
@@ -857,8 +857,7 @@ impl RouteTable {
|
||||
let is_new_peer_better = |old_peer_id: PeerId| -> bool {
|
||||
let old_next_hop = self.get_next_hop(old_peer_id);
|
||||
let new_next_hop = item.value();
|
||||
old_next_hop.is_none()
|
||||
|| new_next_hop.path_latency < old_next_hop.unwrap().path_latency
|
||||
old_next_hop.is_none() || new_next_hop.path_len < old_next_hop.unwrap().path_len
|
||||
};
|
||||
|
||||
if let Some(ipv4_addr) = info.ipv4_addr {
|
||||
@@ -866,7 +865,7 @@ impl RouteTable {
|
||||
.entry(ipv4_addr.into())
|
||||
.and_modify(|v| {
|
||||
if *v != *peer_id && is_new_peer_better(*v) {
|
||||
self.ipv4_peer_id_map.insert(ipv4_addr.into(), *peer_id);
|
||||
*v = *peer_id;
|
||||
}
|
||||
})
|
||||
.or_insert(*peer_id);
|
||||
@@ -878,14 +877,10 @@ impl RouteTable {
|
||||
.and_modify(|v| {
|
||||
if *v != *peer_id && is_new_peer_better(*v) {
|
||||
// if the next hop is not set or the new next hop is better, update it.
|
||||
self.cidr_peer_id_map
|
||||
.insert(cidr.parse().unwrap(), *peer_id);
|
||||
*v = *peer_id;
|
||||
}
|
||||
})
|
||||
.or_insert(*peer_id);
|
||||
|
||||
self.cidr_peer_id_map
|
||||
.insert(cidr.parse().unwrap(), *peer_id);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1084,6 +1079,7 @@ struct PeerRouteServiceImpl {
|
||||
route_table: RouteTable,
|
||||
route_table_with_cost: RouteTable,
|
||||
foreign_network_owner_map: DashMap<NetworkIdentity, Vec<PeerId>>,
|
||||
foreign_network_my_peer_id_map: DashMap<(String, PeerId), PeerId>,
|
||||
synced_route_info: SyncedRouteInfo,
|
||||
cached_local_conn_map: std::sync::Mutex<RouteConnBitmap>,
|
||||
cached_local_conn_map_version: AtomicVersion,
|
||||
@@ -1104,6 +1100,10 @@ impl Debug for PeerRouteServiceImpl {
|
||||
.field("route_table_with_cost", &self.route_table_with_cost)
|
||||
.field("synced_route_info", &self.synced_route_info)
|
||||
.field("foreign_network_owner_map", &self.foreign_network_owner_map)
|
||||
.field(
|
||||
"foreign_network_my_peer_id_map",
|
||||
&self.foreign_network_my_peer_id_map,
|
||||
)
|
||||
.field(
|
||||
"cached_local_conn_map",
|
||||
&self.cached_local_conn_map.lock().unwrap(),
|
||||
@@ -1127,6 +1127,7 @@ impl PeerRouteServiceImpl {
|
||||
route_table: RouteTable::new(),
|
||||
route_table_with_cost: RouteTable::new(),
|
||||
foreign_network_owner_map: DashMap::new(),
|
||||
foreign_network_my_peer_id_map: DashMap::new(),
|
||||
|
||||
synced_route_info: SyncedRouteInfo {
|
||||
peer_infos: DashMap::new(),
|
||||
@@ -1266,6 +1267,7 @@ impl PeerRouteServiceImpl {
|
||||
}
|
||||
|
||||
fn update_foreign_network_owner_map(&self) {
|
||||
self.foreign_network_my_peer_id_map.clear();
|
||||
self.foreign_network_owner_map.clear();
|
||||
for item in self.synced_route_info.foreign_network.iter() {
|
||||
let key = item.key();
|
||||
@@ -1290,7 +1292,12 @@ impl PeerRouteServiceImpl {
|
||||
self.foreign_network_owner_map
|
||||
.entry(network_identity)
|
||||
.or_insert_with(|| Vec::new())
|
||||
.push(key.peer_id);
|
||||
.push(entry.my_peer_id_for_this_network);
|
||||
|
||||
self.foreign_network_my_peer_id_map.insert(
|
||||
(key.network_name.clone(), entry.my_peer_id_for_this_network),
|
||||
key.peer_id,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1529,8 +1536,6 @@ impl PeerRouteServiceImpl {
|
||||
req_dynamic_msg.set_field_by_name("peer_infos", Value::Message(peer_infos));
|
||||
}
|
||||
|
||||
tracing::trace!(?req_dynamic_msg, "build_sync_route_raw_req");
|
||||
|
||||
req_dynamic_msg
|
||||
}
|
||||
|
||||
@@ -1646,7 +1651,12 @@ impl PeerRouteServiceImpl {
|
||||
}
|
||||
|
||||
fn update_peer_info_last_update(&self) {
|
||||
tracing::debug!(?self, "update_peer_info_last_update");
|
||||
tracing::debug!(
|
||||
"update_peer_info_last_update, my_peer_id: {:?}, prev: {:?}, new: {:?}",
|
||||
self.my_peer_id,
|
||||
self.peer_info_last_update.load(),
|
||||
std::time::Instant::now()
|
||||
);
|
||||
self.peer_info_last_update.store(std::time::Instant::now());
|
||||
}
|
||||
|
||||
@@ -2089,7 +2099,6 @@ impl PeerRoute {
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(session_mgr))]
|
||||
async fn maintain_session_tasks(
|
||||
session_mgr: RouteSessionManager,
|
||||
service_impl: Arc<PeerRouteServiceImpl>,
|
||||
@@ -2097,7 +2106,6 @@ impl PeerRoute {
|
||||
session_mgr.maintain_sessions(service_impl).await;
|
||||
}
|
||||
|
||||
#[tracing::instrument(skip(session_mgr))]
|
||||
async fn update_my_peer_info_routine(
|
||||
service_impl: Arc<PeerRouteServiceImpl>,
|
||||
session_mgr: RouteSessionManager,
|
||||
@@ -2296,6 +2304,17 @@ impl Route for PeerRoute {
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
async fn get_origin_my_peer_id(
|
||||
&self,
|
||||
network_name: &str,
|
||||
foreign_my_peer_id: PeerId,
|
||||
) -> Option<PeerId> {
|
||||
self.service_impl
|
||||
.foreign_network_my_peer_id_map
|
||||
.get(&(network_name.to_string(), foreign_my_peer_id))
|
||||
.map(|x| *x)
|
||||
}
|
||||
|
||||
async fn get_feature_flag(&self, peer_id: PeerId) -> Option<PeerFeatureFlag> {
|
||||
self.service_impl
|
||||
.route_table
|
||||
|
||||
@@ -95,6 +95,16 @@ pub trait Route {
|
||||
Default::default()
|
||||
}
|
||||
|
||||
// my peer id in foreign network is different from the one in local network
|
||||
// this function is used to get the peer id in local network
|
||||
async fn get_origin_my_peer_id(
|
||||
&self,
|
||||
_network_name: &str,
|
||||
_foreign_my_peer_id: PeerId,
|
||||
) -> Option<PeerId> {
|
||||
None
|
||||
}
|
||||
|
||||
async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {}
|
||||
|
||||
async fn get_feature_flag(&self, peer_id: PeerId) -> Option<PeerFeatureFlag>;
|
||||
|
||||
@@ -103,6 +103,7 @@ message ListForeignNetworkRequest {}
|
||||
message ForeignNetworkEntryPb {
|
||||
repeated PeerInfo peers = 1;
|
||||
bytes network_secret_digest = 2;
|
||||
uint32 my_peer_id_for_this_network = 3;
|
||||
}
|
||||
|
||||
message ListForeignNetworkResponse {
|
||||
|
||||
@@ -46,6 +46,7 @@ message ForeignNetworkRouteInfoEntry {
|
||||
google.protobuf.Timestamp last_update = 2;
|
||||
uint32 version = 3;
|
||||
bytes network_secret_digest = 4;
|
||||
uint32 my_peer_id_for_this_network = 5;
|
||||
}
|
||||
|
||||
message RouteForeignNetworkInfos {
|
||||
|
||||
@@ -887,11 +887,17 @@ pub async fn manual_reconnector(#[values(true, false)] is_foreign: bool) {
|
||||
.get_foreign_network_client()
|
||||
.get_peer_map()
|
||||
};
|
||||
let center_inst_peer_id = if !is_foreign {
|
||||
center_inst.peer_id()
|
||||
} else {
|
||||
center_inst
|
||||
.get_peer_manager()
|
||||
.get_foreign_network_manager()
|
||||
.get_network_peer_id(&inst1.get_global_ctx().get_network_identity().network_name)
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
let conns = peer_map
|
||||
.list_peer_conns(center_inst.peer_id())
|
||||
.await
|
||||
.unwrap();
|
||||
let conns = peer_map.list_peer_conns(center_inst_peer_id).await.unwrap();
|
||||
|
||||
assert!(conns.len() >= 1);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user