mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
34use index::result_cache::IndexResultCache;
35use moka::notification::RemovalCause;
36use moka::sync::Cache;
37use parquet::file::metadata::ParquetMetaData;
38use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
39use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
40
41use crate::cache::cache_size::parquet_meta_size;
42use crate::cache::file_cache::{FileType, IndexKey};
43use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
44use crate::cache::write_cache::WriteCacheRef;
45use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
46use crate::read::Batch;
47use crate::sst::file::RegionFileId;
48use crate::sst::parquet::reader::MetadataCacheMetrics;
49
/// Metric label identifying the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metric label identifying the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metric label identifying the page cache.
const PAGE_TYPE: &str = "page";
/// Metric label identifying files kept on the local store.
const FILE_TYPE: &str = "file";
/// Metric label identifying index (puffin) files kept on the local store.
const INDEX_TYPE: &str = "index";
/// Metric label identifying the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
62
63/// Cache strategies that may only enable a subset of caches.
64#[derive(Clone)]
65pub enum CacheStrategy {
66    /// Strategy for normal operations.
67    /// Doesn't disable any cache.
68    EnableAll(CacheManagerRef),
69    /// Strategy for compaction.
70    /// Disables some caches during compaction to avoid affecting queries.
71    /// Enables the write cache so that the compaction can read files cached
72    /// in the write cache and write the compacted files back to the write cache.
73    Compaction(CacheManagerRef),
74    /// Do not use any cache.
75    Disabled,
76}
77
78impl CacheStrategy {
79    /// Gets parquet metadata with cache metrics tracking.
80    /// Returns the metadata and updates the provided metrics.
81    pub(crate) async fn get_parquet_meta_data(
82        &self,
83        file_id: RegionFileId,
84        metrics: &mut MetadataCacheMetrics,
85    ) -> Option<Arc<ParquetMetaData>> {
86        match self {
87            CacheStrategy::EnableAll(cache_manager) => {
88                cache_manager.get_parquet_meta_data(file_id, metrics).await
89            }
90            CacheStrategy::Compaction(cache_manager) => {
91                cache_manager.get_parquet_meta_data(file_id, metrics).await
92            }
93            CacheStrategy::Disabled => {
94                metrics.cache_miss += 1;
95                None
96            }
97        }
98    }
99
100    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
101    pub fn get_parquet_meta_data_from_mem_cache(
102        &self,
103        file_id: RegionFileId,
104    ) -> Option<Arc<ParquetMetaData>> {
105        match self {
106            CacheStrategy::EnableAll(cache_manager) => {
107                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
108            }
109            CacheStrategy::Compaction(cache_manager) => {
110                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
111            }
112            CacheStrategy::Disabled => None,
113        }
114    }
115
116    /// Calls [CacheManager::put_parquet_meta_data()].
117    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
118        match self {
119            CacheStrategy::EnableAll(cache_manager) => {
120                cache_manager.put_parquet_meta_data(file_id, metadata);
121            }
122            CacheStrategy::Compaction(cache_manager) => {
123                cache_manager.put_parquet_meta_data(file_id, metadata);
124            }
125            CacheStrategy::Disabled => {}
126        }
127    }
128
129    /// Calls [CacheManager::remove_parquet_meta_data()].
130    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
131        match self {
132            CacheStrategy::EnableAll(cache_manager) => {
133                cache_manager.remove_parquet_meta_data(file_id);
134            }
135            CacheStrategy::Compaction(cache_manager) => {
136                cache_manager.remove_parquet_meta_data(file_id);
137            }
138            CacheStrategy::Disabled => {}
139        }
140    }
141
142    /// Calls [CacheManager::get_repeated_vector()].
143    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
144    pub fn get_repeated_vector(
145        &self,
146        data_type: &ConcreteDataType,
147        value: &Value,
148    ) -> Option<VectorRef> {
149        match self {
150            CacheStrategy::EnableAll(cache_manager) => {
151                cache_manager.get_repeated_vector(data_type, value)
152            }
153            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
154        }
155    }
156
157    /// Calls [CacheManager::put_repeated_vector()].
158    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
159    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
160        if let CacheStrategy::EnableAll(cache_manager) = self {
161            cache_manager.put_repeated_vector(value, vector);
162        }
163    }
164
165    /// Calls [CacheManager::get_pages()].
166    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
167    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
168        match self {
169            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
170            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
171        }
172    }
173
174    /// Calls [CacheManager::put_pages()].
175    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
176    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
177        if let CacheStrategy::EnableAll(cache_manager) = self {
178            cache_manager.put_pages(page_key, pages);
179        }
180    }
181
182    /// Calls [CacheManager::evict_puffin_cache()].
183    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
184        match self {
185            CacheStrategy::EnableAll(cache_manager) => {
186                cache_manager.evict_puffin_cache(file_id).await
187            }
188            CacheStrategy::Compaction(cache_manager) => {
189                cache_manager.evict_puffin_cache(file_id).await
190            }
191            CacheStrategy::Disabled => {}
192        }
193    }
194
195    /// Calls [CacheManager::get_selector_result()].
196    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
197    pub fn get_selector_result(
198        &self,
199        selector_key: &SelectorResultKey,
200    ) -> Option<Arc<SelectorResultValue>> {
201        match self {
202            CacheStrategy::EnableAll(cache_manager) => {
203                cache_manager.get_selector_result(selector_key)
204            }
205            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
206        }
207    }
208
209    /// Calls [CacheManager::put_selector_result()].
210    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
211    pub fn put_selector_result(
212        &self,
213        selector_key: SelectorResultKey,
214        result: Arc<SelectorResultValue>,
215    ) {
216        if let CacheStrategy::EnableAll(cache_manager) = self {
217            cache_manager.put_selector_result(selector_key, result);
218        }
219    }
220
221    /// Calls [CacheManager::write_cache()].
222    /// It returns None if the strategy is [CacheStrategy::Disabled].
223    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
224        match self {
225            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
226            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
227            CacheStrategy::Disabled => None,
228        }
229    }
230
231    /// Calls [CacheManager::index_cache()].
232    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
233    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
234        match self {
235            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
236            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
237        }
238    }
239
240    /// Calls [CacheManager::bloom_filter_index_cache()].
241    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
242    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
243        match self {
244            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
245            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
246        }
247    }
248
249    /// Calls [CacheManager::puffin_metadata_cache()].
250    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
251    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
252        match self {
253            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
254            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
255        }
256    }
257
258    /// Calls [CacheManager::index_result_cache()].
259    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
260    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
261        match self {
262            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
263            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
264        }
265    }
266}
267
268/// Manages cached data for the engine.
269///
270/// All caches are disabled by default.
271#[derive(Default)]
272pub struct CacheManager {
273    /// Cache for SST metadata.
274    sst_meta_cache: Option<SstMetaCache>,
275    /// Cache for vectors.
276    vector_cache: Option<VectorCache>,
277    /// Cache for SST pages.
278    page_cache: Option<PageCache>,
279    /// A Cache for writing files to object stores.
280    write_cache: Option<WriteCacheRef>,
281    /// Cache for inverted index.
282    inverted_index_cache: Option<InvertedIndexCacheRef>,
283    /// Cache for bloom filter index.
284    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
285    /// Puffin metadata cache.
286    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
287    /// Cache for time series selectors.
288    selector_result_cache: Option<SelectorResultCache>,
289    /// Cache for index result.
290    index_result_cache: Option<IndexResultCache>,
291}
292
293pub type CacheManagerRef = Arc<CacheManager>;
294
295impl CacheManager {
296    /// Returns a builder to build the cache.
297    pub fn builder() -> CacheManagerBuilder {
298        CacheManagerBuilder::default()
299    }
300
301    /// Gets cached [ParquetMetaData] with metrics tracking.
302    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
303    pub(crate) async fn get_parquet_meta_data(
304        &self,
305        file_id: RegionFileId,
306        metrics: &mut MetadataCacheMetrics,
307    ) -> Option<Arc<ParquetMetaData>> {
308        // Try to get metadata from sst meta cache
309        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
310            metrics.mem_cache_hit += 1;
311            return Some(metadata);
312        }
313
314        // Try to get metadata from write cache
315        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
316        if let Some(write_cache) = &self.write_cache
317            && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
318        {
319            metrics.file_cache_hit += 1;
320            let metadata = Arc::new(metadata);
321            // Put metadata into sst meta cache
322            self.put_parquet_meta_data(file_id, metadata.clone());
323            return Some(metadata);
324        };
325        metrics.cache_miss += 1;
326
327        None
328    }
329
330    /// Gets cached [ParquetMetaData] from in-memory cache.
331    /// This method does not perform I/O.
332    pub fn get_parquet_meta_data_from_mem_cache(
333        &self,
334        file_id: RegionFileId,
335    ) -> Option<Arc<ParquetMetaData>> {
336        // Try to get metadata from sst meta cache
337        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
338            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
339            update_hit_miss(value, SST_META_TYPE)
340        })
341    }
342
343    /// Puts [ParquetMetaData] into the cache.
344    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
345        if let Some(cache) = &self.sst_meta_cache {
346            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
347            CACHE_BYTES
348                .with_label_values(&[SST_META_TYPE])
349                .add(meta_cache_weight(&key, &metadata).into());
350            cache.insert(key, metadata);
351        }
352    }
353
354    /// Removes [ParquetMetaData] from the cache.
355    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
356        if let Some(cache) = &self.sst_meta_cache {
357            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
358        }
359    }
360
361    /// Gets a vector with repeated value for specific `key`.
362    pub fn get_repeated_vector(
363        &self,
364        data_type: &ConcreteDataType,
365        value: &Value,
366    ) -> Option<VectorRef> {
367        self.vector_cache.as_ref().and_then(|vector_cache| {
368            let value = vector_cache.get(&(data_type.clone(), value.clone()));
369            update_hit_miss(value, VECTOR_TYPE)
370        })
371    }
372
373    /// Puts a vector with repeated value into the cache.
374    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
375        if let Some(cache) = &self.vector_cache {
376            let key = (vector.data_type(), value);
377            CACHE_BYTES
378                .with_label_values(&[VECTOR_TYPE])
379                .add(vector_cache_weight(&key, &vector).into());
380            cache.insert(key, vector);
381        }
382    }
383
384    /// Gets pages for the row group.
385    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
386        self.page_cache.as_ref().and_then(|page_cache| {
387            let value = page_cache.get(page_key);
388            update_hit_miss(value, PAGE_TYPE)
389        })
390    }
391
392    /// Puts pages of the row group into the cache.
393    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
394        if let Some(cache) = &self.page_cache {
395            CACHE_BYTES
396                .with_label_values(&[PAGE_TYPE])
397                .add(page_cache_weight(&page_key, &pages).into());
398            cache.insert(page_key, pages);
399        }
400    }
401
402    /// Evicts every puffin-related cache entry for the given file.
403    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
404        if let Some(cache) = &self.bloom_filter_index_cache {
405            cache.invalidate_file(file_id.file_id());
406        }
407
408        if let Some(cache) = &self.inverted_index_cache {
409            cache.invalidate_file(file_id.file_id());
410        }
411
412        if let Some(cache) = &self.index_result_cache {
413            cache.invalidate_file(file_id.file_id());
414        }
415
416        if let Some(cache) = &self.puffin_metadata_cache {
417            cache.remove(&file_id.to_string());
418        }
419
420        if let Some(write_cache) = &self.write_cache {
421            write_cache
422                .remove(IndexKey::new(
423                    file_id.region_id(),
424                    file_id.file_id(),
425                    FileType::Puffin,
426                ))
427                .await;
428        }
429    }
430
431    /// Gets result of for the selector.
432    pub fn get_selector_result(
433        &self,
434        selector_key: &SelectorResultKey,
435    ) -> Option<Arc<SelectorResultValue>> {
436        self.selector_result_cache
437            .as_ref()
438            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
439    }
440
441    /// Puts result of the selector into the cache.
442    pub fn put_selector_result(
443        &self,
444        selector_key: SelectorResultKey,
445        result: Arc<SelectorResultValue>,
446    ) {
447        if let Some(cache) = &self.selector_result_cache {
448            CACHE_BYTES
449                .with_label_values(&[SELECTOR_RESULT_TYPE])
450                .add(selector_result_cache_weight(&selector_key, &result).into());
451            cache.insert(selector_key, result);
452        }
453    }
454
455    /// Gets the write cache.
456    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
457        self.write_cache.as_ref()
458    }
459
460    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
461        self.inverted_index_cache.as_ref()
462    }
463
464    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
465        self.bloom_filter_index_cache.as_ref()
466    }
467
468    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
469        self.puffin_metadata_cache.as_ref()
470    }
471
472    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
473        self.index_result_cache.as_ref()
474    }
475}
476
477/// Increases selector cache miss metrics.
478pub fn selector_result_cache_miss() {
479    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
480}
481
482/// Increases selector cache hit metrics.
483pub fn selector_result_cache_hit() {
484    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
485}
486
487/// Builder to construct a [CacheManager].
488#[derive(Default)]
489pub struct CacheManagerBuilder {
490    sst_meta_cache_size: u64,
491    vector_cache_size: u64,
492    page_cache_size: u64,
493    index_metadata_size: u64,
494    index_content_size: u64,
495    index_content_page_size: u64,
496    index_result_cache_size: u64,
497    puffin_metadata_size: u64,
498    write_cache: Option<WriteCacheRef>,
499    selector_result_cache_size: u64,
500}
501
502impl CacheManagerBuilder {
503    /// Sets meta cache size.
504    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
505        self.sst_meta_cache_size = bytes;
506        self
507    }
508
509    /// Sets vector cache size.
510    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
511        self.vector_cache_size = bytes;
512        self
513    }
514
515    /// Sets page cache size.
516    pub fn page_cache_size(mut self, bytes: u64) -> Self {
517        self.page_cache_size = bytes;
518        self
519    }
520
521    /// Sets write cache.
522    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
523        self.write_cache = cache;
524        self
525    }
526
527    /// Sets cache size for index metadata.
528    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
529        self.index_metadata_size = bytes;
530        self
531    }
532
533    /// Sets cache size for index content.
534    pub fn index_content_size(mut self, bytes: u64) -> Self {
535        self.index_content_size = bytes;
536        self
537    }
538
539    /// Sets page size for index content.
540    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
541        self.index_content_page_size = bytes;
542        self
543    }
544
545    /// Sets cache size for index result.
546    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
547        self.index_result_cache_size = bytes;
548        self
549    }
550
551    /// Sets cache size for puffin metadata.
552    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
553        self.puffin_metadata_size = bytes;
554        self
555    }
556
557    /// Sets selector result cache size.
558    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
559        self.selector_result_cache_size = bytes;
560        self
561    }
562
563    /// Builds the [CacheManager].
564    pub fn build(self) -> CacheManager {
565        fn to_str(cause: RemovalCause) -> &'static str {
566            match cause {
567                RemovalCause::Expired => "expired",
568                RemovalCause::Explicit => "explicit",
569                RemovalCause::Replaced => "replaced",
570                RemovalCause::Size => "size",
571            }
572        }
573
574        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
575            Cache::builder()
576                .max_capacity(self.sst_meta_cache_size)
577                .weigher(meta_cache_weight)
578                .eviction_listener(|k, v, cause| {
579                    let size = meta_cache_weight(&k, &v);
580                    CACHE_BYTES
581                        .with_label_values(&[SST_META_TYPE])
582                        .sub(size.into());
583                    CACHE_EVICTION
584                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
585                        .inc();
586                })
587                .build()
588        });
589        let vector_cache = (self.vector_cache_size != 0).then(|| {
590            Cache::builder()
591                .max_capacity(self.vector_cache_size)
592                .weigher(vector_cache_weight)
593                .eviction_listener(|k, v, cause| {
594                    let size = vector_cache_weight(&k, &v);
595                    CACHE_BYTES
596                        .with_label_values(&[VECTOR_TYPE])
597                        .sub(size.into());
598                    CACHE_EVICTION
599                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
600                        .inc();
601                })
602                .build()
603        });
604        let page_cache = (self.page_cache_size != 0).then(|| {
605            Cache::builder()
606                .max_capacity(self.page_cache_size)
607                .weigher(page_cache_weight)
608                .eviction_listener(|k, v, cause| {
609                    let size = page_cache_weight(&k, &v);
610                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
611                    CACHE_EVICTION
612                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
613                        .inc();
614                })
615                .build()
616        });
617        let inverted_index_cache = InvertedIndexCache::new(
618            self.index_metadata_size,
619            self.index_content_size,
620            self.index_content_page_size,
621        );
622        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
623        let bloom_filter_index_cache = BloomFilterIndexCache::new(
624            self.index_metadata_size,
625            self.index_content_size,
626            self.index_content_page_size,
627        );
628        let index_result_cache = (self.index_result_cache_size != 0)
629            .then(|| IndexResultCache::new(self.index_result_cache_size));
630        let puffin_metadata_cache =
631            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
632        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
633            Cache::builder()
634                .max_capacity(self.selector_result_cache_size)
635                .weigher(selector_result_cache_weight)
636                .eviction_listener(|k, v, cause| {
637                    let size = selector_result_cache_weight(&k, &v);
638                    CACHE_BYTES
639                        .with_label_values(&[SELECTOR_RESULT_TYPE])
640                        .sub(size.into());
641                    CACHE_EVICTION
642                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
643                        .inc();
644                })
645                .build()
646        });
647        CacheManager {
648            sst_meta_cache,
649            vector_cache,
650            page_cache,
651            write_cache: self.write_cache,
652            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
653            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
654            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
655            selector_result_cache,
656            index_result_cache,
657        }
658    }
659}
660
661fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
662    // We ignore the size of `Arc`.
663    (k.estimated_size() + parquet_meta_size(v)) as u32
664}
665
666fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
667    // We ignore the heap size of `Value`.
668    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
669}
670
671fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
672    (k.estimated_size() + v.estimated_size()) as u32
673}
674
675fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
676    (mem::size_of_val(k) + v.estimated_size()) as u32
677}
678
679/// Updates cache hit/miss metrics.
680fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
681    if value.is_some() {
682        CACHE_HIT.with_label_values(&[cache_type]).inc();
683    } else {
684        CACHE_MISS.with_label_values(&[cache_type]).inc();
685    }
686    value
687}
688
689/// Cache key (region id, file id) for SST meta.
690#[derive(Debug, Clone, PartialEq, Eq, Hash)]
691struct SstMetaKey(RegionId, FileId);
692
693impl SstMetaKey {
694    /// Returns memory used by the key (estimated).
695    fn estimated_size(&self) -> usize {
696        mem::size_of::<Self>()
697    }
698}
699
700/// Path to column pages in the SST file.
701#[derive(Debug, Clone, PartialEq, Eq, Hash)]
702pub struct ColumnPagePath {
703    /// Region id of the SST file to cache.
704    region_id: RegionId,
705    /// Id of the SST file to cache.
706    file_id: FileId,
707    /// Index of the row group.
708    row_group_idx: usize,
709    /// Index of the column in the row group.
710    column_idx: usize,
711}
712
713/// Cache key to pages in a row group (after projection).
714///
715/// Different projections will have different cache keys.
716/// We cache all ranges together because they may refer to the same `Bytes`.
717#[derive(Debug, Clone, PartialEq, Eq, Hash)]
718pub struct PageKey {
719    /// Id of the SST file to cache.
720    file_id: FileId,
721    /// Index of the row group.
722    row_group_idx: usize,
723    /// Byte ranges of the pages to cache.
724    ranges: Vec<Range<u64>>,
725}
726
727impl PageKey {
728    /// Creates a key for a list of pages.
729    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
730        PageKey {
731            file_id,
732            row_group_idx,
733            ranges,
734        }
735    }
736
737    /// Returns memory used by the key (estimated).
738    fn estimated_size(&self) -> usize {
739        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
740    }
741}
742
743/// Cached row group pages for a column.
744// We don't use enum here to make it easier to mock and use the struct.
745#[derive(Default)]
746pub struct PageValue {
747    /// Compressed page in the row group.
748    pub compressed: Vec<Bytes>,
749    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
750    pub page_size: u64,
751}
752
753impl PageValue {
754    /// Creates a new value from a range of compressed pages.
755    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
756        PageValue {
757            compressed: bytes,
758            page_size,
759        }
760    }
761
762    /// Returns memory used by the value (estimated).
763    fn estimated_size(&self) -> usize {
764        mem::size_of::<Self>()
765            + self.page_size as usize
766            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
767    }
768}
769
770/// Cache key for time series row selector result.
771#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
772pub struct SelectorResultKey {
773    /// Id of the SST file.
774    pub file_id: FileId,
775    /// Index of the row group.
776    pub row_group_idx: usize,
777    /// Time series row selector.
778    pub selector: TimeSeriesRowSelector,
779}
780
781/// Cached result for time series row selector.
782pub struct SelectorResultValue {
783    /// Batches of rows selected by the selector.
784    pub result: Vec<Batch>,
785    /// Projection of rows.
786    pub projection: Vec<usize>,
787}
788
789impl SelectorResultValue {
790    /// Creates a new selector result value.
791    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
792        SelectorResultValue { result, projection }
793    }
794
795    /// Returns memory used by the value (estimated).
796    fn estimated_size(&self) -> usize {
797        // We only consider heap size of all batches.
798        self.result.iter().map(|batch| batch.memory_size()).sum()
799    }
800}
801
802/// Maps (region id, file id) to [ParquetMetaData].
803type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
804/// Maps [Value] to a vector that holds this value repeatedly.
805///
806/// e.g. `"hello" => ["hello", "hello", "hello"]`
807type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
808/// Maps (region, file, row group, column) to [PageValue].
809type PageCache = Cache<PageKey, Arc<PageValue>>;
810/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
811type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
812
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::parquet_meta;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default `CacheManager` must not cache anything: every put is a no-op
    /// and every get misses.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());
        // Index-related caches must be disabled as well.
        assert!(cache.inverted_index_cache().is_none());
        assert!(cache.bloom_filter_index_cache().is_none());
        assert!(cache.index_result_cache().is_none());
        assert!(cache.puffin_metadata_cache().is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics)
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        // The selector result cache is also disabled by default.
        let selector_key = SelectorResultKey {
            file_id: file_id.file_id(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        cache.put_selector_result(
            selector_key,
            Arc::new(SelectorResultValue::new(Vec::new(), Vec::new())),
        );
        assert!(cache.get_selector_result(&selector_key).is_none());

        assert!(cache.write_cache().is_none());
    }

    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics)
                .await
                .is_none()
        );
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics)
                .await
                .is_some()
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics)
                .await
                .is_none()
        );
    }

    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Evicting the puffin cache of a file must clear the bloom filter, inverted
    /// index, index result and puffin metadata entries of that file, both when
    /// called on the `CacheManager` directly and through `CacheStrategy`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = Arc::new(
            CacheManager::builder()
                .index_metadata_size(128)
                .index_content_size(128)
                .index_content_page_size(64)
                .index_result_cache_size(128)
                .puffin_metadata_size(128)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let region_file_id = RegionFileId::new(region_id, FileId::random());
        let file_id = region_file_id.file_id();
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        let bloom_key = (file_id, column_id, Tag::Skipping);
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let file_id_str = region_file_id.to_string();

        // Inserts one entry for the file into every puffin-related cache.
        let fill_caches = || {
            bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
            inverted_cache.put_metadata(file_id, Arc::new(InvertedIndexMetas::default()));
            result_cache.put(
                predicate.clone(),
                file_id,
                Arc::new(RowGroupSelection::default()),
            );
            puffin_metadata_cache.put_metadata(
                file_id_str.clone(),
                Arc::new(FileMetadata {
                    blobs: Vec::new(),
                    properties: HashMap::new(),
                }),
            );
        };
        // Checks that every puffin-related cache does (or does not) hold the file's entry.
        let assert_cached = |present: bool| {
            assert_eq!(
                bloom_cache.get_metadata(bloom_key).is_some(),
                present,
                "bloom filter metadata"
            );
            assert_eq!(
                inverted_cache.get_metadata(file_id).is_some(),
                present,
                "inverted index metadata"
            );
            assert_eq!(
                result_cache.get(&predicate, file_id).is_some(),
                present,
                "index result"
            );
            assert_eq!(
                puffin_metadata_cache.get_metadata(&file_id_str).is_some(),
                present,
                "puffin metadata"
            );
        };

        fill_caches();
        assert_cached(true);

        cache.evict_puffin_cache(region_file_id).await;
        assert_cached(false);

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        // Check the refill first so the final assertion cannot pass vacuously.
        fill_caches();
        assert_cached(true);

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(region_file_id).await;
        assert_cached(false);
    }
}