1mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::ops::Range;
27use std::sync::Arc;
28
29use bytes::Bytes;
30use datatypes::value::Value;
31use datatypes::vectors::VectorRef;
32use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
33use index::result_cache::IndexResultCache;
34use moka::notification::RemovalCause;
35use moka::sync::Cache;
36use parquet::file::metadata::ParquetMetaData;
37use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
38use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
39
40use crate::cache::cache_size::parquet_meta_size;
41use crate::cache::file_cache::{FileType, IndexKey};
42use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
43use crate::cache::write_cache::WriteCacheRef;
44use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
45use crate::read::Batch;
46use crate::sst::file::RegionFileId;
47
48const SST_META_TYPE: &str = "sst_meta";
50const VECTOR_TYPE: &str = "vector";
52const PAGE_TYPE: &str = "page";
54const FILE_TYPE: &str = "file";
56const SELECTOR_RESULT_TYPE: &str = "selector_result";
58
59#[derive(Clone)]
61pub enum CacheStrategy {
62 EnableAll(CacheManagerRef),
65 Compaction(CacheManagerRef),
70 Disabled,
72}
73
74impl CacheStrategy {
75 pub async fn get_parquet_meta_data(
77 &self,
78 file_id: RegionFileId,
79 ) -> Option<Arc<ParquetMetaData>> {
80 match self {
81 CacheStrategy::EnableAll(cache_manager) => {
82 cache_manager.get_parquet_meta_data(file_id).await
83 }
84 CacheStrategy::Compaction(cache_manager) => {
85 cache_manager.get_parquet_meta_data(file_id).await
86 }
87 CacheStrategy::Disabled => None,
88 }
89 }
90
91 pub fn get_parquet_meta_data_from_mem_cache(
93 &self,
94 file_id: RegionFileId,
95 ) -> Option<Arc<ParquetMetaData>> {
96 match self {
97 CacheStrategy::EnableAll(cache_manager) => {
98 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
99 }
100 CacheStrategy::Compaction(cache_manager) => {
101 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
102 }
103 CacheStrategy::Disabled => None,
104 }
105 }
106
107 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
109 match self {
110 CacheStrategy::EnableAll(cache_manager) => {
111 cache_manager.put_parquet_meta_data(file_id, metadata);
112 }
113 CacheStrategy::Compaction(cache_manager) => {
114 cache_manager.put_parquet_meta_data(file_id, metadata);
115 }
116 CacheStrategy::Disabled => {}
117 }
118 }
119
120 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
122 match self {
123 CacheStrategy::EnableAll(cache_manager) => {
124 cache_manager.remove_parquet_meta_data(file_id);
125 }
126 CacheStrategy::Compaction(cache_manager) => {
127 cache_manager.remove_parquet_meta_data(file_id);
128 }
129 CacheStrategy::Disabled => {}
130 }
131 }
132
133 pub fn get_repeated_vector(
136 &self,
137 data_type: &ConcreteDataType,
138 value: &Value,
139 ) -> Option<VectorRef> {
140 match self {
141 CacheStrategy::EnableAll(cache_manager) => {
142 cache_manager.get_repeated_vector(data_type, value)
143 }
144 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
145 }
146 }
147
148 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
151 if let CacheStrategy::EnableAll(cache_manager) = self {
152 cache_manager.put_repeated_vector(value, vector);
153 }
154 }
155
156 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
159 match self {
160 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
161 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
162 }
163 }
164
165 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
168 if let CacheStrategy::EnableAll(cache_manager) = self {
169 cache_manager.put_pages(page_key, pages);
170 }
171 }
172
173 pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
175 match self {
176 CacheStrategy::EnableAll(cache_manager) => {
177 cache_manager.evict_puffin_cache(file_id).await
178 }
179 CacheStrategy::Compaction(cache_manager) => {
180 cache_manager.evict_puffin_cache(file_id).await
181 }
182 CacheStrategy::Disabled => {}
183 }
184 }
185
186 pub fn get_selector_result(
189 &self,
190 selector_key: &SelectorResultKey,
191 ) -> Option<Arc<SelectorResultValue>> {
192 match self {
193 CacheStrategy::EnableAll(cache_manager) => {
194 cache_manager.get_selector_result(selector_key)
195 }
196 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
197 }
198 }
199
200 pub fn put_selector_result(
203 &self,
204 selector_key: SelectorResultKey,
205 result: Arc<SelectorResultValue>,
206 ) {
207 if let CacheStrategy::EnableAll(cache_manager) = self {
208 cache_manager.put_selector_result(selector_key, result);
209 }
210 }
211
212 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
215 match self {
216 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
217 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
218 CacheStrategy::Disabled => None,
219 }
220 }
221
222 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
225 match self {
226 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
227 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
228 }
229 }
230
231 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
234 match self {
235 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
236 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
237 }
238 }
239
240 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
243 match self {
244 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
245 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
246 }
247 }
248
249 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
252 match self {
253 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
254 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
255 }
256 }
257}
258
259#[derive(Default)]
263pub struct CacheManager {
264 sst_meta_cache: Option<SstMetaCache>,
266 vector_cache: Option<VectorCache>,
268 page_cache: Option<PageCache>,
270 write_cache: Option<WriteCacheRef>,
272 inverted_index_cache: Option<InvertedIndexCacheRef>,
274 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
276 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
278 selector_result_cache: Option<SelectorResultCache>,
280 index_result_cache: Option<IndexResultCache>,
282}
283
284pub type CacheManagerRef = Arc<CacheManager>;
285
286impl CacheManager {
287 pub fn builder() -> CacheManagerBuilder {
289 CacheManagerBuilder::default()
290 }
291
292 pub async fn get_parquet_meta_data(
295 &self,
296 file_id: RegionFileId,
297 ) -> Option<Arc<ParquetMetaData>> {
298 let metadata = self.get_parquet_meta_data_from_mem_cache(file_id);
300 if metadata.is_some() {
301 return metadata;
302 }
303
304 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
306 if let Some(write_cache) = &self.write_cache
307 && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
308 {
309 let metadata = Arc::new(metadata);
310 self.put_parquet_meta_data(file_id, metadata.clone());
312 return Some(metadata);
313 };
314
315 None
316 }
317
318 pub fn get_parquet_meta_data_from_mem_cache(
321 &self,
322 file_id: RegionFileId,
323 ) -> Option<Arc<ParquetMetaData>> {
324 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
326 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
327 update_hit_miss(value, SST_META_TYPE)
328 })
329 }
330
331 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
333 if let Some(cache) = &self.sst_meta_cache {
334 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
335 CACHE_BYTES
336 .with_label_values(&[SST_META_TYPE])
337 .add(meta_cache_weight(&key, &metadata).into());
338 cache.insert(key, metadata);
339 }
340 }
341
342 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
344 if let Some(cache) = &self.sst_meta_cache {
345 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
346 }
347 }
348
349 pub fn get_repeated_vector(
351 &self,
352 data_type: &ConcreteDataType,
353 value: &Value,
354 ) -> Option<VectorRef> {
355 self.vector_cache.as_ref().and_then(|vector_cache| {
356 let value = vector_cache.get(&(data_type.clone(), value.clone()));
357 update_hit_miss(value, VECTOR_TYPE)
358 })
359 }
360
361 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
363 if let Some(cache) = &self.vector_cache {
364 let key = (vector.data_type(), value);
365 CACHE_BYTES
366 .with_label_values(&[VECTOR_TYPE])
367 .add(vector_cache_weight(&key, &vector).into());
368 cache.insert(key, vector);
369 }
370 }
371
372 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
374 self.page_cache.as_ref().and_then(|page_cache| {
375 let value = page_cache.get(page_key);
376 update_hit_miss(value, PAGE_TYPE)
377 })
378 }
379
380 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
382 if let Some(cache) = &self.page_cache {
383 CACHE_BYTES
384 .with_label_values(&[PAGE_TYPE])
385 .add(page_cache_weight(&page_key, &pages).into());
386 cache.insert(page_key, pages);
387 }
388 }
389
390 pub async fn evict_puffin_cache(&self, file_id: RegionFileId) {
392 if let Some(cache) = &self.bloom_filter_index_cache {
393 cache.invalidate_file(file_id.file_id());
394 }
395
396 if let Some(cache) = &self.inverted_index_cache {
397 cache.invalidate_file(file_id.file_id());
398 }
399
400 if let Some(cache) = &self.index_result_cache {
401 cache.invalidate_file(file_id.file_id());
402 }
403
404 if let Some(cache) = &self.puffin_metadata_cache {
405 cache.remove(&file_id.to_string());
406 }
407
408 if let Some(write_cache) = &self.write_cache {
409 write_cache
410 .remove(IndexKey::new(
411 file_id.region_id(),
412 file_id.file_id(),
413 FileType::Puffin,
414 ))
415 .await;
416 }
417 }
418
419 pub fn get_selector_result(
421 &self,
422 selector_key: &SelectorResultKey,
423 ) -> Option<Arc<SelectorResultValue>> {
424 self.selector_result_cache
425 .as_ref()
426 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
427 }
428
429 pub fn put_selector_result(
431 &self,
432 selector_key: SelectorResultKey,
433 result: Arc<SelectorResultValue>,
434 ) {
435 if let Some(cache) = &self.selector_result_cache {
436 CACHE_BYTES
437 .with_label_values(&[SELECTOR_RESULT_TYPE])
438 .add(selector_result_cache_weight(&selector_key, &result).into());
439 cache.insert(selector_key, result);
440 }
441 }
442
443 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
445 self.write_cache.as_ref()
446 }
447
448 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
449 self.inverted_index_cache.as_ref()
450 }
451
452 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
453 self.bloom_filter_index_cache.as_ref()
454 }
455
456 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
457 self.puffin_metadata_cache.as_ref()
458 }
459
460 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
461 self.index_result_cache.as_ref()
462 }
463}
464
465pub fn selector_result_cache_miss() {
467 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
468}
469
470pub fn selector_result_cache_hit() {
472 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
473}
474
475#[derive(Default)]
477pub struct CacheManagerBuilder {
478 sst_meta_cache_size: u64,
479 vector_cache_size: u64,
480 page_cache_size: u64,
481 index_metadata_size: u64,
482 index_content_size: u64,
483 index_content_page_size: u64,
484 index_result_cache_size: u64,
485 puffin_metadata_size: u64,
486 write_cache: Option<WriteCacheRef>,
487 selector_result_cache_size: u64,
488}
489
490impl CacheManagerBuilder {
491 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
493 self.sst_meta_cache_size = bytes;
494 self
495 }
496
497 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
499 self.vector_cache_size = bytes;
500 self
501 }
502
503 pub fn page_cache_size(mut self, bytes: u64) -> Self {
505 self.page_cache_size = bytes;
506 self
507 }
508
509 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
511 self.write_cache = cache;
512 self
513 }
514
515 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
517 self.index_metadata_size = bytes;
518 self
519 }
520
521 pub fn index_content_size(mut self, bytes: u64) -> Self {
523 self.index_content_size = bytes;
524 self
525 }
526
527 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
529 self.index_content_page_size = bytes;
530 self
531 }
532
533 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
535 self.index_result_cache_size = bytes;
536 self
537 }
538
539 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
541 self.puffin_metadata_size = bytes;
542 self
543 }
544
545 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
547 self.selector_result_cache_size = bytes;
548 self
549 }
550
551 pub fn build(self) -> CacheManager {
553 fn to_str(cause: RemovalCause) -> &'static str {
554 match cause {
555 RemovalCause::Expired => "expired",
556 RemovalCause::Explicit => "explicit",
557 RemovalCause::Replaced => "replaced",
558 RemovalCause::Size => "size",
559 }
560 }
561
562 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
563 Cache::builder()
564 .max_capacity(self.sst_meta_cache_size)
565 .weigher(meta_cache_weight)
566 .eviction_listener(|k, v, cause| {
567 let size = meta_cache_weight(&k, &v);
568 CACHE_BYTES
569 .with_label_values(&[SST_META_TYPE])
570 .sub(size.into());
571 CACHE_EVICTION
572 .with_label_values(&[SST_META_TYPE, to_str(cause)])
573 .inc();
574 })
575 .build()
576 });
577 let vector_cache = (self.vector_cache_size != 0).then(|| {
578 Cache::builder()
579 .max_capacity(self.vector_cache_size)
580 .weigher(vector_cache_weight)
581 .eviction_listener(|k, v, cause| {
582 let size = vector_cache_weight(&k, &v);
583 CACHE_BYTES
584 .with_label_values(&[VECTOR_TYPE])
585 .sub(size.into());
586 CACHE_EVICTION
587 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
588 .inc();
589 })
590 .build()
591 });
592 let page_cache = (self.page_cache_size != 0).then(|| {
593 Cache::builder()
594 .max_capacity(self.page_cache_size)
595 .weigher(page_cache_weight)
596 .eviction_listener(|k, v, cause| {
597 let size = page_cache_weight(&k, &v);
598 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
599 CACHE_EVICTION
600 .with_label_values(&[PAGE_TYPE, to_str(cause)])
601 .inc();
602 })
603 .build()
604 });
605 let inverted_index_cache = InvertedIndexCache::new(
606 self.index_metadata_size,
607 self.index_content_size,
608 self.index_content_page_size,
609 );
610 let bloom_filter_index_cache = BloomFilterIndexCache::new(
612 self.index_metadata_size,
613 self.index_content_size,
614 self.index_content_page_size,
615 );
616 let index_result_cache = (self.index_result_cache_size != 0)
617 .then(|| IndexResultCache::new(self.index_result_cache_size));
618 let puffin_metadata_cache =
619 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
620 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
621 Cache::builder()
622 .max_capacity(self.selector_result_cache_size)
623 .weigher(selector_result_cache_weight)
624 .eviction_listener(|k, v, cause| {
625 let size = selector_result_cache_weight(&k, &v);
626 CACHE_BYTES
627 .with_label_values(&[SELECTOR_RESULT_TYPE])
628 .sub(size.into());
629 CACHE_EVICTION
630 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
631 .inc();
632 })
633 .build()
634 });
635 CacheManager {
636 sst_meta_cache,
637 vector_cache,
638 page_cache,
639 write_cache: self.write_cache,
640 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
641 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
642 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
643 selector_result_cache,
644 index_result_cache,
645 }
646 }
647}
648
649fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
650 (k.estimated_size() + parquet_meta_size(v)) as u32
652}
653
654fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
655 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
657}
658
659fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
660 (k.estimated_size() + v.estimated_size()) as u32
661}
662
663fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
664 (mem::size_of_val(k) + v.estimated_size()) as u32
665}
666
667fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
669 if value.is_some() {
670 CACHE_HIT.with_label_values(&[cache_type]).inc();
671 } else {
672 CACHE_MISS.with_label_values(&[cache_type]).inc();
673 }
674 value
675}
676
677#[derive(Debug, Clone, PartialEq, Eq, Hash)]
679struct SstMetaKey(RegionId, FileId);
680
681impl SstMetaKey {
682 fn estimated_size(&self) -> usize {
684 mem::size_of::<Self>()
685 }
686}
687
688#[derive(Debug, Clone, PartialEq, Eq, Hash)]
690pub struct ColumnPagePath {
691 region_id: RegionId,
693 file_id: FileId,
695 row_group_idx: usize,
697 column_idx: usize,
699}
700
701#[derive(Debug, Clone, PartialEq, Eq, Hash)]
706pub struct PageKey {
707 file_id: FileId,
709 row_group_idx: usize,
711 ranges: Vec<Range<u64>>,
713}
714
715impl PageKey {
716 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
718 PageKey {
719 file_id,
720 row_group_idx,
721 ranges,
722 }
723 }
724
725 fn estimated_size(&self) -> usize {
727 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
728 }
729}
730
731#[derive(Default)]
734pub struct PageValue {
735 pub compressed: Vec<Bytes>,
737 pub page_size: u64,
739}
740
741impl PageValue {
742 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
744 PageValue {
745 compressed: bytes,
746 page_size,
747 }
748 }
749
750 fn estimated_size(&self) -> usize {
752 mem::size_of::<Self>()
753 + self.page_size as usize
754 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
755 }
756}
757
758#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
760pub struct SelectorResultKey {
761 pub file_id: FileId,
763 pub row_group_idx: usize,
765 pub selector: TimeSeriesRowSelector,
767}
768
769pub struct SelectorResultValue {
771 pub result: Vec<Batch>,
773 pub projection: Vec<usize>,
775}
776
777impl SelectorResultValue {
778 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
780 SelectorResultValue { result, projection }
781 }
782
783 fn estimated_size(&self) -> usize {
785 self.result.iter().map(|batch| batch.memory_size()).sum()
787 }
788}
789
790type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
792type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
796type PageCache = Cache<PageKey, Arc<PageValue>>;
798type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
800
801#[cfg(test)]
802mod tests {
803 use std::sync::Arc;
804
805 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
806 use datatypes::vectors::Int64Vector;
807 use puffin::file_metadata::FileMetadata;
808 use store_api::storage::ColumnId;
809
810 use super::*;
811 use crate::cache::index::bloom_filter_index::Tag;
812 use crate::cache::index::result_cache::PredicateKey;
813 use crate::cache::test_util::parquet_meta;
814 use crate::sst::parquet::row_selection::RowGroupSelection;
815
816 #[tokio::test]
817 async fn test_disable_cache() {
818 let cache = CacheManager::default();
819 assert!(cache.sst_meta_cache.is_none());
820 assert!(cache.vector_cache.is_none());
821 assert!(cache.page_cache.is_none());
822
823 let region_id = RegionId::new(1, 1);
824 let file_id = RegionFileId::new(region_id, FileId::random());
825 let metadata = parquet_meta();
826 cache.put_parquet_meta_data(file_id, metadata);
827 assert!(cache.get_parquet_meta_data(file_id).await.is_none());
828
829 let value = Value::Int64(10);
830 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
831 cache.put_repeated_vector(value.clone(), vector.clone());
832 assert!(
833 cache
834 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
835 .is_none()
836 );
837
838 let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
839 let pages = Arc::new(PageValue::default());
840 cache.put_pages(key.clone(), pages);
841 assert!(cache.get_pages(&key).is_none());
842
843 assert!(cache.write_cache().is_none());
844 }
845
846 #[tokio::test]
847 async fn test_parquet_meta_cache() {
848 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
849 let region_id = RegionId::new(1, 1);
850 let file_id = RegionFileId::new(region_id, FileId::random());
851 assert!(cache.get_parquet_meta_data(file_id).await.is_none());
852 let metadata = parquet_meta();
853 cache.put_parquet_meta_data(file_id, metadata);
854 assert!(cache.get_parquet_meta_data(file_id).await.is_some());
855 cache.remove_parquet_meta_data(file_id);
856 assert!(cache.get_parquet_meta_data(file_id).await.is_none());
857 }
858
859 #[test]
860 fn test_repeated_vector_cache() {
861 let cache = CacheManager::builder().vector_cache_size(4096).build();
862 let value = Value::Int64(10);
863 assert!(
864 cache
865 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
866 .is_none()
867 );
868 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
869 cache.put_repeated_vector(value.clone(), vector.clone());
870 let cached = cache
871 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
872 .unwrap();
873 assert_eq!(vector, cached);
874 }
875
876 #[test]
877 fn test_page_cache() {
878 let cache = CacheManager::builder().page_cache_size(1000).build();
879 let file_id = FileId::random();
880 let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
881 assert!(cache.get_pages(&key).is_none());
882 let pages = Arc::new(PageValue::default());
883 cache.put_pages(key.clone(), pages);
884 assert!(cache.get_pages(&key).is_some());
885 }
886
887 #[test]
888 fn test_selector_result_cache() {
889 let cache = CacheManager::builder()
890 .selector_result_cache_size(1000)
891 .build();
892 let file_id = FileId::random();
893 let key = SelectorResultKey {
894 file_id,
895 row_group_idx: 0,
896 selector: TimeSeriesRowSelector::LastRow,
897 };
898 assert!(cache.get_selector_result(&key).is_none());
899 let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
900 cache.put_selector_result(key, result);
901 assert!(cache.get_selector_result(&key).is_some());
902 }
903
904 #[tokio::test]
905 async fn test_evict_puffin_cache_clears_all_entries() {
906 use std::collections::{BTreeMap, HashMap};
907
908 let cache = CacheManager::builder()
909 .index_metadata_size(128)
910 .index_content_size(128)
911 .index_content_page_size(64)
912 .index_result_cache_size(128)
913 .puffin_metadata_size(128)
914 .build();
915 let cache = Arc::new(cache);
916
917 let region_id = RegionId::new(1, 1);
918 let region_file_id = RegionFileId::new(region_id, FileId::random());
919 let column_id: ColumnId = 1;
920
921 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
922 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
923 let result_cache = cache.index_result_cache().unwrap();
924 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
925
926 let bloom_key = (region_file_id.file_id(), column_id, Tag::Skipping);
927 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
928 inverted_cache.put_metadata(
929 region_file_id.file_id(),
930 Arc::new(InvertedIndexMetas::default()),
931 );
932 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
933 let selection = Arc::new(RowGroupSelection::default());
934 result_cache.put(predicate.clone(), region_file_id.file_id(), selection);
935 let file_id_str = region_file_id.to_string();
936 let metadata = Arc::new(FileMetadata {
937 blobs: Vec::new(),
938 properties: HashMap::new(),
939 });
940 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
941
942 assert!(bloom_cache.get_metadata(bloom_key).is_some());
943 assert!(
944 inverted_cache
945 .get_metadata(region_file_id.file_id())
946 .is_some()
947 );
948 assert!(
949 result_cache
950 .get(&predicate, region_file_id.file_id())
951 .is_some()
952 );
953 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
954
955 cache.evict_puffin_cache(region_file_id).await;
956
957 assert!(bloom_cache.get_metadata(bloom_key).is_none());
958 assert!(
959 inverted_cache
960 .get_metadata(region_file_id.file_id())
961 .is_none()
962 );
963 assert!(
964 result_cache
965 .get(&predicate, region_file_id.file_id())
966 .is_none()
967 );
968 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
969
970 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
972 inverted_cache.put_metadata(
973 region_file_id.file_id(),
974 Arc::new(InvertedIndexMetas::default()),
975 );
976 result_cache.put(
977 predicate.clone(),
978 region_file_id.file_id(),
979 Arc::new(RowGroupSelection::default()),
980 );
981 puffin_metadata_cache.put_metadata(
982 file_id_str.clone(),
983 Arc::new(FileMetadata {
984 blobs: Vec::new(),
985 properties: HashMap::new(),
986 }),
987 );
988
989 let strategy = CacheStrategy::EnableAll(cache.clone());
990 strategy.evict_puffin_cache(region_file_id).await;
991
992 assert!(bloom_cache.get_metadata(bloom_key).is_none());
993 assert!(
994 inverted_cache
995 .get_metadata(region_file_id.file_id())
996 .is_none()
997 );
998 assert!(
999 result_cache
1000 .get(&predicate, region_file_id.file_id())
1001 .is_none()
1002 );
1003 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1004 }
1005}