allow manually specify public address of listeners (#556)

This commit is contained in:
Sijie.Sun
2025-01-10 09:25:14 +08:00
committed by GitHub
parent 306817ae9a
commit bb0ccca3e5
7 changed files with 246 additions and 108 deletions

View File

@@ -1,6 +1,13 @@
// try connect peers directly, with either its public ip or lan ip
use std::{net::SocketAddr, sync::Arc, time::Duration};
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use crate::{
common::{error::Error, global_ctx::ArcGlobalCtx, PeerId},
@@ -29,6 +36,8 @@ use super::create_connector_by_url;
pub const DIRECT_CONNECTOR_SERVICE_ID: u32 = 1;
pub const DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC: u64 = 300;
static TESTING: AtomicBool = AtomicBool::new(false);
#[async_trait::async_trait]
pub trait PeerManagerForDirectConnector {
async fn list_peers(&self) -> Vec<PeerId>;
@@ -182,7 +191,7 @@ impl DirectConnectorManager {
// let (peer_id, conn_id) = data.peer_manager.try_connect(connector).await?;
if peer_id != dst_peer_id {
if peer_id != dst_peer_id && !TESTING.load(Ordering::Relaxed) {
tracing::info!(
"connect to ip succ: {}, but peer id mismatch, expect: {}, actual: {}",
addr,
@@ -279,87 +288,103 @@ impl DirectConnectorManager {
let listener_host = listener.socket_addrs(|| None).unwrap().pop();
match listener_host {
Some(SocketAddr::V4(_)) => {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
Some(SocketAddr::V4(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv4s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr.set_host(Some(ip.to_string().as_str())).is_ok() {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv4"
);
}
});
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
if let Some(public_ipv4) = ip_list.public_ipv4 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(public_ipv4.to_string().as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv4,
?listener,
?dst_peer_id,
"failed to set host for public ipv4"
);
}
}
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
Some(SocketAddr::V6(_)) => {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
Some(SocketAddr::V6(s_addr)) => {
if s_addr.ip().is_unspecified() {
ip_list.interface_ipv6s.iter().for_each(|ip| {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", ip.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?ip,
?listener,
?dst_peer_id,
"failed to set host for interface ipv6"
);
}
});
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
if let Some(public_ipv6) = ip_list.public_ipv6 {
let mut addr = (*listener).clone();
if addr
.set_host(Some(format!("[{}]", public_ipv6.to_string()).as_str()))
.is_ok()
{
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
addr.to_string(),
));
} else {
tracing::error!(
?public_ipv6,
?listener,
?dst_peer_id,
"failed to set host for public ipv6"
);
}
}
} else if !s_addr.ip().is_loopback() || TESTING.load(Ordering::Relaxed) {
tasks.spawn(Self::try_connect_to_ip(
data.clone(),
dst_peer_id.clone(),
listener.to_string(),
));
}
}
p => {
@@ -452,6 +477,49 @@ mod tests {
proto::peer_rpc::GetIpListResponse,
};
use super::TESTING;
#[tokio::test]
async fn direct_connector_mapped_listener() {
TESTING.store(true, std::sync::atomic::Ordering::Relaxed);
let p_a = create_mock_peer_manager().await;
let p_b = create_mock_peer_manager().await;
let p_c = create_mock_peer_manager().await;
let p_x = create_mock_peer_manager().await;
connect_peer_manager(p_a.clone(), p_b.clone()).await;
connect_peer_manager(p_b.clone(), p_c.clone()).await;
connect_peer_manager(p_c.clone(), p_x.clone()).await;
wait_route_appear(p_a.clone(), p_c.clone()).await.unwrap();
wait_route_appear(p_a.clone(), p_x.clone()).await.unwrap();
let mut f = p_a.get_global_ctx().get_flags();
f.bind_device = false;
p_a.get_global_ctx().config.set_flags(f);
p_c.get_global_ctx()
.config
.set_mapped_listeners(Some(vec!["tcp://127.0.0.1:11334".parse().unwrap()]));
p_x.get_global_ctx()
.config
.set_listeners(vec!["tcp://0.0.0.0:11334".parse().unwrap()]);
let mut lis_x = ListenerManager::new(p_x.get_global_ctx(), p_x.clone());
lis_x.prepare_listeners().await.unwrap();
lis_x.run().await.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
let mut dm_a = DirectConnectorManager::new(p_a.get_global_ctx(), p_a.clone());
let mut dm_c = DirectConnectorManager::new(p_c.get_global_ctx(), p_c.clone());
dm_a.run_as_client();
dm_c.run_as_server();
// p_c's mapped listener is p_x's listener, so p_a should connect to p_x directly
wait_route_appear_with_cost(p_a.clone(), p_x.my_peer_id(), Some(1))
.await
.unwrap();
}
#[rstest::rstest]
#[tokio::test]
async fn direct_connector_basic_test(

View File

@@ -56,23 +56,27 @@ pub async fn create_connector_by_url(
"tcp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "tcp")?;
let mut connector = TcpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
"udp" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "udp")?;
let mut connector = UdpTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
"ring" => {
@@ -84,12 +88,14 @@ pub async fn create_connector_by_url(
"quic" => {
let dst_addr = check_scheme_and_get_socket_addr::<SocketAddr>(&url, "quic")?;
let mut connector = QUICTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
#[cfg(feature = "wireguard")]
@@ -101,12 +107,14 @@ pub async fn create_connector_by_url(
&nid.network_secret.unwrap_or_default(),
);
let mut connector = WgTunnelConnector::new(url, wg_config);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
#[cfg(feature = "websocket")]
@@ -114,12 +122,14 @@ pub async fn create_connector_by_url(
use crate::tunnel::{FromUrl, IpVersion};
let dst_addr = SocketAddr::from_url(url.clone(), IpVersion::Both)?;
let mut connector = crate::tunnel::websocket::WSTunnelConnector::new(url);
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
if global_ctx.config.get_flags().bind_device {
set_bind_addr_for_peer_connector(
&mut connector,
dst_addr.is_ipv4(),
&global_ctx.get_ip_collector(),
)
.await;
}
return Ok(Box::new(connector));
}
_ => {