improve direct connector

This commit is contained in:
sijie.sun
2024-03-23 16:34:48 +08:00
committed by Sijie.Sun
parent 2cfc5a6ef6
commit 9ed22eaf99
6 changed files with 68 additions and 28 deletions

View File

@@ -1,4 +1,4 @@
use std::sync::Arc; use std::sync::{Arc, Mutex};
use crate::rpc::PeerConnInfo; use crate::rpc::PeerConnInfo;
use crossbeam::atomic::AtomicCell; use crossbeam::atomic::AtomicCell;
@@ -50,6 +50,8 @@ pub struct GlobalCtx {
hotname: AtomicCell<Option<String>>, hotname: AtomicCell<Option<String>>,
stun_info_collection: Box<dyn StunInfoCollectorTrait>, stun_info_collection: Box<dyn StunInfoCollectorTrait>,
running_listeners: Mutex<Vec<url::Url>>,
} }
impl std::fmt::Debug for GlobalCtx { impl std::fmt::Debug for GlobalCtx {
@@ -90,6 +92,8 @@ impl GlobalCtx {
hotname: AtomicCell::new(None), hotname: AtomicCell::new(None),
stun_info_collection: Box::new(StunInfoCollector::new_with_default_servers()), stun_info_collection: Box::new(StunInfoCollector::new_with_default_servers()),
running_listeners: Mutex::new(Vec::new()),
} }
} }
@@ -180,6 +184,14 @@ impl GlobalCtx {
std::ptr::write(ptr, collector); std::ptr::write(ptr, collector);
} }
} }
pub fn get_running_listeners(&self) -> Vec<url::Url> {
self.running_listeners.lock().unwrap().clone()
}
pub fn add_running_listener(&self, url: url::Url) {
self.running_listeners.lock().unwrap().push(url);
}
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -156,12 +156,7 @@ impl IPCollector {
#[tracing::instrument(skip(net_ns))] #[tracing::instrument(skip(net_ns))]
async fn do_collect_ip_addrs(with_public: bool, net_ns: NetNS) -> GetIpListResponse { async fn do_collect_ip_addrs(with_public: bool, net_ns: NetNS) -> GetIpListResponse {
let mut ret = crate::rpc::peer::GetIpListResponse { let mut ret = crate::rpc::peer::GetIpListResponse::new();
public_ipv4: "".to_string(),
interface_ipv4s: vec![],
public_ipv6: "".to_string(),
interface_ipv6s: vec![],
};
if with_public { if with_public {
if let Some(v4_addr) = if let Some(v4_addr) =

View File

@@ -7,7 +7,6 @@ use crate::{
constants::{self, DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC}, constants::{self, DIRECT_CONNECTOR_BLACKLIST_TIMEOUT_SEC},
error::Error, error::Error,
global_ctx::ArcGlobalCtx, global_ctx::ArcGlobalCtx,
network::IPCollector,
PeerId, PeerId,
}, },
peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager}, peers::{peer_manager::PeerManager, peer_rpc::PeerRpcManager},
@@ -56,21 +55,21 @@ impl PeerManagerForDirectConnector for PeerManager {
#[derive(Clone)] #[derive(Clone)]
struct DirectConnectorManagerRpcServer { struct DirectConnectorManagerRpcServer {
// TODO: this only cache for one src peer, should make it global // TODO: this only cache for one src peer, should make it global
ip_list_collector: Arc<IPCollector>, global_ctx: ArcGlobalCtx,
} }
#[tarpc::server] #[tarpc::server]
impl DirectConnectorRpc for DirectConnectorManagerRpcServer { impl DirectConnectorRpc for DirectConnectorManagerRpcServer {
async fn get_ip_list(self, _: tarpc::context::Context) -> GetIpListResponse { async fn get_ip_list(self, _: tarpc::context::Context) -> GetIpListResponse {
return self.ip_list_collector.collect_ip_addrs().await; let mut ret = self.global_ctx.get_ip_collector().collect_ip_addrs().await;
ret.listeners = self.global_ctx.get_running_listeners();
ret
} }
} }
impl DirectConnectorManagerRpcServer { impl DirectConnectorManagerRpcServer {
pub fn new(ip_collector: Arc<IPCollector>) -> Self { pub fn new(global_ctx: ArcGlobalCtx) -> Self {
Self { Self { global_ctx }
ip_list_collector: ip_collector,
}
} }
} }
@@ -119,7 +118,7 @@ impl DirectConnectorManager {
pub fn run_as_server(&mut self) { pub fn run_as_server(&mut self) {
self.data.peer_manager.get_peer_rpc_mgr().run_service( self.data.peer_manager.get_peer_rpc_mgr().run_service(
constants::DIRECT_CONNECTOR_SERVICE_ID, constants::DIRECT_CONNECTOR_SERVICE_ID,
DirectConnectorManagerRpcServer::new(self.global_ctx.get_ip_collector()).serve(), DirectConnectorManagerRpcServer::new(self.global_ctx.clone()).serve(),
); );
} }
@@ -241,9 +240,19 @@ impl DirectConnectorManager {
}) })
.await?; .await?;
let listener = ip_list
.listeners
.get(0)
.ok_or(anyhow::anyhow!("peer {} have no listener", dst_peer_id))?;
let mut tasks = JoinSet::new(); let mut tasks = JoinSet::new();
ip_list.interface_ipv4s.iter().for_each(|ip| { ip_list.interface_ipv4s.iter().for_each(|ip| {
let addr = format!("{}://{}:{}", "tcp", ip, 11010); let addr = format!(
"{}://{}:{}",
listener.scheme(),
ip,
listener.port().unwrap_or(11010)
);
tasks.spawn(Self::try_connect_to_ip( tasks.spawn(Self::try_connect_to_ip(
data.clone(), data.clone(),
dst_peer_id.clone(), dst_peer_id.clone(),
@@ -251,7 +260,12 @@ impl DirectConnectorManager {
)); ));
}); });
let addr = format!("{}://{}:{}", "tcp", ip_list.public_ipv4.clone(), 11010); let addr = format!(
"{}://{}:{}",
listener.scheme(),
ip_list.public_ipv4.clone(),
listener.port().unwrap_or(11010)
);
tasks.spawn(Self::try_connect_to_ip( tasks.spawn(Self::try_connect_to_ip(
data.clone(), data.clone(),
dst_peer_id.clone(), dst_peer_id.clone(),
@@ -300,7 +314,7 @@ mod tests {
lis_c lis_c
.add_listener(TcpTunnelListener::new( .add_listener(TcpTunnelListener::new(
"tcp://0.0.0.0:11010".parse().unwrap(), "tcp://0.0.0.0:11040".parse().unwrap(),
)) ))
.await .await
.unwrap(); .unwrap();

View File

@@ -91,6 +91,7 @@ impl<H: TunnelHandlerForListener + Send + Sync + 'static + Debug> ListenerManage
global_ctx: ArcGlobalCtx, global_ctx: ArcGlobalCtx,
) { ) {
let mut l = listener.lock().await; let mut l = listener.lock().await;
global_ctx.add_running_listener(l.local_url());
global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url())); global_ctx.issue_event(GlobalCtxEvent::ListenerAdded(l.local_url()));
while let Ok(ret) = l.accept().await { while let Ok(ret) = l.accept().await {
let tunnel_info = ret.info().unwrap(); let tunnel_info = ret.info().unwrap();

View File

@@ -19,7 +19,7 @@ mod rpc;
mod tunnels; mod tunnels;
use common::{ use common::{
config::{ConsoleLoggerConfig, FileLoggerConfig, PeerConfig}, config::{ConsoleLoggerConfig, FileLoggerConfig, NetworkIdentity, PeerConfig},
get_logger_timer_rfc3339, get_logger_timer_rfc3339,
}; };
use instance::instance::Instance; use instance::instance::Instance;
@@ -34,21 +34,18 @@ use crate::common::{
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Cli { struct Cli {
/// the instance name
#[arg( #[arg(
short = 'm',
long, long,
default_value = "default", help = "network name to identify this vpn network",
help = "instance name to identify this vpn node in same machine" default_value = "default"
)] )]
instance_name: String, network_name: String,
#[arg( #[arg(
short = 'd',
long, long,
help = "instance uuid to identify this vpn node in whole vpn network example: 123e4567-e89b-12d3-a456-426614174000" help = "network secret to verify this node belongs to the vpn network",
default_value = ""
)] )]
instance_id: Option<String>, network_secret: String,
#[arg(short, long, help = "ipv4 address of this vpn node")] #[arg(short, long, help = "ipv4 address of this vpn node")]
ipv4: Option<String>, ipv4: Option<String>,
@@ -92,12 +89,31 @@ struct Cli {
file_log_level: Option<String>, file_log_level: Option<String>,
#[arg(long, help = "directory to store log files")] #[arg(long, help = "directory to store log files")]
file_log_dir: Option<String>, file_log_dir: Option<String>,
#[arg(
short = 'm',
long,
default_value = "default",
help = "instance name to identify this vpn node in same machine"
)]
instance_name: String,
#[arg(
short = 'd',
long,
help = "instance uuid to identify this vpn node in whole vpn network example: 123e4567-e89b-12d3-a456-426614174000"
)]
instance_id: Option<String>,
} }
impl From<Cli> for TomlConfigLoader { impl From<Cli> for TomlConfigLoader {
fn from(cli: Cli) -> Self { fn from(cli: Cli) -> Self {
let cfg = TomlConfigLoader::default(); let cfg = TomlConfigLoader::default();
cfg.set_inst_name(cli.instance_name.clone()); cfg.set_inst_name(cli.instance_name.clone());
cfg.set_network_identity(NetworkIdentity {
network_name: cli.network_name.clone(),
network_secret: cli.network_secret.clone(),
});
cfg.set_netns(cli.net_ns.clone()); cfg.set_netns(cli.net_ns.clone());
if let Some(ipv4) = &cli.ipv4 { if let Some(ipv4) = &cli.ipv4 {

View File

@@ -6,6 +6,7 @@ pub struct GetIpListResponse {
pub interface_ipv4s: Vec<String>, pub interface_ipv4s: Vec<String>,
pub public_ipv6: String, pub public_ipv6: String,
pub interface_ipv6s: Vec<String>, pub interface_ipv6s: Vec<String>,
pub listeners: Vec<url::Url>,
} }
impl GetIpListResponse { impl GetIpListResponse {
@@ -15,6 +16,7 @@ impl GetIpListResponse {
interface_ipv4s: vec![], interface_ipv4s: vec![],
public_ipv6: "".to_string(), public_ipv6: "".to_string(),
interface_ipv6s: vec![], interface_ipv6s: vec![],
listeners: vec![],
} }
} }
} }