1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::prelude::BASE64_URL_SAFE;
23use base64::Engine;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38 CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39 Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42 BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard};
45
/// Capacity of the channel that feeds the background delete routine.
const DELETE_QUEUE_SIZE: usize = 10240;
/// Extension of in-progress writes; leftovers are removed by `recover`.
const TMP_EXTENSION: &str = "tmp";
/// Extension a directory is renamed to right before it is removed; leftovers
/// are removed by `recover`.
const DELETED_EXTENSION: &str = "deleted";
/// Idle TTL of the recycle bin. Also used as the delete routine's wake-up
/// interval for running the recycle bin's pending maintenance.
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
50
/// Local-filesystem staging area for blobs and directories, bounded by a
/// weighted `moka` cache.
///
/// Entries evicted from `cache` are parked in `recycle_bin`. Their files are
/// deleted by a background routine once the last guard referencing them is
/// dropped.
pub struct BoundedStager<H> {
    /// Directory under which every staged file and directory lives.
    base_dir: PathBuf,

    /// Weighted LRU cache of staged entries. The eviction listener installed
    /// in `new` moves every removed entry into `recycle_bin`.
    cache: Cache<String, CacheValue>,

    /// Recently evicted entries, kept for `RECYCLE_BIN_TTL` of idleness so a
    /// later lookup can move them back into `cache` without touching the disk.
    recycle_bin: Cache<String, CacheValue>,

    /// Sender half of the channel consumed by the background delete routine.
    /// Guards enqueue `DeleteTask`s here when they are dropped.
    delete_queue: Sender<DeleteTask>,

    /// Optional observer for cache and recycle-bin events.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// Ties the stager to its file-handle type `H` without storing one.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80 pub async fn new(
81 base_dir: PathBuf,
82 capacity: u64,
83 notifier: Option<Arc<dyn StagerNotifier>>,
84 cache_ttl: Option<Duration>,
85 ) -> Result<Self> {
86 tokio::fs::create_dir_all(&base_dir)
87 .await
88 .context(CreateSnafu)?;
89
90 let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91 let recycle_bin_cloned = recycle_bin.clone();
92 let notifier_cloned = notifier.clone();
93
94 let mut cache_builder = Cache::builder()
95 .max_capacity(capacity)
96 .weigher(|_: &String, v: &CacheValue| v.weight())
97 .eviction_policy(EvictionPolicy::lru())
98 .support_invalidation_closures()
99 .async_eviction_listener(move |k, v, _| {
100 let recycle_bin = recycle_bin_cloned.clone();
101 if let Some(notifier) = notifier_cloned.as_ref() {
102 notifier.on_cache_evict(v.size());
103 notifier.on_recycle_insert(v.size());
104 }
105 async move {
106 recycle_bin.insert(k.as_str().to_string(), v).await;
107 }
108 .boxed()
109 });
110 if let Some(ttl) = cache_ttl {
111 if !ttl.is_zero() {
112 cache_builder = cache_builder.time_to_live(ttl);
113 }
114 }
115 let cache = cache_builder.build();
116
117 let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118 let notifier_cloned = notifier.clone();
119 common_runtime::global_runtime().spawn(Self::delete_routine(
120 rx,
121 recycle_bin.clone(),
122 notifier_cloned,
123 ));
124 let stager = Self {
125 cache,
126 base_dir,
127 delete_queue,
128 recycle_bin,
129 notifier,
130 _phantom: std::marker::PhantomData,
131 };
132
133 stager.recover().await?;
134
135 Ok(stager)
136 }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141 type Blob = Arc<FsBlobGuard>;
142 type Dir = Arc<FsDirGuard>;
143 type FileHandle = H;
144
145 async fn get_blob<'a>(
146 &self,
147 handle: &Self::FileHandle,
148 key: &str,
149 init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150 ) -> Result<Self::Blob> {
151 let handle_str = handle.to_string();
152 let cache_key = Self::encode_cache_key(&handle_str, key);
153
154 let mut miss = false;
155 let v = self
156 .cache
157 .try_get_with_by_ref(&cache_key, async {
158 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159 if let Some(notifier) = self.notifier.as_ref() {
160 let size = v.size();
161 notifier.on_cache_insert(size);
162 notifier.on_recycle_clear(size);
163 }
164 return Ok(v);
165 }
166
167 miss = true;
168 let timer = Instant::now();
169 let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170 let path = self.base_dir.join(&file_name);
171
172 let size = Self::write_blob(&path, init_fn).await?;
173 if let Some(notifier) = self.notifier.as_ref() {
174 notifier.on_cache_insert(size);
175 notifier.on_load_blob(timer.elapsed());
176 }
177 let guard = Arc::new(FsBlobGuard {
178 handle: handle_str,
179 path,
180 delete_queue: self.delete_queue.clone(),
181 size,
182 });
183 Ok(CacheValue::File(guard))
184 })
185 .await
186 .context(CacheGetSnafu)?;
187
188 if let Some(notifier) = self.notifier.as_ref() {
189 if miss {
190 notifier.on_cache_miss(v.size());
191 } else {
192 notifier.on_cache_hit(v.size());
193 }
194 }
195 match v {
196 CacheValue::File(guard) => Ok(guard),
197 _ => unreachable!(),
198 }
199 }
200
201 async fn get_dir<'a>(
202 &self,
203 handle: &Self::FileHandle,
204 key: &str,
205 init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206 ) -> Result<Self::Dir> {
207 let handle_str = handle.to_string();
208
209 let cache_key = Self::encode_cache_key(&handle_str, key);
210
211 let mut miss = false;
212 let v = self
213 .cache
214 .try_get_with_by_ref(&cache_key, async {
215 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216 if let Some(notifier) = self.notifier.as_ref() {
217 let size = v.size();
218 notifier.on_cache_insert(size);
219 notifier.on_recycle_clear(size);
220 }
221 return Ok(v);
222 }
223
224 miss = true;
225 let timer = Instant::now();
226 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227 let path = self.base_dir.join(&dir_name);
228
229 let size = Self::write_dir(&path, init_fn).await?;
230 if let Some(notifier) = self.notifier.as_ref() {
231 notifier.on_cache_insert(size);
232 notifier.on_load_dir(timer.elapsed());
233 }
234 let guard = Arc::new(FsDirGuard {
235 handle: handle_str,
236 path,
237 size,
238 delete_queue: self.delete_queue.clone(),
239 });
240 Ok(CacheValue::Dir(guard))
241 })
242 .await
243 .context(CacheGetSnafu)?;
244
245 if let Some(notifier) = self.notifier.as_ref() {
246 if miss {
247 notifier.on_cache_miss(v.size());
248 } else {
249 notifier.on_cache_hit(v.size());
250 }
251 }
252 match v {
253 CacheValue::Dir(guard) => Ok(guard),
254 _ => unreachable!(),
255 }
256 }
257
258 async fn put_dir(
259 &self,
260 handle: &Self::FileHandle,
261 key: &str,
262 dir_path: PathBuf,
263 size: u64,
264 ) -> Result<()> {
265 let handle_str = handle.to_string();
266 let cache_key = Self::encode_cache_key(&handle_str, key);
267
268 self.cache
269 .try_get_with(cache_key.clone(), async move {
270 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
271 if let Some(notifier) = self.notifier.as_ref() {
272 let size = v.size();
273 notifier.on_cache_insert(size);
274 notifier.on_recycle_clear(size);
275 }
276 return Ok(v);
277 }
278
279 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
280 let path = self.base_dir.join(&dir_name);
281
282 fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
283 if let Some(notifier) = self.notifier.as_ref() {
284 notifier.on_cache_insert(size);
285 }
286 let guard = Arc::new(FsDirGuard {
287 handle: handle_str,
288 path,
289 size,
290 delete_queue: self.delete_queue.clone(),
291 });
292 Ok(CacheValue::Dir(guard))
293 })
294 .await
295 .map(|_| ())
296 .context(CacheGetSnafu)?;
297
298 self.cache.run_pending_tasks().await;
302 self.recycle_bin.run_pending_tasks().await;
303
304 Ok(())
305 }
306
307 async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
308 let handle_str = handle.to_string();
309 self.cache
310 .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
311 .unwrap(); self.cache.run_pending_tasks().await;
313 Ok(())
314 }
315}
316
317impl<H> BoundedStager<H> {
318 fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
319 let mut hasher = Sha256::new();
320 hasher.update(puffin_file_name);
321 hasher.update(key);
322 hasher.update(puffin_file_name);
323 let hash = hasher.finalize();
324
325 BASE64_URL_SAFE.encode(hash)
326 }
327
328 async fn write_blob(
329 target_path: &PathBuf,
330 init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
331 ) -> Result<u64> {
332 let tmp_path = target_path.with_extension(TMP_EXTENSION);
335 let writer = Box::new(
336 fs::File::create(&tmp_path)
337 .await
338 .context(CreateSnafu)?
339 .compat_write(),
340 );
341 let size = init_fn(writer).await?;
342
343 fs::rename(tmp_path, target_path)
345 .await
346 .context(RenameSnafu)?;
347 Ok(size)
348 }
349
350 async fn write_dir(
351 target_path: &PathBuf,
352 init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
353 ) -> Result<u64> {
354 let tmp_base = target_path.with_extension(TMP_EXTENSION);
357 let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
358 let size = init_fn(writer_provider).await?;
359
360 fs::rename(&tmp_base, target_path)
362 .await
363 .context(RenameSnafu)?;
364 Ok(size)
365 }
366
367 async fn recover(&self) -> Result<()> {
372 let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
373
374 let mut elems = HashMap::new();
375 while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
376 let path = entry.path();
377
378 if path.extension() == Some(TMP_EXTENSION.as_ref())
379 || path.extension() == Some(DELETED_EXTENSION.as_ref())
380 {
381 if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
383 fs::remove_dir_all(path).await.context(RemoveSnafu)?;
384 } else {
385 fs::remove_file(path).await.context(RemoveSnafu)?;
386 }
387 } else {
388 let meta = entry.metadata().await.context(MetadataSnafu)?;
390 let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
391
392 let key = match file_path.split('.').next() {
394 Some(key) => key.to_string(),
395 None => {
396 warn!(
397 "Invalid staging file name: {}, expected format: <key>.<uuid>",
398 file_path
399 );
400 continue;
401 }
402 };
403
404 if meta.is_dir() {
405 let size = Self::get_dir_size(&path).await?;
406 let v = CacheValue::Dir(Arc::new(FsDirGuard {
407 path,
408 size,
409 delete_queue: self.delete_queue.clone(),
410
411 handle: String::new(),
413 }));
414 let _dup_dir = elems.insert(key, v);
416 } else {
417 let size = meta.len();
418 let v = CacheValue::File(Arc::new(FsBlobGuard {
419 path,
420 size,
421 delete_queue: self.delete_queue.clone(),
422
423 handle: String::new(),
425 }));
426 let _dup_file = elems.insert(key, v);
428 }
429 }
430 }
431
432 let mut size = 0;
433 for (key, value) in elems {
434 size += value.size();
435 self.cache.insert(key, value).await;
436 }
437 if let Some(notifier) = self.notifier.as_ref() {
438 notifier.on_cache_insert(size);
439 }
440
441 self.cache.run_pending_tasks().await;
442
443 Ok(())
444 }
445
446 async fn get_dir_size(path: &PathBuf) -> Result<u64> {
448 let mut size = 0;
449 let mut wd = WalkDir::new(path).filter(|entry| async move {
450 match entry.file_type().await {
451 Ok(ft) if ft.is_dir() => Filtering::Ignore,
452 _ => Filtering::Continue,
453 }
454 });
455
456 while let Some(entry) = wd.next().await {
457 let entry = entry.context(WalkDirSnafu)?;
458 size += entry.metadata().await.context(MetadataSnafu)?.len();
459 }
460
461 Ok(size)
462 }
463
464 async fn delete_routine(
465 mut receiver: Receiver<DeleteTask>,
466 recycle_bin: Cache<String, CacheValue>,
467 notifier: Option<Arc<dyn StagerNotifier>>,
468 ) {
469 loop {
470 match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
471 Ok(Some(task)) => match task {
472 DeleteTask::File(path, size) => {
473 if let Err(err) = fs::remove_file(&path).await {
474 if err.kind() == std::io::ErrorKind::NotFound {
475 continue;
476 }
477
478 warn!(err; "Failed to remove the file.");
479 }
480
481 if let Some(notifier) = notifier.as_ref() {
482 notifier.on_recycle_clear(size);
483 }
484 }
485
486 DeleteTask::Dir(path, size) => {
487 let deleted_path = path.with_extension(DELETED_EXTENSION);
488 if let Err(err) = fs::rename(&path, &deleted_path).await {
489 if err.kind() == std::io::ErrorKind::NotFound {
490 continue;
491 }
492
493 let _ = fs::remove_dir_all(&deleted_path).await;
495 if let Err(err) = fs::rename(&path, &deleted_path).await {
496 warn!(err; "Failed to rename the dangling directory to deleted path.");
497 continue;
498 }
499 }
500 if let Err(err) = fs::remove_dir_all(&deleted_path).await {
501 warn!(err; "Failed to remove the dangling directory.");
502 }
503 if let Some(notifier) = notifier.as_ref() {
504 notifier.on_recycle_clear(size);
505 }
506 }
507 DeleteTask::Terminate => {
508 break;
509 }
510 },
511 Ok(None) => break,
512 Err(_) => {
513 recycle_bin.run_pending_tasks().await;
515 }
516 }
517 }
518
519 info!("The delete routine for the bounded stager is terminated.");
520 }
521}
522
523impl<H> Drop for BoundedStager<H> {
524 fn drop(&mut self) {
525 let _ = self.delete_queue.try_send(DeleteTask::Terminate);
526 }
527}
528
529#[derive(Debug, Clone)]
530enum CacheValue {
531 File(Arc<FsBlobGuard>),
532 Dir(Arc<FsDirGuard>),
533}
534
535impl CacheValue {
536 fn size(&self) -> u64 {
537 match self {
538 CacheValue::File(guard) => guard.size,
539 CacheValue::Dir(guard) => guard.size,
540 }
541 }
542
543 fn weight(&self) -> u32 {
544 self.size().try_into().unwrap_or(u32::MAX)
545 }
546
547 fn handle(&self) -> &str {
548 match self {
549 CacheValue::File(guard) => &guard.handle,
550 CacheValue::Dir(guard) => &guard.handle,
551 }
552 }
553}
554
/// Work item for the background delete routine.
enum DeleteTask {
    /// Remove the file at the path. The `u64` is its size, reported through
    /// `on_recycle_clear`.
    File(PathBuf, u64),
    /// Remove the directory at the path. The `u64` is its size, reported
    /// through `on_recycle_clear`.
    Dir(PathBuf, u64),
    /// Stop the delete routine.
    Terminate,
}
560
561#[derive(Debug)]
564pub struct FsBlobGuard {
565 handle: String,
566 path: PathBuf,
567 size: u64,
568 delete_queue: Sender<DeleteTask>,
569}
570
571#[async_trait]
572impl BlobGuard for FsBlobGuard {
573 type Reader = FileReader;
574
575 async fn reader(&self) -> Result<Self::Reader> {
576 FileReader::new(&self.path).await.context(OpenSnafu)
577 }
578}
579
580impl Drop for FsBlobGuard {
581 fn drop(&mut self) {
582 if let Err(err) = self
583 .delete_queue
584 .try_send(DeleteTask::File(self.path.clone(), self.size))
585 {
586 if matches!(err, TrySendError::Closed(_)) {
587 return;
588 }
589 warn!(err; "Failed to send the delete task for the file.");
590 }
591 }
592}
593
594#[derive(Debug)]
597pub struct FsDirGuard {
598 handle: String,
599 path: PathBuf,
600 size: u64,
601 delete_queue: Sender<DeleteTask>,
602}
603
604impl DirGuard for FsDirGuard {
605 fn path(&self) -> &PathBuf {
606 &self.path
607 }
608}
609
610impl Drop for FsDirGuard {
611 fn drop(&mut self) {
612 if let Err(err) = self
613 .delete_queue
614 .try_send(DeleteTask::Dir(self.path.clone(), self.size))
615 {
616 if matches!(err, TrySendError::Closed(_)) {
617 return;
618 }
619 warn!(err; "Failed to send the delete task for the directory.");
620 }
621 }
622}
623
624struct MokaDirWriterProvider(PathBuf);
626
627#[async_trait]
628impl DirWriterProvider for MokaDirWriterProvider {
629 async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
630 let full_path = if cfg!(windows) {
631 self.0.join(rel_path.replace('/', "\\"))
632 } else {
633 self.0.join(rel_path)
634 };
635 if let Some(parent) = full_path.parent() {
636 fs::create_dir_all(parent).await.context(CreateSnafu)?;
637 }
638 Ok(Box::new(
639 fs::File::create(full_path)
640 .await
641 .context(CreateSnafu)?
642 .compat_write(),
643 ) as BoxWriter)
644 }
645}
646
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Opens the staged blob file for (`puffin_file_name`, `key`).
    ///
    /// # Panics
    /// If the key is not cached, maps to a directory, or the file cannot be opened.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        let cached = self
            .cache
            .get(&Self::encode_cache_key(puffin_file_name, key))
            .await
            .unwrap();
        match cached {
            CacheValue::File(guard) => fs::File::open(&guard.path).await.unwrap(),
            CacheValue::Dir(_) => panic!("Expected a file, but got a directory."),
        }
    }

    /// Returns the path of the staged directory for (`puffin_file_name`, `key`).
    ///
    /// # Panics
    /// If the key is not cached or maps to a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        let cached = self
            .cache
            .get(&Self::encode_cache_key(puffin_file_name, key))
            .await
            .unwrap();
        match cached {
            CacheValue::Dir(guard) => guard.path.clone(),
            CacheValue::File(_) => panic!("Expected a directory, but got a file."),
        }
    }

    /// Whether (`puffin_file_name`, `key`) is currently in the main cache
    /// (the recycle bin is not consulted).
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
674
#[cfg(test)]
mod tests {
    use std::sync::atomic::AtomicU64;

    use common_base::range_read::RangeReader;
    use common_test_util::temp_dir::create_temp_dir;
    use futures::AsyncWriteExt;
    use tokio::io::AsyncReadExt as _;

    use super::*;
    use crate::error::BlobNotFoundSnafu;
    use crate::puffin_manager::stager::Stager;

    /// Notifier that accumulates every reported event so tests can assert on
    /// the totals via `MockNotifier::stats`.
    struct MockNotifier {
        cache_insert_size: AtomicU64,
        cache_evict_size: AtomicU64,
        cache_hit_count: AtomicU64,
        cache_hit_size: AtomicU64,
        cache_miss_count: AtomicU64,
        cache_miss_size: AtomicU64,
        recycle_insert_size: AtomicU64,
        recycle_clear_size: AtomicU64,
    }

    /// Point-in-time snapshot of a `MockNotifier`'s counters.
    #[derive(Debug, PartialEq, Eq)]
    struct Stats {
        cache_insert_size: u64,
        cache_evict_size: u64,
        cache_hit_count: u64,
        cache_hit_size: u64,
        cache_miss_count: u64,
        cache_miss_size: u64,
        recycle_insert_size: u64,
        recycle_clear_size: u64,
    }

    impl MockNotifier {
        fn build() -> Arc<MockNotifier> {
            Arc::new(Self {
                cache_insert_size: AtomicU64::new(0),
                cache_evict_size: AtomicU64::new(0),
                cache_hit_count: AtomicU64::new(0),
                cache_hit_size: AtomicU64::new(0),
                cache_miss_count: AtomicU64::new(0),
                cache_miss_size: AtomicU64::new(0),
                recycle_insert_size: AtomicU64::new(0),
                recycle_clear_size: AtomicU64::new(0),
            })
        }

        fn stats(&self) -> Stats {
            Stats {
                cache_insert_size: self
                    .cache_insert_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_evict_size: self
                    .cache_evict_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_hit_count: self
                    .cache_hit_count
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_hit_size: self
                    .cache_hit_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_miss_count: self
                    .cache_miss_count
                    .load(std::sync::atomic::Ordering::Relaxed),
                cache_miss_size: self
                    .cache_miss_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                recycle_insert_size: self
                    .recycle_insert_size
                    .load(std::sync::atomic::Ordering::Relaxed),
                recycle_clear_size: self
                    .recycle_clear_size
                    .load(std::sync::atomic::Ordering::Relaxed),
            }
        }
    }

    impl StagerNotifier for MockNotifier {
        fn on_cache_insert(&self, size: u64) {
            self.cache_insert_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_evict(&self, size: u64) {
            self.cache_evict_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_hit(&self, size: u64) {
            self.cache_hit_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            self.cache_hit_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_cache_miss(&self, size: u64) {
            self.cache_miss_count
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            self.cache_miss_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_recycle_insert(&self, size: u64) {
            self.recycle_insert_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_recycle_clear(&self, size: u64) {
            self.recycle_clear_size
                .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
        }

        fn on_load_blob(&self, _duration: Duration) {}

        fn on_load_dir(&self, _duration: Duration) {}
    }

    #[tokio::test]
    async fn test_get_blob() {
        let tempdir = create_temp_dir("test_get_blob_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_get_blob".to_string();
        let key = "key";
        let reader = stager
            .get_blob(
                &puffin_file_name,
                key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"hello world");

        let mut file = stager.must_get_file(&puffin_file_name, key).await;
        let mut buf = Vec::new();
        file.read_to_end(&mut buf).await.unwrap();
        assert_eq!(buf, b"hello world");

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 0,
                recycle_clear_size: 0,
            }
        );
    }

    #[tokio::test]
    async fn test_get_dir() {
        let tempdir = create_temp_dir("test_get_dir_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        // 13 + 12 + 18 + 14 + 13 = 70 bytes in total.
        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好，世界！".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];

        let puffin_file_name = "test_get_dir".to_string();
        let key = "key";
        let dir_path = stager
            .get_dir(
                &puffin_file_name,
                key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();

        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 70,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 70,
                recycle_insert_size: 0,
                recycle_clear_size: 0,
            }
        );
    }

    #[tokio::test]
    async fn test_recover() {
        let tempdir = create_temp_dir("test_recover_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        // Stage a blob and a directory, then drop the stager.
        let puffin_file_name = "test_recover".to_string();
        let blob_key = "blob_key";
        let guard = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        // 13 + 12 + 18 + 14 + 13 = 70 bytes in total.
        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好，世界！".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];

        let dir_key = "dir_key";
        let guard = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        drop(stager);

        // Recover from the existing directory: both entries must be hits.
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|_| Box::pin(async { Ok(0) })),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"hello world");

        let dir_path = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| Box::pin(async { Ok(0) })),
            )
            .await
            .unwrap();
        for (rel_path, content) in &files_in_dir {
            let file_path = dir_path.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 81,
                cache_evict_size: 0,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 2,
                cache_miss_size: 81,
                recycle_insert_size: 0,
                recycle_clear_size: 0,
            }
        );
    }

    #[tokio::test]
    async fn test_eviction() {
        let tempdir = create_temp_dir("test_eviction_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            1, // Tiny capacity: every staged entry is evicted right after insertion.
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        let puffin_file_name = "test_eviction".to_string();
        let blob_key = "blob_key";

        // First access is a miss; the blob is written by `init_fn`.
        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"Hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        // The blob is evicted into the recycle bin.
        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, blob_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 11,
                cache_evict_size: 11,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 11,
                recycle_clear_size: 0,
            }
        );

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"Hello world");

        // Second access revives the blob from the recycle bin (a hit).
        let reader = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|_| async { Ok(0) }.boxed()),
            )
            .await
            .unwrap()
            .reader()
            .await
            .unwrap();

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, blob_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 22,
                cache_evict_size: 22,
                cache_hit_count: 1,
                cache_hit_size: 11,
                cache_miss_count: 1,
                cache_miss_size: 11,
                recycle_insert_size: 22,
                recycle_clear_size: 11,
            }
        );

        let m = reader.metadata().await.unwrap();
        let buf = reader.read(0..m.content_length).await.unwrap();
        assert_eq!(&*buf, b"Hello world");

        let dir_key = "dir_key";
        // 13 + 12 + 18 + 14 + 13 = 70 bytes in total.
        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好，世界！".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];

        // First access to the directory is a miss.
        let guard_0 = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                            size += content.len() as u64;
                        }
                        Ok(size)
                    })
                }),
            )
            .await
            .unwrap();

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_0.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 92,
                cache_evict_size: 92,
                cache_hit_count: 1,
                cache_hit_size: 11,
                cache_miss_count: 2,
                cache_miss_size: 81,
                recycle_insert_size: 92,
                recycle_clear_size: 11,
            }
        );

        // Second access revives the directory from the recycle bin.
        let guard_1 = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| async { Ok(0) }.boxed()),
            )
            .await
            .unwrap();

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_1.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 162,
                cache_evict_size: 162,
                cache_hit_count: 2,
                cache_hit_size: 81,
                cache_miss_count: 2,
                cache_miss_size: 81,
                recycle_insert_size: 162,
                recycle_clear_size: 81,
            }
        );

        // Even with every user guard dropped, the recycle bin still holds the
        // directory, so the next access is again a hit.
        drop(guard_0);
        drop(guard_1);
        let guard_2 = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|_| Box::pin(async move { Ok(0) })),
            )
            .await
            .unwrap();

        stager.cache.run_pending_tasks().await;
        assert!(!stager.in_cache(&puffin_file_name, dir_key));

        for (rel_path, content) in &files_in_dir {
            let file_path = guard_2.path().join(rel_path);
            let mut file = tokio::fs::File::open(&file_path).await.unwrap();
            let mut buf = Vec::new();
            file.read_to_end(&mut buf).await.unwrap();
            assert_eq!(buf, *content);
        }

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 232,
                cache_evict_size: 232,
                cache_hit_count: 3,
                cache_hit_size: 151,
                cache_miss_count: 2,
                cache_miss_size: 81,
                recycle_insert_size: 232,
                recycle_clear_size: 151,
            }
        );
    }

    #[tokio::test]
    async fn test_get_blob_concurrency_on_fail() {
        let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
        let key = "key";

        let stager = Arc::new(stager);
        let handles = (0..10)
            .map(|_| {
                let stager = stager.clone();
                let puffin_file_name = puffin_file_name.clone();
                let task = async move {
                    let failed_init = Box::new(|_| {
                        async {
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                            BlobNotFoundSnafu { blob: "whatever" }.fail()
                        }
                        .boxed()
                    });
                    stager.get_blob(&puffin_file_name, key, failed_init).await
                };

                tokio::spawn(task)
            })
            .collect::<Vec<_>>();

        for handle in handles {
            let r = handle.await.unwrap();
            assert!(r.is_err());
        }

        assert!(!stager.in_cache(&puffin_file_name, key));
    }

    #[tokio::test]
    async fn test_get_dir_concurrency_on_fail() {
        let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
        let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
            .await
            .unwrap();

        let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
        let key = "key";

        let stager = Arc::new(stager);
        let handles = (0..10)
            .map(|_| {
                let stager = stager.clone();
                let puffin_file_name = puffin_file_name.clone();
                let task = async move {
                    let failed_init = Box::new(|_| {
                        async {
                            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                            BlobNotFoundSnafu { blob: "whatever" }.fail()
                        }
                        .boxed()
                    });
                    stager.get_dir(&puffin_file_name, key, failed_init).await
                };

                tokio::spawn(task)
            })
            .collect::<Vec<_>>();

        for handle in handles {
            let r = handle.await.unwrap();
            assert!(r.is_err());
        }

        assert!(!stager.in_cache(&puffin_file_name, key));
    }

    #[tokio::test]
    async fn test_purge() {
        let tempdir = create_temp_dir("test_purge_");
        let notifier = MockNotifier::build();
        let stager = BoundedStager::new(
            tempdir.path().to_path_buf(),
            u64::MAX,
            Some(notifier.clone()),
            None,
        )
        .await
        .unwrap();

        // Stage a blob and a directory for the same handle.
        let puffin_file_name = "test_purge".to_string();
        let blob_key = "blob_key";
        let guard = stager
            .get_blob(
                &puffin_file_name,
                blob_key,
                Box::new(|mut writer| {
                    Box::pin(async move {
                        writer.write_all(b"hello world").await.unwrap();
                        Ok(11)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        // 13 + 12 + 18 + 14 + 13 = 70 bytes in total.
        let files_in_dir = [
            ("file_a", "Hello, world!".as_bytes()),
            ("file_b", "Hello, Rust!".as_bytes()),
            ("file_c", "你好，世界！".as_bytes()),
            ("subdir/file_d", "Hello, Puffin!".as_bytes()),
            ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
        ];

        let dir_key = "dir_key";
        let guard = stager
            .get_dir(
                &puffin_file_name,
                dir_key,
                Box::new(|writer_provider| {
                    Box::pin(async move {
                        let mut size = 0;
                        for (rel_path, content) in &files_in_dir {
                            size += content.len();
                            let mut writer = writer_provider.writer(rel_path).await.unwrap();
                            writer.write_all(content).await.unwrap();
                        }
                        Ok(size as _)
                    })
                }),
            )
            .await
            .unwrap();
        drop(guard);

        // Purging moves both entries into the recycle bin.
        stager.purge(&puffin_file_name).await.unwrap();

        let stats = notifier.stats();
        assert_eq!(
            stats,
            Stats {
                cache_insert_size: 81,
                cache_evict_size: 81,
                cache_hit_count: 0,
                cache_hit_size: 0,
                cache_miss_count: 2,
                cache_miss_size: 81,
                recycle_insert_size: 81,
                recycle_clear_size: 0,
            }
        );
    }
}