Skip to main content

mito2/
cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::collections::{BTreeMap, HashMap};
27use std::mem;
28use std::ops::Range;
29use std::sync::{Arc, RwLock};
30
31use bytes::Bytes;
32use common_base::readable_size::ReadableSize;
33use common_telemetry::warn;
34use datatypes::arrow::buffer::BooleanBuffer;
35use datatypes::arrow::record_batch::RecordBatch;
36use datatypes::value::Value;
37use datatypes::vectors::VectorRef;
38use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
39use index::result_cache::IndexResultCache;
40use moka::notification::RemovalCause;
41use moka::sync::Cache;
42use object_store::ObjectStore;
43use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
44use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
45use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
46use smallvec::SmallVec;
47use snafu::{OptionExt, ResultExt};
48use store_api::metadata::RegionMetadataRef;
49use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
50
51use crate::cache::cache_size::parquet_meta_size;
52use crate::cache::file_cache::{FileType, IndexKey};
53use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
54#[cfg(feature = "vector_index")]
55use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
56use crate::cache::write_cache::WriteCacheRef;
57use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
58use crate::memtable::record_batch_estimated_size;
59use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
60use crate::read::Batch;
61use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
62use crate::sst::file::{RegionFileId, RegionIndexId};
63use crate::sst::parquet::PARQUET_METADATA_KEY;
64use crate::sst::parquet::read_columns::ParquetReadColumns;
65use crate::sst::parquet::reader::MetadataCacheMetrics;
66
67/// Metrics type key for sst meta.
68const SST_META_TYPE: &str = "sst_meta";
69/// Metrics type key for vector.
70const VECTOR_TYPE: &str = "vector";
71/// Metrics type key for pages.
72const PAGE_TYPE: &str = "page";
73/// Metrics type key for files on the local store.
74const FILE_TYPE: &str = "file";
75/// Metrics type key for index files (puffin) on the local store.
76const INDEX_TYPE: &str = "index";
77/// Metrics type key for selector result cache.
78const SELECTOR_RESULT_TYPE: &str = "selector_result";
79/// Metrics type key for range scan result cache.
80const RANGE_RESULT_TYPE: &str = "range_result";
81/// Metrics type key for prefilter result cache.
82const PREFILTER_RESULT_TYPE: &str = "prefilter_result";
83const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
84const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
85
86#[derive(Debug)]
87pub(crate) struct RangeResultMemoryLimiter {
88    semaphore: Arc<tokio::sync::Semaphore>,
89    permit_bytes: usize,
90    total_permits: usize,
91}
92
93impl Default for RangeResultMemoryLimiter {
94    fn default() -> Self {
95        Self::new(
96            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
97            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
98        )
99    }
100}
101
102impl RangeResultMemoryLimiter {
103    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
104        let permit_bytes = permit_bytes.max(1);
105        let total_permits = limit_bytes
106            .div_ceil(permit_bytes)
107            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
108        Self {
109            semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
110            permit_bytes,
111            total_permits,
112        }
113    }
114
115    #[cfg(test)]
116    pub(crate) fn permit_bytes(&self) -> usize {
117        self.permit_bytes
118    }
119
120    #[cfg(test)]
121    pub(crate) fn available_permits(&self) -> usize {
122        self.semaphore.available_permits()
123    }
124
125    pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
126        let permits = bytes.div_ceil(self.permit_bytes).max(1);
127        if permits > self.total_permits {
128            return UnexpectedSnafu {
129                reason: format!(
130                    "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
131                    self.total_permits.saturating_mul(self.permit_bytes)
132                ),
133            }
134            .fail();
135        }
136        self.semaphore
137            .acquire_many(permits as u32)
138            .await
139            .map_err(|_| {
140                UnexpectedSnafu {
141                    reason: "range result memory limiter is unexpectedly closed",
142                }
143                .build()
144            })
145    }
146}
147
148/// Cached SST metadata combines the parquet footer with the decoded region metadata.
149///
150/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
151/// [RegionMetadata] separately so readers can skip repeated deserialization work.
152#[derive(Debug)]
153pub(crate) struct CachedSstMeta {
154    parquet_metadata: Arc<ParquetMetaData>,
155    region_metadata: RegionMetadataRef,
156    region_metadata_weight: usize,
157    page_index_policy: PageIndexPolicy,
158}
159
160impl CachedSstMeta {
161    #[cfg(test)]
162    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
163        let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
164        Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
165    }
166
167    pub(crate) fn try_new_with_region_metadata(
168        file_path: &str,
169        parquet_metadata: ParquetMetaData,
170        region_metadata: Option<RegionMetadataRef>,
171    ) -> Result<Self> {
172        let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
173        Self::try_new_with_page_index_policy(
174            file_path,
175            parquet_metadata,
176            region_metadata,
177            page_index_policy,
178        )
179    }
180
181    pub(crate) fn try_new_with_page_index_policy(
182        file_path: &str,
183        parquet_metadata: ParquetMetaData,
184        region_metadata: Option<RegionMetadataRef>,
185        page_index_policy: PageIndexPolicy,
186    ) -> Result<Self> {
187        let file_metadata = parquet_metadata.file_metadata();
188        let key_values = file_metadata
189            .key_value_metadata()
190            .context(InvalidParquetSnafu {
191                file: file_path,
192                reason: "missing key value meta",
193            })?;
194        let meta_value = key_values
195            .iter()
196            .find(|kv| kv.key == PARQUET_METADATA_KEY)
197            .with_context(|| InvalidParquetSnafu {
198                file: file_path,
199                reason: format!("key {} not found", PARQUET_METADATA_KEY),
200            })?;
201        let json = meta_value
202            .value
203            .as_ref()
204            .with_context(|| InvalidParquetSnafu {
205                file: file_path,
206                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
207            })?;
208        let region_metadata = match region_metadata {
209            Some(region_metadata) => region_metadata,
210            None => Arc::new(
211                store_api::metadata::RegionMetadata::from_json(json)
212                    .context(InvalidMetadataSnafu)?,
213            ),
214        };
215        // Keep the previous JSON-byte floor and charge the decoded structures as well.
216        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
217        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
218
219        Ok(Self {
220            parquet_metadata,
221            region_metadata,
222            region_metadata_weight,
223            page_index_policy,
224        })
225    }
226
227    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
228        self.parquet_metadata.clone()
229    }
230
231    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
232        self.region_metadata.clone()
233    }
234
235    fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
236        match requested {
237            PageIndexPolicy::Skip => true,
238            PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
239            PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
240        }
241    }
242}
243
244fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
245    if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
246        PageIndexPolicy::Optional
247    } else {
248        PageIndexPolicy::Skip
249    }
250}
251
252fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
253    let file_metadata = parquet_metadata.file_metadata();
254    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
255        let filtered = key_values
256            .iter()
257            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
258            .cloned()
259            .collect::<Vec<_>>();
260        (!filtered.is_empty()).then_some(filtered)
261    });
262    let stripped_file_metadata = FileMetaData::new(
263        file_metadata.version(),
264        file_metadata.num_rows(),
265        file_metadata.created_by().map(ToString::to_string),
266        filtered_key_values,
267        file_metadata.schema_descr_ptr(),
268        file_metadata.column_orders().cloned(),
269    );
270
271    let mut builder = parquet_metadata.into_builder();
272    let row_groups = builder.take_row_groups();
273    let column_index = builder.take_column_index();
274    let offset_index = builder.take_offset_index();
275
276    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
277        .set_row_groups(row_groups)
278        .set_column_index(column_index)
279        .set_offset_index(offset_index)
280        .build()
281}
282
283fn removal_cause_str(cause: RemovalCause) -> &'static str {
284    match cause {
285        RemovalCause::Expired => "expired",
286        RemovalCause::Explicit => "explicit",
287        RemovalCause::Replaced => "replaced",
288        RemovalCause::Size => "size",
289    }
290}
291
292#[derive(Debug, Clone, PartialEq, Eq, Hash)]
293pub(crate) struct PrefilterRowSelector {
294    row_count: usize,
295    skip: bool,
296}
297
298// `parquet::arrow::arrow_reader::RowSelector` does not implement `Hash`, but
299// prefilter cache keys must hash the upstream row-selection snapshot. Keep a
300// local hashable mirror of the two fields that define selector semantics.
301// TODO(yingwen): Remove this mirror if upstream `RowSelector` implements `Hash`.
302impl From<&RowSelector> for PrefilterRowSelector {
303    fn from(selector: &RowSelector) -> Self {
304        Self {
305            row_count: selector.row_count,
306            skip: selector.skip,
307        }
308    }
309}
310
311/// Key for a cached prefilter result.
312#[derive(Debug, Clone, PartialEq, Eq, Hash)]
313pub(crate) struct PrefilterKey {
314    file_id: FileId,
315    row_group_idx: u32,
316    row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
317    schema_version: u64,
318    filter_exprs: SmallVec<[String; 1]>,
319    mem_usage: usize,
320}
321
322impl PrefilterKey {
323    pub(crate) fn row_selection_snapshot(
324        row_selection: Option<&RowSelection>,
325    ) -> Option<Arc<Vec<PrefilterRowSelector>>> {
326        row_selection.map(|selection| {
327            Arc::new(
328                selection
329                    .iter()
330                    .map(PrefilterRowSelector::from)
331                    .collect::<Vec<_>>(),
332            )
333        })
334    }
335
336    pub(crate) fn new(
337        file_id: FileId,
338        row_group_idx: u32,
339        row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
340        schema_version: u64,
341        filter_exprs: SmallVec<[String; 1]>,
342    ) -> Self {
343        let row_selection_bytes = row_selection
344            .as_ref()
345            .map(|selection| selection.len() * mem::size_of::<PrefilterRowSelector>())
346            .unwrap_or(0);
347        let spilled_expr_bytes = if filter_exprs.spilled() {
348            filter_exprs.capacity() * mem::size_of::<String>()
349        } else {
350            0
351        };
352        let expr_bytes = filter_exprs.iter().map(|s| s.capacity()).sum::<usize>();
353
354        Self {
355            file_id,
356            row_group_idx,
357            row_selection,
358            schema_version,
359            filter_exprs,
360            mem_usage: mem::size_of::<Self>()
361                + row_selection_bytes
362                + spilled_expr_bytes
363                + expr_bytes,
364        }
365    }
366
367    fn mem_usage(&self) -> usize {
368        self.mem_usage
369    }
370}
371
372type PrefilterResultCache = Cache<PrefilterKey, Arc<BooleanBuffer>>;
373
374fn new_prefilter_result_cache(capacity: u64) -> PrefilterResultCache {
375    Cache::builder()
376        .max_capacity(capacity)
377        .weigher(prefilter_result_cache_weight)
378        .eviction_listener(|k, v, cause| {
379            let size = prefilter_result_cache_weight(&k, &v);
380            CACHE_BYTES
381                .with_label_values(&[PREFILTER_RESULT_TYPE])
382                .sub(size.into());
383            CACHE_EVICTION
384                .with_label_values(&[PREFILTER_RESULT_TYPE, removal_cause_str(cause)])
385                .inc();
386        })
387        .build()
388}
389
390fn prefilter_result_cache_weight(k: &PrefilterKey, v: &Arc<BooleanBuffer>) -> u32 {
391    (k.mem_usage() + mem::size_of::<BooleanBuffer>() + v.values().len()) as u32
392}
393
394/// Cache strategies that may only enable a subset of caches.
395#[derive(Clone)]
396pub enum CacheStrategy {
397    /// Strategy for normal operations.
398    /// Doesn't disable any cache.
399    EnableAll(CacheManagerRef),
400    /// Strategy for compaction.
401    /// Disables some caches during compaction to avoid affecting queries.
402    /// Enables the write cache so that the compaction can read files cached
403    /// in the write cache and write the compacted files back to the write cache.
404    Compaction(CacheManagerRef),
405    /// Do not use any cache.
406    Disabled,
407}
408
409impl CacheStrategy {
410    /// Gets fused SST metadata with cache metrics tracking.
411    pub(crate) async fn get_sst_meta_data(
412        &self,
413        file_id: RegionFileId,
414        metrics: &mut MetadataCacheMetrics,
415        page_index_policy: PageIndexPolicy,
416    ) -> Option<Arc<CachedSstMeta>> {
417        match self {
418            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
419                cache_manager
420                    .get_sst_meta_data(file_id, metrics, page_index_policy)
421                    .await
422            }
423            CacheStrategy::Disabled => {
424                metrics.cache_miss += 1;
425                None
426            }
427        }
428    }
429
430    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
431    pub(crate) fn get_sst_meta_data_from_mem_cache(
432        &self,
433        file_id: RegionFileId,
434        page_index_policy: PageIndexPolicy,
435    ) -> Option<Arc<CachedSstMeta>> {
436        match self {
437            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
438                cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
439            }
440            CacheStrategy::Disabled => None,
441        }
442    }
443
444    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
445    pub fn get_parquet_meta_data_from_mem_cache(
446        &self,
447        file_id: RegionFileId,
448    ) -> Option<Arc<ParquetMetaData>> {
449        self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
450            .map(|metadata| metadata.parquet_metadata())
451    }
452
453    /// Calls [CacheManager::put_sst_meta_data()].
454    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
455        match self {
456            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
457                cache_manager.put_sst_meta_data(file_id, metadata);
458            }
459            CacheStrategy::Disabled => {}
460        }
461    }
462
463    /// Calls [CacheManager::put_parquet_meta_data()].
464    pub fn put_parquet_meta_data(
465        &self,
466        file_id: RegionFileId,
467        metadata: Arc<ParquetMetaData>,
468        region_metadata: Option<RegionMetadataRef>,
469    ) {
470        match self {
471            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
472                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
473            }
474            CacheStrategy::Disabled => {}
475        }
476    }
477
478    /// Calls [CacheManager::get_prefilter_result()].
479    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
480    pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
481        match self {
482            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_prefilter_result(key),
483            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
484        }
485    }
486
487    /// Calls [CacheManager::put_prefilter_result()].
488    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
489    pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
490        if let CacheStrategy::EnableAll(cache_manager) = self {
491            cache_manager.put_prefilter_result(key, result);
492        }
493    }
494
495    /// Calls [CacheManager::remove_parquet_meta_data()].
496    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
497        match self {
498            CacheStrategy::EnableAll(cache_manager) => {
499                cache_manager.remove_parquet_meta_data(file_id);
500            }
501            CacheStrategy::Compaction(cache_manager) => {
502                cache_manager.remove_parquet_meta_data(file_id);
503            }
504            CacheStrategy::Disabled => {}
505        }
506    }
507
508    /// Calls [CacheManager::get_repeated_vector()].
509    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
510    pub fn get_repeated_vector(
511        &self,
512        data_type: &ConcreteDataType,
513        value: &Value,
514    ) -> Option<VectorRef> {
515        match self {
516            CacheStrategy::EnableAll(cache_manager) => {
517                cache_manager.get_repeated_vector(data_type, value)
518            }
519            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520        }
521    }
522
523    /// Calls [CacheManager::put_repeated_vector()].
524    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
525    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
526        if let CacheStrategy::EnableAll(cache_manager) = self {
527            cache_manager.put_repeated_vector(value, vector);
528        }
529    }
530
531    /// Calls [CacheManager::get_page_ranges()].
532    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
533    pub fn get_page_ranges(
534        &self,
535        file_id: FileId,
536        row_group_idx: usize,
537        ranges: &[Range<u64>],
538    ) -> Option<PageRangeLookup> {
539        match self {
540            CacheStrategy::EnableAll(cache_manager) => {
541                cache_manager.get_page_ranges(file_id, row_group_idx, ranges)
542            }
543            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
544        }
545    }
546
547    /// Calls [CacheManager::put_page_ranges()].
548    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
549    pub fn put_page_ranges(
550        &self,
551        file_id: FileId,
552        row_group_idx: usize,
553        ranges: &[Range<u64>],
554        pages: &[Bytes],
555    ) {
556        if let CacheStrategy::EnableAll(cache_manager) = self {
557            cache_manager.put_page_ranges(file_id, row_group_idx, ranges, pages);
558        }
559    }
560
561    /// Calls [CacheManager::evict_puffin_cache()].
562    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
563        match self {
564            CacheStrategy::EnableAll(cache_manager) => {
565                cache_manager.evict_puffin_cache(file_id).await
566            }
567            CacheStrategy::Compaction(cache_manager) => {
568                cache_manager.evict_puffin_cache(file_id).await
569            }
570            CacheStrategy::Disabled => {}
571        }
572    }
573
574    /// Calls [CacheManager::get_selector_result()].
575    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
576    pub fn get_selector_result(
577        &self,
578        selector_key: &SelectorResultKey,
579    ) -> Option<Arc<SelectorResultValue>> {
580        match self {
581            CacheStrategy::EnableAll(cache_manager) => {
582                cache_manager.get_selector_result(selector_key)
583            }
584            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
585        }
586    }
587
588    /// Calls [CacheManager::put_selector_result()].
589    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
590    pub fn put_selector_result(
591        &self,
592        selector_key: SelectorResultKey,
593        result: Arc<SelectorResultValue>,
594    ) {
595        if let CacheStrategy::EnableAll(cache_manager) = self {
596            cache_manager.put_selector_result(selector_key, result);
597        }
598    }
599
600    /// Calls [CacheManager::get_range_result()].
601    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
602    #[allow(dead_code)]
603    pub(crate) fn get_range_result(
604        &self,
605        key: &RangeScanCacheKey,
606    ) -> Option<Arc<RangeScanCacheValue>> {
607        match self {
608            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
609            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
610        }
611    }
612
613    /// Calls [CacheManager::put_range_result()].
614    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
615    pub(crate) fn put_range_result(
616        &self,
617        key: RangeScanCacheKey,
618        result: Arc<RangeScanCacheValue>,
619    ) {
620        if let CacheStrategy::EnableAll(cache_manager) = self {
621            cache_manager.put_range_result(key, result);
622        }
623    }
624
625    /// Returns true if the range result cache is enabled.
626    pub(crate) fn has_range_result_cache(&self) -> bool {
627        match self {
628            CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
629            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
630        }
631    }
632
633    pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
634        match self {
635            CacheStrategy::EnableAll(cache_manager) => {
636                Some(cache_manager.range_result_memory_limiter())
637            }
638            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
639        }
640    }
641
642    pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
643        match self {
644            CacheStrategy::EnableAll(cache_manager) => {
645                Some(cache_manager.range_result_cache_size())
646            }
647            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
648        }
649    }
650
651    /// Calls [CacheManager::write_cache()].
652    /// It returns None if the strategy is [CacheStrategy::Disabled].
653    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
654        match self {
655            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
656            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
657            CacheStrategy::Disabled => None,
658        }
659    }
660
661    /// Calls [CacheManager::index_cache()].
662    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
663    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
664        match self {
665            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
666            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
667        }
668    }
669
670    /// Calls [CacheManager::bloom_filter_index_cache()].
671    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
672    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
673        match self {
674            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
675            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
676        }
677    }
678
679    /// Calls [CacheManager::vector_index_cache()].
680    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
681    #[cfg(feature = "vector_index")]
682    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
683        match self {
684            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
685            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
686        }
687    }
688
689    /// Calls [CacheManager::puffin_metadata_cache()].
690    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
691    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
692        match self {
693            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
694            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
695        }
696    }
697
698    /// Calls [CacheManager::index_result_cache()].
699    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
700    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
701        match self {
702            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
703            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
704        }
705    }
706
707    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
708    pub fn maybe_download_background(
709        &self,
710        index_key: IndexKey,
711        remote_path: String,
712        remote_store: ObjectStore,
713        file_size: u64,
714    ) {
715        if let CacheStrategy::EnableAll(cache_manager) = self
716            && let Some(write_cache) = cache_manager.write_cache()
717        {
718            write_cache.file_cache().maybe_download_background(
719                index_key,
720                remote_path,
721                remote_store,
722                file_size,
723            );
724        }
725    }
726}
727
728/// Manages cached data for the engine.
729///
730/// All caches are disabled by default.
731#[derive(Default)]
732pub struct CacheManager {
733    /// Cache for SST metadata.
734    sst_meta_cache: Option<SstMetaCache>,
735    /// Cache for vectors.
736    vector_cache: Option<VectorCache>,
737    /// Cache for SST byte ranges.
738    page_cache: Option<Arc<PageRangeCache>>,
739    /// A Cache for writing files to object stores.
740    write_cache: Option<WriteCacheRef>,
741    /// Cache for inverted index.
742    inverted_index_cache: Option<InvertedIndexCacheRef>,
743    /// Cache for bloom filter index.
744    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
745    /// Cache for vector index.
746    #[cfg(feature = "vector_index")]
747    vector_index_cache: Option<VectorIndexCacheRef>,
748    /// Puffin metadata cache.
749    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
750    /// Cache for time series selectors.
751    selector_result_cache: Option<SelectorResultCache>,
752    /// Cache for range scan outputs in flat format.
753    range_result_cache: Option<RangeResultCache>,
754    /// Configured capacity for range scan outputs in flat format.
755    range_result_cache_size: u64,
756    /// Shared memory limiter for async range-result cache tasks.
757    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
758    /// Cache for index result.
759    index_result_cache: Option<IndexResultCache>,
760    /// Cache for prefilter result.
761    prefilter_result_cache: Option<PrefilterResultCache>,
762}
763
764pub type CacheManagerRef = Arc<CacheManager>;
765
766impl CacheManager {
767    /// Returns a builder to build the cache.
768    pub fn builder() -> CacheManagerBuilder {
769        CacheManagerBuilder::default()
770    }
771
772    /// Gets fused SST metadata with metrics tracking.
773    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
774    pub(crate) async fn get_sst_meta_data(
775        &self,
776        file_id: RegionFileId,
777        metrics: &mut MetadataCacheMetrics,
778        page_index_policy: PageIndexPolicy,
779    ) -> Option<Arc<CachedSstMeta>> {
780        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
781            metrics.mem_cache_hit += 1;
782            return Some(metadata);
783        }
784
785        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
786        if let Some(write_cache) = &self.write_cache
787            && let Some(metadata) = write_cache
788                .file_cache()
789                .get_sst_meta_data(key, metrics, page_index_policy)
790                .await
791        {
792            metrics.file_cache_hit += 1;
793            self.put_sst_meta_data(file_id, metadata.clone());
794            return Some(metadata);
795        }
796
797        metrics.cache_miss += 1;
798        None
799    }
800
801    /// Gets cached [ParquetMetaData] with metrics tracking.
802    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
803    pub(crate) async fn get_parquet_meta_data(
804        &self,
805        file_id: RegionFileId,
806        metrics: &mut MetadataCacheMetrics,
807        page_index_policy: PageIndexPolicy,
808    ) -> Option<Arc<ParquetMetaData>> {
809        self.get_sst_meta_data(file_id, metrics, page_index_policy)
810            .await
811            .map(|metadata| metadata.parquet_metadata())
812    }
813
814    /// Gets cached fused SST metadata from in-memory cache.
815    /// This method does not perform I/O.
816    pub(crate) fn get_sst_meta_data_from_mem_cache(
817        &self,
818        file_id: RegionFileId,
819        page_index_policy: PageIndexPolicy,
820    ) -> Option<Arc<CachedSstMeta>> {
821        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
822            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
823            let value =
824                value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
825            update_hit_miss(value, SST_META_TYPE)
826        })
827    }
828
829    /// Gets cached [ParquetMetaData] from in-memory cache.
830    /// This method does not perform I/O.
831    pub fn get_parquet_meta_data_from_mem_cache(
832        &self,
833        file_id: RegionFileId,
834    ) -> Option<Arc<ParquetMetaData>> {
835        self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
836            .map(|metadata| metadata.parquet_metadata())
837    }
838
839    /// Puts fused SST metadata into the cache.
840    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
841        if let Some(cache) = &self.sst_meta_cache {
842            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
843            CACHE_BYTES
844                .with_label_values(&[SST_META_TYPE])
845                .add(meta_cache_weight(&key, &metadata).into());
846            cache.insert(key, metadata);
847        }
848    }
849
850    /// Puts [ParquetMetaData] into the cache.
851    pub fn put_parquet_meta_data(
852        &self,
853        file_id: RegionFileId,
854        metadata: Arc<ParquetMetaData>,
855        region_metadata: Option<RegionMetadataRef>,
856    ) {
857        if self.sst_meta_cache.is_some() {
858            let file_path = format!(
859                "region_id={}, file_id={}",
860                file_id.region_id(),
861                file_id.file_id()
862            );
863            match CachedSstMeta::try_new_with_region_metadata(
864                &file_path,
865                Arc::unwrap_or_clone(metadata),
866                region_metadata,
867            ) {
868                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
869                Err(err) => warn!(
870                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
871                    file_id.region_id(),
872                    file_id.file_id()
873                ),
874            }
875        }
876    }
877
878    /// Removes [ParquetMetaData] from the cache.
879    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
880        if let Some(cache) = &self.sst_meta_cache {
881            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
882        }
883    }
884
885    /// Returns the total weighted size of the in-memory SST meta cache.
886    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
887        self.sst_meta_cache
888            .as_ref()
889            .map(|cache| cache.weighted_size())
890            .unwrap_or(0)
891    }
892
893    /// Returns true if the in-memory SST meta cache is enabled.
894    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
895        self.sst_meta_cache.is_some()
896    }
897
898    /// Gets a vector with repeated value for specific `key`.
899    pub fn get_repeated_vector(
900        &self,
901        data_type: &ConcreteDataType,
902        value: &Value,
903    ) -> Option<VectorRef> {
904        self.vector_cache.as_ref().and_then(|vector_cache| {
905            let value = vector_cache.get(&(data_type.clone(), value.clone()));
906            update_hit_miss(value, VECTOR_TYPE)
907        })
908    }
909
910    /// Puts a vector with repeated value into the cache.
911    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
912        if let Some(cache) = &self.vector_cache {
913            let key = (vector.data_type(), value);
914            CACHE_BYTES
915                .with_label_values(&[VECTOR_TYPE])
916                .add(vector_cache_weight(&key, &vector).into());
917            cache.insert(key, vector);
918        }
919    }
920
921    /// Gets cached byte fragments for the requested ranges.
922    pub fn get_page_ranges(
923        &self,
924        file_id: FileId,
925        row_group_idx: usize,
926        ranges: &[Range<u64>],
927    ) -> Option<PageRangeLookup> {
928        self.page_cache.as_ref().map(|page_cache| {
929            let lookup = page_cache.lookup(file_id, row_group_idx, ranges);
930            if lookup.cached_bytes > 0 {
931                CACHE_HIT.with_label_values(&[PAGE_TYPE]).inc();
932            }
933            if !lookup.missing_ranges.is_empty() {
934                CACHE_MISS.with_label_values(&[PAGE_TYPE]).inc();
935            }
936            lookup
937        })
938    }
939
940    /// Puts byte fragments into the page cache.
941    pub fn put_page_ranges(
942        &self,
943        file_id: FileId,
944        row_group_idx: usize,
945        ranges: &[Range<u64>],
946        pages: &[Bytes],
947    ) {
948        if let Some(cache) = &self.page_cache {
949            cache.insert_ranges(file_id, row_group_idx, ranges, pages);
950        }
951    }
952
953    /// Evicts every puffin-related cache entry for the given file.
954    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
955        if let Some(cache) = &self.bloom_filter_index_cache {
956            cache.invalidate_file(file_id.file_id());
957        }
958
959        if let Some(cache) = &self.inverted_index_cache {
960            cache.invalidate_file(file_id.file_id());
961        }
962
963        if let Some(cache) = &self.index_result_cache {
964            cache.invalidate_file(file_id.file_id());
965        }
966
967        #[cfg(feature = "vector_index")]
968        if let Some(cache) = &self.vector_index_cache {
969            cache.invalidate_file(file_id.file_id());
970        }
971
972        if let Some(cache) = &self.puffin_metadata_cache {
973            cache.remove(&file_id.to_string());
974        }
975
976        if let Some(write_cache) = &self.write_cache {
977            write_cache
978                .remove(IndexKey::new(
979                    file_id.region_id(),
980                    file_id.file_id(),
981                    FileType::Puffin(file_id.version),
982                ))
983                .await;
984        }
985    }
986
987    /// Gets result of for the selector.
988    pub fn get_selector_result(
989        &self,
990        selector_key: &SelectorResultKey,
991    ) -> Option<Arc<SelectorResultValue>> {
992        self.selector_result_cache
993            .as_ref()
994            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
995    }
996
997    /// Puts result of the selector into the cache.
998    pub fn put_selector_result(
999        &self,
1000        selector_key: SelectorResultKey,
1001        result: Arc<SelectorResultValue>,
1002    ) {
1003        if let Some(cache) = &self.selector_result_cache {
1004            CACHE_BYTES
1005                .with_label_values(&[SELECTOR_RESULT_TYPE])
1006                .add(selector_result_cache_weight(&selector_key, &result).into());
1007            cache.insert(selector_key, result);
1008        }
1009    }
1010
1011    /// Gets cached result for range scan.
1012    #[allow(dead_code)]
1013    pub(crate) fn get_range_result(
1014        &self,
1015        key: &RangeScanCacheKey,
1016    ) -> Option<Arc<RangeScanCacheValue>> {
1017        self.range_result_cache
1018            .as_ref()
1019            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
1020    }
1021
1022    /// Puts range scan result into cache.
1023    pub(crate) fn put_range_result(
1024        &self,
1025        key: RangeScanCacheKey,
1026        result: Arc<RangeScanCacheValue>,
1027    ) {
1028        if let Some(cache) = &self.range_result_cache {
1029            CACHE_BYTES
1030                .with_label_values(&[RANGE_RESULT_TYPE])
1031                .add(range_result_cache_weight(&key, &result).into());
1032            cache.insert(key, result);
1033        }
1034    }
1035
1036    /// Returns true if the range result cache is enabled.
1037    pub(crate) fn has_range_result_cache(&self) -> bool {
1038        self.range_result_cache.is_some()
1039    }
1040
1041    pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
1042        &self.range_result_memory_limiter
1043    }
1044
1045    pub(crate) fn range_result_cache_size(&self) -> usize {
1046        self.range_result_cache_size as usize
1047    }
1048
1049    /// Gets the write cache.
1050    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
1051        self.write_cache.as_ref()
1052    }
1053
1054    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
1055        self.inverted_index_cache.as_ref()
1056    }
1057
1058    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
1059        self.bloom_filter_index_cache.as_ref()
1060    }
1061
1062    #[cfg(feature = "vector_index")]
1063    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
1064        self.vector_index_cache.as_ref()
1065    }
1066
1067    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
1068        self.puffin_metadata_cache.as_ref()
1069    }
1070
1071    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
1072        self.index_result_cache.as_ref()
1073    }
1074
1075    pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
1076        self.prefilter_result_cache
1077            .as_ref()
1078            .and_then(|cache| update_hit_miss(cache.get(key), PREFILTER_RESULT_TYPE))
1079    }
1080
1081    pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
1082        if let Some(cache) = &self.prefilter_result_cache {
1083            CACHE_BYTES
1084                .with_label_values(&[PREFILTER_RESULT_TYPE])
1085                .add(prefilter_result_cache_weight(&key, &result).into());
1086            cache.insert(key, result);
1087        }
1088    }
1089}
1090
1091/// Increases selector cache miss metrics.
1092pub fn selector_result_cache_miss() {
1093    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
1094}
1095
1096/// Increases selector cache hit metrics.
1097pub fn selector_result_cache_hit() {
1098    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
1099}
1100
/// Builder to construct a [CacheManager].
///
/// With the derived [Default], every size starts at 0 and no write cache is set.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST meta cache; the cache is not created when this is 0.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache; the cache is not created when this is 0.
    vector_cache_size: u64,
    /// Capacity of the page range cache; the cache is not created when this is 0.
    page_cache_size: u64,
    /// Index metadata capacity, passed to both the inverted index cache and the
    /// bloom filter index cache.
    index_metadata_size: u64,
    /// Index content capacity, passed to both the inverted index cache and the
    /// bloom filter index cache. With the `vector_index` feature it also sizes the
    /// vector index cache, which is not created when this is 0.
    index_content_size: u64,
    /// Page size for index content, passed to both the inverted index cache and the
    /// bloom filter index cache.
    index_content_page_size: u64,
    /// Capacity of the index result cache; the cache is not created when this is 0.
    index_result_cache_size: u64,
    /// Capacity of the prefilter result cache; the cache is not created when this is 0.
    prefilter_result_cache_size: u64,
    /// Capacity passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed over to the [CacheManager] as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; the cache is not created when this is 0.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; the cache is not created when this is 0.
    /// The same value is also passed to the range-result memory limiter.
    range_result_cache_size: u64,
}
1117
1118impl CacheManagerBuilder {
1119    /// Sets meta cache size.
1120    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
1121        self.sst_meta_cache_size = bytes;
1122        self
1123    }
1124
1125    /// Sets vector cache size.
1126    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
1127        self.vector_cache_size = bytes;
1128        self
1129    }
1130
1131    /// Sets page cache size.
1132    pub fn page_cache_size(mut self, bytes: u64) -> Self {
1133        self.page_cache_size = bytes;
1134        self
1135    }
1136
1137    /// Sets write cache.
1138    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
1139        self.write_cache = cache;
1140        self
1141    }
1142
1143    /// Sets cache size for index metadata.
1144    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
1145        self.index_metadata_size = bytes;
1146        self
1147    }
1148
1149    /// Sets cache size for index content.
1150    pub fn index_content_size(mut self, bytes: u64) -> Self {
1151        self.index_content_size = bytes;
1152        self
1153    }
1154
1155    /// Sets page size for index content.
1156    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
1157        self.index_content_page_size = bytes;
1158        self
1159    }
1160
1161    /// Sets cache size for index result.
1162    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
1163        self.index_result_cache_size = bytes;
1164        self
1165    }
1166
1167    /// Sets cache size for prefilter result.
1168    pub fn prefilter_result_cache_size(mut self, bytes: u64) -> Self {
1169        self.prefilter_result_cache_size = bytes;
1170        self
1171    }
1172
1173    /// Sets cache size for puffin metadata.
1174    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
1175        self.puffin_metadata_size = bytes;
1176        self
1177    }
1178
1179    /// Sets selector result cache size.
1180    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
1181        self.selector_result_cache_size = bytes;
1182        self
1183    }
1184
1185    /// Sets range result cache size.
1186    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
1187        self.range_result_cache_size = bytes;
1188        self
1189    }
1190
1191    /// Builds the [CacheManager].
1192    pub fn build(self) -> CacheManager {
1193        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
1194            Cache::builder()
1195                .max_capacity(self.sst_meta_cache_size)
1196                .weigher(meta_cache_weight)
1197                .eviction_listener(|k, v, cause| {
1198                    let size = meta_cache_weight(&k, &v);
1199                    CACHE_BYTES
1200                        .with_label_values(&[SST_META_TYPE])
1201                        .sub(size.into());
1202                    CACHE_EVICTION
1203                        .with_label_values(&[SST_META_TYPE, removal_cause_str(cause)])
1204                        .inc();
1205                })
1206                .build()
1207        });
1208        let vector_cache = (self.vector_cache_size != 0).then(|| {
1209            Cache::builder()
1210                .max_capacity(self.vector_cache_size)
1211                .weigher(vector_cache_weight)
1212                .eviction_listener(|k, v, cause| {
1213                    let size = vector_cache_weight(&k, &v);
1214                    CACHE_BYTES
1215                        .with_label_values(&[VECTOR_TYPE])
1216                        .sub(size.into());
1217                    CACHE_EVICTION
1218                        .with_label_values(&[VECTOR_TYPE, removal_cause_str(cause)])
1219                        .inc();
1220                })
1221                .build()
1222        });
1223        let page_cache =
1224            (self.page_cache_size != 0).then(|| PageRangeCache::new(self.page_cache_size));
1225        let inverted_index_cache = InvertedIndexCache::new(
1226            self.index_metadata_size,
1227            self.index_content_size,
1228            self.index_content_page_size,
1229        );
1230        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
1231        let bloom_filter_index_cache = BloomFilterIndexCache::new(
1232            self.index_metadata_size,
1233            self.index_content_size,
1234            self.index_content_page_size,
1235        );
1236        #[cfg(feature = "vector_index")]
1237        let vector_index_cache = (self.index_content_size != 0)
1238            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1239        let index_result_cache = (self.index_result_cache_size != 0)
1240            .then(|| IndexResultCache::new(self.index_result_cache_size));
1241        let prefilter_result_cache = (self.prefilter_result_cache_size != 0)
1242            .then(|| new_prefilter_result_cache(self.prefilter_result_cache_size));
1243        let puffin_metadata_cache =
1244            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1245        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1246            Cache::builder()
1247                .max_capacity(self.selector_result_cache_size)
1248                .weigher(selector_result_cache_weight)
1249                .eviction_listener(|k, v, cause| {
1250                    let size = selector_result_cache_weight(&k, &v);
1251                    CACHE_BYTES
1252                        .with_label_values(&[SELECTOR_RESULT_TYPE])
1253                        .sub(size.into());
1254                    CACHE_EVICTION
1255                        .with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_str(cause)])
1256                        .inc();
1257                })
1258                .build()
1259        });
1260        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1261            Cache::builder()
1262                .max_capacity(self.range_result_cache_size)
1263                .weigher(range_result_cache_weight)
1264                .eviction_listener(move |k, v, cause| {
1265                    let size = range_result_cache_weight(&k, &v);
1266                    CACHE_BYTES
1267                        .with_label_values(&[RANGE_RESULT_TYPE])
1268                        .sub(size.into());
1269                    CACHE_EVICTION
1270                        .with_label_values(&[RANGE_RESULT_TYPE, removal_cause_str(cause)])
1271                        .inc();
1272                })
1273                .build()
1274        });
1275        CacheManager {
1276            sst_meta_cache,
1277            vector_cache,
1278            page_cache,
1279            write_cache: self.write_cache,
1280            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1281            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1282            #[cfg(feature = "vector_index")]
1283            vector_index_cache,
1284            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1285            selector_result_cache,
1286            range_result_cache,
1287            range_result_cache_size: self.range_result_cache_size,
1288            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1289                self.range_result_cache_size as usize,
1290                RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1291            )),
1292            index_result_cache,
1293            prefilter_result_cache,
1294        }
1295    }
1296}
1297
1298fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1299    // We ignore the size of `Arc`.
1300    let size =
1301        k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight;
1302    u32::try_from(size).unwrap_or(u32::MAX)
1303}
1304
1305fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1306    // We ignore the heap size of `Value`.
1307    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1308}
1309
1310fn page_cache_weight(k: &PageFragmentKey, v: &Bytes) -> u32 {
1311    (k.estimated_size() + mem::size_of::<Bytes>() + v.len()) as u32
1312}
1313
1314fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1315    (mem::size_of_val(k) + v.estimated_size()) as u32
1316}
1317
1318fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1319    (k.estimated_size() + v.estimated_size()) as u32
1320}
1321
1322/// Updates cache hit/miss metrics.
1323fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1324    if value.is_some() {
1325        CACHE_HIT.with_label_values(&[cache_type]).inc();
1326    } else {
1327        CACHE_MISS.with_label_values(&[cache_type]).inc();
1328    }
1329    value
1330}
1331
1332/// Cache key (region id, file id) for SST meta.
1333#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1334struct SstMetaKey(RegionId, FileId);
1335
1336impl SstMetaKey {
1337    /// Returns memory used by the key (estimated).
1338    fn estimated_size(&self) -> usize {
1339        mem::size_of::<Self>()
1340    }
1341}
1342
/// Identifies one row group of one SST file; groups the cached fragments in the
/// fragment index.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PageFragmentGroupKey {
    /// Id of the SST file.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
}
1348
1349/// Cache key for one byte fragment in an SST row group.
1350#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1351pub struct PageFragmentKey {
1352    /// Id of the SST file.
1353    file_id: FileId,
1354    /// Index of the row group.
1355    row_group_idx: usize,
1356    /// Start offset of the cached byte fragment.
1357    start: u64,
1358    /// End offset of the cached byte fragment.
1359    end: u64,
1360}
1361
1362impl PageFragmentKey {
1363    fn new(file_id: FileId, row_group_idx: usize, range: &Range<u64>) -> PageFragmentKey {
1364        PageFragmentKey {
1365            file_id,
1366            row_group_idx,
1367            start: range.start,
1368            end: range.end,
1369        }
1370    }
1371
1372    fn group_key(&self) -> PageFragmentGroupKey {
1373        PageFragmentGroupKey {
1374            file_id: self.file_id,
1375            row_group_idx: self.row_group_idx,
1376        }
1377    }
1378
1379    /// Returns memory used by the key (estimated).
1380    fn estimated_size(&self) -> usize {
1381        mem::size_of::<Self>()
1382    }
1383}
1384
/// One cached byte fragment that overlaps a requested range.
///
/// Parts produced by the cache lookup are clipped to the requested range, so
/// `bytes` holds exactly the bytes of `range`.
#[derive(Clone)]
pub struct PageRangePart {
    /// Range covered by `bytes`.
    pub range: Range<u64>,
    /// Bytes for `range`.
    pub bytes: Bytes,
}
1393
1394/// Result of looking up request ranges in the page range cache.
1395pub struct PageRangeLookup {
1396    /// Cached fragments grouped by the original requested range index.
1397    pub cached_parts: Vec<Vec<PageRangePart>>,
1398    /// Ranges that are not covered by cached fragments and need fetching.
1399    pub missing_ranges: Vec<Range<u64>>,
1400    /// Number of cached fragments used.
1401    pub cached_range_count: usize,
1402    /// Number of requested bytes served from cached fragments.
1403    pub cached_bytes: u64,
1404}
1405
1406impl PageRangeLookup {
1407    pub fn is_fully_cached(&self) -> bool {
1408        self.missing_ranges.is_empty()
1409    }
1410}
1411
/// Cached fragments of one row group, ordered by their `(start, end)` offsets.
type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>;
/// Maps each (file, row group) to the ordered index of its cached fragments.
type PageFragmentIndex = HashMap<PageFragmentGroupKey, PageFragmentRangeIndex>;
1414
/// Byte-fragment cache for Parquet row-group reads.
pub struct PageRangeCache {
    /// Stores the bytes of each cached fragment.
    cache: Cache<PageFragmentKey, Bytes>,
    /// Secondary index used to find the cached fragments that overlap a requested range.
    /// It can briefly hold keys that are no longer in `cache`; lookups prune those.
    index: RwLock<PageFragmentIndex>,
}
1420
1421impl PageRangeCache {
1422    fn new(capacity: u64) -> Arc<PageRangeCache> {
1423        Arc::new_cyclic(|weak_cache: &std::sync::Weak<PageRangeCache>| {
1424            let cache = Cache::builder()
1425                .max_capacity(capacity)
1426                .weigher(page_cache_weight)
1427                .eviction_listener({
1428                    let weak_cache = weak_cache.clone();
1429                    move |k, v, cause| {
1430                        let size = page_cache_weight(&k, &v);
1431                        CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
1432                        CACHE_EVICTION
1433                            .with_label_values(&[PAGE_TYPE, removal_cause_str(cause)])
1434                            .inc();
1435
1436                        if let Some(cache) = weak_cache.upgrade()
1437                            && !matches!(cause, RemovalCause::Replaced)
1438                        {
1439                            cache.remove_index_entry(*k);
1440                        }
1441                    }
1442                })
1443                .build();
1444
1445            PageRangeCache {
1446                cache,
1447                index: RwLock::new(HashMap::new()),
1448            }
1449        })
1450    }
1451
1452    fn lookup(
1453        &self,
1454        file_id: FileId,
1455        row_group_idx: usize,
1456        ranges: &[Range<u64>],
1457    ) -> PageRangeLookup {
1458        let mut cached_parts = Vec::with_capacity(ranges.len());
1459        let mut missing_ranges = Vec::new();
1460        let mut cached_range_count = 0;
1461        let mut cached_bytes = 0;
1462
1463        for range in ranges {
1464            if range.start >= range.end {
1465                cached_parts.push(Vec::new());
1466                continue;
1467            }
1468
1469            let mut parts = Vec::new();
1470            let candidates = self.find_index_candidates(file_id, row_group_idx, range);
1471            let mut stale_keys = Vec::new();
1472
1473            for fragment_key in candidates {
1474                if let Some(bytes) = self.cache.get(&fragment_key) {
1475                    let part_start = range.start.max(fragment_key.start);
1476                    let part_end = range.end.min(fragment_key.end);
1477                    let slice_start = (part_start - fragment_key.start) as usize;
1478                    let slice_end = (part_end - fragment_key.start) as usize;
1479                    parts.push(PageRangePart {
1480                        range: part_start..part_end,
1481                        bytes: bytes.slice(slice_start..slice_end),
1482                    });
1483                } else {
1484                    stale_keys.push(fragment_key);
1485                }
1486            }
1487            for key in stale_keys {
1488                self.remove_uncached_index_entry(key);
1489            }
1490
1491            let mut cursor = range.start;
1492            let mut compacted_parts: Vec<PageRangePart> = Vec::with_capacity(parts.len());
1493            for part in parts {
1494                if part.range.end <= cursor {
1495                    continue;
1496                }
1497
1498                let part = if part.range.start < cursor {
1499                    let offset = (cursor - part.range.start) as usize;
1500                    PageRangePart {
1501                        range: cursor..part.range.end,
1502                        bytes: part.bytes.slice(offset..),
1503                    }
1504                } else {
1505                    part
1506                };
1507
1508                if cursor < part.range.start {
1509                    missing_ranges.push(cursor..part.range.start);
1510                }
1511                cached_bytes += part.range.end - part.range.start;
1512                cached_range_count += 1;
1513                cursor = part.range.end;
1514                compacted_parts.push(part);
1515
1516                if cursor >= range.end {
1517                    break;
1518                }
1519            }
1520
1521            if cursor < range.end {
1522                missing_ranges.push(cursor..range.end);
1523            }
1524            cached_parts.push(compacted_parts);
1525        }
1526
1527        PageRangeLookup {
1528            cached_parts,
1529            missing_ranges,
1530            cached_range_count,
1531            cached_bytes,
1532        }
1533    }
1534
1535    fn insert_ranges(
1536        &self,
1537        file_id: FileId,
1538        row_group_idx: usize,
1539        ranges: &[Range<u64>],
1540        pages: &[Bytes],
1541    ) {
1542        for (range, bytes) in ranges.iter().zip(pages) {
1543            if range.start >= range.end || bytes.len() as u64 != range.end - range.start {
1544                continue;
1545            }
1546
1547            let key = PageFragmentKey::new(file_id, row_group_idx, range);
1548            let bytes = Bytes::copy_from_slice(bytes.as_ref());
1549            let size = page_cache_weight(&key, &bytes);
1550            CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
1551            self.cache.insert(key, bytes);
1552            let mut index = self.index.write().unwrap();
1553            index
1554                .entry(key.group_key())
1555                .or_default()
1556                .insert((key.start, key.end), key);
1557        }
1558    }
1559
1560    fn find_index_candidates(
1561        &self,
1562        file_id: FileId,
1563        row_group_idx: usize,
1564        range: &Range<u64>,
1565    ) -> Vec<PageFragmentKey> {
1566        let group_key = PageFragmentGroupKey {
1567            file_id,
1568            row_group_idx,
1569        };
1570        let index = self.index.read().unwrap();
1571        index
1572            .get(&group_key)
1573            .map(|ranges| {
1574                ranges
1575                    .range(..(range.end, 0))
1576                    .filter_map(|(_, fragment_key)| {
1577                        (fragment_key.end > range.start).then_some(*fragment_key)
1578                    })
1579                    .collect()
1580            })
1581            .unwrap_or_default()
1582    }
1583
1584    fn remove_uncached_index_entry(&self, key: PageFragmentKey) {
1585        let group_key = key.group_key();
1586        let mut index = self.index.write().unwrap();
1587        if self.cache.contains_key(&key) {
1588            return;
1589        }
1590
1591        Self::remove_index_entry_locked(&mut index, group_key, key);
1592    }
1593
1594    fn remove_index_entry(&self, key: PageFragmentKey) {
1595        let group_key = key.group_key();
1596        let mut index = self.index.write().unwrap();
1597        Self::remove_index_entry_locked(&mut index, group_key, key);
1598    }
1599
1600    fn remove_index_entry_locked(
1601        index: &mut PageFragmentIndex,
1602        group_key: PageFragmentGroupKey,
1603        key: PageFragmentKey,
1604    ) {
1605        let Some(ranges) = index.get_mut(&group_key) else {
1606            return;
1607        };
1608
1609        let removed = ranges
1610            .get(&(key.start, key.end))
1611            .is_some_and(|current| current == &key);
1612        if removed {
1613            ranges.remove(&(key.start, key.end));
1614        }
1615        if ranges.is_empty() {
1616            index.remove(&group_key);
1617        }
1618    }
1619}
1620
/// Cache key for time series row selector result.
///
/// A result is identified by the SST file, the row group within it and the selector.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1631
/// Result stored in the selector result cache.
///
/// The variant records which format the cached rows are in.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1639
1640/// Cached result for time series row selector.
1641pub struct SelectorResultValue {
1642    /// Batches of rows selected by the selector.
1643    pub result: SelectorResult,
1644    /// The read columns of rows.
1645    pub read_cols: ParquetReadColumns,
1646}
1647
1648impl SelectorResultValue {
1649    /// Creates a new selector result value with primary key format.
1650    pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1651        SelectorResultValue {
1652            result: SelectorResult::PrimaryKey(result),
1653            read_cols,
1654        }
1655    }
1656
1657    /// Creates a new selector result value with flat format.
1658    pub fn new_flat(
1659        result: Vec<RecordBatch>,
1660        read_cols: ParquetReadColumns,
1661    ) -> SelectorResultValue {
1662        SelectorResultValue {
1663            result: SelectorResult::Flat(result),
1664            read_cols,
1665        }
1666    }
1667
1668    /// Returns memory used by the value (estimated).
1669    fn estimated_size(&self) -> usize {
1670        match &self.result {
1671            SelectorResult::PrimaryKey(batches) => {
1672                batches.iter().map(|batch| batch.memory_size()).sum()
1673            }
1674            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1675        }
1676    }
1677}
1678
/// Maps [`SstMetaKey`] (region id, file id) to the shared, fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a ([ConcreteDataType], [Value]) pair to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a partition-range scan key ([RangeScanCacheKey]) to the cached value
/// ([RangeScanCacheValue]) holding flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1689
1690#[cfg(test)]
1691mod tests {
1692    use std::sync::Arc;
1693
1694    use api::v1::SemanticType;
1695    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1696    use datatypes::schema::ColumnSchema;
1697    use datatypes::vectors::Int64Vector;
1698    use puffin::file_metadata::FileMetadata;
1699    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1700    use store_api::storage::ColumnId;
1701
1702    use super::*;
1703    use crate::cache::index::bloom_filter_index::Tag;
1704    use crate::cache::index::result_cache::PredicateKey;
1705    use crate::cache::test_util::{
1706        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1707    };
1708    use crate::read::range_cache::{
1709        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1710    };
1711    use crate::read::read_columns::ReadColumns;
1712    use crate::sst::parquet::row_selection::RowGroupSelection;
1713
1714    #[tokio::test]
1715    async fn test_disable_cache() {
1716        let cache = CacheManager::default();
1717        assert!(cache.sst_meta_cache.is_none());
1718        assert!(cache.vector_cache.is_none());
1719        assert!(cache.page_cache.is_none());
1720
1721        let region_id = RegionId::new(1, 1);
1722        let file_id = RegionFileId::new(region_id, FileId::random());
1723        let metadata = parquet_meta();
1724        let mut metrics = MetadataCacheMetrics::default();
1725        cache.put_parquet_meta_data(file_id, metadata, None);
1726        assert!(
1727            cache
1728                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1729                .await
1730                .is_none()
1731        );
1732
1733        let value = Value::Int64(10);
1734        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1735        cache.put_repeated_vector(value.clone(), vector.clone());
1736        assert!(
1737            cache
1738                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1739                .is_none()
1740        );
1741
1742        cache.put_page_ranges(
1743            file_id.file_id(),
1744            1,
1745            &[Range { start: 0, end: 5 }],
1746            &[Bytes::from_static(b"abcde")],
1747        );
1748        assert!(
1749            cache
1750                .get_page_ranges(file_id.file_id(), 1, &[Range { start: 0, end: 5 }])
1751                .is_none()
1752        );
1753
1754        assert!(cache.write_cache().is_none());
1755    }
1756
1757    #[tokio::test]
1758    async fn test_parquet_meta_cache() {
1759        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1760        let mut metrics = MetadataCacheMetrics::default();
1761        let region_id = RegionId::new(1, 1);
1762        let file_id = RegionFileId::new(region_id, FileId::random());
1763        assert!(
1764            cache
1765                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1766                .await
1767                .is_none()
1768        );
1769        let (metadata, region_metadata) = sst_parquet_meta();
1770        cache.put_parquet_meta_data(file_id, metadata, None);
1771        let cached = cache
1772            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1773            .await
1774            .unwrap();
1775        assert_eq!(region_metadata, cached.region_metadata());
1776        assert!(
1777            cached
1778                .parquet_metadata()
1779                .file_metadata()
1780                .key_value_metadata()
1781                .is_none_or(|key_values| {
1782                    key_values
1783                        .iter()
1784                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1785                })
1786        );
1787        cache.remove_parquet_meta_data(file_id);
1788        assert!(
1789            cache
1790                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1791                .await
1792                .is_none()
1793        );
1794    }
1795
1796    #[tokio::test]
1797    async fn test_parquet_meta_cache_with_provided_region_metadata() {
1798        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1799        let mut metrics = MetadataCacheMetrics::default();
1800        let region_id = RegionId::new(1, 1);
1801        let file_id = RegionFileId::new(region_id, FileId::random());
1802        let (metadata, region_metadata) = sst_parquet_meta();
1803
1804        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1805
1806        let cached = cache
1807            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1808            .await
1809            .unwrap();
1810        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
1811    }
1812
1813    #[tokio::test]
1814    async fn test_parquet_meta_cache_respects_page_index_policy() {
1815        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1816        let region_id = RegionId::new(1, 1);
1817        let file_id = RegionFileId::new(region_id, FileId::random());
1818        let (metadata, _) = sst_parquet_meta();
1819
1820        let skip_metadata = Arc::new(
1821            CachedSstMeta::try_new_with_page_index_policy(
1822                "test.parquet",
1823                Arc::unwrap_or_clone(metadata.clone()),
1824                None,
1825                PageIndexPolicy::Skip,
1826            )
1827            .unwrap(),
1828        );
1829        cache.put_sst_meta_data(file_id, skip_metadata);
1830
1831        let mut metrics = MetadataCacheMetrics::default();
1832        assert!(
1833            cache
1834                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1835                .await
1836                .is_none()
1837        );
1838        assert_eq!(1, metrics.cache_miss);
1839
1840        let optional_metadata = Arc::new(
1841            CachedSstMeta::try_new_with_page_index_policy(
1842                "test.parquet",
1843                Arc::unwrap_or_clone(metadata),
1844                None,
1845                PageIndexPolicy::Optional,
1846            )
1847            .unwrap(),
1848        );
1849        cache.put_sst_meta_data(file_id, optional_metadata);
1850
1851        let mut metrics = MetadataCacheMetrics::default();
1852        assert!(
1853            cache
1854                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1855                .await
1856                .is_some()
1857        );
1858        assert_eq!(1, metrics.mem_cache_hit);
1859
1860        let mut metrics = MetadataCacheMetrics::default();
1861        assert!(
1862            cache
1863                .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
1864                .await
1865                .is_some()
1866        );
1867        assert_eq!(1, metrics.mem_cache_hit);
1868    }
1869
1870    #[test]
1871    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
1872        let region_metadata = Arc::new(wide_region_metadata(128));
1873        let json_len = region_metadata.to_json().unwrap().len();
1874        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1875        let cached = Arc::new(
1876            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
1877        );
1878        let key = SstMetaKey(region_metadata.region_id, FileId::random());
1879
1880        assert!(cached.region_metadata_weight > json_len);
1881        assert_eq!(
1882            meta_cache_weight(&key, &cached) as usize,
1883            key.estimated_size()
1884                + parquet_meta_size(&cached.parquet_metadata)
1885                + cached.region_metadata_weight
1886        );
1887    }
1888
1889    #[test]
1890    fn test_meta_cache_weight_saturates_on_overflow() {
1891        let region_metadata = Arc::new(wide_region_metadata(1));
1892        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1893        let mut cached =
1894            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap();
1895        cached.region_metadata_weight = u32::MAX as usize + 1;
1896        let cached = Arc::new(cached);
1897        let key = SstMetaKey(region_metadata.region_id, FileId::random());
1898
1899        assert_eq!(u32::MAX, meta_cache_weight(&key, &cached));
1900    }
1901
1902    #[test]
1903    fn test_repeated_vector_cache() {
1904        let cache = CacheManager::builder().vector_cache_size(4096).build();
1905        let value = Value::Int64(10);
1906        assert!(
1907            cache
1908                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1909                .is_none()
1910        );
1911        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1912        cache.put_repeated_vector(value.clone(), vector.clone());
1913        let cached = cache
1914            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1915            .unwrap();
1916        assert_eq!(vector, cached);
1917    }
1918
1919    #[test]
1920    fn test_page_cache() {
1921        let cache = CacheManager::builder().page_cache_size(1000).build();
1922        let file_id = FileId::random();
1923        let uncached = 0..10;
1924        assert_eq!(
1925            vec![0..10],
1926            cache
1927                .get_page_ranges(file_id, 0, std::slice::from_ref(&uncached))
1928                .unwrap()
1929                .missing_ranges
1930        );
1931
1932        let cached = 100..500;
1933        cache.put_page_ranges(
1934            file_id,
1935            0,
1936            std::slice::from_ref(&cached),
1937            &[Bytes::from(vec![7; 400])],
1938        );
1939
1940        let subrange = 200..300;
1941        let lookup = cache
1942            .get_page_ranges(file_id, 0, std::slice::from_ref(&subrange))
1943            .unwrap();
1944        assert!(lookup.is_fully_cached());
1945        assert_eq!(100, lookup.cached_bytes);
1946        assert_eq!(1, lookup.cached_parts.len());
1947        assert_eq!(200..300, lookup.cached_parts[0][0].range);
1948        assert_eq!(100, lookup.cached_parts[0][0].bytes.len());
1949
1950        let overlapping = 400..600;
1951        let lookup = cache
1952            .get_page_ranges(file_id, 0, std::slice::from_ref(&overlapping))
1953            .unwrap();
1954        assert!(!lookup.is_fully_cached());
1955        assert_eq!(100, lookup.cached_bytes);
1956        assert_eq!(vec![500..600], lookup.missing_ranges);
1957        assert_eq!(400..500, lookup.cached_parts[0][0].range);
1958    }
1959
1960    #[test]
1961    fn test_page_cache_detaches_fragment_bytes() {
1962        let cache = PageRangeCache::new(1000);
1963        let file_id = FileId::random();
1964        let backing = Bytes::from(vec![1; 1024]);
1965        let page = backing.slice(512..522);
1966        let page_ptr = page.as_ptr();
1967        let range = 0..10;
1968
1969        cache.insert_ranges(
1970            file_id,
1971            0,
1972            std::slice::from_ref(&range),
1973            std::slice::from_ref(&page),
1974        );
1975
1976        let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
1977        assert!(lookup.is_fully_cached());
1978        assert_eq!(1, lookup.cached_parts[0].len());
1979        assert_eq!(&page[..], &lookup.cached_parts[0][0].bytes[..]);
1980        assert_ne!(page_ptr, lookup.cached_parts[0][0].bytes.as_ptr());
1981    }
1982
1983    #[test]
1984    fn test_page_cache_replaces_fragment() {
1985        let cache = PageRangeCache::new(1000);
1986        let file_id = FileId::random();
1987        let range = 0..10;
1988
1989        cache.insert_ranges(
1990            file_id,
1991            0,
1992            std::slice::from_ref(&range),
1993            &[Bytes::from(vec![1; 10])],
1994        );
1995        cache.insert_ranges(
1996            file_id,
1997            0,
1998            std::slice::from_ref(&range),
1999            &[Bytes::from(vec![2; 10])],
2000        );
2001        cache.cache.run_pending_tasks();
2002        assert_eq!(
2003            vec![PageFragmentKey::new(file_id, 0, &range)],
2004            cache.find_index_candidates(file_id, 0, &range)
2005        );
2006
2007        let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2008        assert!(lookup.is_fully_cached());
2009        assert_eq!(&vec![2; 10][..], &lookup.cached_parts[0][0].bytes[..]);
2010    }
2011
2012    #[test]
2013    fn test_page_cache_retains_disjoint_inserts_for_same_row_group() {
2014        let cache = PageRangeCache::new(1000);
2015        let file_id = FileId::random();
2016        let range1 = 0..10;
2017        let range2 = 20..30;
2018
2019        cache.insert_ranges(
2020            file_id,
2021            0,
2022            std::slice::from_ref(&range1),
2023            &[Bytes::from(vec![1; 10])],
2024        );
2025        cache.insert_ranges(
2026            file_id,
2027            0,
2028            std::slice::from_ref(&range2),
2029            &[Bytes::from(vec![2; 10])],
2030        );
2031
2032        let lookup = cache.lookup(file_id, 0, &[range1, range2]);
2033        assert!(lookup.is_fully_cached());
2034        assert_eq!(2, lookup.cached_range_count);
2035        assert_eq!(&vec![1; 10][..], &lookup.cached_parts[0][0].bytes[..]);
2036        assert_eq!(&vec![2; 10][..], &lookup.cached_parts[1][0].bytes[..]);
2037    }
2038
2039    #[test]
2040    fn test_page_cache_fragment_eviction() {
2041        let file_id = FileId::random();
2042        let range = 0..10;
2043        let key = PageFragmentKey::new(file_id, 0, &range);
2044        let page = Bytes::from(vec![1; 10]);
2045        let cache = PageRangeCache::new(page_cache_weight(&key, &page) as u64);
2046
2047        cache.insert_ranges(
2048            file_id,
2049            0,
2050            std::slice::from_ref(&range),
2051            &[Bytes::from(vec![1; 10])],
2052        );
2053        assert!(
2054            cache
2055                .lookup(file_id, 0, std::slice::from_ref(&range))
2056                .is_fully_cached()
2057        );
2058
2059        cache.cache.invalidate(&key);
2060        cache.cache.run_pending_tasks();
2061        assert!(cache.find_index_candidates(file_id, 0, &range).is_empty());
2062
2063        let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2064        assert!(!lookup.is_fully_cached());
2065        assert_eq!(vec![0..10], lookup.missing_ranges);
2066    }
2067
2068    #[test]
2069    fn test_page_cache_rejects_oversized_fragment() {
2070        let cache = PageRangeCache::new(1);
2071        let file_id = FileId::random();
2072        let range = 0..10;
2073
2074        cache.insert_ranges(
2075            file_id,
2076            0,
2077            std::slice::from_ref(&range),
2078            &[Bytes::from(vec![1; 10])],
2079        );
2080        cache.cache.run_pending_tasks();
2081
2082        let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2083        assert!(!lookup.is_fully_cached());
2084        assert_eq!(vec![0..10], lookup.missing_ranges);
2085    }
2086
2087    #[test]
2088    fn test_selector_result_cache() {
2089        let cache = CacheManager::builder()
2090            .selector_result_cache_size(1000)
2091            .build();
2092        let file_id = FileId::random();
2093        let key = SelectorResultKey {
2094            file_id,
2095            row_group_idx: 0,
2096            selector: TimeSeriesRowSelector::LastRow,
2097        };
2098        assert!(cache.get_selector_result(&key).is_none());
2099        let result = Arc::new(SelectorResultValue::new(
2100            Vec::new(),
2101            ParquetReadColumns::from_deduped(Vec::new()),
2102        ));
2103        cache.put_selector_result(key, result);
2104        assert!(cache.get_selector_result(&key).is_some());
2105    }
2106
2107    #[test]
2108    fn test_prefilter_result_cache() {
2109        let disabled = CacheManager::builder().build();
2110        let file_id = FileId::random();
2111        let key = PrefilterKey::new(
2112            file_id,
2113            0,
2114            None,
2115            1,
2116            SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
2117        );
2118        let selection = Arc::new(BooleanBuffer::new_set(3));
2119
2120        disabled.put_prefilter_result(key.clone(), selection.clone());
2121        assert!(disabled.get_prefilter_result(&key).is_none());
2122
2123        let cache = Arc::new(
2124            CacheManager::builder()
2125                .prefilter_result_cache_size(1000)
2126                .build(),
2127        );
2128        assert!(cache.get_prefilter_result(&key).is_none());
2129        cache.put_prefilter_result(key.clone(), selection.clone());
2130        assert_eq!(
2131            cache.get_prefilter_result(&key).unwrap().as_ref(),
2132            selection.as_ref()
2133        );
2134
2135        let enable_all = CacheStrategy::EnableAll(cache.clone());
2136        assert!(enable_all.get_prefilter_result(&key).is_some());
2137
2138        let compaction = CacheStrategy::Compaction(cache.clone());
2139        assert!(compaction.get_prefilter_result(&key).is_none());
2140        compaction.put_prefilter_result(key.clone(), selection.clone());
2141        assert!(cache.get_prefilter_result(&key).is_some());
2142
2143        let disabled_strategy = CacheStrategy::Disabled;
2144        assert!(disabled_strategy.get_prefilter_result(&key).is_none());
2145        disabled_strategy.put_prefilter_result(key.clone(), selection);
2146        assert!(cache.get_prefilter_result(&key).is_some());
2147    }
2148
    /// Checks that two [`PrefilterKey`]s compare unequal when any single constructor
    /// argument differs from the `base` key.
    #[test]
    fn test_prefilter_key_distinguishes_dimensions() {
        let file_id = FileId::random();
        let row_selection = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(3)]);
        let other_row_selection =
            RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]);
        // Shadow the selections with their snapshot form, which is what the key constructor takes.
        let row_selection = PrefilterKey::row_selection_snapshot(Some(&row_selection));
        let other_row_selection = PrefilterKey::row_selection_snapshot(Some(&other_row_selection));
        let base = PrefilterKey::new(
            file_id,
            0,
            row_selection.clone(),
            1,
            SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
        );

        // Different file id.
        assert_ne!(
            base,
            PrefilterKey::new(
                FileId::random(),
                0,
                row_selection.clone(),
                1,
                SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
            )
        );
        // Different second argument (`1` vs `0`) — presumably the row group index.
        assert_ne!(
            base,
            PrefilterKey::new(
                file_id,
                1,
                row_selection.clone(),
                1,
                SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
            )
        );
        // Different row selection snapshot.
        assert_ne!(
            base,
            PrefilterKey::new(
                file_id,
                0,
                other_row_selection,
                1,
                SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
            )
        );
        // Different predicate string.
        assert_ne!(
            base,
            PrefilterKey::new(
                file_id,
                0,
                row_selection.clone(),
                1,
                SmallVec::from_vec(vec!["tag_0 IN ([b])".to_string()])
            )
        );
        // Different fourth argument (`2` vs `1`).
        assert_ne!(
            base,
            PrefilterKey::new(
                file_id,
                0,
                row_selection.clone(),
                2,
                SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
            )
        );
        // Same first predicate plus an additional one.
        let pk_group = PrefilterKey::new(
            file_id,
            0,
            row_selection,
            1,
            SmallVec::from_vec(vec![
                "tag_0 IN ([a])".to_string(),
                "tag_1 IN ([x])".to_string(),
            ]),
        );
        assert_ne!(base, pk_group);
    }
2227
2228    #[test]
2229    fn test_range_result_cache() {
2230        let cache = Arc::new(
2231            CacheManager::builder()
2232                .range_result_cache_size(1024 * 1024)
2233                .build(),
2234        );
2235
2236        let key = RangeScanCacheKey {
2237            region_id: RegionId::new(1, 1),
2238            row_groups: vec![(FileId::random(), 0)],
2239            scan: ScanRequestFingerprintBuilder {
2240                read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
2241                read_column_types: vec![],
2242                filters: vec!["tag_0 = 1".to_string()],
2243                time_filters: vec![],
2244                series_row_selector: None,
2245                append_mode: false,
2246                filter_deleted: true,
2247                merge_mode: crate::region::options::MergeMode::LastRow,
2248                partition_expr_version: 0,
2249            }
2250            .build(),
2251        };
2252        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));
2253
2254        assert!(cache.get_range_result(&key).is_none());
2255        cache.put_range_result(key.clone(), value.clone());
2256        assert!(cache.get_range_result(&key).is_some());
2257
2258        let enable_all = CacheStrategy::EnableAll(cache.clone());
2259        assert!(enable_all.get_range_result(&key).is_some());
2260
2261        let compaction = CacheStrategy::Compaction(cache.clone());
2262        assert!(compaction.get_range_result(&key).is_none());
2263        compaction.put_range_result(key.clone(), value.clone());
2264        assert!(cache.get_range_result(&key).is_some());
2265
2266        let disabled = CacheStrategy::Disabled;
2267        assert!(disabled.get_range_result(&key).is_none());
2268        disabled.put_range_result(key.clone(), value);
2269        assert!(cache.get_range_result(&key).is_some());
2270    }
2271
2272    #[test]
2273    fn test_range_result_cache_size_configures_limiter() {
2274        let cache_size = 3 * 1024_u64;
2275        let cache = CacheManager::builder()
2276            .range_result_cache_size(cache_size)
2277            .build();
2278
2279        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
2280        assert_eq!(
2281            cache.range_result_memory_limiter().permit_bytes(),
2282            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
2283        );
2284        assert_eq!(
2285            cache.range_result_memory_limiter().available_permits(),
2286            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
2287        );
2288    }
2289
2290    #[tokio::test]
2291    async fn range_result_memory_limiter_rejects_oversized_request() {
2292        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
2293        assert_eq!(limiter.available_permits(), 2);
2294
2295        let err = limiter.acquire(10 * 1024).await.unwrap_err();
2296        assert!(
2297            err.to_string().contains("exceeds limiter capacity"),
2298            "unexpected error: {err}"
2299        );
2300        assert_eq!(limiter.available_permits(), 2);
2301    }
2302
2303    #[tokio::test]
2304    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
2305        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
2306        let permit = limiter.acquire(2 * 1024).await.unwrap();
2307        assert_eq!(limiter.available_permits(), 0);
2308        drop(permit);
2309        assert_eq!(limiter.available_permits(), 2);
2310    }
2311
2312    #[tokio::test]
2313    async fn test_evict_puffin_cache_clears_all_entries() {
2314        use std::collections::{BTreeMap, HashMap};
2315
2316        let cache = CacheManager::builder()
2317            .index_metadata_size(128)
2318            .index_content_size(128)
2319            .index_content_page_size(64)
2320            .index_result_cache_size(128)
2321            .puffin_metadata_size(128)
2322            .build();
2323        let cache = Arc::new(cache);
2324
2325        let region_id = RegionId::new(1, 1);
2326        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
2327        let column_id: ColumnId = 1;
2328
2329        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
2330        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
2331        let result_cache = cache.index_result_cache().unwrap();
2332        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
2333
2334        let bloom_key = (
2335            index_id.file_id(),
2336            index_id.version,
2337            column_id,
2338            Tag::Skipping,
2339        );
2340        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
2341        inverted_cache.put_metadata(
2342            (index_id.file_id(), index_id.version),
2343            Arc::new(InvertedIndexMetas::default()),
2344        );
2345        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
2346        let selection = Arc::new(RowGroupSelection::default());
2347        result_cache.put(predicate.clone(), index_id.file_id(), selection);
2348        let file_id_str = index_id.to_string();
2349        let metadata = Arc::new(FileMetadata {
2350            blobs: Vec::new(),
2351            properties: HashMap::new(),
2352        });
2353        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
2354
2355        assert!(bloom_cache.get_metadata(bloom_key).is_some());
2356        assert!(
2357            inverted_cache
2358                .get_metadata((index_id.file_id(), index_id.version))
2359                .is_some()
2360        );
2361        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
2362        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
2363
2364        cache.evict_puffin_cache(index_id).await;
2365
2366        assert!(bloom_cache.get_metadata(bloom_key).is_none());
2367        assert!(
2368            inverted_cache
2369                .get_metadata((index_id.file_id(), index_id.version))
2370                .is_none()
2371        );
2372        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
2373        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
2374
2375        // Refill caches and evict via CacheStrategy to ensure delegation works.
2376        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
2377        inverted_cache.put_metadata(
2378            (index_id.file_id(), index_id.version),
2379            Arc::new(InvertedIndexMetas::default()),
2380        );
2381        result_cache.put(
2382            predicate.clone(),
2383            index_id.file_id(),
2384            Arc::new(RowGroupSelection::default()),
2385        );
2386        puffin_metadata_cache.put_metadata(
2387            file_id_str.clone(),
2388            Arc::new(FileMetadata {
2389                blobs: Vec::new(),
2390                properties: HashMap::new(),
2391            }),
2392        );
2393
2394        let strategy = CacheStrategy::EnableAll(cache.clone());
2395        strategy.evict_puffin_cache(index_id).await;
2396
2397        assert!(bloom_cache.get_metadata(bloom_key).is_none());
2398        assert!(
2399            inverted_cache
2400                .get_metadata((index_id.file_id(), index_id.version))
2401                .is_none()
2402        );
2403        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
2404        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
2405    }
2406
2407    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
2408        let region_id = RegionId::new(1024, 7);
2409        let mut builder = RegionMetadataBuilder::new(region_id);
2410        let mut primary_key = Vec::new();
2411
2412        for column_id in 0..column_count {
2413            let semantic_type = if column_id < 32 {
2414                primary_key.push(column_id);
2415                SemanticType::Tag
2416            } else {
2417                SemanticType::Field
2418            };
2419            let mut column_schema = ColumnSchema::new(
2420                format!("wide_column_{column_id}"),
2421                ConcreteDataType::string_datatype(),
2422                true,
2423            );
2424            column_schema
2425                .mut_metadata()
2426                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
2427            builder.push_column_metadata(ColumnMetadata {
2428                column_schema,
2429                semantic_type,
2430                column_id,
2431            });
2432        }
2433
2434        builder.push_column_metadata(ColumnMetadata {
2435            column_schema: ColumnSchema::new(
2436                "ts",
2437                ConcreteDataType::timestamp_millisecond_datatype(),
2438                false,
2439            ),
2440            semantic_type: SemanticType::Timestamp,
2441            column_id: column_count,
2442        });
2443        builder.primary_key(primary_key);
2444
2445        builder.build().unwrap()
2446    }
2447}