mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-12 04:37:23 +08:00
mpsc tunnel may be stuck by slow tcp stream, should not panic for this (#406)
* mpsc tunnel may be stuck by slow tcp stream, should not panic for this * disallow node connect to self
This commit is contained in:
@@ -224,7 +224,12 @@ impl PeerConn {
|
||||
self.info = Some(rsp);
|
||||
self.is_client = Some(false);
|
||||
self.send_handshake().await?;
|
||||
Ok(())
|
||||
|
||||
if self.get_peer_id() == self.my_peer_id {
|
||||
Err(Error::WaitRespError("peer id conflict".to_owned()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[tracing::instrument]
|
||||
@@ -235,7 +240,12 @@ impl PeerConn {
|
||||
tracing::info!("handshake response: {:?}", rsp);
|
||||
self.info = Some(rsp);
|
||||
self.is_client = Some(true);
|
||||
Ok(())
|
||||
|
||||
if self.get_peer_id() == self.my_peer_id {
|
||||
Err(Error::WaitRespError("peer id conflict".to_owned()))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handshake_done(&self) -> bool {
|
||||
@@ -396,6 +406,24 @@ mod tests {
|
||||
use crate::tunnel::filter::PacketRecorderTunnelFilter;
|
||||
use crate::tunnel::ring::create_ring_tunnel_pair;
|
||||
|
||||
#[tokio::test]
|
||||
async fn peer_conn_handshake_same_id() {
|
||||
let (c, s) = create_ring_tunnel_pair();
|
||||
let c_peer_id = new_peer_id();
|
||||
let s_peer_id = c_peer_id;
|
||||
|
||||
let mut c_peer = PeerConn::new(c_peer_id, get_mock_global_ctx(), Box::new(c));
|
||||
let mut s_peer = PeerConn::new(s_peer_id, get_mock_global_ctx(), Box::new(s));
|
||||
|
||||
let (c_ret, s_ret) = tokio::join!(
|
||||
c_peer.do_handshake_as_client(),
|
||||
s_peer.do_handshake_as_server()
|
||||
);
|
||||
|
||||
assert!(c_ret.is_err());
|
||||
assert!(s_ret.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn peer_conn_handshake() {
|
||||
let (c, s) = create_ring_tunnel_pair();
|
||||
|
||||
@@ -294,6 +294,8 @@ impl PeerConnPinger {
|
||||
let need_close = if last_rx_packets != current_rx_packets {
|
||||
// if we receive some packet from peers, we should relax the condition
|
||||
counter > 50 && loss_rate_1 > 0.5
|
||||
|
||||
// TODO: wait more time to see if the loss rate is still high after no rx
|
||||
} else {
|
||||
true
|
||||
};
|
||||
|
||||
@@ -70,17 +70,32 @@ impl<T: Tunnel> MpscTunnel<T> {
|
||||
sink: &mut Pin<Box<dyn ZCPacketSink>>,
|
||||
) -> Result<(), TunnelError> {
|
||||
let item = rx.recv().await.with_context(|| "recv error")?;
|
||||
sink.feed(item).await?;
|
||||
while let Ok(item) = rx.try_recv() {
|
||||
if let Err(e) = timeout(Duration::from_secs(5), sink.feed(item))
|
||||
.await
|
||||
.unwrap()
|
||||
{
|
||||
tracing::error!(?e, "feed error");
|
||||
break;
|
||||
|
||||
match timeout(Duration::from_secs(10), async move {
|
||||
sink.feed(item).await?;
|
||||
while let Ok(item) = rx.try_recv() {
|
||||
match sink.feed(item).await {
|
||||
Err(e) => {
|
||||
tracing::error!(?e, "feed error");
|
||||
return Err(e);
|
||||
}
|
||||
Ok(_) => {}
|
||||
}
|
||||
}
|
||||
sink.flush().await
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(e)) => {
|
||||
tracing::error!(?e, "forward error");
|
||||
Err(e)
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::error!(?e, "forward timeout");
|
||||
Err(e.into())
|
||||
}
|
||||
}
|
||||
sink.flush().await
|
||||
}
|
||||
|
||||
pub fn get_stream(&mut self) -> Pin<Box<dyn ZCPacketStream>> {
|
||||
|
||||
Reference in New Issue
Block a user