1use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Directory, relative to the local store root, under which cached manifest
/// files are stored. Cache keys are joined onto this prefix to form file paths.
const MANIFEST_DIR: &str = "cache/object/manifest/";

/// Label value used when recording the cache metrics in this module.
const MANIFEST_TYPE: &str = "manifest";
41
/// A cache of manifest files kept on a local object store.
///
/// File contents live in `local_store` under `MANIFEST_DIR`, while `index`
/// tracks which keys are cached together with the size of each cached file.
#[derive(Debug, Clone)]
pub struct ManifestCache {
    /// Store that holds the cached manifest files.
    local_store: ObjectStore,
    /// Maps a cache key to the metadata of its cached file.
    index: Cache<String, IndexValue>,
}
51
52impl ManifestCache {
53 pub async fn new(
55 local_store: ObjectStore,
56 capacity: ReadableSize,
57 ttl: Option<Duration>,
58 recover_sync: bool,
59 ) -> ManifestCache {
60 let total_capacity = capacity.as_bytes();
61
62 info!(
63 "Initializing manifest cache with capacity: {}",
64 ReadableSize(total_capacity)
65 );
66
67 let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
68
69 let cache = ManifestCache { local_store, index };
70
71 cache.recover(recover_sync).await;
73
74 cache
75 }
76
77 fn build_cache(
79 local_store: ObjectStore,
80 capacity: u64,
81 ttl: Option<Duration>,
82 ) -> Cache<String, IndexValue> {
83 let cache_store = local_store;
84 let mut builder = Cache::builder()
85 .eviction_policy(EvictionPolicy::lru())
86 .weigher(|key: &String, value: &IndexValue| -> u32 {
87 key.len() as u32 + value.file_size
88 })
89 .max_capacity(capacity)
90 .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
91 let store = cache_store.clone();
92 let file_path = join_path(MANIFEST_DIR, &key);
94 async move {
95 if let RemovalCause::Replaced = cause {
96 CACHE_BYTES
99 .with_label_values(&[MANIFEST_TYPE])
100 .sub(value.file_size.into());
101 return;
102 }
103
104 match store.delete(&file_path).await {
105 Ok(()) => {
106 CACHE_BYTES
107 .with_label_values(&[MANIFEST_TYPE])
108 .sub(value.file_size.into());
109 }
110 Err(e) => {
111 warn!(e; "Failed to delete cached manifest file {}", file_path);
112 }
113 }
114 }
115 .boxed()
116 });
117 if let Some(ttl) = ttl {
118 builder = builder.time_to_idle(ttl);
119 }
120 builder.build()
121 }
122
123 pub(crate) async fn put(&self, key: String, value: IndexValue) {
127 CACHE_BYTES
128 .with_label_values(&[MANIFEST_TYPE])
129 .add(value.file_size.into());
130 self.index.insert(key, value).await;
131
132 self.index.run_pending_tasks().await;
134 }
135
136 pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
138 self.index.get(key).await
139 }
140
141 pub(crate) async fn remove(&self, key: &str) {
143 let file_path = self.cache_file_path(key);
144 self.index.remove(key).await;
145 if let Err(e) = self.local_store.delete(&file_path).await {
147 warn!(e; "Failed to delete a cached manifest file {}", file_path);
148 }
149 }
150
151 pub(crate) async fn remove_batch(&self, keys: &[String]) {
153 if keys.is_empty() {
154 return;
155 }
156
157 for key in keys {
158 self.index.remove(key).await;
159 }
160
161 let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
162
163 if let Err(e) = self.local_store.delete_iter(file_paths).await {
164 warn!(e; "Failed to delete cached manifest files in batch");
165 }
166 }
167
168 async fn recover_inner(&self) -> Result<()> {
169 let now = Instant::now();
170 let mut lister = self
171 .local_store
172 .lister_with(MANIFEST_DIR)
173 .recursive(true)
174 .await
175 .context(OpenDalSnafu)?;
176 let (mut total_size, mut total_keys) = (0i64, 0);
177 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
178 let meta = entry.metadata();
179 if !meta.is_file() {
180 continue;
181 }
182
183 let meta = self
184 .local_store
185 .stat(entry.path())
186 .await
187 .context(OpenDalSnafu)?;
188 let file_size = meta.content_length() as u32;
189 let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
190 common_telemetry::debug!("Manifest cache recover {}, size: {}", key, file_size);
191 self.index.insert(key, IndexValue { file_size }).await;
192 let size = i64::from(file_size);
193 total_size += size;
194 total_keys += 1;
195 }
196 CACHE_BYTES
197 .with_label_values(&[MANIFEST_TYPE])
198 .add(total_size);
199
200 self.index.run_pending_tasks().await;
203
204 let weight = self.index.weighted_size();
205 let count = self.index.entry_count();
206 info!(
207 "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
208 total_keys,
209 total_size,
210 count,
211 weight,
212 now.elapsed()
213 );
214 Ok(())
215 }
216
217 pub(crate) async fn recover(&self, sync: bool) {
219 let moved_self = self.clone();
220 let handle = tokio::spawn(async move {
221 if let Err(err) = moved_self.recover_inner().await {
222 error!(err; "Failed to recover manifest cache.")
223 }
224
225 moved_self.clean_empty_dirs(true).await;
226 });
227
228 if sync {
229 let _ = handle.await;
230 }
231 }
232
233 pub(crate) fn cache_file_path(&self, key: &str) -> String {
235 join_path(MANIFEST_DIR, key)
236 }
237
238 pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
241 if self.get(key).await.is_none() {
242 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
243 return None;
244 }
245
246 let cache_file_path = self.cache_file_path(key);
247 match self.local_store.read(&cache_file_path).await {
248 Ok(data) => {
249 CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
250 Some(data.to_vec())
251 }
252 Err(e) => {
253 warn!(e; "Failed to read cached manifest file {}", cache_file_path);
254 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
255 None
256 }
257 }
258 }
259
260 pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
262 let cache_file_path = self.cache_file_path(&key);
263
264 if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
265 warn!(e; "Failed to write manifest to cache {}", cache_file_path);
266 return;
267 }
268
269 let file_size = data.len() as u32;
270 self.put(key, IndexValue { file_size }).await;
271 }
272
273 pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
278 info!("Clean empty dirs start");
279
280 let root = self.local_store.info().root();
281 let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
282 let manifest_dir_clone = manifest_dir.clone();
283
284 let result = tokio::task::spawn_blocking(move || {
285 Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
286 })
287 .await;
288
289 match result {
290 Ok(Ok(())) => {
291 info!("Clean empty dirs end");
292 }
293 Ok(Err(e)) => {
294 warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
295 }
296 Err(e) => {
297 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
298 }
299 }
300 }
301
302 pub(crate) async fn clean_manifests(&self, dir: &str) {
304 info!("Clean manifest cache for directory: {}", dir);
305
306 let cache_dir = join_path(MANIFEST_DIR, dir);
307 let mut lister = match self
308 .local_store
309 .lister_with(&cache_dir)
310 .recursive(true)
311 .await
312 {
313 Ok(lister) => lister,
314 Err(e) => {
315 warn!(e; "Failed to list manifest files under {}", cache_dir);
316 return;
317 }
318 };
319
320 let mut keys_to_remove = Vec::new();
321 loop {
322 match lister.try_next().await {
323 Ok(Some(entry)) => {
324 let meta = entry.metadata();
325 if meta.is_file() {
326 keys_to_remove
327 .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
328 }
329 }
330 Ok(None) => break,
331 Err(e) => {
332 warn!(e; "Failed to read entry while listing {}", cache_dir);
333 break;
334 }
335 }
336 }
337
338 info!(
339 "Going to remove files from manifest cache, files: {:?}",
340 keys_to_remove
341 );
342
343 self.remove_batch(&keys_to_remove).await;
345
346 let root = self.local_store.info().root();
348 let dir_path = PathBuf::from(root).join(&cache_dir);
349 let dir_path_clone = dir_path.clone();
350
351 let result = tokio::task::spawn_blocking(move || {
352 Self::clean_empty_dirs_sync(&dir_path_clone, false)
353 })
354 .await;
355
356 match result {
357 Ok(Ok(())) => {
358 info!("Cleaned manifest cache for directory: {}", dir);
359 }
360 Ok(Err(e)) => {
361 warn!(e; "Failed to clean empty directories under {}", dir_path.display());
362 }
363 Err(e) => {
364 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
365 }
366 }
367 }
368
369 fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
374 let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
375 if is_empty {
376 if let Err(e) = std::fs::remove_dir(dir) {
377 if e.kind() != std::io::ErrorKind::NotFound {
378 warn!(e; "Failed to remove empty root dir {}", dir.display());
379 return Err(e);
380 } else {
381 warn!("Empty root dir not found before removal {}", dir.display());
382 }
383 } else {
384 info!(
385 "Removed empty root dir {} from manifest cache",
386 dir.display()
387 );
388 }
389 }
390 Ok(())
391 }
392
393 fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
394 common_telemetry::debug!(
395 "Maybe remove empty dir: {:?}, check_mtime: {}",
396 dir,
397 check_mtime
398 );
399 let entries = match std::fs::read_dir(dir) {
400 Ok(entries) => entries,
401 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
402 return Ok(true);
404 }
405 Err(e) => return Err(e),
406 };
407
408 let mut is_empty = true;
409 for entry in entries {
412 let entry = entry?;
413 let path = entry.path();
414 let metadata = std::fs::metadata(&path)?;
415
416 if metadata.is_dir() {
417 if check_mtime
419 && let Ok(modified) = metadata.modified()
420 && let Ok(elapsed) = modified.elapsed()
421 && elapsed < Duration::from_secs(3600)
422 {
423 common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
424 is_empty = false;
426 continue;
427 }
428
429 let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
430 if subdir_empty {
431 if let Err(e) = std::fs::remove_dir(&path) {
432 if e.kind() != std::io::ErrorKind::NotFound {
433 warn!(e; "Failed to remove empty directory {}", path.display());
434 is_empty = false;
435 } else {
436 info!(
437 "Empty directory {} not found before removal",
438 path.display()
439 );
440 }
441 } else {
442 info!(
443 "Removed empty directory {} from manifest cache",
444 path.display()
445 );
446 }
447 } else {
448 is_empty = false;
449 }
450 } else {
451 is_empty = false;
452 }
453 }
454
455 Ok(is_empty)
456 }
457}
458
/// Metadata stored in the cache index for one cached manifest file.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes.
    pub(crate) file_size: u32,
}
467
468#[cfg(test)]
469mod tests {
470 use common_test_util::temp_dir::create_temp_dir;
471 use object_store::services::Fs;
472
473 use super::*;
474
475 fn new_fs_store(path: &str) -> ObjectStore {
476 let builder = Fs::default().root(path);
477 ObjectStore::new(builder).unwrap().finish()
478 }
479
480 #[tokio::test]
481 async fn test_manifest_cache_basic() {
482 common_telemetry::init_default_ut_logging();
483
484 let dir = create_temp_dir("");
485 let local_store = new_fs_store(dir.path().to_str().unwrap());
486
487 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;
488 let key = "region_1/manifest/00000000000000000007.json";
489 let file_path = cache.cache_file_path(key);
490
491 assert!(cache.get(key).await.is_none());
493
494 local_store
496 .write(&file_path, b"manifest content".as_slice())
497 .await
498 .unwrap();
499 cache
501 .put(key.to_string(), IndexValue { file_size: 16 })
502 .await;
503
504 let value = cache.get(key).await.unwrap();
506 assert_eq!(16, value.file_size);
507
508 cache.index.run_pending_tasks().await;
510 assert_eq!(59, cache.index.weighted_size());
511
512 cache.remove(key).await;
514 cache.index.run_pending_tasks().await;
515 assert!(cache.get(key).await.is_none());
516
517 cache.index.run_pending_tasks().await;
519
520 assert!(!local_store.exists(&file_path).await.unwrap());
522 assert_eq!(0, cache.index.weighted_size());
523 }
524
525 #[tokio::test]
526 async fn test_manifest_cache_recover() {
527 common_telemetry::init_default_ut_logging();
528
529 let dir = create_temp_dir("");
530 let local_store = new_fs_store(dir.path().to_str().unwrap());
531 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;
532
533 let keys = [
535 "region_1/manifest/00000000000000000001.json",
536 "region_1/manifest/00000000000000000002.json",
537 "region_1/manifest/00000000000000000001.checkpoint",
538 "region_2/manifest/00000000000000000001.json",
539 ];
540
541 let mut total_size = 0;
542 for (i, key) in keys.iter().enumerate() {
543 let file_path = cache.cache_file_path(key);
544 let content = format!("manifest-{}", i).into_bytes();
545 local_store
546 .write(&file_path, content.clone())
547 .await
548 .unwrap();
549
550 cache
552 .put(
553 key.to_string(),
554 IndexValue {
555 file_size: content.len() as u32,
556 },
557 )
558 .await;
559 total_size += content.len() + key.len();
560 }
561
562 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;
564
565 cache.index.run_pending_tasks().await;
567 let total_cached = cache.index.weighted_size() as usize;
568 assert_eq!(total_size, total_cached);
569
570 for (i, key) in keys.iter().enumerate() {
572 let value = cache.get(key).await.unwrap();
573 assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
574 }
575 }
576
577 #[tokio::test]
578 async fn test_cache_file_path() {
579 let dir = create_temp_dir("");
580 let local_store = new_fs_store(dir.path().to_str().unwrap());
581 let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None, true).await;
582
583 assert_eq!(
584 "cache/object/manifest/region_1/manifest/00000000000000000007.json",
585 cache.cache_file_path("region_1/manifest/00000000000000000007.json")
586 );
587 assert_eq!(
588 "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
589 cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
590 );
591 }
592
593 #[tokio::test]
594 async fn test_clean_empty_dirs_sync_no_mtime_check() {
595 common_telemetry::init_default_ut_logging();
596
597 let dir = create_temp_dir("");
598 let root = PathBuf::from(dir.path());
599
600 let empty_dir1 = root.join("empty_dir1");
613 let empty_dir2 = root.join("empty_dir2");
614 let empty_subdir = empty_dir2.join("empty_subdir");
615 let non_empty_dir = root.join("non_empty_dir");
616 let nested = root.join("nested");
617 let nested_empty = nested.join("empty_subdir1");
618 let nested_non_empty = nested.join("non_empty_subdir");
619
620 std::fs::create_dir_all(&empty_dir1).unwrap();
622 std::fs::create_dir_all(&empty_subdir).unwrap();
623 std::fs::create_dir_all(&non_empty_dir).unwrap();
624 std::fs::create_dir_all(&nested_empty).unwrap();
625 std::fs::create_dir_all(&nested_non_empty).unwrap();
626
627 std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
629 std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();
630
631 assert!(empty_dir1.exists());
633 assert!(empty_dir2.exists());
634 assert!(empty_subdir.exists());
635 assert!(non_empty_dir.exists());
636 assert!(nested.exists());
637 assert!(nested_empty.exists());
638 assert!(nested_non_empty.exists());
639
640 ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();
642
643 assert!(!empty_dir1.exists());
645 assert!(!empty_dir2.exists());
646 assert!(!empty_subdir.exists());
647 assert!(!nested_empty.exists());
648
649 assert!(non_empty_dir.exists());
651 assert!(non_empty_dir.join("file.txt").exists());
652 assert!(nested.exists());
653 assert!(nested_non_empty.exists());
654 assert!(nested_non_empty.join("file.txt").exists());
655 }
656
657 #[tokio::test]
658 async fn test_clean_empty_dirs_sync_with_mtime_check() {
659 common_telemetry::init_default_ut_logging();
660
661 let dir = create_temp_dir("");
662 let root = PathBuf::from(dir.path());
663
664 let empty_dir1 = root.join("empty_dir1");
673 let empty_dir2 = root.join("empty_dir2");
674 let empty_subdir = empty_dir2.join("empty_subdir");
675 let non_empty_dir = root.join("non_empty_dir");
676
677 std::fs::create_dir_all(&empty_dir1).unwrap();
679 std::fs::create_dir_all(&empty_subdir).unwrap();
680 std::fs::create_dir_all(&non_empty_dir).unwrap();
681
682 std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
684
685 assert!(empty_dir1.exists());
687 assert!(empty_dir2.exists());
688 assert!(empty_subdir.exists());
689 assert!(non_empty_dir.exists());
690
691 ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();
694
695 assert!(empty_dir1.exists());
697 assert!(empty_dir2.exists());
698 assert!(empty_subdir.exists());
699
700 assert!(non_empty_dir.exists());
702 assert!(non_empty_dir.join("file.txt").exists());
703 }
704}