mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
32use moka::notification::RemovalCause;
33use moka::sync::Cache;
34use parquet::column::page::Page;
35use parquet::file::metadata::ParquetMetaData;
36use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
37use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
38
39use crate::cache::cache_size::parquet_meta_size;
40use crate::cache::file_cache::{FileType, IndexKey};
41use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
42use crate::cache::write_cache::WriteCacheRef;
43use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
44use crate::read::Batch;
45use crate::sst::file::FileId;
46
/// Metrics type key for sst meta.
///
/// Used as the label value when reporting metrics of the SST meta cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
///
/// Used as the label value when reporting metrics of the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
///
/// Used as the label value when reporting metrics of the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
// NOTE(review): not referenced in this file; presumably used by submodules — confirm.
const FILE_TYPE: &str = "file";
/// Metrics type key for selector result cache.
///
/// Used as the label value when reporting metrics of the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
57
/// Cache strategies that may only enable a subset of caches.
///
/// Methods of this type forward to the wrapped [CacheManager] when the
/// strategy enables the corresponding cache. Otherwise lookups return `None`
/// and insertions/removals do nothing.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Only the parquet metadata cache and the write cache stay accessible.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
72
73impl CacheStrategy {
74    /// Calls [CacheManager::get_parquet_meta_data()].
75    pub async fn get_parquet_meta_data(
76        &self,
77        region_id: RegionId,
78        file_id: FileId,
79    ) -> Option<Arc<ParquetMetaData>> {
80        match self {
81            CacheStrategy::EnableAll(cache_manager) => {
82                cache_manager
83                    .get_parquet_meta_data(region_id, file_id)
84                    .await
85            }
86            CacheStrategy::Compaction(cache_manager) => {
87                cache_manager
88                    .get_parquet_meta_data(region_id, file_id)
89                    .await
90            }
91            CacheStrategy::Disabled => None,
92        }
93    }
94
95    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
96    pub fn get_parquet_meta_data_from_mem_cache(
97        &self,
98        region_id: RegionId,
99        file_id: FileId,
100    ) -> Option<Arc<ParquetMetaData>> {
101        match self {
102            CacheStrategy::EnableAll(cache_manager) => {
103                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
104            }
105            CacheStrategy::Compaction(cache_manager) => {
106                cache_manager.get_parquet_meta_data_from_mem_cache(region_id, file_id)
107            }
108            CacheStrategy::Disabled => None,
109        }
110    }
111
112    /// Calls [CacheManager::put_parquet_meta_data()].
113    pub fn put_parquet_meta_data(
114        &self,
115        region_id: RegionId,
116        file_id: FileId,
117        metadata: Arc<ParquetMetaData>,
118    ) {
119        match self {
120            CacheStrategy::EnableAll(cache_manager) => {
121                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
122            }
123            CacheStrategy::Compaction(cache_manager) => {
124                cache_manager.put_parquet_meta_data(region_id, file_id, metadata);
125            }
126            CacheStrategy::Disabled => {}
127        }
128    }
129
130    /// Calls [CacheManager::remove_parquet_meta_data()].
131    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
132        match self {
133            CacheStrategy::EnableAll(cache_manager) => {
134                cache_manager.remove_parquet_meta_data(region_id, file_id);
135            }
136            CacheStrategy::Compaction(cache_manager) => {
137                cache_manager.remove_parquet_meta_data(region_id, file_id);
138            }
139            CacheStrategy::Disabled => {}
140        }
141    }
142
143    /// Calls [CacheManager::get_repeated_vector()].
144    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
145    pub fn get_repeated_vector(
146        &self,
147        data_type: &ConcreteDataType,
148        value: &Value,
149    ) -> Option<VectorRef> {
150        match self {
151            CacheStrategy::EnableAll(cache_manager) => {
152                cache_manager.get_repeated_vector(data_type, value)
153            }
154            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
155        }
156    }
157
158    /// Calls [CacheManager::put_repeated_vector()].
159    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
160    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
161        if let CacheStrategy::EnableAll(cache_manager) = self {
162            cache_manager.put_repeated_vector(value, vector);
163        }
164    }
165
166    /// Calls [CacheManager::get_pages()].
167    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
168    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
169        match self {
170            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
171            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
172        }
173    }
174
175    /// Calls [CacheManager::put_pages()].
176    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
177    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
178        if let CacheStrategy::EnableAll(cache_manager) = self {
179            cache_manager.put_pages(page_key, pages);
180        }
181    }
182
183    /// Calls [CacheManager::get_selector_result()].
184    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
185    pub fn get_selector_result(
186        &self,
187        selector_key: &SelectorResultKey,
188    ) -> Option<Arc<SelectorResultValue>> {
189        match self {
190            CacheStrategy::EnableAll(cache_manager) => {
191                cache_manager.get_selector_result(selector_key)
192            }
193            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
194        }
195    }
196
197    /// Calls [CacheManager::put_selector_result()].
198    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
199    pub fn put_selector_result(
200        &self,
201        selector_key: SelectorResultKey,
202        result: Arc<SelectorResultValue>,
203    ) {
204        if let CacheStrategy::EnableAll(cache_manager) = self {
205            cache_manager.put_selector_result(selector_key, result);
206        }
207    }
208
209    /// Calls [CacheManager::write_cache()].
210    /// It returns None if the strategy is [CacheStrategy::Disabled].
211    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
212        match self {
213            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
214            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
215            CacheStrategy::Disabled => None,
216        }
217    }
218
219    /// Calls [CacheManager::index_cache()].
220    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
221    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
222        match self {
223            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
224            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
225        }
226    }
227
228    /// Calls [CacheManager::bloom_filter_index_cache()].
229    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
230    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
231        match self {
232            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
233            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
234        }
235    }
236
237    /// Calls [CacheManager::puffin_metadata_cache()].
238    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
239    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
240        match self {
241            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
242            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
243        }
244    }
245}
246
/// Manages cached data for the engine.
///
/// All caches are disabled by default: the derived [Default] leaves every
/// field as `None`. Use [CacheManager::builder()] to enable caches.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    /// Keyed by (region id, file id).
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    /// Keyed by (data type, value) and holds vectors that repeat the value.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    /// Also consulted for parquet metadata when the SST meta cache misses.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
}
269
/// Reference-counted pointer to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
271
272impl CacheManager {
273    /// Returns a builder to build the cache.
274    pub fn builder() -> CacheManagerBuilder {
275        CacheManagerBuilder::default()
276    }
277
278    /// Gets cached [ParquetMetaData] from in-memory cache first.
279    /// If not found, tries to get it from write cache and fill the in-memory cache.
280    pub async fn get_parquet_meta_data(
281        &self,
282        region_id: RegionId,
283        file_id: FileId,
284    ) -> Option<Arc<ParquetMetaData>> {
285        // Try to get metadata from sst meta cache
286        let metadata = self.get_parquet_meta_data_from_mem_cache(region_id, file_id);
287        if metadata.is_some() {
288            return metadata;
289        }
290
291        // Try to get metadata from write cache
292        let key = IndexKey::new(region_id, file_id, FileType::Parquet);
293        if let Some(write_cache) = &self.write_cache {
294            if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
295                let metadata = Arc::new(metadata);
296                // Put metadata into sst meta cache
297                self.put_parquet_meta_data(region_id, file_id, metadata.clone());
298                return Some(metadata);
299            }
300        };
301
302        None
303    }
304
305    /// Gets cached [ParquetMetaData] from in-memory cache.
306    /// This method does not perform I/O.
307    pub fn get_parquet_meta_data_from_mem_cache(
308        &self,
309        region_id: RegionId,
310        file_id: FileId,
311    ) -> Option<Arc<ParquetMetaData>> {
312        // Try to get metadata from sst meta cache
313        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
314            let value = sst_meta_cache.get(&SstMetaKey(region_id, file_id));
315            update_hit_miss(value, SST_META_TYPE)
316        })
317    }
318
319    /// Puts [ParquetMetaData] into the cache.
320    pub fn put_parquet_meta_data(
321        &self,
322        region_id: RegionId,
323        file_id: FileId,
324        metadata: Arc<ParquetMetaData>,
325    ) {
326        if let Some(cache) = &self.sst_meta_cache {
327            let key = SstMetaKey(region_id, file_id);
328            CACHE_BYTES
329                .with_label_values(&[SST_META_TYPE])
330                .add(meta_cache_weight(&key, &metadata).into());
331            cache.insert(key, metadata);
332        }
333    }
334
335    /// Removes [ParquetMetaData] from the cache.
336    pub fn remove_parquet_meta_data(&self, region_id: RegionId, file_id: FileId) {
337        if let Some(cache) = &self.sst_meta_cache {
338            cache.remove(&SstMetaKey(region_id, file_id));
339        }
340    }
341
342    /// Gets a vector with repeated value for specific `key`.
343    pub fn get_repeated_vector(
344        &self,
345        data_type: &ConcreteDataType,
346        value: &Value,
347    ) -> Option<VectorRef> {
348        self.vector_cache.as_ref().and_then(|vector_cache| {
349            let value = vector_cache.get(&(data_type.clone(), value.clone()));
350            update_hit_miss(value, VECTOR_TYPE)
351        })
352    }
353
354    /// Puts a vector with repeated value into the cache.
355    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
356        if let Some(cache) = &self.vector_cache {
357            let key = (vector.data_type(), value);
358            CACHE_BYTES
359                .with_label_values(&[VECTOR_TYPE])
360                .add(vector_cache_weight(&key, &vector).into());
361            cache.insert(key, vector);
362        }
363    }
364
365    /// Gets pages for the row group.
366    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
367        self.page_cache.as_ref().and_then(|page_cache| {
368            let value = page_cache.get(page_key);
369            update_hit_miss(value, PAGE_TYPE)
370        })
371    }
372
373    /// Puts pages of the row group into the cache.
374    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
375        if let Some(cache) = &self.page_cache {
376            CACHE_BYTES
377                .with_label_values(&[PAGE_TYPE])
378                .add(page_cache_weight(&page_key, &pages).into());
379            cache.insert(page_key, pages);
380        }
381    }
382
383    /// Gets result of for the selector.
384    pub fn get_selector_result(
385        &self,
386        selector_key: &SelectorResultKey,
387    ) -> Option<Arc<SelectorResultValue>> {
388        self.selector_result_cache
389            .as_ref()
390            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
391    }
392
393    /// Puts result of the selector into the cache.
394    pub fn put_selector_result(
395        &self,
396        selector_key: SelectorResultKey,
397        result: Arc<SelectorResultValue>,
398    ) {
399        if let Some(cache) = &self.selector_result_cache {
400            CACHE_BYTES
401                .with_label_values(&[SELECTOR_RESULT_TYPE])
402                .add(selector_result_cache_weight(&selector_key, &result).into());
403            cache.insert(selector_key, result);
404        }
405    }
406
407    /// Gets the write cache.
408    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
409        self.write_cache.as_ref()
410    }
411
412    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
413        self.index_cache.as_ref()
414    }
415
416    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
417        self.bloom_filter_index_cache.as_ref()
418    }
419
420    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
421        self.puffin_metadata_cache.as_ref()
422    }
423}
424
425/// Increases selector cache miss metrics.
426pub fn selector_result_cache_miss() {
427    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
428}
429
430/// Increases selector cache hit metrics.
431pub fn selector_result_cache_hit() {
432    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
433}
434
/// Builder to construct a [CacheManager].
///
/// The derived [Default] sets every size to 0 and leaves the write cache unset.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache. The cache is not created if it is 0.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache. The cache is not created if it is 0.
    vector_cache_size: u64,
    /// Capacity of the page cache. The cache is not created if it is 0.
    page_cache_size: u64,
    /// Cache size for index metadata, passed to the inverted index cache and
    /// the bloom filter index cache.
    index_metadata_size: u64,
    /// Cache size for index content, passed to the inverted index cache and
    /// the bloom filter index cache.
    index_content_size: u64,
    /// Page size for index content, passed to the inverted index cache and
    /// the bloom filter index cache.
    index_content_page_size: u64,
    /// Cache size for puffin metadata.
    puffin_metadata_size: u64,
    /// Optional write cache to hand over to the [CacheManager].
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache. The cache is not created if it is 0.
    selector_result_cache_size: u64,
}
448
449impl CacheManagerBuilder {
450    /// Sets meta cache size.
451    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
452        self.sst_meta_cache_size = bytes;
453        self
454    }
455
456    /// Sets vector cache size.
457    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
458        self.vector_cache_size = bytes;
459        self
460    }
461
462    /// Sets page cache size.
463    pub fn page_cache_size(mut self, bytes: u64) -> Self {
464        self.page_cache_size = bytes;
465        self
466    }
467
468    /// Sets write cache.
469    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
470        self.write_cache = cache;
471        self
472    }
473
474    /// Sets cache size for index metadata.
475    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
476        self.index_metadata_size = bytes;
477        self
478    }
479
480    /// Sets cache size for index content.
481    pub fn index_content_size(mut self, bytes: u64) -> Self {
482        self.index_content_size = bytes;
483        self
484    }
485
486    /// Sets page size for index content.
487    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
488        self.index_content_page_size = bytes;
489        self
490    }
491
492    /// Sets cache size for puffin metadata.
493    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
494        self.puffin_metadata_size = bytes;
495        self
496    }
497
498    /// Sets selector result cache size.
499    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
500        self.selector_result_cache_size = bytes;
501        self
502    }
503
504    /// Builds the [CacheManager].
505    pub fn build(self) -> CacheManager {
506        fn to_str(cause: RemovalCause) -> &'static str {
507            match cause {
508                RemovalCause::Expired => "expired",
509                RemovalCause::Explicit => "explicit",
510                RemovalCause::Replaced => "replaced",
511                RemovalCause::Size => "size",
512            }
513        }
514
515        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
516            Cache::builder()
517                .max_capacity(self.sst_meta_cache_size)
518                .weigher(meta_cache_weight)
519                .eviction_listener(|k, v, cause| {
520                    let size = meta_cache_weight(&k, &v);
521                    CACHE_BYTES
522                        .with_label_values(&[SST_META_TYPE])
523                        .sub(size.into());
524                    CACHE_EVICTION
525                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
526                        .inc();
527                })
528                .build()
529        });
530        let vector_cache = (self.vector_cache_size != 0).then(|| {
531            Cache::builder()
532                .max_capacity(self.vector_cache_size)
533                .weigher(vector_cache_weight)
534                .eviction_listener(|k, v, cause| {
535                    let size = vector_cache_weight(&k, &v);
536                    CACHE_BYTES
537                        .with_label_values(&[VECTOR_TYPE])
538                        .sub(size.into());
539                    CACHE_EVICTION
540                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
541                        .inc();
542                })
543                .build()
544        });
545        let page_cache = (self.page_cache_size != 0).then(|| {
546            Cache::builder()
547                .max_capacity(self.page_cache_size)
548                .weigher(page_cache_weight)
549                .eviction_listener(|k, v, cause| {
550                    let size = page_cache_weight(&k, &v);
551                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
552                    CACHE_EVICTION
553                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
554                        .inc();
555                })
556                .build()
557        });
558        let inverted_index_cache = InvertedIndexCache::new(
559            self.index_metadata_size,
560            self.index_content_size,
561            self.index_content_page_size,
562        );
563        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
564        let bloom_filter_index_cache = BloomFilterIndexCache::new(
565            self.index_metadata_size,
566            self.index_content_size,
567            self.index_content_page_size,
568        );
569        let puffin_metadata_cache =
570            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
571        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
572            Cache::builder()
573                .max_capacity(self.selector_result_cache_size)
574                .weigher(selector_result_cache_weight)
575                .eviction_listener(|k, v, cause| {
576                    let size = selector_result_cache_weight(&k, &v);
577                    CACHE_BYTES
578                        .with_label_values(&[SELECTOR_RESULT_TYPE])
579                        .sub(size.into());
580                    CACHE_EVICTION
581                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
582                        .inc();
583                })
584                .build()
585        });
586        CacheManager {
587            sst_meta_cache,
588            vector_cache,
589            page_cache,
590            write_cache: self.write_cache,
591            index_cache: Some(Arc::new(inverted_index_cache)),
592            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
593            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
594            selector_result_cache,
595        }
596    }
597}
598
599fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
600    // We ignore the size of `Arc`.
601    (k.estimated_size() + parquet_meta_size(v)) as u32
602}
603
604fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
605    // We ignore the heap size of `Value`.
606    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
607}
608
609fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
610    (k.estimated_size() + v.estimated_size()) as u32
611}
612
613fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
614    (mem::size_of_val(k) + v.estimated_size()) as u32
615}
616
617/// Updates cache hit/miss metrics.
618fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
619    if value.is_some() {
620        CACHE_HIT.with_label_values(&[cache_type]).inc();
621    } else {
622        CACHE_MISS.with_label_values(&[cache_type]).inc();
623    }
624    value
625}
626
/// Cache key (region id, file id) for SST meta.
///
/// Field 0 is the region id and field 1 is the id of the SST file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
630
631impl SstMetaKey {
632    /// Returns memory used by the key (estimated).
633    fn estimated_size(&self) -> usize {
634        mem::size_of::<Self>()
635    }
636}
637
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group of one SST file in a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
650
/// Cache key for pages of a SST row group.
///
/// Both variants wrap the same [ColumnPagePath]; the variant tells whether the
/// entry holds compressed or uncompressed pages, so the two kinds never collide.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PageKey {
    /// Cache key for a compressed page in a row group.
    Compressed(ColumnPagePath),
    /// Cache key for all uncompressed pages in a row group.
    Uncompressed(ColumnPagePath),
}
659
660impl PageKey {
661    /// Creates a key for a compressed page.
662    pub fn new_compressed(
663        region_id: RegionId,
664        file_id: FileId,
665        row_group_idx: usize,
666        column_idx: usize,
667    ) -> PageKey {
668        PageKey::Compressed(ColumnPagePath {
669            region_id,
670            file_id,
671            row_group_idx,
672            column_idx,
673        })
674    }
675
676    /// Creates a key for all uncompressed pages in a row group.
677    pub fn new_uncompressed(
678        region_id: RegionId,
679        file_id: FileId,
680        row_group_idx: usize,
681        column_idx: usize,
682    ) -> PageKey {
683        PageKey::Uncompressed(ColumnPagePath {
684            region_id,
685            file_id,
686            row_group_idx,
687            column_idx,
688        })
689    }
690
691    /// Returns memory used by the key (estimated).
692    fn estimated_size(&self) -> usize {
693        mem::size_of::<Self>()
694    }
695}
696
/// Cached row group pages for a column.
///
/// The constructors in this file fill only one of the two fields and leave the
/// other one empty.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page of the column in the row group.
    pub compressed: Bytes,
    /// All pages of the column in the row group.
    pub row_group: Vec<Page>,
}
706
707impl PageValue {
708    /// Creates a new value from a compressed page.
709    pub fn new_compressed(bytes: Bytes) -> PageValue {
710        PageValue {
711            compressed: bytes,
712            row_group: vec![],
713        }
714    }
715
716    /// Creates a new value from all pages in a row group.
717    pub fn new_row_group(pages: Vec<Page>) -> PageValue {
718        PageValue {
719            compressed: Bytes::new(),
720            row_group: pages,
721        }
722    }
723
724    /// Returns memory used by the value (estimated).
725    fn estimated_size(&self) -> usize {
726        mem::size_of::<Self>()
727            + self.compressed.len()
728            + self
729                .row_group
730                .iter()
731                .map(|page| page.buffer().len())
732                .sum::<usize>()
733    }
734}
735
/// Cache key for time series row selector result.
///
/// Results are cached per (file, row group, selector).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
746
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: Vec<Batch>,
    /// Projection of rows.
    // NOTE(review): presumably the column indices projected when `result` was built — confirm.
    pub projection: Vec<usize>,
}
754
755impl SelectorResultValue {
756    /// Creates a new selector result value.
757    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
758        SelectorResultValue { result, projection }
759    }
760
761    /// Returns memory used by the value (estimated).
762    fn estimated_size(&self) -> usize {
763        // We only consider heap size of all batches.
764        self.result.iter().map(|batch| batch.memory_size()).sum()
765    }
766}
767
/// Maps (region id, file id) to [ParquetMetaData].
///
/// The metadata is shared through an [Arc].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// The key also contains the data type of the value.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (region, file, row group, column) to [PageValue].
///
/// The [PageKey] variant tells whether the pages are compressed or not.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
778
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::vectors::Int64Vector;

    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// A default manager has no cache: puts are no-ops and every lookup misses.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // SST meta cache.
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();
        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        let meta = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(meta.is_none());

        // Vector cache.
        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector);
        let repeated = manager.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value);
        assert!(repeated.is_none());

        // Page cache.
        let page_key = PageKey::new_uncompressed(region_id, file_id, 0, 0);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        // Write cache.
        assert!(manager.write_cache().is_none());
    }

    /// Parquet metadata can be put, fetched and removed again.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let region_id = RegionId::new(1, 1);
        let file_id = FileId::random();

        let before_put = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(before_put.is_none());

        manager.put_parquet_meta_data(region_id, file_id, parquet_meta());
        let after_put = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(after_put.is_some());

        manager.remove_parquet_meta_data(region_id, file_id);
        let after_remove = manager.get_parquet_meta_data(region_id, file_id).await;
        assert!(after_remove.is_none());
    }

    /// A repeated vector is returned for the (data type, value) it was stored under.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);
        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector.clone());
        let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages can be fetched after they were put into the page cache.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new_compressed(RegionId::new(1, 1), FileId::random(), 0, 0);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results can be fetched after they were put into the cache.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let selector_key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&selector_key).is_none());

        let empty_result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        manager.put_selector_result(selector_key, empty_result);
        assert!(manager.get_selector_result(&selector_key).is_some());
    }
}