// mito2/cache/file_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A cache for files.
16
17use std::fmt;
18use std::ops::Range;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21
22use bytes::Bytes;
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error, info, warn};
25use futures::{AsyncWriteExt, FutureExt, TryStreamExt};
26use moka::future::Cache;
27use moka::notification::RemovalCause;
28use moka::policy::EvictionPolicy;
29use object_store::util::join_path;
30use object_store::{ErrorKind, ObjectStore, Reader};
31use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
32use snafu::ResultExt;
33use store_api::storage::{FileId, RegionId};
34use tokio::sync::mpsc::{Sender, UnboundedReceiver};
35
36use crate::access_layer::TempFileCleaner;
37use crate::cache::{CachedSstMeta, FILE_TYPE, INDEX_TYPE};
38use crate::error::{self, OpenDalSnafu, Result};
39use crate::metrics::{
40    CACHE_BYTES, CACHE_HIT, CACHE_MISS, WRITE_CACHE_DOWNLOAD_BYTES_TOTAL,
41    WRITE_CACHE_DOWNLOAD_ELAPSED,
42};
43use crate::region::opener::RegionLoadCacheTask;
44use crate::sst::parquet::helper::fetch_byte_ranges;
45use crate::sst::parquet::metadata::MetadataLoader;
46use crate::sst::parquet::reader::MetadataCacheMetrics;
47
/// Subdirectory of cached files for write.
///
/// This must contain three layers, corresponding to [`build_prometheus_metrics_layer`](object_store::layers::build_prometheus_metrics_layer).
const FILE_DIR: &str = "cache/object/write/";

/// Default percentage for index (puffin) cache (20% of total capacity).
///
/// Used when the caller does not supply a valid `index_cache_percent`.
pub(crate) const DEFAULT_INDEX_CACHE_PERCENT: u8 = 20;

/// Minimum capacity for each cache (512MB).
///
/// Prevents either the parquet or the puffin cache from being starved when
/// the configured split percentage is extreme.
const MIN_CACHE_CAPACITY: u64 = 512 * 1024 * 1024;

/// Channel capacity for background download tasks.
///
/// When the channel is full, new background download tasks are dropped.
const DOWNLOAD_TASK_CHANNEL_SIZE: usize = 64;
61
/// A task to download a file in the background.
struct DownloadTask {
    /// Cache key identifying the file to download.
    index_key: IndexKey,
    /// Path of the file in the remote object store.
    remote_path: String,
    /// Remote object store to download the file from.
    remote_store: ObjectStore,
    /// Size of the remote file in bytes.
    file_size: u64,
}
69
/// Inner struct for FileCache that can be used in spawned tasks.
///
/// Shared via `Arc` between the [`FileCache`] facade and the background
/// download worker.
#[derive(Debug)]
struct FileCacheInner {
    /// Local store to cache files.
    local_store: ObjectStore,
    /// Index to track cached Parquet files.
    parquet_index: Cache<IndexKey, IndexValue>,
    /// Index to track cached Puffin files.
    puffin_index: Cache<IndexKey, IndexValue>,
}
80
impl FileCacheInner {
    /// Returns the appropriate memory index for the given file type.
    ///
    /// Parquet data files and puffin index files live in separate moka caches
    /// so each gets its own capacity budget.
    fn memory_index(&self, file_type: FileType) -> &Cache<IndexKey, IndexValue> {
        match file_type {
            FileType::Parquet => &self.parquet_index,
            FileType::Puffin { .. } => &self.puffin_index,
        }
    }

    /// Returns the cache file path for the key.
    fn cache_file_path(&self, key: IndexKey) -> String {
        cache_file_path(FILE_DIR, key)
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    async fn put(&self, key: IndexKey, value: IndexValue) {
        // Account the new bytes before inserting; eviction callbacks subtract
        // the same amount when the entry is removed.
        CACHE_BYTES
            .with_label_values(&[key.file_type.metric_label()])
            .add(value.file_size.into());
        let index = self.memory_index(key.file_type);
        index.insert(key, value).await;

        // Since files are large items, we run the pending tasks immediately.
        index.run_pending_tasks().await;
    }

    /// Recovers the index from local store.
    ///
    /// Lists everything under [`FILE_DIR`], parses each file name back into an
    /// [`IndexKey`], and repopulates the in-memory indices and byte-gauge
    /// metrics. Files with unparsable names are silently skipped.
    async fn recover(&self) -> Result<()> {
        let now = Instant::now();
        let mut lister = self
            .local_store
            .lister_with(FILE_DIR)
            .await
            .context(OpenDalSnafu)?;
        // Use i64 for total_size to reduce the risk of overflow.
        // It is possible that the total size of the cache is larger than i32::MAX.
        let (mut total_size, mut total_keys) = (0i64, 0);
        let (mut parquet_size, mut puffin_size) = (0i64, 0i64);
        while let Some(entry) = lister.try_next().await.context(OpenDalSnafu)? {
            let meta = entry.metadata();
            if !meta.is_file() {
                continue;
            }
            // Skip files whose names don't follow the cache naming scheme.
            let Some(key) = parse_index_key(entry.name()) else {
                continue;
            };

            // NOTE(review): stats the file again even though the lister
            // provided metadata — presumably because some backends don't
            // return a content length from listing; confirm before changing.
            let meta = self
                .local_store
                .stat(entry.path())
                .await
                .context(OpenDalSnafu)?;
            // NOTE(review): cast truncates for files >= 4GiB; assumes cached
            // files are well below u32::MAX bytes — TODO confirm.
            let file_size = meta.content_length() as u32;
            let index = self.memory_index(key.file_type);
            index.insert(key, IndexValue { file_size }).await;
            let size = i64::from(file_size);
            total_size += size;
            total_keys += 1;

            // Track sizes separately for each file type
            match key.file_type {
                FileType::Parquet => parquet_size += size,
                FileType::Puffin { .. } => puffin_size += size,
            }
        }
        // The metrics is a signed int gauge so we can updates it finally.
        CACHE_BYTES
            .with_label_values(&[FILE_TYPE])
            .add(parquet_size);
        CACHE_BYTES
            .with_label_values(&[INDEX_TYPE])
            .add(puffin_size);

        // Run all pending tasks of the moka cache so that the cache size is updated
        // and the eviction policy is applied.
        self.parquet_index.run_pending_tasks().await;
        self.puffin_index.run_pending_tasks().await;

        let parquet_weight = self.parquet_index.weighted_size();
        let parquet_count = self.parquet_index.entry_count();
        let puffin_weight = self.puffin_index.weighted_size();
        let puffin_count = self.puffin_index.entry_count();
        info!(
            "Recovered file cache, num_keys: {}, num_bytes: {}, parquet(count: {}, weight: {}), puffin(count: {}, weight: {}), cost: {:?}",
            total_keys,
            total_size,
            parquet_count,
            parquet_weight,
            puffin_count,
            puffin_weight,
            now.elapsed()
        );
        Ok(())
    }

    /// Downloads a file without cleaning up on error.
    ///
    /// Streams `remote_path` from `remote_store` into the local cache path
    /// derived from `index_key`, then registers the file in the memory index.
    /// On failure a partially written local file may be left behind; callers
    /// that need cleanup should use [`Self::download`] instead.
    async fn download_without_cleaning(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        // Chunk size for the concurrent remote reader.
        const DOWNLOAD_READER_CHUNK_SIZE: ReadableSize = ReadableSize::mb(8);

        let file_type = index_key.file_type;
        let timer = WRITE_CACHE_DOWNLOAD_ELAPSED
            .with_label_values(&[match file_type {
                FileType::Parquet => "download_parquet",
                FileType::Puffin { .. } => "download_puffin",
            }])
            .start_timer();

        let reader = remote_store
            .reader_with(remote_path)
            .concurrent(concurrency)
            .chunk(DOWNLOAD_READER_CHUNK_SIZE.as_bytes() as usize)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_read(0..file_size)
            .await
            .context(error::OpenDalSnafu)?;

        let cache_path = self.cache_file_path(index_key);
        let mut writer = self
            .local_store
            .writer(&cache_path)
            .await
            .context(error::OpenDalSnafu)?
            .into_futures_async_write();

        let region_id = index_key.region_id;
        let file_id = index_key.file_id;
        let bytes_written =
            futures::io::copy(reader, &mut writer)
                .await
                .context(error::DownloadSnafu {
                    region_id,
                    file_id,
                    file_type,
                })?;
        // Close flushes the writer; errors here also mean the download failed.
        writer.close().await.context(error::DownloadSnafu {
            region_id,
            file_id,
            file_type,
        })?;

        WRITE_CACHE_DOWNLOAD_BYTES_TOTAL.inc_by(bytes_written);

        let elapsed = timer.stop_and_record();
        debug!(
            "Successfully download file '{}' to local '{}', file size: {}, region: {}, cost: {:?}s",
            remote_path, cache_path, bytes_written, region_id, elapsed,
        );

        let index_value = IndexValue {
            file_size: bytes_written as _,
        };
        // Register the downloaded file so future reads hit the cache.
        self.put(index_key, index_value).await;
        Ok(())
    }

    /// Downloads a file from remote store to local cache.
    ///
    /// On failure, removes any partially written local file before returning
    /// the error.
    async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
        concurrency: usize,
    ) -> Result<()> {
        if let Err(e) = self
            .download_without_cleaning(index_key, remote_path, remote_store, file_size, concurrency)
            .await
        {
            error!(e; "Failed to download file '{}' for region {}", remote_path, index_key.region_id);

            // Clean up the partially downloaded file so it doesn't linger in
            // the cache directory.
            let filename = index_key.to_string();
            TempFileCleaner::clean_atomic_dir_files(&self.local_store, &[&filename]).await;

            return Err(e);
        }

        Ok(())
    }

    /// Checks if the key is in the file cache.
    fn contains_key(&self, key: &IndexKey) -> bool {
        self.memory_index(key.file_type).contains_key(key)
    }
}
275
/// A file cache manages files on local store and evict files based
/// on size.
///
/// Cloning is cheap: clones share the same inner state and background worker.
#[derive(Debug, Clone)]
pub(crate) struct FileCache {
    /// Inner cache state shared with background worker.
    inner: Arc<FileCacheInner>,
    /// Capacity of the puffin (index) cache in bytes.
    puffin_capacity: u64,
    /// Channel for background download tasks. None if background worker is disabled.
    download_task_tx: Option<Sender<DownloadTask>>,
}

/// Shared reference-counted handle to a [`FileCache`].
pub(crate) type FileCacheRef = Arc<FileCache>;
289
impl FileCache {
    /// Splits the configured total capacity between parquet and puffin caches
    /// without exceeding the requested overall budget.
    ///
    /// The puffin share is `index_percent` of the total, clamped so that each
    /// cache gets at least `MIN_CACHE_CAPACITY` (or half of the total when the
    /// total itself is smaller than twice the minimum).
    fn split_cache_capacities(total_capacity: u64, index_percent: u8) -> (u64, u64) {
        let desired_puffin_capacity = total_capacity * u64::from(index_percent) / 100;
        // `min_cache_capacity <= total_capacity / 2`, so the clamp range below
        // is always valid and the subtraction cannot underflow.
        let min_cache_capacity = MIN_CACHE_CAPACITY.min(total_capacity / 2);
        let puffin_capacity =
            desired_puffin_capacity.clamp(min_cache_capacity, total_capacity - min_cache_capacity);
        let parquet_capacity = total_capacity - puffin_capacity;
        (parquet_capacity, puffin_capacity)
    }

    /// Creates a new file cache.
    ///
    /// `index_cache_percent` outside `(0, 100)` (or `None`) falls back to
    /// [`DEFAULT_INDEX_CACHE_PERCENT`]. When `enable_background_worker` is
    /// true, a worker task is spawned to serve queued background downloads.
    pub(crate) fn new(
        local_store: ObjectStore,
        capacity: ReadableSize,
        ttl: Option<Duration>,
        index_cache_percent: Option<u8>,
        enable_background_worker: bool,
    ) -> FileCache {
        // Validate and use the provided percent or default
        let index_percent = index_cache_percent
            .filter(|&percent| percent > 0 && percent < 100)
            .unwrap_or(DEFAULT_INDEX_CACHE_PERCENT);
        let total_capacity = capacity.as_bytes();

        let (parquet_capacity, puffin_capacity) =
            Self::split_cache_capacities(total_capacity, index_percent);

        info!(
            "Initializing file cache with index_percent: {}%, total_capacity: {}, parquet_capacity: {}, puffin_capacity: {}",
            index_percent,
            ReadableSize(total_capacity),
            ReadableSize(parquet_capacity),
            ReadableSize(puffin_capacity)
        );

        let parquet_index = Self::build_cache(local_store.clone(), parquet_capacity, ttl, "file");
        let puffin_index = Self::build_cache(local_store.clone(), puffin_capacity, ttl, "index");

        // Create inner cache shared with background worker
        let inner = Arc::new(FileCacheInner {
            local_store,
            parquet_index,
            puffin_index,
        });

        // Only create channel and spawn worker if background download is enabled
        let download_task_tx = if enable_background_worker {
            let (tx, rx) = tokio::sync::mpsc::channel(DOWNLOAD_TASK_CHANNEL_SIZE);
            Self::spawn_download_worker(inner.clone(), rx);
            Some(tx)
        } else {
            None
        };

        FileCache {
            inner,
            puffin_capacity,
            download_task_tx,
        }
    }

    /// Spawns a background worker to process download tasks.
    ///
    /// The worker runs until the sending half of the channel is dropped
    /// (i.e. when every `FileCache` clone holding the sender is gone).
    fn spawn_download_worker(
        inner: Arc<FileCacheInner>,
        mut download_task_rx: tokio::sync::mpsc::Receiver<DownloadTask>,
    ) {
        tokio::spawn(async move {
            info!("Background download worker started");
            while let Some(task) = download_task_rx.recv().await {
                // Check if the file is already in the cache
                if inner.contains_key(&task.index_key) {
                    debug!(
                        "Skipping background download for region {}, file {} - already in cache",
                        task.index_key.region_id, task.index_key.file_id
                    );
                    continue;
                }

                // Ignores background download errors.
                let _ = inner
                    .download(
                        task.index_key,
                        &task.remote_path,
                        &task.remote_store,
                        task.file_size,
                        1, // Background downloads use concurrency=1
                    )
                    .await;
            }
            info!("Background download worker stopped");
        });
    }

    /// Builds a cache for a specific file type.
    ///
    /// Entries are weighed by file size, evicted with LRU, and the eviction
    /// listener deletes the backing local file (except on `Replaced`, where
    /// the file has already been overwritten by the new entry). `label` is
    /// the `CACHE_BYTES` metric label to maintain.
    fn build_cache(
        local_store: ObjectStore,
        capacity: u64,
        ttl: Option<Duration>,
        label: &'static str,
    ) -> Cache<IndexKey, IndexValue> {
        let cache_store = local_store;
        let mut builder = Cache::builder()
            .eviction_policy(EvictionPolicy::lru())
            .weigher(|_key, value: &IndexValue| -> u32 {
                // We only measure space on local store.
                value.file_size
            })
            .max_capacity(capacity)
            .async_eviction_listener(move |key, value, cause| {
                let store = cache_store.clone();
                // Stores files under FILE_DIR.
                let file_path = cache_file_path(FILE_DIR, *key);
                async move {
                    if let RemovalCause::Replaced = cause {
                        // The cache is replaced by another file (maybe download again). We don't remove the same
                        // file but updates the metrics as the file is already replaced by users.
                        CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        return;
                    }

                    match store.delete(&file_path).await {
                        Ok(()) => {
                            CACHE_BYTES.with_label_values(&[label]).sub(value.file_size.into());
                        }
                        Err(e) => {
                            // Keep the metric untouched on failure so it still
                            // reflects bytes actually occupying the local store.
                            warn!(e; "Failed to delete cached file {} for region {}", file_path, key.region_id);
                        }
                    }
                }
                .boxed()
            });
        if let Some(ttl) = ttl {
            // TTL is idle-based: entries expire after `ttl` without access.
            builder = builder.time_to_idle(ttl);
        }
        builder.build()
    }

    /// Puts a file into the cache index.
    ///
    /// The `WriteCache` should ensure the file is in the correct path.
    pub(crate) async fn put(&self, key: IndexKey, value: IndexValue) {
        self.inner.put(key, value).await
    }

    /// Gets the index value for `key`, updating the cache's LRU state.
    pub(crate) async fn get(&self, key: IndexKey) -> Option<IndexValue> {
        self.inner.memory_index(key.file_type).get(&key).await
    }

    /// Reads a file from the cache.
    ///
    /// Returns `None` (and records a cache miss) when the key is absent or the
    /// backing file is missing; in the latter case the stale index entry is
    /// removed.
    #[allow(unused)]
    pub(crate) async fn reader(&self, key: IndexKey) -> Option<Reader> {
        // We must use `get()` to update the estimator of the cache.
        // See https://docs.rs/moka/latest/moka/future/struct.Cache.html#method.contains_key
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        match self.get_reader(&file_path).await {
            Ok(Some(reader)) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                return Some(reader);
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }
            }
            Ok(None) => {}
        }

        // We removes the file from the index.
        index.remove(&key).await;
        CACHE_MISS
            .with_label_values(&[key.file_type.metric_label()])
            .inc();
        None
    }

    /// Reads ranges from the cache.
    ///
    /// Returns `None` (and records a cache miss) when the key is absent or the
    /// read fails; a failed read also evicts the stale index entry.
    pub(crate) async fn read_ranges(
        &self,
        key: IndexKey,
        ranges: &[Range<u64>],
    ) -> Option<Vec<Bytes>> {
        let index = self.inner.memory_index(key.file_type);
        if index.get(&key).await.is_none() {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            return None;
        }

        let file_path = self.inner.cache_file_path(key);
        // In most cases, it will use blocking read,
        // because FileCache is normally based on local file system, which supports blocking read.
        let bytes_result =
            fetch_byte_ranges(&file_path, self.inner.local_store.clone(), ranges).await;
        match bytes_result {
            Ok(bytes) => {
                CACHE_HIT
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                Some(bytes)
            }
            Err(e) => {
                if e.kind() != ErrorKind::NotFound {
                    warn!(e; "Failed to get file for key {:?}", key);
                }

                // We removes the file from the index.
                index.remove(&key).await;
                CACHE_MISS
                    .with_label_values(&[key.file_type.metric_label()])
                    .inc();
                None
            }
        }
    }

    /// Removes a file from the cache explicitly.
    /// It always tries to remove the file from the local store because we may not have the file
    /// in the memory index if upload is failed.
    pub(crate) async fn remove(&self, key: IndexKey) {
        let file_path = self.inner.cache_file_path(key);
        self.inner.memory_index(key.file_type).remove(&key).await;
        // Always delete the file from the local store.
        if let Err(e) = self.inner.local_store.delete(&file_path).await {
            warn!(e; "Failed to delete a cached file {}", file_path);
        }
    }

    /// Recovers the index from local store.
    ///
    /// If `task_receiver` is provided, spawns a background task after recovery
    /// to process `RegionLoadCacheTask` messages for loading files into the cache.
    ///
    /// When `sync` is true, waits for the recovery itself to finish (but not
    /// for the spawned task-processing loop).
    pub(crate) async fn recover(
        &self,
        sync: bool,
        task_receiver: Option<UnboundedReceiver<RegionLoadCacheTask>>,
    ) {
        let moved_self = self.clone();
        let handle = tokio::spawn(async move {
            if let Err(err) = moved_self.inner.recover().await {
                error!(err; "Failed to recover file cache.")
            }

            // Spawns background task to process region load cache tasks after recovery.
            // So it won't block the recovery when `sync` is true.
            if let Some(mut receiver) = task_receiver {
                info!("Spawning background task for processing region load cache tasks");
                tokio::spawn(async move {
                    while let Some(task) = receiver.recv().await {
                        task.fill_cache(&moved_self).await;
                    }
                    info!("Background task for processing region load cache tasks stopped");
                });
            }
        });

        if sync {
            let _ = handle.await;
        }
    }

    /// Returns the cache file path for the key.
    pub(crate) fn cache_file_path(&self, key: IndexKey) -> String {
        self.inner.cache_file_path(key)
    }

    /// Returns the local store of the file cache.
    pub(crate) fn local_store(&self) -> ObjectStore {
        self.inner.local_store.clone()
    }

    /// Get the parquet metadata in file cache.
    /// If the file is not in the cache or fail to load metadata, return None.
    ///
    /// A load failure (other than not-found) is logged and the stale index
    /// entry is removed.
    pub(crate) async fn get_parquet_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<ParquetMetaData> {
        // Check if file cache contains the key
        if let Some(index_value) = self.inner.parquet_index.get(&key).await {
            // Load metadata from file cache
            let local_store = self.local_store();
            let file_path = self.inner.cache_file_path(key);
            let file_size = index_value.file_size as u64;
            let mut metadata_loader = MetadataLoader::new(local_store, &file_path, file_size);
            metadata_loader.with_page_index_policy(page_index_policy);

            match metadata_loader.load(cache_metrics).await {
                Ok(metadata) => {
                    CACHE_HIT
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    Some(metadata)
                }
                Err(e) => {
                    if !e.is_object_not_found() {
                        warn!(
                            e; "Failed to get parquet metadata for key {:?}",
                            key
                        );
                    }
                    // We removes the file from the index.
                    self.inner.parquet_index.remove(&key).await;
                    CACHE_MISS
                        .with_label_values(&[key.file_type.metric_label()])
                        .inc();
                    None
                }
            }
        } else {
            CACHE_MISS
                .with_label_values(&[key.file_type.metric_label()])
                .inc();
            None
        }
    }

    /// Get fused SST metadata from the file cache.
    /// If the file is not in the cache, or metadata loading/decoding fails, return None.
    pub(crate) async fn get_sst_meta_data(
        &self,
        key: IndexKey,
        cache_metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<Arc<CachedSstMeta>> {
        let file_path = self.inner.cache_file_path(key);
        self.get_parquet_meta_data(key, cache_metrics, page_index_policy)
            .await
            .and_then(
                |metadata| match CachedSstMeta::try_new(&file_path, metadata) {
                    Ok(metadata) => Some(Arc::new(metadata)),
                    Err(err) => {
                        // Decoding failed: count it as a miss and fall back.
                        CACHE_MISS
                            .with_label_values(&[key.file_type.metric_label()])
                            .inc();
                        warn!(
                            err; "Failed to decode cached parquet metadata for key {:?}",
                            key
                        );
                        None
                    }
                },
            )
    }

    /// Opens a reader on `file_path` in the local store, or `Ok(None)` when
    /// the file does not exist.
    async fn get_reader(&self, file_path: &str) -> object_store::Result<Option<Reader>> {
        if self.inner.local_store.exists(file_path).await? {
            Ok(Some(self.inner.local_store.reader(file_path).await?))
        } else {
            Ok(None)
        }
    }

    /// Checks if the key is in the file cache.
    pub(crate) fn contains_key(&self, key: &IndexKey) -> bool {
        self.inner.contains_key(key)
    }

    /// Returns the capacity of the puffin (index) cache in bytes.
    pub(crate) fn puffin_cache_capacity(&self) -> u64 {
        self.puffin_capacity
    }

    /// Returns the current weighted size (used bytes) of the puffin (index) cache.
    pub(crate) fn puffin_cache_size(&self) -> u64 {
        self.inner.puffin_index.weighted_size()
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`).
    pub(crate) async fn download(
        &self,
        index_key: IndexKey,
        remote_path: &str,
        remote_store: &ObjectStore,
        file_size: u64,
    ) -> Result<()> {
        self.inner
            .download(index_key, remote_path, remote_store, file_size, 8) // Foreground uses concurrency=8
            .await
    }

    /// Downloads a file in `remote_path` from the remote object store to the local cache
    /// (specified by `index_key`) in the background. Errors are logged but not returned.
    ///
    /// This method attempts to send a download task to the background worker.
    /// If the channel is full, the task is silently dropped.
    pub(crate) fn maybe_download_background(
        &self,
        index_key: IndexKey,
        remote_path: String,
        remote_store: ObjectStore,
        file_size: u64,
    ) {
        // Do nothing if background worker is disabled (channel is None)
        let Some(tx) = &self.download_task_tx else {
            return;
        };

        let task = DownloadTask {
            index_key,
            remote_path,
            remote_store,
            file_size,
        };

        // Try to send the task; if the channel is full, just drop it
        if let Err(e) = tx.try_send(task) {
            debug!(
                "Failed to queue background download task for region {}, file {}: {:?}",
                index_key.region_id, index_key.file_id, e
            );
        }
    }
}
718
/// Key of file cache index.
///
/// Identifies a cached file by region, file id, and file type; its `Display`
/// form is also the cache file name (`{region_id}.{file_id}.{file_type}`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct IndexKey {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the SST/index file.
    pub file_id: FileId,
    /// Type of the cached file (parquet data or puffin index).
    pub file_type: FileType,
}
726
727impl IndexKey {
728    /// Creates a new index key.
729    pub fn new(region_id: RegionId, file_id: FileId, file_type: FileType) -> IndexKey {
730        IndexKey {
731            region_id,
732            file_id,
733            file_type,
734        }
735    }
736}
737
738impl fmt::Display for IndexKey {
739    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740        write!(
741            f,
742            "{}.{}.{}",
743            self.region_id.as_u64(),
744            self.file_id,
745            self.file_type
746        )
747    }
748}
749
/// Type of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FileType {
    /// Parquet file.
    Parquet,
    /// Puffin file.
    ///
    /// The payload is a version number that becomes part of the cache file
    /// name (`{version}.puffin`); legacy files without a version parse as 0.
    Puffin(u64),
}
758
759impl fmt::Display for FileType {
760    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
761        match self {
762            FileType::Parquet => write!(f, "parquet"),
763            FileType::Puffin(version) => write!(f, "{}.puffin", version),
764        }
765    }
766}
767
768impl FileType {
769    /// Parses the file type from string.
770    pub(crate) fn parse(s: &str) -> Option<FileType> {
771        match s {
772            "parquet" => Some(FileType::Parquet),
773            "puffin" => Some(FileType::Puffin(0)),
774            _ => {
775                // if post-fix with .puffin, try to parse the version
776                if let Some(version_str) = s.strip_suffix(".puffin") {
777                    let version = version_str.parse::<u64>().ok()?;
778                    Some(FileType::Puffin(version))
779                } else {
780                    None
781                }
782            }
783        }
784    }
785
786    /// Returns the metric label for this file type.
787    fn metric_label(&self) -> &'static str {
788        match self {
789            FileType::Parquet => FILE_TYPE,
790            FileType::Puffin(_) => INDEX_TYPE,
791        }
792    }
793}
794
/// An entity that describes the file in the file cache.
///
/// It should only keep minimal information needed by the cache.
#[derive(Debug, Clone)]
pub(crate) struct IndexValue {
    /// Size of the file in bytes.
    ///
    /// Also used as the moka cache weight of the entry.
    pub(crate) file_size: u32,
}
803
804/// Generates the path to the cached file.
805///
806/// The file name format is `{region_id}.{file_id}.{file_type}`
807fn cache_file_path(cache_file_dir: &str, key: IndexKey) -> String {
808    join_path(cache_file_dir, &key.to_string())
809}
810
811/// Parse index key from the file name.
812fn parse_index_key(name: &str) -> Option<IndexKey> {
813    let mut split = name.splitn(3, '.');
814    let region_id = split.next().and_then(|s| {
815        let id = s.parse::<u64>().ok()?;
816        Some(RegionId::from_u64(id))
817    })?;
818    let file_id = split.next().and_then(|s| FileId::parse_str(s).ok())?;
819    let file_type = split.next().and_then(FileType::parse)?;
820
821    Some(IndexKey::new(region_id, file_id, file_type))
822}
823
824#[cfg(test)]
825mod tests {
826    use common_test_util::temp_dir::create_temp_dir;
827    use object_store::services::Fs;
828
829    use super::*;
830
    /// Builds a filesystem-backed `ObjectStore` rooted at `path` for tests.
    fn new_fs_store(path: &str) -> ObjectStore {
        let builder = Fs::default().root(path);
        ObjectStore::new(builder).unwrap().finish()
    }
835
    /// Verifies that entries expire after the configured idle TTL.
    #[tokio::test]
    async fn test_file_cache_ttl() {
        let dir = create_temp_dir("");
        let local_store = new_fs_store(dir.path().to_str().unwrap());

        // Cache with a very short (10ms) idle TTL.
        let cache = FileCache::new(
            local_store.clone(),
            ReadableSize::mb(10),
            Some(Duration::from_millis(10)),
            None,
            true, // enable_background_worker
        );
        let region_id = RegionId::new(2000, 0);
        let file_id = FileId::random();
        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
        let file_path = cache.cache_file_path(key);

        // Get an empty file.
        assert!(cache.reader(key).await.is_none());

        // Write a file.
        local_store
            .write(&file_path, b"hello".as_slice())
            .await
            .unwrap();

        // Add to the cache.
        cache
            .put(
                IndexKey::new(region_id, file_id, FileType::Parquet),
                IndexValue { file_size: 5 },
            )
            .await;

        // Readable immediately after insertion.
        let exist = cache.reader(key).await;
        assert!(exist.is_some());
        // Wait past the TTL and force moka to process expirations.
        tokio::time::sleep(Duration::from_millis(15)).await;
        cache.inner.parquet_index.run_pending_tasks().await;
        // The entry must have expired.
        let non = cache.reader(key).await;
        assert!(non.is_none());
    }
877
878    #[tokio::test]
879    async fn test_file_cache_basic() {
880        let dir = create_temp_dir("");
881        let local_store = new_fs_store(dir.path().to_str().unwrap());
882
883        let cache = FileCache::new(
884            local_store.clone(),
885            ReadableSize::mb(10),
886            None,
887            None,
888            true, // enable_background_worker
889        );
890        let region_id = RegionId::new(2000, 0);
891        let file_id = FileId::random();
892        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
893        let file_path = cache.cache_file_path(key);
894
895        // Get an empty file.
896        assert!(cache.reader(key).await.is_none());
897
898        // Write a file.
899        local_store
900            .write(&file_path, b"hello".as_slice())
901            .await
902            .unwrap();
903        // Add to the cache.
904        cache
905            .put(
906                IndexKey::new(region_id, file_id, FileType::Parquet),
907                IndexValue { file_size: 5 },
908            )
909            .await;
910
911        // Read file content.
912        let reader = cache.reader(key).await.unwrap();
913        let buf = reader.read(..).await.unwrap().to_vec();
914        assert_eq!("hello", String::from_utf8(buf).unwrap());
915
916        // Get weighted size.
917        cache.inner.parquet_index.run_pending_tasks().await;
918        assert_eq!(5, cache.inner.parquet_index.weighted_size());
919
920        // Remove the file.
921        cache.remove(key).await;
922        assert!(cache.reader(key).await.is_none());
923
924        // Ensure all pending tasks of the moka cache is done before assertion.
925        cache.inner.parquet_index.run_pending_tasks().await;
926
927        // The file also not exists.
928        assert!(!local_store.exists(&file_path).await.unwrap());
929        assert_eq!(0, cache.inner.parquet_index.weighted_size());
930    }
931
932    #[tokio::test]
933    async fn test_file_cache_file_removed() {
934        let dir = create_temp_dir("");
935        let local_store = new_fs_store(dir.path().to_str().unwrap());
936
937        let cache = FileCache::new(
938            local_store.clone(),
939            ReadableSize::mb(10),
940            None,
941            None,
942            true, // enable_background_worker
943        );
944        let region_id = RegionId::new(2000, 0);
945        let file_id = FileId::random();
946        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
947        let file_path = cache.cache_file_path(key);
948
949        // Write a file.
950        local_store
951            .write(&file_path, b"hello".as_slice())
952            .await
953            .unwrap();
954        // Add to the cache.
955        cache
956            .put(
957                IndexKey::new(region_id, file_id, FileType::Parquet),
958                IndexValue { file_size: 5 },
959            )
960            .await;
961
962        // Remove the file but keep the index.
963        local_store.delete(&file_path).await.unwrap();
964
965        // Reader is none.
966        assert!(cache.reader(key).await.is_none());
967        // Key is removed.
968        assert!(!cache.inner.parquet_index.contains_key(&key));
969    }
970
971    #[tokio::test]
972    async fn test_file_cache_recover() {
973        let dir = create_temp_dir("");
974        let local_store = new_fs_store(dir.path().to_str().unwrap());
975        let cache = FileCache::new(
976            local_store.clone(),
977            ReadableSize::mb(10),
978            None,
979            None,
980            true, // enable_background_worker
981        );
982
983        let region_id = RegionId::new(2000, 0);
984        let file_type = FileType::Parquet;
985        // Write N files.
986        let file_ids: Vec<_> = (0..10).map(|_| FileId::random()).collect();
987        let mut total_size = 0;
988        for (i, file_id) in file_ids.iter().enumerate() {
989            let key = IndexKey::new(region_id, *file_id, file_type);
990            let file_path = cache.cache_file_path(key);
991            let bytes = i.to_string().into_bytes();
992            local_store.write(&file_path, bytes.clone()).await.unwrap();
993
994            // Add to the cache.
995            cache
996                .put(
997                    IndexKey::new(region_id, *file_id, file_type),
998                    IndexValue {
999                        file_size: bytes.len() as u32,
1000                    },
1001                )
1002                .await;
1003            total_size += bytes.len();
1004        }
1005
1006        // Recover the cache.
1007        let cache = FileCache::new(
1008            local_store.clone(),
1009            ReadableSize::mb(10),
1010            None,
1011            None,
1012            true, // enable_background_worker
1013        );
1014        // No entry before recovery.
1015        assert!(
1016            cache
1017                .reader(IndexKey::new(region_id, file_ids[0], file_type))
1018                .await
1019                .is_none()
1020        );
1021        cache.recover(true, None).await;
1022
1023        // Check size.
1024        cache.inner.parquet_index.run_pending_tasks().await;
1025        assert_eq!(
1026            total_size,
1027            cache.inner.parquet_index.weighted_size() as usize
1028        );
1029
1030        for (i, file_id) in file_ids.iter().enumerate() {
1031            let key = IndexKey::new(region_id, *file_id, file_type);
1032            let reader = cache.reader(key).await.unwrap();
1033            let buf = reader.read(..).await.unwrap().to_vec();
1034            assert_eq!(i.to_string(), String::from_utf8(buf).unwrap());
1035        }
1036    }
1037
1038    #[tokio::test]
1039    async fn test_file_cache_read_ranges() {
1040        let dir = create_temp_dir("");
1041        let local_store = new_fs_store(dir.path().to_str().unwrap());
1042        let file_cache = FileCache::new(
1043            local_store.clone(),
1044            ReadableSize::mb(10),
1045            None,
1046            None,
1047            true, // enable_background_worker
1048        );
1049        let region_id = RegionId::new(2000, 0);
1050        let file_id = FileId::random();
1051        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
1052        let file_path = file_cache.cache_file_path(key);
1053        // Write a file.
1054        let data = b"hello greptime database";
1055        local_store
1056            .write(&file_path, data.as_slice())
1057            .await
1058            .unwrap();
1059        // Add to the cache.
1060        file_cache.put(key, IndexValue { file_size: 5 }).await;
1061        // Ranges
1062        let ranges = vec![0..5, 6..10, 15..19, 0..data.len() as u64];
1063        let bytes = file_cache.read_ranges(key, &ranges).await.unwrap();
1064
1065        assert_eq!(4, bytes.len());
1066        assert_eq!(b"hello", bytes[0].as_ref());
1067        assert_eq!(b"grep", bytes[1].as_ref());
1068        assert_eq!(b"data", bytes[2].as_ref());
1069        assert_eq!(data, bytes[3].as_ref());
1070    }
1071
1072    #[test]
1073    fn test_file_cache_capacity_respects_total_budget() {
1074        let total_capacity = ReadableSize::mb(256).as_bytes();
1075        let (parquet_capacity, puffin_capacity) =
1076            FileCache::split_cache_capacities(total_capacity, 20);
1077
1078        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
1079        assert_eq!(ReadableSize::mb(128).as_bytes(), parquet_capacity);
1080        assert_eq!(ReadableSize::mb(128).as_bytes(), puffin_capacity);
1081    }
1082
1083    #[test]
1084    fn test_file_cache_capacity_keeps_split_when_total_allows_it() {
1085        let total_capacity = ReadableSize::gb(5).as_bytes();
1086        let (parquet_capacity, puffin_capacity) =
1087            FileCache::split_cache_capacities(total_capacity, 20);
1088
1089        assert_eq!(total_capacity, parquet_capacity + puffin_capacity);
1090        assert_eq!(ReadableSize::gb(4).as_bytes(), parquet_capacity);
1091        assert_eq!(ReadableSize::gb(1).as_bytes(), puffin_capacity);
1092    }
1093
1094    #[test]
1095    fn test_cache_file_path() {
1096        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
1097        assert_eq!(
1098            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
1099            cache_file_path(
1100                "test_dir",
1101                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
1102            )
1103        );
1104        assert_eq!(
1105            "test_dir/5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet",
1106            cache_file_path(
1107                "test_dir/",
1108                IndexKey::new(RegionId::new(1234, 5), file_id, FileType::Parquet)
1109            )
1110        );
1111    }
1112
1113    #[test]
1114    fn test_parse_file_name() {
1115        let file_id = FileId::parse_str("3368731b-a556-42b8-a5df-9c31ce155095").unwrap();
1116        let region_id = RegionId::new(1234, 5);
1117        assert_eq!(
1118            IndexKey::new(region_id, file_id, FileType::Parquet),
1119            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet").unwrap()
1120        );
1121        assert_eq!(
1122            IndexKey::new(region_id, file_id, FileType::Puffin(0)),
1123            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.puffin").unwrap()
1124        );
1125        assert_eq!(
1126            IndexKey::new(region_id, file_id, FileType::Puffin(42)),
1127            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.42.puffin")
1128                .unwrap()
1129        );
1130        assert!(parse_index_key("").is_none());
1131        assert!(parse_index_key(".").is_none());
1132        assert!(parse_index_key("5299989643269").is_none());
1133        assert!(parse_index_key("5299989643269.").is_none());
1134        assert!(parse_index_key(".5299989643269").is_none());
1135        assert!(parse_index_key("5299989643269.").is_none());
1136        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df").is_none());
1137        assert!(parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095").is_none());
1138        assert!(
1139            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parque").is_none()
1140        );
1141        assert!(
1142            parse_index_key("5299989643269.3368731b-a556-42b8-a5df-9c31ce155095.parquet.puffin")
1143                .is_none()
1144        );
1145    }
1146}