simplify packet definition (#30)

This commit is contained in:
Sijie.Sun
2024-03-13 18:09:48 +08:00
committed by GitHub
parent 0053666dfb
commit b0494687b5
9 changed files with 26 additions and 52 deletions

View File

@@ -155,7 +155,7 @@ impl PeerPacketFilter for IcmpProxy {
return None; return None;
}; };
let ipv4 = Ipv4Packet::new(&x.data)?; let ipv4 = Ipv4Packet::new(&x)?;
if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Icmp if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Icmp
{ {

View File

@@ -88,7 +88,7 @@ impl PeerPacketFilter for TcpProxy {
return None; return None;
}; };
let ipv4 = Ipv4Packet::new(&x.data)?; let ipv4 = Ipv4Packet::new(&x)?;
if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Tcp { if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Tcp {
return None; return None;
} }
@@ -99,8 +99,8 @@ impl PeerPacketFilter for TcpProxy {
tracing::trace!(ipv4 = ?ipv4, cidr_set = ?self.cidr_set, "proxy tcp packet received"); tracing::trace!(ipv4 = ?ipv4, cidr_set = ?self.cidr_set, "proxy tcp packet received");
let mut packet_buffer = BytesMut::with_capacity(x.data.len()); let mut packet_buffer = BytesMut::with_capacity(x.len());
packet_buffer.extend_from_slice(&x.data.to_vec()); packet_buffer.extend_from_slice(&x.to_vec());
let (ip_buffer, tcp_buffer) = let (ip_buffer, tcp_buffer) =
packet_buffer.split_at_mut(ipv4.get_header_length() as usize * 4); packet_buffer.split_at_mut(ipv4.get_header_length() as usize * 4);

View File

@@ -246,7 +246,7 @@ impl PeerPacketFilter for UdpProxy {
return None; return None;
}; };
let ipv4 = Ipv4Packet::new(&x.data)?; let ipv4 = Ipv4Packet::new(&x)?;
if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Udp { if ipv4.get_version() != 4 || ipv4.get_next_level_protocol() != IpNextHeaderProtocols::Udp {
return None; return None;

View File

@@ -245,9 +245,7 @@ impl ForeignNetworkManager {
let from_peer_id = packet.from_peer.into(); let from_peer_id = packet.from_peer.into();
let to_peer_id = packet.to_peer.into(); let to_peer_id = packet.to_peer.into();
if to_peer_id == my_node_id { if to_peer_id == my_node_id {
if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::TaRpc(..)) = if let ArchivedPacketBody::TaRpc(..) = &packet.body {
&packet.body
{
rpc_sender.send(packet_bytes.clone()).unwrap(); rpc_sender.send(packet_bytes.clone()).unwrap();
continue; continue;
} }

View File

@@ -111,7 +111,8 @@ pub struct RoutePacket {
#[archive(compare(PartialEq), check_bytes)] #[archive(compare(PartialEq), check_bytes)]
// Derives can be passed through to the generated type: // Derives can be passed through to the generated type:
#[archive_attr(derive(Debug))] #[archive_attr(derive(Debug))]
pub enum CtrlPacketBody { pub enum PacketBody {
Data(Vec<u8>),
HandShake(HandShake), HandShake(HandShake),
RoutePacket(RoutePacket), RoutePacket(RoutePacket),
Ping(u32), Ping(u32),
@@ -119,23 +120,6 @@ pub enum CtrlPacketBody {
TaRpc(u32, bool, Vec<u8>), // u32: service_id, bool: is_req, Vec<u8>: rpc body TaRpc(u32, bool, Vec<u8>), // u32: service_id, bool: is_req, Vec<u8>: rpc body
} }
#[derive(Archive, Deserialize, Serialize, Debug)]
#[archive(compare(PartialEq), check_bytes)]
// Derives can be passed through to the generated type:
#[archive_attr(derive(Debug))]
pub struct DataPacketBody {
pub data: Vec<u8>,
}
#[derive(Archive, Deserialize, Serialize, Debug)]
#[archive(compare(PartialEq), check_bytes)]
// Derives can be passed through to the generated type:
#[archive_attr(derive(Debug))]
pub enum PacketBody {
Ctrl(CtrlPacketBody),
Data(DataPacketBody),
}
#[derive(Archive, Deserialize, Serialize, Debug)] #[derive(Archive, Deserialize, Serialize, Debug)]
#[archive(compare(PartialEq), check_bytes)] #[archive(compare(PartialEq), check_bytes)]
// Derives can be passed through to the generated type: // Derives can be passed through to the generated type:
@@ -163,13 +147,13 @@ impl Packet {
Packet { Packet {
from_peer: from_peer.into(), from_peer: from_peer.into(),
to_peer: 0, to_peer: 0,
body: PacketBody::Ctrl(CtrlPacketBody::HandShake(HandShake { body: PacketBody::HandShake(HandShake {
magic: MAGIC, magic: MAGIC,
my_peer_id: from_peer, my_peer_id: from_peer,
version: VERSION, version: VERSION,
features: Vec::new(), features: Vec::new(),
network_identity: network.clone().into(), network_identity: network.clone().into(),
})), }),
} }
} }
@@ -177,9 +161,7 @@ impl Packet {
Packet { Packet {
from_peer, from_peer,
to_peer, to_peer,
body: PacketBody::Data(DataPacketBody { body: PacketBody::Data(data.to_vec()),
data: data.to_vec(),
}),
} }
} }
@@ -187,10 +169,10 @@ impl Packet {
Packet { Packet {
from_peer, from_peer,
to_peer, to_peer,
body: PacketBody::Ctrl(CtrlPacketBody::RoutePacket(RoutePacket { body: PacketBody::RoutePacket(RoutePacket {
route_id, route_id,
body: data.to_vec(), body: data.to_vec(),
})), }),
} }
} }
@@ -198,7 +180,7 @@ impl Packet {
Packet { Packet {
from_peer, from_peer,
to_peer, to_peer,
body: PacketBody::Ctrl(CtrlPacketBody::Ping(seq)), body: PacketBody::Ping(seq),
} }
} }
@@ -206,7 +188,7 @@ impl Packet {
Packet { Packet {
from_peer, from_peer,
to_peer, to_peer,
body: PacketBody::Ctrl(CtrlPacketBody::Pong(seq)), body: PacketBody::Pong(seq),
} }
} }
@@ -220,7 +202,7 @@ impl Packet {
Packet { Packet {
from_peer, from_peer,
to_peer, to_peer,
body: PacketBody::Ctrl(CtrlPacketBody::TaRpc(service_id, is_req, body)), body: PacketBody::TaRpc(service_id, is_req, body),
} }
} }
} }

View File

@@ -36,7 +36,7 @@ use crate::{
}, },
}; };
use super::packet::{self, ArchivedCtrlPacketBody, ArchivedHandShake, Packet}; use super::packet::{self, ArchivedHandShake, Packet};
pub type PacketRecvChan = mpsc::Sender<Bytes>; pub type PacketRecvChan = mpsc::Sender<Bytes>;
@@ -167,9 +167,7 @@ impl PeerConnPinger {
loop { loop {
match receiver.recv().await { match receiver.recv().await {
Ok(p) => { Ok(p) => {
if let packet::ArchivedPacketBody::Ctrl( if let packet::ArchivedPacketBody::Pong(resp_seq) = &Packet::decode(&p).body
packet::ArchivedCtrlPacketBody::Pong(resp_seq),
) = &Packet::decode(&p).body
{ {
if *resp_seq == seq { if *resp_seq == seq {
break; break;
@@ -373,7 +371,7 @@ impl PeerConn {
let mut stream = self.tunnel.pin_stream(); let mut stream = self.tunnel.pin_stream();
let mut sink = self.tunnel.pin_sink(); let mut sink = self.tunnel.pin_sink();
wait_response!(stream, hs_req, packet::ArchivedPacketBody::Ctrl(ArchivedCtrlPacketBody::HandShake(x)) => x); wait_response!(stream, hs_req, packet::ArchivedPacketBody::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_req)); self.info = Some(PeerInfo::from(hs_req));
log::info!("handshake request: {:?}", hs_req); log::info!("handshake request: {:?}", hs_req);
@@ -396,7 +394,7 @@ impl PeerConn {
.run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network)); .run(|| packet::Packet::new_handshake(self.my_peer_id, &self.global_ctx.network));
sink.send(hs_req.into()).await?; sink.send(hs_req.into()).await?;
wait_response!(stream, hs_rsp, packet::ArchivedPacketBody::Ctrl(ArchivedCtrlPacketBody::HandShake(x)) => x); wait_response!(stream, hs_rsp, packet::ArchivedPacketBody::HandShake(x) => x);
self.info = Some(PeerInfo::from(hs_rsp)); self.info = Some(PeerInfo::from(hs_rsp));
log::info!("handshake response: {:?}", hs_rsp); log::info!("handshake response: {:?}", hs_rsp);
@@ -423,7 +421,7 @@ impl PeerConn {
) -> Result<Bytes, TunnelError> { ) -> Result<Bytes, TunnelError> {
let packet = Packet::decode(&bytes_item); let packet = Packet::decode(&bytes_item);
match packet.body { match packet.body {
packet::ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::Ping(seq)) => { packet::ArchivedPacketBody::Ping(seq) => {
log::trace!("recv ping packet: {:?}", packet); log::trace!("recv ping packet: {:?}", packet);
Ok(build_ctrl_msg( Ok(build_ctrl_msg(
packet::Packet::new_pong_packet( packet::Packet::new_pong_packet(

View File

@@ -303,7 +303,7 @@ impl PeerManager {
if let packet::ArchivedPacketBody::Data(x) = &packet.body { if let packet::ArchivedPacketBody::Data(x) = &packet.body {
// TODO: use a function to get the body ref directly for zero copy // TODO: use a function to get the body ref directly for zero copy
self.nic_channel self.nic_channel
.send(extract_bytes_from_archived_vec(&data, &x.data)) .send(extract_bytes_from_archived_vec(&data, &x))
.await .await
.unwrap(); .unwrap();
Some(()) Some(())
@@ -333,9 +333,7 @@ impl PeerManager {
packet: &packet::ArchivedPacket, packet: &packet::ArchivedPacket,
data: &Bytes, data: &Bytes,
) -> Option<()> { ) -> Option<()> {
if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::TaRpc(..)) = if let ArchivedPacketBody::TaRpc(..) = &packet.body {
&packet.body
{
self.peer_rpc_tspt_sender.send(data.clone()).unwrap(); self.peer_rpc_tspt_sender.send(data.clone()).unwrap();
Some(()) Some(())
} else { } else {

View File

@@ -641,9 +641,7 @@ impl PeerPacketFilter for BasicRoute {
packet: &packet::ArchivedPacket, packet: &packet::ArchivedPacket,
data: &Bytes, data: &Bytes,
) -> Option<()> { ) -> Option<()> {
if let ArchivedPacketBody::Ctrl(packet::ArchivedCtrlPacketBody::RoutePacket(route_packet)) = if let ArchivedPacketBody::RoutePacket(route_packet) = &packet.body {
&packet.body
{
self.handle_route_packet( self.handle_route_packet(
packet.from_peer.into(), packet.from_peer.into(),
extract_bytes_from_archived_vec(&data, &route_packet.body), extract_bytes_from_archived_vec(&data, &route_packet.body),

View File

@@ -16,7 +16,7 @@ use crate::{
peers::packet::Packet, peers::packet::Packet,
}; };
use super::packet::{CtrlPacketBody, PacketBody}; use super::packet::PacketBody;
type PeerRpcServiceId = u32; type PeerRpcServiceId = u32;
@@ -207,7 +207,7 @@ impl PeerRpcManager {
fn parse_rpc_packet(packet: &Packet) -> Result<TaRpcPacketInfo, Error> { fn parse_rpc_packet(packet: &Packet) -> Result<TaRpcPacketInfo, Error> {
match &packet.body { match &packet.body {
PacketBody::Ctrl(CtrlPacketBody::TaRpc(id, is_req, body)) => Ok(TaRpcPacketInfo { PacketBody::TaRpc(id, is_req, body) => Ok(TaRpcPacketInfo {
from_peer: packet.from_peer.into(), from_peer: packet.from_peer.into(),
to_peer: packet.to_peer.into(), to_peer: packet.to_peer.into(),
service_id: *id, service_id: *id,