// mito2/cache.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Cache for the engine.
16
17pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::parquet::reader::MetadataCacheMetrics;
62
/// Metrics type key for sst meta.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics type key for vector.
const VECTOR_TYPE: &str = "vector";
/// Metrics type key for pages.
const PAGE_TYPE: &str = "page";
/// Metrics type key for files on the local store.
const FILE_TYPE: &str = "file";
/// Metrics type key for index files (puffin) on the local store.
const INDEX_TYPE: &str = "index";
/// Metrics type key for selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics type key for range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Total memory budget shared by concurrent range-result tasks (see [RangeResultMemoryLimiter]).
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes represented by one semaphore permit in [RangeResultMemoryLimiter].
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
79
/// Limits the total memory used by concurrent range-result work.
///
/// Memory is modeled as semaphore permits worth `permit_bytes` each; callers
/// acquire enough permits to cover their byte request and the permits are
/// returned when the guard is dropped.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Semaphore whose permits each represent `permit_bytes` bytes.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Byte size of a single permit (clamped to at least 1 in `new`).
    permit_bytes: usize,
    /// Total number of permits the semaphore was created with.
    total_permits: usize,
}
86
87impl Default for RangeResultMemoryLimiter {
88    fn default() -> Self {
89        Self::new(
90            RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
91            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
92        )
93    }
94}
95
96impl RangeResultMemoryLimiter {
97    pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
98        let permit_bytes = permit_bytes.max(1);
99        let total_permits = limit_bytes
100            .div_ceil(permit_bytes)
101            .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
102        Self {
103            semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
104            permit_bytes,
105            total_permits,
106        }
107    }
108
109    #[cfg(test)]
110    pub(crate) fn permit_bytes(&self) -> usize {
111        self.permit_bytes
112    }
113
114    #[cfg(test)]
115    pub(crate) fn available_permits(&self) -> usize {
116        self.semaphore.available_permits()
117    }
118
119    pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
120        let permits = bytes.div_ceil(self.permit_bytes).max(1);
121        if permits > self.total_permits {
122            return UnexpectedSnafu {
123                reason: format!(
124                    "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
125                    self.total_permits.saturating_mul(self.permit_bytes)
126                ),
127            }
128            .fail();
129        }
130        self.semaphore
131            .acquire_many(permits as u32)
132            .await
133            .map_err(|_| {
134                UnexpectedSnafu {
135                    reason: "range result memory limiter is unexpectedly closed",
136                }
137                .build()
138            })
139    }
140}
141
/// Cached SST metadata combines the parquet footer with the decoded region metadata.
///
/// The cached parquet footer strips the `greptime:metadata` JSON payload and stores the decoded
/// [RegionMetadata] separately so readers can skip repeated deserialization work.
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
    /// Parquet footer with the `greptime:metadata` key-value entry removed.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata decoded from (or supplied alongside) the footer.
    region_metadata: RegionMetadataRef,
    /// Cache weight charged for the decoded region metadata.
    region_metadata_weight: usize,
}
152
impl CachedSstMeta {
    /// Builds the fused entry by decoding the region metadata embedded in the footer.
    pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
        Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
    }

    /// Builds the fused entry, reusing `region_metadata` when the caller already decoded it.
    ///
    /// `file_path` is only used to enrich error messages.
    ///
    /// # Errors
    /// Fails when the footer has no `greptime:metadata` key-value entry, when that
    /// entry has no value, or when the embedded JSON cannot be deserialized.
    pub(crate) fn try_new_with_region_metadata(
        file_path: &str,
        parquet_metadata: ParquetMetaData,
        region_metadata: Option<RegionMetadataRef>,
    ) -> Result<Self> {
        let file_metadata = parquet_metadata.file_metadata();
        // Locate the region metadata JSON among the footer's key-value pairs.
        let key_values = file_metadata
            .key_value_metadata()
            .context(InvalidParquetSnafu {
                file: file_path,
                reason: "missing key value meta",
            })?;
        let meta_value = key_values
            .iter()
            .find(|kv| kv.key == PARQUET_METADATA_KEY)
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("key {} not found", PARQUET_METADATA_KEY),
            })?;
        let json = meta_value
            .value
            .as_ref()
            .with_context(|| InvalidParquetSnafu {
                file: file_path,
                reason: format!("No value for key {}", PARQUET_METADATA_KEY),
            })?;
        // Decode only when the caller did not supply the metadata already.
        let region_metadata = match region_metadata {
            Some(region_metadata) => region_metadata,
            None => Arc::new(
                store_api::metadata::RegionMetadata::from_json(json)
                    .context(InvalidMetadataSnafu)?,
            ),
        };
        // Keep the previous JSON-byte floor and charge the decoded structures as well.
        let region_metadata_weight = region_metadata.estimated_size().max(json.len());
        let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));

        Ok(Self {
            parquet_metadata,
            region_metadata,
            region_metadata_weight,
        })
    }

    /// Returns the cached parquet footer (without the region metadata payload).
    pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
        self.parquet_metadata.clone()
    }

    /// Returns the decoded region metadata.
    pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
        self.region_metadata.clone()
    }
}
210
211fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
212    let file_metadata = parquet_metadata.file_metadata();
213    let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
214        let filtered = key_values
215            .iter()
216            .filter(|kv| kv.key != PARQUET_METADATA_KEY)
217            .cloned()
218            .collect::<Vec<_>>();
219        (!filtered.is_empty()).then_some(filtered)
220    });
221    let stripped_file_metadata = FileMetaData::new(
222        file_metadata.version(),
223        file_metadata.num_rows(),
224        file_metadata.created_by().map(ToString::to_string),
225        filtered_key_values,
226        file_metadata.schema_descr_ptr(),
227        file_metadata.column_orders().cloned(),
228    );
229
230    let mut builder = parquet_metadata.into_builder();
231    let row_groups = builder.take_row_groups();
232    let column_index = builder.take_column_index();
233    let offset_index = builder.take_offset_index();
234
235    parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
236        .set_row_groups(row_groups)
237        .set_column_index(column_index)
238        .set_offset_index(offset_index)
239        .build()
240}
241
/// Cache strategies that may only enable a subset of caches.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Strategy for normal operations.
    /// Doesn't disable any cache.
    EnableAll(CacheManagerRef),
    /// Strategy for compaction.
    /// Disables some caches during compaction to avoid affecting queries.
    /// Enables the write cache so that the compaction can read files cached
    /// in the write cache and write the compacted files back to the write cache.
    Compaction(CacheManagerRef),
    /// Do not use any cache.
    Disabled,
}
256
257impl CacheStrategy {
258    /// Gets fused SST metadata with cache metrics tracking.
259    pub(crate) async fn get_sst_meta_data(
260        &self,
261        file_id: RegionFileId,
262        metrics: &mut MetadataCacheMetrics,
263        page_index_policy: PageIndexPolicy,
264    ) -> Option<Arc<CachedSstMeta>> {
265        match self {
266            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
267                cache_manager
268                    .get_sst_meta_data(file_id, metrics, page_index_policy)
269                    .await
270            }
271            CacheStrategy::Disabled => {
272                metrics.cache_miss += 1;
273                None
274            }
275        }
276    }
277
278    /// Calls [CacheManager::get_sst_meta_data_from_mem_cache()].
279    pub(crate) fn get_sst_meta_data_from_mem_cache(
280        &self,
281        file_id: RegionFileId,
282    ) -> Option<Arc<CachedSstMeta>> {
283        match self {
284            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
285                cache_manager.get_sst_meta_data_from_mem_cache(file_id)
286            }
287            CacheStrategy::Disabled => None,
288        }
289    }
290
291    /// Calls [CacheManager::get_parquet_meta_data_from_mem_cache()].
292    pub fn get_parquet_meta_data_from_mem_cache(
293        &self,
294        file_id: RegionFileId,
295    ) -> Option<Arc<ParquetMetaData>> {
296        self.get_sst_meta_data_from_mem_cache(file_id)
297            .map(|metadata| metadata.parquet_metadata())
298    }
299
300    /// Calls [CacheManager::put_sst_meta_data()].
301    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
302        match self {
303            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
304                cache_manager.put_sst_meta_data(file_id, metadata);
305            }
306            CacheStrategy::Disabled => {}
307        }
308    }
309
310    /// Calls [CacheManager::put_parquet_meta_data()].
311    pub fn put_parquet_meta_data(
312        &self,
313        file_id: RegionFileId,
314        metadata: Arc<ParquetMetaData>,
315        region_metadata: Option<RegionMetadataRef>,
316    ) {
317        match self {
318            CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
319                cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
320            }
321            CacheStrategy::Disabled => {}
322        }
323    }
324
325    /// Calls [CacheManager::remove_parquet_meta_data()].
326    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
327        match self {
328            CacheStrategy::EnableAll(cache_manager) => {
329                cache_manager.remove_parquet_meta_data(file_id);
330            }
331            CacheStrategy::Compaction(cache_manager) => {
332                cache_manager.remove_parquet_meta_data(file_id);
333            }
334            CacheStrategy::Disabled => {}
335        }
336    }
337
338    /// Calls [CacheManager::get_repeated_vector()].
339    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
340    pub fn get_repeated_vector(
341        &self,
342        data_type: &ConcreteDataType,
343        value: &Value,
344    ) -> Option<VectorRef> {
345        match self {
346            CacheStrategy::EnableAll(cache_manager) => {
347                cache_manager.get_repeated_vector(data_type, value)
348            }
349            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
350        }
351    }
352
353    /// Calls [CacheManager::put_repeated_vector()].
354    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
355    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
356        if let CacheStrategy::EnableAll(cache_manager) = self {
357            cache_manager.put_repeated_vector(value, vector);
358        }
359    }
360
361    /// Calls [CacheManager::get_pages()].
362    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
363    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
364        match self {
365            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
366            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
367        }
368    }
369
370    /// Calls [CacheManager::put_pages()].
371    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
372    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
373        if let CacheStrategy::EnableAll(cache_manager) = self {
374            cache_manager.put_pages(page_key, pages);
375        }
376    }
377
378    /// Calls [CacheManager::evict_puffin_cache()].
379    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
380        match self {
381            CacheStrategy::EnableAll(cache_manager) => {
382                cache_manager.evict_puffin_cache(file_id).await
383            }
384            CacheStrategy::Compaction(cache_manager) => {
385                cache_manager.evict_puffin_cache(file_id).await
386            }
387            CacheStrategy::Disabled => {}
388        }
389    }
390
391    /// Calls [CacheManager::get_selector_result()].
392    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
393    pub fn get_selector_result(
394        &self,
395        selector_key: &SelectorResultKey,
396    ) -> Option<Arc<SelectorResultValue>> {
397        match self {
398            CacheStrategy::EnableAll(cache_manager) => {
399                cache_manager.get_selector_result(selector_key)
400            }
401            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
402        }
403    }
404
405    /// Calls [CacheManager::put_selector_result()].
406    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
407    pub fn put_selector_result(
408        &self,
409        selector_key: SelectorResultKey,
410        result: Arc<SelectorResultValue>,
411    ) {
412        if let CacheStrategy::EnableAll(cache_manager) = self {
413            cache_manager.put_selector_result(selector_key, result);
414        }
415    }
416
417    /// Calls [CacheManager::get_range_result()].
418    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
419    #[allow(dead_code)]
420    pub(crate) fn get_range_result(
421        &self,
422        key: &RangeScanCacheKey,
423    ) -> Option<Arc<RangeScanCacheValue>> {
424        match self {
425            CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
426            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
427        }
428    }
429
430    /// Calls [CacheManager::put_range_result()].
431    /// It does nothing if the strategy isn't [CacheStrategy::EnableAll].
432    pub(crate) fn put_range_result(
433        &self,
434        key: RangeScanCacheKey,
435        result: Arc<RangeScanCacheValue>,
436    ) {
437        if let CacheStrategy::EnableAll(cache_manager) = self {
438            cache_manager.put_range_result(key, result);
439        }
440    }
441
442    /// Returns true if the range result cache is enabled.
443    pub(crate) fn has_range_result_cache(&self) -> bool {
444        match self {
445            CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
446            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
447        }
448    }
449
450    pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
451        match self {
452            CacheStrategy::EnableAll(cache_manager) => {
453                Some(cache_manager.range_result_memory_limiter())
454            }
455            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
456        }
457    }
458
459    pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
460        match self {
461            CacheStrategy::EnableAll(cache_manager) => {
462                Some(cache_manager.range_result_cache_size())
463            }
464            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
465        }
466    }
467
468    /// Calls [CacheManager::write_cache()].
469    /// It returns None if the strategy is [CacheStrategy::Disabled].
470    pub fn write_cache(&self) -> Option<&WriteCacheRef> {
471        match self {
472            CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
473            CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
474            CacheStrategy::Disabled => None,
475        }
476    }
477
478    /// Calls [CacheManager::index_cache()].
479    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
480    pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
481        match self {
482            CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
483            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
484        }
485    }
486
487    /// Calls [CacheManager::bloom_filter_index_cache()].
488    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
489    pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
490        match self {
491            CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
492            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
493        }
494    }
495
496    /// Calls [CacheManager::vector_index_cache()].
497    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
498    #[cfg(feature = "vector_index")]
499    pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
500        match self {
501            CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
502            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
503        }
504    }
505
506    /// Calls [CacheManager::puffin_metadata_cache()].
507    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
508    pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
509        match self {
510            CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
511            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
512        }
513    }
514
515    /// Calls [CacheManager::index_result_cache()].
516    /// It returns None if the strategy is [CacheStrategy::Compaction] or [CacheStrategy::Disabled].
517    pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
518        match self {
519            CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
520            CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
521        }
522    }
523
524    /// Triggers download if the strategy is [CacheStrategy::EnableAll] and write cache is available.
525    pub fn maybe_download_background(
526        &self,
527        index_key: IndexKey,
528        remote_path: String,
529        remote_store: ObjectStore,
530        file_size: u64,
531    ) {
532        if let CacheStrategy::EnableAll(cache_manager) = self
533            && let Some(write_cache) = cache_manager.write_cache()
534        {
535            write_cache.file_cache().maybe_download_background(
536                index_key,
537                remote_path,
538                remote_store,
539                file_size,
540            );
541        }
542    }
543}
544
/// Manages cached data for the engine.
///
/// All caches are disabled by default.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for vectors.
    vector_cache: Option<VectorCache>,
    /// Cache for SST pages.
    page_cache: Option<PageCache>,
    /// A cache for writing files to object stores.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selectors.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan outputs in flat format.
    range_result_cache: Option<RangeResultCache>,
    /// Configured capacity for range scan outputs in flat format.
    range_result_cache_size: u64,
    /// Shared memory limiter for async range-result cache tasks.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache for index result.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared reference-counted pointer to a [CacheManager].
pub type CacheManagerRef = Arc<CacheManager>;
580
581impl CacheManager {
    /// Returns a builder to build the cache.
    ///
    /// All caches stay disabled until configured on the builder.
    pub fn builder() -> CacheManagerBuilder {
        CacheManagerBuilder::default()
    }
586
    /// Gets fused SST metadata with metrics tracking.
    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
    pub(crate) async fn get_sst_meta_data(
        &self,
        file_id: RegionFileId,
        metrics: &mut MetadataCacheMetrics,
        page_index_policy: PageIndexPolicy,
    ) -> Option<Arc<CachedSstMeta>> {
        // Fast path: the in-memory cache needs no I/O.
        if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
            metrics.mem_cache_hit += 1;
            return Some(metadata);
        }

        // Fall back to the file cache attached to the write cache.
        let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
        if let Some(write_cache) = &self.write_cache
            && let Some(metadata) = write_cache
                .file_cache()
                .get_sst_meta_data(key, metrics, page_index_policy)
                .await
        {
            metrics.file_cache_hit += 1;
            // Promote the entry into the in-memory cache for later reads.
            self.put_sst_meta_data(file_id, metadata.clone());
            return Some(metadata);
        }

        metrics.cache_miss += 1;
        None
    }
615
616    /// Gets cached [ParquetMetaData] with metrics tracking.
617    /// Tries in-memory cache first, then file cache, updating metrics accordingly.
618    pub(crate) async fn get_parquet_meta_data(
619        &self,
620        file_id: RegionFileId,
621        metrics: &mut MetadataCacheMetrics,
622        page_index_policy: PageIndexPolicy,
623    ) -> Option<Arc<ParquetMetaData>> {
624        self.get_sst_meta_data(file_id, metrics, page_index_policy)
625            .await
626            .map(|metadata| metadata.parquet_metadata())
627    }
628
629    /// Gets cached fused SST metadata from in-memory cache.
630    /// This method does not perform I/O.
631    pub(crate) fn get_sst_meta_data_from_mem_cache(
632        &self,
633        file_id: RegionFileId,
634    ) -> Option<Arc<CachedSstMeta>> {
635        self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
636            let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
637            update_hit_miss(value, SST_META_TYPE)
638        })
639    }
640
641    /// Gets cached [ParquetMetaData] from in-memory cache.
642    /// This method does not perform I/O.
643    pub fn get_parquet_meta_data_from_mem_cache(
644        &self,
645        file_id: RegionFileId,
646    ) -> Option<Arc<ParquetMetaData>> {
647        self.get_sst_meta_data_from_mem_cache(file_id)
648            .map(|metadata| metadata.parquet_metadata())
649    }
650
651    /// Puts fused SST metadata into the cache.
652    pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
653        if let Some(cache) = &self.sst_meta_cache {
654            let key = SstMetaKey(file_id.region_id(), file_id.file_id());
655            CACHE_BYTES
656                .with_label_values(&[SST_META_TYPE])
657                .add(meta_cache_weight(&key, &metadata).into());
658            cache.insert(key, metadata);
659        }
660    }
661
    /// Puts [ParquetMetaData] into the cache.
    ///
    /// Decodes the region metadata embedded in the footer (unless `region_metadata`
    /// is provided) and stores the fused entry. A decode failure is logged and the
    /// entry is simply not cached.
    pub fn put_parquet_meta_data(
        &self,
        file_id: RegionFileId,
        metadata: Arc<ParquetMetaData>,
        region_metadata: Option<RegionMetadataRef>,
    ) {
        // Skip the conversion work entirely when the cache is disabled.
        if self.sst_meta_cache.is_some() {
            // Only used to enrich error messages; not a real filesystem path.
            let file_path = format!(
                "region_id={}, file_id={}",
                file_id.region_id(),
                file_id.file_id()
            );
            match CachedSstMeta::try_new_with_region_metadata(
                &file_path,
                Arc::unwrap_or_clone(metadata),
                region_metadata,
            ) {
                Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
                Err(err) => warn!(
                    err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
                    file_id.region_id(),
                    file_id.file_id()
                ),
            }
        }
    }
689
690    /// Removes [ParquetMetaData] from the cache.
691    pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
692        if let Some(cache) = &self.sst_meta_cache {
693            cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
694        }
695    }
696
697    /// Returns the total weighted size of the in-memory SST meta cache.
698    pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
699        self.sst_meta_cache
700            .as_ref()
701            .map(|cache| cache.weighted_size())
702            .unwrap_or(0)
703    }
704
705    /// Returns true if the in-memory SST meta cache is enabled.
706    pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
707        self.sst_meta_cache.is_some()
708    }
709
710    /// Gets a vector with repeated value for specific `key`.
711    pub fn get_repeated_vector(
712        &self,
713        data_type: &ConcreteDataType,
714        value: &Value,
715    ) -> Option<VectorRef> {
716        self.vector_cache.as_ref().and_then(|vector_cache| {
717            let value = vector_cache.get(&(data_type.clone(), value.clone()));
718            update_hit_miss(value, VECTOR_TYPE)
719        })
720    }
721
722    /// Puts a vector with repeated value into the cache.
723    pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
724        if let Some(cache) = &self.vector_cache {
725            let key = (vector.data_type(), value);
726            CACHE_BYTES
727                .with_label_values(&[VECTOR_TYPE])
728                .add(vector_cache_weight(&key, &vector).into());
729            cache.insert(key, vector);
730        }
731    }
732
733    /// Gets pages for the row group.
734    pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
735        self.page_cache.as_ref().and_then(|page_cache| {
736            let value = page_cache.get(page_key);
737            update_hit_miss(value, PAGE_TYPE)
738        })
739    }
740
741    /// Puts pages of the row group into the cache.
742    pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
743        if let Some(cache) = &self.page_cache {
744            CACHE_BYTES
745                .with_label_values(&[PAGE_TYPE])
746                .add(page_cache_weight(&page_key, &pages).into());
747            cache.insert(page_key, pages);
748        }
749    }
750
    /// Evicts every puffin-related cache entry for the given file.
    ///
    /// Invalidates the bloom filter, inverted index, index result and (when the
    /// feature is enabled) vector index caches, drops the puffin metadata entry,
    /// and removes the puffin file from the write cache.
    pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
        if let Some(cache) = &self.bloom_filter_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        if let Some(cache) = &self.inverted_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        if let Some(cache) = &self.index_result_cache {
            cache.invalidate_file(file_id.file_id());
        }

        #[cfg(feature = "vector_index")]
        if let Some(cache) = &self.vector_index_cache {
            cache.invalidate_file(file_id.file_id());
        }

        if let Some(cache) = &self.puffin_metadata_cache {
            // The puffin metadata cache is keyed by the stringified index id.
            cache.remove(&file_id.to_string());
        }

        if let Some(write_cache) = &self.write_cache {
            write_cache
                .remove(IndexKey::new(
                    file_id.region_id(),
                    file_id.file_id(),
                    FileType::Puffin(file_id.version),
                ))
                .await;
        }
    }
784
785    /// Gets result of for the selector.
786    pub fn get_selector_result(
787        &self,
788        selector_key: &SelectorResultKey,
789    ) -> Option<Arc<SelectorResultValue>> {
790        self.selector_result_cache
791            .as_ref()
792            .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
793    }
794
795    /// Puts result of the selector into the cache.
796    pub fn put_selector_result(
797        &self,
798        selector_key: SelectorResultKey,
799        result: Arc<SelectorResultValue>,
800    ) {
801        if let Some(cache) = &self.selector_result_cache {
802            CACHE_BYTES
803                .with_label_values(&[SELECTOR_RESULT_TYPE])
804                .add(selector_result_cache_weight(&selector_key, &result).into());
805            cache.insert(selector_key, result);
806        }
807    }
808
809    /// Gets cached result for range scan.
810    #[allow(dead_code)]
811    pub(crate) fn get_range_result(
812        &self,
813        key: &RangeScanCacheKey,
814    ) -> Option<Arc<RangeScanCacheValue>> {
815        self.range_result_cache
816            .as_ref()
817            .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
818    }
819
820    /// Puts range scan result into cache.
821    pub(crate) fn put_range_result(
822        &self,
823        key: RangeScanCacheKey,
824        result: Arc<RangeScanCacheValue>,
825    ) {
826        if let Some(cache) = &self.range_result_cache {
827            CACHE_BYTES
828                .with_label_values(&[RANGE_RESULT_TYPE])
829                .add(range_result_cache_weight(&key, &result).into());
830            cache.insert(key, result);
831        }
832    }
833
    /// Returns true if the range result cache is enabled.
    pub(crate) fn has_range_result_cache(&self) -> bool {
        self.range_result_cache.is_some()
    }

    /// Gets the memory limiter for range scan results.
    pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
        &self.range_result_memory_limiter
    }

    /// Returns the configured capacity of the range result cache in bytes.
    pub(crate) fn range_result_cache_size(&self) -> usize {
        self.range_result_cache_size as usize
    }

    /// Gets the write cache.
    pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
        self.write_cache.as_ref()
    }

    /// Gets the inverted index cache.
    pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
        self.inverted_index_cache.as_ref()
    }

    /// Gets the bloom filter index cache.
    pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
        self.bloom_filter_index_cache.as_ref()
    }

    /// Gets the vector index cache (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
        self.vector_index_cache.as_ref()
    }

    /// Gets the puffin metadata cache.
    pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
        self.puffin_metadata_cache.as_ref()
    }

    /// Gets the index result cache.
    pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
        self.index_result_cache.as_ref()
    }
872}
873
/// Increases selector cache miss metrics.
///
/// Selector hit/miss metrics are recorded explicitly by callers via these
/// helpers; `CacheManager::get_selector_result` itself does not touch them.
pub fn selector_result_cache_miss() {
    CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}

/// Increases selector cache hit metrics.
pub fn selector_result_cache_hit() {
    CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
}
883
/// Builder to construct a [CacheManager].
///
/// All sizes are in bytes; a size of 0 leaves the corresponding cache disabled.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; 0 disables it.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache; 0 disables it.
    vector_cache_size: u64,
    /// Capacity of the page cache; 0 disables it.
    page_cache_size: u64,
    /// Capacity for index metadata (shared by inverted and bloom filter index caches).
    index_metadata_size: u64,
    /// Capacity for index content (shared by inverted and bloom filter index caches).
    index_content_size: u64,
    /// Page size used for cached index content.
    index_content_page_size: u64,
    /// Capacity of the index result cache; 0 disables it.
    index_result_cache_size: u64,
    /// Capacity of the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache, passed through to the manager as-is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; 0 disables it.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; 0 disables it.
    range_result_cache_size: u64,
}
899
impl CacheManagerBuilder {
    /// Sets meta cache size.
    pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
        self.sst_meta_cache_size = bytes;
        self
    }

    /// Sets vector cache size.
    pub fn vector_cache_size(mut self, bytes: u64) -> Self {
        self.vector_cache_size = bytes;
        self
    }

    /// Sets page cache size.
    pub fn page_cache_size(mut self, bytes: u64) -> Self {
        self.page_cache_size = bytes;
        self
    }

    /// Sets write cache.
    pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
        self.write_cache = cache;
        self
    }

    /// Sets cache size for index metadata.
    pub fn index_metadata_size(mut self, bytes: u64) -> Self {
        self.index_metadata_size = bytes;
        self
    }

    /// Sets cache size for index content.
    pub fn index_content_size(mut self, bytes: u64) -> Self {
        self.index_content_size = bytes;
        self
    }

    /// Sets page size for index content.
    pub fn index_content_page_size(mut self, bytes: u64) -> Self {
        self.index_content_page_size = bytes;
        self
    }

    /// Sets cache size for index result.
    pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
        self.index_result_cache_size = bytes;
        self
    }

    /// Sets cache size for puffin metadata.
    pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
        self.puffin_metadata_size = bytes;
        self
    }

    /// Sets selector result cache size.
    pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
        self.selector_result_cache_size = bytes;
        self
    }

    /// Sets range result cache size.
    pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
        self.range_result_cache_size = bytes;
        self
    }

    /// Builds the [CacheManager].
    ///
    /// Size-gated caches (`sst_meta`, `vector`, `page`, `index_result`,
    /// `selector_result`, `range_result`) are only created when their
    /// configured size is non-zero; index and puffin metadata caches are
    /// always created.
    pub fn build(self) -> CacheManager {
        // Maps a moka removal cause to its metric label.
        fn to_str(cause: RemovalCause) -> &'static str {
            match cause {
                RemovalCause::Expired => "expired",
                RemovalCause::Explicit => "explicit",
                RemovalCause::Replaced => "replaced",
                RemovalCause::Size => "size",
            }
        }

        // Each eviction listener subtracts the entry's weight from the byte
        // gauge and counts the eviction by cause, mirroring the weigher.
        let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.sst_meta_cache_size)
                .weigher(meta_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = meta_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SST_META_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SST_META_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let vector_cache = (self.vector_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.vector_cache_size)
                .weigher(vector_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = vector_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[VECTOR_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[VECTOR_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let page_cache = (self.page_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.page_cache_size)
                .weigher(page_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = page_cache_weight(&k, &v);
                    CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[PAGE_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        // Index caches manage their own internal sizing from these limits.
        let inverted_index_cache = InvertedIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        // TODO(ruihang): check if it's ok to reuse the same param with inverted index
        let bloom_filter_index_cache = BloomFilterIndexCache::new(
            self.index_metadata_size,
            self.index_content_size,
            self.index_content_page_size,
        );
        #[cfg(feature = "vector_index")]
        let vector_index_cache = (self.index_content_size != 0)
            .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
        let index_result_cache = (self.index_result_cache_size != 0)
            .then(|| IndexResultCache::new(self.index_result_cache_size));
        // The puffin metadata cache reports its byte usage through CACHE_BYTES.
        let puffin_metadata_cache =
            PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
        let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.selector_result_cache_size)
                .weigher(selector_result_cache_weight)
                .eviction_listener(|k, v, cause| {
                    let size = selector_result_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[SELECTOR_RESULT_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        let range_result_cache = (self.range_result_cache_size != 0).then(|| {
            Cache::builder()
                .max_capacity(self.range_result_cache_size)
                .weigher(range_result_cache_weight)
                .eviction_listener(move |k, v, cause| {
                    let size = range_result_cache_weight(&k, &v);
                    CACHE_BYTES
                        .with_label_values(&[RANGE_RESULT_TYPE])
                        .sub(size.into());
                    CACHE_EVICTION
                        .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
                        .inc();
                })
                .build()
        });
        CacheManager {
            sst_meta_cache,
            vector_cache,
            page_cache,
            write_cache: self.write_cache,
            inverted_index_cache: Some(Arc::new(inverted_index_cache)),
            bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
            #[cfg(feature = "vector_index")]
            vector_index_cache,
            puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
            selector_result_cache,
            range_result_cache,
            range_result_cache_size: self.range_result_cache_size,
            // The limiter's budget is derived from the range result cache size.
            range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
                self.range_result_cache_size as usize,
                RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
            )),
            index_result_cache,
        }
    }
}
1090
1091fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1092    // We ignore the size of `Arc`.
1093    (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1094}
1095
1096fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1097    // We ignore the heap size of `Value`.
1098    (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1099}
1100
1101fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1102    (k.estimated_size() + v.estimated_size()) as u32
1103}
1104
1105fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1106    (mem::size_of_val(k) + v.estimated_size()) as u32
1107}
1108
1109fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1110    (k.estimated_size() + v.estimated_size()) as u32
1111}
1112
1113/// Updates cache hit/miss metrics.
1114fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1115    if value.is_some() {
1116        CACHE_HIT.with_label_values(&[cache_type]).inc();
1117    } else {
1118        CACHE_MISS.with_label_values(&[cache_type]).inc();
1119    }
1120    value
1121}
1122
/// Cache key (region id, file id) for SST meta.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);

impl SstMetaKey {
    /// Returns memory used by the key (estimated).
    ///
    /// Only the stack size is counted; assumes the key holds no heap data.
    fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
    }
}
1133
/// Path to column pages in the SST file.
///
/// Identifies the pages of a single column in a single row group of one SST.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id of the SST file to cache.
    region_id: RegionId,
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1146
/// Cache key to pages in a row group (after projection).
///
/// Different projections will have different cache keys.
/// We cache all ranges together because they may refer to the same `Bytes`.
/// Equality and hashing cover the file id, row group index, and the exact
/// range list (derived).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageKey {
    /// Id of the SST file to cache.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Byte ranges of the pages to cache.
    ranges: Vec<Range<u64>>,
}
1160
1161impl PageKey {
1162    /// Creates a key for a list of pages.
1163    pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1164        PageKey {
1165            file_id,
1166            row_group_idx,
1167            ranges,
1168        }
1169    }
1170
1171    /// Returns memory used by the key (estimated).
1172    fn estimated_size(&self) -> usize {
1173        mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1174    }
1175}
1176
/// Cached row group pages for a column.
// We don't use enum here to make it easier to mock and use the struct.
#[derive(Default)]
pub struct PageValue {
    /// Compressed pages in the row group.
    pub compressed: Vec<Bytes>,
    /// Total size of the pages (may be larger than sum of compressed bytes due to gaps).
    pub page_size: u64,
}
1186
1187impl PageValue {
1188    /// Creates a new value from a range of compressed pages.
1189    pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1190        PageValue {
1191            compressed: bytes,
1192            page_size,
1193        }
1194    }
1195
1196    /// Returns memory used by the value (estimated).
1197    fn estimated_size(&self) -> usize {
1198        mem::size_of::<Self>()
1199            + self.page_size as usize
1200            + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1201    }
1202}
1203
/// Cache key for time series row selector result.
///
/// `Copy` key identifying a (file, row group, selector) combination.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Time series row selector.
    pub selector: TimeSeriesRowSelector,
}
1214
/// Result stored in the selector result cache.
///
/// One variant per batch format the read path produces.
pub enum SelectorResult {
    /// Batches in the primary key format.
    PrimaryKey(Vec<Batch>),
    /// Record batches in the flat format.
    Flat(Vec<RecordBatch>),
}
1222
/// Cached result for time series row selector.
pub struct SelectorResultValue {
    /// Batches of rows selected by the selector.
    pub result: SelectorResult,
    /// The read columns of rows.
    // NOTE(review): presumably the projection used to produce `result` — confirm with callers.
    pub read_cols: ParquetReadColumns,
}
1230
1231impl SelectorResultValue {
1232    /// Creates a new selector result value with primary key format.
1233    pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1234        SelectorResultValue {
1235            result: SelectorResult::PrimaryKey(result),
1236            read_cols,
1237        }
1238    }
1239
1240    /// Creates a new selector result value with flat format.
1241    pub fn new_flat(
1242        result: Vec<RecordBatch>,
1243        read_cols: ParquetReadColumns,
1244    ) -> SelectorResultValue {
1245        SelectorResultValue {
1246            result: SelectorResult::Flat(result),
1247            read_cols,
1248        }
1249    }
1250
1251    /// Returns memory used by the value (estimated).
1252    fn estimated_size(&self) -> usize {
1253        match &self.result {
1254            SelectorResult::PrimaryKey(batches) => {
1255                batches.iter().map(|batch| batch.memory_size()).sum()
1256            }
1257            SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1258        }
1259    }
1260}
1261
/// Maps (region id, file id) to fused SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps [Value] to a vector that holds this value repeatedly.
///
/// e.g. `"hello" => ["hello", "hello", "hello"]`
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps (file id, row group index, page ranges) to [PageValue].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps (file id, row group id, time series row selector) to [SelectorResultValue].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps partition-range scan key to cached flat batches.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1274
1275#[cfg(test)]
1276mod tests {
1277    use std::sync::Arc;
1278
1279    use api::v1::SemanticType;
1280    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1281    use datatypes::schema::ColumnSchema;
1282    use datatypes::vectors::Int64Vector;
1283    use puffin::file_metadata::FileMetadata;
1284    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1285    use store_api::storage::ColumnId;
1286
1287    use super::*;
1288    use crate::cache::index::bloom_filter_index::Tag;
1289    use crate::cache::index::result_cache::PredicateKey;
1290    use crate::cache::test_util::{
1291        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1292    };
1293    use crate::read::range_cache::{
1294        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1295    };
1296    use crate::read::read_columns::ReadColumns;
1297    use crate::sst::parquet::row_selection::RowGroupSelection;
1298
1299    #[tokio::test]
1300    async fn test_disable_cache() {
1301        let cache = CacheManager::default();
1302        assert!(cache.sst_meta_cache.is_none());
1303        assert!(cache.vector_cache.is_none());
1304        assert!(cache.page_cache.is_none());
1305
1306        let region_id = RegionId::new(1, 1);
1307        let file_id = RegionFileId::new(region_id, FileId::random());
1308        let metadata = parquet_meta();
1309        let mut metrics = MetadataCacheMetrics::default();
1310        cache.put_parquet_meta_data(file_id, metadata, None);
1311        assert!(
1312            cache
1313                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1314                .await
1315                .is_none()
1316        );
1317
1318        let value = Value::Int64(10);
1319        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1320        cache.put_repeated_vector(value.clone(), vector.clone());
1321        assert!(
1322            cache
1323                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1324                .is_none()
1325        );
1326
1327        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
1328        let pages = Arc::new(PageValue::default());
1329        cache.put_pages(key.clone(), pages);
1330        assert!(cache.get_pages(&key).is_none());
1331
1332        assert!(cache.write_cache().is_none());
1333    }
1334
1335    #[tokio::test]
1336    async fn test_parquet_meta_cache() {
1337        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1338        let mut metrics = MetadataCacheMetrics::default();
1339        let region_id = RegionId::new(1, 1);
1340        let file_id = RegionFileId::new(region_id, FileId::random());
1341        assert!(
1342            cache
1343                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1344                .await
1345                .is_none()
1346        );
1347        let (metadata, region_metadata) = sst_parquet_meta();
1348        cache.put_parquet_meta_data(file_id, metadata, None);
1349        let cached = cache
1350            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1351            .await
1352            .unwrap();
1353        assert_eq!(region_metadata, cached.region_metadata());
1354        assert!(
1355            cached
1356                .parquet_metadata()
1357                .file_metadata()
1358                .key_value_metadata()
1359                .is_none_or(|key_values| {
1360                    key_values
1361                        .iter()
1362                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1363                })
1364        );
1365        cache.remove_parquet_meta_data(file_id);
1366        assert!(
1367            cache
1368                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1369                .await
1370                .is_none()
1371        );
1372    }
1373
1374    #[tokio::test]
1375    async fn test_parquet_meta_cache_with_provided_region_metadata() {
1376        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1377        let mut metrics = MetadataCacheMetrics::default();
1378        let region_id = RegionId::new(1, 1);
1379        let file_id = RegionFileId::new(region_id, FileId::random());
1380        let (metadata, region_metadata) = sst_parquet_meta();
1381
1382        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1383
1384        let cached = cache
1385            .get_sst_meta_data(file_id, &mut metrics, Default::default())
1386            .await
1387            .unwrap();
1388        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
1389    }
1390
1391    #[test]
1392    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
1393        let region_metadata = Arc::new(wide_region_metadata(128));
1394        let json_len = region_metadata.to_json().unwrap().len();
1395        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1396        let cached = Arc::new(
1397            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
1398        );
1399        let key = SstMetaKey(region_metadata.region_id, FileId::random());
1400
1401        assert!(cached.region_metadata_weight > json_len);
1402        assert_eq!(
1403            meta_cache_weight(&key, &cached) as usize,
1404            key.estimated_size()
1405                + parquet_meta_size(&cached.parquet_metadata)
1406                + cached.region_metadata_weight
1407        );
1408    }
1409
1410    #[test]
1411    fn test_repeated_vector_cache() {
1412        let cache = CacheManager::builder().vector_cache_size(4096).build();
1413        let value = Value::Int64(10);
1414        assert!(
1415            cache
1416                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1417                .is_none()
1418        );
1419        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1420        cache.put_repeated_vector(value.clone(), vector.clone());
1421        let cached = cache
1422            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1423            .unwrap();
1424        assert_eq!(vector, cached);
1425    }
1426
1427    #[test]
1428    fn test_page_cache() {
1429        let cache = CacheManager::builder().page_cache_size(1000).build();
1430        let file_id = FileId::random();
1431        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
1432        assert!(cache.get_pages(&key).is_none());
1433        let pages = Arc::new(PageValue::default());
1434        cache.put_pages(key.clone(), pages);
1435        assert!(cache.get_pages(&key).is_some());
1436    }
1437
1438    #[test]
1439    fn test_selector_result_cache() {
1440        let cache = CacheManager::builder()
1441            .selector_result_cache_size(1000)
1442            .build();
1443        let file_id = FileId::random();
1444        let key = SelectorResultKey {
1445            file_id,
1446            row_group_idx: 0,
1447            selector: TimeSeriesRowSelector::LastRow,
1448        };
1449        assert!(cache.get_selector_result(&key).is_none());
1450        let result = Arc::new(SelectorResultValue::new(
1451            Vec::new(),
1452            ParquetReadColumns::from_deduped(Vec::new()),
1453        ));
1454        cache.put_selector_result(key, result);
1455        assert!(cache.get_selector_result(&key).is_some());
1456    }
1457
1458    #[test]
1459    fn test_range_result_cache() {
1460        let cache = Arc::new(
1461            CacheManager::builder()
1462                .range_result_cache_size(1024 * 1024)
1463                .build(),
1464        );
1465
1466        let key = RangeScanCacheKey {
1467            region_id: RegionId::new(1, 1),
1468            row_groups: vec![(FileId::random(), 0)],
1469            scan: ScanRequestFingerprintBuilder {
1470                read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
1471                read_column_types: vec![],
1472                filters: vec!["tag_0 = 1".to_string()],
1473                time_filters: vec![],
1474                series_row_selector: None,
1475                append_mode: false,
1476                filter_deleted: true,
1477                merge_mode: crate::region::options::MergeMode::LastRow,
1478                partition_expr_version: 0,
1479            }
1480            .build(),
1481        };
1482        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));
1483
1484        assert!(cache.get_range_result(&key).is_none());
1485        cache.put_range_result(key.clone(), value.clone());
1486        assert!(cache.get_range_result(&key).is_some());
1487
1488        let enable_all = CacheStrategy::EnableAll(cache.clone());
1489        assert!(enable_all.get_range_result(&key).is_some());
1490
1491        let compaction = CacheStrategy::Compaction(cache.clone());
1492        assert!(compaction.get_range_result(&key).is_none());
1493        compaction.put_range_result(key.clone(), value.clone());
1494        assert!(cache.get_range_result(&key).is_some());
1495
1496        let disabled = CacheStrategy::Disabled;
1497        assert!(disabled.get_range_result(&key).is_none());
1498        disabled.put_range_result(key.clone(), value);
1499        assert!(cache.get_range_result(&key).is_some());
1500    }
1501
1502    #[test]
1503    fn test_range_result_cache_size_configures_limiter() {
1504        let cache_size = 3 * 1024_u64;
1505        let cache = CacheManager::builder()
1506            .range_result_cache_size(cache_size)
1507            .build();
1508
1509        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
1510        assert_eq!(
1511            cache.range_result_memory_limiter().permit_bytes(),
1512            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
1513        );
1514        assert_eq!(
1515            cache.range_result_memory_limiter().available_permits(),
1516            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
1517        );
1518    }
1519
1520    #[tokio::test]
1521    async fn range_result_memory_limiter_rejects_oversized_request() {
1522        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
1523        assert_eq!(limiter.available_permits(), 2);
1524
1525        let err = limiter.acquire(10 * 1024).await.unwrap_err();
1526        assert!(
1527            err.to_string().contains("exceeds limiter capacity"),
1528            "unexpected error: {err}"
1529        );
1530        assert_eq!(limiter.available_permits(), 2);
1531    }
1532
1533    #[tokio::test]
1534    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
1535        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
1536        let permit = limiter.acquire(2 * 1024).await.unwrap();
1537        assert_eq!(limiter.available_permits(), 0);
1538        drop(permit);
1539        assert_eq!(limiter.available_permits(), 2);
1540    }
1541
1542    #[tokio::test]
1543    async fn test_evict_puffin_cache_clears_all_entries() {
1544        use std::collections::{BTreeMap, HashMap};
1545
1546        let cache = CacheManager::builder()
1547            .index_metadata_size(128)
1548            .index_content_size(128)
1549            .index_content_page_size(64)
1550            .index_result_cache_size(128)
1551            .puffin_metadata_size(128)
1552            .build();
1553        let cache = Arc::new(cache);
1554
1555        let region_id = RegionId::new(1, 1);
1556        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
1557        let column_id: ColumnId = 1;
1558
1559        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
1560        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
1561        let result_cache = cache.index_result_cache().unwrap();
1562        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
1563
1564        let bloom_key = (
1565            index_id.file_id(),
1566            index_id.version,
1567            column_id,
1568            Tag::Skipping,
1569        );
1570        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1571        inverted_cache.put_metadata(
1572            (index_id.file_id(), index_id.version),
1573            Arc::new(InvertedIndexMetas::default()),
1574        );
1575        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
1576        let selection = Arc::new(RowGroupSelection::default());
1577        result_cache.put(predicate.clone(), index_id.file_id(), selection);
1578        let file_id_str = index_id.to_string();
1579        let metadata = Arc::new(FileMetadata {
1580            blobs: Vec::new(),
1581            properties: HashMap::new(),
1582        });
1583        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1584
1585        assert!(bloom_cache.get_metadata(bloom_key).is_some());
1586        assert!(
1587            inverted_cache
1588                .get_metadata((index_id.file_id(), index_id.version))
1589                .is_some()
1590        );
1591        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1592        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1593
1594        cache.evict_puffin_cache(index_id).await;
1595
1596        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1597        assert!(
1598            inverted_cache
1599                .get_metadata((index_id.file_id(), index_id.version))
1600                .is_none()
1601        );
1602        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1603        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1604
1605        // Refill caches and evict via CacheStrategy to ensure delegation works.
1606        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1607        inverted_cache.put_metadata(
1608            (index_id.file_id(), index_id.version),
1609            Arc::new(InvertedIndexMetas::default()),
1610        );
1611        result_cache.put(
1612            predicate.clone(),
1613            index_id.file_id(),
1614            Arc::new(RowGroupSelection::default()),
1615        );
1616        puffin_metadata_cache.put_metadata(
1617            file_id_str.clone(),
1618            Arc::new(FileMetadata {
1619                blobs: Vec::new(),
1620                properties: HashMap::new(),
1621            }),
1622        );
1623
1624        let strategy = CacheStrategy::EnableAll(cache.clone());
1625        strategy.evict_puffin_cache(index_id).await;
1626
1627        assert!(bloom_cache.get_metadata(bloom_key).is_none());
1628        assert!(
1629            inverted_cache
1630                .get_metadata((index_id.file_id(), index_id.version))
1631                .is_none()
1632        );
1633        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1634        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1635    }
1636
1637    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
1638        let region_id = RegionId::new(1024, 7);
1639        let mut builder = RegionMetadataBuilder::new(region_id);
1640        let mut primary_key = Vec::new();
1641
1642        for column_id in 0..column_count {
1643            let semantic_type = if column_id < 32 {
1644                primary_key.push(column_id);
1645                SemanticType::Tag
1646            } else {
1647                SemanticType::Field
1648            };
1649            let mut column_schema = ColumnSchema::new(
1650                format!("wide_column_{column_id}"),
1651                ConcreteDataType::string_datatype(),
1652                true,
1653            );
1654            column_schema
1655                .mut_metadata()
1656                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
1657            builder.push_column_metadata(ColumnMetadata {
1658                column_schema,
1659                semantic_type,
1660                column_id,
1661            });
1662        }
1663
1664        builder.push_column_metadata(ColumnMetadata {
1665            column_schema: ColumnSchema::new(
1666                "ts",
1667                ConcreteDataType::timestamp_millisecond_datatype(),
1668                false,
1669            ),
1670            semantic_type: SemanticType::Timestamp,
1671            column_id: column_count,
1672        });
1673        builder.primary_key(primary_key);
1674
1675        builder.build().unwrap()
1676    }
1677}