mito2/cache/
file_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for files.
16
17use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::UnboundedReceiver;
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41    WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46
/// Subdirectory of cached files for write.
///
/// All cached files are stored under this directory of the local store and the index is
/// recovered by listing it.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage for index (puffin) cache (20% of total capacity).
///
/// Used when no percentage is configured or the configured one is not in `1..=99`.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity for each cache (512MB).
///
/// Applied to the parquet cache and the puffin cache independently.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;
57
/// A file cache manages files on local store and evict files based
/// on size.
///
/// Parquet files and Puffin files are tracked by two separate in-memory indexes so
/// each kind of file has its own capacity. The files themselves live in `local_store`.
#[derive(Debug)]
pub(crate) struct FileCache {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
}

/// Shared reference to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
73
74impl FileCache {
75    /// Creates a new file cache.
76    pub(crate) fn new(
77        local_store: ObjectStore,
78        capacity: ReadableSize,
79        ttl: Option<Duration>,
80        index_cache_percent: Option<u8>,
81    ) -> FileCache {
82        // Validate and use the provided percent or default
83        let index_percent = index_cache_percent
84            .filter(|&percent| percent > 0 && percent < 100)
85            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
86        let total_capacity = capacity.as_bytes();
87
88        // Convert percent to ratio and calculate capacity for each cache
89        let index_ratio = index_percent as f64 / 100.0;
90        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
91        let parquet_capacity = total_capacity - puffin_capacity;
92
93        // Ensure both capacities are at least 512MB
94        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
95        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
96
97        info!(
98            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
99            index_percent,
100            ReadableSize(total_capacity),
101            ReadableSize(parquet_capacity),
102            ReadableSize(puffin_capacity)
103        );
104
105        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
106        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
107
108        FileCache {
109            local_store,
110            parquet_index,
111            puffin_index,
112            puffin_capacity,
113        }
114    }
115
116    /// Builds a cache for a specific file type.
117    fn build_cache(
118        local_store: ObjectStore,
119        capacity: u64,
120        ttl: Option<Duration>,
121        label: &'static str,
122    ) -> Cache<IndexKey, IndexValue> {
123        let cache_store = local_store;
124        let mut builder = Cache::builder()
125            .eviction_policy(EvictionPolicy::lru())
126            .weigher(|_key, value: &IndexValue| -> u32 {
127                // We only measure space on local store.
128                value.file_size
129            })
130            .max_capacity(capacity)
131            .async_eviction_listener(move |key, value, cause| {
132                let store = cache_store.clone();
133                // Stores files under FILE_DIR.
134                let file_path = cache_file_path(FILE_DIR, *key);
135                async move {
136                    if let RemovalCause::Replaced = cause {
137                        // The cache is replaced by another file. This is unexpected, we don't remove the same
138                        // file but updates the metrics as the file is already replaced by users.
139                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
140                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
141                        return;
142                    }
143
144                    match store.delete(&file_path).await {
145                        Ok(()) => {
146                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
147                        }
148                        Err(e) => {
149                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
150                        }
151                    }
152                }
153                .boxed()
154            });
155        if let Some(ttl) = ttl {
156            builder = builder.time_to_idle(ttl);
157        }
158        builder.build()
159    }
160
161    /// Returns the appropriate memory index for the given file type.
162    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
163        match file_type {
164            FileType::Parquet => &self.parquet_index,
165            FileType::Puffin => &self.puffin_index,
166        }
167    }
168
169    /// Puts a file into the cache index.
170    ///
171    /// The `WriteCache` should ensure the file is in the correct path.
172    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
173        CACHE_BYTES
174            .with_label_values(&[key.file_type.metric_label()])
175            .add(value.file_size.into());
176        let index = self.memory_index(key.file_type);
177        index.insert(key, value).await;
178
179        // Since files are large items, we run the pending tasks immediately.
180        index.run_pending_tasks().await;
181    }
182
183    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
184        self.memory_index(key.file_type).get(&key).await
185    }
186
187    /// Reads a file from the cache.
188    #[allow(unused)]
189    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
190        // We must use `get()` to update the estimator of the cache.
191        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
192        let index = self.memory_index(key.file_type);
193        if index.get(&key).await.is_none() {
194            CACHE_MISS
195                .with_label_values(&[key.file_type.metric_label()])
196                .inc();
197            return None;
198        }
199
200        let file_path = self.cache_file_path(key);
201        match self.get_reader(&file_path).await {
202            Ok(Some(reader)) => {
203                CACHE_HIT
204                    .with_label_values(&[key.file_type.metric_label()])
205                    .inc();
206                return Some(reader);
207            }
208            Err(e) => {
209                if e.kind() != ErrorKind::NotFound {
210                    warn!(e; "Failed to get file for key {:?}", key);
211                }
212            }
213            Ok(None) => {}
214        }
215
216        // We removes the file from the index.
217        index.remove(&key).await;
218        CACHE_MISS
219            .with_label_values(&[key.file_type.metric_label()])
220            .inc();
221        None
222    }
223
224    /// Reads ranges from the cache.
225    pub(crate) async fn read_ranges(
226        &self,
227        key: IndexKey,
228        ranges: &[Range<u64>],
229    ) -> Option<Vec<Bytes>> {
230        let index = self.memory_index(key.file_type);
231        if index.get(&key).await.is_none() {
232            CACHE_MISS
233                .with_label_values(&[key.file_type.metric_label()])
234                .inc();
235            return None;
236        }
237
238        let file_path = self.cache_file_path(key);
239        // In most cases, it will use blocking read,
240        // because FileCache is normally based on local file system, which supports blocking read.
241        let bytes_result = fetch_byte_ranges(&file_path, self.local_store.clone(), ranges).await;
242        match bytes_result {
243            Ok(bytes) => {
244                CACHE_HIT
245                    .with_label_values(&[key.file_type.metric_label()])
246                    .inc();
247                Some(bytes)
248            }
249            Err(e) => {
250                if e.kind() != ErrorKind::NotFound {
251                    warn!(e; "Failed to get file for key {:?}", key);
252                }
253
254                // We removes the file from the index.
255                index.remove(&key).await;
256                CACHE_MISS
257                    .with_label_values(&[key.file_type.metric_label()])
258                    .inc();
259                None
260            }
261        }
262    }
263
264    /// Removes a file from the cache explicitly.
265    /// It always tries to remove the file from the local store because we may not have the file
266    /// in the memory index if upload is failed.
267    pub(crate) async fn remove(&self, key: IndexKey) {
268        let file_path = self.cache_file_path(key);
269        self.memory_index(key.file_type).remove(&key).await;
270        // Always delete the file from the local store.
271        if let Err(e) = self.local_store.delete(&file_path).await {
272            warn!(e; "Failed to delete a cached file {}", file_path);
273        }
274    }
275
276    async fn recover_inner(&self) -> Result<()> {
277        let now = Instant::now();
278        let mut lister = self
279            .local_store
280            .lister_with(FILE_DIR)
281            .await
282            .context(OpenDalSnafu)?;
283        // Use i64 for total_size to reduce the risk of overflow.
284        // It is possible that the total size of the cache is larger than i32::MAX.
285        let (mut total_size, mut total_keys) = (0i64, 0);
286        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
287        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
288            let meta = entry.metadata();
289            if !meta.is_file() {
290                continue;
291            }
292            let Some(key) = parse_index_key(entry.name()) else {
293                continue;
294            };
295
296            let meta = self
297                .local_store
298                .stat(entry.path())
299                .await
300                .context(OpenDalSnafu)?;
301            let file_size = meta.content_length() as u32;
302            let index = self.memory_index(key.file_type);
303            index.insert(key, IndexValue { file_size }).await;
304            let size = i64::from(file_size);
305            total_size += size;
306            total_keys += 1;
307
308            // Track sizes separately for each file type
309            match key.file_type {
310                FileType::Parquet => parquet_size += size,
311                FileType::Puffin => puffin_size += size,
312            }
313        }
314        // The metrics is a signed int gauge so we can updates it finally.
315        CACHE_BYTES
316            .with_label_values(&[FILE_TYPE])
317            .add(parquet_size);
318        CACHE_BYTES
319            .with_label_values(&[INDEX_TYPE])
320            .add(puffin_size);
321
322        // Run all pending tasks of the moka cache so that the cache size is updated
323        // and the eviction policy is applied.
324        self.parquet_index.run_pending_tasks().await;
325        self.puffin_index.run_pending_tasks().await;
326
327        let parquet_weight = self.parquet_index.weighted_size();
328        let parquet_count = self.parquet_index.entry_count();
329        let puffin_weight = self.puffin_index.weighted_size();
330        let puffin_count = self.puffin_index.entry_count();
331        info!(
332            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
333            total_keys,
334            total_size,
335            parquet_count,
336            parquet_weight,
337            puffin_count,
338            puffin_weight,
339            now.elapsed()
340        );
341        Ok(())
342    }
343
344    /// Recovers the index from local store.
345    ///
346    /// If `task_receiver` is provided, spawns a background task after recovery
347    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
348    pub(crate) async fn recover(
349        self: &Arc<Self>,
350        sync: bool,
351        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
352    ) {
353        let moved_self = self.clone();
354        let handle = tokio::spawn(async move {
355            if let Err(err) = moved_self.recover_inner().await {
356                error!(err; "Failed to recover file cache.")
357            }
358
359            // Spawns background task to process region load cache tasks after recovery.
360            // So it won't block the recovery when `sync` is true.
361            if let Some(mut receiver) = task_receiver {
362                let cache_ref = moved_self.clone();
363                info!("Spawning background task for processing region load cache tasks");
364                tokio::spawn(async move {
365                    while let Some(task) = receiver.recv().await {
366                        let file_cache = cache_ref.clone();
367                        task.fill_cache(file_cache).await;
368                    }
369                    info!("Background task for processing region load cache tasks stopped");
370                });
371            }
372        });
373
374        if sync {
375            let _ = handle.await;
376        }
377    }
378
379    /// Returns the cache file path for the key.
380    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
381        cache_file_path(FILE_DIR, key)
382    }
383
384    /// Returns the local store of the file cache.
385    pub(crate) fn local_store(&self) -> ObjectStore {
386        self.local_store.clone()
387    }
388
389    /// Get the parquet metadata in file cache.
390    /// If the file is not in the cache or fail to load metadata, return None.
391    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
392        // Check if file cache contains the key
393        if let Some(index_value) = self.parquet_index.get(&key).await {
394            // Load metadata from file cache
395            let local_store = self.local_store();
396            let file_path = self.cache_file_path(key);
397            let file_size = index_value.file_size as u64;
398            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
399
400            match metadata_loader.load().await {
401                Ok(metadata) => {
402                    CACHE_HIT
403                        .with_label_values(&[key.file_type.metric_label()])
404                        .inc();
405                    Some(metadata)
406                }
407                Err(e) => {
408                    if !e.is_object_not_found() {
409                        warn!(
410                            e; "Failed to get parquet metadata for key {:?}",
411                            key
412                        );
413                    }
414                    // We removes the file from the index.
415                    self.parquet_index.remove(&key).await;
416                    CACHE_MISS
417                        .with_label_values(&[key.file_type.metric_label()])
418                        .inc();
419                    None
420                }
421            }
422        } else {
423            CACHE_MISS
424                .with_label_values(&[key.file_type.metric_label()])
425                .inc();
426            None
427        }
428    }
429
430    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
431        if self.local_store.exists(file_path).await? {
432            Ok(Some(self.local_store.reader(file_path).await?))
433        } else {
434            Ok(None)
435        }
436    }
437
438    /// Checks if the key is in the file cache.
439    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
440        self.memory_index(key.file_type).contains_key(key)
441    }
442
443    /// Returns the capacity of the puffin (index) cache in bytes.
444    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
445        self.puffin_capacity
446    }
447
448    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
449    pub(crate) fn puffin_cache_size(&self) -> u64 {
450        self.puffin_index.weighted_size()
451    }
452
453    /// Downloads a file in `remote_path` from the remote object store to the local cache
454    /// (specified by `index_key`).
455    pub(crate) async fn download(
456        &self,
457        index_key: IndexKey,
458        remote_path: &str,
459        remote_store: &ObjectStore,
460        file_size: u64,
461    ) -> Result<()> {
462        if let Err(e) = self
463            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
464            .await
465        {
466            let filename = index_key.to_string();
467            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
468
469            return Err(e);
470        }
471        Ok(())
472    }
473
474    async fn download_without_cleaning(
475        &self,
476        index_key: IndexKey,
477        remote_path: &str,
478        remote_store: &ObjectStore,
479        file_size: u64,
480    ) -> Result<()> {
481        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
482        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
483
484        let file_type = index_key.file_type;
485        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
486            .with_label_values(&[match file_type {
487                FileType::Parquet => "download_parquet",
488                FileType::Puffin => "download_puffin",
489            }])
490            .start_timer();
491
492        let reader = remote_store
493            .reader_with(remote_path)
494            .concurrent(DOWNLOAD_READER_CONCURRENCY)
495            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
496            .await
497            .context(error::OpenDalSnafu)?
498            .into_futures_async_read(0..file_size)
499            .await
500            .context(error::OpenDalSnafu)?;
501
502        let cache_path = self.cache_file_path(index_key);
503        let mut writer = self
504            .local_store
505            .writer(&cache_path)
506            .await
507            .context(error::OpenDalSnafu)?
508            .into_futures_async_write();
509
510        let region_id = index_key.region_id;
511        let file_id = index_key.file_id;
512        let bytes_written =
513            futures::io::copy(reader, &mut writer)
514                .await
515                .context(error::DownloadSnafu {
516                    region_id,
517                    file_id,
518                    file_type,
519                })?;
520        writer.close().await.context(error::DownloadSnafu {
521            region_id,
522            file_id,
523            file_type,
524        })?;
525
526        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
527
528        let elapsed = timer.stop_and_record();
529        debug!(
530            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
531            remote_path, cache_path, bytes_written, region_id, elapsed,
532        );
533
534        let index_value = IndexValue {
535            file_size: bytes_written as _,
536        };
537        self.put(index_key, index_value).await;
538        Ok(())
539    }
540}
541
/// Key of file cache index.
///
/// Its [`Display`](fmt::Display) output, `{region_id}.{file_id}.{file_type}`, is the name
/// of the cached file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub(crate) struct IndexKey {
    /// Id of the region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Type of the file, which also decides the index that tracks the file.
    pub file_type: FileType,
}
549
550impl IndexKey {
551    /// Creates a new index key.
552    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
553        IndexKey {
554            region_id,
555            file_id,
556            file_type,
557        }
558    }
559}
560
561impl fmt::Display for IndexKey {
562    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
563        write!(
564            f,
565            "{}.{}.{}",
566            self.region_id.as_u64(),
567            self.file_id,
568            self.file_type.as_str()
569        )
570    }
571}
572
/// Type of the file.
///
/// Each type is tracked by its own index in the file cache and has its own metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
    Puffin,
}
581
582impl FileType {
583    /// Parses the file type from string.
584    fn parse(s: &str) -> Option<FileType> {
585        match s {
586            "parquet" => Some(FileType::Parquet),
587            "puffin" => Some(FileType::Puffin),
588            _ => None,
589        }
590    }
591
592    /// Converts the file type to string.
593    fn as_str(&self) -> &'static str {
594        match self {
595            FileType::Parquet => "parquet",
596            FileType::Puffin => "puffin",
597        }
598    }
599
600    /// Returns the metric label for this file type.
601    fn metric_label(&self) -> &'static str {
602        match self {
603            FileType::Parquet => FILE_TYPE,
604            FileType::Puffin => INDEX_TYPE,
605        }
606    }
607}
608
/// An entity that describes the file in the file cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    ///
    /// The cache uses it as the weight of the entry.
    pub(crate) file_size: u32,
}
617
618/// Generates the path to the cached file.
619///
620/// The file name format is `{region_id}.{file_id}.{file_type}`
621fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
622    join_path(cache_file_dir, &key.to_string())
623}
624
625/// Parse index key from the file name.
626fn parse_index_key(name: &str) -> Option<IndexKey> {
627    let mut split = name.splitn(3, '.');
628    let region_id = split.next().and_then(|s| {
629        let id = s.parse::<u64>().ok()?;
630        Some(RegionId::from_u64(id))
631    })?;
632    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
633    let file_type = split.next().and_then(FileType::parse)?;
634
635    Some(IndexKey::new(region_id, file_id, file_type))
636}
637
638#[cfg(test)]
639mod tests {
640    use common_test_util::temp_dir::create_temp_dir;
641    use object_store::services::Fs;
642
643    use super::*;
644
645    fn new_fs_store(path: &str) -> ObjectStore {
646        let builder = Fs::default().root(path);
647        ObjectStore::new(builder).unwrap().finish()
648    }
649
650    #[tokio::test]
651    async fn test_file_cache_ttl() {
652        let dir = create_temp_dir("");
653        let local_store = new_fs_store(dir.path().to_str().unwrap());
654
655        let cache = FileCache::new(
656            local_store.clone(),
657            ReadableSize::mb(10),
658            Some(Duration::from_millis(10)),
659            None,
660        );
661        let region_id = RegionId::new(2000, 0);
662        let file_id = FileId::random();
663        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
664        let file_path = cache.cache_file_path(key);
665
666        // Get an empty file.
667        assert!(cache.reader(key).await.is_none());
668
669        // Write a file.
670        local_store
671            .write(&file_path, b"hello".as_slice())
672            .await
673            .unwrap();
674
675        // Add to the cache.
676        cache
677            .put(
678                IndexKey::new(region_id, file_id, FileType::Parquet),
679                IndexValue { file_size: 5 },
680            )
681            .await;
682
683        let exist = cache.reader(key).await;
684        assert!(exist.is_some());
685        tokio::time::sleep(Duration::from_millis(15)).await;
686        cache.parquet_index.run_pending_tasks().await;
687        let non = cache.reader(key).await;
688        assert!(non.is_none());
689    }
690
691    #[tokio::test]
692    async fn test_file_cache_basic() {
693        let dir = create_temp_dir("");
694        let local_store = new_fs_store(dir.path().to_str().unwrap());
695
696        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
697        let region_id = RegionId::new(2000, 0);
698        let file_id = FileId::random();
699        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
700        let file_path = cache.cache_file_path(key);
701
702        // Get an empty file.
703        assert!(cache.reader(key).await.is_none());
704
705        // Write a file.
706        local_store
707            .write(&file_path, b"hello".as_slice())
708            .await
709            .unwrap();
710        // Add to the cache.
711        cache
712            .put(
713                IndexKey::new(region_id, file_id, FileType::Parquet),
714                IndexValue { file_size: 5 },
715            )
716            .await;
717
718        // Read file content.
719        let reader = cache.reader(key).await.unwrap();
720        let buf = reader.read(..).await.unwrap().to_vec();
721        assert_eq!("hello", String::from_utf8(buf).unwrap());
722
723        // Get weighted size.
724        cache.parquet_index.run_pending_tasks().await;
725        assert_eq!(5, cache.parquet_index.weighted_size());
726
727        // Remove the file.
728        cache.remove(key).await;
729        assert!(cache.reader(key).await.is_none());
730
731        // Ensure all pending tasks of the moka cache is done before assertion.
732        cache.parquet_index.run_pending_tasks().await;
733
734        // The file also not exists.
735        assert!(!local_store.exists(&file_path).await.unwrap());
736        assert_eq!(0, cache.parquet_index.weighted_size());
737    }
738
739    #[tokio::test]
740    async fn test_file_cache_file_removed() {
741        let dir = create_temp_dir("");
742        let local_store = new_fs_store(dir.path().to_str().unwrap());
743
744        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
745        let region_id = RegionId::new(2000, 0);
746        let file_id = FileId::random();
747        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
748        let file_path = cache.cache_file_path(key);
749
750        // Write a file.
751        local_store
752            .write(&file_path, b"hello".as_slice())
753            .await
754            .unwrap();
755        // Add to the cache.
756        cache
757            .put(
758                IndexKey::new(region_id, file_id, FileType::Parquet),
759                IndexValue { file_size: 5 },
760            )
761            .await;
762
763        // Remove the file but keep the index.
764        local_store.delete(&file_path).await.unwrap();
765
766        // Reader is none.
767        assert!(cache.reader(key).await.is_none());
768        // Key is removed.
769        assert!(!cache.parquet_index.contains_key(&key));
770    }
771
772    #[tokio::test]
773    async fn test_file_cache_recover() {
774        let dir = create_temp_dir("");
775        let local_store = new_fs_store(dir.path().to_str().unwrap());
776        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
777
778        let region_id = RegionId::new(2000, 0);
779        let file_type = FileType::Parquet;
780        // Write N files.
781        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
782        let mut total_size = 0;
783        for (i, file_id) in file_ids.iter().enumerate() {
784            let key = IndexKey::new(region_id, *file_id, file_type);
785            let file_path = cache.cache_file_path(key);
786            let bytes = i.to_string().into_bytes();
787            local_store.write(&file_path, bytes.clone()).await.unwrap();
788
789            // Add to the cache.
790            cache
791                .put(
792                    IndexKey::new(region_id, *file_id, file_type),
793                    IndexValue {
794                        file_size: bytes.len() as u32,
795                    },
796                )
797                .await;
798            total_size += bytes.len();
799        }
800
801        // Recover the cache.
802        let cache = Arc::new(FileCache::new(
803            local_store.clone(),
804            ReadableSize::mb(10),
805            None,
806            None,
807        ));
808        // No entry before recovery.
809        assert!(
810            cache
811                .reader(IndexKey::new(region_id, file_ids[0], file_type))
812                .await
813                .is_none()
814        );
815        cache.recover(true, None).await;
816
817        // Check size.
818        cache.parquet_index.run_pending_tasks().await;
819        assert_eq!(total_size, cache.parquet_index.weighted_size() as usize);
820
821        for (i, file_id) in file_ids.iter().enumerate() {
822            let key = IndexKey::new(region_id, *file_id, file_type);
823            let reader = cache.reader(key).await.unwrap();
824            let buf = reader.read(..).await.unwrap().to_vec();
825            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
826        }
827    }
828
829    #[tokio::test]
830    async fn test_file_cache_read_ranges() {
831        let dir = create_temp_dir("");
832        let local_store = new_fs_store(dir.path().to_str().unwrap());
833        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
834        let region_id = RegionId::new(2000, 0);
835        let file_id = FileId::random();
836        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
837        let file_path = file_cache.cache_file_path(key);
838        // Write a file.
839        let data = b"hello greptime database";
840        local_store
841            .write(&file_path, data.as_slice())
842            .await
843            .unwrap();
844        // Add to the cache.
845        file_cache.put(key, IndexValue { file_size: 5 }).await;
846        // Ranges
847        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
848        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
849
850        assert_eq!(4, bytes.len());
851        assert_eq!(b"hello", bytes[0].as_ref());
852        assert_eq!(b"grep", bytes[1].as_ref());
853        assert_eq!(b"data", bytes[2].as_ref());
854        assert_eq!(data, bytes[3].as_ref());
855    }
856
857    #[test]
858    fn test_cache_file_path() {
859        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
860        assert_eq!(
861            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
862            cache_file_path(
863                "test_dir",
864                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
865            )
866        );
867        assert_eq!(
868            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
869            cache_file_path(
870                "test_dir/",
871                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
872            )
873        );
874    }
875
876    #[test]
877    fn test_parse_file_name() {
878        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
879        let region_id = RegionId::new(1234, 5);
880        assert_eq!(
881            IndexKey::new(region_id, file_id, FileType::Parquet),
882            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
883        );
884        assert!(parse_index_key("").is_none());
885        assert!(parse_index_key(".").is_none());
886        assert!(parse_index_key("5299989643269").is_none());
887        assert!(parse_index_key("5299989643269.").is_none());
888        assert!(parse_index_key(".5299989643269").is_none());
889        assert!(parse_index_key("5299989643269.").is_none());
890        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
891        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
892        assert!(
893            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
894        );
895        assert!(
896            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
897                .is_none()
898        );
899    }
900}