From a4af83e82d8009b335e9d0c9a9e3582aee261e84 Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Thu, 21 Mar 2024 23:33:19 +0800 Subject: [PATCH] fix peer rpc and ospf route --- .github/workflows/rust.yml | 2 + easytier-core/src/peers/packet.rs | 5 ++- easytier-core/src/peers/peer_ospf_route.rs | 35 ++++++++++----- easytier-core/src/peers/peer_rpc.rs | 50 ++++++++++++++++++---- 4 files changed, 71 insertions(+), 21 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 051692f..172bdde 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -82,6 +82,8 @@ jobs: fi fi + # see https://github.com/rust-lang/rustup/issues/3709 + rustup set auto-self-update disable rustup install 1.75 rustup default 1.75 rustup target add $TARGET diff --git a/easytier-core/src/peers/packet.rs b/easytier-core/src/peers/packet.rs index 02d9ac2..bd50c58 100644 --- a/easytier-core/src/peers/packet.rs +++ b/easytier-core/src/peers/packet.rs @@ -71,7 +71,7 @@ pub enum CtrlPacketPayload { RoutePacket(RoutePacket), Ping(u32), Pong(u32), - TaRpc(u32, bool, Vec), // u32: service_id, bool: is_req, Vec: rpc body + TaRpc(u32, u32, bool, Vec), // u32: service_id, u32: transact_id, bool: is_req, Vec: rpc body } impl CtrlPacketPayload { @@ -206,10 +206,11 @@ impl Packet { from_peer: PeerId, to_peer: PeerId, service_id: u32, + transact_id: u32, is_req: bool, body: Vec, ) -> Self { - let ta_rpc = CtrlPacketPayload::TaRpc(service_id, is_req, body); + let ta_rpc = CtrlPacketPayload::TaRpc(service_id, transact_id, is_req, body); Packet::new( from_peer, to_peer, diff --git a/easytier-core/src/peers/peer_ospf_route.rs b/easytier-core/src/peers/peer_ospf_route.rs index 04c8cc0..97a8b58 100644 --- a/easytier-core/src/peers/peer_ospf_route.rs +++ b/easytier-core/src/peers/peer_ospf_route.rs @@ -1294,6 +1294,12 @@ mod tests { peer_mgr } + fn check_rpc_counter(route: &Arc, peer_id: PeerId, max_tx: u32, max_rx: u32) { + let (tx1, rx1) = get_rpc_counter(route, peer_id); + assert!(tx1 <= max_tx); + assert!(rx1 <= max_rx); + } + #[tokio::test] async fn ospf_route_2node() { let p_a = create_mock_pmgr().await; @@ -1378,14 +1384,21 @@ mod tests { } connect_peer_manager(p_a.clone(), p_c.clone()).await; - for r in vec![r_a.clone(), r_b.clone(), r_c.clone()].iter() { - // for full-connected 3 nodes, the sessions between them will be a cycle - wait_for_condition( - || async { r.service_impl.sessions.len() == 2 }, - Duration::from_secs(3), - ) - .await; - } + // for full-connected 3 nodes, the sessions between them may be a cycle or a line + wait_for_condition( + || async { + let mut lens = vec![ + r_a.service_impl.sessions.len(), + r_b.service_impl.sessions.len(), + r_c.service_impl.sessions.len(), + ]; + lens.sort(); + + lens == vec![1, 1, 2] || lens == vec![2, 2, 2] + }, + Duration::from_secs(3), + ) + .await; let p_d = create_mock_pmgr().await; let r_d = create_mock_route(p_d.clone()).await; @@ -1422,7 +1435,7 @@ mod tests { tokio::time::sleep(Duration::from_secs(2)).await; - assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_e, last_p.my_peer_id()))); + check_rpc_counter(&r_e, last_p.my_peer_id(), 2, 2); for r in all_route.iter() { if r.my_peer_id != last_p.my_peer_id() { @@ -1537,7 +1550,7 @@ mod tests { assert_eq!(1, r_b.list_routes().await.len()); - assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_a, p_b.my_peer_id()))); + check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2); p_a.get_peer_map() .close_peer(p_b.my_peer_id()) @@ -1561,6 +1574,6 @@ mod tests { tokio::time::sleep(Duration::from_secs(1)).await; println!("session: {:?}", r_a.session_mgr.dump_sessions()); - assert!([(2, 2), (1, 1)].contains(&get_rpc_counter(&r_a, p_b.my_peer_id()))); + check_rpc_counter(&r_a, p_b.my_peer_id(), 2, 2); } } diff --git a/easytier-core/src/peers/peer_rpc.rs b/easytier-core/src/peers/peer_rpc.rs index 70c7bb2..cfea4cd 100644 --- a/easytier-core/src/peers/peer_rpc.rs +++ b/easytier-core/src/peers/peer_rpc.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::sync::{atomic::AtomicU32, Arc}; use dashmap::DashMap; use futures::{SinkExt, StreamExt}; @@ -19,6 +19,7 @@ use crate::{ use super::packet::CtrlPacketPayload; type PeerRpcServiceId = u32; +type PeerRpcTransactId = u32; #[async_trait::async_trait] #[auto_impl::auto_impl(Arc)] @@ -38,7 +39,7 @@ struct PeerRpcEndPoint { type PeerRpcEndPointCreator = Box PeerRpcEndPoint + Send + Sync + 'static>; #[derive(Hash, Eq, PartialEq, Clone)] -struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId); +struct PeerRpcClientCtxKey(PeerId, PeerRpcServiceId, PeerRpcTransactId); // handle rpc request from one peer pub struct PeerRpcManager { @@ -50,6 +51,8 @@ pub struct PeerRpcManager { peer_rpc_endpoints: Arc>, client_resp_receivers: Arc>, + + transact_id: AtomicU32, } impl std::fmt::Debug for PeerRpcManager { @@ -65,6 +68,7 @@ struct TaRpcPacketInfo { from_peer: PeerId, to_peer: PeerId, service_id: PeerRpcServiceId, + transact_id: PeerRpcTransactId, is_req: bool, content: Vec, } @@ -80,6 +84,8 @@ impl PeerRpcManager { peer_rpc_endpoints: Arc::new(DashMap::new()), client_resp_receivers: Arc::new(DashMap::new()), + + transact_id: AtomicU32::new(0), } } @@ -107,6 +113,7 @@ impl PeerRpcManager { let tspt = tspt.clone(); tasks.spawn(async move { let mut cur_req_peer_id = None; + let mut cur_transact_id = 0; loop { tokio::select! { Some(resp) = client_transport.next() => { @@ -133,6 +140,7 @@ impl PeerRpcManager { tspt.my_peer_id(), cur_req_peer_id, service_id, + cur_transact_id, false, serialized_resp.unwrap(), ); @@ -161,6 +169,7 @@ impl PeerRpcManager { assert_eq!(info.service_id, service_id); cur_req_peer_id = Some(packet.from_peer.clone().into()); + cur_transact_id = info.transact_id; tracing::trace!("recv packet from peer, packet: {:?}", packet); @@ -213,10 +222,11 @@ impl PeerRpcManager { fn parse_rpc_packet(packet: &Packet) -> Result { let ctrl_packet_payload = CtrlPacketPayload::from_packet2(&packet); match &ctrl_packet_payload { - CtrlPacketPayload::TaRpc(id, is_req, body) => Ok(TaRpcPacketInfo { + CtrlPacketPayload::TaRpc(id, tid, is_req, body) => Ok(TaRpcPacketInfo { from_peer: packet.from_peer.into(), to_peer: packet.to_peer.into(), service_id: *id, + transact_id: *tid, is_req: *is_req, content: body.clone(), }), @@ -257,9 +267,11 @@ impl PeerRpcManager { endpoint.packet_sender.send(packet).unwrap(); } else { - if let Some(a) = client_resp_receivers - .get(&PeerRpcClientCtxKey(info.from_peer, info.service_id)) - { + if let Some(a) = client_resp_receivers.get(&PeerRpcClientCtxKey( + info.from_peer, + info.service_id, + info.transact_id, + )) { log::trace!("recv resp: {:?}", packet); if let Err(e) = a.send(packet) { tracing::error!(error = ?e, "send resp to client failed"); @@ -291,6 +303,10 @@ impl PeerRpcManager { let (mut server_s, mut server_r) = server_transport.split(); + let transact_id = self + .transact_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let tspt = self.tspt.clone(); tasks.spawn(async move { while let Some(a) = server_r.next().await { @@ -309,6 +325,7 @@ impl PeerRpcManager { tspt.my_peer_id(), dst_peer_id, service_id, + transact_id, true, a.unwrap(), ); @@ -345,11 +362,16 @@ impl PeerRpcManager { tracing::warn!("[PEER RPC MGR] server packet read aborted"); }); + let key = PeerRpcClientCtxKey(dst_peer_id, service_id, transact_id); let _insert_ret = self .client_resp_receivers - .insert(PeerRpcClientCtxKey(dst_peer_id, service_id), packet_sender); + .insert(key.clone(), packet_sender); - f(client_transport).await + let ret = f(client_transport).await; + + self.client_resp_receivers.remove(&key); + + ret } pub fn my_peer_id(&self) -> PeerId { @@ -486,6 +508,18 @@ mod tests { println!("ip_list: {:?}", ip_list); assert_eq!(ip_list.as_ref().unwrap(), "hello abc"); + // call again + let ip_list = peer_mgr_a + .get_peer_rpc_mgr() + .do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async { + let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); + let ret = c.hello(tarpc::context::current(), "abcd".to_owned()).await; + ret + }) + .await; + println!("ip_list: {:?}", ip_list); + assert_eq!(ip_list.as_ref().unwrap(), "hello abcd"); + let ip_list = peer_mgr_c .get_peer_rpc_mgr() .do_client_rpc_scoped(1, peer_mgr_b.my_peer_id(), |c| async {