puffin/puffin_manager/stager/
bounded_stager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::Engine;
23use base64::prelude::BASE64_URL_SAFE;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38    CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39    Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42    BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
45
/// Capacity of the bounded channel that carries [`DeleteTask`]s to the background
/// delete routine.
const DELETE_QUEUE_SIZE: usize = 10240;
/// Extension given to files and directories that are still being written.
const TMP_EXTENSION: &str = "tmp";
/// Extension given to directories that were renamed right before being removed.
const DELETED_EXTENSION: &str = "deleted";
/// Idle time after which an entry expires from the recycle bin. The delete routine
/// also uses it as the receive timeout after which it runs the recycle bin's
/// pending tasks.
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
50
/// `BoundedStager` is a `Stager` that uses `moka` to manage staging area.
///
/// Staged blobs and directories are stored under `base_dir` and tracked by guards
/// kept in `cache`; evicted guards are parked in `recycle_bin` before the backing
/// file or directory is removed by a background routine.
pub struct BoundedStager<H> {
    /// The base directory of the staging area.
    base_dir: PathBuf,

    /// The cache mapping the cache key to the guard of the staged file or directory.
    /// Entries are weighed by the size recorded in their guard.
    cache: Cache<String, CacheValue>,

    /// The recycle bin for the deleted files and directories.
    /// Entries evicted from `cache` are inserted here by the eviction listener.
    recycle_bin: Cache<String, CacheValue>,

    /// The delete queue for the cleanup task.
    ///
    /// The lifetime of a guard is:
    ///   1. initially inserted into the cache
    ///   2. moved to the recycle bin when evicted
    ///      2.1 moved back to the cache when accessed
    ///      2.2 deleted from the recycle bin after a certain period
    ///   3. sent the delete task to the delete queue on drop
    ///   4. background routine removes the file or directory
    delete_queue: Sender<DeleteTask>,

    /// Notifier for the stager.
    /// When present, it is told about cache inserts, evictions, hits, misses and
    /// recycle bin changes.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// Ties the stager to its file handle type `H` without storing a value of it.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80    pub async fn new(
81        base_dir: PathBuf,
82        capacity: u64,
83        notifier: Option<Arc<dyn StagerNotifier>>,
84        cache_ttl: Option<Duration>,
85    ) -> Result<Self> {
86        tokio::fs::create_dir_all(&base_dir)
87            .await
88            .context(CreateSnafu)?;
89
90        let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91        let recycle_bin_cloned = recycle_bin.clone();
92        let notifier_cloned = notifier.clone();
93
94        let mut cache_builder = Cache::builder()
95            .max_capacity(capacity)
96            .weigher(|_: &String, v: &CacheValue| v.weight())
97            .eviction_policy(EvictionPolicy::lru())
98            .support_invalidation_closures()
99            .async_eviction_listener(move |k, v, _| {
100                let recycle_bin = recycle_bin_cloned.clone();
101                if let Some(notifier) = notifier_cloned.as_ref() {
102                    notifier.on_cache_evict(v.size());
103                    notifier.on_recycle_insert(v.size());
104                }
105                async move {
106                    recycle_bin.insert(k.as_str().to_string(), v).await;
107                }
108                .boxed()
109            });
110        if let Some(ttl) = cache_ttl
111            && !ttl.is_zero()
112        {
113            cache_builder = cache_builder.time_to_live(ttl);
114        }
115        let cache = cache_builder.build();
116
117        let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118        let notifier_cloned = notifier.clone();
119        common_runtime::global_runtime().spawn(Self::delete_routine(
120            rx,
121            recycle_bin.clone(),
122            notifier_cloned,
123        ));
124        let stager = Self {
125            cache,
126            base_dir,
127            delete_queue,
128            recycle_bin,
129            notifier,
130            _phantom: std::marker::PhantomData,
131        };
132
133        stager.recover().await?;
134
135        Ok(stager)
136    }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141    type Blob = Arc<FsBlobGuard>;
142    type Dir = Arc<FsDirGuard>;
143    type FileHandle = H;
144
145    async fn get_blob<'a>(
146        &self,
147        handle: &Self::FileHandle,
148        key: &str,
149        init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150    ) -> Result<Self::Blob> {
151        let handle_str = handle.to_string();
152        let cache_key = Self::encode_cache_key(&handle_str, key);
153
154        let mut miss = false;
155        let v = self
156            .cache
157            .try_get_with_by_ref(&cache_key, async {
158                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159                    if let Some(notifier) = self.notifier.as_ref() {
160                        let size = v.size();
161                        notifier.on_cache_insert(size);
162                        notifier.on_recycle_clear(size);
163                    }
164                    return Ok(v);
165                }
166
167                miss = true;
168                let timer = Instant::now();
169                let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170                let path = self.base_dir.join(&file_name);
171
172                let size = Self::write_blob(&path, init_fn).await?;
173                if let Some(notifier) = self.notifier.as_ref() {
174                    notifier.on_cache_insert(size);
175                    notifier.on_load_blob(timer.elapsed());
176                }
177                let guard = Arc::new(FsBlobGuard {
178                    handle: handle_str,
179                    path,
180                    delete_queue: self.delete_queue.clone(),
181                    size,
182                });
183                Ok(CacheValue::File(guard))
184            })
185            .await
186            .context(CacheGetSnafu)?;
187
188        if let Some(notifier) = self.notifier.as_ref() {
189            if miss {
190                notifier.on_cache_miss(v.size());
191            } else {
192                notifier.on_cache_hit(v.size());
193            }
194        }
195        match v {
196            CacheValue::File(guard) => Ok(guard),
197            _ => unreachable!(),
198        }
199    }
200
201    async fn get_dir<'a>(
202        &self,
203        handle: &Self::FileHandle,
204        key: &str,
205        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206    ) -> Result<(Self::Dir, DirMetrics)> {
207        let handle_str = handle.to_string();
208
209        let cache_key = Self::encode_cache_key(&handle_str, key);
210
211        let mut miss = false;
212        let v = self
213            .cache
214            .try_get_with_by_ref(&cache_key, async {
215                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216                    if let Some(notifier) = self.notifier.as_ref() {
217                        let size = v.size();
218                        notifier.on_cache_insert(size);
219                        notifier.on_recycle_clear(size);
220                    }
221                    return Ok(v);
222                }
223
224                miss = true;
225                let timer = Instant::now();
226                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227                let path = self.base_dir.join(&dir_name);
228
229                let size = Self::write_dir(&path, init_fn).await?;
230                if let Some(notifier) = self.notifier.as_ref() {
231                    notifier.on_cache_insert(size);
232                    notifier.on_load_dir(timer.elapsed());
233                }
234                let guard = Arc::new(FsDirGuard {
235                    handle: handle_str,
236                    path,
237                    size,
238                    delete_queue: self.delete_queue.clone(),
239                });
240                Ok(CacheValue::Dir(guard))
241            })
242            .await
243            .context(CacheGetSnafu)?;
244
245        let dir_size = v.size();
246        if let Some(notifier) = self.notifier.as_ref() {
247            if miss {
248                notifier.on_cache_miss(dir_size);
249            } else {
250                notifier.on_cache_hit(dir_size);
251            }
252        }
253
254        let metrics = DirMetrics {
255            cache_hit: !miss,
256            dir_size,
257        };
258
259        match v {
260            CacheValue::Dir(guard) => Ok((guard, metrics)),
261            _ => unreachable!(),
262        }
263    }
264
265    async fn put_dir(
266        &self,
267        handle: &Self::FileHandle,
268        key: &str,
269        dir_path: PathBuf,
270        size: u64,
271    ) -> Result<()> {
272        let handle_str = handle.to_string();
273        let cache_key = Self::encode_cache_key(&handle_str, key);
274
275        self.cache
276            .try_get_with(cache_key.clone(), async move {
277                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
278                    if let Some(notifier) = self.notifier.as_ref() {
279                        let size = v.size();
280                        notifier.on_cache_insert(size);
281                        notifier.on_recycle_clear(size);
282                    }
283                    return Ok(v);
284                }
285
286                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
287                let path = self.base_dir.join(&dir_name);
288
289                fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
290                if let Some(notifier) = self.notifier.as_ref() {
291                    notifier.on_cache_insert(size);
292                }
293                let guard = Arc::new(FsDirGuard {
294                    handle: handle_str,
295                    path,
296                    size,
297                    delete_queue: self.delete_queue.clone(),
298                });
299                Ok(CacheValue::Dir(guard))
300            })
301            .await
302            .map(|_| ())
303            .context(CacheGetSnafu)?;
304
305        // Dir is usually large.
306        // Runs pending tasks of the cache and recycle bin to free up space
307        // more quickly.
308        self.cache.run_pending_tasks().await;
309        self.recycle_bin.run_pending_tasks().await;
310
311        Ok(())
312    }
313
314    async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
315        let handle_str = handle.to_string();
316        self.cache
317            .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
318            .unwrap(); // SAFETY: `support_invalidation_closures` is enabled
319        self.cache.run_pending_tasks().await;
320        Ok(())
321    }
322}
323
324impl<H> BoundedStager<H> {
325    fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
326        let mut hasher = Sha256::new();
327        hasher.update(puffin_file_name);
328        hasher.update(key);
329        hasher.update(puffin_file_name);
330        let hash = hasher.finalize();
331
332        BASE64_URL_SAFE.encode(hash)
333    }
334
335    async fn write_blob(
336        target_path: &PathBuf,
337        init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
338    ) -> Result<u64> {
339        // To guarantee the atomicity of writing the file, we need to write
340        // the file to a temporary file first...
341        let tmp_path = target_path.with_extension(TMP_EXTENSION);
342        let writer = Box::new(
343            fs::File::create(&tmp_path)
344                .await
345                .context(CreateSnafu)?
346                .compat_write(),
347        );
348        let size = init_fn(writer).await?;
349
350        // ...then rename the temporary file to the target path
351        fs::rename(tmp_path, target_path)
352            .await
353            .context(RenameSnafu)?;
354        Ok(size)
355    }
356
357    async fn write_dir(
358        target_path: &PathBuf,
359        init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
360    ) -> Result<u64> {
361        // To guarantee the atomicity of writing the directory, we need to write
362        // the directory to a temporary directory first...
363        let tmp_base = target_path.with_extension(TMP_EXTENSION);
364        let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
365        let size = init_fn(writer_provider).await?;
366
367        // ...then rename the temporary directory to the target path
368        fs::rename(&tmp_base, target_path)
369            .await
370            .context(RenameSnafu)?;
371        Ok(size)
372    }
373
374    /// Recovers the staging area by iterating through the staging directory.
375    ///
376    /// Note: It can't recover the mapping between puffin files and keys, so TTL
377    ///       is configured to purge the dangling files and directories.
378    async fn recover(&self) -> Result<()> {
379        let timer = std::time::Instant::now();
380        info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
381
382        let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
383
384        let mut elems = HashMap::new();
385        while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
386            let path = entry.path();
387
388            if path.extension() == Some(TMP_EXTENSION.as_ref())
389                || path.extension() == Some(DELETED_EXTENSION.as_ref())
390            {
391                // Remove temporary or deleted files and directories
392                if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
393                    fs::remove_dir_all(path).await.context(RemoveSnafu)?;
394                } else {
395                    fs::remove_file(path).await.context(RemoveSnafu)?;
396                }
397            } else {
398                // Insert the guard of the file or directory to the cache
399                let meta = entry.metadata().await.context(MetadataSnafu)?;
400                let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
401
402                // <key>.<uuid>
403                let key = match file_path.split('.').next() {
404                    Some(key) => key.to_string(),
405                    None => {
406                        warn!(
407                            "Invalid staging file name: {}, expected format: <key>.<uuid>",
408                            file_path
409                        );
410                        continue;
411                    }
412                };
413
414                if meta.is_dir() {
415                    let size = Self::get_dir_size(&path).await?;
416                    let v = CacheValue::Dir(Arc::new(FsDirGuard {
417                        path,
418                        size,
419                        delete_queue: self.delete_queue.clone(),
420
421                        // placeholder
422                        handle: String::new(),
423                    }));
424                    // A duplicate dir will be moved to the delete queue.
425                    let _dup_dir = elems.insert(key, v);
426                } else {
427                    let size = meta.len();
428                    let v = CacheValue::File(Arc::new(FsBlobGuard {
429                        path,
430                        size,
431                        delete_queue: self.delete_queue.clone(),
432
433                        // placeholder
434                        handle: String::new(),
435                    }));
436                    // A duplicate file will be moved to the delete queue.
437                    let _dup_file = elems.insert(key, v);
438                }
439            }
440        }
441
442        let mut size = 0;
443        let num_elems = elems.len();
444        for (key, value) in elems {
445            size += value.size();
446            self.cache.insert(key, value).await;
447        }
448        if let Some(notifier) = self.notifier.as_ref() {
449            notifier.on_cache_insert(size);
450        }
451
452        self.cache.run_pending_tasks().await;
453
454        info!(
455            "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
456            num_elems,
457            size,
458            timer.elapsed()
459        );
460        Ok(())
461    }
462
463    /// Walks through the directory and calculate the total size of all files in the directory.
464    async fn get_dir_size(path: &PathBuf) -> Result<u64> {
465        let mut size = 0;
466        let mut wd = WalkDir::new(path).filter(|entry| async move {
467            match entry.file_type().await {
468                Ok(ft) if ft.is_dir() => Filtering::Ignore,
469                _ => Filtering::Continue,
470            }
471        });
472
473        while let Some(entry) = wd.next().await {
474            let entry = entry.context(WalkDirSnafu)?;
475            size += entry.metadata().await.context(MetadataSnafu)?.len();
476        }
477
478        Ok(size)
479    }
480
481    async fn delete_routine(
482        mut receiver: Receiver<DeleteTask>,
483        recycle_bin: Cache<String, CacheValue>,
484        notifier: Option<Arc<dyn StagerNotifier>>,
485    ) {
486        loop {
487            match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
488                Ok(Some(task)) => match task {
489                    DeleteTask::File(path, size) => {
490                        if let Err(err) = fs::remove_file(&path).await {
491                            if err.kind() == std::io::ErrorKind::NotFound {
492                                continue;
493                            }
494
495                            warn!(err; "Failed to remove the file.");
496                        }
497
498                        if let Some(notifier) = notifier.as_ref() {
499                            notifier.on_recycle_clear(size);
500                        }
501                    }
502
503                    DeleteTask::Dir(path, size) => {
504                        let deleted_path = path.with_extension(DELETED_EXTENSION);
505                        if let Err(err) = fs::rename(&path, &deleted_path).await {
506                            if err.kind() == std::io::ErrorKind::NotFound {
507                                continue;
508                            }
509
510                            // Remove the deleted directory if the rename fails and retry
511                            let _ = fs::remove_dir_all(&deleted_path).await;
512                            if let Err(err) = fs::rename(&path, &deleted_path).await {
513                                warn!(err; "Failed to rename the dangling directory to deleted path.");
514                                continue;
515                            }
516                        }
517                        if let Err(err) = fs::remove_dir_all(&deleted_path).await {
518                            warn!(err; "Failed to remove the dangling directory.");
519                        }
520                        if let Some(notifier) = notifier.as_ref() {
521                            notifier.on_recycle_clear(size);
522                        }
523                    }
524                    DeleteTask::Terminate => {
525                        break;
526                    }
527                },
528                Ok(None) => break,
529                Err(_) => {
530                    // Purge recycle bin periodically to reclaim the space quickly.
531                    recycle_bin.run_pending_tasks().await;
532                }
533            }
534        }
535
536        info!("The delete routine for the bounded stager is terminated.");
537    }
538}
539
540impl<H> Drop for BoundedStager<H> {
541    fn drop(&mut self) {
542        let _ = self.delete_queue.try_send(DeleteTask::Terminate);
543    }
544}
545
/// Value stored in the cache and in the recycle bin: a shared guard of either a
/// staged file or a staged directory.
#[derive(Debug, Clone)]
enum CacheValue {
    /// Guard of a staged blob file.
    File(Arc<FsBlobGuard>),
    /// Guard of a staged directory.
    Dir(Arc<FsDirGuard>),
}
551
552impl CacheValue {
553    fn size(&self) -> u64 {
554        match self {
555            CacheValue::File(guard) => guard.size,
556            CacheValue::Dir(guard) => guard.size,
557        }
558    }
559
560    fn weight(&self) -> u32 {
561        self.size().try_into().unwrap_or(u32::MAX)
562    }
563
564    fn handle(&self) -> &str {
565        match self {
566            CacheValue::File(guard) => &guard.handle,
567            CacheValue::Dir(guard) => &guard.handle,
568        }
569    }
570}
571
/// Message sent over the delete queue to the background delete routine.
enum DeleteTask {
    /// Remove the file at the given path; the `u64` is the size reported to the
    /// notifier once the task has been handled.
    File(PathBuf, u64),
    /// Remove the directory at the given path; the `u64` is the size reported to
    /// the notifier once the task has been handled.
    Dir(PathBuf, u64),
    /// Stop the delete routine.
    Terminate,
}
577
/// `FsBlobGuard` is a `BlobGuard` for accessing the blob and
/// automatically deleting the file on drop.
#[derive(Debug)]
pub struct FsBlobGuard {
    /// String form of the file handle the blob was staged for (empty for guards
    /// created during recovery).
    handle: String,
    /// Location of the staged blob file.
    path: PathBuf,
    /// Size of the blob as reported when it was staged or recovered.
    size: u64,
    /// Queue used on drop to request removal of the file.
    delete_queue: Sender<DeleteTask>,
}
587
588#[async_trait]
589impl BlobGuard for FsBlobGuard {
590    type Reader = FileReader;
591
592    async fn reader(&self) -> Result<Self::Reader> {
593        FileReader::new(&self.path).await.context(OpenSnafu)
594    }
595}
596
597impl Drop for FsBlobGuard {
598    fn drop(&mut self) {
599        if let Err(err) = self
600            .delete_queue
601            .try_send(DeleteTask::File(self.path.clone(), self.size))
602        {
603            if matches!(err, TrySendError::Closed(_)) {
604                return;
605            }
606            warn!(err; "Failed to send the delete task for the file.");
607        }
608    }
609}
610
/// `FsDirGuard` is a `DirGuard` for accessing the directory and
/// automatically deleting the directory on drop.
#[derive(Debug)]
pub struct FsDirGuard {
    /// String form of the file handle the directory was staged for (empty for
    /// guards created during recovery).
    handle: String,
    /// Location of the staged directory.
    path: PathBuf,
    /// Size of the directory as reported when it was staged or recovered.
    size: u64,
    /// Queue used on drop to request removal of the directory.
    delete_queue: Sender<DeleteTask>,
}
620
impl DirGuard for FsDirGuard {
    /// Returns the location of the staged directory.
    fn path(&self) -> &PathBuf {
        &self.path
    }
}
626
627impl Drop for FsDirGuard {
628    fn drop(&mut self) {
629        if let Err(err) = self
630            .delete_queue
631            .try_send(DeleteTask::Dir(self.path.clone(), self.size))
632        {
633            if matches!(err, TrySendError::Closed(_)) {
634                return;
635            }
636            warn!(err; "Failed to send the delete task for the directory.");
637        }
638    }
639}
640
/// `MokaDirWriterProvider` implements `DirWriterProvider` for initializing a directory.
///
/// The wrapped path is the directory under which all requested files are created.
struct MokaDirWriterProvider(PathBuf);
643
644#[async_trait]
645impl DirWriterProvider for MokaDirWriterProvider {
646    async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
647        let full_path = if cfg!(windows) {
648            self.0.join(rel_path.replace('/', "\\"))
649        } else {
650            self.0.join(rel_path)
651        };
652        if let Some(parent) = full_path.parent() {
653            fs::create_dir_all(parent).await.context(CreateSnafu)?;
654        }
655        Ok(Box::new(
656            fs::File::create(full_path)
657                .await
658                .context(CreateSnafu)?
659                .compat_write(),
660        ) as BoxWriter)
661    }
662}
663
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Opens the staged blob file cached for `(puffin_file_name, key)`.
    ///
    /// Panics if the entry is not in the cache, is a directory, or cannot be opened.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        let CacheValue::File(guard) = self.cache.get(&cache_key).await.unwrap() else {
            panic!("Expected a file, but got a directory.");
        };
        fs::File::open(&guard.path).await.unwrap()
    }

    /// Returns the path of the staged directory cached for `(puffin_file_name, key)`.
    ///
    /// Panics if the entry is not in the cache or is a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        let CacheValue::Dir(guard) = self.cache.get(&cache_key).await.unwrap() else {
            panic!("Expected a directory, but got a file.");
        };
        guard.path.clone()
    }

    /// Tells whether the cache currently holds an entry for `(puffin_file_name, key)`.
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
691
692#[cfg(test)]
693mod tests {
694    use std::sync::atomic::AtomicU64;
695
696    use common_base::range_read::RangeReader;
697    use common_test_util::temp_dir::create_temp_dir;
698    use futures::AsyncWriteExt;
699    use tokio::io::AsyncReadExt as _;
700
701    use super::*;
702    use crate::error::BlobNotFoundSnafu;
703    use crate::puffin_manager::stager::Stager;
704
705    struct MockNotifier {
706        cache_insert_size: AtomicU64,
707        cache_evict_size: AtomicU64,
708        cache_hit_count: AtomicU64,
709        cache_hit_size: AtomicU64,
710        cache_miss_count: AtomicU64,
711        cache_miss_size: AtomicU64,
712        recycle_insert_size: AtomicU64,
713        recycle_clear_size: AtomicU64,
714    }
715
716    #[derive(Debug, PartialEq, Eq)]
717    struct Stats {
718        cache_insert_size: u64,
719        cache_evict_size: u64,
720        cache_hit_count: u64,
721        cache_hit_size: u64,
722        cache_miss_count: u64,
723        cache_miss_size: u64,
724        recycle_insert_size: u64,
725        recycle_clear_size: u64,
726    }
727
728    impl MockNotifier {
729        fn build() -> Arc<MockNotifier> {
730            Arc::new(Self {
731                cache_insert_size: AtomicU64::new(0),
732                cache_evict_size: AtomicU64::new(0),
733                cache_hit_count: AtomicU64::new(0),
734                cache_hit_size: AtomicU64::new(0),
735                cache_miss_count: AtomicU64::new(0),
736                cache_miss_size: AtomicU64::new(0),
737                recycle_insert_size: AtomicU64::new(0),
738                recycle_clear_size: AtomicU64::new(0),
739            })
740        }
741
742        fn stats(&self) -> Stats {
743            Stats {
744                cache_insert_size: self
745                    .cache_insert_size
746                    .load(std::sync::atomic::Ordering::Relaxed),
747                cache_evict_size: self
748                    .cache_evict_size
749                    .load(std::sync::atomic::Ordering::Relaxed),
750                cache_hit_count: self
751                    .cache_hit_count
752                    .load(std::sync::atomic::Ordering::Relaxed),
753                cache_hit_size: self
754                    .cache_hit_size
755                    .load(std::sync::atomic::Ordering::Relaxed),
756                cache_miss_count: self
757                    .cache_miss_count
758                    .load(std::sync::atomic::Ordering::Relaxed),
759                cache_miss_size: self
760                    .cache_miss_size
761                    .load(std::sync::atomic::Ordering::Relaxed),
762                recycle_insert_size: self
763                    .recycle_insert_size
764                    .load(std::sync::atomic::Ordering::Relaxed),
765                recycle_clear_size: self
766                    .recycle_clear_size
767                    .load(std::sync::atomic::Ordering::Relaxed),
768            }
769        }
770    }
771
772    impl StagerNotifier for MockNotifier {
773        fn on_cache_insert(&self, size: u64) {
774            self.cache_insert_size
775                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
776        }
777
778        fn on_cache_evict(&self, size: u64) {
779            self.cache_evict_size
780                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
781        }
782
783        fn on_cache_hit(&self, size: u64) {
784            self.cache_hit_count
785                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
786            self.cache_hit_size
787                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
788        }
789
790        fn on_cache_miss(&self, size: u64) {
791            self.cache_miss_count
792                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
793            self.cache_miss_size
794                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
795        }
796
797        fn on_recycle_insert(&self, size: u64) {
798            self.recycle_insert_size
799                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
800        }
801
802        fn on_recycle_clear(&self, size: u64) {
803            self.recycle_clear_size
804                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
805        }
806
807        fn on_load_blob(&self, _duration: Duration) {}
808
809        fn on_load_dir(&self, _duration: Duration) {}
810    }
811
812    #[tokio::test]
813    async fn test_get_blob() {
814        let tempdir = create_temp_dir("test_get_blob_");
815        let notifier = MockNotifier::build();
816        let stager = BoundedStager::new(
817            tempdir.path().to_path_buf(),
818            u64::MAX,
819            Some(notifier.clone()),
820            None,
821        )
822        .await
823        .unwrap();
824
825        let puffin_file_name = "test_get_blob".to_string();
826        let key = "key";
827        let reader = stager
828            .get_blob(
829                &puffin_file_name,
830                key,
831                Box::new(|mut writer| {
832                    Box::pin(async move {
833                        writer.write_all(b"hello world").await.unwrap();
834                        Ok(11)
835                    })
836                }),
837            )
838            .await
839            .unwrap()
840            .reader()
841            .await
842            .unwrap();
843
844        let m = reader.metadata().await.unwrap();
845        let buf = reader.read(0..m.content_length).await.unwrap();
846        assert_eq!(&*buf, b"hello world");
847
848        let mut file = stager.must_get_file(&puffin_file_name, key).await;
849        let mut buf = Vec::new();
850        file.read_to_end(&mut buf).await.unwrap();
851        assert_eq!(buf, b"hello world");
852
853        let stats = notifier.stats();
854        assert_eq!(
855            stats,
856            Stats {
857                cache_insert_size: 11,
858                cache_evict_size: 0,
859                cache_hit_count: 0,
860                cache_hit_size: 0,
861                cache_miss_count: 1,
862                cache_miss_size: 11,
863                recycle_insert_size: 0,
864                recycle_clear_size: 0,
865            }
866        );
867    }
868
869    #[tokio::test]
870    async fn test_get_dir() {
871        let tempdir = create_temp_dir("test_get_dir_");
872        let notifier = MockNotifier::build();
873        let stager = BoundedStager::new(
874            tempdir.path().to_path_buf(),
875            u64::MAX,
876            Some(notifier.clone()),
877            None,
878        )
879        .await
880        .unwrap();
881
882        let files_in_dir = [
883            ("file_a", "Hello, world!".as_bytes()),
884            ("file_b", "Hello, Rust!".as_bytes()),
885            ("file_c", "你好,世界!".as_bytes()),
886            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
887            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
888        ];
889
890        let puffin_file_name = "test_get_dir".to_string();
891        let key = "key";
892        let (dir_path, metrics) = stager
893            .get_dir(
894                &puffin_file_name,
895                key,
896                Box::new(|writer_provider| {
897                    Box::pin(async move {
898                        let mut size = 0;
899                        for (rel_path, content) in &files_in_dir {
900                            size += content.len();
901                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
902                            writer.write_all(content).await.unwrap();
903                        }
904                        Ok(size as _)
905                    })
906                }),
907            )
908            .await
909            .unwrap();
910
911        assert!(!metrics.cache_hit);
912        assert!(metrics.dir_size > 0);
913
914        for (rel_path, content) in &files_in_dir {
915            let file_path = dir_path.path().join(rel_path);
916            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
917            let mut buf = Vec::new();
918            file.read_to_end(&mut buf).await.unwrap();
919            assert_eq!(buf, *content);
920        }
921
922        let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
923        for (rel_path, content) in &files_in_dir {
924            let file_path = dir_path.join(rel_path);
925            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
926            let mut buf = Vec::new();
927            file.read_to_end(&mut buf).await.unwrap();
928            assert_eq!(buf, *content);
929        }
930
931        let stats = notifier.stats();
932        assert_eq!(
933            stats,
934            Stats {
935                cache_insert_size: 70,
936                cache_evict_size: 0,
937                cache_hit_count: 0,
938                cache_hit_size: 0,
939                cache_miss_count: 1,
940                cache_miss_size: 70,
941                recycle_insert_size: 0,
942                recycle_clear_size: 0
943            }
944        );
945    }
946
947    #[tokio::test]
948    async fn test_recover() {
949        let tempdir = create_temp_dir("test_recover_");
950        let notifier = MockNotifier::build();
951        let stager = BoundedStager::new(
952            tempdir.path().to_path_buf(),
953            u64::MAX,
954            Some(notifier.clone()),
955            None,
956        )
957        .await
958        .unwrap();
959
960        // initialize stager
961        let puffin_file_name = "test_recover".to_string();
962        let blob_key = "blob_key";
963        let guard = stager
964            .get_blob(
965                &puffin_file_name,
966                blob_key,
967                Box::new(|mut writer| {
968                    Box::pin(async move {
969                        writer.write_all(b"hello world").await.unwrap();
970                        Ok(11)
971                    })
972                }),
973            )
974            .await
975            .unwrap();
976        drop(guard);
977
978        let files_in_dir = [
979            ("file_a", "Hello, world!".as_bytes()),
980            ("file_b", "Hello, Rust!".as_bytes()),
981            ("file_c", "你好,世界!".as_bytes()),
982            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
983            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
984        ];
985
986        let dir_key = "dir_key";
987        let (guard, _metrics) = stager
988            .get_dir(
989                &puffin_file_name,
990                dir_key,
991                Box::new(|writer_provider| {
992                    Box::pin(async move {
993                        let mut size = 0;
994                        for (rel_path, content) in &files_in_dir {
995                            size += content.len();
996                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
997                            writer.write_all(content).await.unwrap();
998                        }
999                        Ok(size as _)
1000                    })
1001                }),
1002            )
1003            .await
1004            .unwrap();
1005        drop(guard);
1006
1007        // recover stager
1008        drop(stager);
1009        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1010            .await
1011            .unwrap();
1012
1013        let reader = stager
1014            .get_blob(
1015                &puffin_file_name,
1016                blob_key,
1017                Box::new(|_| Box::pin(async { Ok(0) })),
1018            )
1019            .await
1020            .unwrap()
1021            .reader()
1022            .await
1023            .unwrap();
1024
1025        let m = reader.metadata().await.unwrap();
1026        let buf = reader.read(0..m.content_length).await.unwrap();
1027        assert_eq!(&*buf, b"hello world");
1028
1029        let (dir_path, metrics) = stager
1030            .get_dir(
1031                &puffin_file_name,
1032                dir_key,
1033                Box::new(|_| Box::pin(async { Ok(0) })),
1034            )
1035            .await
1036            .unwrap();
1037
1038        assert!(metrics.cache_hit);
1039        assert!(metrics.dir_size > 0);
1040        for (rel_path, content) in &files_in_dir {
1041            let file_path = dir_path.path().join(rel_path);
1042            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1043            let mut buf = Vec::new();
1044            file.read_to_end(&mut buf).await.unwrap();
1045            assert_eq!(buf, *content);
1046        }
1047
1048        let stats = notifier.stats();
1049        assert_eq!(
1050            stats,
1051            Stats {
1052                cache_insert_size: 81,
1053                cache_evict_size: 0,
1054                cache_hit_count: 0,
1055                cache_hit_size: 0,
1056                cache_miss_count: 2,
1057                cache_miss_size: 81,
1058                recycle_insert_size: 0,
1059                recycle_clear_size: 0
1060            }
1061        );
1062    }
1063
1064    #[tokio::test]
1065    async fn test_eviction() {
1066        let tempdir = create_temp_dir("test_eviction_");
1067        let notifier = MockNotifier::build();
1068        let stager = BoundedStager::new(
1069            tempdir.path().to_path_buf(),
1070            1, /* extremely small size */
1071            Some(notifier.clone()),
1072            None,
1073        )
1074        .await
1075        .unwrap();
1076
1077        let puffin_file_name = "test_eviction".to_string();
1078        let blob_key = "blob_key";
1079
1080        // First time to get the blob
1081        let reader = stager
1082            .get_blob(
1083                &puffin_file_name,
1084                blob_key,
1085                Box::new(|mut writer| {
1086                    Box::pin(async move {
1087                        writer.write_all(b"Hello world").await.unwrap();
1088                        Ok(11)
1089                    })
1090                }),
1091            )
1092            .await
1093            .unwrap()
1094            .reader()
1095            .await
1096            .unwrap();
1097
1098        // The blob should be evicted
1099        stager.cache.run_pending_tasks().await;
1100        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1101
1102        let stats = notifier.stats();
1103        assert_eq!(
1104            stats,
1105            Stats {
1106                cache_insert_size: 11,
1107                cache_evict_size: 11,
1108                cache_hit_count: 0,
1109                cache_hit_size: 0,
1110                cache_miss_count: 1,
1111                cache_miss_size: 11,
1112                recycle_insert_size: 11,
1113                recycle_clear_size: 0
1114            }
1115        );
1116
1117        let m = reader.metadata().await.unwrap();
1118        let buf = reader.read(0..m.content_length).await.unwrap();
1119        assert_eq!(&*buf, b"Hello world");
1120
1121        // Second time to get the blob, get from recycle bin
1122        let reader = stager
1123            .get_blob(
1124                &puffin_file_name,
1125                blob_key,
1126                Box::new(|_| async { Ok(0) }.boxed()),
1127            )
1128            .await
1129            .unwrap()
1130            .reader()
1131            .await
1132            .unwrap();
1133
1134        // The blob should be evicted
1135        stager.cache.run_pending_tasks().await;
1136        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1137
1138        let stats = notifier.stats();
1139        assert_eq!(
1140            stats,
1141            Stats {
1142                cache_insert_size: 22,
1143                cache_evict_size: 22,
1144                cache_hit_count: 1,
1145                cache_hit_size: 11,
1146                cache_miss_count: 1,
1147                cache_miss_size: 11,
1148                recycle_insert_size: 22,
1149                recycle_clear_size: 11
1150            }
1151        );
1152
1153        let m = reader.metadata().await.unwrap();
1154        let buf = reader.read(0..m.content_length).await.unwrap();
1155        assert_eq!(&*buf, b"Hello world");
1156
1157        let dir_key = "dir_key";
1158        let files_in_dir = [
1159            ("file_a", "Hello, world!".as_bytes()),
1160            ("file_b", "Hello, Rust!".as_bytes()),
1161            ("file_c", "你好,世界!".as_bytes()),
1162            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1163            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1164        ];
1165
1166        // First time to get the directory
1167        let (guard_0, _metrics) = stager
1168            .get_dir(
1169                &puffin_file_name,
1170                dir_key,
1171                Box::new(|writer_provider| {
1172                    Box::pin(async move {
1173                        let mut size = 0;
1174                        for (rel_path, content) in &files_in_dir {
1175                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1176                            writer.write_all(content).await.unwrap();
1177                            size += content.len() as u64;
1178                        }
1179                        Ok(size)
1180                    })
1181                }),
1182            )
1183            .await
1184            .unwrap();
1185
1186        for (rel_path, content) in &files_in_dir {
1187            let file_path = guard_0.path().join(rel_path);
1188            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1189            let mut buf = Vec::new();
1190            file.read_to_end(&mut buf).await.unwrap();
1191            assert_eq!(buf, *content);
1192        }
1193
1194        // The directory should be evicted
1195        stager.cache.run_pending_tasks().await;
1196        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1197
1198        let stats = notifier.stats();
1199        assert_eq!(
1200            stats,
1201            Stats {
1202                cache_insert_size: 92,
1203                cache_evict_size: 92,
1204                cache_hit_count: 1,
1205                cache_hit_size: 11,
1206                cache_miss_count: 2,
1207                cache_miss_size: 81,
1208                recycle_insert_size: 92,
1209                recycle_clear_size: 11
1210            }
1211        );
1212
1213        // Second time to get the directory
1214        let (guard_1, _metrics) = stager
1215            .get_dir(
1216                &puffin_file_name,
1217                dir_key,
1218                Box::new(|_| async { Ok(0) }.boxed()),
1219            )
1220            .await
1221            .unwrap();
1222
1223        for (rel_path, content) in &files_in_dir {
1224            let file_path = guard_1.path().join(rel_path);
1225            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1226            let mut buf = Vec::new();
1227            file.read_to_end(&mut buf).await.unwrap();
1228            assert_eq!(buf, *content);
1229        }
1230
1231        // Still hold the guard
1232        stager.cache.run_pending_tasks().await;
1233        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1234
1235        let stats = notifier.stats();
1236        assert_eq!(
1237            stats,
1238            Stats {
1239                cache_insert_size: 162,
1240                cache_evict_size: 162,
1241                cache_hit_count: 2,
1242                cache_hit_size: 81,
1243                cache_miss_count: 2,
1244                cache_miss_size: 81,
1245                recycle_insert_size: 162,
1246                recycle_clear_size: 81
1247            }
1248        );
1249
1250        // Third time to get the directory and all guards are dropped
1251        drop(guard_0);
1252        drop(guard_1);
1253        let (guard_2, _metrics) = stager
1254            .get_dir(
1255                &puffin_file_name,
1256                dir_key,
1257                Box::new(|_| Box::pin(async move { Ok(0) })),
1258            )
1259            .await
1260            .unwrap();
1261
1262        // Still hold the guard, so the directory should not be removed even if it's evicted
1263        stager.cache.run_pending_tasks().await;
1264        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1265
1266        for (rel_path, content) in &files_in_dir {
1267            let file_path = guard_2.path().join(rel_path);
1268            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1269            let mut buf = Vec::new();
1270            file.read_to_end(&mut buf).await.unwrap();
1271            assert_eq!(buf, *content);
1272        }
1273
1274        let stats = notifier.stats();
1275        assert_eq!(
1276            stats,
1277            Stats {
1278                cache_insert_size: 232,
1279                cache_evict_size: 232,
1280                cache_hit_count: 3,
1281                cache_hit_size: 151,
1282                cache_miss_count: 2,
1283                cache_miss_size: 81,
1284                recycle_insert_size: 232,
1285                recycle_clear_size: 151
1286            }
1287        );
1288    }
1289
1290    #[tokio::test]
1291    async fn test_get_blob_concurrency_on_fail() {
1292        let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
1293        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1294            .await
1295            .unwrap();
1296
1297        let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
1298        let key = "key";
1299
1300        let stager = Arc::new(stager);
1301        let handles = (0..10)
1302            .map(|_| {
1303                let stager = stager.clone();
1304                let puffin_file_name = puffin_file_name.clone();
1305                let task = async move {
1306                    let failed_init = Box::new(|_| {
1307                        async {
1308                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1309                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1310                        }
1311                        .boxed()
1312                    });
1313                    stager.get_blob(&puffin_file_name, key, failed_init).await
1314                };
1315
1316                tokio::spawn(task)
1317            })
1318            .collect::<Vec<_>>();
1319
1320        for handle in handles {
1321            let r = handle.await.unwrap();
1322            assert!(r.is_err());
1323        }
1324
1325        assert!(!stager.in_cache(&puffin_file_name, key));
1326    }
1327
1328    #[tokio::test]
1329    async fn test_get_dir_concurrency_on_fail() {
1330        let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
1331        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1332            .await
1333            .unwrap();
1334
1335        let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
1336        let key = "key";
1337
1338        let stager = Arc::new(stager);
1339        let handles = (0..10)
1340            .map(|_| {
1341                let stager = stager.clone();
1342                let puffin_file_name = puffin_file_name.clone();
1343                let task = async move {
1344                    let failed_init = Box::new(|_| {
1345                        async {
1346                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1347                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1348                        }
1349                        .boxed()
1350                    });
1351                    stager.get_dir(&puffin_file_name, key, failed_init).await
1352                };
1353
1354                tokio::spawn(task)
1355            })
1356            .collect::<Vec<_>>();
1357
1358        for handle in handles {
1359            let r = handle.await.unwrap();
1360            assert!(r.is_err());
1361        }
1362
1363        assert!(!stager.in_cache(&puffin_file_name, key));
1364    }
1365
1366    #[tokio::test]
1367    async fn test_purge() {
1368        let tempdir = create_temp_dir("test_purge_");
1369        let notifier = MockNotifier::build();
1370        let stager = BoundedStager::new(
1371            tempdir.path().to_path_buf(),
1372            u64::MAX,
1373            Some(notifier.clone()),
1374            None,
1375        )
1376        .await
1377        .unwrap();
1378
1379        // initialize stager
1380        let puffin_file_name = "test_purge".to_string();
1381        let blob_key = "blob_key";
1382        let guard = stager
1383            .get_blob(
1384                &puffin_file_name,
1385                blob_key,
1386                Box::new(|mut writer| {
1387                    Box::pin(async move {
1388                        writer.write_all(b"hello world").await.unwrap();
1389                        Ok(11)
1390                    })
1391                }),
1392            )
1393            .await
1394            .unwrap();
1395        drop(guard);
1396
1397        let files_in_dir = [
1398            ("file_a", "Hello, world!".as_bytes()),
1399            ("file_b", "Hello, Rust!".as_bytes()),
1400            ("file_c", "你好,世界!".as_bytes()),
1401            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1402            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1403        ];
1404
1405        let dir_key = "dir_key";
1406        let (guard, _metrics) = stager
1407            .get_dir(
1408                &puffin_file_name,
1409                dir_key,
1410                Box::new(|writer_provider| {
1411                    Box::pin(async move {
1412                        let mut size = 0;
1413                        for (rel_path, content) in &files_in_dir {
1414                            size += content.len();
1415                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1416                            writer.write_all(content).await.unwrap();
1417                        }
1418                        Ok(size as _)
1419                    })
1420                }),
1421            )
1422            .await
1423            .unwrap();
1424        drop(guard);
1425
1426        // purge the stager
1427        stager.purge(&puffin_file_name).await.unwrap();
1428
1429        let stats = notifier.stats();
1430        assert_eq!(
1431            stats,
1432            Stats {
1433                cache_insert_size: 81,
1434                cache_evict_size: 81,
1435                cache_hit_count: 0,
1436                cache_hit_size: 0,
1437                cache_miss_count: 2,
1438                cache_miss_size: 81,
1439                recycle_insert_size: 81,
1440                recycle_clear_size: 0
1441            }
1442        );
1443    }
1444}