mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-13 21:27:25 +08:00
fix dead loop in direct connecto if disable-p2p is enabled in dst (#1206)
This commit is contained in:
@@ -16,6 +16,7 @@ use crate::{
|
|||||||
dns::socket_addrs, error::Error, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait,
|
dns::socket_addrs, error::Error, global_ctx::ArcGlobalCtx, stun::StunInfoCollectorTrait,
|
||||||
PeerId,
|
PeerId,
|
||||||
},
|
},
|
||||||
|
connector::udp_hole_punch::handle_rpc_result,
|
||||||
peers::{
|
peers::{
|
||||||
peer_conn::PeerConnId,
|
peer_conn::PeerConnId,
|
||||||
peer_manager::PeerManager,
|
peer_manager::PeerManager,
|
||||||
@@ -91,6 +92,7 @@ struct DirectConnectorManagerData {
|
|||||||
global_ctx: ArcGlobalCtx,
|
global_ctx: ArcGlobalCtx,
|
||||||
peer_manager: Arc<PeerManager>,
|
peer_manager: Arc<PeerManager>,
|
||||||
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
|
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
|
||||||
|
peer_black_list: timedmap::TimedMap<PeerId, ()>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DirectConnectorManagerData {
|
impl DirectConnectorManagerData {
|
||||||
@@ -99,6 +101,7 @@ impl DirectConnectorManagerData {
|
|||||||
global_ctx,
|
global_ctx,
|
||||||
peer_manager,
|
peer_manager,
|
||||||
dst_listener_blacklist: timedmap::TimedMap::new(),
|
dst_listener_blacklist: timedmap::TimedMap::new(),
|
||||||
|
peer_black_list: timedmap::TimedMap::new(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -473,7 +476,17 @@ impl DirectConnectorManagerData {
|
|||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let mut backoff =
|
let mut backoff =
|
||||||
udp_hole_punch::BackOff::new(vec![1000, 2000, 2000, 5000, 5000, 10000, 30000, 60000]);
|
udp_hole_punch::BackOff::new(vec![1000, 2000, 2000, 5000, 5000, 10000, 30000, 60000]);
|
||||||
|
let mut attempt = 0;
|
||||||
loop {
|
loop {
|
||||||
|
if self.peer_black_list.contains(&dst_peer_id) {
|
||||||
|
return Err(anyhow::anyhow!("peer {} is blacklisted", dst_peer_id).into());
|
||||||
|
}
|
||||||
|
|
||||||
|
if attempt > 0 {
|
||||||
|
tokio::time::sleep(Duration::from_millis(backoff.next_backoff())).await;
|
||||||
|
}
|
||||||
|
attempt += 1;
|
||||||
|
|
||||||
let peer_manager = self.peer_manager.clone();
|
let peer_manager = self.peer_manager.clone();
|
||||||
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
|
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
|
||||||
|
|
||||||
@@ -486,17 +499,11 @@ impl DirectConnectorManagerData {
|
|||||||
self.global_ctx.get_network_name(),
|
self.global_ctx.get_network_name(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let ip_list = match rpc_stub
|
let ip_list = rpc_stub
|
||||||
.get_ip_list(BaseController::default(), GetIpListRequest {})
|
.get_ip_list(BaseController::default(), GetIpListRequest {})
|
||||||
.await
|
.await;
|
||||||
.with_context(|| format!("get ip list from peer {}", dst_peer_id))
|
let ip_list = handle_rpc_result(ip_list, dst_peer_id, &self.peer_black_list)
|
||||||
{
|
.with_context(|| format!("get ip list from peer {}", dst_peer_id))?;
|
||||||
Ok(ip_list) => ip_list,
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(?e, "failed to get ip list from peer");
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
tracing::info!(ip_list = ?ip_list, dst_peer_id = ?dst_peer_id, "got ip list");
|
tracing::info!(ip_list = ?ip_list, dst_peer_id = ?dst_peer_id, "got ip list");
|
||||||
|
|
||||||
@@ -512,8 +519,6 @@ impl DirectConnectorManagerData {
|
|||||||
);
|
);
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
tokio::time::sleep(Duration::from_millis(backoff.next_backoff())).await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -547,13 +552,16 @@ impl PeerTaskLauncher for DirectConnectorLauncher {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec<Self::CollectPeerItem> {
|
async fn collect_peers_need_task(&self, data: &Self::Data) -> Vec<Self::CollectPeerItem> {
|
||||||
|
data.peer_black_list.cleanup();
|
||||||
let my_peer_id = data.peer_manager.my_peer_id();
|
let my_peer_id = data.peer_manager.my_peer_id();
|
||||||
data.peer_manager
|
data.peer_manager
|
||||||
.list_peers()
|
.list_peers()
|
||||||
.await
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.filter(|peer_id| {
|
.filter(|peer_id| {
|
||||||
*peer_id != my_peer_id && !data.peer_manager.has_directly_connected_conn(*peer_id)
|
*peer_id != my_peer_id
|
||||||
|
&& !data.peer_manager.has_directly_connected_conn(*peer_id)
|
||||||
|
&& !data.peer_black_list.contains(peer_id)
|
||||||
})
|
})
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -260,7 +260,7 @@ impl PunchBothEasySymHoleClient {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let remote_ret = handle_rpc_result(remote_ret, dst_peer_id, self.blacklist.clone())?;
|
let remote_ret = handle_rpc_result(remote_ret, dst_peer_id, &self.blacklist)?;
|
||||||
|
|
||||||
if remote_ret.is_busy {
|
if remote_ret.is_busy {
|
||||||
*is_busy = true;
|
*is_busy = true;
|
||||||
|
|||||||
@@ -154,7 +154,7 @@ impl PunchConeHoleClient {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let resp = handle_rpc_result(resp, dst_peer_id, self.blacklist.clone())?;
|
let resp = handle_rpc_result(resp, dst_peer_id, &self.blacklist)?;
|
||||||
|
|
||||||
let remote_mapped_addr = resp.listener_mapped_addr.ok_or(anyhow::anyhow!(
|
let remote_mapped_addr = resp.listener_mapped_addr.ok_or(anyhow::anyhow!(
|
||||||
"select_punch_listener response missing listener_mapped_addr"
|
"select_punch_listener response missing listener_mapped_addr"
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ impl BackOff {
|
|||||||
pub fn handle_rpc_result<T>(
|
pub fn handle_rpc_result<T>(
|
||||||
ret: Result<T, rpc_types::error::Error>,
|
ret: Result<T, rpc_types::error::Error>,
|
||||||
dst_peer_id: PeerId,
|
dst_peer_id: PeerId,
|
||||||
blacklist: Arc<timedmap::TimedMap<PeerId, ()>>,
|
blacklist: &timedmap::TimedMap<PeerId, ()>,
|
||||||
) -> Result<T, rpc_types::error::Error> {
|
) -> Result<T, rpc_types::error::Error> {
|
||||||
match ret {
|
match ret {
|
||||||
Ok(ret) => Ok(ret),
|
Ok(ret) => Ok(ret),
|
||||||
|
|||||||
@@ -437,7 +437,7 @@ impl PunchSymToConeHoleClient {
|
|||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let resp = handle_rpc_result(resp, dst_peer_id, self.blacklist.clone())?;
|
let resp = handle_rpc_result(resp, dst_peer_id, &self.blacklist)?;
|
||||||
|
|
||||||
let remote_mapped_addr = resp.listener_mapped_addr.ok_or(anyhow::anyhow!(
|
let remote_mapped_addr = resp.listener_mapped_addr.ok_or(anyhow::anyhow!(
|
||||||
"select_punch_listener response missing listener_mapped_addr"
|
"select_punch_listener response missing listener_mapped_addr"
|
||||||
|
|||||||
Reference in New Issue
Block a user