From 40b5fe9a540d5bb9ff552d41e1887119f45e08c8 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Sun, 15 Jun 2025 19:43:45 +0800 Subject: [PATCH] support quic proxy (#993) QUIC proxy works like kcp proxy, it can proxy TCP streams and transfer data with QUIC. QUIC has better congestion algorithm (BBR) for network with both high loss rate and high bandwidth. QUIC proxy can be enabled by passing `--enable-quic-proxy` to easytier in the client side. The proxy status can be viewed by `easytier-cli proxy`. --- .github/workflows/core.yml | 6 +- .github/workflows/test.yml | 3 +- Cargo.lock | 145 ++++-- .../frontend-lib/src/components/Config.vue | 43 +- easytier-web/frontend-lib/src/locales/cn.yaml | 6 + easytier-web/frontend-lib/src/locales/en.yaml | 6 + .../frontend-lib/src/types/network.ts | 4 + easytier/Cargo.toml | 6 +- easytier/locales/app.yml | 6 + easytier/src/common/config.rs | 4 +- easytier/src/common/global_ctx.rs | 11 + easytier/src/easytier-cli.rs | 26 +- easytier/src/easytier-core.rs | 20 + easytier/src/gateway/kcp_proxy.rs | 81 ++-- easytier/src/gateway/mod.rs | 2 + easytier/src/gateway/quic_proxy.rs | 443 ++++++++++++++++++ easytier/src/gateway/tcp_proxy.rs | 4 + easytier/src/instance/instance.rs | 35 ++ easytier/src/instance/listeners.rs | 2 + easytier/src/launcher.rs | 8 + easytier/src/peers/peer_map.rs | 11 +- easytier/src/peers/peer_ospf_route.rs | 9 +- easytier/src/peers/route_trait.rs | 10 +- easytier/src/proto/cli.proto | 1 + easytier/src/proto/common.proto | 9 + easytier/src/proto/peer_rpc.proto | 2 + easytier/src/proto/web.proto | 3 + easytier/src/tests/three_node.rs | 88 +++- easytier/src/tunnel/packet_def.rs | 18 + easytier/src/tunnel/quic.rs | 4 +- 30 files changed, 851 insertions(+), 165 deletions(-) create mode 100644 easytier/src/gateway/quic_proxy.rs diff --git a/.github/workflows/core.yml b/.github/workflows/core.yml index 3c7eaec..a10c5bb 100644 --- a/.github/workflows/core.yml +++ b/.github/workflows/core.yml @@ -175,14 +175,14 @@ jobs: fi if [[ $OS =~ ^ubuntu.*$ && $TARGET =~ ^mips.*$ ]]; then - cargo +nightly build -r --verbose --target $TARGET -Z build-std=std,panic_abort --no-default-features --features mips --package=easytier + cargo +nightly build -r --target $TARGET -Z build-std=std,panic_abort --package=easytier else if [[ $OS =~ ^windows.*$ ]]; then SUFFIX=.exe fi - cargo build --release --verbose --target $TARGET --package=easytier-web --features=embed + cargo build --release --target $TARGET --package=easytier-web --features=embed mv ./target/$TARGET/release/easytier-web"$SUFFIX" ./target/$TARGET/release/easytier-web-embed"$SUFFIX" - cargo build --release --verbose --target $TARGET + cargo build --release --target $TARGET fi # Copied and slightly modified from @lmq8267 (https://github.com/lmq8267) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 9faa67b..e935731 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -91,6 +91,7 @@ jobs: - name: Run tests run: | - sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose -- --test-threads=1 --nocapture + sudo prlimit --pid $$ --nofile=1048576:1048576 + sudo -E env "PATH=$PATH" cargo test --no-default-features --features=full --verbose -- --test-threads=1 sudo chown -R $USER:$USER ./target sudo chown -R $USER:$USER ~/.cargo diff --git a/Cargo.lock b/Cargo.lock index fe3eecb..e3ceba8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2383,6 +2383,18 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "fastbloom" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27cea6e7f512d43b098939ff4d5a5d6fe3db07971e1d05176fe26c642d33f5b8" +dependencies = [ + "getrandom 0.3.2", + "rand 0.9.1", + "siphasher 1.0.1", + "wide", +] + [[package]] name = "fastrand" version = "2.1.0" @@ -3923,20 +3935,6 @@ dependencies = [ "libc", ] -[[package]] -name = "jni" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" -dependencies = [ - "cesu8", - "combine", - "jni-sys", - "log", - "thiserror 1.0.63", - "walkdir", -] - [[package]] name = "jni" version = "0.21.1" @@ -4118,7 +4116,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets 0.48.5", + "windows-targets 0.52.6", ] [[package]] @@ -4254,6 +4252,12 @@ dependencies = [ "tracing-subscriber", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "mac" version = "0.1.1" @@ -4504,7 +4508,7 @@ dependencies = [ "openssl-probe", "openssl-sys", "schannel", - "security-framework", + "security-framework 2.11.1", "security-framework-sys", "tempfile", ] @@ -6043,38 +6047,45 @@ dependencies = [ [[package]] name = "quinn" -version = "0.11.3" +version = "0.11.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b22d8e7369034b9a7132bc2008cac12f2013c8132b45e0554e6e20e2617f2156" +checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8" dependencies = [ "bytes", + "cfg_aliases", "pin-project-lite", "quinn-proto", "quinn-udp", "rustc-hash", "rustls", "socket2", - "thiserror 1.0.63", + "thiserror 2.0.11", "tokio", "tracing", + "web-time", ] [[package]] name = "quinn-proto" -version = "0.11.6" +version = "0.11.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba92fb39ec7ad06ca2582c0ca834dfeadcaf06ddfc8e635c80aa7e1c05315fdd" +checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e" dependencies = [ "bytes", - "rand 0.8.5", + "fastbloom", + "getrandom 0.3.2", + "lru-slab", + "rand 0.9.1", "ring", "rustc-hash", "rustls", + "rustls-pki-types", "rustls-platform-verifier", "slab", - "thiserror 1.0.63", + "thiserror 2.0.11", "tinyvec", "tracing", + "web-time", ] [[package]] @@ -6694,9 +6705,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.12" +version = "0.23.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f8c84392efc0a126acce10fa59ff7b3d2ac06ab451a33f2741989b806b044" +checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" dependencies = [ "once_cell", "ring", @@ -6708,15 +6719,14 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.1" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a88d6d420651b496bdd98684116959239430022a115c1240e6c3993be0b15fba" +checksum = "7fcff2dd52b58a8d98a70243663a0d234c4e2b79235637849d15913394a247d3" dependencies = [ "openssl-probe", - "rustls-pemfile", "rustls-pki-types", "schannel", - "security-framework", + "security-framework 3.2.0", ] [[package]] @@ -6733,26 +6743,29 @@ name = "rustls-pki-types" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "917ce264624a4b4db1c364dcc35bfca9ded014d0a958cd47ad3e960e988ea51c" +dependencies = [ + "web-time", +] [[package]] name = "rustls-platform-verifier" -version = "0.3.3" +version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "93bda3f493b9abe5b93b3e7e3ecde0df292f2bd28c0296b90586ee0055ff5123" +checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" dependencies = [ - "core-foundation 0.9.4", + "core-foundation 0.10.0", "core-foundation-sys", - "jni 0.19.0", + "jni", "log", "once_cell", "rustls", "rustls-native-certs", "rustls-platform-verifier-android", "rustls-webpki", - "security-framework", + "security-framework 3.2.0", "security-framework-sys", - "webpki-roots", - "winapi", + "webpki-root-certs 0.26.11", + "windows-sys 0.59.0", ] [[package]] @@ -6763,9 +6776,9 @@ checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" [[package]] name = "rustls-webpki" -version = "0.102.6" +version = "0.103.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e6b52d4fda176fd835fdc55a835d4a89b8499cad995885a21149d5ad62f852e" +checksum = "e4a72fe2bcf7a6ac6fd7d0b9e5cb68aeb7d4c0a0271730218b3e92d43b4eb435" dependencies = [ "ring", "rustls-pki-types", @@ -7050,15 +7063,27 @@ dependencies = [ "core-foundation 0.9.4", "core-foundation-sys", "libc", - "num-bigint", + "security-framework-sys", +] + +[[package]] +name = "security-framework" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "271720403f46ca04f7ba6f55d438f8bd878d6b8ca0a1046e8228c4145bcbb316" +dependencies = [ + "bitflags 2.8.0", + "core-foundation 0.10.0", + "core-foundation-sys", + "libc", "security-framework-sys", ] [[package]] name = "security-framework-sys" -version = "2.11.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75da29fe9b9b08fe9d6b22b5b4bcbc75d8db3aa31e639aa56bb62e9d46bfceaf" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -7985,7 +8010,7 @@ dependencies = [ "gdkx11-sys", "gtk", "instant", - "jni 0.21.1", + "jni", "lazy_static", "libc", "log", @@ -8047,7 +8072,7 @@ dependencies = [ "heck 0.5.0", "http", "image 0.25.2", - "jni 0.21.1", + "jni", "libc", "log", "mime", @@ -8288,7 +8313,7 @@ dependencies = [ "dpi", "gtk", "http", - "jni 0.21.1", + "jni", "raw-window-handle", "serde", "serde_json", @@ -8306,7 +8331,7 @@ checksum = "62fa2068e8498ad007b54d5773d03d57c3ff6dd96f8c8ce58beff44d0d5e0d30" dependencies = [ "gtk", "http", - "jni 0.21.1", + "jni", "log", "objc2", "objc2-app-kit", @@ -9445,6 +9470,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webkit2gtk" version = "2.0.1" @@ -9499,6 +9534,24 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-root-certs" +version = "0.26.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75c7f0ef91146ebfb530314f5f1d24528d7f0767efbfd31dce919275413e393e" +dependencies = [ + "webpki-root-certs 1.0.0", +] + +[[package]] +name = "webpki-root-certs" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01a83f7e1a9f8712695c03eabe9ed3fbca0feff0152f33f12593e5a6303cb1a4" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "0.26.3" @@ -10099,7 +10152,7 @@ dependencies = [ "html5ever", "http", "javascriptcore-rs", - "jni 0.21.1", + "jni", "kuchikiki", "libc", "ndk", diff --git a/easytier-web/frontend-lib/src/components/Config.vue b/easytier-web/frontend-lib/src/components/Config.vue index e38fa3e..3805d3e 100644 --- a/easytier-web/frontend-lib/src/components/Config.vue +++ b/easytier-web/frontend-lib/src/components/Config.vue @@ -147,6 +147,8 @@ const bool_flags: BoolFlag[] = [ { field: 'use_smoltcp', help: 'use_smoltcp_help' }, { field: 'enable_kcp_proxy', help: 'enable_kcp_proxy_help' }, { field: 'disable_kcp_input', help: 'disable_kcp_input_help' }, + { field: 'enable_quic_proxy', help: 'enable_quic_proxy_help' }, + { field: 'disable_quic_input', help: 'disable_quic_input_help' }, { field: 'disable_p2p', help: 'disable_p2p_help' }, { field: 'bind_device', help: 'bind_device_help' }, { field: 'no_tun', help: 'no_tun_help' }, @@ -200,7 +202,7 @@ const bool_flags: BoolFlag[] = [
+ aria-describedby="network_secret-help" toggleMask :feedback="false" />
@@ -271,7 +273,7 @@ const bool_flags: BoolFlag[] = [
+ :placeholder="t('vpn_portal_client_network')" /> /{{ curNetwork.vpn_portal_client_network_len }} @@ -279,7 +281,7 @@ const bool_flags: BoolFlag[] = [
+ :min="0" :max="65535" fluid />
@@ -325,11 +327,10 @@ const bool_flags: BoolFlag[] = [
- +
- +
@@ -338,15 +339,15 @@ const bool_flags: BoolFlag[] = [
+ v-tooltip="t('relay_network_whitelist_help')">
- +
+ :placeholder="t('relay_network_whitelist')" class="w-full" multiple fluid + :suggestions="whitelistSuggestions" @complete="searchWhitelistSuggestions" />
@@ -359,12 +360,12 @@ const bool_flags: BoolFlag[] = [ + :on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
+ :placeholder="t('chips_placeholder', ['192.168.0.0/16'])" class="w-full" multiple fluid + :suggestions="inetSuggestions" @complete="searchInetSuggestions" />
@@ -377,11 +378,11 @@ const bool_flags: BoolFlag[] = [ + :on-label="t('off_text')" :off-label="t('on_text')" class="w-48" />
+ :format="false" :allow-empty="false" :min="0" :max="65535" class="w-full" />
@@ -394,8 +395,8 @@ const bool_flags: BoolFlag[] = [ + :placeholder="t('chips_placeholder', ['192.168.8.8'])" class="w-full" multiple fluid + :suggestions="exitNodesSuggestions" @complete="searchExitNodesSuggestions" /> @@ -406,8 +407,8 @@ const bool_flags: BoolFlag[] = [ + :placeholder="t('chips_placeholder', ['tcp://123.123.123.123:11223'])" class="w-full" multiple fluid + :suggestions="peerSuggestions" @complete="searchPeerSuggestions" /> diff --git a/easytier-web/frontend-lib/src/locales/cn.yaml b/easytier-web/frontend-lib/src/locales/cn.yaml index 9c3d241..492bea1 100644 --- a/easytier-web/frontend-lib/src/locales/cn.yaml +++ b/easytier-web/frontend-lib/src/locales/cn.yaml @@ -85,6 +85,12 @@ enable_kcp_proxy_help: 将 TCP 流量转为 KCP 流量,降低传输延迟, disable_kcp_input: 禁用 KCP 输入 disable_kcp_input_help: 禁用 KCP 入站流量,其他开启 KCP 代理的节点仍然使用 TCP 连接到本节点。 +enable_quic_proxy: 启用 QUIC 代理 +enable_quic_proxy_help: 将 TCP 流量转为 QUIC 流量,降低传输延迟,提升传输速度。 + +disable_quic_input: 禁用 QUIC 输入 +disable_quic_input_help: 禁用 QUIC 入站流量,其他开启 QUIC 代理的节点仍然使用 TCP 连接到本节点。 + disable_p2p: 禁用 P2P disable_p2p_help: 禁用 P2P 模式,所有流量通过手动指定的服务器中转。 diff --git a/easytier-web/frontend-lib/src/locales/en.yaml b/easytier-web/frontend-lib/src/locales/en.yaml index bf9629f..bfef6e5 100644 --- a/easytier-web/frontend-lib/src/locales/en.yaml +++ b/easytier-web/frontend-lib/src/locales/en.yaml @@ -84,6 +84,12 @@ enable_kcp_proxy_help: Convert TCP traffic to KCP traffic to reduce latency and disable_kcp_input: Disable KCP Input disable_kcp_input_help: Disable inbound KCP traffic, while nodes with KCP proxy enabled continue to connect using TCP. +enable_quic_proxy: Enable QUIC Proxy +enable_quic_proxy_help: Convert TCP traffic to QUIC traffic to reduce latency and boost transmission speed. + +disable_quic_input: Disable QUIC Input +disable_quic_input_help: Disable inbound QUIC traffic, while nodes with QUIC proxy enabled continue to connect using TCP. + disable_p2p: Disable P2P disable_p2p_help: Disable P2P mode; route all traffic through a manually specified relay server. diff --git a/easytier-web/frontend-lib/src/types/network.ts b/easytier-web/frontend-lib/src/types/network.ts index 421d61f..6487fc7 100644 --- a/easytier-web/frontend-lib/src/types/network.ts +++ b/easytier-web/frontend-lib/src/types/network.ts @@ -39,6 +39,8 @@ export interface NetworkConfig { use_smoltcp?: boolean enable_kcp_proxy?: boolean disable_kcp_input?: boolean + enable_quic_proxy?: boolean + disable_quic_input?: boolean disable_p2p?: boolean bind_device?: boolean no_tun?: boolean @@ -105,6 +107,8 @@ export function DEFAULT_NETWORK_CONFIG(): NetworkConfig { use_smoltcp: false, enable_kcp_proxy: false, disable_kcp_input: false, + enable_quic_proxy: false, + disable_quic_input: false, disable_p2p: false, bind_device: true, no_tun: false, diff --git a/easytier/Cargo.toml b/easytier/Cargo.toml index dddb50b..69d60ea 100644 --- a/easytier/Cargo.toml +++ b/easytier/Cargo.toml @@ -64,7 +64,7 @@ bytes = "1.5.0" pin-project-lite = "0.2.13" tachyonix = "0.3.0" -quinn = { version = "0.11.0", optional = true, features = ["ring"] } +quinn = { version = "0.11.8", optional = true, features = ["ring"] } rustls = { version = "0.23.0", features = [ "ring", ], default-features = false, optional = true } @@ -280,9 +280,8 @@ tokio-socks = "0.5.2" [features] -default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun", "socks5"] +default = ["wireguard", "mimalloc", "websocket", "smoltcp", "tun", "socks5", "quic"] full = [ - "quic", "websocket", "wireguard", "mimalloc", @@ -291,7 +290,6 @@ full = [ "tun", "socks5", ] -mips = ["aes-gcm", "mimalloc", "wireguard", "tun", "smoltcp", "socks5"] wireguard = ["dep:boringtun", "dep:ring"] quic = ["dep:quinn", "dep:rustls", "dep:rcgen"] mimalloc = ["dep:mimalloc"] diff --git a/easytier/locales/app.yml b/easytier/locales/app.yml index a3e985e..120ffc8 100644 --- a/easytier/locales/app.yml +++ b/easytier/locales/app.yml @@ -158,6 +158,12 @@ core_clap: disable_kcp_input: en: "do not allow other nodes to use kcp to proxy tcp streams to this node. when a node with kcp proxy enabled accesses this node, the original tcp connection is preserved." zh-CN: "不允许其他节点使用 KCP 代理 TCP 流到此节点。开启 KCP 代理的节点访问此节点时,依然使用原始 TCP 连接。" + enable_quic_proxy: + en: "proxy tcp streams with QUIC, improving the latency and throughput on the network with udp packet loss." + zh-CN: "使用 QUIC 代理 TCP 流,提高在 UDP 丢包网络上的延迟和吞吐量。" + disable_quic_input: + en: "do not allow other nodes to use QUIC to proxy tcp streams to this node. when a node with QUIC proxy enabled accesses this node, the original tcp connection is preserved." + zh-CN: "不允许其他节点使用 QUIC 代理 TCP 流到此节点。开启 QUIC 代理的节点访问此节点时,依然使用原始 TCP 连接。" port_forward: en: "forward local port to remote port in virtual network. e.g.: udp://0.0.0.0:12345/10.126.126.1:23456, means forward local udp port 12345 to 10.126.126.1:23456 in the virtual network. can specify multiple." zh-CN: "将本地端口转发到虚拟网络中的远程端口。例如:udp://0.0.0.0:12345/10.126.126.1:23456,表示将本地UDP端口12345转发到虚拟网络中的10.126.126.1:23456。可以指定多个。" diff --git a/easytier/src/common/config.rs b/easytier/src/common/config.rs index fee9f0e..2276687 100644 --- a/easytier/src/common/config.rs +++ b/easytier/src/common/config.rs @@ -39,6 +39,8 @@ pub fn gen_default_flags() -> Flags { disable_relay_kcp: true, accept_dns: false, private_mode: false, + enable_quic_proxy: false, + disable_quic_input: false, } } @@ -437,7 +439,7 @@ impl ConfigLoader for TomlConfigLoader { .as_ref() .unwrap() .iter() - .any(|c| c.cidr == cidr) + .any(|c| c.cidr == cidr && c.mapped_cidr == mapped_cidr) { locked_config .proxy_network diff --git a/easytier/src/common/global_ctx.rs b/easytier/src/common/global_ctx.rs index 556d068..266688e 100644 --- a/easytier/src/common/global_ctx.rs +++ b/easytier/src/common/global_ctx.rs @@ -75,6 +75,8 @@ pub struct GlobalCtx { no_tun: bool, feature_flags: AtomicCell, + + quic_proxy_port: AtomicCell>, } impl std::fmt::Debug for GlobalCtx { @@ -137,6 +139,7 @@ impl GlobalCtx { no_tun, feature_flags: AtomicCell::new(feature_flags), + quic_proxy_port: AtomicCell::new(None), } } @@ -281,6 +284,14 @@ impl GlobalCtx { pub fn set_feature_flags(&self, flags: PeerFeatureFlag) { self.feature_flags.store(flags); } + + pub fn get_quic_proxy_port(&self) -> Option { + self.quic_proxy_port.load() + } + + pub fn set_quic_proxy_port(&self, port: Option) { + self.quic_proxy_port.store(port); + } } #[cfg(test)] diff --git a/easytier/src/easytier-cli.rs b/easytier/src/easytier-cli.rs index 0c68771..996ea99 100644 --- a/easytier/src/easytier-cli.rs +++ b/easytier/src/easytier-cli.rs @@ -1083,7 +1083,8 @@ async fn main() -> Result<(), Error> { .iter() .map(|(k, v)| format!("{}: {:?}ms", k, v.latency_ms,)) .collect::>(); - let direct_peers: Vec<_> = v.direct_peers + let direct_peers: Vec<_> = v + .direct_peers .iter() .map(|(k, v)| DirectPeerItem { node_id: k.to_string(), @@ -1257,23 +1258,14 @@ async fn main() -> Result<(), Error> { } SubCommand::Proxy => { let mut entries = vec![]; - let client = handler.get_tcp_proxy_client("tcp").await?; - let ret = client - .list_tcp_proxy_entry(BaseController::default(), Default::default()) - .await; - entries.extend(ret.unwrap_or_default().entries); - let client = handler.get_tcp_proxy_client("kcp_src").await?; - let ret = client - .list_tcp_proxy_entry(BaseController::default(), Default::default()) - .await; - entries.extend(ret.unwrap_or_default().entries); - - let client = handler.get_tcp_proxy_client("kcp_dst").await?; - let ret = client - .list_tcp_proxy_entry(BaseController::default(), Default::default()) - .await; - entries.extend(ret.unwrap_or_default().entries); + for client_type in &["tcp", "kcp_src", "kcp_dst", "quic_src", "quic_dst"] { + let client = handler.get_tcp_proxy_client(client_type).await?; + let ret = client + .list_tcp_proxy_entry(BaseController::default(), Default::default()) + .await; + entries.extend(ret.unwrap_or_default().entries); + } if cli.verbose { println!("{}", serde_json::to_string_pretty(&entries)?); diff --git a/easytier/src/easytier-core.rs b/easytier/src/easytier-core.rs index 5184126..39d53f6 100644 --- a/easytier/src/easytier-core.rs +++ b/easytier/src/easytier-core.rs @@ -433,6 +433,24 @@ struct NetworkOptions { )] disable_kcp_input: Option, + #[arg( + long, + env = "ET_ENABLE_QUIC_PROXY", + help = t!("core_clap.enable_quic_proxy").to_string(), + num_args = 0..=1, + default_missing_value = "true" + )] + enable_quic_proxy: Option, + + #[arg( + long, + env = "ET_DISABLE_QUIC_INPUT", + help = t!("core_clap.disable_quic_input").to_string(), + num_args = 0..=1, + default_missing_value = "true" + )] + disable_quic_input: Option, + #[arg( long, env = "ET_PORT_FORWARD", @@ -773,6 +791,8 @@ impl NetworkOptions { f.bind_device = self.bind_device.unwrap_or(f.bind_device); f.enable_kcp_proxy = self.enable_kcp_proxy.unwrap_or(f.enable_kcp_proxy); f.disable_kcp_input = self.disable_kcp_input.unwrap_or(f.disable_kcp_input); + f.enable_quic_proxy = self.enable_quic_proxy.unwrap_or(f.enable_quic_proxy); + f.disable_quic_input = self.disable_quic_input.unwrap_or(f.disable_quic_input); f.accept_dns = self.accept_dns.unwrap_or(f.accept_dns); f.private_mode = self.private_mode.unwrap_or(f.private_mode); cfg.set_flags(f); diff --git a/easytier/src/gateway/kcp_proxy.rs b/easytier/src/gateway/kcp_proxy.rs index 9d73f6a..d7dd395 100644 --- a/easytier/src/gateway/kcp_proxy.rs +++ b/easytier/src/gateway/kcp_proxy.rs @@ -124,19 +124,16 @@ impl NatDstConnector for NatDstKcpConnector { return Err(anyhow::anyhow!("peer manager is not available").into()); }; - let (dst_peers, _) = match nat_dst { - SocketAddr::V4(addr) => { - let ip = addr.ip(); - peer_mgr.get_msg_dst_peer(&ip).await - } + let dst_peer_id = match nat_dst { + SocketAddr::V4(addr) => peer_mgr.get_peer_map().get_peer_id_by_ipv4(addr.ip()).await, SocketAddr::V6(_) => return Err(anyhow::anyhow!("ipv6 is not supported").into()), }; - tracing::trace!("kcp nat dst: {:?}, dst peers: {:?}", nat_dst, dst_peers); + let Some(dst_peer) = dst_peer_id else { + return Err(anyhow::anyhow!("no peer found for nat dst: {}", nat_dst).into()); + }; - if dst_peers.len() != 1 { - return Err(anyhow::anyhow!("no dst peer found for nat dst: {}", nat_dst).into()); - } + tracing::trace!("kcp nat dst: {:?}, dst peers: {:?}", nat_dst, dst_peer); let mut connect_tasks: JoinSet> = JoinSet::new(); let mut retry_remain = 5; @@ -167,7 +164,6 @@ impl NatDstConnector for NatDstKcpConnector { let kcp_endpoint = self.kcp_endpoint.clone(); let my_peer_id = peer_mgr.my_peer_id(); - let dst_peer = dst_peers[0]; let conn_data_clone = conn_data.clone(); connect_tasks.spawn(async move { @@ -200,7 +196,7 @@ impl NatDstConnector for NatDstKcpConnector { _ipv4: &Ipv4Packet, _real_dst_ip: &mut Ipv4Addr, ) -> bool { - return hdr.from_peer_id == hdr.to_peer_id; + return hdr.from_peer_id == hdr.to_peer_id && hdr.is_kcp_src_modified(); } fn transport_type(&self) -> TcpProxyEntryTransportType { @@ -211,32 +207,41 @@ impl NatDstConnector for NatDstKcpConnector { #[derive(Clone)] struct TcpProxyForKcpSrc(Arc>); -pub struct KcpProxySrc { - kcp_endpoint: Arc, - peer_manager: Arc, - - tcp_proxy: TcpProxyForKcpSrc, - tasks: JoinSet<()>, +#[async_trait::async_trait] +pub(crate) trait TcpProxyForKcpSrcTrait: Send + Sync + 'static { + type Connector: NatDstConnector; + fn get_tcp_proxy(&self) -> &Arc>; + async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool; } -impl TcpProxyForKcpSrc { +#[async_trait::async_trait] +impl TcpProxyForKcpSrcTrait for TcpProxyForKcpSrc { + type Connector = NatDstKcpConnector; + + fn get_tcp_proxy(&self) -> &Arc> { + &self.0 + } + async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool { let peer_map: Arc = self.0.get_peer_manager().get_peer_map(); let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else { return false; }; - let Some(feature_flag) = peer_map.get_peer_feature_flag(dst_peer_id).await else { + let Some(peer_info) = peer_map.get_route_peer_info(dst_peer_id).await else { return false; }; - feature_flag.kcp_input + peer_info.feature_flag.map(|x| x.kcp_input).unwrap_or(false) } } #[async_trait::async_trait] -impl NicPacketFilter for TcpProxyForKcpSrc { +impl> NicPacketFilter for T { async fn try_process_packet_from_nic(&self, zc_packet: &mut ZCPacket) -> bool { - let ret = self.0.try_process_packet_from_nic(zc_packet).await; + let ret = self + .get_tcp_proxy() + .try_process_packet_from_nic(zc_packet) + .await; if ret { return true; } @@ -263,29 +268,45 @@ impl NicPacketFilter for TcpProxyForKcpSrc { } } else { // if not syn packet, only allow established connection - if !self.0.is_tcp_proxy_connection(SocketAddr::new( - IpAddr::V4(ip_packet.get_source()), - tcp_packet.get_source(), - )) { + if !self + .get_tcp_proxy() + .is_tcp_proxy_connection(SocketAddr::new( + IpAddr::V4(ip_packet.get_source()), + tcp_packet.get_source(), + )) + { return false; } } - if let Some(my_ipv4) = self.0.get_global_ctx().get_ipv4() { + if let Some(my_ipv4) = self.get_tcp_proxy().get_global_ctx().get_ipv4() { // this is a net-to-net packet, only allow it when smoltcp is enabled // because the syn-ack packet will not be through and handled by the tun device when // the source ip is in the local network - if ip_packet.get_source() != my_ipv4.address() && !self.0.is_smoltcp_enabled() { + if ip_packet.get_source() != my_ipv4.address() + && !self.get_tcp_proxy().is_smoltcp_enabled() + { return false; } }; - zc_packet.mut_peer_manager_header().unwrap().to_peer_id = self.0.get_my_peer_id().into(); - + let hdr = zc_packet.mut_peer_manager_header().unwrap(); + hdr.to_peer_id = self.get_tcp_proxy().get_my_peer_id().into(); + if self.get_tcp_proxy().get_transport_type() == TcpProxyEntryTransportType::Kcp { + hdr.set_kcp_src_modified(true); + } true } } +pub struct KcpProxySrc { + kcp_endpoint: Arc, + peer_manager: Arc, + + tcp_proxy: TcpProxyForKcpSrc, + tasks: JoinSet<()>, +} + impl KcpProxySrc { pub async fn new(peer_manager: Arc) -> Self { let mut kcp_endpoint = create_kcp_endpoint(); diff --git a/easytier/src/gateway/mod.rs b/easytier/src/gateway/mod.rs index a3dfc7b..c8d8af9 100644 --- a/easytier/src/gateway/mod.rs +++ b/easytier/src/gateway/mod.rs @@ -18,6 +18,8 @@ pub mod socks5; pub mod kcp_proxy; +pub mod quic_proxy; + #[derive(Debug)] pub(crate) struct CidrSet { global_ctx: ArcGlobalCtx, diff --git a/easytier/src/gateway/quic_proxy.rs b/easytier/src/gateway/quic_proxy.rs new file mode 100644 index 0000000..e6c6f5d --- /dev/null +++ b/easytier/src/gateway/quic_proxy.rs @@ -0,0 +1,443 @@ +use std::net::{IpAddr, Ipv4Addr}; +use std::sync::{Arc, Mutex, Weak}; +use std::{net::SocketAddr, pin::Pin}; + +use anyhow::Context; +use dashmap::DashMap; +use pnet::packet::ipv4::Ipv4Packet; +use prost::Message as _; +use quinn::{Endpoint, Incoming}; +use tokio::io::{copy_bidirectional, AsyncRead, AsyncReadExt, AsyncWrite}; +use tokio::net::TcpStream; +use tokio::task::JoinSet; +use tokio::time::timeout; + +use crate::common::error::Result; +use crate::common::global_ctx::{ArcGlobalCtx, GlobalCtx}; +use crate::common::join_joinset_background; +use crate::defer; +use crate::gateway::kcp_proxy::TcpProxyForKcpSrcTrait; +use crate::gateway::tcp_proxy::{NatDstConnector, NatDstTcpConnector, TcpProxy}; +use crate::gateway::CidrSet; +use crate::peers::peer_manager::PeerManager; +use crate::proto::cli::{ + ListTcpProxyEntryRequest, ListTcpProxyEntryResponse, TcpProxyEntry, TcpProxyEntryState, + TcpProxyEntryTransportType, TcpProxyRpc, +}; +use crate::proto::common::ProxyDstInfo; +use crate::proto::rpc_types; +use crate::proto::rpc_types::controller::BaseController; +use crate::tunnel::packet_def::PeerManagerHeader; +use crate::tunnel::quic::{configure_client, make_server_endpoint}; + +pub struct QUICStream { + endpoint: Option, + connection: Option, + sender: quinn::SendStream, + receiver: quinn::RecvStream, +} + +impl AsyncRead for QUICStream { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + Pin::new(&mut this.receiver).poll_read(cx, buf) + } +} + +impl AsyncWrite for QUICStream { + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + let this = self.get_mut(); + AsyncWrite::poll_write(Pin::new(&mut this.sender), cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + Pin::new(&mut this.sender).poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + Pin::new(&mut this.sender).poll_shutdown(cx) + } +} + +#[derive(Debug, Clone)] +pub struct NatDstQUICConnector { + pub(crate) peer_mgr: Weak, +} + +#[async_trait::async_trait] +impl NatDstConnector for NatDstQUICConnector { + type DstStream = QUICStream; + + #[tracing::instrument(skip(self), level = "debug", name = "NatDstQUICConnector::connect")] + async fn connect(&self, src: SocketAddr, nat_dst: SocketAddr) -> Result { + let Some(peer_mgr) = self.peer_mgr.upgrade() else { + return Err(anyhow::anyhow!("peer manager is not available").into()); + }; + + let IpAddr::V4(dst_ipv4) = nat_dst.ip() else { + return Err(anyhow::anyhow!("src must be an IPv4 address").into()); + }; + + let Some(dst_peer) = peer_mgr.get_peer_map().get_peer_id_by_ipv4(&dst_ipv4).await else { + return Err(anyhow::anyhow!("no peer found for dst: {}", nat_dst).into()); + }; + + let Some(dst_peer_info) = peer_mgr.get_peer_map().get_route_peer_info(dst_peer).await + else { + return Err(anyhow::anyhow!("no peer info found for dst peer: {}", dst_peer).into()); + }; + + let Some(dst_ipv4): Option = dst_peer_info.ipv4_addr.map(Into::into) else { + return Err(anyhow::anyhow!("no ipv4 found for dst peer: {}", dst_peer).into()); + }; + + let Some(quic_port) = dst_peer_info.quic_port else { + return Err(anyhow::anyhow!("no quic port found for dst peer: {}", dst_peer).into()); + }; + + let mut endpoint = Endpoint::client("0.0.0.0:0".parse().unwrap()) + .with_context(|| format!("failed to create QUIC endpoint for src: {}", src))?; + endpoint.set_default_client_config(configure_client()); + + // connect to server + let connection = { + let _g = peer_mgr.get_global_ctx().net_ns.guard(); + endpoint + .connect( + SocketAddr::new(dst_ipv4.into(), quic_port as u16), + "localhost", + ) + .unwrap() + .await + .with_context(|| { + format!( + "failed to connect to NAT destination {} from {}, real dst: {}", + nat_dst, src, dst_ipv4 + ) + })? + }; + + let (mut w, r) = connection + .open_bi() + .await + .with_context(|| "open_bi failed")?; + + let proxy_dst_info = ProxyDstInfo { + dst_addr: Some(nat_dst.into()), + }; + let proxy_dst_info_buf = proxy_dst_info.encode_to_vec(); + let buf_len = proxy_dst_info_buf.len() as u8; + w.write(&buf_len.to_le_bytes()) + .await + .with_context(|| "failed to write proxy dst info buf len to QUIC stream")?; + w.write(&proxy_dst_info_buf) + .await + .with_context(|| "failed to write proxy dst info to QUIC stream")?; + + Ok(QUICStream { + endpoint: Some(endpoint), + connection: Some(connection), + sender: w, + receiver: r, + }) + } + + fn check_packet_from_peer_fast(&self, _cidr_set: &CidrSet, _global_ctx: &GlobalCtx) -> bool { + true + } + + fn check_packet_from_peer( + &self, + _cidr_set: &CidrSet, + _global_ctx: &GlobalCtx, + hdr: &PeerManagerHeader, + _ipv4: &Ipv4Packet, + _real_dst_ip: &mut Ipv4Addr, + ) -> bool { + return hdr.from_peer_id == hdr.to_peer_id && !hdr.is_kcp_src_modified(); + } + + fn transport_type(&self) -> TcpProxyEntryTransportType { + TcpProxyEntryTransportType::Quic + } +} + +#[derive(Clone)] +struct TcpProxyForQUICSrc(Arc>); + +#[async_trait::async_trait] +impl TcpProxyForKcpSrcTrait for TcpProxyForQUICSrc { + type Connector = NatDstQUICConnector; + + fn get_tcp_proxy(&self) -> &Arc> { + &self.0 + } + + async fn check_dst_allow_kcp_input(&self, dst_ip: &Ipv4Addr) -> bool { + let peer_map: Arc = + self.0.get_peer_manager().get_peer_map(); + let Some(dst_peer_id) = peer_map.get_peer_id_by_ipv4(dst_ip).await else { + return false; + }; + let Some(peer_info) = peer_map.get_route_peer_info(dst_peer_id).await else { + return false; + }; + let Some(quic_port) = peer_info.quic_port else { + return false; + }; + quic_port > 0 + } +} + +pub struct QUICProxySrc { + peer_manager: Arc, + tcp_proxy: TcpProxyForQUICSrc, +} + +impl QUICProxySrc { + pub async fn new(peer_manager: Arc) -> Self { + let tcp_proxy = TcpProxy::new( + peer_manager.clone(), + NatDstQUICConnector { + peer_mgr: Arc::downgrade(&peer_manager), + }, + ); + + Self { + peer_manager, + tcp_proxy: TcpProxyForQUICSrc(tcp_proxy), + } + } + + pub async fn start(&self) { + self.peer_manager + .add_nic_packet_process_pipeline(Box::new(self.tcp_proxy.clone())) + .await; + self.peer_manager + .add_packet_process_pipeline(Box::new(self.tcp_proxy.0.clone())) + .await; + self.tcp_proxy.0.start(false).await.unwrap(); + } + + pub fn get_tcp_proxy(&self) -> Arc> { + self.tcp_proxy.0.clone() + } +} + +pub struct QUICProxyDst { + global_ctx: Arc, + endpoint: Arc, + proxy_entries: Arc>, + tasks: Arc>>, +} + +impl QUICProxyDst { + pub fn new(global_ctx: ArcGlobalCtx) -> Result { + let _g = global_ctx.net_ns.guard(); + let (endpoint, _) = make_server_endpoint("0.0.0.0:0".parse().unwrap()) + .map_err(|e| anyhow::anyhow!("failed to create QUIC endpoint: {}", e))?; + let tasks = Arc::new(Mutex::new(JoinSet::new())); + join_joinset_background(tasks.clone(), "QUICProxyDst tasks".to_string()); + Ok(Self { + global_ctx, + endpoint: Arc::new(endpoint), + proxy_entries: Arc::new(DashMap::new()), + tasks, + }) + } + + pub async fn start(&self) -> Result<()> { + let endpoint = self.endpoint.clone(); + let tasks = Arc::downgrade(&self.tasks.clone()); + let ctx = self.global_ctx.clone(); + let cidr_set = Arc::new(CidrSet::new(ctx.clone())); + let proxy_entries = self.proxy_entries.clone(); + + let task = async move { + loop { + match endpoint.accept().await { + Some(conn) => { + let Some(tasks) = tasks.upgrade() else { + tracing::warn!( + "QUICProxyDst tasks is not available, stopping accept loop" + ); + return; + }; + tasks + .lock() + .unwrap() + .spawn(Self::handle_connection_with_timeout( + conn, + ctx.clone(), + cidr_set.clone(), + proxy_entries.clone(), + )); + } + None => { + return; + } + } + } + }; + + self.tasks.lock().unwrap().spawn(task); + + Ok(()) + } + + pub fn local_addr(&self) -> Result { + self.endpoint.local_addr().map_err(Into::into) + } + + async fn handle_connection_with_timeout( + conn: Incoming, + ctx: Arc, + cidr_set: Arc, + proxy_entries: Arc>, + ) { + let remote_addr = conn.remote_address(); + defer!( + proxy_entries.remove(&remote_addr); + ); + let ret = timeout( + std::time::Duration::from_secs(10), + Self::handle_connection(conn, ctx, cidr_set, remote_addr, proxy_entries.clone()), + ) + .await; + + match ret { + Ok(Ok((mut quic_stream, mut tcp_stream))) => { + let ret = copy_bidirectional(&mut quic_stream, &mut tcp_stream).await; + tracing::info!( + "QUIC connection handled, result: {:?}, remote addr: {:?}", + ret, + quic_stream.connection.as_ref().map(|c| c.remote_address()) + ); + } + Ok(Err(e)) => { + tracing::error!("Failed to handle QUIC connection: {}", e); + } + Err(_) => { + tracing::warn!("Timeout while handling QUIC connection"); + } + } + } + + async fn handle_connection( + incoming: Incoming, + ctx: ArcGlobalCtx, + cidr_set: Arc, + proxy_entry_key: SocketAddr, + proxy_entries: Arc>, + ) -> Result<(QUICStream, TcpStream)> { + let conn = incoming.await.with_context(|| "accept failed")?; + let addr = conn.remote_address(); + tracing::info!("Accepted QUIC connection from {}", addr); + let (w, mut r) = conn.accept_bi().await.with_context(|| "accept_bi failed")?; + let len = r + .read_u8() + .await + .with_context(|| "failed to read proxy dst info buf len")?; + let mut buf = vec![0u8; len as usize]; + r.read_exact(&mut buf) + .await + .with_context(|| "failed to read proxy dst info")?; + + let proxy_dst_info = + ProxyDstInfo::decode(&buf[..]).with_context(|| "failed to decode proxy dst info")?; + + let dst_socket: SocketAddr = proxy_dst_info + .dst_addr + .map(Into::into) + .ok_or_else(|| anyhow::anyhow!("no dst addr in proxy dst info"))?; + + let SocketAddr::V4(mut dst_socket) = dst_socket else { + return Err(anyhow::anyhow!("NAT destination must be an IPv4 address").into()); + }; + + let mut real_ip = *dst_socket.ip(); + if cidr_set.contains_v4(*dst_socket.ip(), &mut real_ip) { + dst_socket.set_ip(real_ip); + } + + if Some(*dst_socket.ip()) == ctx.get_ipv4().map(|ip| ip.address()) && ctx.no_tun() { + dst_socket = format!("127.0.0.1:{}", dst_socket.port()).parse().unwrap(); + } + + proxy_entries.insert( + proxy_entry_key, + TcpProxyEntry { + src: Some(addr.into()), + dst: Some(SocketAddr::V4(dst_socket).into()), + start_time: chrono::Local::now().timestamp() as u64, + state: TcpProxyEntryState::ConnectingDst.into(), + transport_type: TcpProxyEntryTransportType::Quic.into(), + }, + ); + + let connector = NatDstTcpConnector {}; + + let dst_stream = { + let _g = ctx.net_ns.guard(); + connector + .connect("0.0.0.0:0".parse().unwrap(), dst_socket.into()) + .await? + }; + + if let Some(mut e) = proxy_entries.get_mut(&proxy_entry_key) { + e.state = TcpProxyEntryState::Connected.into(); + } + + let quic_stream = QUICStream { + endpoint: None, + connection: Some(conn), + sender: w, + receiver: r, + }; + + Ok((quic_stream, dst_stream)) + } +} + +#[derive(Clone)] +pub struct QUICProxyDstRpcService(Weak>); + +impl QUICProxyDstRpcService { + pub fn new(quic_proxy_dst: &QUICProxyDst) -> Self { + Self(Arc::downgrade(&quic_proxy_dst.proxy_entries)) + } +} + +#[async_trait::async_trait] +impl TcpProxyRpc for QUICProxyDstRpcService { + type Controller = BaseController; + async fn list_tcp_proxy_entry( + &self, + _: BaseController, + _request: ListTcpProxyEntryRequest, // Accept request of type HelloRequest + ) -> std::result::Result { + let mut reply = ListTcpProxyEntryResponse::default(); + if let Some(tcp_proxy) = self.0.upgrade() { + for item in tcp_proxy.iter() { + reply.entries.push(item.value().clone()); + } + } + Ok(reply) + } +} diff --git a/easytier/src/gateway/tcp_proxy.rs b/easytier/src/gateway/tcp_proxy.rs index ab10c83..def3b73 100644 --- a/easytier/src/gateway/tcp_proxy.rs +++ b/easytier/src/gateway/tcp_proxy.rs @@ -902,6 +902,10 @@ impl TcpProxy { } entries } + + pub fn get_transport_type(&self) -> TcpProxyEntryTransportType { + self.connector.transport_type() + } } #[derive(Clone)] diff --git a/easytier/src/instance/instance.rs b/easytier/src/instance/instance.rs index 99cc12d..7360e0e 100644 --- a/easytier/src/instance/instance.rs +++ b/easytier/src/instance/instance.rs @@ -20,6 +20,7 @@ use crate::connector::manual::{ConnectorManagerRpcService, ManualConnectorManage use crate::connector::udp_hole_punch::UdpHolePunchConnector; use crate::gateway::icmp_proxy::IcmpProxy; use crate::gateway::kcp_proxy::{KcpProxyDst, KcpProxyDstRpcService, KcpProxySrc}; +use crate::gateway::quic_proxy::{QUICProxyDst, QUICProxyDstRpcService, QUICProxySrc}; use crate::gateway::tcp_proxy::{NatDstTcpConnector, TcpProxy, TcpProxyRpcService}; use crate::gateway::udp_proxy::UdpProxy; use crate::peer_center::instance::PeerCenterInstance; @@ -232,6 +233,9 @@ pub struct Instance { kcp_proxy_src: Option, kcp_proxy_dst: Option, + quic_proxy_src: Option, + quic_proxy_dst: Option, + peer_center: Arc, vpn_portal: Arc>>, @@ -312,6 +316,9 @@ impl Instance { kcp_proxy_src: None, kcp_proxy_dst: None, + quic_proxy_src: None, + quic_proxy_dst: None, + peer_center, vpn_portal: Arc::new(Mutex::new(Box::new(vpn_portal_inst))), @@ -562,6 +569,20 @@ impl Instance { self.kcp_proxy_dst = Some(dst_proxy); } + if self.global_ctx.get_flags().enable_quic_proxy { + let quic_src = QUICProxySrc::new(self.get_peer_manager()).await; + quic_src.start().await; + self.quic_proxy_src = Some(quic_src); + } + + if !self.global_ctx.get_flags().disable_quic_input { + let quic_dst = QUICProxyDst::new(self.global_ctx.clone())?; + quic_dst.start().await?; + self.global_ctx + .set_quic_proxy_port(Some(quic_dst.local_addr()?.port())); + self.quic_proxy_dst = Some(quic_dst); + } + // run after tun device created, so listener can bind to tun device, which may be required by win 10 self.ip_proxy = Some(IpProxy::new( self.get_global_ctx(), @@ -737,6 +758,20 @@ impl Instance { ); } + if let Some(quic_proxy) = self.quic_proxy_src.as_ref() { + s.registry().register( + TcpProxyRpcServer::new(TcpProxyRpcService::new(quic_proxy.get_tcp_proxy())), + "quic_src", + ); + } + + if let Some(quic_proxy) = self.quic_proxy_dst.as_ref() { + s.registry().register( + TcpProxyRpcServer::new(QUICProxyDstRpcService::new(quic_proxy)), + "quic_dst", + ); + } + s.set_hook(Arc::new(InstanceRpcServerHook::new( self.global_ctx.config.get_rpc_portal_whitelist(), ))); diff --git a/easytier/src/instance/listeners.rs b/easytier/src/instance/listeners.rs index 20af6ff..da03f99 100644 --- a/easytier/src/instance/listeners.rs +++ b/easytier/src/instance/listeners.rs @@ -142,6 +142,8 @@ impl ListenerManage if self.global_ctx.config.get_flags().enable_ipv6 && !is_url_host_ipv6(&l) && is_url_host_unspecified(&l) + // quic enables dual-stack by default, may conflict with v4 listener + && l.scheme() != "quic" { let mut ipv6_listener = l.clone(); ipv6_listener diff --git a/easytier/src/launcher.rs b/easytier/src/launcher.rs index 4f4b899..7133a7f 100644 --- a/easytier/src/launcher.rs +++ b/easytier/src/launcher.rs @@ -685,6 +685,14 @@ impl NetworkConfig { flags.disable_kcp_input = disable_kcp_input; } + if let Some(enable_quic_proxy) = self.enable_quic_proxy { + flags.enable_quic_proxy = enable_quic_proxy; + } + + if let Some(disable_quic_input) = self.disable_quic_input { + flags.disable_quic_input = disable_quic_input; + } + if let Some(disable_p2p) = self.disable_p2p { flags.disable_p2p = disable_p2p; } diff --git a/easytier/src/peers/peer_map.rs b/easytier/src/peers/peer_map.rs index f8cef8f..9ab1927 100644 --- a/easytier/src/peers/peer_map.rs +++ b/easytier/src/peers/peer_map.rs @@ -10,7 +10,7 @@ use crate::{ global_ctx::{ArcGlobalCtx, GlobalCtxEvent, NetworkIdentity}, PeerId, }, - proto::{cli::PeerConnInfo, common::PeerFeatureFlag}, + proto::{cli::PeerConnInfo, peer_rpc::RoutePeerInfo}, tunnel::{packet_def::ZCPacket, TunnelError}, }; @@ -194,12 +194,11 @@ impl PeerMap { None } - pub async fn get_peer_feature_flag(&self, peer_id: PeerId) -> Option { + pub async fn get_route_peer_info(&self, peer_id: PeerId) -> Option { for route in self.routes.read().await.iter() { - let feature_flag = route.get_feature_flag(peer_id).await; - if feature_flag.is_some() { - return feature_flag; - }; + if let Some(info) = route.get_peer_info(peer_id).await { + return Some(info); + } } None } diff --git a/easytier/src/peers/peer_ospf_route.rs b/easytier/src/peers/peer_ospf_route.rs index 5f92aab..f9d3adf 100644 --- a/easytier/src/peers/peer_ospf_route.rs +++ b/easytier/src/peers/peer_ospf_route.rs @@ -33,7 +33,7 @@ use crate::{ }, peers::route_trait::{Route, RouteInterfaceBox}, proto::{ - common::{Ipv4Inet, NatType, PeerFeatureFlag, StunInfo}, + common::{Ipv4Inet, NatType, StunInfo}, peer_rpc::{ route_foreign_network_infos, ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, OspfRouteRpc, OspfRouteRpcClientFactory, OspfRouteRpcServer, PeerIdVersion, @@ -124,6 +124,7 @@ impl RoutePeerInfo { feature_flag: None, peer_route_id: 0, network_length: 24, + quic_port: None, } } @@ -162,6 +163,8 @@ impl RoutePeerInfo { .get_ipv4() .map(|x| x.network_length() as u32) .unwrap_or(24), + + quic_port: global_ctx.get_quic_proxy_port().map(|x| x as u32), }; let need_update_periodically = if let Ok(Ok(d)) = @@ -2317,12 +2320,12 @@ impl Route for PeerRoute { .map(|x| *x) } - async fn get_feature_flag(&self, peer_id: PeerId) -> Option { + async fn get_peer_info(&self, peer_id: PeerId) -> Option { self.service_impl .route_table .peer_infos .get(&peer_id) - .and_then(|x| x.feature_flag.clone()) + .map(|x| x.clone()) } async fn get_peer_info_last_update_time(&self) -> Instant { diff --git a/easytier/src/peers/route_trait.rs b/easytier/src/peers/route_trait.rs index 2dd7b84..21f6083 100644 --- a/easytier/src/peers/route_trait.rs +++ b/easytier/src/peers/route_trait.rs @@ -4,11 +4,9 @@ use dashmap::DashMap; use crate::{ common::{global_ctx::NetworkIdentity, PeerId}, - proto::{ - common::PeerFeatureFlag, - peer_rpc::{ - ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos, - }, + proto::peer_rpc::{ + ForeignNetworkRouteInfoEntry, ForeignNetworkRouteInfoKey, RouteForeignNetworkInfos, + RoutePeerInfo, }, }; @@ -107,7 +105,7 @@ pub trait Route { async fn set_route_cost_fn(&self, _cost_fn: RouteCostCalculator) {} - async fn get_feature_flag(&self, peer_id: PeerId) -> Option; + async fn get_peer_info(&self, peer_id: PeerId) -> Option; async fn get_peer_info_last_update_time(&self) -> std::time::Instant; diff --git a/easytier/src/proto/cli.proto b/easytier/src/proto/cli.proto index 53ac822..0eafeb2 100644 --- a/easytier/src/proto/cli.proto +++ b/easytier/src/proto/cli.proto @@ -187,6 +187,7 @@ service VpnPortalRpc { enum TcpProxyEntryTransportType { TCP = 0; KCP = 1; + QUIC = 2; } enum TcpProxyEntryState { diff --git a/easytier/src/proto/common.proto b/easytier/src/proto/common.proto index 5066d26..55db5d1 100644 --- a/easytier/src/proto/common.proto +++ b/easytier/src/proto/common.proto @@ -35,6 +35,11 @@ message FlagsInConfig { bool accept_dns = 22; // enable private mode bool private_mode = 23; + + // should we convert all tcp streams into quic streams + bool enable_quic_proxy = 24; + // does this peer allow quic input + bool disable_quic_input = 25; } message RpcDescriptor { @@ -171,3 +176,7 @@ message PortForwardConfigPb { SocketAddr dst_addr = 2; SocketType socket_type = 3; } + +message ProxyDstInfo { + SocketAddr dst_addr = 1; +} diff --git a/easytier/src/proto/peer_rpc.proto b/easytier/src/proto/peer_rpc.proto index 0e1fd96..0ac05ca 100644 --- a/easytier/src/proto/peer_rpc.proto +++ b/easytier/src/proto/peer_rpc.proto @@ -22,6 +22,8 @@ message RoutePeerInfo { uint64 peer_route_id = 12; uint32 network_length = 13; + + optional uint32 quic_port = 14; } message PeerIdVersion { diff --git a/easytier/src/proto/web.proto b/easytier/src/proto/web.proto index c909ea5..d290251 100644 --- a/easytier/src/proto/web.proto +++ b/easytier/src/proto/web.proto @@ -68,6 +68,9 @@ message NetworkConfig { optional bool enable_private_mode = 43; repeated string rpc_portal_whitelists = 44; + + optional bool enable_quic_proxy = 45; + optional bool disable_quic_input = 46; } message MyNodeInfo { diff --git a/easytier/src/tests/three_node.rs b/easytier/src/tests/three_node.rs index e308999..30353da 100644 --- a/easytier/src/tests/three_node.rs +++ b/easytier/src/tests/three_node.rs @@ -222,6 +222,8 @@ async fn ping_test(from_netns: &str, target_ip: &str, payload_size: Option &mut Self { + let mut flags = PeerManagerHeaderFlags::from_bits(self.flags).unwrap(); + if modified { + flags.insert(PeerManagerHeaderFlags::KCP_SRC_MODIFIED); + } else { + flags.remove(PeerManagerHeaderFlags::KCP_SRC_MODIFIED); + } + self.flags = flags.bits(); + self + } + + pub fn is_kcp_src_modified(&self) -> bool { + PeerManagerHeaderFlags::from_bits(self.flags) + .unwrap() + .contains(PeerManagerHeaderFlags::KCP_SRC_MODIFIED) + } } #[repr(C, packed)] diff --git a/easytier/src/tunnel/quic.rs b/easytier/src/tunnel/quic.rs index 20d6344..798a6cd 100644 --- a/easytier/src/tunnel/quic.rs +++ b/easytier/src/tunnel/quic.rs @@ -17,7 +17,7 @@ use super::{ IpVersion, Tunnel, TunnelConnector, TunnelError, TunnelListener, }; -fn configure_client() -> ClientConfig { +pub fn configure_client() -> ClientConfig { ClientConfig::new(Arc::new( QuicClientConfig::try_from(get_insecure_tls_client_config()).unwrap(), )) @@ -38,7 +38,7 @@ pub fn make_server_endpoint(bind_addr: SocketAddr) -> Result<(Endpoint, Vec) } /// Returns default server configuration along with its certificate. -fn configure_server() -> Result<(ServerConfig, Vec), Box> { +pub fn configure_server() -> Result<(ServerConfig, Vec), Box> { let (certs, key) = get_insecure_tls_cert(); let mut server_config = ServerConfig::with_single_cert(certs.clone(), key.into())?;