1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::value::Value;
32use datatypes::vectors::VectorRef;
33use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
34use index::result_cache::IndexResultCache;
35use moka::notification::RemovalCause;
36use moka::sync::Cache;
37use object_store::ObjectStore;
38use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
39use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
40use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
41
42use crate::cache::cache_size::parquet_meta_size;
43use crate::cache::file_cache::{FileType, IndexKey};
44use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
45#[cfg(feature = "vector_index")]
46use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
47use crate::cache::write_cache::WriteCacheRef;
48use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
49use crate::read::Batch;
50use crate::sst::file::{RegionFileId, RegionIndexId};
51use crate::sst::parquet::reader::MetadataCacheMetrics;
52
53const SST_META_TYPE: &str = "sst_meta";
55const VECTOR_TYPE: &str = "vector";
57const PAGE_TYPE: &str = "page";
59const FILE_TYPE: &str = "file";
61const INDEX_TYPE: &str = "index";
63const SELECTOR_RESULT_TYPE: &str = "selector_result";
65
66#[derive(Clone)]
68pub enum CacheStrategy {
69 EnableAll(CacheManagerRef),
72 Compaction(CacheManagerRef),
77 Disabled,
79}
80
81impl CacheStrategy {
82 pub(crate) async fn get_parquet_meta_data(
85 &self,
86 file_id: RegionFileId,
87 metrics: &mut MetadataCacheMetrics,
88 page_index_policy: PageIndexPolicy,
89 ) -> Option<Arc<ParquetMetaData>> {
90 match self {
91 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
92 cache_manager
93 .get_parquet_meta_data(file_id, metrics, page_index_policy)
94 .await
95 }
96 CacheStrategy::Disabled => {
97 metrics.cache_miss += 1;
98 None
99 }
100 }
101 }
102
103 pub fn get_parquet_meta_data_from_mem_cache(
105 &self,
106 file_id: RegionFileId,
107 ) -> Option<Arc<ParquetMetaData>> {
108 match self {
109 CacheStrategy::EnableAll(cache_manager) => {
110 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
111 }
112 CacheStrategy::Compaction(cache_manager) => {
113 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
114 }
115 CacheStrategy::Disabled => None,
116 }
117 }
118
119 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
121 match self {
122 CacheStrategy::EnableAll(cache_manager) => {
123 cache_manager.put_parquet_meta_data(file_id, metadata);
124 }
125 CacheStrategy::Compaction(cache_manager) => {
126 cache_manager.put_parquet_meta_data(file_id, metadata);
127 }
128 CacheStrategy::Disabled => {}
129 }
130 }
131
132 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
134 match self {
135 CacheStrategy::EnableAll(cache_manager) => {
136 cache_manager.remove_parquet_meta_data(file_id);
137 }
138 CacheStrategy::Compaction(cache_manager) => {
139 cache_manager.remove_parquet_meta_data(file_id);
140 }
141 CacheStrategy::Disabled => {}
142 }
143 }
144
145 pub fn get_repeated_vector(
148 &self,
149 data_type: &ConcreteDataType,
150 value: &Value,
151 ) -> Option<VectorRef> {
152 match self {
153 CacheStrategy::EnableAll(cache_manager) => {
154 cache_manager.get_repeated_vector(data_type, value)
155 }
156 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
157 }
158 }
159
160 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
163 if let CacheStrategy::EnableAll(cache_manager) = self {
164 cache_manager.put_repeated_vector(value, vector);
165 }
166 }
167
168 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
171 match self {
172 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
173 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
174 }
175 }
176
177 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
180 if let CacheStrategy::EnableAll(cache_manager) = self {
181 cache_manager.put_pages(page_key, pages);
182 }
183 }
184
185 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
187 match self {
188 CacheStrategy::EnableAll(cache_manager) => {
189 cache_manager.evict_puffin_cache(file_id).await
190 }
191 CacheStrategy::Compaction(cache_manager) => {
192 cache_manager.evict_puffin_cache(file_id).await
193 }
194 CacheStrategy::Disabled => {}
195 }
196 }
197
198 pub fn get_selector_result(
201 &self,
202 selector_key: &SelectorResultKey,
203 ) -> Option<Arc<SelectorResultValue>> {
204 match self {
205 CacheStrategy::EnableAll(cache_manager) => {
206 cache_manager.get_selector_result(selector_key)
207 }
208 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
209 }
210 }
211
212 pub fn put_selector_result(
215 &self,
216 selector_key: SelectorResultKey,
217 result: Arc<SelectorResultValue>,
218 ) {
219 if let CacheStrategy::EnableAll(cache_manager) = self {
220 cache_manager.put_selector_result(selector_key, result);
221 }
222 }
223
224 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
227 match self {
228 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
229 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
230 CacheStrategy::Disabled => None,
231 }
232 }
233
234 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
237 match self {
238 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
239 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
240 }
241 }
242
243 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
246 match self {
247 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
248 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
249 }
250 }
251
252 #[cfg(feature = "vector_index")]
255 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
256 match self {
257 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
258 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
259 }
260 }
261
262 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
265 match self {
266 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
267 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
268 }
269 }
270
271 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
274 match self {
275 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
276 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
277 }
278 }
279
280 pub fn maybe_download_background(
282 &self,
283 index_key: IndexKey,
284 remote_path: String,
285 remote_store: ObjectStore,
286 file_size: u64,
287 ) {
288 if let CacheStrategy::EnableAll(cache_manager) = self
289 && let Some(write_cache) = cache_manager.write_cache()
290 {
291 write_cache.file_cache().maybe_download_background(
292 index_key,
293 remote_path,
294 remote_store,
295 file_size,
296 );
297 }
298 }
299}
300
301#[derive(Default)]
305pub struct CacheManager {
306 sst_meta_cache: Option<SstMetaCache>,
308 vector_cache: Option<VectorCache>,
310 page_cache: Option<PageCache>,
312 write_cache: Option<WriteCacheRef>,
314 inverted_index_cache: Option<InvertedIndexCacheRef>,
316 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
318 #[cfg(feature = "vector_index")]
320 vector_index_cache: Option<VectorIndexCacheRef>,
321 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
323 selector_result_cache: Option<SelectorResultCache>,
325 index_result_cache: Option<IndexResultCache>,
327}
328
329pub type CacheManagerRef = Arc<CacheManager>;
330
331impl CacheManager {
332 pub fn builder() -> CacheManagerBuilder {
334 CacheManagerBuilder::default()
335 }
336
337 pub(crate) async fn get_parquet_meta_data(
340 &self,
341 file_id: RegionFileId,
342 metrics: &mut MetadataCacheMetrics,
343 page_index_policy: PageIndexPolicy,
344 ) -> Option<Arc<ParquetMetaData>> {
345 if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
347 metrics.mem_cache_hit += 1;
348 return Some(metadata);
349 }
350
351 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
353 if let Some(write_cache) = &self.write_cache
354 && let Some(metadata) = write_cache
355 .file_cache()
356 .get_parquet_meta_data(key, metrics, page_index_policy)
357 .await
358 {
359 metrics.file_cache_hit += 1;
360 let metadata = Arc::new(metadata);
361 self.put_parquet_meta_data(file_id, metadata.clone());
363 return Some(metadata);
364 };
365 metrics.cache_miss += 1;
366
367 None
368 }
369
370 pub fn get_parquet_meta_data_from_mem_cache(
373 &self,
374 file_id: RegionFileId,
375 ) -> Option<Arc<ParquetMetaData>> {
376 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
378 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
379 update_hit_miss(value, SST_META_TYPE)
380 })
381 }
382
383 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
385 if let Some(cache) = &self.sst_meta_cache {
386 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
387 CACHE_BYTES
388 .with_label_values(&[SST_META_TYPE])
389 .add(meta_cache_weight(&key, &metadata).into());
390 cache.insert(key, metadata);
391 }
392 }
393
394 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
396 if let Some(cache) = &self.sst_meta_cache {
397 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
398 }
399 }
400
401 pub fn get_repeated_vector(
403 &self,
404 data_type: &ConcreteDataType,
405 value: &Value,
406 ) -> Option<VectorRef> {
407 self.vector_cache.as_ref().and_then(|vector_cache| {
408 let value = vector_cache.get(&(data_type.clone(), value.clone()));
409 update_hit_miss(value, VECTOR_TYPE)
410 })
411 }
412
413 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
415 if let Some(cache) = &self.vector_cache {
416 let key = (vector.data_type(), value);
417 CACHE_BYTES
418 .with_label_values(&[VECTOR_TYPE])
419 .add(vector_cache_weight(&key, &vector).into());
420 cache.insert(key, vector);
421 }
422 }
423
424 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
426 self.page_cache.as_ref().and_then(|page_cache| {
427 let value = page_cache.get(page_key);
428 update_hit_miss(value, PAGE_TYPE)
429 })
430 }
431
432 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
434 if let Some(cache) = &self.page_cache {
435 CACHE_BYTES
436 .with_label_values(&[PAGE_TYPE])
437 .add(page_cache_weight(&page_key, &pages).into());
438 cache.insert(page_key, pages);
439 }
440 }
441
442 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
444 if let Some(cache) = &self.bloom_filter_index_cache {
445 cache.invalidate_file(file_id.file_id());
446 }
447
448 if let Some(cache) = &self.inverted_index_cache {
449 cache.invalidate_file(file_id.file_id());
450 }
451
452 if let Some(cache) = &self.index_result_cache {
453 cache.invalidate_file(file_id.file_id());
454 }
455
456 #[cfg(feature = "vector_index")]
457 if let Some(cache) = &self.vector_index_cache {
458 cache.invalidate_file(file_id.file_id());
459 }
460
461 if let Some(cache) = &self.puffin_metadata_cache {
462 cache.remove(&file_id.to_string());
463 }
464
465 if let Some(write_cache) = &self.write_cache {
466 write_cache
467 .remove(IndexKey::new(
468 file_id.region_id(),
469 file_id.file_id(),
470 FileType::Puffin(file_id.version),
471 ))
472 .await;
473 }
474 }
475
476 pub fn get_selector_result(
478 &self,
479 selector_key: &SelectorResultKey,
480 ) -> Option<Arc<SelectorResultValue>> {
481 self.selector_result_cache
482 .as_ref()
483 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
484 }
485
486 pub fn put_selector_result(
488 &self,
489 selector_key: SelectorResultKey,
490 result: Arc<SelectorResultValue>,
491 ) {
492 if let Some(cache) = &self.selector_result_cache {
493 CACHE_BYTES
494 .with_label_values(&[SELECTOR_RESULT_TYPE])
495 .add(selector_result_cache_weight(&selector_key, &result).into());
496 cache.insert(selector_key, result);
497 }
498 }
499
500 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
502 self.write_cache.as_ref()
503 }
504
505 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
506 self.inverted_index_cache.as_ref()
507 }
508
509 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
510 self.bloom_filter_index_cache.as_ref()
511 }
512
513 #[cfg(feature = "vector_index")]
514 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
515 self.vector_index_cache.as_ref()
516 }
517
518 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
519 self.puffin_metadata_cache.as_ref()
520 }
521
522 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
523 self.index_result_cache.as_ref()
524 }
525}
526
527pub fn selector_result_cache_miss() {
529 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
530}
531
532pub fn selector_result_cache_hit() {
534 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
535}
536
537#[derive(Default)]
539pub struct CacheManagerBuilder {
540 sst_meta_cache_size: u64,
541 vector_cache_size: u64,
542 page_cache_size: u64,
543 index_metadata_size: u64,
544 index_content_size: u64,
545 index_content_page_size: u64,
546 index_result_cache_size: u64,
547 puffin_metadata_size: u64,
548 write_cache: Option<WriteCacheRef>,
549 selector_result_cache_size: u64,
550}
551
552impl CacheManagerBuilder {
553 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
555 self.sst_meta_cache_size = bytes;
556 self
557 }
558
559 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
561 self.vector_cache_size = bytes;
562 self
563 }
564
565 pub fn page_cache_size(mut self, bytes: u64) -> Self {
567 self.page_cache_size = bytes;
568 self
569 }
570
571 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
573 self.write_cache = cache;
574 self
575 }
576
577 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
579 self.index_metadata_size = bytes;
580 self
581 }
582
583 pub fn index_content_size(mut self, bytes: u64) -> Self {
585 self.index_content_size = bytes;
586 self
587 }
588
589 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
591 self.index_content_page_size = bytes;
592 self
593 }
594
595 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
597 self.index_result_cache_size = bytes;
598 self
599 }
600
601 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
603 self.puffin_metadata_size = bytes;
604 self
605 }
606
607 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
609 self.selector_result_cache_size = bytes;
610 self
611 }
612
613 pub fn build(self) -> CacheManager {
615 fn to_str(cause: RemovalCause) -> &'static str {
616 match cause {
617 RemovalCause::Expired => "expired",
618 RemovalCause::Explicit => "explicit",
619 RemovalCause::Replaced => "replaced",
620 RemovalCause::Size => "size",
621 }
622 }
623
624 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
625 Cache::builder()
626 .max_capacity(self.sst_meta_cache_size)
627 .weigher(meta_cache_weight)
628 .eviction_listener(|k, v, cause| {
629 let size = meta_cache_weight(&k, &v);
630 CACHE_BYTES
631 .with_label_values(&[SST_META_TYPE])
632 .sub(size.into());
633 CACHE_EVICTION
634 .with_label_values(&[SST_META_TYPE, to_str(cause)])
635 .inc();
636 })
637 .build()
638 });
639 let vector_cache = (self.vector_cache_size != 0).then(|| {
640 Cache::builder()
641 .max_capacity(self.vector_cache_size)
642 .weigher(vector_cache_weight)
643 .eviction_listener(|k, v, cause| {
644 let size = vector_cache_weight(&k, &v);
645 CACHE_BYTES
646 .with_label_values(&[VECTOR_TYPE])
647 .sub(size.into());
648 CACHE_EVICTION
649 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
650 .inc();
651 })
652 .build()
653 });
654 let page_cache = (self.page_cache_size != 0).then(|| {
655 Cache::builder()
656 .max_capacity(self.page_cache_size)
657 .weigher(page_cache_weight)
658 .eviction_listener(|k, v, cause| {
659 let size = page_cache_weight(&k, &v);
660 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
661 CACHE_EVICTION
662 .with_label_values(&[PAGE_TYPE, to_str(cause)])
663 .inc();
664 })
665 .build()
666 });
667 let inverted_index_cache = InvertedIndexCache::new(
668 self.index_metadata_size,
669 self.index_content_size,
670 self.index_content_page_size,
671 );
672 let bloom_filter_index_cache = BloomFilterIndexCache::new(
674 self.index_metadata_size,
675 self.index_content_size,
676 self.index_content_page_size,
677 );
678 #[cfg(feature = "vector_index")]
679 let vector_index_cache = (self.index_content_size != 0)
680 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
681 let index_result_cache = (self.index_result_cache_size != 0)
682 .then(|| IndexResultCache::new(self.index_result_cache_size));
683 let puffin_metadata_cache =
684 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
685 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
686 Cache::builder()
687 .max_capacity(self.selector_result_cache_size)
688 .weigher(selector_result_cache_weight)
689 .eviction_listener(|k, v, cause| {
690 let size = selector_result_cache_weight(&k, &v);
691 CACHE_BYTES
692 .with_label_values(&[SELECTOR_RESULT_TYPE])
693 .sub(size.into());
694 CACHE_EVICTION
695 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
696 .inc();
697 })
698 .build()
699 });
700 CacheManager {
701 sst_meta_cache,
702 vector_cache,
703 page_cache,
704 write_cache: self.write_cache,
705 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
706 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
707 #[cfg(feature = "vector_index")]
708 vector_index_cache,
709 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
710 selector_result_cache,
711 index_result_cache,
712 }
713 }
714}
715
716fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
717 (k.estimated_size() + parquet_meta_size(v)) as u32
719}
720
721fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
722 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
724}
725
726fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
727 (k.estimated_size() + v.estimated_size()) as u32
728}
729
730fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
731 (mem::size_of_val(k) + v.estimated_size()) as u32
732}
733
734fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
736 if value.is_some() {
737 CACHE_HIT.with_label_values(&[cache_type]).inc();
738 } else {
739 CACHE_MISS.with_label_values(&[cache_type]).inc();
740 }
741 value
742}
743
744#[derive(Debug, Clone, PartialEq, Eq, Hash)]
746struct SstMetaKey(RegionId, FileId);
747
748impl SstMetaKey {
749 fn estimated_size(&self) -> usize {
751 mem::size_of::<Self>()
752 }
753}
754
755#[derive(Debug, Clone, PartialEq, Eq, Hash)]
757pub struct ColumnPagePath {
758 region_id: RegionId,
760 file_id: FileId,
762 row_group_idx: usize,
764 column_idx: usize,
766}
767
768#[derive(Debug, Clone, PartialEq, Eq, Hash)]
773pub struct PageKey {
774 file_id: FileId,
776 row_group_idx: usize,
778 ranges: Vec<Range<u64>>,
780}
781
782impl PageKey {
783 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
785 PageKey {
786 file_id,
787 row_group_idx,
788 ranges,
789 }
790 }
791
792 fn estimated_size(&self) -> usize {
794 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
795 }
796}
797
798#[derive(Default)]
801pub struct PageValue {
802 pub compressed: Vec<Bytes>,
804 pub page_size: u64,
806}
807
808impl PageValue {
809 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
811 PageValue {
812 compressed: bytes,
813 page_size,
814 }
815 }
816
817 fn estimated_size(&self) -> usize {
819 mem::size_of::<Self>()
820 + self.page_size as usize
821 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
822 }
823}
824
825#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
827pub struct SelectorResultKey {
828 pub file_id: FileId,
830 pub row_group_idx: usize,
832 pub selector: TimeSeriesRowSelector,
834}
835
836pub struct SelectorResultValue {
838 pub result: Vec<Batch>,
840 pub projection: Vec<usize>,
842}
843
844impl SelectorResultValue {
845 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
847 SelectorResultValue { result, projection }
848 }
849
850 fn estimated_size(&self) -> usize {
852 self.result.iter().map(|batch| batch.memory_size()).sum()
854 }
855}
856
857type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
859type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
863type PageCache = Cache<PageKey, Arc<PageValue>>;
865type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
867
868#[cfg(test)]
869mod tests {
870 use std::sync::Arc;
871
872 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
873 use datatypes::vectors::Int64Vector;
874 use puffin::file_metadata::FileMetadata;
875 use store_api::storage::ColumnId;
876
877 use super::*;
878 use crate::cache::index::bloom_filter_index::Tag;
879 use crate::cache::index::result_cache::PredicateKey;
880 use crate::cache::test_util::parquet_meta;
881 use crate::sst::parquet::row_selection::RowGroupSelection;
882
883 #[tokio::test]
884 async fn test_disable_cache() {
885 let cache = CacheManager::default();
886 assert!(cache.sst_meta_cache.is_none());
887 assert!(cache.vector_cache.is_none());
888 assert!(cache.page_cache.is_none());
889
890 let region_id = RegionId::new(1, 1);
891 let file_id = RegionFileId::new(region_id, FileId::random());
892 let metadata = parquet_meta();
893 let mut metrics = MetadataCacheMetrics::default();
894 cache.put_parquet_meta_data(file_id, metadata);
895 assert!(
896 cache
897 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
898 .await
899 .is_none()
900 );
901
902 let value = Value::Int64(10);
903 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
904 cache.put_repeated_vector(value.clone(), vector.clone());
905 assert!(
906 cache
907 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
908 .is_none()
909 );
910
911 let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
912 let pages = Arc::new(PageValue::default());
913 cache.put_pages(key.clone(), pages);
914 assert!(cache.get_pages(&key).is_none());
915
916 assert!(cache.write_cache().is_none());
917 }
918
919 #[tokio::test]
920 async fn test_parquet_meta_cache() {
921 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
922 let mut metrics = MetadataCacheMetrics::default();
923 let region_id = RegionId::new(1, 1);
924 let file_id = RegionFileId::new(region_id, FileId::random());
925 assert!(
926 cache
927 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
928 .await
929 .is_none()
930 );
931 let metadata = parquet_meta();
932 cache.put_parquet_meta_data(file_id, metadata);
933 assert!(
934 cache
935 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
936 .await
937 .is_some()
938 );
939 cache.remove_parquet_meta_data(file_id);
940 assert!(
941 cache
942 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
943 .await
944 .is_none()
945 );
946 }
947
948 #[test]
949 fn test_repeated_vector_cache() {
950 let cache = CacheManager::builder().vector_cache_size(4096).build();
951 let value = Value::Int64(10);
952 assert!(
953 cache
954 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
955 .is_none()
956 );
957 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
958 cache.put_repeated_vector(value.clone(), vector.clone());
959 let cached = cache
960 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
961 .unwrap();
962 assert_eq!(vector, cached);
963 }
964
965 #[test]
966 fn test_page_cache() {
967 let cache = CacheManager::builder().page_cache_size(1000).build();
968 let file_id = FileId::random();
969 let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
970 assert!(cache.get_pages(&key).is_none());
971 let pages = Arc::new(PageValue::default());
972 cache.put_pages(key.clone(), pages);
973 assert!(cache.get_pages(&key).is_some());
974 }
975
976 #[test]
977 fn test_selector_result_cache() {
978 let cache = CacheManager::builder()
979 .selector_result_cache_size(1000)
980 .build();
981 let file_id = FileId::random();
982 let key = SelectorResultKey {
983 file_id,
984 row_group_idx: 0,
985 selector: TimeSeriesRowSelector::LastRow,
986 };
987 assert!(cache.get_selector_result(&key).is_none());
988 let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
989 cache.put_selector_result(key, result);
990 assert!(cache.get_selector_result(&key).is_some());
991 }
992
993 #[tokio::test]
994 async fn test_evict_puffin_cache_clears_all_entries() {
995 use std::collections::{BTreeMap, HashMap};
996
997 let cache = CacheManager::builder()
998 .index_metadata_size(128)
999 .index_content_size(128)
1000 .index_content_page_size(64)
1001 .index_result_cache_size(128)
1002 .puffin_metadata_size(128)
1003 .build();
1004 let cache = Arc::new(cache);
1005
1006 let region_id = RegionId::new(1, 1);
1007 let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
1008 let column_id: ColumnId = 1;
1009
1010 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
1011 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
1012 let result_cache = cache.index_result_cache().unwrap();
1013 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
1014
1015 let bloom_key = (
1016 index_id.file_id(),
1017 index_id.version,
1018 column_id,
1019 Tag::Skipping,
1020 );
1021 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1022 inverted_cache.put_metadata(
1023 (index_id.file_id(), index_id.version),
1024 Arc::new(InvertedIndexMetas::default()),
1025 );
1026 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
1027 let selection = Arc::new(RowGroupSelection::default());
1028 result_cache.put(predicate.clone(), index_id.file_id(), selection);
1029 let file_id_str = index_id.to_string();
1030 let metadata = Arc::new(FileMetadata {
1031 blobs: Vec::new(),
1032 properties: HashMap::new(),
1033 });
1034 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1035
1036 assert!(bloom_cache.get_metadata(bloom_key).is_some());
1037 assert!(
1038 inverted_cache
1039 .get_metadata((index_id.file_id(), index_id.version))
1040 .is_some()
1041 );
1042 assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1043 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1044
1045 cache.evict_puffin_cache(index_id).await;
1046
1047 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1048 assert!(
1049 inverted_cache
1050 .get_metadata((index_id.file_id(), index_id.version))
1051 .is_none()
1052 );
1053 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1054 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1055
1056 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1058 inverted_cache.put_metadata(
1059 (index_id.file_id(), index_id.version),
1060 Arc::new(InvertedIndexMetas::default()),
1061 );
1062 result_cache.put(
1063 predicate.clone(),
1064 index_id.file_id(),
1065 Arc::new(RowGroupSelection::default()),
1066 );
1067 puffin_metadata_cache.put_metadata(
1068 file_id_str.clone(),
1069 Arc::new(FileMetadata {
1070 blobs: Vec::new(),
1071 properties: HashMap::new(),
1072 }),
1073 );
1074
1075 let strategy = CacheStrategy::EnableAll(cache.clone());
1076 strategy.evict_puffin_cache(index_id).await;
1077
1078 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1079 assert!(
1080 inverted_cache
1081 .get_metadata((index_id.file_id(), index_id.version))
1082 .is_none()
1083 );
1084 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1085 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1086 }
1087}