mito2/cache/
manifest_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for manifest files.
16
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Label value that tags every manifest cache metric.
const MANIFEST_TYPE: &str = "manifest";

/// Directory, relative to the local store root, under which manifest files are cached.
///
/// The path has exactly three layers so that it lines up with
/// [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const MANIFEST_DIR: &str = "cache/object/manifest/";
41
42/// A manifest cache manages manifest files on local store and evicts files based
43/// on size.
44#[derive(Debug, Clone)]
45pub struct ManifestCache {
46    /// Local store to cache files.
47    local_store: ObjectStore,
48    /// Index to track cached manifest files.
49    index: Cache<String, IndexValue>,
50}
51
52impl ManifestCache {
53    /// Creates a new manifest cache and recovers the index from local store.
54    pub async fn new(
55        local_store: ObjectStore,
56        capacity: ReadableSize,
57        ttl: Option<Duration>,
58        recover_sync: bool,
59    ) -> ManifestCache {
60        let total_capacity = capacity.as_bytes();
61
62        info!(
63            "Initializing manifest cache with capacity: {}",
64            ReadableSize(total_capacity)
65        );
66
67        let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
68
69        let cache = ManifestCache { local_store, index };
70
71        // Recovers the cache index from local store.
72        cache.recover(recover_sync).await;
73
74        cache
75    }
76
77    /// Builds the cache.
78    fn build_cache(
79        local_store: ObjectStore,
80        capacity: u64,
81        ttl: Option<Duration>,
82    ) -> Cache<String, IndexValue> {
83        let cache_store = local_store;
84        let mut builder = Cache::builder()
85            .eviction_policy(EvictionPolicy::lru())
86            .weigher(|key: &String, value: &IndexValue| -> u32 {
87                key.len() as u32 + value.file_size
88            })
89            .max_capacity(capacity)
90            .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
91                let store = cache_store.clone();
92                // Stores files under MANIFEST_DIR.
93                let file_path = join_path(MANIFEST_DIR, &key);
94                async move {
95                    if let RemovalCause::Replaced = cause {
96                        // The cache is replaced by another file. We don't remove the same
97                        // file but updates the metrics as the file is already replaced by users.
98                        CACHE_BYTES
99                            .with_label_values(&[MANIFEST_TYPE])
100                            .sub(value.file_size.into());
101                        return;
102                    }
103
104                    match store.delete(&file_path).await {
105                        Ok(()) => {
106                            CACHE_BYTES
107                                .with_label_values(&[MANIFEST_TYPE])
108                                .sub(value.file_size.into());
109                        }
110                        Err(e) => {
111                            warn!(e; "Failed to delete cached manifest file {}", file_path);
112                        }
113                    }
114                }
115                .boxed()
116            });
117        if let Some(ttl) = ttl {
118            builder = builder.time_to_idle(ttl);
119        }
120        builder.build()
121    }
122
123    /// Puts a file into the cache index.
124    ///
125    /// The caller should ensure the file is in the correct path.
126    pub(crate) async fn put(&self, key: String, value: IndexValue) {
127        CACHE_BYTES
128            .with_label_values(&[MANIFEST_TYPE])
129            .add(value.file_size.into());
130        self.index.insert(key, value).await;
131
132        // Since files can be large items, we run the pending tasks immediately.
133        self.index.run_pending_tasks().await;
134    }
135
136    /// Gets the index value for the key.
137    pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
138        self.index.get(key).await
139    }
140
141    /// Removes a file from the cache explicitly.
142    pub(crate) async fn remove(&self, key: &str) {
143        let file_path = self.cache_file_path(key);
144        self.index.remove(key).await;
145        // Always deletes the file from the local store.
146        if let Err(e) = self.local_store.delete(&file_path).await {
147            warn!(e; "Failed to delete a cached manifest file {}", file_path);
148        }
149    }
150
151    /// Removes multiple files from the cache in batch.
152    pub(crate) async fn remove_batch(&self, keys: &[String]) {
153        if keys.is_empty() {
154            return;
155        }
156
157        for key in keys {
158            self.index.remove(key).await;
159        }
160
161        let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
162
163        if let Err(e) = self.local_store.delete_iter(file_paths).await {
164            warn!(e; "Failed to delete cached manifest files in batch");
165        }
166    }
167
168    async fn recover_inner(&self) -> Result<()> {
169        let now = Instant::now();
170        let mut lister = self
171            .local_store
172            .lister_with(MANIFEST_DIR)
173            .recursive(true)
174            .await
175            .context(OpenDalSnafu)?;
176        let (mut total_size, mut total_keys) = (0i64, 0);
177        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
178            let meta = entry.metadata();
179            if !meta.is_file() {
180                continue;
181            }
182
183            let meta = self
184                .local_store
185                .stat(entry.path())
186                .await
187                .context(OpenDalSnafu)?;
188            let file_size = meta.content_length() as u32;
189            let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
190            common_telemetry::debug!("Manifest cache recover {}, size: {}", key, file_size);
191            self.index.insert(key, IndexValue { file_size }).await;
192            let size = i64::from(file_size);
193            total_size += size;
194            total_keys += 1;
195        }
196        CACHE_BYTES
197            .with_label_values(&[MANIFEST_TYPE])
198            .add(total_size);
199
200        // Runs all pending tasks of the moka cache so that the cache size is updated
201        // and the eviction policy is applied.
202        self.index.run_pending_tasks().await;
203
204        let weight = self.index.weighted_size();
205        let count = self.index.entry_count();
206        info!(
207            "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
208            total_keys,
209            total_size,
210            count,
211            weight,
212            now.elapsed()
213        );
214        Ok(())
215    }
216
217    /// Recovers the index from local store.
218    pub(crate) async fn recover(&self, sync: bool) {
219        let moved_self = self.clone();
220        let handle = tokio::spawn(async move {
221            if let Err(err) = moved_self.recover_inner().await {
222                error!(err; "Failed to recover manifest cache.")
223            }
224
225            moved_self.clean_empty_dirs(true).await;
226        });
227
228        if sync {
229            let _ = handle.await;
230        }
231    }
232
233    /// Returns the cache file path for the key.
234    pub(crate) fn cache_file_path(&self, key: &str) -> String {
235        join_path(MANIFEST_DIR, key)
236    }
237
238    /// Gets a manifest file from cache.
239    /// Returns the file data if found in cache, None otherwise.
240    pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
241        if self.get(key).await.is_none() {
242            CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
243            return None;
244        }
245
246        let cache_file_path = self.cache_file_path(key);
247        match self.local_store.read(&cache_file_path).await {
248            Ok(data) => {
249                CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
250                Some(data.to_vec())
251            }
252            Err(e) => {
253                warn!(e; "Failed to read cached manifest file {}", cache_file_path);
254                CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
255                None
256            }
257        }
258    }
259
260    /// Puts a manifest file into cache.
261    pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
262        let cache_file_path = self.cache_file_path(&key);
263
264        if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
265            warn!(e; "Failed to write manifest to cache {}", cache_file_path);
266            return;
267        }
268
269        let file_size = data.len() as u32;
270        self.put(key, IndexValue { file_size }).await;
271    }
272
273    /// Removes empty directories recursively under the manifest cache directory.
274    ///
275    /// If `check_mtime` is true, only removes directories that have not been modified
276    /// for at least 1 hour.
277    pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
278        info!("Clean empty dirs start");
279
280        let root = self.local_store.info().root();
281        let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
282        let manifest_dir_clone = manifest_dir.clone();
283
284        let result = tokio::task::spawn_blocking(move || {
285            Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
286        })
287        .await;
288
289        match result {
290            Ok(Ok(())) => {
291                info!("Clean empty dirs end");
292            }
293            Ok(Err(e)) => {
294                warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
295            }
296            Err(e) => {
297                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
298            }
299        }
300    }
301
302    /// Removes all manifest files under the given directory from cache and cleans up empty directories.
303    pub(crate) async fn clean_manifests(&self, dir: &str) {
304        info!("Clean manifest cache for directory: {}", dir);
305
306        let cache_dir = join_path(MANIFEST_DIR, dir);
307        let mut lister = match self
308            .local_store
309            .lister_with(&cache_dir)
310            .recursive(true)
311            .await
312        {
313            Ok(lister) => lister,
314            Err(e) => {
315                warn!(e; "Failed to list manifest files under {}", cache_dir);
316                return;
317            }
318        };
319
320        let mut keys_to_remove = Vec::new();
321        loop {
322            match lister.try_next().await {
323                Ok(Some(entry)) => {
324                    let meta = entry.metadata();
325                    if meta.is_file() {
326                        keys_to_remove
327                            .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
328                    }
329                }
330                Ok(None) => break,
331                Err(e) => {
332                    warn!(e; "Failed to read entry while listing {}", cache_dir);
333                    break;
334                }
335            }
336        }
337
338        info!(
339            "Going to remove files from manifest cache, files: {:?}",
340            keys_to_remove
341        );
342
343        // Removes all files from cache in batch
344        self.remove_batch(&keys_to_remove).await;
345
346        // Cleans up empty directories under the given dir
347        let root = self.local_store.info().root();
348        let dir_path = PathBuf::from(root).join(&cache_dir);
349        let dir_path_clone = dir_path.clone();
350
351        let result = tokio::task::spawn_blocking(move || {
352            Self::clean_empty_dirs_sync(&dir_path_clone, false)
353        })
354        .await;
355
356        match result {
357            Ok(Ok(())) => {
358                info!("Cleaned manifest cache for directory: {}", dir);
359            }
360            Ok(Err(e)) => {
361                warn!(e; "Failed to clean empty directories under {}", dir_path.display());
362            }
363            Err(e) => {
364                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
365            }
366        }
367    }
368
369    /// Synchronously removes empty directories recursively.
370    ///
371    /// If `check_mtime` is true, only removes directories that have not been modified
372    /// for at least 1 hour.
373    fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
374        let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
375        if is_empty {
376            if let Err(e) = std::fs::remove_dir(dir) {
377                if e.kind() != std::io::ErrorKind::NotFound {
378                    warn!(e; "Failed to remove empty root dir {}", dir.display());
379                    return Err(e);
380                } else {
381                    warn!("Empty root dir not found before removal {}", dir.display());
382                }
383            } else {
384                info!(
385                    "Removed empty root dir {} from manifest cache",
386                    dir.display()
387                );
388            }
389        }
390        Ok(())
391    }
392
393    fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
394        common_telemetry::debug!(
395            "Maybe remove empty dir: {:?}, check_mtime: {}",
396            dir,
397            check_mtime
398        );
399        let entries = match std::fs::read_dir(dir) {
400            Ok(entries) => entries,
401            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
402                // Directory doesn't exist, treat as already removed (empty)
403                return Ok(true);
404            }
405            Err(e) => return Err(e),
406        };
407
408        let mut is_empty = true;
409        // Iterates all entries under the directory.
410        // We have to check all entries to clean up all empty subdirectories.
411        for entry in entries {
412            let entry = entry?;
413            let path = entry.path();
414            let metadata = std::fs::metadata(&path)?;
415
416            if metadata.is_dir() {
417                // Checks if we should skip this directory based on modification time
418                if check_mtime
419                    && let Ok(modified) = metadata.modified()
420                    && let Ok(elapsed) = modified.elapsed()
421                    && elapsed < Duration::from_secs(3600)
422                {
423                    common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
424                    // Only removes if not modified for at least 1 hour.
425                    is_empty = false;
426                    continue;
427                }
428
429                let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
430                if subdir_empty {
431                    if let Err(e) = std::fs::remove_dir(&path) {
432                        if e.kind() != std::io::ErrorKind::NotFound {
433                            warn!(e; "Failed to remove empty directory {}", path.display());
434                            is_empty = false;
435                        } else {
436                            info!(
437                                "Empty directory {} not found before removal",
438                                path.display()
439                            );
440                        }
441                    } else {
442                        info!(
443                            "Removed empty directory {} from manifest cache",
444                            path.display()
445                        );
446                    }
447                } else {
448                    is_empty = false;
449                }
450            } else {
451                is_empty = false;
452            }
453        }
454
455        Ok(is_empty)
456    }
457}
458
/// Metadata stored in the manifest cache index for each cached file.
///
/// Deliberately tiny: only what the cache needs for weighing and accounting.
#[derive(Clone, Debug)]
pub(crate) struct IndexValue {
    /// Length of the cached file, in bytes.
    pub(crate) file_size: u32,
}
467
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates an object store backed by the local file system rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_manifest_cache_basic() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;
        let key = "region_1/manifest/00000000000000000007.json";
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.get(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"manifest content".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(key.to_string(), IndexValue { file_size: 16 })
            .await;

        // Get the cached value.
        let value = cache.get(key).await.unwrap();
        assert_eq!(16, value.file_size);

        // The weight is the key length (43) plus the file size (16).
        cache.index.run_pending_tasks().await;
        assert_eq!(59, cache.index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        cache.index.run_pending_tasks().await;
        assert!(cache.get(key).await.is_none());

        // Ensure all pending tasks of the moka cache is done before assertion.
        cache.index.run_pending_tasks().await;

        // The file also not exists.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.index.weighted_size());
    }

    #[tokio::test]
    async fn test_manifest_cache_recover() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;

        // Write some manifest files with different paths
        let keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
            "region_1/manifest/00000000000000000001.checkpoint",
            "region_2/manifest/00000000000000000001.json",
        ];

        let mut total_size = 0;
        for (i, key) in keys.iter().enumerate() {
            let file_path = cache.cache_file_path(key);
            let content = format!("manifest-{}", i).into_bytes();
            local_store
                .write(&file_path, content.clone())
                .await
                .unwrap();

            // Add to the cache.
            cache
                .put(
                    key.to_string(),
                    IndexValue {
                        file_size: content.len() as u32,
                    },
                )
                .await;
            total_size += content.len() + key.len();
        }

        // Create a new cache instance which will automatically recover from local store
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;

        // Check size.
        cache.index.run_pending_tasks().await;
        let total_cached = cache.index.weighted_size() as usize;
        assert_eq!(total_size, total_cached);

        // Verify all files
        for (i, key) in keys.iter().enumerate() {
            let value = cache.get(key).await.unwrap();
            assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
        }
    }

    #[tokio::test]
    async fn test_cache_file_path() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None, true).await;

        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.json",
            cache.cache_file_path("region_1/manifest/00000000000000000007.json")
        );
        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
            cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
        );
    }

    #[tokio::test]
    async fn test_manifest_cache_put_get_file() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;
        let key = "region_1/manifest/00000000000000000003.json";
        let file_path = cache.cache_file_path(key);

        // Nothing is cached yet.
        assert!(cache.get_file(key).await.is_none());

        // Put the file and read it back.
        cache
            .put_file(key.to_string(), b"manifest content".to_vec())
            .await;
        assert!(local_store.exists(&file_path).await.unwrap());
        assert_eq!(16, cache.get(key).await.unwrap().file_size);
        assert_eq!(
            Some(b"manifest content".to_vec()),
            cache.get_file(key).await
        );

        // A file deleted behind the cache's back is reported as a miss.
        local_store.delete(&file_path).await.unwrap();
        assert!(cache.get_file(key).await.is_none());
    }

    #[tokio::test]
    async fn test_clean_manifests() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None, true).await;

        let removed_keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
        ];
        let kept_keys = [
            "region_10/manifest/00000000000000000001.json",
            "region_2/manifest/00000000000000000001.json",
        ];
        for key in removed_keys.iter().chain(kept_keys.iter()) {
            cache.put_file(key.to_string(), b"content".to_vec()).await;
        }

        cache.clean_manifests("region_1/").await;
        cache.index.run_pending_tasks().await;

        for key in removed_keys {
            assert!(cache.get(key).await.is_none());
            assert!(!local_store.exists(&cache.cache_file_path(key)).await.unwrap());
        }
        for key in kept_keys {
            assert!(cache.get(key).await.is_some());
            assert!(local_store.exists(&cache.cache_file_path(key)).await.unwrap());
        }

        // The now-empty directory of region_1 is removed as well.
        let manifest_root = dir.path().join(MANIFEST_DIR);
        assert!(!manifest_root.join("region_1").exists());
        assert!(manifest_root.join("region_2").exists());
    }

    #[test]
    fn test_clean_empty_dirs_sync_no_mtime_check() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let root = PathBuf::from(dir.path());

        // Create a directory structure:
        // root/
        //   empty_dir1/
        //   empty_dir2/
        //     empty_subdir/
        //   non_empty_dir/
        //     file.txt
        //   nested/
        //     empty_subdir1/
        //     non_empty_subdir/
        //       file.txt

        let empty_dir1 = root.join("empty_dir1");
        let empty_dir2 = root.join("empty_dir2");
        let empty_subdir = empty_dir2.join("empty_subdir");
        let non_empty_dir = root.join("non_empty_dir");
        let nested = root.join("nested");
        let nested_empty = nested.join("empty_subdir1");
        let nested_non_empty = nested.join("non_empty_subdir");

        // Create directories
        std::fs::create_dir_all(&empty_dir1).unwrap();
        std::fs::create_dir_all(&empty_subdir).unwrap();
        std::fs::create_dir_all(&non_empty_dir).unwrap();
        std::fs::create_dir_all(&nested_empty).unwrap();
        std::fs::create_dir_all(&nested_non_empty).unwrap();

        // Create files in non-empty directories
        std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
        std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();

        // Verify initial state
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());
        assert!(non_empty_dir.exists());
        assert!(nested.exists());
        assert!(nested_empty.exists());
        assert!(nested_non_empty.exists());

        // Clean empty directories with check_mtime = false
        ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();

        // Verify empty directories are removed
        assert!(!empty_dir1.exists());
        assert!(!empty_dir2.exists());
        assert!(!empty_subdir.exists());
        assert!(!nested_empty.exists());

        // Verify non-empty directories still exist
        assert!(non_empty_dir.exists());
        assert!(non_empty_dir.join("file.txt").exists());
        assert!(nested.exists());
        assert!(nested_non_empty.exists());
        assert!(nested_non_empty.join("file.txt").exists());
    }

    #[test]
    fn test_clean_empty_dirs_sync_with_mtime_check() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let root = PathBuf::from(dir.path());

        // Create a directory structure with recently created empty directories
        // root/
        //   empty_dir1/
        //   empty_dir2/
        //     empty_subdir/
        //   non_empty_dir/
        //     file.txt

        let empty_dir1 = root.join("empty_dir1");
        let empty_dir2 = root.join("empty_dir2");
        let empty_subdir = empty_dir2.join("empty_subdir");
        let non_empty_dir = root.join("non_empty_dir");

        // Create directories
        std::fs::create_dir_all(&empty_dir1).unwrap();
        std::fs::create_dir_all(&empty_subdir).unwrap();
        std::fs::create_dir_all(&non_empty_dir).unwrap();

        // Create file in non-empty directory
        std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();

        // Verify initial state
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());
        assert!(non_empty_dir.exists());

        // Clean empty directories with check_mtime = true
        // Since the directories were just created (mtime < 1 hour), they should NOT be removed
        ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();

        // Verify empty directories are NOT removed (they're too recent)
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());

        // Verify non-empty directory still exists
        assert!(non_empty_dir.exists());
        assert!(non_empty_dir.join("file.txt").exists());
    }
}