diff --git a/Cargo.lock b/Cargo.lock index 695a3ca..8e4d1bd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1979,6 +1979,7 @@ dependencies = [ "axum-login", "axum-messages", "base64 0.22.1", + "chrono", "clap", "dashmap", "easytier", diff --git a/easytier-web/Cargo.toml b/easytier-web/Cargo.toml index a70f29b..23bf45d 100644 --- a/easytier-web/Cargo.toml +++ b/easytier-web/Cargo.toml @@ -53,3 +53,5 @@ uuid = { version = "1.5.0", features = [ "macro-diagnostics", "serde", ] } + +chrono = { version = "0.4.37", features = ["serde"] } diff --git a/easytier-web/src/client_manager/mod.rs b/easytier-web/src/client_manager/mod.rs index d749b00..b9a2a7b 100644 --- a/easytier-web/src/client_manager/mod.rs +++ b/easytier-web/src/client_manager/mod.rs @@ -1,13 +1,14 @@ pub mod session; pub mod storage; -use std::sync::Arc; +use std::{collections::BTreeMap, str::FromStr, sync::Arc}; use dashmap::DashMap; use easytier::{ common::scoped_task::ScopedTask, proto::web::HeartbeatRequest, tunnel::TunnelListener, }; use session::Session; +use sqlx::types::chrono; use storage::{Storage, StorageToken}; use crate::db::Db; @@ -93,7 +94,7 @@ impl ClientManager { .map(|item| item.value().clone()) } - pub fn list_machine_by_token(&self, token: String) -> Vec { + pub async fn list_machine_by_token(&self, token: String) -> Vec { self.storage.list_token_clients(&token) } diff --git a/easytier-web/src/client_manager/session.rs b/easytier-web/src/client_manager/session.rs index 5ad2a18..df7e313 100644 --- a/easytier-web/src/client_manager/session.rs +++ b/easytier-web/src/client_manager/session.rs @@ -1,4 +1,4 @@ -use std::{fmt::Debug, sync::Arc}; +use std::{fmt::Debug, str::FromStr as _, sync::Arc}; use easytier::{ common::scoped_task::ScopedTask, @@ -87,10 +87,20 @@ impl WebServerService for SessionRpcService { .map(Into::into) .unwrap_or(uuid::Uuid::new_v4()), }); - if let Ok(storage) = Storage::try_from(data.storage.clone()) { - storage.add_client(data.storage_token.as_ref().unwrap().clone()); - } } + + if let Ok(storage) = Storage::try_from(data.storage.clone()) { + let Ok(report_time) = chrono::DateTime::::from_str(&req.report_time) + else { + tracing::error!("Failed to parse report time: {:?}", req.report_time); + return Ok(HeartbeatResponse {}); + }; + storage.update_client( + data.storage_token.as_ref().unwrap().clone(), + report_time.timestamp(), + ); + } + let _ = data.notifier.send(req); Ok(HeartbeatResponse {}) } diff --git a/easytier-web/src/client_manager/storage.rs b/easytier-web/src/client_manager/storage.rs index 5c974f3..a9be575 100644 --- a/easytier-web/src/client_manager/storage.rs +++ b/easytier-web/src/client_manager/storage.rs @@ -1,6 +1,6 @@ use std::sync::{Arc, Weak}; -use dashmap::{DashMap, DashSet}; +use dashmap::DashMap; use crate::db::Db; @@ -12,11 +12,19 @@ pub struct StorageToken { pub machine_id: uuid::Uuid, } +#[derive(Debug, Clone)] +struct ClientInfo { + client_url: url::Url, + machine_id: uuid::Uuid, + token: String, + report_time: i64, +} + #[derive(Debug)] pub struct StorageInner { // some map for indexing - pub token_clients_map: DashMap>, - pub machine_client_url_map: DashMap>, + token_clients_map: DashMap>, + machine_client_url_map: DashMap, pub db: Db, } @@ -41,33 +49,57 @@ impl Storage { })) } - pub fn add_client(&self, stoken: StorageToken) { + fn remove_mid_to_client_info_map( + map: &DashMap, + machine_id: &uuid::Uuid, + client_url: &url::Url, + ) { + map.remove_if(&machine_id, |_, v| v.client_url == *client_url); + } + + fn update_mid_to_client_info_map( + map: &DashMap, + client_info: &ClientInfo, + ) { + map.entry(client_info.machine_id) + .and_modify(|e| { + if e.report_time < client_info.report_time { + assert_eq!(e.machine_id, client_info.machine_id); + *e = client_info.clone(); + } + }) + .or_insert(client_info.clone()); + } + + pub fn update_client(&self, stoken: StorageToken, report_time: i64) { let inner = self .0 .token_clients_map - .entry(stoken.token) - .or_insert_with(DashSet::new); - inner.insert(stoken.client_url.clone()); + .entry(stoken.token.clone()) + .or_insert_with(DashMap::new); - self.0 - .machine_client_url_map - .entry(stoken.machine_id) - .or_insert_with(DashSet::new) - .insert(stoken.client_url.clone()); + let client_info = ClientInfo { + client_url: stoken.client_url.clone(), + machine_id: stoken.machine_id, + token: stoken.token.clone(), + report_time, + }; + + Self::update_mid_to_client_info_map(&inner, &client_info); + Self::update_mid_to_client_info_map(&self.0.machine_client_url_map, &client_info); } pub fn remove_client(&self, stoken: &StorageToken) { self.0.token_clients_map.remove_if(&stoken.token, |_, set| { - set.remove(&stoken.client_url); + Self::remove_mid_to_client_info_map(set, &stoken.machine_id, &stoken.client_url); set.is_empty() }); - self.0 - .machine_client_url_map - .remove_if(&stoken.machine_id, |_, set| { - set.remove(&stoken.client_url); - set.is_empty() - }); + Self::remove_mid_to_client_info_map( + &self.0.machine_client_url_map, + &stoken.machine_id, + &stoken.client_url, + ); } pub fn weak_ref(&self) -> WeakRefStorage { @@ -78,15 +110,19 @@ impl Storage { self.0 .machine_client_url_map .get(&machine_id) - .map(|url| url.iter().next().map(|url| url.clone())) - .flatten() + .map(|info| info.client_url.clone()) } pub fn list_token_clients(&self, token: &str) -> Vec { self.0 .token_clients_map .get(token) - .map(|set| set.iter().map(|url| url.clone()).collect()) + .map(|info_map| { + info_map + .iter() + .map(|info| info.value().client_url.clone()) + .collect() + }) .unwrap_or_default() } diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs index 44ab988..9d780c7 100644 --- a/easytier-web/src/restful/mod.rs +++ b/easytier-web/src/restful/mod.rs @@ -121,7 +121,9 @@ impl RestfulServer { return Err((StatusCode::UNAUTHORIZED, other_error("No such user").into())); }; - let machines = client_mgr.list_machine_by_token(user.tokens[0].clone()); + let machines = client_mgr + .list_machine_by_token(user.tokens[0].clone()) + .await; Ok(GetSummaryJsonResp { device_count: machines.len() as u32, diff --git a/easytier-web/src/restful/network.rs b/easytier-web/src/restful/network.rs index 83b1cf9..3a8d41a 100644 --- a/easytier-web/src/restful/network.rs +++ b/easytier-web/src/restful/network.rs @@ -270,7 +270,7 @@ impl NetworkApi { let client_urls = DashSet::new(); for token in tokens { - let urls = client_mgr.list_machine_by_token(token); + let urls = client_mgr.list_machine_by_token(token).await; for url in urls { client_urls.insert(url); }