mito2/cache/
file_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for files.
16
17use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{error, info, warn};
25use futures::{FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::RegionId;
34
35use crate::cache::FILE_TYPE;
36use crate::error::{OpenDalSnafu, Result};
37use crate::metrics::{CACHE_BYTES, CACHE_HIT, CACHE_MISS};
38use crate::sst::file::FileId;
39use crate::sst::parquet::helper::fetch_byte_ranges;
40use crate::sst::parquet::metadata::MetadataLoader;
41
/// Directory, relative to the root of the local store, that holds cached files for write.
///
/// The path must consist of exactly three layers so that it matches what
/// [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer) expects.
const FILE_DIR: &str = "cache/object/write/";
46
47/// A file cache manages files on local store and evict files based
48/// on size.
49#[derive(Debug)]
50pub(crate) struct FileCache {
51    /// Local store to cache files.
52    local_store: ObjectStore,
53    /// Index to track cached files.
54    ///
55    /// File id is enough to identity a file uniquely.
56    memory_index: Cache<IndexKey, IndexValue>,
57}
58
59pub(crate) type FileCacheRef = Arc<FileCache>;
60
61impl FileCache {
62    /// Creates a new file cache.
63    pub(crate) fn new(
64        local_store: ObjectStore,
65        capacity: ReadableSize,
66        ttl: Option<Duration>,
67    ) -> FileCache {
68        let cache_store = local_store.clone();
69        let mut builder = Cache::builder()
70            .eviction_policy(EvictionPolicy::lru())
71            .weigher(|_key, value: &IndexValue| -> u32 {
72                // We only measure space on local store.
73                value.file_size
74            })
75            .max_capacity(capacity.as_bytes())
76            .async_eviction_listener(move |key, value, cause| {
77                let store = cache_store.clone();
78                // Stores files under FILE_DIR.
79                let file_path = cache_file_path(FILE_DIR, *key);
80                async move {
81                    if let RemovalCause::Replaced = cause {
82                        // The cache is replaced by another file. This is unexpected, we don't remove the same
83                        // file but updates the metrics as the file is already replaced by users.
84                        CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
85                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
86                        return;
87                    }
88
89                    match store.delete(&file_path).await {
90                        Ok(()) => {
91                            CACHE_BYTES.with_label_values(&[FILE_TYPE]).sub(value.file_size.into());
92                        }
93                        Err(e) => {
94                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
95                        }
96                    }
97                }
98                .boxed()
99            });
100        if let Some(ttl) = ttl {
101            builder = builder.time_to_idle(ttl);
102        }
103        let memory_index = builder.build();
104        FileCache {
105            local_store,
106            memory_index,
107        }
108    }
109
110    /// Puts a file into the cache index.
111    ///
112    /// The `WriteCache` should ensure the file is in the correct path.
113    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
114        CACHE_BYTES
115            .with_label_values(&[FILE_TYPE])
116            .add(value.file_size.into());
117        self.memory_index.insert(key, value).await;
118
119        // Since files are large items, we run the pending tasks immediately.
120        self.memory_index.run_pending_tasks().await;
121    }
122
123    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
124        self.memory_index.get(&key).await
125    }
126
127    /// Reads a file from the cache.
128    #[allow(unused)]
129    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
130        // We must use `get()` to update the estimator of the cache.
131        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
132        if self.memory_index.get(&key).await.is_none() {
133            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
134            return None;
135        }
136
137        let file_path = self.cache_file_path(key);
138        match self.get_reader(&file_path).await {
139            Ok(Some(reader)) => {
140                CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
141                return Some(reader);
142            }
143            Err(e) => {
144                if e.kind() != ErrorKind::NotFound {
145                    warn!(e; "Failed to get file for key {:?}", key);
146                }
147            }
148            Ok(None) => {}
149        }
150
151        // We removes the file from the index.
152        self.memory_index.remove(&key).await;
153        CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
154        None
155    }
156
157    /// Reads ranges from the cache.
158    pub(crate) async fn read_ranges(
159        &self,
160        key: IndexKey,
161        ranges: &[Range<u64>],
162    ) -> Option<Vec<Bytes>> {
163        if self.memory_index.get(&key).await.is_none() {
164            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
165            return None;
166        }
167
168        let file_path = self.cache_file_path(key);
169        // In most cases, it will use blocking read,
170        // because FileCache is normally based on local file system, which supports blocking read.
171        let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
172        match bytes_result {
173            Ok(bytes) => {
174                CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
175                Some(bytes)
176            }
177            Err(e) => {
178                if e.kind() != ErrorKind::NotFound {
179                    warn!(e; "Failed to get file for key {:?}", key);
180                }
181
182                // We removes the file from the index.
183                self.memory_index.remove(&key).await;
184                CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
185                None
186            }
187        }
188    }
189
190    /// Removes a file from the cache explicitly.
191    /// It always tries to remove the file from the local store because we may not have the file
192    /// in the memory index if upload is failed.
193    pub(crate) async fn remove(&self, key: IndexKey) {
194        let file_path = self.cache_file_path(key);
195        self.memory_index.remove(&key).await;
196        // Always delete the file from the local store.
197        if let Err(e) = self.local_store.delete(&file_path).await {
198            warn!(e; "Failed to delete a cached file {}", file_path);
199        }
200    }
201
202    async fn recover_inner(&self) -> Result<()> {
203        let now = Instant::now();
204        let mut lister = self
205            .local_store
206            .lister_with(FILE_DIR)
207            .await
208            .context(OpenDalSnafu)?;
209        // Use i64 for total_size to reduce the risk of overflow.
210        // It is possible that the total size of the cache is larger than i32::MAX.
211        let (mut total_size, mut total_keys) = (0i64, 0);
212        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
213            let meta = entry.metadata();
214            if !meta.is_file() {
215                continue;
216            }
217            let Some(key) = parse_index_key(entry.name()) else {
218                continue;
219            };
220
221            let meta = self
222                .local_store
223                .stat(entry.path())
224                .await
225                .context(OpenDalSnafu)?;
226            let file_size = meta.content_length() as u32;
227            self.memory_index
228                .insert(key, IndexValue { file_size })
229                .await;
230            total_size += i64::from(file_size);
231            total_keys += 1;
232        }
233        // The metrics is a signed int gauge so we can updates it finally.
234        CACHE_BYTES.with_label_values(&[FILE_TYPE]).add(total_size);
235
236        // Run all pending tasks of the moka cache so that the cache size is updated
237        // and the eviction policy is applied.
238        self.memory_index.run_pending_tasks().await;
239
240        info!(
241            "Recovered file cache, num_keys: {}, num_bytes: {}, total weight: {}, cost: {:?}",
242            total_keys,
243            total_size,
244            self.memory_index.weighted_size(),
245            now.elapsed()
246        );
247        Ok(())
248    }
249
250    /// Recovers the index from local store.
251    pub(crate) async fn recover(self: &Arc<Self>, sync: bool) {
252        let moved_self = self.clone();
253        let handle = tokio::spawn(async move {
254            if let Err(err) = moved_self.recover_inner().await {
255                error!(err; "Failed to recover file cache.")
256            }
257        });
258
259        if sync {
260            let _ = handle.await;
261        }
262    }
263
264    /// Returns the cache file path for the key.
265    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
266        cache_file_path(FILE_DIR, key)
267    }
268
269    /// Returns the local store of the file cache.
270    pub(crate) fn local_store(&self) -> ObjectStore {
271        self.local_store.clone()
272    }
273
274    /// Get the parquet metadata in file cache.
275    /// If the file is not in the cache or fail to load metadata, return None.
276    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
277        // Check if file cache contains the key
278        if let Some(index_value) = self.memory_index.get(&key).await {
279            // Load metadata from file cache
280            let local_store = self.local_store();
281            let file_path = self.cache_file_path(key);
282            let file_size = index_value.file_size as u64;
283            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
284
285            match metadata_loader.load().await {
286                Ok(metadata) => {
287                    CACHE_HIT.with_label_values(&[FILE_TYPE]).inc();
288                    Some(metadata)
289                }
290                Err(e) => {
291                    if !e.is_object_not_found() {
292                        warn!(
293                            e; "Failed to get parquet metadata for key {:?}",
294                            key
295                        );
296                    }
297                    // We removes the file from the index.
298                    self.memory_index.remove(&key).await;
299                    CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
300                    None
301                }
302            }
303        } else {
304            CACHE_MISS.with_label_values(&[FILE_TYPE]).inc();
305            None
306        }
307    }
308
309    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
310        if self.local_store.exists(file_path).await? {
311            Ok(Some(self.local_store.reader(file_path).await?))
312        } else {
313            Ok(None)
314        }
315    }
316
317    /// Checks if the key is in the file cache.
318    #[cfg(test)]
319    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
320        self.memory_index.contains_key(key)
321    }
322}
323
324/// Key of file cache index.
325#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
326pub(crate) struct IndexKey {
327    pub region_id: RegionId,
328    pub file_id: FileId,
329    pub file_type: FileType,
330}
331
332impl IndexKey {
333    /// Creates a new index key.
334    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
335        IndexKey {
336            region_id,
337            file_id,
338            file_type,
339        }
340    }
341}
342
343impl fmt::Display for IndexKey {
344    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
345        write!(
346            f,
347            "{}.{}.{}",
348            self.region_id.as_u64(),
349            self.file_id,
350            self.file_type.as_str()
351        )
352    }
353}
354
/// Kind of a file stored in the file cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
    Puffin,
}

impl FileType {
    /// Parses a file type from its name suffix (`"parquet"` or `"puffin"`).
    ///
    /// Any other string yields `None`.
    fn parse(s: &str) -> Option<FileType> {
        let file_type = match s {
            "parquet" => Self::Parquet,
            "puffin" => Self::Puffin,
            _ => return None,
        };
        Some(file_type)
    }

    /// Returns the name suffix used for this file type in cached file names.
    fn as_str(&self) -> &'static str {
        match *self {
            Self::Parquet => "parquet",
            Self::Puffin => "puffin",
        }
    }
}
382
/// Information the file cache index keeps for one cached file.
///
/// Deliberately minimal: only what the cache itself needs is stored here.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// File size in bytes.
    pub(crate) file_size: u32,
}
391
392/// Generates the path to the cached file.
393///
394/// The file name format is `{region_id}.{file_id}.{file_type}`
395fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
396    join_path(cache_file_dir, &key.to_string())
397}
398
399/// Parse index key from the file name.
400fn parse_index_key(name: &str) -> Option<IndexKey> {
401    let mut split = name.splitn(3, '.');
402    let region_id = split.next().and_then(|s| {
403        let id = s.parse::<u64>().ok()?;
404        Some(RegionId::from_u64(id))
405    })?;
406    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
407    let file_type = split.next().and_then(FileType::parse)?;
408
409    Some(IndexKey::new(region_id, file_id, file_type))
410}
411
#[cfg(test)]
mod tests {
    use common_test_util::temp_dir::create_temp_dir;
    use object_store::services::Fs;

    use super::*;

    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }

    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.memory_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }

    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read file content.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Get weighted size.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(5, cache.memory_index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Ensure all pending tasks of the moka cache is done before assertion.
        cache.memory_index.run_pending_tasks().await;

        // The file also not exists.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.memory_index.weighted_size());
    }

    #[tokio::test]
    async fn test_file_cache_file_removed() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Remove the file but keep the index.
        local_store.delete(&file_path).await.unwrap();

        // Reader is none.
        assert!(cache.reader(key).await.is_none());
        // Key is removed.
        assert!(!cache.memory_index.contains_key(&key));
    }

    #[tokio::test]
    async fn test_file_cache_recover() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);

        let region_id = RegionId::new(2000, 0);
        let file_type = FileType::Parquet;
        // Write N files.
        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
        let mut total_size = 0;
        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let file_path = cache.cache_file_path(key);
            let bytes = i.to_string().into_bytes();
            local_store.write(&file_path, bytes.clone()).await.unwrap();

            // Add to the cache.
            cache
                .put(
                    IndexKey::new(region_id, *file_id, file_type),
                    IndexValue {
                        file_size: bytes.len() as u32,
                    },
                )
                .await;
            total_size += bytes.len();
        }

        // Recover the cache.
        let cache = Arc::new(FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
        ));
        // No entry before recovery.
        assert!(cache
            .reader(IndexKey::new(region_id, file_ids[0], file_type))
            .await
            .is_none());
        cache.recover(true).await;

        // Check size.
        cache.memory_index.run_pending_tasks().await;
        assert_eq!(total_size, cache.memory_index.weighted_size() as usize);

        for (i, file_id) in file_ids.iter().enumerate() {
            let key = IndexKey::new(region_id, *file_id, file_type);
            let reader = cache.reader(key).await.unwrap();
            let buf = reader.read(..).await.unwrap().to_vec();
            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
        }
    }

    #[tokio::test]
    async fn test_file_cache_read_ranges() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());
        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None);
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = file_cache.cache_file_path(key);
        // Write a file.
        let data = b"hello greptime database";
        local_store
            .write(&file_path, data.as_slice())
            .await
            .unwrap();
        // Add to the cache with the real size of the written file.
        file_cache
            .put(
                key,
                IndexValue {
                    file_size: data.len() as u32,
                },
            )
            .await;
        // Ranges
        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();

        assert_eq!(4, bytes.len());
        assert_eq!(b"hello", bytes[0].as_ref());
        assert_eq!(b"grep", bytes[1].as_ref());
        assert_eq!(b"data", bytes[2].as_ref());
        assert_eq!(data, bytes[3].as_ref());
    }

    #[test]
    fn test_cache_file_path() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
        assert_eq!(
            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
            cache_file_path(
                "test_dir/",
                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
            )
        );
    }

    #[test]
    fn test_parse_file_name() {
        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
        let region_id = RegionId::new(1234, 5);
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Parquet),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
        );
        assert_eq!(
            IndexKey::new(region_id, file_id, FileType::Puffin),
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
        );
        assert!(parse_index_key("").is_none());
        assert!(parse_index_key(".").is_none());
        assert!(parse_index_key("5299989643269").is_none());
        assert!(parse_index_key("5299989643269.").is_none());
        assert!(parse_index_key(".5299989643269").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.").is_none()
        );
        assert!(
            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
        );
        assert!(parse_index_key(
            "5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin"
        )
        .is_none());
    }
}