mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
34use index::result_cache::IndexResultCache;
35use moka::notification::RemovalCause;
36use moka::sync::Cache;
37use object_store::ObjectStore;
38use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
39use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
40use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
41
42use crate::cache::cache_size::parquet_meta_size;
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
45#[cfg(feature = "vector_index")]
46use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
47use crate::cache::write_cache::WriteCacheRef;
48use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
49use crate::read::Batch;
50use crate::sst::file::{RegionFileId, RegionIndexId};
51use crate::sst::parquet::reader::MetadataCacheMetrics;
52
/// Metrics type key for sst meta.
///
/// Used as the label value of the cache metrics for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
///
/// Used as the label value of the cache metrics for the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
///
/// Used as the label value of the cache metrics for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
///
/// Used as the label value of the cache metrics for the selector result cache,
/// including the hit/miss counters bumped by [selector_result_cache_hit] and
/// [selector_result_cache_miss].
const SELECTOR_RESULT_TYPE: &str = "selector_result";
65
/// Cache strategies that may only enable a subset of caches.
///
/// The methods of this type forward to the wrapped [CacheManager] when the
/// strategy allows the corresponding cache; otherwise they return `None` or do nothing.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    ///
    /// Parquet metadata caching and puffin cache eviction also stay available, while
    /// the vector, page, selector result and index related caches are not exposed.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
80
81impl CacheStrategy {
82    /// Gets parquet metadata with cache metrics tracking.
83    /// Returns the metadata and updates the provided metrics.
84    pub(crate) async fn get_parquet_meta_data(
85        &self,
86        file_id: RegionFileId,
87        metrics: &mut MetadataCacheMetrics,
88        page_index_policy: PageIndexPolicy,
89    ) -> Option<Arc<ParquetMetaData>> {
90        match self {
91            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
92                cache_manager
93                    .get_parquet_meta_data(file_id, metrics, page_index_policy)
94                    .await
95            }
96            CacheStrategy::Disabled => {
97                metrics.cache_miss += 1;
98                None
99            }
100        }
101    }
102
103    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
104    pub fn get_parquet_meta_data_from_mem_cache(
105        &self,
106        file_id: RegionFileId,
107    ) -> Option<Arc<ParquetMetaData>> {
108        match self {
109            CacheStrategy::EnableAll(cache_manager) => {
110                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
111            }
112            CacheStrategy::Compaction(cache_manager) => {
113                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
114            }
115            CacheStrategy::Disabled => None,
116        }
117    }
118
119    /// Calls [CacheManager::put_parquet_meta_data()].
120    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
121        match self {
122            CacheStrategy::EnableAll(cache_manager) => {
123                cache_manager.put_parquet_meta_data(file_id, metadata);
124            }
125            CacheStrategy::Compaction(cache_manager) => {
126                cache_manager.put_parquet_meta_data(file_id, metadata);
127            }
128            CacheStrategy::Disabled => {}
129        }
130    }
131
132    /// Calls [CacheManager::remove_parquet_meta_data()].
133    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
134        match self {
135            CacheStrategy::EnableAll(cache_manager) => {
136                cache_manager.remove_parquet_meta_data(file_id);
137            }
138            CacheStrategy::Compaction(cache_manager) => {
139                cache_manager.remove_parquet_meta_data(file_id);
140            }
141            CacheStrategy::Disabled => {}
142        }
143    }
144
145    /// Calls [CacheManager::get_repeated_vector()].
146    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
147    pub fn get_repeated_vector(
148        &self,
149        data_type: &ConcreteDataType,
150        value: &Value,
151    ) -> Option<VectorRef> {
152        match self {
153            CacheStrategy::EnableAll(cache_manager) => {
154                cache_manager.get_repeated_vector(data_type, value)
155            }
156            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
157        }
158    }
159
160    /// Calls [CacheManager::put_repeated_vector()].
161    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
162    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
163        if let CacheStrategy::EnableAll(cache_manager) = self {
164            cache_manager.put_repeated_vector(value, vector);
165        }
166    }
167
168    /// Calls [CacheManager::get_pages()].
169    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
170    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
171        match self {
172            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
173            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
174        }
175    }
176
177    /// Calls [CacheManager::put_pages()].
178    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
179    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
180        if let CacheStrategy::EnableAll(cache_manager) = self {
181            cache_manager.put_pages(page_key, pages);
182        }
183    }
184
185    /// Calls [CacheManager::evict_puffin_cache()].
186    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
187        match self {
188            CacheStrategy::EnableAll(cache_manager) => {
189                cache_manager.evict_puffin_cache(file_id).await
190            }
191            CacheStrategy::Compaction(cache_manager) => {
192                cache_manager.evict_puffin_cache(file_id).await
193            }
194            CacheStrategy::Disabled => {}
195        }
196    }
197
198    /// Calls [CacheManager::get_selector_result()].
199    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
200    pub fn get_selector_result(
201        &self,
202        selector_key: &SelectorResultKey,
203    ) -> Option<Arc<SelectorResultValue>> {
204        match self {
205            CacheStrategy::EnableAll(cache_manager) => {
206                cache_manager.get_selector_result(selector_key)
207            }
208            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
209        }
210    }
211
212    /// Calls [CacheManager::put_selector_result()].
213    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
214    pub fn put_selector_result(
215        &self,
216        selector_key: SelectorResultKey,
217        result: Arc<SelectorResultValue>,
218    ) {
219        if let CacheStrategy::EnableAll(cache_manager) = self {
220            cache_manager.put_selector_result(selector_key, result);
221        }
222    }
223
224    /// Calls [CacheManager::write_cache()].
225    /// It returns None if the strategy is [CacheStrategy::Disabled].
226    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
227        match self {
228            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
229            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
230            CacheStrategy::Disabled => None,
231        }
232    }
233
234    /// Calls [CacheManager::index_cache()].
235    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
236    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
237        match self {
238            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
239            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
240        }
241    }
242
243    /// Calls [CacheManager::bloom_filter_index_cache()].
244    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
245    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
246        match self {
247            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
248            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
249        }
250    }
251
252    /// Calls [CacheManager::vector_index_cache()].
253    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
254    #[cfg(feature = "vector_index")]
255    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
256        match self {
257            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
258            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
259        }
260    }
261
262    /// Calls [CacheManager::puffin_metadata_cache()].
263    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
264    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
265        match self {
266            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
267            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
268        }
269    }
270
271    /// Calls [CacheManager::index_result_cache()].
272    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
273    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
274        match self {
275            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
276            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
277        }
278    }
279
280    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
281    pub fn maybe_download_background(
282        &self,
283        index_key: IndexKey,
284        remote_path: String,
285        remote_store: ObjectStore,
286        file_size: u64,
287    ) {
288        if let CacheStrategy::EnableAll(cache_manager) = self
289            && let Some(write_cache) = cache_manager.write_cache()
290        {
291            write_cache.file_cache().maybe_download_background(
292                index_key,
293                remote_path,
294                remote_store,
295                file_size,
296            );
297        }
298    }
299}
300
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
/// A field that is `None` means the corresponding cache is disabled: lookups
/// return `None` and insertions are ignored.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata, keyed by (region id, file id).
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors that hold a single value repeatedly,
    /// keyed by (data type, value).
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages, keyed by [PageKey].
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors, keyed by [SelectorResultKey].
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
328
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
330
331impl CacheManager {
332    /// Returns a builder to build the cache.
333    pub fn builder() -> CacheManagerBuilder {
334        CacheManagerBuilder::default()
335    }
336
337    /// Gets cached [ParquetMetaData] with metrics tracking.
338    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
339    pub(crate) async fn get_parquet_meta_data(
340        &self,
341        file_id: RegionFileId,
342        metrics: &mut MetadataCacheMetrics,
343        page_index_policy: PageIndexPolicy,
344    ) -> Option<Arc<ParquetMetaData>> {
345        // Try to get metadata from sst meta cache
346        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
347            metrics.mem_cache_hit += 1;
348            return Some(metadata);
349        }
350
351        // Try to get metadata from write cache
352        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
353        if let Some(write_cache) = &self.write_cache
354            && let Some(metadata) = write_cache
355                .file_cache()
356                .get_parquet_meta_data(key, metrics, page_index_policy)
357                .await
358        {
359            metrics.file_cache_hit += 1;
360            let metadata = Arc::new(metadata);
361            // Put metadata into sst meta cache
362            self.put_parquet_meta_data(file_id, metadata.clone());
363            return Some(metadata);
364        };
365        metrics.cache_miss += 1;
366
367        None
368    }
369
370    /// Gets cached [ParquetMetaData] from in-memory cache.
371    /// This method does not perform I/O.
372    pub fn get_parquet_meta_data_from_mem_cache(
373        &self,
374        file_id: RegionFileId,
375    ) -> Option<Arc<ParquetMetaData>> {
376        // Try to get metadata from sst meta cache
377        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
378            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
379            update_hit_miss(value, SST_META_TYPE)
380        })
381    }
382
383    /// Puts [ParquetMetaData] into the cache.
384    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
385        if let Some(cache) = &self.sst_meta_cache {
386            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
387            CACHE_BYTES
388                .with_label_values(&[SST_META_TYPE])
389                .add(meta_cache_weight(&key, &metadata).into());
390            cache.insert(key, metadata);
391        }
392    }
393
394    /// Removes [ParquetMetaData] from the cache.
395    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
396        if let Some(cache) = &self.sst_meta_cache {
397            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
398        }
399    }
400
401    /// Gets a vector with repeated value for specific `key`.
402    pub fn get_repeated_vector(
403        &self,
404        data_type: &ConcreteDataType,
405        value: &Value,
406    ) -> Option<VectorRef> {
407        self.vector_cache.as_ref().and_then(|vector_cache| {
408            let value = vector_cache.get(&(data_type.clone(), value.clone()));
409            update_hit_miss(value, VECTOR_TYPE)
410        })
411    }
412
413    /// Puts a vector with repeated value into the cache.
414    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
415        if let Some(cache) = &self.vector_cache {
416            let key = (vector.data_type(), value);
417            CACHE_BYTES
418                .with_label_values(&[VECTOR_TYPE])
419                .add(vector_cache_weight(&key, &vector).into());
420            cache.insert(key, vector);
421        }
422    }
423
424    /// Gets pages for the row group.
425    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
426        self.page_cache.as_ref().and_then(|page_cache| {
427            let value = page_cache.get(page_key);
428            update_hit_miss(value, PAGE_TYPE)
429        })
430    }
431
432    /// Puts pages of the row group into the cache.
433    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
434        if let Some(cache) = &self.page_cache {
435            CACHE_BYTES
436                .with_label_values(&[PAGE_TYPE])
437                .add(page_cache_weight(&page_key, &pages).into());
438            cache.insert(page_key, pages);
439        }
440    }
441
442    /// Evicts every puffin-related cache entry for the given file.
443    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
444        if let Some(cache) = &self.bloom_filter_index_cache {
445            cache.invalidate_file(file_id.file_id());
446        }
447
448        if let Some(cache) = &self.inverted_index_cache {
449            cache.invalidate_file(file_id.file_id());
450        }
451
452        if let Some(cache) = &self.index_result_cache {
453            cache.invalidate_file(file_id.file_id());
454        }
455
456        #[cfg(feature = "vector_index")]
457        if let Some(cache) = &self.vector_index_cache {
458            cache.invalidate_file(file_id.file_id());
459        }
460
461        if let Some(cache) = &self.puffin_metadata_cache {
462            cache.remove(&file_id.to_string());
463        }
464
465        if let Some(write_cache) = &self.write_cache {
466            write_cache
467                .remove(IndexKey::new(
468                    file_id.region_id(),
469                    file_id.file_id(),
470                    FileType::Puffin(file_id.version),
471                ))
472                .await;
473        }
474    }
475
476    /// Gets result of for the selector.
477    pub fn get_selector_result(
478        &self,
479        selector_key: &SelectorResultKey,
480    ) -> Option<Arc<SelectorResultValue>> {
481        self.selector_result_cache
482            .as_ref()
483            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
484    }
485
486    /// Puts result of the selector into the cache.
487    pub fn put_selector_result(
488        &self,
489        selector_key: SelectorResultKey,
490        result: Arc<SelectorResultValue>,
491    ) {
492        if let Some(cache) = &self.selector_result_cache {
493            CACHE_BYTES
494                .with_label_values(&[SELECTOR_RESULT_TYPE])
495                .add(selector_result_cache_weight(&selector_key, &result).into());
496            cache.insert(selector_key, result);
497        }
498    }
499
500    /// Gets the write cache.
501    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
502        self.write_cache.as_ref()
503    }
504
505    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
506        self.inverted_index_cache.as_ref()
507    }
508
509    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
510        self.bloom_filter_index_cache.as_ref()
511    }
512
513    #[cfg(feature = "vector_index")]
514    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
515        self.vector_index_cache.as_ref()
516    }
517
518    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
519        self.puffin_metadata_cache.as_ref()
520    }
521
522    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
523        self.index_result_cache.as_ref()
524    }
525}
526
527/// Increases selector cache miss metrics.
528pub fn selector_result_cache_miss() {
529    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
530}
531
532/// Increases selector cache hit metrics.
533pub fn selector_result_cache_hit() {
534    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
535}
536
/// Builder to construct a [CacheManager].
///
/// Every size defaults to 0 and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache. The cache is not created if it is 0.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache. The cache is not created if it is 0.
    vector_cache_size: u64,
    /// Capacity of the page cache. The cache is not created if it is 0.
    page_cache_size: u64,
    /// Cache size for index metadata, passed to both the inverted index cache
    /// and the bloom filter index cache.
    index_metadata_size: u64,
    /// Cache size for index content, passed to both the inverted index cache
    /// and the bloom filter index cache (and to the vector index cache when the
    /// `vector_index` feature is enabled).
    index_content_size: u64,
    /// Page size for index content, passed to both the inverted index cache
    /// and the bloom filter index cache.
    index_content_page_size: u64,
    /// Capacity of the index result cache. The cache is not created if it is 0.
    index_result_cache_size: u64,
    /// Cache size for puffin metadata.
    puffin_metadata_size: u64,
    /// Optional write cache handed over to the [CacheManager] as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache. The cache is not created if it is 0.
    selector_result_cache_size: u64,
}
551
552impl CacheManagerBuilder {
553    /// Sets meta cache size.
554    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
555        self.sst_meta_cache_size = bytes;
556        self
557    }
558
559    /// Sets vector cache size.
560    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
561        self.vector_cache_size = bytes;
562        self
563    }
564
565    /// Sets page cache size.
566    pub fn page_cache_size(mut self, bytes: u64) -> Self {
567        self.page_cache_size = bytes;
568        self
569    }
570
571    /// Sets write cache.
572    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
573        self.write_cache = cache;
574        self
575    }
576
577    /// Sets cache size for index metadata.
578    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
579        self.index_metadata_size = bytes;
580        self
581    }
582
583    /// Sets cache size for index content.
584    pub fn index_content_size(mut self, bytes: u64) -> Self {
585        self.index_content_size = bytes;
586        self
587    }
588
589    /// Sets page size for index content.
590    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
591        self.index_content_page_size = bytes;
592        self
593    }
594
595    /// Sets cache size for index result.
596    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
597        self.index_result_cache_size = bytes;
598        self
599    }
600
601    /// Sets cache size for puffin metadata.
602    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
603        self.puffin_metadata_size = bytes;
604        self
605    }
606
607    /// Sets selector result cache size.
608    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
609        self.selector_result_cache_size = bytes;
610        self
611    }
612
613    /// Builds the [CacheManager].
614    pub fn build(self) -> CacheManager {
615        fn to_str(cause: RemovalCause) -> &'static str {
616            match cause {
617                RemovalCause::Expired => "expired",
618                RemovalCause::Explicit => "explicit",
619                RemovalCause::Replaced => "replaced",
620                RemovalCause::Size => "size",
621            }
622        }
623
624        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
625            Cache::builder()
626                .max_capacity(self.sst_meta_cache_size)
627                .weigher(meta_cache_weight)
628                .eviction_listener(|k, v, cause| {
629                    let size = meta_cache_weight(&k, &v);
630                    CACHE_BYTES
631                        .with_label_values(&[SST_META_TYPE])
632                        .sub(size.into());
633                    CACHE_EVICTION
634                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
635                        .inc();
636                })
637                .build()
638        });
639        let vector_cache = (self.vector_cache_size != 0).then(|| {
640            Cache::builder()
641                .max_capacity(self.vector_cache_size)
642                .weigher(vector_cache_weight)
643                .eviction_listener(|k, v, cause| {
644                    let size = vector_cache_weight(&k, &v);
645                    CACHE_BYTES
646                        .with_label_values(&[VECTOR_TYPE])
647                        .sub(size.into());
648                    CACHE_EVICTION
649                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
650                        .inc();
651                })
652                .build()
653        });
654        let page_cache = (self.page_cache_size != 0).then(|| {
655            Cache::builder()
656                .max_capacity(self.page_cache_size)
657                .weigher(page_cache_weight)
658                .eviction_listener(|k, v, cause| {
659                    let size = page_cache_weight(&k, &v);
660                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
661                    CACHE_EVICTION
662                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
663                        .inc();
664                })
665                .build()
666        });
667        let inverted_index_cache = InvertedIndexCache::new(
668            self.index_metadata_size,
669            self.index_content_size,
670            self.index_content_page_size,
671        );
672        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
673        let bloom_filter_index_cache = BloomFilterIndexCache::new(
674            self.index_metadata_size,
675            self.index_content_size,
676            self.index_content_page_size,
677        );
678        #[cfg(feature = "vector_index")]
679        let vector_index_cache = (self.index_content_size != 0)
680            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
681        let index_result_cache = (self.index_result_cache_size != 0)
682            .then(|| IndexResultCache::new(self.index_result_cache_size));
683        let puffin_metadata_cache =
684            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
685        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
686            Cache::builder()
687                .max_capacity(self.selector_result_cache_size)
688                .weigher(selector_result_cache_weight)
689                .eviction_listener(|k, v, cause| {
690                    let size = selector_result_cache_weight(&k, &v);
691                    CACHE_BYTES
692                        .with_label_values(&[SELECTOR_RESULT_TYPE])
693                        .sub(size.into());
694                    CACHE_EVICTION
695                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
696                        .inc();
697                })
698                .build()
699        });
700        CacheManager {
701            sst_meta_cache,
702            vector_cache,
703            page_cache,
704            write_cache: self.write_cache,
705            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
706            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
707            #[cfg(feature = "vector_index")]
708            vector_index_cache,
709            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
710            selector_result_cache,
711            index_result_cache,
712        }
713    }
714}
715
716fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
717    // We ignore the size of `Arc`.
718    (k.estimated_size() + parquet_meta_size(v)) as u32
719}
720
721fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
722    // We ignore the heap size of `Value`.
723    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
724}
725
726fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
727    (k.estimated_size() + v.estimated_size()) as u32
728}
729
730fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
731    (mem::size_of_val(k) + v.estimated_size()) as u32
732}
733
734/// Updates cache hit/miss metrics.
735fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
736    if value.is_some() {
737        CACHE_HIT.with_label_values(&[cache_type]).inc();
738    } else {
739        CACHE_MISS.with_label_values(&[cache_type]).inc();
740    }
741    value
742}
743
/// Cache key (region id, file id) for SST meta.
///
/// Used as the key type of the SST meta cache.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
747
748impl SstMetaKey {
749    /// Returns memory used by the key (estimated).
750    fn estimated_size(&self) -> usize {
751        mem::size_of::<Self>()
752    }
753}
754
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group inside one SST file of a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
767
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
///
/// Two keys are equal only if the file id, the row group index and the whole
/// list of ranges are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
781
782impl PageKey {
783    /// Creates a key for a list of pages.
784    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
785        PageKey {
786            file_id,
787            row_group_idx,
788            ranges,
789        }
790    }
791
792    /// Returns memory used by the key (estimated).
793    fn estimated_size(&self) -> usize {
794        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
795    }
796}
797
/// Cached row group pages for a column.
///
/// The default value holds no pages and has a page size of 0.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    /// This is the amount of payload the size estimation accounts for.
    pub page_size: u64,
}
807
808impl PageValue {
809    /// Creates a new value from a range of compressed pages.
810    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
811        PageValue {
812            compressed: bytes,
813            page_size,
814        }
815    }
816
817    /// Returns memory used by the value (estimated).
818    fn estimated_size(&self) -> usize {
819        mem::size_of::<Self>()
820            + self.page_size as usize
821            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
822    }
823}
824
/// Cache key for time series row selector result.
///
/// The key is `Copy`; equality and hashing are derived from all three fields.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
835
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: Vec<Batch>,
    /// Projection of rows.
    // NOTE(review): presumably the column indices the cached batches were read
    // with -- confirm against the code that builds and consumes this value.
    pub projection: Vec<usize>,
}
843
844impl SelectorResultValue {
845    /// Creates a new selector result value.
846    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
847        SelectorResultValue { result, projection }
848    }
849
850    /// Returns memory used by the value (estimated).
851    fn estimated_size(&self) -> usize {
852        // We only consider heap size of all batches.
853        self.result.iter().map(|batch| batch.memory_size()).sum()
854    }
855}
856
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// The key also carries the [ConcreteDataType] of the value.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [PageKey] (file id, row group, byte ranges of the pages) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
867
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::parquet_meta;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// With a default manager the caches are absent, so nothing that is put
    /// can be read back.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // Parquet metadata.
        let region_file_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());
        let mut metrics = MetadataCacheMetrics::default();
        manager.put_parquet_meta_data(region_file_id, parquet_meta());
        let meta_missing = manager
            .get_parquet_meta_data(region_file_id, &mut metrics, Default::default())
            .await
            .is_none();
        assert!(meta_missing);

        // Repeated vectors.
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10; 4]));
        manager.put_repeated_vector(value.clone(), vector);
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        // Pages.
        let page_key = PageKey::new(region_file_id.file_id(), 1, vec![0..5]);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// Parquet metadata goes through a miss -> put -> hit -> remove -> miss cycle.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_file_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());

        let missing_before_put = manager
            .get_parquet_meta_data(region_file_id, &mut metrics, Default::default())
            .await
            .is_none();
        assert!(missing_before_put);

        manager.put_parquet_meta_data(region_file_id, parquet_meta());
        let present_after_put = manager
            .get_parquet_meta_data(region_file_id, &mut metrics, Default::default())
            .await
            .is_some();
        assert!(present_after_put);

        manager.remove_parquet_meta_data(region_file_id);
        let missing_after_remove = manager
            .get_parquet_meta_data(region_file_id, &mut metrics, Default::default())
            .await
            .is_none();
        assert!(missing_after_remove);
    }

    /// A repeated vector can be read back under the same data type and value.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10; 4]));
        manager.put_repeated_vector(value.clone(), vector.clone());
        let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages are found again under an equal key once they have been put.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new(FileId::random(), 0, vec![0..10, 10..20]);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results are found again under the same key once they have been put.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let selector_key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&selector_key).is_none());

        let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
        manager.put_selector_result(selector_key, Arc::new(empty_result));
        assert!(manager.get_selector_result(&selector_key).is_some());
    }

    /// Evicting the puffin cache for an index removes its entries from the
    /// bloom filter, inverted index, index result and puffin metadata caches,
    /// both through `CacheManager` and through `CacheStrategy`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let manager = Arc::new(
            CacheManager::builder()
                .index_metadata_size(128)
                .index_content_size(128)
                .index_content_page_size(64)
                .index_result_cache_size(128)
                .puffin_metadata_size(128)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = manager.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = manager.inverted_index_cache().unwrap().clone();
        let result_cache = manager.index_result_cache().unwrap();
        let puffin_metadata_cache = manager.puffin_metadata_cache().unwrap().clone();

        // Keys under which the entries of this index are stored in each cache.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        let inverted_key = (index_id.file_id(), index_id.version);
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let puffin_key = index_id.to_string();

        // Stores one entry for `index_id` in every index-related cache.
        let fill_caches = || {
            bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
            inverted_cache.put_metadata(inverted_key, Arc::new(InvertedIndexMetas::default()));
            result_cache.put(
                predicate.clone(),
                index_id.file_id(),
                Arc::new(RowGroupSelection::default()),
            );
            puffin_metadata_cache.put_metadata(
                puffin_key.clone(),
                Arc::new(FileMetadata {
                    blobs: Vec::new(),
                    properties: HashMap::new(),
                }),
            );
        };
        // Checks that every cache still holds its entry.
        let assert_all_present = || {
            assert!(bloom_cache.get_metadata(bloom_key).is_some());
            assert!(inverted_cache.get_metadata(inverted_key).is_some());
            assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
            assert!(puffin_metadata_cache.get_metadata(&puffin_key).is_some());
        };
        // Checks that no cache holds an entry any more.
        let assert_all_evicted = || {
            assert!(bloom_cache.get_metadata(bloom_key).is_none());
            assert!(inverted_cache.get_metadata(inverted_key).is_none());
            assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
            assert!(puffin_metadata_cache.get_metadata(&puffin_key).is_none());
        };

        fill_caches();
        assert_all_present();

        manager.evict_puffin_cache(index_id).await;
        assert_all_evicted();

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        fill_caches();

        let strategy = CacheStrategy::EnableAll(manager.clone());
        strategy.evict_puffin_cache(index_id).await;
        assert_all_evicted();
    }
}