mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-16 14:47:25 +08:00
feat/web: Patchset 3 (#455)
https://apifox.com/apidoc/shared-ceda7a60-e817-4ea8-827b-de4e874dc45e implement all backend API
This commit is contained in:
@@ -4,10 +4,14 @@ pub mod storage;
|
||||
use std::sync::Arc;
|
||||
|
||||
use dashmap::DashMap;
|
||||
use easytier::{common::scoped_task::ScopedTask, tunnel::TunnelListener};
|
||||
use easytier::{
|
||||
common::scoped_task::ScopedTask, proto::web::HeartbeatRequest, tunnel::TunnelListener,
|
||||
};
|
||||
use session::Session;
|
||||
use storage::{Storage, StorageToken};
|
||||
|
||||
use crate::db::Db;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct ClientManager {
|
||||
accept_task: Option<ScopedTask<()>>,
|
||||
@@ -18,13 +22,13 @@ pub struct ClientManager {
|
||||
}
|
||||
|
||||
impl ClientManager {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(db: Db) -> Self {
|
||||
ClientManager {
|
||||
accept_task: None,
|
||||
clear_task: None,
|
||||
|
||||
client_sessions: Arc::new(DashMap::new()),
|
||||
storage: Storage::new(),
|
||||
storage: Storage::new(db),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -41,7 +45,8 @@ impl ClientManager {
|
||||
let info = tunnel.info().unwrap();
|
||||
let client_url: url::Url = info.remote_addr.unwrap().into();
|
||||
println!("New session from {:?}", tunnel.info());
|
||||
let session = Session::new(tunnel, storage.clone(), client_url.clone());
|
||||
let mut session = Session::new(storage.clone(), client_url.clone());
|
||||
session.serve(tunnel).await;
|
||||
sessions.insert(client_url, Arc::new(session));
|
||||
}
|
||||
});
|
||||
@@ -87,6 +92,19 @@ impl ClientManager {
|
||||
.get(&c_url)
|
||||
.map(|item| item.value().clone())
|
||||
}
|
||||
|
||||
pub fn list_machine_by_token(&self, token: String) -> Vec<url::Url> {
|
||||
self.storage.list_token_clients(&token)
|
||||
}
|
||||
|
||||
pub async fn get_heartbeat_requests(&self, client_url: &url::Url) -> Option<HeartbeatRequest> {
|
||||
let s = self.client_sessions.get(client_url)?.clone();
|
||||
s.data().read().await.req()
|
||||
}
|
||||
|
||||
pub fn db(&self) -> &Db {
|
||||
self.storage.db()
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -101,12 +119,12 @@ mod tests {
|
||||
web_client::WebClient,
|
||||
};
|
||||
|
||||
use crate::client_manager::ClientManager;
|
||||
use crate::{client_manager::ClientManager, db::Db};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_client() {
|
||||
let listener = UdpTunnelListener::new("udp://0.0.0.0:54333".parse().unwrap());
|
||||
let mut mgr = ClientManager::new();
|
||||
let mut mgr = ClientManager::new(Db::memory_db().await);
|
||||
mgr.serve(Box::new(listener)).await.unwrap();
|
||||
|
||||
let connector = UdpTunnelConnector::new("udp://127.0.0.1:54333".parse().unwrap());
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
use std::{fmt::Debug, sync::Arc};
|
||||
|
||||
use easytier::{
|
||||
common::scoped_task::ScopedTask,
|
||||
proto::{
|
||||
rpc_impl::bidirect::BidirectRpcManager,
|
||||
rpc_types::{self, controller::BaseController},
|
||||
web::{
|
||||
HeartbeatRequest, HeartbeatResponse, WebClientService, WebClientServiceClientFactory,
|
||||
WebServerService, WebServerServiceServer,
|
||||
HeartbeatRequest, HeartbeatResponse, RunNetworkInstanceRequest, WebClientService,
|
||||
WebClientServiceClientFactory, WebServerService, WebServerServiceServer,
|
||||
},
|
||||
},
|
||||
tunnel::Tunnel,
|
||||
@@ -98,6 +99,8 @@ pub struct Session {
|
||||
rpc_mgr: BidirectRpcManager,
|
||||
|
||||
data: SharedSessionData,
|
||||
|
||||
run_network_on_start_task: Option<ScopedTask<()>>,
|
||||
}
|
||||
|
||||
impl Debug for Session {
|
||||
@@ -106,20 +109,122 @@ impl Debug for Session {
|
||||
}
|
||||
}
|
||||
|
||||
type SessionRpcClient = Box<dyn WebClientService<Controller = BaseController> + Send>;
|
||||
|
||||
impl Session {
|
||||
pub fn new(tunnel: Box<dyn Tunnel>, storage: WeakRefStorage, client_url: url::Url) -> Self {
|
||||
pub fn new(storage: WeakRefStorage, client_url: url::Url) -> Self {
|
||||
let session_data = SessionData::new(storage, client_url);
|
||||
let data = Arc::new(RwLock::new(session_data));
|
||||
|
||||
let rpc_mgr =
|
||||
BidirectRpcManager::new().set_rx_timeout(Some(std::time::Duration::from_secs(30)));
|
||||
rpc_mgr.run_with_tunnel(tunnel);
|
||||
|
||||
let data = Arc::new(RwLock::new(SessionData::new(storage, client_url)));
|
||||
|
||||
rpc_mgr.rpc_server().registry().register(
|
||||
WebServerServiceServer::new(SessionRpcService { data: data.clone() }),
|
||||
"",
|
||||
);
|
||||
|
||||
Session { rpc_mgr, data }
|
||||
Session {
|
||||
rpc_mgr,
|
||||
data,
|
||||
run_network_on_start_task: None,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn serve(&mut self, tunnel: Box<dyn Tunnel>) {
|
||||
self.rpc_mgr.run_with_tunnel(tunnel);
|
||||
|
||||
let data = self.data.read().await;
|
||||
self.run_network_on_start_task.replace(
|
||||
tokio::spawn(Self::run_network_on_start(
|
||||
data.heartbeat_waiter(),
|
||||
data.storage.clone(),
|
||||
self.scoped_rpc_client(),
|
||||
))
|
||||
.into(),
|
||||
);
|
||||
}
|
||||
|
||||
async fn run_network_on_start(
|
||||
mut heartbeat_waiter: broadcast::Receiver<HeartbeatRequest>,
|
||||
storage: WeakRefStorage,
|
||||
rpc_client: SessionRpcClient,
|
||||
) {
|
||||
loop {
|
||||
heartbeat_waiter = heartbeat_waiter.resubscribe();
|
||||
let req = heartbeat_waiter.recv().await;
|
||||
if req.is_err() {
|
||||
tracing::error!(
|
||||
"Failed to receive heartbeat request, error: {:?}",
|
||||
req.err()
|
||||
);
|
||||
return;
|
||||
}
|
||||
let req = req.unwrap();
|
||||
let running_inst_ids = req
|
||||
.running_network_instances
|
||||
.iter()
|
||||
.map(|x| x.to_string())
|
||||
.collect::<Vec<_>>();
|
||||
let Some(storage) = storage.upgrade() else {
|
||||
tracing::error!("Failed to get storage");
|
||||
return;
|
||||
};
|
||||
|
||||
let user_id = match storage
|
||||
.db
|
||||
.get_user_id_by_token(req.user_token.clone())
|
||||
.await
|
||||
{
|
||||
Ok(Some(user_id)) => user_id,
|
||||
Ok(None) => {
|
||||
tracing::info!("User not found by token: {:?}", req.user_token);
|
||||
return;
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to get user id by token, error: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let local_configs = match storage.db.list_network_configs(user_id, true).await {
|
||||
Ok(configs) => configs,
|
||||
Err(e) => {
|
||||
tracing::error!("Failed to list network configs, error: {:?}", e);
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut has_failed = false;
|
||||
|
||||
for c in local_configs {
|
||||
if running_inst_ids.contains(&c.network_instance_id) {
|
||||
continue;
|
||||
}
|
||||
let ret = rpc_client
|
||||
.run_network_instance(
|
||||
BaseController::default(),
|
||||
RunNetworkInstanceRequest {
|
||||
inst_id: Some(c.network_instance_id.clone().into()),
|
||||
config: c.network_config,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
tracing::info!(
|
||||
?user_id,
|
||||
"Run network instance: {:?}, user_token: {:?}",
|
||||
ret,
|
||||
req.user_token
|
||||
);
|
||||
|
||||
has_failed |= ret.is_err();
|
||||
}
|
||||
|
||||
if !has_failed {
|
||||
tracing::info!(?req, "All network instances are running");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_running(&self) -> bool {
|
||||
@@ -130,9 +235,7 @@ impl Session {
|
||||
self.data.clone()
|
||||
}
|
||||
|
||||
pub fn scoped_rpc_client(
|
||||
&self,
|
||||
) -> Box<dyn WebClientService<Controller = BaseController> + Send> {
|
||||
pub fn scoped_rpc_client(&self) -> SessionRpcClient {
|
||||
self.rpc_mgr
|
||||
.rpc_client()
|
||||
.scoped_client::<WebClientServiceClientFactory<BaseController>>(1, 1, "".to_string())
|
||||
@@ -141,4 +244,8 @@ impl Session {
|
||||
pub async fn get_token(&self) -> Option<StorageToken> {
|
||||
self.data.read().await.storage_token.clone()
|
||||
}
|
||||
|
||||
pub async fn get_heartbeat_req(&self) -> Option<HeartbeatRequest> {
|
||||
self.data.read().await.req()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,8 @@ use std::sync::{Arc, Weak};
|
||||
|
||||
use dashmap::{DashMap, DashSet};
|
||||
|
||||
use crate::db::Db;
|
||||
|
||||
// use this to maintain Storage
|
||||
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
|
||||
pub struct StorageToken {
|
||||
@@ -15,6 +17,7 @@ pub struct StorageInner {
|
||||
// some map for indexing
|
||||
pub token_clients_map: DashMap<String, DashSet<url::Url>>,
|
||||
pub machine_client_url_map: DashMap<uuid::Uuid, url::Url>,
|
||||
pub db: Db,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -30,10 +33,11 @@ impl TryFrom<WeakRefStorage> for Storage {
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
pub fn new() -> Self {
|
||||
pub fn new(db: Db) -> Self {
|
||||
Storage(Arc::new(StorageInner {
|
||||
token_clients_map: DashMap::new(),
|
||||
machine_client_url_map: DashMap::new(),
|
||||
db,
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -69,4 +73,16 @@ impl Storage {
|
||||
.get(&machine_id)
|
||||
.map(|url| url.clone())
|
||||
}
|
||||
|
||||
pub fn list_token_clients(&self, token: &str) -> Vec<url::Url> {
|
||||
self.0
|
||||
.token_clients_map
|
||||
.get(token)
|
||||
.map(|set| set.iter().map(|url| url.clone()).collect())
|
||||
.unwrap_or_default()
|
||||
}
|
||||
|
||||
pub fn db(&self) -> &Db {
|
||||
&self.0.db
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user