mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_telemetry::warn;
32use datatypes::arrow::record_batch::RecordBatch;
33use datatypes::value::Value;
34use datatypes::vectors::VectorRef;
35use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
36use index::result_cache::IndexResultCache;
37use moka::notification::RemovalCause;
38use moka::sync::Cache;
39use object_store::ObjectStore;
40use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
41use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
42use snafu::{OptionExt, ResultExt};
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
45
46use crate::cache::cache_size::parquet_meta_size;
47use crate::cache::file_cache::{FileType, IndexKey};
48use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
49#[cfg(feature = "vector_index")]
50use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
51use crate::cache::write_cache::WriteCacheRef;
52use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
53use crate::memtable::record_batch_estimated_size;
54use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
55use crate::read::Batch;
56use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
57use crate::sst::file::{RegionFileId, RegionIndexId};
58use crate::sst::parquet::PARQUET_METADATA_KEY;
59use crate::sst::parquet::reader::MetadataCacheMetrics;
60
// Label values used for the `CACHE_BYTES` / `CACHE_HIT` / `CACHE_MISS` /
// `CACHE_EVICTION` metrics, one per cache kind.

/// Metrics label for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label for the SST page cache.
const PAGE_TYPE: &str = "page";
/// Metrics label for SST files held on the local store.
const FILE_TYPE: &str = "file";
/// Metrics label for index (puffin) files held on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics label for the time-series selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label for the range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
75
76/// Cached SST metadata combines the parquet footer with the decoded region metadata.
77///
78/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
79/// [RegionMetadata] separately so readers can skip repeated deserialization work.
80#[derive(Debug)]
81pub(crate) struct CachedSstMeta {
82    parquet_metadata: Arc<ParquetMetaData>,
83    region_metadata: RegionMetadataRef,
84    region_metadata_weight: usize,
85}
86
87impl CachedSstMeta {
88    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
89        Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
90    }
91
92    pub(crate) fn try_new_with_region_metadata(
93        file_path: &str,
94        parquet_metadata: ParquetMetaData,
95        region_metadata: Option<RegionMetadataRef>,
96    ) -> Result<Self> {
97        let file_metadata = parquet_metadata.file_metadata();
98        let key_values = file_metadata
99            .key_value_metadata()
100            .context(InvalidParquetSnafu {
101                file: file_path,
102                reason: "missing key value meta",
103            })?;
104        let meta_value = key_values
105            .iter()
106            .find(|kv| kv.key == PARQUET_METADATA_KEY)
107            .with_context(|| InvalidParquetSnafu {
108                file: file_path,
109                reason: format!("key {} not found", PARQUET_METADATA_KEY),
110            })?;
111        let json = meta_value
112            .value
113            .as_ref()
114            .with_context(|| InvalidParquetSnafu {
115                file: file_path,
116                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
117            })?;
118        let region_metadata = match region_metadata {
119            Some(region_metadata) => region_metadata,
120            None => Arc::new(
121                store_api::metadata::RegionMetadata::from_json(json)
122                    .context(InvalidMetadataSnafu)?,
123            ),
124        };
125        // Keep the previous JSON-byte floor and charge the decoded structures as well.
126        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
127        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
128
129        Ok(Self {
130            parquet_metadata,
131            region_metadata,
132            region_metadata_weight,
133        })
134    }
135
136    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
137        self.parquet_metadata.clone()
138    }
139
140    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
141        self.region_metadata.clone()
142    }
143}
144
145fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
146    let file_metadata = parquet_metadata.file_metadata();
147    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
148        let filtered = key_values
149            .iter()
150            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
151            .cloned()
152            .collect::<Vec<_>>();
153        (!filtered.is_empty()).then_some(filtered)
154    });
155    let stripped_file_metadata = FileMetaData::new(
156        file_metadata.version(),
157        file_metadata.num_rows(),
158        file_metadata.created_by().map(ToString::to_string),
159        filtered_key_values,
160        file_metadata.schema_descr_ptr(),
161        file_metadata.column_orders().cloned(),
162    );
163
164    let mut builder = parquet_metadata.into_builder();
165    let row_groups = builder.take_row_groups();
166    let column_index = builder.take_column_index();
167    let offset_index = builder.take_offset_index();
168
169    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
170        .set_row_groups(row_groups)
171        .set_column_index(column_index)
172        .set_offset_index(offset_index)
173        .build()
174}
175
176/// Cache strategies that may only enable a subset of caches.
177#[derive(Clone)]
178pub enum CacheStrategy {
179    /// Strategy for normal operations.
180    /// Doesn't disable any cache.
181    EnableAll(CacheManagerRef),
182    /// Strategy for compaction.
183    /// Disables some caches during compaction to avoid affecting queries.
184    /// Enables the write cache so that the compaction can read files cached
185    /// in the write cache and write the compacted files back to the write cache.
186    Compaction(CacheManagerRef),
187    /// Do not use any cache.
188    Disabled,
189}
190
191impl CacheStrategy {
192    /// Gets fused SST metadata with cache metrics tracking.
193    pub(crate) async fn get_sst_meta_data(
194        &self,
195        file_id: RegionFileId,
196        metrics: &mut MetadataCacheMetrics,
197        page_index_policy: PageIndexPolicy,
198    ) -> Option<Arc<CachedSstMeta>> {
199        match self {
200            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
201                cache_manager
202                    .get_sst_meta_data(file_id, metrics, page_index_policy)
203                    .await
204            }
205            CacheStrategy::Disabled => {
206                metrics.cache_miss += 1;
207                None
208            }
209        }
210    }
211
212    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
213    pub(crate) fn get_sst_meta_data_from_mem_cache(
214        &self,
215        file_id: RegionFileId,
216    ) -> Option<Arc<CachedSstMeta>> {
217        match self {
218            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
219                cache_manager.get_sst_meta_data_from_mem_cache(file_id)
220            }
221            CacheStrategy::Disabled => None,
222        }
223    }
224
225    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
226    pub fn get_parquet_meta_data_from_mem_cache(
227        &self,
228        file_id: RegionFileId,
229    ) -> Option<Arc<ParquetMetaData>> {
230        self.get_sst_meta_data_from_mem_cache(file_id)
231            .map(|metadata| metadata.parquet_metadata())
232    }
233
234    /// Calls [CacheManager::put_sst_meta_data()].
235    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
236        match self {
237            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
238                cache_manager.put_sst_meta_data(file_id, metadata);
239            }
240            CacheStrategy::Disabled => {}
241        }
242    }
243
244    /// Calls [CacheManager::put_parquet_meta_data()].
245    pub fn put_parquet_meta_data(
246        &self,
247        file_id: RegionFileId,
248        metadata: Arc<ParquetMetaData>,
249        region_metadata: Option<RegionMetadataRef>,
250    ) {
251        match self {
252            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
253                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
254            }
255            CacheStrategy::Disabled => {}
256        }
257    }
258
259    /// Calls [CacheManager::remove_parquet_meta_data()].
260    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
261        match self {
262            CacheStrategy::EnableAll(cache_manager) => {
263                cache_manager.remove_parquet_meta_data(file_id);
264            }
265            CacheStrategy::Compaction(cache_manager) => {
266                cache_manager.remove_parquet_meta_data(file_id);
267            }
268            CacheStrategy::Disabled => {}
269        }
270    }
271
272    /// Calls [CacheManager::get_repeated_vector()].
273    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
274    pub fn get_repeated_vector(
275        &self,
276        data_type: &ConcreteDataType,
277        value: &Value,
278    ) -> Option<VectorRef> {
279        match self {
280            CacheStrategy::EnableAll(cache_manager) => {
281                cache_manager.get_repeated_vector(data_type, value)
282            }
283            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
284        }
285    }
286
287    /// Calls [CacheManager::put_repeated_vector()].
288    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
289    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
290        if let CacheStrategy::EnableAll(cache_manager) = self {
291            cache_manager.put_repeated_vector(value, vector);
292        }
293    }
294
295    /// Calls [CacheManager::get_pages()].
296    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
297    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
298        match self {
299            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
300            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
301        }
302    }
303
304    /// Calls [CacheManager::put_pages()].
305    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
306    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
307        if let CacheStrategy::EnableAll(cache_manager) = self {
308            cache_manager.put_pages(page_key, pages);
309        }
310    }
311
312    /// Calls [CacheManager::evict_puffin_cache()].
313    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
314        match self {
315            CacheStrategy::EnableAll(cache_manager) => {
316                cache_manager.evict_puffin_cache(file_id).await
317            }
318            CacheStrategy::Compaction(cache_manager) => {
319                cache_manager.evict_puffin_cache(file_id).await
320            }
321            CacheStrategy::Disabled => {}
322        }
323    }
324
325    /// Calls [CacheManager::get_selector_result()].
326    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
327    pub fn get_selector_result(
328        &self,
329        selector_key: &SelectorResultKey,
330    ) -> Option<Arc<SelectorResultValue>> {
331        match self {
332            CacheStrategy::EnableAll(cache_manager) => {
333                cache_manager.get_selector_result(selector_key)
334            }
335            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
336        }
337    }
338
339    /// Calls [CacheManager::put_selector_result()].
340    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
341    pub fn put_selector_result(
342        &self,
343        selector_key: SelectorResultKey,
344        result: Arc<SelectorResultValue>,
345    ) {
346        if let CacheStrategy::EnableAll(cache_manager) = self {
347            cache_manager.put_selector_result(selector_key, result);
348        }
349    }
350
351    /// Calls [CacheManager::get_range_result()].
352    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
353    #[allow(dead_code)]
354    pub(crate) fn get_range_result(
355        &self,
356        key: &RangeScanCacheKey,
357    ) -> Option<Arc<RangeScanCacheValue>> {
358        match self {
359            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
360            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
361        }
362    }
363
364    /// Calls [CacheManager::put_range_result()].
365    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
366    pub(crate) fn put_range_result(
367        &self,
368        key: RangeScanCacheKey,
369        result: Arc<RangeScanCacheValue>,
370    ) {
371        if let CacheStrategy::EnableAll(cache_manager) = self {
372            cache_manager.put_range_result(key, result);
373        }
374    }
375
376    /// Calls [CacheManager::write_cache()].
377    /// It returns None if the strategy is [CacheStrategy::Disabled].
378    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
379        match self {
380            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
381            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
382            CacheStrategy::Disabled => None,
383        }
384    }
385
386    /// Calls [CacheManager::index_cache()].
387    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
388    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
389        match self {
390            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
391            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
392        }
393    }
394
395    /// Calls [CacheManager::bloom_filter_index_cache()].
396    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
397    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
398        match self {
399            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
400            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
401        }
402    }
403
404    /// Calls [CacheManager::vector_index_cache()].
405    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
406    #[cfg(feature = "vector_index")]
407    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
408        match self {
409            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
410            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
411        }
412    }
413
414    /// Calls [CacheManager::puffin_metadata_cache()].
415    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
416    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
417        match self {
418            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
419            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
420        }
421    }
422
423    /// Calls [CacheManager::index_result_cache()].
424    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
425    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
426        match self {
427            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
428            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
429        }
430    }
431
432    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
433    pub fn maybe_download_background(
434        &self,
435        index_key: IndexKey,
436        remote_path: String,
437        remote_store: ObjectStore,
438        file_size: u64,
439    ) {
440        if let CacheStrategy::EnableAll(cache_manager) = self
441            && let Some(write_cache) = cache_manager.write_cache()
442        {
443            write_cache.file_cache().maybe_download_background(
444                index_key,
445                remote_path,
446                remote_store,
447                file_size,
448            );
449        }
450    }
451}
452
453/// Manages cached data for the engine.
454///
455/// All caches are disabled by default.
456#[derive(Default)]
457pub struct CacheManager {
458    /// Cache for SST metadata.
459    sst_meta_cache: Option<SstMetaCache>,
460    /// Cache for vectors.
461    vector_cache: Option<VectorCache>,
462    /// Cache for SST pages.
463    page_cache: Option<PageCache>,
464    /// A Cache for writing files to object stores.
465    write_cache: Option<WriteCacheRef>,
466    /// Cache for inverted index.
467    inverted_index_cache: Option<InvertedIndexCacheRef>,
468    /// Cache for bloom filter index.
469    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
470    /// Cache for vector index.
471    #[cfg(feature = "vector_index")]
472    vector_index_cache: Option<VectorIndexCacheRef>,
473    /// Puffin metadata cache.
474    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
475    /// Cache for time series selectors.
476    selector_result_cache: Option<SelectorResultCache>,
477    /// Cache for range scan outputs in flat format.
478    range_result_cache: Option<RangeResultCache>,
479    /// Cache for index result.
480    index_result_cache: Option<IndexResultCache>,
481}
482
483pub type CacheManagerRef = Arc<CacheManager>;
484
485impl CacheManager {
486    /// Returns a builder to build the cache.
487    pub fn builder() -> CacheManagerBuilder {
488        CacheManagerBuilder::default()
489    }
490
491    /// Gets fused SST metadata with metrics tracking.
492    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
493    pub(crate) async fn get_sst_meta_data(
494        &self,
495        file_id: RegionFileId,
496        metrics: &mut MetadataCacheMetrics,
497        page_index_policy: PageIndexPolicy,
498    ) -> Option<Arc<CachedSstMeta>> {
499        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
500            metrics.mem_cache_hit += 1;
501            return Some(metadata);
502        }
503
504        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
505        if let Some(write_cache) = &self.write_cache
506            && let Some(metadata) = write_cache
507                .file_cache()
508                .get_sst_meta_data(key, metrics, page_index_policy)
509                .await
510        {
511            metrics.file_cache_hit += 1;
512            self.put_sst_meta_data(file_id, metadata.clone());
513            return Some(metadata);
514        }
515
516        metrics.cache_miss += 1;
517        None
518    }
519
520    /// Gets cached [ParquetMetaData] with metrics tracking.
521    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
522    pub(crate) async fn get_parquet_meta_data(
523        &self,
524        file_id: RegionFileId,
525        metrics: &mut MetadataCacheMetrics,
526        page_index_policy: PageIndexPolicy,
527    ) -> Option<Arc<ParquetMetaData>> {
528        self.get_sst_meta_data(file_id, metrics, page_index_policy)
529            .await
530            .map(|metadata| metadata.parquet_metadata())
531    }
532
533    /// Gets cached fused SST metadata from in-memory cache.
534    /// This method does not perform I/O.
535    pub(crate) fn get_sst_meta_data_from_mem_cache(
536        &self,
537        file_id: RegionFileId,
538    ) -> Option<Arc<CachedSstMeta>> {
539        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
540            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
541            update_hit_miss(value, SST_META_TYPE)
542        })
543    }
544
545    /// Gets cached [ParquetMetaData] from in-memory cache.
546    /// This method does not perform I/O.
547    pub fn get_parquet_meta_data_from_mem_cache(
548        &self,
549        file_id: RegionFileId,
550    ) -> Option<Arc<ParquetMetaData>> {
551        self.get_sst_meta_data_from_mem_cache(file_id)
552            .map(|metadata| metadata.parquet_metadata())
553    }
554
555    /// Puts fused SST metadata into the cache.
556    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
557        if let Some(cache) = &self.sst_meta_cache {
558            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
559            CACHE_BYTES
560                .with_label_values(&[SST_META_TYPE])
561                .add(meta_cache_weight(&key, &metadata).into());
562            cache.insert(key, metadata);
563        }
564    }
565
566    /// Puts [ParquetMetaData] into the cache.
567    pub fn put_parquet_meta_data(
568        &self,
569        file_id: RegionFileId,
570        metadata: Arc<ParquetMetaData>,
571        region_metadata: Option<RegionMetadataRef>,
572    ) {
573        if self.sst_meta_cache.is_some() {
574            let file_path = format!(
575                "region_id={}, file_id={}",
576                file_id.region_id(),
577                file_id.file_id()
578            );
579            match CachedSstMeta::try_new_with_region_metadata(
580                &file_path,
581                Arc::unwrap_or_clone(metadata),
582                region_metadata,
583            ) {
584                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
585                Err(err) => warn!(
586                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
587                    file_id.region_id(),
588                    file_id.file_id()
589                ),
590            }
591        }
592    }
593
594    /// Removes [ParquetMetaData] from the cache.
595    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
596        if let Some(cache) = &self.sst_meta_cache {
597            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
598        }
599    }
600
601    /// Returns the total weighted size of the in-memory SST meta cache.
602    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
603        self.sst_meta_cache
604            .as_ref()
605            .map(|cache| cache.weighted_size())
606            .unwrap_or(0)
607    }
608
609    /// Returns true if the in-memory SST meta cache is enabled.
610    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
611        self.sst_meta_cache.is_some()
612    }
613
614    /// Gets a vector with repeated value for specific `key`.
615    pub fn get_repeated_vector(
616        &self,
617        data_type: &ConcreteDataType,
618        value: &Value,
619    ) -> Option<VectorRef> {
620        self.vector_cache.as_ref().and_then(|vector_cache| {
621            let value = vector_cache.get(&(data_type.clone(), value.clone()));
622            update_hit_miss(value, VECTOR_TYPE)
623        })
624    }
625
626    /// Puts a vector with repeated value into the cache.
627    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
628        if let Some(cache) = &self.vector_cache {
629            let key = (vector.data_type(), value);
630            CACHE_BYTES
631                .with_label_values(&[VECTOR_TYPE])
632                .add(vector_cache_weight(&key, &vector).into());
633            cache.insert(key, vector);
634        }
635    }
636
637    /// Gets pages for the row group.
638    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
639        self.page_cache.as_ref().and_then(|page_cache| {
640            let value = page_cache.get(page_key);
641            update_hit_miss(value, PAGE_TYPE)
642        })
643    }
644
645    /// Puts pages of the row group into the cache.
646    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
647        if let Some(cache) = &self.page_cache {
648            CACHE_BYTES
649                .with_label_values(&[PAGE_TYPE])
650                .add(page_cache_weight(&page_key, &pages).into());
651            cache.insert(page_key, pages);
652        }
653    }
654
655    /// Evicts every puffin-related cache entry for the given file.
656    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
657        if let Some(cache) = &self.bloom_filter_index_cache {
658            cache.invalidate_file(file_id.file_id());
659        }
660
661        if let Some(cache) = &self.inverted_index_cache {
662            cache.invalidate_file(file_id.file_id());
663        }
664
665        if let Some(cache) = &self.index_result_cache {
666            cache.invalidate_file(file_id.file_id());
667        }
668
669        #[cfg(feature = "vector_index")]
670        if let Some(cache) = &self.vector_index_cache {
671            cache.invalidate_file(file_id.file_id());
672        }
673
674        if let Some(cache) = &self.puffin_metadata_cache {
675            cache.remove(&file_id.to_string());
676        }
677
678        if let Some(write_cache) = &self.write_cache {
679            write_cache
680                .remove(IndexKey::new(
681                    file_id.region_id(),
682                    file_id.file_id(),
683                    FileType::Puffin(file_id.version),
684                ))
685                .await;
686        }
687    }
688
689    /// Gets result of for the selector.
690    pub fn get_selector_result(
691        &self,
692        selector_key: &SelectorResultKey,
693    ) -> Option<Arc<SelectorResultValue>> {
694        self.selector_result_cache
695            .as_ref()
696            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
697    }
698
699    /// Puts result of the selector into the cache.
700    pub fn put_selector_result(
701        &self,
702        selector_key: SelectorResultKey,
703        result: Arc<SelectorResultValue>,
704    ) {
705        if let Some(cache) = &self.selector_result_cache {
706            CACHE_BYTES
707                .with_label_values(&[SELECTOR_RESULT_TYPE])
708                .add(selector_result_cache_weight(&selector_key, &result).into());
709            cache.insert(selector_key, result);
710        }
711    }
712
713    /// Gets cached result for range scan.
714    #[allow(dead_code)]
715    pub(crate) fn get_range_result(
716        &self,
717        key: &RangeScanCacheKey,
718    ) -> Option<Arc<RangeScanCacheValue>> {
719        self.range_result_cache
720            .as_ref()
721            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
722    }
723
724    /// Puts range scan result into cache.
725    pub(crate) fn put_range_result(
726        &self,
727        key: RangeScanCacheKey,
728        result: Arc<RangeScanCacheValue>,
729    ) {
730        if let Some(cache) = &self.range_result_cache {
731            CACHE_BYTES
732                .with_label_values(&[RANGE_RESULT_TYPE])
733                .add(range_result_cache_weight(&key, &result).into());
734            cache.insert(key, result);
735        }
736    }
737
738    /// Gets the write cache.
739    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
740        self.write_cache.as_ref()
741    }
742
743    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
744        self.inverted_index_cache.as_ref()
745    }
746
747    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
748        self.bloom_filter_index_cache.as_ref()
749    }
750
751    #[cfg(feature = "vector_index")]
752    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
753        self.vector_index_cache.as_ref()
754    }
755
756    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
757        self.puffin_metadata_cache.as_ref()
758    }
759
760    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
761        self.index_result_cache.as_ref()
762    }
763}
764
765/// Increases selector cache miss metrics.
766pub fn selector_result_cache_miss() {
767    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
768}
769
770/// Increases selector cache hit metrics.
771pub fn selector_result_cache_hit() {
772    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
773}
774
775/// Builder to construct a [CacheManager].
776#[derive(Default)]
777pub struct CacheManagerBuilder {
778    sst_meta_cache_size: u64,
779    vector_cache_size: u64,
780    page_cache_size: u64,
781    index_metadata_size: u64,
782    index_content_size: u64,
783    index_content_page_size: u64,
784    index_result_cache_size: u64,
785    puffin_metadata_size: u64,
786    write_cache: Option<WriteCacheRef>,
787    selector_result_cache_size: u64,
788    range_result_cache_size: u64,
789}
790
791impl CacheManagerBuilder {
792    /// Sets meta cache size.
793    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
794        self.sst_meta_cache_size = bytes;
795        self
796    }
797
798    /// Sets vector cache size.
799    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
800        self.vector_cache_size = bytes;
801        self
802    }
803
804    /// Sets page cache size.
805    pub fn page_cache_size(mut self, bytes: u64) -> Self {
806        self.page_cache_size = bytes;
807        self
808    }
809
810    /// Sets write cache.
811    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
812        self.write_cache = cache;
813        self
814    }
815
816    /// Sets cache size for index metadata.
817    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
818        self.index_metadata_size = bytes;
819        self
820    }
821
822    /// Sets cache size for index content.
823    pub fn index_content_size(mut self, bytes: u64) -> Self {
824        self.index_content_size = bytes;
825        self
826    }
827
828    /// Sets page size for index content.
829    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
830        self.index_content_page_size = bytes;
831        self
832    }
833
834    /// Sets cache size for index result.
835    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
836        self.index_result_cache_size = bytes;
837        self
838    }
839
840    /// Sets cache size for puffin metadata.
841    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
842        self.puffin_metadata_size = bytes;
843        self
844    }
845
846    /// Sets selector result cache size.
847    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
848        self.selector_result_cache_size = bytes;
849        self
850    }
851
852    /// Sets range result cache size.
853    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
854        self.range_result_cache_size = bytes;
855        self
856    }
857
858    /// Builds the [CacheManager].
859    pub fn build(self) -> CacheManager {
860        fn to_str(cause: RemovalCause) -> &'static str {
861            match cause {
862                RemovalCause::Expired => "expired",
863                RemovalCause::Explicit => "explicit",
864                RemovalCause::Replaced => "replaced",
865                RemovalCause::Size => "size",
866            }
867        }
868
869        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
870            Cache::builder()
871                .max_capacity(self.sst_meta_cache_size)
872                .weigher(meta_cache_weight)
873                .eviction_listener(|k, v, cause| {
874                    let size = meta_cache_weight(&k, &v);
875                    CACHE_BYTES
876                        .with_label_values(&[SST_META_TYPE])
877                        .sub(size.into());
878                    CACHE_EVICTION
879                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
880                        .inc();
881                })
882                .build()
883        });
884        let vector_cache = (self.vector_cache_size != 0).then(|| {
885            Cache::builder()
886                .max_capacity(self.vector_cache_size)
887                .weigher(vector_cache_weight)
888                .eviction_listener(|k, v, cause| {
889                    let size = vector_cache_weight(&k, &v);
890                    CACHE_BYTES
891                        .with_label_values(&[VECTOR_TYPE])
892                        .sub(size.into());
893                    CACHE_EVICTION
894                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
895                        .inc();
896                })
897                .build()
898        });
899        let page_cache = (self.page_cache_size != 0).then(|| {
900            Cache::builder()
901                .max_capacity(self.page_cache_size)
902                .weigher(page_cache_weight)
903                .eviction_listener(|k, v, cause| {
904                    let size = page_cache_weight(&k, &v);
905                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
906                    CACHE_EVICTION
907                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
908                        .inc();
909                })
910                .build()
911        });
912        let inverted_index_cache = InvertedIndexCache::new(
913            self.index_metadata_size,
914            self.index_content_size,
915            self.index_content_page_size,
916        );
917        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
918        let bloom_filter_index_cache = BloomFilterIndexCache::new(
919            self.index_metadata_size,
920            self.index_content_size,
921            self.index_content_page_size,
922        );
923        #[cfg(feature = "vector_index")]
924        let vector_index_cache = (self.index_content_size != 0)
925            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
926        let index_result_cache = (self.index_result_cache_size != 0)
927            .then(|| IndexResultCache::new(self.index_result_cache_size));
928        let puffin_metadata_cache =
929            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
930        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
931            Cache::builder()
932                .max_capacity(self.selector_result_cache_size)
933                .weigher(selector_result_cache_weight)
934                .eviction_listener(|k, v, cause| {
935                    let size = selector_result_cache_weight(&k, &v);
936                    CACHE_BYTES
937                        .with_label_values(&[SELECTOR_RESULT_TYPE])
938                        .sub(size.into());
939                    CACHE_EVICTION
940                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
941                        .inc();
942                })
943                .build()
944        });
945        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
946            Cache::builder()
947                .max_capacity(self.range_result_cache_size)
948                .weigher(range_result_cache_weight)
949                .eviction_listener(move |k, v, cause| {
950                    let size = range_result_cache_weight(&k, &v);
951                    CACHE_BYTES
952                        .with_label_values(&[RANGE_RESULT_TYPE])
953                        .sub(size.into());
954                    CACHE_EVICTION
955                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
956                        .inc();
957                })
958                .build()
959        });
960        CacheManager {
961            sst_meta_cache,
962            vector_cache,
963            page_cache,
964            write_cache: self.write_cache,
965            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
966            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
967            #[cfg(feature = "vector_index")]
968            vector_index_cache,
969            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
970            selector_result_cache,
971            range_result_cache,
972            index_result_cache,
973        }
974    }
975}
976
977fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
978    // We ignore the size of `Arc`.
979    (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
980}
981
982fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
983    // We ignore the heap size of `Value`.
984    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
985}
986
987fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
988    (k.estimated_size() + v.estimated_size()) as u32
989}
990
991fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
992    (mem::size_of_val(k) + v.estimated_size()) as u32
993}
994
995fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
996    (k.estimated_size() + v.estimated_size()) as u32
997}
998
999/// Updates cache hit/miss metrics.
1000fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1001    if value.is_some() {
1002        CACHE_HIT.with_label_values(&[cache_type]).inc();
1003    } else {
1004        CACHE_MISS.with_label_values(&[cache_type]).inc();
1005    }
1006    value
1007}
1008
/// Cache key (region id, file id) for SST meta.
///
/// Identifies the entry of one SST file in the SST metadata cache.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
1012
1013impl SstMetaKey {
1014    /// Returns memory used by the key (estimated).
1015    fn estimated_size(&self) -> usize {
1016        mem::size_of::<Self>()
1017    }
1018}
1019
/// Path to column pages in the SST file.
///
/// Addresses a single column chunk: (region id, file id, row group index, column index).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1032
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
///
/// NOTE(review): unlike [ColumnPagePath] this key carries no region id; presumably
/// file ids are unique across regions — confirm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache; part of the key, so the order of ranges matters.
    ranges: Vec<Range<u64>>,
}
1046
1047impl PageKey {
1048    /// Creates a key for a list of pages.
1049    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1050        PageKey {
1051            file_id,
1052            row_group_idx,
1053            ranges,
1054        }
1055    }
1056
1057    /// Returns memory used by the key (estimated).
1058    fn estimated_size(&self) -> usize {
1059        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1060    }
1061}
1062
/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page buffers in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    /// This is the figure counted as the pages' payload when estimating memory use.
    pub page_size: u64,
}
1072
1073impl PageValue {
1074    /// Creates a new value from a range of compressed pages.
1075    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1076        PageValue {
1077            compressed: bytes,
1078            page_size,
1079        }
1080    }
1081
1082    /// Returns memory used by the value (estimated).
1083    fn estimated_size(&self) -> usize {
1084        mem::size_of::<Self>()
1085            + self.page_size as usize
1086            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1087    }
1088}
1089
/// Cache key for time series row selector result.
///
/// One entry per (file, row group, selector) combination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1100
/// Result stored in the selector result cache.
///
/// The variant records which read format produced the batches.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1108
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: SelectorResult,
    /// Projection of rows.
    /// NOTE(review): presumably the projected column indices the result was read
    /// with — confirm against callers.
    pub projection: Vec<usize>,
}
1116
1117impl SelectorResultValue {
1118    /// Creates a new selector result value with primary key format.
1119    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1120        SelectorResultValue {
1121            result: SelectorResult::PrimaryKey(result),
1122            projection,
1123        }
1124    }
1125
1126    /// Creates a new selector result value with flat format.
1127    pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1128        SelectorResultValue {
1129            result: SelectorResult::Flat(result),
1130            projection,
1131        }
1132    }
1133
1134    /// Returns memory used by the value (estimated).
1135    fn estimated_size(&self) -> usize {
1136        match &self.result {
1137            SelectorResult::PrimaryKey(batches) => {
1138                batches.iter().map(|batch| batch.memory_size()).sum()
1139            }
1140            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1141        }
1142    }
1143}
1144
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group, page byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1157
// Unit tests for `CacheManager`, `CacheManagerBuilder`, the cache weighers and
// `CacheStrategy` delegation.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::{
        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
    };
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default `CacheManager` has no SST meta, vector, page or write cache, so values
    /// put into those caches cannot be read back.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    /// Round-trips SST metadata through an enabled SST meta cache: miss, put, hit,
    /// remove, miss.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        // The cached parquet metadata must not retain a `PARQUET_METADATA_KEY` entry in
        // its key-value metadata.
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    /// When region metadata is supplied on put, the cached entry shares that exact `Arc`.
    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }

    /// The SST meta weigher includes `region_metadata_weight`, which is expected to
    /// exceed the JSON length of the region metadata.
    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }

    /// A put into an enabled repeated vector cache is readable by (type, value).
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// A put into an enabled page cache is readable with the same key.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    /// A put into an enabled selector result cache is readable with the same key.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Range result caching through the manager and through each `CacheStrategy`.
    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        // `EnableAll` reads through to the manager.
        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        // `Compaction` does not return range results even though the manager has one;
        // the manager's entry is still present after a put through it.
        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        // `Disabled` never returns a result and leaves the manager's entry in place.
        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }

    /// Evicting a puffin file clears its bloom filter, inverted index, index result and
    /// puffin metadata entries, both via `CacheManager` and via `CacheStrategy::EnableAll`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        // Populate one entry for `index_id` in each index-related cache.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        // The puffin metadata cache is keyed by the string form of the index id.
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }

    /// Builds region metadata with `column_count` nullable string columns plus a
    /// millisecond timestamp column `ts` whose column id is `column_count`.
    ///
    /// Columns with id below 32 are tags and form the primary key; the rest are fields.
    /// Each string column carries one extra schema metadata entry, which enlarges the
    /// serialized metadata.
    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
        let region_id = RegionId::new(1024, 7);
        let mut builder = RegionMetadataBuilder::new(region_id);
        let mut primary_key = Vec::new();

        for column_id in 0..column_count {
            let semantic_type = if column_id < 32 {
                primary_key.push(column_id);
                SemanticType::Tag
            } else {
                SemanticType::Field
            };
            let mut column_schema = ColumnSchema::new(
                format!("wide_column_{column_id}"),
                ConcreteDataType::string_datatype(),
                true,
            );
            column_schema
                .mut_metadata()
                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: column_count,
        });
        builder.primary_key(primary_key);

        builder.build().unwrap()
    }
}