From 0fbbea963f425d2593e504b33f4bf6c447adbe5c Mon Sep 17 00:00:00 2001 From: "sijie.sun" Date: Wed, 14 Aug 2024 23:52:39 +0800 Subject: [PATCH] forward foreign peer event to unbounded channel if some events loss, may cause inconsistent foreign peer info. --- easytier/src/peers/foreign_network_manager.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index f0fe7f6..66019ef 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -10,7 +10,7 @@ use std::sync::Arc; use dashmap::DashMap; use tokio::{ sync::{ - mpsc::{self, UnboundedReceiver, UnboundedSender}, + mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender}, Mutex, }, task::JoinSet, @@ -260,8 +260,16 @@ impl ForeignNetworkManager { async fn start_global_event_handler(&self) { let data = self.data.clone(); let mut s = self.global_ctx.subscribe(); + let (ev_tx, mut ev_rx) = unbounded_channel(); self.tasks.lock().await.spawn(async move { while let Ok(e) = s.recv().await { + ev_tx.send(e).unwrap(); + } + panic!("global event handler at foreign network manager exit"); + }); + + self.tasks.lock().await.spawn(async move { + while let Some(e) = ev_rx.recv().await { if let GlobalCtxEvent::PeerRemoved(peer_id) = &e { tracing::info!(?e, "remove peer from foreign network manager"); data.remove_peer(*peer_id);