mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
32use index::result_cache::IndexResultCache;
33use moka::notification::RemovalCause;
34use moka::sync::Cache;
35use parquet::column::page::Page;
36use parquet::file::metadata::ParquetMetaData;
37use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
38use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
39
40use crate::cache::cache_size::parquet_meta_size;
41use crate::cache::file_cache::{FileType, IndexKey};
42use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
43use crate::cache::write_cache::WriteCacheRef;
44use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
45use crate::read::Batch;
46use crate::sst::file::FileId;
47
/// Metrics type key for sst meta.
///
/// Label value passed to the cache metrics for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
///
/// Label value passed to the cache metrics for the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
///
/// Label value passed to the cache metrics for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
// NOTE(review): not referenced in the visible part of this module; presumably
// used by the file cache submodule — confirm.
const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
///
/// Label value passed to the cache metrics for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
58
/// Cache strategies that may only enable a subset of caches.
///
/// Methods on this type forward to the wrapped [CacheManager] when the
/// strategy enables the corresponding cache; otherwise they return `None`
/// or do nothing.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    ///
    /// The parquet metadata cache stays usable under this strategy as well.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
73
74impl CacheStrategy {
75    /// Calls [CacheManager::get_parquet_meta_data()].
76    pub async fn get_parquet_meta_data(
77        &self,
78        region_id: RegionId,
79        file_id: FileId,
80    ) -> Option<Arc<ParquetMetaData>> {
81        match self {
82            CacheStrategy::EnableAll(cache_manager) => {
83                cache_manager
84                    .get_parquet_meta_data(region_id, file_id)
85                    .await
86            }
87            CacheStrategy::Compaction(cache_manager) => {
88                cache_manager
89                    .get_parquet_meta_data(region_id, file_id)
90                    .await
91            }
92            CacheStrategy::Disabled => None,
93        }
94    }
95
96    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
97    pub fn get_parquet_meta_data_from_mem_cache(
98        &self,
99        region_id: RegionId,
100        file_id: FileId,
101    ) -> Option<Arc<ParquetMetaData>> {
102        match self {
103            CacheStrategy::EnableAll(cache_manager) => {
104                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
105            }
106            CacheStrategy::Compaction(cache_manager) => {
107                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
108            }
109            CacheStrategy::Disabled => None,
110        }
111    }
112
113    /// Calls [CacheManager::put_parquet_meta_data()].
114    pub fn put_parquet_meta_data(
115        &self,
116        region_id: RegionId,
117        file_id: FileId,
118        metadata: Arc<ParquetMetaData>,
119    ) {
120        match self {
121            CacheStrategy::EnableAll(cache_manager) => {
122                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
123            }
124            CacheStrategy::Compaction(cache_manager) => {
125                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
126            }
127            CacheStrategy::Disabled => {}
128        }
129    }
130
131    /// Calls [CacheManager::remove_parquet_meta_data()].
132    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
133        match self {
134            CacheStrategy::EnableAll(cache_manager) => {
135                cache_manager.remove_parquet_meta_data(region_id, file_id);
136            }
137            CacheStrategy::Compaction(cache_manager) => {
138                cache_manager.remove_parquet_meta_data(region_id, file_id);
139            }
140            CacheStrategy::Disabled => {}
141        }
142    }
143
144    /// Calls [CacheManager::get_repeated_vector()].
145    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
146    pub fn get_repeated_vector(
147        &self,
148        data_type: &ConcreteDataType,
149        value: &Value,
150    ) -> Option<VectorRef> {
151        match self {
152            CacheStrategy::EnableAll(cache_manager) => {
153                cache_manager.get_repeated_vector(data_type, value)
154            }
155            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
156        }
157    }
158
159    /// Calls [CacheManager::put_repeated_vector()].
160    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
161    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
162        if let CacheStrategy::EnableAll(cache_manager) = self {
163            cache_manager.put_repeated_vector(value, vector);
164        }
165    }
166
167    /// Calls [CacheManager::get_pages()].
168    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
169    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
170        match self {
171            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
172            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
173        }
174    }
175
176    /// Calls [CacheManager::put_pages()].
177    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
178    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
179        if let CacheStrategy::EnableAll(cache_manager) = self {
180            cache_manager.put_pages(page_key, pages);
181        }
182    }
183
184    /// Calls [CacheManager::get_selector_result()].
185    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
186    pub fn get_selector_result(
187        &self,
188        selector_key: &SelectorResultKey,
189    ) -> Option<Arc<SelectorResultValue>> {
190        match self {
191            CacheStrategy::EnableAll(cache_manager) => {
192                cache_manager.get_selector_result(selector_key)
193            }
194            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
195        }
196    }
197
198    /// Calls [CacheManager::put_selector_result()].
199    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
200    pub fn put_selector_result(
201        &self,
202        selector_key: SelectorResultKey,
203        result: Arc<SelectorResultValue>,
204    ) {
205        if let CacheStrategy::EnableAll(cache_manager) = self {
206            cache_manager.put_selector_result(selector_key, result);
207        }
208    }
209
210    /// Calls [CacheManager::write_cache()].
211    /// It returns None if the strategy is [CacheStrategy::Disabled].
212    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
213        match self {
214            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
215            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
216            CacheStrategy::Disabled => None,
217        }
218    }
219
220    /// Calls [CacheManager::index_cache()].
221    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
222    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
223        match self {
224            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
225            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
226        }
227    }
228
229    /// Calls [CacheManager::bloom_filter_index_cache()].
230    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
231    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
232        match self {
233            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
234            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
235        }
236    }
237
238    /// Calls [CacheManager::puffin_metadata_cache()].
239    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
240    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
241        match self {
242            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
243            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
244        }
245    }
246
247    /// Calls [CacheManager::index_result_cache()].
248    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
249    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
250        match self {
251            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
252            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
253        }
254    }
255}
256
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
///
/// Every cache is held in an `Option`, so the derived [Default] leaves each
/// of them as `None`. Use [CacheManager::builder()] to enable caches.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
281
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
283
284impl CacheManager {
285    /// Returns a builder to build the cache.
286    pub fn builder() -> CacheManagerBuilder {
287        CacheManagerBuilder::default()
288    }
289
290    /// Gets cached [ParquetMetaData] from in-memory cache first.
291    /// If not found, tries to get it from write cache and fill the in-memory cache.
292    pub async fn get_parquet_meta_data(
293        &self,
294        region_id: RegionId,
295        file_id: FileId,
296    ) -> Option<Arc<ParquetMetaData>> {
297        // Try to get metadata from sst meta cache
298        let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id);
299        if metadata.is_some() {
300            return metadata;
301        }
302
303        // Try to get metadata from write cache
304        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
305        if let Some(write_cache) = &self.write_cache {
306            if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
307                let metadata = Arc::new(metadata);
308                // Put metadata into sst meta cache
309                self.put_parquet_meta_data(region_id, file_id, metadata.clone());
310                return Some(metadata);
311            }
312        };
313
314        None
315    }
316
317    /// Gets cached [ParquetMetaData] from in-memory cache.
318    /// This method does not perform I/O.
319    pub fn get_parquet_meta_data_from_mem_cache(
320        &self,
321        region_id: RegionId,
322        file_id: FileId,
323    ) -> Option<Arc<ParquetMetaData>> {
324        // Try to get metadata from sst meta cache
325        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
326            let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
327            update_hit_miss(value, SST_META_TYPE)
328        })
329    }
330
331    /// Puts [ParquetMetaData] into the cache.
332    pub fn put_parquet_meta_data(
333        &self,
334        region_id: RegionId,
335        file_id: FileId,
336        metadata: Arc<ParquetMetaData>,
337    ) {
338        if let Some(cache) = &self.sst_meta_cache {
339            let key = SstMetaKey(region_id, file_id);
340            CACHE_BYTES
341                .with_label_values(&[SST_META_TYPE])
342                .add(meta_cache_weight(&key, &metadata).into());
343            cache.insert(key, metadata);
344        }
345    }
346
347    /// Removes [ParquetMetaData] from the cache.
348    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
349        if let Some(cache) = &self.sst_meta_cache {
350            cache.remove(&SstMetaKey(region_id, file_id));
351        }
352    }
353
354    /// Gets a vector with repeated value for specific `key`.
355    pub fn get_repeated_vector(
356        &self,
357        data_type: &ConcreteDataType,
358        value: &Value,
359    ) -> Option<VectorRef> {
360        self.vector_cache.as_ref().and_then(|vector_cache| {
361            let value = vector_cache.get(&(data_type.clone(), value.clone()));
362            update_hit_miss(value, VECTOR_TYPE)
363        })
364    }
365
366    /// Puts a vector with repeated value into the cache.
367    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
368        if let Some(cache) = &self.vector_cache {
369            let key = (vector.data_type(), value);
370            CACHE_BYTES
371                .with_label_values(&[VECTOR_TYPE])
372                .add(vector_cache_weight(&key, &vector).into());
373            cache.insert(key, vector);
374        }
375    }
376
377    /// Gets pages for the row group.
378    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
379        self.page_cache.as_ref().and_then(|page_cache| {
380            let value = page_cache.get(page_key);
381            update_hit_miss(value, PAGE_TYPE)
382        })
383    }
384
385    /// Puts pages of the row group into the cache.
386    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
387        if let Some(cache) = &self.page_cache {
388            CACHE_BYTES
389                .with_label_values(&[PAGE_TYPE])
390                .add(page_cache_weight(&page_key, &pages).into());
391            cache.insert(page_key, pages);
392        }
393    }
394
395    /// Gets result of for the selector.
396    pub fn get_selector_result(
397        &self,
398        selector_key: &SelectorResultKey,
399    ) -> Option<Arc<SelectorResultValue>> {
400        self.selector_result_cache
401            .as_ref()
402            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
403    }
404
405    /// Puts result of the selector into the cache.
406    pub fn put_selector_result(
407        &self,
408        selector_key: SelectorResultKey,
409        result: Arc<SelectorResultValue>,
410    ) {
411        if let Some(cache) = &self.selector_result_cache {
412            CACHE_BYTES
413                .with_label_values(&[SELECTOR_RESULT_TYPE])
414                .add(selector_result_cache_weight(&selector_key, &result).into());
415            cache.insert(selector_key, result);
416        }
417    }
418
419    /// Gets the write cache.
420    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
421        self.write_cache.as_ref()
422    }
423
424    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
425        self.inverted_index_cache.as_ref()
426    }
427
428    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
429        self.bloom_filter_index_cache.as_ref()
430    }
431
432    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
433        self.puffin_metadata_cache.as_ref()
434    }
435
436    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
437        self.index_result_cache.as_ref()
438    }
439}
440
441/// Increases selector cache miss metrics.
442pub fn selector_result_cache_miss() {
443    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
444}
445
446/// Increases selector cache hit metrics.
447pub fn selector_result_cache_hit() {
448    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
449}
450
/// Builder to construct a [CacheManager].
///
/// All sizes default to zero and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; zero disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache; zero disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; zero disables the cache.
    page_cache_size: u64,
    /// Cache size for index metadata, passed to the inverted and bloom filter index caches.
    index_metadata_size: u64,
    /// Cache size for index content, passed to the inverted and bloom filter index caches.
    index_content_size: u64,
    /// Page size for index content, passed to the inverted and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; zero disables the cache.
    index_result_cache_size: u64,
    /// Cache size for puffin metadata.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the manager as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; zero disables the cache.
    selector_result_cache_size: u64,
}
465
466impl CacheManagerBuilder {
467    /// Sets meta cache size.
468    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
469        self.sst_meta_cache_size = bytes;
470        self
471    }
472
473    /// Sets vector cache size.
474    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
475        self.vector_cache_size = bytes;
476        self
477    }
478
479    /// Sets page cache size.
480    pub fn page_cache_size(mut self, bytes: u64) -> Self {
481        self.page_cache_size = bytes;
482        self
483    }
484
485    /// Sets write cache.
486    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
487        self.write_cache = cache;
488        self
489    }
490
491    /// Sets cache size for index metadata.
492    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
493        self.index_metadata_size = bytes;
494        self
495    }
496
497    /// Sets cache size for index content.
498    pub fn index_content_size(mut self, bytes: u64) -> Self {
499        self.index_content_size = bytes;
500        self
501    }
502
503    /// Sets page size for index content.
504    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
505        self.index_content_page_size = bytes;
506        self
507    }
508
509    /// Sets cache size for index result.
510    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
511        self.index_result_cache_size = bytes;
512        self
513    }
514
515    /// Sets cache size for puffin metadata.
516    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
517        self.puffin_metadata_size = bytes;
518        self
519    }
520
521    /// Sets selector result cache size.
522    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
523        self.selector_result_cache_size = bytes;
524        self
525    }
526
527    /// Builds the [CacheManager].
528    pub fn build(self) -> CacheManager {
529        fn to_str(cause: RemovalCause) -> &'static str {
530            match cause {
531                RemovalCause::Expired => "expired",
532                RemovalCause::Explicit => "explicit",
533                RemovalCause::Replaced => "replaced",
534                RemovalCause::Size => "size",
535            }
536        }
537
538        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
539            Cache::builder()
540                .max_capacity(self.sst_meta_cache_size)
541                .weigher(meta_cache_weight)
542                .eviction_listener(|k, v, cause| {
543                    let size = meta_cache_weight(&k, &v);
544                    CACHE_BYTES
545                        .with_label_values(&[SST_META_TYPE])
546                        .sub(size.into());
547                    CACHE_EVICTION
548                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
549                        .inc();
550                })
551                .build()
552        });
553        let vector_cache = (self.vector_cache_size != 0).then(|| {
554            Cache::builder()
555                .max_capacity(self.vector_cache_size)
556                .weigher(vector_cache_weight)
557                .eviction_listener(|k, v, cause| {
558                    let size = vector_cache_weight(&k, &v);
559                    CACHE_BYTES
560                        .with_label_values(&[VECTOR_TYPE])
561                        .sub(size.into());
562                    CACHE_EVICTION
563                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
564                        .inc();
565                })
566                .build()
567        });
568        let page_cache = (self.page_cache_size != 0).then(|| {
569            Cache::builder()
570                .max_capacity(self.page_cache_size)
571                .weigher(page_cache_weight)
572                .eviction_listener(|k, v, cause| {
573                    let size = page_cache_weight(&k, &v);
574                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
575                    CACHE_EVICTION
576                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
577                        .inc();
578                })
579                .build()
580        });
581        let inverted_index_cache = InvertedIndexCache::new(
582            self.index_metadata_size,
583            self.index_content_size,
584            self.index_content_page_size,
585        );
586        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
587        let bloom_filter_index_cache = BloomFilterIndexCache::new(
588            self.index_metadata_size,
589            self.index_content_size,
590            self.index_content_page_size,
591        );
592        let index_result_cache = (self.index_result_cache_size != 0)
593            .then(|| IndexResultCache::new(self.index_result_cache_size));
594        let puffin_metadata_cache =
595            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
596        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
597            Cache::builder()
598                .max_capacity(self.selector_result_cache_size)
599                .weigher(selector_result_cache_weight)
600                .eviction_listener(|k, v, cause| {
601                    let size = selector_result_cache_weight(&k, &v);
602                    CACHE_BYTES
603                        .with_label_values(&[SELECTOR_RESULT_TYPE])
604                        .sub(size.into());
605                    CACHE_EVICTION
606                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
607                        .inc();
608                })
609                .build()
610        });
611        CacheManager {
612            sst_meta_cache,
613            vector_cache,
614            page_cache,
615            write_cache: self.write_cache,
616            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
617            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
618            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
619            selector_result_cache,
620            index_result_cache,
621        }
622    }
623}
624
625fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
626    // We ignore the size of `Arc`.
627    (k.estimated_size() + parquet_meta_size(v)) as u32
628}
629
630fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
631    // We ignore the heap size of `Value`.
632    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
633}
634
635fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
636    (k.estimated_size() + v.estimated_size()) as u32
637}
638
639fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
640    (mem::size_of_val(k) + v.estimated_size()) as u32
641}
642
643/// Updates cache hit/miss metrics.
644fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
645    if value.is_some() {
646        CACHE_HIT.with_label_values(&[cache_type]).inc();
647    } else {
648        CACHE_MISS.with_label_values(&[cache_type]).inc();
649    }
650    value
651}
652
/// Cache key (region id, file id) for SST meta.
///
/// Used as the key type of the SST meta cache in this module.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
656
657impl SstMetaKey {
658    /// Returns memory used by the key (estimated).
659    fn estimated_size(&self) -> usize {
660        mem::size_of::<Self>()
661    }
662}
663
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group of an SST file in a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
676
/// Cache key for pages of a SST row group.
///
/// Both variants carry a [ColumnPagePath], so a key always addresses a single
/// column within a row group.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PageKey {
    /// Cache key for a compressed page in a row group.
    Compressed(ColumnPagePath),
    /// Cache key for all uncompressed pages in a row group.
    Uncompressed(ColumnPagePath),
}
685
686impl PageKey {
687    /// Creates a key for a compressed page.
688    pub fn new_compressed(
689        region_id: RegionId,
690        file_id: FileId,
691        row_group_idx: usize,
692        column_idx: usize,
693    ) -> PageKey {
694        PageKey::Compressed(ColumnPagePath {
695            region_id,
696            file_id,
697            row_group_idx,
698            column_idx,
699        })
700    }
701
702    /// Creates a key for all uncompressed pages in a row group.
703    pub fn new_uncompressed(
704        region_id: RegionId,
705        file_id: FileId,
706        row_group_idx: usize,
707        column_idx: usize,
708    ) -> PageKey {
709        PageKey::Uncompressed(ColumnPagePath {
710            region_id,
711            file_id,
712            row_group_idx,
713            column_idx,
714        })
715    }
716
717    /// Returns memory used by the key (estimated).
718    fn estimated_size(&self) -> usize {
719        mem::size_of::<Self>()
720    }
721}
722
/// Cached row group pages for a column.
///
/// The constructors in this module fill exactly one of the two fields and
/// leave the other one empty.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page of the column in the row group.
    pub compressed: Bytes,
    /// All pages of the column in the row group.
    pub row_group: Vec<Page>,
}
732
733impl PageValue {
734    /// Creates a new value from a compressed page.
735    pub fn new_compressed(bytes: Bytes) -> PageValue {
736        PageValue {
737            compressed: bytes,
738            row_group: vec![],
739        }
740    }
741
742    /// Creates a new value from all pages in a row group.
743    pub fn new_row_group(pages: Vec<Page>) -> PageValue {
744        PageValue {
745            compressed: Bytes::new(),
746            row_group: pages,
747        }
748    }
749
750    /// Returns memory used by the value (estimated).
751    fn estimated_size(&self) -> usize {
752        mem::size_of::<Self>()
753            + self.compressed.len()
754            + self
755                .row_group
756                .iter()
757                .map(|page| page.buffer().len())
758                .sum::<usize>()
759    }
760}
761
/// Cache key for time series row selector result.
///
/// The key is `Copy`, so it can be reused after being passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
772
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: Vec<Batch>,
    /// Projection of rows.
    // NOTE(review): presumably the column indices the batches were read with — confirm against callers.
    pub projection: Vec<usize>,
}
780
781impl SelectorResultValue {
782    /// Creates a new selector result value.
783    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
784        SelectorResultValue { result, projection }
785    }
786
787    /// Returns memory used by the value (estimated).
788    fn estimated_size(&self) -> usize {
789        // We only consider heap size of all batches.
790        self.result.iter().map(|batch| batch.memory_size()).sum()
791    }
792}
793
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (data type, [Value]) to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
804
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::vectors::Int64Vector;

    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// A default manager has no in-memory cache, so puts are no-ops and gets miss.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        let meta = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(meta.is_none());

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector.clone());
        let repeated = manager.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value);
        assert!(repeated.is_none());

        let page_key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// Metadata can be stored, read back and removed again.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let before_put = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(before_put.is_none());

        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        let after_put = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(after_put.is_some());

        manager.remove_parquet_meta_data(region_id, file_id);
        let after_remove = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(after_remove.is_none());
    }

    /// A stored repeated vector is returned for the same data type and value.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector.clone());
        let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages stored under a key can be read back with the same key.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        let page_key = PageKey::new_compressed(region_id, file_id, 0, 0);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// A selector result stored under a key can be read back with the same key.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let selector_key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&selector_key).is_none());

        let empty_result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        manager.put_selector_result(selector_key, empty_result);
        assert!(manager.get_selector_result(&selector_key).is_some());
    }
}