diff --git a/easytier-core/src/peers/peer_rpc.rs b/easytier-core/src/peers/peer_rpc.rs index daadb29..063b2aa 100644 --- a/easytier-core/src/peers/peer_rpc.rs +++ b/easytier-core/src/peers/peer_rpc.rs @@ -146,6 +146,11 @@ impl PeerRpcManager { } let info = info.unwrap(); + if info.from_peer != peer_id { + tracing::warn!("recv packet from peer, but peer_id not match, ignore it"); + continue; + } + assert_eq!(info.service_id, service_id); cur_req_uuid = Some(packet.from_peer.clone().into()); @@ -233,7 +238,7 @@ impl PeerRpcManager { } let endpoint = peer_rpc_endpoints - .entry((info.to_peer, info.service_id)) + .entry((info.from_peer, info.service_id)) .or_insert_with(|| { service_registry.get(&info.service_id).unwrap()(info.from_peer) }); @@ -431,10 +436,15 @@ mod tests { async fn test_rpc_with_peer_manager() { let peer_mgr_a = create_mock_peer_manager().await; let peer_mgr_b = create_mock_peer_manager().await; + let peer_mgr_c = create_mock_peer_manager().await; connect_peer_manager(peer_mgr_a.clone(), peer_mgr_b.clone()).await; + connect_peer_manager(peer_mgr_b.clone(), peer_mgr_c.clone()).await; wait_route_appear(peer_mgr_a.clone(), peer_mgr_b.my_node_id()) .await .unwrap(); + wait_route_appear(peer_mgr_a.clone(), peer_mgr_c.my_node_id()) + .await + .unwrap(); assert_eq!(peer_mgr_a.get_peer_map().list_peers().await.len(), 1); assert_eq!( @@ -442,6 +452,12 @@ mod tests { peer_mgr_b.my_node_id() ); + assert_eq!(peer_mgr_c.get_peer_map().list_peers().await.len(), 1); + assert_eq!( + peer_mgr_c.get_peer_map().list_peers().await[0], + peer_mgr_b.my_node_id() + ); + let s = MockService { prefix: "hello".to_owned(), }; @@ -455,9 +471,19 @@ mod tests { ret }) .await; - println!("ip_list: {:?}", ip_list); assert_eq!(ip_list.as_ref().unwrap(), "hello abc"); + + let ip_list = peer_mgr_c + .get_peer_rpc_mgr() + .do_client_rpc_scoped(1, peer_mgr_b.my_node_id(), |c| async { + let c = TestRpcServiceClient::new(tarpc::client::Config::default(), c).spawn(); + let ret = c.hello(tarpc::context::current(), "bcd".to_owned()).await; + ret + }) + .await; + println!("ip_list: {:?}", ip_list); + assert_eq!(ip_list.as_ref().unwrap(), "hello bcd"); } #[tokio::test]