mito2/cache.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::reader::MetadataCacheMetrics;
61
62/// Metrics type key for sst meta.
63const SST_META_TYPE: &str = "sst_meta";
64/// Metrics type key for vector.
65const VECTOR_TYPE: &str = "vector";
66/// Metrics type key for pages.
67const PAGE_TYPE: &str = "page";
68/// Metrics type key for files on the local store.
69const FILE_TYPE: &str = "file";
70/// Metrics type key for index files (puffin) on the local store.
71const INDEX_TYPE: &str = "index";
72/// Metrics type key for selector result cache.
73const SELECTOR_RESULT_TYPE: &str = "selector_result";
74/// Metrics type key for range scan result cache.
75const RANGE_RESULT_TYPE: &str = "range_result";
76const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
77const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
78
/// Limits the total memory used by concurrent range-result concatenation work.
///
/// Memory is granted in fixed-size "permits": a request for `N` bytes acquires
/// `ceil(N / permit_bytes)` permits from the shared semaphore.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Semaphore initialized with `total_permits`; each permit stands for `permit_bytes` bytes.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Number of bytes represented by a single permit (at least 1).
    permit_bytes: usize,
    /// Total number of permits, i.e. the byte limit divided by `permit_bytes`.
    total_permits: usize,
}
85
86impl Default for RangeResultMemoryLimiter {
87    fn default() -> Self {
88        Self::new(
89            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
90            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
91        )
92    }
93}
94
95impl RangeResultMemoryLimiter {
96    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
97        let permit_bytes = permit_bytes.max(1);
98        let total_permits = limit_bytes
99            .div_ceil(permit_bytes)
100            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
101        Self {
102            semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
103            permit_bytes,
104            total_permits,
105        }
106    }
107
108    #[cfg(test)]
109    pub(crate) fn permit_bytes(&self) -> usize {
110        self.permit_bytes
111    }
112
113    #[cfg(test)]
114    pub(crate) fn available_permits(&self) -> usize {
115        self.semaphore.available_permits()
116    }
117
118    pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
119        let permits = bytes.div_ceil(self.permit_bytes).max(1);
120        if permits > self.total_permits {
121            return UnexpectedSnafu {
122                reason: format!(
123                    "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
124                    self.total_permits.saturating_mul(self.permit_bytes)
125                ),
126            }
127            .fail();
128        }
129        self.semaphore
130            .acquire_many(permits as u32)
131            .await
132            .map_err(|_| {
133                UnexpectedSnafu {
134                    reason: "range result memory limiter is unexpectedly closed",
135                }
136                .build()
137            })
138    }
139}
140
/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
/// [RegionMetadata] separately so readers can skip repeated deserialization work.
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
    /// Parquet footer with the `greptime:metadata` key-value entry removed.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata decoded from (or supplied instead of) the JSON payload.
    region_metadata: RegionMetadataRef,
    /// Cache weight charged for the region metadata: the decoded structure's estimated
    /// size, floored at the original JSON byte length.
    region_metadata_weight: usize,
}
151
152impl CachedSstMeta {
153    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
154        Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
155    }
156
157    pub(crate) fn try_new_with_region_metadata(
158        file_path: &str,
159        parquet_metadata: ParquetMetaData,
160        region_metadata: Option<RegionMetadataRef>,
161    ) -> Result<Self> {
162        let file_metadata = parquet_metadata.file_metadata();
163        let key_values = file_metadata
164            .key_value_metadata()
165            .context(InvalidParquetSnafu {
166                file: file_path,
167                reason: "missing key value meta",
168            })?;
169        let meta_value = key_values
170            .iter()
171            .find(|kv| kv.key == PARQUET_METADATA_KEY)
172            .with_context(|| InvalidParquetSnafu {
173                file: file_path,
174                reason: format!("key {} not found", PARQUET_METADATA_KEY),
175            })?;
176        let json = meta_value
177            .value
178            .as_ref()
179            .with_context(|| InvalidParquetSnafu {
180                file: file_path,
181                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
182            })?;
183        let region_metadata = match region_metadata {
184            Some(region_metadata) => region_metadata,
185            None => Arc::new(
186                store_api::metadata::RegionMetadata::from_json(json)
187                    .context(InvalidMetadataSnafu)?,
188            ),
189        };
190        // Keep the previous JSON-byte floor and charge the decoded structures as well.
191        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
192        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
193
194        Ok(Self {
195            parquet_metadata,
196            region_metadata,
197            region_metadata_weight,
198        })
199    }
200
201    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
202        self.parquet_metadata.clone()
203    }
204
205    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
206        self.region_metadata.clone()
207    }
208}
209
210fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
211    let file_metadata = parquet_metadata.file_metadata();
212    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
213        let filtered = key_values
214            .iter()
215            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
216            .cloned()
217            .collect::<Vec<_>>();
218        (!filtered.is_empty()).then_some(filtered)
219    });
220    let stripped_file_metadata = FileMetaData::new(
221        file_metadata.version(),
222        file_metadata.num_rows(),
223        file_metadata.created_by().map(ToString::to_string),
224        filtered_key_values,
225        file_metadata.schema_descr_ptr(),
226        file_metadata.column_orders().cloned(),
227    );
228
229    let mut builder = parquet_metadata.into_builder();
230    let row_groups = builder.take_row_groups();
231    let column_index = builder.take_column_index();
232    let offset_index = builder.take_offset_index();
233
234    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
235        .set_row_groups(row_groups)
236        .set_column_index(column_index)
237        .set_offset_index(offset_index)
238        .build()
239}
240
/// Cache strategies that may only enable a subset of caches.
///
/// Cloning is cheap: the non-[CacheStrategy::Disabled] variants hold an `Arc` to the
/// shared [CacheManager].
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
255
256impl CacheStrategy {
257    /// Gets fused SST metadata with cache metrics tracking.
258    pub(crate) async fn get_sst_meta_data(
259        &self,
260        file_id: RegionFileId,
261        metrics: &mut MetadataCacheMetrics,
262        page_index_policy: PageIndexPolicy,
263    ) -> Option<Arc<CachedSstMeta>> {
264        match self {
265            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
266                cache_manager
267                    .get_sst_meta_data(file_id, metrics, page_index_policy)
268                    .await
269            }
270            CacheStrategy::Disabled => {
271                metrics.cache_miss += 1;
272                None
273            }
274        }
275    }
276
277    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
278    pub(crate) fn get_sst_meta_data_from_mem_cache(
279        &self,
280        file_id: RegionFileId,
281    ) -> Option<Arc<CachedSstMeta>> {
282        match self {
283            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
284                cache_manager.get_sst_meta_data_from_mem_cache(file_id)
285            }
286            CacheStrategy::Disabled => None,
287        }
288    }
289
290    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
291    pub fn get_parquet_meta_data_from_mem_cache(
292        &self,
293        file_id: RegionFileId,
294    ) -> Option<Arc<ParquetMetaData>> {
295        self.get_sst_meta_data_from_mem_cache(file_id)
296            .map(|metadata| metadata.parquet_metadata())
297    }
298
299    /// Calls [CacheManager::put_sst_meta_data()].
300    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
301        match self {
302            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
303                cache_manager.put_sst_meta_data(file_id, metadata);
304            }
305            CacheStrategy::Disabled => {}
306        }
307    }
308
309    /// Calls [CacheManager::put_parquet_meta_data()].
310    pub fn put_parquet_meta_data(
311        &self,
312        file_id: RegionFileId,
313        metadata: Arc<ParquetMetaData>,
314        region_metadata: Option<RegionMetadataRef>,
315    ) {
316        match self {
317            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
318                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
319            }
320            CacheStrategy::Disabled => {}
321        }
322    }
323
324    /// Calls [CacheManager::remove_parquet_meta_data()].
325    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
326        match self {
327            CacheStrategy::EnableAll(cache_manager) => {
328                cache_manager.remove_parquet_meta_data(file_id);
329            }
330            CacheStrategy::Compaction(cache_manager) => {
331                cache_manager.remove_parquet_meta_data(file_id);
332            }
333            CacheStrategy::Disabled => {}
334        }
335    }
336
337    /// Calls [CacheManager::get_repeated_vector()].
338    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
339    pub fn get_repeated_vector(
340        &self,
341        data_type: &ConcreteDataType,
342        value: &Value,
343    ) -> Option<VectorRef> {
344        match self {
345            CacheStrategy::EnableAll(cache_manager) => {
346                cache_manager.get_repeated_vector(data_type, value)
347            }
348            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
349        }
350    }
351
352    /// Calls [CacheManager::put_repeated_vector()].
353    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
354    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
355        if let CacheStrategy::EnableAll(cache_manager) = self {
356            cache_manager.put_repeated_vector(value, vector);
357        }
358    }
359
360    /// Calls [CacheManager::get_pages()].
361    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
362    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
363        match self {
364            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
365            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
366        }
367    }
368
369    /// Calls [CacheManager::put_pages()].
370    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
371    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
372        if let CacheStrategy::EnableAll(cache_manager) = self {
373            cache_manager.put_pages(page_key, pages);
374        }
375    }
376
377    /// Calls [CacheManager::evict_puffin_cache()].
378    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
379        match self {
380            CacheStrategy::EnableAll(cache_manager) => {
381                cache_manager.evict_puffin_cache(file_id).await
382            }
383            CacheStrategy::Compaction(cache_manager) => {
384                cache_manager.evict_puffin_cache(file_id).await
385            }
386            CacheStrategy::Disabled => {}
387        }
388    }
389
390    /// Calls [CacheManager::get_selector_result()].
391    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
392    pub fn get_selector_result(
393        &self,
394        selector_key: &SelectorResultKey,
395    ) -> Option<Arc<SelectorResultValue>> {
396        match self {
397            CacheStrategy::EnableAll(cache_manager) => {
398                cache_manager.get_selector_result(selector_key)
399            }
400            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
401        }
402    }
403
404    /// Calls [CacheManager::put_selector_result()].
405    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
406    pub fn put_selector_result(
407        &self,
408        selector_key: SelectorResultKey,
409        result: Arc<SelectorResultValue>,
410    ) {
411        if let CacheStrategy::EnableAll(cache_manager) = self {
412            cache_manager.put_selector_result(selector_key, result);
413        }
414    }
415
416    /// Calls [CacheManager::get_range_result()].
417    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
418    #[allow(dead_code)]
419    pub(crate) fn get_range_result(
420        &self,
421        key: &RangeScanCacheKey,
422    ) -> Option<Arc<RangeScanCacheValue>> {
423        match self {
424            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
425            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
426        }
427    }
428
429    /// Calls [CacheManager::put_range_result()].
430    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
431    pub(crate) fn put_range_result(
432        &self,
433        key: RangeScanCacheKey,
434        result: Arc<RangeScanCacheValue>,
435    ) {
436        if let CacheStrategy::EnableAll(cache_manager) = self {
437            cache_manager.put_range_result(key, result);
438        }
439    }
440
441    /// Returns true if the range result cache is enabled.
442    pub(crate) fn has_range_result_cache(&self) -> bool {
443        match self {
444            CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
445            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
446        }
447    }
448
449    pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
450        match self {
451            CacheStrategy::EnableAll(cache_manager) => {
452                Some(cache_manager.range_result_memory_limiter())
453            }
454            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
455        }
456    }
457
458    pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
459        match self {
460            CacheStrategy::EnableAll(cache_manager) => {
461                Some(cache_manager.range_result_cache_size())
462            }
463            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
464        }
465    }
466
467    /// Calls [CacheManager::write_cache()].
468    /// It returns None if the strategy is [CacheStrategy::Disabled].
469    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
470        match self {
471            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
472            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
473            CacheStrategy::Disabled => None,
474        }
475    }
476
477    /// Calls [CacheManager::index_cache()].
478    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
479    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
480        match self {
481            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
482            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
483        }
484    }
485
486    /// Calls [CacheManager::bloom_filter_index_cache()].
487    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
488    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
489        match self {
490            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
491            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
492        }
493    }
494
495    /// Calls [CacheManager::vector_index_cache()].
496    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
497    #[cfg(feature = "vector_index")]
498    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
499        match self {
500            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
501            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
502        }
503    }
504
505    /// Calls [CacheManager::puffin_metadata_cache()].
506    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
507    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
508        match self {
509            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
510            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
511        }
512    }
513
514    /// Calls [CacheManager::index_result_cache()].
515    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
516    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
517        match self {
518            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
519            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520        }
521    }
522
523    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
524    pub fn maybe_download_background(
525        &self,
526        index_key: IndexKey,
527        remote_path: String,
528        remote_store: ObjectStore,
529        file_size: u64,
530    ) {
531        if let CacheStrategy::EnableAll(cache_manager) = self
532            && let Some(write_cache) = cache_manager.write_cache()
533        {
534            write_cache.file_cache().maybe_download_background(
535                index_key,
536                remote_path,
537                remote_store,
538                file_size,
539            );
540        }
541    }
542}
543
/// Manages cached data for the engine.
///
/// All caches are disabled by default; the builder enables them selectively.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata (fused parquet footer + decoded region metadata).
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors with repeated values.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A Cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan outputs in flat format.
    range_result_cache: Option<RangeResultCache>,
    /// Configured capacity (in bytes) for range scan outputs in flat format.
    range_result_cache_size: u64,
    /// Shared memory limiter for async range-result cache tasks.
    /// Always present even when the range result cache itself is disabled.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}
577
/// Shared reference-counted handle to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
579
580impl CacheManager {
581    /// Returns a builder to build the cache.
582    pub fn builder() -> CacheManagerBuilder {
583        CacheManagerBuilder::default()
584    }
585
586    /// Gets fused SST metadata with metrics tracking.
587    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
588    pub(crate) async fn get_sst_meta_data(
589        &self,
590        file_id: RegionFileId,
591        metrics: &mut MetadataCacheMetrics,
592        page_index_policy: PageIndexPolicy,
593    ) -> Option<Arc<CachedSstMeta>> {
594        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
595            metrics.mem_cache_hit += 1;
596            return Some(metadata);
597        }
598
599        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
600        if let Some(write_cache) = &self.write_cache
601            && let Some(metadata) = write_cache
602                .file_cache()
603                .get_sst_meta_data(key, metrics, page_index_policy)
604                .await
605        {
606            metrics.file_cache_hit += 1;
607            self.put_sst_meta_data(file_id, metadata.clone());
608            return Some(metadata);
609        }
610
611        metrics.cache_miss += 1;
612        None
613    }
614
615    /// Gets cached [ParquetMetaData] with metrics tracking.
616    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
617    pub(crate) async fn get_parquet_meta_data(
618        &self,
619        file_id: RegionFileId,
620        metrics: &mut MetadataCacheMetrics,
621        page_index_policy: PageIndexPolicy,
622    ) -> Option<Arc<ParquetMetaData>> {
623        self.get_sst_meta_data(file_id, metrics, page_index_policy)
624            .await
625            .map(|metadata| metadata.parquet_metadata())
626    }
627
628    /// Gets cached fused SST metadata from in-memory cache.
629    /// This method does not perform I/O.
630    pub(crate) fn get_sst_meta_data_from_mem_cache(
631        &self,
632        file_id: RegionFileId,
633    ) -> Option<Arc<CachedSstMeta>> {
634        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
635            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
636            update_hit_miss(value, SST_META_TYPE)
637        })
638    }
639
640    /// Gets cached [ParquetMetaData] from in-memory cache.
641    /// This method does not perform I/O.
642    pub fn get_parquet_meta_data_from_mem_cache(
643        &self,
644        file_id: RegionFileId,
645    ) -> Option<Arc<ParquetMetaData>> {
646        self.get_sst_meta_data_from_mem_cache(file_id)
647            .map(|metadata| metadata.parquet_metadata())
648    }
649
650    /// Puts fused SST metadata into the cache.
651    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
652        if let Some(cache) = &self.sst_meta_cache {
653            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
654            CACHE_BYTES
655                .with_label_values(&[SST_META_TYPE])
656                .add(meta_cache_weight(&key, &metadata).into());
657            cache.insert(key, metadata);
658        }
659    }
660
661    /// Puts [ParquetMetaData] into the cache.
662    pub fn put_parquet_meta_data(
663        &self,
664        file_id: RegionFileId,
665        metadata: Arc<ParquetMetaData>,
666        region_metadata: Option<RegionMetadataRef>,
667    ) {
668        if self.sst_meta_cache.is_some() {
669            let file_path = format!(
670                "region_id={}, file_id={}",
671                file_id.region_id(),
672                file_id.file_id()
673            );
674            match CachedSstMeta::try_new_with_region_metadata(
675                &file_path,
676                Arc::unwrap_or_clone(metadata),
677                region_metadata,
678            ) {
679                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
680                Err(err) => warn!(
681                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
682                    file_id.region_id(),
683                    file_id.file_id()
684                ),
685            }
686        }
687    }
688
689    /// Removes [ParquetMetaData] from the cache.
690    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
691        if let Some(cache) = &self.sst_meta_cache {
692            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
693        }
694    }
695
696    /// Returns the total weighted size of the in-memory SST meta cache.
697    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
698        self.sst_meta_cache
699            .as_ref()
700            .map(|cache| cache.weighted_size())
701            .unwrap_or(0)
702    }
703
    /// Returns true if the in-memory SST meta cache is enabled.
    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
        self.sst_meta_cache.is_some()
    }
708
709    /// Gets a vector with repeated value for specific `key`.
710    pub fn get_repeated_vector(
711        &self,
712        data_type: &ConcreteDataType,
713        value: &Value,
714    ) -> Option<VectorRef> {
715        self.vector_cache.as_ref().and_then(|vector_cache| {
716            let value = vector_cache.get(&(data_type.clone(), value.clone()));
717            update_hit_miss(value, VECTOR_TYPE)
718        })
719    }
720
721    /// Puts a vector with repeated value into the cache.
722    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
723        if let Some(cache) = &self.vector_cache {
724            let key = (vector.data_type(), value);
725            CACHE_BYTES
726                .with_label_values(&[VECTOR_TYPE])
727                .add(vector_cache_weight(&key, &vector).into());
728            cache.insert(key, vector);
729        }
730    }
731
732    /// Gets pages for the row group.
733    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
734        self.page_cache.as_ref().and_then(|page_cache| {
735            let value = page_cache.get(page_key);
736            update_hit_miss(value, PAGE_TYPE)
737        })
738    }
739
740    /// Puts pages of the row group into the cache.
741    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
742        if let Some(cache) = &self.page_cache {
743            CACHE_BYTES
744                .with_label_values(&[PAGE_TYPE])
745                .add(page_cache_weight(&page_key, &pages).into());
746            cache.insert(page_key, pages);
747        }
748    }
749
    /// Evicts every puffin-related cache entry for the given file.
    ///
    /// Invalidates the bloom filter, inverted index, index result and (when the
    /// `vector_index` feature is enabled) vector index caches by file id, drops the
    /// puffin metadata entry, then removes the puffin file from the write cache.
    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
        if let Some(cache) = &self.bloom_filter_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        if let Some(cache) = &self.inverted_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        if let Some(cache) = &self.index_result_cache {
            cache.invalidate_file(file_id.file_id());
        }

        #[cfg(feature = "vector_index")]
        if let Some(cache) = &self.vector_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        // The puffin metadata cache is keyed by the stringified index id.
        if let Some(cache) = &self.puffin_metadata_cache {
            cache.remove(&file_id.to_string());
        }

        if let Some(write_cache) = &self.write_cache {
            write_cache
                .remove(IndexKey::new(
                    file_id.region_id(),
                    file_id.file_id(),
                    FileType::Puffin(file_id.version),
                ))
                .await;
        }
    }
783
784    /// Gets result of for the selector.
785    pub fn get_selector_result(
786        &self,
787        selector_key: &SelectorResultKey,
788    ) -> Option<Arc<SelectorResultValue>> {
789        self.selector_result_cache
790            .as_ref()
791            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
792    }
793
794    /// Puts result of the selector into the cache.
795    pub fn put_selector_result(
796        &self,
797        selector_key: SelectorResultKey,
798        result: Arc<SelectorResultValue>,
799    ) {
800        if let Some(cache) = &self.selector_result_cache {
801            CACHE_BYTES
802                .with_label_values(&[SELECTOR_RESULT_TYPE])
803                .add(selector_result_cache_weight(&selector_key, &result).into());
804            cache.insert(selector_key, result);
805        }
806    }
807
808    /// Gets cached result for range scan.
809    #[allow(dead_code)]
810    pub(crate) fn get_range_result(
811        &self,
812        key: &RangeScanCacheKey,
813    ) -> Option<Arc<RangeScanCacheValue>> {
814        self.range_result_cache
815            .as_ref()
816            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
817    }
818
819    /// Puts range scan result into cache.
820    pub(crate) fn put_range_result(
821        &self,
822        key: RangeScanCacheKey,
823        result: Arc<RangeScanCacheValue>,
824    ) {
825        if let Some(cache) = &self.range_result_cache {
826            CACHE_BYTES
827                .with_label_values(&[RANGE_RESULT_TYPE])
828                .add(range_result_cache_weight(&key, &result).into());
829            cache.insert(key, result);
830        }
831    }
832
833    /// Returns true if the range result cache is enabled.
834    pub(crate) fn has_range_result_cache(&self) -> bool {
835        self.range_result_cache.is_some()
836    }
837
    /// Returns the memory limiter that bounds concurrent range-result
    /// concatenation memory.
    pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
        &self.range_result_memory_limiter
    }

    /// Returns the configured capacity of the range result cache in bytes.
    pub(crate) fn range_result_cache_size(&self) -> usize {
        self.range_result_cache_size as usize
    }

    /// Gets the write cache.
    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
        self.write_cache.as_ref()
    }

    /// Gets the inverted index cache.
    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
        self.inverted_index_cache.as_ref()
    }

    /// Gets the bloom filter index cache.
    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
        self.bloom_filter_index_cache.as_ref()
    }

    /// Gets the vector index cache.
    #[cfg(feature = "vector_index")]
    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
        self.vector_index_cache.as_ref()
    }

    /// Gets the puffin metadata cache.
    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
        self.puffin_metadata_cache.as_ref()
    }

    /// Gets the index result cache.
    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
        self.index_result_cache.as_ref()
    }
871}
872
/// Increases selector cache miss metrics.
///
/// Exposed because [CacheManager::get_selector_result] does not record
/// hit/miss itself; callers report the outcome explicitly.
pub fn selector_result_cache_miss() {
    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}

/// Increases selector cache hit metrics.
pub fn selector_result_cache_hit() {
    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}
882
/// Builder to construct a [CacheManager].
///
/// All sizes are in bytes. For the moka-backed caches a size of 0 disables
/// that cache (see [CacheManagerBuilder::build]).
#[derive(Default)]
pub struct CacheManagerBuilder {
    // Capacity of the SST (parquet) metadata cache.
    sst_meta_cache_size: u64,
    // Capacity of the repeated-vector cache.
    vector_cache_size: u64,
    // Capacity of the row-group page cache.
    page_cache_size: u64,
    // Capacity for index metadata (shared by inverted and bloom filter index).
    index_metadata_size: u64,
    // Capacity for index content (shared by inverted and bloom filter index).
    index_content_size: u64,
    // Page size used by the index content caches.
    index_content_page_size: u64,
    // Capacity of the index result cache.
    index_result_cache_size: u64,
    // Capacity of the puffin metadata cache.
    puffin_metadata_size: u64,
    // Optional write cache, constructed elsewhere and passed in.
    write_cache: Option<WriteCacheRef>,
    // Capacity of the selector result cache.
    selector_result_cache_size: u64,
    // Capacity of the range scan result cache (also sizes its memory limiter).
    range_result_cache_size: u64,
}
898
impl CacheManagerBuilder {
    /// Sets meta cache size.
    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
        self.sst_meta_cache_size = bytes;
        self
    }

    /// Sets vector cache size.
    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
        self.vector_cache_size = bytes;
        self
    }

    /// Sets page cache size.
    pub fn page_cache_size(mut self, bytes: u64) -> Self {
        self.page_cache_size = bytes;
        self
    }

    /// Sets write cache.
    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
        self.write_cache = cache;
        self
    }

    /// Sets cache size for index metadata.
    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
        self.index_metadata_size = bytes;
        self
    }

    /// Sets cache size for index content.
    pub fn index_content_size(mut self, bytes: u64) -> Self {
        self.index_content_size = bytes;
        self
    }

    /// Sets page size for index content.
    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
        self.index_content_page_size = bytes;
        self
    }

    /// Sets cache size for index result.
    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
        self.index_result_cache_size = bytes;
        self
    }

    /// Sets cache size for puffin metadata.
    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
        self.puffin_metadata_size = bytes;
        self
    }

    /// Sets selector result cache size.
    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
        self.selector_result_cache_size = bytes;
        self
    }

    /// Sets range result cache size.
    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
        self.range_result_cache_size = bytes;
        self
    }

    /// Builds the [CacheManager].
    ///
    /// Moka-backed caches with a configured size of 0 are created as `None`
    /// (disabled). Every eviction listener subtracts the evicted entry's
    /// weight from `CACHE_BYTES` (mirroring the `put_*` methods that add it)
    /// and bumps the per-cause eviction counter.
    pub fn build(self) -> CacheManager {
        // Maps a moka removal cause to its metric label.
        fn to_str(cause: RemovalCause) -> &'static str {
            match cause {
                RemovalCause::Expired => "expired",
                RemovalCause::Explicit => "explicit",
                RemovalCause::Replaced => "replaced",
                RemovalCause::Size => "size",
            }
        }

        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.sst_meta_cache_size)
                .weigher(meta_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = meta_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SST_META_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let vector_cache = (self.vector_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.vector_cache_size)
                .weigher(vector_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = vector_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[VECTOR_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let page_cache = (self.page_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.page_cache_size)
                .weigher(page_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = page_cache_weight(&k, &v);
                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let inverted_index_cache = InvertedIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
        let bloom_filter_index_cache = BloomFilterIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        // The vector index cache reuses the index content size and is only
        // enabled when that size is non-zero.
        #[cfg(feature = "vector_index")]
        let vector_index_cache = (self.index_content_size != 0)
            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
        let index_result_cache = (self.index_result_cache_size != 0)
            .then(|| IndexResultCache::new(self.index_result_cache_size));
        let puffin_metadata_cache =
            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.selector_result_cache_size)
                .weigher(selector_result_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = selector_result_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SELECTOR_RESULT_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.range_result_cache_size)
                .weigher(range_result_cache_weight)
                .eviction_listener(move |k, v, cause| {
                    let size = range_result_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[RANGE_RESULT_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        CacheManager {
            sst_meta_cache,
            vector_cache,
            page_cache,
            write_cache: self.write_cache,
            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
            #[cfg(feature = "vector_index")]
            vector_index_cache,
            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
            selector_result_cache,
            range_result_cache,
            range_result_cache_size: self.range_result_cache_size,
            // The limiter shares the cache's byte budget, split into permits
            // of RANGE_RESULT_CONCAT_MEMORY_PERMIT bytes each.
            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
                self.range_result_cache_size as usize,
                RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
            )),
            index_result_cache,
        }
    }
}
1089
1090fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1091    // We ignore the size of `Arc`.
1092    (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1093}
1094
1095fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1096    // We ignore the heap size of `Value`.
1097    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1098}
1099
1100fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1101    (k.estimated_size() + v.estimated_size()) as u32
1102}
1103
1104fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1105    (mem::size_of_val(k) + v.estimated_size()) as u32
1106}
1107
1108fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1109    (k.estimated_size() + v.estimated_size()) as u32
1110}
1111
1112/// Updates cache hit/miss metrics.
1113fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1114    if value.is_some() {
1115        CACHE_HIT.with_label_values(&[cache_type]).inc();
1116    } else {
1117        CACHE_MISS.with_label_values(&[cache_type]).inc();
1118    }
1119    value
1120}
1121
/// Cache key (region id, file id) for SST meta.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);

impl SstMetaKey {
    /// Returns memory used by the key (estimated).
    ///
    /// Estimated as the struct size; heap data (if any) behind the ids is
    /// not counted.
    fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
    }
}
1132
/// Path to column pages in the SST file.
///
/// Identifies a single column chunk: (region, file, row group, column).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1145
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
1159
1160impl PageKey {
1161    /// Creates a key for a list of pages.
1162    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1163        PageKey {
1164            file_id,
1165            row_group_idx,
1166            ranges,
1167        }
1168    }
1169
1170    /// Returns memory used by the key (estimated).
1171    fn estimated_size(&self) -> usize {
1172        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1173    }
1174}
1175
/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed page in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    pub page_size: u64,
}
1185
1186impl PageValue {
1187    /// Creates a new value from a range of compressed pages.
1188    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1189        PageValue {
1190            compressed: bytes,
1191            page_size,
1192        }
1193    }
1194
1195    /// Returns memory used by the value (estimated).
1196    fn estimated_size(&self) -> usize {
1197        mem::size_of::<Self>()
1198            + self.page_size as usize
1199            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1200    }
1201}
1202
/// Cache key for time series row selector result.
///
/// One entry per (file, row group, selector) combination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1213
1214/// Result stored in the selector result cache.
/// Result stored in the selector result cache.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1221
1222/// Cached result for time series row selector.
1223pub struct SelectorResultValue {
1224    /// Batches of rows selected by the selector.
1225    pub result: SelectorResult,
1226    /// Projection of rows.
1227    pub projection: Vec<usize>,
1228}
1229
1230impl SelectorResultValue {
1231    /// Creates a new selector result value with primary key format.
1232    pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1233        SelectorResultValue {
1234            result: SelectorResult::PrimaryKey(result),
1235            projection,
1236        }
1237    }
1238
1239    /// Creates a new selector result value with flat format.
1240    pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1241        SelectorResultValue {
1242            result: SelectorResult::Flat(result),
1243            projection,
1244        }
1245    }
1246
1247    /// Returns memory used by the value (estimated).
1248    fn estimated_size(&self) -> usize {
1249        match &self.result {
1250            SelectorResult::PrimaryKey(batches) => {
1251                batches.iter().map(|batch| batch.memory_size()).sum()
1252            }
1253            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1254        }
1255    }
1256}
1257
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group, page ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1270
1271#[cfg(test)]
1272mod tests {
1273    use std::sync::Arc;
1274
1275    use api::v1::SemanticType;
1276    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1277    use datatypes::schema::ColumnSchema;
1278    use datatypes::vectors::Int64Vector;
1279    use puffin::file_metadata::FileMetadata;
1280    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1281    use store_api::storage::ColumnId;
1282
1283    use super::*;
1284    use crate::cache::index::bloom_filter_index::Tag;
1285    use crate::cache::index::result_cache::PredicateKey;
1286    use crate::cache::test_util::{
1287        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1288    };
1289    use crate::read::range_cache::{
1290        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1291    };
1292    use crate::sst::parquet::row_selection::RowGroupSelection;
1293
    // A default CacheManager has every cache disabled: puts are no-ops and
    // all gets return `None`.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }
1329
    // Put/get/remove round trip for the parquet metadata cache; also checks
    // that the cached metadata has the region-metadata key-value stripped.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }
1368
    // When region metadata is provided at put time, the cached entry reuses
    // that exact Arc instead of decoding its own copy.
    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }
1385
    // The SST meta weigher must include the decoded region metadata, which
    // is larger than its JSON encoding for a wide schema.
    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }
1404
    // Put/get round trip for the repeated-vector cache.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }
1421
    // Put/get round trip for the page cache.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }
1432
    // Put/get round trip for the selector result cache.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }
1449
    // Range result cache round trip, plus CacheStrategy delegation:
    // EnableAll reads/writes, Compaction writes but reads nothing,
    // Disabled neither reads nor writes.
    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }
1493
    // The range result cache size must also size the memory limiter:
    // permits = cache_size / permit_bytes (rounded up).
    #[test]
    fn test_range_result_cache_size_configures_limiter() {
        let cache_size = 3 * 1024_u64;
        let cache = CacheManager::builder()
            .range_result_cache_size(cache_size)
            .build();

        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
        assert_eq!(
            cache.range_result_memory_limiter().permit_bytes(),
            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
        );
        assert_eq!(
            cache.range_result_memory_limiter().available_permits(),
            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
        );
    }
1511
    // A request larger than the limiter's total capacity must fail fast
    // and leave the permit count untouched.
    #[tokio::test]
    async fn range_result_memory_limiter_rejects_oversized_request() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        assert_eq!(limiter.available_permits(), 2);

        let err = limiter.acquire(10 * 1024).await.unwrap_err();
        assert!(
            err.to_string().contains("exceeds limiter capacity"),
            "unexpected error: {err}"
        );
        assert_eq!(limiter.available_permits(), 2);
    }
1524
    // Acquiring exactly the full capacity succeeds, and dropping the permit
    // returns all permits to the limiter.
    #[tokio::test]
    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        let permit = limiter.acquire(2 * 1024).await.unwrap();
        assert_eq!(limiter.available_permits(), 0);
        drop(permit);
        assert_eq!(limiter.available_permits(), 2);
    }
1533
    // evict_puffin_cache must clear bloom filter, inverted index, index
    // result, and puffin metadata entries for the file — both when called
    // directly and when delegated through CacheStrategy::EnableAll.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        // Populate one entry in every puffin-related cache.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill caches and evict via CacheStrategy to ensure delegation works.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }
1628
1629    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
1630        let region_id = RegionId::new(1024, 7);
1631        let mut builder = RegionMetadataBuilder::new(region_id);
1632        let mut primary_key = Vec::new();
1633
1634        for column_id in 0..column_count {
1635            let semantic_type = if column_id < 32 {
1636                primary_key.push(column_id);
1637                SemanticType::Tag
1638            } else {
1639                SemanticType::Field
1640            };
1641            let mut column_schema = ColumnSchema::new(
1642                format!("wide_column_{column_id}"),
1643                ConcreteDataType::string_datatype(),
1644                true,
1645            );
1646            column_schema
1647                .mut_metadata()
1648                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
1649            builder.push_column_metadata(ColumnMetadata {
1650                column_schema,
1651                semantic_type,
1652                column_id,
1653            });
1654        }
1655
1656        builder.push_column_metadata(ColumnMetadata {
1657            column_schema: ColumnSchema::new(
1658                "ts",
1659                ConcreteDataType::timestamp_millisecond_datatype(),
1660                false,
1661            ),
1662            semantic_type: SemanticType::Timestamp,
1663            column_id: column_count,
1664        });
1665        builder.primary_key(primary_key);
1666
1667        builder.build().unwrap()
1668    }
1669}