bugfix before release 1.1.x (#130)

* use correct i18n hook

* fix peer rpc panic

make sure the server uses the correct transact id

* fix dhcp

recreate the TUN device after the IP changes

* use upx correctly

* compile for arm & armv7

* prepare to release v1.1.0
Author: Sijie.Sun
Date: 2024-06-03 23:07:44 +08:00 (committed by GitHub)
Parent: c1b725e64e
Commit: df17a7bb68
13 changed files with 470 additions and 336 deletions
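The heart of the DHCP fix is a lifecycle change: all TUN-related state now lives in a NicCtx that is dropped and rebuilt whenever the leased IP changes, instead of patching addresses on a live device. A minimal sketch of the mechanism this relies on (illustration only, not code from this commit): tokio's JoinSet aborts every task it owns when it is dropped, so swapping out the old context cancels the forwarding loops before a new device is created.

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut tasks: JoinSet<()> = JoinSet::new();
    tasks.spawn(async {
        loop {
            // stand-in for the TUN <-> peers forwarding loops
            tokio::time::sleep(std::time::Duration::from_secs(1)).await;
        }
    });
    drop(tasks); // dropping the JoinSet aborts all remaining tasks
}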

View File

@@ -1,4 +1,3 @@
- use std::borrow::BorrowMut;
use std::collections::HashSet;
use std::net::Ipv4Addr;
use std::pin::Pin;
@@ -82,16 +81,216 @@ impl IpProxy {
}
}
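// Everything whose lifetime is tied to one TUN device: the virtual NIC,
// the forwarding tasks, and a weak handle to the peer manager. Dropping a
// NicCtx drops its JoinSet, which aborts the forwarding tasks.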
struct NicCtx {
global_ctx: ArcGlobalCtx,
peer_mgr: Weak<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
nic: Arc<Mutex<virtual_nic::VirtualNic>>,
tasks: JoinSet<()>,
}
impl NicCtx {
fn new(
global_ctx: ArcGlobalCtx,
peer_manager: &Arc<PeerManager>,
peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
) -> Self {
NicCtx {
global_ctx: global_ctx.clone(),
peer_mgr: Arc::downgrade(&peer_manager),
peer_packet_receiver,
nic: Arc::new(Mutex::new(virtual_nic::VirtualNic::new(global_ctx))),
tasks: JoinSet::new(),
}
}
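// Bring the device up and replace any previously assigned address with the
// new /24; macOS additionally needs an explicit subnet route.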
async fn assign_ipv4_to_tun_device(&self, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
let nic = self.nic.lock().await;
nic.link_up().await?;
nic.remove_ip(None).await?;
nic.add_ip(ipv4_addr, 24).await?;
if cfg!(target_os = "macos") {
nic.add_route(ipv4_addr, 24).await?;
}
Ok(())
}
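// Forward one packet read from the TUN device to the peer that owns the
// destination address; non-IPv4 traffic is logged and dropped.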
async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) {
if let Some(ipv4) = Ipv4Packet::new(ret.payload()) {
if ipv4.get_version() != 4 {
tracing::info!("[USER_PACKET] not ipv4 packet: {:?}", ipv4);
return;
}
let dst_ipv4 = ipv4.get_destination();
tracing::trace!(
?ret,
"[USER_PACKET] recv new packet from tun device and forward to peers."
);
// TODO: use zero-copy
let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await;
if send_ret.is_err() {
tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed")
}
} else {
tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet");
}
}
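// Spawn the TUN -> peers pump; fails early if the peer manager is gone.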
fn do_forward_nic_to_peers(
&mut self,
mut stream: Pin<Box<dyn ZCPacketStream>>,
) -> Result<(), Error> {
// read from nic and write to corresponding tunnel
let Some(mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into());
};
self.tasks.spawn(async move {
while let Some(ret) = stream.next().await {
if ret.is_err() {
log::error!("read from nic failed: {:?}", ret);
break;
}
Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await;
}
});
Ok(())
}
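// Spawn the peers -> TUN pump. The receiver stays behind an Arc<Mutex<..>>
// owned by Instance, so it survives NicCtx recreation instead of being
// consumed the way the old Option::take() version was.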
fn do_forward_peers_to_nic(&mut self, mut sink: Pin<Box<dyn ZCPacketSink>>) {
let channel = self.peer_packet_receiver.clone();
self.tasks.spawn(async move {
// hold the channel lock until this task finishes
let mut channel = channel.lock().await;
while let Some(packet) = channel.recv().await {
tracing::trace!(
"[USER_PACKET] forward packet from peers to nic. packet: {:?}",
packet
);
let ret = sink.send(packet).await;
if ret.is_err() {
tracing::error!(?ret, "do_forward_tunnel_to_nic sink error");
}
}
});
}
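// Reconcile kernel routes with advertised proxy CIDRs once per second:
// add routes for newly advertised CIDRs, remove ones no longer advertised.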
async fn run_proxy_cidrs_route_updater(&mut self) -> Result<(), Error> {
let Some(peer_mgr) = self.peer_mgr.upgrade() else {
return Err(anyhow::anyhow!("peer manager not available").into());
};
let global_ctx = self.global_ctx.clone();
let net_ns = self.global_ctx.net_ns.clone();
let nic = self.nic.lock().await;
let ifcfg = nic.get_ifcfg();
let ifname = nic.ifname().to_owned();
self.tasks.spawn(async move {
let mut cur_proxy_cidrs = vec![];
loop {
let mut proxy_cidrs = vec![];
let routes = peer_mgr.list_routes().await;
for r in routes {
for cidr in r.proxy_cidrs {
let Ok(cidr) = cidr.parse::<cidr::Ipv4Cidr>() else {
continue;
};
proxy_cidrs.push(cidr);
}
}
// add vpn portal cidr to proxy_cidrs
if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() {
proxy_cidrs.push(vpn_cfg.client_cidr);
}
// if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it.
for cidr in cur_proxy_cidrs.iter() {
if proxy_cidrs.contains(cidr) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.remove_ipv4_route(
ifname.as_str(),
cidr.first_address(),
cidr.network_length(),
)
.await;
if ret.is_err() {
tracing::trace!(
cidr = ?cidr,
err = ?ret,
"remove route failed.",
);
}
}
for cidr in proxy_cidrs.iter() {
if cur_proxy_cidrs.contains(cidr) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.add_ipv4_route(
ifname.as_str(),
cidr.first_address(),
cidr.network_length(),
)
.await;
if ret.is_err() {
tracing::trace!(
cidr = ?cidr,
err = ?ret,
"add route failed.",
);
}
}
cur_proxy_cidrs = proxy_cidrs;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
Ok(())
}
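// Create the TUN device, wire up both forwarding directions, assign the
// IPv4 address, and start the route updater.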
async fn run(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
let tunnel = {
let mut nic = self.nic.lock().await;
let ret = nic.create_dev().await?;
self.global_ctx
.issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string()));
ret
};
let (stream, sink) = tunnel.split();
self.do_forward_nic_to_peers(stream)?;
self.do_forward_peers_to_nic(sink);
self.assign_ipv4_to_tun_device(ipv4_addr).await?;
self.run_proxy_cidrs_route_updater().await?;
Ok(())
}
}
type ArcNicCtx = Arc<Mutex<Option<NicCtx>>>;
pub struct Instance {
inst_name: String,
id: uuid::Uuid,
- virtual_nic: Option<Arc<virtual_nic::VirtualNic>>,
- peer_packet_receiver: Option<PacketRecvChanReceiver>,
+ nic_ctx: ArcNicCtx,
tasks: JoinSet<()>,
+ peer_packet_receiver: Arc<Mutex<PacketRecvChanReceiver>>,
peer_manager: Arc<PeerManager>,
listener_manager: Arc<Mutex<ListenerManager<PeerManager>>>,
conn_manager: Arc<ManualConnectorManager>,
@@ -153,8 +352,8 @@ impl Instance {
inst_name: global_ctx.inst_name.clone(),
id,
- virtual_nic: None,
- peer_packet_receiver: Some(peer_packet_receiver),
+ peer_packet_receiver: Arc::new(Mutex::new(peer_packet_receiver)),
+ nic_ctx: Arc::new(Mutex::new(None)),
tasks: JoinSet::new(),
peer_manager,
@@ -177,78 +376,6 @@ impl Instance {
self.conn_manager.clone()
}
async fn do_forward_nic_to_peers_ipv4(ret: ZCPacket, mgr: &PeerManager) {
if let Some(ipv4) = Ipv4Packet::new(ret.payload()) {
if ipv4.get_version() != 4 {
tracing::info!("[USER_PACKET] not ipv4 packet: {:?}", ipv4);
return;
}
let dst_ipv4 = ipv4.get_destination();
tracing::trace!(
?ret,
"[USER_PACKET] recv new packet from tun device and forward to peers."
);
// TODO: use zero-copy
let send_ret = mgr.send_msg_ipv4(ret, dst_ipv4).await;
if send_ret.is_err() {
tracing::trace!(?send_ret, "[USER_PACKET] send_msg_ipv4 failed")
}
} else {
tracing::warn!(?ret, "[USER_PACKET] not ipv4 packet");
}
}
// async fn do_forward_nic_to_peers_ethernet(mut ret: BytesMut, mgr: &PeerManager) {
// if let Some(eth) = EthernetPacket::new(&ret) {
// log::warn!("begin to forward: {:?}, type: {}", eth, eth.get_ethertype());
// Self::do_forward_nic_to_peers_ipv4(ret.split_off(14), mgr).await;
// } else {
// log::warn!("not ipv4 packet: {:?}", ret);
// }
// }
fn do_forward_nic_to_peers(
&mut self,
mut stream: Pin<Box<dyn ZCPacketStream>>,
) -> Result<(), Error> {
// read from nic and write to corresponding tunnel
let mgr = self.peer_manager.clone();
self.tasks.spawn(async move {
while let Some(ret) = stream.next().await {
if ret.is_err() {
log::error!("read from nic failed: {:?}", ret);
break;
}
Self::do_forward_nic_to_peers_ipv4(ret.unwrap(), mgr.as_ref()).await;
// Self::do_forward_nic_to_peers_ethernet(ret.into(), mgr.as_ref()).await;
}
});
Ok(())
}
fn do_forward_peers_to_nic(
tasks: &mut JoinSet<()>,
mut sink: Pin<Box<dyn ZCPacketSink>>,
channel: Option<PacketRecvChanReceiver>,
) {
tasks.spawn(async move {
let mut channel = channel.unwrap();
while let Some(packet) = channel.recv().await {
tracing::trace!(
"[USER_PACKET] forward packet from peers to nic. packet: {:?}",
packet
);
let ret = sink.send(packet).await;
if ret.is_err() {
tracing::error!(?ret, "do_forward_tunnel_to_nic sink error");
}
}
});
}
async fn add_initial_peers(&mut self) -> Result<(), Error> {
for peer in self.global_ctx.config.get_peers().iter() {
self.get_conn_manager()
@@ -258,35 +385,13 @@ impl Instance {
Ok(())
}
- async fn prepare_tun_device(&mut self) -> Result<(), Error> {
- let mut nic = virtual_nic::VirtualNic::new(self.get_global_ctx());
- let tunnel = nic.create_dev().await?;
- self.global_ctx
- .issue_event(GlobalCtxEvent::TunDeviceReady(nic.ifname().to_string()));
- let (stream, sink) = tunnel.split();
- self.virtual_nic = Some(Arc::new(nic));
- self.do_forward_nic_to_peers(stream).unwrap();
- Self::do_forward_peers_to_nic(
- self.tasks.borrow_mut(),
- sink,
- self.peer_packet_receiver.take(),
- );
- Ok(())
- }
+ async fn clear_nic_ctx(arc_nic_ctx: ArcNicCtx) {
+ let _ = arc_nic_ctx.lock().await.take();
+ }
- async fn assign_ipv4_to_tun_device(&mut self, ipv4_addr: Ipv4Addr) -> Result<(), Error> {
- let nic = self.virtual_nic.as_ref().unwrap().clone();
- nic.link_up().await?;
- nic.remove_ip(None).await?;
- nic.add_ip(ipv4_addr, 24).await?;
- if cfg!(target_os = "macos") {
- nic.add_route(ipv4_addr, 24).await?;
- }
- Ok(())
- }
+ async fn use_new_nic_ctx(arc_nic_ctx: ArcNicCtx, nic_ctx: NicCtx) {
+ let mut g = arc_nic_ctx.lock().await;
+ *g = Some(nic_ctx);
+ }
// Warning: when using DHCP, if there is an IP conflict in the network, the IP will be changed automatically.
@@ -294,7 +399,8 @@ impl Instance {
use rand::Rng;
let peer_manager_c = self.peer_manager.clone();
let global_ctx_c = self.get_global_ctx();
- let nic_c = self.virtual_nic.as_ref().unwrap().clone();
+ let nic_ctx = self.nic_ctx.clone();
+ let peer_packet_receiver = self.peer_packet_receiver.clone();
tokio::spawn(async move {
let default_ipv4_addr = Ipv4Addr::new(10, 0, 0, 0);
let mut dhcp_ip: Option<Ipv4Inet> = None;
@@ -348,42 +454,30 @@ impl Instance {
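// When the DHCP lease changes, the whole NicCtx is now torn down and
// rebuilt instead of removing and re-adding addresses on a live device.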
if dhcp_ip != ipv4_addr {
let last_ip = dhcp_ip.map(|p| p.address());
tracing::debug!("last_ip: {:?}", last_ip);
- let _ = nic_c.remove_ip(last_ip).await;
- #[cfg(target_os = "macos")]
- if last_ip.is_some() {
- let _g = global_ctx_c.net_ns.guard();
- let ret = nic_c
- .get_ifcfg()
- .remove_ipv4_route(nic_c.ifname(), last_ip.unwrap(), 24)
- .await;
- if ret.is_err() {
- tracing::trace!(
- cidr = 24,
- err = ?ret,
- "remove route failed.",
- );
- }
- }
+ Self::clear_nic_ctx(nic_ctx.clone()).await;
if let Some(ip) = ipv4_addr {
- let _ = nic_c.link_up().await;
+ let mut new_nic_ctx = NicCtx::new(
+ global_ctx_c.clone(),
+ &peer_manager_c,
+ peer_packet_receiver.clone(),
+ );
dhcp_ip = Some(ip);
tries = 1;
- if let Err(e) = nic_c.add_ip(ip.address(), 24).await {
+ if let Err(e) = new_nic_ctx.run(ip.address()).await {
tracing::error!("add ip failed: {:?}", e);
global_ctx_c.set_ipv4(None);
let sleep: u64 = rand::thread_rng().gen_range(200..500);
tokio::time::sleep(std::time::Duration::from_millis(sleep)).await;
continue;
}
- #[cfg(target_os = "macos")]
- let _ = nic_c.add_route(ip.address(), 24).await;
global_ctx_c.set_ipv4(Some(ip.address()));
global_ctx_c.issue_event(GlobalCtxEvent::DhcpIpv4Changed(
last_ip,
Some(ip.address()),
));
+ Self::use_new_nic_ctx(nic_ctx.clone(), new_nic_ctx).await;
} else {
global_ctx_c.set_ipv4(None);
global_ctx_c.issue_event(GlobalCtxEvent::DhcpIpv4Conflicted(last_ip));
@@ -409,13 +503,15 @@ impl Instance {
self.peer_manager.run().await?;
if self.global_ctx.config.get_dhcp() {
- self.prepare_tun_device().await?;
- self.run_proxy_cidrs_route_updater();
self.check_dhcp_ip_conflict();
} else if let Some(ipv4_addr) = self.global_ctx.get_ipv4() {
- self.prepare_tun_device().await?;
- self.assign_ipv4_to_tun_device(ipv4_addr).await?;
- self.run_proxy_cidrs_route_updater();
+ let mut new_nic_ctx = NicCtx::new(
+ self.global_ctx.clone(),
+ &self.peer_manager,
+ self.peer_packet_receiver.clone(),
+ );
+ new_nic_ctx.run(ipv4_addr).await?;
+ Self::use_new_nic_ctx(self.nic_ctx.clone(), new_nic_ctx).await;
}
self.run_rpc_server()?;
@@ -577,84 +673,6 @@ impl Instance {
Ok(())
}
fn run_proxy_cidrs_route_updater(&mut self) {
let peer_mgr = self.peer_manager.clone();
let global_ctx = self.global_ctx.clone();
let net_ns = self.global_ctx.net_ns.clone();
let nic = self.virtual_nic.as_ref().unwrap().clone();
let ifcfg = nic.get_ifcfg();
let ifname = nic.ifname().to_owned();
self.tasks.spawn(async move {
let mut cur_proxy_cidrs = vec![];
loop {
let mut proxy_cidrs = vec![];
let routes = peer_mgr.list_routes().await;
for r in routes {
for cidr in r.proxy_cidrs {
let Ok(cidr) = cidr.parse::<cidr::Ipv4Cidr>() else {
continue;
};
proxy_cidrs.push(cidr);
}
}
// add vpn portal cidr to proxy_cidrs
if let Some(vpn_cfg) = global_ctx.config.get_vpn_portal_config() {
proxy_cidrs.push(vpn_cfg.client_cidr);
}
// if route is in cur_proxy_cidrs but not in proxy_cidrs, delete it.
for cidr in cur_proxy_cidrs.iter() {
if proxy_cidrs.contains(cidr) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.remove_ipv4_route(
ifname.as_str(),
cidr.first_address(),
cidr.network_length(),
)
.await;
if ret.is_err() {
tracing::trace!(
cidr = ?cidr,
err = ?ret,
"remove route failed.",
);
}
}
for cidr in proxy_cidrs.iter() {
if cur_proxy_cidrs.contains(cidr) {
continue;
}
let _g = net_ns.guard();
let ret = ifcfg
.add_ipv4_route(
ifname.as_str(),
cidr.first_address(),
cidr.network_length(),
)
.await;
if ret.is_err() {
tracing::trace!(
cidr = ?cidr,
err = ?ret,
"add route failed.",
);
}
}
cur_proxy_cidrs = proxy_cidrs;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
}
pub fn get_global_ctx(&self) -> ArcGlobalCtx {
self.global_ctx.clone()
}

View File

@@ -260,9 +260,8 @@ impl VirtualNic {
Ok(self)
}
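// Device creation is split out of create_dev_ret_err so it can run entirely
// inside the network-namespace guard and return a bare AsyncDevice.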
- async fn create_dev_ret_err(&mut self) -> Result<Box<dyn Tunnel>, Error> {
+ async fn create_tun(&mut self) -> Result<AsyncDevice, Error> {
let mut config = Configuration::default();
- let has_packet_info = cfg!(target_os = "macos");
config.layer(Layer::L3);
#[cfg(target_os = "linux")]
@@ -303,11 +302,12 @@ impl VirtualNic {
config.queues(self.queue_num);
config.up();
- let dev = {
- let _g = self.global_ctx.net_ns.guard();
- create_as_async(&config)?
- };
+ let _g = self.global_ctx.net_ns.guard();
+ Ok(create_as_async(&config)?)
+ }
+ async fn create_dev_ret_err(&mut self) -> Result<Box<dyn Tunnel>, Error> {
+ let dev = self.create_tun().await?;
let ifname = dev.get_ref().name()?;
self.ifcfg.wait_interface_show(ifname.as_str()).await?;
@@ -324,8 +324,8 @@ impl VirtualNic {
.await?;
}
+ let has_packet_info = cfg!(target_os = "macos");
let (a, b) = BiLock::new(dev);
let ft = TunnelWrapper::new(
TunStream::new(a, has_packet_info),
FramedWriter::new_with_converter(

View File
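The peer RPC panic fix below changes how server-side endpoints are keyed. Previously there was one endpoint per (peer, service), and the current transact id lived in loop-local state, so concurrent requests to the same service could race and a response could be sent with a stale transact id. A hedged sketch of the new keying (illustration only; u32 ids stand in for the real PeerId, PeerRpcServiceId, and PeerRpcTransactId types):

use dashmap::DashMap;

#[derive(Hash, Eq, PartialEq, Clone)]
struct CtxKey(u32, u32, u32); // (peer_id, service_id, transact_id)

fn main() {
    let endpoints: DashMap<CtxKey, &'static str> = DashMap::new();
    // Two concurrent requests from peer 1 to service 7 no longer collide:
    endpoints.insert(CtxKey(1, 7, 100), "endpoint for transact 100");
    endpoints.insert(CtxKey(1, 7, 101), "endpoint for transact 101");
    assert_eq!(endpoints.len(), 2); // the old (peer, service) key would give 1
}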

@@ -1,5 +1,8 @@
use std::{
- sync::{atomic::AtomicU32, Arc},
+ sync::{
+ atomic::{AtomicBool, AtomicU32, Ordering},
+ Arc,
+ },
time::Instant,
};
@@ -40,11 +43,13 @@ type PacketSender = UnboundedSender<ZCPacket>;
struct PeerRpcEndPoint {
peer_id: PeerId,
packet_sender: PacketSender,
- last_used: AtomicCell<Instant>,
+ create_time: AtomicCell<Instant>,
+ finished: Arc<AtomicBool>,
tasks: JoinSet<()>,
}
- type PeerRpcEndPointCreator = Box<dyn Fn(PeerId) -> PeerRpcEndPoint + Send + Sync + 'static>;
+ type PeerRpcEndPointCreator =
+ Box<dyn Fn(PeerId, PeerRpcTransactId) -> PeerRpcEndPoint + Send + Sync + 'static>;
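// An in-flight RPC is now identified by (peer, service, transact id), so
// concurrent requests to the same service each get their own endpoint.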
#[derive(Hash, Eq, PartialEq, Clone)]
struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId, PeerRpcTransactId);
@@ -55,8 +60,8 @@ pub struct PeerRpcManager {
tspt: Arc<Box<dyn PeerRpcManagerTransport>>,
service_registry: Arc<DashMap<PeerRpcServiceId, PeerRpcEndPointCreator>>,
- peer_rpc_endpoints: Arc<DashMap<(PeerId, PeerRpcServiceId), PeerRpcEndPoint>>,
+ peer_rpc_endpoints: Arc<DashMap<PeerRpcClientCtxKey, PeerRpcEndPoint>>,
client_resp_receivers: Arc<DashMap<PeerRpcClientCtxKey, PacketSender>>,
transact_id: AtomicU32,
@@ -109,11 +114,19 @@ impl PacketMerger {
Some(tmpl_packet)
}
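// Reassemble fragmented RPC packets. When expected_tid is set, pieces from
// other transactions are ignored so a late or duplicate response cannot
// corrupt the reply the client is waiting for.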
- fn feed(&mut self, packet: ZCPacket) -> Result<Option<TaRpcPacket>, Error> {
+ fn feed(
+ &mut self,
+ packet: ZCPacket,
+ expected_tid: Option<PeerRpcTransactId>,
+ ) -> Result<Option<TaRpcPacket>, Error> {
let payload = packet.payload();
let rpc_packet =
TaRpcPacket::decode(payload).map_err(|e| Error::MessageDecodeError(e.to_string()))?;
+ if expected_tid.is_some() && rpc_packet.transact_id != expected_tid.unwrap() {
+ return Ok(None);
+ }
let total_pieces = rpc_packet.total_pieces;
let piece_idx = rpc_packet.piece_idx;
@@ -176,11 +189,12 @@ impl PeerRpcManager {
S::Fut: Send + 'static,
{
let tspt = self.tspt.clone();
- let creator = Box::new(move |peer_id: PeerId| {
+ let creator = Box::new(move |peer_id: PeerId, transact_id: PeerRpcTransactId| {
let mut tasks = JoinSet::new();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
let (mut client_transport, server_transport) = tarpc::transport::channel::unbounded();
let server = tarpc::server::BaseChannel::with_defaults(server_transport);
+ let finished = Arc::new(AtomicBool::new(false));
let my_peer_id_clone = tspt.my_peer_id();
let peer_id_clone = peer_id.clone();
@@ -189,19 +203,13 @@ impl PeerRpcManager {
tasks.spawn(o);
let tspt = tspt.clone();
+ let finished_clone = finished.clone();
tasks.spawn(async move {
- let mut cur_req_peer_id = None;
- let mut cur_transact_id = 0;
let mut packet_merger = PacketMerger::new();
loop {
tokio::select! {
Some(resp) = client_transport.next() => {
- let Some(cur_req_peer_id) = cur_req_peer_id.take() else {
- tracing::error!("[PEER RPC MGR] cur_req_peer_id is none, ignore this resp");
- continue;
- };
- tracing::debug!(resp = ?resp, "server recv packet from service provider");
+ tracing::debug!(resp = ?resp, ?transact_id, ?peer_id, "server recv packet from service provider");
if resp.is_err() {
tracing::warn!(err = ?resp.err(),
"[PEER RPC MGR] client_transport in server side got channel error, ignore it.");
@@ -217,11 +225,11 @@ impl PeerRpcManager {
let msgs = Self::build_rpc_packet(
tspt.my_peer_id(),
- cur_req_peer_id,
+ peer_id,
service_id,
- cur_transact_id,
+ transact_id,
false,
- serialized_resp.unwrap(),
+ serialized_resp.as_ref().unwrap(),
);
for msg in msgs {
@@ -230,11 +238,13 @@ impl PeerRpcManager {
break;
}
}
+ finished_clone.store(true, Ordering::Relaxed);
}
Some(packet) = packet_receiver.recv() => {
tracing::trace!("recv packet from peer, packet: {:?}", packet);
- let info = match packet_merger.feed(packet) {
+ let info = match packet_merger.feed(packet, None) {
Err(e) => {
tracing::error!(error = ?e, "feed packet to merger failed");
continue;
@@ -247,10 +257,9 @@ impl PeerRpcManager {
}
};
- cur_req_peer_id = Some(info.from_peer);
- cur_transact_id = info.transact_id;
assert_eq!(info.service_id, service_id);
+ assert_eq!(info.from_peer, peer_id);
+ assert_eq!(info.transact_id, transact_id);
let decoded_ret = postcard::from_bytes(&info.content.as_slice());
if let Err(e) = decoded_ret {
@@ -279,7 +288,8 @@ impl PeerRpcManager {
return PeerRpcEndPoint {
peer_id,
packet_sender,
- last_used: AtomicCell::new(Instant::now()),
+ create_time: AtomicCell::new(Instant::now()),
+ finished,
tasks,
};
// let resp = client_transport.next().await;
@@ -310,7 +320,7 @@ impl PeerRpcManager {
service_id: PeerRpcServiceId,
transact_id: PeerRpcTransactId,
is_req: bool,
- content: Vec<u8>,
+ content: &Vec<u8>,
) -> Vec<ZCPacket> {
let mut ret = Vec::new();
let content_mtu = RPC_PACKET_CONTENT_MTU;
@@ -373,12 +383,18 @@ impl PeerRpcManager {
}
let endpoint = peer_rpc_endpoints
- .entry((info.from_peer, info.service_id))
+ .entry(PeerRpcClientCtxKey(
+ info.from_peer,
+ info.service_id,
+ info.transact_id,
+ ))
.or_insert_with(|| {
- service_registry.get(&info.service_id).unwrap()(info.from_peer)
+ service_registry.get(&info.service_id).unwrap()(
+ info.from_peer,
+ info.transact_id,
+ )
});
- endpoint.last_used.store(Instant::now());
endpoint.packet_sender.send(o).unwrap();
} else {
if let Some(a) = client_resp_receivers.get(&PeerRpcClientCtxKey(
@@ -400,29 +416,42 @@ impl PeerRpcManager {
let peer_rpc_endpoints = self.peer_rpc_endpoints.clone();
tokio::spawn(async move {
loop {
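// GC policy: an endpoint is dropped once its RPC has finished, or after
// a 30-second hard timeout; the sweep now runs every 5 seconds.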
- tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
- peer_rpc_endpoints.retain(|_, v| v.last_used.load().elapsed().as_secs() < 60);
+ tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
+ peer_rpc_endpoints.retain(|_, v| {
+ v.create_time.load().elapsed().as_secs() < 30
+ && !v.finished.load(Ordering::Relaxed)
+ });
}
});
}
#[tracing::instrument(skip(f))]
- pub async fn do_client_rpc_scoped<CM, Req, RpcRet, Fut>(
+ pub async fn do_client_rpc_scoped<Resp, Req, RpcRet, Fut>(
&self,
service_id: PeerRpcServiceId,
dst_peer_id: PeerId,
- f: impl FnOnce(UnboundedChannel<CM, Req>) -> Fut,
+ f: impl FnOnce(UnboundedChannel<Resp, Req>) -> Fut,
) -> RpcRet
where
- CM: serde::Serialize + for<'a> serde::Deserialize<'a> + Send + Sync + 'static,
- Req: serde::Serialize + for<'a> serde::Deserialize<'a> + Send + Sync + 'static,
+ Resp: serde::Serialize
+ + for<'a> serde::Deserialize<'a>
+ + Send
+ + Sync
+ + std::fmt::Debug
+ + 'static,
+ Req: serde::Serialize
+ + for<'a> serde::Deserialize<'a>
+ + Send
+ + Sync
+ + std::fmt::Debug
+ + 'static,
Fut: std::future::Future<Output = RpcRet>,
{
let mut tasks = JoinSet::new();
let (packet_sender, mut packet_receiver) = mpsc::unbounded_channel();
let (client_transport, server_transport) =
- tarpc::transport::channel::unbounded::<CM, Req>();
+ tarpc::transport::channel::unbounded::<Resp, Req>();
let (mut server_s, mut server_r) = server_transport.split();
@@ -438,9 +467,9 @@ impl PeerRpcManager {
continue;
}
- let a = postcard::to_allocvec(&a.unwrap());
- if a.is_err() {
- tracing::error!(error = ?a.err(), "bincode serialize failed");
+ let req = postcard::to_allocvec(&a.unwrap());
+ if req.is_err() {
+ tracing::error!(error = ?req.err(), "bincode serialize failed");
continue;
}
@@ -450,10 +479,10 @@ impl PeerRpcManager {
service_id,
transact_id,
true,
- a.unwrap(),
+ req.as_ref().unwrap(),
);
- tracing::debug!(?packets, "client send rpc packet to peer");
+ tracing::debug!(?packets, ?req, ?transact_id, "client send rpc packet to peer");
for packet in packets {
if let Err(e) = tspt.send(packet, dst_peer_id).await {
@@ -471,7 +500,7 @@ impl PeerRpcManager {
while let Some(packet) = packet_receiver.recv().await {
tracing::trace!("tunnel recv: {:?}", packet);
- let info = match packet_merger.feed(packet) {
+ let info = match packet_merger.feed(packet, Some(transact_id)) {
Err(e) => {
tracing::error!(error = ?e, "feed packet to merger failed");
continue;
@@ -482,9 +511,11 @@ impl PeerRpcManager {
Ok(Some(info)) => info,
};
- tracing::debug!(?info, "client recv rpc packet from peer");
let decoded = postcard::from_bytes(&info.content.as_slice());
+ tracing::debug!(?info, ?decoded, "client recv rpc packet from peer");
+ assert_eq!(info.transact_id, transact_id);
if let Err(e) = decoded {
tracing::error!(error = ?e, "decode rpc packet failed");
continue;
@@ -517,7 +548,7 @@ impl PeerRpcManager {
#[cfg(test)]
pub mod tests {
- use std::{pin::Pin, sync::Arc};
+ use std::{pin::Pin, sync::Arc, time::Duration};
use futures::{SinkExt, StreamExt};
use tokio::sync::Mutex;
@@ -526,7 +557,10 @@ pub mod tests {
common::{error::Error, new_peer_id, PeerId},
peers::{
peer_rpc::PeerRpcManager,
- tests::{connect_peer_manager, create_mock_peer_manager, wait_route_appear},
+ tests::{
+ connect_peer_manager, create_mock_peer_manager, wait_for_condition,
+ wait_route_appear,
+ },
},
tunnel::{
packet_def::ZCPacket, ring::create_ring_tunnel_pair, Tunnel, ZCPacketSink,
@@ -634,6 +668,12 @@ pub mod tests {
println!("ret: {:?}", ret);
assert_eq!(ret.unwrap(), format!("hello {}", msg));
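// the server-side endpoint should be garbage collected shortly after the
// call completes, once its finished flag is set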
+ wait_for_condition(
+ || async { server_rpc_mgr.peer_rpc_endpoints.is_empty() },
+ Duration::from_secs(10),
+ )
+ .await;
}
#[tokio::test]
@@ -751,5 +791,11 @@ pub mod tests {
.await;
assert_eq!(ip_list.unwrap(), format!("hello_b {}", msg));
+ wait_for_condition(
+ || async { peer_mgr_b.get_peer_rpc_mgr().peer_rpc_endpoints.is_empty() },
+ Duration::from_secs(10),
+ )
+ .await;
}
}