fix(core): Fix sleep-wake reconnect by resetting alive_conn_urls (#1593)

Co-authored-by: sijie.sun <sijie.sun@smartx.com>
Author: datasone
Date: 2025-12-05 14:31:08 +08:00
Committed by: GitHub
Parent: 43a650f9ab
Commit: 2a656d6a0c
5 changed files with 84 additions and 147 deletions

Cargo.lock (generated)

@@ -4722,9 +4722,9 @@ dependencies = [
 [[package]]
 name = "multimap"
-version = "0.10.0"
+version = "0.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "defc4c55412d89136f966bbb339008b474350e5e6e78d2714439c386b3137a03"
+checksum = "1d87ecb2933e8aeadb3e3a02b828fed80a7528047e68b4f424523a0981a3a084"
 dependencies = [
  "serde",
 ]
 

Cargo.toml

@@ -213,7 +213,7 @@ hickory-client = "0.25.2"
 hickory-server = { version = "0.25.2", features = ["resolver"] }
 derive_builder = "0.20.2"
 humantime-serde = "1.1.1"
-multimap = "0.10.0"
+multimap = "0.10.1"
 version-compare = "0.2.0"
 hmac = "0.12.1"
 sha2 = "0.10.8"

Connector manager (ManualConnectorManager)

@@ -3,24 +3,16 @@ use std::{
     sync::{Arc, Weak},
 };
 
-use anyhow::Context;
 use dashmap::DashSet;
-use tokio::{
-    sync::{
-        broadcast::{error::RecvError, Receiver},
-        mpsc,
-    },
-    task::JoinSet,
-    time::timeout,
-};
+use tokio::{sync::mpsc, task::JoinSet, time::timeout};
 
 use crate::{
     common::{dns::socket_addrs, join_joinset_background, PeerId},
-    peers::{peer_conn::PeerConnId, peer_map::PeerMap},
+    peers::peer_conn::PeerConnId,
     proto::{
         api::instance::{
             Connector, ConnectorManageRpc, ConnectorStatus, ListConnectorRequest,
-            ListConnectorResponse, PeerConnInfo,
+            ListConnectorResponse,
         },
         rpc_types::{self, controller::BaseController},
     },
@@ -40,7 +32,7 @@ use crate::{
 
 use super::create_connector_by_url;
 
-type ConnectorMap = Arc<DashSet<String>>;
+type ConnectorMap = Arc<DashSet<url::Url>>;
 
 #[derive(Debug, Clone)]
 struct ReconnResult {
@@ -51,11 +43,11 @@ struct ReconnResult {
 
 struct ConnectorManagerData {
     connectors: ConnectorMap,
-    reconnecting: DashSet<String>,
+    reconnecting: DashSet<url::Url>,
     peer_manager: Weak<PeerManager>,
-    alive_conn_urls: Arc<DashSet<String>>,
+    alive_conn_urls: Arc<DashSet<url::Url>>,
     // user removed connector urls
-    removed_conn_urls: Arc<DashSet<String>>,
+    removed_conn_urls: Arc<DashSet<url::Url>>,
     net_ns: NetNS,
     global_ctx: ArcGlobalCtx,
 }
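
Note on the type change above: the connector bookkeeping switches its set keys from String to url::Url, which implements Eq, Hash, and Ord and therefore drops straight into DashSet and BTreeSet. A minimal standalone sketch of the idea, not EasyTier code (the dashmap and url crates are assumed and the address is made up):

    use dashmap::DashSet;

    fn main() -> Result<(), url::ParseError> {
        let connectors: DashSet<url::Url> = DashSet::new();
        let url: url::Url = "tcp://1.2.3.4:11010".parse()?;

        // url::Url implements Eq + Hash (and Ord), so it can key a DashSet
        // or a BTreeSet directly, with no round-tripping through String.
        connectors.insert(url.clone());
        assert!(connectors.contains(&url));

        // The parsed form also keeps the scheme handy for later decisions.
        assert_eq!(url.scheme(), "tcp");
        Ok(())
    }

Keeping the parsed type around is also what lets the later conn_reconnect hunks branch on url.scheme() instead of string prefixes.
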
@@ -70,7 +62,6 @@ impl ManualConnectorManager {
     pub fn new(global_ctx: ArcGlobalCtx, peer_manager: Arc<PeerManager>) -> Self {
         let connectors = Arc::new(DashSet::new());
         let tasks = JoinSet::new();
-        let event_subscriber = global_ctx.subscribe();
 
         let mut ret = Self {
             global_ctx: global_ctx.clone(),
@@ -88,10 +79,6 @@ impl ManualConnectorManager {
 
         ret.tasks
             .spawn(Self::conn_mgr_reconn_routine(ret.data.clone()));
-        ret.tasks.spawn(Self::conn_mgr_handle_event_routine(
-            ret.data.clone(),
-            event_subscriber,
-        ));
 
         ret
     }
@@ -101,11 +88,11 @@ impl ManualConnectorManager {
         T: TunnelConnector + 'static,
     {
         tracing::info!("add_connector: {}", connector.remote_url());
-        self.data.connectors.insert(connector.remote_url().into());
+        self.data.connectors.insert(connector.remote_url());
     }
 
-    pub async fn add_connector_by_url(&self, url: &str) -> Result<(), Error> {
-        self.data.connectors.insert(url.to_owned());
+    pub async fn add_connector_by_url(&self, url: url::Url) -> Result<(), Error> {
+        self.data.connectors.insert(url);
         Ok(())
     }
@@ -120,34 +107,28 @@ impl ManualConnectorManager {
         {
             return Err(Error::NotFound);
         }
-        self.data.removed_conn_urls.insert(url.to_string());
+        self.data.removed_conn_urls.insert(url.into());
         Ok(())
     }
 
     pub async fn clear_connectors(&self) {
         self.list_connectors().await.iter().for_each(|x| {
             if let Some(url) = &x.url {
-                self.data.removed_conn_urls.insert(url.to_string());
+                self.data.removed_conn_urls.insert(url.clone().into());
             }
         });
     }
 
     pub async fn list_connectors(&self) -> Vec<Connector> {
-        let conn_urls: BTreeSet<String> = self
-            .data
-            .connectors
-            .iter()
-            .map(|x| x.key().clone())
-            .collect();
-
-        let dead_urls: BTreeSet<String> = Self::collect_dead_conns(self.data.clone())
+        let dead_urls: BTreeSet<url::Url> = Self::collect_dead_conns(self.data.clone())
             .await
             .into_iter()
             .collect();
 
         let mut ret = Vec::new();
-        for conn_url in conn_urls {
+        for item in self.data.connectors.iter() {
+            let conn_url = item.key().clone();
             let mut status = ConnectorStatus::Connected;
             if dead_urls.contains(&conn_url) {
                 status = ConnectorStatus::Disconnected;
@@ -155,20 +136,20 @@ impl ManualConnectorManager {
             ret.insert(
                 0,
                 Connector {
-                    url: Some(conn_url.parse().unwrap()),
+                    url: Some(conn_url.into()),
                     status: status.into(),
                 },
             );
         }
 
-        let reconnecting_urls: BTreeSet<String> =
+        let reconnecting_urls: BTreeSet<url::Url> =
             self.data.reconnecting.iter().map(|x| x.clone()).collect();
 
         for conn_url in reconnecting_urls {
             ret.insert(
                 0,
                 Connector {
-                    url: Some(conn_url.parse().unwrap()),
+                    url: Some(conn_url.into()),
                     status: ConnectorStatus::Connecting.into(),
                 },
             );
@@ -177,49 +158,6 @@ impl ManualConnectorManager {
         ret
     }
 
-    async fn conn_mgr_handle_event_routine(
-        data: Arc<ConnectorManagerData>,
-        mut event_recv: Receiver<GlobalCtxEvent>,
-    ) {
-        loop {
-            match event_recv.recv().await {
-                Ok(event) => {
-                    Self::handle_event(&event, &data).await;
-                }
-                Err(RecvError::Lagged(n)) => {
-                    tracing::warn!("event_recv lagged: {}, rebuild alive conn list", n);
-                    event_recv = event_recv.resubscribe();
-                    data.alive_conn_urls.clear();
-                    let Some(pm) = data.peer_manager.upgrade() else {
-                        tracing::warn!("peer manager is gone, exit");
-                        break;
-                    };
-
-                    let fill_alive_urls_with_peer_map = |peer_map: &PeerMap| {
-                        for x in peer_map.get_alive_conns().iter().map(|x| {
-                            x.tunnel
-                                .clone()
-                                .unwrap_or_default()
-                                .remote_addr
-                                .unwrap_or_default()
-                                .to_string()
-                        }) {
-                            data.alive_conn_urls.insert(x);
-                        }
-                    };
-
-                    fill_alive_urls_with_peer_map(&pm.get_peer_map());
-                    fill_alive_urls_with_peer_map(&pm.get_foreign_network_client().get_peer_map());
-                    continue;
-                }
-                Err(RecvError::Closed) => {
-                    tracing::warn!("event_recv closed, exit");
-                    break;
-                }
-            }
-        }
-    }
-
     async fn conn_mgr_reconn_routine(data: Arc<ConnectorManagerData>) {
         tracing::warn!("conn_mgr_routine started");
         let mut reconn_interval = tokio::time::interval(std::time::Duration::from_millis(
@@ -261,31 +199,6 @@ impl ManualConnectorManager {
         }
     }
 
-    async fn handle_event(event: &GlobalCtxEvent, data: &ConnectorManagerData) {
-        let need_add_alive = |conn_info: &PeerConnInfo| conn_info.is_client;
-        match event {
-            GlobalCtxEvent::PeerConnAdded(conn_info) => {
-                if !need_add_alive(conn_info) {
-                    return;
-                }
-                let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone();
-                data.alive_conn_urls.insert(addr.unwrap().to_string());
-                tracing::warn!("peer conn added: {:?}", conn_info);
-            }
-
-            GlobalCtxEvent::PeerConnRemoved(conn_info) => {
-                if !need_add_alive(conn_info) {
-                    return;
-                }
-                let addr = conn_info.tunnel.as_ref().unwrap().remote_addr.clone();
-                data.alive_conn_urls.remove(&addr.unwrap().to_string());
-                tracing::warn!("peer conn removed: {:?}", conn_info);
-            }
-
-            _ => {}
-        }
-    }
-
     fn handle_remove_connector(data: Arc<ConnectorManagerData>) {
         let remove_later = DashSet::new();
         for it in data.removed_conn_urls.iter() {
@@ -307,12 +220,20 @@ impl ManualConnectorManager {
         }
     }
 
-    async fn collect_dead_conns(data: Arc<ConnectorManagerData>) -> BTreeSet<String> {
+    async fn collect_dead_conns(data: Arc<ConnectorManagerData>) -> BTreeSet<url::Url> {
         Self::handle_remove_connector(data.clone());
-        let all_urls: BTreeSet<String> = data.connectors.iter().map(|x| x.key().clone()).collect();
         let mut ret = BTreeSet::new();
-        for url in all_urls.iter() {
-            if !data.alive_conn_urls.contains(url) {
+        let Some(pm) = data.peer_manager.upgrade() else {
+            tracing::warn!("peer manager is gone, exit");
+            return ret;
+        };
+        for url in data.connectors.iter().map(|x| x.key().clone()) {
+            if !pm.get_peer_map().is_client_url_alive(&url)
+                && !pm
+                    .get_foreign_network_client()
+                    .get_peer_map()
+                    .is_client_url_alive(&url)
+            {
                 ret.insert(url.clone());
             }
         }
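
This is the core of the sleep-wake fix: instead of mirroring PeerConnAdded/PeerConnRemoved events into alive_conn_urls (which could go stale when events were missed or lagged across a suspend), dead connectors are now computed on demand from the peer map's current state via is_client_url_alive, checked against both the main and the foreign-network peer maps. A simplified sketch of that query shape with stand-in types (the PeerMapView trait is hypothetical, not the real PeerMap API):

    use std::collections::BTreeSet;

    use dashmap::DashSet;

    // Stand-in for the liveness query PeerMap now exposes (is_client_url_alive).
    trait PeerMapView {
        fn is_client_url_alive(&self, url: &url::Url) -> bool;
    }

    // A connector counts as dead only when neither the main peer map nor the
    // foreign-network peer map has a live client connection for its URL.
    fn collect_dead_conns(
        connectors: &DashSet<url::Url>,
        main: &dyn PeerMapView,
        foreign: &dyn PeerMapView,
    ) -> BTreeSet<url::Url> {
        connectors
            .iter()
            .map(|entry| entry.key().clone())
            .filter(|url| !main.is_client_url_alive(url) && !foreign.is_client_url_alive(url))
            .collect()
    }

Because the check reads live state rather than replayed events, a host waking from sleep sees its connectors as dead on the next poll and reconnects them.
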
@@ -347,21 +268,19 @@ impl ManualConnectorManager {
 
     async fn conn_reconnect(
         data: Arc<ConnectorManagerData>,
-        dead_url: String,
+        dead_url: url::Url,
     ) -> Result<ReconnResult, Error> {
         tracing::info!("reconnect: {}", dead_url);
 
         let mut ip_versions = vec![];
-        let u = url::Url::parse(&dead_url)
-            .with_context(|| format!("failed to parse connector url {:?}", dead_url))?;
-        if u.scheme() == "ring" || u.scheme() == "txt" || u.scheme() == "srv" {
+        if dead_url.scheme() == "ring" || dead_url.scheme() == "txt" || dead_url.scheme() == "srv" {
             ip_versions.push(IpVersion::Both);
         } else {
-            let addrs = match socket_addrs(&u, || Some(1000)).await {
+            let addrs = match socket_addrs(&dead_url, || Some(1000)).await {
                 Ok(addrs) => addrs,
                 Err(e) => {
                     data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
-                        dead_url.clone(),
+                        dead_url.to_string(),
                         format!("{:?}", IpVersion::Both),
                         format!("{:?}", e),
                     ));
@@ -393,13 +312,18 @@ impl ManualConnectorManager {
             "cannot get ip from url"
         )));
 
         for ip_version in ip_versions {
-            let use_long_timeout = dead_url.starts_with("http")
-                || dead_url.starts_with("srv")
-                || dead_url.starts_with("txt");
+            let use_long_timeout = dead_url.scheme() == "http"
+                || dead_url.scheme() == "https"
+                || dead_url.scheme() == "txt"
+                || dead_url.scheme() == "srv";
             let ret = timeout(
                 // allow http connector to wait longer
                 std::time::Duration::from_secs(if use_long_timeout { 20 } else { 2 }),
-                Self::conn_reconnect_with_ip_version(data.clone(), dead_url.clone(), ip_version),
+                Self::conn_reconnect_with_ip_version(
+                    data.clone(),
+                    dead_url.to_string(),
+                    ip_version,
+                ),
             )
             .await;
             tracing::info!("reconnect: {} done, ret: {:?}", dead_url, ret);
@@ -422,7 +346,7 @@ impl ManualConnectorManager {
 
         // Send the event (only executed when the loop did not break)
         data.global_ctx.issue_event(GlobalCtxEvent::ConnectError(
-            dead_url.clone(),
+            dead_url.to_string(),
            format!("{:?}", ip_version),
            format!("{:?}", reconn_ret),
        ));
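
Timeout selection above now inspects the parsed scheme rather than the raw string prefix; note that the old starts_with("http") also matched https URLs, which is why "https" must be listed explicitly once schemes are compared exactly. A small sketch of the same decision (the 20 s and 2 s budgets are the ones used in the hunk above):

    use std::time::Duration;

    // Mirrors the choice above: connectors that go through HTTP(S) or
    // DNS TXT/SRV resolution get a longer budget than direct transports.
    fn reconnect_timeout(url: &url::Url) -> Duration {
        match url.scheme() {
            "http" | "https" | "txt" | "srv" => Duration::from_secs(20),
            _ => Duration::from_secs(2),
        }
    }

Such a helper would feed tokio::time::timeout exactly as the loop above does.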

Instance (InstanceConfigPatcher, Instance)

@@ -484,7 +484,7 @@ impl InstanceConfigPatcher {
             match ConfigPatchAction::try_from(connector.action) {
                 Ok(ConfigPatchAction::Add) => {
                     tracing::info!("Connector added: {}", url);
-                    conn_manager.add_connector_by_url(url.as_str()).await?;
+                    conn_manager.add_connector_by_url(url).await?;
                 }
                 Ok(ConfigPatchAction::Remove) => {
                     tracing::info!("Connector removed: {}", url);
@@ -620,7 +620,7 @@ impl Instance {
     async fn add_initial_peers(&mut self) -> Result<(), Error> {
         for peer in self.global_ctx.config.get_peers().iter() {
             self.get_conn_manager()
-                .add_connector_by_url(peer.uri.as_str())
+                .add_connector_by_url(peer.uri.clone())
                 .await?;
         }
         Ok(())
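
Both call sites now hand an owned url::Url to add_connector_by_url: the config patcher already holds one, and add_initial_peers clones peer.uri. Combined with dropping the url::Url::parse / with_context step inside conn_reconnect, this moves URL validation to the edge. An illustrative sketch of that parse-at-the-edge step (the helper name and example addresses are made up):

    // Illustrative only: parsing at the call site surfaces a bad connector URL
    // immediately, instead of deep inside the reconnect loop as before.
    fn parse_connector(raw: &str) -> Result<url::Url, url::ParseError> {
        raw.parse::<url::Url>()
    }

    fn main() {
        assert!(parse_connector("tcp://1.2.3.4:11010").is_ok());
        assert!(parse_connector("not a url").is_err());
    }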

Peer map (PeerMap)

@@ -5,6 +5,7 @@ use std::{
 
 use anyhow::Context;
 use dashmap::{DashMap, DashSet};
+use parking_lot::Mutex;
 use tokio::sync::RwLock;
 
 use crate::{
@@ -33,7 +34,7 @@ pub struct PeerMap {
     peer_map: DashMap<PeerId, Arc<Peer>>,
     packet_send: PacketRecvChan,
     routes: RwLock<Vec<ArcRoute>>,
-    alive_conns: Arc<DashMap<(PeerId, PeerConnId), PeerConnInfo>>,
+    alive_client_urls: Arc<Mutex<multimap::MultiMap<url::Url, PeerConnId>>>,
 }
 
 impl PeerMap {
@@ -44,7 +45,7 @@ impl PeerMap {
             peer_map: DashMap::new(),
             packet_send,
             routes: RwLock::new(Vec::new()),
-            alive_conns: Arc::new(DashMap::new()),
+            alive_client_urls: Arc::new(Mutex::new(multimap::MultiMap::new())),
         }
     }
@@ -56,7 +57,7 @@ impl PeerMap {
     }
 
     pub async fn add_new_peer_conn(&self, peer_conn: PeerConn) {
-        self.maintain_alive_conns(&peer_conn);
+        let _ = self.maintain_alive_client_urls(&peer_conn);
         let peer_id = peer_conn.get_peer_id();
         let no_entry = self.peer_map.get(&peer_id).is_none();
         if no_entry {
@@ -69,29 +70,48 @@ impl PeerMap {
         }
     }
 
-    fn maintain_alive_conns(&self, peer_conn: &PeerConn) {
-        let close_notifier = peer_conn.get_close_notifier();
-        let alive_conns_weak = Arc::downgrade(&self.alive_conns);
-        let conn_id = close_notifier.get_conn_id();
+    fn maintain_alive_client_urls(&self, peer_conn: &PeerConn) -> Option<()> {
         let conn_info = peer_conn.get_conn_info();
-        self.alive_conns
-            .insert((conn_info.peer_id, conn_id), conn_info.clone());
+        if !conn_info.is_client {
+            return None;
+        }
+
+        let close_notifier = peer_conn.get_close_notifier();
+        let alive_conns_weak = Arc::downgrade(&self.alive_client_urls);
+        let conn_id = close_notifier.get_conn_id();
+        let alive_client_url: url::Url = conn_info.tunnel?.remote_addr?.into();
+        self.alive_client_urls
+            .lock()
+            .insert(alive_client_url.clone(), conn_id);
 
         tokio::spawn(async move {
             if let Some(mut waiter) = close_notifier.get_waiter().await {
                 let _ = waiter.recv().await;
             }
-            let mut alive_conn_count = 0;
-            if let Some(alive_conns) = alive_conns_weak.upgrade() {
-                alive_conns.remove(&(conn_info.peer_id, conn_id)).unwrap();
-                alive_conn_count = alive_conns.len();
-                shrink_dashmap(&alive_conns, None);
-            }
+            let Some(alive_conns) = alive_conns_weak.upgrade() else {
+                return;
+            };
+            let mut guard = alive_conns.lock();
+            if let Some(mut conn_ids) = guard.remove(&alive_client_url) {
+                conn_ids.retain(|id| id != &conn_id);
+                if !conn_ids.is_empty() {
+                    guard.insert_many(alive_client_url, conn_ids);
+                }
+            };
+            let alive_conn_count = guard.len();
+            drop(guard);
             tracing::debug!(
                 ?conn_id,
                 "peer conn is closed, current alive conns: {}",
                 alive_conn_count
             );
         });
+
+        Some(())
+    }
+
+    pub fn is_client_url_alive(&self, url: &url::Url) -> bool {
+        self.alive_client_urls.lock().contains_key(url)
     }
 
     pub fn get_peer_by_id(&self, peer_id: PeerId) -> Option<Arc<Peer>> {
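
The new alive_client_urls field maps each client-side URL to the connection IDs currently using it, so a URL only stops counting as alive once its last connection closes; that is what the remove / retain / insert_many sequence above maintains. A standalone sketch of the same bookkeeping with the multimap crate (u64 stands in for PeerConnId and the address is made up):

    use multimap::MultiMap;

    // Register a live client connection under its URL.
    fn on_conn_added(alive: &mut MultiMap<url::Url, u64>, url: url::Url, conn_id: u64) {
        alive.insert(url, conn_id);
    }

    // Drop one connection ID; the URL stays present while other connections
    // to it remain, mirroring maintain_alive_client_urls above.
    fn on_conn_closed(alive: &mut MultiMap<url::Url, u64>, url: &url::Url, conn_id: u64) {
        if let Some(mut ids) = alive.remove(url) {
            ids.retain(|id| *id != conn_id);
            if !ids.is_empty() {
                alive.insert_many(url.clone(), ids);
            }
        }
    }

    fn main() {
        let url: url::Url = "tcp://1.2.3.4:11010".parse().unwrap();
        let mut alive = MultiMap::new();
        on_conn_added(&mut alive, url.clone(), 1);
        on_conn_added(&mut alive, url.clone(), 2);

        on_conn_closed(&mut alive, &url, 1);
        assert!(alive.contains_key(&url)); // still alive via connection 2

        on_conn_closed(&mut alive, &url, 2);
        assert!(!alive.contains_key(&url)); // last connection gone
    }
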
@@ -359,13 +379,6 @@ impl PeerMap {
         Ok(!self.has_peer(gateway_id))
     }
 
-    pub fn get_alive_conns(&self) -> DashMap<(PeerId, PeerConnId), PeerConnInfo> {
-        self.alive_conns
-            .iter()
-            .map(|v| (*v.key(), v.value().clone()))
-            .collect()
-    }
-
     pub fn my_peer_id(&self) -> PeerId {
         self.my_peer_id
     }