diff --git a/easytier/src/common/compressor.rs b/easytier/src/common/compressor.rs index affb4db..7366378 100644 --- a/easytier/src/common/compressor.rs +++ b/easytier/src/common/compressor.rs @@ -1,10 +1,7 @@ -use std::io::{Read, Write}; - +use anyhow::Context; use dashmap::DashMap; use std::cell::RefCell; -use zstd::stream::read::Decoder; -use zstd::stream::write::Encoder; -use zstd::zstd_safe::{CCtx, DCtx}; +use zstd::bulk; use zerocopy::{AsBytes as _, FromBytes as _}; @@ -35,17 +32,16 @@ impl DefaultCompressor { compress_algo: CompressorAlgo, ) -> Result, Error> { match compress_algo { - CompressorAlgo::ZstdDefault => { - let ret = CTX_MAP.with(|map_cell| { - let map = map_cell.borrow(); - let mut ctx_entry = map.entry(compress_algo).or_default(); - let writer = Vec::new(); - let mut o = Encoder::with_context(writer, ctx_entry.value_mut()); - o.write_all(data)?; - o.finish() - }); - Ok(ret?) - } + CompressorAlgo::ZstdDefault => CTX_MAP.with(|map_cell| { + let map = map_cell.borrow(); + let mut ctx_entry = map.entry(compress_algo).or_default(); + ctx_entry.compress(data).with_context(|| { + format!( + "Failed to compress data with algorithm: {:?}", + compress_algo + ) + }) + }), CompressorAlgo::None => Ok(data.to_vec()), } } @@ -59,10 +55,23 @@ impl DefaultCompressor { CompressorAlgo::ZstdDefault => DCTX_MAP.with(|map_cell| { let map = map_cell.borrow(); let mut ctx_entry = map.entry(compress_algo).or_default(); - let mut decoder = Decoder::with_context(data, ctx_entry.value_mut()); - let mut output = Vec::new(); - decoder.read_to_end(&mut output)?; - Ok(output) + for i in 1..=5 { + let mut len = data.len() * 2usize.pow(i); + if i == 5 && len < 64 * 1024 { + len = 64 * 1024; // Ensure a minimum buffer size + } + match ctx_entry.decompress(data, len) { + Ok(buf) => return Ok(buf), + Err(e) if e.to_string().contains("buffer is too small") => { + continue; // Try with a larger buffer + } + Err(e) => return Err(e.into()), + } + } + Err(anyhow::anyhow!( + "Failed to decompress data after multiple attempts with algorithm: {:?}", + compress_algo + )) }), CompressorAlgo::None => Ok(data.to_vec()), } @@ -155,8 +164,8 @@ impl Compressor for DefaultCompressor { } thread_local! { - static CTX_MAP: RefCell>> = RefCell::new(DashMap::new()); - static DCTX_MAP: RefCell>> = RefCell::new(DashMap::new()); + static CTX_MAP: RefCell>> = RefCell::new(DashMap::new()); + static DCTX_MAP: RefCell>> = RefCell::new(DashMap::new()); } #[cfg(test)]