mirror of
https://mirror.suhoan.cn/https://github.com/EasyTier/EasyTier.git
synced 2025-12-14 13:47:24 +08:00
forward foreign peer event to unbounded channel
if some events loss, may cause inconsistent foreign peer info.
This commit is contained in:
@@ -10,7 +10,7 @@ use std::sync::Arc;
|
|||||||
use dashmap::DashMap;
|
use dashmap::DashMap;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
sync::{
|
sync::{
|
||||||
mpsc::{self, UnboundedReceiver, UnboundedSender},
|
mpsc::{self, unbounded_channel, UnboundedReceiver, UnboundedSender},
|
||||||
Mutex,
|
Mutex,
|
||||||
},
|
},
|
||||||
task::JoinSet,
|
task::JoinSet,
|
||||||
@@ -260,8 +260,16 @@ impl ForeignNetworkManager {
|
|||||||
async fn start_global_event_handler(&self) {
|
async fn start_global_event_handler(&self) {
|
||||||
let data = self.data.clone();
|
let data = self.data.clone();
|
||||||
let mut s = self.global_ctx.subscribe();
|
let mut s = self.global_ctx.subscribe();
|
||||||
|
let (ev_tx, mut ev_rx) = unbounded_channel();
|
||||||
self.tasks.lock().await.spawn(async move {
|
self.tasks.lock().await.spawn(async move {
|
||||||
while let Ok(e) = s.recv().await {
|
while let Ok(e) = s.recv().await {
|
||||||
|
ev_tx.send(e).unwrap();
|
||||||
|
}
|
||||||
|
panic!("global event handler at foreign network manager exit");
|
||||||
|
});
|
||||||
|
|
||||||
|
self.tasks.lock().await.spawn(async move {
|
||||||
|
while let Some(e) = ev_rx.recv().await {
|
||||||
if let GlobalCtxEvent::PeerRemoved(peer_id) = &e {
|
if let GlobalCtxEvent::PeerRemoved(peer_id) = &e {
|
||||||
tracing::info!(?e, "remove peer from foreign network manager");
|
tracing::info!(?e, "remove peer from foreign network manager");
|
||||||
data.remove_peer(*peer_id);
|
data.remove_peer(*peer_id);
|
||||||
|
|||||||
Reference in New Issue
Block a user