fix web show dup entry for same machine (#526)

This commit is contained in:
Sijie.Sun
2024-12-21 11:51:01 -05:00
committed by GitHub
parent 4e5915f98e
commit 4cf61f0d4a
7 changed files with 82 additions and 30 deletions

View File

@@ -1,13 +1,14 @@
pub mod session;
pub mod storage;
use std::sync::Arc;
use std::{collections::BTreeMap, str::FromStr, sync::Arc};
use dashmap::DashMap;
use easytier::{
common::scoped_task::ScopedTask, proto::web::HeartbeatRequest, tunnel::TunnelListener,
};
use session::Session;
use sqlx::types::chrono;
use storage::{Storage, StorageToken};
use crate::db::Db;
@@ -93,7 +94,7 @@ impl ClientManager {
.map(|item| item.value().clone())
}
pub fn list_machine_by_token(&self, token: String) -> Vec<url::Url> {
pub async fn list_machine_by_token(&self, token: String) -> Vec<url::Url> {
self.storage.list_token_clients(&token)
}

View File

@@ -1,4 +1,4 @@
use std::{fmt::Debug, sync::Arc};
use std::{fmt::Debug, str::FromStr as _, sync::Arc};
use easytier::{
common::scoped_task::ScopedTask,
@@ -87,10 +87,20 @@ impl WebServerService for SessionRpcService {
.map(Into::into)
.unwrap_or(uuid::Uuid::new_v4()),
});
if let Ok(storage) = Storage::try_from(data.storage.clone()) {
storage.add_client(data.storage_token.as_ref().unwrap().clone());
}
}
if let Ok(storage) = Storage::try_from(data.storage.clone()) {
let Ok(report_time) = chrono::DateTime::<chrono::Local>::from_str(&req.report_time)
else {
tracing::error!("Failed to parse report time: {:?}", req.report_time);
return Ok(HeartbeatResponse {});
};
storage.update_client(
data.storage_token.as_ref().unwrap().clone(),
report_time.timestamp(),
);
}
let _ = data.notifier.send(req);
Ok(HeartbeatResponse {})
}

View File

@@ -1,6 +1,6 @@
use std::sync::{Arc, Weak};
use dashmap::{DashMap, DashSet};
use dashmap::DashMap;
use crate::db::Db;
@@ -12,11 +12,19 @@ pub struct StorageToken {
pub machine_id: uuid::Uuid,
}
#[derive(Debug, Clone)]
struct ClientInfo {
client_url: url::Url,
machine_id: uuid::Uuid,
token: String,
report_time: i64,
}
#[derive(Debug)]
pub struct StorageInner {
// some map for indexing
pub token_clients_map: DashMap<String, DashSet<url::Url>>,
pub machine_client_url_map: DashMap<uuid::Uuid, DashSet<url::Url>>,
token_clients_map: DashMap<String, DashMap<uuid::Uuid, ClientInfo>>,
machine_client_url_map: DashMap<uuid::Uuid, ClientInfo>,
pub db: Db,
}
@@ -41,33 +49,57 @@ impl Storage {
}))
}
pub fn add_client(&self, stoken: StorageToken) {
fn remove_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
machine_id: &uuid::Uuid,
client_url: &url::Url,
) {
map.remove_if(&machine_id, |_, v| v.client_url == *client_url);
}
fn update_mid_to_client_info_map(
map: &DashMap<uuid::Uuid, ClientInfo>,
client_info: &ClientInfo,
) {
map.entry(client_info.machine_id)
.and_modify(|e| {
if e.report_time < client_info.report_time {
assert_eq!(e.machine_id, client_info.machine_id);
*e = client_info.clone();
}
})
.or_insert(client_info.clone());
}
pub fn update_client(&self, stoken: StorageToken, report_time: i64) {
let inner = self
.0
.token_clients_map
.entry(stoken.token)
.or_insert_with(DashSet::new);
inner.insert(stoken.client_url.clone());
.entry(stoken.token.clone())
.or_insert_with(DashMap::new);
self.0
.machine_client_url_map
.entry(stoken.machine_id)
.or_insert_with(DashSet::new)
.insert(stoken.client_url.clone());
let client_info = ClientInfo {
client_url: stoken.client_url.clone(),
machine_id: stoken.machine_id,
token: stoken.token.clone(),
report_time,
};
Self::update_mid_to_client_info_map(&inner, &client_info);
Self::update_mid_to_client_info_map(&self.0.machine_client_url_map, &client_info);
}
pub fn remove_client(&self, stoken: &StorageToken) {
self.0.token_clients_map.remove_if(&stoken.token, |_, set| {
set.remove(&stoken.client_url);
Self::remove_mid_to_client_info_map(set, &stoken.machine_id, &stoken.client_url);
set.is_empty()
});
self.0
.machine_client_url_map
.remove_if(&stoken.machine_id, |_, set| {
set.remove(&stoken.client_url);
set.is_empty()
});
Self::remove_mid_to_client_info_map(
&self.0.machine_client_url_map,
&stoken.machine_id,
&stoken.client_url,
);
}
pub fn weak_ref(&self) -> WeakRefStorage {
@@ -78,15 +110,19 @@ impl Storage {
self.0
.machine_client_url_map
.get(&machine_id)
.map(|url| url.iter().next().map(|url| url.clone()))
.flatten()
.map(|info| info.client_url.clone())
}
pub fn list_token_clients(&self, token: &str) -> Vec<url::Url> {
self.0
.token_clients_map
.get(token)
.map(|set| set.iter().map(|url| url.clone()).collect())
.map(|info_map| {
info_map
.iter()
.map(|info| info.value().client_url.clone())
.collect()
})
.unwrap_or_default()
}

View File

@@ -121,7 +121,9 @@ impl RestfulServer {
return Err((StatusCode::UNAUTHORIZED, other_error("No such user").into()));
};
let machines = client_mgr.list_machine_by_token(user.tokens[0].clone());
let machines = client_mgr
.list_machine_by_token(user.tokens[0].clone())
.await;
Ok(GetSummaryJsonResp {
device_count: machines.len() as u32,

View File

@@ -270,7 +270,7 @@ impl NetworkApi {
let client_urls = DashSet::new();
for token in tokens {
let urls = client_mgr.list_machine_by_token(token);
let urls = client_mgr.list_machine_by_token(token).await;
for url in urls {
client_urls.insert(url);
}