puffin/puffin_manager/stager/
bounded_stager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::prelude::BASE64_URL_SAFE;
23use base64::Engine;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38    CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39    Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42    BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard};
45
/// Capacity of the channel feeding delete tasks to the background delete routine.
/// Guards enqueue with `try_send`, so when the channel is full the task is dropped
/// and a warning is logged.
const DELETE_QUEUE_SIZE: usize = 10240;
/// Extension of files/directories that are still being written. Leftovers are removed by
/// `recover`.
const TMP_EXTENSION: &str = "tmp";
/// Extension a staged directory is renamed to right before it is removed. Leftovers are
/// removed by `recover`.
const DELETED_EXTENSION: &str = "deleted";
/// Idle time after which a recycle-bin entry expires. The delete routine also uses it as
/// its idle interval for running the recycle bin's pending tasks.
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
50
/// `BoundedStager` is a `Stager` that uses `moka` to manage the staging area.
///
/// Staged blobs and directories live directly under `base_dir` as `<cache_key>.<uuid>`.
/// In-progress writes use the `tmp` extension, and directories being removed use the
/// `deleted` extension.
pub struct BoundedStager<H> {
    /// The base directory of the staging area.
    base_dir: PathBuf,

    /// The cache mapping a cache key to the guard of the staged file or directory.
    /// Entries are weighted by their size in bytes and bounded by the configured capacity.
    cache: Cache<String, CacheValue>,

    /// The recycle bin holding entries evicted from `cache`. An entry can be moved back
    /// into `cache` on access, and it expires after `RECYCLE_BIN_TTL` without access.
    recycle_bin: Cache<String, CacheValue>,

    /// The delete queue for the cleanup task.
    ///
    /// The lifetime of a guard is:
    ///   1. initially inserted into the cache
    ///   2. moved to the recycle bin when evicted
    ///      2.1 moved back to the cache when accessed
    ///      2.2 dropped from the recycle bin after a certain period
    ///   3. sends a delete task to the delete queue when its last reference is dropped
    ///   4. background routine removes the file or directory
    delete_queue: Sender<DeleteTask>,

    /// Optional notifier receiving cache/recycle-bin size and hit/miss events.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// `H` is the puffin file handle type, used only as `Stager::FileHandle`.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80    pub async fn new(
81        base_dir: PathBuf,
82        capacity: u64,
83        notifier: Option<Arc<dyn StagerNotifier>>,
84        cache_ttl: Option<Duration>,
85    ) -> Result<Self> {
86        tokio::fs::create_dir_all(&base_dir)
87            .await
88            .context(CreateSnafu)?;
89
90        let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91        let recycle_bin_cloned = recycle_bin.clone();
92        let notifier_cloned = notifier.clone();
93
94        let mut cache_builder = Cache::builder()
95            .max_capacity(capacity)
96            .weigher(|_: &String, v: &CacheValue| v.weight())
97            .eviction_policy(EvictionPolicy::lru())
98            .support_invalidation_closures()
99            .async_eviction_listener(move |k, v, _| {
100                let recycle_bin = recycle_bin_cloned.clone();
101                if let Some(notifier) = notifier_cloned.as_ref() {
102                    notifier.on_cache_evict(v.size());
103                    notifier.on_recycle_insert(v.size());
104                }
105                async move {
106                    recycle_bin.insert(k.as_str().to_string(), v).await;
107                }
108                .boxed()
109            });
110        if let Some(ttl) = cache_ttl {
111            if !ttl.is_zero() {
112                cache_builder = cache_builder.time_to_live(ttl);
113            }
114        }
115        let cache = cache_builder.build();
116
117        let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118        let notifier_cloned = notifier.clone();
119        common_runtime::global_runtime().spawn(Self::delete_routine(
120            rx,
121            recycle_bin.clone(),
122            notifier_cloned,
123        ));
124        let stager = Self {
125            cache,
126            base_dir,
127            delete_queue,
128            recycle_bin,
129            notifier,
130            _phantom: std::marker::PhantomData,
131        };
132
133        stager.recover().await?;
134
135        Ok(stager)
136    }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141    type Blob = Arc<FsBlobGuard>;
142    type Dir = Arc<FsDirGuard>;
143    type FileHandle = H;
144
145    async fn get_blob<'a>(
146        &self,
147        handle: &Self::FileHandle,
148        key: &str,
149        init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150    ) -> Result<Self::Blob> {
151        let handle_str = handle.to_string();
152        let cache_key = Self::encode_cache_key(&handle_str, key);
153
154        let mut miss = false;
155        let v = self
156            .cache
157            .try_get_with_by_ref(&cache_key, async {
158                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159                    if let Some(notifier) = self.notifier.as_ref() {
160                        let size = v.size();
161                        notifier.on_cache_insert(size);
162                        notifier.on_recycle_clear(size);
163                    }
164                    return Ok(v);
165                }
166
167                miss = true;
168                let timer = Instant::now();
169                let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170                let path = self.base_dir.join(&file_name);
171
172                let size = Self::write_blob(&path, init_fn).await?;
173                if let Some(notifier) = self.notifier.as_ref() {
174                    notifier.on_cache_insert(size);
175                    notifier.on_load_blob(timer.elapsed());
176                }
177                let guard = Arc::new(FsBlobGuard {
178                    handle: handle_str,
179                    path,
180                    delete_queue: self.delete_queue.clone(),
181                    size,
182                });
183                Ok(CacheValue::File(guard))
184            })
185            .await
186            .context(CacheGetSnafu)?;
187
188        if let Some(notifier) = self.notifier.as_ref() {
189            if miss {
190                notifier.on_cache_miss(v.size());
191            } else {
192                notifier.on_cache_hit(v.size());
193            }
194        }
195        match v {
196            CacheValue::File(guard) => Ok(guard),
197            _ => unreachable!(),
198        }
199    }
200
201    async fn get_dir<'a>(
202        &self,
203        handle: &Self::FileHandle,
204        key: &str,
205        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206    ) -> Result<Self::Dir> {
207        let handle_str = handle.to_string();
208
209        let cache_key = Self::encode_cache_key(&handle_str, key);
210
211        let mut miss = false;
212        let v = self
213            .cache
214            .try_get_with_by_ref(&cache_key, async {
215                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216                    if let Some(notifier) = self.notifier.as_ref() {
217                        let size = v.size();
218                        notifier.on_cache_insert(size);
219                        notifier.on_recycle_clear(size);
220                    }
221                    return Ok(v);
222                }
223
224                miss = true;
225                let timer = Instant::now();
226                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227                let path = self.base_dir.join(&dir_name);
228
229                let size = Self::write_dir(&path, init_fn).await?;
230                if let Some(notifier) = self.notifier.as_ref() {
231                    notifier.on_cache_insert(size);
232                    notifier.on_load_dir(timer.elapsed());
233                }
234                let guard = Arc::new(FsDirGuard {
235                    handle: handle_str,
236                    path,
237                    size,
238                    delete_queue: self.delete_queue.clone(),
239                });
240                Ok(CacheValue::Dir(guard))
241            })
242            .await
243            .context(CacheGetSnafu)?;
244
245        if let Some(notifier) = self.notifier.as_ref() {
246            if miss {
247                notifier.on_cache_miss(v.size());
248            } else {
249                notifier.on_cache_hit(v.size());
250            }
251        }
252        match v {
253            CacheValue::Dir(guard) => Ok(guard),
254            _ => unreachable!(),
255        }
256    }
257
258    async fn put_dir(
259        &self,
260        handle: &Self::FileHandle,
261        key: &str,
262        dir_path: PathBuf,
263        size: u64,
264    ) -> Result<()> {
265        let handle_str = handle.to_string();
266        let cache_key = Self::encode_cache_key(&handle_str, key);
267
268        self.cache
269            .try_get_with(cache_key.clone(), async move {
270                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
271                    if let Some(notifier) = self.notifier.as_ref() {
272                        let size = v.size();
273                        notifier.on_cache_insert(size);
274                        notifier.on_recycle_clear(size);
275                    }
276                    return Ok(v);
277                }
278
279                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
280                let path = self.base_dir.join(&dir_name);
281
282                fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
283                if let Some(notifier) = self.notifier.as_ref() {
284                    notifier.on_cache_insert(size);
285                }
286                let guard = Arc::new(FsDirGuard {
287                    handle: handle_str,
288                    path,
289                    size,
290                    delete_queue: self.delete_queue.clone(),
291                });
292                Ok(CacheValue::Dir(guard))
293            })
294            .await
295            .map(|_| ())
296            .context(CacheGetSnafu)?;
297
298        // Dir is usually large.
299        // Runs pending tasks of the cache and recycle bin to free up space
300        // more quickly.
301        self.cache.run_pending_tasks().await;
302        self.recycle_bin.run_pending_tasks().await;
303
304        Ok(())
305    }
306
307    async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
308        let handle_str = handle.to_string();
309        self.cache
310            .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
311            .unwrap(); // SAFETY: `support_invalidation_closures` is enabled
312        self.cache.run_pending_tasks().await;
313        Ok(())
314    }
315}
316
317impl<H> BoundedStager<H> {
318    fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
319        let mut hasher = Sha256::new();
320        hasher.update(puffin_file_name);
321        hasher.update(key);
322        hasher.update(puffin_file_name);
323        let hash = hasher.finalize();
324
325        BASE64_URL_SAFE.encode(hash)
326    }
327
328    async fn write_blob(
329        target_path: &PathBuf,
330        init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
331    ) -> Result<u64> {
332        // To guarantee the atomicity of writing the file, we need to write
333        // the file to a temporary file first...
334        let tmp_path = target_path.with_extension(TMP_EXTENSION);
335        let writer = Box::new(
336            fs::File::create(&tmp_path)
337                .await
338                .context(CreateSnafu)?
339                .compat_write(),
340        );
341        let size = init_fn(writer).await?;
342
343        // ...then rename the temporary file to the target path
344        fs::rename(tmp_path, target_path)
345            .await
346            .context(RenameSnafu)?;
347        Ok(size)
348    }
349
350    async fn write_dir(
351        target_path: &PathBuf,
352        init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
353    ) -> Result<u64> {
354        // To guarantee the atomicity of writing the directory, we need to write
355        // the directory to a temporary directory first...
356        let tmp_base = target_path.with_extension(TMP_EXTENSION);
357        let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
358        let size = init_fn(writer_provider).await?;
359
360        // ...then rename the temporary directory to the target path
361        fs::rename(&tmp_base, target_path)
362            .await
363            .context(RenameSnafu)?;
364        Ok(size)
365    }
366
367    /// Recovers the staging area by iterating through the staging directory.
368    ///
369    /// Note: It can't recover the mapping between puffin files and keys, so TTL
370    ///       is configured to purge the dangling files and directories.
371    async fn recover(&self) -> Result<()> {
372        let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
373
374        let mut elems = HashMap::new();
375        while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
376            let path = entry.path();
377
378            if path.extension() == Some(TMP_EXTENSION.as_ref())
379                || path.extension() == Some(DELETED_EXTENSION.as_ref())
380            {
381                // Remove temporary or deleted files and directories
382                if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
383                    fs::remove_dir_all(path).await.context(RemoveSnafu)?;
384                } else {
385                    fs::remove_file(path).await.context(RemoveSnafu)?;
386                }
387            } else {
388                // Insert the guard of the file or directory to the cache
389                let meta = entry.metadata().await.context(MetadataSnafu)?;
390                let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
391
392                // <key>.<uuid>
393                let key = match file_path.split('.').next() {
394                    Some(key) => key.to_string(),
395                    None => {
396                        warn!(
397                            "Invalid staging file name: {}, expected format: <key>.<uuid>",
398                            file_path
399                        );
400                        continue;
401                    }
402                };
403
404                if meta.is_dir() {
405                    let size = Self::get_dir_size(&path).await?;
406                    let v = CacheValue::Dir(Arc::new(FsDirGuard {
407                        path,
408                        size,
409                        delete_queue: self.delete_queue.clone(),
410
411                        // placeholder
412                        handle: String::new(),
413                    }));
414                    // A duplicate dir will be moved to the delete queue.
415                    let _dup_dir = elems.insert(key, v);
416                } else {
417                    let size = meta.len();
418                    let v = CacheValue::File(Arc::new(FsBlobGuard {
419                        path,
420                        size,
421                        delete_queue: self.delete_queue.clone(),
422
423                        // placeholder
424                        handle: String::new(),
425                    }));
426                    // A duplicate file will be moved to the delete queue.
427                    let _dup_file = elems.insert(key, v);
428                }
429            }
430        }
431
432        let mut size = 0;
433        for (key, value) in elems {
434            size += value.size();
435            self.cache.insert(key, value).await;
436        }
437        if let Some(notifier) = self.notifier.as_ref() {
438            notifier.on_cache_insert(size);
439        }
440
441        self.cache.run_pending_tasks().await;
442
443        Ok(())
444    }
445
446    /// Walks through the directory and calculate the total size of all files in the directory.
447    async fn get_dir_size(path: &PathBuf) -> Result<u64> {
448        let mut size = 0;
449        let mut wd = WalkDir::new(path).filter(|entry| async move {
450            match entry.file_type().await {
451                Ok(ft) if ft.is_dir() => Filtering::Ignore,
452                _ => Filtering::Continue,
453            }
454        });
455
456        while let Some(entry) = wd.next().await {
457            let entry = entry.context(WalkDirSnafu)?;
458            size += entry.metadata().await.context(MetadataSnafu)?.len();
459        }
460
461        Ok(size)
462    }
463
464    async fn delete_routine(
465        mut receiver: Receiver<DeleteTask>,
466        recycle_bin: Cache<String, CacheValue>,
467        notifier: Option<Arc<dyn StagerNotifier>>,
468    ) {
469        loop {
470            match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
471                Ok(Some(task)) => match task {
472                    DeleteTask::File(path, size) => {
473                        if let Err(err) = fs::remove_file(&path).await {
474                            if err.kind() == std::io::ErrorKind::NotFound {
475                                continue;
476                            }
477
478                            warn!(err; "Failed to remove the file.");
479                        }
480
481                        if let Some(notifier) = notifier.as_ref() {
482                            notifier.on_recycle_clear(size);
483                        }
484                    }
485
486                    DeleteTask::Dir(path, size) => {
487                        let deleted_path = path.with_extension(DELETED_EXTENSION);
488                        if let Err(err) = fs::rename(&path, &deleted_path).await {
489                            if err.kind() == std::io::ErrorKind::NotFound {
490                                continue;
491                            }
492
493                            // Remove the deleted directory if the rename fails and retry
494                            let _ = fs::remove_dir_all(&deleted_path).await;
495                            if let Err(err) = fs::rename(&path, &deleted_path).await {
496                                warn!(err; "Failed to rename the dangling directory to deleted path.");
497                                continue;
498                            }
499                        }
500                        if let Err(err) = fs::remove_dir_all(&deleted_path).await {
501                            warn!(err; "Failed to remove the dangling directory.");
502                        }
503                        if let Some(notifier) = notifier.as_ref() {
504                            notifier.on_recycle_clear(size);
505                        }
506                    }
507                    DeleteTask::Terminate => {
508                        break;
509                    }
510                },
511                Ok(None) => break,
512                Err(_) => {
513                    // Purge recycle bin periodically to reclaim the space quickly.
514                    recycle_bin.run_pending_tasks().await;
515                }
516            }
517        }
518
519        info!("The delete routine for the bounded stager is terminated.");
520    }
521}
522
523impl<H> Drop for BoundedStager<H> {
524    fn drop(&mut self) {
525        let _ = self.delete_queue.try_send(DeleteTask::Terminate);
526    }
527}
528
/// A staged entry: either a single blob file or a directory.
///
/// Both variants hold their guard behind an `Arc`, so cloning a value does not trigger
/// deletion. The guard's delete task is sent only when its last `Arc` is dropped.
#[derive(Debug, Clone)]
enum CacheValue {
    /// A staged blob file.
    File(Arc<FsBlobGuard>),
    /// A staged directory.
    Dir(Arc<FsDirGuard>),
}
534
535impl CacheValue {
536    fn size(&self) -> u64 {
537        match self {
538            CacheValue::File(guard) => guard.size,
539            CacheValue::Dir(guard) => guard.size,
540        }
541    }
542
543    fn weight(&self) -> u32 {
544        self.size().try_into().unwrap_or(u32::MAX)
545    }
546
547    fn handle(&self) -> &str {
548        match self {
549            CacheValue::File(guard) => &guard.handle,
550            CacheValue::Dir(guard) => &guard.handle,
551        }
552    }
553}
554
/// Work items for the background delete routine.
enum DeleteTask {
    /// Remove the staged file at the path. The `u64` is its size in bytes, reported to
    /// the notifier via `on_recycle_clear`.
    File(PathBuf, u64),
    /// Remove the staged directory at the path, which is first renamed to the `deleted`
    /// extension. The `u64` is its total size in bytes.
    Dir(PathBuf, u64),
    /// Stop the delete routine. Sent when the `BoundedStager` is dropped.
    Terminate,
}
560
/// `FsBlobGuard` is a `BlobGuard` for accessing the blob and
/// automatically deleting the file on drop.
///
/// Deletion is asynchronous: dropping the guard only enqueues a delete task for the
/// stager's background routine.
#[derive(Debug)]
pub struct FsBlobGuard {
    /// String form of the puffin file handle this blob belongs to, matched by `purge`.
    /// It is empty for entries restored by `recover`.
    handle: String,
    /// Path of the staged file under the stager's base directory.
    path: PathBuf,
    /// Size of the blob in bytes.
    size: u64,
    /// Sender of the stager's delete queue.
    delete_queue: Sender<DeleteTask>,
}
570
571#[async_trait]
572impl BlobGuard for FsBlobGuard {
573    type Reader = FileReader;
574
575    async fn reader(&self) -> Result<Self::Reader> {
576        FileReader::new(&self.path).await.context(OpenSnafu)
577    }
578}
579
580impl Drop for FsBlobGuard {
581    fn drop(&mut self) {
582        if let Err(err) = self
583            .delete_queue
584            .try_send(DeleteTask::File(self.path.clone(), self.size))
585        {
586            if matches!(err, TrySendError::Closed(_)) {
587                return;
588            }
589            warn!(err; "Failed to send the delete task for the file.");
590        }
591    }
592}
593
/// `FsDirGuard` is a `DirGuard` for accessing the directory and
/// automatically deleting the directory on drop.
///
/// Deletion is asynchronous: dropping the guard only enqueues a delete task for the
/// stager's background routine.
#[derive(Debug)]
pub struct FsDirGuard {
    /// String form of the puffin file handle this directory belongs to, matched by
    /// `purge`. It is empty for entries restored by `recover`.
    handle: String,
    /// Path of the staged directory under the stager's base directory.
    path: PathBuf,
    /// Total size in bytes of the files in the directory.
    size: u64,
    /// Sender of the stager's delete queue.
    delete_queue: Sender<DeleteTask>,
}
603
604impl DirGuard for FsDirGuard {
605    fn path(&self) -> &PathBuf {
606        &self.path
607    }
608}
609
610impl Drop for FsDirGuard {
611    fn drop(&mut self) {
612        if let Err(err) = self
613            .delete_queue
614            .try_send(DeleteTask::Dir(self.path.clone(), self.size))
615        {
616            if matches!(err, TrySendError::Closed(_)) {
617                return;
618            }
619            warn!(err; "Failed to send the delete task for the directory.");
620        }
621    }
622}
623
/// `MokaDirWriterProvider` implements `DirWriterProvider` for initializing a directory.
///
/// The wrapped path is the temporary root directory that `write_dir` renames into place
/// once the init function succeeds.
struct MokaDirWriterProvider(PathBuf);
626
627#[async_trait]
628impl DirWriterProvider for MokaDirWriterProvider {
629    async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
630        let full_path = if cfg!(windows) {
631            self.0.join(rel_path.replace('/', "\\"))
632        } else {
633            self.0.join(rel_path)
634        };
635        if let Some(parent) = full_path.parent() {
636            fs::create_dir_all(parent).await.context(CreateSnafu)?;
637        }
638        Ok(Box::new(
639            fs::File::create(full_path)
640                .await
641                .context(CreateSnafu)?
642                .compat_write(),
643        ) as BoxWriter)
644    }
645}
646
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Opens the staged file for (`puffin_file_name`, `key`).
    ///
    /// Panics if the entry is not cached or is a directory.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        let entry = self
            .cache
            .get(&Self::encode_cache_key(puffin_file_name, key))
            .await
            .unwrap();
        let staged_path = if let CacheValue::File(guard) = &entry {
            &guard.path
        } else {
            panic!("Expected a file, but got a directory.")
        };
        fs::File::open(staged_path).await.unwrap()
    }

    /// Returns the path of the staged directory for (`puffin_file_name`, `key`).
    ///
    /// Panics if the entry is not cached or is a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        let entry = self
            .cache
            .get(&Self::encode_cache_key(puffin_file_name, key))
            .await
            .unwrap();
        if let CacheValue::Dir(guard) = &entry {
            guard.path.clone()
        } else {
            panic!("Expected a directory, but got a file.")
        }
    }

    /// Whether the live cache currently holds an entry for (`puffin_file_name`, `key`).
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
674
675#[cfg(test)]
676mod tests {
677    use std::sync::atomic::AtomicU64;
678
679    use common_base::range_read::RangeReader;
680    use common_test_util::temp_dir::create_temp_dir;
681    use futures::AsyncWriteExt;
682    use tokio::io::AsyncReadExt as _;
683
684    use super::*;
685    use crate::error::BlobNotFoundSnafu;
686    use crate::puffin_manager::stager::Stager;
687
688    struct MockNotifier {
689        cache_insert_size: AtomicU64,
690        cache_evict_size: AtomicU64,
691        cache_hit_count: AtomicU64,
692        cache_hit_size: AtomicU64,
693        cache_miss_count: AtomicU64,
694        cache_miss_size: AtomicU64,
695        recycle_insert_size: AtomicU64,
696        recycle_clear_size: AtomicU64,
697    }
698
699    #[derive(Debug, PartialEq, Eq)]
700    struct Stats {
701        cache_insert_size: u64,
702        cache_evict_size: u64,
703        cache_hit_count: u64,
704        cache_hit_size: u64,
705        cache_miss_count: u64,
706        cache_miss_size: u64,
707        recycle_insert_size: u64,
708        recycle_clear_size: u64,
709    }
710
711    impl MockNotifier {
712        fn build() -> Arc<MockNotifier> {
713            Arc::new(Self {
714                cache_insert_size: AtomicU64::new(0),
715                cache_evict_size: AtomicU64::new(0),
716                cache_hit_count: AtomicU64::new(0),
717                cache_hit_size: AtomicU64::new(0),
718                cache_miss_count: AtomicU64::new(0),
719                cache_miss_size: AtomicU64::new(0),
720                recycle_insert_size: AtomicU64::new(0),
721                recycle_clear_size: AtomicU64::new(0),
722            })
723        }
724
725        fn stats(&self) -> Stats {
726            Stats {
727                cache_insert_size: self
728                    .cache_insert_size
729                    .load(std::sync::atomic::Ordering::Relaxed),
730                cache_evict_size: self
731                    .cache_evict_size
732                    .load(std::sync::atomic::Ordering::Relaxed),
733                cache_hit_count: self
734                    .cache_hit_count
735                    .load(std::sync::atomic::Ordering::Relaxed),
736                cache_hit_size: self
737                    .cache_hit_size
738                    .load(std::sync::atomic::Ordering::Relaxed),
739                cache_miss_count: self
740                    .cache_miss_count
741                    .load(std::sync::atomic::Ordering::Relaxed),
742                cache_miss_size: self
743                    .cache_miss_size
744                    .load(std::sync::atomic::Ordering::Relaxed),
745                recycle_insert_size: self
746                    .recycle_insert_size
747                    .load(std::sync::atomic::Ordering::Relaxed),
748                recycle_clear_size: self
749                    .recycle_clear_size
750                    .load(std::sync::atomic::Ordering::Relaxed),
751            }
752        }
753    }
754
755    impl StagerNotifier for MockNotifier {
756        fn on_cache_insert(&self, size: u64) {
757            self.cache_insert_size
758                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
759        }
760
761        fn on_cache_evict(&self, size: u64) {
762            self.cache_evict_size
763                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
764        }
765
766        fn on_cache_hit(&self, size: u64) {
767            self.cache_hit_count
768                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
769            self.cache_hit_size
770                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
771        }
772
773        fn on_cache_miss(&self, size: u64) {
774            self.cache_miss_count
775                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
776            self.cache_miss_size
777                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
778        }
779
780        fn on_recycle_insert(&self, size: u64) {
781            self.recycle_insert_size
782                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
783        }
784
785        fn on_recycle_clear(&self, size: u64) {
786            self.recycle_clear_size
787                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
788        }
789
790        fn on_load_blob(&self, _duration: Duration) {}
791
792        fn on_load_dir(&self, _duration: Duration) {}
793    }
794
795    #[tokio::test]
796    async fn test_get_blob() {
797        let tempdir = create_temp_dir("test_get_blob_");
798        let notifier = MockNotifier::build();
799        let stager = BoundedStager::new(
800            tempdir.path().to_path_buf(),
801            u64::MAX,
802            Some(notifier.clone()),
803            None,
804        )
805        .await
806        .unwrap();
807
808        let puffin_file_name = "test_get_blob".to_string();
809        let key = "key";
810        let reader = stager
811            .get_blob(
812                &puffin_file_name,
813                key,
814                Box::new(|mut writer| {
815                    Box::pin(async move {
816                        writer.write_all(b"hello world").await.unwrap();
817                        Ok(11)
818                    })
819                }),
820            )
821            .await
822            .unwrap()
823            .reader()
824            .await
825            .unwrap();
826
827        let m = reader.metadata().await.unwrap();
828        let buf = reader.read(0..m.content_length).await.unwrap();
829        assert_eq!(&*buf, b"hello world");
830
831        let mut file = stager.must_get_file(&puffin_file_name, key).await;
832        let mut buf = Vec::new();
833        file.read_to_end(&mut buf).await.unwrap();
834        assert_eq!(buf, b"hello world");
835
836        let stats = notifier.stats();
837        assert_eq!(
838            stats,
839            Stats {
840                cache_insert_size: 11,
841                cache_evict_size: 0,
842                cache_hit_count: 0,
843                cache_hit_size: 0,
844                cache_miss_count: 1,
845                cache_miss_size: 11,
846                recycle_insert_size: 0,
847                recycle_clear_size: 0,
848            }
849        );
850    }
851
852    #[tokio::test]
853    async fn test_get_dir() {
854        let tempdir = create_temp_dir("test_get_dir_");
855        let notifier = MockNotifier::build();
856        let stager = BoundedStager::new(
857            tempdir.path().to_path_buf(),
858            u64::MAX,
859            Some(notifier.clone()),
860            None,
861        )
862        .await
863        .unwrap();
864
865        let files_in_dir = [
866            ("file_a", "Hello, world!".as_bytes()),
867            ("file_b", "Hello, Rust!".as_bytes()),
868            ("file_c", "你好,世界!".as_bytes()),
869            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
870            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
871        ];
872
873        let puffin_file_name = "test_get_dir".to_string();
874        let key = "key";
875        let dir_path = stager
876            .get_dir(
877                &puffin_file_name,
878                key,
879                Box::new(|writer_provider| {
880                    Box::pin(async move {
881                        let mut size = 0;
882                        for (rel_path, content) in &files_in_dir {
883                            size += content.len();
884                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
885                            writer.write_all(content).await.unwrap();
886                        }
887                        Ok(size as _)
888                    })
889                }),
890            )
891            .await
892            .unwrap();
893
894        for (rel_path, content) in &files_in_dir {
895            let file_path = dir_path.path().join(rel_path);
896            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
897            let mut buf = Vec::new();
898            file.read_to_end(&mut buf).await.unwrap();
899            assert_eq!(buf, *content);
900        }
901
902        let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
903        for (rel_path, content) in &files_in_dir {
904            let file_path = dir_path.join(rel_path);
905            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
906            let mut buf = Vec::new();
907            file.read_to_end(&mut buf).await.unwrap();
908            assert_eq!(buf, *content);
909        }
910
911        let stats = notifier.stats();
912        assert_eq!(
913            stats,
914            Stats {
915                cache_insert_size: 70,
916                cache_evict_size: 0,
917                cache_hit_count: 0,
918                cache_hit_size: 0,
919                cache_miss_count: 1,
920                cache_miss_size: 70,
921                recycle_insert_size: 0,
922                recycle_clear_size: 0
923            }
924        );
925    }
926
927    #[tokio::test]
928    async fn test_recover() {
929        let tempdir = create_temp_dir("test_recover_");
930        let notifier = MockNotifier::build();
931        let stager = BoundedStager::new(
932            tempdir.path().to_path_buf(),
933            u64::MAX,
934            Some(notifier.clone()),
935            None,
936        )
937        .await
938        .unwrap();
939
940        // initialize stager
941        let puffin_file_name = "test_recover".to_string();
942        let blob_key = "blob_key";
943        let guard = stager
944            .get_blob(
945                &puffin_file_name,
946                blob_key,
947                Box::new(|mut writer| {
948                    Box::pin(async move {
949                        writer.write_all(b"hello world").await.unwrap();
950                        Ok(11)
951                    })
952                }),
953            )
954            .await
955            .unwrap();
956        drop(guard);
957
958        let files_in_dir = [
959            ("file_a", "Hello, world!".as_bytes()),
960            ("file_b", "Hello, Rust!".as_bytes()),
961            ("file_c", "你好,世界!".as_bytes()),
962            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
963            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
964        ];
965
966        let dir_key = "dir_key";
967        let guard = stager
968            .get_dir(
969                &puffin_file_name,
970                dir_key,
971                Box::new(|writer_provider| {
972                    Box::pin(async move {
973                        let mut size = 0;
974                        for (rel_path, content) in &files_in_dir {
975                            size += content.len();
976                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
977                            writer.write_all(content).await.unwrap();
978                        }
979                        Ok(size as _)
980                    })
981                }),
982            )
983            .await
984            .unwrap();
985        drop(guard);
986
987        // recover stager
988        drop(stager);
989        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
990            .await
991            .unwrap();
992
993        let reader = stager
994            .get_blob(
995                &puffin_file_name,
996                blob_key,
997                Box::new(|_| Box::pin(async { Ok(0) })),
998            )
999            .await
1000            .unwrap()
1001            .reader()
1002            .await
1003            .unwrap();
1004
1005        let m = reader.metadata().await.unwrap();
1006        let buf = reader.read(0..m.content_length).await.unwrap();
1007        assert_eq!(&*buf, b"hello world");
1008
1009        let dir_path = stager
1010            .get_dir(
1011                &puffin_file_name,
1012                dir_key,
1013                Box::new(|_| Box::pin(async { Ok(0) })),
1014            )
1015            .await
1016            .unwrap();
1017        for (rel_path, content) in &files_in_dir {
1018            let file_path = dir_path.path().join(rel_path);
1019            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1020            let mut buf = Vec::new();
1021            file.read_to_end(&mut buf).await.unwrap();
1022            assert_eq!(buf, *content);
1023        }
1024
1025        let stats = notifier.stats();
1026        assert_eq!(
1027            stats,
1028            Stats {
1029                cache_insert_size: 81,
1030                cache_evict_size: 0,
1031                cache_hit_count: 0,
1032                cache_hit_size: 0,
1033                cache_miss_count: 2,
1034                cache_miss_size: 81,
1035                recycle_insert_size: 0,
1036                recycle_clear_size: 0
1037            }
1038        );
1039    }
1040
1041    #[tokio::test]
1042    async fn test_eviction() {
1043        let tempdir = create_temp_dir("test_eviction_");
1044        let notifier = MockNotifier::build();
1045        let stager = BoundedStager::new(
1046            tempdir.path().to_path_buf(),
1047            1, /* extremely small size */
1048            Some(notifier.clone()),
1049            None,
1050        )
1051        .await
1052        .unwrap();
1053
1054        let puffin_file_name = "test_eviction".to_string();
1055        let blob_key = "blob_key";
1056
1057        // First time to get the blob
1058        let reader = stager
1059            .get_blob(
1060                &puffin_file_name,
1061                blob_key,
1062                Box::new(|mut writer| {
1063                    Box::pin(async move {
1064                        writer.write_all(b"Hello world").await.unwrap();
1065                        Ok(11)
1066                    })
1067                }),
1068            )
1069            .await
1070            .unwrap()
1071            .reader()
1072            .await
1073            .unwrap();
1074
1075        // The blob should be evicted
1076        stager.cache.run_pending_tasks().await;
1077        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1078
1079        let stats = notifier.stats();
1080        assert_eq!(
1081            stats,
1082            Stats {
1083                cache_insert_size: 11,
1084                cache_evict_size: 11,
1085                cache_hit_count: 0,
1086                cache_hit_size: 0,
1087                cache_miss_count: 1,
1088                cache_miss_size: 11,
1089                recycle_insert_size: 11,
1090                recycle_clear_size: 0
1091            }
1092        );
1093
1094        let m = reader.metadata().await.unwrap();
1095        let buf = reader.read(0..m.content_length).await.unwrap();
1096        assert_eq!(&*buf, b"Hello world");
1097
1098        // Second time to get the blob, get from recycle bin
1099        let reader = stager
1100            .get_blob(
1101                &puffin_file_name,
1102                blob_key,
1103                Box::new(|_| async { Ok(0) }.boxed()),
1104            )
1105            .await
1106            .unwrap()
1107            .reader()
1108            .await
1109            .unwrap();
1110
1111        // The blob should be evicted
1112        stager.cache.run_pending_tasks().await;
1113        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1114
1115        let stats = notifier.stats();
1116        assert_eq!(
1117            stats,
1118            Stats {
1119                cache_insert_size: 22,
1120                cache_evict_size: 22,
1121                cache_hit_count: 1,
1122                cache_hit_size: 11,
1123                cache_miss_count: 1,
1124                cache_miss_size: 11,
1125                recycle_insert_size: 22,
1126                recycle_clear_size: 11
1127            }
1128        );
1129
1130        let m = reader.metadata().await.unwrap();
1131        let buf = reader.read(0..m.content_length).await.unwrap();
1132        assert_eq!(&*buf, b"Hello world");
1133
1134        let dir_key = "dir_key";
1135        let files_in_dir = [
1136            ("file_a", "Hello, world!".as_bytes()),
1137            ("file_b", "Hello, Rust!".as_bytes()),
1138            ("file_c", "你好,世界!".as_bytes()),
1139            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1140            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1141        ];
1142
1143        // First time to get the directory
1144        let guard_0 = stager
1145            .get_dir(
1146                &puffin_file_name,
1147                dir_key,
1148                Box::new(|writer_provider| {
1149                    Box::pin(async move {
1150                        let mut size = 0;
1151                        for (rel_path, content) in &files_in_dir {
1152                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1153                            writer.write_all(content).await.unwrap();
1154                            size += content.len() as u64;
1155                        }
1156                        Ok(size)
1157                    })
1158                }),
1159            )
1160            .await
1161            .unwrap();
1162
1163        for (rel_path, content) in &files_in_dir {
1164            let file_path = guard_0.path().join(rel_path);
1165            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1166            let mut buf = Vec::new();
1167            file.read_to_end(&mut buf).await.unwrap();
1168            assert_eq!(buf, *content);
1169        }
1170
1171        // The directory should be evicted
1172        stager.cache.run_pending_tasks().await;
1173        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1174
1175        let stats = notifier.stats();
1176        assert_eq!(
1177            stats,
1178            Stats {
1179                cache_insert_size: 92,
1180                cache_evict_size: 92,
1181                cache_hit_count: 1,
1182                cache_hit_size: 11,
1183                cache_miss_count: 2,
1184                cache_miss_size: 81,
1185                recycle_insert_size: 92,
1186                recycle_clear_size: 11
1187            }
1188        );
1189
1190        // Second time to get the directory
1191        let guard_1 = stager
1192            .get_dir(
1193                &puffin_file_name,
1194                dir_key,
1195                Box::new(|_| async { Ok(0) }.boxed()),
1196            )
1197            .await
1198            .unwrap();
1199
1200        for (rel_path, content) in &files_in_dir {
1201            let file_path = guard_1.path().join(rel_path);
1202            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1203            let mut buf = Vec::new();
1204            file.read_to_end(&mut buf).await.unwrap();
1205            assert_eq!(buf, *content);
1206        }
1207
1208        // Still hold the guard
1209        stager.cache.run_pending_tasks().await;
1210        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1211
1212        let stats = notifier.stats();
1213        assert_eq!(
1214            stats,
1215            Stats {
1216                cache_insert_size: 162,
1217                cache_evict_size: 162,
1218                cache_hit_count: 2,
1219                cache_hit_size: 81,
1220                cache_miss_count: 2,
1221                cache_miss_size: 81,
1222                recycle_insert_size: 162,
1223                recycle_clear_size: 81
1224            }
1225        );
1226
1227        // Third time to get the directory and all guards are dropped
1228        drop(guard_0);
1229        drop(guard_1);
1230        let guard_2 = stager
1231            .get_dir(
1232                &puffin_file_name,
1233                dir_key,
1234                Box::new(|_| Box::pin(async move { Ok(0) })),
1235            )
1236            .await
1237            .unwrap();
1238
1239        // Still hold the guard, so the directory should not be removed even if it's evicted
1240        stager.cache.run_pending_tasks().await;
1241        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1242
1243        for (rel_path, content) in &files_in_dir {
1244            let file_path = guard_2.path().join(rel_path);
1245            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1246            let mut buf = Vec::new();
1247            file.read_to_end(&mut buf).await.unwrap();
1248            assert_eq!(buf, *content);
1249        }
1250
1251        let stats = notifier.stats();
1252        assert_eq!(
1253            stats,
1254            Stats {
1255                cache_insert_size: 232,
1256                cache_evict_size: 232,
1257                cache_hit_count: 3,
1258                cache_hit_size: 151,
1259                cache_miss_count: 2,
1260                cache_miss_size: 81,
1261                recycle_insert_size: 232,
1262                recycle_clear_size: 151
1263            }
1264        );
1265    }
1266
1267    #[tokio::test]
1268    async fn test_get_blob_concurrency_on_fail() {
1269        let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
1270        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1271            .await
1272            .unwrap();
1273
1274        let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
1275        let key = "key";
1276
1277        let stager = Arc::new(stager);
1278        let handles = (0..10)
1279            .map(|_| {
1280                let stager = stager.clone();
1281                let puffin_file_name = puffin_file_name.clone();
1282                let task = async move {
1283                    let failed_init = Box::new(|_| {
1284                        async {
1285                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1286                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1287                        }
1288                        .boxed()
1289                    });
1290                    stager.get_blob(&puffin_file_name, key, failed_init).await
1291                };
1292
1293                tokio::spawn(task)
1294            })
1295            .collect::<Vec<_>>();
1296
1297        for handle in handles {
1298            let r = handle.await.unwrap();
1299            assert!(r.is_err());
1300        }
1301
1302        assert!(!stager.in_cache(&puffin_file_name, key));
1303    }
1304
1305    #[tokio::test]
1306    async fn test_get_dir_concurrency_on_fail() {
1307        let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
1308        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1309            .await
1310            .unwrap();
1311
1312        let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
1313        let key = "key";
1314
1315        let stager = Arc::new(stager);
1316        let handles = (0..10)
1317            .map(|_| {
1318                let stager = stager.clone();
1319                let puffin_file_name = puffin_file_name.clone();
1320                let task = async move {
1321                    let failed_init = Box::new(|_| {
1322                        async {
1323                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1324                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1325                        }
1326                        .boxed()
1327                    });
1328                    stager.get_dir(&puffin_file_name, key, failed_init).await
1329                };
1330
1331                tokio::spawn(task)
1332            })
1333            .collect::<Vec<_>>();
1334
1335        for handle in handles {
1336            let r = handle.await.unwrap();
1337            assert!(r.is_err());
1338        }
1339
1340        assert!(!stager.in_cache(&puffin_file_name, key));
1341    }
1342
1343    #[tokio::test]
1344    async fn test_purge() {
1345        let tempdir = create_temp_dir("test_purge_");
1346        let notifier = MockNotifier::build();
1347        let stager = BoundedStager::new(
1348            tempdir.path().to_path_buf(),
1349            u64::MAX,
1350            Some(notifier.clone()),
1351            None,
1352        )
1353        .await
1354        .unwrap();
1355
1356        // initialize stager
1357        let puffin_file_name = "test_purge".to_string();
1358        let blob_key = "blob_key";
1359        let guard = stager
1360            .get_blob(
1361                &puffin_file_name,
1362                blob_key,
1363                Box::new(|mut writer| {
1364                    Box::pin(async move {
1365                        writer.write_all(b"hello world").await.unwrap();
1366                        Ok(11)
1367                    })
1368                }),
1369            )
1370            .await
1371            .unwrap();
1372        drop(guard);
1373
1374        let files_in_dir = [
1375            ("file_a", "Hello, world!".as_bytes()),
1376            ("file_b", "Hello, Rust!".as_bytes()),
1377            ("file_c", "你好,世界!".as_bytes()),
1378            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1379            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1380        ];
1381
1382        let dir_key = "dir_key";
1383        let guard = stager
1384            .get_dir(
1385                &puffin_file_name,
1386                dir_key,
1387                Box::new(|writer_provider| {
1388                    Box::pin(async move {
1389                        let mut size = 0;
1390                        for (rel_path, content) in &files_in_dir {
1391                            size += content.len();
1392                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1393                            writer.write_all(content).await.unwrap();
1394                        }
1395                        Ok(size as _)
1396                    })
1397                }),
1398            )
1399            .await
1400            .unwrap();
1401        drop(guard);
1402
1403        // purge the stager
1404        stager.purge(&puffin_file_name).await.unwrap();
1405
1406        let stats = notifier.stats();
1407        assert_eq!(
1408            stats,
1409            Stats {
1410                cache_insert_size: 81,
1411                cache_evict_size: 81,
1412                cache_hit_count: 0,
1413                cache_hit_size: 0,
1414                cache_miss_count: 2,
1415                cache_miss_size: 81,
1416                recycle_insert_size: 81,
1417                recycle_clear_size: 0
1418            }
1419        );
1420    }
1421}