1mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21#[cfg(test)]
22pub(crate) mod test_util;
23pub(crate) mod write_cache;
24
25use std::mem;
26use std::sync::Arc;
27
28use bytes::Bytes;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
32use index::result_cache::IndexResultCache;
33use moka::notification::RemovalCause;
34use moka::sync::Cache;
35use parquet::column::page::Page;
36use parquet::file::metadata::ParquetMetaData;
37use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
38use store_api::storage::{ConcreteDataType, RegionId, TimeSeriesRowSelector};
39
40use crate::cache::cache_size::parquet_meta_size;
41use crate::cache::file_cache::{FileType, IndexKey};
42use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
43use crate::cache::write_cache::WriteCacheRef;
44use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
45use crate::read::Batch;
46use crate::sst::file::{FileId, RegionFileId};
47
/// Metrics label used for the SST (parquet) metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label used for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label used for the page cache.
const PAGE_TYPE: &str = "page";
/// Metrics label `"file"`.
// NOTE(review): not referenced in this part of the file; presumably used by
// a submodule — confirm before removing.
const FILE_TYPE: &str = "file";
/// Metrics label used for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
58
59#[derive(Clone)]
61pub enum CacheStrategy {
62 EnableAll(CacheManagerRef),
65 Compaction(CacheManagerRef),
70 Disabled,
72}
73
74impl CacheStrategy {
75 pub async fn get_parquet_meta_data(
77 &self,
78 file_id: RegionFileId,
79 ) -> Option<Arc<ParquetMetaData>> {
80 match self {
81 CacheStrategy::EnableAll(cache_manager) => {
82 cache_manager.get_parquet_meta_data(file_id).await
83 }
84 CacheStrategy::Compaction(cache_manager) => {
85 cache_manager.get_parquet_meta_data(file_id).await
86 }
87 CacheStrategy::Disabled => None,
88 }
89 }
90
91 pub fn get_parquet_meta_data_from_mem_cache(
93 &self,
94 file_id: RegionFileId,
95 ) -> Option<Arc<ParquetMetaData>> {
96 match self {
97 CacheStrategy::EnableAll(cache_manager) => {
98 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
99 }
100 CacheStrategy::Compaction(cache_manager) => {
101 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
102 }
103 CacheStrategy::Disabled => None,
104 }
105 }
106
107 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
109 match self {
110 CacheStrategy::EnableAll(cache_manager) => {
111 cache_manager.put_parquet_meta_data(file_id, metadata);
112 }
113 CacheStrategy::Compaction(cache_manager) => {
114 cache_manager.put_parquet_meta_data(file_id, metadata);
115 }
116 CacheStrategy::Disabled => {}
117 }
118 }
119
120 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
122 match self {
123 CacheStrategy::EnableAll(cache_manager) => {
124 cache_manager.remove_parquet_meta_data(file_id);
125 }
126 CacheStrategy::Compaction(cache_manager) => {
127 cache_manager.remove_parquet_meta_data(file_id);
128 }
129 CacheStrategy::Disabled => {}
130 }
131 }
132
133 pub fn get_repeated_vector(
136 &self,
137 data_type: &ConcreteDataType,
138 value: &Value,
139 ) -> Option<VectorRef> {
140 match self {
141 CacheStrategy::EnableAll(cache_manager) => {
142 cache_manager.get_repeated_vector(data_type, value)
143 }
144 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
145 }
146 }
147
148 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
151 if let CacheStrategy::EnableAll(cache_manager) = self {
152 cache_manager.put_repeated_vector(value, vector);
153 }
154 }
155
156 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
159 match self {
160 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
161 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
162 }
163 }
164
165 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
168 if let CacheStrategy::EnableAll(cache_manager) = self {
169 cache_manager.put_pages(page_key, pages);
170 }
171 }
172
173 pub fn get_selector_result(
176 &self,
177 selector_key: &SelectorResultKey,
178 ) -> Option<Arc<SelectorResultValue>> {
179 match self {
180 CacheStrategy::EnableAll(cache_manager) => {
181 cache_manager.get_selector_result(selector_key)
182 }
183 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
184 }
185 }
186
187 pub fn put_selector_result(
190 &self,
191 selector_key: SelectorResultKey,
192 result: Arc<SelectorResultValue>,
193 ) {
194 if let CacheStrategy::EnableAll(cache_manager) = self {
195 cache_manager.put_selector_result(selector_key, result);
196 }
197 }
198
199 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
202 match self {
203 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
204 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
205 CacheStrategy::Disabled => None,
206 }
207 }
208
209 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
212 match self {
213 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
214 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
215 }
216 }
217
218 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
221 match self {
222 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
223 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
224 }
225 }
226
227 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
230 match self {
231 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
232 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
233 }
234 }
235
236 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
239 match self {
240 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
241 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
242 }
243 }
244}
245
246#[derive(Default)]
250pub struct CacheManager {
251 sst_meta_cache: Option<SstMetaCache>,
253 vector_cache: Option<VectorCache>,
255 page_cache: Option<PageCache>,
257 write_cache: Option<WriteCacheRef>,
259 inverted_index_cache: Option<InvertedIndexCacheRef>,
261 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
263 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
265 selector_result_cache: Option<SelectorResultCache>,
267 index_result_cache: Option<IndexResultCache>,
269}
270
271pub type CacheManagerRef = Arc<CacheManager>;
272
273impl CacheManager {
274 pub fn builder() -> CacheManagerBuilder {
276 CacheManagerBuilder::default()
277 }
278
279 pub async fn get_parquet_meta_data(
282 &self,
283 file_id: RegionFileId,
284 ) -> Option<Arc<ParquetMetaData>> {
285 let metadata = self.get_parquet_meta_data_from_mem_cache(file_id);
287 if metadata.is_some() {
288 return metadata;
289 }
290
291 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
293 if let Some(write_cache) = &self.write_cache {
294 if let Some(metadata) = write_cache.file_cache().get_parquet_meta_data(key).await {
295 let metadata = Arc::new(metadata);
296 self.put_parquet_meta_data(file_id, metadata.clone());
298 return Some(metadata);
299 }
300 };
301
302 None
303 }
304
305 pub fn get_parquet_meta_data_from_mem_cache(
308 &self,
309 file_id: RegionFileId,
310 ) -> Option<Arc<ParquetMetaData>> {
311 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
313 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
314 update_hit_miss(value, SST_META_TYPE)
315 })
316 }
317
318 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
320 if let Some(cache) = &self.sst_meta_cache {
321 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
322 CACHE_BYTES
323 .with_label_values(&[SST_META_TYPE])
324 .add(meta_cache_weight(&key, &metadata).into());
325 cache.insert(key, metadata);
326 }
327 }
328
329 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
331 if let Some(cache) = &self.sst_meta_cache {
332 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
333 }
334 }
335
336 pub fn get_repeated_vector(
338 &self,
339 data_type: &ConcreteDataType,
340 value: &Value,
341 ) -> Option<VectorRef> {
342 self.vector_cache.as_ref().and_then(|vector_cache| {
343 let value = vector_cache.get(&(data_type.clone(), value.clone()));
344 update_hit_miss(value, VECTOR_TYPE)
345 })
346 }
347
348 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
350 if let Some(cache) = &self.vector_cache {
351 let key = (vector.data_type(), value);
352 CACHE_BYTES
353 .with_label_values(&[VECTOR_TYPE])
354 .add(vector_cache_weight(&key, &vector).into());
355 cache.insert(key, vector);
356 }
357 }
358
359 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
361 self.page_cache.as_ref().and_then(|page_cache| {
362 let value = page_cache.get(page_key);
363 update_hit_miss(value, PAGE_TYPE)
364 })
365 }
366
367 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
369 if let Some(cache) = &self.page_cache {
370 CACHE_BYTES
371 .with_label_values(&[PAGE_TYPE])
372 .add(page_cache_weight(&page_key, &pages).into());
373 cache.insert(page_key, pages);
374 }
375 }
376
377 pub fn get_selector_result(
379 &self,
380 selector_key: &SelectorResultKey,
381 ) -> Option<Arc<SelectorResultValue>> {
382 self.selector_result_cache
383 .as_ref()
384 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
385 }
386
387 pub fn put_selector_result(
389 &self,
390 selector_key: SelectorResultKey,
391 result: Arc<SelectorResultValue>,
392 ) {
393 if let Some(cache) = &self.selector_result_cache {
394 CACHE_BYTES
395 .with_label_values(&[SELECTOR_RESULT_TYPE])
396 .add(selector_result_cache_weight(&selector_key, &result).into());
397 cache.insert(selector_key, result);
398 }
399 }
400
401 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
403 self.write_cache.as_ref()
404 }
405
406 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
407 self.inverted_index_cache.as_ref()
408 }
409
410 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
411 self.bloom_filter_index_cache.as_ref()
412 }
413
414 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
415 self.puffin_metadata_cache.as_ref()
416 }
417
418 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
419 self.index_result_cache.as_ref()
420 }
421}
422
423pub fn selector_result_cache_miss() {
425 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
426}
427
428pub fn selector_result_cache_hit() {
430 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
431}
432
433#[derive(Default)]
435pub struct CacheManagerBuilder {
436 sst_meta_cache_size: u64,
437 vector_cache_size: u64,
438 page_cache_size: u64,
439 index_metadata_size: u64,
440 index_content_size: u64,
441 index_content_page_size: u64,
442 index_result_cache_size: u64,
443 puffin_metadata_size: u64,
444 write_cache: Option<WriteCacheRef>,
445 selector_result_cache_size: u64,
446}
447
448impl CacheManagerBuilder {
449 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
451 self.sst_meta_cache_size = bytes;
452 self
453 }
454
455 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
457 self.vector_cache_size = bytes;
458 self
459 }
460
461 pub fn page_cache_size(mut self, bytes: u64) -> Self {
463 self.page_cache_size = bytes;
464 self
465 }
466
467 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
469 self.write_cache = cache;
470 self
471 }
472
473 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
475 self.index_metadata_size = bytes;
476 self
477 }
478
479 pub fn index_content_size(mut self, bytes: u64) -> Self {
481 self.index_content_size = bytes;
482 self
483 }
484
485 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
487 self.index_content_page_size = bytes;
488 self
489 }
490
491 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
493 self.index_result_cache_size = bytes;
494 self
495 }
496
497 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
499 self.puffin_metadata_size = bytes;
500 self
501 }
502
503 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
505 self.selector_result_cache_size = bytes;
506 self
507 }
508
509 pub fn build(self) -> CacheManager {
511 fn to_str(cause: RemovalCause) -> &'static str {
512 match cause {
513 RemovalCause::Expired => "expired",
514 RemovalCause::Explicit => "explicit",
515 RemovalCause::Replaced => "replaced",
516 RemovalCause::Size => "size",
517 }
518 }
519
520 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
521 Cache::builder()
522 .max_capacity(self.sst_meta_cache_size)
523 .weigher(meta_cache_weight)
524 .eviction_listener(|k, v, cause| {
525 let size = meta_cache_weight(&k, &v);
526 CACHE_BYTES
527 .with_label_values(&[SST_META_TYPE])
528 .sub(size.into());
529 CACHE_EVICTION
530 .with_label_values(&[SST_META_TYPE, to_str(cause)])
531 .inc();
532 })
533 .build()
534 });
535 let vector_cache = (self.vector_cache_size != 0).then(|| {
536 Cache::builder()
537 .max_capacity(self.vector_cache_size)
538 .weigher(vector_cache_weight)
539 .eviction_listener(|k, v, cause| {
540 let size = vector_cache_weight(&k, &v);
541 CACHE_BYTES
542 .with_label_values(&[VECTOR_TYPE])
543 .sub(size.into());
544 CACHE_EVICTION
545 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
546 .inc();
547 })
548 .build()
549 });
550 let page_cache = (self.page_cache_size != 0).then(|| {
551 Cache::builder()
552 .max_capacity(self.page_cache_size)
553 .weigher(page_cache_weight)
554 .eviction_listener(|k, v, cause| {
555 let size = page_cache_weight(&k, &v);
556 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
557 CACHE_EVICTION
558 .with_label_values(&[PAGE_TYPE, to_str(cause)])
559 .inc();
560 })
561 .build()
562 });
563 let inverted_index_cache = InvertedIndexCache::new(
564 self.index_metadata_size,
565 self.index_content_size,
566 self.index_content_page_size,
567 );
568 let bloom_filter_index_cache = BloomFilterIndexCache::new(
570 self.index_metadata_size,
571 self.index_content_size,
572 self.index_content_page_size,
573 );
574 let index_result_cache = (self.index_result_cache_size != 0)
575 .then(|| IndexResultCache::new(self.index_result_cache_size));
576 let puffin_metadata_cache =
577 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
578 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
579 Cache::builder()
580 .max_capacity(self.selector_result_cache_size)
581 .weigher(selector_result_cache_weight)
582 .eviction_listener(|k, v, cause| {
583 let size = selector_result_cache_weight(&k, &v);
584 CACHE_BYTES
585 .with_label_values(&[SELECTOR_RESULT_TYPE])
586 .sub(size.into());
587 CACHE_EVICTION
588 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
589 .inc();
590 })
591 .build()
592 });
593 CacheManager {
594 sst_meta_cache,
595 vector_cache,
596 page_cache,
597 write_cache: self.write_cache,
598 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
599 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
600 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
601 selector_result_cache,
602 index_result_cache,
603 }
604 }
605}
606
607fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
608 (k.estimated_size() + parquet_meta_size(v)) as u32
610}
611
612fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
613 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
615}
616
617fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
618 (k.estimated_size() + v.estimated_size()) as u32
619}
620
621fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
622 (mem::size_of_val(k) + v.estimated_size()) as u32
623}
624
625fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
627 if value.is_some() {
628 CACHE_HIT.with_label_values(&[cache_type]).inc();
629 } else {
630 CACHE_MISS.with_label_values(&[cache_type]).inc();
631 }
632 value
633}
634
635#[derive(Debug, Clone, PartialEq, Eq, Hash)]
637struct SstMetaKey(RegionId, FileId);
638
639impl SstMetaKey {
640 fn estimated_size(&self) -> usize {
642 mem::size_of::<Self>()
643 }
644}
645
646#[derive(Debug, Clone, PartialEq, Eq, Hash)]
648pub struct ColumnPagePath {
649 region_id: RegionId,
651 file_id: FileId,
653 row_group_idx: usize,
655 column_idx: usize,
657}
658
659#[derive(Debug, Clone, PartialEq, Eq, Hash)]
661pub enum PageKey {
662 Compressed(ColumnPagePath),
664 Uncompressed(ColumnPagePath),
666}
667
668impl PageKey {
669 pub fn new_compressed(
671 region_id: RegionId,
672 file_id: FileId,
673 row_group_idx: usize,
674 column_idx: usize,
675 ) -> PageKey {
676 PageKey::Compressed(ColumnPagePath {
677 region_id,
678 file_id,
679 row_group_idx,
680 column_idx,
681 })
682 }
683
684 pub fn new_uncompressed(
686 region_id: RegionId,
687 file_id: FileId,
688 row_group_idx: usize,
689 column_idx: usize,
690 ) -> PageKey {
691 PageKey::Uncompressed(ColumnPagePath {
692 region_id,
693 file_id,
694 row_group_idx,
695 column_idx,
696 })
697 }
698
699 fn estimated_size(&self) -> usize {
701 mem::size_of::<Self>()
702 }
703}
704
705#[derive(Default)]
708pub struct PageValue {
709 pub compressed: Bytes,
711 pub row_group: Vec<Page>,
713}
714
715impl PageValue {
716 pub fn new_compressed(bytes: Bytes) -> PageValue {
718 PageValue {
719 compressed: bytes,
720 row_group: vec![],
721 }
722 }
723
724 pub fn new_row_group(pages: Vec<Page>) -> PageValue {
726 PageValue {
727 compressed: Bytes::new(),
728 row_group: pages,
729 }
730 }
731
732 fn estimated_size(&self) -> usize {
734 mem::size_of::<Self>()
735 + self.compressed.len()
736 + self
737 .row_group
738 .iter()
739 .map(|page| page.buffer().len())
740 .sum::<usize>()
741 }
742}
743
744#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
746pub struct SelectorResultKey {
747 pub file_id: FileId,
749 pub row_group_idx: usize,
751 pub selector: TimeSeriesRowSelector,
753}
754
755pub struct SelectorResultValue {
757 pub result: Vec<Batch>,
759 pub projection: Vec<usize>,
761}
762
763impl SelectorResultValue {
764 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
766 SelectorResultValue { result, projection }
767 }
768
769 fn estimated_size(&self) -> usize {
771 self.result.iter().map(|batch| batch.memory_size()).sum()
773 }
774}
775
776type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
778type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
782type PageCache = Cache<PageKey, Arc<PageValue>>;
784type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
786
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::vectors::Int64Vector;

    use super::*;
    use crate::cache::test_util::parquet_meta;

    /// A default manager owns no cache: puts are ignored and gets miss.
    #[tokio::test]
    async fn test_disable_cache() {
        let manager = CacheManager::default();
        assert!(manager.sst_meta_cache.is_none());
        assert!(manager.vector_cache.is_none());
        assert!(manager.page_cache.is_none());

        // Parquet metadata.
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        manager.put_parquet_meta_data(file_id, parquet_meta());
        let cached_meta = manager.get_parquet_meta_data(file_id).await;
        assert!(cached_meta.is_none());

        // Repeated vector.
        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector.clone());
        let cached_vector =
            manager.get_repeated_vector(&ConcreteDataType::int64_datatype(), &value);
        assert!(cached_vector.is_none());

        // Pages.
        let page_key = PageKey::new_uncompressed(region_id, file_id.file_id(), 0, 0);
        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_none());

        assert!(manager.write_cache().is_none());
    }

    /// Parquet metadata can be inserted, read back and removed.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let manager = CacheManager::builder().sst_meta_cache_size(2000).build();
        let file_id = RegionFileId::new(RegionId::new(1, 1), FileId::random());

        assert!(manager.get_parquet_meta_data(file_id).await.is_none());

        manager.put_parquet_meta_data(file_id, parquet_meta());
        assert!(manager.get_parquet_meta_data(file_id).await.is_some());

        manager.remove_parquet_meta_data(file_id);
        assert!(manager.get_parquet_meta_data(file_id).await.is_none());
    }

    /// A cached vector is returned for the same data type and value.
    #[test]
    fn test_repeated_vector_cache() {
        let manager = CacheManager::builder().vector_cache_size(4096).build();
        let data_type = ConcreteDataType::int64_datatype();
        let value = Value::Int64(10);

        assert!(manager.get_repeated_vector(&data_type, &value).is_none());

        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        manager.put_repeated_vector(value.clone(), vector.clone());

        let cached = manager.get_repeated_vector(&data_type, &value).unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages become visible after being inserted.
    #[test]
    fn test_page_cache() {
        let manager = CacheManager::builder().page_cache_size(1000).build();
        let page_key = PageKey::new_compressed(RegionId::new(1, 1), FileId::random(), 0, 0);

        assert!(manager.get_pages(&page_key).is_none());

        manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
        assert!(manager.get_pages(&page_key).is_some());
    }

    /// Selector results become visible after being inserted.
    #[test]
    fn test_selector_result_cache() {
        let manager = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let key = SelectorResultKey {
            file_id: FileId::random(),
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };

        assert!(manager.get_selector_result(&key).is_none());

        let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
        manager.put_selector_result(key, Arc::new(empty_result));
        assert!(manager.get_selector_result(&key).is_some());
    }
}