1mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
34use index::result_cache::IndexResultCache;
35use moka::notification::RemovalCause;
36use moka::sync::Cache;
37use object_store::ObjectStore;
38use parquet::file::metadata::ParquetMetaData;
39use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
40use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
41
42use crate::cache::cache_size::parquet_meta_size;
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
45use crate::cache::write_cache::WriteCacheRef;
46use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
47use crate::read::Batch;
48use crate::sst::file::{RegionFileId, RegionIndexId};
49use crate::sst::parquet::reader::MetadataCacheMetrics;
50
51const SST_META_TYPE: &str = "sst_meta";
53const VECTOR_TYPE: &str = "vector";
55const PAGE_TYPE: &str = "page";
57const FILE_TYPE: &str = "file";
59const INDEX_TYPE: &str = "index";
61const SELECTOR_RESULT_TYPE: &str = "selector_result";
63
64#[derive(Clone)]
66pub enum CacheStrategy {
67 EnableAll(CacheManagerRef),
70 Compaction(CacheManagerRef),
75 Disabled,
77}
78
79impl CacheStrategy {
80 pub(crate) async fn get_parquet_meta_data(
83 &self,
84 file_id: RegionFileId,
85 metrics: &mut MetadataCacheMetrics,
86 ) -> Option<Arc<ParquetMetaData>> {
87 match self {
88 CacheStrategy::EnableAll(cache_manager) => {
89 cache_manager.get_parquet_meta_data(file_id, metrics).await
90 }
91 CacheStrategy::Compaction(cache_manager) => {
92 cache_manager.get_parquet_meta_data(file_id, metrics).await
93 }
94 CacheStrategy::Disabled => {
95 metrics.cache_miss += 1;
96 None
97 }
98 }
99 }
100
101 pub fn get_parquet_meta_data_from_mem_cache(
103 &self,
104 file_id: RegionFileId,
105 ) -> Option<Arc<ParquetMetaData>> {
106 match self {
107 CacheStrategy::EnableAll(cache_manager) => {
108 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
109 }
110 CacheStrategy::Compaction(cache_manager) => {
111 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
112 }
113 CacheStrategy::Disabled => None,
114 }
115 }
116
117 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
119 match self {
120 CacheStrategy::EnableAll(cache_manager) => {
121 cache_manager.put_parquet_meta_data(file_id, metadata);
122 }
123 CacheStrategy::Compaction(cache_manager) => {
124 cache_manager.put_parquet_meta_data(file_id, metadata);
125 }
126 CacheStrategy::Disabled => {}
127 }
128 }
129
130 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
132 match self {
133 CacheStrategy::EnableAll(cache_manager) => {
134 cache_manager.remove_parquet_meta_data(file_id);
135 }
136 CacheStrategy::Compaction(cache_manager) => {
137 cache_manager.remove_parquet_meta_data(file_id);
138 }
139 CacheStrategy::Disabled => {}
140 }
141 }
142
143 pub fn get_repeated_vector(
146 &self,
147 data_type: &ConcreteDataType,
148 value: &Value,
149 ) -> Option<VectorRef> {
150 match self {
151 CacheStrategy::EnableAll(cache_manager) => {
152 cache_manager.get_repeated_vector(data_type, value)
153 }
154 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
155 }
156 }
157
158 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
161 if let CacheStrategy::EnableAll(cache_manager) = self {
162 cache_manager.put_repeated_vector(value, vector);
163 }
164 }
165
166 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
169 match self {
170 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
171 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
172 }
173 }
174
175 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
178 if let CacheStrategy::EnableAll(cache_manager) = self {
179 cache_manager.put_pages(page_key, pages);
180 }
181 }
182
183 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
185 match self {
186 CacheStrategy::EnableAll(cache_manager) => {
187 cache_manager.evict_puffin_cache(file_id).await
188 }
189 CacheStrategy::Compaction(cache_manager) => {
190 cache_manager.evict_puffin_cache(file_id).await
191 }
192 CacheStrategy::Disabled => {}
193 }
194 }
195
196 pub fn get_selector_result(
199 &self,
200 selector_key: &SelectorResultKey,
201 ) -> Option<Arc<SelectorResultValue>> {
202 match self {
203 CacheStrategy::EnableAll(cache_manager) => {
204 cache_manager.get_selector_result(selector_key)
205 }
206 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
207 }
208 }
209
210 pub fn put_selector_result(
213 &self,
214 selector_key: SelectorResultKey,
215 result: Arc<SelectorResultValue>,
216 ) {
217 if let CacheStrategy::EnableAll(cache_manager) = self {
218 cache_manager.put_selector_result(selector_key, result);
219 }
220 }
221
222 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
225 match self {
226 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
227 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
228 CacheStrategy::Disabled => None,
229 }
230 }
231
232 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
235 match self {
236 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
237 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
238 }
239 }
240
241 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
244 match self {
245 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
246 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
247 }
248 }
249
250 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
253 match self {
254 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
255 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
256 }
257 }
258
259 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
262 match self {
263 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
264 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
265 }
266 }
267
268 pub fn maybe_download_background(
270 &self,
271 index_key: IndexKey,
272 remote_path: String,
273 remote_store: ObjectStore,
274 file_size: u64,
275 ) {
276 if let CacheStrategy::EnableAll(cache_manager) = self
277 && let Some(write_cache) = cache_manager.write_cache()
278 {
279 write_cache.file_cache().maybe_download_background(
280 index_key,
281 remote_path,
282 remote_store,
283 file_size,
284 );
285 }
286 }
287}
288
289#[derive(Default)]
293pub struct CacheManager {
294 sst_meta_cache: Option<SstMetaCache>,
296 vector_cache: Option<VectorCache>,
298 page_cache: Option<PageCache>,
300 write_cache: Option<WriteCacheRef>,
302 inverted_index_cache: Option<InvertedIndexCacheRef>,
304 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
306 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
308 selector_result_cache: Option<SelectorResultCache>,
310 index_result_cache: Option<IndexResultCache>,
312}
313
314pub type CacheManagerRef = Arc<CacheManager>;
315
316impl CacheManager {
317 pub fn builder() -> CacheManagerBuilder {
319 CacheManagerBuilder::default()
320 }
321
322 pub(crate) async fn get_parquet_meta_data(
325 &self,
326 file_id: RegionFileId,
327 metrics: &mut MetadataCacheMetrics,
328 ) -> Option<Arc<ParquetMetaData>> {
329 if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
331 metrics.mem_cache_hit += 1;
332 return Some(metadata);
333 }
334
335 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
337 if let Some(write_cache) = &self.write_cache
338 && let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await
339 {
340 metrics.file_cache_hit += 1;
341 let metadata = Arc::new(metadata);
342 self.put_parquet_meta_data(file_id, metadata.clone());
344 return Some(metadata);
345 };
346 metrics.cache_miss += 1;
347
348 None
349 }
350
351 pub fn get_parquet_meta_data_from_mem_cache(
354 &self,
355 file_id: RegionFileId,
356 ) -> Option<Arc<ParquetMetaData>> {
357 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
359 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
360 update_hit_miss(value, SST_META_TYPE)
361 })
362 }
363
364 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
366 if let Some(cache) = &self.sst_meta_cache {
367 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
368 CACHE_BYTES
369 .with_label_values(&[SST_META_TYPE])
370 .add(meta_cache_weight(&key, &metadata).into());
371 cache.insert(key, metadata);
372 }
373 }
374
375 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
377 if let Some(cache) = &self.sst_meta_cache {
378 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
379 }
380 }
381
382 pub fn get_repeated_vector(
384 &self,
385 data_type: &ConcreteDataType,
386 value: &Value,
387 ) -> Option<VectorRef> {
388 self.vector_cache.as_ref().and_then(|vector_cache| {
389 let value = vector_cache.get(&(data_type.clone(), value.clone()));
390 update_hit_miss(value, VECTOR_TYPE)
391 })
392 }
393
394 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
396 if let Some(cache) = &self.vector_cache {
397 let key = (vector.data_type(), value);
398 CACHE_BYTES
399 .with_label_values(&[VECTOR_TYPE])
400 .add(vector_cache_weight(&key, &vector).into());
401 cache.insert(key, vector);
402 }
403 }
404
405 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
407 self.page_cache.as_ref().and_then(|page_cache| {
408 let value = page_cache.get(page_key);
409 update_hit_miss(value, PAGE_TYPE)
410 })
411 }
412
413 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
415 if let Some(cache) = &self.page_cache {
416 CACHE_BYTES
417 .with_label_values(&[PAGE_TYPE])
418 .add(page_cache_weight(&page_key, &pages).into());
419 cache.insert(page_key, pages);
420 }
421 }
422
423 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
425 if let Some(cache) = &self.bloom_filter_index_cache {
426 cache.invalidate_file(file_id.file_id());
427 }
428
429 if let Some(cache) = &self.inverted_index_cache {
430 cache.invalidate_file(file_id.file_id());
431 }
432
433 if let Some(cache) = &self.index_result_cache {
434 cache.invalidate_file(file_id.file_id());
435 }
436
437 if let Some(cache) = &self.puffin_metadata_cache {
438 cache.remove(&file_id.to_string());
439 }
440
441 if let Some(write_cache) = &self.write_cache {
442 write_cache
443 .remove(IndexKey::new(
444 file_id.region_id(),
445 file_id.file_id(),
446 FileType::Puffin(file_id.version),
447 ))
448 .await;
449 }
450 }
451
452 pub fn get_selector_result(
454 &self,
455 selector_key: &SelectorResultKey,
456 ) -> Option<Arc<SelectorResultValue>> {
457 self.selector_result_cache
458 .as_ref()
459 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
460 }
461
462 pub fn put_selector_result(
464 &self,
465 selector_key: SelectorResultKey,
466 result: Arc<SelectorResultValue>,
467 ) {
468 if let Some(cache) = &self.selector_result_cache {
469 CACHE_BYTES
470 .with_label_values(&[SELECTOR_RESULT_TYPE])
471 .add(selector_result_cache_weight(&selector_key, &result).into());
472 cache.insert(selector_key, result);
473 }
474 }
475
476 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
478 self.write_cache.as_ref()
479 }
480
481 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
482 self.inverted_index_cache.as_ref()
483 }
484
485 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
486 self.bloom_filter_index_cache.as_ref()
487 }
488
489 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
490 self.puffin_metadata_cache.as_ref()
491 }
492
493 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
494 self.index_result_cache.as_ref()
495 }
496}
497
498pub fn selector_result_cache_miss() {
500 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
501}
502
503pub fn selector_result_cache_hit() {
505 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
506}
507
508#[derive(Default)]
510pub struct CacheManagerBuilder {
511 sst_meta_cache_size: u64,
512 vector_cache_size: u64,
513 page_cache_size: u64,
514 index_metadata_size: u64,
515 index_content_size: u64,
516 index_content_page_size: u64,
517 index_result_cache_size: u64,
518 puffin_metadata_size: u64,
519 write_cache: Option<WriteCacheRef>,
520 selector_result_cache_size: u64,
521}
522
523impl CacheManagerBuilder {
524 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
526 self.sst_meta_cache_size = bytes;
527 self
528 }
529
530 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
532 self.vector_cache_size = bytes;
533 self
534 }
535
536 pub fn page_cache_size(mut self, bytes: u64) -> Self {
538 self.page_cache_size = bytes;
539 self
540 }
541
542 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
544 self.write_cache = cache;
545 self
546 }
547
548 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
550 self.index_metadata_size = bytes;
551 self
552 }
553
554 pub fn index_content_size(mut self, bytes: u64) -> Self {
556 self.index_content_size = bytes;
557 self
558 }
559
560 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
562 self.index_content_page_size = bytes;
563 self
564 }
565
566 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
568 self.index_result_cache_size = bytes;
569 self
570 }
571
572 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
574 self.puffin_metadata_size = bytes;
575 self
576 }
577
578 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
580 self.selector_result_cache_size = bytes;
581 self
582 }
583
584 pub fn build(self) -> CacheManager {
586 fn to_str(cause: RemovalCause) -> &'static str {
587 match cause {
588 RemovalCause::Expired => "expired",
589 RemovalCause::Explicit => "explicit",
590 RemovalCause::Replaced => "replaced",
591 RemovalCause::Size => "size",
592 }
593 }
594
595 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
596 Cache::builder()
597 .max_capacity(self.sst_meta_cache_size)
598 .weigher(meta_cache_weight)
599 .eviction_listener(|k, v, cause| {
600 let size = meta_cache_weight(&k, &v);
601 CACHE_BYTES
602 .with_label_values(&[SST_META_TYPE])
603 .sub(size.into());
604 CACHE_EVICTION
605 .with_label_values(&[SST_META_TYPE, to_str(cause)])
606 .inc();
607 })
608 .build()
609 });
610 let vector_cache = (self.vector_cache_size != 0).then(|| {
611 Cache::builder()
612 .max_capacity(self.vector_cache_size)
613 .weigher(vector_cache_weight)
614 .eviction_listener(|k, v, cause| {
615 let size = vector_cache_weight(&k, &v);
616 CACHE_BYTES
617 .with_label_values(&[VECTOR_TYPE])
618 .sub(size.into());
619 CACHE_EVICTION
620 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
621 .inc();
622 })
623 .build()
624 });
625 let page_cache = (self.page_cache_size != 0).then(|| {
626 Cache::builder()
627 .max_capacity(self.page_cache_size)
628 .weigher(page_cache_weight)
629 .eviction_listener(|k, v, cause| {
630 let size = page_cache_weight(&k, &v);
631 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
632 CACHE_EVICTION
633 .with_label_values(&[PAGE_TYPE, to_str(cause)])
634 .inc();
635 })
636 .build()
637 });
638 let inverted_index_cache = InvertedIndexCache::new(
639 self.index_metadata_size,
640 self.index_content_size,
641 self.index_content_page_size,
642 );
643 let bloom_filter_index_cache = BloomFilterIndexCache::new(
645 self.index_metadata_size,
646 self.index_content_size,
647 self.index_content_page_size,
648 );
649 let index_result_cache = (self.index_result_cache_size != 0)
650 .then(|| IndexResultCache::new(self.index_result_cache_size));
651 let puffin_metadata_cache =
652 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
653 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
654 Cache::builder()
655 .max_capacity(self.selector_result_cache_size)
656 .weigher(selector_result_cache_weight)
657 .eviction_listener(|k, v, cause| {
658 let size = selector_result_cache_weight(&k, &v);
659 CACHE_BYTES
660 .with_label_values(&[SELECTOR_RESULT_TYPE])
661 .sub(size.into());
662 CACHE_EVICTION
663 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
664 .inc();
665 })
666 .build()
667 });
668 CacheManager {
669 sst_meta_cache,
670 vector_cache,
671 page_cache,
672 write_cache: self.write_cache,
673 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
674 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
675 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
676 selector_result_cache,
677 index_result_cache,
678 }
679 }
680}
681
682fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
683 (k.estimated_size() + parquet_meta_size(v)) as u32
685}
686
687fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
688 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
690}
691
692fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
693 (k.estimated_size() + v.estimated_size()) as u32
694}
695
696fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
697 (mem::size_of_val(k) + v.estimated_size()) as u32
698}
699
700fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
702 if value.is_some() {
703 CACHE_HIT.with_label_values(&[cache_type]).inc();
704 } else {
705 CACHE_MISS.with_label_values(&[cache_type]).inc();
706 }
707 value
708}
709
710#[derive(Debug, Clone, PartialEq, Eq, Hash)]
712struct SstMetaKey(RegionId, FileId);
713
714impl SstMetaKey {
715 fn estimated_size(&self) -> usize {
717 mem::size_of::<Self>()
718 }
719}
720
721#[derive(Debug, Clone, PartialEq, Eq, Hash)]
723pub struct ColumnPagePath {
724 region_id: RegionId,
726 file_id: FileId,
728 row_group_idx: usize,
730 column_idx: usize,
732}
733
734#[derive(Debug, Clone, PartialEq, Eq, Hash)]
739pub struct PageKey {
740 file_id: FileId,
742 row_group_idx: usize,
744 ranges: Vec<Range<u64>>,
746}
747
748impl PageKey {
749 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
751 PageKey {
752 file_id,
753 row_group_idx,
754 ranges,
755 }
756 }
757
758 fn estimated_size(&self) -> usize {
760 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
761 }
762}
763
764#[derive(Default)]
767pub struct PageValue {
768 pub compressed: Vec<Bytes>,
770 pub page_size: u64,
772}
773
774impl PageValue {
775 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
777 PageValue {
778 compressed: bytes,
779 page_size,
780 }
781 }
782
783 fn estimated_size(&self) -> usize {
785 mem::size_of::<Self>()
786 + self.page_size as usize
787 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
788 }
789}
790
791#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
793pub struct SelectorResultKey {
794 pub file_id: FileId,
796 pub row_group_idx: usize,
798 pub selector: TimeSeriesRowSelector,
800}
801
802pub struct SelectorResultValue {
804 pub result: Vec<Batch>,
806 pub projection: Vec<usize>,
808}
809
810impl SelectorResultValue {
811 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
813 SelectorResultValue { result, projection }
814 }
815
816 fn estimated_size(&self) -> usize {
818 self.result.iter().map(|batch| batch.memory_size()).sum()
820 }
821}
822
823type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
825type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
829type PageCache = Cache<PageKey, Arc<PageValue>>;
831type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
833
834#[cfg(test)]
835mod tests {
836 use std::sync::Arc;
837
838 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
839 use datatypes::vectors::Int64Vector;
840 use puffin::file_metadata::FileMetadata;
841 use store_api::storage::ColumnId;
842
843 use super::*;
844 use crate::cache::index::bloom_filter_index::Tag;
845 use crate::cache::index::result_cache::PredicateKey;
846 use crate::cache::test_util::parquet_meta;
847 use crate::sst::parquet::row_selection::RowGroupSelection;
848
849 #[tokio::test]
850 async fn test_disable_cache() {
851 let cache = CacheManager::default();
852 assert!(cache.sst_meta_cache.is_none());
853 assert!(cache.vector_cache.is_none());
854 assert!(cache.page_cache.is_none());
855
856 let region_id = RegionId::new(1, 1);
857 let file_id = RegionFileId::new(region_id, FileId::random());
858 let metadata = parquet_meta();
859 let mut metrics = MetadataCacheMetrics::default();
860 cache.put_parquet_meta_data(file_id, metadata);
861 assert!(
862 cache
863 .get_parquet_meta_data(file_id, &mut metrics)
864 .await
865 .is_none()
866 );
867
868 let value = Value::Int64(10);
869 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
870 cache.put_repeated_vector(value.clone(), vector.clone());
871 assert!(
872 cache
873 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
874 .is_none()
875 );
876
877 let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
878 let pages = Arc::new(PageValue::default());
879 cache.put_pages(key.clone(), pages);
880 assert!(cache.get_pages(&key).is_none());
881
882 assert!(cache.write_cache().is_none());
883 }
884
885 #[tokio::test]
886 async fn test_parquet_meta_cache() {
887 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
888 let mut metrics = MetadataCacheMetrics::default();
889 let region_id = RegionId::new(1, 1);
890 let file_id = RegionFileId::new(region_id, FileId::random());
891 assert!(
892 cache
893 .get_parquet_meta_data(file_id, &mut metrics)
894 .await
895 .is_none()
896 );
897 let metadata = parquet_meta();
898 cache.put_parquet_meta_data(file_id, metadata);
899 assert!(
900 cache
901 .get_parquet_meta_data(file_id, &mut metrics)
902 .await
903 .is_some()
904 );
905 cache.remove_parquet_meta_data(file_id);
906 assert!(
907 cache
908 .get_parquet_meta_data(file_id, &mut metrics)
909 .await
910 .is_none()
911 );
912 }
913
914 #[test]
915 fn test_repeated_vector_cache() {
916 let cache = CacheManager::builder().vector_cache_size(4096).build();
917 let value = Value::Int64(10);
918 assert!(
919 cache
920 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
921 .is_none()
922 );
923 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
924 cache.put_repeated_vector(value.clone(), vector.clone());
925 let cached = cache
926 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
927 .unwrap();
928 assert_eq!(vector, cached);
929 }
930
931 #[test]
932 fn test_page_cache() {
933 let cache = CacheManager::builder().page_cache_size(1000).build();
934 let file_id = FileId::random();
935 let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
936 assert!(cache.get_pages(&key).is_none());
937 let pages = Arc::new(PageValue::default());
938 cache.put_pages(key.clone(), pages);
939 assert!(cache.get_pages(&key).is_some());
940 }
941
942 #[test]
943 fn test_selector_result_cache() {
944 let cache = CacheManager::builder()
945 .selector_result_cache_size(1000)
946 .build();
947 let file_id = FileId::random();
948 let key = SelectorResultKey {
949 file_id,
950 row_group_idx: 0,
951 selector: TimeSeriesRowSelector::LastRow,
952 };
953 assert!(cache.get_selector_result(&key).is_none());
954 let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
955 cache.put_selector_result(key, result);
956 assert!(cache.get_selector_result(&key).is_some());
957 }
958
959 #[tokio::test]
960 async fn test_evict_puffin_cache_clears_all_entries() {
961 use std::collections::{BTreeMap, HashMap};
962
963 let cache = CacheManager::builder()
964 .index_metadata_size(128)
965 .index_content_size(128)
966 .index_content_page_size(64)
967 .index_result_cache_size(128)
968 .puffin_metadata_size(128)
969 .build();
970 let cache = Arc::new(cache);
971
972 let region_id = RegionId::new(1, 1);
973 let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
974 let column_id: ColumnId = 1;
975
976 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
977 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
978 let result_cache = cache.index_result_cache().unwrap();
979 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
980
981 let bloom_key = (
982 index_id.file_id(),
983 index_id.version,
984 column_id,
985 Tag::Skipping,
986 );
987 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
988 inverted_cache.put_metadata(
989 (index_id.file_id(), index_id.version),
990 Arc::new(InvertedIndexMetas::default()),
991 );
992 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
993 let selection = Arc::new(RowGroupSelection::default());
994 result_cache.put(predicate.clone(), index_id.file_id(), selection);
995 let file_id_str = index_id.to_string();
996 let metadata = Arc::new(FileMetadata {
997 blobs: Vec::new(),
998 properties: HashMap::new(),
999 });
1000 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1001
1002 assert!(bloom_cache.get_metadata(bloom_key).is_some());
1003 assert!(
1004 inverted_cache
1005 .get_metadata((index_id.file_id(), index_id.version))
1006 .is_some()
1007 );
1008 assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1009 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1010
1011 cache.evict_puffin_cache(index_id).await;
1012
1013 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1014 assert!(
1015 inverted_cache
1016 .get_metadata((index_id.file_id(), index_id.version))
1017 .is_none()
1018 );
1019 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1020 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1021
1022 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1024 inverted_cache.put_metadata(
1025 (index_id.file_id(), index_id.version),
1026 Arc::new(InvertedIndexMetas::default()),
1027 );
1028 result_cache.put(
1029 predicate.clone(),
1030 index_id.file_id(),
1031 Arc::new(RowGroupSelection::default()),
1032 );
1033 puffin_metadata_cache.put_metadata(
1034 file_id_str.clone(),
1035 Arc::new(FileMetadata {
1036 blobs: Vec::new(),
1037 properties: HashMap::new(),
1038 }),
1039 );
1040
1041 let strategy = CacheStrategy::EnableAll(cache.clone());
1042 strategy.evict_puffin_cache(index_id).await;
1043
1044 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1045 assert!(
1046 inverted_cache
1047 .get_metadata((index_id.file_id(), index_id.version))
1048 .is_none()
1049 );
1050 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1051 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1052 }
1053}