1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::Engine;
23use base64::prelude::BASE64_URL_SAFE;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38 CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39 Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42 BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard, DirMetrics};
45
/// Capacity of the channel that feeds the background delete routine.
const DELETE_QUEUE_SIZE: usize = 10 * 1024;
/// Extension carried by staging entries that are still being written.
const TMP_EXTENSION: &str = "tmp";
/// Extension given to staged directories right before they are removed.
const DELETED_EXTENSION: &str = "deleted";
/// Idle time after which entries in the recycle bin expire; also the delete routine's idle
/// timeout for running recycle-bin maintenance.
const RECYCLE_BIN_TTL: Duration = Duration::from_millis(60_000);
50
/// A staging area on local disk for puffin blobs and directories, bounded by a byte capacity.
///
/// Staged entries are tracked by a size-weighted cache. Entries removed from it are parked in
/// a recycle bin, from which a later lookup can revive them. The files themselves are deleted
/// by a background routine once the last guard referencing them is dropped.
///
/// `H` is the puffin file handle type; its string form is used to derive cache keys.
pub struct BoundedStager<H> {
    /// Directory holding every staged file and directory.
    base_dir: PathBuf,

    /// Staged entries keyed by `encode_cache_key(handle, key)` and weighted by size in bytes.
    cache: Cache<String, CacheValue>,

    /// Entries removed from `cache`; they expire after `RECYCLE_BIN_TTL` without access.
    recycle_bin: Cache<String, CacheValue>,

    /// Sending side of the queue consumed by the background delete routine.
    delete_queue: Sender<DeleteTask>,

    /// Optional observer for cache and recycle-bin statistics.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// Ties the stager to its file handle type without storing a value of it.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80 pub async fn new(
81 base_dir: PathBuf,
82 capacity: u64,
83 notifier: Option<Arc<dyn StagerNotifier>>,
84 cache_ttl: Option<Duration>,
85 ) -> Result<Self> {
86 tokio::fs::create_dir_all(&base_dir)
87 .await
88 .context(CreateSnafu)?;
89
90 let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91 let recycle_bin_cloned = recycle_bin.clone();
92 let notifier_cloned = notifier.clone();
93
94 let mut cache_builder = Cache::builder()
95 .max_capacity(capacity)
96 .weigher(|_: &String, v: &CacheValue| v.weight())
97 .eviction_policy(EvictionPolicy::lru())
98 .support_invalidation_closures()
99 .async_eviction_listener(move |k, v, _| {
100 let recycle_bin = recycle_bin_cloned.clone();
101 if let Some(notifier) = notifier_cloned.as_ref() {
102 notifier.on_cache_evict(v.size());
103 notifier.on_recycle_insert(v.size());
104 }
105 async move {
106 recycle_bin.insert(k.as_str().to_string(), v).await;
107 }
108 .boxed()
109 });
110 if let Some(ttl) = cache_ttl
111 && !ttl.is_zero()
112 {
113 cache_builder = cache_builder.time_to_live(ttl);
114 }
115 let cache = cache_builder.build();
116
117 let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118 let notifier_cloned = notifier.clone();
119 common_runtime::global_runtime().spawn(Self::delete_routine(
120 rx,
121 recycle_bin.clone(),
122 notifier_cloned,
123 ));
124 let stager = Self {
125 cache,
126 base_dir,
127 delete_queue,
128 recycle_bin,
129 notifier,
130 _phantom: std::marker::PhantomData,
131 };
132
133 stager.recover().await?;
134
135 Ok(stager)
136 }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141 type Blob = Arc<FsBlobGuard>;
142 type Dir = Arc<FsDirGuard>;
143 type FileHandle = H;
144
145 async fn get_blob<'a>(
146 &self,
147 handle: &Self::FileHandle,
148 key: &str,
149 init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150 ) -> Result<Self::Blob> {
151 let handle_str = handle.to_string();
152 let cache_key = Self::encode_cache_key(&handle_str, key);
153
154 let mut miss = false;
155 let v = self
156 .cache
157 .try_get_with_by_ref(&cache_key, async {
158 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159 if let Some(notifier) = self.notifier.as_ref() {
160 let size = v.size();
161 notifier.on_cache_insert(size);
162 notifier.on_recycle_clear(size);
163 }
164 return Ok(v);
165 }
166
167 miss = true;
168 let timer = Instant::now();
169 let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170 let path = self.base_dir.join(&file_name);
171
172 let size = Self::write_blob(&path, init_fn).await?;
173 if let Some(notifier) = self.notifier.as_ref() {
174 notifier.on_cache_insert(size);
175 notifier.on_load_blob(timer.elapsed());
176 }
177 let guard = Arc::new(FsBlobGuard {
178 handle: handle_str,
179 path,
180 delete_queue: self.delete_queue.clone(),
181 size,
182 });
183 Ok(CacheValue::File(guard))
184 })
185 .await
186 .context(CacheGetSnafu)?;
187
188 if let Some(notifier) = self.notifier.as_ref() {
189 if miss {
190 notifier.on_cache_miss(v.size());
191 } else {
192 notifier.on_cache_hit(v.size());
193 }
194 }
195 match v {
196 CacheValue::File(guard) => Ok(guard),
197 _ => unreachable!(),
198 }
199 }
200
201 async fn get_dir<'a>(
202 &self,
203 handle: &Self::FileHandle,
204 key: &str,
205 init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206 ) -> Result<(Self::Dir, DirMetrics)> {
207 let handle_str = handle.to_string();
208
209 let cache_key = Self::encode_cache_key(&handle_str, key);
210
211 let mut miss = false;
212 let v = self
213 .cache
214 .try_get_with_by_ref(&cache_key, async {
215 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216 if let Some(notifier) = self.notifier.as_ref() {
217 let size = v.size();
218 notifier.on_cache_insert(size);
219 notifier.on_recycle_clear(size);
220 }
221 return Ok(v);
222 }
223
224 miss = true;
225 let timer = Instant::now();
226 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227 let path = self.base_dir.join(&dir_name);
228
229 let size = Self::write_dir(&path, init_fn).await?;
230 if let Some(notifier) = self.notifier.as_ref() {
231 notifier.on_cache_insert(size);
232 notifier.on_load_dir(timer.elapsed());
233 }
234 let guard = Arc::new(FsDirGuard {
235 handle: handle_str,
236 path,
237 size,
238 delete_queue: self.delete_queue.clone(),
239 });
240 Ok(CacheValue::Dir(guard))
241 })
242 .await
243 .context(CacheGetSnafu)?;
244
245 let dir_size = v.size();
246 if let Some(notifier) = self.notifier.as_ref() {
247 if miss {
248 notifier.on_cache_miss(dir_size);
249 } else {
250 notifier.on_cache_hit(dir_size);
251 }
252 }
253
254 let metrics = DirMetrics {
255 cache_hit: !miss,
256 dir_size,
257 };
258
259 match v {
260 CacheValue::Dir(guard) => Ok((guard, metrics)),
261 _ => unreachable!(),
262 }
263 }
264
265 async fn put_dir(
266 &self,
267 handle: &Self::FileHandle,
268 key: &str,
269 dir_path: PathBuf,
270 size: u64,
271 ) -> Result<()> {
272 let handle_str = handle.to_string();
273 let cache_key = Self::encode_cache_key(&handle_str, key);
274
275 self.cache
276 .try_get_with(cache_key.clone(), async move {
277 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
278 if let Some(notifier) = self.notifier.as_ref() {
279 let size = v.size();
280 notifier.on_cache_insert(size);
281 notifier.on_recycle_clear(size);
282 }
283 return Ok(v);
284 }
285
286 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
287 let path = self.base_dir.join(&dir_name);
288
289 fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
290 if let Some(notifier) = self.notifier.as_ref() {
291 notifier.on_cache_insert(size);
292 }
293 let guard = Arc::new(FsDirGuard {
294 handle: handle_str,
295 path,
296 size,
297 delete_queue: self.delete_queue.clone(),
298 });
299 Ok(CacheValue::Dir(guard))
300 })
301 .await
302 .map(|_| ())
303 .context(CacheGetSnafu)?;
304
305 self.cache.run_pending_tasks().await;
309 self.recycle_bin.run_pending_tasks().await;
310
311 Ok(())
312 }
313
314 async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
315 let handle_str = handle.to_string();
316 self.cache
317 .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
318 .unwrap(); self.cache.run_pending_tasks().await;
320 Ok(())
321 }
322}
323
324impl<H> BoundedStager<H> {
325 fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
326 let mut hasher = Sha256::new();
327 hasher.update(puffin_file_name);
328 hasher.update(key);
329 hasher.update(puffin_file_name);
330 let hash = hasher.finalize();
331
332 BASE64_URL_SAFE.encode(hash)
333 }
334
335 async fn write_blob(
336 target_path: &PathBuf,
337 init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
338 ) -> Result<u64> {
339 let tmp_path = target_path.with_extension(TMP_EXTENSION);
342 let writer = Box::new(
343 fs::File::create(&tmp_path)
344 .await
345 .context(CreateSnafu)?
346 .compat_write(),
347 );
348 let size = init_fn(writer).await?;
349
350 fs::rename(tmp_path, target_path)
352 .await
353 .context(RenameSnafu)?;
354 Ok(size)
355 }
356
357 async fn write_dir(
358 target_path: &PathBuf,
359 init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
360 ) -> Result<u64> {
361 let tmp_base = target_path.with_extension(TMP_EXTENSION);
364 let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
365 let size = init_fn(writer_provider).await?;
366
367 fs::rename(&tmp_base, target_path)
369 .await
370 .context(RenameSnafu)?;
371 Ok(size)
372 }
373
374 async fn recover(&self) -> Result<()> {
379 let timer = std::time::Instant::now();
380 info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
381
382 let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
383
384 let mut elems = HashMap::new();
385 while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
386 let path = entry.path();
387
388 if path.extension() == Some(TMP_EXTENSION.as_ref())
389 || path.extension() == Some(DELETED_EXTENSION.as_ref())
390 {
391 if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
393 fs::remove_dir_all(path).await.context(RemoveSnafu)?;
394 } else {
395 fs::remove_file(path).await.context(RemoveSnafu)?;
396 }
397 } else {
398 let meta = entry.metadata().await.context(MetadataSnafu)?;
400 let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
401
402 let key = match file_path.split('.').next() {
404 Some(key) => key.to_string(),
405 None => {
406 warn!(
407 "Invalid staging file name: {}, expected format: <key>.<uuid>",
408 file_path
409 );
410 continue;
411 }
412 };
413
414 if meta.is_dir() {
415 let size = Self::get_dir_size(&path).await?;
416 let v = CacheValue::Dir(Arc::new(FsDirGuard {
417 path,
418 size,
419 delete_queue: self.delete_queue.clone(),
420
421 handle: String::new(),
423 }));
424 let _dup_dir = elems.insert(key, v);
426 } else {
427 let size = meta.len();
428 let v = CacheValue::File(Arc::new(FsBlobGuard {
429 path,
430 size,
431 delete_queue: self.delete_queue.clone(),
432
433 handle: String::new(),
435 }));
436 let _dup_file = elems.insert(key, v);
438 }
439 }
440 }
441
442 let mut size = 0;
443 let num_elems = elems.len();
444 for (key, value) in elems {
445 size += value.size();
446 self.cache.insert(key, value).await;
447 }
448 if let Some(notifier) = self.notifier.as_ref() {
449 notifier.on_cache_insert(size);
450 }
451
452 self.cache.run_pending_tasks().await;
453
454 info!(
455 "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
456 num_elems,
457 size,
458 timer.elapsed()
459 );
460 Ok(())
461 }
462
463 async fn get_dir_size(path: &PathBuf) -> Result<u64> {
465 let mut size = 0;
466 let mut wd = WalkDir::new(path).filter(|entry| async move {
467 match entry.file_type().await {
468 Ok(ft) if ft.is_dir() => Filtering::Ignore,
469 _ => Filtering::Continue,
470 }
471 });
472
473 while let Some(entry) = wd.next().await {
474 let entry = entry.context(WalkDirSnafu)?;
475 size += entry.metadata().await.context(MetadataSnafu)?.len();
476 }
477
478 Ok(size)
479 }
480
481 async fn delete_routine(
482 mut receiver: Receiver<DeleteTask>,
483 recycle_bin: Cache<String, CacheValue>,
484 notifier: Option<Arc<dyn StagerNotifier>>,
485 ) {
486 loop {
487 match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
488 Ok(Some(task)) => match task {
489 DeleteTask::File(path, size) => {
490 if let Err(err) = fs::remove_file(&path).await {
491 if err.kind() == std::io::ErrorKind::NotFound {
492 continue;
493 }
494
495 warn!(err; "Failed to remove the file.");
496 }
497
498 if let Some(notifier) = notifier.as_ref() {
499 notifier.on_recycle_clear(size);
500 }
501 }
502
503 DeleteTask::Dir(path, size) => {
504 let deleted_path = path.with_extension(DELETED_EXTENSION);
505 if let Err(err) = fs::rename(&path, &deleted_path).await {
506 if err.kind() == std::io::ErrorKind::NotFound {
507 continue;
508 }
509
510 let _ = fs::remove_dir_all(&deleted_path).await;
512 if let Err(err) = fs::rename(&path, &deleted_path).await {
513 warn!(err; "Failed to rename the dangling directory to deleted path.");
514 continue;
515 }
516 }
517 if let Err(err) = fs::remove_dir_all(&deleted_path).await {
518 warn!(err; "Failed to remove the dangling directory.");
519 }
520 if let Some(notifier) = notifier.as_ref() {
521 notifier.on_recycle_clear(size);
522 }
523 }
524 DeleteTask::Terminate => {
525 break;
526 }
527 },
528 Ok(None) => break,
529 Err(_) => {
530 recycle_bin.run_pending_tasks().await;
532 }
533 }
534 }
535
536 info!("The delete routine for the bounded stager is terminated.");
537 }
538}
539
540impl<H> Drop for BoundedStager<H> {
541 fn drop(&mut self) {
542 let _ = self.delete_queue.try_send(DeleteTask::Terminate);
543 }
544}
545
/// A staging cache entry: either a staged file or a staged directory.
///
/// Cloning only clones the `Arc`, so the cache, the recycle bin and callers all share the same
/// guard.
#[derive(Debug, Clone)]
enum CacheValue {
    /// A staged blob file.
    File(Arc<FsBlobGuard>),
    /// A staged directory.
    Dir(Arc<FsDirGuard>),
}
551
552impl CacheValue {
553 fn size(&self) -> u64 {
554 match self {
555 CacheValue::File(guard) => guard.size,
556 CacheValue::Dir(guard) => guard.size,
557 }
558 }
559
560 fn weight(&self) -> u32 {
561 self.size().try_into().unwrap_or(u32::MAX)
562 }
563
564 fn handle(&self) -> &str {
565 match self {
566 CacheValue::File(guard) => &guard.handle,
567 CacheValue::Dir(guard) => &guard.handle,
568 }
569 }
570}
571
/// Work item for the background delete routine.
enum DeleteTask {
    /// Remove the file at the path; the `u64` is its size, reported via `on_recycle_clear`.
    File(PathBuf, u64),
    /// Remove the directory at the path; the `u64` is its size, reported via `on_recycle_clear`.
    Dir(PathBuf, u64),
    /// Stop the delete routine.
    Terminate,
}
577
/// Guard for a staged blob file.
///
/// When the guard is dropped (i.e. the last `Arc` to it goes away), the file is queued for
/// deletion.
#[derive(Debug)]
pub struct FsBlobGuard {
    /// String form of the owning file handle; empty for entries restored by recovery.
    handle: String,
    /// Location of the staged file.
    path: PathBuf,
    /// Size in bytes: the value reported by the init function, or the file length on recovery.
    size: u64,
    /// Queue used to request the file's deletion on drop.
    delete_queue: Sender<DeleteTask>,
}
587
588#[async_trait]
589impl BlobGuard for FsBlobGuard {
590 type Reader = FileReader;
591
592 async fn reader(&self) -> Result<Self::Reader> {
593 FileReader::new(&self.path).await.context(OpenSnafu)
594 }
595}
596
597impl Drop for FsBlobGuard {
598 fn drop(&mut self) {
599 if let Err(err) = self
600 .delete_queue
601 .try_send(DeleteTask::File(self.path.clone(), self.size))
602 {
603 if matches!(err, TrySendError::Closed(_)) {
604 return;
605 }
606 warn!(err; "Failed to send the delete task for the file.");
607 }
608 }
609}
610
/// Guard for a staged directory.
///
/// When the guard is dropped (i.e. the last `Arc` to it goes away), the directory is queued
/// for deletion.
#[derive(Debug)]
pub struct FsDirGuard {
    /// String form of the owning file handle; empty for entries restored by recovery.
    handle: String,
    /// Location of the staged directory.
    path: PathBuf,
    /// Size in bytes: the value reported by the init function or `put_dir`, or the summed file
    /// lengths on recovery.
    size: u64,
    /// Queue used to request the directory's deletion on drop.
    delete_queue: Sender<DeleteTask>,
}
620
621impl DirGuard for FsDirGuard {
622 fn path(&self) -> &PathBuf {
623 &self.path
624 }
625}
626
627impl Drop for FsDirGuard {
628 fn drop(&mut self) {
629 if let Err(err) = self
630 .delete_queue
631 .try_send(DeleteTask::Dir(self.path.clone(), self.size))
632 {
633 if matches!(err, TrySendError::Closed(_)) {
634 return;
635 }
636 warn!(err; "Failed to send the delete task for the directory.");
637 }
638 }
639}
640
/// `DirWriterProvider` that creates files below the wrapped directory; `write_dir` points it
/// at the temporary directory being filled.
struct MokaDirWriterProvider(PathBuf);
643
644#[async_trait]
645impl DirWriterProvider for MokaDirWriterProvider {
646 async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
647 let full_path = if cfg!(windows) {
648 self.0.join(rel_path.replace('/', "\\"))
649 } else {
650 self.0.join(rel_path)
651 };
652 if let Some(parent) = full_path.parent() {
653 fs::create_dir_all(parent).await.context(CreateSnafu)?;
654 }
655 Ok(Box::new(
656 fs::File::create(full_path)
657 .await
658 .context(CreateSnafu)?
659 .compat_write(),
660 ) as BoxWriter)
661 }
662}
663
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Opens the staged file for (`puffin_file_name`, `key`).
    ///
    /// Panics if the entry is not cached, is a directory, or cannot be opened.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        let CacheValue::File(guard) = self.cache.get(&cache_key).await.unwrap() else {
            panic!("Expected a file, but got a directory.");
        };
        fs::File::open(&guard.path).await.unwrap()
    }

    /// Returns the path of the staged directory for (`puffin_file_name`, `key`).
    ///
    /// Panics if the entry is not cached or is a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        match self.cache.get(&cache_key).await.unwrap() {
            CacheValue::Dir(guard) => guard.path.clone(),
            CacheValue::File(_) => panic!("Expected a directory, but got a file."),
        }
    }

    /// Reports whether (`puffin_file_name`, `key`) is currently in the main cache (the recycle
    /// bin is not consulted).
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
691
#[cfg(test)]
mod tests {
    //! Tests for `BoundedStager`: staging, recovery, eviction through the recycle bin,
    //! concurrent failing initializations and purging. Notifier statistics are checked exactly.

    use std::sync::atomic::AtomicU64;

    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::AsyncWriteExt;
    use tokio::io::AsyncReadExt as _;

    use super::*;
    use crate::error::BlobNotFoundSnafu;
    use crate::puffin_manager::stager::Stager;

    /// Total number of content bytes in a `(relative path, content)` fixture.
    ///
    /// Expected sizes are derived from the fixture rather than hard-coded, so they stay correct
    /// whatever exact characters (e.g. ASCII vs. full-width punctuation) the fixture contains.
    fn total_size(files: &[(&str, &[u8])]) -> u64 {
        files.iter().map(|(_, content)| content.len() as u64).sum()
    }

    /// A `StagerNotifier` that accumulates every reported size and count in atomics.
    struct MockNotifier {
        cache_insert_size: AtomicU64,
        cache_evict_size: AtomicU64,
        cache_hit_count: AtomicU64,
        cache_hit_size: AtomicU64,
        cache_miss_count: AtomicU64,
        cache_miss_size: AtomicU64,
        recycle_insert_size: AtomicU64,
        recycle_clear_size: AtomicU64,
    }

    /// Snapshot of the counters of a `MockNotifier`.
    #[derive(Debug, PartialEq, Eq)]
    struct Stats {
        cache_insert_size: u64,
        cache_evict_size: u64,
        cache_hit_count: u64,
        cache_hit_size: u64,
        cache_miss_count: u64,
        cache_miss_size: u64,
        recycle_insert_size: u64,
        recycle_clear_size: u64,
    }

    impl MockNotifier {
        fn build() -> Arc<MockNotifier> {
            Arc::new(Self {
                cache_insert_size: AtomicU64::new(0),
                cache_evict_size: AtomicU64::new(0),
                cache_hit_count: AtomicU64::new(0),
                cache_hit_size: AtomicU64::new(0),
                cache_miss_count: AtomicU64::new(0),
                cache_miss_size: AtomicU64::new(0),
                recycle_insert_size: AtomicU64::new(0),
                recycle_clear_size: AtomicU64::new(0),
            })
        }

        fn stats(&self) -> Stats {
            Stats {
                cache_insert_size: self
                    .cache_insert_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_evict_size: self
                    .cache_evict_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_hit_count: self
                    .cache_hit_count
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_hit_size: self
                    .cache_hit_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_miss_count: self
                    .cache_miss_count
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_miss_size: self
                    .cache_miss_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                recycle_insert_size: self
                    .recycle_insert_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                recycle_clear_size: self
                    .recycle_clear_size
                    .load(std::sync::atomic::Ordering::Relaxed),
            }
        }
    }

    impl StagerNotifier for MockNotifier {
        fn on_cache_insert(&self, size: u64) {
            self.cache_insert_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_evict(&self, size: u64) {
            self.cache_evict_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_hit(&self, size: u64) {
            self.cache_hit_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            self.cache_hit_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_miss(&self, size: u64) {
            self.cache_miss_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            self.cache_miss_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_recycle_insert(&self, size: u64) {
            self.recycle_insert_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_recycle_clear(&self, size: u64) {
            self.recycle_clear_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_load_blob(&self, _duration: Duration) {}

        fn on_load_dir(&self, _duration: Duration) {}
    }

    /// A blob miss writes the file once and reports one miss of its size.
    #[tokio::test]
    async fn test_get_blob() {
        let tempdir = create_temp_dir("test_get_blob_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_get_blob".to_string();
        let key = "key";
        let reader = stager
            .get_blob(
                &puffin_file_name,
                key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"hello world");

        let mut file = stager.must_get_file(&puffin_file_name, key).await;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"hello world");

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 0,
                recycle_clear_size: 0,
            }
        );
    }

    /// A directory miss writes every file (including nested ones) and reports its total size.
    #[tokio::test]
    async fn test_get_dir() {
        let tempdir = create_temp_dir("test_get_dir_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好,世界!".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];
        let dir_size = total_size(&files_in_dir);

        let puffin_file_name = "test_get_dir".to_string();
        let key = "key";
        let (dir_path, metrics) = stager
            .get_dir(
                &puffin_file_name,
                key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();

        assert!(!metrics.cache_hit);
        assert_eq!(metrics.dir_size, dir_size);

        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: dir_size,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: dir_size,
                recycle_insert_size: 0,
                recycle_clear_size: 0
            }
        );
    }

    /// Entries staged by one instance are served as hits by a new instance on the same
    /// directory (the init functions of the second instance would produce empty entries).
    #[tokio::test]
    async fn test_recover() {
        let tempdir = create_temp_dir("test_recover_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_recover".to_string();
        let blob_key = "blob_key";
        let guard = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好,世界!".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];
        let dir_size = total_size(&files_in_dir);

        let dir_key = "dir_key";
        let (guard, _metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        drop(stager);
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|_| Box::pin(async { Ok(0) })),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"hello world");

        let (dir_path, metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| Box::pin(async { Ok(0) })),
            )
            .await
            .unwrap();

        assert!(metrics.cache_hit);
        assert!(metrics.dir_size > 0);
        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        // Only the first instance reports to `notifier`: two misses (blob + dir).
        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11 + dir_size,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 2,
                cache_miss_size: 11 + dir_size,
                recycle_insert_size: 0,
                recycle_clear_size: 0
            }
        );
    }

    /// With a capacity of one byte every entry is evicted immediately into the recycle bin,
    /// and later lookups revive it from there (counted as hits).
    #[tokio::test]
    async fn test_eviction() {
        let tempdir = create_temp_dir("test_eviction_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            1,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_eviction".to_string();
        let blob_key = "blob_key";

        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"Hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, blob_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11,
                cache_evict_size: 11,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 11,
                recycle_clear_size: 0
            }
        );

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"Hello world");

        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|_| async { Ok(0) }.boxed()),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, blob_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 22,
                cache_evict_size: 22,
                cache_hit_count: 1,
                cache_hit_size: 11,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 22,
                recycle_clear_size: 11
            }
        );

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"Hello world");

        let dir_key = "dir_key";
        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好,世界!".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];
        let dir_size = total_size(&files_in_dir);

        let (guard_0, _metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                            size += content.len() as u64;
                        }
                        Ok(size)
                    })
                }),
            )
            .await
            .unwrap();

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_0.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        // The directory is a miss and is evicted right away.
        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 22 + dir_size,
                cache_evict_size: 22 + dir_size,
                cache_hit_count: 1,
                cache_hit_size: 11,
                cache_miss_count: 2,
                cache_miss_size: 11 + dir_size,
                recycle_insert_size: 22 + dir_size,
                recycle_clear_size: 11
            }
        );

        let (guard_1, _metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| async { Ok(0) }.boxed()),
            )
            .await
            .unwrap();

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_1.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        // Revived from the recycle bin (a hit), then evicted again.
        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 22 + 2 * dir_size,
                cache_evict_size: 22 + 2 * dir_size,
                cache_hit_count: 2,
                cache_hit_size: 11 + dir_size,
                cache_miss_count: 2,
                cache_miss_size: 11 + dir_size,
                recycle_insert_size: 22 + 2 * dir_size,
                recycle_clear_size: 11 + dir_size
            }
        );

        drop(guard_0);
        drop(guard_1);
        let (guard_2, _metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| Box::pin(async move { Ok(0) })),
            )
            .await
            .unwrap();

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_2.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        // The recycle bin still held the directory after the guards were dropped.
        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 22 + 3 * dir_size,
                cache_evict_size: 22 + 3 * dir_size,
                cache_hit_count: 3,
                cache_hit_size: 11 + 2 * dir_size,
                cache_miss_count: 2,
                cache_miss_size: 11 + dir_size,
                recycle_insert_size: 22 + 3 * dir_size,
                recycle_clear_size: 11 + 2 * dir_size
            }
        );
    }

    /// Concurrent failing blob initializations all fail and leave nothing cached.
    #[tokio::test]
    async fn test_get_blob_concurrency_on_fail() {
        let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
        let key = "key";

        let stager = Arc::new(stager);
        let handles = (0..10)
            .map(|_| {
                let stager = stager.clone();
                let puffin_file_name = puffin_file_name.clone();
                let task = async move {
                    let failed_init = Box::new(|_| {
                        async {
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                            BlobNotFoundSnafu { blob: "whatever" }.fail()
                        }
                        .boxed()
                    });
                    stager.get_blob(&puffin_file_name, key, failed_init).await
                };

                tokio::spawn(task)
            })
            .collect::<Vec<_>>();

        for handle in handles {
            let r = handle.await.unwrap();
            assert!(r.is_err());
        }

        assert!(!stager.in_cache(&puffin_file_name, key));
    }

    /// Concurrent failing directory initializations all fail and leave nothing cached.
    #[tokio::test]
    async fn test_get_dir_concurrency_on_fail() {
        let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
        let key = "key";

        let stager = Arc::new(stager);
        let handles = (0..10)
            .map(|_| {
                let stager = stager.clone();
                let puffin_file_name = puffin_file_name.clone();
                let task = async move {
                    let failed_init = Box::new(|_| {
                        async {
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                            BlobNotFoundSnafu { blob: "whatever" }.fail()
                        }
                        .boxed()
                    });
                    stager.get_dir(&puffin_file_name, key, failed_init).await
                };

                tokio::spawn(task)
            })
            .collect::<Vec<_>>();

        for handle in handles {
            let r = handle.await.unwrap();
            assert!(r.is_err());
        }

        assert!(!stager.in_cache(&puffin_file_name, key));
    }

    /// Purging a handle moves all of its entries from the cache into the recycle bin.
    #[tokio::test]
    async fn test_purge() {
        let tempdir = create_temp_dir("test_purge_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_purge".to_string();
        let blob_key = "blob_key";
        let guard = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好,世界!".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];
        let dir_size = total_size(&files_in_dir);

        let dir_key = "dir_key";
        let (guard, _metrics) = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        stager.purge(&puffin_file_name).await.unwrap();

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11 + dir_size,
                cache_evict_size: 11 + dir_size,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 2,
                cache_miss_size: 11 + dir_size,
                recycle_insert_size: 11 + dir_size,
                recycle_clear_size: 0
            }
        );
    }
}