Allows to modify Easytier's mapped listener at runtime via RPC (#1107)

* Add proto definition
* Implement and register the corresponding rpc service
* Parse command line parameters and call remote rpc service

---------

Co-authored-by: Sijie.Sun <sunsijie@buaa.edu.cn>
This commit is contained in:
liusen373
2025-07-17 20:37:05 +08:00
committed by GitHub
parent 0b729b99e7
commit 0427b48d75
3 changed files with 176 additions and 0 deletions

View File

@@ -33,6 +33,7 @@ use easytier::{
PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState,
TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc,
VpnPortalRpcClientFactory,
ManageMappedListenerRequest, MappedListenerManageRpc, MappedListenerManageRpcClientFactory, ListMappedListenerRequest, MappedListenerManageAction
},
common::NatType,
peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory},
@@ -74,6 +75,8 @@ enum SubCommand {
Peer(PeerArgs),
#[command(about = "manage connectors")]
Connector(ConnectorArgs),
#[command(about = "manage mapped listeners")]
MappedListener(MappedListenerArgs),
#[command(about = "do stun test")]
Stun,
#[command(about = "show route info")]
@@ -146,6 +149,26 @@ enum ConnectorSubCommand {
List,
}
#[derive(Args, Debug)]
struct MappedListenerArgs {
#[command(subcommand)]
sub_command: Option<MappedListenerSubCommand>,
}
#[derive(Subcommand, Debug)]
enum MappedListenerSubCommand {
/// Add Mapped Listerner
Add {
url: String
},
/// Remove Mapped Listener
Remove {
url: String
},
/// List Existing Mapped Listener
List,
}
#[derive(Subcommand, Debug)]
enum NodeSubCommand {
#[command(about = "show node info")]
@@ -246,6 +269,18 @@ impl CommandHandler<'_> {
.with_context(|| "failed to get connector manager client")?)
}
async fn get_mapped_listener_manager_client(
&self,
) -> Result<Box<dyn MappedListenerManageRpc<Controller = BaseController>>, Error> {
Ok(self
.client
.lock()
.unwrap()
.scoped_client::<MappedListenerManageRpcClientFactory<BaseController>>("".to_string())
.await
.with_context(|| "failed to get mapped listener manager client")?)
}
async fn get_peer_center_client(
&self,
) -> Result<Box<dyn PeerCenterRpc<Controller = BaseController>>, Error> {
@@ -704,6 +739,56 @@ impl CommandHandler<'_> {
println!("response: {:#?}", response);
Ok(())
}
async fn handle_mapped_listener_list(&self) -> Result<(), Error> {
let client = self.get_mapped_listener_manager_client().await?;
let request = ListMappedListenerRequest::default();
let response = client
.list_mapped_listener(BaseController::default(), request)
.await?;
if self.verbose || *self.output_format == OutputFormat::Json {
println!("{}", serde_json::to_string_pretty(&response.mappedlisteners)?);
return Ok(());
}
println!("response: {:#?}", response);
Ok(())
}
async fn handle_mapped_listener_add(&self, url: &String) -> Result<(), Error> {
let url = Self::mapped_listener_validate_url(url)?;
let client = self.get_mapped_listener_manager_client().await?;
let request = ManageMappedListenerRequest {
action: MappedListenerManageAction::MappedListenerAdd as i32,
url: Some(url.into())
};
let _response = client
.manage_mapped_listener(BaseController::default(), request)
.await?;
Ok(())
}
async fn handle_mapped_listener_remove(&self, url: &String) -> Result<(), Error> {
let url = Self::mapped_listener_validate_url(url)?;
let client = self.get_mapped_listener_manager_client().await?;
let request = ManageMappedListenerRequest {
action: MappedListenerManageAction::MappedListenerRemove as i32,
url: Some(url.into())
};
let _response = client
.manage_mapped_listener(BaseController::default(), request)
.await?;
Ok(())
}
fn mapped_listener_validate_url(url: &String) -> Result<url::Url, Error> {
let url = url::Url::parse(url)?;
if url.scheme() != "tcp" && url.scheme() != "udp" {
return Err(anyhow::anyhow!("Url ({url}) must start with tcp:// or udp://"))
} else if url.port().is_none() {
return Err(anyhow::anyhow!("Url ({url}) is missing port num"))
}
Ok(url)
}
}
#[derive(Debug)]
@@ -1041,6 +1126,19 @@ async fn main() -> Result<(), Error> {
handler.handle_connector_list().await?;
}
},
SubCommand::MappedListener(mapped_listener_args) => match mapped_listener_args.sub_command {
Some(MappedListenerSubCommand::Add { url }) => {
handler.handle_mapped_listener_add(&url).await?;
println!("add mapped listener: {url}");
}
Some(MappedListenerSubCommand::Remove { url }) => {
handler.handle_mapped_listener_remove(&url).await?;
println!("remove mapped listener: {url}");
}
Some(MappedListenerSubCommand::List) | None => {
handler.handle_mapped_listener_list().await?;
}
},
SubCommand::Route(route_args) => match route_args.sub_command {
Some(RouteSubCommand::List) | None => handler.handle_route_list().await?,
Some(RouteSubCommand::Dump) => handler.handle_route_dump().await?,

View File

@@ -30,6 +30,10 @@ use crate::peers::rpc_service::PeerManagerRpcService;
use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver};
use crate::proto::cli::VpnPortalRpc;
use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo};
use crate::proto::cli::{
MappedListenerManageRpc, MappedListener, ListMappedListenerRequest, ListMappedListenerResponse,
ManageMappedListenerRequest, MappedListenerManageAction, ManageMappedListenerResponse
};
use crate::proto::common::TunnelInfo;
use crate::proto::peer_rpc::PeerCenterRpcServer;
use crate::proto::rpc_impl::standalone::{RpcServerHook, StandAloneServer};
@@ -715,6 +719,52 @@ impl Instance {
}
}
fn get_mapped_listener_manager_rpc_service(&self) -> impl MappedListenerManageRpc<Controller = BaseController> + Clone {
#[derive(Clone)]
pub struct MappedListenerManagerRpcService(Arc<GlobalCtx>);
#[async_trait::async_trait]
impl MappedListenerManageRpc for MappedListenerManagerRpcService {
type Controller = BaseController;
async fn list_mapped_listener(
&self,
_: BaseController,
_request: ListMappedListenerRequest,
) -> Result<ListMappedListenerResponse, rpc_types::error::Error> {
let mut ret = ListMappedListenerResponse::default();
let urls = self.0.config.get_mapped_listeners();
let mapped_listeners: Vec<MappedListener> = urls
.into_iter()
.map(|u|MappedListener{url: Some(u.into())})
.collect();
ret.mappedlisteners = mapped_listeners;
Ok(ret)
}
async fn manage_mapped_listener(
&self,
_: BaseController,
req: ManageMappedListenerRequest,
) -> Result<ManageMappedListenerResponse, rpc_types::error::Error> {
let url: url::Url = req.url.ok_or(anyhow::anyhow!("url is empty"))?.into();
let urls = self.0.config.get_mapped_listeners();
let mut set_urls: HashSet<url::Url> = urls.into_iter().collect();
if req.action == MappedListenerManageAction::MappedListenerRemove as i32 {
set_urls.remove(&url);
} else if req.action == MappedListenerManageAction::MappedListenerAdd as i32 {
set_urls.insert(url);
}
let urls: Vec<url::Url> = set_urls.into_iter().collect();
self.0.config.set_mapped_listeners(Some(urls));
Ok(ManageMappedListenerResponse::default())
}
}
MappedListenerManagerRpcService(self.global_ctx.clone())
}
async fn run_rpc_server(&mut self) -> Result<(), Error> {
let Some(_) = self.global_ctx.config.get_rpc_portal() else {
tracing::info!("rpc server not enabled, because rpc_portal is not set.");
@@ -727,6 +777,7 @@ impl Instance {
let conn_manager = self.conn_manager.clone();
let peer_center = self.peer_center.clone();
let vpn_portal_rpc = self.get_vpn_portal_rpc_service();
let mapped_listener_manager_rpc = self.get_mapped_listener_manager_rpc_service();
let s = self.rpc_server.as_mut().unwrap();
s.registry().register(
@@ -742,6 +793,8 @@ impl Instance {
.register(PeerCenterRpcServer::new(peer_center.get_rpc_service()), "");
s.registry()
.register(VpnPortalRpcServer::new(vpn_portal_rpc), "");
s.registry()
.register(MappedListenerManageRpcServer::new(mapped_listener_manager_rpc), "");
if let Some(ip_proxy) = self.ip_proxy.as_ref() {
s.registry().register(

View File

@@ -172,6 +172,31 @@ service ConnectorManageRpc {
rpc ManageConnector(ManageConnectorRequest) returns (ManageConnectorResponse);
}
message MappedListener {
common.Url url = 1;
}
message ListMappedListenerRequest {}
message ListMappedListenerResponse { repeated MappedListener mappedlisteners = 1; }
enum MappedListenerManageAction {
MAPPED_LISTENER_ADD = 0;
MAPPED_LISTENER_REMOVE = 1;
}
message ManageMappedListenerRequest {
MappedListenerManageAction action = 1;
common.Url url = 2;
}
message ManageMappedListenerResponse {}
service MappedListenerManageRpc {
rpc ListMappedListener(ListMappedListenerRequest) returns (ListMappedListenerResponse);
rpc ManageMappedListener(ManageMappedListenerRequest) returns (ManageMappedListenerResponse);
}
message VpnPortalInfo {
string vpn_type = 1;
string client_config = 2;