support p2p-only mode (#1598)

This commit is contained in:
Sijie.Sun
2025-11-20 08:20:27 +08:00
committed by GitHub
parent 5b9ac65477
commit b44053f496
16 changed files with 165 additions and 3 deletions

View File

@@ -151,6 +151,9 @@ core_clap:
disable_p2p:
en: "disable p2p communication, will only relay packets with peers specified by --peers"
zh-CN: "禁用P2P通信只通过--peers指定的节点转发数据包"
p2p_only:
en: "only communicate with peers that already establish p2p connection"
zh-CN: "仅与已经建立P2P连接的对等节点通信"
disable_udp_hole_punching:
en: "disable udp hole punching"
zh-CN: "禁用UDP打洞功能"

View File

@@ -35,6 +35,7 @@ pub fn gen_default_flags() -> Flags {
use_smoltcp: false,
relay_network_whitelist: "*".to_string(),
disable_p2p: false,
p2p_only: false,
relay_all_peer_rpc: false,
disable_udp_hole_punching: false,
multi_thread: true,

View File

@@ -84,6 +84,7 @@ pub struct GlobalCtx {
enable_exit_node: bool,
proxy_forward_by_system: bool,
no_tun: bool,
p2p_only: bool,
feature_flags: AtomicCell<PeerFeatureFlag>,
@@ -138,6 +139,7 @@ impl GlobalCtx {
let enable_exit_node = config_fs.get_flags().enable_exit_node || cfg!(target_env = "ohos");
let proxy_forward_by_system = config_fs.get_flags().proxy_forward_by_system;
let no_tun = config_fs.get_flags().no_tun;
let p2p_only = config_fs.get_flags().p2p_only;
let feature_flags = PeerFeatureFlag {
kcp_input: !config_fs.get_flags().disable_kcp_input,
@@ -172,6 +174,7 @@ impl GlobalCtx {
enable_exit_node,
proxy_forward_by_system,
no_tun,
p2p_only,
feature_flags: AtomicCell::new(feature_flags),
quic_proxy_port: AtomicCell::new(None),
@@ -421,6 +424,15 @@ impl GlobalCtx {
.and_then(|acl_v1| acl_v1.group)
.map_or_else(Vec::new, |group| group.declares.to_vec())
}
pub fn p2p_only(&self) -> bool {
self.p2p_only
}
pub fn latency_first(&self) -> bool {
// NOTICE: p2p only is conflict with latency first
self.config.get_flags().latency_first && !self.p2p_only
}
}
#[cfg(test)]

View File

@@ -420,6 +420,15 @@ struct NetworkOptions {
)]
disable_p2p: Option<bool>,
#[arg(
long,
env = "ET_P2P_ONLY",
help = t!("core_clap.p2p_only").to_string(),
num_args = 0..=1,
default_missing_value = "true"
)]
p2p_only: Option<bool>,
#[arg(
long,
env = "ET_DISABLE_UDP_HOLE_PUNCHING",
@@ -934,6 +943,7 @@ impl NetworkOptions {
f.relay_network_whitelist = wl.join(" ");
}
f.disable_p2p = self.disable_p2p.unwrap_or(f.disable_p2p);
f.p2p_only = self.p2p_only.unwrap_or(f.p2p_only);
f.disable_udp_hole_punching = self
.disable_udp_hole_punching
.unwrap_or(f.disable_udp_hole_punching);

View File

@@ -275,7 +275,7 @@ impl IcmpProxy {
}
let peer_manager = self.peer_manager.clone();
let is_latency_first = self.global_ctx.get_flags().latency_first;
let is_latency_first = self.global_ctx.latency_first();
self.tasks.lock().await.spawn(
async move {
while let Some(mut msg) = receiver.recv().await {

View File

@@ -437,7 +437,7 @@ impl UdpProxy {
// forward packets to peer manager
let mut receiver = self.receiver.lock().await.take().unwrap();
let peer_manager = self.peer_manager.clone();
let is_latency_first = self.global_ctx.get_flags().latency_first;
let is_latency_first = self.global_ctx.latency_first();
self.tasks.lock().await.spawn(async move {
while let Some(mut msg) = receiver.recv().await {
let hdr = msg.mut_peer_manager_header().unwrap();

View File

@@ -700,6 +700,10 @@ impl NetworkConfig {
flags.disable_p2p = disable_p2p;
}
if let Some(p2p_only) = self.p2p_only {
flags.p2p_only = p2p_only;
}
if let Some(bind_device) = self.bind_device {
flags.bind_device = bind_device;
}
@@ -874,6 +878,7 @@ impl NetworkConfig {
result.disable_quic_input = Some(flags.disable_quic_input);
result.quic_listen_port = Some(flags.quic_listen_port as i32);
result.disable_p2p = Some(flags.disable_p2p);
result.p2p_only = Some(flags.p2p_only);
result.bind_device = Some(flags.bind_device);
result.no_tun = Some(flags.no_tun);
result.enable_exit_node = Some(flags.enable_exit_node);
@@ -1109,6 +1114,7 @@ mod tests {
flags.enable_quic_proxy = rng.gen_bool(0.5);
flags.disable_quic_input = rng.gen_bool(0.3);
flags.disable_p2p = rng.gen_bool(0.2);
flags.p2p_only = rng.gen_bool(0.2);
flags.bind_device = rng.gen_bool(0.3);
flags.no_tun = rng.gen_bool(0.1);
flags.enable_exit_node = rng.gen_bool(0.4);

View File

@@ -997,11 +997,20 @@ impl PeerManager {
}
}
fn check_p2p_only_before_send(&self, dst_peer_id: PeerId) -> Result<(), Error> {
if self.global_ctx.p2p_only() && !self.peers.has_peer(dst_peer_id) {
return Err(Error::RouteError(None));
}
Ok(())
}
pub async fn send_msg_for_proxy(
&self,
mut msg: ZCPacket,
dst_peer_id: PeerId,
) -> Result<(), Error> {
self.check_p2p_only_before_send(dst_peer_id)?;
self.self_tx_counters
.compress_tx_bytes_before
.add(msg.buf_len() as u64);
@@ -1199,7 +1208,7 @@ impl PeerManager {
.compress_tx_bytes_after
.add(msg.buf_len() as u64);
let is_latency_first = self.global_ctx.get_flags().latency_first;
let is_latency_first = self.global_ctx.latency_first();
msg.mut_peer_manager_header()
.unwrap()
.set_latency_first(is_latency_first)
@@ -1209,6 +1218,11 @@ impl PeerManager {
let mut msg = Some(msg);
let total_dst_peers = dst_peers.len();
for (i, peer_id) in dst_peers.iter().enumerate() {
if let Err(e) = self.check_p2p_only_before_send(*peer_id) {
errs.push(e);
continue;
}
let mut msg = if i == total_dst_peers - 1 {
msg.take().unwrap()
} else {

View File

@@ -76,6 +76,8 @@ message NetworkConfig {
repeated PortForwardConfig port_forwards = 48;
optional bool disable_sym_hole_punching = 49;
optional bool p2p_only = 51;
}
message PortForwardConfig {

View File

@@ -60,6 +60,8 @@ message FlagsInConfig {
// tld dns zone for magic dns
string tld_dns_zone = 31;
bool p2p_only = 32;
}
message RpcDescriptor {

View File

@@ -2,6 +2,7 @@
use core::panic;
use std::{
future::Future,
sync::{atomic::AtomicU32, Arc},
time::Duration,
};
@@ -1950,6 +1951,107 @@ pub async fn acl_rule_test_subnet_proxy(
drop_insts(insts).await;
}
async fn assert_panics_ext<F, Fut>(f: F, expect_panic: bool)
where
F: FnOnce() -> Fut + Send + 'static,
Fut: Future + Send + 'static,
{
// Run the async function in a separate task so panics surface as JoinError
let res = tokio::spawn(async move {
f().await;
})
.await;
if expect_panic {
assert!(
res.is_err() && res.as_ref().unwrap_err().is_panic(),
"Expected function to panic, but it didn't",
);
} else {
assert!(res.is_ok(), "Expected function not to panic, but it did");
}
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]
pub async fn p2p_only_test(
#[values(true, false)] has_p2p_conn: bool,
#[values(true, false)] enable_kcp_proxy: bool,
#[values(true, false)] enable_quic_proxy: bool,
) {
use crate::peers::tests::wait_route_appear_with_cost;
let insts = init_three_node_ex(
"udp",
|cfg| {
if cfg.get_inst_name() == "inst1" {
let mut flags = cfg.get_flags();
flags.enable_kcp_proxy = enable_kcp_proxy;
flags.enable_quic_proxy = enable_quic_proxy;
flags.disable_p2p = true;
flags.p2p_only = true;
cfg.set_flags(flags);
} else if cfg.get_inst_name() == "inst3" {
// 添加子网代理配置
cfg.add_proxy_cidr("10.1.2.0/24".parse().unwrap(), None)
.unwrap();
}
cfg
},
false,
)
.await;
if has_p2p_conn {
insts[2]
.get_conn_manager()
.add_connector(RingTunnelConnector::new(
format!("ring://{}", insts[0].id()).parse().unwrap(),
));
wait_route_appear_with_cost(
insts[2].get_peer_manager(),
insts[0].get_peer_manager().my_peer_id(),
Some(1),
)
.await
.unwrap();
}
let target_ip = "10.1.2.4";
for target_ip in ["10.144.144.3", target_ip] {
assert_panics_ext(
|| async {
subnet_proxy_test_icmp(target_ip).await;
},
!has_p2p_conn,
)
.await;
let listen_ip = if target_ip == "10.144.144.3" {
"0.0.0.0"
} else {
"10.1.2.4"
};
assert_panics_ext(
|| async {
subnet_proxy_test_tcp(listen_ip, target_ip).await;
},
!has_p2p_conn,
)
.await;
assert_panics_ext(
|| async {
subnet_proxy_test_udp(listen_ip, target_ip).await;
},
!has_p2p_conn,
)
.await;
}
}
#[rstest::rstest]
#[tokio::test]
#[serial_test::serial]