1use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Directory, relative to the local store root, under which cached manifest files are kept.
const MANIFEST_DIR: &str = "cache/object/manifest/";

/// Label value used with the `CACHE_BYTES`, `CACHE_HIT` and `CACHE_MISS` metrics of this cache.
const MANIFEST_TYPE: &str = "manifest";
41
42#[derive(Debug, Clone)]
45pub struct ManifestCache {
46 local_store: ObjectStore,
48 index: Cache<String, IndexValue>,
50}
51
52impl ManifestCache {
53 pub async fn new(
55 local_store: ObjectStore,
56 capacity: ReadableSize,
57 ttl: Option<Duration>,
58 ) -> ManifestCache {
59 let total_capacity = capacity.as_bytes();
60
61 info!(
62 "Initializing manifest cache with capacity: {}",
63 ReadableSize(total_capacity)
64 );
65
66 let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
67
68 let cache = ManifestCache { local_store, index };
69
70 cache.recover(false).await;
72
73 cache
74 }
75
76 fn build_cache(
78 local_store: ObjectStore,
79 capacity: u64,
80 ttl: Option<Duration>,
81 ) -> Cache<String, IndexValue> {
82 let cache_store = local_store;
83 let mut builder = Cache::builder()
84 .eviction_policy(EvictionPolicy::lru())
85 .weigher(|key: &String, value: &IndexValue| -> u32 {
86 key.len() as u32 + value.file_size
87 })
88 .max_capacity(capacity)
89 .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
90 let store = cache_store.clone();
91 let file_path = join_path(MANIFEST_DIR, &key);
93 async move {
94 if let RemovalCause::Replaced = cause {
95 CACHE_BYTES
98 .with_label_values(&[MANIFEST_TYPE])
99 .sub(value.file_size.into());
100 return;
101 }
102
103 match store.delete(&file_path).await {
104 Ok(()) => {
105 CACHE_BYTES
106 .with_label_values(&[MANIFEST_TYPE])
107 .sub(value.file_size.into());
108 }
109 Err(e) => {
110 warn!(e; "Failed to delete cached manifest file {}", file_path);
111 }
112 }
113 }
114 .boxed()
115 });
116 if let Some(ttl) = ttl {
117 builder = builder.time_to_idle(ttl);
118 }
119 builder.build()
120 }
121
122 pub(crate) async fn put(&self, key: String, value: IndexValue) {
126 CACHE_BYTES
127 .with_label_values(&[MANIFEST_TYPE])
128 .add(value.file_size.into());
129 self.index.insert(key, value).await;
130
131 self.index.run_pending_tasks().await;
133 }
134
135 pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
137 self.index.get(key).await
138 }
139
140 pub(crate) async fn remove(&self, key: &str) {
142 let file_path = self.cache_file_path(key);
143 self.index.remove(key).await;
144 if let Err(e) = self.local_store.delete(&file_path).await {
146 warn!(e; "Failed to delete a cached manifest file {}", file_path);
147 }
148 }
149
150 pub(crate) async fn remove_batch(&self, keys: &[String]) {
152 if keys.is_empty() {
153 return;
154 }
155
156 for key in keys {
157 self.index.remove(key).await;
158 }
159
160 let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
161
162 if let Err(e) = self.local_store.delete_iter(file_paths).await {
163 warn!(e; "Failed to delete cached manifest files in batch");
164 }
165 }
166
167 async fn recover_inner(&self) -> Result<()> {
168 let now = Instant::now();
169 let mut lister = self
170 .local_store
171 .lister_with(MANIFEST_DIR)
172 .recursive(true)
173 .await
174 .context(OpenDalSnafu)?;
175 let (mut total_size, mut total_keys) = (0i64, 0);
176 while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
177 let meta = entry.metadata();
178 if !meta.is_file() {
179 continue;
180 }
181
182 let meta = self
183 .local_store
184 .stat(entry.path())
185 .await
186 .context(OpenDalSnafu)?;
187 let file_size = meta.content_length() as u32;
188 let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
189 common_telemetry::info!("Manifest cache recover {}, size: {}", key, file_size);
190 self.index.insert(key, IndexValue { file_size }).await;
191 let size = i64::from(file_size);
192 total_size += size;
193 total_keys += 1;
194 }
195 CACHE_BYTES
196 .with_label_values(&[MANIFEST_TYPE])
197 .add(total_size);
198
199 self.index.run_pending_tasks().await;
202
203 let weight = self.index.weighted_size();
204 let count = self.index.entry_count();
205 info!(
206 "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
207 total_keys,
208 total_size,
209 count,
210 weight,
211 now.elapsed()
212 );
213 Ok(())
214 }
215
216 pub(crate) async fn recover(&self, sync: bool) {
218 let moved_self = self.clone();
219 let handle = tokio::spawn(async move {
220 if let Err(err) = moved_self.recover_inner().await {
221 error!(err; "Failed to recover manifest cache.")
222 }
223
224 moved_self.clean_empty_dirs(true).await;
225 });
226
227 if sync {
228 let _ = handle.await;
229 }
230 }
231
232 pub(crate) fn cache_file_path(&self, key: &str) -> String {
234 join_path(MANIFEST_DIR, key)
235 }
236
237 pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
240 if self.get(key).await.is_none() {
241 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
242 return None;
243 }
244
245 let cache_file_path = self.cache_file_path(key);
246 match self.local_store.read(&cache_file_path).await {
247 Ok(data) => {
248 CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
249 Some(data.to_vec())
250 }
251 Err(e) => {
252 warn!(e; "Failed to read cached manifest file {}", cache_file_path);
253 CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
254 None
255 }
256 }
257 }
258
259 pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
261 let cache_file_path = self.cache_file_path(&key);
262
263 if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
264 warn!(e; "Failed to write manifest to cache {}", cache_file_path);
265 return;
266 }
267
268 let file_size = data.len() as u32;
269 self.put(key, IndexValue { file_size }).await;
270 }
271
272 pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
277 info!("Clean empty dirs start");
278
279 let root = self.local_store.info().root();
280 let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
281 let manifest_dir_clone = manifest_dir.clone();
282
283 let result = tokio::task::spawn_blocking(move || {
284 Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
285 })
286 .await;
287
288 match result {
289 Ok(Ok(())) => {
290 info!("Clean empty dirs end");
291 }
292 Ok(Err(e)) => {
293 warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
294 }
295 Err(e) => {
296 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
297 }
298 }
299 }
300
301 pub(crate) async fn clean_manifests(&self, dir: &str) {
303 info!("Clean manifest cache for directory: {}", dir);
304
305 let cache_dir = join_path(MANIFEST_DIR, dir);
306 let mut lister = match self
307 .local_store
308 .lister_with(&cache_dir)
309 .recursive(true)
310 .await
311 {
312 Ok(lister) => lister,
313 Err(e) => {
314 warn!(e; "Failed to list manifest files under {}", cache_dir);
315 return;
316 }
317 };
318
319 let mut keys_to_remove = Vec::new();
320 loop {
321 match lister.try_next().await {
322 Ok(Some(entry)) => {
323 let meta = entry.metadata();
324 if meta.is_file() {
325 keys_to_remove
326 .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
327 }
328 }
329 Ok(None) => break,
330 Err(e) => {
331 warn!(e; "Failed to read entry while listing {}", cache_dir);
332 break;
333 }
334 }
335 }
336
337 info!(
338 "Going to remove files from manifest cache, files: {:?}",
339 keys_to_remove
340 );
341
342 self.remove_batch(&keys_to_remove).await;
344
345 let root = self.local_store.info().root();
347 let dir_path = PathBuf::from(root).join(&cache_dir);
348 let dir_path_clone = dir_path.clone();
349
350 let result = tokio::task::spawn_blocking(move || {
351 Self::clean_empty_dirs_sync(&dir_path_clone, false)
352 })
353 .await;
354
355 match result {
356 Ok(Ok(())) => {
357 info!("Cleaned manifest cache for directory: {}", dir);
358 }
359 Ok(Err(e)) => {
360 warn!(e; "Failed to clean empty directories under {}", dir_path.display());
361 }
362 Err(e) => {
363 warn!(e; "Failed to spawn blocking task for cleaning empty directories");
364 }
365 }
366 }
367
368 fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
373 let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
374 if is_empty {
375 if let Err(e) = std::fs::remove_dir(dir) {
376 if e.kind() != std::io::ErrorKind::NotFound {
377 warn!(e; "Failed to remove empty root dir {}", dir.display());
378 return Err(e);
379 } else {
380 warn!("Empty root dir not found before removal {}", dir.display());
381 }
382 } else {
383 info!(
384 "Removed empty root dir {} from manifest cache",
385 dir.display()
386 );
387 }
388 }
389 Ok(())
390 }
391
392 fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
393 common_telemetry::debug!(
394 "Maybe remove empty dir: {:?}, check_mtime: {}",
395 dir,
396 check_mtime
397 );
398 let entries = match std::fs::read_dir(dir) {
399 Ok(entries) => entries,
400 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
401 return Ok(true);
403 }
404 Err(e) => return Err(e),
405 };
406
407 let mut is_empty = true;
408 for entry in entries {
411 let entry = entry?;
412 let path = entry.path();
413 let metadata = std::fs::metadata(&path)?;
414
415 if metadata.is_dir() {
416 if check_mtime
418 && let Ok(modified) = metadata.modified()
419 && let Ok(elapsed) = modified.elapsed()
420 && elapsed < Duration::from_secs(3600)
421 {
422 common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
423 is_empty = false;
425 continue;
426 }
427
428 let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
429 if subdir_empty {
430 if let Err(e) = std::fs::remove_dir(&path) {
431 if e.kind() != std::io::ErrorKind::NotFound {
432 warn!(e; "Failed to remove empty directory {}", path.display());
433 is_empty = false;
434 } else {
435 info!(
436 "Empty directory {} not found before removal",
437 path.display()
438 );
439 }
440 } else {
441 info!(
442 "Removed empty directory {} from manifest cache",
443 path.display()
444 );
445 }
446 } else {
447 is_empty = false;
448 }
449 } else {
450 is_empty = false;
451 }
452 }
453
454 Ok(is_empty)
455 }
456}
457
/// Metadata kept in the index for one cached manifest file.
#[derive(Clone, Debug)]
pub(crate) struct IndexValue {
    /// Size of the cached file in bytes.
    pub(crate) file_size: u32,
}
466
467#[cfg(test)]
468mod tests {
469 use common_test_util::temp_dir::create_temp_dir;
470 use object_store::services::Fs;
471
472 use super::*;
473
474 fn new_fs_store(path: &str) -> ObjectStore {
475 let builder = Fs::default().root(path);
476 ObjectStore::new(builder).unwrap().finish()
477 }
478
479 #[tokio::test]
480 async fn test_manifest_cache_basic() {
481 common_telemetry::init_default_ut_logging();
482
483 let dir = create_temp_dir("");
484 let local_store = new_fs_store(dir.path().to_str().unwrap());
485
486 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
487 let key = "region_1/manifest/00000000000000000007.json";
488 let file_path = cache.cache_file_path(key);
489
490 assert!(cache.get(key).await.is_none());
492
493 local_store
495 .write(&file_path, b"manifest content".as_slice())
496 .await
497 .unwrap();
498 cache
500 .put(key.to_string(), IndexValue { file_size: 16 })
501 .await;
502
503 let value = cache.get(key).await.unwrap();
505 assert_eq!(16, value.file_size);
506
507 cache.index.run_pending_tasks().await;
509 assert_eq!(59, cache.index.weighted_size());
510
511 cache.remove(key).await;
513 cache.index.run_pending_tasks().await;
514 assert!(cache.get(key).await.is_none());
515
516 cache.index.run_pending_tasks().await;
518
519 assert!(!local_store.exists(&file_path).await.unwrap());
521 assert_eq!(0, cache.index.weighted_size());
522 }
523
524 #[tokio::test]
525 async fn test_manifest_cache_recover() {
526 common_telemetry::init_default_ut_logging();
527
528 let dir = create_temp_dir("");
529 let local_store = new_fs_store(dir.path().to_str().unwrap());
530 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
531
532 let keys = [
534 "region_1/manifest/00000000000000000001.json",
535 "region_1/manifest/00000000000000000002.json",
536 "region_1/manifest/00000000000000000001.checkpoint",
537 "region_2/manifest/00000000000000000001.json",
538 ];
539
540 let mut total_size = 0;
541 for (i, key) in keys.iter().enumerate() {
542 let file_path = cache.cache_file_path(key);
543 let content = format!("manifest-{}", i).into_bytes();
544 local_store
545 .write(&file_path, content.clone())
546 .await
547 .unwrap();
548
549 cache
551 .put(
552 key.to_string(),
553 IndexValue {
554 file_size: content.len() as u32,
555 },
556 )
557 .await;
558 total_size += content.len() + key.len();
559 }
560
561 let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
563
564 cache.recover(true).await;
566
567 cache.index.run_pending_tasks().await;
569 let total_cached = cache.index.weighted_size() as usize;
570 assert_eq!(total_size, total_cached);
571
572 for (i, key) in keys.iter().enumerate() {
574 let value = cache.get(key).await.unwrap();
575 assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
576 }
577 }
578
579 #[tokio::test]
580 async fn test_cache_file_path() {
581 let dir = create_temp_dir("");
582 let local_store = new_fs_store(dir.path().to_str().unwrap());
583 let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None).await;
584
585 assert_eq!(
586 "cache/object/manifest/region_1/manifest/00000000000000000007.json",
587 cache.cache_file_path("region_1/manifest/00000000000000000007.json")
588 );
589 assert_eq!(
590 "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
591 cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
592 );
593 }
594
595 #[tokio::test]
596 async fn test_clean_empty_dirs_sync_no_mtime_check() {
597 common_telemetry::init_default_ut_logging();
598
599 let dir = create_temp_dir("");
600 let root = PathBuf::from(dir.path());
601
602 let empty_dir1 = root.join("empty_dir1");
615 let empty_dir2 = root.join("empty_dir2");
616 let empty_subdir = empty_dir2.join("empty_subdir");
617 let non_empty_dir = root.join("non_empty_dir");
618 let nested = root.join("nested");
619 let nested_empty = nested.join("empty_subdir1");
620 let nested_non_empty = nested.join("non_empty_subdir");
621
622 std::fs::create_dir_all(&empty_dir1).unwrap();
624 std::fs::create_dir_all(&empty_subdir).unwrap();
625 std::fs::create_dir_all(&non_empty_dir).unwrap();
626 std::fs::create_dir_all(&nested_empty).unwrap();
627 std::fs::create_dir_all(&nested_non_empty).unwrap();
628
629 std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
631 std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();
632
633 assert!(empty_dir1.exists());
635 assert!(empty_dir2.exists());
636 assert!(empty_subdir.exists());
637 assert!(non_empty_dir.exists());
638 assert!(nested.exists());
639 assert!(nested_empty.exists());
640 assert!(nested_non_empty.exists());
641
642 ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();
644
645 assert!(!empty_dir1.exists());
647 assert!(!empty_dir2.exists());
648 assert!(!empty_subdir.exists());
649 assert!(!nested_empty.exists());
650
651 assert!(non_empty_dir.exists());
653 assert!(non_empty_dir.join("file.txt").exists());
654 assert!(nested.exists());
655 assert!(nested_non_empty.exists());
656 assert!(nested_non_empty.join("file.txt").exists());
657 }
658
659 #[tokio::test]
660 async fn test_clean_empty_dirs_sync_with_mtime_check() {
661 common_telemetry::init_default_ut_logging();
662
663 let dir = create_temp_dir("");
664 let root = PathBuf::from(dir.path());
665
666 let empty_dir1 = root.join("empty_dir1");
675 let empty_dir2 = root.join("empty_dir2");
676 let empty_subdir = empty_dir2.join("empty_subdir");
677 let non_empty_dir = root.join("non_empty_dir");
678
679 std::fs::create_dir_all(&empty_dir1).unwrap();
681 std::fs::create_dir_all(&empty_subdir).unwrap();
682 std::fs::create_dir_all(&non_empty_dir).unwrap();
683
684 std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
686
687 assert!(empty_dir1.exists());
689 assert!(empty_dir2.exists());
690 assert!(empty_subdir.exists());
691 assert!(non_empty_dir.exists());
692
693 ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();
696
697 assert!(empty_dir1.exists());
699 assert!(empty_dir2.exists());
700 assert!(empty_subdir.exists());
701
702 assert!(non_empty_dir.exists());
704 assert!(non_empty_dir.join("file.txt").exists());
705 }
706}