diff --git a/easytier/build.rs b/easytier/build.rs index 9ef3d7c..f12911f 100644 --- a/easytier/build.rs +++ b/easytier/build.rs @@ -148,6 +148,7 @@ fn main() -> Result<(), Box> { "src/proto/web.proto", "src/proto/magic_dns.proto", "src/proto/acl.proto", + "src/proto/config.proto", ]; for proto_file in proto_files.iter().chain(proto_files_reflect.iter()) { @@ -162,6 +163,7 @@ fn main() -> Result<(), Box> { .type_attribute(".error", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".cli", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute(".web", "#[derive(serde::Serialize, serde::Deserialize)]") + .type_attribute(".config", "#[derive(serde::Serialize, serde::Deserialize)]") .type_attribute( "peer_rpc.GetIpListResponse", "#[derive(serde::Serialize, serde::Deserialize)]", diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index ada2fbb..a4b3e18 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -152,6 +152,7 @@ pub trait ConfigLoader: Send + Sync { mapped_cidr: Option, ) -> Result<(), anyhow::Error>; fn remove_proxy_cidr(&self, cidr: cidr::Ipv4Cidr); + fn clear_proxy_cidrs(&self); fn get_proxy_cidrs(&self) -> Vec; fn get_network_identity(&self) -> NetworkIdentity; @@ -610,6 +611,11 @@ impl ConfigLoader for TomlConfigLoader { } } + fn clear_proxy_cidrs(&self) { + let mut locked_config = self.config.lock().unwrap(); + locked_config.proxy_network = None; + } + fn get_proxy_cidrs(&self) -> Vec { self.config .lock() diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 448be26..5633450 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -12,6 +12,7 @@ use crate::peers::acl_filter::AclFilter; use crate::proto::acl::GroupIdentity; use crate::proto::cli::PeerConnInfo; use crate::proto::common::{PeerFeatureFlag, PortForwardConfigPb}; +use crate::proto::config::InstanceConfigPatch; use crate::proto::peer_rpc::PeerGroupInfo; use crossbeam::atomic::AtomicCell; @@ -52,6 +53,8 @@ pub enum GlobalCtxEvent { DhcpIpv4Conflicted(Option), PortForwardAdded(PortForwardConfigPb), + + ConfigPatched(InstanceConfigPatch), } pub type EventBus = tokio::sync::broadcast::Sender; diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 3fac958..5a3ce6d 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -21,29 +21,30 @@ use tokio::time::timeout; use easytier::{ common::{ - config::PortForwardConfig, constants::EASYTIER_VERSION, stun::{StunInfoCollector, StunInfoCollectorTrait}, }, peers, proto::{ cli::{ - list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, AddPortForwardRequest, - ConnectorManageRpc, ConnectorManageRpcClientFactory, DumpRouteRequest, - GetAclStatsRequest, GetLoggerConfigRequest, GetPrometheusStatsRequest, GetStatsRequest, + list_peer_route_pair, AclManageRpc, AclManageRpcClientFactory, ConnectorManageRpc, + ConnectorManageRpcClientFactory, DumpRouteRequest, GetAclStatsRequest, + GetLoggerConfigRequest, GetPrometheusStatsRequest, GetStatsRequest, GetVpnPortalInfoRequest, GetWhitelistRequest, ListConnectorRequest, ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListMappedListenerRequest, ListPeerRequest, ListPeerResponse, ListPortForwardRequest, ListRouteRequest, ListRouteResponse, LogLevel, LoggerRpc, LoggerRpcClientFactory, - ManageMappedListenerRequest, MappedListenerManageAction, MappedListenerManageRpc, - MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc, + MappedListenerManageRpc, MappedListenerManageRpcClientFactory, NodeInfo, PeerManageRpc, PeerManageRpcClientFactory, PortForwardManageRpc, PortForwardManageRpcClientFactory, - RemovePortForwardRequest, SetLoggerConfigRequest, SetWhitelistRequest, - ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory, TcpProxyEntryState, - TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc, - VpnPortalRpcClientFactory, + SetLoggerConfigRequest, ShowNodeInfoRequest, StatsRpc, StatsRpcClientFactory, + TcpProxyEntryState, TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, + VpnPortalRpc, VpnPortalRpcClientFactory, + }, + common::{NatType, PortForwardConfigPb, SocketType}, + config::{ + AclPatch, ConfigPatchAction, ConfigRpc, ConfigRpcClientFactory, InstanceConfigPatch, + PatchConfigRequest, PortForwardPatch, StringPatch, UrlPatch, }, - common::{NatType, SocketType}, peer_rpc::{GetGlobalPeerMapRequest, PeerCenterRpc, PeerCenterRpcClientFactory}, rpc_impl::standalone::StandAloneClient, rpc_types::controller::BaseController, @@ -476,6 +477,18 @@ impl CommandHandler<'_> { .with_context(|| "failed to get logger client")?) } + async fn get_config_client( + &self, + ) -> Result>, Error> { + Ok(self + .client + .lock() + .await + .scoped_client::>("".to_string()) + .await + .with_context(|| "failed to get config client")?) + } + async fn list_peers(&self) -> Result { let client = self.get_peer_manager_client().await?; let request = ListPeerRequest::default(); @@ -931,28 +944,24 @@ impl CommandHandler<'_> { Ok(()) } - async fn handle_mapped_listener_add(&self, url: &str) -> Result<(), Error> { + async fn handle_mapped_listener_modify( + &self, + url: &str, + action: ConfigPatchAction, + ) -> Result<(), Error> { let url = Self::mapped_listener_validate_url(url)?; - let client = self.get_mapped_listener_manager_client().await?; - let request = ManageMappedListenerRequest { - action: MappedListenerManageAction::MappedListenerAdd as i32, - url: Some(url.into()), + let client = self.get_config_client().await?; + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + mapped_listeners: vec![UrlPatch { + action: action.into(), + url: Some(url.into()), + }], + ..Default::default() + }), }; let _response = client - .manage_mapped_listener(BaseController::default(), request) - .await?; - Ok(()) - } - - async fn handle_mapped_listener_remove(&self, url: &str) -> Result<(), Error> { - let url = Self::mapped_listener_validate_url(url)?; - let client = self.get_mapped_listener_manager_client().await?; - let request = ManageMappedListenerRequest { - action: MappedListenerManageAction::MappedListenerRemove as i32, - url: Some(url.into()), - }; - let _response = client - .manage_mapped_listener(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; Ok(()) } @@ -969,47 +978,9 @@ impl CommandHandler<'_> { Ok(url) } - async fn handle_port_forward_add( - &self, - protocol: &str, - bind_addr: &str, - dst_addr: &str, - ) -> Result<(), Error> { - let bind_addr: std::net::SocketAddr = bind_addr - .parse() - .with_context(|| format!("Invalid bind address: {}", bind_addr))?; - let dst_addr: std::net::SocketAddr = dst_addr - .parse() - .with_context(|| format!("Invalid destination address: {}", dst_addr))?; - - if protocol != "tcp" && protocol != "udp" { - return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'")); - } - - let client = self.get_port_forward_manager_client().await?; - let request = AddPortForwardRequest { - cfg: Some( - PortForwardConfig { - proto: protocol.to_string(), - bind_addr, - dst_addr, - } - .into(), - ), - }; - - client - .add_port_forward(BaseController::default(), request) - .await?; - println!( - "Port forward rule added: {} {} -> {}", - protocol, bind_addr, dst_addr - ); - Ok(()) - } - - async fn handle_port_forward_remove( + async fn handle_port_forward_modify( &self, + action: ConfigPatchAction, protocol: &str, bind_addr: &str, dst_addr: Option<&str>, @@ -1018,28 +989,36 @@ impl CommandHandler<'_> { .parse() .with_context(|| format!("Invalid bind address: {}", bind_addr))?; - if protocol != "tcp" && protocol != "udp" { - return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'")); - } + let socket_type = match protocol { + "tcp" => SocketType::Tcp, + "udp" => SocketType::Udp, + _ => return Err(anyhow::anyhow!("Protocol must be 'tcp' or 'udp'")), + }; - let client = self.get_port_forward_manager_client().await?; - let request = RemovePortForwardRequest { - cfg: Some( - PortForwardConfig { - proto: protocol.to_string(), - bind_addr, - dst_addr: dst_addr - .map(|s| s.parse::().unwrap()) - .unwrap_or("0.0.0.0:0".parse::().unwrap()), - } - .into(), - ), + let client = self.get_config_client().await?; + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + port_forwards: vec![PortForwardPatch { + action: action.into(), + cfg: Some(PortForwardConfigPb { + bind_addr: Some(bind_addr.into()), + dst_addr: dst_addr.map(|s| s.parse::().unwrap().into()), + socket_type: socket_type.into(), + }), + }], + ..Default::default() + }), }; client - .remove_port_forward(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; - println!("Port forward rule removed: {} {}", protocol, bind_addr); + println!( + "Port forward rule {}: {} {}", + action.as_str_name().to_lowercase(), + protocol, + bind_addr + ); Ok(()) } @@ -1086,78 +1065,114 @@ impl CommandHandler<'_> { } async fn handle_whitelist_set_tcp(&self, ports: &str) -> Result<(), Error> { - let tcp_ports = Self::parse_port_list(ports)?; - let client = self.get_acl_manager_client().await?; + let mut whitelist = Self::parse_port_list(ports)? + .into_iter() + .map(|p| StringPatch { + action: ConfigPatchAction::Add.into(), + value: p, + }) + .collect::>(); + whitelist.insert( + 0, + StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }, + ); + let client = self.get_config_client().await?; - // Get current UDP ports to preserve them - let current = client - .get_whitelist(BaseController::default(), GetWhitelistRequest::default()) - .await?; - let request = SetWhitelistRequest { - tcp_ports, - udp_ports: current.udp_ports, + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + acl: Some(AclPatch { + tcp_whitelist: whitelist, + ..Default::default() + }), + ..Default::default() + }), }; client - .set_whitelist(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; println!("TCP whitelist updated: {}", ports); Ok(()) } async fn handle_whitelist_set_udp(&self, ports: &str) -> Result<(), Error> { - let udp_ports = Self::parse_port_list(ports)?; - let client = self.get_acl_manager_client().await?; + let mut whitelist = Self::parse_port_list(ports)? + .into_iter() + .map(|p| StringPatch { + action: ConfigPatchAction::Add.into(), + value: p, + }) + .collect::>(); + whitelist.insert( + 0, + StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }, + ); + let client = self.get_config_client().await?; - // Get current TCP ports to preserve them - let current = client - .get_whitelist(BaseController::default(), GetWhitelistRequest::default()) - .await?; - let request = SetWhitelistRequest { - tcp_ports: current.tcp_ports, - udp_ports, + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + acl: Some(AclPatch { + udp_whitelist: whitelist, + ..Default::default() + }), + ..Default::default() + }), }; client - .set_whitelist(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; println!("UDP whitelist updated: {}", ports); Ok(()) } async fn handle_whitelist_clear_tcp(&self) -> Result<(), Error> { - let client = self.get_acl_manager_client().await?; + let client = self.get_config_client().await?; - // Get current UDP ports to preserve them - let current = client - .get_whitelist(BaseController::default(), GetWhitelistRequest::default()) - .await?; - let request = SetWhitelistRequest { - tcp_ports: vec![], - udp_ports: current.udp_ports, + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + acl: Some(AclPatch { + tcp_whitelist: vec![StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }], + ..Default::default() + }), + ..Default::default() + }), }; client - .set_whitelist(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; println!("TCP whitelist cleared"); Ok(()) } async fn handle_whitelist_clear_udp(&self) -> Result<(), Error> { - let client = self.get_acl_manager_client().await?; + let client = self.get_config_client().await?; - // Get current TCP ports to preserve them - let current = client - .get_whitelist(BaseController::default(), GetWhitelistRequest::default()) - .await?; - let request = SetWhitelistRequest { - tcp_ports: current.tcp_ports, - udp_ports: vec![], + let request = PatchConfigRequest { + patch: Some(InstanceConfigPatch { + acl: Some(AclPatch { + udp_whitelist: vec![StringPatch { + action: ConfigPatchAction::Clear.into(), + value: "".to_string(), + }], + ..Default::default() + }), + ..Default::default() + }), }; client - .set_whitelist(BaseController::default(), request) + .patch_config(BaseController::default(), request) .await?; println!("UDP whitelist cleared"); Ok(()) @@ -1626,11 +1641,15 @@ async fn main() -> Result<(), Error> { SubCommand::MappedListener(mapped_listener_args) => { match mapped_listener_args.sub_command { Some(MappedListenerSubCommand::Add { url }) => { - handler.handle_mapped_listener_add(&url).await?; + handler + .handle_mapped_listener_modify(&url, ConfigPatchAction::Add) + .await?; println!("add mapped listener: {url}"); } Some(MappedListenerSubCommand::Remove { url }) => { - handler.handle_mapped_listener_remove(&url).await?; + handler + .handle_mapped_listener_modify(&url, ConfigPatchAction::Remove) + .await?; println!("remove mapped listener: {url}"); } Some(MappedListenerSubCommand::List) | None => { @@ -1993,7 +2012,12 @@ async fn main() -> Result<(), Error> { dst_addr, }) => { handler - .handle_port_forward_add(protocol, bind_addr, dst_addr) + .handle_port_forward_modify( + ConfigPatchAction::Add, + protocol, + bind_addr, + Some(dst_addr), + ) .await?; } Some(PortForwardSubCommand::Remove { @@ -2002,7 +2026,12 @@ async fn main() -> Result<(), Error> { dst_addr, }) => { handler - .handle_port_forward_remove(protocol, bind_addr, dst_addr.as_deref()) + .handle_port_forward_modify( + ConfigPatchAction::Remove, + protocol, + bind_addr, + dst_addr.as_deref(), + ) .await?; } Some(PortForwardSubCommand::List) | None => { diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index caa65fb..41bea23 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -34,15 +34,16 @@ use crate::peers::rpc_service::PeerManagerRpcService; use crate::peers::{create_packet_recv_chan, recv_packet_from_chan, PacketRecvChanReceiver}; use crate::proto::cli::VpnPortalRpc; use crate::proto::cli::{ - AddPortForwardRequest, AddPortForwardResponse, GetPrometheusStatsRequest, - GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse, ListMappedListenerRequest, - ListMappedListenerResponse, ListPortForwardRequest, ListPortForwardResponse, - ManageMappedListenerRequest, ManageMappedListenerResponse, MappedListener, - MappedListenerManageAction, MappedListenerManageRpc, MetricSnapshot, PortForwardManageRpc, - RemovePortForwardRequest, RemovePortForwardResponse, StatsRpc, + GetPrometheusStatsRequest, GetPrometheusStatsResponse, GetStatsRequest, GetStatsResponse, + ListMappedListenerRequest, ListMappedListenerResponse, ListPortForwardRequest, + ListPortForwardResponse, MappedListener, MappedListenerManageRpc, MetricSnapshot, + PortForwardManageRpc, StatsRpc, }; use crate::proto::cli::{GetVpnPortalInfoRequest, GetVpnPortalInfoResponse, VpnPortalInfo}; -use crate::proto::common::{PortForwardConfigPb, TunnelInfo}; +use crate::proto::common::{PortForwardConfigPb, TunnelInfo, Void}; +use crate::proto::config::{ + ConfigPatchAction, ConfigRpc, ConfigRpcServer, PatchConfigRequest, PortForwardPatch, +}; use crate::proto::peer_rpc::PeerCenterRpcServer; use crate::proto::rpc_impl::standalone::{RpcServerHook, StandAloneServer}; use crate::proto::rpc_types; @@ -226,6 +227,234 @@ impl RpcServerHook for InstanceRpcServerHook { } } +#[derive(Clone)] +pub struct InstanceConfigPatcher { + global_ctx: ArcGlobalCtx, + socks5_server: Weak, + peer_manager: Arc, +} + +impl InstanceConfigPatcher { + pub async fn apply_patch( + &self, + patch: crate::proto::config::InstanceConfigPatch, + ) -> Result<(), anyhow::Error> { + let patch_for_event = patch.clone(); + + self.patch_port_forwards(patch.port_forwards).await?; + self.patch_acl(patch.acl).await?; + self.patch_proxy_networks(patch.proxy_networks).await?; + self.patch_routes(patch.routes).await?; + self.patch_exit_nodes(patch.exit_nodes).await?; + self.patch_mapped_listeners(patch.mapped_listeners).await?; + if let Some(hostname) = patch.hostname { + self.global_ctx.set_hostname(hostname.clone()); + self.global_ctx.config.set_hostname(Some(hostname)); + } + if let Some(ipv4) = patch.ipv4 { + if !self.global_ctx.config.get_dhcp() { + self.global_ctx.set_ipv4(Some(ipv4.into())); + self.global_ctx.config.set_ipv4(Some(ipv4.into())); + } + } + if let Some(ipv6) = patch.ipv6 { + self.global_ctx.set_ipv6(Some(ipv6.into())); + self.global_ctx.config.set_ipv6(Some(ipv6.into())); + } + self.global_ctx + .issue_event(GlobalCtxEvent::ConfigPatched(patch_for_event)); + + Ok(()) + } + + fn trace_patchables(patches: &Vec>) { + for patch in patches { + match patch.action { + Some(ConfigPatchAction::Add) | Some(ConfigPatchAction::Remove) => { + if let Some(value) = &patch.value { + tracing::info!("{:?} {:?}", patch.action, value); + } else { + tracing::warn!( + "Ignored {:?} patch with no value for type '{}'. Please ensure the patch value is provided.", + patch.action, + std::any::type_name::() + ); + } + } + Some(ConfigPatchAction::Clear) => { + tracing::info!("Clear all for type '{}'", std::any::type_name::()); + } + None => { + tracing::warn!( + "Invalid patch action for type '{}'", + std::any::type_name::() + ); + } + } + } + } + + async fn patch_port_forwards( + &self, + port_forwards: Vec, + ) -> Result<(), anyhow::Error> { + if port_forwards.is_empty() { + return Ok(()); + } + let Some(socks5_server) = self.socks5_server.upgrade() else { + return Err(anyhow::anyhow!("socks5 server not available")); + }; + + let mut current_forwards = self.global_ctx.config.get_port_forwards(); + let patches = port_forwards.into_iter().map(Into::into).collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_forwards, patches); + + self.global_ctx + .config + .set_port_forwards(current_forwards.clone()); + socks5_server + .reload_port_forwards(¤t_forwards) + .await + .with_context(|| "Failed to reload port forwards")?; + + Ok(()) + } + + async fn patch_acl( + &self, + acl_patch: Option, + ) -> Result<(), anyhow::Error> { + let Some(acl_patch) = acl_patch else { + return Ok(()); + }; + if let Some(acl) = acl_patch.acl { + self.global_ctx.config.set_acl(Some(acl)); + } + if !acl_patch.tcp_whitelist.is_empty() { + let mut current_whitelist = self.global_ctx.config.get_tcp_whitelist(); + let patches = acl_patch + .tcp_whitelist + .into_iter() + .map(Into::into) + .collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_whitelist, patches); + self.global_ctx.config.set_tcp_whitelist(current_whitelist); + } + if !acl_patch.udp_whitelist.is_empty() { + let mut current_whitelist = self.global_ctx.config.get_udp_whitelist(); + let patches = acl_patch + .udp_whitelist + .into_iter() + .map(Into::into) + .collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_whitelist, patches); + self.global_ctx.config.set_udp_whitelist(current_whitelist); + } + self.global_ctx + .get_acl_filter() + .reload_rules(AclRuleBuilder::build(&self.global_ctx)?.as_ref()); + Ok(()) + } + + async fn patch_proxy_networks( + &self, + proxy_networks: Vec, + ) -> Result<(), anyhow::Error> { + if proxy_networks.is_empty() { + return Ok(()); + } + for proxy_network_patch in proxy_networks { + let Some(cidr) = proxy_network_patch.cidr.map(|c| c.into()) else { + tracing::warn!("Proxy network cidr is None, skipping."); + continue; + }; + let mapped_cidr: Option = + proxy_network_patch.mapped_cidr.map(|s| s.into()); + match ConfigPatchAction::try_from(proxy_network_patch.action) { + Ok(ConfigPatchAction::Add) => { + tracing::info!("Proxy network added: {}", cidr); + self.global_ctx.config.add_proxy_cidr(cidr, mapped_cidr)?; + } + Ok(ConfigPatchAction::Remove) => { + tracing::info!("Proxy network removed: {}", cidr); + self.global_ctx.config.remove_proxy_cidr(cidr); + } + Ok(ConfigPatchAction::Clear) => { + tracing::info!("Proxy networks cleared."); + self.global_ctx.config.clear_proxy_cidrs(); + } + Err(_) => { + tracing::warn!( + "Invalid proxy network action: {}", + proxy_network_patch.action + ); + } + } + } + Ok(()) + } + + async fn patch_routes( + &self, + routes: Vec, + ) -> Result<(), anyhow::Error> { + if routes.is_empty() { + return Ok(()); + } + let mut current_routes = self.global_ctx.config.get_routes().unwrap_or_default(); + let patches = routes.into_iter().map(Into::into).collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_routes, patches); + if current_routes.is_empty() { + self.global_ctx.config.set_routes(None); + } else { + self.global_ctx.config.set_routes(Some(current_routes)); + } + Ok(()) + } + + async fn patch_exit_nodes( + &self, + exit_nodes: Vec, + ) -> Result<(), anyhow::Error> { + if exit_nodes.is_empty() { + return Ok(()); + } + let mut current_exit_nodes = self.global_ctx.config.get_exit_nodes(); + let patches = exit_nodes.into_iter().map(Into::into).collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_exit_nodes, patches); + self.global_ctx.config.set_exit_nodes(current_exit_nodes); + self.peer_manager.update_exit_nodes().await; + + Ok(()) + } + + async fn patch_mapped_listeners( + &self, + mapped_listeners: Vec, + ) -> Result<(), anyhow::Error> { + if mapped_listeners.is_empty() { + return Ok(()); + } + let mut current_mapped_listeners = self.global_ctx.config.get_mapped_listeners(); + let patches = mapped_listeners.into_iter().map(Into::into).collect(); + InstanceConfigPatcher::trace_patchables(&patches); + crate::proto::config::patch_vec(&mut current_mapped_listeners, patches); + if current_mapped_listeners.is_empty() { + self.global_ctx.config.set_mapped_listeners(None); + } else { + self.global_ctx + .config + .set_mapped_listeners(Some(current_mapped_listeners)); + } + Ok(()) + } +} + pub struct Instance { inst_name: String, @@ -827,25 +1056,6 @@ impl Instance { ret.mappedlisteners = mapped_listeners; Ok(ret) } - - async fn manage_mapped_listener( - &self, - _: BaseController, - req: ManageMappedListenerRequest, - ) -> Result { - let url: url::Url = req.url.ok_or(anyhow::anyhow!("url is empty"))?.into(); - - let urls = self.0.config.get_mapped_listeners(); - let mut set_urls: HashSet = urls.into_iter().collect(); - if req.action == MappedListenerManageAction::MappedListenerRemove as i32 { - set_urls.remove(&url); - } else if req.action == MappedListenerManageAction::MappedListenerAdd as i32 { - set_urls.insert(url); - } - let urls: Vec = set_urls.into_iter().collect(); - self.0.config.set_mapped_listeners(Some(urls)); - Ok(ManageMappedListenerResponse::default()) - } } MappedListenerManagerRpcService(self.global_ctx.clone()) @@ -864,55 +1074,6 @@ impl Instance { impl PortForwardManageRpc for PortForwardManagerRpcService { type Controller = BaseController; - async fn add_port_forward( - &self, - _: BaseController, - request: AddPortForwardRequest, - ) -> Result { - let Some(socks5_server) = self.socks5_server.upgrade() else { - return Err(anyhow::anyhow!("socks5 server not available").into()); - }; - if let Some(cfg) = request.cfg { - tracing::info!("Port forward rule added: {:?}", cfg); - let mut current_forwards = self.global_ctx.config.get_port_forwards(); - current_forwards.push(cfg.into()); - self.global_ctx - .config - .set_port_forwards(current_forwards.clone()); - socks5_server - .reload_port_forwards(¤t_forwards) - .await - .with_context(|| "Failed to reload port forwards")?; - } - Ok(AddPortForwardResponse {}) - } - - async fn remove_port_forward( - &self, - _: BaseController, - request: RemovePortForwardRequest, - ) -> Result { - let Some(socks5_server) = self.socks5_server.upgrade() else { - return Err(anyhow::anyhow!("socks5 server not available").into()); - }; - let Some(cfg) = request.cfg else { - return Err(anyhow::anyhow!("port forward config is empty").into()); - }; - let cfg = cfg.into(); - let mut current_forwards = self.global_ctx.config.get_port_forwards(); - current_forwards.retain(|e| *e != cfg); - self.global_ctx - .config - .set_port_forwards(current_forwards.clone()); - socks5_server - .reload_port_forwards(¤t_forwards) - .await - .with_context(|| "Failed to reload port forwards")?; - - tracing::info!("Port forward rule removed: {:?}", cfg); - Ok(RemovePortForwardResponse {}) - } - async fn list_port_forward( &self, _: BaseController, @@ -984,6 +1145,43 @@ impl Instance { } } + pub fn get_config_patcher(&self) -> InstanceConfigPatcher { + InstanceConfigPatcher { + global_ctx: self.global_ctx.clone(), + socks5_server: Arc::downgrade(&self.socks5_server), + peer_manager: self.peer_manager.clone(), + } + } + + fn get_config_service(&self) -> impl ConfigRpc + Clone { + #[derive(Clone)] + pub struct ConfigRpcService { + patcher: InstanceConfigPatcher, + } + + #[async_trait::async_trait] + impl ConfigRpc for ConfigRpcService { + type Controller = BaseController; + + async fn patch_config( + &self, + _: Self::Controller, + request: PatchConfigRequest, + ) -> crate::proto::rpc_types::error::Result { + let Some(patch) = request.patch else { + return Ok(Void::default()); + }; + + self.patcher.apply_patch(patch).await?; + Ok(Void::default()) + } + } + + ConfigRpcService { + patcher: self.get_config_patcher(), + } + } + async fn run_rpc_server(&mut self) -> Result<(), Error> { let Some(_) = self.global_ctx.config.get_rpc_portal() else { tracing::info!("rpc server not enabled, because rpc_portal is not set."); @@ -1001,6 +1199,7 @@ impl Instance { let port_forward_manager_rpc = self.get_port_forward_manager_rpc_service(); let stats_rpc_service = self.get_stats_rpc_service(); let logger_rpc_service = LoggerRpcService::new(); + let config_rpc_service = self.get_config_service(); let s = self.rpc_server.as_mut().unwrap(); let peer_mgr_rpc_service = PeerManagerRpcService::new(peer_mgr.clone()); @@ -1031,6 +1230,8 @@ impl Instance { ); s.registry() .register(LoggerRpcServer::new(logger_rpc_service), ""); + s.registry() + .register(ConfigRpcServer::new(config_rpc_service), ""); if let Some(ip_proxy) = self.ip_proxy.as_ref() { s.registry().register( diff --git a/easytier/src/instance_manager.rs b/easytier/src/instance_manager.rs index dca8013..7b2a640 100644 --- a/easytier/src/instance_manager.rs +++ b/easytier/src/instance_manager.rs @@ -319,6 +319,10 @@ fn handle_event( ), ); } + + GlobalCtxEvent::ConfigPatched(patch) => { + print_event(instance_id, format!("config patched. patch: {:?}", patch)); + } } } else { events = events.resubscribe(); diff --git a/easytier/src/peers/peer_manager.rs b/easytier/src/peers/peer_manager.rs index 3c240ff..4594789 100644 --- a/easytier/src/peers/peer_manager.rs +++ b/easytier/src/peers/peer_manager.rs @@ -152,7 +152,7 @@ pub struct PeerManager { encryptor: Arc, data_compress_algo: CompressorAlgo, - exit_nodes: Vec, + exit_nodes: RwLock>, reserved_my_peer_id_map: DashMap, @@ -304,7 +304,7 @@ impl PeerManager { encryptor, data_compress_algo, - exit_nodes, + exit_nodes: RwLock::new(exit_nodes), reserved_my_peer_id_map: DashMap::new(), @@ -1068,7 +1068,7 @@ impl PeerManager { .global_ctx .is_ip_in_same_network(&std::net::IpAddr::V4(*ipv4_addr)) { - for exit_node in &self.exit_nodes { + for exit_node in self.exit_nodes.read().await.iter() { let IpAddr::V4(exit_node) = exit_node else { continue; }; @@ -1109,7 +1109,7 @@ impl PeerManager { dst_peers.push(peer_id); } else if !ipv6_addr.is_unicast_link_local() { // NOTE: never route link local address to exit node. - for exit_node in &self.exit_nodes { + for exit_node in self.exit_nodes.read().await.iter() { let IpAddr::V6(exit_node) = exit_node else { continue; }; @@ -1419,6 +1419,11 @@ impl PeerManager { true } + + pub async fn update_exit_nodes(&self) { + let exit_nodes = self.global_ctx.config.get_exit_nodes(); + *self.exit_nodes.write().await = exit_nodes; + } } #[cfg(test)] diff --git a/easytier/src/peers/rpc_service.rs b/easytier/src/peers/rpc_service.rs index c8fc21c..007c13f 100644 --- a/easytier/src/peers/rpc_service.rs +++ b/easytier/src/peers/rpc_service.rs @@ -1,18 +1,14 @@ use std::sync::Arc; -use crate::{ - common::acl_processor::AclRuleBuilder, - proto::{ - cli::{ - AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest, - GetAclStatsResponse, GetWhitelistRequest, GetWhitelistResponse, - ListForeignNetworkRequest, ListForeignNetworkResponse, ListGlobalForeignNetworkRequest, - ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest, - ListRouteResponse, PeerInfo, PeerManageRpc, SetWhitelistRequest, SetWhitelistResponse, - ShowNodeInfoRequest, ShowNodeInfoResponse, - }, - rpc_types::{self, controller::BaseController}, +use crate::proto::{ + cli::{ + AclManageRpc, DumpRouteRequest, DumpRouteResponse, GetAclStatsRequest, GetAclStatsResponse, + GetWhitelistRequest, GetWhitelistResponse, ListForeignNetworkRequest, + ListForeignNetworkResponse, ListGlobalForeignNetworkRequest, + ListGlobalForeignNetworkResponse, ListPeerRequest, ListPeerResponse, ListRouteRequest, + ListRouteResponse, PeerInfo, PeerManageRpc, ShowNodeInfoRequest, ShowNodeInfoResponse, }, + rpc_types::{self, controller::BaseController}, }; use super::peer_manager::PeerManager; @@ -163,28 +159,6 @@ impl AclManageRpc for PeerManagerRpcService { }) } - async fn set_whitelist( - &self, - _: BaseController, - request: SetWhitelistRequest, - ) -> Result { - tracing::info!( - "Setting whitelist - TCP: {:?}, UDP: {:?}", - request.tcp_ports, - request.udp_ports - ); - - let global_ctx = self.peer_manager.get_global_ctx(); - - global_ctx.config.set_tcp_whitelist(request.tcp_ports); - global_ctx.config.set_udp_whitelist(request.udp_ports); - global_ctx - .get_acl_filter() - .reload_rules(AclRuleBuilder::build(&global_ctx)?.as_ref()); - - Ok(SetWhitelistResponse {}) - } - async fn get_whitelist( &self, _: BaseController, diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index b76b2f6..df3313d 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -181,21 +181,8 @@ message ListMappedListenerRequest {} message ListMappedListenerResponse { repeated MappedListener mappedlisteners = 1; } -enum MappedListenerManageAction { - MAPPED_LISTENER_ADD = 0; - MAPPED_LISTENER_REMOVE = 1; -} - -message ManageMappedListenerRequest { - MappedListenerManageAction action = 1; - common.Url url = 2; -} - -message ManageMappedListenerResponse {} - service MappedListenerManageRpc { rpc ListMappedListener(ListMappedListenerRequest) returns (ListMappedListenerResponse); - rpc ManageMappedListener(ManageMappedListenerRequest) returns (ManageMappedListenerResponse); } message VpnPortalInfo { @@ -261,17 +248,9 @@ message GetAclStatsResponse { service AclManageRpc { rpc GetAclStats(GetAclStatsRequest) returns (GetAclStatsResponse); - rpc SetWhitelist(SetWhitelistRequest) returns (SetWhitelistResponse); rpc GetWhitelist(GetWhitelistRequest) returns (GetWhitelistResponse); } -message SetWhitelistRequest { - repeated string tcp_ports = 1; - repeated string udp_ports = 2; -} - -message SetWhitelistResponse {} - message GetWhitelistRequest {} message GetWhitelistResponse { @@ -279,18 +258,6 @@ message GetWhitelistResponse { repeated string udp_ports = 2; } -message AddPortForwardRequest { - common.PortForwardConfigPb cfg = 1; -} - -message AddPortForwardResponse {} - -message RemovePortForwardRequest { - common.PortForwardConfigPb cfg = 1; -} - -message RemovePortForwardResponse {} - message ListPortForwardRequest {} message ListPortForwardResponse { @@ -298,8 +265,6 @@ message ListPortForwardResponse { } service PortForwardManageRpc { - rpc AddPortForward(AddPortForwardRequest) returns (AddPortForwardResponse); - rpc RemovePortForward(RemovePortForwardRequest) returns (RemovePortForwardResponse); rpc ListPortForward(ListPortForwardRequest) returns (ListPortForwardResponse); } diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 41ce4ee..1ead7a2 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -49,10 +49,10 @@ message FlagsInConfig { // enable relay foreign network kcp packets bool enable_relay_foreign_network_kcp = 28; - - // encryption algorithm to use, empty string means default (aes-gcm) + + // encryption algorithm to use, empty string means default (aes-gcm) string encryption_algorithm = 29; - + // disable symmetric nat hole punching, treat symmetric as cone when enabled bool disable_sym_hole_punching = 30; } @@ -144,6 +144,13 @@ message Ipv6Addr { uint32 part4 = 4; } +message IpAddr { + oneof ip { + Ipv4Addr ipv4 = 1; + Ipv6Addr ipv6 = 2; + }; +} + message Ipv4Inet { Ipv4Addr address = 1; uint32 network_length = 2; diff --git a/easytier/src/proto/common.rs b/easytier/src/proto/common.rs index 7a2abaf..5a153c1 100644 --- a/easytier/src/proto/common.rs +++ b/easytier/src/proto/common.rs @@ -108,6 +108,43 @@ impl From for Ipv4Inet { } } +impl From for IpAddr { + fn from(value: std::net::IpAddr) -> Self { + match value { + std::net::IpAddr::V4(v4) => IpAddr { + ip: Some(ip_addr::Ip::Ipv4(Ipv4Addr::from(v4))), + }, + std::net::IpAddr::V6(v6) => IpAddr { + ip: Some(ip_addr::Ip::Ipv6(Ipv6Addr::from(v6))), + }, + } + } +} + +impl From for std::net::IpAddr { + fn from(value: IpAddr) -> Self { + match value.ip { + Some(ip_addr::Ip::Ipv4(v4)) => std::net::IpAddr::V4(v4.into()), + Some(ip_addr::Ip::Ipv6(v6)) => std::net::IpAddr::V6(v6.into()), + None => panic!("IpAddr is None"), + } + } +} + +impl Display for IpAddr { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", std::net::IpAddr::from(*self)) + } +} + +impl FromStr for IpAddr { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + Ok(IpAddr::from(std::net::IpAddr::from_str(s)?)) + } +} + impl From for cidr::Ipv4Inet { fn from(value: Ipv4Inet) -> Self { cidr::Ipv4Inet::new( @@ -118,6 +155,16 @@ impl From for cidr::Ipv4Inet { } } +impl From for cidr::Ipv4Cidr { + fn from(value: Ipv4Inet) -> Self { + cidr::Ipv4Cidr::new( + value.address.unwrap_or_default().into(), + value.network_length as u8, + ) + .unwrap() + } +} + impl fmt::Display for Ipv4Inet { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{}", cidr::Ipv4Inet::from(*self)) diff --git a/easytier/src/proto/config.proto b/easytier/src/proto/config.proto new file mode 100644 index 0000000..4aa7ae2 --- /dev/null +++ b/easytier/src/proto/config.proto @@ -0,0 +1,65 @@ +syntax = "proto3"; + +import "common.proto"; +import "acl.proto"; + +package config; + +enum ConfigPatchAction { + ADD = 0; + REMOVE = 1; + CLEAR = 2; +} + +message InstanceConfigPatch { + optional string hostname = 1; + optional common.Ipv4Inet ipv4 = 2; + optional common.Ipv6Inet ipv6 = 3; + repeated PortForwardPatch port_forwards = 4; + optional AclPatch acl = 5; + repeated ProxyNetworkPatch proxy_networks = 6; + repeated RoutePatch routes = 7; + repeated ExitNodePatch exit_nodes = 8; + repeated UrlPatch mapped_listeners = 9; +} + +message PortForwardPatch { + ConfigPatchAction action = 1; + common.PortForwardConfigPb cfg = 2; +} + +message StringPatch { + ConfigPatchAction action = 1; + string value = 2; +} + +message UrlPatch { + ConfigPatchAction action = 1; + common.Url url = 2; +} + +message AclPatch { + optional acl.Acl acl = 1; + repeated StringPatch tcp_whitelist = 2; + repeated StringPatch udp_whitelist = 3; +} + +message ProxyNetworkPatch { + ConfigPatchAction action = 1; + common.Ipv4Inet cidr = 2; + optional common.Ipv4Inet mapped_cidr = 3; +} + +message RoutePatch { + ConfigPatchAction action = 1; + common.Ipv4Inet cidr = 2; +} + +message ExitNodePatch { + ConfigPatchAction action = 1; + common.IpAddr node = 2; +} + +message PatchConfigRequest { InstanceConfigPatch patch = 1; } + +service ConfigRpc { rpc PatchConfig(PatchConfigRequest) returns (common.Void); } diff --git a/easytier/src/proto/config.rs b/easytier/src/proto/config.rs new file mode 100644 index 0000000..1552636 --- /dev/null +++ b/easytier/src/proto/config.rs @@ -0,0 +1,75 @@ +include!(concat!(env!("OUT_DIR"), "/config.rs")); + +pub struct Patchable { + pub action: Option, + pub value: Option, +} + +impl From for Patchable { + fn from(patch: PortForwardPatch) -> Self { + Patchable { + action: ConfigPatchAction::try_from(patch.action).ok(), + value: patch.cfg.map(Into::into), + } + } +} + +impl From for Patchable { + fn from(value: RoutePatch) -> Self { + Patchable { + action: ConfigPatchAction::try_from(value.action).ok(), + value: value.cidr.map(Into::into), + } + } +} + +impl From for Patchable { + fn from(value: ExitNodePatch) -> Self { + Patchable { + action: ConfigPatchAction::try_from(value.action).ok(), + value: value.node.map(Into::into), + } + } +} + +impl From for Patchable { + fn from(value: StringPatch) -> Self { + Patchable { + action: ConfigPatchAction::try_from(value.action).ok(), + value: Some(value.value), + } + } +} + +impl From for Patchable { + fn from(value: UrlPatch) -> Self { + Patchable { + action: ConfigPatchAction::try_from(value.action).ok(), + value: value.url.map(Into::into), + } + } +} + +pub fn patch_vec(v: &mut Vec, patches: Vec>) +where + T: PartialEq, +{ + for patch in patches { + match patch.action { + Some(ConfigPatchAction::Add) => { + if let Some(value) = patch.value { + v.push(value); + } + } + Some(ConfigPatchAction::Remove) => { + if let Some(value) = patch.value { + v.retain(|x| x != &value); + } + } + Some(ConfigPatchAction::Clear) => { + v.clear(); + } + None => {} + } + } +} diff --git a/easytier/src/proto/mod.rs b/easytier/src/proto/mod.rs index fd8b455..b99fbe7 100644 --- a/easytier/src/proto/mod.rs +++ b/easytier/src/proto/mod.rs @@ -4,6 +4,7 @@ pub mod rpc_types; pub mod acl; pub mod cli; pub mod common; +pub mod config; pub mod error; pub mod magic_dns; pub mod peer_rpc; diff --git a/easytier/src/tests/mod.rs b/easytier/src/tests/mod.rs index ee79b54..1876314 100644 --- a/easytier/src/tests/mod.rs +++ b/easytier/src/tests/mod.rs @@ -147,6 +147,21 @@ fn check_route(ipv4: &str, dst_peer_id: PeerId, routes: Vec, + peer_id: PeerId, + checker: impl Fn(&crate::proto::cli::Route) -> bool, +) { + let mut found = false; + for r in routes.iter() { + if r.peer_id == peer_id { + found = true; + assert!(checker(r), "{:?}", routes); + } + } + assert!(found, "routes: {:?}, dst_peer_id: {}", routes, peer_id); +} + async fn wait_proxy_route_appear( mgr: &std::sync::Arc, ipv4: &str, diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index da3af46..f553acb 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -2094,3 +2094,96 @@ pub async fn acl_group_based_test( drop_insts(insts).await; } + +#[rstest::rstest] +#[tokio::test] +#[serial_test::serial] +pub async fn config_patch_test() { + use crate::proto::{ + common::{PortForwardConfigPb, SocketType}, + config::{ConfigPatchAction, InstanceConfigPatch, PortForwardPatch, ProxyNetworkPatch}, + }; + use crate::tunnel::common::tests::_tunnel_pingpong_netns_with_timeout; + + let insts = init_three_node("udp").await; + + check_route( + "10.144.144.2/24", + insts[1].peer_id(), + insts[0].get_peer_manager().list_routes().await, + ); + + check_route( + "10.144.144.3/24", + insts[2].peer_id(), + insts[0].get_peer_manager().list_routes().await, + ); + + // 测试1: 修改hostname、ip、子网代理 + let patch = InstanceConfigPatch { + hostname: Some("new_inst1".to_string()), + ipv4: Some("10.144.144.22/24".parse().unwrap()), + proxy_networks: vec![ProxyNetworkPatch { + action: ConfigPatchAction::Add as i32, + cidr: Some("10.144.145.0/24".parse().unwrap()), + mapped_cidr: None, + }], + ..Default::default() + }; + insts[1] + .get_config_patcher() + .apply_patch(patch) + .await + .unwrap(); + assert_eq!(insts[1].get_global_ctx().get_hostname(), "new_inst1"); + assert_eq!( + insts[1].get_global_ctx().get_ipv4().unwrap(), + "10.144.144.22/24".parse().unwrap() + ); + tokio::time::sleep(Duration::from_secs(1)).await; + check_route_ex( + insts[0].get_peer_manager().list_routes().await, + insts[1].peer_id(), + |r| { + assert_eq!(r.hostname, "new_inst1"); + assert_eq!(r.ipv4_addr, Some("10.144.144.22/24".parse().unwrap())); + assert_eq!(r.proxy_cidrs[0], "10.144.145.0/24"); + true + }, + ); + + // 测试2: 端口转发 + let patch = InstanceConfigPatch { + port_forwards: vec![PortForwardPatch { + action: ConfigPatchAction::Add as i32, + cfg: Some(PortForwardConfigPb { + bind_addr: Some("0.0.0.0:23458".parse::().unwrap().into()), + dst_addr: Some("10.144.144.3:23457".parse::().unwrap().into()), + socket_type: SocketType::Tcp as i32, + }), + }], + ..Default::default() + }; + insts[0] + .get_config_patcher() + .apply_patch(patch) + .await + .unwrap(); + + let mut buf = vec![0; 32]; + rand::thread_rng().fill(&mut buf[..]); + let tcp_listener = TcpTunnelListener::new("tcp://0.0.0.0:23457".parse().unwrap()); + let tcp_connector = TcpTunnelConnector::new("tcp://127.0.0.1:23458".parse().unwrap()); + let result = _tunnel_pingpong_netns_with_timeout( + tcp_listener, + tcp_connector, + NetNS::new(Some("net_c".into())), + NetNS::new(Some("net_a".into())), + buf.clone(), + std::time::Duration::from_millis(30000), + ) + .await; + assert!(result.is_ok(), "Port forward pingpong should succeed"); + + drop_insts(insts).await; +}