diff --git a/easytier-web/frontend-lib/src/modules/api.ts b/easytier-web/frontend-lib/src/modules/api.ts index 26ff64c..699a79a 100644 --- a/easytier-web/frontend-lib/src/modules/api.ts +++ b/easytier-web/frontend-lib/src/modules/api.ts @@ -1,6 +1,7 @@ import axios, { AxiosError, AxiosInstance, AxiosResponse, InternalAxiosRequestConfig } from 'axios'; import { Md5 } from 'ts-md5' import { UUID } from './utils'; +import { NetworkConfig } from '../types/network'; export interface ValidateConfigResponse { toml_config: string; @@ -37,6 +38,15 @@ export interface ListNetworkInstanceIdResponse { disabled_inst_ids: Array, } +export interface GenerateConfigRequest { + config: NetworkConfig; +} + +export interface GenerateConfigResponse { + toml_config?: string; + error?: string; +} + export class ApiClient { private client: AxiosInstance; private authFailedCb: Function | undefined; @@ -193,6 +203,18 @@ export class ApiClient { public captcha_url() { return this.client.defaults.baseURL + '/auth/captcha'; } + + public async generate_config(config: GenerateConfigRequest): Promise { + try { + const response = await this.client.post('/generate-config', config); + return response; + } catch (error) { + if (error instanceof AxiosError) { + return { error: error.response?.data }; + } + return { error: 'Unknown error: ' + error }; + } + } } export default ApiClient; \ No newline at end of file diff --git a/easytier-web/frontend/src/components/ConfigGenerator.vue b/easytier-web/frontend/src/components/ConfigGenerator.vue new file mode 100644 index 0000000..3c870f7 --- /dev/null +++ b/easytier-web/frontend/src/components/ConfigGenerator.vue @@ -0,0 +1,39 @@ + + + diff --git a/easytier-web/frontend/src/main.ts b/easytier-web/frontend/src/main.ts index 1512c48..d25d176 100644 --- a/easytier-web/frontend/src/main.ts +++ b/easytier-web/frontend/src/main.ts @@ -15,6 +15,7 @@ import DeviceManagement from './components/DeviceManagement.vue' import Dashboard from './components/Dashboard.vue' import DialogService from 'primevue/dialogservice'; import ToastService from 'primevue/toastservice'; +import ConfigGenerator from './components/ConfigGenerator.vue' const routes = [ { @@ -66,6 +67,10 @@ const routes = [ } } }, + { + path: '/config_generator', + component: ConfigGenerator, + } ] const router = createRouter({ diff --git a/easytier-web/src/restful/mod.rs b/easytier-web/src/restful/mod.rs index 9d780c7..3169d0f 100644 --- a/easytier-web/src/restful/mod.rs +++ b/easytier-web/src/restful/mod.rs @@ -6,11 +6,14 @@ mod users; use std::{net::SocketAddr, sync::Arc}; use axum::http::StatusCode; +use axum::routing::post; use axum::{extract::State, routing::get, Json, Router}; use axum_login::tower_sessions::{ExpiredDeletion, SessionManagerLayer}; use axum_login::{login_required, AuthManagerLayerBuilder, AuthzBackend}; use axum_messages::MessagesManagerLayer; +use easytier::common::config::ConfigLoader; use easytier::common::scoped_task::ScopedTask; +use easytier::launcher::NetworkConfig; use easytier::proto::rpc_types; use network::NetworkApi; use sea_orm::DbErr; @@ -48,6 +51,17 @@ struct GetSummaryJsonResp { device_count: u32, } +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct GenerateConfigRequest { + config: NetworkConfig, +} + +#[derive(Debug, serde::Deserialize, serde::Serialize)] +struct GenerateConfigResponse { + error: Option, + toml_config: Option, +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct Error { message: String, @@ -131,6 +145,24 @@ impl RestfulServer { .into()) } + async fn handle_generate_config( + Json(req): Json, + ) -> Result, HttpHandleError> { + let config = req.config.gen_config(); + match config { + Ok(c) => Ok(GenerateConfigResponse { + error: None, + toml_config: Some(c.dump()), + } + .into()), + Err(e) => Ok(GenerateConfigResponse { + error: Some(format!("{:?}", e)), + toml_config: None, + } + .into()), + } + } + pub async fn start(&mut self) -> Result<(), anyhow::Error> { let listener = TcpListener::bind(self.bind_addr).await?; @@ -178,6 +210,10 @@ impl RestfulServer { .route_layer(login_required!(Backend)) .merge(auth::router()) .with_state(self.client_mgr.clone()) + .route( + "/api/v1/generate-config", + post(Self::handle_generate_config), + ) .layer(MessagesManagerLayer) .layer(auth_layer) .layer(tower_http::cors::CorsLayer::very_permissive()) diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 063181b..761cc2b 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -20,7 +20,8 @@ use easytier::{ DumpRouteRequest, GetVpnPortalInfoRequest, ListConnectorRequest, ListForeignNetworkRequest, ListGlobalForeignNetworkRequest, ListPeerRequest, ListPeerResponse, ListRouteRequest, ListRouteResponse, NodeInfo, PeerManageRpc, - PeerManageRpcClientFactory, ShowNodeInfoRequest, VpnPortalRpc, + PeerManageRpcClientFactory, ShowNodeInfoRequest, TcpProxyEntryState, + TcpProxyEntryTransportType, TcpProxyRpc, TcpProxyRpcClientFactory, VpnPortalRpc, VpnPortalRpcClientFactory, }, common::NatType, @@ -58,6 +59,7 @@ enum SubCommand { VpnPortal, Node(NodeArgs), Service(ServiceArgs), + Proxy, } #[derive(Args, Debug)] @@ -221,6 +223,19 @@ impl CommandHandler { .with_context(|| "failed to get vpn portal client")?) } + async fn get_tcp_proxy_client( + &self, + transport_type: &str, + ) -> Result>, Error> { + Ok(self + .client + .lock() + .unwrap() + .scoped_client::>(transport_type.to_string()) + .await + .with_context(|| "failed to get vpn portal client")?) + } + async fn list_peers(&self) -> Result { let client = self.get_peer_manager_client().await?; let request = ListPeerRequest::default(); @@ -1088,6 +1103,55 @@ async fn main() -> Result<(), Error> { } } } + SubCommand::Proxy => { + let mut entries = vec![]; + let client = handler.get_tcp_proxy_client("tcp").await?; + let ret = client + .list_tcp_proxy_entry(BaseController::default(), Default::default()) + .await; + entries.extend(ret.unwrap_or_default().entries); + + let client = handler.get_tcp_proxy_client("kcp_src").await?; + let ret = client + .list_tcp_proxy_entry(BaseController::default(), Default::default()) + .await; + entries.extend(ret.unwrap_or_default().entries); + + let client = handler.get_tcp_proxy_client("kcp_dst").await?; + let ret = client + .list_tcp_proxy_entry(BaseController::default(), Default::default()) + .await; + entries.extend(ret.unwrap_or_default().entries); + + #[derive(tabled::Tabled)] + struct TableItem { + src: String, + dst: String, + start_time: String, + state: String, + transport_type: String, + } + + let table_rows = entries + .iter() + .map(|e| TableItem { + src: SocketAddr::from(e.src.unwrap_or_default()).to_string(), + dst: SocketAddr::from(e.dst.unwrap_or_default()).to_string(), + start_time: chrono::DateTime::::from_timestamp_millis( + (e.start_time * 1000) as i64, + ) + .unwrap() + .to_string(), + state: format!("{:?}", TcpProxyEntryState::try_from(e.state).unwrap()), + transport_type: format!( + "{:?}", + TcpProxyEntryTransportType::try_from(e.transport_type).unwrap() + ), + }) + .collect::>(); + + println!("{}", tabled::Table::new(table_rows).with(Style::modern())); + } } Ok(()) diff --git a/easytier/src/gateway/icmp_proxy.rs b/easytier/src/gateway/icmp_proxy.rs index 86e12b8..55156e2 100644 --- a/easytier/src/gateway/icmp_proxy.rs +++ b/easytier/src/gateway/icmp_proxy.rs @@ -6,6 +6,7 @@ use std::{ time::Duration, }; +use anyhow::Context; use pnet::packet::{ icmp::{self, echo_reply::MutableEchoReplyPacket, IcmpCode, IcmpTypes, MutableIcmpPacket}, ip::IpNextHeaderProtocols, @@ -212,7 +213,7 @@ impl IcmpProxy { Err(e) => { tracing::warn!("create icmp socket failed: {:?}", e); if !self.global_ctx.no_tun() { - return Err(e); + return Err(anyhow::anyhow!("create icmp socket failed: {:?}", e).into()); } } } @@ -281,10 +282,15 @@ impl IcmpProxy { dst_ip: Ipv4Addr, icmp_packet: &icmp::echo_request::EchoRequestPacket, ) -> Result<(), Error> { - self.socket.lock().unwrap().as_ref().unwrap().send_to( - icmp_packet.packet(), - &SocketAddrV4::new(dst_ip.into(), 0).into(), - )?; + self.socket + .lock() + .unwrap() + .as_ref() + .with_context(|| "icmp socket not created")? + .send_to( + icmp_packet.packet(), + &SocketAddrV4::new(dst_ip.into(), 0).into(), + )?; Ok(()) } diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 2623044..6e3e7ad 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -1,13 +1,14 @@ use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, - sync::Arc, + sync::{Arc, Weak}, time::Duration, }; use anyhow::Context; use bytes::Bytes; +use dashmap::DashMap; use kcp_sys::{ - endpoint::{KcpEndpoint, KcpPacketReceiver}, + endpoint::{ConnId, KcpEndpoint, KcpPacketReceiver}, ffi_safe::KcpConfig, packet_def::KcpPacket, stream::KcpStream, @@ -31,7 +32,14 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtx}, }, peers::{peer_manager::PeerManager, NicPacketFilter, PeerPacketFilter}, - proto::peer_rpc::KcpConnData, + proto::{ + cli::{ + ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, + TcpProxyEntryTransportType, TcpProxyRpc, + }, + peer_rpc::KcpConnData, + rpc_types::{self, controller::BaseController}, + }, tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket}, }; @@ -106,8 +114,9 @@ pub struct NatDstKcpConnector { impl NatDstConnector for NatDstKcpConnector { type DstStream = KcpStream; - async fn connect(&self, nat_dst: SocketAddr) -> Result { + async fn connect(&self, src: SocketAddr, nat_dst: SocketAddr) -> Result { let conn_data = KcpConnData { + src: Some(src.into()), dst: Some(nat_dst.into()), }; @@ -153,9 +162,12 @@ impl NatDstConnector for NatDstKcpConnector { hdr: &PeerManagerHeader, _ipv4: &Ipv4Packet, ) -> bool { - // TODO: how to support net to net kcp proxy? return hdr.from_peer_id == hdr.to_peer_id; } + + fn transport_type(&self) -> TcpProxyEntryTransportType { + TcpProxyEntryTransportType::Kcp + } } #[derive(Clone)] @@ -191,15 +203,10 @@ impl NicPacketFilter for TcpProxyForKcpSrc { return true; } - let Some(my_ipv4) = self.0.get_global_ctx().get_ipv4() else { - return false; - }; - let data = zc_packet.payload(); let ip_packet = Ipv4Packet::new(data).unwrap(); if ip_packet.get_version() != 4 // TODO: how to support net to net kcp proxy? - || ip_packet.get_source() != my_ipv4.address() || ip_packet.get_next_level_protocol() != IpNextHeaderProtocols::Tcp || !self.check_dst_allow_kcp_input(&ip_packet.get_destination()).await { @@ -212,7 +219,7 @@ impl NicPacketFilter for TcpProxyForKcpSrc { && tcp_packet.get_flags() & TcpFlags::ACK == 0; if !is_syn && !self.0.is_tcp_proxy_connection(SocketAddr::new( - IpAddr::V4(my_ipv4.address()), + IpAddr::V4(ip_packet.get_source()), tcp_packet.get_source(), )) { @@ -272,11 +279,16 @@ impl KcpProxySrc { .await; self.tcp_proxy.0.start(false).await.unwrap(); } + + pub fn get_tcp_proxy(&self) -> Arc> { + self.tcp_proxy.0.clone() + } } pub struct KcpProxyDst { kcp_endpoint: Arc, peer_manager: Arc, + proxy_entries: Arc>, tasks: JoinSet<()>, } @@ -296,6 +308,7 @@ impl KcpProxyDst { Self { kcp_endpoint: Arc::new(kcp_endpoint), peer_manager, + proxy_entries: Arc::new(DashMap::new()), tasks, } } @@ -304,6 +317,7 @@ impl KcpProxyDst { async fn handle_one_in_stream( mut kcp_stream: KcpStream, global_ctx: ArcGlobalCtx, + proxy_entries: Arc>, ) -> Result<()> { let mut conn_data = kcp_stream.conn_data().clone(); let parsed_conn_data = KcpConnData::decode(&mut conn_data) @@ -316,6 +330,21 @@ impl KcpProxyDst { ))? .into(); + let conn_id = kcp_stream.conn_id(); + proxy_entries.insert( + conn_id, + TcpProxyEntry { + src: parsed_conn_data.src, + dst: parsed_conn_data.dst, + start_time: chrono::Local::now().timestamp() as u64, + state: TcpProxyEntryState::ConnectingDst.into(), + transport_type: TcpProxyEntryTransportType::Kcp.into(), + }, + ); + crate::defer! { + proxy_entries.remove(&conn_id); + } + if Some(dst_socket.ip()) == global_ctx.get_ipv4().map(|ip| IpAddr::V4(ip.address())) { dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); } @@ -324,7 +353,13 @@ impl KcpProxyDst { let _g = global_ctx.net_ns.guard(); let connector = NatDstTcpConnector {}; - let mut ret = connector.connect(dst_socket).await?; + let mut ret = connector + .connect("0.0.0.0:0".parse().unwrap(), dst_socket) + .await?; + + if let Some(mut e) = proxy_entries.get_mut(&kcp_stream.conn_id()) { + e.state = TcpProxyEntryState::Connected.into(); + } copy_bidirectional(&mut ret, &mut kcp_stream).await?; Ok(()) @@ -333,6 +368,7 @@ impl KcpProxyDst { async fn run_accept_task(&mut self) { let kcp_endpoint = self.kcp_endpoint.clone(); let global_ctx = self.peer_manager.get_global_ctx().clone(); + let proxy_entries = self.proxy_entries.clone(); self.tasks.spawn(async move { while let Ok(conn) = kcp_endpoint.accept().await { let stream = KcpStream::new(&kcp_endpoint, conn) @@ -340,8 +376,9 @@ impl KcpProxyDst { .unwrap(); let global_ctx = global_ctx.clone(); + let proxy_entries = proxy_entries.clone(); tokio::spawn(async move { - let _ = Self::handle_one_in_stream(stream, global_ctx).await; + let _ = Self::handle_one_in_stream(stream, global_ctx, proxy_entries).await; }); } }); @@ -357,3 +394,30 @@ impl KcpProxyDst { .await; } } + +#[derive(Clone)] +pub struct KcpProxyDstRpcService(Weak>); + +impl KcpProxyDstRpcService { + pub fn new(kcp_proxy_dst: &KcpProxyDst) -> Self { + Self(Arc::downgrade(&kcp_proxy_dst.proxy_entries)) + } +} + +#[async_trait::async_trait] +impl TcpProxyRpc for KcpProxyDstRpcService { + type Controller = BaseController; + async fn list_tcp_proxy_entry( + &self, + _: BaseController, + _request: ListTcpProxyEntryRequest, // Accept request of type HelloRequest + ) -> std::result::Result { + let mut reply = ListTcpProxyEntryResponse::default(); + if let Some(tcp_proxy) = self.0.upgrade() { + for item in tcp_proxy.iter() { + reply.entries.push(item.value().clone()); + } + } + Ok(reply) + } +} diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index 3f88afb..57828ae 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -10,7 +10,7 @@ use pnet::packet::MutablePacket; use pnet::packet::Packet; use std::net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4}; use std::sync::atomic::{AtomicBool, AtomicU16}; -use std::sync::Arc; +use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use tokio::io::{copy_bidirectional, AsyncRead, AsyncWrite}; use tokio::net::{TcpListener, TcpSocket, TcpStream}; @@ -24,6 +24,12 @@ use crate::common::join_joinset_background; use crate::peers::peer_manager::PeerManager; use crate::peers::{NicPacketFilter, PeerPacketFilter}; +use crate::proto::cli::{ + ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, + TcpProxyEntryTransportType, TcpProxyRpc, +}; +use crate::proto::rpc_types; +use crate::proto::rpc_types::controller::BaseController; use crate::tunnel::packet_def::{PacketType, PeerManagerHeader, ZCPacket}; use super::CidrSet; @@ -35,7 +41,7 @@ use super::tokio_smoltcp::{self, channel_device, Net, NetConfig}; pub(crate) trait NatDstConnector: Send + Sync + Clone + 'static { type DstStream: AsyncRead + AsyncWrite + Unpin + Send; - async fn connect(&self, dst: SocketAddr) -> Result; + async fn connect(&self, src: SocketAddr, dst: SocketAddr) -> Result; fn check_packet_from_peer_fast(&self, cidr_set: &CidrSet, global_ctx: &GlobalCtx) -> bool; fn check_packet_from_peer( &self, @@ -44,6 +50,7 @@ pub(crate) trait NatDstConnector: Send + Sync + Clone + 'static { hdr: &PeerManagerHeader, ipv4: &Ipv4Packet, ) -> bool; + fn transport_type(&self) -> TcpProxyEntryTransportType; } #[derive(Debug, Clone)] @@ -53,7 +60,7 @@ pub struct NatDstTcpConnector; impl NatDstConnector for NatDstTcpConnector { type DstStream = TcpStream; - async fn connect(&self, nat_dst: SocketAddr) -> Result { + async fn connect(&self, _src: SocketAddr, nat_dst: SocketAddr) -> Result { let socket = TcpSocket::new_v4().unwrap(); if let Err(e) = socket.set_nodelay(true) { tracing::warn!("set_nodelay failed, ignore it: {:?}", e); @@ -90,19 +97,13 @@ impl NatDstConnector for NatDstTcpConnector { true } + + fn transport_type(&self) -> TcpProxyEntryTransportType { + TcpProxyEntryTransportType::Tcp + } } -#[derive(Debug, Clone, Copy, PartialEq)] -enum NatDstEntryState { - // receive syn packet but not start connecting to dst - SynReceived, - // connecting to dst - ConnectingDst, - // connected to dst - Connected, - // connection closed - Closed, -} +type NatDstEntryState = TcpProxyEntryState; #[derive(Debug)] pub struct NatDstEntry { @@ -110,6 +111,7 @@ pub struct NatDstEntry { src: SocketAddr, dst: SocketAddr, start_time: Instant, + start_time_local: chrono::DateTime, tasks: Mutex>, state: AtomicCell, } @@ -121,10 +123,21 @@ impl NatDstEntry { src, dst, start_time: Instant::now(), + start_time_local: chrono::Local::now(), tasks: Mutex::new(JoinSet::new()), state: AtomicCell::new(NatDstEntryState::SynReceived), } } + + fn into_pb(&self, transport_type: TcpProxyEntryTransportType) -> TcpProxyEntry { + TcpProxyEntry { + src: Some(self.src.clone().into()), + dst: Some(self.dst.clone().into()), + start_time: self.start_time_local.timestamp() as u64, + state: self.state.load().into(), + transport_type: transport_type.into(), + } + } } enum ProxyTcpStream { @@ -644,7 +657,7 @@ impl TcpProxy { }; let _guard = global_ctx.net_ns.guard(); - let Ok(dst_tcp_stream) = connector.connect(nat_dst).await else { + let Ok(dst_tcp_stream) = connector.connect(nat_entry.src, nat_dst).await else { tracing::error!("connect to dst failed: {:?}", nat_entry); nat_entry.state.store(NatDstEntryState::Closed); Self::remove_entry_from_all_conn_map(conn_map, addr_conn_map, nat_entry); @@ -802,4 +815,45 @@ impl TcpProxy { pub fn is_tcp_proxy_connection(&self, src: SocketAddr) -> bool { self.syn_map.contains_key(&src) || self.addr_conn_map.contains_key(&src) } + + pub fn list_proxy_entries(&self) -> Vec { + let mut entries: Vec = Vec::new(); + let transport_type = self.connector.transport_type(); + for entry in self.syn_map.iter() { + entries.push(entry.value().as_ref().into_pb(transport_type)); + } + for entry in self.conn_map.iter() { + entries.push(entry.value().as_ref().into_pb(transport_type)); + } + entries + } +} + +#[derive(Clone)] +pub struct TcpProxyRpcService { + tcp_proxy: Weak>, +} + +#[async_trait::async_trait] +impl TcpProxyRpc for TcpProxyRpcService { + type Controller = BaseController; + async fn list_tcp_proxy_entry( + &self, + _: BaseController, + _request: ListTcpProxyEntryRequest, // Accept request of type HelloRequest + ) -> std::result::Result { + let mut reply = ListTcpProxyEntryResponse::default(); + if let Some(tcp_proxy) = self.tcp_proxy.upgrade() { + reply.entries = tcp_proxy.list_proxy_entries(); + } + Ok(reply) + } +} + +impl TcpProxyRpcService { + pub fn new(tcp_proxy: Arc>) -> Self { + Self { + tcp_proxy: Arc::downgrade(&tcp_proxy), + } + } } diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index c437403..7f1676b 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -17,8 +17,8 @@ use crate::connector::direct::DirectConnectorManager; use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManager}; use crate::connector::udp_hole_punch::UdpHolePunchConnector; use crate::gateway::icmp_proxy::IcmpProxy; -use crate::gateway::kcp_proxy::{KcpProxyDst, KcpProxySrc}; -use crate::gateway::tcp_proxy::{NatDstTcpConnector, TcpProxy}; +use crate::gateway::kcp_proxy::{KcpProxyDst, KcpProxyDstRpcService, KcpProxySrc}; +use crate::gateway::tcp_proxy::{NatDstTcpConnector, TcpProxy, TcpProxyRpcService}; use crate::gateway::udp_proxy::UdpProxy; use crate::peer_center::instance::PeerCenterInstance; use crate::peers::peer_conn::PeerConnId; @@ -380,8 +380,6 @@ impl Instance { self.check_dhcp_ip_conflict(); } - self.run_rpc_server().await?; - if self.global_ctx.get_flags().enable_kcp_proxy { let src_proxy = KcpProxySrc::new(self.get_peer_manager()).await; src_proxy.start().await; @@ -419,6 +417,8 @@ impl Instance { #[cfg(feature = "socks5")] self.socks5_server.run().await?; + self.run_rpc_server().await?; + Ok(()) } @@ -541,6 +541,26 @@ impl Instance { s.registry() .register(VpnPortalRpcServer::new(vpn_portal_rpc), ""); + if let Some(ip_proxy) = self.ip_proxy.as_ref() { + s.registry().register( + TcpProxyRpcServer::new(TcpProxyRpcService::new(ip_proxy.tcp_proxy.clone())), + "tcp", + ); + } + if let Some(kcp_proxy) = self.kcp_proxy_src.as_ref() { + s.registry().register( + TcpProxyRpcServer::new(TcpProxyRpcService::new(kcp_proxy.get_tcp_proxy())), + "kcp_src", + ); + } + + if let Some(kcp_proxy) = self.kcp_proxy_dst.as_ref() { + s.registry().register( + TcpProxyRpcServer::new(KcpProxyDstRpcService::new(kcp_proxy)), + "kcp_dst", + ); + } + let _g = self.global_ctx.net_ns.guard(); Ok(s.serve().await.with_context(|| "rpc server start failed")?) } diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 7f58fe4..b61bd06 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -256,7 +256,7 @@ impl EasyTierLauncher { fetch_node_info, )); if let Err(e) = ret { - error_msg.write().unwrap().replace(e.to_string()); + error_msg.write().unwrap().replace(format!("{:?}", e)); } instance_alive.store(false, std::sync::atomic::Ordering::Relaxed); notifier.notify_one(); diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 110c688..f53e102 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -177,3 +177,39 @@ service VpnPortalRpc { rpc GetVpnPortalInfo(GetVpnPortalInfoRequest) returns (GetVpnPortalInfoResponse); } + +enum TcpProxyEntryTransportType { + TCP = 0; + KCP = 1; +} + +enum TcpProxyEntryState { + Unknown = 0; + // receive syn packet but not start connecting to dst + SynReceived = 1; + // connecting to dst + ConnectingDst = 2; + // connected to dst + Connected = 3; + // connection closed + Closed = 4; +} + +message TcpProxyEntry { + common.SocketAddr src = 1; + common.SocketAddr dst = 2; + uint64 start_time = 3; + TcpProxyEntryState state = 4; + TcpProxyEntryTransportType transport_type = 5; +} + +message ListTcpProxyEntryRequest {} + +message ListTcpProxyEntryResponse { + repeated TcpProxyEntry entries = 1; +} + +service TcpProxyRpc { + rpc ListTcpProxyEntry(ListTcpProxyEntryRequest) + returns (ListTcpProxyEntryResponse); +} diff --git a/easytier/src/proto/common.rs b/easytier/src/proto/common.rs index d1b9d38..a40e471 100644 --- a/easytier/src/proto/common.rs +++ b/easytier/src/proto/common.rs @@ -101,7 +101,11 @@ impl From for Ipv4Inet { impl From for cidr::Ipv4Inet { fn from(value: Ipv4Inet) -> Self { - cidr::Ipv4Inet::new(value.address.unwrap().into(), value.network_length as u8).unwrap() + cidr::Ipv4Inet::new( + value.address.unwrap_or_default().into(), + value.network_length as u8, + ) + .unwrap() } } @@ -168,6 +172,9 @@ impl From for SocketAddr { impl From for std::net::SocketAddr { fn from(value: SocketAddr) -> Self { + if value.ip.is_none() { + return "0.0.0.0:0".parse().unwrap(); + } match value.ip.unwrap() { socket_addr::Ip::Ipv4(ip) => std::net::SocketAddr::V4(std::net::SocketAddrV4::new( std::net::Ipv4Addr::from(ip), diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 3ff6baa..7f3e2a2 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -207,5 +207,6 @@ message HandshakeRequest { } message KcpConnData { + common.SocketAddr src = 1; common.SocketAddr dst = 4; }