1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use datatypes::arrow::record_batch::RecordBatch;
32use datatypes::value::Value;
33use datatypes::vectors::VectorRef;
34use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
35use index::result_cache::IndexResultCache;
36use moka::notification::RemovalCause;
37use moka::sync::Cache;
38use object_store::ObjectStore;
39use parquet::file::metadata::{PageIndexPolicy, ParquetMetaData};
40use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
41use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
42
43use crate::cache::cache_size::parquet_meta_size;
44use crate::cache::file_cache::{FileType, IndexKey};
45use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
46#[cfg(feature = "vector_index")]
47use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
48use crate::cache::write_cache::WriteCacheRef;
49use crate::memtable::record_batch_estimated_size;
50use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
51use crate::read::Batch;
52use crate::sst::file::{RegionFileId, RegionIndexId};
53use crate::sst::parquet::reader::MetadataCacheMetrics;
54
/// Metric label value used for the SST (parquet) metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metric label value used for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metric label value used for the page cache.
const PAGE_TYPE: &str = "page";
/// Metric label value for files.
// NOTE(review): not referenced in this file; presumably used by child modules — confirm.
const FILE_TYPE: &str = "file";
/// Metric label value for indexes.
// NOTE(review): not referenced in this file; presumably used by child modules — confirm.
const INDEX_TYPE: &str = "index";
/// Metric label value used for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
67
/// Selects which operations are forwarded to the underlying [`CacheManager`].
#[derive(Clone)]
pub enum CacheStrategy {
    /// Every cache operation is forwarded to the wrapped manager.
    EnableAll(CacheManagerRef),
    /// Only parquet metadata operations, puffin cache eviction and the write cache
    /// accessor are forwarded; every other lookup returns `None` and every other
    /// insertion is a no-op.
    Compaction(CacheManagerRef),
    /// No cache is used: lookups return `None` and insertions are no-ops.
    Disabled,
}
82
83impl CacheStrategy {
84 pub(crate) async fn get_parquet_meta_data(
87 &self,
88 file_id: RegionFileId,
89 metrics: &mut MetadataCacheMetrics,
90 page_index_policy: PageIndexPolicy,
91 ) -> Option<Arc<ParquetMetaData>> {
92 match self {
93 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
94 cache_manager
95 .get_parquet_meta_data(file_id, metrics, page_index_policy)
96 .await
97 }
98 CacheStrategy::Disabled => {
99 metrics.cache_miss += 1;
100 None
101 }
102 }
103 }
104
105 pub fn get_parquet_meta_data_from_mem_cache(
107 &self,
108 file_id: RegionFileId,
109 ) -> Option<Arc<ParquetMetaData>> {
110 match self {
111 CacheStrategy::EnableAll(cache_manager) => {
112 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
113 }
114 CacheStrategy::Compaction(cache_manager) => {
115 cache_manager.get_parquet_meta_data_from_mem_cache(file_id)
116 }
117 CacheStrategy::Disabled => None,
118 }
119 }
120
121 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
123 match self {
124 CacheStrategy::EnableAll(cache_manager) => {
125 cache_manager.put_parquet_meta_data(file_id, metadata);
126 }
127 CacheStrategy::Compaction(cache_manager) => {
128 cache_manager.put_parquet_meta_data(file_id, metadata);
129 }
130 CacheStrategy::Disabled => {}
131 }
132 }
133
134 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
136 match self {
137 CacheStrategy::EnableAll(cache_manager) => {
138 cache_manager.remove_parquet_meta_data(file_id);
139 }
140 CacheStrategy::Compaction(cache_manager) => {
141 cache_manager.remove_parquet_meta_data(file_id);
142 }
143 CacheStrategy::Disabled => {}
144 }
145 }
146
147 pub fn get_repeated_vector(
150 &self,
151 data_type: &ConcreteDataType,
152 value: &Value,
153 ) -> Option<VectorRef> {
154 match self {
155 CacheStrategy::EnableAll(cache_manager) => {
156 cache_manager.get_repeated_vector(data_type, value)
157 }
158 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
159 }
160 }
161
162 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
165 if let CacheStrategy::EnableAll(cache_manager) = self {
166 cache_manager.put_repeated_vector(value, vector);
167 }
168 }
169
170 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
173 match self {
174 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
175 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
176 }
177 }
178
179 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
182 if let CacheStrategy::EnableAll(cache_manager) = self {
183 cache_manager.put_pages(page_key, pages);
184 }
185 }
186
187 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
189 match self {
190 CacheStrategy::EnableAll(cache_manager) => {
191 cache_manager.evict_puffin_cache(file_id).await
192 }
193 CacheStrategy::Compaction(cache_manager) => {
194 cache_manager.evict_puffin_cache(file_id).await
195 }
196 CacheStrategy::Disabled => {}
197 }
198 }
199
200 pub fn get_selector_result(
203 &self,
204 selector_key: &SelectorResultKey,
205 ) -> Option<Arc<SelectorResultValue>> {
206 match self {
207 CacheStrategy::EnableAll(cache_manager) => {
208 cache_manager.get_selector_result(selector_key)
209 }
210 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
211 }
212 }
213
214 pub fn put_selector_result(
217 &self,
218 selector_key: SelectorResultKey,
219 result: Arc<SelectorResultValue>,
220 ) {
221 if let CacheStrategy::EnableAll(cache_manager) = self {
222 cache_manager.put_selector_result(selector_key, result);
223 }
224 }
225
226 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
229 match self {
230 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
231 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
232 CacheStrategy::Disabled => None,
233 }
234 }
235
236 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
239 match self {
240 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
241 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
242 }
243 }
244
245 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
248 match self {
249 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
250 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
251 }
252 }
253
254 #[cfg(feature = "vector_index")]
257 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
258 match self {
259 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
260 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
261 }
262 }
263
264 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
267 match self {
268 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
269 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
270 }
271 }
272
273 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
276 match self {
277 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
278 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
279 }
280 }
281
282 pub fn maybe_download_background(
284 &self,
285 index_key: IndexKey,
286 remote_path: String,
287 remote_store: ObjectStore,
288 file_size: u64,
289 ) {
290 if let CacheStrategy::EnableAll(cache_manager) = self
291 && let Some(write_cache) = cache_manager.write_cache()
292 {
293 write_cache.file_cache().maybe_download_background(
294 index_key,
295 remote_path,
296 remote_store,
297 file_size,
298 );
299 }
300 }
301}
302
/// Owns the individual caches of this module.
///
/// Every cache is optional; the derived `Default` leaves all of them unset.
#[derive(Default)]
pub struct CacheManager {
    /// In-memory cache of parquet metadata, keyed by region id and file id.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache of vectors keyed by `(data type, value)`.
    vector_cache: Option<VectorCache>,
    /// Cache of [`PageValue`]s keyed by [`PageKey`].
    page_cache: Option<PageCache>,
    /// Write cache; its file cache is consulted when parquet metadata is not in
    /// memory, and puffin files are removed from it on eviction.
    write_cache: Option<WriteCacheRef>,
    /// Inverted index cache; invalidated per file by `evict_puffin_cache`.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Bloom filter index cache; invalidated per file by `evict_puffin_cache`.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Vector index cache; invalidated per file by `evict_puffin_cache`.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin metadata cache; entries are removed by `evict_puffin_cache`.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache of selector results keyed by [`SelectorResultKey`].
    selector_result_cache: Option<SelectorResultCache>,
    /// Index result cache; invalidated per file by `evict_puffin_cache`.
    index_result_cache: Option<IndexResultCache>,
}
330
/// Shared, reference-counted handle to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
332
333impl CacheManager {
334 pub fn builder() -> CacheManagerBuilder {
336 CacheManagerBuilder::default()
337 }
338
339 pub(crate) async fn get_parquet_meta_data(
342 &self,
343 file_id: RegionFileId,
344 metrics: &mut MetadataCacheMetrics,
345 page_index_policy: PageIndexPolicy,
346 ) -> Option<Arc<ParquetMetaData>> {
347 if let Some(metadata) = self.get_parquet_meta_data_from_mem_cache(file_id) {
349 metrics.mem_cache_hit += 1;
350 return Some(metadata);
351 }
352
353 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
355 if let Some(write_cache) = &self.write_cache
356 && let Some(metadata) = write_cache
357 .file_cache()
358 .get_parquet_meta_data(key, metrics, page_index_policy)
359 .await
360 {
361 metrics.file_cache_hit += 1;
362 let metadata = Arc::new(metadata);
363 self.put_parquet_meta_data(file_id, metadata.clone());
365 return Some(metadata);
366 };
367 metrics.cache_miss += 1;
368
369 None
370 }
371
372 pub fn get_parquet_meta_data_from_mem_cache(
375 &self,
376 file_id: RegionFileId,
377 ) -> Option<Arc<ParquetMetaData>> {
378 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
380 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
381 update_hit_miss(value, SST_META_TYPE)
382 })
383 }
384
385 pub fn put_parquet_meta_data(&self, file_id: RegionFileId, metadata: Arc<ParquetMetaData>) {
387 if let Some(cache) = &self.sst_meta_cache {
388 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
389 CACHE_BYTES
390 .with_label_values(&[SST_META_TYPE])
391 .add(meta_cache_weight(&key, &metadata).into());
392 cache.insert(key, metadata);
393 }
394 }
395
396 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
398 if let Some(cache) = &self.sst_meta_cache {
399 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
400 }
401 }
402
403 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
405 self.sst_meta_cache
406 .as_ref()
407 .map(|cache| cache.weighted_size())
408 .unwrap_or(0)
409 }
410
411 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
413 self.sst_meta_cache.is_some()
414 }
415
416 pub fn get_repeated_vector(
418 &self,
419 data_type: &ConcreteDataType,
420 value: &Value,
421 ) -> Option<VectorRef> {
422 self.vector_cache.as_ref().and_then(|vector_cache| {
423 let value = vector_cache.get(&(data_type.clone(), value.clone()));
424 update_hit_miss(value, VECTOR_TYPE)
425 })
426 }
427
428 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
430 if let Some(cache) = &self.vector_cache {
431 let key = (vector.data_type(), value);
432 CACHE_BYTES
433 .with_label_values(&[VECTOR_TYPE])
434 .add(vector_cache_weight(&key, &vector).into());
435 cache.insert(key, vector);
436 }
437 }
438
439 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
441 self.page_cache.as_ref().and_then(|page_cache| {
442 let value = page_cache.get(page_key);
443 update_hit_miss(value, PAGE_TYPE)
444 })
445 }
446
447 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
449 if let Some(cache) = &self.page_cache {
450 CACHE_BYTES
451 .with_label_values(&[PAGE_TYPE])
452 .add(page_cache_weight(&page_key, &pages).into());
453 cache.insert(page_key, pages);
454 }
455 }
456
457 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
459 if let Some(cache) = &self.bloom_filter_index_cache {
460 cache.invalidate_file(file_id.file_id());
461 }
462
463 if let Some(cache) = &self.inverted_index_cache {
464 cache.invalidate_file(file_id.file_id());
465 }
466
467 if let Some(cache) = &self.index_result_cache {
468 cache.invalidate_file(file_id.file_id());
469 }
470
471 #[cfg(feature = "vector_index")]
472 if let Some(cache) = &self.vector_index_cache {
473 cache.invalidate_file(file_id.file_id());
474 }
475
476 if let Some(cache) = &self.puffin_metadata_cache {
477 cache.remove(&file_id.to_string());
478 }
479
480 if let Some(write_cache) = &self.write_cache {
481 write_cache
482 .remove(IndexKey::new(
483 file_id.region_id(),
484 file_id.file_id(),
485 FileType::Puffin(file_id.version),
486 ))
487 .await;
488 }
489 }
490
491 pub fn get_selector_result(
493 &self,
494 selector_key: &SelectorResultKey,
495 ) -> Option<Arc<SelectorResultValue>> {
496 self.selector_result_cache
497 .as_ref()
498 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
499 }
500
501 pub fn put_selector_result(
503 &self,
504 selector_key: SelectorResultKey,
505 result: Arc<SelectorResultValue>,
506 ) {
507 if let Some(cache) = &self.selector_result_cache {
508 CACHE_BYTES
509 .with_label_values(&[SELECTOR_RESULT_TYPE])
510 .add(selector_result_cache_weight(&selector_key, &result).into());
511 cache.insert(selector_key, result);
512 }
513 }
514
515 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
517 self.write_cache.as_ref()
518 }
519
520 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
521 self.inverted_index_cache.as_ref()
522 }
523
524 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
525 self.bloom_filter_index_cache.as_ref()
526 }
527
528 #[cfg(feature = "vector_index")]
529 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
530 self.vector_index_cache.as_ref()
531 }
532
533 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
534 self.puffin_metadata_cache.as_ref()
535 }
536
537 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
538 self.index_result_cache.as_ref()
539 }
540}
541
542pub fn selector_result_cache_miss() {
544 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
545}
546
547pub fn selector_result_cache_hit() {
549 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
550}
551
/// Builder for [`CacheManager`].
///
/// The derived `Default` sets every size to zero and leaves the write cache unset.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; zero disables the cache.
    sst_meta_cache_size: u64,
    /// Capacity of the vector cache; zero disables the cache.
    vector_cache_size: u64,
    /// Capacity of the page cache; zero disables the cache.
    page_cache_size: u64,
    /// Metadata size passed to the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Content size passed to the inverted index and bloom filter index caches; also
    /// sizes the vector index cache, which is skipped when this is zero.
    index_content_size: u64,
    /// Content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; zero disables the cache.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Write cache handed to the manager as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; zero disables the cache.
    selector_result_cache_size: u64,
}
566
567impl CacheManagerBuilder {
568 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
570 self.sst_meta_cache_size = bytes;
571 self
572 }
573
574 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
576 self.vector_cache_size = bytes;
577 self
578 }
579
580 pub fn page_cache_size(mut self, bytes: u64) -> Self {
582 self.page_cache_size = bytes;
583 self
584 }
585
586 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
588 self.write_cache = cache;
589 self
590 }
591
592 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
594 self.index_metadata_size = bytes;
595 self
596 }
597
598 pub fn index_content_size(mut self, bytes: u64) -> Self {
600 self.index_content_size = bytes;
601 self
602 }
603
604 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
606 self.index_content_page_size = bytes;
607 self
608 }
609
610 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
612 self.index_result_cache_size = bytes;
613 self
614 }
615
616 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
618 self.puffin_metadata_size = bytes;
619 self
620 }
621
622 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
624 self.selector_result_cache_size = bytes;
625 self
626 }
627
628 pub fn build(self) -> CacheManager {
630 fn to_str(cause: RemovalCause) -> &'static str {
631 match cause {
632 RemovalCause::Expired => "expired",
633 RemovalCause::Explicit => "explicit",
634 RemovalCause::Replaced => "replaced",
635 RemovalCause::Size => "size",
636 }
637 }
638
639 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
640 Cache::builder()
641 .max_capacity(self.sst_meta_cache_size)
642 .weigher(meta_cache_weight)
643 .eviction_listener(|k, v, cause| {
644 let size = meta_cache_weight(&k, &v);
645 CACHE_BYTES
646 .with_label_values(&[SST_META_TYPE])
647 .sub(size.into());
648 CACHE_EVICTION
649 .with_label_values(&[SST_META_TYPE, to_str(cause)])
650 .inc();
651 })
652 .build()
653 });
654 let vector_cache = (self.vector_cache_size != 0).then(|| {
655 Cache::builder()
656 .max_capacity(self.vector_cache_size)
657 .weigher(vector_cache_weight)
658 .eviction_listener(|k, v, cause| {
659 let size = vector_cache_weight(&k, &v);
660 CACHE_BYTES
661 .with_label_values(&[VECTOR_TYPE])
662 .sub(size.into());
663 CACHE_EVICTION
664 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
665 .inc();
666 })
667 .build()
668 });
669 let page_cache = (self.page_cache_size != 0).then(|| {
670 Cache::builder()
671 .max_capacity(self.page_cache_size)
672 .weigher(page_cache_weight)
673 .eviction_listener(|k, v, cause| {
674 let size = page_cache_weight(&k, &v);
675 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
676 CACHE_EVICTION
677 .with_label_values(&[PAGE_TYPE, to_str(cause)])
678 .inc();
679 })
680 .build()
681 });
682 let inverted_index_cache = InvertedIndexCache::new(
683 self.index_metadata_size,
684 self.index_content_size,
685 self.index_content_page_size,
686 );
687 let bloom_filter_index_cache = BloomFilterIndexCache::new(
689 self.index_metadata_size,
690 self.index_content_size,
691 self.index_content_page_size,
692 );
693 #[cfg(feature = "vector_index")]
694 let vector_index_cache = (self.index_content_size != 0)
695 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
696 let index_result_cache = (self.index_result_cache_size != 0)
697 .then(|| IndexResultCache::new(self.index_result_cache_size));
698 let puffin_metadata_cache =
699 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
700 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
701 Cache::builder()
702 .max_capacity(self.selector_result_cache_size)
703 .weigher(selector_result_cache_weight)
704 .eviction_listener(|k, v, cause| {
705 let size = selector_result_cache_weight(&k, &v);
706 CACHE_BYTES
707 .with_label_values(&[SELECTOR_RESULT_TYPE])
708 .sub(size.into());
709 CACHE_EVICTION
710 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
711 .inc();
712 })
713 .build()
714 });
715 CacheManager {
716 sst_meta_cache,
717 vector_cache,
718 page_cache,
719 write_cache: self.write_cache,
720 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
721 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
722 #[cfg(feature = "vector_index")]
723 vector_index_cache,
724 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
725 selector_result_cache,
726 index_result_cache,
727 }
728 }
729}
730
731fn meta_cache_weight(k: &SstMetaKey, v: &Arc<ParquetMetaData>) -> u32 {
732 (k.estimated_size() + parquet_meta_size(v)) as u32
734}
735
736fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
737 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
739}
740
741fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
742 (k.estimated_size() + v.estimated_size()) as u32
743}
744
745fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
746 (mem::size_of_val(k) + v.estimated_size()) as u32
747}
748
749fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
751 if value.is_some() {
752 CACHE_HIT.with_label_values(&[cache_type]).inc();
753 } else {
754 CACHE_MISS.with_label_values(&[cache_type]).inc();
755 }
756 value
757}
758
759#[derive(Debug, Clone, PartialEq, Eq, Hash)]
761struct SstMetaKey(RegionId, FileId);
762
763impl SstMetaKey {
764 fn estimated_size(&self) -> usize {
766 mem::size_of::<Self>()
767 }
768}
769
/// Identifies a column of a row group by region id, file id, row group index and
/// column index.
// NOTE(review): not referenced elsewhere in this file — confirm where it is used.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Id of the region.
    region_id: RegionId,
    /// Id of the file.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column.
    column_idx: usize,
}
782
783#[derive(Debug, Clone, PartialEq, Eq, Hash)]
788pub struct PageKey {
789 file_id: FileId,
791 row_group_idx: usize,
793 ranges: Vec<Range<u64>>,
795}
796
797impl PageKey {
798 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
800 PageKey {
801 file_id,
802 row_group_idx,
803 ranges,
804 }
805 }
806
807 fn estimated_size(&self) -> usize {
809 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
810 }
811}
812
813#[derive(Default)]
816pub struct PageValue {
817 pub compressed: Vec<Bytes>,
819 pub page_size: u64,
821}
822
823impl PageValue {
824 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
826 PageValue {
827 compressed: bytes,
828 page_size,
829 }
830 }
831
832 fn estimated_size(&self) -> usize {
834 mem::size_of::<Self>()
835 + self.page_size as usize
836 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
837 }
838}
839
/// Key of the selector result cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Selector the result was computed for.
    pub selector: TimeSeriesRowSelector,
}
850
/// Batches held by a cached selector result, in one of two representations.
pub enum SelectorResult {
    /// Result stored as [`Batch`]es.
    PrimaryKey(Vec<Batch>),
    /// Result stored as Arrow [`RecordBatch`]es.
    Flat(Vec<RecordBatch>),
}
858
859pub struct SelectorResultValue {
861 pub result: SelectorResult,
863 pub projection: Vec<usize>,
865}
866
867impl SelectorResultValue {
868 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
870 SelectorResultValue {
871 result: SelectorResult::PrimaryKey(result),
872 projection,
873 }
874 }
875
876 pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
878 SelectorResultValue {
879 result: SelectorResult::Flat(result),
880 projection,
881 }
882 }
883
884 fn estimated_size(&self) -> usize {
886 match &self.result {
887 SelectorResult::PrimaryKey(batches) => {
888 batches.iter().map(|batch| batch.memory_size()).sum()
889 }
890 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
891 }
892 }
893}
894
/// Maps (region id, file id) to parquet metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<ParquetMetaData>>;
/// Maps (data type, value) to a vector.
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [`PageKey`] to its cached pages.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a [`SelectorResultKey`] to its cached result.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
905
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::parquet_meta;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default manager has no caches: every put is a no-op and every get misses.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        // Parquet metadata is not retained.
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        // Vectors are not retained.
        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        // Pages are not retained.
        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    /// Parquet metadata can be inserted, read back and removed.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        // Miss before anything is inserted.
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let metadata = parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata);
        // Hit after insertion.
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_some()
        );
        cache.remove_parquet_meta_data(file_id);
        // Miss again after removal.
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    /// A vector inserted for a value is returned for the same data type and value.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages inserted under a key can be read back with an equal key.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    /// A selector result inserted under a key can be read back with the same key.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Evicting the puffin cache clears the bloom filter, inverted index, index
    /// result and puffin metadata entries of the file, both when called on the
    /// manager directly and when called through `CacheStrategy::EnableAll`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        // Populate one entry per cache for the same index file.
        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        // Every entry is visible before eviction.
        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        // Evict through the manager.
        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill the caches to exercise eviction through the strategy.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }
}