Add is_hole_punched flag to PeerConn (#1001)

This commit is contained in:
liusen373
2025-06-18 12:14:57 +08:00
committed by GitHub
parent 327ccdcf38
commit 72673a9d52
8 changed files with 47 additions and 92 deletions

View File

@@ -186,7 +186,7 @@ impl DirectConnectorManagerData {
.await?;
// NOTICE: must add as directly connected tunnel
self.peer_manager.add_direct_tunnel(ret).await
self.peer_manager.add_client_tunnel(ret, true).await
}
async fn do_try_connect_to_ip(&self, dst_peer_id: PeerId, addr: String) -> Result<(), Error> {

View File

@@ -221,7 +221,7 @@ impl UdpHolePunchConnectorData {
Ok(Some(tunnel)) => {
tracing::info!(?tunnel, "hole punching get tunnel success");
if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel).await {
if let Err(e) = self.peer_mgr.add_client_tunnel(tunnel, false).await {
tracing::warn!(?e, "add client tunnel failed");
op(true);
false

View File

@@ -721,7 +721,7 @@ mod tests {
let s_ret =
tokio::spawn(async move { b_mgr_copy.add_tunnel_as_server(b_ring, true).await });
pma_net1.add_client_tunnel(a_ring).await.unwrap();
pma_net1.add_client_tunnel(a_ring, false).await.unwrap();
s_ret.await.unwrap().unwrap();
}

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use crossbeam::atomic::AtomicCell;
use dashmap::DashMap;
use dashmap::{DashMap, DashSet};
use tokio::{select, sync::mpsc};
@@ -200,6 +200,17 @@ impl Peer {
ret
}
/// True if at least one of this peer's connections was NOT established via
/// hole punching (i.e. it is a direct connection).
pub fn has_directly_connected_conn(&self) -> bool {
    for entry in self.conns.iter() {
        if !entry.value().is_hole_punched() {
            return true;
        }
    }
    false
}
pub fn get_directly_connections(&self) -> DashSet<uuid::Uuid> {
self.conns.iter()
.filter(|entry| !(entry.value()).is_hole_punched())
.map(|entry|(entry.value()).get_conn_id())
.collect()
}
/// Returns the currently selected default connection id for this peer
/// (atomic load from the `AtomicCell`; may race with concurrent updates).
pub fn get_default_conn_id(&self) -> PeerConnId {
self.default_conn_id.load()
}

View File

@@ -101,6 +101,9 @@ pub struct PeerConn {
info: Option<HandshakeRequest>,
is_client: Option<bool>,
// remote or local
is_hole_punched: bool,
close_event_notifier: Arc<PeerConnCloseNotify>,
ctrl_resp_sender: broadcast::Sender<ZCPacket>,
@@ -152,6 +155,8 @@ impl PeerConn {
info: None,
is_client: None,
is_hole_punched: true,
close_event_notifier: Arc::new(PeerConnCloseNotify::new(conn_id)),
ctrl_resp_sender: ctrl_sender,
@@ -166,6 +171,14 @@ impl PeerConn {
self.conn_id
}
/// Marks whether this connection was established via hole punching.
/// NOTE(review): the constructor defaults this flag to `true`, so callers
/// adding a direct tunnel must explicitly set it to `false` — confirm all
/// call sites pass the intended value.
pub fn set_is_hole_punched(&mut self, is_hole_punched: bool) {
self.is_hole_punched = is_hole_punched;
}
/// True if this connection was established via hole punching
/// (false means it is a directly connected tunnel).
pub fn is_hole_punched(&self) -> bool {
self.is_hole_punched
}
async fn wait_handshake(&mut self, need_retry: &mut bool) -> Result<HandshakeRequest, Error> {
*need_retry = false;

View File

@@ -23,7 +23,7 @@ use crate::{
compressor::{Compressor as _, DefaultCompressor},
constants::EASYTIER_VERSION,
error::Error,
global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity},
global_ctx::{ArcGlobalCtx, NetworkIdentity},
stun::StunInfoCollectorTrait,
PeerId,
},
@@ -142,9 +142,6 @@ pub struct PeerManager {
exit_nodes: Vec<Ipv4Addr>,
// conns that are directly connected (which are not hole punched)
directly_connected_conn_map: Arc<DashMap<PeerId, DashSet<uuid::Uuid>>>,
reserved_my_peer_id_map: DashMap<String, PeerId>,
}
@@ -273,8 +270,6 @@ impl PeerManager {
exit_nodes,
directly_connected_conn_map: Arc::new(DashMap::new()),
reserved_my_peer_id_map: DashMap::new(),
}
}
@@ -319,8 +314,10 @@ impl PeerManager {
pub async fn add_client_tunnel(
&self,
tunnel: Box<dyn Tunnel>,
is_directly_connected: bool,
) -> Result<(PeerId, PeerConnId), Error> {
let mut peer = PeerConn::new(self.my_peer_id, self.global_ctx.clone(), tunnel);
peer.set_is_hole_punched(!is_directly_connected);
peer.do_handshake_as_client().await?;
let conn_id = peer.get_conn_id();
let peer_id = peer.get_peer_id();
@@ -334,72 +331,12 @@ impl PeerManager {
Ok((peer_id, conn_id))
}
/// Records `conn_id` as a directly connected (non-hole-punched) conn of
/// `peer_id`, creating the per-peer set on first insert.
fn add_directly_connected_conn(&self, peer_id: PeerId, conn_id: uuid::Uuid) {
// `insert` returns whether the id was newly added; the result is
// intentionally ignored — duplicate inserts are harmless.
let _ = self
.directly_connected_conn_map
.entry(peer_id)
.or_insert_with(DashSet::new)
.insert(conn_id);
}
/// True if `peer_id` has at least one recorded directly connected conn.
/// An absent entry and an empty set both yield `false`.
pub fn has_directly_connected_conn(&self, peer_id: PeerId) -> bool {
self.directly_connected_conn_map
.get(&peer_id)
.map_or(false, |x| !x.is_empty())
}
/// Spawns a background task that keeps `directly_connected_conn_map` in
/// sync with connection lifecycle events from the global context.
///
/// - On `PeerConnRemoved`: drops the conn id from the peer's set and
///   removes the peer entry entirely once its set becomes empty.
/// - On broadcast lag: resubscribes and re-derives the map from the
///   currently alive conns, since events may have been missed.
/// - On channel close: logs and exits the task.
async fn start_peer_conn_close_event_handler(&self) {
let dmap = self.directly_connected_conn_map.clone();
let mut event_recv = self.global_ctx.subscribe();
let peer_map = self.peers.clone();
use tokio::sync::broadcast::error::RecvError;
self.tasks.lock().await.spawn(async move {
loop {
match event_recv.recv().await {
Err(RecvError::Closed) => {
// Sender side is gone; nothing more to track.
tracing::error!("peer conn close event handler exit");
break;
}
Err(RecvError::Lagged(_)) => {
// We missed events: resubscribe and reconcile the map
// against the ground truth of alive connections.
tracing::warn!("peer conn close event handler lagged");
event_recv = event_recv.resubscribe();
let alive_conns = peer_map.get_alive_conns();
for p in dmap.iter_mut() {
// Keep only conn ids that are still alive for this peer.
p.retain(|x| alive_conns.contains_key(&(*p.key(), *x)));
}
// Drop peers whose direct-conn set became empty.
dmap.retain(|_, v| !v.is_empty());
}
Ok(event) => {
if let GlobalCtxEvent::PeerConnRemoved(info) = event {
let mut need_remove = false;
if let Some(set) = dmap.get_mut(&info.peer_id) {
// NOTE(review): `parse().unwrap()` assumes the event
// always carries a valid UUID string — confirm upstream.
let conn_id = info.conn_id.parse().unwrap();
let old = set.remove(&conn_id);
tracing::info!(
?old,
?info,
"try remove conn id from directly connected map"
);
need_remove = set.is_empty();
}
// Remove outside the `get_mut` borrow to avoid holding
// the shard lock while mutating the map.
if need_remove {
dmap.remove(&info.peer_id);
}
}
}
}
}
});
}
pub async fn add_direct_tunnel(
&self,
t: Box<dyn Tunnel>,
) -> Result<(PeerId, PeerConnId), Error> {
let (peer_id, conn_id) = self.add_client_tunnel(t).await?;
self.add_directly_connected_conn(peer_id, conn_id);
Ok((peer_id, conn_id))
if let Some(peer) = self.peers.get_peer_by_id(peer_id) {
peer.has_directly_connected_conn()
} else {
false
}
}
#[tracing::instrument]
@@ -414,7 +351,7 @@ impl PeerManager {
let t = ns
.run_async(|| async move { connector.connect().await })
.await?;
self.add_direct_tunnel(t).await
self.add_client_tunnel(t, true).await
}
#[tracing::instrument(ret)]
@@ -462,11 +399,8 @@ impl PeerManager {
let peer_network_name = peer.get_network_identity().network_name.clone();
if peer_network_name == self.global_ctx.get_network_identity().network_name {
let (peer_id, conn_id) = (peer.get_peer_id(), peer.get_conn_id());
peer.set_is_hole_punched(!is_directly_connected);
self.add_new_peer_conn(peer).await?;
if is_directly_connected {
self.add_directly_connected_conn(peer_id, conn_id);
}
} else {
self.foreign_network_manager.add_peer_conn(peer).await?;
}
@@ -1020,11 +954,9 @@ impl PeerManager {
async fn run_clean_peer_without_conn_routine(&self) {
let peer_map = self.peers.clone();
let dmap = self.directly_connected_conn_map.clone();
self.tasks.lock().await.spawn(async move {
loop {
peer_map.clean_peer_without_conn().await;
dmap.retain(|p, v| peer_map.has_peer(*p) && !v.is_empty());
tokio::time::sleep(std::time::Duration::from_secs(3)).await;
}
});
@@ -1041,8 +973,6 @@ impl PeerManager {
}
pub async fn run(&self) -> Result<(), Error> {
self.start_peer_conn_close_event_handler().await;
match &self.route_algo_inst {
RouteAlgoInst::Ospf(route) => self.add_route(route.clone()).await,
RouteAlgoInst::None => {}
@@ -1135,10 +1065,11 @@ impl PeerManager {
}
pub fn get_directly_connections(&self, peer_id: PeerId) -> DashSet<uuid::Uuid> {
self.directly_connected_conn_map
.get(&peer_id)
.map(|x| x.clone())
.unwrap_or_default()
if let Some(peer) = self.peers.get_peer_by_id(peer_id) {
return peer.get_directly_connections()
}
DashSet::new()
}
pub async fn clear_resources(&self) {
@@ -1227,7 +1158,7 @@ mod tests {
});
server_mgr
.add_client_tunnel(server.accept().await.unwrap())
.add_client_tunnel(server.accept().await.unwrap(), false)
.await
.unwrap();
}
@@ -1432,7 +1363,7 @@ mod tests {
let a_mgr_copy = peer_mgr_a.clone();
tokio::spawn(async move {
a_mgr_copy.add_client_tunnel(a_ring).await.unwrap();
a_mgr_copy.add_client_tunnel(a_ring, false).await.unwrap();
});
let b_mgr_copy = peer_mgr_b.clone();
tokio::spawn(async move {

View File

@@ -87,7 +87,7 @@ impl PeerMap {
});
}
fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
pub fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
self.peer_map.get(&peer_id).map(|v| v.clone())
}

View File

@@ -41,7 +41,7 @@ pub async fn connect_peer_manager(client: Arc<PeerManager>, server: Arc<PeerMana
let (a_ring, b_ring) = create_ring_tunnel_pair();
let a_mgr_copy = client.clone();
tokio::spawn(async move {
a_mgr_copy.add_client_tunnel(a_ring).await.unwrap();
a_mgr_copy.add_client_tunnel(a_ring, false).await.unwrap();
});
let b_mgr_copy = server.clone();
tokio::spawn(async move {