From cd26d9f669993353efb7cdc15ec86d6ca3d05037 Mon Sep 17 00:00:00 2001 From: "Sijie.Sun" Date: Thu, 26 Jun 2025 02:19:26 +0800 Subject: [PATCH] fix mem leak of token bucket (#1055) --- easytier/src/common/token_bucket.rs | 78 +++++++++++++++++-- easytier/src/peers/foreign_network_manager.rs | 4 +- 2 files changed, 75 insertions(+), 7 deletions(-) diff --git a/easytier/src/common/token_bucket.rs b/easytier/src/common/token_bucket.rs index f5c710a..ebbcf89 100644 --- a/easytier/src/common/token_bucket.rs +++ b/easytier/src/common/token_bucket.rs @@ -67,12 +67,16 @@ impl TokenBucket { }); // Start background refill task - let arc_clone = arc_self.clone(); + let weak_bucket = Arc::downgrade(&arc_self); + let refill_interval = arc_self.config.refill_interval; let refill_task = tokio::spawn(async move { - let mut interval = time::interval(arc_clone.config.refill_interval); + let mut interval = time::interval(refill_interval); loop { interval.tick().await; - arc_clone.refill(); + let Some(bucket) = weak_bucket.upgrade() else { + break; + }; + bucket.refill(); } }); @@ -167,9 +171,9 @@ impl TokenBucketManager { let retain_task = tokio::spawn(async move { loop { // Retain only buckets that are still in use - buckets_clone.retain(|_, bucket| Arc::::strong_count(bucket) <= 1); + buckets_clone.retain(|_, bucket| Arc::::strong_count(bucket) > 1); // Sleep for a while before next retention check - tokio::time::sleep(Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(5)).await; } }); @@ -190,6 +194,16 @@ impl TokenBucketManager { #[cfg(test)] mod tests { + use crate::{ + connector::udp_hole_punch::tests::create_mock_peer_manager_with_mock_stun, + peers::{ + foreign_network_manager::tests::create_mock_peer_manager_for_foreign_network, + tests::connect_peer_manager, + }, + proto::common::NatType, + tunnel::common::tests::wait_for_condition, + }; + use super::*; use tokio::time::{sleep, Duration}; @@ -309,4 +323,58 @@ mod tests { tokens ); } + + #[tokio::test] + async fn test_token_bucket_free() { + let pm_center1 = create_mock_peer_manager_with_mock_stun(NatType::Unknown).await; + + for i in 0..10 { + let pma_net1 = create_mock_peer_manager_for_foreign_network(&format!("net{}", i)).await; + + connect_peer_manager(pma_net1.clone(), pm_center1.clone()).await; + wait_for_condition( + || async { pma_net1.list_routes().await.len() == 1 }, + Duration::from_secs(5), + ) + .await; + println!("net{}", i); + println!( + "buckets: {}", + pm_center1 + .get_global_ctx() + .token_bucket_manager() + .buckets + .len() + ); + + drop(pma_net1); + wait_for_condition( + || async { + pm_center1 + .get_foreign_network_manager() + .list_foreign_networks() + .await + .foreign_networks + .len() + == 0 + }, + Duration::from_secs(5), + ) + .await; + } + + // wait token bucket empty + wait_for_condition( + || async { + pm_center1 + .get_global_ctx() + .token_bucket_manager() + .buckets + .len() + == 0 + }, + Duration::from_secs(10), + ) + .await; + } } diff --git a/easytier/src/peers/foreign_network_manager.rs b/easytier/src/peers/foreign_network_manager.rs index e064148..c4816a9 100644 --- a/easytier/src/peers/foreign_network_manager.rs +++ b/easytier/src/peers/foreign_network_manager.rs @@ -675,7 +675,7 @@ impl Drop for ForeignNetworkManager { } #[cfg(test)] -mod tests { +pub mod tests { use crate::{ common::global_ctx::tests::get_mock_global_ctx_with_network, connector::udp_hole_punch::tests::{ @@ -711,7 +711,7 @@ mod tests { peer_mgr } - async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { + pub async fn create_mock_peer_manager_for_foreign_network(network: &str) -> Arc { create_mock_peer_manager_for_foreign_network_ext(network, network).await }