mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
34use index::result_cache::IndexResultCache;
35use moka::notification::RemovalCause;
36use moka::sync::Cache;
37use object_store::ObjectStore;
38use parquet::file::metadata::ParquetMetaData;
39use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
40use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
41
42use crate::cache::cache_size::parquet_meta_size;
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
45use crate::cache::write_cache::WriteCacheRef;
46use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
47use crate::read::Batch;
48use crate::sst::file::{RegionFileId, RegionIndexId};
49use crate::sst::parquet::reader::MetadataCacheMetrics;
50
/// Metrics type label for the SST (parquet) metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type label for the repeated-value vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type label for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics type label for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type label for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type label for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
63
/// Cache strategies that may only enable a subset of caches.
///
/// Each accessor on this type forwards to the wrapped [CacheManager] when the
/// strategy permits that cache; otherwise it returns `None` or does nothing.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    /// The parquet metadata cache and puffin cache eviction also stay available.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
78
79impl CacheStrategy {
80    /// Gets parquet metadata with cache metrics tracking.
81    /// Returns the metadata and updates the provided metrics.
82    pub(crate) async fn get_parquet_meta_data(
83        &self,
84        file_id: RegionFileId,
85        metrics: &mut MetadataCacheMetrics,
86    ) -> Option<Arc<ParquetMetaData>> {
87        match self {
88            CacheStrategy::EnableAll(cache_manager) => {
89                cache_manager.get_parquet_meta_data(file_id, metrics).await
90            }
91            CacheStrategy::Compaction(cache_manager) => {
92                cache_manager.get_parquet_meta_data(file_id, metrics).await
93            }
94            CacheStrategy::Disabled => {
95                metrics.cache_miss += 1;
96                None
97            }
98        }
99    }
100
101    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
102    pub fn get_parquet_meta_data_from_mem_cache(
103        &self,
104        file_id: RegionFileId,
105    ) -> Option<Arc<ParquetMetaData>> {
106        match self {
107            CacheStrategy::EnableAll(cache_manager) => {
108                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
109            }
110            CacheStrategy::Compaction(cache_manager) => {
111                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
112            }
113            CacheStrategy::Disabled => None,
114        }
115    }
116
117    /// Calls [CacheManager::put_parquet_meta_data()].
118    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
119        match self {
120            CacheStrategy::EnableAll(cache_manager) => {
121                cache_manager.put_parquet_meta_data(file_id, metadata);
122            }
123            CacheStrategy::Compaction(cache_manager) => {
124                cache_manager.put_parquet_meta_data(file_id, metadata);
125            }
126            CacheStrategy::Disabled => {}
127        }
128    }
129
130    /// Calls [CacheManager::remove_parquet_meta_data()].
131    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
132        match self {
133            CacheStrategy::EnableAll(cache_manager) => {
134                cache_manager.remove_parquet_meta_data(file_id);
135            }
136            CacheStrategy::Compaction(cache_manager) => {
137                cache_manager.remove_parquet_meta_data(file_id);
138            }
139            CacheStrategy::Disabled => {}
140        }
141    }
142
143    /// Calls [CacheManager::get_repeated_vector()].
144    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
145    pub fn get_repeated_vector(
146        &self,
147        data_type: &ConcreteDataType,
148        value: &Value,
149    ) -> Option<VectorRef> {
150        match self {
151            CacheStrategy::EnableAll(cache_manager) => {
152                cache_manager.get_repeated_vector(data_type, value)
153            }
154            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
155        }
156    }
157
158    /// Calls [CacheManager::put_repeated_vector()].
159    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
160    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
161        if let CacheStrategy::EnableAll(cache_manager) = self {
162            cache_manager.put_repeated_vector(value, vector);
163        }
164    }
165
166    /// Calls [CacheManager::get_pages()].
167    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
168    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
169        match self {
170            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
171            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
172        }
173    }
174
175    /// Calls [CacheManager::put_pages()].
176    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
177    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
178        if let CacheStrategy::EnableAll(cache_manager) = self {
179            cache_manager.put_pages(page_key, pages);
180        }
181    }
182
183    /// Calls [CacheManager::evict_puffin_cache()].
184    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
185        match self {
186            CacheStrategy::EnableAll(cache_manager) => {
187                cache_manager.evict_puffin_cache(file_id).await
188            }
189            CacheStrategy::Compaction(cache_manager) => {
190                cache_manager.evict_puffin_cache(file_id).await
191            }
192            CacheStrategy::Disabled => {}
193        }
194    }
195
196    /// Calls [CacheManager::get_selector_result()].
197    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
198    pub fn get_selector_result(
199        &self,
200        selector_key: &SelectorResultKey,
201    ) -> Option<Arc<SelectorResultValue>> {
202        match self {
203            CacheStrategy::EnableAll(cache_manager) => {
204                cache_manager.get_selector_result(selector_key)
205            }
206            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
207        }
208    }
209
210    /// Calls [CacheManager::put_selector_result()].
211    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
212    pub fn put_selector_result(
213        &self,
214        selector_key: SelectorResultKey,
215        result: Arc<SelectorResultValue>,
216    ) {
217        if let CacheStrategy::EnableAll(cache_manager) = self {
218            cache_manager.put_selector_result(selector_key, result);
219        }
220    }
221
222    /// Calls [CacheManager::write_cache()].
223    /// It returns None if the strategy is [CacheStrategy::Disabled].
224    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
225        match self {
226            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
227            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
228            CacheStrategy::Disabled => None,
229        }
230    }
231
232    /// Calls [CacheManager::index_cache()].
233    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
234    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
235        match self {
236            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
237            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
238        }
239    }
240
241    /// Calls [CacheManager::bloom_filter_index_cache()].
242    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
243    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
244        match self {
245            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
246            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
247        }
248    }
249
250    /// Calls [CacheManager::puffin_metadata_cache()].
251    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
252    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
253        match self {
254            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
255            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
256        }
257    }
258
259    /// Calls [CacheManager::index_result_cache()].
260    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
261    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
262        match self {
263            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
264            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
265        }
266    }
267
268    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
269    pub fn maybe_download_background(
270        &self,
271        index_key: IndexKey,
272        remote_path: String,
273        remote_store: ObjectStore,
274        file_size: u64,
275    ) {
276        if let CacheStrategy::EnableAll(cache_manager) = self
277            && let Some(write_cache) = cache_manager.write_cache()
278        {
279            write_cache.file_cache().maybe_download_background(
280                index_key,
281                remote_path,
282                remote_store,
283                file_size,
284            );
285        }
286    }
287}
288
/// Manages cached data for the engine.
///
/// All caches are disabled by default: every field is an `Option` and the
/// derived `Default` leaves each one as `None`.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
313
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
315
316impl CacheManager {
317    /// Returns a builder to build the cache.
318    pub fn builder() -> CacheManagerBuilder {
319        CacheManagerBuilder::default()
320    }
321
322    /// Gets cached [ParquetMetaData] with metrics tracking.
323    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
324    pub(crate) async fn get_parquet_meta_data(
325        &self,
326        file_id: RegionFileId,
327        metrics: &mut MetadataCacheMetrics,
328    ) -> Option<Arc<ParquetMetaData>> {
329        // Try to get metadata from sst meta cache
330        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
331            metrics.mem_cache_hit += 1;
332            return Some(metadata);
333        }
334
335        // Try to get metadata from write cache
336        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
337        if let Some(write_cache) = &self.write_cache
338            && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
339        {
340            metrics.file_cache_hit += 1;
341            let metadata = Arc::new(metadata);
342            // Put metadata into sst meta cache
343            self.put_parquet_meta_data(file_id, metadata.clone());
344            return Some(metadata);
345        };
346        metrics.cache_miss += 1;
347
348        None
349    }
350
351    /// Gets cached [ParquetMetaData] from in-memory cache.
352    /// This method does not perform I/O.
353    pub fn get_parquet_meta_data_from_mem_cache(
354        &self,
355        file_id: RegionFileId,
356    ) -> Option<Arc<ParquetMetaData>> {
357        // Try to get metadata from sst meta cache
358        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
359            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
360            update_hit_miss(value, SST_META_TYPE)
361        })
362    }
363
364    /// Puts [ParquetMetaData] into the cache.
365    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
366        if let Some(cache) = &self.sst_meta_cache {
367            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
368            CACHE_BYTES
369                .with_label_values(&[SST_META_TYPE])
370                .add(meta_cache_weight(&key, &metadata).into());
371            cache.insert(key, metadata);
372        }
373    }
374
375    /// Removes [ParquetMetaData] from the cache.
376    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
377        if let Some(cache) = &self.sst_meta_cache {
378            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
379        }
380    }
381
382    /// Gets a vector with repeated value for specific `key`.
383    pub fn get_repeated_vector(
384        &self,
385        data_type: &ConcreteDataType,
386        value: &Value,
387    ) -> Option<VectorRef> {
388        self.vector_cache.as_ref().and_then(|vector_cache| {
389            let value = vector_cache.get(&(data_type.clone(), value.clone()));
390            update_hit_miss(value, VECTOR_TYPE)
391        })
392    }
393
394    /// Puts a vector with repeated value into the cache.
395    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
396        if let Some(cache) = &self.vector_cache {
397            let key = (vector.data_type(), value);
398            CACHE_BYTES
399                .with_label_values(&[VECTOR_TYPE])
400                .add(vector_cache_weight(&key, &vector).into());
401            cache.insert(key, vector);
402        }
403    }
404
405    /// Gets pages for the row group.
406    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
407        self.page_cache.as_ref().and_then(|page_cache| {
408            let value = page_cache.get(page_key);
409            update_hit_miss(value, PAGE_TYPE)
410        })
411    }
412
413    /// Puts pages of the row group into the cache.
414    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
415        if let Some(cache) = &self.page_cache {
416            CACHE_BYTES
417                .with_label_values(&[PAGE_TYPE])
418                .add(page_cache_weight(&page_key, &pages).into());
419            cache.insert(page_key, pages);
420        }
421    }
422
423    /// Evicts every puffin-related cache entry for the given file.
424    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
425        if let Some(cache) = &self.bloom_filter_index_cache {
426            cache.invalidate_file(file_id.file_id());
427        }
428
429        if let Some(cache) = &self.inverted_index_cache {
430            cache.invalidate_file(file_id.file_id());
431        }
432
433        if let Some(cache) = &self.index_result_cache {
434            cache.invalidate_file(file_id.file_id());
435        }
436
437        if let Some(cache) = &self.puffin_metadata_cache {
438            cache.remove(&file_id.to_string());
439        }
440
441        if let Some(write_cache) = &self.write_cache {
442            write_cache
443                .remove(IndexKey::new(
444                    file_id.region_id(),
445                    file_id.file_id(),
446                    FileType::Puffin(file_id.version),
447                ))
448                .await;
449        }
450    }
451
452    /// Gets result of for the selector.
453    pub fn get_selector_result(
454        &self,
455        selector_key: &SelectorResultKey,
456    ) -> Option<Arc<SelectorResultValue>> {
457        self.selector_result_cache
458            .as_ref()
459            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
460    }
461
462    /// Puts result of the selector into the cache.
463    pub fn put_selector_result(
464        &self,
465        selector_key: SelectorResultKey,
466        result: Arc<SelectorResultValue>,
467    ) {
468        if let Some(cache) = &self.selector_result_cache {
469            CACHE_BYTES
470                .with_label_values(&[SELECTOR_RESULT_TYPE])
471                .add(selector_result_cache_weight(&selector_key, &result).into());
472            cache.insert(selector_key, result);
473        }
474    }
475
476    /// Gets the write cache.
477    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
478        self.write_cache.as_ref()
479    }
480
481    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
482        self.inverted_index_cache.as_ref()
483    }
484
485    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
486        self.bloom_filter_index_cache.as_ref()
487    }
488
489    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
490        self.puffin_metadata_cache.as_ref()
491    }
492
493    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
494        self.index_result_cache.as_ref()
495    }
496}
497
498/// Increases selector cache miss metrics.
499pub fn selector_result_cache_miss() {
500    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
501}
502
503/// Increases selector cache hit metrics.
504pub fn selector_result_cache_hit() {
505    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
506}
507
/// Builder to construct a [CacheManager].
///
/// All sizes default to 0 and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache; 0 disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache; 0 disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; 0 disables the cache.
    page_cache_size: u64,
    /// Index metadata size passed to the inverted and bloom filter index caches.
    index_metadata_size: u64,
    /// Index content size passed to the inverted and bloom filter index caches.
    index_content_size: u64,
    /// Index content page size passed to the inverted and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; 0 disables the cache.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the built [CacheManager] as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; 0 disables the cache.
    selector_result_cache_size: u64,
}
522
523impl CacheManagerBuilder {
524    /// Sets meta cache size.
525    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
526        self.sst_meta_cache_size = bytes;
527        self
528    }
529
530    /// Sets vector cache size.
531    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
532        self.vector_cache_size = bytes;
533        self
534    }
535
536    /// Sets page cache size.
537    pub fn page_cache_size(mut self, bytes: u64) -> Self {
538        self.page_cache_size = bytes;
539        self
540    }
541
542    /// Sets write cache.
543    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
544        self.write_cache = cache;
545        self
546    }
547
548    /// Sets cache size for index metadata.
549    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
550        self.index_metadata_size = bytes;
551        self
552    }
553
554    /// Sets cache size for index content.
555    pub fn index_content_size(mut self, bytes: u64) -> Self {
556        self.index_content_size = bytes;
557        self
558    }
559
560    /// Sets page size for index content.
561    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
562        self.index_content_page_size = bytes;
563        self
564    }
565
566    /// Sets cache size for index result.
567    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
568        self.index_result_cache_size = bytes;
569        self
570    }
571
572    /// Sets cache size for puffin metadata.
573    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
574        self.puffin_metadata_size = bytes;
575        self
576    }
577
578    /// Sets selector result cache size.
579    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
580        self.selector_result_cache_size = bytes;
581        self
582    }
583
584    /// Builds the [CacheManager].
585    pub fn build(self) -> CacheManager {
586        fn to_str(cause: RemovalCause) -> &'static str {
587            match cause {
588                RemovalCause::Expired => "expired",
589                RemovalCause::Explicit => "explicit",
590                RemovalCause::Replaced => "replaced",
591                RemovalCause::Size => "size",
592            }
593        }
594
595        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
596            Cache::builder()
597                .max_capacity(self.sst_meta_cache_size)
598                .weigher(meta_cache_weight)
599                .eviction_listener(|k, v, cause| {
600                    let size = meta_cache_weight(&k, &v);
601                    CACHE_BYTES
602                        .with_label_values(&[SST_META_TYPE])
603                        .sub(size.into());
604                    CACHE_EVICTION
605                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
606                        .inc();
607                })
608                .build()
609        });
610        let vector_cache = (self.vector_cache_size != 0).then(|| {
611            Cache::builder()
612                .max_capacity(self.vector_cache_size)
613                .weigher(vector_cache_weight)
614                .eviction_listener(|k, v, cause| {
615                    let size = vector_cache_weight(&k, &v);
616                    CACHE_BYTES
617                        .with_label_values(&[VECTOR_TYPE])
618                        .sub(size.into());
619                    CACHE_EVICTION
620                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
621                        .inc();
622                })
623                .build()
624        });
625        let page_cache = (self.page_cache_size != 0).then(|| {
626            Cache::builder()
627                .max_capacity(self.page_cache_size)
628                .weigher(page_cache_weight)
629                .eviction_listener(|k, v, cause| {
630                    let size = page_cache_weight(&k, &v);
631                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
632                    CACHE_EVICTION
633                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
634                        .inc();
635                })
636                .build()
637        });
638        let inverted_index_cache = InvertedIndexCache::new(
639            self.index_metadata_size,
640            self.index_content_size,
641            self.index_content_page_size,
642        );
643        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
644        let bloom_filter_index_cache = BloomFilterIndexCache::new(
645            self.index_metadata_size,
646            self.index_content_size,
647            self.index_content_page_size,
648        );
649        let index_result_cache = (self.index_result_cache_size != 0)
650            .then(|| IndexResultCache::new(self.index_result_cache_size));
651        let puffin_metadata_cache =
652            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
653        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
654            Cache::builder()
655                .max_capacity(self.selector_result_cache_size)
656                .weigher(selector_result_cache_weight)
657                .eviction_listener(|k, v, cause| {
658                    let size = selector_result_cache_weight(&k, &v);
659                    CACHE_BYTES
660                        .with_label_values(&[SELECTOR_RESULT_TYPE])
661                        .sub(size.into());
662                    CACHE_EVICTION
663                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
664                        .inc();
665                })
666                .build()
667        });
668        CacheManager {
669            sst_meta_cache,
670            vector_cache,
671            page_cache,
672            write_cache: self.write_cache,
673            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
674            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
675            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
676            selector_result_cache,
677            index_result_cache,
678        }
679    }
680}
681
682fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
683    // We ignore the size of `Arc`.
684    (k.estimated_size() + parquet_meta_size(v)) as u32
685}
686
687fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
688    // We ignore the heap size of `Value`.
689    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
690}
691
692fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
693    (k.estimated_size() + v.estimated_size()) as u32
694}
695
696fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
697    (mem::size_of_val(k) + v.estimated_size()) as u32
698}
699
700/// Updates cache hit/miss metrics.
701fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
702    if value.is_some() {
703        CACHE_HIT.with_label_values(&[cache_type]).inc();
704    } else {
705        CACHE_MISS.with_label_values(&[cache_type]).inc();
706    }
707    value
708}
709
/// Cache key (region id, file id) for SST meta.
///
/// Field 0 is the region id and field 1 is the file id of the SST.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
713
714impl SstMetaKey {
715    /// Returns memory used by the key (estimated).
716    fn estimated_size(&self) -> usize {
717        mem::size_of::<Self>()
718    }
719}
720
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group in one SST file of a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
733
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
/// Two keys are equal only if file id, row group index and all ranges match.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
747
748impl PageKey {
749    /// Creates a key for a list of pages.
750    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
751        PageKey {
752            file_id,
753            row_group_idx,
754            ranges,
755        }
756    }
757
758    /// Returns memory used by the key (estimated).
759    fn estimated_size(&self) -> usize {
760        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
761    }
762}
763
/// Cached row group pages for a column.
///
/// The derived `Default` is an empty value: no compressed pages and a page size of 0.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    pub page_size: u64,
}
773
774impl PageValue {
775    /// Creates a new value from a range of compressed pages.
776    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
777        PageValue {
778            compressed: bytes,
779            page_size,
780        }
781    }
782
783    /// Returns memory used by the value (estimated).
784    fn estimated_size(&self) -> usize {
785        mem::size_of::<Self>()
786            + self.page_size as usize
787            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
788    }
789}
790
/// Cache key for time series row selector result.
///
/// A result is identified by the SST file, the row group inside it and the
/// selector that was applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
801
/// Cached result for time series row selector.
///
/// Only the batches in `result` count towards the estimated size of the value.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: Vec<Batch>,
    /// Projection of rows.
    pub projection: Vec<usize>,
}
809
810impl SelectorResultValue {
811    /// Creates a new selector result value.
812    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
813        SelectorResultValue { result, projection }
814    }
815
816    /// Returns memory used by the value (estimated).
817    fn estimated_size(&self) -> usize {
818        // We only consider heap size of all batches.
819        self.result.iter().map(|batch| batch.memory_size()).sum()
820    }
821}
822
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (data type, [Value]) to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps [PageKey] (file id, row group index, byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
833
834#[cfg(test)]
835mod tests {
836    use std::sync::Arc;
837
838    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
839    use datatypes::vectors::Int64Vector;
840    use puffin::file_metadata::FileMetadata;
841    use store_api::storage::ColumnId;
842
843    use super::*;
844    use crate::cache::index::bloom_filter_index::Tag;
845    use crate::cache::index::result_cache::PredicateKey;
846    use crate::cache::test_util::parquet_meta;
847    use crate::sst::parquet::row_selection::RowGroupSelection;
848
849    #[tokio::test]
850    async fn test_disable_cache() {
851        let cache = CacheManager::default();
852        assert!(cache.sst_meta_cache.is_none());
853        assert!(cache.vector_cache.is_none());
854        assert!(cache.page_cache.is_none());
855
856        let region_id = RegionId::new(1, 1);
857        let file_id = RegionFileId::new(region_id, FileId::random());
858        let metadata = parquet_meta();
859        let mut metrics = MetadataCacheMetrics::default();
860        cache.put_parquet_meta_data(file_id, metadata);
861        assert!(
862            cache
863                .get_parquet_meta_data(file_id, &mut metrics)
864                .await
865                .is_none()
866        );
867
868        let value = Value::Int64(10);
869        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
870        cache.put_repeated_vector(value.clone(), vector.clone());
871        assert!(
872            cache
873                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
874                .is_none()
875        );
876
877        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
878        let pages = Arc::new(PageValue::default());
879        cache.put_pages(key.clone(), pages);
880        assert!(cache.get_pages(&key).is_none());
881
882        assert!(cache.write_cache().is_none());
883    }
884
885    #[tokio::test]
886    async fn test_parquet_meta_cache() {
887        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
888        let mut metrics = MetadataCacheMetrics::default();
889        let region_id = RegionId::new(1, 1);
890        let file_id = RegionFileId::new(region_id, FileId::random());
891        assert!(
892            cache
893                .get_parquet_meta_data(file_id, &mut metrics)
894                .await
895                .is_none()
896        );
897        let metadata = parquet_meta();
898        cache.put_parquet_meta_data(file_id, metadata);
899        assert!(
900            cache
901                .get_parquet_meta_data(file_id, &mut metrics)
902                .await
903                .is_some()
904        );
905        cache.remove_parquet_meta_data(file_id);
906        assert!(
907            cache
908                .get_parquet_meta_data(file_id, &mut metrics)
909                .await
910                .is_none()
911        );
912    }
913
914    #[test]
915    fn test_repeated_vector_cache() {
916        let cache = CacheManager::builder().vector_cache_size(4096).build();
917        let value = Value::Int64(10);
918        assert!(
919            cache
920                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
921                .is_none()
922        );
923        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
924        cache.put_repeated_vector(value.clone(), vector.clone());
925        let cached = cache
926            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
927            .unwrap();
928        assert_eq!(vector, cached);
929    }
930
931    #[test]
932    fn test_page_cache() {
933        let cache = CacheManager::builder().page_cache_size(1000).build();
934        let file_id = FileId::random();
935        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
936        assert!(cache.get_pages(&key).is_none());
937        let pages = Arc::new(PageValue::default());
938        cache.put_pages(key.clone(), pages);
939        assert!(cache.get_pages(&key).is_some());
940    }
941
942    #[test]
943    fn test_selector_result_cache() {
944        let cache = CacheManager::builder()
945            .selector_result_cache_size(1000)
946            .build();
947        let file_id = FileId::random();
948        let key = SelectorResultKey {
949            file_id,
950            row_group_idx: 0,
951            selector: TimeSeriesRowSelector::LastRow,
952        };
953        assert!(cache.get_selector_result(&key).is_none());
954        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
955        cache.put_selector_result(key, result);
956        assert!(cache.get_selector_result(&key).is_some());
957    }
958
959    #[tokio::test]
960    async fn test_evict_puffin_cache_clears_all_entries() {
961        use std::collections::{BTreeMap, HashMap};
962
963        let cache = CacheManager::builder()
964            .index_metadata_size(128)
965            .index_content_size(128)
966            .index_content_page_size(64)
967            .index_result_cache_size(128)
968            .puffin_metadata_size(128)
969            .build();
970        let cache = Arc::new(cache);
971
972        let region_id = RegionId::new(1, 1);
973        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
974        let column_id: ColumnId = 1;
975
976        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
977        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
978        let result_cache = cache.index_result_cache().unwrap();
979        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
980
981        let bloom_key = (
982            index_id.file_id(),
983            index_id.version,
984            column_id,
985            Tag::Skipping,
986        );
987        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
988        inverted_cache.put_metadata(
989            (index_id.file_id(), index_id.version),
990            Arc::new(InvertedIndexMetas::default()),
991        );
992        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
993        let selection = Arc::new(RowGroupSelection::default());
994        result_cache.put(predicate.clone(), index_id.file_id(), selection);
995        let file_id_str = index_id.to_string();
996        let metadata = Arc::new(FileMetadata {
997            blobs: Vec::new(),
998            properties: HashMap::new(),
999        });
1000        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1001
1002        assert!(bloom_cache.get_metadata(bloom_key).is_some());
1003        assert!(
1004            inverted_cache
1005                .get_metadata((index_id.file_id(), index_id.version))
1006                .is_some()
1007        );
1008        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1009        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1010
1011        cache.evict_puffin_cache(index_id).await;
1012
1013        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1014        assert!(
1015            inverted_cache
1016                .get_metadata((index_id.file_id(), index_id.version))
1017                .is_none()
1018        );
1019        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1020        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1021
1022        // Refill caches and evict via CacheStrategy to ensure delegation works.
1023        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1024        inverted_cache.put_metadata(
1025            (index_id.file_id(), index_id.version),
1026            Arc::new(InvertedIndexMetas::default()),
1027        );
1028        result_cache.put(
1029            predicate.clone(),
1030            index_id.file_id(),
1031            Arc::new(RowGroupSelection::default()),
1032        );
1033        puffin_metadata_cache.put_metadata(
1034            file_id_str.clone(),
1035            Arc::new(FileMetadata {
1036                blobs: Vec::new(),
1037                properties: HashMap::new(),
1038            }),
1039        );
1040
1041        let strategy = CacheStrategy::EnableAll(cache.clone());
1042        strategy.evict_puffin_cache(index_id).await;
1043
1044        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1045        assert!(
1046            inverted_cache
1047                .get_metadata((index_id.file_id(), index_id.version))
1048                .is_none()
1049        );
1050        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1051        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1052    }
1053}