mito2/cache/
manifest_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for manifest files.
16
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Subdirectory of cached manifest files.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
///
/// Cache keys are paths relative to this directory: a key's file lives at
/// `join_path(MANIFEST_DIR, key)`. The trailing `/` denotes a directory path in
/// the object store's path convention.
const MANIFEST_DIR: &str = "cache/object/manifest/";

/// Metric label for manifest files.
///
/// Used as the label value of the `CACHE_BYTES`, `CACHE_HIT` and `CACHE_MISS`
/// metrics updated by this cache.
const MANIFEST_TYPE: &str = "manifest";
41
/// A manifest cache manages manifest files on local store and evicts files based
/// on size.
///
/// Files are stored under `MANIFEST_DIR` inside `local_store`, while `index`
/// records which keys are cached and how large each file is. Cloning is cheap
/// and clones share the same index; background recovery relies on this, since
/// it populates the index through a clone.
#[derive(Debug, Clone)]
pub struct ManifestCache {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached manifest files, keyed by the file path relative to
    /// `MANIFEST_DIR`. When an entry leaves the index for any reason other than
    /// being replaced, its file is deleted from `local_store`.
    index: Cache<String, IndexValue>,
}
51
52impl ManifestCache {
53    /// Creates a new manifest cache and recovers the index from local store.
54    pub async fn new(
55        local_store: ObjectStore,
56        capacity: ReadableSize,
57        ttl: Option<Duration>,
58    ) -> ManifestCache {
59        let total_capacity = capacity.as_bytes();
60
61        info!(
62            "Initializing manifest cache with capacity: {}",
63            ReadableSize(total_capacity)
64        );
65
66        let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
67
68        let cache = ManifestCache { local_store, index };
69
70        // Recovers the cache index from local store asynchronously
71        cache.recover(false).await;
72
73        cache
74    }
75
76    /// Builds the cache.
77    fn build_cache(
78        local_store: ObjectStore,
79        capacity: u64,
80        ttl: Option<Duration>,
81    ) -> Cache<String, IndexValue> {
82        let cache_store = local_store;
83        let mut builder = Cache::builder()
84            .eviction_policy(EvictionPolicy::lru())
85            .weigher(|key: &String, value: &IndexValue| -> u32 {
86                key.len() as u32 + value.file_size
87            })
88            .max_capacity(capacity)
89            .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
90                let store = cache_store.clone();
91                // Stores files under MANIFEST_DIR.
92                let file_path = join_path(MANIFEST_DIR, &key);
93                async move {
94                    if let RemovalCause::Replaced = cause {
95                        // The cache is replaced by another file. We don't remove the same
96                        // file but updates the metrics as the file is already replaced by users.
97                        CACHE_BYTES
98                            .with_label_values(&[MANIFEST_TYPE])
99                            .sub(value.file_size.into());
100                        return;
101                    }
102
103                    match store.delete(&file_path).await {
104                        Ok(()) => {
105                            CACHE_BYTES
106                                .with_label_values(&[MANIFEST_TYPE])
107                                .sub(value.file_size.into());
108                        }
109                        Err(e) => {
110                            warn!(e; "Failed to delete cached manifest file {}", file_path);
111                        }
112                    }
113                }
114                .boxed()
115            });
116        if let Some(ttl) = ttl {
117            builder = builder.time_to_idle(ttl);
118        }
119        builder.build()
120    }
121
122    /// Puts a file into the cache index.
123    ///
124    /// The caller should ensure the file is in the correct path.
125    pub(crate) async fn put(&self, key: String, value: IndexValue) {
126        CACHE_BYTES
127            .with_label_values(&[MANIFEST_TYPE])
128            .add(value.file_size.into());
129        self.index.insert(key, value).await;
130
131        // Since files can be large items, we run the pending tasks immediately.
132        self.index.run_pending_tasks().await;
133    }
134
135    /// Gets the index value for the key.
136    pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
137        self.index.get(key).await
138    }
139
140    /// Removes a file from the cache explicitly.
141    pub(crate) async fn remove(&self, key: &str) {
142        let file_path = self.cache_file_path(key);
143        self.index.remove(key).await;
144        // Always deletes the file from the local store.
145        if let Err(e) = self.local_store.delete(&file_path).await {
146            warn!(e; "Failed to delete a cached manifest file {}", file_path);
147        }
148    }
149
150    /// Removes multiple files from the cache in batch.
151    pub(crate) async fn remove_batch(&self, keys: &[String]) {
152        if keys.is_empty() {
153            return;
154        }
155
156        for key in keys {
157            self.index.remove(key).await;
158        }
159
160        let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
161
162        if let Err(e) = self.local_store.delete_iter(file_paths).await {
163            warn!(e; "Failed to delete cached manifest files in batch");
164        }
165    }
166
167    async fn recover_inner(&self) -> Result<()> {
168        let now = Instant::now();
169        let mut lister = self
170            .local_store
171            .lister_with(MANIFEST_DIR)
172            .recursive(true)
173            .await
174            .context(OpenDalSnafu)?;
175        let (mut total_size, mut total_keys) = (0i64, 0);
176        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
177            let meta = entry.metadata();
178            if !meta.is_file() {
179                continue;
180            }
181
182            let meta = self
183                .local_store
184                .stat(entry.path())
185                .await
186                .context(OpenDalSnafu)?;
187            let file_size = meta.content_length() as u32;
188            let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
189            common_telemetry::info!("Manifest cache recover {}, size: {}", key, file_size);
190            self.index.insert(key, IndexValue { file_size }).await;
191            let size = i64::from(file_size);
192            total_size += size;
193            total_keys += 1;
194        }
195        CACHE_BYTES
196            .with_label_values(&[MANIFEST_TYPE])
197            .add(total_size);
198
199        // Runs all pending tasks of the moka cache so that the cache size is updated
200        // and the eviction policy is applied.
201        self.index.run_pending_tasks().await;
202
203        let weight = self.index.weighted_size();
204        let count = self.index.entry_count();
205        info!(
206            "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
207            total_keys,
208            total_size,
209            count,
210            weight,
211            now.elapsed()
212        );
213        Ok(())
214    }
215
216    /// Recovers the index from local store.
217    pub(crate) async fn recover(&self, sync: bool) {
218        let moved_self = self.clone();
219        let handle = tokio::spawn(async move {
220            if let Err(err) = moved_self.recover_inner().await {
221                error!(err; "Failed to recover manifest cache.")
222            }
223
224            moved_self.clean_empty_dirs(true).await;
225        });
226
227        if sync {
228            let _ = handle.await;
229        }
230    }
231
232    /// Returns the cache file path for the key.
233    pub(crate) fn cache_file_path(&self, key: &str) -> String {
234        join_path(MANIFEST_DIR, key)
235    }
236
237    /// Gets a manifest file from cache.
238    /// Returns the file data if found in cache, None otherwise.
239    pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
240        if self.get(key).await.is_none() {
241            CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
242            return None;
243        }
244
245        let cache_file_path = self.cache_file_path(key);
246        match self.local_store.read(&cache_file_path).await {
247            Ok(data) => {
248                CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
249                Some(data.to_vec())
250            }
251            Err(e) => {
252                warn!(e; "Failed to read cached manifest file {}", cache_file_path);
253                CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
254                None
255            }
256        }
257    }
258
259    /// Puts a manifest file into cache.
260    pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
261        let cache_file_path = self.cache_file_path(&key);
262
263        if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
264            warn!(e; "Failed to write manifest to cache {}", cache_file_path);
265            return;
266        }
267
268        let file_size = data.len() as u32;
269        self.put(key, IndexValue { file_size }).await;
270    }
271
272    /// Removes empty directories recursively under the manifest cache directory.
273    ///
274    /// If `check_mtime` is true, only removes directories that have not been modified
275    /// for at least 1 hour.
276    pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
277        info!("Clean empty dirs start");
278
279        let root = self.local_store.info().root();
280        let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
281        let manifest_dir_clone = manifest_dir.clone();
282
283        let result = tokio::task::spawn_blocking(move || {
284            Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
285        })
286        .await;
287
288        match result {
289            Ok(Ok(())) => {
290                info!("Clean empty dirs end");
291            }
292            Ok(Err(e)) => {
293                warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
294            }
295            Err(e) => {
296                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
297            }
298        }
299    }
300
301    /// Removes all manifest files under the given directory from cache and cleans up empty directories.
302    pub(crate) async fn clean_manifests(&self, dir: &str) {
303        info!("Clean manifest cache for directory: {}", dir);
304
305        let cache_dir = join_path(MANIFEST_DIR, dir);
306        let mut lister = match self
307            .local_store
308            .lister_with(&cache_dir)
309            .recursive(true)
310            .await
311        {
312            Ok(lister) => lister,
313            Err(e) => {
314                warn!(e; "Failed to list manifest files under {}", cache_dir);
315                return;
316            }
317        };
318
319        let mut keys_to_remove = Vec::new();
320        loop {
321            match lister.try_next().await {
322                Ok(Some(entry)) => {
323                    let meta = entry.metadata();
324                    if meta.is_file() {
325                        keys_to_remove
326                            .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
327                    }
328                }
329                Ok(None) => break,
330                Err(e) => {
331                    warn!(e; "Failed to read entry while listing {}", cache_dir);
332                    break;
333                }
334            }
335        }
336
337        info!(
338            "Going to remove files from manifest cache, files: {:?}",
339            keys_to_remove
340        );
341
342        // Removes all files from cache in batch
343        self.remove_batch(&keys_to_remove).await;
344
345        // Cleans up empty directories under the given dir
346        let root = self.local_store.info().root();
347        let dir_path = PathBuf::from(root).join(&cache_dir);
348        let dir_path_clone = dir_path.clone();
349
350        let result = tokio::task::spawn_blocking(move || {
351            Self::clean_empty_dirs_sync(&dir_path_clone, false)
352        })
353        .await;
354
355        match result {
356            Ok(Ok(())) => {
357                info!("Cleaned manifest cache for directory: {}", dir);
358            }
359            Ok(Err(e)) => {
360                warn!(e; "Failed to clean empty directories under {}", dir_path.display());
361            }
362            Err(e) => {
363                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
364            }
365        }
366    }
367
368    /// Synchronously removes empty directories recursively.
369    ///
370    /// If `check_mtime` is true, only removes directories that have not been modified
371    /// for at least 1 hour.
372    fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
373        let is_empty = Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
374        if is_empty {
375            if let Err(e) = std::fs::remove_dir(dir) {
376                if e.kind() != std::io::ErrorKind::NotFound {
377                    warn!(e; "Failed to remove empty root dir {}", dir.display());
378                    return Err(e);
379                } else {
380                    warn!("Empty root dir not found before removal {}", dir.display());
381                }
382            } else {
383                info!(
384                    "Removed empty root dir {} from manifest cache",
385                    dir.display()
386                );
387            }
388        }
389        Ok(())
390    }
391
392    fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
393        common_telemetry::debug!(
394            "Maybe remove empty dir: {:?}, check_mtime: {}",
395            dir,
396            check_mtime
397        );
398        let entries = match std::fs::read_dir(dir) {
399            Ok(entries) => entries,
400            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
401                // Directory doesn't exist, treat as already removed (empty)
402                return Ok(true);
403            }
404            Err(e) => return Err(e),
405        };
406
407        let mut is_empty = true;
408        // Iterates all entries under the directory.
409        // We have to check all entries to clean up all empty subdirectories.
410        for entry in entries {
411            let entry = entry?;
412            let path = entry.path();
413            let metadata = std::fs::metadata(&path)?;
414
415            if metadata.is_dir() {
416                // Checks if we should skip this directory based on modification time
417                if check_mtime
418                    && let Ok(modified) = metadata.modified()
419                    && let Ok(elapsed) = modified.elapsed()
420                    && elapsed < Duration::from_secs(3600)
421                {
422                    common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
423                    // Only removes if not modified for at least 1 hour.
424                    is_empty = false;
425                    continue;
426                }
427
428                let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
429                if subdir_empty {
430                    if let Err(e) = std::fs::remove_dir(&path) {
431                        if e.kind() != std::io::ErrorKind::NotFound {
432                            warn!(e; "Failed to remove empty directory {}", path.display());
433                            is_empty = false;
434                        } else {
435                            info!(
436                                "Empty directory {} not found before removal",
437                                path.display()
438                            );
439                        }
440                    } else {
441                        info!(
442                            "Removed empty directory {} from manifest cache",
443                            path.display()
444                        );
445                    }
446                } else {
447                    is_empty = false;
448                }
449            } else {
450                is_empty = false;
451            }
452        }
453
454        Ok(is_empty)
455    }
456}
457
/// An entity that describes the file in the manifest cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    ///
    /// Counts toward the entry's weight in the cache index and toward the
    /// `CACHE_BYTES` metric. Stored as `u32`, so sizes above `u32::MAX` bytes
    /// cannot be represented exactly.
    pub(crate) file_size: u32,
}
466
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates a filesystem-backed object store rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_manifest_cache_basic() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;
        let key = "region_1/manifest/00000000000000000007.json";
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.get(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"manifest content".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(key.to_string(), IndexValue { file_size: 16 })
            .await;

        // Get the cached value.
        let value = cache.get(key).await.unwrap();
        assert_eq!(16, value.file_size);

        // Weighted size is key length (43) plus file size (16).
        cache.index.run_pending_tasks().await;
        assert_eq!(59, cache.index.weighted_size());

        // Remove the file and let moka finish its pending work before asserting.
        cache.remove(key).await;
        cache.index.run_pending_tasks().await;
        assert!(cache.get(key).await.is_none());

        // The file also not exists.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.index.weighted_size());
    }

    #[tokio::test]
    async fn test_manifest_cache_put_get_file() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        // Build the cache directly instead of via `new`, so no background recovery
        // task races with the writes below.
        let cache = ManifestCache {
            local_store: local_store.clone(),
            index: ManifestCache::build_cache(
                local_store.clone(),
                ReadableSize::mb(10).as_bytes(),
                None,
            ),
        };
        let key = "region_1/manifest/00000000000000000003.json";
        let content = b"manifest content".to_vec();

        // Nothing is cached yet.
        assert!(cache.get_file(key).await.is_none());

        // `put_file` writes the file and indexes it with its size.
        cache.put_file(key.to_string(), content.clone()).await;
        assert_eq!(Some(content.clone()), cache.get_file(key).await);
        assert_eq!(content.len() as u32, cache.get(key).await.unwrap().file_size);

        // Removing the key drops both the index entry and the file.
        cache.remove(key).await;
        assert!(cache.get_file(key).await.is_none());
        assert!(
            !local_store
                .exists(&cache.cache_file_path(key))
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_manifest_cache_recover() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Write some manifest files with different paths
        let keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
            "region_1/manifest/00000000000000000001.checkpoint",
            "region_2/manifest/00000000000000000001.json",
        ];

        let mut total_size = 0;
        for (i, key) in keys.iter().enumerate() {
            let file_path = cache.cache_file_path(key);
            let content = format!("manifest-{}", i).into_bytes();
            local_store
                .write(&file_path, content.clone())
                .await
                .unwrap();

            // Add to the cache.
            cache
                .put(
                    key.to_string(),
                    IndexValue {
                        file_size: content.len() as u32,
                    },
                )
                .await;
            total_size += content.len() + key.len();
        }

        // Create a new cache instance; `new` starts recovery in the background.
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Run recovery again and wait for it, so the index is fully populated
        // before the assertions below.
        cache.recover(true).await;

        // Check size.
        cache.index.run_pending_tasks().await;
        let total_cached = cache.index.weighted_size() as usize;
        assert_eq!(total_size, total_cached);

        // Verify all files
        for (i, key) in keys.iter().enumerate() {
            let value = cache.get(key).await.unwrap();
            assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
        }
    }

    #[tokio::test]
    async fn test_cache_file_path() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = ManifestCache::new(local_store, ReadableSize::mb(10), None).await;

        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.json",
            cache.cache_file_path("region_1/manifest/00000000000000000007.json")
        );
        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
            cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
        );
    }

    #[tokio::test]
    async fn test_clean_empty_dirs_sync_no_mtime_check() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let root = PathBuf::from(dir.path());

        // Create a directory structure:
        // root/
        //   empty_dir1/
        //   empty_dir2/
        //     empty_subdir/
        //   non_empty_dir/
        //     file.txt
        //   nested/
        //     empty_subdir1/
        //     non_empty_subdir/
        //       file.txt

        let empty_dir1 = root.join("empty_dir1");
        let empty_dir2 = root.join("empty_dir2");
        let empty_subdir = empty_dir2.join("empty_subdir");
        let non_empty_dir = root.join("non_empty_dir");
        let nested = root.join("nested");
        let nested_empty = nested.join("empty_subdir1");
        let nested_non_empty = nested.join("non_empty_subdir");

        // Create directories
        std::fs::create_dir_all(&empty_dir1).unwrap();
        std::fs::create_dir_all(&empty_subdir).unwrap();
        std::fs::create_dir_all(&non_empty_dir).unwrap();
        std::fs::create_dir_all(&nested_empty).unwrap();
        std::fs::create_dir_all(&nested_non_empty).unwrap();

        // Create files in non-empty directories
        std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();
        std::fs::write(nested_non_empty.join("file.txt"), b"content").unwrap();

        // Verify initial state
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());
        assert!(non_empty_dir.exists());
        assert!(nested.exists());
        assert!(nested_empty.exists());
        assert!(nested_non_empty.exists());

        // Clean empty directories with check_mtime = false
        ManifestCache::clean_empty_dirs_sync(&root, false).unwrap();

        // Verify empty directories are removed
        assert!(!empty_dir1.exists());
        assert!(!empty_dir2.exists());
        assert!(!empty_subdir.exists());
        assert!(!nested_empty.exists());

        // Verify non-empty directories still exist
        assert!(non_empty_dir.exists());
        assert!(non_empty_dir.join("file.txt").exists());
        assert!(nested.exists());
        assert!(nested_non_empty.exists());
        assert!(nested_non_empty.join("file.txt").exists());
    }

    #[tokio::test]
    async fn test_clean_empty_dirs_sync_with_mtime_check() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let root = PathBuf::from(dir.path());

        // Create a directory structure with recently created empty directories
        // root/
        //   empty_dir1/
        //   empty_dir2/
        //     empty_subdir/
        //   non_empty_dir/
        //     file.txt

        let empty_dir1 = root.join("empty_dir1");
        let empty_dir2 = root.join("empty_dir2");
        let empty_subdir = empty_dir2.join("empty_subdir");
        let non_empty_dir = root.join("non_empty_dir");

        // Create directories
        std::fs::create_dir_all(&empty_dir1).unwrap();
        std::fs::create_dir_all(&empty_subdir).unwrap();
        std::fs::create_dir_all(&non_empty_dir).unwrap();

        // Create file in non-empty directory
        std::fs::write(non_empty_dir.join("file.txt"), b"content").unwrap();

        // Verify initial state
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());
        assert!(non_empty_dir.exists());

        // Clean empty directories with check_mtime = true
        // Since the directories were just created (mtime < 1 hour), they should NOT be removed
        ManifestCache::clean_empty_dirs_sync(&root, true).unwrap();

        // Verify empty directories are NOT removed (they're too recent)
        assert!(empty_dir1.exists());
        assert!(empty_dir2.exists());
        assert!(empty_subdir.exists());

        // Verify non-empty directory still exists
        assert!(non_empty_dir.exists());
        assert!(non_empty_dir.join("file.txt").exists());
    }
}