mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
32use index::result_cache::IndexResultCache;
33use moka::notification::RemovalCause;
34use moka::sync::Cache;
35use parquet::column::page::Page;
36use parquet::file::metadata::ParquetMetaData;
37use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
38use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
39
40use crate::cache::cache_size::parquet_meta_size;
41use crate::cache::file_cache::{FileType, IndexKey};
42use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
43use crate::cache::write_cache::WriteCacheRef;
44use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
45use crate::read::Batch;
46use crate::sst::file::{FileId, RegionFileId};
47
// The constants below are the `type` label values attached to the cache
// metrics (`CACHE_BYTES`, `CACHE_HIT`, `CACHE_MISS`, `CACHE_EVICTION`).

/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
// NOTE(review): not referenced in this file; presumably used by a submodule
// such as `file_cache` — confirm before removing.
const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
58
/// Cache strategies that may only enable a subset of caches.
///
/// The methods on this type forward to the wrapped [CacheManager] when the
/// strategy enables the corresponding cache. Otherwise reads return `None`
/// and writes are no-ops.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    ///
    /// Only the parquet metadata methods and the write cache accessor are
    /// forwarded to the manager under this strategy.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
73
74impl CacheStrategy {
75    /// Calls [CacheManager::get_parquet_meta_data()].
76    pub async fn get_parquet_meta_data(
77        &self,
78        file_id: RegionFileId,
79    ) -> Option<Arc<ParquetMetaData>> {
80        match self {
81            CacheStrategy::EnableAll(cache_manager) => {
82                cache_manager.get_parquet_meta_data(file_id).await
83            }
84            CacheStrategy::Compaction(cache_manager) => {
85                cache_manager.get_parquet_meta_data(file_id).await
86            }
87            CacheStrategy::Disabled => None,
88        }
89    }
90
91    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
92    pub fn get_parquet_meta_data_from_mem_cache(
93        &self,
94        file_id: RegionFileId,
95    ) -> Option<Arc<ParquetMetaData>> {
96        match self {
97            CacheStrategy::EnableAll(cache_manager) => {
98                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
99            }
100            CacheStrategy::Compaction(cache_manager) => {
101                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
102            }
103            CacheStrategy::Disabled => None,
104        }
105    }
106
107    /// Calls [CacheManager::put_parquet_meta_data()].
108    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
109        match self {
110            CacheStrategy::EnableAll(cache_manager) => {
111                cache_manager.put_parquet_meta_data(file_id, metadata);
112            }
113            CacheStrategy::Compaction(cache_manager) => {
114                cache_manager.put_parquet_meta_data(file_id, metadata);
115            }
116            CacheStrategy::Disabled => {}
117        }
118    }
119
120    /// Calls [CacheManager::remove_parquet_meta_data()].
121    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
122        match self {
123            CacheStrategy::EnableAll(cache_manager) => {
124                cache_manager.remove_parquet_meta_data(file_id);
125            }
126            CacheStrategy::Compaction(cache_manager) => {
127                cache_manager.remove_parquet_meta_data(file_id);
128            }
129            CacheStrategy::Disabled => {}
130        }
131    }
132
133    /// Calls [CacheManager::get_repeated_vector()].
134    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
135    pub fn get_repeated_vector(
136        &self,
137        data_type: &ConcreteDataType,
138        value: &Value,
139    ) -> Option<VectorRef> {
140        match self {
141            CacheStrategy::EnableAll(cache_manager) => {
142                cache_manager.get_repeated_vector(data_type, value)
143            }
144            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
145        }
146    }
147
148    /// Calls [CacheManager::put_repeated_vector()].
149    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
150    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
151        if let CacheStrategy::EnableAll(cache_manager) = self {
152            cache_manager.put_repeated_vector(value, vector);
153        }
154    }
155
156    /// Calls [CacheManager::get_pages()].
157    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
158    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
159        match self {
160            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
161            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
162        }
163    }
164
165    /// Calls [CacheManager::put_pages()].
166    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
167    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
168        if let CacheStrategy::EnableAll(cache_manager) = self {
169            cache_manager.put_pages(page_key, pages);
170        }
171    }
172
173    /// Calls [CacheManager::get_selector_result()].
174    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
175    pub fn get_selector_result(
176        &self,
177        selector_key: &SelectorResultKey,
178    ) -> Option<Arc<SelectorResultValue>> {
179        match self {
180            CacheStrategy::EnableAll(cache_manager) => {
181                cache_manager.get_selector_result(selector_key)
182            }
183            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
184        }
185    }
186
187    /// Calls [CacheManager::put_selector_result()].
188    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
189    pub fn put_selector_result(
190        &self,
191        selector_key: SelectorResultKey,
192        result: Arc<SelectorResultValue>,
193    ) {
194        if let CacheStrategy::EnableAll(cache_manager) = self {
195            cache_manager.put_selector_result(selector_key, result);
196        }
197    }
198
199    /// Calls [CacheManager::write_cache()].
200    /// It returns None if the strategy is [CacheStrategy::Disabled].
201    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
202        match self {
203            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
204            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
205            CacheStrategy::Disabled => None,
206        }
207    }
208
209    /// Calls [CacheManager::index_cache()].
210    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
211    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
212        match self {
213            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
214            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
215        }
216    }
217
218    /// Calls [CacheManager::bloom_filter_index_cache()].
219    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
220    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
221        match self {
222            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
223            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
224        }
225    }
226
227    /// Calls [CacheManager::puffin_metadata_cache()].
228    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
229    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
230        match self {
231            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
232            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
233        }
234    }
235
236    /// Calls [CacheManager::index_result_cache()].
237    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
238    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
239        match self {
240            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
241            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
242        }
243    }
244}
245
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
///
/// Every field is optional. When a field is `None`, the accessors and `get_*`
/// methods for that cache return `None` and the `put_*` methods do nothing.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    /// Keyed by (region id, file id).
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    /// Keyed by (data type, value).
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    /// Also consulted for parquet metadata when the SST meta cache misses.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
270
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
272
273impl CacheManager {
274    /// Returns a builder to build the cache.
275    pub fn builder() -> CacheManagerBuilder {
276        CacheManagerBuilder::default()
277    }
278
279    /// Gets cached [ParquetMetaData] from in-memory cache first.
280    /// If not found, tries to get it from write cache and fill the in-memory cache.
281    pub async fn get_parquet_meta_data(
282        &self,
283        file_id: RegionFileId,
284    ) -> Option<Arc<ParquetMetaData>> {
285        // Try to get metadata from sst meta cache
286        let metadata = self.get_parquet_meta_data_from_mem_cache(file_id);
287        if metadata.is_some() {
288            return metadata;
289        }
290
291        // Try to get metadata from write cache
292        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
293        if let Some(write_cache) = &self.write_cache {
294            if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
295                let metadata = Arc::new(metadata);
296                // Put metadata into sst meta cache
297                self.put_parquet_meta_data(file_id, metadata.clone());
298                return Some(metadata);
299            }
300        };
301
302        None
303    }
304
305    /// Gets cached [ParquetMetaData] from in-memory cache.
306    /// This method does not perform I/O.
307    pub fn get_parquet_meta_data_from_mem_cache(
308        &self,
309        file_id: RegionFileId,
310    ) -> Option<Arc<ParquetMetaData>> {
311        // Try to get metadata from sst meta cache
312        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
313            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
314            update_hit_miss(value, SST_META_TYPE)
315        })
316    }
317
318    /// Puts [ParquetMetaData] into the cache.
319    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
320        if let Some(cache) = &self.sst_meta_cache {
321            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
322            CACHE_BYTES
323                .with_label_values(&[SST_META_TYPE])
324                .add(meta_cache_weight(&key, &metadata).into());
325            cache.insert(key, metadata);
326        }
327    }
328
329    /// Removes [ParquetMetaData] from the cache.
330    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
331        if let Some(cache) = &self.sst_meta_cache {
332            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
333        }
334    }
335
336    /// Gets a vector with repeated value for specific `key`.
337    pub fn get_repeated_vector(
338        &self,
339        data_type: &ConcreteDataType,
340        value: &Value,
341    ) -> Option<VectorRef> {
342        self.vector_cache.as_ref().and_then(|vector_cache| {
343            let value = vector_cache.get(&(data_type.clone(), value.clone()));
344            update_hit_miss(value, VECTOR_TYPE)
345        })
346    }
347
348    /// Puts a vector with repeated value into the cache.
349    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
350        if let Some(cache) = &self.vector_cache {
351            let key = (vector.data_type(), value);
352            CACHE_BYTES
353                .with_label_values(&[VECTOR_TYPE])
354                .add(vector_cache_weight(&key, &vector).into());
355            cache.insert(key, vector);
356        }
357    }
358
359    /// Gets pages for the row group.
360    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
361        self.page_cache.as_ref().and_then(|page_cache| {
362            let value = page_cache.get(page_key);
363            update_hit_miss(value, PAGE_TYPE)
364        })
365    }
366
367    /// Puts pages of the row group into the cache.
368    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
369        if let Some(cache) = &self.page_cache {
370            CACHE_BYTES
371                .with_label_values(&[PAGE_TYPE])
372                .add(page_cache_weight(&page_key, &pages).into());
373            cache.insert(page_key, pages);
374        }
375    }
376
377    /// Gets result of for the selector.
378    pub fn get_selector_result(
379        &self,
380        selector_key: &SelectorResultKey,
381    ) -> Option<Arc<SelectorResultValue>> {
382        self.selector_result_cache
383            .as_ref()
384            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
385    }
386
387    /// Puts result of the selector into the cache.
388    pub fn put_selector_result(
389        &self,
390        selector_key: SelectorResultKey,
391        result: Arc<SelectorResultValue>,
392    ) {
393        if let Some(cache) = &self.selector_result_cache {
394            CACHE_BYTES
395                .with_label_values(&[SELECTOR_RESULT_TYPE])
396                .add(selector_result_cache_weight(&selector_key, &result).into());
397            cache.insert(selector_key, result);
398        }
399    }
400
401    /// Gets the write cache.
402    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
403        self.write_cache.as_ref()
404    }
405
406    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
407        self.inverted_index_cache.as_ref()
408    }
409
410    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
411        self.bloom_filter_index_cache.as_ref()
412    }
413
414    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
415        self.puffin_metadata_cache.as_ref()
416    }
417
418    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
419        self.index_result_cache.as_ref()
420    }
421}
422
423/// Increases selector cache miss metrics.
424pub fn selector_result_cache_miss() {
425    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
426}
427
428/// Increases selector cache hit metrics.
429pub fn selector_result_cache_hit() {
430    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
431}
432
/// Builder to construct a [CacheManager].
///
/// All sizes default to `0` and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache; `0` disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache; `0` disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; `0` disables the cache.
    page_cache_size: u64,
    /// Index metadata size, passed to both the inverted index cache and the
    /// bloom filter index cache.
    index_metadata_size: u64,
    /// Index content size, passed to both the inverted index cache and the
    /// bloom filter index cache.
    index_content_size: u64,
    /// Index content page size, passed to both the inverted index cache and
    /// the bloom filter index cache.
    index_content_page_size: u64,
    /// Capacity of the index result cache; `0` disables the cache.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Write cache handed to the manager as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; `0` disables the cache.
    selector_result_cache_size: u64,
}
447
448impl CacheManagerBuilder {
449    /// Sets meta cache size.
450    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
451        self.sst_meta_cache_size = bytes;
452        self
453    }
454
455    /// Sets vector cache size.
456    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
457        self.vector_cache_size = bytes;
458        self
459    }
460
461    /// Sets page cache size.
462    pub fn page_cache_size(mut self, bytes: u64) -> Self {
463        self.page_cache_size = bytes;
464        self
465    }
466
467    /// Sets write cache.
468    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
469        self.write_cache = cache;
470        self
471    }
472
473    /// Sets cache size for index metadata.
474    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
475        self.index_metadata_size = bytes;
476        self
477    }
478
479    /// Sets cache size for index content.
480    pub fn index_content_size(mut self, bytes: u64) -> Self {
481        self.index_content_size = bytes;
482        self
483    }
484
485    /// Sets page size for index content.
486    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
487        self.index_content_page_size = bytes;
488        self
489    }
490
491    /// Sets cache size for index result.
492    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
493        self.index_result_cache_size = bytes;
494        self
495    }
496
497    /// Sets cache size for puffin metadata.
498    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
499        self.puffin_metadata_size = bytes;
500        self
501    }
502
503    /// Sets selector result cache size.
504    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
505        self.selector_result_cache_size = bytes;
506        self
507    }
508
509    /// Builds the [CacheManager].
510    pub fn build(self) -> CacheManager {
511        fn to_str(cause: RemovalCause) -> &'static str {
512            match cause {
513                RemovalCause::Expired => "expired",
514                RemovalCause::Explicit => "explicit",
515                RemovalCause::Replaced => "replaced",
516                RemovalCause::Size => "size",
517            }
518        }
519
520        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
521            Cache::builder()
522                .max_capacity(self.sst_meta_cache_size)
523                .weigher(meta_cache_weight)
524                .eviction_listener(|k, v, cause| {
525                    let size = meta_cache_weight(&k, &v);
526                    CACHE_BYTES
527                        .with_label_values(&[SST_META_TYPE])
528                        .sub(size.into());
529                    CACHE_EVICTION
530                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
531                        .inc();
532                })
533                .build()
534        });
535        let vector_cache = (self.vector_cache_size != 0).then(|| {
536            Cache::builder()
537                .max_capacity(self.vector_cache_size)
538                .weigher(vector_cache_weight)
539                .eviction_listener(|k, v, cause| {
540                    let size = vector_cache_weight(&k, &v);
541                    CACHE_BYTES
542                        .with_label_values(&[VECTOR_TYPE])
543                        .sub(size.into());
544                    CACHE_EVICTION
545                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
546                        .inc();
547                })
548                .build()
549        });
550        let page_cache = (self.page_cache_size != 0).then(|| {
551            Cache::builder()
552                .max_capacity(self.page_cache_size)
553                .weigher(page_cache_weight)
554                .eviction_listener(|k, v, cause| {
555                    let size = page_cache_weight(&k, &v);
556                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
557                    CACHE_EVICTION
558                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
559                        .inc();
560                })
561                .build()
562        });
563        let inverted_index_cache = InvertedIndexCache::new(
564            self.index_metadata_size,
565            self.index_content_size,
566            self.index_content_page_size,
567        );
568        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
569        let bloom_filter_index_cache = BloomFilterIndexCache::new(
570            self.index_metadata_size,
571            self.index_content_size,
572            self.index_content_page_size,
573        );
574        let index_result_cache = (self.index_result_cache_size != 0)
575            .then(|| IndexResultCache::new(self.index_result_cache_size));
576        let puffin_metadata_cache =
577            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
578        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
579            Cache::builder()
580                .max_capacity(self.selector_result_cache_size)
581                .weigher(selector_result_cache_weight)
582                .eviction_listener(|k, v, cause| {
583                    let size = selector_result_cache_weight(&k, &v);
584                    CACHE_BYTES
585                        .with_label_values(&[SELECTOR_RESULT_TYPE])
586                        .sub(size.into());
587                    CACHE_EVICTION
588                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
589                        .inc();
590                })
591                .build()
592        });
593        CacheManager {
594            sst_meta_cache,
595            vector_cache,
596            page_cache,
597            write_cache: self.write_cache,
598            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
599            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
600            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
601            selector_result_cache,
602            index_result_cache,
603        }
604    }
605}
606
607fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
608    // We ignore the size of `Arc`.
609    (k.estimated_size() + parquet_meta_size(v)) as u32
610}
611
612fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
613    // We ignore the heap size of `Value`.
614    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
615}
616
617fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
618    (k.estimated_size() + v.estimated_size()) as u32
619}
620
621fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
622    (mem::size_of_val(k) + v.estimated_size()) as u32
623}
624
625/// Updates cache hit/miss metrics.
626fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
627    if value.is_some() {
628        CACHE_HIT.with_label_values(&[cache_type]).inc();
629    } else {
630        CACHE_MISS.with_label_values(&[cache_type]).inc();
631    }
632    value
633}
634
635/// Cache key (region id, file id) for SST meta.
636#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637struct SstMetaKey(RegionId, FileId);
638
639impl SstMetaKey {
640    /// Returns memory used by the key (estimated).
641    fn estimated_size(&self) -> usize {
642        mem::size_of::<Self>()
643    }
644}
645
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group of one SST file. It is the payload
/// of both [PageKey] variants.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
658
659/// Cache key for pages of a SST row group.
660#[derive(Debug, Clone, PartialEq, Eq, Hash)]
661pub enum PageKey {
662    /// Cache key for a compressed page in a row group.
663    Compressed(ColumnPagePath),
664    /// Cache key for all uncompressed pages in a row group.
665    Uncompressed(ColumnPagePath),
666}
667
668impl PageKey {
669    /// Creates a key for a compressed page.
670    pub fn new_compressed(
671        region_id: RegionId,
672        file_id: FileId,
673        row_group_idx: usize,
674        column_idx: usize,
675    ) -> PageKey {
676        PageKey::Compressed(ColumnPagePath {
677            region_id,
678            file_id,
679            row_group_idx,
680            column_idx,
681        })
682    }
683
684    /// Creates a key for all uncompressed pages in a row group.
685    pub fn new_uncompressed(
686        region_id: RegionId,
687        file_id: FileId,
688        row_group_idx: usize,
689        column_idx: usize,
690    ) -> PageKey {
691        PageKey::Uncompressed(ColumnPagePath {
692            region_id,
693            file_id,
694            row_group_idx,
695            column_idx,
696        })
697    }
698
699    /// Returns memory used by the key (estimated).
700    fn estimated_size(&self) -> usize {
701        mem::size_of::<Self>()
702    }
703}
704
705/// Cached row group pages for a column.
706// We don't use enum here to make it easier to mock and use the struct.
707#[derive(Default)]
708pub struct PageValue {
709    /// Compressed page of the column in the row group.
710    pub compressed: Bytes,
711    /// All pages of the column in the row group.
712    pub row_group: Vec<Page>,
713}
714
715impl PageValue {
716    /// Creates a new value from a compressed page.
717    pub fn new_compressed(bytes: Bytes) -> PageValue {
718        PageValue {
719            compressed: bytes,
720            row_group: vec![],
721        }
722    }
723
724    /// Creates a new value from all pages in a row group.
725    pub fn new_row_group(pages: Vec<Page>) -> PageValue {
726        PageValue {
727            compressed: Bytes::new(),
728            row_group: pages,
729        }
730    }
731
732    /// Returns memory used by the value (estimated).
733    fn estimated_size(&self) -> usize {
734        mem::size_of::<Self>()
735            + self.compressed.len()
736            + self
737                .row_group
738                .iter()
739                .map(|page| page.buffer().len())
740                .sum::<usize>()
741    }
742}
743
/// Cache key for time series row selector result.
///
/// The key is `Copy`, so it can be reused after being passed to the cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
754
755/// Cached result for time series row selector.
756pub struct SelectorResultValue {
757    /// Batches of rows selected by the selector.
758    pub result: Vec<Batch>,
759    /// Projection of rows.
760    pub projection: Vec<usize>,
761}
762
763impl SelectorResultValue {
764    /// Creates a new selector result value.
765    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
766        SelectorResultValue { result, projection }
767    }
768
769    /// Returns memory used by the value (estimated).
770    fn estimated_size(&self) -> usize {
771        // We only consider heap size of all batches.
772        self.result.iter().map(|batch| batch.memory_size()).sum()
773    }
774}
775
// Concrete cache types held by `CacheManager`.

/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// The key also carries the data type of the vector.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
786
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::vectors::Int64Vector;

    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// A default manager has its caches disabled: puts are ignored and gets miss.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // SST meta cache.
        let region_id = RegionId::new(1, 1);
        let region_file_id = RegionFileId::new(region_id, FileId::random());
        manager.put_parquet_meta_data(region_file_id, parquet_meta());
        assert!(manager.get_parquet_meta_data(region_file_id).await.is_none());

        // Vector cache.
        let repeated = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(repeated.clone(), vector.clone());
        assert!(manager
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &repeated)
            .is_none());

        // Page cache.
        let page_key = PageKey::new_uncompressed(region_id, region_file_id.file_id(), 0, 0);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// Parquet metadata can be put, read back and removed.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let region_file_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());

        assert!(manager.get_parquet_meta_data(region_file_id).await.is_none());

        manager.put_parquet_meta_data(region_file_id, parquet_meta());
        assert!(manager.get_parquet_meta_data(region_file_id).await.is_some());

        manager.remove_parquet_meta_data(region_file_id);
        assert!(manager.get_parquet_meta_data(region_file_id).await.is_none());
    }

    /// A repeated vector is returned unchanged after being cached.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let repeated = Value::Int64(10);
        assert!(manager.get_repeated_vector(&data_type, &repeated).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(repeated.clone(), vector.clone());

        let cached = manager.get_repeated_vector(&data_type, &repeated).unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages become visible under their key after being cached.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new_compressed(RegionId::new(1, 1), FileId::random(), 0, 0);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results become visible under their key after being cached.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let selector_key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&selector_key).is_none());

        let cached = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        // `SelectorResultKey` is `Copy`, so the key stays usable after the put.
        manager.put_selector_result(selector_key, cached);
        assert!(manager.get_selector_result(&selector_key).is_some());
    }
}