1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::collections::{BTreeMap, HashMap};
27use std::mem;
28use std::ops::Range;
29use std::sync::{Arc, RwLock};
30
31use bytes::Bytes;
32use common_base::readable_size::ReadableSize;
33use common_telemetry::warn;
34use datatypes::arrow::buffer::BooleanBuffer;
35use datatypes::arrow::record_batch::RecordBatch;
36use datatypes::value::Value;
37use datatypes::vectors::VectorRef;
38use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
39use index::result_cache::IndexResultCache;
40use moka::notification::RemovalCause;
41use moka::sync::Cache;
42use object_store::ObjectStore;
43use parquet::arrow::arrow_reader::{RowSelection, RowSelector};
44use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
45use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
46use smallvec::SmallVec;
47use snafu::{OptionExt, ResultExt};
48use store_api::metadata::RegionMetadataRef;
49use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
50
51use crate::cache::cache_size::parquet_meta_size;
52use crate::cache::file_cache::{FileType, IndexKey};
53use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
54#[cfg(feature = "vector_index")]
55use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
56use crate::cache::write_cache::WriteCacheRef;
57use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
58use crate::memtable::record_batch_estimated_size;
59use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
60use crate::read::Batch;
61use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
62use crate::sst::file::{RegionFileId, RegionIndexId};
63use crate::sst::parquet::PARQUET_METADATA_KEY;
64use crate::sst::parquet::read_columns::ParquetReadColumns;
65use crate::sst::parquet::reader::MetadataCacheMetrics;
66
/// Metrics label value for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label value for the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label value for page range lookups.
const PAGE_TYPE: &str = "page";
// NOTE(review): presumably the metrics label value for the file cache; it is
// not referenced in this part of the file, confirm against its users.
const FILE_TYPE: &str = "file";
// NOTE(review): presumably the metrics label value for the index caches; it is
// not referenced in this part of the file, confirm against its users.
const INDEX_TYPE: &str = "index";
/// Metrics label value for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label value for the range result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Metrics label value for the prefilter result cache.
const PREFILTER_RESULT_TYPE: &str = "prefilter_result";
/// Byte limit used by the default [`RangeResultMemoryLimiter`].
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes covered by one permit of the default [`RangeResultMemoryLimiter`].
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
85
/// Semaphore-backed limiter that hands out permits in proportion to the number
/// of bytes a caller asks for.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Semaphore created with `total_permits` permits.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Number of bytes represented by a single permit (never zero).
    permit_bytes: usize,
    /// Number of permits the semaphore was created with.
    total_permits: usize,
}
92
93impl Default for RangeResultMemoryLimiter {
94 fn default() -> Self {
95 Self::new(
96 RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
97 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
98 )
99 }
100}
101
102impl RangeResultMemoryLimiter {
103 pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
104 let permit_bytes = permit_bytes.max(1);
105 let total_permits = limit_bytes
106 .div_ceil(permit_bytes)
107 .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
108 Self {
109 semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
110 permit_bytes,
111 total_permits,
112 }
113 }
114
115 #[cfg(test)]
116 pub(crate) fn permit_bytes(&self) -> usize {
117 self.permit_bytes
118 }
119
120 #[cfg(test)]
121 pub(crate) fn available_permits(&self) -> usize {
122 self.semaphore.available_permits()
123 }
124
125 pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
126 let permits = bytes.div_ceil(self.permit_bytes).max(1);
127 if permits > self.total_permits {
128 return UnexpectedSnafu {
129 reason: format!(
130 "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
131 self.total_permits.saturating_mul(self.permit_bytes)
132 ),
133 }
134 .fail();
135 }
136 self.semaphore
137 .acquire_many(permits as u32)
138 .await
139 .map_err(|_| {
140 UnexpectedSnafu {
141 reason: "range result memory limiter is unexpectedly closed",
142 }
143 .build()
144 })
145 }
146}
147
/// Cached metadata of an SST file: the parquet metadata with the region
/// metadata entry stripped out, plus the region metadata itself.
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
    /// Parquet metadata without the `PARQUET_METADATA_KEY` key-value entry.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata, either supplied by the caller or decoded from the JSON
    /// stored under `PARQUET_METADATA_KEY`.
    region_metadata: RegionMetadataRef,
    /// Weight of the region metadata: the larger of its estimated size and the
    /// length of its JSON representation.
    region_metadata_weight: usize,
    /// Page index policy recorded for this entry; lookups compare the
    /// requested policy against it.
    page_index_policy: PageIndexPolicy,
}
159
160impl CachedSstMeta {
161 #[cfg(test)]
162 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
163 let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
164 Self::try_new_with_page_index_policy(file_path, parquet_metadata, None, page_index_policy)
165 }
166
167 pub(crate) fn try_new_with_region_metadata(
168 file_path: &str,
169 parquet_metadata: ParquetMetaData,
170 region_metadata: Option<RegionMetadataRef>,
171 ) -> Result<Self> {
172 let page_index_policy = infer_loaded_page_index_policy(&parquet_metadata);
173 Self::try_new_with_page_index_policy(
174 file_path,
175 parquet_metadata,
176 region_metadata,
177 page_index_policy,
178 )
179 }
180
181 pub(crate) fn try_new_with_page_index_policy(
182 file_path: &str,
183 parquet_metadata: ParquetMetaData,
184 region_metadata: Option<RegionMetadataRef>,
185 page_index_policy: PageIndexPolicy,
186 ) -> Result<Self> {
187 let file_metadata = parquet_metadata.file_metadata();
188 let key_values = file_metadata
189 .key_value_metadata()
190 .context(InvalidParquetSnafu {
191 file: file_path,
192 reason: "missing key value meta",
193 })?;
194 let meta_value = key_values
195 .iter()
196 .find(|kv| kv.key == PARQUET_METADATA_KEY)
197 .with_context(|| InvalidParquetSnafu {
198 file: file_path,
199 reason: format!("key {} not found", PARQUET_METADATA_KEY),
200 })?;
201 let json = meta_value
202 .value
203 .as_ref()
204 .with_context(|| InvalidParquetSnafu {
205 file: file_path,
206 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
207 })?;
208 let region_metadata = match region_metadata {
209 Some(region_metadata) => region_metadata,
210 None => Arc::new(
211 store_api::metadata::RegionMetadata::from_json(json)
212 .context(InvalidMetadataSnafu)?,
213 ),
214 };
215 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
217 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
218
219 Ok(Self {
220 parquet_metadata,
221 region_metadata,
222 region_metadata_weight,
223 page_index_policy,
224 })
225 }
226
227 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
228 self.parquet_metadata.clone()
229 }
230
231 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
232 self.region_metadata.clone()
233 }
234
235 fn satisfies_page_index_policy(&self, requested: PageIndexPolicy) -> bool {
236 match requested {
237 PageIndexPolicy::Skip => true,
238 PageIndexPolicy::Optional => self.page_index_policy != PageIndexPolicy::Skip,
239 PageIndexPolicy::Required => self.page_index_policy == PageIndexPolicy::Required,
240 }
241 }
242}
243
244fn infer_loaded_page_index_policy(parquet_metadata: &ParquetMetaData) -> PageIndexPolicy {
245 if parquet_metadata.column_index().is_some() || parquet_metadata.offset_index().is_some() {
246 PageIndexPolicy::Optional
247 } else {
248 PageIndexPolicy::Skip
249 }
250}
251
252fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
253 let file_metadata = parquet_metadata.file_metadata();
254 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
255 let filtered = key_values
256 .iter()
257 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
258 .cloned()
259 .collect::<Vec<_>>();
260 (!filtered.is_empty()).then_some(filtered)
261 });
262 let stripped_file_metadata = FileMetaData::new(
263 file_metadata.version(),
264 file_metadata.num_rows(),
265 file_metadata.created_by().map(ToString::to_string),
266 filtered_key_values,
267 file_metadata.schema_descr_ptr(),
268 file_metadata.column_orders().cloned(),
269 );
270
271 let mut builder = parquet_metadata.into_builder();
272 let row_groups = builder.take_row_groups();
273 let column_index = builder.take_column_index();
274 let offset_index = builder.take_offset_index();
275
276 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
277 .set_row_groups(row_groups)
278 .set_column_index(column_index)
279 .set_offset_index(offset_index)
280 .build()
281}
282
283fn removal_cause_str(cause: RemovalCause) -> &'static str {
284 match cause {
285 RemovalCause::Expired => "expired",
286 RemovalCause::Explicit => "explicit",
287 RemovalCause::Replaced => "replaced",
288 RemovalCause::Size => "size",
289 }
290}
291
/// Hashable copy of a parquet [`RowSelector`], used inside [`PrefilterKey`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PrefilterRowSelector {
    /// Copied from `RowSelector::row_count`.
    row_count: usize,
    /// Copied from `RowSelector::skip`.
    skip: bool,
}
297
298impl From<&RowSelector> for PrefilterRowSelector {
303 fn from(selector: &RowSelector) -> Self {
304 Self {
305 row_count: selector.row_count,
306 skip: selector.skip,
307 }
308 }
309}
310
/// Key of the prefilter result cache.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct PrefilterKey {
    /// Id of the file the result belongs to.
    file_id: FileId,
    /// Index of the row group inside the file.
    row_group_idx: u32,
    /// Snapshot of the row selection, if there was one.
    row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
    /// Schema version supplied by the caller.
    schema_version: u64,
    /// Filter expressions in string form.
    filter_exprs: SmallVec<[String; 1]>,
    /// Estimated memory usage of this key in bytes, computed once in `new`.
    mem_usage: usize,
}
321
322impl PrefilterKey {
323 pub(crate) fn row_selection_snapshot(
324 row_selection: Option<&RowSelection>,
325 ) -> Option<Arc<Vec<PrefilterRowSelector>>> {
326 row_selection.map(|selection| {
327 Arc::new(
328 selection
329 .iter()
330 .map(PrefilterRowSelector::from)
331 .collect::<Vec<_>>(),
332 )
333 })
334 }
335
336 pub(crate) fn new(
337 file_id: FileId,
338 row_group_idx: u32,
339 row_selection: Option<Arc<Vec<PrefilterRowSelector>>>,
340 schema_version: u64,
341 filter_exprs: SmallVec<[String; 1]>,
342 ) -> Self {
343 let row_selection_bytes = row_selection
344 .as_ref()
345 .map(|selection| selection.len() * mem::size_of::<PrefilterRowSelector>())
346 .unwrap_or(0);
347 let spilled_expr_bytes = if filter_exprs.spilled() {
348 filter_exprs.capacity() * mem::size_of::<String>()
349 } else {
350 0
351 };
352 let expr_bytes = filter_exprs.iter().map(|s| s.capacity()).sum::<usize>();
353
354 Self {
355 file_id,
356 row_group_idx,
357 row_selection,
358 schema_version,
359 filter_exprs,
360 mem_usage: mem::size_of::<Self>()
361 + row_selection_bytes
362 + spilled_expr_bytes
363 + expr_bytes,
364 }
365 }
366
367 fn mem_usage(&self) -> usize {
368 self.mem_usage
369 }
370}
371
/// Cache from a [`PrefilterKey`] to a shared boolean buffer.
type PrefilterResultCache = Cache<PrefilterKey, Arc<BooleanBuffer>>;
373
374fn new_prefilter_result_cache(capacity: u64) -> PrefilterResultCache {
375 Cache::builder()
376 .max_capacity(capacity)
377 .weigher(prefilter_result_cache_weight)
378 .eviction_listener(|k, v, cause| {
379 let size = prefilter_result_cache_weight(&k, &v);
380 CACHE_BYTES
381 .with_label_values(&[PREFILTER_RESULT_TYPE])
382 .sub(size.into());
383 CACHE_EVICTION
384 .with_label_values(&[PREFILTER_RESULT_TYPE, removal_cause_str(cause)])
385 .inc();
386 })
387 .build()
388}
389
390fn prefilter_result_cache_weight(k: &PrefilterKey, v: &Arc<BooleanBuffer>) -> u32 {
391 (k.mem_usage() + mem::size_of::<BooleanBuffer>() + v.values().len()) as u32
392}
393
/// Strategy that decides which caches of a [`CacheManager`] an operation may
/// use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Every accessor is forwarded to the cache manager.
    EnableAll(CacheManagerRef),
    /// Only the SST metadata accessors, parquet metadata removal, puffin
    /// eviction and the write cache accessor are forwarded to the manager.
    /// Every other getter returns `None`/`false` and every other put is a
    /// no-op.
    Compaction(CacheManagerRef),
    /// No cache is used at all.
    Disabled,
}
408
409impl CacheStrategy {
410 pub(crate) async fn get_sst_meta_data(
412 &self,
413 file_id: RegionFileId,
414 metrics: &mut MetadataCacheMetrics,
415 page_index_policy: PageIndexPolicy,
416 ) -> Option<Arc<CachedSstMeta>> {
417 match self {
418 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
419 cache_manager
420 .get_sst_meta_data(file_id, metrics, page_index_policy)
421 .await
422 }
423 CacheStrategy::Disabled => {
424 metrics.cache_miss += 1;
425 None
426 }
427 }
428 }
429
430 pub(crate) fn get_sst_meta_data_from_mem_cache(
432 &self,
433 file_id: RegionFileId,
434 page_index_policy: PageIndexPolicy,
435 ) -> Option<Arc<CachedSstMeta>> {
436 match self {
437 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
438 cache_manager.get_sst_meta_data_from_mem_cache(file_id, page_index_policy)
439 }
440 CacheStrategy::Disabled => None,
441 }
442 }
443
444 pub fn get_parquet_meta_data_from_mem_cache(
446 &self,
447 file_id: RegionFileId,
448 ) -> Option<Arc<ParquetMetaData>> {
449 self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
450 .map(|metadata| metadata.parquet_metadata())
451 }
452
453 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
455 match self {
456 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
457 cache_manager.put_sst_meta_data(file_id, metadata);
458 }
459 CacheStrategy::Disabled => {}
460 }
461 }
462
463 pub fn put_parquet_meta_data(
465 &self,
466 file_id: RegionFileId,
467 metadata: Arc<ParquetMetaData>,
468 region_metadata: Option<RegionMetadataRef>,
469 ) {
470 match self {
471 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
472 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
473 }
474 CacheStrategy::Disabled => {}
475 }
476 }
477
478 pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
481 match self {
482 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_prefilter_result(key),
483 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
484 }
485 }
486
487 pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
490 if let CacheStrategy::EnableAll(cache_manager) = self {
491 cache_manager.put_prefilter_result(key, result);
492 }
493 }
494
495 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
497 match self {
498 CacheStrategy::EnableAll(cache_manager) => {
499 cache_manager.remove_parquet_meta_data(file_id);
500 }
501 CacheStrategy::Compaction(cache_manager) => {
502 cache_manager.remove_parquet_meta_data(file_id);
503 }
504 CacheStrategy::Disabled => {}
505 }
506 }
507
508 pub fn get_repeated_vector(
511 &self,
512 data_type: &ConcreteDataType,
513 value: &Value,
514 ) -> Option<VectorRef> {
515 match self {
516 CacheStrategy::EnableAll(cache_manager) => {
517 cache_manager.get_repeated_vector(data_type, value)
518 }
519 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520 }
521 }
522
523 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
526 if let CacheStrategy::EnableAll(cache_manager) = self {
527 cache_manager.put_repeated_vector(value, vector);
528 }
529 }
530
531 pub fn get_page_ranges(
534 &self,
535 file_id: FileId,
536 row_group_idx: usize,
537 ranges: &[Range<u64>],
538 ) -> Option<PageRangeLookup> {
539 match self {
540 CacheStrategy::EnableAll(cache_manager) => {
541 cache_manager.get_page_ranges(file_id, row_group_idx, ranges)
542 }
543 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
544 }
545 }
546
547 pub fn put_page_ranges(
550 &self,
551 file_id: FileId,
552 row_group_idx: usize,
553 ranges: &[Range<u64>],
554 pages: &[Bytes],
555 ) {
556 if let CacheStrategy::EnableAll(cache_manager) = self {
557 cache_manager.put_page_ranges(file_id, row_group_idx, ranges, pages);
558 }
559 }
560
561 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
563 match self {
564 CacheStrategy::EnableAll(cache_manager) => {
565 cache_manager.evict_puffin_cache(file_id).await
566 }
567 CacheStrategy::Compaction(cache_manager) => {
568 cache_manager.evict_puffin_cache(file_id).await
569 }
570 CacheStrategy::Disabled => {}
571 }
572 }
573
574 pub fn get_selector_result(
577 &self,
578 selector_key: &SelectorResultKey,
579 ) -> Option<Arc<SelectorResultValue>> {
580 match self {
581 CacheStrategy::EnableAll(cache_manager) => {
582 cache_manager.get_selector_result(selector_key)
583 }
584 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
585 }
586 }
587
588 pub fn put_selector_result(
591 &self,
592 selector_key: SelectorResultKey,
593 result: Arc<SelectorResultValue>,
594 ) {
595 if let CacheStrategy::EnableAll(cache_manager) = self {
596 cache_manager.put_selector_result(selector_key, result);
597 }
598 }
599
600 #[allow(dead_code)]
603 pub(crate) fn get_range_result(
604 &self,
605 key: &RangeScanCacheKey,
606 ) -> Option<Arc<RangeScanCacheValue>> {
607 match self {
608 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
609 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
610 }
611 }
612
613 pub(crate) fn put_range_result(
616 &self,
617 key: RangeScanCacheKey,
618 result: Arc<RangeScanCacheValue>,
619 ) {
620 if let CacheStrategy::EnableAll(cache_manager) = self {
621 cache_manager.put_range_result(key, result);
622 }
623 }
624
625 pub(crate) fn has_range_result_cache(&self) -> bool {
627 match self {
628 CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
629 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
630 }
631 }
632
633 pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
634 match self {
635 CacheStrategy::EnableAll(cache_manager) => {
636 Some(cache_manager.range_result_memory_limiter())
637 }
638 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
639 }
640 }
641
642 pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
643 match self {
644 CacheStrategy::EnableAll(cache_manager) => {
645 Some(cache_manager.range_result_cache_size())
646 }
647 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
648 }
649 }
650
651 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
654 match self {
655 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
656 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
657 CacheStrategy::Disabled => None,
658 }
659 }
660
661 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
664 match self {
665 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
666 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
667 }
668 }
669
670 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
673 match self {
674 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
675 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
676 }
677 }
678
679 #[cfg(feature = "vector_index")]
682 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
683 match self {
684 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
685 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
686 }
687 }
688
689 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
692 match self {
693 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
694 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
695 }
696 }
697
698 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
701 match self {
702 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
703 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
704 }
705 }
706
707 pub fn maybe_download_background(
709 &self,
710 index_key: IndexKey,
711 remote_path: String,
712 remote_store: ObjectStore,
713 file_size: u64,
714 ) {
715 if let CacheStrategy::EnableAll(cache_manager) = self
716 && let Some(write_cache) = cache_manager.write_cache()
717 {
718 write_cache.file_cache().maybe_download_background(
719 index_key,
720 remote_path,
721 remote_store,
722 file_size,
723 );
724 }
725 }
726}
727
/// Holds the caches used by the engine. A cache that is `None` is not in use.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata, keyed by region id and file id.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for repeated vectors, keyed by data type and value.
    vector_cache: Option<VectorCache>,
    /// Cache for byte ranges of pages.
    page_cache: Option<Arc<PageRangeCache>>,
    /// Write cache; its file cache is also consulted for SST metadata.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index data.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index data.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index data.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Cache for puffin metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series selector results.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan results.
    range_result_cache: Option<RangeResultCache>,
    /// Size reported by `range_result_cache_size()`.
    // NOTE(review): presumably the configured capacity of the range result
    // cache in bytes; confirm against the builder.
    range_result_cache_size: u64,
    /// Memory limiter exposed through `range_result_memory_limiter()`.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache for index results.
    index_result_cache: Option<IndexResultCache>,
    /// Cache for prefilter results.
    prefilter_result_cache: Option<PrefilterResultCache>,
}
763
/// Shared, reference-counted handle to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
765
766impl CacheManager {
767 pub fn builder() -> CacheManagerBuilder {
769 CacheManagerBuilder::default()
770 }
771
772 pub(crate) async fn get_sst_meta_data(
775 &self,
776 file_id: RegionFileId,
777 metrics: &mut MetadataCacheMetrics,
778 page_index_policy: PageIndexPolicy,
779 ) -> Option<Arc<CachedSstMeta>> {
780 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id, page_index_policy) {
781 metrics.mem_cache_hit += 1;
782 return Some(metadata);
783 }
784
785 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
786 if let Some(write_cache) = &self.write_cache
787 && let Some(metadata) = write_cache
788 .file_cache()
789 .get_sst_meta_data(key, metrics, page_index_policy)
790 .await
791 {
792 metrics.file_cache_hit += 1;
793 self.put_sst_meta_data(file_id, metadata.clone());
794 return Some(metadata);
795 }
796
797 metrics.cache_miss += 1;
798 None
799 }
800
801 pub(crate) async fn get_parquet_meta_data(
804 &self,
805 file_id: RegionFileId,
806 metrics: &mut MetadataCacheMetrics,
807 page_index_policy: PageIndexPolicy,
808 ) -> Option<Arc<ParquetMetaData>> {
809 self.get_sst_meta_data(file_id, metrics, page_index_policy)
810 .await
811 .map(|metadata| metadata.parquet_metadata())
812 }
813
814 pub(crate) fn get_sst_meta_data_from_mem_cache(
817 &self,
818 file_id: RegionFileId,
819 page_index_policy: PageIndexPolicy,
820 ) -> Option<Arc<CachedSstMeta>> {
821 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
822 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
823 let value =
824 value.filter(|metadata| metadata.satisfies_page_index_policy(page_index_policy));
825 update_hit_miss(value, SST_META_TYPE)
826 })
827 }
828
829 pub fn get_parquet_meta_data_from_mem_cache(
832 &self,
833 file_id: RegionFileId,
834 ) -> Option<Arc<ParquetMetaData>> {
835 self.get_sst_meta_data_from_mem_cache(file_id, PageIndexPolicy::Skip)
836 .map(|metadata| metadata.parquet_metadata())
837 }
838
839 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
841 if let Some(cache) = &self.sst_meta_cache {
842 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
843 CACHE_BYTES
844 .with_label_values(&[SST_META_TYPE])
845 .add(meta_cache_weight(&key, &metadata).into());
846 cache.insert(key, metadata);
847 }
848 }
849
850 pub fn put_parquet_meta_data(
852 &self,
853 file_id: RegionFileId,
854 metadata: Arc<ParquetMetaData>,
855 region_metadata: Option<RegionMetadataRef>,
856 ) {
857 if self.sst_meta_cache.is_some() {
858 let file_path = format!(
859 "region_id={}, file_id={}",
860 file_id.region_id(),
861 file_id.file_id()
862 );
863 match CachedSstMeta::try_new_with_region_metadata(
864 &file_path,
865 Arc::unwrap_or_clone(metadata),
866 region_metadata,
867 ) {
868 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
869 Err(err) => warn!(
870 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
871 file_id.region_id(),
872 file_id.file_id()
873 ),
874 }
875 }
876 }
877
878 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
880 if let Some(cache) = &self.sst_meta_cache {
881 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
882 }
883 }
884
885 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
887 self.sst_meta_cache
888 .as_ref()
889 .map(|cache| cache.weighted_size())
890 .unwrap_or(0)
891 }
892
893 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
895 self.sst_meta_cache.is_some()
896 }
897
898 pub fn get_repeated_vector(
900 &self,
901 data_type: &ConcreteDataType,
902 value: &Value,
903 ) -> Option<VectorRef> {
904 self.vector_cache.as_ref().and_then(|vector_cache| {
905 let value = vector_cache.get(&(data_type.clone(), value.clone()));
906 update_hit_miss(value, VECTOR_TYPE)
907 })
908 }
909
910 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
912 if let Some(cache) = &self.vector_cache {
913 let key = (vector.data_type(), value);
914 CACHE_BYTES
915 .with_label_values(&[VECTOR_TYPE])
916 .add(vector_cache_weight(&key, &vector).into());
917 cache.insert(key, vector);
918 }
919 }
920
921 pub fn get_page_ranges(
923 &self,
924 file_id: FileId,
925 row_group_idx: usize,
926 ranges: &[Range<u64>],
927 ) -> Option<PageRangeLookup> {
928 self.page_cache.as_ref().map(|page_cache| {
929 let lookup = page_cache.lookup(file_id, row_group_idx, ranges);
930 if lookup.cached_bytes > 0 {
931 CACHE_HIT.with_label_values(&[PAGE_TYPE]).inc();
932 }
933 if !lookup.missing_ranges.is_empty() {
934 CACHE_MISS.with_label_values(&[PAGE_TYPE]).inc();
935 }
936 lookup
937 })
938 }
939
940 pub fn put_page_ranges(
942 &self,
943 file_id: FileId,
944 row_group_idx: usize,
945 ranges: &[Range<u64>],
946 pages: &[Bytes],
947 ) {
948 if let Some(cache) = &self.page_cache {
949 cache.insert_ranges(file_id, row_group_idx, ranges, pages);
950 }
951 }
952
953 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
955 if let Some(cache) = &self.bloom_filter_index_cache {
956 cache.invalidate_file(file_id.file_id());
957 }
958
959 if let Some(cache) = &self.inverted_index_cache {
960 cache.invalidate_file(file_id.file_id());
961 }
962
963 if let Some(cache) = &self.index_result_cache {
964 cache.invalidate_file(file_id.file_id());
965 }
966
967 #[cfg(feature = "vector_index")]
968 if let Some(cache) = &self.vector_index_cache {
969 cache.invalidate_file(file_id.file_id());
970 }
971
972 if let Some(cache) = &self.puffin_metadata_cache {
973 cache.remove(&file_id.to_string());
974 }
975
976 if let Some(write_cache) = &self.write_cache {
977 write_cache
978 .remove(IndexKey::new(
979 file_id.region_id(),
980 file_id.file_id(),
981 FileType::Puffin(file_id.version),
982 ))
983 .await;
984 }
985 }
986
987 pub fn get_selector_result(
989 &self,
990 selector_key: &SelectorResultKey,
991 ) -> Option<Arc<SelectorResultValue>> {
992 self.selector_result_cache
993 .as_ref()
994 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
995 }
996
997 pub fn put_selector_result(
999 &self,
1000 selector_key: SelectorResultKey,
1001 result: Arc<SelectorResultValue>,
1002 ) {
1003 if let Some(cache) = &self.selector_result_cache {
1004 CACHE_BYTES
1005 .with_label_values(&[SELECTOR_RESULT_TYPE])
1006 .add(selector_result_cache_weight(&selector_key, &result).into());
1007 cache.insert(selector_key, result);
1008 }
1009 }
1010
1011 #[allow(dead_code)]
1013 pub(crate) fn get_range_result(
1014 &self,
1015 key: &RangeScanCacheKey,
1016 ) -> Option<Arc<RangeScanCacheValue>> {
1017 self.range_result_cache
1018 .as_ref()
1019 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
1020 }
1021
1022 pub(crate) fn put_range_result(
1024 &self,
1025 key: RangeScanCacheKey,
1026 result: Arc<RangeScanCacheValue>,
1027 ) {
1028 if let Some(cache) = &self.range_result_cache {
1029 CACHE_BYTES
1030 .with_label_values(&[RANGE_RESULT_TYPE])
1031 .add(range_result_cache_weight(&key, &result).into());
1032 cache.insert(key, result);
1033 }
1034 }
1035
1036 pub(crate) fn has_range_result_cache(&self) -> bool {
1038 self.range_result_cache.is_some()
1039 }
1040
1041 pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
1042 &self.range_result_memory_limiter
1043 }
1044
1045 pub(crate) fn range_result_cache_size(&self) -> usize {
1046 self.range_result_cache_size as usize
1047 }
1048
1049 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
1051 self.write_cache.as_ref()
1052 }
1053
1054 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
1055 self.inverted_index_cache.as_ref()
1056 }
1057
1058 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
1059 self.bloom_filter_index_cache.as_ref()
1060 }
1061
1062 #[cfg(feature = "vector_index")]
1063 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
1064 self.vector_index_cache.as_ref()
1065 }
1066
1067 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
1068 self.puffin_metadata_cache.as_ref()
1069 }
1070
1071 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
1072 self.index_result_cache.as_ref()
1073 }
1074
1075 pub(crate) fn get_prefilter_result(&self, key: &PrefilterKey) -> Option<Arc<BooleanBuffer>> {
1076 self.prefilter_result_cache
1077 .as_ref()
1078 .and_then(|cache| update_hit_miss(cache.get(key), PREFILTER_RESULT_TYPE))
1079 }
1080
1081 pub(crate) fn put_prefilter_result(&self, key: PrefilterKey, result: Arc<BooleanBuffer>) {
1082 if let Some(cache) = &self.prefilter_result_cache {
1083 CACHE_BYTES
1084 .with_label_values(&[PREFILTER_RESULT_TYPE])
1085 .add(prefilter_result_cache_weight(&key, &result).into());
1086 cache.insert(key, result);
1087 }
1088 }
1089}
1090
1091pub fn selector_result_cache_miss() {
1093 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
1094}
1095
1096pub fn selector_result_cache_hit() {
1098 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
1099}
1100
/// Builder for [`CacheManager`]. Every size defaults to 0 and the write cache
/// defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; 0 disables it.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated vector cache; 0 disables it.
    vector_cache_size: u64,
    /// Capacity of the page cache; 0 disables it.
    page_cache_size: u64,
    /// Index metadata size passed to the inverted and bloom filter index caches.
    index_metadata_size: u64,
    /// Index content size passed to the inverted, bloom filter and vector
    /// index caches.
    index_content_size: u64,
    /// Index content page size passed to the inverted and bloom filter index
    /// caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; 0 disables it.
    index_result_cache_size: u64,
    /// Capacity of the prefilter result cache; 0 disables it.
    prefilter_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; 0 disables it.
    selector_result_cache_size: u64,
    /// Size of the range result cache.
    range_result_cache_size: u64,
}
1117
1118impl CacheManagerBuilder {
1119 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
1121 self.sst_meta_cache_size = bytes;
1122 self
1123 }
1124
1125 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
1127 self.vector_cache_size = bytes;
1128 self
1129 }
1130
1131 pub fn page_cache_size(mut self, bytes: u64) -> Self {
1133 self.page_cache_size = bytes;
1134 self
1135 }
1136
1137 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
1139 self.write_cache = cache;
1140 self
1141 }
1142
1143 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
1145 self.index_metadata_size = bytes;
1146 self
1147 }
1148
1149 pub fn index_content_size(mut self, bytes: u64) -> Self {
1151 self.index_content_size = bytes;
1152 self
1153 }
1154
1155 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
1157 self.index_content_page_size = bytes;
1158 self
1159 }
1160
1161 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
1163 self.index_result_cache_size = bytes;
1164 self
1165 }
1166
1167 pub fn prefilter_result_cache_size(mut self, bytes: u64) -> Self {
1169 self.prefilter_result_cache_size = bytes;
1170 self
1171 }
1172
1173 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
1175 self.puffin_metadata_size = bytes;
1176 self
1177 }
1178
1179 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
1181 self.selector_result_cache_size = bytes;
1182 self
1183 }
1184
1185 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
1187 self.range_result_cache_size = bytes;
1188 self
1189 }
1190
1191 pub fn build(self) -> CacheManager {
1193 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
1194 Cache::builder()
1195 .max_capacity(self.sst_meta_cache_size)
1196 .weigher(meta_cache_weight)
1197 .eviction_listener(|k, v, cause| {
1198 let size = meta_cache_weight(&k, &v);
1199 CACHE_BYTES
1200 .with_label_values(&[SST_META_TYPE])
1201 .sub(size.into());
1202 CACHE_EVICTION
1203 .with_label_values(&[SST_META_TYPE, removal_cause_str(cause)])
1204 .inc();
1205 })
1206 .build()
1207 });
1208 let vector_cache = (self.vector_cache_size != 0).then(|| {
1209 Cache::builder()
1210 .max_capacity(self.vector_cache_size)
1211 .weigher(vector_cache_weight)
1212 .eviction_listener(|k, v, cause| {
1213 let size = vector_cache_weight(&k, &v);
1214 CACHE_BYTES
1215 .with_label_values(&[VECTOR_TYPE])
1216 .sub(size.into());
1217 CACHE_EVICTION
1218 .with_label_values(&[VECTOR_TYPE, removal_cause_str(cause)])
1219 .inc();
1220 })
1221 .build()
1222 });
1223 let page_cache =
1224 (self.page_cache_size != 0).then(|| PageRangeCache::new(self.page_cache_size));
1225 let inverted_index_cache = InvertedIndexCache::new(
1226 self.index_metadata_size,
1227 self.index_content_size,
1228 self.index_content_page_size,
1229 );
1230 let bloom_filter_index_cache = BloomFilterIndexCache::new(
1232 self.index_metadata_size,
1233 self.index_content_size,
1234 self.index_content_page_size,
1235 );
1236 #[cfg(feature = "vector_index")]
1237 let vector_index_cache = (self.index_content_size != 0)
1238 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1239 let index_result_cache = (self.index_result_cache_size != 0)
1240 .then(|| IndexResultCache::new(self.index_result_cache_size));
1241 let prefilter_result_cache = (self.prefilter_result_cache_size != 0)
1242 .then(|| new_prefilter_result_cache(self.prefilter_result_cache_size));
1243 let puffin_metadata_cache =
1244 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1245 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1246 Cache::builder()
1247 .max_capacity(self.selector_result_cache_size)
1248 .weigher(selector_result_cache_weight)
1249 .eviction_listener(|k, v, cause| {
1250 let size = selector_result_cache_weight(&k, &v);
1251 CACHE_BYTES
1252 .with_label_values(&[SELECTOR_RESULT_TYPE])
1253 .sub(size.into());
1254 CACHE_EVICTION
1255 .with_label_values(&[SELECTOR_RESULT_TYPE, removal_cause_str(cause)])
1256 .inc();
1257 })
1258 .build()
1259 });
1260 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1261 Cache::builder()
1262 .max_capacity(self.range_result_cache_size)
1263 .weigher(range_result_cache_weight)
1264 .eviction_listener(move |k, v, cause| {
1265 let size = range_result_cache_weight(&k, &v);
1266 CACHE_BYTES
1267 .with_label_values(&[RANGE_RESULT_TYPE])
1268 .sub(size.into());
1269 CACHE_EVICTION
1270 .with_label_values(&[RANGE_RESULT_TYPE, removal_cause_str(cause)])
1271 .inc();
1272 })
1273 .build()
1274 });
1275 CacheManager {
1276 sst_meta_cache,
1277 vector_cache,
1278 page_cache,
1279 write_cache: self.write_cache,
1280 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1281 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1282 #[cfg(feature = "vector_index")]
1283 vector_index_cache,
1284 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1285 selector_result_cache,
1286 range_result_cache,
1287 range_result_cache_size: self.range_result_cache_size,
1288 range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1289 self.range_result_cache_size as usize,
1290 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1291 )),
1292 index_result_cache,
1293 prefilter_result_cache,
1294 }
1295 }
1296}
1297
1298fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1299 let size =
1301 k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight;
1302 u32::try_from(size).unwrap_or(u32::MAX)
1303}
1304
1305fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1306 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1308}
1309
1310fn page_cache_weight(k: &PageFragmentKey, v: &Bytes) -> u32 {
1311 (k.estimated_size() + mem::size_of::<Bytes>() + v.len()) as u32
1312}
1313
1314fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1315 (mem::size_of_val(k) + v.estimated_size()) as u32
1316}
1317
1318fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1319 (k.estimated_size() + v.estimated_size()) as u32
1320}
1321
1322fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1324 if value.is_some() {
1325 CACHE_HIT.with_label_values(&[cache_type]).inc();
1326 } else {
1327 CACHE_MISS.with_label_values(&[cache_type]).inc();
1328 }
1329 value
1330}
1331
/// Key of the SST metadata cache: a region id paired with a file id.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct SstMetaKey(RegionId, FileId);
1335
1336impl SstMetaKey {
1337 fn estimated_size(&self) -> usize {
1339 mem::size_of::<Self>()
1340 }
1341}
1342
/// Identifies one row group of one file.
///
/// Used as the outer key of the page fragment index, so all cached fragments
/// of the same row group are grouped together.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
struct PageFragmentGroupKey {
    // File the row group belongs to.
    file_id: FileId,
    // Index of the row group inside the file.
    row_group_idx: usize,
}
1348
/// Key of a cached page fragment: a byte range inside one row group of a file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PageFragmentKey {
    /// File the fragment belongs to.
    file_id: FileId,
    /// Index of the row group inside the file.
    row_group_idx: usize,
    /// Start offset of the fragment (inclusive).
    start: u64,
    /// End offset of the fragment (exclusive).
    end: u64,
}
1361
1362impl PageFragmentKey {
1363 fn new(file_id: FileId, row_group_idx: usize, range: &Range<u64>) -> PageFragmentKey {
1364 PageFragmentKey {
1365 file_id,
1366 row_group_idx,
1367 start: range.start,
1368 end: range.end,
1369 }
1370 }
1371
1372 fn group_key(&self) -> PageFragmentGroupKey {
1373 PageFragmentGroupKey {
1374 file_id: self.file_id,
1375 row_group_idx: self.row_group_idx,
1376 }
1377 }
1378
1379 fn estimated_size(&self) -> usize {
1381 mem::size_of::<Self>()
1382 }
1383}
1384
/// A contiguous piece of a requested range that was served from the cache.
#[derive(Clone)]
pub struct PageRangePart {
    /// Range covered by this part.
    pub range: Range<u64>,
    /// Bytes of the part; its length matches the length of `range`.
    pub bytes: Bytes,
}
1393
/// Result of looking up a list of ranges in the page range cache.
pub struct PageRangeLookup {
    /// Cached parts per requested range, in the order the ranges were requested.
    pub cached_parts: Vec<Vec<PageRangePart>>,
    /// Sub-ranges of the requested ranges that are not covered by the cache.
    pub missing_ranges: Vec<Range<u64>>,
    /// Total number of cached parts across all requested ranges.
    pub cached_range_count: usize,
    /// Total number of bytes covered by the cached parts.
    pub cached_bytes: u64,
}
1405
1406impl PageRangeLookup {
1407 pub fn is_fully_cached(&self) -> bool {
1408 self.missing_ranges.is_empty()
1409 }
1410}
1411
// Fragments of one row group, ordered by `(start, end)`.
type PageFragmentRangeIndex = BTreeMap<(u64, u64), PageFragmentKey>;
// Per-row-group fragment indexes, keyed by file and row group.
type PageFragmentIndex = HashMap<PageFragmentGroupKey, PageFragmentRangeIndex>;
1414
/// Cache of page fragments that can answer lookups for arbitrary ranges.
///
/// Fragment bytes live in `cache`; `index` records which fragment ranges
/// exist per row group so a lookup can find the fragments overlapping a
/// requested range.
pub struct PageRangeCache {
    // Fragment bytes keyed by the exact fragment range.
    cache: Cache<PageFragmentKey, Bytes>,
    // Range index over the keys in `cache`; it may briefly hold keys that are
    // no longer cached, which `lookup` cleans up.
    index: RwLock<PageFragmentIndex>,
}
1420
impl PageRangeCache {
    /// Creates a page range cache whose entries weigh at most `capacity` in total.
    ///
    /// The cache is built inside `Arc::new_cyclic` so the eviction listener
    /// can hold a weak reference back to the `PageRangeCache` and keep the
    /// range index in sync with removals.
    fn new(capacity: u64) -> Arc<PageRangeCache> {
        Arc::new_cyclic(|weak_cache: &std::sync::Weak<PageRangeCache>| {
            let cache = Cache::builder()
                .max_capacity(capacity)
                .weigher(page_cache_weight)
                .eviction_listener({
                    let weak_cache = weak_cache.clone();
                    move |k, v, cause| {
                        let size = page_cache_weight(&k, &v);
                        CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
                        CACHE_EVICTION
                            .with_label_values(&[PAGE_TYPE, removal_cause_str(cause)])
                            .inc();

                        // A replaced entry is still cached under the same key,
                        // so its index entry must stay.
                        if let Some(cache) = weak_cache.upgrade()
                            && !matches!(cause, RemovalCause::Replaced)
                        {
                            cache.remove_index_entry(*k);
                        }
                    }
                })
                .build();

            PageRangeCache {
                cache,
                index: RwLock::new(HashMap::new()),
            }
        })
    }

    /// Looks up `ranges` of the given row group in the cache.
    ///
    /// For every requested range the result holds the cached parts that
    /// cover it, in ascending order and without overlap, plus the gaps that
    /// are not cached. A range with `start >= end` yields an empty part list
    /// and no missing range.
    fn lookup(
        &self,
        file_id: FileId,
        row_group_idx: usize,
        ranges: &[Range<u64>],
    ) -> PageRangeLookup {
        let mut cached_parts = Vec::with_capacity(ranges.len());
        let mut missing_ranges = Vec::new();
        let mut cached_range_count = 0;
        let mut cached_bytes = 0;

        for range in ranges {
            if range.start >= range.end {
                cached_parts.push(Vec::new());
                continue;
            }

            let mut parts = Vec::new();
            let candidates = self.find_index_candidates(file_id, row_group_idx, range);
            let mut stale_keys = Vec::new();

            for fragment_key in candidates {
                if let Some(bytes) = self.cache.get(&fragment_key) {
                    // Clamp the fragment to the requested range and slice the
                    // cached bytes accordingly.
                    let part_start = range.start.max(fragment_key.start);
                    let part_end = range.end.min(fragment_key.end);
                    let slice_start = (part_start - fragment_key.start) as usize;
                    let slice_end = (part_end - fragment_key.start) as usize;
                    parts.push(PageRangePart {
                        range: part_start..part_end,
                        bytes: bytes.slice(slice_start..slice_end),
                    });
                } else {
                    // Indexed but no longer in the cache.
                    stale_keys.push(fragment_key);
                }
            }
            for key in stale_keys {
                self.remove_uncached_index_entry(key);
            }

            // Walk the parts in index order; `cursor` is the first offset of
            // the requested range that is not accounted for yet.
            let mut cursor = range.start;
            let mut compacted_parts: Vec<PageRangePart> = Vec::with_capacity(parts.len());
            for part in parts {
                // Entirely covered by earlier parts.
                if part.range.end <= cursor {
                    continue;
                }

                // Drop the prefix that overlaps what is already covered.
                let part = if part.range.start < cursor {
                    let offset = (cursor - part.range.start) as usize;
                    PageRangePart {
                        range: cursor..part.range.end,
                        bytes: part.bytes.slice(offset..),
                    }
                } else {
                    part
                };

                // Gap between the covered prefix and this part.
                if cursor < part.range.start {
                    missing_ranges.push(cursor..part.range.start);
                }
                cached_bytes += part.range.end - part.range.start;
                cached_range_count += 1;
                cursor = part.range.end;
                compacted_parts.push(part);

                if cursor >= range.end {
                    break;
                }
            }

            // Uncovered tail of the requested range.
            if cursor < range.end {
                missing_ranges.push(cursor..range.end);
            }
            cached_parts.push(compacted_parts);
        }

        PageRangeLookup {
            cached_parts,
            missing_ranges,
            cached_range_count,
            cached_bytes,
        }
    }

    /// Inserts `pages` as fragments for the matching `ranges` of a row group.
    ///
    /// Ranges and pages are paired by position. A pair is skipped when the
    /// range is empty or the page length differs from the range length.
    fn insert_ranges(
        &self,
        file_id: FileId,
        row_group_idx: usize,
        ranges: &[Range<u64>],
        pages: &[Bytes],
    ) {
        for (range, bytes) in ranges.iter().zip(pages) {
            if range.start >= range.end || bytes.len() as u64 != range.end - range.start {
                continue;
            }

            let key = PageFragmentKey::new(file_id, row_group_idx, range);
            // Copy the page so the cached entry does not share the caller's buffer.
            let bytes = Bytes::copy_from_slice(bytes.as_ref());
            let size = page_cache_weight(&key, &bytes);
            CACHE_BYTES.with_label_values(&[PAGE_TYPE]).add(size.into());
            // The cache insert happens before the index lock is taken.
            self.cache.insert(key, bytes);
            let mut index = self.index.write().unwrap();
            index
                .entry(key.group_key())
                .or_default()
                .insert((key.start, key.end), key);
        }
    }

    /// Returns the indexed fragment keys of the row group that overlap `range`.
    ///
    /// Keys are returned in `(start, end)` order.
    fn find_index_candidates(
        &self,
        file_id: FileId,
        row_group_idx: usize,
        range: &Range<u64>,
    ) -> Vec<PageFragmentKey> {
        let group_key = PageFragmentGroupKey {
            file_id,
            row_group_idx,
        };
        let index = self.index.read().unwrap();
        index
            .get(&group_key)
            .map(|ranges| {
                ranges
                    // Only fragments starting before the end of `range` ...
                    .range(..(range.end, 0))
                    // ... and ending after its start can overlap it.
                    .filter_map(|(_, fragment_key)| {
                        (fragment_key.end > range.start).then_some(*fragment_key)
                    })
                    .collect()
            })
            .unwrap_or_default()
    }

    /// Removes `key` from the index unless the cache holds it again.
    ///
    /// The cache is re-checked while the index write lock is held.
    fn remove_uncached_index_entry(&self, key: PageFragmentKey) {
        let group_key = key.group_key();
        let mut index = self.index.write().unwrap();
        if self.cache.contains_key(&key) {
            return;
        }

        Self::remove_index_entry_locked(&mut index, group_key, key);
    }

    /// Removes `key` from the index unconditionally.
    fn remove_index_entry(&self, key: PageFragmentKey) {
        let group_key = key.group_key();
        let mut index = self.index.write().unwrap();
        Self::remove_index_entry_locked(&mut index, group_key, key);
    }

    /// Removes `key` from an already locked index and drops its group when
    /// the group becomes empty.
    fn remove_index_entry_locked(
        index: &mut PageFragmentIndex,
        group_key: PageFragmentGroupKey,
        key: PageFragmentKey,
    ) {
        let Some(ranges) = index.get_mut(&group_key) else {
            return;
        };

        // Only remove the entry when it still maps to this exact key.
        let removed = ranges
            .get(&(key.start, key.end))
            .is_some_and(|current| current == &key);
        if removed {
            ranges.remove(&(key.start, key.end));
        }
        if ranges.is_empty() {
            index.remove(&group_key);
        }
    }
}
1620
/// Key of the selector result cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the file.
    pub file_id: FileId,
    /// Index of the row group inside the file.
    pub row_group_idx: usize,
    /// Time series row selector the result was produced for.
    pub selector: TimeSeriesRowSelector,
}
1631
/// Cached result of a selector, in one of two representations.
pub enum SelectorResult {
    /// Result stored as [`Batch`]es.
    PrimaryKey(Vec<Batch>),
    /// Result stored as arrow [`RecordBatch`]es.
    Flat(Vec<RecordBatch>),
}
1639
/// Value of the selector result cache.
pub struct SelectorResultValue {
    /// The cached batches.
    pub result: SelectorResult,
    /// Read columns associated with the result.
    // NOTE(review): presumably the columns that were read to produce
    // `result` — confirm against the callers.
    pub read_cols: ParquetReadColumns,
}
1647
1648impl SelectorResultValue {
1649 pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1651 SelectorResultValue {
1652 result: SelectorResult::PrimaryKey(result),
1653 read_cols,
1654 }
1655 }
1656
1657 pub fn new_flat(
1659 result: Vec<RecordBatch>,
1660 read_cols: ParquetReadColumns,
1661 ) -> SelectorResultValue {
1662 SelectorResultValue {
1663 result: SelectorResult::Flat(result),
1664 read_cols,
1665 }
1666 }
1667
1668 fn estimated_size(&self) -> usize {
1670 match &self.result {
1671 SelectorResult::PrimaryKey(batches) => {
1672 batches.iter().map(|batch| batch.memory_size()).sum()
1673 }
1674 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1675 }
1676 }
1677}
1678
/// Maps an (region id, file id) key to its cached SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a (data type, value) pair to a cached vector.
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a selector result key to its cached result.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a range scan key to its cached value.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1689
1690#[cfg(test)]
1691mod tests {
1692 use std::sync::Arc;
1693
1694 use api::v1::SemanticType;
1695 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1696 use datatypes::schema::ColumnSchema;
1697 use datatypes::vectors::Int64Vector;
1698 use puffin::file_metadata::FileMetadata;
1699 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1700 use store_api::storage::ColumnId;
1701
1702 use super::*;
1703 use crate::cache::index::bloom_filter_index::Tag;
1704 use crate::cache::index::result_cache::PredicateKey;
1705 use crate::cache::test_util::{
1706 parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1707 };
1708 use crate::read::range_cache::{
1709 RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1710 };
1711 use crate::read::read_columns::ReadColumns;
1712 use crate::sst::parquet::row_selection::RowGroupSelection;
1713
1714 #[tokio::test]
1715 async fn test_disable_cache() {
1716 let cache = CacheManager::default();
1717 assert!(cache.sst_meta_cache.is_none());
1718 assert!(cache.vector_cache.is_none());
1719 assert!(cache.page_cache.is_none());
1720
1721 let region_id = RegionId::new(1, 1);
1722 let file_id = RegionFileId::new(region_id, FileId::random());
1723 let metadata = parquet_meta();
1724 let mut metrics = MetadataCacheMetrics::default();
1725 cache.put_parquet_meta_data(file_id, metadata, None);
1726 assert!(
1727 cache
1728 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1729 .await
1730 .is_none()
1731 );
1732
1733 let value = Value::Int64(10);
1734 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1735 cache.put_repeated_vector(value.clone(), vector.clone());
1736 assert!(
1737 cache
1738 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1739 .is_none()
1740 );
1741
1742 cache.put_page_ranges(
1743 file_id.file_id(),
1744 1,
1745 &[Range { start: 0, end: 5 }],
1746 &[Bytes::from_static(b"abcde")],
1747 );
1748 assert!(
1749 cache
1750 .get_page_ranges(file_id.file_id(), 1, &[Range { start: 0, end: 5 }])
1751 .is_none()
1752 );
1753
1754 assert!(cache.write_cache().is_none());
1755 }
1756
1757 #[tokio::test]
1758 async fn test_parquet_meta_cache() {
1759 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1760 let mut metrics = MetadataCacheMetrics::default();
1761 let region_id = RegionId::new(1, 1);
1762 let file_id = RegionFileId::new(region_id, FileId::random());
1763 assert!(
1764 cache
1765 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1766 .await
1767 .is_none()
1768 );
1769 let (metadata, region_metadata) = sst_parquet_meta();
1770 cache.put_parquet_meta_data(file_id, metadata, None);
1771 let cached = cache
1772 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1773 .await
1774 .unwrap();
1775 assert_eq!(region_metadata, cached.region_metadata());
1776 assert!(
1777 cached
1778 .parquet_metadata()
1779 .file_metadata()
1780 .key_value_metadata()
1781 .is_none_or(|key_values| {
1782 key_values
1783 .iter()
1784 .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1785 })
1786 );
1787 cache.remove_parquet_meta_data(file_id);
1788 assert!(
1789 cache
1790 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1791 .await
1792 .is_none()
1793 );
1794 }
1795
1796 #[tokio::test]
1797 async fn test_parquet_meta_cache_with_provided_region_metadata() {
1798 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1799 let mut metrics = MetadataCacheMetrics::default();
1800 let region_id = RegionId::new(1, 1);
1801 let file_id = RegionFileId::new(region_id, FileId::random());
1802 let (metadata, region_metadata) = sst_parquet_meta();
1803
1804 cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1805
1806 let cached = cache
1807 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1808 .await
1809 .unwrap();
1810 assert!(Arc::ptr_eq(®ion_metadata, &cached.region_metadata()));
1811 }
1812
1813 #[tokio::test]
1814 async fn test_parquet_meta_cache_respects_page_index_policy() {
1815 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1816 let region_id = RegionId::new(1, 1);
1817 let file_id = RegionFileId::new(region_id, FileId::random());
1818 let (metadata, _) = sst_parquet_meta();
1819
1820 let skip_metadata = Arc::new(
1821 CachedSstMeta::try_new_with_page_index_policy(
1822 "test.parquet",
1823 Arc::unwrap_or_clone(metadata.clone()),
1824 None,
1825 PageIndexPolicy::Skip,
1826 )
1827 .unwrap(),
1828 );
1829 cache.put_sst_meta_data(file_id, skip_metadata);
1830
1831 let mut metrics = MetadataCacheMetrics::default();
1832 assert!(
1833 cache
1834 .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1835 .await
1836 .is_none()
1837 );
1838 assert_eq!(1, metrics.cache_miss);
1839
1840 let optional_metadata = Arc::new(
1841 CachedSstMeta::try_new_with_page_index_policy(
1842 "test.parquet",
1843 Arc::unwrap_or_clone(metadata),
1844 None,
1845 PageIndexPolicy::Optional,
1846 )
1847 .unwrap(),
1848 );
1849 cache.put_sst_meta_data(file_id, optional_metadata);
1850
1851 let mut metrics = MetadataCacheMetrics::default();
1852 assert!(
1853 cache
1854 .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Optional)
1855 .await
1856 .is_some()
1857 );
1858 assert_eq!(1, metrics.mem_cache_hit);
1859
1860 let mut metrics = MetadataCacheMetrics::default();
1861 assert!(
1862 cache
1863 .get_sst_meta_data(file_id, &mut metrics, PageIndexPolicy::Skip)
1864 .await
1865 .is_some()
1866 );
1867 assert_eq!(1, metrics.mem_cache_hit);
1868 }
1869
1870 #[test]
1871 fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
1872 let region_metadata = Arc::new(wide_region_metadata(128));
1873 let json_len = region_metadata.to_json().unwrap().len();
1874 let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1875 let cached = Arc::new(
1876 CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
1877 );
1878 let key = SstMetaKey(region_metadata.region_id, FileId::random());
1879
1880 assert!(cached.region_metadata_weight > json_len);
1881 assert_eq!(
1882 meta_cache_weight(&key, &cached) as usize,
1883 key.estimated_size()
1884 + parquet_meta_size(&cached.parquet_metadata)
1885 + cached.region_metadata_weight
1886 );
1887 }
1888
1889 #[test]
1890 fn test_meta_cache_weight_saturates_on_overflow() {
1891 let region_metadata = Arc::new(wide_region_metadata(1));
1892 let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
1893 let mut cached =
1894 CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap();
1895 cached.region_metadata_weight = u32::MAX as usize + 1;
1896 let cached = Arc::new(cached);
1897 let key = SstMetaKey(region_metadata.region_id, FileId::random());
1898
1899 assert_eq!(u32::MAX, meta_cache_weight(&key, &cached));
1900 }
1901
1902 #[test]
1903 fn test_repeated_vector_cache() {
1904 let cache = CacheManager::builder().vector_cache_size(4096).build();
1905 let value = Value::Int64(10);
1906 assert!(
1907 cache
1908 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1909 .is_none()
1910 );
1911 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1912 cache.put_repeated_vector(value.clone(), vector.clone());
1913 let cached = cache
1914 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1915 .unwrap();
1916 assert_eq!(vector, cached);
1917 }
1918
1919 #[test]
1920 fn test_page_cache() {
1921 let cache = CacheManager::builder().page_cache_size(1000).build();
1922 let file_id = FileId::random();
1923 let uncached = 0..10;
1924 assert_eq!(
1925 vec![0..10],
1926 cache
1927 .get_page_ranges(file_id, 0, std::slice::from_ref(&uncached))
1928 .unwrap()
1929 .missing_ranges
1930 );
1931
1932 let cached = 100..500;
1933 cache.put_page_ranges(
1934 file_id,
1935 0,
1936 std::slice::from_ref(&cached),
1937 &[Bytes::from(vec![7; 400])],
1938 );
1939
1940 let subrange = 200..300;
1941 let lookup = cache
1942 .get_page_ranges(file_id, 0, std::slice::from_ref(&subrange))
1943 .unwrap();
1944 assert!(lookup.is_fully_cached());
1945 assert_eq!(100, lookup.cached_bytes);
1946 assert_eq!(1, lookup.cached_parts.len());
1947 assert_eq!(200..300, lookup.cached_parts[0][0].range);
1948 assert_eq!(100, lookup.cached_parts[0][0].bytes.len());
1949
1950 let overlapping = 400..600;
1951 let lookup = cache
1952 .get_page_ranges(file_id, 0, std::slice::from_ref(&overlapping))
1953 .unwrap();
1954 assert!(!lookup.is_fully_cached());
1955 assert_eq!(100, lookup.cached_bytes);
1956 assert_eq!(vec![500..600], lookup.missing_ranges);
1957 assert_eq!(400..500, lookup.cached_parts[0][0].range);
1958 }
1959
1960 #[test]
1961 fn test_page_cache_detaches_fragment_bytes() {
1962 let cache = PageRangeCache::new(1000);
1963 let file_id = FileId::random();
1964 let backing = Bytes::from(vec![1; 1024]);
1965 let page = backing.slice(512..522);
1966 let page_ptr = page.as_ptr();
1967 let range = 0..10;
1968
1969 cache.insert_ranges(
1970 file_id,
1971 0,
1972 std::slice::from_ref(&range),
1973 std::slice::from_ref(&page),
1974 );
1975
1976 let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
1977 assert!(lookup.is_fully_cached());
1978 assert_eq!(1, lookup.cached_parts[0].len());
1979 assert_eq!(&page[..], &lookup.cached_parts[0][0].bytes[..]);
1980 assert_ne!(page_ptr, lookup.cached_parts[0][0].bytes.as_ptr());
1981 }
1982
1983 #[test]
1984 fn test_page_cache_replaces_fragment() {
1985 let cache = PageRangeCache::new(1000);
1986 let file_id = FileId::random();
1987 let range = 0..10;
1988
1989 cache.insert_ranges(
1990 file_id,
1991 0,
1992 std::slice::from_ref(&range),
1993 &[Bytes::from(vec![1; 10])],
1994 );
1995 cache.insert_ranges(
1996 file_id,
1997 0,
1998 std::slice::from_ref(&range),
1999 &[Bytes::from(vec![2; 10])],
2000 );
2001 cache.cache.run_pending_tasks();
2002 assert_eq!(
2003 vec![PageFragmentKey::new(file_id, 0, &range)],
2004 cache.find_index_candidates(file_id, 0, &range)
2005 );
2006
2007 let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2008 assert!(lookup.is_fully_cached());
2009 assert_eq!(&vec![2; 10][..], &lookup.cached_parts[0][0].bytes[..]);
2010 }
2011
2012 #[test]
2013 fn test_page_cache_retains_disjoint_inserts_for_same_row_group() {
2014 let cache = PageRangeCache::new(1000);
2015 let file_id = FileId::random();
2016 let range1 = 0..10;
2017 let range2 = 20..30;
2018
2019 cache.insert_ranges(
2020 file_id,
2021 0,
2022 std::slice::from_ref(&range1),
2023 &[Bytes::from(vec![1; 10])],
2024 );
2025 cache.insert_ranges(
2026 file_id,
2027 0,
2028 std::slice::from_ref(&range2),
2029 &[Bytes::from(vec![2; 10])],
2030 );
2031
2032 let lookup = cache.lookup(file_id, 0, &[range1, range2]);
2033 assert!(lookup.is_fully_cached());
2034 assert_eq!(2, lookup.cached_range_count);
2035 assert_eq!(&vec![1; 10][..], &lookup.cached_parts[0][0].bytes[..]);
2036 assert_eq!(&vec![2; 10][..], &lookup.cached_parts[1][0].bytes[..]);
2037 }
2038
2039 #[test]
2040 fn test_page_cache_fragment_eviction() {
2041 let file_id = FileId::random();
2042 let range = 0..10;
2043 let key = PageFragmentKey::new(file_id, 0, &range);
2044 let page = Bytes::from(vec![1; 10]);
2045 let cache = PageRangeCache::new(page_cache_weight(&key, &page) as u64);
2046
2047 cache.insert_ranges(
2048 file_id,
2049 0,
2050 std::slice::from_ref(&range),
2051 &[Bytes::from(vec![1; 10])],
2052 );
2053 assert!(
2054 cache
2055 .lookup(file_id, 0, std::slice::from_ref(&range))
2056 .is_fully_cached()
2057 );
2058
2059 cache.cache.invalidate(&key);
2060 cache.cache.run_pending_tasks();
2061 assert!(cache.find_index_candidates(file_id, 0, &range).is_empty());
2062
2063 let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2064 assert!(!lookup.is_fully_cached());
2065 assert_eq!(vec![0..10], lookup.missing_ranges);
2066 }
2067
2068 #[test]
2069 fn test_page_cache_rejects_oversized_fragment() {
2070 let cache = PageRangeCache::new(1);
2071 let file_id = FileId::random();
2072 let range = 0..10;
2073
2074 cache.insert_ranges(
2075 file_id,
2076 0,
2077 std::slice::from_ref(&range),
2078 &[Bytes::from(vec![1; 10])],
2079 );
2080 cache.cache.run_pending_tasks();
2081
2082 let lookup = cache.lookup(file_id, 0, std::slice::from_ref(&range));
2083 assert!(!lookup.is_fully_cached());
2084 assert_eq!(vec![0..10], lookup.missing_ranges);
2085 }
2086
2087 #[test]
2088 fn test_selector_result_cache() {
2089 let cache = CacheManager::builder()
2090 .selector_result_cache_size(1000)
2091 .build();
2092 let file_id = FileId::random();
2093 let key = SelectorResultKey {
2094 file_id,
2095 row_group_idx: 0,
2096 selector: TimeSeriesRowSelector::LastRow,
2097 };
2098 assert!(cache.get_selector_result(&key).is_none());
2099 let result = Arc::new(SelectorResultValue::new(
2100 Vec::new(),
2101 ParquetReadColumns::from_deduped(Vec::new()),
2102 ));
2103 cache.put_selector_result(key, result);
2104 assert!(cache.get_selector_result(&key).is_some());
2105 }
2106
2107 #[test]
2108 fn test_prefilter_result_cache() {
2109 let disabled = CacheManager::builder().build();
2110 let file_id = FileId::random();
2111 let key = PrefilterKey::new(
2112 file_id,
2113 0,
2114 None,
2115 1,
2116 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
2117 );
2118 let selection = Arc::new(BooleanBuffer::new_set(3));
2119
2120 disabled.put_prefilter_result(key.clone(), selection.clone());
2121 assert!(disabled.get_prefilter_result(&key).is_none());
2122
2123 let cache = Arc::new(
2124 CacheManager::builder()
2125 .prefilter_result_cache_size(1000)
2126 .build(),
2127 );
2128 assert!(cache.get_prefilter_result(&key).is_none());
2129 cache.put_prefilter_result(key.clone(), selection.clone());
2130 assert_eq!(
2131 cache.get_prefilter_result(&key).unwrap().as_ref(),
2132 selection.as_ref()
2133 );
2134
2135 let enable_all = CacheStrategy::EnableAll(cache.clone());
2136 assert!(enable_all.get_prefilter_result(&key).is_some());
2137
2138 let compaction = CacheStrategy::Compaction(cache.clone());
2139 assert!(compaction.get_prefilter_result(&key).is_none());
2140 compaction.put_prefilter_result(key.clone(), selection.clone());
2141 assert!(cache.get_prefilter_result(&key).is_some());
2142
2143 let disabled_strategy = CacheStrategy::Disabled;
2144 assert!(disabled_strategy.get_prefilter_result(&key).is_none());
2145 disabled_strategy.put_prefilter_result(key.clone(), selection);
2146 assert!(cache.get_prefilter_result(&key).is_some());
2147 }
2148
2149 #[test]
2150 fn test_prefilter_key_distinguishes_dimensions() {
2151 let file_id = FileId::random();
2152 let row_selection = RowSelection::from(vec![RowSelector::skip(1), RowSelector::select(3)]);
2153 let other_row_selection =
2154 RowSelection::from(vec![RowSelector::skip(2), RowSelector::select(2)]);
2155 let row_selection = PrefilterKey::row_selection_snapshot(Some(&row_selection));
2156 let other_row_selection = PrefilterKey::row_selection_snapshot(Some(&other_row_selection));
2157 let base = PrefilterKey::new(
2158 file_id,
2159 0,
2160 row_selection.clone(),
2161 1,
2162 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()]),
2163 );
2164
2165 assert_ne!(
2166 base,
2167 PrefilterKey::new(
2168 FileId::random(),
2169 0,
2170 row_selection.clone(),
2171 1,
2172 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
2173 )
2174 );
2175 assert_ne!(
2176 base,
2177 PrefilterKey::new(
2178 file_id,
2179 1,
2180 row_selection.clone(),
2181 1,
2182 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
2183 )
2184 );
2185 assert_ne!(
2186 base,
2187 PrefilterKey::new(
2188 file_id,
2189 0,
2190 other_row_selection,
2191 1,
2192 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
2193 )
2194 );
2195 assert_ne!(
2196 base,
2197 PrefilterKey::new(
2198 file_id,
2199 0,
2200 row_selection.clone(),
2201 1,
2202 SmallVec::from_vec(vec!["tag_0 IN ([b])".to_string()])
2203 )
2204 );
2205 assert_ne!(
2206 base,
2207 PrefilterKey::new(
2208 file_id,
2209 0,
2210 row_selection.clone(),
2211 2,
2212 SmallVec::from_vec(vec!["tag_0 IN ([a])".to_string()])
2213 )
2214 );
2215 let pk_group = PrefilterKey::new(
2216 file_id,
2217 0,
2218 row_selection,
2219 1,
2220 SmallVec::from_vec(vec![
2221 "tag_0 IN ([a])".to_string(),
2222 "tag_1 IN ([x])".to_string(),
2223 ]),
2224 );
2225 assert_ne!(base, pk_group);
2226 }
2227
2228 #[test]
2229 fn test_range_result_cache() {
2230 let cache = Arc::new(
2231 CacheManager::builder()
2232 .range_result_cache_size(1024 * 1024)
2233 .build(),
2234 );
2235
2236 let key = RangeScanCacheKey {
2237 region_id: RegionId::new(1, 1),
2238 row_groups: vec![(FileId::random(), 0)],
2239 scan: ScanRequestFingerprintBuilder {
2240 read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
2241 read_column_types: vec![],
2242 filters: vec!["tag_0 = 1".to_string()],
2243 time_filters: vec![],
2244 series_row_selector: None,
2245 append_mode: false,
2246 filter_deleted: true,
2247 merge_mode: crate::region::options::MergeMode::LastRow,
2248 partition_expr_version: 0,
2249 }
2250 .build(),
2251 };
2252 let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));
2253
2254 assert!(cache.get_range_result(&key).is_none());
2255 cache.put_range_result(key.clone(), value.clone());
2256 assert!(cache.get_range_result(&key).is_some());
2257
2258 let enable_all = CacheStrategy::EnableAll(cache.clone());
2259 assert!(enable_all.get_range_result(&key).is_some());
2260
2261 let compaction = CacheStrategy::Compaction(cache.clone());
2262 assert!(compaction.get_range_result(&key).is_none());
2263 compaction.put_range_result(key.clone(), value.clone());
2264 assert!(cache.get_range_result(&key).is_some());
2265
2266 let disabled = CacheStrategy::Disabled;
2267 assert!(disabled.get_range_result(&key).is_none());
2268 disabled.put_range_result(key.clone(), value);
2269 assert!(cache.get_range_result(&key).is_some());
2270 }
2271
2272 #[test]
2273 fn test_range_result_cache_size_configures_limiter() {
2274 let cache_size = 3 * 1024_u64;
2275 let cache = CacheManager::builder()
2276 .range_result_cache_size(cache_size)
2277 .build();
2278
2279 assert_eq!(cache.range_result_cache_size(), cache_size as usize);
2280 assert_eq!(
2281 cache.range_result_memory_limiter().permit_bytes(),
2282 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
2283 );
2284 assert_eq!(
2285 cache.range_result_memory_limiter().available_permits(),
2286 (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
2287 );
2288 }
2289
2290 #[tokio::test]
2291 async fn range_result_memory_limiter_rejects_oversized_request() {
2292 let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
2293 assert_eq!(limiter.available_permits(), 2);
2294
2295 let err = limiter.acquire(10 * 1024).await.unwrap_err();
2296 assert!(
2297 err.to_string().contains("exceeds limiter capacity"),
2298 "unexpected error: {err}"
2299 );
2300 assert_eq!(limiter.available_permits(), 2);
2301 }
2302
2303 #[tokio::test]
2304 async fn range_result_memory_limiter_allows_request_up_to_capacity() {
2305 let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
2306 let permit = limiter.acquire(2 * 1024).await.unwrap();
2307 assert_eq!(limiter.available_permits(), 0);
2308 drop(permit);
2309 assert_eq!(limiter.available_permits(), 2);
2310 }
2311
2312 #[tokio::test]
2313 async fn test_evict_puffin_cache_clears_all_entries() {
2314 use std::collections::{BTreeMap, HashMap};
2315
2316 let cache = CacheManager::builder()
2317 .index_metadata_size(128)
2318 .index_content_size(128)
2319 .index_content_page_size(64)
2320 .index_result_cache_size(128)
2321 .puffin_metadata_size(128)
2322 .build();
2323 let cache = Arc::new(cache);
2324
2325 let region_id = RegionId::new(1, 1);
2326 let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
2327 let column_id: ColumnId = 1;
2328
2329 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
2330 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
2331 let result_cache = cache.index_result_cache().unwrap();
2332 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
2333
2334 let bloom_key = (
2335 index_id.file_id(),
2336 index_id.version,
2337 column_id,
2338 Tag::Skipping,
2339 );
2340 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
2341 inverted_cache.put_metadata(
2342 (index_id.file_id(), index_id.version),
2343 Arc::new(InvertedIndexMetas::default()),
2344 );
2345 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
2346 let selection = Arc::new(RowGroupSelection::default());
2347 result_cache.put(predicate.clone(), index_id.file_id(), selection);
2348 let file_id_str = index_id.to_string();
2349 let metadata = Arc::new(FileMetadata {
2350 blobs: Vec::new(),
2351 properties: HashMap::new(),
2352 });
2353 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
2354
2355 assert!(bloom_cache.get_metadata(bloom_key).is_some());
2356 assert!(
2357 inverted_cache
2358 .get_metadata((index_id.file_id(), index_id.version))
2359 .is_some()
2360 );
2361 assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
2362 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
2363
2364 cache.evict_puffin_cache(index_id).await;
2365
2366 assert!(bloom_cache.get_metadata(bloom_key).is_none());
2367 assert!(
2368 inverted_cache
2369 .get_metadata((index_id.file_id(), index_id.version))
2370 .is_none()
2371 );
2372 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
2373 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
2374
2375 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
2377 inverted_cache.put_metadata(
2378 (index_id.file_id(), index_id.version),
2379 Arc::new(InvertedIndexMetas::default()),
2380 );
2381 result_cache.put(
2382 predicate.clone(),
2383 index_id.file_id(),
2384 Arc::new(RowGroupSelection::default()),
2385 );
2386 puffin_metadata_cache.put_metadata(
2387 file_id_str.clone(),
2388 Arc::new(FileMetadata {
2389 blobs: Vec::new(),
2390 properties: HashMap::new(),
2391 }),
2392 );
2393
2394 let strategy = CacheStrategy::EnableAll(cache.clone());
2395 strategy.evict_puffin_cache(index_id).await;
2396
2397 assert!(bloom_cache.get_metadata(bloom_key).is_none());
2398 assert!(
2399 inverted_cache
2400 .get_metadata((index_id.file_id(), index_id.version))
2401 .is_none()
2402 );
2403 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
2404 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
2405 }
2406
2407 fn wide_region_metadata(column_count: u32) -> RegionMetadata {
2408 let region_id = RegionId::new(1024, 7);
2409 let mut builder = RegionMetadataBuilder::new(region_id);
2410 let mut primary_key = Vec::new();
2411
2412 for column_id in 0..column_count {
2413 let semantic_type = if column_id < 32 {
2414 primary_key.push(column_id);
2415 SemanticType::Tag
2416 } else {
2417 SemanticType::Field
2418 };
2419 let mut column_schema = ColumnSchema::new(
2420 format!("wide_column_{column_id}"),
2421 ConcreteDataType::string_datatype(),
2422 true,
2423 );
2424 column_schema
2425 .mut_metadata()
2426 .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
2427 builder.push_column_metadata(ColumnMetadata {
2428 column_schema,
2429 semantic_type,
2430 column_id,
2431 });
2432 }
2433
2434 builder.push_column_metadata(ColumnMetadata {
2435 column_schema: ColumnSchema::new(
2436 "ts",
2437 ConcreteDataType::timestamp_millisecond_datatype(),
2438 false,
2439 ),
2440 semantic_type: SemanticType::Timestamp,
2441 column_id: column_count,
2442 });
2443 builder.primary_key(primary_key);
2444
2445 builder.build().unwrap()
2446 }
2447}