mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::ops::Range;
27use std::sync::Arc;
28
29use bytes::Bytes;
30use datatypes::value::Value;
31use datatypes::vectors::VectorRef;
32use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
33use index::result_cache::IndexResultCache;
34use moka::notification::RemovalCause;
35use moka::sync::Cache;
36use parquet::file::metadata::ParquetMetaData;
37use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
38use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
39
40use crate::cache::cache_size::parquet_meta_size;
41use crate::cache::file_cache::{FileType, IndexKey};
42use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
43use crate::cache::write_cache::WriteCacheRef;
44use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
45use crate::read::Batch;
46use crate::sst::file::RegionFileId;
47
/// Metrics type key for sst meta.
/// Used as the label value for the SST metadata cache metrics in this module.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
/// Used as the label value for the repeated vector cache metrics in this module.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
/// Used as the label value for the page cache metrics in this module.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
/// Used as the label value for the selector result cache metrics in this module.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
58
/// Cache strategies that may only enable a subset of caches.
///
/// Methods on the strategy forward to the wrapped [CacheManager] when the
/// variant allows the corresponding cache; otherwise they return `None` or do nothing.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries
    /// (the vector, page, selector result and index related caches are not served).
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    /// Parquet metadata caching and puffin cache eviction remain available.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
73
74impl CacheStrategy {
75    /// Calls [CacheManager::get_parquet_meta_data()].
76    pub async fn get_parquet_meta_data(
77        &self,
78        file_id: RegionFileId,
79    ) -> Option<Arc<ParquetMetaData>> {
80        match self {
81            CacheStrategy::EnableAll(cache_manager) => {
82                cache_manager.get_parquet_meta_data(file_id).await
83            }
84            CacheStrategy::Compaction(cache_manager) => {
85                cache_manager.get_parquet_meta_data(file_id).await
86            }
87            CacheStrategy::Disabled => None,
88        }
89    }
90
91    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
92    pub fn get_parquet_meta_data_from_mem_cache(
93        &self,
94        file_id: RegionFileId,
95    ) -> Option<Arc<ParquetMetaData>> {
96        match self {
97            CacheStrategy::EnableAll(cache_manager) => {
98                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
99            }
100            CacheStrategy::Compaction(cache_manager) => {
101                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
102            }
103            CacheStrategy::Disabled => None,
104        }
105    }
106
107    /// Calls [CacheManager::put_parquet_meta_data()].
108    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
109        match self {
110            CacheStrategy::EnableAll(cache_manager) => {
111                cache_manager.put_parquet_meta_data(file_id, metadata);
112            }
113            CacheStrategy::Compaction(cache_manager) => {
114                cache_manager.put_parquet_meta_data(file_id, metadata);
115            }
116            CacheStrategy::Disabled => {}
117        }
118    }
119
120    /// Calls [CacheManager::remove_parquet_meta_data()].
121    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
122        match self {
123            CacheStrategy::EnableAll(cache_manager) => {
124                cache_manager.remove_parquet_meta_data(file_id);
125            }
126            CacheStrategy::Compaction(cache_manager) => {
127                cache_manager.remove_parquet_meta_data(file_id);
128            }
129            CacheStrategy::Disabled => {}
130        }
131    }
132
133    /// Calls [CacheManager::get_repeated_vector()].
134    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
135    pub fn get_repeated_vector(
136        &self,
137        data_type: &ConcreteDataType,
138        value: &Value,
139    ) -> Option<VectorRef> {
140        match self {
141            CacheStrategy::EnableAll(cache_manager) => {
142                cache_manager.get_repeated_vector(data_type, value)
143            }
144            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
145        }
146    }
147
148    /// Calls [CacheManager::put_repeated_vector()].
149    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
150    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
151        if let CacheStrategy::EnableAll(cache_manager) = self {
152            cache_manager.put_repeated_vector(value, vector);
153        }
154    }
155
156    /// Calls [CacheManager::get_pages()].
157    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
158    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
159        match self {
160            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
161            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
162        }
163    }
164
165    /// Calls [CacheManager::put_pages()].
166    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
167    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
168        if let CacheStrategy::EnableAll(cache_manager) = self {
169            cache_manager.put_pages(page_key, pages);
170        }
171    }
172
173    /// Calls [CacheManager::evict_puffin_cache()].
174    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
175        match self {
176            CacheStrategy::EnableAll(cache_manager) => {
177                cache_manager.evict_puffin_cache(file_id).await
178            }
179            CacheStrategy::Compaction(cache_manager) => {
180                cache_manager.evict_puffin_cache(file_id).await
181            }
182            CacheStrategy::Disabled => {}
183        }
184    }
185
186    /// Calls [CacheManager::get_selector_result()].
187    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
188    pub fn get_selector_result(
189        &self,
190        selector_key: &SelectorResultKey,
191    ) -> Option<Arc<SelectorResultValue>> {
192        match self {
193            CacheStrategy::EnableAll(cache_manager) => {
194                cache_manager.get_selector_result(selector_key)
195            }
196            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
197        }
198    }
199
200    /// Calls [CacheManager::put_selector_result()].
201    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
202    pub fn put_selector_result(
203        &self,
204        selector_key: SelectorResultKey,
205        result: Arc<SelectorResultValue>,
206    ) {
207        if let CacheStrategy::EnableAll(cache_manager) = self {
208            cache_manager.put_selector_result(selector_key, result);
209        }
210    }
211
212    /// Calls [CacheManager::write_cache()].
213    /// It returns None if the strategy is [CacheStrategy::Disabled].
214    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
215        match self {
216            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
217            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
218            CacheStrategy::Disabled => None,
219        }
220    }
221
222    /// Calls [CacheManager::index_cache()].
223    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
224    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
225        match self {
226            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
227            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
228        }
229    }
230
231    /// Calls [CacheManager::bloom_filter_index_cache()].
232    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
233    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
234        match self {
235            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
236            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
237        }
238    }
239
240    /// Calls [CacheManager::puffin_metadata_cache()].
241    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
242    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
243        match self {
244            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
245            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
246        }
247    }
248
249    /// Calls [CacheManager::index_result_cache()].
250    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
251    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
252        match self {
253            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
254            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
255        }
256    }
257}
258
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
/// Every cache is optional: a `None` field means that cache is disabled and
/// the corresponding getters return `None` while the setters do nothing.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
283
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
285
286impl CacheManager {
287    /// Returns a builder to build the cache.
288    pub fn builder() -> CacheManagerBuilder {
289        CacheManagerBuilder::default()
290    }
291
292    /// Gets cached [ParquetMetaData] from in-memory cache first.
293    /// If not found, tries to get it from write cache and fill the in-memory cache.
294    pub async fn get_parquet_meta_data(
295        &self,
296        file_id: RegionFileId,
297    ) -> Option<Arc<ParquetMetaData>> {
298        // Try to get metadata from sst meta cache
299        let metadata = self.get_parquet_meta_data_from_mem_cache(file_id);
300        if metadata.is_some() {
301            return metadata;
302        }
303
304        // Try to get metadata from write cache
305        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
306        if let Some(write_cache) = &self.write_cache
307            && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
308        {
309            let metadata = Arc::new(metadata);
310            // Put metadata into sst meta cache
311            self.put_parquet_meta_data(file_id, metadata.clone());
312            return Some(metadata);
313        };
314
315        None
316    }
317
318    /// Gets cached [ParquetMetaData] from in-memory cache.
319    /// This method does not perform I/O.
320    pub fn get_parquet_meta_data_from_mem_cache(
321        &self,
322        file_id: RegionFileId,
323    ) -> Option<Arc<ParquetMetaData>> {
324        // Try to get metadata from sst meta cache
325        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
326            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
327            update_hit_miss(value, SST_META_TYPE)
328        })
329    }
330
331    /// Puts [ParquetMetaData] into the cache.
332    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
333        if let Some(cache) = &self.sst_meta_cache {
334            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
335            CACHE_BYTES
336                .with_label_values(&[SST_META_TYPE])
337                .add(meta_cache_weight(&key, &metadata).into());
338            cache.insert(key, metadata);
339        }
340    }
341
342    /// Removes [ParquetMetaData] from the cache.
343    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
344        if let Some(cache) = &self.sst_meta_cache {
345            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
346        }
347    }
348
349    /// Gets a vector with repeated value for specific `key`.
350    pub fn get_repeated_vector(
351        &self,
352        data_type: &ConcreteDataType,
353        value: &Value,
354    ) -> Option<VectorRef> {
355        self.vector_cache.as_ref().and_then(|vector_cache| {
356            let value = vector_cache.get(&(data_type.clone(), value.clone()));
357            update_hit_miss(value, VECTOR_TYPE)
358        })
359    }
360
361    /// Puts a vector with repeated value into the cache.
362    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
363        if let Some(cache) = &self.vector_cache {
364            let key = (vector.data_type(), value);
365            CACHE_BYTES
366                .with_label_values(&[VECTOR_TYPE])
367                .add(vector_cache_weight(&key, &vector).into());
368            cache.insert(key, vector);
369        }
370    }
371
372    /// Gets pages for the row group.
373    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
374        self.page_cache.as_ref().and_then(|page_cache| {
375            let value = page_cache.get(page_key);
376            update_hit_miss(value, PAGE_TYPE)
377        })
378    }
379
380    /// Puts pages of the row group into the cache.
381    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
382        if let Some(cache) = &self.page_cache {
383            CACHE_BYTES
384                .with_label_values(&[PAGE_TYPE])
385                .add(page_cache_weight(&page_key, &pages).into());
386            cache.insert(page_key, pages);
387        }
388    }
389
390    /// Evicts every puffin-related cache entry for the given file.
391    pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
392        if let Some(cache) = &self.bloom_filter_index_cache {
393            cache.invalidate_file(file_id.file_id());
394        }
395
396        if let Some(cache) = &self.inverted_index_cache {
397            cache.invalidate_file(file_id.file_id());
398        }
399
400        if let Some(cache) = &self.index_result_cache {
401            cache.invalidate_file(file_id.file_id());
402        }
403
404        if let Some(cache) = &self.puffin_metadata_cache {
405            cache.remove(&file_id.to_string());
406        }
407
408        if let Some(write_cache) = &self.write_cache {
409            write_cache
410                .remove(IndexKey::new(
411                    file_id.region_id(),
412                    file_id.file_id(),
413                    FileType::Puffin,
414                ))
415                .await;
416        }
417    }
418
419    /// Gets result of for the selector.
420    pub fn get_selector_result(
421        &self,
422        selector_key: &SelectorResultKey,
423    ) -> Option<Arc<SelectorResultValue>> {
424        self.selector_result_cache
425            .as_ref()
426            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
427    }
428
429    /// Puts result of the selector into the cache.
430    pub fn put_selector_result(
431        &self,
432        selector_key: SelectorResultKey,
433        result: Arc<SelectorResultValue>,
434    ) {
435        if let Some(cache) = &self.selector_result_cache {
436            CACHE_BYTES
437                .with_label_values(&[SELECTOR_RESULT_TYPE])
438                .add(selector_result_cache_weight(&selector_key, &result).into());
439            cache.insert(selector_key, result);
440        }
441    }
442
443    /// Gets the write cache.
444    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
445        self.write_cache.as_ref()
446    }
447
448    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
449        self.inverted_index_cache.as_ref()
450    }
451
452    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
453        self.bloom_filter_index_cache.as_ref()
454    }
455
456    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
457        self.puffin_metadata_cache.as_ref()
458    }
459
460    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
461        self.index_result_cache.as_ref()
462    }
463}
464
465/// Increases selector cache miss metrics.
466pub fn selector_result_cache_miss() {
467    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
468}
469
470/// Increases selector cache hit metrics.
471pub fn selector_result_cache_hit() {
472    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
473}
474
/// Builder to construct a [CacheManager].
///
/// All sizes default to 0 and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache; 0 disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache; 0 disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; 0 disables the cache.
    page_cache_size: u64,
    /// Cache size for index metadata, passed to the inverted and bloom filter index caches.
    index_metadata_size: u64,
    /// Cache size for index content, passed to the inverted and bloom filter index caches.
    index_content_size: u64,
    /// Page size for index content, passed to the inverted and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; 0 disables the cache.
    index_result_cache_size: u64,
    /// Cache size for puffin metadata.
    puffin_metadata_size: u64,
    /// Optional write cache handed over to the [CacheManager] as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; 0 disables the cache.
    selector_result_cache_size: u64,
}
489
490impl CacheManagerBuilder {
491    /// Sets meta cache size.
492    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
493        self.sst_meta_cache_size = bytes;
494        self
495    }
496
497    /// Sets vector cache size.
498    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
499        self.vector_cache_size = bytes;
500        self
501    }
502
503    /// Sets page cache size.
504    pub fn page_cache_size(mut self, bytes: u64) -> Self {
505        self.page_cache_size = bytes;
506        self
507    }
508
509    /// Sets write cache.
510    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
511        self.write_cache = cache;
512        self
513    }
514
515    /// Sets cache size for index metadata.
516    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
517        self.index_metadata_size = bytes;
518        self
519    }
520
521    /// Sets cache size for index content.
522    pub fn index_content_size(mut self, bytes: u64) -> Self {
523        self.index_content_size = bytes;
524        self
525    }
526
527    /// Sets page size for index content.
528    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
529        self.index_content_page_size = bytes;
530        self
531    }
532
533    /// Sets cache size for index result.
534    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
535        self.index_result_cache_size = bytes;
536        self
537    }
538
539    /// Sets cache size for puffin metadata.
540    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
541        self.puffin_metadata_size = bytes;
542        self
543    }
544
545    /// Sets selector result cache size.
546    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
547        self.selector_result_cache_size = bytes;
548        self
549    }
550
551    /// Builds the [CacheManager].
552    pub fn build(self) -> CacheManager {
553        fn to_str(cause: RemovalCause) -> &'static str {
554            match cause {
555                RemovalCause::Expired => "expired",
556                RemovalCause::Explicit => "explicit",
557                RemovalCause::Replaced => "replaced",
558                RemovalCause::Size => "size",
559            }
560        }
561
562        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
563            Cache::builder()
564                .max_capacity(self.sst_meta_cache_size)
565                .weigher(meta_cache_weight)
566                .eviction_listener(|k, v, cause| {
567                    let size = meta_cache_weight(&k, &v);
568                    CACHE_BYTES
569                        .with_label_values(&[SST_META_TYPE])
570                        .sub(size.into());
571                    CACHE_EVICTION
572                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
573                        .inc();
574                })
575                .build()
576        });
577        let vector_cache = (self.vector_cache_size != 0).then(|| {
578            Cache::builder()
579                .max_capacity(self.vector_cache_size)
580                .weigher(vector_cache_weight)
581                .eviction_listener(|k, v, cause| {
582                    let size = vector_cache_weight(&k, &v);
583                    CACHE_BYTES
584                        .with_label_values(&[VECTOR_TYPE])
585                        .sub(size.into());
586                    CACHE_EVICTION
587                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
588                        .inc();
589                })
590                .build()
591        });
592        let page_cache = (self.page_cache_size != 0).then(|| {
593            Cache::builder()
594                .max_capacity(self.page_cache_size)
595                .weigher(page_cache_weight)
596                .eviction_listener(|k, v, cause| {
597                    let size = page_cache_weight(&k, &v);
598                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
599                    CACHE_EVICTION
600                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
601                        .inc();
602                })
603                .build()
604        });
605        let inverted_index_cache = InvertedIndexCache::new(
606            self.index_metadata_size,
607            self.index_content_size,
608            self.index_content_page_size,
609        );
610        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
611        let bloom_filter_index_cache = BloomFilterIndexCache::new(
612            self.index_metadata_size,
613            self.index_content_size,
614            self.index_content_page_size,
615        );
616        let index_result_cache = (self.index_result_cache_size != 0)
617            .then(|| IndexResultCache::new(self.index_result_cache_size));
618        let puffin_metadata_cache =
619            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
620        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
621            Cache::builder()
622                .max_capacity(self.selector_result_cache_size)
623                .weigher(selector_result_cache_weight)
624                .eviction_listener(|k, v, cause| {
625                    let size = selector_result_cache_weight(&k, &v);
626                    CACHE_BYTES
627                        .with_label_values(&[SELECTOR_RESULT_TYPE])
628                        .sub(size.into());
629                    CACHE_EVICTION
630                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
631                        .inc();
632                })
633                .build()
634        });
635        CacheManager {
636            sst_meta_cache,
637            vector_cache,
638            page_cache,
639            write_cache: self.write_cache,
640            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
641            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
642            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
643            selector_result_cache,
644            index_result_cache,
645        }
646    }
647}
648
649fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
650    // We ignore the size of `Arc`.
651    (k.estimated_size() + parquet_meta_size(v)) as u32
652}
653
654fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
655    // We ignore the heap size of `Value`.
656    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
657}
658
659fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
660    (k.estimated_size() + v.estimated_size()) as u32
661}
662
663fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
664    (mem::size_of_val(k) + v.estimated_size()) as u32
665}
666
667/// Updates cache hit/miss metrics.
668fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
669    if value.is_some() {
670        CACHE_HIT.with_label_values(&[cache_type]).inc();
671    } else {
672        CACHE_MISS.with_label_values(&[cache_type]).inc();
673    }
674    value
675}
676
/// Cache key (region id, file id) for SST meta.
///
/// Field 0 is the region id and field 1 is the file id of the SST file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
680
681impl SstMetaKey {
682    /// Returns memory used by the key (estimated).
683    fn estimated_size(&self) -> usize {
684        mem::size_of::<Self>()
685    }
686}
687
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group inside an SST file of a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
700
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
/// Two keys are equal only if the file id, row group index and the full list of
/// ranges are equal.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
714
715impl PageKey {
716    /// Creates a key for a list of pages.
717    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
718        PageKey {
719            file_id,
720            row_group_idx,
721            ranges,
722        }
723    }
724
725    /// Returns memory used by the key (estimated).
726    fn estimated_size(&self) -> usize {
727        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
728    }
729}
730
/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    /// This value is what the size estimation of the entry is based on.
    pub page_size: u64,
}
740
741impl PageValue {
742    /// Creates a new value from a range of compressed pages.
743    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
744        PageValue {
745            compressed: bytes,
746            page_size,
747        }
748    }
749
750    /// Returns memory used by the value (estimated).
751    fn estimated_size(&self) -> usize {
752        mem::size_of::<Self>()
753            + self.page_size as usize
754            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
755    }
756}
757
/// Cache key for time series row selector result.
///
/// A result is identified by the file, the row group inside it and the selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
768
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    /// Only these batches are counted when estimating the size of the value.
    pub result: Vec<Batch>,
    /// Projection of rows.
    pub projection: Vec<usize>,
}
776
777impl SelectorResultValue {
778    /// Creates a new selector result value.
779    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
780        SelectorResultValue { result, projection }
781    }
782
783    /// Returns memory used by the value (estimated).
784    fn estimated_size(&self) -> usize {
785        // We only consider heap size of all batches.
786        self.result.iter().map(|batch| batch.memory_size()).sum()
787    }
788}
789
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (data type, [Value]) to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [PageKey] (file id, row group index, page byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
800
801#[cfg(test)]
802mod tests {
803    use std::sync::Arc;
804
805    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
806    use datatypes::vectors::Int64Vector;
807    use puffin::file_metadata::FileMetadata;
808    use store_api::storage::ColumnId;
809
810    use super::*;
811    use crate::cache::index::bloom_filter_index::Tag;
812    use crate::cache::index::result_cache::PredicateKey;
813    use crate::cache::test_util::parquet_meta;
814    use crate::sst::parquet::row_selection::RowGroupSelection;
815
816    #[tokio::test]
817    async fn test_disable_cache() {
818        let cache = CacheManager::default();
819        assert!(cache.sst_meta_cache.is_none());
820        assert!(cache.vector_cache.is_none());
821        assert!(cache.page_cache.is_none());
822
823        let region_id = RegionId::new(1, 1);
824        let file_id = RegionFileId::new(region_id, FileId::random());
825        let metadata = parquet_meta();
826        cache.put_parquet_meta_data(file_id, metadata);
827        assert!(cache.get_parquet_meta_data(file_id).await.is_none());
828
829        let value = Value::Int64(10);
830        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
831        cache.put_repeated_vector(value.clone(), vector.clone());
832        assert!(
833            cache
834                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
835                .is_none()
836        );
837
838        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
839        let pages = Arc::new(PageValue::default());
840        cache.put_pages(key.clone(), pages);
841        assert!(cache.get_pages(&key).is_none());
842
843        assert!(cache.write_cache().is_none());
844    }
845
846    #[tokio::test]
847    async fn test_parquet_meta_cache() {
848        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
849        let region_id = RegionId::new(1, 1);
850        let file_id = RegionFileId::new(region_id, FileId::random());
851        assert!(cache.get_parquet_meta_data(file_id).await.is_none());
852        let metadata = parquet_meta();
853        cache.put_parquet_meta_data(file_id, metadata);
854        assert!(cache.get_parquet_meta_data(file_id).await.is_some());
855        cache.remove_parquet_meta_data(file_id);
856        assert!(cache.get_parquet_meta_data(file_id).await.is_none());
857    }
858
859    #[test]
860    fn test_repeated_vector_cache() {
861        let cache = CacheManager::builder().vector_cache_size(4096).build();
862        let value = Value::Int64(10);
863        assert!(
864            cache
865                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
866                .is_none()
867        );
868        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
869        cache.put_repeated_vector(value.clone(), vector.clone());
870        let cached = cache
871            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
872            .unwrap();
873        assert_eq!(vector, cached);
874    }
875
876    #[test]
877    fn test_page_cache() {
878        let cache = CacheManager::builder().page_cache_size(1000).build();
879        let file_id = FileId::random();
880        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
881        assert!(cache.get_pages(&key).is_none());
882        let pages = Arc::new(PageValue::default());
883        cache.put_pages(key.clone(), pages);
884        assert!(cache.get_pages(&key).is_some());
885    }
886
887    #[test]
888    fn test_selector_result_cache() {
889        let cache = CacheManager::builder()
890            .selector_result_cache_size(1000)
891            .build();
892        let file_id = FileId::random();
893        let key = SelectorResultKey {
894            file_id,
895            row_group_idx: 0,
896            selector: TimeSeriesRowSelector::LastRow,
897        };
898        assert!(cache.get_selector_result(&key).is_none());
899        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
900        cache.put_selector_result(key, result);
901        assert!(cache.get_selector_result(&key).is_some());
902    }
903
904    #[tokio::test]
905    async fn test_evict_puffin_cache_clears_all_entries() {
906        use std::collections::{BTreeMap, HashMap};
907
908        let cache = CacheManager::builder()
909            .index_metadata_size(128)
910            .index_content_size(128)
911            .index_content_page_size(64)
912            .index_result_cache_size(128)
913            .puffin_metadata_size(128)
914            .build();
915        let cache = Arc::new(cache);
916
917        let region_id = RegionId::new(1, 1);
918        let region_file_id = RegionFileId::new(region_id, FileId::random());
919        let column_id: ColumnId = 1;
920
921        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
922        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
923        let result_cache = cache.index_result_cache().unwrap();
924        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
925
926        let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
927        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
928        inverted_cache.put_metadata(
929            region_file_id.file_id(),
930            Arc::new(InvertedIndexMetas::default()),
931        );
932        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
933        let selection = Arc::new(RowGroupSelection::default());
934        result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
935        let file_id_str = region_file_id.to_string();
936        let metadata = Arc::new(FileMetadata {
937            blobs: Vec::new(),
938            properties: HashMap::new(),
939        });
940        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
941
942        assert!(bloom_cache.get_metadata(bloom_key).is_some());
943        assert!(
944            inverted_cache
945                .get_metadata(region_file_id.file_id())
946                .is_some()
947        );
948        assert!(
949            result_cache
950                .get(&predicate, region_file_id.file_id())
951                .is_some()
952        );
953        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
954
955        cache.evict_puffin_cache(region_file_id).await;
956
957        assert!(bloom_cache.get_metadata(bloom_key).is_none());
958        assert!(
959            inverted_cache
960                .get_metadata(region_file_id.file_id())
961                .is_none()
962        );
963        assert!(
964            result_cache
965                .get(&predicate, region_file_id.file_id())
966                .is_none()
967        );
968        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
969
970        // Refill caches and evict via CacheStrategy to ensure delegation works.
971        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
972        inverted_cache.put_metadata(
973            region_file_id.file_id(),
974            Arc::new(InvertedIndexMetas::default()),
975        );
976        result_cache.put(
977            predicate.clone(),
978            region_file_id.file_id(),
979            Arc::new(RowGroupSelection::default()),
980        );
981        puffin_metadata_cache.put_metadata(
982            file_id_str.clone(),
983            Arc::new(FileMetadata {
984                blobs: Vec::new(),
985                properties: HashMap::new(),
986            }),
987        );
988
989        let strategy = CacheStrategy::EnableAll(cache.clone());
990        strategy.evict_puffin_cache(region_file_id).await;
991
992        assert!(bloom_cache.get_metadata(bloom_key).is_none());
993        assert!(
994            inverted_cache
995                .get_metadata(region_file_id.file_id())
996                .is_none()
997        );
998        assert!(
999            result_cache
1000                .get(&predicate, region_file_id.file_id())
1001                .is_none()
1002        );
1003        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1004    }
1005}