support use ipv6

This commit is contained in:
sijie.sun
2024-04-27 23:10:28 +08:00
committed by Sijie.Sun
parent 3a965efab2
commit d8033a77b9
8 changed files with 219 additions and 99 deletions

View File

@@ -1,5 +1,6 @@
use std::{collections::BTreeSet, sync::Arc};
use anyhow::Context;
use dashmap::{DashMap, DashSet};
use tokio::{
sync::{broadcast::Receiver, mpsc, Mutex},
@@ -8,7 +9,10 @@ use tokio::{
};
use crate::{
common::PeerId, peers::peer_conn::PeerConnId, rpc as easytier_rpc, tunnel::TunnelConnector,
common::PeerId,
peers::peer_conn::PeerConnId,
rpc as easytier_rpc,
tunnel::{IpVersion, TunnelConnector},
};
use crate::{
@@ -254,66 +258,109 @@ impl ManualConnectorManager {
&all_urls - &curr_alive
}
async fn conn_reconnect_with_ip_version(
data: Arc<ConnectorManagerData>,
dead_url: String,
connector: MutexConnector,
ip_version: IpVersion,
) -> Result<ReconnResult, Error> {
let ip_collector = data.global_ctx.get_ip_collector();
let net_ns = data.net_ns.clone();
connector.lock().await.set_ip_version(ip_version);
set_bind_addr_for_peer_connector(
connector.lock().await.as_mut(),
ip_version == IpVersion::V4,
&ip_collector,
)
.await;
data.global_ctx.issue_event(GlobalCtxEvent::Connecting(
connector.lock().await.remote_url().clone(),
));
let _g = net_ns.guard();
log::info!("reconnect try connect... conn: {:?}", connector);
let tunnel = connector.lock().await.connect().await?;
log::info!("reconnect get tunnel succ: {:?}", tunnel);
assert_eq!(
dead_url,
tunnel.info().unwrap().remote_addr,
"info: {:?}",
tunnel.info()
);
let (peer_id, conn_id) = data.peer_manager.add_client_tunnel(tunnel).await?;
log::info!("reconnect succ: {} {} {}", peer_id, conn_id, dead_url);
Ok(ReconnResult {
dead_url,
peer_id,
conn_id,
})
}
async fn conn_reconnect(
data: Arc<ConnectorManagerData>,
dead_url: String,
connector: MutexConnector,
) -> Result<ReconnResult, Error> {
let connector = Arc::new(Mutex::new(Some(connector)));
let net_ns = data.net_ns.clone();
log::info!("reconnect: {}", dead_url);
let connector_clone = connector.clone();
let data_clone = data.clone();
let url_clone = dead_url.clone();
let ip_collector = data.global_ctx.get_ip_collector();
let reconn_task = async move {
let mut locked = connector_clone.lock().await;
let conn = locked.as_mut().unwrap();
// TODO: should support set v6 here, use url in connector array
set_bind_addr_for_peer_connector(conn.lock().await.as_mut(), true, &ip_collector).await;
data_clone
.global_ctx
.issue_event(GlobalCtxEvent::Connecting(
conn.lock().await.remote_url().clone(),
));
let _g = net_ns.guard();
log::info!("reconnect try connect... conn: {:?}", conn);
let tunnel = conn.lock().await.connect().await?;
log::info!("reconnect get tunnel succ: {:?}", tunnel);
assert_eq!(
url_clone,
tunnel.info().unwrap().remote_addr,
"info: {:?}",
tunnel.info()
);
let (peer_id, conn_id) = data_clone.peer_manager.add_client_tunnel(tunnel).await?;
log::info!("reconnect succ: {} {} {}", peer_id, conn_id, url_clone);
Ok(ReconnResult {
dead_url: url_clone,
peer_id,
conn_id,
})
};
let ret = timeout(std::time::Duration::from_secs(1), reconn_task).await;
log::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
if ret.is_err() || ret.as_ref().unwrap().is_err() {
data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
dead_url.clone(),
format!("{:?}", ret),
));
let mut ip_versions = vec![];
let u = url::Url::parse(&dead_url)
.with_context(|| format!("failed to parse connector url {:?}", dead_url))?;
if u.scheme() == "ring" {
ip_versions.push(IpVersion::Both);
} else {
let addrs = u.socket_addrs(|| Some(1000))?;
let mut has_ipv4 = false;
let mut has_ipv6 = false;
for addr in addrs {
if addr.is_ipv4() {
if !has_ipv4 {
ip_versions.insert(0, IpVersion::V4);
}
has_ipv4 = true;
} else if addr.is_ipv6() {
if !has_ipv6 {
ip_versions.push(IpVersion::V6);
}
has_ipv6 = true;
}
}
}
let conn = connector.lock().await.take().unwrap();
data.reconnecting.remove(&dead_url).unwrap();
data.connectors.insert(dead_url.clone(), conn);
let mut reconn_ret = Err(Error::AnyhowError(anyhow::anyhow!(
"cannot get ip from url"
)));
for ip_version in ip_versions {
let ret = timeout(
std::time::Duration::from_secs(1),
Self::conn_reconnect_with_ip_version(
data.clone(),
dead_url.clone(),
connector.clone(),
ip_version,
),
)
.await;
log::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
ret?
if ret.is_ok() && ret.as_ref().unwrap().is_ok() {
reconn_ret = ret.unwrap();
break;
} else {
if ret.is_err() {
reconn_ret = Err(ret.unwrap_err().into());
} else if ret.as_ref().unwrap().is_err() {
reconn_ret = Err(ret.unwrap().unwrap_err());
}
}
}
data.reconnecting.remove(&dead_url).unwrap();
data.connectors.insert(dead_url.clone(), connector);
reconn_ret
}
}