// mito2/cache/file_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! A cache for files.
16
17use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41    WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Subdirectory of cached files for write.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage for index (puffin) cache (20% of total capacity).
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity for each cache (512MB).
///
/// Both the parquet and puffin caches are raised to at least this size,
/// even if that exceeds the configured total capacity.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Channel capacity for background download tasks.
///
/// When the channel is full, new background download requests are dropped.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
61
/// A task to download a file in the background.
struct DownloadTask {
    /// Cache key identifying the file to download.
    index_key: IndexKey,
    /// Path of the file in the remote object store.
    remote_path: String,
    /// Remote object store to read the file from.
    remote_store: ObjectStore,
    /// Size of the remote file in bytes.
    file_size: u64,
}
69
/// Inner struct for FileCache that can be used in spawned tasks.
///
/// Wrapped in an [`Arc`] so the background download worker can share state
/// with the owning [`FileCache`].
#[derive(Debug)]
struct FileCacheInner {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
impl FileCacheInner {
    /// Returns the appropriate memory index for the given file type.
    ///
    /// Parquet data files and puffin (index) files are tracked by separate
    /// moka caches so each side has its own capacity and eviction.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the cache file path for the key.
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        // Add to the byte gauge up front; the eviction listener subtracts the
        // size of any entry this insert replaces or evicts, keeping it balanced.
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;

        // Since files are large items, we run the pending tasks immediately.
        index.run_pending_tasks().await;
    }

    /// Recovers the index from local store.
    ///
    /// Lists files under [`FILE_DIR`], re-inserting every entry whose name
    /// parses as an [`IndexKey`]; unrecognized files are skipped.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        // Use i64 for total_size to reduce the risk of overflow.
        // It is possible that the total size of the cache is larger than i32::MAX.
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            // Stat the file explicitly to obtain its size; the lister's
            // metadata is only used for the is_file check above.
            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            // Track sizes separately for each file type
            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // The metrics is a signed int gauge so we can updates it finally.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Run all pending tasks of the moka cache so that the cache size is updated
        // and the eviction policy is applied.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads a file without cleaning up on error.
    ///
    /// On failure a partially written local file may be left behind; use
    /// [`FileCacheInner::download`], which removes it.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        // Closing flushes the writer; treat failures like copy failures.
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        // Register the downloaded file in the memory index.
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file from remote store to local cache.
    ///
    /// On failure, logs the error and cleans up the partially downloaded
    /// local file before returning the error.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Checks if the key is in the file cache.
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}
275
/// A file cache manages files on local store and evict files based
/// on size.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Inner cache state shared with background worker.
    inner: Arc<FileCacheInner>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
    /// Channel for background download tasks. None if background worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}
287
/// Shared, reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
impl FileCache {
    /// Creates a new file cache.
    ///
    /// `capacity` is split between the parquet and puffin caches according to
    /// `index_cache_percent` (defaults to [`DEFAULT_INDEX_CACHE_PERCENT`]);
    /// each side is then raised to at least [`MIN_CACHE_CAPACITY`]. `ttl` is
    /// applied as a time-to-idle on both caches. When
    /// `enable_background_worker` is true, a worker task is spawned to serve
    /// background download requests.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        // Validate and use the provided percent or default
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        // Convert percent to ratio and calculate capacity for each cache
        let index_ratio = index_percent as f64 / 100.0;
        let puffin_capacity = (total_capacity as f64 * index_ratio) as u64;
        let parquet_capacity = total_capacity - puffin_capacity;

        // Ensure both capacities are at least 512MB
        // (this can push the combined capacity above `capacity`).
        let puffin_capacity = puffin_capacity.max(MIN_CACHE_CAPACITY);
        let parquet_capacity = parquet_capacity.max(MIN_CACHE_CAPACITY);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        // Create inner cache shared with background worker
        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        // Only create channel and spawn worker if background download is enabled
        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns a background worker to process download tasks.
    ///
    /// The worker exits when all senders are dropped. Download errors are
    /// ignored (already logged inside `download`).
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Check if the file is already in the cache
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Ignores background download errors.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        1, // Background downloads use concurrency=1
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds a cache for a specific file type.
    ///
    /// The eviction listener deletes the file from the local store (unless
    /// the entry was merely replaced) and keeps the byte gauge in sync.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // We only measure space on local store.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                // Stores files under FILE_DIR.
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The cache is replaced by another file (maybe download again). We don't remove the same
                        // file but updates the metrics as the file is already replaced by users.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    /// Gets the index value for the key, updating the cache's recency state.
    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Reads a file from the cache.
    ///
    /// Returns `None` (and records a cache miss) when the key is absent or
    /// the local file is gone; a stale index entry is removed in that case.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // We must use `get()` to update the estimator of the cache.
        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // We removes the file from the index.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads ranges from the cache.
    ///
    /// Returns `None` (and records a miss) when the key is absent or the
    /// local file cannot be read; a stale index entry is removed.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // In most cases, it will use blocking read,
        // because FileCache is normally based on local file system, which supports blocking read.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // We removes the file from the index.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly.
    /// It always tries to remove the file from the local store because we may not have the file
    /// in the memory index if upload is failed.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // Always delete the file from the local store.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from local store.
    ///
    /// If `task_receiver` is provided, spawns a background task after recovery
    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
    /// When `sync` is true, waits for the recovery itself to finish (but not
    /// for the spawned task-processing loop).
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Spawns background task to process region load cache tasks after recovery.
            // So it won't block the recovery when `sync` is true.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store of the file cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Get the parquet metadata in file cache.
    /// If the file is not in the cache or fail to load metadata, return None.
    pub(crate) async fn get_parquet_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<ParquetMetaData> {
        // Check if file cache contains the key
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            // Load metadata from file cache
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
            metadata_loader.with_page_index_policy(page_index_policy);

            match metadata_loader.load(cache_metrics).await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // We removes the file from the index.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    /// Opens a reader to `file_path` on the local store, or `None` when the
    /// file does not exist.
    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the capacity of the puffin (index) cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8) // Foreground uses concurrency=8
            .await
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`) in the background. Errors are logged but not returned.
    ///
    /// This method attempts to send a download task to the background worker.
    /// If the channel is full, the task is silently dropped.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // Do nothing if background worker is disabled (channel is None)
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // Try to send the task; if the channel is full, just drop it
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
685
/// Key of file cache index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    /// Region that owns the file.
    pub region_id: RegionId,
    /// Id of the cached file.
    pub file_id: FileId,
    /// Type of the cached file (parquet data or puffin index).
    pub file_type: FileType,
}
693
694impl IndexKey {
695    /// Creates a new index key.
696    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
697        IndexKey {
698            region_id,
699            file_id,
700            file_type,
701        }
702    }
703}
704
705impl fmt::Display for IndexKey {
706    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
707        write!(
708            f,
709            "{}.{}.{}",
710            self.region_id.as_u64(),
711            self.file_id,
712            self.file_type
713        )
714    }
715}
716
/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file, carrying its version number.
    Puffin(u64),
}

impl fmt::Display for FileType {
    /// Renders the type as its file-name suffix: `parquet` or `{version}.puffin`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            FileType::Parquet => f.write_str("parquet"),
            FileType::Puffin(version) => write!(f, "{}.puffin", version),
        }
    }
}
734
735impl FileType {
736    /// Parses the file type from string.
737    pub(crate) fn parse(s: &str) -> Option<FileType> {
738        match s {
739            "parquet" => Some(FileType::Parquet),
740            "puffin" => Some(FileType::Puffin(0)),
741            _ => {
742                // if post-fix with .puffin, try to parse the version
743                if let Some(version_str) = s.strip_suffix(".puffin") {
744                    let version = version_str.parse::<u64>().ok()?;
745                    Some(FileType::Puffin(version))
746                } else {
747                    None
748                }
749            }
750        }
751    }
752
753    /// Returns the metric label for this file type.
754    fn metric_label(&self) -> &'static str {
755        match self {
756            FileType::Parquet => FILE_TYPE,
757            FileType::Puffin(_) => INDEX_TYPE,
758        }
759    }
760}
761
/// An entity that describes the file in the file cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    // NOTE(review): stored as u32, so a file >= 4 GiB would truncate when
    // recorded — confirm cached files are guaranteed smaller upstream.
    pub(crate) file_size: u32,
}
770
/// Generates the path to the cached file.
///
/// The file name format is `{region_id}.{file_id}.{file_type}`
/// (the [`Display`](fmt::Display) form of [`IndexKey`]).
fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
    join_path(cache_file_dir, &key.to_string())
}
777
778/// Parse index key from the file name.
779fn parse_index_key(name: &str) -> Option<IndexKey> {
780    let mut split = name.splitn(3, '.');
781    let region_id = split.next().and_then(|s| {
782        let id = s.parse::<u64>().ok()?;
783        Some(RegionId::from_u64(id))
784    })?;
785    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
786    let file_type = split.next().and_then(FileType::parse)?;
787
788    Some(IndexKey::new(region_id, file_id, file_type))
789}
790
791#[cfg(test)]
792mod tests {
793    use common_test_util::temp_dir::create_temp_dir;
794    use object_store::services::Fs;
795
796    use super::*;
797
    /// Builds a filesystem-backed object store rooted at `path` for tests.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }
802
    /// Verifies that an entry expires after the configured time-to-idle.
    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Sleep past the TTL so moka can expire the idle entry.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }
844
    /// Exercises put/reader/remove and checks the weighted size bookkeeping.
    #[tokio::test]
    async fn test_file_cache_basic() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            None,
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();
        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Read file content.
        let reader = cache.reader(key).await.unwrap();
        let buf = reader.read(..).await.unwrap().to_vec();
        assert_eq!("hello", String::from_utf8(buf).unwrap());

        // Get weighted size.
        cache.inner.parquet_index.run_pending_tasks().await;
        assert_eq!(5, cache.inner.parquet_index.weighted_size());

        // Remove the file.
        cache.remove(key).await;
        assert!(cache.reader(key).await.is_none());

        // Ensure all pending tasks of the moka cache is done before assertion.
        cache.inner.parquet_index.run_pending_tasks().await;

        // The file also not exists.
        assert!(!local_store.exists(&file_path).await.unwrap());
        assert_eq!(0, cache.inner.parquet_index.weighted_size());
    }
898
899    #[tokio::test]
900    async fn test_file_cache_file_removed() {
901        let dir = create_temp_dir("");
902        let local_store = new_fs_store(dir.path().to_str().unwrap());
903
904        let cache = FileCache::new(
905            local_store.clone(),
906            ReadableSize::mb(10),
907            None,
908            None,
909            true, // enable_background_worker
910        );
911        let region_id = RegionId::new(2000, 0);
912        let file_id = FileId::random();
913        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
914        let file_path = cache.cache_file_path(key);
915
916        // Write a file.
917        local_store
918            .write(&file_path, b"hello".as_slice())
919            .await
920            .unwrap();
921        // Add to the cache.
922        cache
923            .put(
924                IndexKey::new(region_id, file_id, FileType::Parquet),
925                IndexValue { file_size: 5 },
926            )
927            .await;
928
929        // Remove the file but keep the index.
930        local_store.delete(&file_path).await.unwrap();
931
932        // Reader is none.
933        assert!(cache.reader(key).await.is_none());
934        // Key is removed.
935        assert!(!cache.inner.parquet_index.contains_key(&key));
936    }
937
938    #[tokio::test]
939    async fn test_file_cache_recover() {
940        let dir = create_temp_dir("");
941        let local_store = new_fs_store(dir.path().to_str().unwrap());
942        let cache = FileCache::new(
943            local_store.clone(),
944            ReadableSize::mb(10),
945            None,
946            None,
947            true, // enable_background_worker
948        );
949
950        let region_id = RegionId::new(2000, 0);
951        let file_type = FileType::Parquet;
952        // Write N files.
953        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
954        let mut total_size = 0;
955        for (i, file_id) in file_ids.iter().enumerate() {
956            let key = IndexKey::new(region_id, *file_id, file_type);
957            let file_path = cache.cache_file_path(key);
958            let bytes = i.to_string().into_bytes();
959            local_store.write(&file_path, bytes.clone()).await.unwrap();
960
961            // Add to the cache.
962            cache
963                .put(
964                    IndexKey::new(region_id, *file_id, file_type),
965                    IndexValue {
966                        file_size: bytes.len() as u32,
967                    },
968                )
969                .await;
970            total_size += bytes.len();
971        }
972
973        // Recover the cache.
974        let cache = FileCache::new(
975            local_store.clone(),
976            ReadableSize::mb(10),
977            None,
978            None,
979            true, // enable_background_worker
980        );
981        // No entry before recovery.
982        assert!(
983            cache
984                .reader(IndexKey::new(region_id, file_ids[0], file_type))
985                .await
986                .is_none()
987        );
988        cache.recover(true, None).await;
989
990        // Check size.
991        cache.inner.parquet_index.run_pending_tasks().await;
992        assert_eq!(
993            total_size,
994            cache.inner.parquet_index.weighted_size() as usize
995        );
996
997        for (i, file_id) in file_ids.iter().enumerate() {
998            let key = IndexKey::new(region_id, *file_id, file_type);
999            let reader = cache.reader(key).await.unwrap();
1000            let buf = reader.read(..).await.unwrap().to_vec();
1001            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
1002        }
1003    }
1004
1005    #[tokio::test]
1006    async fn test_file_cache_read_ranges() {
1007        let dir = create_temp_dir("");
1008        let local_store = new_fs_store(dir.path().to_str().unwrap());
1009        let file_cache = FileCache::new(
1010            local_store.clone(),
1011            ReadableSize::mb(10),
1012            None,
1013            None,
1014            true, // enable_background_worker
1015        );
1016        let region_id = RegionId::new(2000, 0);
1017        let file_id = FileId::random();
1018        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
1019        let file_path = file_cache.cache_file_path(key);
1020        // Write a file.
1021        let data = b"hello greptime database";
1022        local_store
1023            .write(&file_path, data.as_slice())
1024            .await
1025            .unwrap();
1026        // Add to the cache.
1027        file_cache.put(key, IndexValue { file_size: 5 }).await;
1028        // Ranges
1029        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
1030        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
1031
1032        assert_eq!(4, bytes.len());
1033        assert_eq!(b"hello", bytes[0].as_ref());
1034        assert_eq!(b"grep", bytes[1].as_ref());
1035        assert_eq!(b"data", bytes[2].as_ref());
1036        assert_eq!(data, bytes[3].as_ref());
1037    }
1038
1039    #[test]
1040    fn test_cache_file_path() {
1041        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
1042        assert_eq!(
1043            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
1044            cache_file_path(
1045                "test_dir",
1046                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
1047            )
1048        );
1049        assert_eq!(
1050            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
1051            cache_file_path(
1052                "test_dir/",
1053                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
1054            )
1055        );
1056    }
1057
1058    #[test]
1059    fn test_parse_file_name() {
1060        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
1061        let region_id = RegionId::new(1234, 5);
1062        assert_eq!(
1063            IndexKey::new(region_id, file_id, FileType::Parquet),
1064            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
1065        );
1066        assert_eq!(
1067            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
1068            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
1069        );
1070        assert_eq!(
1071            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
1072            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
1073                .unwrap()
1074        );
1075        assert!(parse_index_key("").is_none());
1076        assert!(parse_index_key(".").is_none());
1077        assert!(parse_index_key("5299989643269").is_none());
1078        assert!(parse_index_key("5299989643269.").is_none());
1079        assert!(parse_index_key(".5299989643269").is_none());
1080        assert!(parse_index_key("5299989643269.").is_none());
1081        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
1082        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
1083        assert!(
1084            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
1085        );
1086        assert!(
1087            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
1088                .is_none()
1089        );
1090    }
1091}