mito2/cache/
file_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for files.
16
17use std::ops::Range;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use bytes::Bytes;
22use common_base::readable_size::ReadableSize;
23use common_telemetry::{error, info, warn};
24use futures::{FutureExt, TryStreamExt};
25use moka::future::Cache;
26use moka::notification::RemovalCause;
27use moka::policy::EvictionPolicy;
28use object_store::util::join_path;
29use object_store::{ErrorKind, ObjectStore, Reader};
30use parquet::file::metadata::ParquetMetaData;
31use snafu::ResultExt;
32use store_api::storage::RegionId;
33
34use crate::cache::FILE_TYPE;
35use crate::error::{OpenDalSnafu, Result};
36use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
37use crate::sst::file::FileId;
38use crate::sst::parquet::helper::fetch_byte_ranges;
39use crate::sst::parquet::metadata::MetadataLoader;
40
/// Subdirectory of the local store that holds the files cached by the write cache.
///
/// The path is made of exactly three components so that it matches the three layers expected by
/// [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";
45
46/// A file cache manages files on local store and evict files based
47/// on size.
48#[derive(Debug)]
49pub(crate) struct FileCache {
50    /// Local store to cache files.
51    local_store: ObjectStore,
52    /// Index to track cached files.
53    ///
54    /// File id is enough to identity a file uniquely.
55    memory_index: Cache<IndexKey, IndexValue>,
56}
57
58pub(crate) type FileCacheRef = Arc<FileCache>;
59
60impl FileCache {
61    /// Creates a new file cache.
62    pub(crate) fn new(
63        local_store: ObjectStore,
64        capacity: ReadableSize,
65        ttl: Option<Duration>,
66    ) -> FileCache {
67        let cache_store = local_store.clone();
68        let mut builder = Cache::builder()
69            .eviction_policy(EvictionPolicy::lru())
70            .weigher(|_key, value: &IndexValue| -> u32 {
71                // We only measure space on local store.
72                value.file_size
73            })
74            .max_capacity(capacity.as_bytes())
75            .async_eviction_listener(move |key, value, cause| {
76                let store = cache_store.clone();
77                // Stores files under FILE_DIR.
78                let file_path = cache_file_path(FILE_DIR, *key);
79                async move {
80                    if let RemovalCause::Replaced = cause {
81                        // The cache is replaced by another file. This is unexpected, we don't remove the same
82                        // file but updates the metrics as the file is already replaced by users.
83                        CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
84                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
85                        return;
86                    }
87
88                    match store.delete(&file_path).await {
89                        Ok(()) => {
90                            CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
91                        }
92                        Err(e) => {
93                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
94                        }
95                    }
96                }
97                .boxed()
98            });
99        if let Some(ttl) = ttl {
100            builder = builder.time_to_idle(ttl);
101        }
102        let memory_index = builder.build();
103        FileCache {
104            local_store,
105            memory_index,
106        }
107    }
108
109    /// Puts a file into the cache index.
110    ///
111    /// The `WriteCache` should ensure the file is in the correct path.
112    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
113        CACHE_BYTES
114            .with_label_values(&[FILE_TYPE])
115            .add(value.file_size.into());
116        self.memory_index.insert(key, value).await;
117
118        // Since files are large items, we run the pending tasks immediately.
119        self.memory_index.run_pending_tasks().await;
120    }
121
122    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
123        self.memory_index.get(&key).await
124    }
125
126    /// Reads a file from the cache.
127    #[allow(unused)]
128    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
129        // We must use `get()` to update the estimator of the cache.
130        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
131        if self.memory_index.get(&key).await.is_none() {
132            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
133            return None;
134        }
135
136        let file_path = self.cache_file_path(key);
137        match self.get_reader(&file_path).await {
138            Ok(Some(reader)) => {
139                CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
140                return Some(reader);
141            }
142            Err(e) => {
143                if e.kind() != ErrorKind::NotFound {
144                    warn!(e; "Failed to get file for key {:?}", key);
145                }
146            }
147            Ok(None) => {}
148        }
149
150        // We removes the file from the index.
151        self.memory_index.remove(&key).await;
152        CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
153        None
154    }
155
156    /// Reads ranges from the cache.
157    pub(crate) async fn read_ranges(
158        &self,
159        key: IndexKey,
160        ranges: &[Range<u64>],
161    ) -> Option<Vec<Bytes>> {
162        if self.memory_index.get(&key).await.is_none() {
163            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
164            return None;
165        }
166
167        let file_path = self.cache_file_path(key);
168        // In most cases, it will use blocking read,
169        // because FileCache is normally based on local file system, which supports blocking read.
170        let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
171        match bytes_result {
172            Ok(bytes) => {
173                CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
174                Some(bytes)
175            }
176            Err(e) => {
177                if e.kind() != ErrorKind::NotFound {
178                    warn!(e; "Failed to get file for key {:?}", key);
179                }
180
181                // We removes the file from the index.
182                self.memory_index.remove(&key).await;
183                CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
184                None
185            }
186        }
187    }
188
189    /// Removes a file from the cache explicitly.
190    /// It always tries to remove the file from the local store because we may not have the file
191    /// in the memory index if upload is failed.
192    pub(crate) async fn remove(&self, key: IndexKey) {
193        let file_path = self.cache_file_path(key);
194        self.memory_index.remove(&key).await;
195        // Always delete the file from the local store.
196        if let Err(e) = self.local_store.delete(&file_path).await {
197            warn!(e; "Failed to delete a cached file {}", file_path);
198        }
199    }
200
201    async fn recover_inner(&self) -> Result<()> {
202        let now = Instant::now();
203        let mut lister = self
204            .local_store
205            .lister_with(FILE_DIR)
206            .await
207            .context(OpenDalSnafu)?;
208        // Use i64 for total_size to reduce the risk of overflow.
209        // It is possible that the total size of the cache is larger than i32::MAX.
210        let (mut total_size, mut total_keys) = (0i64, 0);
211        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
212            let meta = entry.metadata();
213            if !meta.is_file() {
214                continue;
215            }
216            let Some(key) = parse_index_key(entry.name()) else {
217                continue;
218            };
219
220            let meta = self
221                .local_store
222                .stat(entry.path())
223                .await
224                .context(OpenDalSnafu)?;
225            let file_size = meta.content_length() as u32;
226            self.memory_index
227                .insert(key, IndexValue { file_size })
228                .await;
229            total_size += i64::from(file_size);
230            total_keys += 1;
231        }
232        // The metrics is a signed int gauge so we can updates it finally.
233        CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
234
235        // Run all pending tasks of the moka cache so that the cache size is updated
236        // and the eviction policy is applied.
237        self.memory_index.run_pending_tasks().await;
238
239        info!(
240            "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
241            total_keys,
242            total_size,
243            self.memory_index.weighted_size(),
244            now.elapsed()
245        );
246        Ok(())
247    }
248
249    /// Recovers the index from local store.
250    pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
251        let moved_self = self.clone();
252        let handle = tokio::spawn(async move {
253            if let Err(err) = moved_self.recover_inner().await {
254                error!(err; "Failed to recover file cache.")
255            }
256        });
257
258        if sync {
259            let _ = handle.await;
260        }
261    }
262
263    /// Returns the cache file path for the key.
264    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
265        cache_file_path(FILE_DIR, key)
266    }
267
268    /// Returns the local store of the file cache.
269    pub(crate) fn local_store(&self) -> ObjectStore {
270        self.local_store.clone()
271    }
272
273    /// Get the parquet metadata in file cache.
274    /// If the file is not in the cache or fail to load metadata, return None.
275    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
276        // Check if file cache contains the key
277        if let Some(index_value) = self.memory_index.get(&key).await {
278            // Load metadata from file cache
279            let local_store = self.local_store();
280            let file_path = self.cache_file_path(key);
281            let file_size = index_value.file_size as u64;
282            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
283
284            match metadata_loader.load().await {
285                Ok(metadata) => {
286                    CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
287                    Some(metadata)
288                }
289                Err(e) => {
290                    if !e.is_object_not_found() {
291                        warn!(
292                            e; "Failed to get parquet metadata for key {:?}",
293                            key
294                        );
295                    }
296                    // We removes the file from the index.
297                    self.memory_index.remove(&key).await;
298                    CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
299                    None
300                }
301            }
302        } else {
303            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
304            None
305        }
306    }
307
308    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
309        if self.local_store.exists(file_path).await? {
310            Ok(Some(self.local_store.reader(file_path).await?))
311        } else {
312            Ok(None)
313        }
314    }
315
316    /// Checks if the key is in the file cache.
317    #[cfg(test)]
318    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
319        self.memory_index.contains_key(key)
320    }
321}
322
323/// Key of file cache index.
324#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
325pub(crate) struct IndexKey {
326    pub region_id: RegionId,
327    pub file_id: FileId,
328    pub file_type: FileType,
329}
330
331impl IndexKey {
332    /// Creates a new index key.
333    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
334        IndexKey {
335            region_id,
336            file_id,
337            file_type,
338        }
339    }
340}
341
/// Kind of a cached file. Its name is used as the extension of the cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
    Puffin,
}

impl FileType {
    /// Looks up the file type whose name (see [`FileType::as_str`]) equals `s`.
    ///
    /// Returns `None` if no file type has that name. The comparison is case-sensitive.
    fn parse(s: &str) -> Option<FileType> {
        [FileType::Parquet, FileType::Puffin]
            .iter()
            .copied()
            .find(|candidate| candidate.as_str() == s)
    }

    /// Returns the lowercase name of the file type.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Parquet => "parquet",
            Self::Puffin => "puffin",
        }
    }
}
369
/// Description of a file held by the file cache.
///
/// Keep this minimal: it only stores what the cache itself needs.
#[derive(Clone, Debug)]
pub(crate) struct IndexValue {
    /// File size in bytes.
    pub(crate) file_size: u32,
}
378
379/// Generates the path to the cached file.
380///
381/// The file name format is `{region_id}.{file_id}.{file_type}`
382fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
383    join_path(
384        cache_file_dir,
385        &format!(
386            "{}.{}.{}",
387            key.region_id.as_u64(),
388            key.file_id,
389            key.file_type.as_str()
390        ),
391    )
392}
393
394/// Parse index key from the file name.
395fn parse_index_key(name: &str) -> Option<IndexKey> {
396    let mut split = name.splitn(3, '.');
397    let region_id = split.next().and_then(|s| {
398        let id = s.parse::<u64>().ok()?;
399        Some(RegionId::from_u64(id))
400    })?;
401    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
402    let file_type = split.next().and_then(FileType::parse)?;
403
404    Some(IndexKey::new(region_id, file_id, file_type))
405}
406
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    /// Creates an object store backed by the local file system rooted at `path`.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add to the cache.
        cache.put(key, IndexValue { file_size: 5 }).await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.memory_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache.put(key, IndexValue { file_size: 5 }).await;

        // Read file content.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Get weighted size.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(5, cache.memory_index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Ensure all pending tasks of the moka cache are done before assertion.
        cache.memory_index.run_pending_tasks().await;

        // The file no longer exists either.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.memory_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache.put(key, IndexValue { file_size: 5 }).await;

        // Remove the file but keep the index.
        local_store.delete(&file_path).await.unwrap();

        // Reader is none.
        assert!(cache.reader(key).await.is_none());
        // Key is removed.
        assert!(!cache.memory_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write N files.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            // Add to the cache.
            cache
                .put(
                    key,
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // Recover the cache.
        let cache = Arc::new(FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
        ));
        // No entry before recovery.
        assert!(cache
            .reader(IndexKey::new(region_id, file_ids[0], file_type))
            .await
            .is_none());
        cache.recover(true).await;

        // Check size.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(total_size, cache.memory_index.weighted_size() as usize);

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file.
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Add to the cache with the real size of the written file.
        file_cache
            .put(
                key,
                IndexValue {
                    file_size: data.len() as u32,
                },
            )
            .await;
        // Ranges
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        // The region id must be a number.
        assert!(parse_index_key("abc.3368731b-a556-42b8-a5df-9c31ce155095.parquet").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(parse_index_key(
            "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin"
        )
        .is_none());
    }
}