1use std::collections::HashMap;
16use std::path::PathBuf;
17use std::sync::Arc;
18use std::time::{Duration, Instant};
19
20use async_trait::async_trait;
21use async_walkdir::{Filtering, WalkDir};
22use base64::prelude::BASE64_URL_SAFE;
23use base64::Engine;
24use common_base::range_read::FileReader;
25use common_runtime::runtime::RuntimeTrait;
26use common_telemetry::{info, warn};
27use futures::{FutureExt, StreamExt};
28use moka::future::Cache;
29use moka::policy::EvictionPolicy;
30use sha2::{Digest, Sha256};
31use snafu::ResultExt;
32use tokio::fs;
33use tokio::sync::mpsc::error::TrySendError;
34use tokio::sync::mpsc::{Receiver, Sender};
35use tokio_util::compat::TokioAsyncWriteCompatExt;
36
37use crate::error::{
38 CacheGetSnafu, CreateSnafu, MetadataSnafu, OpenSnafu, ReadSnafu, RemoveSnafu, RenameSnafu,
39 Result, WalkDirSnafu,
40};
41use crate::puffin_manager::stager::{
42 BoxWriter, DirWriterProvider, InitBlobFn, InitDirFn, Stager, StagerNotifier,
43};
44use crate::puffin_manager::{BlobGuard, DirGuard};
45
/// Capacity of the channel that feeds delete tasks to the background delete routine.
const DELETE_QUEUE_SIZE: usize = 10_240;
/// Extension of in-progress staging files/directories; `recover` removes them.
const TMP_EXTENSION: &str = "tmp";
/// Extension a staged directory is renamed to right before it is removed;
/// `recover` removes leftovers carrying it.
const DELETED_EXTENSION: &str = "deleted";
/// Idle time after which an entry leaves the recycle bin. It is also the
/// longest the delete routine waits before running the recycle bin's pending tasks.
const RECYCLE_BIN_TTL: Duration = Duration::from_secs(60);
50
/// A stager that keeps staged blobs and directories on the local filesystem
/// under `base_dir`, tracked by a size-weighted moka cache.
///
/// `H` is the file-handle type; it is only carried as a type parameter.
pub struct BoundedStager<H> {
    /// Directory that holds every staged file and directory.
    base_dir: PathBuf,

    /// Main cache from cache key to staged entry, weighted by entry size.
    cache: Cache<String, CacheValue>,

    /// Entries evicted from `cache` are inserted here by the eviction listener
    /// (see `new`) and expire after `RECYCLE_BIN_TTL` of idleness. A lookup that
    /// finds an entry here moves it back into `cache` instead of re-initializing it.
    recycle_bin: Cache<String, CacheValue>,

    /// Sender side of the channel consumed by the background `delete_routine`.
    delete_queue: Sender<DeleteTask>,

    /// Optional observer of cache and recycle-bin events.
    notifier: Option<Arc<dyn StagerNotifier>>,

    /// Binds the stager to `H` without storing a value of it.
    _phantom: std::marker::PhantomData<H>,
}
78
79impl<H: 'static> BoundedStager<H> {
80 pub async fn new(
81 base_dir: PathBuf,
82 capacity: u64,
83 notifier: Option<Arc<dyn StagerNotifier>>,
84 cache_ttl: Option<Duration>,
85 ) -> Result<Self> {
86 tokio::fs::create_dir_all(&base_dir)
87 .await
88 .context(CreateSnafu)?;
89
90 let recycle_bin = Cache::builder().time_to_idle(RECYCLE_BIN_TTL).build();
91 let recycle_bin_cloned = recycle_bin.clone();
92 let notifier_cloned = notifier.clone();
93
94 let mut cache_builder = Cache::builder()
95 .max_capacity(capacity)
96 .weigher(|_: &String, v: &CacheValue| v.weight())
97 .eviction_policy(EvictionPolicy::lru())
98 .support_invalidation_closures()
99 .async_eviction_listener(move |k, v, _| {
100 let recycle_bin = recycle_bin_cloned.clone();
101 if let Some(notifier) = notifier_cloned.as_ref() {
102 notifier.on_cache_evict(v.size());
103 notifier.on_recycle_insert(v.size());
104 }
105 async move {
106 recycle_bin.insert(k.as_str().to_string(), v).await;
107 }
108 .boxed()
109 });
110 if let Some(ttl) = cache_ttl {
111 if !ttl.is_zero() {
112 cache_builder = cache_builder.time_to_live(ttl);
113 }
114 }
115 let cache = cache_builder.build();
116
117 let (delete_queue, rx) = tokio::sync::mpsc::channel(DELETE_QUEUE_SIZE);
118 let notifier_cloned = notifier.clone();
119 common_runtime::global_runtime().spawn(Self::delete_routine(
120 rx,
121 recycle_bin.clone(),
122 notifier_cloned,
123 ));
124 let stager = Self {
125 cache,
126 base_dir,
127 delete_queue,
128 recycle_bin,
129 notifier,
130 _phantom: std::marker::PhantomData,
131 };
132
133 stager.recover().await?;
134
135 Ok(stager)
136 }
137}
138
139#[async_trait]
140impl<H: ToString + Clone + Send + Sync> Stager for BoundedStager<H> {
141 type Blob = Arc<FsBlobGuard>;
142 type Dir = Arc<FsDirGuard>;
143 type FileHandle = H;
144
145 async fn get_blob<'a>(
146 &self,
147 handle: &Self::FileHandle,
148 key: &str,
149 init_fn: Box<dyn InitBlobFn + Send + Sync + 'a>,
150 ) -> Result<Self::Blob> {
151 let handle_str = handle.to_string();
152 let cache_key = Self::encode_cache_key(&handle_str, key);
153
154 let mut miss = false;
155 let v = self
156 .cache
157 .try_get_with_by_ref(&cache_key, async {
158 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
159 if let Some(notifier) = self.notifier.as_ref() {
160 let size = v.size();
161 notifier.on_cache_insert(size);
162 notifier.on_recycle_clear(size);
163 }
164 return Ok(v);
165 }
166
167 miss = true;
168 let timer = Instant::now();
169 let file_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
170 let path = self.base_dir.join(&file_name);
171
172 let size = Self::write_blob(&path, init_fn).await?;
173 if let Some(notifier) = self.notifier.as_ref() {
174 notifier.on_cache_insert(size);
175 notifier.on_load_blob(timer.elapsed());
176 }
177 let guard = Arc::new(FsBlobGuard {
178 handle: handle_str,
179 path,
180 delete_queue: self.delete_queue.clone(),
181 size,
182 });
183 Ok(CacheValue::File(guard))
184 })
185 .await
186 .context(CacheGetSnafu)?;
187
188 if let Some(notifier) = self.notifier.as_ref() {
189 if miss {
190 notifier.on_cache_miss(v.size());
191 } else {
192 notifier.on_cache_hit(v.size());
193 }
194 }
195 match v {
196 CacheValue::File(guard) => Ok(guard),
197 _ => unreachable!(),
198 }
199 }
200
201 async fn get_dir<'a>(
202 &self,
203 handle: &Self::FileHandle,
204 key: &str,
205 init_fn: Box<dyn InitDirFn + Send + Sync + 'a>,
206 ) -> Result<Self::Dir> {
207 let handle_str = handle.to_string();
208
209 let cache_key = Self::encode_cache_key(&handle_str, key);
210
211 let mut miss = false;
212 let v = self
213 .cache
214 .try_get_with_by_ref(&cache_key, async {
215 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
216 if let Some(notifier) = self.notifier.as_ref() {
217 let size = v.size();
218 notifier.on_cache_insert(size);
219 notifier.on_recycle_clear(size);
220 }
221 return Ok(v);
222 }
223
224 miss = true;
225 let timer = Instant::now();
226 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
227 let path = self.base_dir.join(&dir_name);
228
229 let size = Self::write_dir(&path, init_fn).await?;
230 if let Some(notifier) = self.notifier.as_ref() {
231 notifier.on_cache_insert(size);
232 notifier.on_load_dir(timer.elapsed());
233 }
234 let guard = Arc::new(FsDirGuard {
235 handle: handle_str,
236 path,
237 size,
238 delete_queue: self.delete_queue.clone(),
239 });
240 Ok(CacheValue::Dir(guard))
241 })
242 .await
243 .context(CacheGetSnafu)?;
244
245 if let Some(notifier) = self.notifier.as_ref() {
246 if miss {
247 notifier.on_cache_miss(v.size());
248 } else {
249 notifier.on_cache_hit(v.size());
250 }
251 }
252 match v {
253 CacheValue::Dir(guard) => Ok(guard),
254 _ => unreachable!(),
255 }
256 }
257
258 async fn put_dir(
259 &self,
260 handle: &Self::FileHandle,
261 key: &str,
262 dir_path: PathBuf,
263 size: u64,
264 ) -> Result<()> {
265 let handle_str = handle.to_string();
266 let cache_key = Self::encode_cache_key(&handle_str, key);
267
268 self.cache
269 .try_get_with(cache_key.clone(), async move {
270 if let Some(v) = self.recycle_bin.remove(&cache_key).await {
271 if let Some(notifier) = self.notifier.as_ref() {
272 let size = v.size();
273 notifier.on_cache_insert(size);
274 notifier.on_recycle_clear(size);
275 }
276 return Ok(v);
277 }
278
279 let dir_name = format!("{}.{}", cache_key, uuid::Uuid::new_v4());
280 let path = self.base_dir.join(&dir_name);
281
282 fs::rename(&dir_path, &path).await.context(RenameSnafu)?;
283 if let Some(notifier) = self.notifier.as_ref() {
284 notifier.on_cache_insert(size);
285 }
286 let guard = Arc::new(FsDirGuard {
287 handle: handle_str,
288 path,
289 size,
290 delete_queue: self.delete_queue.clone(),
291 });
292 Ok(CacheValue::Dir(guard))
293 })
294 .await
295 .map(|_| ())
296 .context(CacheGetSnafu)?;
297
298 self.cache.run_pending_tasks().await;
302 self.recycle_bin.run_pending_tasks().await;
303
304 Ok(())
305 }
306
307 async fn purge(&self, handle: &Self::FileHandle) -> Result<()> {
308 let handle_str = handle.to_string();
309 self.cache
310 .invalidate_entries_if(move |_k, v| v.handle() == handle_str)
311 .unwrap(); self.cache.run_pending_tasks().await;
313 Ok(())
314 }
315}
316
317impl<H> BoundedStager<H> {
318 fn encode_cache_key(puffin_file_name: &str, key: &str) -> String {
319 let mut hasher = Sha256::new();
320 hasher.update(puffin_file_name);
321 hasher.update(key);
322 hasher.update(puffin_file_name);
323 let hash = hasher.finalize();
324
325 BASE64_URL_SAFE.encode(hash)
326 }
327
328 async fn write_blob(
329 target_path: &PathBuf,
330 init_fn: Box<dyn InitBlobFn + Send + Sync + '_>,
331 ) -> Result<u64> {
332 let tmp_path = target_path.with_extension(TMP_EXTENSION);
335 let writer = Box::new(
336 fs::File::create(&tmp_path)
337 .await
338 .context(CreateSnafu)?
339 .compat_write(),
340 );
341 let size = init_fn(writer).await?;
342
343 fs::rename(tmp_path, target_path)
345 .await
346 .context(RenameSnafu)?;
347 Ok(size)
348 }
349
350 async fn write_dir(
351 target_path: &PathBuf,
352 init_fn: Box<dyn InitDirFn + Send + Sync + '_>,
353 ) -> Result<u64> {
354 let tmp_base = target_path.with_extension(TMP_EXTENSION);
357 let writer_provider = Box::new(MokaDirWriterProvider(tmp_base.clone()));
358 let size = init_fn(writer_provider).await?;
359
360 fs::rename(&tmp_base, target_path)
362 .await
363 .context(RenameSnafu)?;
364 Ok(size)
365 }
366
367 async fn recover(&self) -> Result<()> {
372 let timer = std::time::Instant::now();
373 info!("Recovering the staging area, base_dir: {:?}", self.base_dir);
374
375 let mut read_dir = fs::read_dir(&self.base_dir).await.context(ReadSnafu)?;
376
377 let mut elems = HashMap::new();
378 while let Some(entry) = read_dir.next_entry().await.context(ReadSnafu)? {
379 let path = entry.path();
380
381 if path.extension() == Some(TMP_EXTENSION.as_ref())
382 || path.extension() == Some(DELETED_EXTENSION.as_ref())
383 {
384 if entry.metadata().await.context(MetadataSnafu)?.is_dir() {
386 fs::remove_dir_all(path).await.context(RemoveSnafu)?;
387 } else {
388 fs::remove_file(path).await.context(RemoveSnafu)?;
389 }
390 } else {
391 let meta = entry.metadata().await.context(MetadataSnafu)?;
393 let file_path = path.file_name().unwrap().to_string_lossy().into_owned();
394
395 let key = match file_path.split('.').next() {
397 Some(key) => key.to_string(),
398 None => {
399 warn!(
400 "Invalid staging file name: {}, expected format: <key>.<uuid>",
401 file_path
402 );
403 continue;
404 }
405 };
406
407 if meta.is_dir() {
408 let size = Self::get_dir_size(&path).await?;
409 let v = CacheValue::Dir(Arc::new(FsDirGuard {
410 path,
411 size,
412 delete_queue: self.delete_queue.clone(),
413
414 handle: String::new(),
416 }));
417 let _dup_dir = elems.insert(key, v);
419 } else {
420 let size = meta.len();
421 let v = CacheValue::File(Arc::new(FsBlobGuard {
422 path,
423 size,
424 delete_queue: self.delete_queue.clone(),
425
426 handle: String::new(),
428 }));
429 let _dup_file = elems.insert(key, v);
431 }
432 }
433 }
434
435 let mut size = 0;
436 let num_elems = elems.len();
437 for (key, value) in elems {
438 size += value.size();
439 self.cache.insert(key, value).await;
440 }
441 if let Some(notifier) = self.notifier.as_ref() {
442 notifier.on_cache_insert(size);
443 }
444
445 self.cache.run_pending_tasks().await;
446
447 info!(
448 "Recovered the staging area, num_entries: {}, num_bytes: {}, cost: {:?}",
449 num_elems,
450 size,
451 timer.elapsed()
452 );
453 Ok(())
454 }
455
456 async fn get_dir_size(path: &PathBuf) -> Result<u64> {
458 let mut size = 0;
459 let mut wd = WalkDir::new(path).filter(|entry| async move {
460 match entry.file_type().await {
461 Ok(ft) if ft.is_dir() => Filtering::Ignore,
462 _ => Filtering::Continue,
463 }
464 });
465
466 while let Some(entry) = wd.next().await {
467 let entry = entry.context(WalkDirSnafu)?;
468 size += entry.metadata().await.context(MetadataSnafu)?.len();
469 }
470
471 Ok(size)
472 }
473
474 async fn delete_routine(
475 mut receiver: Receiver<DeleteTask>,
476 recycle_bin: Cache<String, CacheValue>,
477 notifier: Option<Arc<dyn StagerNotifier>>,
478 ) {
479 loop {
480 match tokio::time::timeout(RECYCLE_BIN_TTL, receiver.recv()).await {
481 Ok(Some(task)) => match task {
482 DeleteTask::File(path, size) => {
483 if let Err(err) = fs::remove_file(&path).await {
484 if err.kind() == std::io::ErrorKind::NotFound {
485 continue;
486 }
487
488 warn!(err; "Failed to remove the file.");
489 }
490
491 if let Some(notifier) = notifier.as_ref() {
492 notifier.on_recycle_clear(size);
493 }
494 }
495
496 DeleteTask::Dir(path, size) => {
497 let deleted_path = path.with_extension(DELETED_EXTENSION);
498 if let Err(err) = fs::rename(&path, &deleted_path).await {
499 if err.kind() == std::io::ErrorKind::NotFound {
500 continue;
501 }
502
503 let _ = fs::remove_dir_all(&deleted_path).await;
505 if let Err(err) = fs::rename(&path, &deleted_path).await {
506 warn!(err; "Failed to rename the dangling directory to deleted path.");
507 continue;
508 }
509 }
510 if let Err(err) = fs::remove_dir_all(&deleted_path).await {
511 warn!(err; "Failed to remove the dangling directory.");
512 }
513 if let Some(notifier) = notifier.as_ref() {
514 notifier.on_recycle_clear(size);
515 }
516 }
517 DeleteTask::Terminate => {
518 break;
519 }
520 },
521 Ok(None) => break,
522 Err(_) => {
523 recycle_bin.run_pending_tasks().await;
525 }
526 }
527 }
528
529 info!("The delete routine for the bounded stager is terminated.");
530 }
531}
532
533impl<H> Drop for BoundedStager<H> {
534 fn drop(&mut self) {
535 let _ = self.delete_queue.try_send(DeleteTask::Terminate);
536 }
537}
538
/// A staged entry: either a blob file or a directory.
///
/// Both variants wrap their guard in an `Arc`, so cloning a value only bumps a
/// reference count. The guard's `Drop` (which queues deletion) runs once the
/// last clone is gone.
#[derive(Debug, Clone)]
enum CacheValue {
    /// A staged blob file.
    File(Arc<FsBlobGuard>),
    /// A staged directory.
    Dir(Arc<FsDirGuard>),
}
544
545impl CacheValue {
546 fn size(&self) -> u64 {
547 match self {
548 CacheValue::File(guard) => guard.size,
549 CacheValue::Dir(guard) => guard.size,
550 }
551 }
552
553 fn weight(&self) -> u32 {
554 self.size().try_into().unwrap_or(u32::MAX)
555 }
556
557 fn handle(&self) -> &str {
558 match self {
559 CacheValue::File(guard) => &guard.handle,
560 CacheValue::Dir(guard) => &guard.handle,
561 }
562 }
563}
564
/// Work item for the stager's background delete routine.
enum DeleteTask {
    /// Remove the file at the path. The `u64` is its recorded size, reported
    /// through `on_recycle_clear` once handled.
    File(PathBuf, u64),
    /// Remove the directory at the path. It is first renamed to the `deleted`
    /// extension. The `u64` is its recorded size.
    Dir(PathBuf, u64),
    /// Stop the delete routine.
    Terminate,
}
570
/// Guard for a staged blob file.
///
/// Dropping the guard queues a delete task for the file.
#[derive(Debug)]
pub struct FsBlobGuard {
    /// Stringified file handle the blob was staged for; empty when restored by `recover`.
    handle: String,
    /// Location of the staged file.
    path: PathBuf,
    /// Size in bytes: as reported by the init function, or the file length on recovery.
    size: u64,
    /// Sender to the stager's delete routine, used on drop.
    delete_queue: Sender<DeleteTask>,
}
580
581#[async_trait]
582impl BlobGuard for FsBlobGuard {
583 type Reader = FileReader;
584
585 async fn reader(&self) -> Result<Self::Reader> {
586 FileReader::new(&self.path).await.context(OpenSnafu)
587 }
588}
589
590impl Drop for FsBlobGuard {
591 fn drop(&mut self) {
592 if let Err(err) = self
593 .delete_queue
594 .try_send(DeleteTask::File(self.path.clone(), self.size))
595 {
596 if matches!(err, TrySendError::Closed(_)) {
597 return;
598 }
599 warn!(err; "Failed to send the delete task for the file.");
600 }
601 }
602}
603
/// Guard for a staged directory.
///
/// Dropping the guard queues a delete task for the directory.
#[derive(Debug)]
pub struct FsDirGuard {
    /// Stringified file handle the directory was staged for; empty when restored by `recover`.
    handle: String,
    /// Location of the staged directory.
    path: PathBuf,
    /// Size in bytes: as reported when staged, or the summed file lengths on recovery.
    size: u64,
    /// Sender to the stager's delete routine, used on drop.
    delete_queue: Sender<DeleteTask>,
}
613
614impl DirGuard for FsDirGuard {
615 fn path(&self) -> &PathBuf {
616 &self.path
617 }
618}
619
620impl Drop for FsDirGuard {
621 fn drop(&mut self) {
622 if let Err(err) = self
623 .delete_queue
624 .try_send(DeleteTask::Dir(self.path.clone(), self.size))
625 {
626 if matches!(err, TrySendError::Closed(_)) {
627 return;
628 }
629 warn!(err; "Failed to send the delete task for the directory.");
630 }
631 }
632}
633
/// Provides writers for files created under the wrapped (temporary) staging directory.
struct MokaDirWriterProvider(PathBuf);
636
637#[async_trait]
638impl DirWriterProvider for MokaDirWriterProvider {
639 async fn writer(&self, rel_path: &str) -> Result<BoxWriter> {
640 let full_path = if cfg!(windows) {
641 self.0.join(rel_path.replace('/', "\\"))
642 } else {
643 self.0.join(rel_path)
644 };
645 if let Some(parent) = full_path.parent() {
646 fs::create_dir_all(parent).await.context(CreateSnafu)?;
647 }
648 Ok(Box::new(
649 fs::File::create(full_path)
650 .await
651 .context(CreateSnafu)?
652 .compat_write(),
653 ) as BoxWriter)
654 }
655}
656
#[cfg(test)]
impl<H> BoundedStager<H> {
    /// Opens the staged file cached for `(puffin_file_name, key)`.
    ///
    /// # Panics
    /// Panics if the entry is not in the main cache, is a directory, or cannot be opened.
    pub async fn must_get_file(&self, puffin_file_name: &str, key: &str) -> fs::File {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        match self.cache.get(&cache_key).await.unwrap() {
            CacheValue::File(guard) => fs::File::open(&guard.path).await.unwrap(),
            CacheValue::Dir(_) => panic!("Expected a file, but got a directory."),
        }
    }

    /// Returns the path of the staged directory cached for `(puffin_file_name, key)`.
    ///
    /// # Panics
    /// Panics if the entry is not in the main cache or is a file.
    pub async fn must_get_dir(&self, puffin_file_name: &str, key: &str) -> PathBuf {
        let cache_key = Self::encode_cache_key(puffin_file_name, key);
        match self.cache.get(&cache_key).await.unwrap() {
            CacheValue::Dir(guard) => guard.path.clone(),
            CacheValue::File(_) => panic!("Expected a directory, but got a file."),
        }
    }

    /// Whether `(puffin_file_name, key)` is currently in the main cache
    /// (the recycle bin is not consulted).
    pub fn in_cache(&self, puffin_file_name: &str, key: &str) -> bool {
        self.cache
            .contains_key(&Self::encode_cache_key(puffin_file_name, key))
    }
}
684
685#[cfg(test)]
686mod tests {
687 use std::sync::atomic::AtomicU64;
688
689 use common_base::range_read::RangeReader;
690 use common_test_util::temp_dir::create_temp_dir;
691 use futures::AsyncWriteExt;
692 use tokio::io::AsyncReadExt as _;
693
694 use super::*;
695 use crate::error::BlobNotFoundSnafu;
696 use crate::puffin_manager::stager::Stager;
697
/// Test notifier that accumulates every reported event in atomic counters.
struct MockNotifier {
    /// Total bytes reported by `on_cache_insert`.
    cache_insert_size: AtomicU64,
    /// Total bytes reported by `on_cache_evict`.
    cache_evict_size: AtomicU64,
    /// Number of `on_cache_hit` calls.
    cache_hit_count: AtomicU64,
    /// Total bytes reported by `on_cache_hit`.
    cache_hit_size: AtomicU64,
    /// Number of `on_cache_miss` calls.
    cache_miss_count: AtomicU64,
    /// Total bytes reported by `on_cache_miss`.
    cache_miss_size: AtomicU64,
    /// Total bytes reported by `on_recycle_insert`.
    recycle_insert_size: AtomicU64,
    /// Total bytes reported by `on_recycle_clear`.
    recycle_clear_size: AtomicU64,
}
708
/// Plain snapshot of the `MockNotifier` counters, comparable in assertions.
#[derive(Debug, PartialEq, Eq)]
struct Stats {
    cache_insert_size: u64,
    cache_evict_size: u64,
    cache_hit_count: u64,
    cache_hit_size: u64,
    cache_miss_count: u64,
    cache_miss_size: u64,
    recycle_insert_size: u64,
    recycle_clear_size: u64,
}
720
721 impl MockNotifier {
722 fn build() -> Arc<MockNotifier> {
723 Arc::new(Self {
724 cache_insert_size: AtomicU64::new(0),
725 cache_evict_size: AtomicU64::new(0),
726 cache_hit_count: AtomicU64::new(0),
727 cache_hit_size: AtomicU64::new(0),
728 cache_miss_count: AtomicU64::new(0),
729 cache_miss_size: AtomicU64::new(0),
730 recycle_insert_size: AtomicU64::new(0),
731 recycle_clear_size: AtomicU64::new(0),
732 })
733 }
734
735 fn stats(&self) -> Stats {
736 Stats {
737 cache_insert_size: self
738 .cache_insert_size
739 .load(std::sync::atomic::Ordering::Relaxed),
740 cache_evict_size: self
741 .cache_evict_size
742 .load(std::sync::atomic::Ordering::Relaxed),
743 cache_hit_count: self
744 .cache_hit_count
745 .load(std::sync::atomic::Ordering::Relaxed),
746 cache_hit_size: self
747 .cache_hit_size
748 .load(std::sync::atomic::Ordering::Relaxed),
749 cache_miss_count: self
750 .cache_miss_count
751 .load(std::sync::atomic::Ordering::Relaxed),
752 cache_miss_size: self
753 .cache_miss_size
754 .load(std::sync::atomic::Ordering::Relaxed),
755 recycle_insert_size: self
756 .recycle_insert_size
757 .load(std::sync::atomic::Ordering::Relaxed),
758 recycle_clear_size: self
759 .recycle_clear_size
760 .load(std::sync::atomic::Ordering::Relaxed),
761 }
762 }
763 }
764
765 impl StagerNotifier for MockNotifier {
766 fn on_cache_insert(&self, size: u64) {
767 self.cache_insert_size
768 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
769 }
770
771 fn on_cache_evict(&self, size: u64) {
772 self.cache_evict_size
773 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
774 }
775
776 fn on_cache_hit(&self, size: u64) {
777 self.cache_hit_count
778 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
779 self.cache_hit_size
780 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
781 }
782
783 fn on_cache_miss(&self, size: u64) {
784 self.cache_miss_count
785 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
786 self.cache_miss_size
787 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
788 }
789
790 fn on_recycle_insert(&self, size: u64) {
791 self.recycle_insert_size
792 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
793 }
794
795 fn on_recycle_clear(&self, size: u64) {
796 self.recycle_clear_size
797 .fetch_add(size, std::sync::atomic::Ordering::Relaxed);
798 }
799
800 fn on_load_blob(&self, _duration: Duration) {}
801
802 fn on_load_dir(&self, _duration: Duration) {}
803 }
804
/// Stages a blob through a cache miss. Checks that the returned guard and the
/// cached file both expose the written bytes, and that the notifier saw exactly
/// one 11-byte miss/insert.
#[tokio::test]
async fn test_get_blob() {
    let tempdir = create_temp_dir("test_get_blob_");
    let notifier = MockNotifier::build();
    let stager = BoundedStager::new(
        tempdir.path().to_path_buf(),
        u64::MAX,
        Some(notifier.clone()),
        None,
    )
    .await
    .unwrap();

    let puffin_file_name = "test_get_blob".to_string();
    let key = "key";
    let reader = stager
        .get_blob(
            &puffin_file_name,
            key,
            Box::new(|mut writer| {
                Box::pin(async move {
                    writer.write_all(b"hello world").await.unwrap();
                    Ok(11)
                })
            }),
        )
        .await
        .unwrap()
        .reader()
        .await
        .unwrap();

    // Read back through the guard's reader.
    let m = reader.metadata().await.unwrap();
    let buf = reader.read(0..m.content_length).await.unwrap();
    assert_eq!(&*buf, b"hello world");

    // Read back through the file recorded in the cache.
    let mut file = stager.must_get_file(&puffin_file_name, key).await;
    let mut buf = Vec::new();
    file.read_to_end(&mut buf).await.unwrap();
    assert_eq!(buf, b"hello world");

    let stats = notifier.stats();
    assert_eq!(
        stats,
        Stats {
            cache_insert_size: 11,
            cache_evict_size: 0,
            cache_hit_count: 0,
            cache_hit_size: 0,
            cache_miss_count: 1,
            cache_miss_size: 11,
            recycle_insert_size: 0,
            recycle_clear_size: 0,
        }
    );
}
861
862 #[tokio::test]
863 async fn test_get_dir() {
864 let tempdir = create_temp_dir("test_get_dir_");
865 let notifier = MockNotifier::build();
866 let stager = BoundedStager::new(
867 tempdir.path().to_path_buf(),
868 u64::MAX,
869 Some(notifier.clone()),
870 None,
871 )
872 .await
873 .unwrap();
874
875 let files_in_dir = [
876 ("file_a", "Hello, world!".as_bytes()),
877 ("file_b", "Hello, Rust!".as_bytes()),
878 ("file_c", "你好,世界!".as_bytes()),
879 ("subdir/file_d", "Hello, Puffin!".as_bytes()),
880 ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
881 ];
882
883 let puffin_file_name = "test_get_dir".to_string();
884 let key = "key";
885 let dir_path = stager
886 .get_dir(
887 &puffin_file_name,
888 key,
889 Box::new(|writer_provider| {
890 Box::pin(async move {
891 let mut size = 0;
892 for (rel_path, content) in &files_in_dir {
893 size += content.len();
894 let mut writer = writer_provider.writer(rel_path).await.unwrap();
895 writer.write_all(content).await.unwrap();
896 }
897 Ok(size as _)
898 })
899 }),
900 )
901 .await
902 .unwrap();
903
904 for (rel_path, content) in &files_in_dir {
905 let file_path = dir_path.path().join(rel_path);
906 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
907 let mut buf = Vec::new();
908 file.read_to_end(&mut buf).await.unwrap();
909 assert_eq!(buf, *content);
910 }
911
912 let dir_path = stager.must_get_dir(&puffin_file_name, key).await;
913 for (rel_path, content) in &files_in_dir {
914 let file_path = dir_path.join(rel_path);
915 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
916 let mut buf = Vec::new();
917 file.read_to_end(&mut buf).await.unwrap();
918 assert_eq!(buf, *content);
919 }
920
921 let stats = notifier.stats();
922 assert_eq!(
923 stats,
924 Stats {
925 cache_insert_size: 70,
926 cache_evict_size: 0,
927 cache_hit_count: 0,
928 cache_hit_size: 0,
929 cache_miss_count: 1,
930 cache_miss_size: 70,
931 recycle_insert_size: 0,
932 recycle_clear_size: 0
933 }
934 );
935 }
936
937 #[tokio::test]
938 async fn test_recover() {
939 let tempdir = create_temp_dir("test_recover_");
940 let notifier = MockNotifier::build();
941 let stager = BoundedStager::new(
942 tempdir.path().to_path_buf(),
943 u64::MAX,
944 Some(notifier.clone()),
945 None,
946 )
947 .await
948 .unwrap();
949
950 let puffin_file_name = "test_recover".to_string();
952 let blob_key = "blob_key";
953 let guard = stager
954 .get_blob(
955 &puffin_file_name,
956 blob_key,
957 Box::new(|mut writer| {
958 Box::pin(async move {
959 writer.write_all(b"hello world").await.unwrap();
960 Ok(11)
961 })
962 }),
963 )
964 .await
965 .unwrap();
966 drop(guard);
967
968 let files_in_dir = [
969 ("file_a", "Hello, world!".as_bytes()),
970 ("file_b", "Hello, Rust!".as_bytes()),
971 ("file_c", "你好,世界!".as_bytes()),
972 ("subdir/file_d", "Hello, Puffin!".as_bytes()),
973 ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
974 ];
975
976 let dir_key = "dir_key";
977 let guard = stager
978 .get_dir(
979 &puffin_file_name,
980 dir_key,
981 Box::new(|writer_provider| {
982 Box::pin(async move {
983 let mut size = 0;
984 for (rel_path, content) in &files_in_dir {
985 size += content.len();
986 let mut writer = writer_provider.writer(rel_path).await.unwrap();
987 writer.write_all(content).await.unwrap();
988 }
989 Ok(size as _)
990 })
991 }),
992 )
993 .await
994 .unwrap();
995 drop(guard);
996
997 drop(stager);
999 let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
1000 .await
1001 .unwrap();
1002
1003 let reader = stager
1004 .get_blob(
1005 &puffin_file_name,
1006 blob_key,
1007 Box::new(|_| Box::pin(async { Ok(0) })),
1008 )
1009 .await
1010 .unwrap()
1011 .reader()
1012 .await
1013 .unwrap();
1014
1015 let m = reader.metadata().await.unwrap();
1016 let buf = reader.read(0..m.content_length).await.unwrap();
1017 assert_eq!(&*buf, b"hello world");
1018
1019 let dir_path = stager
1020 .get_dir(
1021 &puffin_file_name,
1022 dir_key,
1023 Box::new(|_| Box::pin(async { Ok(0) })),
1024 )
1025 .await
1026 .unwrap();
1027 for (rel_path, content) in &files_in_dir {
1028 let file_path = dir_path.path().join(rel_path);
1029 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1030 let mut buf = Vec::new();
1031 file.read_to_end(&mut buf).await.unwrap();
1032 assert_eq!(buf, *content);
1033 }
1034
1035 let stats = notifier.stats();
1036 assert_eq!(
1037 stats,
1038 Stats {
1039 cache_insert_size: 81,
1040 cache_evict_size: 0,
1041 cache_hit_count: 0,
1042 cache_hit_size: 0,
1043 cache_miss_count: 2,
1044 cache_miss_size: 81,
1045 recycle_insert_size: 0,
1046 recycle_clear_size: 0
1047 }
1048 );
1049 }
1050
1051 #[tokio::test]
1052 async fn test_eviction() {
1053 let tempdir = create_temp_dir("test_eviction_");
1054 let notifier = MockNotifier::build();
1055 let stager = BoundedStager::new(
1056 tempdir.path().to_path_buf(),
1057 1, Some(notifier.clone()),
1059 None,
1060 )
1061 .await
1062 .unwrap();
1063
1064 let puffin_file_name = "test_eviction".to_string();
1065 let blob_key = "blob_key";
1066
1067 let reader = stager
1069 .get_blob(
1070 &puffin_file_name,
1071 blob_key,
1072 Box::new(|mut writer| {
1073 Box::pin(async move {
1074 writer.write_all(b"Hello world").await.unwrap();
1075 Ok(11)
1076 })
1077 }),
1078 )
1079 .await
1080 .unwrap()
1081 .reader()
1082 .await
1083 .unwrap();
1084
1085 stager.cache.run_pending_tasks().await;
1087 assert!(!stager.in_cache(&puffin_file_name, blob_key));
1088
1089 let stats = notifier.stats();
1090 assert_eq!(
1091 stats,
1092 Stats {
1093 cache_insert_size: 11,
1094 cache_evict_size: 11,
1095 cache_hit_count: 0,
1096 cache_hit_size: 0,
1097 cache_miss_count: 1,
1098 cache_miss_size: 11,
1099 recycle_insert_size: 11,
1100 recycle_clear_size: 0
1101 }
1102 );
1103
1104 let m = reader.metadata().await.unwrap();
1105 let buf = reader.read(0..m.content_length).await.unwrap();
1106 assert_eq!(&*buf, b"Hello world");
1107
1108 let reader = stager
1110 .get_blob(
1111 &puffin_file_name,
1112 blob_key,
1113 Box::new(|_| async { Ok(0) }.boxed()),
1114 )
1115 .await
1116 .unwrap()
1117 .reader()
1118 .await
1119 .unwrap();
1120
1121 stager.cache.run_pending_tasks().await;
1123 assert!(!stager.in_cache(&puffin_file_name, blob_key));
1124
1125 let stats = notifier.stats();
1126 assert_eq!(
1127 stats,
1128 Stats {
1129 cache_insert_size: 22,
1130 cache_evict_size: 22,
1131 cache_hit_count: 1,
1132 cache_hit_size: 11,
1133 cache_miss_count: 1,
1134 cache_miss_size: 11,
1135 recycle_insert_size: 22,
1136 recycle_clear_size: 11
1137 }
1138 );
1139
1140 let m = reader.metadata().await.unwrap();
1141 let buf = reader.read(0..m.content_length).await.unwrap();
1142 assert_eq!(&*buf, b"Hello world");
1143
1144 let dir_key = "dir_key";
1145 let files_in_dir = [
1146 ("file_a", "Hello, world!".as_bytes()),
1147 ("file_b", "Hello, Rust!".as_bytes()),
1148 ("file_c", "你好,世界!".as_bytes()),
1149 ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1150 ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1151 ];
1152
1153 let guard_0 = stager
1155 .get_dir(
1156 &puffin_file_name,
1157 dir_key,
1158 Box::new(|writer_provider| {
1159 Box::pin(async move {
1160 let mut size = 0;
1161 for (rel_path, content) in &files_in_dir {
1162 let mut writer = writer_provider.writer(rel_path).await.unwrap();
1163 writer.write_all(content).await.unwrap();
1164 size += content.len() as u64;
1165 }
1166 Ok(size)
1167 })
1168 }),
1169 )
1170 .await
1171 .unwrap();
1172
1173 for (rel_path, content) in &files_in_dir {
1174 let file_path = guard_0.path().join(rel_path);
1175 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1176 let mut buf = Vec::new();
1177 file.read_to_end(&mut buf).await.unwrap();
1178 assert_eq!(buf, *content);
1179 }
1180
1181 stager.cache.run_pending_tasks().await;
1183 assert!(!stager.in_cache(&puffin_file_name, dir_key));
1184
1185 let stats = notifier.stats();
1186 assert_eq!(
1187 stats,
1188 Stats {
1189 cache_insert_size: 92,
1190 cache_evict_size: 92,
1191 cache_hit_count: 1,
1192 cache_hit_size: 11,
1193 cache_miss_count: 2,
1194 cache_miss_size: 81,
1195 recycle_insert_size: 92,
1196 recycle_clear_size: 11
1197 }
1198 );
1199
1200 let guard_1 = stager
1202 .get_dir(
1203 &puffin_file_name,
1204 dir_key,
1205 Box::new(|_| async { Ok(0) }.boxed()),
1206 )
1207 .await
1208 .unwrap();
1209
1210 for (rel_path, content) in &files_in_dir {
1211 let file_path = guard_1.path().join(rel_path);
1212 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1213 let mut buf = Vec::new();
1214 file.read_to_end(&mut buf).await.unwrap();
1215 assert_eq!(buf, *content);
1216 }
1217
1218 stager.cache.run_pending_tasks().await;
1220 assert!(!stager.in_cache(&puffin_file_name, dir_key));
1221
1222 let stats = notifier.stats();
1223 assert_eq!(
1224 stats,
1225 Stats {
1226 cache_insert_size: 162,
1227 cache_evict_size: 162,
1228 cache_hit_count: 2,
1229 cache_hit_size: 81,
1230 cache_miss_count: 2,
1231 cache_miss_size: 81,
1232 recycle_insert_size: 162,
1233 recycle_clear_size: 81
1234 }
1235 );
1236
1237 drop(guard_0);
1239 drop(guard_1);
1240 let guard_2 = stager
1241 .get_dir(
1242 &puffin_file_name,
1243 dir_key,
1244 Box::new(|_| Box::pin(async move { Ok(0) })),
1245 )
1246 .await
1247 .unwrap();
1248
1249 stager.cache.run_pending_tasks().await;
1251 assert!(!stager.in_cache(&puffin_file_name, blob_key));
1252
1253 for (rel_path, content) in &files_in_dir {
1254 let file_path = guard_2.path().join(rel_path);
1255 let mut file = tokio::fs::File::open(&file_path).await.unwrap();
1256 let mut buf = Vec::new();
1257 file.read_to_end(&mut buf).await.unwrap();
1258 assert_eq!(buf, *content);
1259 }
1260
1261 let stats = notifier.stats();
1262 assert_eq!(
1263 stats,
1264 Stats {
1265 cache_insert_size: 232,
1266 cache_evict_size: 232,
1267 cache_hit_count: 3,
1268 cache_hit_size: 151,
1269 cache_miss_count: 2,
1270 cache_miss_size: 81,
1271 recycle_insert_size: 232,
1272 recycle_clear_size: 151
1273 }
1274 );
1275 }
1276
/// Runs ten concurrent `get_blob` calls whose init fails. Checks that every
/// call fails and that nothing is left in the cache.
#[tokio::test]
async fn test_get_blob_concurrency_on_fail() {
    let tempdir = create_temp_dir("test_get_blob_concurrency_on_fail_");
    let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
        .await
        .unwrap();

    let puffin_file_name = "test_get_blob_concurrency_on_fail".to_string();
    let key = "key";

    let stager = Arc::new(stager);
    let handles = (0..10)
        .map(|_| {
            let stager = stager.clone();
            let puffin_file_name = puffin_file_name.clone();
            let task = async move {
                // Presumably the sleep keeps the init in flight long enough
                // for the concurrent calls to overlap.
                let failed_init = Box::new(|_| {
                    async {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                        BlobNotFoundSnafu { blob: "whatever" }.fail()
                    }
                    .boxed()
                });
                stager.get_blob(&puffin_file_name, key, failed_init).await
            };

            tokio::spawn(task)
        })
        .collect::<Vec<_>>();

    for handle in handles {
        let r = handle.await.unwrap();
        assert!(r.is_err());
    }

    // A failed initialization must not leave an entry behind.
    assert!(!stager.in_cache(&puffin_file_name, key));
}
1314
/// Runs ten concurrent `get_dir` calls whose init fails. Checks that every
/// call fails and that nothing is left in the cache.
#[tokio::test]
async fn test_get_dir_concurrency_on_fail() {
    let tempdir = create_temp_dir("test_get_dir_concurrency_on_fail_");
    let stager = BoundedStager::new(tempdir.path().to_path_buf(), u64::MAX, None, None)
        .await
        .unwrap();

    let puffin_file_name = "test_get_dir_concurrency_on_fail".to_string();
    let key = "key";

    let stager = Arc::new(stager);
    let handles = (0..10)
        .map(|_| {
            let stager = stager.clone();
            let puffin_file_name = puffin_file_name.clone();
            let task = async move {
                // Presumably the sleep keeps the init in flight long enough
                // for the concurrent calls to overlap.
                let failed_init = Box::new(|_| {
                    async {
                        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
                        BlobNotFoundSnafu { blob: "whatever" }.fail()
                    }
                    .boxed()
                });
                stager.get_dir(&puffin_file_name, key, failed_init).await
            };

            tokio::spawn(task)
        })
        .collect::<Vec<_>>();

    for handle in handles {
        let r = handle.await.unwrap();
        assert!(r.is_err());
    }

    // A failed initialization must not leave an entry behind.
    assert!(!stager.in_cache(&puffin_file_name, key));
}
1352
1353 #[tokio::test]
1354 async fn test_purge() {
1355 let tempdir = create_temp_dir("test_purge_");
1356 let notifier = MockNotifier::build();
1357 let stager = BoundedStager::new(
1358 tempdir.path().to_path_buf(),
1359 u64::MAX,
1360 Some(notifier.clone()),
1361 None,
1362 )
1363 .await
1364 .unwrap();
1365
1366 let puffin_file_name = "test_purge".to_string();
1368 let blob_key = "blob_key";
1369 let guard = stager
1370 .get_blob(
1371 &puffin_file_name,
1372 blob_key,
1373 Box::new(|mut writer| {
1374 Box::pin(async move {
1375 writer.write_all(b"hello world").await.unwrap();
1376 Ok(11)
1377 })
1378 }),
1379 )
1380 .await
1381 .unwrap();
1382 drop(guard);
1383
1384 let files_in_dir = [
1385 ("file_a", "Hello, world!".as_bytes()),
1386 ("file_b", "Hello, Rust!".as_bytes()),
1387 ("file_c", "你好,世界!".as_bytes()),
1388 ("subdir/file_d", "Hello, Puffin!".as_bytes()),
1389 ("subdir/subsubdir/file_e", "¡Hola mundo!".as_bytes()),
1390 ];
1391
1392 let dir_key = "dir_key";
1393 let guard = stager
1394 .get_dir(
1395 &puffin_file_name,
1396 dir_key,
1397 Box::new(|writer_provider| {
1398 Box::pin(async move {
1399 let mut size = 0;
1400 for (rel_path, content) in &files_in_dir {
1401 size += content.len();
1402 let mut writer = writer_provider.writer(rel_path).await.unwrap();
1403 writer.write_all(content).await.unwrap();
1404 }
1405 Ok(size as _)
1406 })
1407 }),
1408 )
1409 .await
1410 .unwrap();
1411 drop(guard);
1412
1413 stager.purge(&puffin_file_name).await.unwrap();
1415
1416 let stats = notifier.stats();
1417 assert_eq!(
1418 stats,
1419 Stats {
1420 cache_insert_size: 81,
1421 cache_evict_size: 81,
1422 cache_hit_count: 0,
1423 cache_hit_size: 0,
1424 cache_miss_count: 2,
1425 cache_miss_size: 81,
1426 recycle_insert_size: 81,
1427 recycle_clear_size: 0
1428 }
1429 );
1430 }
1431}