1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::reader::MetadataCacheMetrics;
61
/// Metrics label used for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label used for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label used for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics label `"file"`.
// NOTE(review): not referenced in the visible part of this file; presumably used by a submodule — confirm.
const FILE_TYPE: &str = "file";
/// Metrics label `"index"`.
// NOTE(review): not referenced in the visible part of this file; presumably used by a submodule — confirm.
const INDEX_TYPE: &str = "index";
/// Metrics label used for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label used for the range result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Total byte budget of the default [`RangeResultMemoryLimiter`].
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes represented by one permit of the default [`RangeResultMemoryLimiter`].
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
78
79#[derive(Debug)]
80pub(crate) struct RangeResultMemoryLimiter {
81 semaphore: Arc<tokio::sync::Semaphore>,
82 permit_bytes: usize,
83}
84
85impl Default for RangeResultMemoryLimiter {
86 fn default() -> Self {
87 Self::new(
88 RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
89 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
90 )
91 }
92}
93
94impl RangeResultMemoryLimiter {
95 pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
96 let permit_bytes = permit_bytes.max(1);
97 let permits = limit_bytes.div_ceil(permit_bytes).max(1);
98 Self {
99 semaphore: Arc::new(tokio::sync::Semaphore::new(permits)),
100 permit_bytes,
101 }
102 }
103
104 pub(crate) fn permit_bytes(&self) -> usize {
105 self.permit_bytes
106 }
107
108 pub(crate) async fn acquire(
109 &self,
110 bytes: usize,
111 ) -> std::result::Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
112 let permits = bytes.div_ceil(self.permit_bytes()).max(1) as u32;
113 self.semaphore.acquire_many(permits).await
114 }
115}
116
117#[derive(Debug)]
122pub(crate) struct CachedSstMeta {
123 parquet_metadata: Arc<ParquetMetaData>,
124 region_metadata: RegionMetadataRef,
125 region_metadata_weight: usize,
126}
127
128impl CachedSstMeta {
129 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
130 Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
131 }
132
133 pub(crate) fn try_new_with_region_metadata(
134 file_path: &str,
135 parquet_metadata: ParquetMetaData,
136 region_metadata: Option<RegionMetadataRef>,
137 ) -> Result<Self> {
138 let file_metadata = parquet_metadata.file_metadata();
139 let key_values = file_metadata
140 .key_value_metadata()
141 .context(InvalidParquetSnafu {
142 file: file_path,
143 reason: "missing key value meta",
144 })?;
145 let meta_value = key_values
146 .iter()
147 .find(|kv| kv.key == PARQUET_METADATA_KEY)
148 .with_context(|| InvalidParquetSnafu {
149 file: file_path,
150 reason: format!("key {} not found", PARQUET_METADATA_KEY),
151 })?;
152 let json = meta_value
153 .value
154 .as_ref()
155 .with_context(|| InvalidParquetSnafu {
156 file: file_path,
157 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
158 })?;
159 let region_metadata = match region_metadata {
160 Some(region_metadata) => region_metadata,
161 None => Arc::new(
162 store_api::metadata::RegionMetadata::from_json(json)
163 .context(InvalidMetadataSnafu)?,
164 ),
165 };
166 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
168 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
169
170 Ok(Self {
171 parquet_metadata,
172 region_metadata,
173 region_metadata_weight,
174 })
175 }
176
177 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
178 self.parquet_metadata.clone()
179 }
180
181 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
182 self.region_metadata.clone()
183 }
184}
185
186fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
187 let file_metadata = parquet_metadata.file_metadata();
188 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
189 let filtered = key_values
190 .iter()
191 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
192 .cloned()
193 .collect::<Vec<_>>();
194 (!filtered.is_empty()).then_some(filtered)
195 });
196 let stripped_file_metadata = FileMetaData::new(
197 file_metadata.version(),
198 file_metadata.num_rows(),
199 file_metadata.created_by().map(ToString::to_string),
200 filtered_key_values,
201 file_metadata.schema_descr_ptr(),
202 file_metadata.column_orders().cloned(),
203 );
204
205 let mut builder = parquet_metadata.into_builder();
206 let row_groups = builder.take_row_groups();
207 let column_index = builder.take_column_index();
208 let offset_index = builder.take_offset_index();
209
210 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
211 .set_row_groups(row_groups)
212 .set_column_index(column_index)
213 .set_offset_index(offset_index)
214 .build()
215}
216
/// Selects which caches of a [`CacheManager`] an operation may use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Every operation is forwarded to the wrapped cache manager.
    EnableAll(CacheManagerRef),
    /// Only SST metadata access, puffin cache eviction and the write cache are
    /// forwarded to the wrapped cache manager; every other lookup behaves as if the
    /// cache were disabled.
    Compaction(CacheManagerRef),
    /// No cache is used: lookups return `None` and insertions are ignored.
    Disabled,
}
231
232impl CacheStrategy {
233 pub(crate) async fn get_sst_meta_data(
235 &self,
236 file_id: RegionFileId,
237 metrics: &mut MetadataCacheMetrics,
238 page_index_policy: PageIndexPolicy,
239 ) -> Option<Arc<CachedSstMeta>> {
240 match self {
241 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
242 cache_manager
243 .get_sst_meta_data(file_id, metrics, page_index_policy)
244 .await
245 }
246 CacheStrategy::Disabled => {
247 metrics.cache_miss += 1;
248 None
249 }
250 }
251 }
252
253 pub(crate) fn get_sst_meta_data_from_mem_cache(
255 &self,
256 file_id: RegionFileId,
257 ) -> Option<Arc<CachedSstMeta>> {
258 match self {
259 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
260 cache_manager.get_sst_meta_data_from_mem_cache(file_id)
261 }
262 CacheStrategy::Disabled => None,
263 }
264 }
265
266 pub fn get_parquet_meta_data_from_mem_cache(
268 &self,
269 file_id: RegionFileId,
270 ) -> Option<Arc<ParquetMetaData>> {
271 self.get_sst_meta_data_from_mem_cache(file_id)
272 .map(|metadata| metadata.parquet_metadata())
273 }
274
275 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
277 match self {
278 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
279 cache_manager.put_sst_meta_data(file_id, metadata);
280 }
281 CacheStrategy::Disabled => {}
282 }
283 }
284
285 pub fn put_parquet_meta_data(
287 &self,
288 file_id: RegionFileId,
289 metadata: Arc<ParquetMetaData>,
290 region_metadata: Option<RegionMetadataRef>,
291 ) {
292 match self {
293 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
294 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
295 }
296 CacheStrategy::Disabled => {}
297 }
298 }
299
300 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
302 match self {
303 CacheStrategy::EnableAll(cache_manager) => {
304 cache_manager.remove_parquet_meta_data(file_id);
305 }
306 CacheStrategy::Compaction(cache_manager) => {
307 cache_manager.remove_parquet_meta_data(file_id);
308 }
309 CacheStrategy::Disabled => {}
310 }
311 }
312
313 pub fn get_repeated_vector(
316 &self,
317 data_type: &ConcreteDataType,
318 value: &Value,
319 ) -> Option<VectorRef> {
320 match self {
321 CacheStrategy::EnableAll(cache_manager) => {
322 cache_manager.get_repeated_vector(data_type, value)
323 }
324 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
325 }
326 }
327
328 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
331 if let CacheStrategy::EnableAll(cache_manager) = self {
332 cache_manager.put_repeated_vector(value, vector);
333 }
334 }
335
336 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
339 match self {
340 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
341 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
342 }
343 }
344
345 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
348 if let CacheStrategy::EnableAll(cache_manager) = self {
349 cache_manager.put_pages(page_key, pages);
350 }
351 }
352
353 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
355 match self {
356 CacheStrategy::EnableAll(cache_manager) => {
357 cache_manager.evict_puffin_cache(file_id).await
358 }
359 CacheStrategy::Compaction(cache_manager) => {
360 cache_manager.evict_puffin_cache(file_id).await
361 }
362 CacheStrategy::Disabled => {}
363 }
364 }
365
366 pub fn get_selector_result(
369 &self,
370 selector_key: &SelectorResultKey,
371 ) -> Option<Arc<SelectorResultValue>> {
372 match self {
373 CacheStrategy::EnableAll(cache_manager) => {
374 cache_manager.get_selector_result(selector_key)
375 }
376 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
377 }
378 }
379
380 pub fn put_selector_result(
383 &self,
384 selector_key: SelectorResultKey,
385 result: Arc<SelectorResultValue>,
386 ) {
387 if let CacheStrategy::EnableAll(cache_manager) = self {
388 cache_manager.put_selector_result(selector_key, result);
389 }
390 }
391
392 #[allow(dead_code)]
395 pub(crate) fn get_range_result(
396 &self,
397 key: &RangeScanCacheKey,
398 ) -> Option<Arc<RangeScanCacheValue>> {
399 match self {
400 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
401 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
402 }
403 }
404
405 pub(crate) fn put_range_result(
408 &self,
409 key: RangeScanCacheKey,
410 result: Arc<RangeScanCacheValue>,
411 ) {
412 if let CacheStrategy::EnableAll(cache_manager) = self {
413 cache_manager.put_range_result(key, result);
414 }
415 }
416
417 pub(crate) fn has_range_result_cache(&self) -> bool {
419 match self {
420 CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
421 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
422 }
423 }
424
425 pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
426 match self {
427 CacheStrategy::EnableAll(cache_manager) => {
428 Some(cache_manager.range_result_memory_limiter())
429 }
430 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
431 }
432 }
433
434 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
437 match self {
438 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
439 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
440 CacheStrategy::Disabled => None,
441 }
442 }
443
444 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
447 match self {
448 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
449 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
450 }
451 }
452
453 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
456 match self {
457 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
458 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
459 }
460 }
461
462 #[cfg(feature = "vector_index")]
465 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
466 match self {
467 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
468 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
469 }
470 }
471
472 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
475 match self {
476 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
477 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
478 }
479 }
480
481 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
484 match self {
485 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
486 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
487 }
488 }
489
490 pub fn maybe_download_background(
492 &self,
493 index_key: IndexKey,
494 remote_path: String,
495 remote_store: ObjectStore,
496 file_size: u64,
497 ) {
498 if let CacheStrategy::EnableAll(cache_manager) = self
499 && let Some(write_cache) = cache_manager.write_cache()
500 {
501 write_cache.file_cache().maybe_download_background(
502 index_key,
503 remote_path,
504 remote_store,
505 file_size,
506 );
507 }
508 }
509}
510
/// Owns the caches used by the engine.
///
/// Each optional field is `None` when that cache is not configured. The default value
/// has every optional cache unset.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for repeated vectors, keyed by data type and value.
    vector_cache: Option<VectorCache>,
    /// Cache for pages.
    page_cache: Option<PageCache>,
    /// Write cache; also used as a second lookup level for SST metadata.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted index data.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter index data.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector index data.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Cache for puffin file metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for time series row selector results.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan results.
    range_result_cache: Option<RangeResultCache>,
    /// Memory limiter handed out alongside the range result cache.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache for index results.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared reference to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
544
545impl CacheManager {
546 pub fn builder() -> CacheManagerBuilder {
548 CacheManagerBuilder::default()
549 }
550
551 pub(crate) async fn get_sst_meta_data(
554 &self,
555 file_id: RegionFileId,
556 metrics: &mut MetadataCacheMetrics,
557 page_index_policy: PageIndexPolicy,
558 ) -> Option<Arc<CachedSstMeta>> {
559 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
560 metrics.mem_cache_hit += 1;
561 return Some(metadata);
562 }
563
564 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
565 if let Some(write_cache) = &self.write_cache
566 && let Some(metadata) = write_cache
567 .file_cache()
568 .get_sst_meta_data(key, metrics, page_index_policy)
569 .await
570 {
571 metrics.file_cache_hit += 1;
572 self.put_sst_meta_data(file_id, metadata.clone());
573 return Some(metadata);
574 }
575
576 metrics.cache_miss += 1;
577 None
578 }
579
580 pub(crate) async fn get_parquet_meta_data(
583 &self,
584 file_id: RegionFileId,
585 metrics: &mut MetadataCacheMetrics,
586 page_index_policy: PageIndexPolicy,
587 ) -> Option<Arc<ParquetMetaData>> {
588 self.get_sst_meta_data(file_id, metrics, page_index_policy)
589 .await
590 .map(|metadata| metadata.parquet_metadata())
591 }
592
593 pub(crate) fn get_sst_meta_data_from_mem_cache(
596 &self,
597 file_id: RegionFileId,
598 ) -> Option<Arc<CachedSstMeta>> {
599 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
600 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
601 update_hit_miss(value, SST_META_TYPE)
602 })
603 }
604
605 pub fn get_parquet_meta_data_from_mem_cache(
608 &self,
609 file_id: RegionFileId,
610 ) -> Option<Arc<ParquetMetaData>> {
611 self.get_sst_meta_data_from_mem_cache(file_id)
612 .map(|metadata| metadata.parquet_metadata())
613 }
614
615 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
617 if let Some(cache) = &self.sst_meta_cache {
618 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
619 CACHE_BYTES
620 .with_label_values(&[SST_META_TYPE])
621 .add(meta_cache_weight(&key, &metadata).into());
622 cache.insert(key, metadata);
623 }
624 }
625
626 pub fn put_parquet_meta_data(
628 &self,
629 file_id: RegionFileId,
630 metadata: Arc<ParquetMetaData>,
631 region_metadata: Option<RegionMetadataRef>,
632 ) {
633 if self.sst_meta_cache.is_some() {
634 let file_path = format!(
635 "region_id={}, file_id={}",
636 file_id.region_id(),
637 file_id.file_id()
638 );
639 match CachedSstMeta::try_new_with_region_metadata(
640 &file_path,
641 Arc::unwrap_or_clone(metadata),
642 region_metadata,
643 ) {
644 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
645 Err(err) => warn!(
646 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
647 file_id.region_id(),
648 file_id.file_id()
649 ),
650 }
651 }
652 }
653
654 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
656 if let Some(cache) = &self.sst_meta_cache {
657 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
658 }
659 }
660
661 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
663 self.sst_meta_cache
664 .as_ref()
665 .map(|cache| cache.weighted_size())
666 .unwrap_or(0)
667 }
668
669 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
671 self.sst_meta_cache.is_some()
672 }
673
674 pub fn get_repeated_vector(
676 &self,
677 data_type: &ConcreteDataType,
678 value: &Value,
679 ) -> Option<VectorRef> {
680 self.vector_cache.as_ref().and_then(|vector_cache| {
681 let value = vector_cache.get(&(data_type.clone(), value.clone()));
682 update_hit_miss(value, VECTOR_TYPE)
683 })
684 }
685
686 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
688 if let Some(cache) = &self.vector_cache {
689 let key = (vector.data_type(), value);
690 CACHE_BYTES
691 .with_label_values(&[VECTOR_TYPE])
692 .add(vector_cache_weight(&key, &vector).into());
693 cache.insert(key, vector);
694 }
695 }
696
697 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
699 self.page_cache.as_ref().and_then(|page_cache| {
700 let value = page_cache.get(page_key);
701 update_hit_miss(value, PAGE_TYPE)
702 })
703 }
704
705 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
707 if let Some(cache) = &self.page_cache {
708 CACHE_BYTES
709 .with_label_values(&[PAGE_TYPE])
710 .add(page_cache_weight(&page_key, &pages).into());
711 cache.insert(page_key, pages);
712 }
713 }
714
715 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
717 if let Some(cache) = &self.bloom_filter_index_cache {
718 cache.invalidate_file(file_id.file_id());
719 }
720
721 if let Some(cache) = &self.inverted_index_cache {
722 cache.invalidate_file(file_id.file_id());
723 }
724
725 if let Some(cache) = &self.index_result_cache {
726 cache.invalidate_file(file_id.file_id());
727 }
728
729 #[cfg(feature = "vector_index")]
730 if let Some(cache) = &self.vector_index_cache {
731 cache.invalidate_file(file_id.file_id());
732 }
733
734 if let Some(cache) = &self.puffin_metadata_cache {
735 cache.remove(&file_id.to_string());
736 }
737
738 if let Some(write_cache) = &self.write_cache {
739 write_cache
740 .remove(IndexKey::new(
741 file_id.region_id(),
742 file_id.file_id(),
743 FileType::Puffin(file_id.version),
744 ))
745 .await;
746 }
747 }
748
749 pub fn get_selector_result(
751 &self,
752 selector_key: &SelectorResultKey,
753 ) -> Option<Arc<SelectorResultValue>> {
754 self.selector_result_cache
755 .as_ref()
756 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
757 }
758
759 pub fn put_selector_result(
761 &self,
762 selector_key: SelectorResultKey,
763 result: Arc<SelectorResultValue>,
764 ) {
765 if let Some(cache) = &self.selector_result_cache {
766 CACHE_BYTES
767 .with_label_values(&[SELECTOR_RESULT_TYPE])
768 .add(selector_result_cache_weight(&selector_key, &result).into());
769 cache.insert(selector_key, result);
770 }
771 }
772
773 #[allow(dead_code)]
775 pub(crate) fn get_range_result(
776 &self,
777 key: &RangeScanCacheKey,
778 ) -> Option<Arc<RangeScanCacheValue>> {
779 self.range_result_cache
780 .as_ref()
781 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
782 }
783
784 pub(crate) fn put_range_result(
786 &self,
787 key: RangeScanCacheKey,
788 result: Arc<RangeScanCacheValue>,
789 ) {
790 if let Some(cache) = &self.range_result_cache {
791 CACHE_BYTES
792 .with_label_values(&[RANGE_RESULT_TYPE])
793 .add(range_result_cache_weight(&key, &result).into());
794 cache.insert(key, result);
795 }
796 }
797
798 pub(crate) fn has_range_result_cache(&self) -> bool {
800 self.range_result_cache.is_some()
801 }
802
803 pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
804 &self.range_result_memory_limiter
805 }
806
807 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
809 self.write_cache.as_ref()
810 }
811
812 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
813 self.inverted_index_cache.as_ref()
814 }
815
816 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
817 self.bloom_filter_index_cache.as_ref()
818 }
819
820 #[cfg(feature = "vector_index")]
821 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
822 self.vector_index_cache.as_ref()
823 }
824
825 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
826 self.puffin_metadata_cache.as_ref()
827 }
828
829 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
830 self.index_result_cache.as_ref()
831 }
832}
833
834pub fn selector_result_cache_miss() {
836 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
837}
838
839pub fn selector_result_cache_hit() {
841 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
842}
843
/// Builder for [`CacheManager`].
///
/// All sizes default to 0 and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; 0 disables it.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache; 0 disables it.
    vector_cache_size: u64,
    /// Capacity of the page cache; 0 disables it.
    page_cache_size: u64,
    /// Metadata size passed to the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Content size passed to the index caches; 0 disables the vector index cache.
    index_content_size: u64,
    /// Content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; 0 disables it.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Write cache to install in the manager, if any.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; 0 disables it.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; 0 disables it.
    range_result_cache_size: u64,
}
859
860impl CacheManagerBuilder {
861 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
863 self.sst_meta_cache_size = bytes;
864 self
865 }
866
867 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
869 self.vector_cache_size = bytes;
870 self
871 }
872
873 pub fn page_cache_size(mut self, bytes: u64) -> Self {
875 self.page_cache_size = bytes;
876 self
877 }
878
879 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
881 self.write_cache = cache;
882 self
883 }
884
885 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
887 self.index_metadata_size = bytes;
888 self
889 }
890
891 pub fn index_content_size(mut self, bytes: u64) -> Self {
893 self.index_content_size = bytes;
894 self
895 }
896
897 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
899 self.index_content_page_size = bytes;
900 self
901 }
902
903 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
905 self.index_result_cache_size = bytes;
906 self
907 }
908
909 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
911 self.puffin_metadata_size = bytes;
912 self
913 }
914
915 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
917 self.selector_result_cache_size = bytes;
918 self
919 }
920
921 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
923 self.range_result_cache_size = bytes;
924 self
925 }
926
927 pub fn build(self) -> CacheManager {
929 fn to_str(cause: RemovalCause) -> &'static str {
930 match cause {
931 RemovalCause::Expired => "expired",
932 RemovalCause::Explicit => "explicit",
933 RemovalCause::Replaced => "replaced",
934 RemovalCause::Size => "size",
935 }
936 }
937
938 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
939 Cache::builder()
940 .max_capacity(self.sst_meta_cache_size)
941 .weigher(meta_cache_weight)
942 .eviction_listener(|k, v, cause| {
943 let size = meta_cache_weight(&k, &v);
944 CACHE_BYTES
945 .with_label_values(&[SST_META_TYPE])
946 .sub(size.into());
947 CACHE_EVICTION
948 .with_label_values(&[SST_META_TYPE, to_str(cause)])
949 .inc();
950 })
951 .build()
952 });
953 let vector_cache = (self.vector_cache_size != 0).then(|| {
954 Cache::builder()
955 .max_capacity(self.vector_cache_size)
956 .weigher(vector_cache_weight)
957 .eviction_listener(|k, v, cause| {
958 let size = vector_cache_weight(&k, &v);
959 CACHE_BYTES
960 .with_label_values(&[VECTOR_TYPE])
961 .sub(size.into());
962 CACHE_EVICTION
963 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
964 .inc();
965 })
966 .build()
967 });
968 let page_cache = (self.page_cache_size != 0).then(|| {
969 Cache::builder()
970 .max_capacity(self.page_cache_size)
971 .weigher(page_cache_weight)
972 .eviction_listener(|k, v, cause| {
973 let size = page_cache_weight(&k, &v);
974 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
975 CACHE_EVICTION
976 .with_label_values(&[PAGE_TYPE, to_str(cause)])
977 .inc();
978 })
979 .build()
980 });
981 let inverted_index_cache = InvertedIndexCache::new(
982 self.index_metadata_size,
983 self.index_content_size,
984 self.index_content_page_size,
985 );
986 let bloom_filter_index_cache = BloomFilterIndexCache::new(
988 self.index_metadata_size,
989 self.index_content_size,
990 self.index_content_page_size,
991 );
992 #[cfg(feature = "vector_index")]
993 let vector_index_cache = (self.index_content_size != 0)
994 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
995 let index_result_cache = (self.index_result_cache_size != 0)
996 .then(|| IndexResultCache::new(self.index_result_cache_size));
997 let puffin_metadata_cache =
998 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
999 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1000 Cache::builder()
1001 .max_capacity(self.selector_result_cache_size)
1002 .weigher(selector_result_cache_weight)
1003 .eviction_listener(|k, v, cause| {
1004 let size = selector_result_cache_weight(&k, &v);
1005 CACHE_BYTES
1006 .with_label_values(&[SELECTOR_RESULT_TYPE])
1007 .sub(size.into());
1008 CACHE_EVICTION
1009 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1010 .inc();
1011 })
1012 .build()
1013 });
1014 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1015 Cache::builder()
1016 .max_capacity(self.range_result_cache_size)
1017 .weigher(range_result_cache_weight)
1018 .eviction_listener(move |k, v, cause| {
1019 let size = range_result_cache_weight(&k, &v);
1020 CACHE_BYTES
1021 .with_label_values(&[RANGE_RESULT_TYPE])
1022 .sub(size.into());
1023 CACHE_EVICTION
1024 .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1025 .inc();
1026 })
1027 .build()
1028 });
1029 CacheManager {
1030 sst_meta_cache,
1031 vector_cache,
1032 page_cache,
1033 write_cache: self.write_cache,
1034 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1035 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1036 #[cfg(feature = "vector_index")]
1037 vector_index_cache,
1038 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1039 selector_result_cache,
1040 range_result_cache,
1041 range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::default()),
1042 index_result_cache,
1043 }
1044 }
1045}
1046
1047fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1048 (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1050}
1051
1052fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1053 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1055}
1056
1057fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1058 (k.estimated_size() + v.estimated_size()) as u32
1059}
1060
1061fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1062 (mem::size_of_val(k) + v.estimated_size()) as u32
1063}
1064
1065fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1066 (k.estimated_size() + v.estimated_size()) as u32
1067}
1068
1069fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1071 if value.is_some() {
1072 CACHE_HIT.with_label_values(&[cache_type]).inc();
1073 } else {
1074 CACHE_MISS.with_label_values(&[cache_type]).inc();
1075 }
1076 value
1077}
1078
1079#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1081struct SstMetaKey(RegionId, FileId);
1082
1083impl SstMetaKey {
1084 fn estimated_size(&self) -> usize {
1086 mem::size_of::<Self>()
1087 }
1088}
1089
/// Location of a column inside a row group of a file in a region.
// NOTE(review): not referenced in the visible part of this file — confirm it is still used.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region the file belongs to.
    region_id: RegionId,
    /// Id of the file.
    file_id: FileId,
    /// Index of the row group in the file.
    row_group_idx: usize,
    /// Index of the column in the row group.
    column_idx: usize,
}
1102
1103#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1108pub struct PageKey {
1109 file_id: FileId,
1111 row_group_idx: usize,
1113 ranges: Vec<Range<u64>>,
1115}
1116
1117impl PageKey {
1118 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1120 PageKey {
1121 file_id,
1122 row_group_idx,
1123 ranges,
1124 }
1125 }
1126
1127 fn estimated_size(&self) -> usize {
1129 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1130 }
1131}
1132
1133#[derive(Default)]
1136pub struct PageValue {
1137 pub compressed: Vec<Bytes>,
1139 pub page_size: u64,
1141}
1142
1143impl PageValue {
1144 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1146 PageValue {
1147 compressed: bytes,
1148 page_size,
1149 }
1150 }
1151
1152 fn estimated_size(&self) -> usize {
1154 mem::size_of::<Self>()
1155 + self.page_size as usize
1156 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1157 }
1158}
1159
/// Key of the selector result cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the file.
    pub file_id: FileId,
    /// Index of the row group in the file.
    pub row_group_idx: usize,
    /// Selector the cached result was produced for.
    pub selector: TimeSeriesRowSelector,
}
1170
/// Batches stored in a [`SelectorResultValue`], in one of two representations.
pub enum SelectorResult {
    /// Result stored as [`Batch`]es.
    PrimaryKey(Vec<Batch>),
    /// Result stored as arrow [`RecordBatch`]es.
    Flat(Vec<RecordBatch>),
}
1178
1179pub struct SelectorResultValue {
1181 pub result: SelectorResult,
1183 pub projection: Vec<usize>,
1185}
1186
1187impl SelectorResultValue {
1188 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1190 SelectorResultValue {
1191 result: SelectorResult::PrimaryKey(result),
1192 projection,
1193 }
1194 }
1195
1196 pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1198 SelectorResultValue {
1199 result: SelectorResult::Flat(result),
1200 projection,
1201 }
1202 }
1203
1204 fn estimated_size(&self) -> usize {
1206 match &self.result {
1207 SelectorResult::PrimaryKey(batches) => {
1208 batches.iter().map(|batch| batch.memory_size()).sum()
1209 }
1210 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1211 }
1212 }
1213}
1214
/// Maps (region id, file id) to cached SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps (data type, value) to a vector.
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a page key to cached pages.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a selector result key to a cached selector result.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a range scan key to a cached range scan result.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1227
1228#[cfg(test)]
1229mod tests {
1230 use std::sync::Arc;
1231
1232 use api::v1::SemanticType;
1233 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1234 use datatypes::schema::ColumnSchema;
1235 use datatypes::vectors::Int64Vector;
1236 use puffin::file_metadata::FileMetadata;
1237 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1238 use store_api::storage::ColumnId;
1239
1240 use super::*;
1241 use crate::cache::index::bloom_filter_index::Tag;
1242 use crate::cache::index::result_cache::PredicateKey;
1243 use crate::cache::test_util::{
1244 parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1245 };
1246 use crate::read::range_cache::{
1247 RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1248 };
1249 use crate::sst::parquet::row_selection::RowGroupSelection;
1250
1251 #[tokio::test]
1252 async fn test_disable_cache() {
1253 let cache = CacheManager::default();
1254 assert!(cache.sst_meta_cache.is_none());
1255 assert!(cache.vector_cache.is_none());
1256 assert!(cache.page_cache.is_none());
1257
1258 let region_id = RegionId::new(1, 1);
1259 let file_id = RegionFileId::new(region_id, FileId::random());
1260 let metadata = parquet_meta();
1261 let mut metrics = MetadataCacheMetrics::default();
1262 cache.put_parquet_meta_data(file_id, metadata, None);
1263 assert!(
1264 cache
1265 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1266 .await
1267 .is_none()
1268 );
1269
1270 let value = Value::Int64(10);
1271 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1272 cache.put_repeated_vector(value.clone(), vector.clone());
1273 assert!(
1274 cache
1275 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1276 .is_none()
1277 );
1278
1279 let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
1280 let pages = Arc::new(PageValue::default());
1281 cache.put_pages(key.clone(), pages);
1282 assert!(cache.get_pages(&key).is_none());
1283
1284 assert!(cache.write_cache().is_none());
1285 }
1286
1287 #[tokio::test]
1288 async fn test_parquet_meta_cache() {
1289 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1290 let mut metrics = MetadataCacheMetrics::default();
1291 let region_id = RegionId::new(1, 1);
1292 let file_id = RegionFileId::new(region_id, FileId::random());
1293 assert!(
1294 cache
1295 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1296 .await
1297 .is_none()
1298 );
1299 let (metadata, region_metadata) = sst_parquet_meta();
1300 cache.put_parquet_meta_data(file_id, metadata, None);
1301 let cached = cache
1302 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1303 .await
1304 .unwrap();
1305 assert_eq!(region_metadata, cached.region_metadata());
1306 assert!(
1307 cached
1308 .parquet_metadata()
1309 .file_metadata()
1310 .key_value_metadata()
1311 .is_none_or(|key_values| {
1312 key_values
1313 .iter()
1314 .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1315 })
1316 );
1317 cache.remove_parquet_meta_data(file_id);
1318 assert!(
1319 cache
1320 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1321 .await
1322 .is_none()
1323 );
1324 }
1325
1326 #[tokio::test]
1327 async fn test_parquet_meta_cache_with_provided_region_metadata() {
1328 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1329 let mut metrics = MetadataCacheMetrics::default();
1330 let region_id = RegionId::new(1, 1);
1331 let file_id = RegionFileId::new(region_id, FileId::random());
1332 let (metadata, region_metadata) = sst_parquet_meta();
1333
1334 cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1335
1336 let cached = cache
1337 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1338 .await
1339 .unwrap();
1340 assert!(Arc::ptr_eq(®ion_metadata, &cached.region_metadata()));
1341 }
1342
/// The meta cache weight must be the sum of the key size, the parquet metadata
/// size and the region metadata weight, and that last weight must exceed the
/// length of the region metadata's JSON form.
#[test]
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
    let region_metadata = Arc::new(wide_region_metadata(128));
    let json_len = region_metadata.to_json().unwrap().len();

    let parquet_meta = sst_parquet_meta_with_region_metadata(region_metadata.clone());
    let owned_meta = Arc::unwrap_or_clone(parquet_meta);
    let cached = Arc::new(CachedSstMeta::try_new("test.parquet", owned_meta).unwrap());
    let key = SstMetaKey(region_metadata.region_id, FileId::random());

    assert!(cached.region_metadata_weight > json_len);

    let expected_weight = key.estimated_size()
        + parquet_meta_size(&cached.parquet_metadata)
        + cached.region_metadata_weight;
    assert_eq!(meta_cache_weight(&key, &cached) as usize, expected_weight);
}
1361
/// A repeated vector can be read back after it has been put.
#[test]
fn test_repeated_vector_cache() {
    let manager = CacheManager::builder().vector_cache_size(4096).build();
    let data_type = ConcreteDataType::int64_datatype();
    let value = Value::Int64(10);

    // Miss before anything is stored.
    assert!(manager.get_repeated_vector(&data_type, &value).is_none());

    let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
    manager.put_repeated_vector(value.clone(), vector.clone());

    let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
    assert_eq!(vector, cached);
}
1378
/// Pages miss before being put and hit afterwards.
#[test]
fn test_page_cache() {
    let manager = CacheManager::builder().page_cache_size(1000).build();
    let page_key = PageKey::new(FileId::random(), 0, vec![0..10, 10..20]);

    assert!(manager.get_pages(&page_key).is_none());

    manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
    assert!(manager.get_pages(&page_key).is_some());
}
1389
/// Selector results miss before being put and hit afterwards.
#[test]
fn test_selector_result_cache() {
    let manager = CacheManager::builder()
        .selector_result_cache_size(1000)
        .build();
    let key = SelectorResultKey {
        file_id: FileId::random(),
        row_group_idx: 0,
        selector: TimeSeriesRowSelector::LastRow,
    };

    assert!(manager.get_selector_result(&key).is_none());

    let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
    // The key is `Copy`, so it stays usable after being passed by value.
    manager.put_selector_result(key, Arc::new(empty_result));
    assert!(manager.get_selector_result(&key).is_some());
}
1406
/// Exercises the range result cache directly and through each `CacheStrategy`.
#[test]
fn test_range_result_cache() {
    let manager = Arc::new(
        CacheManager::builder()
            .range_result_cache_size(1024 * 1024)
            .build(),
    );

    let fingerprint = ScanRequestFingerprintBuilder {
        read_column_ids: vec![],
        read_column_types: vec![],
        filters: vec!["tag_0 = 1".to_string()],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: true,
        merge_mode: crate::region::options::MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();
    let key = RangeScanCacheKey {
        region_id: RegionId::new(1, 1),
        row_groups: vec![(FileId::random(), 0)],
        scan: fingerprint,
    };
    let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

    // Direct access on the manager: miss, put, hit.
    assert!(manager.get_range_result(&key).is_none());
    manager.put_range_result(key.clone(), value.clone());
    assert!(manager.get_range_result(&key).is_some());

    // `EnableAll` sees the entry.
    let enable_all = CacheStrategy::EnableAll(manager.clone());
    assert!(enable_all.get_range_result(&key).is_some());

    // `Compaction` does not serve range results; the manager's entry stays.
    let compaction = CacheStrategy::Compaction(manager.clone());
    assert!(compaction.get_range_result(&key).is_none());
    compaction.put_range_result(key.clone(), value.clone());
    assert!(manager.get_range_result(&key).is_some());

    // `Disabled` does not serve range results either; the manager's entry stays.
    let disabled = CacheStrategy::Disabled;
    assert!(disabled.get_range_result(&key).is_none());
    disabled.put_range_result(key.clone(), value);
    assert!(manager.get_range_result(&key).is_some());
}
1450
1451 #[tokio::test]
1452 async fn test_evict_puffin_cache_clears_all_entries() {
1453 use std::collections::{BTreeMap, HashMap};
1454
1455 let cache = CacheManager::builder()
1456 .index_metadata_size(128)
1457 .index_content_size(128)
1458 .index_content_page_size(64)
1459 .index_result_cache_size(128)
1460 .puffin_metadata_size(128)
1461 .build();
1462 let cache = Arc::new(cache);
1463
1464 let region_id = RegionId::new(1, 1);
1465 let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
1466 let column_id: ColumnId = 1;
1467
1468 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
1469 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
1470 let result_cache = cache.index_result_cache().unwrap();
1471 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
1472
1473 let bloom_key = (
1474 index_id.file_id(),
1475 index_id.version,
1476 column_id,
1477 Tag::Skipping,
1478 );
1479 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1480 inverted_cache.put_metadata(
1481 (index_id.file_id(), index_id.version),
1482 Arc::new(InvertedIndexMetas::default()),
1483 );
1484 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
1485 let selection = Arc::new(RowGroupSelection::default());
1486 result_cache.put(predicate.clone(), index_id.file_id(), selection);
1487 let file_id_str = index_id.to_string();
1488 let metadata = Arc::new(FileMetadata {
1489 blobs: Vec::new(),
1490 properties: HashMap::new(),
1491 });
1492 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1493
1494 assert!(bloom_cache.get_metadata(bloom_key).is_some());
1495 assert!(
1496 inverted_cache
1497 .get_metadata((index_id.file_id(), index_id.version))
1498 .is_some()
1499 );
1500 assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1501 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1502
1503 cache.evict_puffin_cache(index_id).await;
1504
1505 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1506 assert!(
1507 inverted_cache
1508 .get_metadata((index_id.file_id(), index_id.version))
1509 .is_none()
1510 );
1511 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1512 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1513
1514 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1516 inverted_cache.put_metadata(
1517 (index_id.file_id(), index_id.version),
1518 Arc::new(InvertedIndexMetas::default()),
1519 );
1520 result_cache.put(
1521 predicate.clone(),
1522 index_id.file_id(),
1523 Arc::new(RowGroupSelection::default()),
1524 );
1525 puffin_metadata_cache.put_metadata(
1526 file_id_str.clone(),
1527 Arc::new(FileMetadata {
1528 blobs: Vec::new(),
1529 properties: HashMap::new(),
1530 }),
1531 );
1532
1533 let strategy = CacheStrategy::EnableAll(cache.clone());
1534 strategy.evict_puffin_cache(index_id).await;
1535
1536 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1537 assert!(
1538 inverted_cache
1539 .get_metadata((index_id.file_id(), index_id.version))
1540 .is_none()
1541 );
1542 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1543 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1544 }
1545
1546 fn wide_region_metadata(column_count: u32) -> RegionMetadata {
1547 let region_id = RegionId::new(1024, 7);
1548 let mut builder = RegionMetadataBuilder::new(region_id);
1549 let mut primary_key = Vec::new();
1550
1551 for column_id in 0..column_count {
1552 let semantic_type = if column_id < 32 {
1553 primary_key.push(column_id);
1554 SemanticType::Tag
1555 } else {
1556 SemanticType::Field
1557 };
1558 let mut column_schema = ColumnSchema::new(
1559 format!("wide_column_{column_id}"),
1560 ConcreteDataType::string_datatype(),
1561 true,
1562 );
1563 column_schema
1564 .mut_metadata()
1565 .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
1566 builder.push_column_metadata(ColumnMetadata {
1567 column_schema,
1568 semantic_type,
1569 column_id,
1570 });
1571 }
1572
1573 builder.push_column_metadata(ColumnMetadata {
1574 column_schema: ColumnSchema::new(
1575 "ts",
1576 ConcreteDataType::timestamp_millisecond_datatype(),
1577 false,
1578 ),
1579 semantic_type: SemanticType::Timestamp,
1580 column_id: column_count,
1581 });
1582 builder.primary_key(primary_key);
1583
1584 builder.build().unwrap()
1585 }
1586}