Skip to main content

mito2/
cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::reader::MetadataCacheMetrics;
61
62/// Metrics type key for sst meta.
63const SST_META_TYPE: &str = "sst_meta";
64/// Metrics type key for vector.
65const VECTOR_TYPE: &str = "vector";
66/// Metrics type key for pages.
67const PAGE_TYPE: &str = "page";
68/// Metrics type key for files on the local store.
69const FILE_TYPE: &str = "file";
70/// Metrics type key for index files (puffin) on the local store.
71const INDEX_TYPE: &str = "index";
72/// Metrics type key for selector result cache.
73const SELECTOR_RESULT_TYPE: &str = "selector_result";
74/// Metrics type key for range scan result cache.
75const RANGE_RESULT_TYPE: &str = "range_result";
/// Total byte budget passed as `limit_bytes` when building the default
/// [RangeResultMemoryLimiter].
76const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes covered by one permit of the default [RangeResultMemoryLimiter]
/// (passed as `permit_bytes`).
77const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
78
79#[derive(Debug)]
80pub(crate) struct RangeResultMemoryLimiter {
81    semaphore: Arc<tokio::sync::Semaphore>,
82    permit_bytes: usize,
83}
84
85impl Default for RangeResultMemoryLimiter {
86    fn default() -> Self {
87        Self::new(
88            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
89            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
90        )
91    }
92}
93
94impl RangeResultMemoryLimiter {
95    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
96        let permit_bytes = permit_bytes.max(1);
97        let permits = limit_bytes.div_ceil(permit_bytes).max(1);
98        Self {
99            semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
100            permit_bytes,
101        }
102    }
103
104    pub(crate) fn permit_bytes(&self) -> usize {
105        self.permit_bytes
106    }
107
108    pub(crate) async fn acquire(
109        &self,
110        bytes: usize,
111    ) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
112        let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32;
113        self.semaphore.acquire_many(permits).await
114    }
115}
116
117/// Cached SST metadata combines the parquet footer with the decoded region metadata.
118///
119/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
120/// [RegionMetadata] separately so readers can skip repeated deserialization work.
121#[derive(Debug)]
122pub(crate) struct CachedSstMeta {
123    parquet_metadata: Arc<ParquetMetaData>,
124    region_metadata: RegionMetadataRef,
125    region_metadata_weight: usize,
126}
127
128impl CachedSstMeta {
129    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
130        Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
131    }
132
133    pub(crate) fn try_new_with_region_metadata(
134        file_path: &str,
135        parquet_metadata: ParquetMetaData,
136        region_metadata: Option<RegionMetadataRef>,
137    ) -> Result<Self> {
138        let file_metadata = parquet_metadata.file_metadata();
139        let key_values = file_metadata
140            .key_value_metadata()
141            .context(InvalidParquetSnafu {
142                file: file_path,
143                reason: "missing key value meta",
144            })?;
145        let meta_value = key_values
146            .iter()
147            .find(|kv| kv.key == PARQUET_METADATA_KEY)
148            .with_context(|| InvalidParquetSnafu {
149                file: file_path,
150                reason: format!("key {} not found", PARQUET_METADATA_KEY),
151            })?;
152        let json = meta_value
153            .value
154            .as_ref()
155            .with_context(|| InvalidParquetSnafu {
156                file: file_path,
157                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
158            })?;
159        let region_metadata = match region_metadata {
160            Some(region_metadata) => region_metadata,
161            None => Arc::new(
162                store_api::metadata::RegionMetadata::from_json(json)
163                    .context(InvalidMetadataSnafu)?,
164            ),
165        };
166        // Keep the previous JSON-byte floor and charge the decoded structures as well.
167        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
168        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
169
170        Ok(Self {
171            parquet_metadata,
172            region_metadata,
173            region_metadata_weight,
174        })
175    }
176
177    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
178        self.parquet_metadata.clone()
179    }
180
181    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
182        self.region_metadata.clone()
183    }
184}
185
186fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
187    let file_metadata = parquet_metadata.file_metadata();
188    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
189        let filtered = key_values
190            .iter()
191            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
192            .cloned()
193            .collect::<Vec<_>>();
194        (!filtered.is_empty()).then_some(filtered)
195    });
196    let stripped_file_metadata = FileMetaData::new(
197        file_metadata.version(),
198        file_metadata.num_rows(),
199        file_metadata.created_by().map(ToString::to_string),
200        filtered_key_values,
201        file_metadata.schema_descr_ptr(),
202        file_metadata.column_orders().cloned(),
203    );
204
205    let mut builder = parquet_metadata.into_builder();
206    let row_groups = builder.take_row_groups();
207    let column_index = builder.take_column_index();
208    let offset_index = builder.take_offset_index();
209
210    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
211        .set_row_groups(row_groups)
212        .set_column_index(column_index)
213        .set_offset_index(offset_index)
214        .build()
215}
216
217/// Cache strategies that may only enable a subset of caches.
218#[derive(Clone)]
219pub enum CacheStrategy {
220    /// Strategy for normal operations.
221    /// Doesn't disable any cache.
222    EnableAll(CacheManagerRef),
223    /// Strategy for compaction.
224    /// Disables some caches during compaction to avoid affecting queries.
225    /// Enables the write cache so that the compaction can read files cached
226    /// in the write cache and write the compacted files back to the write cache.
227    Compaction(CacheManagerRef),
228    /// Do not use any cache.
229    Disabled,
230}
231
232impl CacheStrategy {
233    /// Gets fused SST metadata with cache metrics tracking.
234    pub(crate) async fn get_sst_meta_data(
235        &self,
236        file_id: RegionFileId,
237        metrics: &mut MetadataCacheMetrics,
238        page_index_policy: PageIndexPolicy,
239    ) -> Option<Arc<CachedSstMeta>> {
240        match self {
241            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
242                cache_manager
243                    .get_sst_meta_data(file_id, metrics, page_index_policy)
244                    .await
245            }
246            CacheStrategy::Disabled => {
247                metrics.cache_miss += 1;
248                None
249            }
250        }
251    }
252
253    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
254    pub(crate) fn get_sst_meta_data_from_mem_cache(
255        &self,
256        file_id: RegionFileId,
257    ) -> Option<Arc<CachedSstMeta>> {
258        match self {
259            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
260                cache_manager.get_sst_meta_data_from_mem_cache(file_id)
261            }
262            CacheStrategy::Disabled => None,
263        }
264    }
265
266    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
267    pub fn get_parquet_meta_data_from_mem_cache(
268        &self,
269        file_id: RegionFileId,
270    ) -> Option<Arc<ParquetMetaData>> {
271        self.get_sst_meta_data_from_mem_cache(file_id)
272            .map(|metadata| metadata.parquet_metadata())
273    }
274
275    /// Calls [CacheManager::put_sst_meta_data()].
276    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
277        match self {
278            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
279                cache_manager.put_sst_meta_data(file_id, metadata);
280            }
281            CacheStrategy::Disabled => {}
282        }
283    }
284
285    /// Calls [CacheManager::put_parquet_meta_data()].
286    pub fn put_parquet_meta_data(
287        &self,
288        file_id: RegionFileId,
289        metadata: Arc<ParquetMetaData>,
290        region_metadata: Option<RegionMetadataRef>,
291    ) {
292        match self {
293            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
294                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
295            }
296            CacheStrategy::Disabled => {}
297        }
298    }
299
300    /// Calls [CacheManager::remove_parquet_meta_data()].
301    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
302        match self {
303            CacheStrategy::EnableAll(cache_manager) => {
304                cache_manager.remove_parquet_meta_data(file_id);
305            }
306            CacheStrategy::Compaction(cache_manager) => {
307                cache_manager.remove_parquet_meta_data(file_id);
308            }
309            CacheStrategy::Disabled => {}
310        }
311    }
312
313    /// Calls [CacheManager::get_repeated_vector()].
314    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
315    pub fn get_repeated_vector(
316        &self,
317        data_type: &ConcreteDataType,
318        value: &Value,
319    ) -> Option<VectorRef> {
320        match self {
321            CacheStrategy::EnableAll(cache_manager) => {
322                cache_manager.get_repeated_vector(data_type, value)
323            }
324            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
325        }
326    }
327
328    /// Calls [CacheManager::put_repeated_vector()].
329    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
330    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
331        if let CacheStrategy::EnableAll(cache_manager) = self {
332            cache_manager.put_repeated_vector(value, vector);
333        }
334    }
335
336    /// Calls [CacheManager::get_pages()].
337    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
338    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
339        match self {
340            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
341            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
342        }
343    }
344
345    /// Calls [CacheManager::put_pages()].
346    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
347    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
348        if let CacheStrategy::EnableAll(cache_manager) = self {
349            cache_manager.put_pages(page_key, pages);
350        }
351    }
352
353    /// Calls [CacheManager::evict_puffin_cache()].
354    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
355        match self {
356            CacheStrategy::EnableAll(cache_manager) => {
357                cache_manager.evict_puffin_cache(file_id).await
358            }
359            CacheStrategy::Compaction(cache_manager) => {
360                cache_manager.evict_puffin_cache(file_id).await
361            }
362            CacheStrategy::Disabled => {}
363        }
364    }
365
366    /// Calls [CacheManager::get_selector_result()].
367    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
368    pub fn get_selector_result(
369        &self,
370        selector_key: &SelectorResultKey,
371    ) -> Option<Arc<SelectorResultValue>> {
372        match self {
373            CacheStrategy::EnableAll(cache_manager) => {
374                cache_manager.get_selector_result(selector_key)
375            }
376            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
377        }
378    }
379
380    /// Calls [CacheManager::put_selector_result()].
381    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
382    pub fn put_selector_result(
383        &self,
384        selector_key: SelectorResultKey,
385        result: Arc<SelectorResultValue>,
386    ) {
387        if let CacheStrategy::EnableAll(cache_manager) = self {
388            cache_manager.put_selector_result(selector_key, result);
389        }
390    }
391
392    /// Calls [CacheManager::get_range_result()].
393    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
394    #[allow(dead_code)]
395    pub(crate) fn get_range_result(
396        &self,
397        key: &RangeScanCacheKey,
398    ) -> Option<Arc<RangeScanCacheValue>> {
399        match self {
400            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
401            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
402        }
403    }
404
405    /// Calls [CacheManager::put_range_result()].
406    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
407    pub(crate) fn put_range_result(
408        &self,
409        key: RangeScanCacheKey,
410        result: Arc<RangeScanCacheValue>,
411    ) {
412        if let CacheStrategy::EnableAll(cache_manager) = self {
413            cache_manager.put_range_result(key, result);
414        }
415    }
416
417    /// Returns true if the range result cache is enabled.
418    pub(crate) fn has_range_result_cache(&self) -> bool {
419        match self {
420            CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
421            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
422        }
423    }
424
425    pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
426        match self {
427            CacheStrategy::EnableAll(cache_manager) => {
428                Some(cache_manager.range_result_memory_limiter())
429            }
430            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
431        }
432    }
433
434    /// Calls [CacheManager::write_cache()].
435    /// It returns None if the strategy is [CacheStrategy::Disabled].
436    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
437        match self {
438            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
439            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
440            CacheStrategy::Disabled => None,
441        }
442    }
443
444    /// Calls [CacheManager::index_cache()].
445    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
446    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
447        match self {
448            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
449            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
450        }
451    }
452
453    /// Calls [CacheManager::bloom_filter_index_cache()].
454    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
455    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
456        match self {
457            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
458            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
459        }
460    }
461
462    /// Calls [CacheManager::vector_index_cache()].
463    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
464    #[cfg(feature = "vector_index")]
465    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
466        match self {
467            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
468            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
469        }
470    }
471
472    /// Calls [CacheManager::puffin_metadata_cache()].
473    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
474    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
475        match self {
476            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
477            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
478        }
479    }
480
481    /// Calls [CacheManager::index_result_cache()].
482    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
483    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
484        match self {
485            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
486            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
487        }
488    }
489
490    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
491    pub fn maybe_download_background(
492        &self,
493        index_key: IndexKey,
494        remote_path: String,
495        remote_store: ObjectStore,
496        file_size: u64,
497    ) {
498        if let CacheStrategy::EnableAll(cache_manager) = self
499            && let Some(write_cache) = cache_manager.write_cache()
500        {
501            write_cache.file_cache().maybe_download_background(
502                index_key,
503                remote_path,
504                remote_store,
505                file_size,
506            );
507        }
508    }
509}
510
511/// Manages cached data for the engine.
512///
513/// All caches are disabled by default.
514#[derive(Default)]
515pub struct CacheManager {
516    /// Cache for SST metadata.
517    sst_meta_cache: Option<SstMetaCache>,
518    /// Cache for vectors.
519    vector_cache: Option<VectorCache>,
520    /// Cache for SST pages.
521    page_cache: Option<PageCache>,
522    /// A Cache for writing files to object stores.
523    write_cache: Option<WriteCacheRef>,
524    /// Cache for inverted index.
525    inverted_index_cache: Option<InvertedIndexCacheRef>,
526    /// Cache for bloom filter index.
527    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
528    /// Cache for vector index.
529    #[cfg(feature = "vector_index")]
530    vector_index_cache: Option<VectorIndexCacheRef>,
531    /// Puffin metadata cache.
532    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
533    /// Cache for time series selectors.
534    selector_result_cache: Option<SelectorResultCache>,
535    /// Cache for range scan outputs in flat format.
536    range_result_cache: Option<RangeResultCache>,
537    /// Shared memory limiter for async range-result cache tasks.
538    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
539    /// Cache for index result.
540    index_result_cache: Option<IndexResultCache>,
541}
542
543pub type CacheManagerRef = Arc<CacheManager>;
544
545impl CacheManager {
546    /// Returns a builder to build the cache.
547    pub fn builder() -> CacheManagerBuilder {
548        CacheManagerBuilder::default()
549    }
550
551    /// Gets fused SST metadata with metrics tracking.
552    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
553    pub(crate) async fn get_sst_meta_data(
554        &self,
555        file_id: RegionFileId,
556        metrics: &mut MetadataCacheMetrics,
557        page_index_policy: PageIndexPolicy,
558    ) -> Option<Arc<CachedSstMeta>> {
559        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
560            metrics.mem_cache_hit += 1;
561            return Some(metadata);
562        }
563
564        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
565        if let Some(write_cache) = &self.write_cache
566            && let Some(metadata) = write_cache
567                .file_cache()
568                .get_sst_meta_data(key, metrics, page_index_policy)
569                .await
570        {
571            metrics.file_cache_hit += 1;
572            self.put_sst_meta_data(file_id, metadata.clone());
573            return Some(metadata);
574        }
575
576        metrics.cache_miss += 1;
577        None
578    }
579
580    /// Gets cached [ParquetMetaData] with metrics tracking.
581    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
582    pub(crate) async fn get_parquet_meta_data(
583        &self,
584        file_id: RegionFileId,
585        metrics: &mut MetadataCacheMetrics,
586        page_index_policy: PageIndexPolicy,
587    ) -> Option<Arc<ParquetMetaData>> {
588        self.get_sst_meta_data(file_id, metrics, page_index_policy)
589            .await
590            .map(|metadata| metadata.parquet_metadata())
591    }
592
593    /// Gets cached fused SST metadata from in-memory cache.
594    /// This method does not perform I/O.
595    pub(crate) fn get_sst_meta_data_from_mem_cache(
596        &self,
597        file_id: RegionFileId,
598    ) -> Option<Arc<CachedSstMeta>> {
599        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
600            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
601            update_hit_miss(value, SST_META_TYPE)
602        })
603    }
604
605    /// Gets cached [ParquetMetaData] from in-memory cache.
606    /// This method does not perform I/O.
607    pub fn get_parquet_meta_data_from_mem_cache(
608        &self,
609        file_id: RegionFileId,
610    ) -> Option<Arc<ParquetMetaData>> {
611        self.get_sst_meta_data_from_mem_cache(file_id)
612            .map(|metadata| metadata.parquet_metadata())
613    }
614
615    /// Puts fused SST metadata into the cache.
616    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
617        if let Some(cache) = &self.sst_meta_cache {
618            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
619            CACHE_BYTES
620                .with_label_values(&[SST_META_TYPE])
621                .add(meta_cache_weight(&key, &metadata).into());
622            cache.insert(key, metadata);
623        }
624    }
625
626    /// Puts [ParquetMetaData] into the cache.
627    pub fn put_parquet_meta_data(
628        &self,
629        file_id: RegionFileId,
630        metadata: Arc<ParquetMetaData>,
631        region_metadata: Option<RegionMetadataRef>,
632    ) {
633        if self.sst_meta_cache.is_some() {
634            let file_path = format!(
635                "region_id={}, file_id={}",
636                file_id.region_id(),
637                file_id.file_id()
638            );
639            match CachedSstMeta::try_new_with_region_metadata(
640                &file_path,
641                Arc::unwrap_or_clone(metadata),
642                region_metadata,
643            ) {
644                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
645                Err(err) => warn!(
646                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
647                    file_id.region_id(),
648                    file_id.file_id()
649                ),
650            }
651        }
652    }
653
654    /// Removes [ParquetMetaData] from the cache.
655    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
656        if let Some(cache) = &self.sst_meta_cache {
657            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
658        }
659    }
660
661    /// Returns the total weighted size of the in-memory SST meta cache.
662    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
663        self.sst_meta_cache
664            .as_ref()
665            .map(|cache| cache.weighted_size())
666            .unwrap_or(0)
667    }
668
669    /// Returns true if the in-memory SST meta cache is enabled.
670    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
671        self.sst_meta_cache.is_some()
672    }
673
674    /// Gets a vector with repeated value for specific `key`.
675    pub fn get_repeated_vector(
676        &self,
677        data_type: &ConcreteDataType,
678        value: &Value,
679    ) -> Option<VectorRef> {
680        self.vector_cache.as_ref().and_then(|vector_cache| {
681            let value = vector_cache.get(&(data_type.clone(), value.clone()));
682            update_hit_miss(value, VECTOR_TYPE)
683        })
684    }
685
686    /// Puts a vector with repeated value into the cache.
687    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
688        if let Some(cache) = &self.vector_cache {
689            let key = (vector.data_type(), value);
690            CACHE_BYTES
691                .with_label_values(&[VECTOR_TYPE])
692                .add(vector_cache_weight(&key, &vector).into());
693            cache.insert(key, vector);
694        }
695    }
696
697    /// Gets pages for the row group.
698    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
699        self.page_cache.as_ref().and_then(|page_cache| {
700            let value = page_cache.get(page_key);
701            update_hit_miss(value, PAGE_TYPE)
702        })
703    }
704
705    /// Puts pages of the row group into the cache.
706    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
707        if let Some(cache) = &self.page_cache {
708            CACHE_BYTES
709                .with_label_values(&[PAGE_TYPE])
710                .add(page_cache_weight(&page_key, &pages).into());
711            cache.insert(page_key, pages);
712        }
713    }
714
715    /// Evicts every puffin-related cache entry for the given file.
716    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
717        if let Some(cache) = &self.bloom_filter_index_cache {
718            cache.invalidate_file(file_id.file_id());
719        }
720
721        if let Some(cache) = &self.inverted_index_cache {
722            cache.invalidate_file(file_id.file_id());
723        }
724
725        if let Some(cache) = &self.index_result_cache {
726            cache.invalidate_file(file_id.file_id());
727        }
728
729        #[cfg(feature = "vector_index")]
730        if let Some(cache) = &self.vector_index_cache {
731            cache.invalidate_file(file_id.file_id());
732        }
733
734        if let Some(cache) = &self.puffin_metadata_cache {
735            cache.remove(&file_id.to_string());
736        }
737
738        if let Some(write_cache) = &self.write_cache {
739            write_cache
740                .remove(IndexKey::new(
741                    file_id.region_id(),
742                    file_id.file_id(),
743                    FileType::Puffin(file_id.version),
744                ))
745                .await;
746        }
747    }
748
749    /// Gets result of for the selector.
750    pub fn get_selector_result(
751        &self,
752        selector_key: &SelectorResultKey,
753    ) -> Option<Arc<SelectorResultValue>> {
754        self.selector_result_cache
755            .as_ref()
756            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
757    }
758
759    /// Puts result of the selector into the cache.
760    pub fn put_selector_result(
761        &self,
762        selector_key: SelectorResultKey,
763        result: Arc<SelectorResultValue>,
764    ) {
765        if let Some(cache) = &self.selector_result_cache {
766            CACHE_BYTES
767                .with_label_values(&[SELECTOR_RESULT_TYPE])
768                .add(selector_result_cache_weight(&selector_key, &result).into());
769            cache.insert(selector_key, result);
770        }
771    }
772
773    /// Gets cached result for range scan.
774    #[allow(dead_code)]
775    pub(crate) fn get_range_result(
776        &self,
777        key: &RangeScanCacheKey,
778    ) -> Option<Arc<RangeScanCacheValue>> {
779        self.range_result_cache
780            .as_ref()
781            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
782    }
783
784    /// Puts range scan result into cache.
785    pub(crate) fn put_range_result(
786        &self,
787        key: RangeScanCacheKey,
788        result: Arc<RangeScanCacheValue>,
789    ) {
790        if let Some(cache) = &self.range_result_cache {
791            CACHE_BYTES
792                .with_label_values(&[RANGE_RESULT_TYPE])
793                .add(range_result_cache_weight(&key, &result).into());
794            cache.insert(key, result);
795        }
796    }
797
798    /// Returns true if the range result cache is enabled.
799    pub(crate) fn has_range_result_cache(&self) -> bool {
800        self.range_result_cache.is_some()
801    }
802
803    pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
804        &self.range_result_memory_limiter
805    }
806
807    /// Gets the write cache.
808    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
809        self.write_cache.as_ref()
810    }
811
812    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
813        self.inverted_index_cache.as_ref()
814    }
815
816    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
817        self.bloom_filter_index_cache.as_ref()
818    }
819
820    #[cfg(feature = "vector_index")]
821    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
822        self.vector_index_cache.as_ref()
823    }
824
825    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
826        self.puffin_metadata_cache.as_ref()
827    }
828
829    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
830        self.index_result_cache.as_ref()
831    }
832}
833
834/// Increases selector cache miss metrics.
835pub fn selector_result_cache_miss() {
836    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
837}
838
839/// Increases selector cache hit metrics.
840pub fn selector_result_cache_hit() {
841    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
842}
843
/// Builder to construct a [CacheManager].
///
/// Every size defaults to `0` and the write cache defaults to `None`
/// (via `#[derive(Default)]`).
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache; `0` disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated vector cache; `0` disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; `0` disables the cache.
    page_cache_size: u64,
    /// Metadata size passed to the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Content size passed to the inverted index and bloom filter index caches
    /// (and to the vector index cache when that feature is enabled).
    index_content_size: u64,
    /// Content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; `0` disables the cache.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the built manager as-is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; `0` disables the cache.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; `0` disables the cache.
    range_result_cache_size: u64,
}
859
860impl CacheManagerBuilder {
861    /// Sets meta cache size.
862    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
863        self.sst_meta_cache_size = bytes;
864        self
865    }
866
867    /// Sets vector cache size.
868    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
869        self.vector_cache_size = bytes;
870        self
871    }
872
873    /// Sets page cache size.
874    pub fn page_cache_size(mut self, bytes: u64) -> Self {
875        self.page_cache_size = bytes;
876        self
877    }
878
879    /// Sets write cache.
880    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
881        self.write_cache = cache;
882        self
883    }
884
885    /// Sets cache size for index metadata.
886    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
887        self.index_metadata_size = bytes;
888        self
889    }
890
891    /// Sets cache size for index content.
892    pub fn index_content_size(mut self, bytes: u64) -> Self {
893        self.index_content_size = bytes;
894        self
895    }
896
897    /// Sets page size for index content.
898    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
899        self.index_content_page_size = bytes;
900        self
901    }
902
903    /// Sets cache size for index result.
904    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
905        self.index_result_cache_size = bytes;
906        self
907    }
908
909    /// Sets cache size for puffin metadata.
910    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
911        self.puffin_metadata_size = bytes;
912        self
913    }
914
915    /// Sets selector result cache size.
916    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
917        self.selector_result_cache_size = bytes;
918        self
919    }
920
921    /// Sets range result cache size.
922    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
923        self.range_result_cache_size = bytes;
924        self
925    }
926
927    /// Builds the [CacheManager].
928    pub fn build(self) -> CacheManager {
929        fn to_str(cause: RemovalCause) -> &'static str {
930            match cause {
931                RemovalCause::Expired => "expired",
932                RemovalCause::Explicit => "explicit",
933                RemovalCause::Replaced => "replaced",
934                RemovalCause::Size => "size",
935            }
936        }
937
938        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
939            Cache::builder()
940                .max_capacity(self.sst_meta_cache_size)
941                .weigher(meta_cache_weight)
942                .eviction_listener(|k, v, cause| {
943                    let size = meta_cache_weight(&k, &v);
944                    CACHE_BYTES
945                        .with_label_values(&[SST_META_TYPE])
946                        .sub(size.into());
947                    CACHE_EVICTION
948                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
949                        .inc();
950                })
951                .build()
952        });
953        let vector_cache = (self.vector_cache_size != 0).then(|| {
954            Cache::builder()
955                .max_capacity(self.vector_cache_size)
956                .weigher(vector_cache_weight)
957                .eviction_listener(|k, v, cause| {
958                    let size = vector_cache_weight(&k, &v);
959                    CACHE_BYTES
960                        .with_label_values(&[VECTOR_TYPE])
961                        .sub(size.into());
962                    CACHE_EVICTION
963                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
964                        .inc();
965                })
966                .build()
967        });
968        let page_cache = (self.page_cache_size != 0).then(|| {
969            Cache::builder()
970                .max_capacity(self.page_cache_size)
971                .weigher(page_cache_weight)
972                .eviction_listener(|k, v, cause| {
973                    let size = page_cache_weight(&k, &v);
974                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
975                    CACHE_EVICTION
976                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
977                        .inc();
978                })
979                .build()
980        });
981        let inverted_index_cache = InvertedIndexCache::new(
982            self.index_metadata_size,
983            self.index_content_size,
984            self.index_content_page_size,
985        );
986        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
987        let bloom_filter_index_cache = BloomFilterIndexCache::new(
988            self.index_metadata_size,
989            self.index_content_size,
990            self.index_content_page_size,
991        );
992        #[cfg(feature = "vector_index")]
993        let vector_index_cache = (self.index_content_size != 0)
994            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
995        let index_result_cache = (self.index_result_cache_size != 0)
996            .then(|| IndexResultCache::new(self.index_result_cache_size));
997        let puffin_metadata_cache =
998            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
999        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1000            Cache::builder()
1001                .max_capacity(self.selector_result_cache_size)
1002                .weigher(selector_result_cache_weight)
1003                .eviction_listener(|k, v, cause| {
1004                    let size = selector_result_cache_weight(&k, &v);
1005                    CACHE_BYTES
1006                        .with_label_values(&[SELECTOR_RESULT_TYPE])
1007                        .sub(size.into());
1008                    CACHE_EVICTION
1009                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1010                        .inc();
1011                })
1012                .build()
1013        });
1014        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1015            Cache::builder()
1016                .max_capacity(self.range_result_cache_size)
1017                .weigher(range_result_cache_weight)
1018                .eviction_listener(move |k, v, cause| {
1019                    let size = range_result_cache_weight(&k, &v);
1020                    CACHE_BYTES
1021                        .with_label_values(&[RANGE_RESULT_TYPE])
1022                        .sub(size.into());
1023                    CACHE_EVICTION
1024                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1025                        .inc();
1026                })
1027                .build()
1028        });
1029        CacheManager {
1030            sst_meta_cache,
1031            vector_cache,
1032            page_cache,
1033            write_cache: self.write_cache,
1034            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1035            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1036            #[cfg(feature = "vector_index")]
1037            vector_index_cache,
1038            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1039            selector_result_cache,
1040            range_result_cache,
1041            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
1042            index_result_cache,
1043        }
1044    }
1045}
1046
1047fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1048    // We ignore the size of `Arc`.
1049    (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1050}
1051
1052fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1053    // We ignore the heap size of `Value`.
1054    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1055}
1056
1057fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1058    (k.estimated_size() + v.estimated_size()) as u32
1059}
1060
1061fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1062    (mem::size_of_val(k) + v.estimated_size()) as u32
1063}
1064
1065fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1066    (k.estimated_size() + v.estimated_size()) as u32
1067}
1068
1069/// Updates cache hit/miss metrics.
1070fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1071    if value.is_some() {
1072        CACHE_HIT.with_label_values(&[cache_type]).inc();
1073    } else {
1074        CACHE_MISS.with_label_values(&[cache_type]).inc();
1075    }
1076    value
1077}
1078
1079/// Cache key (region id, file id) for SST meta.
1080#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1081struct SstMetaKey(RegionId, FileId);
1082
1083impl SstMetaKey {
1084    /// Returns memory used by the key (estimated).
1085    fn estimated_size(&self) -> usize {
1086        mem::size_of::<Self>()
1087    }
1088}
1089
/// Path to column pages in the SST file.
///
/// Identifies one column of one row group within one SST file of a region.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1102
1103/// Cache key to pages in a row group (after projection).
1104///
1105/// Different projections will have different cache keys.
1106/// We cache all ranges together because they may refer to the same `Bytes`.
1107#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1108pub struct PageKey {
1109    /// Id of the SST file to cache.
1110    file_id: FileId,
1111    /// Index of the row group.
1112    row_group_idx: usize,
1113    /// Byte ranges of the pages to cache.
1114    ranges: Vec<Range<u64>>,
1115}
1116
1117impl PageKey {
1118    /// Creates a key for a list of pages.
1119    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1120        PageKey {
1121            file_id,
1122            row_group_idx,
1123            ranges,
1124        }
1125    }
1126
1127    /// Returns memory used by the key (estimated).
1128    fn estimated_size(&self) -> usize {
1129        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1130    }
1131}
1132
1133/// Cached row group pages for a column.
1134// We don't use enum here to make it easier to mock and use the struct.
1135#[derive(Default)]
1136pub struct PageValue {
1137    /// Compressed page in the row group.
1138    pub compressed: Vec<Bytes>,
1139    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
1140    pub page_size: u64,
1141}
1142
1143impl PageValue {
1144    /// Creates a new value from a range of compressed pages.
1145    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1146        PageValue {
1147            compressed: bytes,
1148            page_size,
1149        }
1150    }
1151
1152    /// Returns memory used by the value (estimated).
1153    fn estimated_size(&self) -> usize {
1154        mem::size_of::<Self>()
1155            + self.page_size as usize
1156            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1157    }
1158}
1159
/// Cache key for time series row selector result.
///
/// A result is identified by the SST file, the row group inside that file
/// and the selector that was applied.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1170
/// Result stored in the selector result cache.
///
/// The variant records which of the two batch formats the rows are kept in.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1178
1179/// Cached result for time series row selector.
1180pub struct SelectorResultValue {
1181    /// Batches of rows selected by the selector.
1182    pub result: SelectorResult,
1183    /// Projection of rows.
1184    pub projection: Vec<usize>,
1185}
1186
1187impl SelectorResultValue {
1188    /// Creates a new selector result value with primary key format.
1189    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1190        SelectorResultValue {
1191            result: SelectorResult::PrimaryKey(result),
1192            projection,
1193        }
1194    }
1195
1196    /// Creates a new selector result value with flat format.
1197    pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1198        SelectorResultValue {
1199            result: SelectorResult::Flat(result),
1200            projection,
1201        }
1202    }
1203
1204    /// Returns memory used by the value (estimated).
1205    fn estimated_size(&self) -> usize {
1206        match &self.result {
1207            SelectorResult::PrimaryKey(batches) => {
1208                batches.iter().map(|batch| batch.memory_size()).sum()
1209            }
1210            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1211        }
1212    }
1213}
1214
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group index, byte ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1227
/// Unit tests for the cache manager, its builder and the weight helpers.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::{
        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
    };
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default manager has no SST meta, vector or page cache: every put is
    /// accepted silently and every get returns `None`.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    /// Parquet metadata can be put, read back as SST metadata and removed.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        // The cached parquet metadata must not carry the `PARQUET_METADATA_KEY` entry.
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    /// Region metadata supplied on put is returned as the very same `Arc`.
    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }

    /// The SST meta weight is key size + parquet metadata size + region
    /// metadata weight, and the region metadata weight exceeds its JSON length.
    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }

    /// A repeated vector can be stored and read back by data type and value.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages can be stored and read back by [PageKey].
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    /// Selector results can be stored and read back by [SelectorResultKey].
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Range results round-trip through the manager, and each `CacheStrategy`
    /// variant is checked for whether it serves them.
    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        // The compaction strategy does not serve the entry, while the entry
        // put directly into the manager above stays visible through it.
        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        // The disabled strategy serves nothing; the manager's entry is untouched.
        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }

    /// `evict_puffin_cache` clears the bloom filter, inverted index, index
    /// result and puffin metadata entries of an index, both when called on the
    /// manager and when called through `CacheStrategy::EnableAll`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        // Fill one entry into each of the four caches.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }

    /// Builds region metadata with `column_count` nullable string columns plus
    /// a timestamp column named `ts`.
    ///
    /// Columns with id below 32 are tags and form the primary key; the rest
    /// are fields. Each string column also gets one schema metadata entry.
    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
        let region_id = RegionId::new(1024, 7);
        let mut builder = RegionMetadataBuilder::new(region_id);
        let mut primary_key = Vec::new();

        for column_id in 0..column_count {
            let semantic_type = if column_id < 32 {
                primary_key.push(column_id);
                SemanticType::Tag
            } else {
                SemanticType::Field
            };
            let mut column_schema = ColumnSchema::new(
                format!("wide_column_{column_id}"),
                ConcreteDataType::string_datatype(),
                true,
            );
            column_schema
                .mut_metadata()
                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        // The timestamp column takes the next free column id.
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: column_count,
        });
        builder.primary_key(primary_key);

        builder.build().unwrap()
    }
}
1586}