mito2/cache/
file_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for files.
16
17use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::ParquetMetaData;
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::UnboundedReceiver;
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41    WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46
/// Directory, relative to the local store root, that holds the files cached for writes.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// Share of the total file cache capacity given to index (puffin) files, in percent.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Lower bound applied to the capacity of each cache: 512 MiB.
const MIN_CACHE_CAPACITY: u64 = 512 << 20;
57
/// Inner struct for FileCache that can be used in spawned tasks.
///
/// Holds the local store that contains the cached files plus one in-memory index per
/// file type. `FileCache` keeps it behind an `Arc`, so all clones of a `FileCache`
/// share the same indexes.
#[derive(Debug)]
struct FileCacheInner {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
68
69impl FileCacheInner {
70    /// Returns the appropriate memory index for the given file type.
71    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
72        match file_type {
73            FileType::Parquet => &self.parquet_index,
74            FileType::Puffin { .. } => &self.puffin_index,
75        }
76    }
77
78    /// Returns the cache file path for the key.
79    fn cache_file_path(&self, key: IndexKey) -> String {
80        cache_file_path(FILE_DIR, key)
81    }
82
83    /// Puts a file into the cache index.
84    ///
85    /// The `WriteCache` should ensure the file is in the correct path.
86    async fn put(&self, key: IndexKey, value: IndexValue) {
87        CACHE_BYTES
88            .with_label_values(&[key.file_type.metric_label()])
89            .add(value.file_size.into());
90        let index = self.memory_index(key.file_type);
91        index.insert(key, value).await;
92
93        // Since files are large items, we run the pending tasks immediately.
94        index.run_pending_tasks().await;
95    }
96
97    /// Recovers the index from local store.
98    async fn recover(&self) -> Result<()> {
99        let now = Instant::now();
100        let mut lister = self
101            .local_store
102            .lister_with(FILE_DIR)
103            .await
104            .context(OpenDalSnafu)?;
105        // Use i64 for total_size to reduce the risk of overflow.
106        // It is possible that the total size of the cache is larger than i32::MAX.
107        let (mut total_size, mut total_keys) = (0i64, 0);
108        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
109        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
110            let meta = entry.metadata();
111            if !meta.is_file() {
112                continue;
113            }
114            let Some(key) = parse_index_key(entry.name()) else {
115                continue;
116            };
117
118            let meta = self
119                .local_store
120                .stat(entry.path())
121                .await
122                .context(OpenDalSnafu)?;
123            let file_size = meta.content_length() as u32;
124            let index = self.memory_index(key.file_type);
125            index.insert(key, IndexValue { file_size }).await;
126            let size = i64::from(file_size);
127            total_size += size;
128            total_keys += 1;
129
130            // Track sizes separately for each file type
131            match key.file_type {
132                FileType::Parquet => parquet_size += size,
133                FileType::Puffin { .. } => puffin_size += size,
134            }
135        }
136        // The metrics is a signed int gauge so we can updates it finally.
137        CACHE_BYTES
138            .with_label_values(&[FILE_TYPE])
139            .add(parquet_size);
140        CACHE_BYTES
141            .with_label_values(&[INDEX_TYPE])
142            .add(puffin_size);
143
144        // Run all pending tasks of the moka cache so that the cache size is updated
145        // and the eviction policy is applied.
146        self.parquet_index.run_pending_tasks().await;
147        self.puffin_index.run_pending_tasks().await;
148
149        let parquet_weight = self.parquet_index.weighted_size();
150        let parquet_count = self.parquet_index.entry_count();
151        let puffin_weight = self.puffin_index.weighted_size();
152        let puffin_count = self.puffin_index.entry_count();
153        info!(
154            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
155            total_keys,
156            total_size,
157            parquet_count,
158            parquet_weight,
159            puffin_count,
160            puffin_weight,
161            now.elapsed()
162        );
163        Ok(())
164    }
165
166    /// Downloads a file without cleaning up on error.
167    async fn download_without_cleaning(
168        &self,
169        index_key: IndexKey,
170        remote_path: &str,
171        remote_store: &ObjectStore,
172        file_size: u64,
173    ) -> Result<()> {
174        const DOWNLOAD_READER_CONCURRENCY: usize = 8;
175        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);
176
177        let file_type = index_key.file_type;
178        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
179            .with_label_values(&[match file_type {
180                FileType::Parquet => "download_parquet",
181                FileType::Puffin { .. } => "download_puffin",
182            }])
183            .start_timer();
184
185        let reader = remote_store
186            .reader_with(remote_path)
187            .concurrent(DOWNLOAD_READER_CONCURRENCY)
188            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
189            .await
190            .context(error::OpenDalSnafu)?
191            .into_futures_async_read(0..file_size)
192            .await
193            .context(error::OpenDalSnafu)?;
194
195        let cache_path = self.cache_file_path(index_key);
196        let mut writer = self
197            .local_store
198            .writer(&cache_path)
199            .await
200            .context(error::OpenDalSnafu)?
201            .into_futures_async_write();
202
203        let region_id = index_key.region_id;
204        let file_id = index_key.file_id;
205        let bytes_written =
206            futures::io::copy(reader, &mut writer)
207                .await
208                .context(error::DownloadSnafu {
209                    region_id,
210                    file_id,
211                    file_type,
212                })?;
213        writer.close().await.context(error::DownloadSnafu {
214            region_id,
215            file_id,
216            file_type,
217        })?;
218
219        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);
220
221        let elapsed = timer.stop_and_record();
222        debug!(
223            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
224            remote_path, cache_path, bytes_written, region_id, elapsed,
225        );
226
227        let index_value = IndexValue {
228            file_size: bytes_written as _,
229        };
230        self.put(index_key, index_value).await;
231        Ok(())
232    }
233
234    /// Downloads a file from remote store to local cache.
235    async fn download(
236        &self,
237        index_key: IndexKey,
238        remote_path: &str,
239        remote_store: &ObjectStore,
240        file_size: u64,
241    ) -> Result<()> {
242        if let Err(e) = self
243            .download_without_cleaning(index_key, remote_path, remote_store, file_size)
244            .await
245        {
246            let filename = index_key.to_string();
247            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;
248
249            return Err(e);
250        }
251
252        Ok(())
253    }
254}
255
/// A file cache that manages files on the local store and evicts them based
/// on size.
///
/// Cloning is cheap: all clones share the same `FileCacheInner` through an `Arc`.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Inner cache state shared with background worker.
    inner: Arc<FileCacheInner>,
    /// Capacity of the puffin (index) cache in bytes.
    ///
    /// This is the value the puffin index was built with, i.e. after the minimum
    /// capacity has been applied in `FileCache::new`.
    puffin_capacity: u64,
}

/// Shared, reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
267
268impl FileCache {
269    /// Creates a new file cache.
270    pub(crate) fn new(
271        local_store: ObjectStore,
272        capacity: ReadableSize,
273        ttl: Option<Duration>,
274        index_cache_percent: Option<u8>,
275    ) -> FileCache {
276        // Validate and use the provided percent or default
277        let index_percent = index_cache_percent
278            .filter(|&percent| percent > 0 && percent < 100)
279            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
280        let total_capacity = capacity.as_bytes();
281
282        // Convert percent to ratio and calculate capacity for each cache
283        let index_ratio = index_percent as f64 / 100.0;
284        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
285        let parquet_capacity = total_capacity - puffin_capacity;
286
287        // Ensure both capacities are at least 512MB
288        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
289        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);
290
291        info!(
292            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
293            index_percent,
294            ReadableSize(total_capacity),
295            ReadableSize(parquet_capacity),
296            ReadableSize(puffin_capacity)
297        );
298
299        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
300        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");
301
302        // Create inner cache shared with background worker
303        let inner = Arc::new(FileCacheInner {
304            local_store,
305            parquet_index,
306            puffin_index,
307        });
308
309        FileCache {
310            inner,
311            puffin_capacity,
312        }
313    }
314
315    /// Builds a cache for a specific file type.
316    fn build_cache(
317        local_store: ObjectStore,
318        capacity: u64,
319        ttl: Option<Duration>,
320        label: &'static str,
321    ) -> Cache<IndexKey, IndexValue> {
322        let cache_store = local_store;
323        let mut builder = Cache::builder()
324            .eviction_policy(EvictionPolicy::lru())
325            .weigher(|_key, value: &IndexValue| -> u32 {
326                // We only measure space on local store.
327                value.file_size
328            })
329            .max_capacity(capacity)
330            .async_eviction_listener(move |key, value, cause| {
331                let store = cache_store.clone();
332                // Stores files under FILE_DIR.
333                let file_path = cache_file_path(FILE_DIR, *key);
334                async move {
335                    if let RemovalCause::Replaced = cause {
336                        // The cache is replaced by another file. This is unexpected, we don't remove the same
337                        // file but updates the metrics as the file is already replaced by users.
338                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
339                        // TODO(yingwen): Don't log warn later.
340                        warn!("Replace existing cache {} for region {} unexpectedly", file_path, key.region_id);
341                        return;
342                    }
343
344                    match store.delete(&file_path).await {
345                        Ok(()) => {
346                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
347                        }
348                        Err(e) => {
349                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
350                        }
351                    }
352                }
353                .boxed()
354            });
355        if let Some(ttl) = ttl {
356            builder = builder.time_to_idle(ttl);
357        }
358        builder.build()
359    }
360
361    /// Puts a file into the cache index.
362    ///
363    /// The `WriteCache` should ensure the file is in the correct path.
364    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
365        self.inner.put(key, value).await
366    }
367
368    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
369        self.inner.memory_index(key.file_type).get(&key).await
370    }
371
372    /// Reads a file from the cache.
373    #[allow(unused)]
374    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
375        // We must use `get()` to update the estimator of the cache.
376        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
377        let index = self.inner.memory_index(key.file_type);
378        if index.get(&key).await.is_none() {
379            CACHE_MISS
380                .with_label_values(&[key.file_type.metric_label()])
381                .inc();
382            return None;
383        }
384
385        let file_path = self.inner.cache_file_path(key);
386        match self.get_reader(&file_path).await {
387            Ok(Some(reader)) => {
388                CACHE_HIT
389                    .with_label_values(&[key.file_type.metric_label()])
390                    .inc();
391                return Some(reader);
392            }
393            Err(e) => {
394                if e.kind() != ErrorKind::NotFound {
395                    warn!(e; "Failed to get file for key {:?}", key);
396                }
397            }
398            Ok(None) => {}
399        }
400
401        // We removes the file from the index.
402        index.remove(&key).await;
403        CACHE_MISS
404            .with_label_values(&[key.file_type.metric_label()])
405            .inc();
406        None
407    }
408
409    /// Reads ranges from the cache.
410    pub(crate) async fn read_ranges(
411        &self,
412        key: IndexKey,
413        ranges: &[Range<u64>],
414    ) -> Option<Vec<Bytes>> {
415        let index = self.inner.memory_index(key.file_type);
416        if index.get(&key).await.is_none() {
417            CACHE_MISS
418                .with_label_values(&[key.file_type.metric_label()])
419                .inc();
420            return None;
421        }
422
423        let file_path = self.inner.cache_file_path(key);
424        // In most cases, it will use blocking read,
425        // because FileCache is normally based on local file system, which supports blocking read.
426        let bytes_result =
427            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
428        match bytes_result {
429            Ok(bytes) => {
430                CACHE_HIT
431                    .with_label_values(&[key.file_type.metric_label()])
432                    .inc();
433                Some(bytes)
434            }
435            Err(e) => {
436                if e.kind() != ErrorKind::NotFound {
437                    warn!(e; "Failed to get file for key {:?}", key);
438                }
439
440                // We removes the file from the index.
441                index.remove(&key).await;
442                CACHE_MISS
443                    .with_label_values(&[key.file_type.metric_label()])
444                    .inc();
445                None
446            }
447        }
448    }
449
450    /// Removes a file from the cache explicitly.
451    /// It always tries to remove the file from the local store because we may not have the file
452    /// in the memory index if upload is failed.
453    pub(crate) async fn remove(&self, key: IndexKey) {
454        let file_path = self.inner.cache_file_path(key);
455        self.inner.memory_index(key.file_type).remove(&key).await;
456        // Always delete the file from the local store.
457        if let Err(e) = self.inner.local_store.delete(&file_path).await {
458            warn!(e; "Failed to delete a cached file {}", file_path);
459        }
460    }
461
462    /// Recovers the index from local store.
463    ///
464    /// If `task_receiver` is provided, spawns a background task after recovery
465    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
466    pub(crate) async fn recover(
467        &self,
468        sync: bool,
469        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
470    ) {
471        let moved_self = self.clone();
472        let handle = tokio::spawn(async move {
473            if let Err(err) = moved_self.inner.recover().await {
474                error!(err; "Failed to recover file cache.")
475            }
476
477            // Spawns background task to process region load cache tasks after recovery.
478            // So it won't block the recovery when `sync` is true.
479            if let Some(mut receiver) = task_receiver {
480                info!("Spawning background task for processing region load cache tasks");
481                tokio::spawn(async move {
482                    while let Some(task) = receiver.recv().await {
483                        task.fill_cache(&moved_self).await;
484                    }
485                    info!("Background task for processing region load cache tasks stopped");
486                });
487            }
488        });
489
490        if sync {
491            let _ = handle.await;
492        }
493    }
494
495    /// Returns the cache file path for the key.
496    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
497        self.inner.cache_file_path(key)
498    }
499
500    /// Returns the local store of the file cache.
501    pub(crate) fn local_store(&self) -> ObjectStore {
502        self.inner.local_store.clone()
503    }
504
505    /// Get the parquet metadata in file cache.
506    /// If the file is not in the cache or fail to load metadata, return None.
507    pub(crate) async fn get_parquet_meta_data(&self, key: IndexKey) -> Option<ParquetMetaData> {
508        // Check if file cache contains the key
509        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
510            // Load metadata from file cache
511            let local_store = self.local_store();
512            let file_path = self.inner.cache_file_path(key);
513            let file_size = index_value.file_size as u64;
514            let metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
515
516            match metadata_loader.load().await {
517                Ok(metadata) => {
518                    CACHE_HIT
519                        .with_label_values(&[key.file_type.metric_label()])
520                        .inc();
521                    Some(metadata)
522                }
523                Err(e) => {
524                    if !e.is_object_not_found() {
525                        warn!(
526                            e; "Failed to get parquet metadata for key {:?}",
527                            key
528                        );
529                    }
530                    // We removes the file from the index.
531                    self.inner.parquet_index.remove(&key).await;
532                    CACHE_MISS
533                        .with_label_values(&[key.file_type.metric_label()])
534                        .inc();
535                    None
536                }
537            }
538        } else {
539            CACHE_MISS
540                .with_label_values(&[key.file_type.metric_label()])
541                .inc();
542            None
543        }
544    }
545
546    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
547        if self.inner.local_store.exists(file_path).await? {
548            Ok(Some(self.inner.local_store.reader(file_path).await?))
549        } else {
550            Ok(None)
551        }
552    }
553
554    /// Checks if the key is in the file cache.
555    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
556        self.inner.memory_index(key.file_type).contains_key(key)
557    }
558
559    /// Returns the capacity of the puffin (index) cache in bytes.
560    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
561        self.puffin_capacity
562    }
563
564    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
565    pub(crate) fn puffin_cache_size(&self) -> u64 {
566        self.inner.puffin_index.weighted_size()
567    }
568
569    /// Downloads a file in `remote_path` from the remote object store to the local cache
570    /// (specified by `index_key`).
571    pub(crate) async fn download(
572        &self,
573        index_key: IndexKey,
574        remote_path: &str,
575        remote_store: &ObjectStore,
576        file_size: u64,
577    ) -> Result<()> {
578        self.inner
579            .download(index_key, remote_path, remote_store, file_size)
580            .await
581    }
582}
583
584/// Key of file cache index.
585#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
586pub struct IndexKey {
587    pub region_id: RegionId,
588    pub file_id: FileId,
589    pub file_type: FileType,
590}
591
592impl IndexKey {
593    /// Creates a new index key.
594    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
595        IndexKey {
596            region_id,
597            file_id,
598            file_type,
599        }
600    }
601}
602
603impl fmt::Display for IndexKey {
604    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
605        write!(
606            f,
607            "{}.{}.{}",
608            self.region_id.as_u64(),
609            self.file_id,
610            self.file_type
611        )
612    }
613}
614
/// Kind of a file kept in the file cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file; the payload is the version encoded in the file name.
    Puffin(u64),
}

impl fmt::Display for FileType {
    /// Writes the suffix used in cache file names: `parquet`, or `{version}.puffin`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            Self::Parquet => f.write_str("parquet"),
            Self::Puffin(version) => write!(f, "{version}.puffin"),
        }
    }
}
632
633impl FileType {
634    /// Parses the file type from string.
635    fn parse(s: &str) -> Option<FileType> {
636        match s {
637            "parquet" => Some(FileType::Parquet),
638            "puffin" => Some(FileType::Puffin(0)),
639            _ => {
640                // if post-fix with .puffin, try to parse the version
641                if let Some(version_str) = s.strip_suffix(".puffin") {
642                    let version = version_str.parse::<u64>().ok()?;
643                    Some(FileType::Puffin(version))
644                } else {
645                    None
646                }
647            }
648        }
649    }
650
651    /// Returns the metric label for this file type.
652    fn metric_label(&self) -> &'static str {
653        match self {
654            FileType::Parquet => FILE_TYPE,
655            FileType::Puffin(_) => INDEX_TYPE,
656        }
657    }
658}
659
/// An entity that describes the file in the file cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    ///
    /// This is also the entry's weight in the moka cache (the weigher in
    /// `FileCache::build_cache` returns it), so cache capacities are enforced in bytes.
    /// As a `u32`, it cannot represent files of 4 GiB or more.
    pub(crate) file_size: u32,
}
668
669/// Generates the path to the cached file.
670///
671/// The file name format is `{region_id}.{file_id}.{file_type}`
672fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
673    join_path(cache_file_dir, &key.to_string())
674}
675
676/// Parse index key from the file name.
677fn parse_index_key(name: &str) -> Option<IndexKey> {
678    let mut split = name.splitn(3, '.');
679    let region_id = split.next().and_then(|s| {
680        let id = s.parse::<u64>().ok()?;
681        Some(RegionId::from_u64(id))
682    })?;
683    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
684    let file_type = split.next().and_then(FileType::parse)?;
685
686    Some(IndexKey::new(region_id, file_id, file_type))
687}
688
689#[cfg(test)]
690mod tests {
691    use common_test_util::temp_dir::create_temp_dir;
692    use object_store::services::Fs;
693
694    use super::*;
695
696    fn new_fs_store(path: &str) -> ObjectStore {
697        let builder = Fs::default().root(path);
698        ObjectStore::new(builder).unwrap().finish()
699    }
700
701    #[tokio::test]
702    async fn test_file_cache_ttl() {
703        let dir = create_temp_dir("");
704        let local_store = new_fs_store(dir.path().to_str().unwrap());
705
706        let cache = FileCache::new(
707            local_store.clone(),
708            ReadableSize::mb(10),
709            Some(Duration::from_millis(10)),
710            None,
711        );
712        let region_id = RegionId::new(2000, 0);
713        let file_id = FileId::random();
714        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
715        let file_path = cache.cache_file_path(key);
716
717        // Get an empty file.
718        assert!(cache.reader(key).await.is_none());
719
720        // Write a file.
721        local_store
722            .write(&file_path, b"hello".as_slice())
723            .await
724            .unwrap();
725
726        // Add to the cache.
727        cache
728            .put(
729                IndexKey::new(region_id, file_id, FileType::Parquet),
730                IndexValue { file_size: 5 },
731            )
732            .await;
733
734        let exist = cache.reader(key).await;
735        assert!(exist.is_some());
736        tokio::time::sleep(Duration::from_millis(15)).await;
737        cache.inner.parquet_index.run_pending_tasks().await;
738        let non = cache.reader(key).await;
739        assert!(non.is_none());
740    }
741
742    #[tokio::test]
743    async fn test_file_cache_basic() {
744        let dir = create_temp_dir("");
745        let local_store = new_fs_store(dir.path().to_str().unwrap());
746
747        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
748        let region_id = RegionId::new(2000, 0);
749        let file_id = FileId::random();
750        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
751        let file_path = cache.cache_file_path(key);
752
753        // Get an empty file.
754        assert!(cache.reader(key).await.is_none());
755
756        // Write a file.
757        local_store
758            .write(&file_path, b"hello".as_slice())
759            .await
760            .unwrap();
761        // Add to the cache.
762        cache
763            .put(
764                IndexKey::new(region_id, file_id, FileType::Parquet),
765                IndexValue { file_size: 5 },
766            )
767            .await;
768
769        // Read file content.
770        let reader = cache.reader(key).await.unwrap();
771        let buf = reader.read(..).await.unwrap().to_vec();
772        assert_eq!("hello", String::from_utf8(buf).unwrap());
773
774        // Get weighted size.
775        cache.inner.parquet_index.run_pending_tasks().await;
776        assert_eq!(5, cache.inner.parquet_index.weighted_size());
777
778        // Remove the file.
779        cache.remove(key).await;
780        assert!(cache.reader(key).await.is_none());
781
782        // Ensure all pending tasks of the moka cache is done before assertion.
783        cache.inner.parquet_index.run_pending_tasks().await;
784
785        // The file also not exists.
786        assert!(!local_store.exists(&file_path).await.unwrap());
787        assert_eq!(0, cache.inner.parquet_index.weighted_size());
788    }
789
790    #[tokio::test]
791    async fn test_file_cache_file_removed() {
792        let dir = create_temp_dir("");
793        let local_store = new_fs_store(dir.path().to_str().unwrap());
794
795        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
796        let region_id = RegionId::new(2000, 0);
797        let file_id = FileId::random();
798        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
799        let file_path = cache.cache_file_path(key);
800
801        // Write a file.
802        local_store
803            .write(&file_path, b"hello".as_slice())
804            .await
805            .unwrap();
806        // Add to the cache.
807        cache
808            .put(
809                IndexKey::new(region_id, file_id, FileType::Parquet),
810                IndexValue { file_size: 5 },
811            )
812            .await;
813
814        // Remove the file but keep the index.
815        local_store.delete(&file_path).await.unwrap();
816
817        // Reader is none.
818        assert!(cache.reader(key).await.is_none());
819        // Key is removed.
820        assert!(!cache.inner.parquet_index.contains_key(&key));
821    }
822
823    #[tokio::test]
824    async fn test_file_cache_recover() {
825        let dir = create_temp_dir("");
826        let local_store = new_fs_store(dir.path().to_str().unwrap());
827        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
828
829        let region_id = RegionId::new(2000, 0);
830        let file_type = FileType::Parquet;
831        // Write N files.
832        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
833        let mut total_size = 0;
834        for (i, file_id) in file_ids.iter().enumerate() {
835            let key = IndexKey::new(region_id, *file_id, file_type);
836            let file_path = cache.cache_file_path(key);
837            let bytes = i.to_string().into_bytes();
838            local_store.write(&file_path, bytes.clone()).await.unwrap();
839
840            // Add to the cache.
841            cache
842                .put(
843                    IndexKey::new(region_id, *file_id, file_type),
844                    IndexValue {
845                        file_size: bytes.len() as u32,
846                    },
847                )
848                .await;
849            total_size += bytes.len();
850        }
851
852        // Recover the cache.
853        let cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
854        // No entry before recovery.
855        assert!(
856            cache
857                .reader(IndexKey::new(region_id, file_ids[0], file_type))
858                .await
859                .is_none()
860        );
861        cache.recover(true, None).await;
862
863        // Check size.
864        cache.inner.parquet_index.run_pending_tasks().await;
865        assert_eq!(
866            total_size,
867            cache.inner.parquet_index.weighted_size() as usize
868        );
869
870        for (i, file_id) in file_ids.iter().enumerate() {
871            let key = IndexKey::new(region_id, *file_id, file_type);
872            let reader = cache.reader(key).await.unwrap();
873            let buf = reader.read(..).await.unwrap().to_vec();
874            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
875        }
876    }
877
878    #[tokio::test]
879    async fn test_file_cache_read_ranges() {
880        let dir = create_temp_dir("");
881        let local_store = new_fs_store(dir.path().to_str().unwrap());
882        let file_cache = FileCache::new(local_store.clone(), ReadableSize::mb(10), None, None);
883        let region_id = RegionId::new(2000, 0);
884        let file_id = FileId::random();
885        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
886        let file_path = file_cache.cache_file_path(key);
887        // Write a file.
888        let data = b"hello greptime database";
889        local_store
890            .write(&file_path, data.as_slice())
891            .await
892            .unwrap();
893        // Add to the cache.
894        file_cache.put(key, IndexValue { file_size: 5 }).await;
895        // Ranges
896        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
897        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
898
899        assert_eq!(4, bytes.len());
900        assert_eq!(b"hello", bytes[0].as_ref());
901        assert_eq!(b"grep", bytes[1].as_ref());
902        assert_eq!(b"data", bytes[2].as_ref());
903        assert_eq!(data, bytes[3].as_ref());
904    }
905
906    #[test]
907    fn test_cache_file_path() {
908        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
909        assert_eq!(
910            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
911            cache_file_path(
912                "test_dir",
913                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
914            )
915        );
916        assert_eq!(
917            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
918            cache_file_path(
919                "test_dir/",
920                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
921            )
922        );
923    }
924
925    #[test]
926    fn test_parse_file_name() {
927        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
928        let region_id = RegionId::new(1234, 5);
929        assert_eq!(
930            IndexKey::new(region_id, file_id, FileType::Parquet),
931            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
932        );
933        assert_eq!(
934            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
935            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
936        );
937        assert_eq!(
938            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
939            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
940                .unwrap()
941        );
942        assert!(parse_index_key("").is_none());
943        assert!(parse_index_key(".").is_none());
944        assert!(parse_index_key("5299989643269").is_none());
945        assert!(parse_index_key("5299989643269.").is_none());
946        assert!(parse_index_key(".5299989643269").is_none());
947        assert!(parse_index_key("5299989643269.").is_none());
948        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
949        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
950        assert!(
951            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
952        );
953        assert!(
954            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
955                .is_none()
956        );
957    }
958}