puffin/puffin_manager/stager/
bounded_stager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::prelude::BASE64_URL_SAFE;
23use base64::Engine;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38    CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39    Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42    BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard};
45
/// Capacity of the channel that feeds the background delete routine.
const DELETE_QUEUE_SIZE: usize = 10_240;
/// Extension of entries that are still being written (renamed into place on success).
const TMP_EXTENSION: &str = "tmp";
/// Extension a directory is renamed to right before it is removed.
const DELETED_EXTENSION: &str = "deleted";
/// Idle time after which recycle-bin entries expire; also the delete routine's wake-up period.
const RECYCLE_BIN_TTL: Duration = Duration::from_millis(60_000);
50
/// `BoundedStager` is a `Stager` that uses `moka` to manage staging area.
///
/// Staged blobs and directories live directly under `base_dir`, named
/// `<key>.<uuid>`. `<key>` is derived from the puffin file handle (`H`, turned
/// into a string via `ToString`) and the blob key. The cache bounds the total
/// weight (size) of live entries. Evicted entries are parked in a recycle bin
/// before their files are removed.
pub struct BoundedStager<H> {
    /// The base directory of the staging area.
    base_dir: PathBuf,

    /// Maps cache keys to the guards of staged files or directories; each
    /// entry is weighted by its size.
    cache: Cache<String, CacheValue>,

    /// The recycle bin for the deleted files and directories.
    ///
    /// Receives entries evicted from `cache`. They expire after
    /// `RECYCLE_BIN_TTL` of idleness, and a later lookup of the same key moves
    /// them back into `cache`.
    recycle_bin: Cache<String, CacheValue>,

    /// The delete queue for the cleanup task.
    ///
    /// The lifetime of a guard is:
    ///   1. initially inserted into the cache
    ///   2. moved to the recycle bin when evicted
    ///      2.1 moved back to the cache when accessed
    ///      2.2 deleted from the recycle bin after a certain period
    ///   3. sent the delete task to the delete queue on drop
    ///   4. background routine removes the file or directory
    delete_queue: Sender<DeleteTask>,

    /// Notifier for the stager.
    ///
    /// Optional observer of cache and recycle-bin events.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// Ties the stager to its file handle type `H` without storing a value of it.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80    pub async fn new(
81        base_dir: PathBuf,
82        capacity: u64,
83        notifier: Option<Arc<dyn StagerNotifier>>,
84        cache_ttl: Option<Duration>,
85    ) -> Result<Self> {
86        tokio::fs::create_dir_all(&base_dir)
87            .await
88            .context(CreateSnafu)?;
89
90        let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91        let recycle_bin_cloned = recycle_bin.clone();
92        let notifier_cloned = notifier.clone();
93
94        let mut cache_builder = Cache::builder()
95            .max_capacity(capacity)
96            .weigher(|_: &String, v: &CacheValue| v.weight())
97            .eviction_policy(EvictionPolicy::lru())
98            .support_invalidation_closures()
99            .async_eviction_listener(move |k, v, _| {
100                let recycle_bin = recycle_bin_cloned.clone();
101                if let Some(notifier) = notifier_cloned.as_ref() {
102                    notifier.on_cache_evict(v.size());
103                    notifier.on_recycle_insert(v.size());
104                }
105                async move {
106                    recycle_bin.insert(k.as_str().to_string(), v).await;
107                }
108                .boxed()
109            });
110        if let Some(ttl) = cache_ttl {
111            if !ttl.is_zero() {
112                cache_builder = cache_builder.time_to_live(ttl);
113            }
114        }
115        let cache = cache_builder.build();
116
117        let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118        let notifier_cloned = notifier.clone();
119        common_runtime::global_runtime().spawn(Self::delete_routine(
120            rx,
121            recycle_bin.clone(),
122            notifier_cloned,
123        ));
124        let stager = Self {
125            cache,
126            base_dir,
127            delete_queue,
128            recycle_bin,
129            notifier,
130            _phantom: std::marker::PhantomData,
131        };
132
133        stager.recover().await?;
134
135        Ok(stager)
136    }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141    type Blob = Arc<FsBlobGuard>;
142    type Dir = Arc<FsDirGuard>;
143    type FileHandle = H;
144
145    async fn get_blob<'a>(
146        &self,
147        handle: &Self::FileHandle,
148        key: &str,
149        init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150    ) -> Result<Self::Blob> {
151        let handle_str = handle.to_string();
152        let cache_key = Self::encode_cache_key(&handle_str, key);
153
154        let mut miss = false;
155        let v = self
156            .cache
157            .try_get_with_by_ref(&cache_key, async {
158                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159                    if let Some(notifier) = self.notifier.as_ref() {
160                        let size = v.size();
161                        notifier.on_cache_insert(size);
162                        notifier.on_recycle_clear(size);
163                    }
164                    return Ok(v);
165                }
166
167                miss = true;
168                let timer = Instant::now();
169                let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170                let path = self.base_dir.join(&file_name);
171
172                let size = Self::write_blob(&path, init_fn).await?;
173                if let Some(notifier) = self.notifier.as_ref() {
174                    notifier.on_cache_insert(size);
175                    notifier.on_load_blob(timer.elapsed());
176                }
177                let guard = Arc::new(FsBlobGuard {
178                    handle: handle_str,
179                    path,
180                    delete_queue: self.delete_queue.clone(),
181                    size,
182                });
183                Ok(CacheValue::File(guard))
184            })
185            .await
186            .context(CacheGetSnafu)?;
187
188        if let Some(notifier) = self.notifier.as_ref() {
189            if miss {
190                notifier.on_cache_miss(v.size());
191            } else {
192                notifier.on_cache_hit(v.size());
193            }
194        }
195        match v {
196            CacheValue::File(guard) => Ok(guard),
197            _ => unreachable!(),
198        }
199    }
200
201    async fn get_dir<'a>(
202        &self,
203        handle: &Self::FileHandle,
204        key: &str,
205        init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206    ) -> Result<Self::Dir> {
207        let handle_str = handle.to_string();
208
209        let cache_key = Self::encode_cache_key(&handle_str, key);
210
211        let mut miss = false;
212        let v = self
213            .cache
214            .try_get_with_by_ref(&cache_key, async {
215                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216                    if let Some(notifier) = self.notifier.as_ref() {
217                        let size = v.size();
218                        notifier.on_cache_insert(size);
219                        notifier.on_recycle_clear(size);
220                    }
221                    return Ok(v);
222                }
223
224                miss = true;
225                let timer = Instant::now();
226                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227                let path = self.base_dir.join(&dir_name);
228
229                let size = Self::write_dir(&path, init_fn).await?;
230                if let Some(notifier) = self.notifier.as_ref() {
231                    notifier.on_cache_insert(size);
232                    notifier.on_load_dir(timer.elapsed());
233                }
234                let guard = Arc::new(FsDirGuard {
235                    handle: handle_str,
236                    path,
237                    size,
238                    delete_queue: self.delete_queue.clone(),
239                });
240                Ok(CacheValue::Dir(guard))
241            })
242            .await
243            .context(CacheGetSnafu)?;
244
245        if let Some(notifier) = self.notifier.as_ref() {
246            if miss {
247                notifier.on_cache_miss(v.size());
248            } else {
249                notifier.on_cache_hit(v.size());
250            }
251        }
252        match v {
253            CacheValue::Dir(guard) => Ok(guard),
254            _ => unreachable!(),
255        }
256    }
257
258    async fn put_dir(
259        &self,
260        handle: &Self::FileHandle,
261        key: &str,
262        dir_path: PathBuf,
263        size: u64,
264    ) -> Result<()> {
265        let handle_str = handle.to_string();
266        let cache_key = Self::encode_cache_key(&handle_str, key);
267
268        self.cache
269            .try_get_with(cache_key.clone(), async move {
270                if let Some(v) = self.recycle_bin.remove(&cache_key).await {
271                    if let Some(notifier) = self.notifier.as_ref() {
272                        let size = v.size();
273                        notifier.on_cache_insert(size);
274                        notifier.on_recycle_clear(size);
275                    }
276                    return Ok(v);
277                }
278
279                let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
280                let path = self.base_dir.join(&dir_name);
281
282                fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
283                if let Some(notifier) = self.notifier.as_ref() {
284                    notifier.on_cache_insert(size);
285                }
286                let guard = Arc::new(FsDirGuard {
287                    handle: handle_str,
288                    path,
289                    size,
290                    delete_queue: self.delete_queue.clone(),
291                });
292                Ok(CacheValue::Dir(guard))
293            })
294            .await
295            .map(|_| ())
296            .context(CacheGetSnafu)?;
297
298        // Dir is usually large.
299        // Runs pending tasks of the cache and recycle bin to free up space
300        // more quickly.
301        self.cache.run_pending_tasks().await;
302        self.recycle_bin.run_pending_tasks().await;
303
304        Ok(())
305    }
306
307    async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
308        let handle_str = handle.to_string();
309        self.cache
310            .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
311            .unwrap(); // SAFETY: `support_invalidation_closures` is enabled
312        self.cache.run_pending_tasks().await;
313        Ok(())
314    }
315}
316
317impl<H> BoundedStager<H> {
318    fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
319        let mut hasher = Sha256::new();
320        hasher.update(puffin_file_name);
321        hasher.update(key);
322        hasher.update(puffin_file_name);
323        let hash = hasher.finalize();
324
325        BASE64_URL_SAFE.encode(hash)
326    }
327
328    async fn write_blob(
329        target_path: &PathBuf,
330        init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
331    ) -> Result<u64> {
332        // To guarantee the atomicity of writing the file, we need to write
333        // the file to a temporary file first...
334        let tmp_path = target_path.with_extension(TMP_EXTENSION);
335        let writer = Box::new(
336            fs::File::create(&tmp_path)
337                .await
338                .context(CreateSnafu)?
339                .compat_write(),
340        );
341        let size = init_fn(writer).await?;
342
343        // ...then rename the temporary file to the target path
344        fs::rename(tmp_path, target_path)
345            .await
346            .context(RenameSnafu)?;
347        Ok(size)
348    }
349
350    async fn write_dir(
351        target_path: &PathBuf,
352        init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
353    ) -> Result<u64> {
354        // To guarantee the atomicity of writing the directory, we need to write
355        // the directory to a temporary directory first...
356        let tmp_base = target_path.with_extension(TMP_EXTENSION);
357        let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
358        let size = init_fn(writer_provider).await?;
359
360        // ...then rename the temporary directory to the target path
361        fs::rename(&tmp_base, target_path)
362            .await
363            .context(RenameSnafu)?;
364        Ok(size)
365    }
366
367    /// Recovers the staging area by iterating through the staging directory.
368    ///
369    /// Note: It can't recover the mapping between puffin files and keys, so TTL
370    ///       is configured to purge the dangling files and directories.
371    async fn recover(&self) -> Result<()> {
372        let timer = std::time::Instant::now();
373        info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
374
375        let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
376
377        let mut elems = HashMap::new();
378        while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
379            let path = entry.path();
380
381            if path.extension() == Some(TMP_EXTENSION.as_ref())
382                || path.extension() == Some(DELETED_EXTENSION.as_ref())
383            {
384                // Remove temporary or deleted files and directories
385                if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
386                    fs::remove_dir_all(path).await.context(RemoveSnafu)?;
387                } else {
388                    fs::remove_file(path).await.context(RemoveSnafu)?;
389                }
390            } else {
391                // Insert the guard of the file or directory to the cache
392                let meta = entry.metadata().await.context(MetadataSnafu)?;
393                let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
394
395                // <key>.<uuid>
396                let key = match file_path.split('.').next() {
397                    Some(key) => key.to_string(),
398                    None => {
399                        warn!(
400                            "Invalid staging file name: {}, expected format: <key>.<uuid>",
401                            file_path
402                        );
403                        continue;
404                    }
405                };
406
407                if meta.is_dir() {
408                    let size = Self::get_dir_size(&path).await?;
409                    let v = CacheValue::Dir(Arc::new(FsDirGuard {
410                        path,
411                        size,
412                        delete_queue: self.delete_queue.clone(),
413
414                        // placeholder
415                        handle: String::new(),
416                    }));
417                    // A duplicate dir will be moved to the delete queue.
418                    let _dup_dir = elems.insert(key, v);
419                } else {
420                    let size = meta.len();
421                    let v = CacheValue::File(Arc::new(FsBlobGuard {
422                        path,
423                        size,
424                        delete_queue: self.delete_queue.clone(),
425
426                        // placeholder
427                        handle: String::new(),
428                    }));
429                    // A duplicate file will be moved to the delete queue.
430                    let _dup_file = elems.insert(key, v);
431                }
432            }
433        }
434
435        let mut size = 0;
436        let num_elems = elems.len();
437        for (key, value) in elems {
438            size += value.size();
439            self.cache.insert(key, value).await;
440        }
441        if let Some(notifier) = self.notifier.as_ref() {
442            notifier.on_cache_insert(size);
443        }
444
445        self.cache.run_pending_tasks().await;
446
447        info!(
448            "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
449            num_elems,
450            size,
451            timer.elapsed()
452        );
453        Ok(())
454    }
455
456    /// Walks through the directory and calculate the total size of all files in the directory.
457    async fn get_dir_size(path: &PathBuf) -> Result<u64> {
458        let mut size = 0;
459        let mut wd = WalkDir::new(path).filter(|entry| async move {
460            match entry.file_type().await {
461                Ok(ft) if ft.is_dir() => Filtering::Ignore,
462                _ => Filtering::Continue,
463            }
464        });
465
466        while let Some(entry) = wd.next().await {
467            let entry = entry.context(WalkDirSnafu)?;
468            size += entry.metadata().await.context(MetadataSnafu)?.len();
469        }
470
471        Ok(size)
472    }
473
474    async fn delete_routine(
475        mut receiver: Receiver<DeleteTask>,
476        recycle_bin: Cache<String, CacheValue>,
477        notifier: Option<Arc<dyn StagerNotifier>>,
478    ) {
479        loop {
480            match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
481                Ok(Some(task)) => match task {
482                    DeleteTask::File(path, size) => {
483                        if let Err(err) = fs::remove_file(&path).await {
484                            if err.kind() == std::io::ErrorKind::NotFound {
485                                continue;
486                            }
487
488                            warn!(err; "Failed to remove the file.");
489                        }
490
491                        if let Some(notifier) = notifier.as_ref() {
492                            notifier.on_recycle_clear(size);
493                        }
494                    }
495
496                    DeleteTask::Dir(path, size) => {
497                        let deleted_path = path.with_extension(DELETED_EXTENSION);
498                        if let Err(err) = fs::rename(&path, &deleted_path).await {
499                            if err.kind() == std::io::ErrorKind::NotFound {
500                                continue;
501                            }
502
503                            // Remove the deleted directory if the rename fails and retry
504                            let _ = fs::remove_dir_all(&deleted_path).await;
505                            if let Err(err) = fs::rename(&path, &deleted_path).await {
506                                warn!(err; "Failed to rename the dangling directory to deleted path.");
507                                continue;
508                            }
509                        }
510                        if let Err(err) = fs::remove_dir_all(&deleted_path).await {
511                            warn!(err; "Failed to remove the dangling directory.");
512                        }
513                        if let Some(notifier) = notifier.as_ref() {
514                            notifier.on_recycle_clear(size);
515                        }
516                    }
517                    DeleteTask::Terminate => {
518                        break;
519                    }
520                },
521                Ok(None) => break,
522                Err(_) => {
523                    // Purge recycle bin periodically to reclaim the space quickly.
524                    recycle_bin.run_pending_tasks().await;
525                }
526            }
527        }
528
529        info!("The delete routine for the bounded stager is terminated.");
530    }
531}
532
533impl<H> Drop for BoundedStager<H> {
534    fn drop(&mut self) {
535        let _ = self.delete_queue.try_send(DeleteTask::Terminate);
536    }
537}
538
/// A staged entry tracked by the cache or the recycle bin.
///
/// Cloning only clones the `Arc`. A delete task for the underlying file or
/// directory is queued when the last clone of its guard is dropped.
#[derive(Debug, Clone)]
enum CacheValue {
    /// A staged blob file.
    File(Arc<FsBlobGuard>),
    /// A staged directory.
    Dir(Arc<FsDirGuard>),
}
544
545impl CacheValue {
546    fn size(&self) -> u64 {
547        match self {
548            CacheValue::File(guard) => guard.size,
549            CacheValue::Dir(guard) => guard.size,
550        }
551    }
552
553    fn weight(&self) -> u32 {
554        self.size().try_into().unwrap_or(u32::MAX)
555    }
556
557    fn handle(&self) -> &str {
558        match self {
559            CacheValue::File(guard) => &guard.handle,
560            CacheValue::Dir(guard) => &guard.handle,
561        }
562    }
563}
564
/// Work items consumed by the background delete routine.
enum DeleteTask {
    /// Remove the file at the path; the `u64` is the size recorded for it.
    File(PathBuf, u64),
    /// Remove the directory at the path; the `u64` is the size recorded for it.
    Dir(PathBuf, u64),
    /// Stop the delete routine.
    Terminate,
}
570
/// `FsBlobGuard` is a `BlobGuard` for accessing the blob and
/// automatically deleting the file on drop.
///
/// Deletion is asynchronous: dropping the guard only queues a
/// `DeleteTask::File` for the background delete routine.
#[derive(Debug)]
pub struct FsBlobGuard {
    /// Handle string of the owning puffin file (empty when unknown).
    handle: String,
    /// Path of the staged file.
    path: PathBuf,
    /// Size recorded for the staged file.
    size: u64,
    /// Queue used to request deletion of `path` on drop.
    delete_queue: Sender<DeleteTask>,
}
580
581#[async_trait]
582impl BlobGuard for FsBlobGuard {
583    type Reader = FileReader;
584
585    async fn reader(&self) -> Result<Self::Reader> {
586        FileReader::new(&self.path).await.context(OpenSnafu)
587    }
588}
589
590impl Drop for FsBlobGuard {
591    fn drop(&mut self) {
592        if let Err(err) = self
593            .delete_queue
594            .try_send(DeleteTask::File(self.path.clone(), self.size))
595        {
596            if matches!(err, TrySendError::Closed(_)) {
597                return;
598            }
599            warn!(err; "Failed to send the delete task for the file.");
600        }
601    }
602}
603
/// `FsDirGuard` is a `DirGuard` for accessing the directory and
/// automatically deleting the directory on drop.
///
/// Deletion is asynchronous: dropping the guard only queues a
/// `DeleteTask::Dir` for the background delete routine.
#[derive(Debug)]
pub struct FsDirGuard {
    /// Handle string of the owning puffin file (empty when unknown).
    handle: String,
    /// Path of the staged directory.
    path: PathBuf,
    /// Size recorded for the staged directory.
    size: u64,
    /// Queue used to request deletion of `path` on drop.
    delete_queue: Sender<DeleteTask>,
}
613
614impl DirGuard for FsDirGuard {
615    fn path(&self) -> &PathBuf {
616        &self.path
617    }
618}
619
620impl Drop for FsDirGuard {
621    fn drop(&mut self) {
622        if let Err(err) = self
623            .delete_queue
624            .try_send(DeleteTask::Dir(self.path.clone(), self.size))
625        {
626            if matches!(err, TrySendError::Closed(_)) {
627                return;
628            }
629            warn!(err; "Failed to send the delete task for the directory.");
630        }
631    }
632}
633
/// `MokaDirWriterProvider` implements `DirWriterProvider` for initializing a directory.
///
/// The wrapped path is the root directory. Writers are created for files
/// relative to it, and missing parent directories are created on demand.
struct MokaDirWriterProvider(PathBuf);
636
637#[async_trait]
638impl DirWriterProvider for MokaDirWriterProvider {
639    async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
640        let full_path = if cfg!(windows) {
641            self.0.join(rel_path.replace('/', "\\"))
642        } else {
643            self.0.join(rel_path)
644        };
645        if let Some(parent) = full_path.parent() {
646            fs::create_dir_all(parent).await.context(CreateSnafu)?;
647        }
648        Ok(Box::new(
649            fs::File::create(full_path)
650                .await
651                .context(CreateSnafu)?
652                .compat_write(),
653        ) as BoxWriter)
654    }
655}
656
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Fetches the cache entry for (`puffin_file_name`, `key`); panics if it is absent.
    async fn must_get_value(&self, puffin_file_name: &str, key: &str) -> CacheValue {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        self.cache.get(&cache_key).await.unwrap()
    }

    /// Opens the staged file for (`puffin_file_name`, `key`); panics if missing or a directory.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        match self.must_get_value(puffin_file_name, key).await {
            CacheValue::File(guard) => fs::File::open(&guard.path).await.unwrap(),
            CacheValue::Dir(_) => panic!("Expected a file, but got a directory."),
        }
    }

    /// Returns the staged directory path for (`puffin_file_name`, `key`); panics if missing or a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        match self.must_get_value(puffin_file_name, key).await {
            CacheValue::Dir(guard) => guard.path.clone(),
            CacheValue::File(_) => panic!("Expected a directory, but got a file."),
        }
    }

    /// Whether (`puffin_file_name`, `key`) currently has an entry in the cache.
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
684
685#[cfg(test)]
686mod tests {
687    use std::sync::atomic::AtomicU64;
688
689    use common_base::range_read::RangeReader;
690    use common_test_util::temp_dir::create_temp_dir;
691    use futures::AsyncWriteExt;
692    use tokio::io::AsyncReadExt as _;
693
694    use super::*;
695    use crate::error::BlobNotFoundSnafu;
696    use crate::puffin_manager::stager::Stager;
697
698    struct MockNotifier {
699        cache_insert_size: AtomicU64,
700        cache_evict_size: AtomicU64,
701        cache_hit_count: AtomicU64,
702        cache_hit_size: AtomicU64,
703        cache_miss_count: AtomicU64,
704        cache_miss_size: AtomicU64,
705        recycle_insert_size: AtomicU64,
706        recycle_clear_size: AtomicU64,
707    }
708
    /// Plain snapshot of the counters recorded by `MockNotifier`, comparable in assertions.
    #[derive(Debug, PartialEq, Eq)]
    struct Stats {
        cache_insert_size: u64,
        cache_evict_size: u64,
        cache_hit_count: u64,
        cache_hit_size: u64,
        cache_miss_count: u64,
        cache_miss_size: u64,
        recycle_insert_size: u64,
        recycle_clear_size: u64,
    }
720
721    impl MockNotifier {
722        fn build() -> Arc<MockNotifier> {
723            Arc::new(Self {
724                cache_insert_size: AtomicU64::new(0),
725                cache_evict_size: AtomicU64::new(0),
726                cache_hit_count: AtomicU64::new(0),
727                cache_hit_size: AtomicU64::new(0),
728                cache_miss_count: AtomicU64::new(0),
729                cache_miss_size: AtomicU64::new(0),
730                recycle_insert_size: AtomicU64::new(0),
731                recycle_clear_size: AtomicU64::new(0),
732            })
733        }
734
735        fn stats(&self) -> Stats {
736            Stats {
737                cache_insert_size: self
738                    .cache_insert_size
739                    .load(std::sync::atomic::Ordering::Relaxed),
740                cache_evict_size: self
741                    .cache_evict_size
742                    .load(std::sync::atomic::Ordering::Relaxed),
743                cache_hit_count: self
744                    .cache_hit_count
745                    .load(std::sync::atomic::Ordering::Relaxed),
746                cache_hit_size: self
747                    .cache_hit_size
748                    .load(std::sync::atomic::Ordering::Relaxed),
749                cache_miss_count: self
750                    .cache_miss_count
751                    .load(std::sync::atomic::Ordering::Relaxed),
752                cache_miss_size: self
753                    .cache_miss_size
754                    .load(std::sync::atomic::Ordering::Relaxed),
755                recycle_insert_size: self
756                    .recycle_insert_size
757                    .load(std::sync::atomic::Ordering::Relaxed),
758                recycle_clear_size: self
759                    .recycle_clear_size
760                    .load(std::sync::atomic::Ordering::Relaxed),
761            }
762        }
763    }
764
765    impl StagerNotifier for MockNotifier {
766        fn on_cache_insert(&self, size: u64) {
767            self.cache_insert_size
768                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
769        }
770
771        fn on_cache_evict(&self, size: u64) {
772            self.cache_evict_size
773                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
774        }
775
776        fn on_cache_hit(&self, size: u64) {
777            self.cache_hit_count
778                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
779            self.cache_hit_size
780                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
781        }
782
783        fn on_cache_miss(&self, size: u64) {
784            self.cache_miss_count
785                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
786            self.cache_miss_size
787                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
788        }
789
790        fn on_recycle_insert(&self, size: u64) {
791            self.recycle_insert_size
792                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
793        }
794
795        fn on_recycle_clear(&self, size: u64) {
796            self.recycle_clear_size
797                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
798        }
799
800        fn on_load_blob(&self, _duration: Duration) {}
801
802        fn on_load_dir(&self, _duration: Duration) {}
803    }
804
    /// A first `get_blob` on an empty stager materializes the blob through
    /// `init_fn`, is reported as a single miss, and leaves the file in the cache.
    #[tokio::test]
    async fn test_get_blob() {
        let tempdir = create_temp_dir("test_get_blob_");
        let notifier = MockNotifier::build();
        // Capacity `u64::MAX` and no TTL, so nothing is evicted during the test.
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_get_blob".to_string();
        let key = "key";
        // The returned guard is only a temporary here; the cache holds its own
        // reference, so the staged file stays in place afterwards.
        let reader = stager
            .get_blob(
                &puffin_file_name,
                key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        // Read the content back through the range reader.
        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"hello world");

        // The same content is reachable directly via the cache entry.
        let mut file = stager.must_get_file(&puffin_file_name, key).await;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"hello world");

        // Exactly one miss of 11 bytes; nothing evicted, hit, or recycled.
        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 0,
                recycle_clear_size: 0,
            }
        );
    }
861
862    #[tokio::test]
863    async fn test_get_dir() {
864        let tempdir = create_temp_dir("test_get_dir_");
865        let notifier = MockNotifier::build();
866        let stager = BoundedStager::new(
867            tempdir.path().to_path_buf(),
868            u64::MAX,
869            Some(notifier.clone()),
870            None,
871        )
872        .await
873        .unwrap();
874
875        let files_in_dir = [
876            ("file_a", "Hello, world!".as_bytes()),
877            ("file_b", "Hello, Rust!".as_bytes()),
878            ("file_c", "你好,世界!".as_bytes()),
879            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
880            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
881        ];
882
883        let puffin_file_name = "test_get_dir".to_string();
884        let key = "key";
885        let dir_path = stager
886            .get_dir(
887                &puffin_file_name,
888                key,
889                Box::new(|writer_provider| {
890                    Box::pin(async move {
891                        let mut size = 0;
892                        for (rel_path, content) in &files_in_dir {
893                            size += content.len();
894                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
895                            writer.write_all(content).await.unwrap();
896                        }
897                        Ok(size as _)
898                    })
899                }),
900            )
901            .await
902            .unwrap();
903
904        for (rel_path, content) in &files_in_dir {
905            let file_path = dir_path.path().join(rel_path);
906            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
907            let mut buf = Vec::new();
908            file.read_to_end(&mut buf).await.unwrap();
909            assert_eq!(buf, *content);
910        }
911
912        let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
913        for (rel_path, content) in &files_in_dir {
914            let file_path = dir_path.join(rel_path);
915            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
916            let mut buf = Vec::new();
917            file.read_to_end(&mut buf).await.unwrap();
918            assert_eq!(buf, *content);
919        }
920
921        let stats = notifier.stats();
922        assert_eq!(
923            stats,
924            Stats {
925                cache_insert_size: 70,
926                cache_evict_size: 0,
927                cache_hit_count: 0,
928                cache_hit_size: 0,
929                cache_miss_count: 1,
930                cache_miss_size: 70,
931                recycle_insert_size: 0,
932                recycle_clear_size: 0
933            }
934        );
935    }
936
937    #[tokio::test]
938    async fn test_recover() {
939        let tempdir = create_temp_dir("test_recover_");
940        let notifier = MockNotifier::build();
941        let stager = BoundedStager::new(
942            tempdir.path().to_path_buf(),
943            u64::MAX,
944            Some(notifier.clone()),
945            None,
946        )
947        .await
948        .unwrap();
949
950        // initialize stager
951        let puffin_file_name = "test_recover".to_string();
952        let blob_key = "blob_key";
953        let guard = stager
954            .get_blob(
955                &puffin_file_name,
956                blob_key,
957                Box::new(|mut writer| {
958                    Box::pin(async move {
959                        writer.write_all(b"hello world").await.unwrap();
960                        Ok(11)
961                    })
962                }),
963            )
964            .await
965            .unwrap();
966        drop(guard);
967
968        let files_in_dir = [
969            ("file_a", "Hello, world!".as_bytes()),
970            ("file_b", "Hello, Rust!".as_bytes()),
971            ("file_c", "你好,世界!".as_bytes()),
972            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
973            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
974        ];
975
976        let dir_key = "dir_key";
977        let guard = stager
978            .get_dir(
979                &puffin_file_name,
980                dir_key,
981                Box::new(|writer_provider| {
982                    Box::pin(async move {
983                        let mut size = 0;
984                        for (rel_path, content) in &files_in_dir {
985                            size += content.len();
986                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
987                            writer.write_all(content).await.unwrap();
988                        }
989                        Ok(size as _)
990                    })
991                }),
992            )
993            .await
994            .unwrap();
995        drop(guard);
996
997        // recover stager
998        drop(stager);
999        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1000            .await
1001            .unwrap();
1002
1003        let reader = stager
1004            .get_blob(
1005                &puffin_file_name,
1006                blob_key,
1007                Box::new(|_| Box::pin(async { Ok(0) })),
1008            )
1009            .await
1010            .unwrap()
1011            .reader()
1012            .await
1013            .unwrap();
1014
1015        let m = reader.metadata().await.unwrap();
1016        let buf = reader.read(0..m.content_length).await.unwrap();
1017        assert_eq!(&*buf, b"hello world");
1018
1019        let dir_path = stager
1020            .get_dir(
1021                &puffin_file_name,
1022                dir_key,
1023                Box::new(|_| Box::pin(async { Ok(0) })),
1024            )
1025            .await
1026            .unwrap();
1027        for (rel_path, content) in &files_in_dir {
1028            let file_path = dir_path.path().join(rel_path);
1029            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1030            let mut buf = Vec::new();
1031            file.read_to_end(&mut buf).await.unwrap();
1032            assert_eq!(buf, *content);
1033        }
1034
1035        let stats = notifier.stats();
1036        assert_eq!(
1037            stats,
1038            Stats {
1039                cache_insert_size: 81,
1040                cache_evict_size: 0,
1041                cache_hit_count: 0,
1042                cache_hit_size: 0,
1043                cache_miss_count: 2,
1044                cache_miss_size: 81,
1045                recycle_insert_size: 0,
1046                recycle_clear_size: 0
1047            }
1048        );
1049    }
1050
1051    #[tokio::test]
1052    async fn test_eviction() {
1053        let tempdir = create_temp_dir("test_eviction_");
1054        let notifier = MockNotifier::build();
1055        let stager = BoundedStager::new(
1056            tempdir.path().to_path_buf(),
1057            1, /* extremely small size */
1058            Some(notifier.clone()),
1059            None,
1060        )
1061        .await
1062        .unwrap();
1063
1064        let puffin_file_name = "test_eviction".to_string();
1065        let blob_key = "blob_key";
1066
1067        // First time to get the blob
1068        let reader = stager
1069            .get_blob(
1070                &puffin_file_name,
1071                blob_key,
1072                Box::new(|mut writer| {
1073                    Box::pin(async move {
1074                        writer.write_all(b"Hello world").await.unwrap();
1075                        Ok(11)
1076                    })
1077                }),
1078            )
1079            .await
1080            .unwrap()
1081            .reader()
1082            .await
1083            .unwrap();
1084
1085        // The blob should be evicted
1086        stager.cache.run_pending_tasks().await;
1087        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1088
1089        let stats = notifier.stats();
1090        assert_eq!(
1091            stats,
1092            Stats {
1093                cache_insert_size: 11,
1094                cache_evict_size: 11,
1095                cache_hit_count: 0,
1096                cache_hit_size: 0,
1097                cache_miss_count: 1,
1098                cache_miss_size: 11,
1099                recycle_insert_size: 11,
1100                recycle_clear_size: 0
1101            }
1102        );
1103
1104        let m = reader.metadata().await.unwrap();
1105        let buf = reader.read(0..m.content_length).await.unwrap();
1106        assert_eq!(&*buf, b"Hello world");
1107
1108        // Second time to get the blob, get from recycle bin
1109        let reader = stager
1110            .get_blob(
1111                &puffin_file_name,
1112                blob_key,
1113                Box::new(|_| async { Ok(0) }.boxed()),
1114            )
1115            .await
1116            .unwrap()
1117            .reader()
1118            .await
1119            .unwrap();
1120
1121        // The blob should be evicted
1122        stager.cache.run_pending_tasks().await;
1123        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1124
1125        let stats = notifier.stats();
1126        assert_eq!(
1127            stats,
1128            Stats {
1129                cache_insert_size: 22,
1130                cache_evict_size: 22,
1131                cache_hit_count: 1,
1132                cache_hit_size: 11,
1133                cache_miss_count: 1,
1134                cache_miss_size: 11,
1135                recycle_insert_size: 22,
1136                recycle_clear_size: 11
1137            }
1138        );
1139
1140        let m = reader.metadata().await.unwrap();
1141        let buf = reader.read(0..m.content_length).await.unwrap();
1142        assert_eq!(&*buf, b"Hello world");
1143
1144        let dir_key = "dir_key";
1145        let files_in_dir = [
1146            ("file_a", "Hello, world!".as_bytes()),
1147            ("file_b", "Hello, Rust!".as_bytes()),
1148            ("file_c", "你好,世界!".as_bytes()),
1149            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1150            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1151        ];
1152
1153        // First time to get the directory
1154        let guard_0 = stager
1155            .get_dir(
1156                &puffin_file_name,
1157                dir_key,
1158                Box::new(|writer_provider| {
1159                    Box::pin(async move {
1160                        let mut size = 0;
1161                        for (rel_path, content) in &files_in_dir {
1162                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1163                            writer.write_all(content).await.unwrap();
1164                            size += content.len() as u64;
1165                        }
1166                        Ok(size)
1167                    })
1168                }),
1169            )
1170            .await
1171            .unwrap();
1172
1173        for (rel_path, content) in &files_in_dir {
1174            let file_path = guard_0.path().join(rel_path);
1175            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1176            let mut buf = Vec::new();
1177            file.read_to_end(&mut buf).await.unwrap();
1178            assert_eq!(buf, *content);
1179        }
1180
1181        // The directory should be evicted
1182        stager.cache.run_pending_tasks().await;
1183        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1184
1185        let stats = notifier.stats();
1186        assert_eq!(
1187            stats,
1188            Stats {
1189                cache_insert_size: 92,
1190                cache_evict_size: 92,
1191                cache_hit_count: 1,
1192                cache_hit_size: 11,
1193                cache_miss_count: 2,
1194                cache_miss_size: 81,
1195                recycle_insert_size: 92,
1196                recycle_clear_size: 11
1197            }
1198        );
1199
1200        // Second time to get the directory
1201        let guard_1 = stager
1202            .get_dir(
1203                &puffin_file_name,
1204                dir_key,
1205                Box::new(|_| async { Ok(0) }.boxed()),
1206            )
1207            .await
1208            .unwrap();
1209
1210        for (rel_path, content) in &files_in_dir {
1211            let file_path = guard_1.path().join(rel_path);
1212            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1213            let mut buf = Vec::new();
1214            file.read_to_end(&mut buf).await.unwrap();
1215            assert_eq!(buf, *content);
1216        }
1217
1218        // Still hold the guard
1219        stager.cache.run_pending_tasks().await;
1220        assert!(!stager.in_cache(&puffin_file_name, dir_key));
1221
1222        let stats = notifier.stats();
1223        assert_eq!(
1224            stats,
1225            Stats {
1226                cache_insert_size: 162,
1227                cache_evict_size: 162,
1228                cache_hit_count: 2,
1229                cache_hit_size: 81,
1230                cache_miss_count: 2,
1231                cache_miss_size: 81,
1232                recycle_insert_size: 162,
1233                recycle_clear_size: 81
1234            }
1235        );
1236
1237        // Third time to get the directory and all guards are dropped
1238        drop(guard_0);
1239        drop(guard_1);
1240        let guard_2 = stager
1241            .get_dir(
1242                &puffin_file_name,
1243                dir_key,
1244                Box::new(|_| Box::pin(async move { Ok(0) })),
1245            )
1246            .await
1247            .unwrap();
1248
1249        // Still hold the guard, so the directory should not be removed even if it's evicted
1250        stager.cache.run_pending_tasks().await;
1251        assert!(!stager.in_cache(&puffin_file_name, blob_key));
1252
1253        for (rel_path, content) in &files_in_dir {
1254            let file_path = guard_2.path().join(rel_path);
1255            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1256            let mut buf = Vec::new();
1257            file.read_to_end(&mut buf).await.unwrap();
1258            assert_eq!(buf, *content);
1259        }
1260
1261        let stats = notifier.stats();
1262        assert_eq!(
1263            stats,
1264            Stats {
1265                cache_insert_size: 232,
1266                cache_evict_size: 232,
1267                cache_hit_count: 3,
1268                cache_hit_size: 151,
1269                cache_miss_count: 2,
1270                cache_miss_size: 81,
1271                recycle_insert_size: 232,
1272                recycle_clear_size: 151
1273            }
1274        );
1275    }
1276
1277    #[tokio::test]
1278    async fn test_get_blob_concurrency_on_fail() {
1279        let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
1280        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1281            .await
1282            .unwrap();
1283
1284        let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
1285        let key = "key";
1286
1287        let stager = Arc::new(stager);
1288        let handles = (0..10)
1289            .map(|_| {
1290                let stager = stager.clone();
1291                let puffin_file_name = puffin_file_name.clone();
1292                let task = async move {
1293                    let failed_init = Box::new(|_| {
1294                        async {
1295                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1296                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1297                        }
1298                        .boxed()
1299                    });
1300                    stager.get_blob(&puffin_file_name, key, failed_init).await
1301                };
1302
1303                tokio::spawn(task)
1304            })
1305            .collect::<Vec<_>>();
1306
1307        for handle in handles {
1308            let r = handle.await.unwrap();
1309            assert!(r.is_err());
1310        }
1311
1312        assert!(!stager.in_cache(&puffin_file_name, key));
1313    }
1314
1315    #[tokio::test]
1316    async fn test_get_dir_concurrency_on_fail() {
1317        let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
1318        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1319            .await
1320            .unwrap();
1321
1322        let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
1323        let key = "key";
1324
1325        let stager = Arc::new(stager);
1326        let handles = (0..10)
1327            .map(|_| {
1328                let stager = stager.clone();
1329                let puffin_file_name = puffin_file_name.clone();
1330                let task = async move {
1331                    let failed_init = Box::new(|_| {
1332                        async {
1333                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1334                            BlobNotFoundSnafu { blob: "whatever" }.fail()
1335                        }
1336                        .boxed()
1337                    });
1338                    stager.get_dir(&puffin_file_name, key, failed_init).await
1339                };
1340
1341                tokio::spawn(task)
1342            })
1343            .collect::<Vec<_>>();
1344
1345        for handle in handles {
1346            let r = handle.await.unwrap();
1347            assert!(r.is_err());
1348        }
1349
1350        assert!(!stager.in_cache(&puffin_file_name, key));
1351    }
1352
1353    #[tokio::test]
1354    async fn test_purge() {
1355        let tempdir = create_temp_dir("test_purge_");
1356        let notifier = MockNotifier::build();
1357        let stager = BoundedStager::new(
1358            tempdir.path().to_path_buf(),
1359            u64::MAX,
1360            Some(notifier.clone()),
1361            None,
1362        )
1363        .await
1364        .unwrap();
1365
1366        // initialize stager
1367        let puffin_file_name = "test_purge".to_string();
1368        let blob_key = "blob_key";
1369        let guard = stager
1370            .get_blob(
1371                &puffin_file_name,
1372                blob_key,
1373                Box::new(|mut writer| {
1374                    Box::pin(async move {
1375                        writer.write_all(b"hello world").await.unwrap();
1376                        Ok(11)
1377                    })
1378                }),
1379            )
1380            .await
1381            .unwrap();
1382        drop(guard);
1383
1384        let files_in_dir = [
1385            ("file_a", "Hello, world!".as_bytes()),
1386            ("file_b", "Hello, Rust!".as_bytes()),
1387            ("file_c", "你好,世界!".as_bytes()),
1388            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1389            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1390        ];
1391
1392        let dir_key = "dir_key";
1393        let guard = stager
1394            .get_dir(
1395                &puffin_file_name,
1396                dir_key,
1397                Box::new(|writer_provider| {
1398                    Box::pin(async move {
1399                        let mut size = 0;
1400                        for (rel_path, content) in &files_in_dir {
1401                            size += content.len();
1402                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
1403                            writer.write_all(content).await.unwrap();
1404                        }
1405                        Ok(size as _)
1406                    })
1407                }),
1408            )
1409            .await
1410            .unwrap();
1411        drop(guard);
1412
1413        // purge the stager
1414        stager.purge(&puffin_file_name).await.unwrap();
1415
1416        let stats = notifier.stats();
1417        assert_eq!(
1418            stats,
1419            Stats {
1420                cache_insert_size: 81,
1421                cache_evict_size: 81,
1422                cache_hit_count: 0,
1423                cache_hit_size: 0,
1424                cache_miss_count: 2,
1425                cache_miss_size: 81,
1426                recycle_insert_size: 81,
1427                recycle_clear_size: 0
1428            }
1429        );
1430    }
1431}