mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::arrow::record_batch::RecordBatch;
32use datatypes::value::Value;
33use datatypes::vectors::VectorRef;
34use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
35use index::result_cache::IndexResultCache;
36use moka::notification::RemovalCause;
37use moka::sync::Cache;
38use object_store::ObjectStore;
39use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
40use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
41use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
42
43use crate::cache::cache_size::parquet_meta_size;
44use crate::cache::file_cache::{FileType, IndexKey};
45use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
46#[cfg(feature = "vector_index")]
47use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
48use crate::cache::write_cache::WriteCacheRef;
49use crate::memtable::record_batch_estimated_size;
50use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
51use crate::read::Batch;
52use crate::sst::file::{RegionFileId, RegionIndexId};
53use crate::sst::parquet::reader::MetadataCacheMetrics;
54
/// Metrics type key for sst meta.
///
/// Used as the cache type label of `CACHE_BYTES`, `CACHE_HIT`, `CACHE_MISS`
/// and `CACHE_EVICTION` for the in-memory SST meta cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
///
/// Used as the cache type label for the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
///
/// Used as the cache type label for the SST page cache.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
///
/// Used as the cache type label for the time series selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
67
/// Cache strategies that may only enable a subset of caches.
///
/// Every accessor on this type forwards to the wrapped [CacheManager] when the
/// strategy enables the corresponding cache and otherwise behaves as if the
/// cache doesn't exist.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    ///
    /// The parquet metadata cache and puffin cache eviction stay available;
    /// the vector, page, selector result and index caches are not exposed.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
82
83impl CacheStrategy {
84    /// Gets parquet metadata with cache metrics tracking.
85    /// Returns the metadata and updates the provided metrics.
86    pub(crate) async fn get_parquet_meta_data(
87        &self,
88        file_id: RegionFileId,
89        metrics: &mut MetadataCacheMetrics,
90        page_index_policy: PageIndexPolicy,
91    ) -> Option<Arc<ParquetMetaData>> {
92        match self {
93            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
94                cache_manager
95                    .get_parquet_meta_data(file_id, metrics, page_index_policy)
96                    .await
97            }
98            CacheStrategy::Disabled => {
99                metrics.cache_miss += 1;
100                None
101            }
102        }
103    }
104
105    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
106    pub fn get_parquet_meta_data_from_mem_cache(
107        &self,
108        file_id: RegionFileId,
109    ) -> Option<Arc<ParquetMetaData>> {
110        match self {
111            CacheStrategy::EnableAll(cache_manager) => {
112                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
113            }
114            CacheStrategy::Compaction(cache_manager) => {
115                cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
116            }
117            CacheStrategy::Disabled => None,
118        }
119    }
120
121    /// Calls [CacheManager::put_parquet_meta_data()].
122    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
123        match self {
124            CacheStrategy::EnableAll(cache_manager) => {
125                cache_manager.put_parquet_meta_data(file_id, metadata);
126            }
127            CacheStrategy::Compaction(cache_manager) => {
128                cache_manager.put_parquet_meta_data(file_id, metadata);
129            }
130            CacheStrategy::Disabled => {}
131        }
132    }
133
134    /// Calls [CacheManager::remove_parquet_meta_data()].
135    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
136        match self {
137            CacheStrategy::EnableAll(cache_manager) => {
138                cache_manager.remove_parquet_meta_data(file_id);
139            }
140            CacheStrategy::Compaction(cache_manager) => {
141                cache_manager.remove_parquet_meta_data(file_id);
142            }
143            CacheStrategy::Disabled => {}
144        }
145    }
146
147    /// Calls [CacheManager::get_repeated_vector()].
148    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
149    pub fn get_repeated_vector(
150        &self,
151        data_type: &ConcreteDataType,
152        value: &Value,
153    ) -> Option<VectorRef> {
154        match self {
155            CacheStrategy::EnableAll(cache_manager) => {
156                cache_manager.get_repeated_vector(data_type, value)
157            }
158            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
159        }
160    }
161
162    /// Calls [CacheManager::put_repeated_vector()].
163    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
164    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
165        if let CacheStrategy::EnableAll(cache_manager) = self {
166            cache_manager.put_repeated_vector(value, vector);
167        }
168    }
169
170    /// Calls [CacheManager::get_pages()].
171    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
172    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
173        match self {
174            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
175            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
176        }
177    }
178
179    /// Calls [CacheManager::put_pages()].
180    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
181    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
182        if let CacheStrategy::EnableAll(cache_manager) = self {
183            cache_manager.put_pages(page_key, pages);
184        }
185    }
186
187    /// Calls [CacheManager::evict_puffin_cache()].
188    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
189        match self {
190            CacheStrategy::EnableAll(cache_manager) => {
191                cache_manager.evict_puffin_cache(file_id).await
192            }
193            CacheStrategy::Compaction(cache_manager) => {
194                cache_manager.evict_puffin_cache(file_id).await
195            }
196            CacheStrategy::Disabled => {}
197        }
198    }
199
200    /// Calls [CacheManager::get_selector_result()].
201    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
202    pub fn get_selector_result(
203        &self,
204        selector_key: &SelectorResultKey,
205    ) -> Option<Arc<SelectorResultValue>> {
206        match self {
207            CacheStrategy::EnableAll(cache_manager) => {
208                cache_manager.get_selector_result(selector_key)
209            }
210            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
211        }
212    }
213
214    /// Calls [CacheManager::put_selector_result()].
215    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
216    pub fn put_selector_result(
217        &self,
218        selector_key: SelectorResultKey,
219        result: Arc<SelectorResultValue>,
220    ) {
221        if let CacheStrategy::EnableAll(cache_manager) = self {
222            cache_manager.put_selector_result(selector_key, result);
223        }
224    }
225
226    /// Calls [CacheManager::write_cache()].
227    /// It returns None if the strategy is [CacheStrategy::Disabled].
228    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
229        match self {
230            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
231            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
232            CacheStrategy::Disabled => None,
233        }
234    }
235
236    /// Calls [CacheManager::index_cache()].
237    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
238    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
239        match self {
240            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
241            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
242        }
243    }
244
245    /// Calls [CacheManager::bloom_filter_index_cache()].
246    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
247    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
248        match self {
249            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
250            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
251        }
252    }
253
254    /// Calls [CacheManager::vector_index_cache()].
255    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
256    #[cfg(feature = "vector_index")]
257    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
258        match self {
259            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
260            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
261        }
262    }
263
264    /// Calls [CacheManager::puffin_metadata_cache()].
265    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
266    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
267        match self {
268            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
269            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
270        }
271    }
272
273    /// Calls [CacheManager::index_result_cache()].
274    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
275    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
276        match self {
277            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
278            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
279        }
280    }
281
282    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
283    pub fn maybe_download_background(
284        &self,
285        index_key: IndexKey,
286        remote_path: String,
287        remote_store: ObjectStore,
288        file_size: u64,
289    ) {
290        if let CacheStrategy::EnableAll(cache_manager) = self
291            && let Some(write_cache) = cache_manager.write_cache()
292        {
293            write_cache.file_cache().maybe_download_background(
294                index_key,
295                remote_path,
296                remote_store,
297                file_size,
298            );
299        }
300    }
301}
302
/// Manages cached data for the engine.
///
/// All caches are disabled by default: the derived `Default` leaves every
/// field as `None`. Use [CacheManager::builder()] to enable caches.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index.
    /// Only present when the `vector_index` feature is enabled.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
330
/// Shared, reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
332
333impl CacheManager {
334    /// Returns a builder to build the cache.
335    pub fn builder() -> CacheManagerBuilder {
336        CacheManagerBuilder::default()
337    }
338
339    /// Gets cached [ParquetMetaData] with metrics tracking.
340    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
341    pub(crate) async fn get_parquet_meta_data(
342        &self,
343        file_id: RegionFileId,
344        metrics: &mut MetadataCacheMetrics,
345        page_index_policy: PageIndexPolicy,
346    ) -> Option<Arc<ParquetMetaData>> {
347        // Try to get metadata from sst meta cache
348        if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
349            metrics.mem_cache_hit += 1;
350            return Some(metadata);
351        }
352
353        // Try to get metadata from write cache
354        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
355        if let Some(write_cache) = &self.write_cache
356            && let Some(metadata) = write_cache
357                .file_cache()
358                .get_parquet_meta_data(key, metrics, page_index_policy)
359                .await
360        {
361            metrics.file_cache_hit += 1;
362            let metadata = Arc::new(metadata);
363            // Put metadata into sst meta cache
364            self.put_parquet_meta_data(file_id, metadata.clone());
365            return Some(metadata);
366        };
367        metrics.cache_miss += 1;
368
369        None
370    }
371
372    /// Gets cached [ParquetMetaData] from in-memory cache.
373    /// This method does not perform I/O.
374    pub fn get_parquet_meta_data_from_mem_cache(
375        &self,
376        file_id: RegionFileId,
377    ) -> Option<Arc<ParquetMetaData>> {
378        // Try to get metadata from sst meta cache
379        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
380            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
381            update_hit_miss(value, SST_META_TYPE)
382        })
383    }
384
385    /// Puts [ParquetMetaData] into the cache.
386    pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
387        if let Some(cache) = &self.sst_meta_cache {
388            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
389            CACHE_BYTES
390                .with_label_values(&[SST_META_TYPE])
391                .add(meta_cache_weight(&key, &metadata).into());
392            cache.insert(key, metadata);
393        }
394    }
395
396    /// Removes [ParquetMetaData] from the cache.
397    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
398        if let Some(cache) = &self.sst_meta_cache {
399            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
400        }
401    }
402
403    /// Returns the total weighted size of the in-memory SST meta cache.
404    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
405        self.sst_meta_cache
406            .as_ref()
407            .map(|cache| cache.weighted_size())
408            .unwrap_or(0)
409    }
410
411    /// Returns true if the in-memory SST meta cache is enabled.
412    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
413        self.sst_meta_cache.is_some()
414    }
415
416    /// Gets a vector with repeated value for specific `key`.
417    pub fn get_repeated_vector(
418        &self,
419        data_type: &ConcreteDataType,
420        value: &Value,
421    ) -> Option<VectorRef> {
422        self.vector_cache.as_ref().and_then(|vector_cache| {
423            let value = vector_cache.get(&(data_type.clone(), value.clone()));
424            update_hit_miss(value, VECTOR_TYPE)
425        })
426    }
427
428    /// Puts a vector with repeated value into the cache.
429    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
430        if let Some(cache) = &self.vector_cache {
431            let key = (vector.data_type(), value);
432            CACHE_BYTES
433                .with_label_values(&[VECTOR_TYPE])
434                .add(vector_cache_weight(&key, &vector).into());
435            cache.insert(key, vector);
436        }
437    }
438
439    /// Gets pages for the row group.
440    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
441        self.page_cache.as_ref().and_then(|page_cache| {
442            let value = page_cache.get(page_key);
443            update_hit_miss(value, PAGE_TYPE)
444        })
445    }
446
447    /// Puts pages of the row group into the cache.
448    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
449        if let Some(cache) = &self.page_cache {
450            CACHE_BYTES
451                .with_label_values(&[PAGE_TYPE])
452                .add(page_cache_weight(&page_key, &pages).into());
453            cache.insert(page_key, pages);
454        }
455    }
456
457    /// Evicts every puffin-related cache entry for the given file.
458    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
459        if let Some(cache) = &self.bloom_filter_index_cache {
460            cache.invalidate_file(file_id.file_id());
461        }
462
463        if let Some(cache) = &self.inverted_index_cache {
464            cache.invalidate_file(file_id.file_id());
465        }
466
467        if let Some(cache) = &self.index_result_cache {
468            cache.invalidate_file(file_id.file_id());
469        }
470
471        #[cfg(feature = "vector_index")]
472        if let Some(cache) = &self.vector_index_cache {
473            cache.invalidate_file(file_id.file_id());
474        }
475
476        if let Some(cache) = &self.puffin_metadata_cache {
477            cache.remove(&file_id.to_string());
478        }
479
480        if let Some(write_cache) = &self.write_cache {
481            write_cache
482                .remove(IndexKey::new(
483                    file_id.region_id(),
484                    file_id.file_id(),
485                    FileType::Puffin(file_id.version),
486                ))
487                .await;
488        }
489    }
490
491    /// Gets result of for the selector.
492    pub fn get_selector_result(
493        &self,
494        selector_key: &SelectorResultKey,
495    ) -> Option<Arc<SelectorResultValue>> {
496        self.selector_result_cache
497            .as_ref()
498            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
499    }
500
501    /// Puts result of the selector into the cache.
502    pub fn put_selector_result(
503        &self,
504        selector_key: SelectorResultKey,
505        result: Arc<SelectorResultValue>,
506    ) {
507        if let Some(cache) = &self.selector_result_cache {
508            CACHE_BYTES
509                .with_label_values(&[SELECTOR_RESULT_TYPE])
510                .add(selector_result_cache_weight(&selector_key, &result).into());
511            cache.insert(selector_key, result);
512        }
513    }
514
515    /// Gets the write cache.
516    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
517        self.write_cache.as_ref()
518    }
519
520    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
521        self.inverted_index_cache.as_ref()
522    }
523
524    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
525        self.bloom_filter_index_cache.as_ref()
526    }
527
528    #[cfg(feature = "vector_index")]
529    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
530        self.vector_index_cache.as_ref()
531    }
532
533    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
534        self.puffin_metadata_cache.as_ref()
535    }
536
537    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
538        self.index_result_cache.as_ref()
539    }
540}
541
542/// Increases selector cache miss metrics.
543pub fn selector_result_cache_miss() {
544    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
545}
546
547/// Increases selector cache hit metrics.
548pub fn selector_result_cache_hit() {
549    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
550}
551
/// Builder to construct a [CacheManager].
///
/// All sizes default to 0 and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache. The cache is not created if it is 0.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache. The cache is not created if it is 0.
    vector_cache_size: u64,
    /// Capacity of the page cache. The cache is not created if it is 0.
    page_cache_size: u64,
    /// Index metadata size passed to the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Index content size passed to the inverted index and bloom filter index caches.
    /// It also sizes the vector index cache when the `vector_index` feature is enabled.
    index_content_size: u64,
    /// Index content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache. The cache is not created if it is 0.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the [CacheManager] as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache. The cache is not created if it is 0.
    selector_result_cache_size: u64,
}
566
567impl CacheManagerBuilder {
568    /// Sets meta cache size.
569    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
570        self.sst_meta_cache_size = bytes;
571        self
572    }
573
574    /// Sets vector cache size.
575    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
576        self.vector_cache_size = bytes;
577        self
578    }
579
580    /// Sets page cache size.
581    pub fn page_cache_size(mut self, bytes: u64) -> Self {
582        self.page_cache_size = bytes;
583        self
584    }
585
586    /// Sets write cache.
587    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
588        self.write_cache = cache;
589        self
590    }
591
592    /// Sets cache size for index metadata.
593    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
594        self.index_metadata_size = bytes;
595        self
596    }
597
598    /// Sets cache size for index content.
599    pub fn index_content_size(mut self, bytes: u64) -> Self {
600        self.index_content_size = bytes;
601        self
602    }
603
604    /// Sets page size for index content.
605    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
606        self.index_content_page_size = bytes;
607        self
608    }
609
610    /// Sets cache size for index result.
611    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
612        self.index_result_cache_size = bytes;
613        self
614    }
615
616    /// Sets cache size for puffin metadata.
617    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
618        self.puffin_metadata_size = bytes;
619        self
620    }
621
622    /// Sets selector result cache size.
623    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
624        self.selector_result_cache_size = bytes;
625        self
626    }
627
628    /// Builds the [CacheManager].
629    pub fn build(self) -> CacheManager {
630        fn to_str(cause: RemovalCause) -> &'static str {
631            match cause {
632                RemovalCause::Expired => "expired",
633                RemovalCause::Explicit => "explicit",
634                RemovalCause::Replaced => "replaced",
635                RemovalCause::Size => "size",
636            }
637        }
638
639        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
640            Cache::builder()
641                .max_capacity(self.sst_meta_cache_size)
642                .weigher(meta_cache_weight)
643                .eviction_listener(|k, v, cause| {
644                    let size = meta_cache_weight(&k, &v);
645                    CACHE_BYTES
646                        .with_label_values(&[SST_META_TYPE])
647                        .sub(size.into());
648                    CACHE_EVICTION
649                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
650                        .inc();
651                })
652                .build()
653        });
654        let vector_cache = (self.vector_cache_size != 0).then(|| {
655            Cache::builder()
656                .max_capacity(self.vector_cache_size)
657                .weigher(vector_cache_weight)
658                .eviction_listener(|k, v, cause| {
659                    let size = vector_cache_weight(&k, &v);
660                    CACHE_BYTES
661                        .with_label_values(&[VECTOR_TYPE])
662                        .sub(size.into());
663                    CACHE_EVICTION
664                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
665                        .inc();
666                })
667                .build()
668        });
669        let page_cache = (self.page_cache_size != 0).then(|| {
670            Cache::builder()
671                .max_capacity(self.page_cache_size)
672                .weigher(page_cache_weight)
673                .eviction_listener(|k, v, cause| {
674                    let size = page_cache_weight(&k, &v);
675                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
676                    CACHE_EVICTION
677                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
678                        .inc();
679                })
680                .build()
681        });
682        let inverted_index_cache = InvertedIndexCache::new(
683            self.index_metadata_size,
684            self.index_content_size,
685            self.index_content_page_size,
686        );
687        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
688        let bloom_filter_index_cache = BloomFilterIndexCache::new(
689            self.index_metadata_size,
690            self.index_content_size,
691            self.index_content_page_size,
692        );
693        #[cfg(feature = "vector_index")]
694        let vector_index_cache = (self.index_content_size != 0)
695            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
696        let index_result_cache = (self.index_result_cache_size != 0)
697            .then(|| IndexResultCache::new(self.index_result_cache_size));
698        let puffin_metadata_cache =
699            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
700        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
701            Cache::builder()
702                .max_capacity(self.selector_result_cache_size)
703                .weigher(selector_result_cache_weight)
704                .eviction_listener(|k, v, cause| {
705                    let size = selector_result_cache_weight(&k, &v);
706                    CACHE_BYTES
707                        .with_label_values(&[SELECTOR_RESULT_TYPE])
708                        .sub(size.into());
709                    CACHE_EVICTION
710                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
711                        .inc();
712                })
713                .build()
714        });
715        CacheManager {
716            sst_meta_cache,
717            vector_cache,
718            page_cache,
719            write_cache: self.write_cache,
720            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
721            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
722            #[cfg(feature = "vector_index")]
723            vector_index_cache,
724            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
725            selector_result_cache,
726            index_result_cache,
727        }
728    }
729}
730
731fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
732    // We ignore the size of `Arc`.
733    (k.estimated_size() + parquet_meta_size(v)) as u32
734}
735
736fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
737    // We ignore the heap size of `Value`.
738    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
739}
740
741fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
742    (k.estimated_size() + v.estimated_size()) as u32
743}
744
745fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
746    (mem::size_of_val(k) + v.estimated_size()) as u32
747}
748
749/// Updates cache hit/miss metrics.
750fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
751    if value.is_some() {
752        CACHE_HIT.with_label_values(&[cache_type]).inc();
753    } else {
754        CACHE_MISS.with_label_values(&[cache_type]).inc();
755    }
756    value
757}
758
759/// Cache key (region id, file id) for SST meta.
760#[derive(Debug, Clone, PartialEq, Eq, Hash)]
761struct SstMetaKey(RegionId, FileId);
762
763impl SstMetaKey {
764    /// Returns memory used by the key (estimated).
765    fn estimated_size(&self) -> usize {
766        mem::size_of::<Self>()
767    }
768}
769
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group of an SST file in a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
782
783/// Cache key to pages in a row group (after projection).
784///
785/// Different projections will have different cache keys.
786/// We cache all ranges together because they may refer to the same `Bytes`.
787#[derive(Debug, Clone, PartialEq, Eq, Hash)]
788pub struct PageKey {
789    /// Id of the SST file to cache.
790    file_id: FileId,
791    /// Index of the row group.
792    row_group_idx: usize,
793    /// Byte ranges of the pages to cache.
794    ranges: Vec<Range<u64>>,
795}
796
797impl PageKey {
798    /// Creates a key for a list of pages.
799    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
800        PageKey {
801            file_id,
802            row_group_idx,
803            ranges,
804        }
805    }
806
807    /// Returns memory used by the key (estimated).
808    fn estimated_size(&self) -> usize {
809        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
810    }
811}
812
/// Cached page data of a row group, stored in `PageCache` under a [PageKey].
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page data, held as a list of `Bytes` buffers.
    // NOTE(review): presumably one buffer per range of the corresponding `PageKey` — confirm.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    pub page_size: u64,
}
822
823impl PageValue {
824    /// Creates a new value from a range of compressed pages.
825    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
826        PageValue {
827            compressed: bytes,
828            page_size,
829        }
830    }
831
832    /// Returns memory used by the value (estimated).
833    fn estimated_size(&self) -> usize {
834        mem::size_of::<Self>()
835            + self.page_size as usize
836            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
837    }
838}
839
/// Cache key for time series row selector result.
///
/// A result is identified by the SST file, the row group inside that file and the
/// selector that produced it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
850
/// Result stored in the selector result cache.
///
/// The two variants hold the same kind of data in the two batch representations
/// supported here: [Batch] and [RecordBatch].
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
858
/// Cached result for time series row selector.
///
/// Stored in `SelectorResultCache` under a [SelectorResultKey].
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: SelectorResult,
    /// Projection the cached batches were produced with.
    // NOTE(review): presumably column indices, and presumably checked by readers before
    // reusing the cached batches — confirm against the call sites.
    pub projection: Vec<usize>,
}
866
867impl SelectorResultValue {
868    /// Creates a new selector result value with primary key format.
869    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
870        SelectorResultValue {
871            result: SelectorResult::PrimaryKey(result),
872            projection,
873        }
874    }
875
876    /// Creates a new selector result value with flat format.
877    pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
878        SelectorResultValue {
879            result: SelectorResult::Flat(result),
880            projection,
881        }
882    }
883
884    /// Returns memory used by the value (estimated).
885    fn estimated_size(&self) -> usize {
886        match &self.result {
887            SelectorResult::PrimaryKey(batches) => {
888                batches.iter().map(|batch| batch.memory_size()).sum()
889            }
890            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
891        }
892    }
893}
894
/// Maps (region id, file id) to [ParquetMetaData].
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps a (data type, [Value]) pair to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [PageKey] (file id, row group index, byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
905
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::parquet_meta;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// With a default manager no cache is configured: a get after a put still misses.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // SST metadata.
        let sst_file = RegionFileId::new(RegionId::new(1, 1), FileId::random());
        let mut metrics = MetadataCacheMetrics::default();
        manager.put_parquet_meta_data(sst_file, parquet_meta());
        let cached_meta = manager
            .get_parquet_meta_data(sst_file, &mut metrics, Default::default())
            .await;
        assert!(cached_meta.is_none());

        // Repeated vectors.
        let repeated = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(repeated.clone(), vector.clone());
        let int64_type = ConcreteDataType::int64_datatype();
        assert!(manager.get_repeated_vector(&int64_type, &repeated).is_none());

        // Pages.
        let page_key = PageKey::new(sst_file.file_id(), 1, vec![0..5]);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// SST metadata can be stored, read back and removed again.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let sst_file = RegionFileId::new(RegionId::new(1, 1), FileId::random());

        // Nothing is cached yet.
        let before_put = manager
            .get_parquet_meta_data(sst_file, &mut metrics, Default::default())
            .await;
        assert!(before_put.is_none());

        manager.put_parquet_meta_data(sst_file, parquet_meta());
        let after_put = manager
            .get_parquet_meta_data(sst_file, &mut metrics, Default::default())
            .await;
        assert!(after_put.is_some());

        manager.remove_parquet_meta_data(sst_file);
        let after_remove = manager
            .get_parquet_meta_data(sst_file, &mut metrics, Default::default())
            .await;
        assert!(after_remove.is_none());
    }

    /// A repeated vector is returned unchanged once it has been cached.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let int64_type = ConcreteDataType::int64_datatype();
        let repeated = Value::Int64(10);
        assert!(manager.get_repeated_vector(&int64_type, &repeated).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(repeated.clone(), vector.clone());
        let cached = manager
            .get_repeated_vector(&int64_type, &repeated)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages become visible under their key after a put.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new(FileId::random(), 0, vec![0..10, 10..20]);
        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results become visible under their key after a put.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(manager.get_selector_result(&key).is_none());

        let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
        manager.put_selector_result(key, Arc::new(empty_result));
        assert!(manager.get_selector_result(&key).is_some());
    }

    /// Evicting the puffin cache for an index removes its entries from the bloom filter,
    /// inverted index, index result and puffin metadata caches, both when called on the
    /// manager and when called through `CacheStrategy`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = Arc::new(
            CacheManager::builder()
                .index_metadata_size(128)
                .index_content_size(128)
                .index_content_page_size(64)
                .index_result_cache_size(128)
                .puffin_metadata_size(128)
                .build(),
        );

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        // Keys under which the index is stored in each cache.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        let inverted_key = (index_id.file_id(), index_id.version);
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let file_id_str = index_id.to_string();

        // Puts one default entry for the index into each of the four caches.
        let populate = || {
            bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
            inverted_cache.put_metadata(inverted_key, Arc::new(InvertedIndexMetas::default()));
            result_cache.put(
                predicate.clone(),
                index_id.file_id(),
                Arc::new(RowGroupSelection::default()),
            );
            puffin_metadata_cache.put_metadata(
                file_id_str.clone(),
                Arc::new(FileMetadata {
                    blobs: Vec::new(),
                    properties: HashMap::new(),
                }),
            );
        };
        // Checks that each of the four caches does (or does not) hold the entry.
        let assert_cached = |expected: bool| {
            assert_eq!(bloom_cache.get_metadata(bloom_key).is_some(), expected);
            assert_eq!(inverted_cache.get_metadata(inverted_key).is_some(), expected);
            assert_eq!(
                result_cache.get(&predicate, index_id.file_id()).is_some(),
                expected
            );
            assert_eq!(
                puffin_metadata_cache.get_metadata(&file_id_str).is_some(),
                expected
            );
        };

        populate();
        assert_cached(true);

        cache.evict_puffin_cache(index_id).await;
        assert_cached(false);

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        populate();

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;
        assert_cached(false);
    }
}