fix ipv6 direct connector not work

This commit is contained in:
sijie.sun
2024-09-30 15:56:24 +08:00
committed by Sijie.Sun
parent 984ed8f6cf
commit ba3da97ad4
5 changed files with 113 additions and 38 deletions

View File

@@ -1,6 +1,6 @@
// try connect peers directly, with either its public ip or lan ip
use std::{net::SocketAddr, sync::Arc};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
@@ -19,6 +19,7 @@ use crate::{
use crate::proto::cli::PeerConnInfo;
use anyhow::Context;
use rand::Rng;
use tokio::{task::JoinSet, time::timeout};
use tracing::Instrument;
use url::Host;
@@ -64,13 +65,13 @@ impl PeerManagerForDirectConnector for PeerManager {
struct DstBlackListItem(PeerId, String);
#[derive(Hash, Eq, PartialEq, Clone)]
struct DstSchemeBlackListItem(PeerId, String);
struct DstListenerUrlBlackListItem(PeerId, url::Url);
struct DirectConnectorManagerData {
global_ctx: ArcGlobalCtx,
peer_manager: Arc<PeerManager>,
dst_blacklist: timedmap::TimedMap<DstBlackListItem, ()>,
dst_sceme_blacklist: timedmap::TimedMap<DstSchemeBlackListItem, ()>,
dst_listener_blacklist: timedmap::TimedMap<DstListenerUrlBlackListItem, ()>,
}
impl DirectConnectorManagerData {
@@ -79,7 +80,7 @@ impl DirectConnectorManagerData {
global_ctx,
peer_manager,
dst_blacklist: timedmap::TimedMap::new(),
dst_sceme_blacklist: timedmap::TimedMap::new(),
dst_listener_blacklist: timedmap::TimedMap::new(),
}
}
}
@@ -147,7 +148,7 @@ impl DirectConnectorManager {
}
while let Some(task_ret) = tasks.join_next().await {
tracing::trace!(?task_ret, "direct connect task ret");
tracing::debug!(?task_ret, ?my_peer_id, "direct connect task ret");
}
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
@@ -168,7 +169,7 @@ impl DirectConnectorManager {
.dst_blacklist
.contains(&DstBlackListItem(dst_peer_id.clone(), addr.clone()))
{
tracing::trace!("try_connect_to_ip failed, addr in blacklist: {}", addr);
tracing::debug!("try_connect_to_ip failed, addr in blacklist: {}", addr);
return Err(Error::UrlInBlacklist);
}
@@ -203,24 +204,38 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
addr: String,
) -> Result<(), Error> {
let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await;
if let Err(e) = ret {
if !matches!(e, Error::UrlInBlacklist) {
tracing::info!(
"try_connect_to_ip failed: {:?}, peer_id: {}",
e,
dst_peer_id
);
let mut rand_gen = rand::rngs::OsRng::default();
let backoff_ms = vec![1000, 2000, 4000];
let mut backoff_idx = 0;
loop {
let ret = Self::do_try_connect_to_ip(data.clone(), dst_peer_id, addr.clone()).await;
tracing::debug!(?ret, ?dst_peer_id, ?addr, "try_connect_to_ip return");
if matches!(ret, Err(Error::UrlInBlacklist) | Ok(_)) {
return ret;
}
if backoff_idx < backoff_ms.len() {
let delta = backoff_ms[backoff_idx] >> 1;
assert!(delta > 0);
assert!(delta < backoff_ms[backoff_idx]);
tokio::time::sleep(Duration::from_millis(
(backoff_ms[backoff_idx] + rand_gen.gen_range(-delta..delta)) as u64,
))
.await;
backoff_idx += 1;
continue;
} else {
data.dst_blacklist.insert(
DstBlackListItem(dst_peer_id.clone(), addr.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
return ret;
}
return Err(e);
} else {
tracing::info!("try_connect_to_ip success, peer_id: {}", dst_peer_id);
return Ok(());
}
}
@@ -230,6 +245,8 @@ impl DirectConnectorManager {
dst_peer_id: PeerId,
ip_list: GetIpListResponse,
) -> Result<(), Error> {
data.dst_listener_blacklist.cleanup();
let enable_ipv6 = data.global_ctx.get_flags().enable_ipv6;
let available_listeners = ip_list
.listeners
@@ -238,14 +255,15 @@ impl DirectConnectorManager {
.filter_map(|l| if l.scheme() != "ring" { Some(l) } else { None })
.filter(|l| l.port().is_some() && l.host().is_some())
.filter(|l| {
!data.dst_sceme_blacklist.contains(&DstSchemeBlackListItem(
dst_peer_id.clone(),
l.scheme().to_string(),
))
!data
.dst_listener_blacklist
.contains(&DstListenerUrlBlackListItem(dst_peer_id.clone(), l.clone()))
})
.filter(|l| enable_ipv6 || !matches!(l.host().unwrap().to_owned(), Host::Ipv6(_)))
.collect::<Vec<_>>();
tracing::debug!(?available_listeners, "got available listeners");
let mut listener = available_listeners.get(0).ok_or(anyhow::anyhow!(
"peer {} have no valid listener",
dst_peer_id
@@ -270,6 +288,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
@@ -284,6 +309,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
}
@@ -299,6 +331,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
@@ -313,6 +352,13 @@ impl DirectConnectorManager {
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
}
@@ -323,16 +369,28 @@ impl DirectConnectorManager {
let mut has_succ = false;
while let Some(ret) = tasks.join_next().await {
if let Err(e) = ret {
tracing::error!("join direct connect task failed: {:?}", e);
} else if let Ok(Ok(_)) = ret {
has_succ = true;
match ret {
Ok(Ok(_)) => {
has_succ = true;
tracing::info!(
?dst_peer_id,
?listener,
"try direct connect to peer success"
);
break;
}
Ok(Err(e)) => {
tracing::info!(?e, "try direct connect to peer failed");
}
Err(e) => {
tracing::error!(?e, "try direct connect to peer task join failed");
}
}
}
if !has_succ {
data.dst_sceme_blacklist.insert(
DstSchemeBlackListItem(dst_peer_id.clone(), listener.scheme().to_string()),
data.dst_listener_blacklist.insert(
DstListenerUrlBlackListItem(dst_peer_id.clone(), listener.clone()),
(),
std::time::Duration::from_secs(DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC),
);
@@ -355,7 +413,7 @@ impl DirectConnectorManager {
}
}
tracing::trace!("try direct connect to peer: {}", dst_peer_id);
tracing::debug!("try direct connect to peer: {}", dst_peer_id);
let rpc_stub = peer_manager
.get_peer_rpc_mgr()
@@ -384,7 +442,7 @@ mod tests {
use crate::{
connector::direct::{
DirectConnectorManager, DirectConnectorManagerData, DstBlackListItem,
DstSchemeBlackListItem,
DstListenerUrlBlackListItem,
},
instance::listeners::ListenerManager,
peers::tests::{
@@ -461,8 +519,11 @@ mod tests {
.unwrap();
assert!(data
.dst_sceme_blacklist
.contains(&DstSchemeBlackListItem(1, "tcp".into())));
.dst_listener_blacklist
.contains(&DstListenerUrlBlackListItem(
1,
"tcp://127.0.0.1:10222".parse().unwrap()
)));
assert!(data
.dst_blacklist