From 0427b48d75b2847f6dea747046f1e34ae148a927 Mon Sep 17 00:00:00 2001 From: liusen373 <52489720+liusen373@users.noreply.github.com> Date: Thu, 17 Jul 2025 20:37:05 +0800 Subject: [PATCH] Allows to modify Easytier's mapped listener at runtime via RPC (#1107) * Add proto definition * Implement and register the corresponding rpc service * Parse command line parameters and call remote rpc service --------- Co-authored-by: Sijie.Sun --- easytier/src/easytier-cli.rs | 98 +++++++++++++++++++++++++++++++ easytier/src/instance/instance.rs | 53 +++++++++++++++++ easytier/src/proto/cli.proto | 25 ++++++++ 3 files changed, 176 insertions(+) diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 8551ca2..b896c23 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -33,6 +33,7 @@ use easytier::{ PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc, VpnPortalRpcClientFactory, + ManageMappedListenerRequest, MappedListenerManageRpc, MappedListenerManageRpcClientFactory, ListMappedListenerRequest, MappedListenerManageAction }, common::NatType, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, @@ -74,6 +75,8 @@ enum SubCommand { Peer(PeerArgs), #[command(about = "manage connectors")] Connector(ConnectorArgs), + #[command(about = "manage mapped listeners")] + MappedListener(MappedListenerArgs), #[command(about = "do stun test")] Stun, #[command(about = "show route info")] @@ -146,6 +149,26 @@ enum ConnectorSubCommand { List, } +#[derive(Args, Debug)] +struct MappedListenerArgs { + #[command(subcommand)] + sub_command: Option, +} + +#[derive(Subcommand, Debug)] +enum MappedListenerSubCommand { + /// Add Mapped Listerner + Add { + url: String + }, + /// Remove Mapped Listener + Remove { + url: String + }, + /// List Existing Mapped Listener + List, +} + #[derive(Subcommand, Debug)] enum NodeSubCommand { #[command(about = "show node info")] @@ -246,6 +269,18 @@ impl CommandHandler<'_> { .with_context(|| "failed to get connector manager client")?) } + async fn get_mapped_listener_manager_client( + &self, + ) -> Result>, Error> { + Ok(self + .client + .lock() + .unwrap() + .scoped_client::>("".to_string()) + .await + .with_context(|| "failed to get mapped listener manager client")?) + } + async fn get_peer_center_client( &self, ) -> Result>, Error> { @@ -704,6 +739,56 @@ impl CommandHandler<'_> { println!("response: {:#?}", response); Ok(()) } + + async fn handle_mapped_listener_list(&self) -> Result<(), Error> { + let client = self.get_mapped_listener_manager_client().await?; + let request = ListMappedListenerRequest::default(); + let response = client + .list_mapped_listener(BaseController::default(), request) + .await?; + if self.verbose || *self.output_format == OutputFormat::Json { + println!("{}", serde_json::to_string_pretty(&response.mappedlisteners)?); + return Ok(()); + } + println!("response: {:#?}", response); + Ok(()) + } + + async fn handle_mapped_listener_add(&self, url: &String) -> Result<(), Error> { + let url = Self::mapped_listener_validate_url(url)?; + let client = self.get_mapped_listener_manager_client().await?; + let request = ManageMappedListenerRequest { + action: MappedListenerManageAction::MappedListenerAdd as i32, + url: Some(url.into()) + }; + let _response = client + .manage_mapped_listener(BaseController::default(), request) + .await?; + Ok(()) + } + + async fn handle_mapped_listener_remove(&self, url: &String) -> Result<(), Error> { + let url = Self::mapped_listener_validate_url(url)?; + let client = self.get_mapped_listener_manager_client().await?; + let request = ManageMappedListenerRequest { + action: MappedListenerManageAction::MappedListenerRemove as i32, + url: Some(url.into()) + }; + let _response = client + .manage_mapped_listener(BaseController::default(), request) + .await?; + Ok(()) + } + + fn mapped_listener_validate_url(url: &String) -> Result { + let url = url::Url::parse(url)?; + if url.scheme() != "tcp" && url.scheme() != "udp" { + return Err(anyhow::anyhow!("Url ({url}) must start with tcp:// or udp://")) + } else if url.port().is_none() { + return Err(anyhow::anyhow!("Url ({url}) is missing port num")) + } + Ok(url) + } } #[derive(Debug)] @@ -1041,6 +1126,19 @@ async fn main() -> Result<(), Error> { handler.handle_connector_list().await?; } }, + SubCommand::MappedListener(mapped_listener_args) => match mapped_listener_args.sub_command { + Some(MappedListenerSubCommand::Add { url }) => { + handler.handle_mapped_listener_add(&url).await?; + println!("add mapped listener: {url}"); + } + Some(MappedListenerSubCommand::Remove { url }) => { + handler.handle_mapped_listener_remove(&url).await?; + println!("remove mapped listener: {url}"); + } + Some(MappedListenerSubCommand::List) | None => { + handler.handle_mapped_listener_list().await?; + } + }, SubCommand::Route(route_args) => match route_args.sub_command { Some(RouteSubCommand::List) | None => handler.handle_route_list().await?, Some(RouteSubCommand::Dump) => handler.handle_route_dump().await?, diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 0c6212e..87c400c 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -30,6 +30,10 @@ use crate::peers::rpc_service::PeerManagerRpcService; use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver}; use crate::proto::cli::VpnPortalRpc; use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo}; +use crate::proto::cli::{ + MappedListenerManageRpc, MappedListener, ListMappedListenerRequest, ListMappedListenerResponse, + ManageMappedListenerRequest, MappedListenerManageAction, ManageMappedListenerResponse +}; use crate::proto::common::TunnelInfo; use crate::proto::peer_rpc::PeerCenterRpcServer; use crate::proto::rpc_impl::standalone::{RpcServerHook, StandAloneServer}; @@ -715,6 +719,52 @@ impl Instance { } } + fn get_mapped_listener_manager_rpc_service(&self) -> impl MappedListenerManageRpc + Clone { + #[derive(Clone)] + pub struct MappedListenerManagerRpcService(Arc); + + #[async_trait::async_trait] + impl MappedListenerManageRpc for MappedListenerManagerRpcService { + type Controller = BaseController; + + async fn list_mapped_listener( + &self, + _: BaseController, + _request: ListMappedListenerRequest, + ) -> Result { + let mut ret = ListMappedListenerResponse::default(); + let urls = self.0.config.get_mapped_listeners(); + let mapped_listeners: Vec = urls + .into_iter() + .map(|u|MappedListener{url: Some(u.into())}) + .collect(); + ret.mappedlisteners = mapped_listeners; + Ok(ret) + } + + async fn manage_mapped_listener( + &self, + _: BaseController, + req: ManageMappedListenerRequest, + ) -> Result { + let url: url::Url = req.url.ok_or(anyhow::anyhow!("url is empty"))?.into(); + + let urls = self.0.config.get_mapped_listeners(); + let mut set_urls: HashSet = urls.into_iter().collect(); + if req.action == MappedListenerManageAction::MappedListenerRemove as i32 { + set_urls.remove(&url); + } else if req.action == MappedListenerManageAction::MappedListenerAdd as i32 { + set_urls.insert(url); + } + let urls: Vec = set_urls.into_iter().collect(); + self.0.config.set_mapped_listeners(Some(urls)); + Ok(ManageMappedListenerResponse::default()) + } + } + + MappedListenerManagerRpcService(self.global_ctx.clone()) + } + async fn run_rpc_server(&mut self) -> Result<(), Error> { let Some(_) = self.global_ctx.config.get_rpc_portal() else { tracing::info!("rpc server not enabled, because rpc_portal is not set."); @@ -727,6 +777,7 @@ impl Instance { let conn_manager = self.conn_manager.clone(); let peer_center = self.peer_center.clone(); let vpn_portal_rpc = self.get_vpn_portal_rpc_service(); + let mapped_listener_manager_rpc = self.get_mapped_listener_manager_rpc_service(); let s = self.rpc_server.as_mut().unwrap(); s.registry().register( @@ -742,6 +793,8 @@ impl Instance { .register(PeerCenterRpcServer::new(peer_center.get_rpc_service()), ""); s.registry() .register(VpnPortalRpcServer::new(vpn_portal_rpc), ""); + s.registry() + .register(MappedListenerManageRpcServer::new(mapped_listener_manager_rpc), ""); if let Some(ip_proxy) = self.ip_proxy.as_ref() { s.registry().register( diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index f79bb6d..c73205d 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -172,6 +172,31 @@ service ConnectorManageRpc { rpc ManageConnector(ManageConnectorRequest) returns (ManageConnectorResponse); } +message MappedListener { + common.Url url = 1; +} + +message ListMappedListenerRequest {} + +message ListMappedListenerResponse { repeated MappedListener mappedlisteners = 1; } + +enum MappedListenerManageAction { + MAPPED_LISTENER_ADD = 0; + MAPPED_LISTENER_REMOVE = 1; +} + +message ManageMappedListenerRequest { + MappedListenerManageAction action = 1; + common.Url url = 2; +} + +message ManageMappedListenerResponse {} + +service MappedListenerManageRpc { + rpc ListMappedListener(ListMappedListenerRequest) returns (ListMappedListenerResponse); + rpc ManageMappedListener(ManageMappedListenerRequest) returns (ManageMappedListenerResponse); +} + message VpnPortalInfo { string vpn_type = 1; string client_config = 2;