mito2/cache/
manifest_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for manifest files.
16
17use std::path::PathBuf;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::readable_size::ReadableSize;
22use common_telemetry::{error, info, warn};
23use futures::{FutureExt, TryStreamExt};
24use moka::future::Cache;
25use moka::notification::RemovalCause;
26use moka::policy::EvictionPolicy;
27use object_store::ObjectStore;
28use object_store::util::join_path;
29use snafu::ResultExt;
30
31use crate::error::{OpenDalSnafu, Result};
32use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
33
/// Label value used for every manifest-cache metric (`CACHE_BYTES`, `CACHE_HIT`, `CACHE_MISS`).
const MANIFEST_TYPE: &str = "manifest";

/// Directory, relative to the local store root, under which manifest files are cached.
///
/// The path must consist of three layers to line up with
/// [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const MANIFEST_DIR: &str = "cache/object/manifest/";
41
42/// A manifest cache manages manifest files on local store and evicts files based
43/// on size.
44#[derive(Debug, Clone)]
45pub struct ManifestCache {
46    /// Local store to cache files.
47    local_store: ObjectStore,
48    /// Index to track cached manifest files.
49    index: Cache<String, IndexValue>,
50}
51
52impl ManifestCache {
53    /// Creates a new manifest cache and recovers the index from local store.
54    pub async fn new(
55        local_store: ObjectStore,
56        capacity: ReadableSize,
57        ttl: Option<Duration>,
58    ) -> ManifestCache {
59        let total_capacity = capacity.as_bytes();
60
61        info!(
62            "Initializing manifest cache with capacity: {}",
63            ReadableSize(total_capacity)
64        );
65
66        let index = Self::build_cache(local_store.clone(), total_capacity, ttl);
67
68        let cache = ManifestCache { local_store, index };
69
70        // Recovers the cache index from local store asynchronously
71        cache.recover(false).await;
72
73        cache
74    }
75
76    /// Builds the cache.
77    fn build_cache(
78        local_store: ObjectStore,
79        capacity: u64,
80        ttl: Option<Duration>,
81    ) -> Cache<String, IndexValue> {
82        let cache_store = local_store;
83        let mut builder = Cache::builder()
84            .eviction_policy(EvictionPolicy::lru())
85            .weigher(|key: &String, value: &IndexValue| -> u32 {
86                key.len() as u32 + value.file_size
87            })
88            .max_capacity(capacity)
89            .async_eviction_listener(move |key: Arc<String>, value: IndexValue, cause| {
90                let store = cache_store.clone();
91                // Stores files under MANIFEST_DIR.
92                let file_path = join_path(MANIFEST_DIR, &key);
93                async move {
94                    if let RemovalCause::Replaced = cause {
95                        // The cache is replaced by another file. We don't remove the same
96                        // file but updates the metrics as the file is already replaced by users.
97                        CACHE_BYTES
98                            .with_label_values(&[MANIFEST_TYPE])
99                            .sub(value.file_size.into());
100                        return;
101                    }
102
103                    match store.delete(&file_path).await {
104                        Ok(()) => {
105                            CACHE_BYTES
106                                .with_label_values(&[MANIFEST_TYPE])
107                                .sub(value.file_size.into());
108                        }
109                        Err(e) => {
110                            warn!(e; "Failed to delete cached manifest file {}", file_path);
111                        }
112                    }
113                }
114                .boxed()
115            });
116        if let Some(ttl) = ttl {
117            builder = builder.time_to_idle(ttl);
118        }
119        builder.build()
120    }
121
122    /// Puts a file into the cache index.
123    ///
124    /// The caller should ensure the file is in the correct path.
125    pub(crate) async fn put(&self, key: String, value: IndexValue) {
126        CACHE_BYTES
127            .with_label_values(&[MANIFEST_TYPE])
128            .add(value.file_size.into());
129        self.index.insert(key, value).await;
130
131        // Since files can be large items, we run the pending tasks immediately.
132        self.index.run_pending_tasks().await;
133    }
134
135    /// Gets the index value for the key.
136    pub(crate) async fn get(&self, key: &str) -> Option<IndexValue> {
137        self.index.get(key).await
138    }
139
140    /// Removes a file from the cache explicitly.
141    pub(crate) async fn remove(&self, key: &str) {
142        let file_path = self.cache_file_path(key);
143        self.index.remove(key).await;
144        // Always deletes the file from the local store.
145        if let Err(e) = self.local_store.delete(&file_path).await {
146            warn!(e; "Failed to delete a cached manifest file {}", file_path);
147        }
148    }
149
150    /// Removes multiple files from the cache in batch.
151    pub(crate) async fn remove_batch(&self, keys: &[String]) {
152        if keys.is_empty() {
153            return;
154        }
155
156        for key in keys {
157            self.index.remove(key).await;
158        }
159
160        let file_paths: Vec<String> = keys.iter().map(|key| self.cache_file_path(key)).collect();
161
162        if let Err(e) = self.local_store.delete_iter(file_paths).await {
163            warn!(e; "Failed to delete cached manifest files in batch");
164        }
165    }
166
167    async fn recover_inner(&self) -> Result<()> {
168        let now = Instant::now();
169        let mut lister = self
170            .local_store
171            .lister_with(MANIFEST_DIR)
172            .recursive(true)
173            .await
174            .context(OpenDalSnafu)?;
175        let (mut total_size, mut total_keys) = (0i64, 0);
176        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
177            let meta = entry.metadata();
178            if !meta.is_file() {
179                continue;
180            }
181
182            let meta = self
183                .local_store
184                .stat(entry.path())
185                .await
186                .context(OpenDalSnafu)?;
187            let file_size = meta.content_length() as u32;
188            let key = entry.path().trim_start_matches(MANIFEST_DIR).to_string();
189            common_telemetry::info!("Manifest cache recover {}, size: {}", key, file_size);
190            self.index.insert(key, IndexValue { file_size }).await;
191            let size = i64::from(file_size);
192            total_size += size;
193            total_keys += 1;
194        }
195        CACHE_BYTES
196            .with_label_values(&[MANIFEST_TYPE])
197            .add(total_size);
198
199        // Runs all pending tasks of the moka cache so that the cache size is updated
200        // and the eviction policy is applied.
201        self.index.run_pending_tasks().await;
202
203        let weight = self.index.weighted_size();
204        let count = self.index.entry_count();
205        info!(
206            "Recovered manifest cache, num_keys: {}, num_bytes: {}, count: {}, weight: {}, cost: {:?}",
207            total_keys,
208            total_size,
209            count,
210            weight,
211            now.elapsed()
212        );
213        Ok(())
214    }
215
216    /// Recovers the index from local store.
217    pub(crate) async fn recover(&self, sync: bool) {
218        let moved_self = self.clone();
219        let handle = tokio::spawn(async move {
220            if let Err(err) = moved_self.recover_inner().await {
221                error!(err; "Failed to recover manifest cache.")
222            }
223
224            moved_self.clean_empty_dirs(true).await;
225        });
226
227        if sync {
228            let _ = handle.await;
229        }
230    }
231
232    /// Returns the cache file path for the key.
233    pub(crate) fn cache_file_path(&self, key: &str) -> String {
234        join_path(MANIFEST_DIR, key)
235    }
236
237    /// Gets a manifest file from cache.
238    /// Returns the file data if found in cache, None otherwise.
239    pub(crate) async fn get_file(&self, key: &str) -> Option<Vec<u8>> {
240        if self.get(key).await.is_none() {
241            CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
242            return None;
243        }
244
245        let cache_file_path = self.cache_file_path(key);
246        match self.local_store.read(&cache_file_path).await {
247            Ok(data) => {
248                CACHE_HIT.with_label_values(&[MANIFEST_TYPE]).inc();
249                Some(data.to_vec())
250            }
251            Err(e) => {
252                warn!(e; "Failed to read cached manifest file {}", cache_file_path);
253                CACHE_MISS.with_label_values(&[MANIFEST_TYPE]).inc();
254                None
255            }
256        }
257    }
258
259    /// Puts a manifest file into cache.
260    pub(crate) async fn put_file(&self, key: String, data: Vec<u8>) {
261        let cache_file_path = self.cache_file_path(&key);
262
263        if let Err(e) = self.local_store.write(&cache_file_path, data.clone()).await {
264            warn!(e; "Failed to write manifest to cache {}", cache_file_path);
265            return;
266        }
267
268        let file_size = data.len() as u32;
269        self.put(key, IndexValue { file_size }).await;
270    }
271
272    /// Removes empty directories recursively under the manifest cache directory.
273    ///
274    /// If `check_mtime` is true, only removes directories that have not been modified
275    /// for at least 1 hour.
276    pub(crate) async fn clean_empty_dirs(&self, check_mtime: bool) {
277        info!("Clean empty dirs start");
278
279        let root = self.local_store.info().root();
280        let manifest_dir = PathBuf::from(root).join(MANIFEST_DIR);
281        let manifest_dir_clone = manifest_dir.clone();
282
283        let result = tokio::task::spawn_blocking(move || {
284            Self::clean_empty_dirs_sync(&manifest_dir_clone, check_mtime)
285        })
286        .await;
287
288        match result {
289            Ok(Ok(())) => {
290                info!("Clean empty dirs end");
291            }
292            Ok(Err(e)) => {
293                warn!(e; "Failed to clean empty directories under {}", manifest_dir.display());
294            }
295            Err(e) => {
296                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
297            }
298        }
299    }
300
301    /// Removes all manifest files under the given directory from cache and cleans up empty directories.
302    pub(crate) async fn clean_manifests(&self, dir: &str) {
303        info!("Clean manifest cache for directory: {}", dir);
304
305        let cache_dir = join_path(MANIFEST_DIR, dir);
306        let mut lister = match self
307            .local_store
308            .lister_with(&cache_dir)
309            .recursive(true)
310            .await
311        {
312            Ok(lister) => lister,
313            Err(e) => {
314                warn!(e; "Failed to list manifest files under {}", cache_dir);
315                return;
316            }
317        };
318
319        let mut keys_to_remove = Vec::new();
320        loop {
321            match lister.try_next().await {
322                Ok(Some(entry)) => {
323                    let meta = entry.metadata();
324                    if meta.is_file() {
325                        keys_to_remove
326                            .push(entry.path().trim_start_matches(MANIFEST_DIR).to_string());
327                    }
328                }
329                Ok(None) => break,
330                Err(e) => {
331                    warn!(e; "Failed to read entry while listing {}", cache_dir);
332                    break;
333                }
334            }
335        }
336
337        info!(
338            "Going to remove files from manifest cache, files: {:?}",
339            keys_to_remove
340        );
341
342        // Removes all files from cache in batch
343        self.remove_batch(&keys_to_remove).await;
344
345        // Cleans up empty directories under the given dir
346        let root = self.local_store.info().root();
347        let dir_path = PathBuf::from(root).join(&cache_dir);
348        let dir_path_clone = dir_path.clone();
349
350        let result = tokio::task::spawn_blocking(move || {
351            Self::clean_empty_dirs_sync(&dir_path_clone, false)
352        })
353        .await;
354
355        match result {
356            Ok(Ok(())) => {
357                info!("Cleaned manifest cache for directory: {}", dir);
358            }
359            Ok(Err(e)) => {
360                warn!(e; "Failed to clean empty directories under {}", dir_path.display());
361            }
362            Err(e) => {
363                warn!(e; "Failed to spawn blocking task for cleaning empty directories");
364            }
365        }
366    }
367
368    /// Synchronously removes empty directories recursively.
369    ///
370    /// If `check_mtime` is true, only removes directories that have not been modified
371    /// for at least 1 hour.
372    fn clean_empty_dirs_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<()> {
373        Self::remove_empty_dirs_recursive_sync(dir, check_mtime)?;
374        Ok(())
375    }
376
377    fn remove_empty_dirs_recursive_sync(dir: &PathBuf, check_mtime: bool) -> std::io::Result<bool> {
378        common_telemetry::debug!(
379            "Maybe remove empty dir: {:?}, check_mtime: {}",
380            dir,
381            check_mtime
382        );
383        let entries = match std::fs::read_dir(dir) {
384            Ok(entries) => entries,
385            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
386                // Directory doesn't exist, treat as already removed (empty)
387                return Ok(true);
388            }
389            Err(e) => return Err(e),
390        };
391
392        let mut is_empty = true;
393        // Iterates all entries under the directory.
394        // We have to check all entries to clean up all empty subdirectories.
395        for entry in entries {
396            let entry = entry?;
397            let path = entry.path();
398            let metadata = std::fs::metadata(&path)?;
399
400            if metadata.is_dir() {
401                // Checks if we should skip this directory based on modification time
402                if check_mtime
403                    && let Ok(modified) = metadata.modified()
404                    && let Ok(elapsed) = modified.elapsed()
405                    && elapsed < Duration::from_secs(3600)
406                {
407                    common_telemetry::debug!("Skip directory by mtime, elapsed: {:?}", elapsed);
408                    // Only removes if not modified for at least 1 hour.
409                    is_empty = false;
410                    continue;
411                }
412
413                let subdir_empty = Self::remove_empty_dirs_recursive_sync(&path, check_mtime)?;
414                if subdir_empty {
415                    if let Err(e) = std::fs::remove_dir(&path)
416                        && e.kind() != std::io::ErrorKind::NotFound
417                    {
418                        warn!(e; "Failed to remove empty directory {}", path.display());
419                        is_empty = false;
420                    } else {
421                        info!(
422                            "Removed empty directory {} from manifest cache",
423                            path.display()
424                        );
425                    }
426                } else {
427                    is_empty = false;
428                }
429            } else {
430                is_empty = false;
431            }
432        }
433
434        Ok(is_empty)
435    }
436}
437
/// Per-file bookkeeping stored in the manifest cache index.
///
/// Deliberately minimal: the cache only needs what it uses to weigh an entry.
#[derive(Clone, Debug)]
pub(crate) struct IndexValue {
    /// Size of the cached file, in bytes.
    pub(crate) file_size: u32,
}
446
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    /// Builds a 10 MiB cache over `local_store` without spawning the background
    /// recovery task that `ManifestCache::new` starts. That task may otherwise re-insert
    /// keys a test has just removed.
    fn new_cache_without_recovery(local_store: ObjectStore) -> ManifestCache {
        let index = ManifestCache::build_cache(
            local_store.clone(),
            ReadableSize::mb(10).as_bytes(),
            None,
        );
        ManifestCache { local_store, index }
    }

    #[tokio::test]
    async fn test_manifest_cache_basic() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = new_cache_without_recovery(local_store.clone());
        let key = "region_1/manifest/00000000000000000007.json";
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.get(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"manifest content".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(key.to_string(), IndexValue { file_size: 16 })
            .await;

        // Get the cached value.
        let value = cache.get(key).await.unwrap();
        assert_eq!(16, value.file_size);

        // Get weighted size: key length (43) + file size (16).
        cache.index.run_pending_tasks().await;
        assert_eq!(59, cache.index.weighted_size());

        // Remove the file and ensure all pending tasks of the moka cache are done
        // before assertion.
        cache.remove(key).await;
        cache.index.run_pending_tasks().await;
        assert!(cache.get(key).await.is_none());

        // The file also not exists.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.index.weighted_size());
    }

    #[tokio::test]
    async fn test_manifest_cache_recover() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = new_cache_without_recovery(local_store.clone());

        // Write some manifest files with different paths
        let keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
            "region_1/manifest/00000000000000000001.checkpoint",
            "region_2/manifest/00000000000000000001.json",
        ];

        let mut total_size = 0;
        for (i, key) in keys.iter().enumerate() {
            let file_path = cache.cache_file_path(key);
            let content = format!("manifest-{}", i).into_bytes();
            local_store
                .write(&file_path, content.clone())
                .await
                .unwrap();

            // Add to the cache.
            cache
                .put(
                    key.to_string(),
                    IndexValue {
                        file_size: content.len() as u32,
                    },
                )
                .await;
            total_size += content.len() + key.len();
        }

        // Create a new cache instance which will automatically recover from local store
        let cache = ManifestCache::new(local_store.clone(), ReadableSize::mb(10), None).await;

        // Wait for recovery to complete synchronously
        cache.recover(true).await;

        // Check size.
        cache.index.run_pending_tasks().await;
        let total_cached = cache.index.weighted_size() as usize;
        assert_eq!(total_size, total_cached);

        // Verify all files
        for (i, key) in keys.iter().enumerate() {
            let value = cache.get(key).await.unwrap();
            assert_eq!(format!("manifest-{}", i).len() as u32, value.file_size);
        }
    }

    #[tokio::test]
    async fn test_manifest_cache_put_get_file() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = new_cache_without_recovery(local_store.clone());
        let key = "region_1/manifest/00000000000000000003.json";

        // Nothing is cached yet.
        assert!(cache.get_file(key).await.is_none());

        cache.put_file(key.to_string(), b"manifest-3".to_vec()).await;

        assert_eq!(Some(b"manifest-3".to_vec()), cache.get_file(key).await);
        assert_eq!(10, cache.get(key).await.unwrap().file_size);
        assert!(local_store.exists(&cache.cache_file_path(key)).await.unwrap());
    }

    #[tokio::test]
    async fn test_clean_manifests() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = new_cache_without_recovery(local_store.clone());

        let removed_keys = [
            "region_1/manifest/00000000000000000001.json",
            "region_1/manifest/00000000000000000002.json",
        ];
        let kept_key = "region_2/manifest/00000000000000000001.json";
        for key in removed_keys.iter().copied().chain([kept_key]) {
            cache.put_file(key.to_string(), b"manifest".to_vec()).await;
        }

        cache.clean_manifests("region_1/").await;
        cache.index.run_pending_tasks().await;

        for key in removed_keys {
            assert!(cache.get(key).await.is_none());
            assert!(!local_store.exists(&cache.cache_file_path(key)).await.unwrap());
        }
        // The now-empty subdirectory is removed as well.
        assert!(
            !dir.path()
                .join(MANIFEST_DIR)
                .join("region_1/manifest")
                .exists()
        );

        // Files of other regions are untouched.
        assert!(cache.get(kept_key).await.is_some());
        assert!(
            local_store
                .exists(&cache.cache_file_path(kept_key))
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn test_clean_empty_dirs() {
        common_telemetry::init_default_ut_logging();

        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = new_cache_without_recovery(local_store);

        let manifest_root = dir.path().join(MANIFEST_DIR);
        std::fs::create_dir_all(manifest_root.join("region_1/manifest")).unwrap();

        // Freshly created directories are kept when mtime is checked.
        cache.clean_empty_dirs(true).await;
        assert!(manifest_root.join("region_1/manifest").exists());

        // Without the mtime check, all empty subdirectories are removed, but the
        // manifest root itself is kept.
        cache.clean_empty_dirs(false).await;
        assert!(!manifest_root.join("region_1").exists());
        assert!(manifest_root.exists());
    }

    #[tokio::test]
    async fn test_cache_file_path() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = new_cache_without_recovery(local_store);

        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.json",
            cache.cache_file_path("region_1/manifest/00000000000000000007.json")
        );
        assert_eq!(
            "cache/object/manifest/region_1/manifest/00000000000000000007.checkpoint",
            cache.cache_file_path("region_1/manifest/00000000000000000007.checkpoint")
        );
    }
}