improve direct connector (#685)

* support ipv6 stun
* show interface and public ip in cli node info
* direct conn should keep trying unless already direct connected
* peer should use conn with smallest latency
* deprecate ipv6_listener, use -l instead
This commit is contained in:
Sijie.Sun
2025-03-17 10:46:14 +08:00
committed by GitHub
parent f84ae228fc
commit 23f69ce6a4
24 changed files with 558 additions and 269 deletions

View File

@@ -1,7 +1,9 @@
// try connect peers directly, with either its public ip or lan ip
use std::{
net::SocketAddr,
collections::HashSet,
net::{Ipv6Addr, SocketAddr},
str::FromStr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
@@ -79,7 +81,6 @@ struct DstListenerUrlBlackListItem(PeerId, url::Url);
struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_blacklist: timedmap::TimedMap<DstBlackListItem, ()>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
}
@@ -88,7 +89,6 @@ impl DirectConnectorManagerData {
Self {
global_ctx,
peer_manager,
dst_blacklist: timedmap::TimedMap::new(),
dst_listener_blacklist: timedmap::TimedMap::new(),
}
}
@@ -150,7 +150,9 @@ impl DirectConnectorManager {
let peers = data.peer_manager.list_peers().await;
let mut tasks = JoinSet::new();
for peer_id in peers {
if peer_id == my_peer_id {
if peer_id == my_peer_id
|| data.peer_manager.has_directly_connected_conn(peer_id)
{
continue;
}
tasks.spawn(Self::do_try_direct_connect(data.clone(), peer_id));
@@ -173,24 +175,13 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
data.dst_blacklist.cleanup();
if data
.dst_blacklist
.contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone()))
{
tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr);
return Err(Error::UrlInBlacklist);
}
let connector = create_connector_by_url(&addr, &data.global_ctx).await?;
let (peer_id, conn_id) = timeout(
std::time::Duration::from_secs(5),
data.peer_manager.try_connect(connector),
std::time::Duration::from_secs(3),
data.peer_manager.try_direct_connect(connector),
)
.await??;
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
@@ -204,6 +195,7 @@ impl DirectConnectorManager {
.await?;
return Err(Error::InvalidUrl(addr));
}
Ok(())
}
@@ -214,7 +206,7 @@ impl DirectConnectorManager {
addr: String,
) -> Result<(), Error> {
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000, 4000];
let backoff_ms = vec![1000, 2000];
let mut backoff_idx = 0;
loop {
@@ -237,12 +229,6 @@ impl DirectConnectorManager {
backoff_idx += 1;
continue;
} else {
data.dst_blacklist.insert(
DstBlackListItem(dst_peer_id.clone(), addr.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
return ret;
}
}
@@ -273,61 +259,43 @@ impl DirectConnectorManager {
tracing::debug!(?available_listeners, "got available listeners");
let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!(
"peer {} have no valid listener",
dst_peer_id
))?;
if available_listeners.is_empty() {
return Err(anyhow::anyhow!("peer {} have no valid listener", dst_peer_id).into());
}
// if have default listener, use it first
listener = available_listeners
let listener = available_listeners
.iter()
.find(|l| l.scheme() == data.global_ctx.get_flags().default_protocol)
.unwrap_or(listener);
.unwrap_or(available_listeners.get(0).unwrap());
let mut tasks = JoinSet::new();
let mut tasks = bounded_join_set::JoinSet::new(2);
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
match listener_host {
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
ip_list
.interface_ipv4s
.iter()
.chain(ip_list.public_ipv4.iter())
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
@@ -338,47 +306,42 @@ impl DirectConnectorManager {
}
Some(SocketAddr::V6(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
// for ipv6, only try public ip
ip_list
.interface_ipv6s
.iter()
.chain(ip_list.public_ipv6.iter())
.filter_map(|x| Ipv6Addr::from_str(&x.to_string()).ok())
.filter(|x| {
TESTING.load(Ordering::Relaxed)
|| (!x.is_loopback()
&& !x.is_unspecified()
&& !x.is_unique_local()
&& !x.is_unicast_link_local()
&& !x.is_multicast())
})
.collect::<HashSet<_>>()
.iter()
.for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
});
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
@@ -430,14 +393,6 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
) -> Result<(), Error> {
let peer_manager = data.peer_manager.clone();
// check if we have direct connection with dst_peer_id
if let Some(c) = peer_manager.list_peer_conns(dst_peer_id).await {
// currently if we have any type of direct connection (udp or tcp), we will not try to connect
if !c.is_empty() {
return Ok(());
}
}
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
let rpc_stub = peer_manager
@@ -466,8 +421,7 @@ mod tests {
use crate::{
connector::direct::{
DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem,
DstListenerUrlBlackListItem,
DirectConnectorManager, DirectConnectorManagerData, DstListenerUrlBlackListItem,
},
instance::listeners::ListenerManager,
peers::tests::{
@@ -526,9 +480,7 @@ mod tests {
#[values("tcp", "udp", "wg")] proto: &str,
#[values("true", "false")] ipv6: bool,
) {
if ipv6 && proto != "udp" {
return;
}
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
@@ -544,14 +496,18 @@ mod tests {
dm_a.run_as_client();
dm_c.run_as_server();
let port = if proto == "wg" { 11040 } else { 11041 };
if !ipv6 {
let port = if proto == "wg" { 11040 } else { 11041 };
p_c.get_global_ctx().config.set_listeners(vec![format!(
"{}://0.0.0.0:{}",
proto, port
)
.parse()
.unwrap()]);
} else {
p_c.get_global_ctx()
.config
.set_listeners(vec![format!("{}://[::]:{}", proto, port).parse().unwrap()]);
}
let mut f = p_c.get_global_ctx().config.get_flags();
f.enable_ipv6 = ipv6;
@@ -592,9 +548,5 @@ mod tests {
1,
"tcp://127.0.0.1:10222".parse().unwrap()
)));
assert!(data
.dst_blacklist
.contains(&DstBlackListItem(1, ip_list.listeners[0].to_string())));
}
}