use ospf route to propogate foreign network info

This commit is contained in:
sijie.sun
2024-09-22 10:10:32 +08:00
committed by Sijie.Sun
parent fb8d262554
commit aca9a0e35b
7 changed files with 449 additions and 175 deletions

View File

@@ -5,7 +5,10 @@ only forward packets of peers that directly connected to this node.
in future, with the help wo peer center we can forward packets of peers that
connected to any node in the local network.
*/
use std::sync::{Arc, Weak};
use std::{
sync::{Arc, Weak},
time::SystemTime,
};
use dashmap::DashMap;
use tokio::{
@@ -44,33 +47,33 @@ use super::{
};
struct ForeignNetworkEntry {
my_peer_id: PeerId,
global_ctx: ArcGlobalCtx,
network: NetworkIdentity,
peer_map: Arc<PeerMap>,
relay_data: bool,
route: ArcRoute,
peer_rpc: Weak<PeerRpcManager>,
rpc_sender: UnboundedSender<ZCPacket>,
packet_recv: Mutex<Option<PacketRecvChanReceiver>>,
tasks: Mutex<JoinSet<()>>,
}
impl ForeignNetworkEntry {
fn new(
network: NetworkIdentity,
packet_sender: PacketRecvChan,
global_ctx: ArcGlobalCtx,
my_peer_id: PeerId,
relay_data: bool,
peer_rpc: Arc<PeerRpcManager>,
) -> Self {
let config = TomlConfigLoader::default();
config.set_network_identity(network.clone());
config.set_hostname(Some(format!("PublicServer_{}", global_ctx.get_hostname())));
let foreign_global_ctx = Arc::new(GlobalCtx::new(config));
foreign_global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector {
udp_nat_type: NatType::Unknown,
}));
let mut feature_flag = global_ctx.get_feature_flags();
feature_flag.is_public_server = true;
global_ctx.set_feature_flags(feature_flag);
let foreign_global_ctx = Self::build_foreign_global_ctx(&network, global_ctx.clone());
let (packet_sender, packet_recv) = mpsc::channel(1000);
let peer_map = Arc::new(PeerMap::new(
packet_sender,
@@ -78,11 +81,9 @@ impl ForeignNetworkEntry {
my_peer_id,
));
let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone());
let (peer_rpc, rpc_transport_sender) = Self::build_rpc_tspt(my_peer_id, peer_map.clone());
for u in global_ctx.get_running_listeners().into_iter() {
foreign_global_ctx.add_running_listener(u);
}
let route = PeerRoute::new(my_peer_id, foreign_global_ctx.clone(), peer_rpc.clone());
peer_rpc.rpc_server().registry().register(
DirectConnectorRpcServer::new(DirectConnectorManagerRpcServer::new(
foreign_global_ctx.clone(),
@@ -91,16 +92,101 @@ impl ForeignNetworkEntry {
);
Self {
my_peer_id,
global_ctx: foreign_global_ctx,
network,
peer_map,
relay_data,
route: Arc::new(Box::new(route)),
peer_rpc: Arc::downgrade(&peer_rpc),
rpc_sender: rpc_transport_sender,
packet_recv: Mutex::new(Some(packet_recv)),
tasks: Mutex::new(JoinSet::new()),
}
}
async fn prepare(&self, my_peer_id: PeerId) {
fn build_foreign_global_ctx(
network: &NetworkIdentity,
global_ctx: ArcGlobalCtx,
) -> ArcGlobalCtx {
let config = TomlConfigLoader::default();
config.set_network_identity(network.clone());
config.set_hostname(Some(format!("PublicServer_{}", global_ctx.get_hostname())));
let foreign_global_ctx = Arc::new(GlobalCtx::new(config));
foreign_global_ctx.replace_stun_info_collector(Box::new(MockStunInfoCollector {
udp_nat_type: NatType::Unknown,
}));
let mut feature_flag = global_ctx.get_feature_flags();
feature_flag.is_public_server = true;
foreign_global_ctx.set_feature_flags(feature_flag);
for u in global_ctx.get_running_listeners().into_iter() {
foreign_global_ctx.add_running_listener(u);
}
foreign_global_ctx
}
fn build_rpc_tspt(
my_peer_id: PeerId,
peer_map: Arc<PeerMap>,
) -> (Arc<PeerRpcManager>, UnboundedSender<ZCPacket>) {
struct RpcTransport {
my_peer_id: PeerId,
peer_map: Weak<PeerMap>,
packet_recv: Mutex<UnboundedReceiver<ZCPacket>>,
}
#[async_trait::async_trait]
impl PeerRpcManagerTransport for RpcTransport {
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
tracing::debug!(
"foreign network manager send rpc to peer: {:?}",
dst_peer_id
);
let peer_map = self
.peer_map
.upgrade()
.ok_or(anyhow::anyhow!("peer map is gone"))?;
peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
}
async fn recv(&self) -> Result<ZCPacket, Error> {
if let Some(o) = self.packet_recv.lock().await.recv().await {
tracing::info!("recv rpc packet in foreign network manager rpc transport");
Ok(o)
} else {
Err(Error::Unknown)
}
}
}
let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let tspt = RpcTransport {
my_peer_id,
peer_map: Arc::downgrade(&peer_map),
packet_recv: Mutex::new(peer_rpc_tspt_recv),
};
let peer_rpc = Arc::new(PeerRpcManager::new(tspt));
(peer_rpc, rpc_transport_sender)
}
async fn prepare_route(&self, my_peer_id: PeerId) {
struct Interface {
my_peer_id: PeerId,
peer_map: Weak<PeerMap>,
@@ -131,6 +217,52 @@ impl ForeignNetworkEntry {
self.peer_map.add_route(self.route.clone()).await;
}
async fn start_packet_recv(&self) {
let mut recv = self.packet_recv.lock().await.take().unwrap();
let my_node_id = self.my_peer_id;
let rpc_sender = self.rpc_sender.clone();
let peer_map = self.peer_map.clone();
let relay_data = self.relay_data;
self.tasks.lock().await.spawn(async move {
while let Some(packet_bytes) = recv.recv().await {
let Some(hdr) = packet_bytes.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
};
tracing::info!(?hdr, "recv packet in foreign network manager");
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id == my_node_id {
if hdr.packet_type == PacketType::TaRpc as u8
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
{
rpc_sender.send(packet_bytes).unwrap();
continue;
}
tracing::trace!(?hdr, "ignore packet in foreign network");
} else {
if !relay_data && hdr.packet_type == PacketType::Data as u8 {
continue;
}
let ret = peer_map
.send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop)
.await;
if ret.is_err() {
tracing::error!("forward packet to peer failed: {:?}", ret.err());
}
}
}
});
}
async fn prepare(&self, my_peer_id: PeerId) {
self.prepare_route(my_peer_id).await;
self.start_packet_recv().await;
self.peer_rpc.upgrade().unwrap().run();
}
}
impl Drop for ForeignNetworkEntry {
@@ -147,27 +279,11 @@ impl Drop for ForeignNetworkEntry {
struct ForeignNetworkManagerData {
network_peer_maps: DashMap<String, Arc<ForeignNetworkEntry>>,
peer_network_map: DashMap<PeerId, String>,
network_peer_last_update: DashMap<String, SystemTime>,
lock: std::sync::Mutex<()>,
}
impl ForeignNetworkManagerData {
async fn send_msg(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
let network_name = self
.peer_network_map
.get(&dst_peer_id)
.ok_or_else(|| Error::RouteError(Some("network not found".to_string())))?
.clone();
let entry = self
.network_peer_maps
.get(&network_name)
.ok_or_else(|| Error::RouteError(Some("no peer in network".to_string())))?
.clone();
entry
.peer_map
.send_msg(msg, dst_peer_id, NextHopPolicy::LeastHop)
.await
}
fn get_peer_network(&self, peer_id: PeerId) -> Option<String> {
self.peer_network_map.get(&peer_id).map(|v| v.clone())
}
@@ -179,8 +295,12 @@ impl ForeignNetworkManagerData {
fn remove_peer(&self, peer_id: PeerId, network_name: &String) {
let _l = self.lock.lock().unwrap();
self.peer_network_map.remove(&peer_id);
self.network_peer_maps
.remove_if(network_name, |_, v| v.peer_map.is_empty());
if let Some(_) = self
.network_peer_maps
.remove_if(network_name, |_, v| v.peer_map.is_empty())
{
self.network_peer_last_update.remove(network_name);
}
}
async fn clear_no_conn_peer(&self, network_name: &String) {
@@ -197,37 +317,7 @@ impl ForeignNetworkManagerData {
let _l = self.lock.lock().unwrap();
self.peer_network_map.retain(|_, v| v != network_name);
self.network_peer_maps.remove(network_name);
}
}
struct RpcTransport {
my_peer_id: PeerId,
data: Arc<ForeignNetworkManagerData>,
packet_recv: Mutex<UnboundedReceiver<ZCPacket>>,
}
#[async_trait::async_trait]
impl PeerRpcManagerTransport for RpcTransport {
fn my_peer_id(&self) -> PeerId {
self.my_peer_id
}
async fn send(&self, msg: ZCPacket, dst_peer_id: PeerId) -> Result<(), Error> {
tracing::debug!(
"foreign network manager send rpc to peer: {:?}",
dst_peer_id
);
self.data.send_msg(msg, dst_peer_id).await
}
async fn recv(&self) -> Result<ZCPacket, Error> {
if let Some(o) = self.packet_recv.lock().await.recv().await {
tracing::info!("recv rpc packet in foreign network manager rpc transport");
Ok(o)
} else {
Err(Error::Unknown)
}
self.network_peer_last_update.remove(network_name);
}
}
@@ -238,12 +328,7 @@ pub struct ForeignNetworkManager {
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: PacketRecvChan,
packet_sender: PacketRecvChan,
packet_recv: Mutex<Option<PacketRecvChanReceiver>>,
data: Arc<ForeignNetworkManagerData>,
rpc_mgr: Arc<PeerRpcManager>,
rpc_transport_sender: UnboundedSender<ZCPacket>,
tasks: Mutex<JoinSet<()>>,
}
@@ -254,34 +339,19 @@ impl ForeignNetworkManager {
global_ctx: ArcGlobalCtx,
packet_sender_to_mgr: PacketRecvChan,
) -> Self {
// recv packet from all foreign networks
let (packet_sender, packet_recv) = mpsc::channel(1000);
let data = Arc::new(ForeignNetworkManagerData {
network_peer_maps: DashMap::new(),
peer_network_map: DashMap::new(),
network_peer_last_update: DashMap::new(),
lock: std::sync::Mutex::new(()),
});
// handle rpc from foreign networks
let (rpc_transport_sender, peer_rpc_tspt_recv) = mpsc::unbounded_channel();
let rpc_mgr = Arc::new(PeerRpcManager::new(RpcTransport {
my_peer_id,
data: data.clone(),
packet_recv: Mutex::new(peer_rpc_tspt_recv),
}));
Self {
my_peer_id,
global_ctx,
packet_sender_to_mgr,
packet_sender,
packet_recv: Mutex::new(Some(packet_recv)),
data,
rpc_mgr,
rpc_transport_sender,
tasks: Mutex::new(JoinSet::new()),
}
@@ -323,11 +393,9 @@ impl ForeignNetworkManager {
new_added = true;
Arc::new(ForeignNetworkEntry::new(
peer_conn.get_network_identity(),
self.packet_sender.clone(),
self.global_ctx.clone(),
self.my_peer_id,
!ret.is_err(),
self.rpc_mgr.clone(),
))
})
.clone();
@@ -337,6 +405,11 @@ impl ForeignNetworkManager {
peer_conn.get_network_identity().network_name.clone(),
);
self.data.network_peer_last_update.insert(
peer_conn.get_network_identity().network_name.clone(),
SystemTime::now(),
);
entry
};
@@ -363,83 +436,29 @@ impl ForeignNetworkManager {
let mut s = entry.global_ctx.subscribe();
self.tasks.lock().await.spawn(async move {
while let Ok(e) = s.recv().await {
if let GlobalCtxEvent::PeerRemoved(peer_id) = &e {
tracing::info!(?e, "remove peer from foreign network manager");
data.remove_peer(*peer_id, &network_name);
} else if let GlobalCtxEvent::PeerConnRemoved(..) = &e {
tracing::info!(?e, "clear no conn peer from foreign network manager");
data.clear_no_conn_peer(&network_name).await;
match &e {
GlobalCtxEvent::PeerRemoved(peer_id) => {
tracing::info!(?e, "remove peer from foreign network manager");
data.remove_peer(*peer_id, &network_name);
data.network_peer_last_update
.insert(network_name.clone(), SystemTime::now());
}
GlobalCtxEvent::PeerConnRemoved(..) => {
tracing::info!(?e, "clear no conn peer from foreign network manager");
data.clear_no_conn_peer(&network_name).await;
}
GlobalCtxEvent::PeerAdded(_) => {
tracing::info!(?e, "add peer to foreign network manager");
data.network_peer_last_update
.insert(network_name.clone(), SystemTime::now());
}
_ => continue,
}
}
// if lagged or recv done just remove the network
tracing::error!("global event handler at foreign network manager exit");
data.remove_network(&network_name);
});
self.tasks.lock().await.spawn(async move {});
}
async fn start_packet_recv(&self) {
let mut recv = self.packet_recv.lock().await.take().unwrap();
let sender_to_mgr = self.packet_sender_to_mgr.clone();
let my_node_id = self.my_peer_id;
let rpc_sender = self.rpc_transport_sender.clone();
let data = self.data.clone();
self.tasks.lock().await.spawn(async move {
while let Some(packet_bytes) = recv.recv().await {
let Some(hdr) = packet_bytes.peer_manager_header() else {
tracing::warn!("invalid packet, skip");
continue;
};
tracing::info!(?hdr, "recv packet in foreign network manager");
let from_peer_id = hdr.from_peer_id.get();
let to_peer_id = hdr.to_peer_id.get();
if to_peer_id == my_node_id {
if hdr.packet_type == PacketType::TaRpc as u8
|| hdr.packet_type == PacketType::RpcReq as u8
|| hdr.packet_type == PacketType::RpcResp as u8
{
rpc_sender.send(packet_bytes).unwrap();
continue;
}
if let Err(e) = sender_to_mgr.send(packet_bytes).await {
tracing::error!("send packet to mgr failed: {:?}", e);
}
} else {
let Some(from_network) = data.get_peer_network(from_peer_id) else {
continue;
};
let Some(to_network) = data.get_peer_network(to_peer_id) else {
continue;
};
if from_network != to_network {
continue;
}
if let Some(entry) = data.get_network_entry(&from_network) {
if !entry.relay_data && hdr.packet_type == PacketType::Data as u8 {
continue;
}
let ret = entry
.peer_map
.send_msg(packet_bytes, to_peer_id, NextHopPolicy::LeastHop)
.await;
if ret.is_err() {
tracing::error!("forward packet to peer failed: {:?}", ret.err());
}
} else {
tracing::error!("foreign network not found: {}", from_network);
}
}
}
});
}
pub async fn run(&self) {
self.start_packet_recv().await;
self.rpc_mgr.run();
}
pub async fn list_foreign_networks(&self) -> ListForeignNetworkResponse {
@@ -473,6 +492,13 @@ impl ForeignNetworkManager {
}
ret
}
pub fn get_foreign_network_last_update(&self, network_name: &str) -> Option<SystemTime> {
self.data
.network_peer_last_update
.get(network_name)
.map(|v| v.clone())
}
}
impl Drop for ForeignNetworkManager {
@@ -496,7 +522,7 @@ mod tests {
tests::{connect_peer_manager, wait_route_appear},
},
proto::common::NatType,
tunnel::common::tests::wait_for_condition,
tunnel::common::tests::{enable_log, wait_for_condition},
};
use super::*;