1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::reader::MetadataCacheMetrics;
61
/// Metrics label for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label for the page cache.
const PAGE_TYPE: &str = "page";
/// Cache type label `file`. Not referenced in this part of the file; presumably
/// used by a child module such as `file_cache` (verify).
const FILE_TYPE: &str = "file";
/// Cache type label `index`. Not referenced in this part of the file; presumably
/// used by a child module (verify).
const INDEX_TYPE: &str = "index";
/// Metrics label for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label for the range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Total budget used by `RangeResultMemoryLimiter::default()`.
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes represented by one permit of the range result memory limiter.
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
78
79#[derive(Debug)]
80pub(crate) struct RangeResultMemoryLimiter {
81 semaphore: Arc<tokio::sync::Semaphore>,
82 permit_bytes: usize,
83 total_permits: usize,
84}
85
86impl Default for RangeResultMemoryLimiter {
87 fn default() -> Self {
88 Self::new(
89 RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
90 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
91 )
92 }
93}
94
95impl RangeResultMemoryLimiter {
96 pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
97 let permit_bytes = permit_bytes.max(1);
98 let total_permits = limit_bytes
99 .div_ceil(permit_bytes)
100 .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
101 Self {
102 semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
103 permit_bytes,
104 total_permits,
105 }
106 }
107
108 #[cfg(test)]
109 pub(crate) fn permit_bytes(&self) -> usize {
110 self.permit_bytes
111 }
112
113 #[cfg(test)]
114 pub(crate) fn available_permits(&self) -> usize {
115 self.semaphore.available_permits()
116 }
117
118 pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
119 let permits = bytes.div_ceil(self.permit_bytes).max(1);
120 if permits > self.total_permits {
121 return UnexpectedSnafu {
122 reason: format!(
123 "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
124 self.total_permits.saturating_mul(self.permit_bytes)
125 ),
126 }
127 .fail();
128 }
129 self.semaphore
130 .acquire_many(permits as u32)
131 .await
132 .map_err(|_| {
133 UnexpectedSnafu {
134 reason: "range result memory limiter is unexpectedly closed",
135 }
136 .build()
137 })
138 }
139}
140
141#[derive(Debug)]
146pub(crate) struct CachedSstMeta {
147 parquet_metadata: Arc<ParquetMetaData>,
148 region_metadata: RegionMetadataRef,
149 region_metadata_weight: usize,
150}
151
152impl CachedSstMeta {
153 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
154 Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
155 }
156
157 pub(crate) fn try_new_with_region_metadata(
158 file_path: &str,
159 parquet_metadata: ParquetMetaData,
160 region_metadata: Option<RegionMetadataRef>,
161 ) -> Result<Self> {
162 let file_metadata = parquet_metadata.file_metadata();
163 let key_values = file_metadata
164 .key_value_metadata()
165 .context(InvalidParquetSnafu {
166 file: file_path,
167 reason: "missing key value meta",
168 })?;
169 let meta_value = key_values
170 .iter()
171 .find(|kv| kv.key == PARQUET_METADATA_KEY)
172 .with_context(|| InvalidParquetSnafu {
173 file: file_path,
174 reason: format!("key {} not found", PARQUET_METADATA_KEY),
175 })?;
176 let json = meta_value
177 .value
178 .as_ref()
179 .with_context(|| InvalidParquetSnafu {
180 file: file_path,
181 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
182 })?;
183 let region_metadata = match region_metadata {
184 Some(region_metadata) => region_metadata,
185 None => Arc::new(
186 store_api::metadata::RegionMetadata::from_json(json)
187 .context(InvalidMetadataSnafu)?,
188 ),
189 };
190 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
192 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
193
194 Ok(Self {
195 parquet_metadata,
196 region_metadata,
197 region_metadata_weight,
198 })
199 }
200
201 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
202 self.parquet_metadata.clone()
203 }
204
205 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
206 self.region_metadata.clone()
207 }
208}
209
210fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
211 let file_metadata = parquet_metadata.file_metadata();
212 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
213 let filtered = key_values
214 .iter()
215 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
216 .cloned()
217 .collect::<Vec<_>>();
218 (!filtered.is_empty()).then_some(filtered)
219 });
220 let stripped_file_metadata = FileMetaData::new(
221 file_metadata.version(),
222 file_metadata.num_rows(),
223 file_metadata.created_by().map(ToString::to_string),
224 filtered_key_values,
225 file_metadata.schema_descr_ptr(),
226 file_metadata.column_orders().cloned(),
227 );
228
229 let mut builder = parquet_metadata.into_builder();
230 let row_groups = builder.take_row_groups();
231 let column_index = builder.take_column_index();
232 let offset_index = builder.take_offset_index();
233
234 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
235 .set_row_groups(row_groups)
236 .set_column_index(column_index)
237 .set_offset_index(offset_index)
238 .build()
239}
240
/// Selects which caches of a [`CacheManager`] an operation may use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Uses every cache held by the manager.
    EnableAll(CacheManagerRef),
    /// Restricted access for compaction: only SST metadata, the write cache and
    /// puffin cache eviction go through the manager. Vector, page, selector
    /// result, range result and index caches are bypassed.
    Compaction(CacheManagerRef),
    /// Uses no cache at all.
    Disabled,
}
255
256impl CacheStrategy {
257 pub(crate) async fn get_sst_meta_data(
259 &self,
260 file_id: RegionFileId,
261 metrics: &mut MetadataCacheMetrics,
262 page_index_policy: PageIndexPolicy,
263 ) -> Option<Arc<CachedSstMeta>> {
264 match self {
265 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
266 cache_manager
267 .get_sst_meta_data(file_id, metrics, page_index_policy)
268 .await
269 }
270 CacheStrategy::Disabled => {
271 metrics.cache_miss += 1;
272 None
273 }
274 }
275 }
276
277 pub(crate) fn get_sst_meta_data_from_mem_cache(
279 &self,
280 file_id: RegionFileId,
281 ) -> Option<Arc<CachedSstMeta>> {
282 match self {
283 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
284 cache_manager.get_sst_meta_data_from_mem_cache(file_id)
285 }
286 CacheStrategy::Disabled => None,
287 }
288 }
289
290 pub fn get_parquet_meta_data_from_mem_cache(
292 &self,
293 file_id: RegionFileId,
294 ) -> Option<Arc<ParquetMetaData>> {
295 self.get_sst_meta_data_from_mem_cache(file_id)
296 .map(|metadata| metadata.parquet_metadata())
297 }
298
299 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
301 match self {
302 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
303 cache_manager.put_sst_meta_data(file_id, metadata);
304 }
305 CacheStrategy::Disabled => {}
306 }
307 }
308
309 pub fn put_parquet_meta_data(
311 &self,
312 file_id: RegionFileId,
313 metadata: Arc<ParquetMetaData>,
314 region_metadata: Option<RegionMetadataRef>,
315 ) {
316 match self {
317 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
318 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
319 }
320 CacheStrategy::Disabled => {}
321 }
322 }
323
324 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
326 match self {
327 CacheStrategy::EnableAll(cache_manager) => {
328 cache_manager.remove_parquet_meta_data(file_id);
329 }
330 CacheStrategy::Compaction(cache_manager) => {
331 cache_manager.remove_parquet_meta_data(file_id);
332 }
333 CacheStrategy::Disabled => {}
334 }
335 }
336
337 pub fn get_repeated_vector(
340 &self,
341 data_type: &ConcreteDataType,
342 value: &Value,
343 ) -> Option<VectorRef> {
344 match self {
345 CacheStrategy::EnableAll(cache_manager) => {
346 cache_manager.get_repeated_vector(data_type, value)
347 }
348 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
349 }
350 }
351
352 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
355 if let CacheStrategy::EnableAll(cache_manager) = self {
356 cache_manager.put_repeated_vector(value, vector);
357 }
358 }
359
360 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
363 match self {
364 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
365 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
366 }
367 }
368
369 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
372 if let CacheStrategy::EnableAll(cache_manager) = self {
373 cache_manager.put_pages(page_key, pages);
374 }
375 }
376
377 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
379 match self {
380 CacheStrategy::EnableAll(cache_manager) => {
381 cache_manager.evict_puffin_cache(file_id).await
382 }
383 CacheStrategy::Compaction(cache_manager) => {
384 cache_manager.evict_puffin_cache(file_id).await
385 }
386 CacheStrategy::Disabled => {}
387 }
388 }
389
390 pub fn get_selector_result(
393 &self,
394 selector_key: &SelectorResultKey,
395 ) -> Option<Arc<SelectorResultValue>> {
396 match self {
397 CacheStrategy::EnableAll(cache_manager) => {
398 cache_manager.get_selector_result(selector_key)
399 }
400 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
401 }
402 }
403
404 pub fn put_selector_result(
407 &self,
408 selector_key: SelectorResultKey,
409 result: Arc<SelectorResultValue>,
410 ) {
411 if let CacheStrategy::EnableAll(cache_manager) = self {
412 cache_manager.put_selector_result(selector_key, result);
413 }
414 }
415
416 #[allow(dead_code)]
419 pub(crate) fn get_range_result(
420 &self,
421 key: &RangeScanCacheKey,
422 ) -> Option<Arc<RangeScanCacheValue>> {
423 match self {
424 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
425 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
426 }
427 }
428
429 pub(crate) fn put_range_result(
432 &self,
433 key: RangeScanCacheKey,
434 result: Arc<RangeScanCacheValue>,
435 ) {
436 if let CacheStrategy::EnableAll(cache_manager) = self {
437 cache_manager.put_range_result(key, result);
438 }
439 }
440
441 pub(crate) fn has_range_result_cache(&self) -> bool {
443 match self {
444 CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
445 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
446 }
447 }
448
449 pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
450 match self {
451 CacheStrategy::EnableAll(cache_manager) => {
452 Some(cache_manager.range_result_memory_limiter())
453 }
454 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
455 }
456 }
457
458 pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
459 match self {
460 CacheStrategy::EnableAll(cache_manager) => {
461 Some(cache_manager.range_result_cache_size())
462 }
463 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
464 }
465 }
466
467 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
470 match self {
471 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
472 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
473 CacheStrategy::Disabled => None,
474 }
475 }
476
477 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
480 match self {
481 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
482 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
483 }
484 }
485
486 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
489 match self {
490 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
491 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
492 }
493 }
494
495 #[cfg(feature = "vector_index")]
498 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
499 match self {
500 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
501 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
502 }
503 }
504
505 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
508 match self {
509 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
510 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
511 }
512 }
513
514 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
517 match self {
518 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
519 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
520 }
521 }
522
523 pub fn maybe_download_background(
525 &self,
526 index_key: IndexKey,
527 remote_path: String,
528 remote_store: ObjectStore,
529 file_size: u64,
530 ) {
531 if let CacheStrategy::EnableAll(cache_manager) = self
532 && let Some(write_cache) = cache_manager.write_cache()
533 {
534 write_cache.file_cache().maybe_download_background(
535 index_key,
536 remote_path,
537 remote_store,
538 file_size,
539 );
540 }
541 }
542}
543
/// Holds the engine's caches.
///
/// Every cache is optional except the range result memory limiter. When a
/// cache is `None`, its lookups return `None` and its inserts do nothing.
#[derive(Default)]
pub struct CacheManager {
    /// SST metadata cache (stripped parquet metadata plus region metadata).
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache of vectors keyed by `(data type, value)`.
    vector_cache: Option<VectorCache>,
    /// Cache of SST pages.
    page_cache: Option<PageCache>,
    /// Write cache. Its file cache is also consulted for SST metadata.
    write_cache: Option<WriteCacheRef>,
    /// Inverted index cache.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Bloom filter index cache.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Vector index cache (only with the `vector_index` feature).
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Puffin file metadata cache.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache of time-series selector results.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache of range scan results.
    range_result_cache: Option<RangeResultCache>,
    /// Configured capacity of `range_result_cache` in bytes. The cache is not
    /// built when this is 0.
    range_result_cache_size: u64,
    /// Memory limiter for range results. The builder sizes it from
    /// `range_result_cache_size`.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache of index application results.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared handle to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
579
580impl CacheManager {
581 pub fn builder() -> CacheManagerBuilder {
583 CacheManagerBuilder::default()
584 }
585
586 pub(crate) async fn get_sst_meta_data(
589 &self,
590 file_id: RegionFileId,
591 metrics: &mut MetadataCacheMetrics,
592 page_index_policy: PageIndexPolicy,
593 ) -> Option<Arc<CachedSstMeta>> {
594 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
595 metrics.mem_cache_hit += 1;
596 return Some(metadata);
597 }
598
599 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
600 if let Some(write_cache) = &self.write_cache
601 && let Some(metadata) = write_cache
602 .file_cache()
603 .get_sst_meta_data(key, metrics, page_index_policy)
604 .await
605 {
606 metrics.file_cache_hit += 1;
607 self.put_sst_meta_data(file_id, metadata.clone());
608 return Some(metadata);
609 }
610
611 metrics.cache_miss += 1;
612 None
613 }
614
615 pub(crate) async fn get_parquet_meta_data(
618 &self,
619 file_id: RegionFileId,
620 metrics: &mut MetadataCacheMetrics,
621 page_index_policy: PageIndexPolicy,
622 ) -> Option<Arc<ParquetMetaData>> {
623 self.get_sst_meta_data(file_id, metrics, page_index_policy)
624 .await
625 .map(|metadata| metadata.parquet_metadata())
626 }
627
628 pub(crate) fn get_sst_meta_data_from_mem_cache(
631 &self,
632 file_id: RegionFileId,
633 ) -> Option<Arc<CachedSstMeta>> {
634 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
635 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
636 update_hit_miss(value, SST_META_TYPE)
637 })
638 }
639
640 pub fn get_parquet_meta_data_from_mem_cache(
643 &self,
644 file_id: RegionFileId,
645 ) -> Option<Arc<ParquetMetaData>> {
646 self.get_sst_meta_data_from_mem_cache(file_id)
647 .map(|metadata| metadata.parquet_metadata())
648 }
649
650 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
652 if let Some(cache) = &self.sst_meta_cache {
653 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
654 CACHE_BYTES
655 .with_label_values(&[SST_META_TYPE])
656 .add(meta_cache_weight(&key, &metadata).into());
657 cache.insert(key, metadata);
658 }
659 }
660
661 pub fn put_parquet_meta_data(
663 &self,
664 file_id: RegionFileId,
665 metadata: Arc<ParquetMetaData>,
666 region_metadata: Option<RegionMetadataRef>,
667 ) {
668 if self.sst_meta_cache.is_some() {
669 let file_path = format!(
670 "region_id={}, file_id={}",
671 file_id.region_id(),
672 file_id.file_id()
673 );
674 match CachedSstMeta::try_new_with_region_metadata(
675 &file_path,
676 Arc::unwrap_or_clone(metadata),
677 region_metadata,
678 ) {
679 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
680 Err(err) => warn!(
681 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
682 file_id.region_id(),
683 file_id.file_id()
684 ),
685 }
686 }
687 }
688
689 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
691 if let Some(cache) = &self.sst_meta_cache {
692 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
693 }
694 }
695
696 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
698 self.sst_meta_cache
699 .as_ref()
700 .map(|cache| cache.weighted_size())
701 .unwrap_or(0)
702 }
703
704 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
706 self.sst_meta_cache.is_some()
707 }
708
709 pub fn get_repeated_vector(
711 &self,
712 data_type: &ConcreteDataType,
713 value: &Value,
714 ) -> Option<VectorRef> {
715 self.vector_cache.as_ref().and_then(|vector_cache| {
716 let value = vector_cache.get(&(data_type.clone(), value.clone()));
717 update_hit_miss(value, VECTOR_TYPE)
718 })
719 }
720
721 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
723 if let Some(cache) = &self.vector_cache {
724 let key = (vector.data_type(), value);
725 CACHE_BYTES
726 .with_label_values(&[VECTOR_TYPE])
727 .add(vector_cache_weight(&key, &vector).into());
728 cache.insert(key, vector);
729 }
730 }
731
732 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
734 self.page_cache.as_ref().and_then(|page_cache| {
735 let value = page_cache.get(page_key);
736 update_hit_miss(value, PAGE_TYPE)
737 })
738 }
739
740 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
742 if let Some(cache) = &self.page_cache {
743 CACHE_BYTES
744 .with_label_values(&[PAGE_TYPE])
745 .add(page_cache_weight(&page_key, &pages).into());
746 cache.insert(page_key, pages);
747 }
748 }
749
750 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
752 if let Some(cache) = &self.bloom_filter_index_cache {
753 cache.invalidate_file(file_id.file_id());
754 }
755
756 if let Some(cache) = &self.inverted_index_cache {
757 cache.invalidate_file(file_id.file_id());
758 }
759
760 if let Some(cache) = &self.index_result_cache {
761 cache.invalidate_file(file_id.file_id());
762 }
763
764 #[cfg(feature = "vector_index")]
765 if let Some(cache) = &self.vector_index_cache {
766 cache.invalidate_file(file_id.file_id());
767 }
768
769 if let Some(cache) = &self.puffin_metadata_cache {
770 cache.remove(&file_id.to_string());
771 }
772
773 if let Some(write_cache) = &self.write_cache {
774 write_cache
775 .remove(IndexKey::new(
776 file_id.region_id(),
777 file_id.file_id(),
778 FileType::Puffin(file_id.version),
779 ))
780 .await;
781 }
782 }
783
784 pub fn get_selector_result(
786 &self,
787 selector_key: &SelectorResultKey,
788 ) -> Option<Arc<SelectorResultValue>> {
789 self.selector_result_cache
790 .as_ref()
791 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
792 }
793
794 pub fn put_selector_result(
796 &self,
797 selector_key: SelectorResultKey,
798 result: Arc<SelectorResultValue>,
799 ) {
800 if let Some(cache) = &self.selector_result_cache {
801 CACHE_BYTES
802 .with_label_values(&[SELECTOR_RESULT_TYPE])
803 .add(selector_result_cache_weight(&selector_key, &result).into());
804 cache.insert(selector_key, result);
805 }
806 }
807
808 #[allow(dead_code)]
810 pub(crate) fn get_range_result(
811 &self,
812 key: &RangeScanCacheKey,
813 ) -> Option<Arc<RangeScanCacheValue>> {
814 self.range_result_cache
815 .as_ref()
816 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
817 }
818
819 pub(crate) fn put_range_result(
821 &self,
822 key: RangeScanCacheKey,
823 result: Arc<RangeScanCacheValue>,
824 ) {
825 if let Some(cache) = &self.range_result_cache {
826 CACHE_BYTES
827 .with_label_values(&[RANGE_RESULT_TYPE])
828 .add(range_result_cache_weight(&key, &result).into());
829 cache.insert(key, result);
830 }
831 }
832
833 pub(crate) fn has_range_result_cache(&self) -> bool {
835 self.range_result_cache.is_some()
836 }
837
838 pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
839 &self.range_result_memory_limiter
840 }
841
842 pub(crate) fn range_result_cache_size(&self) -> usize {
843 self.range_result_cache_size as usize
844 }
845
846 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
848 self.write_cache.as_ref()
849 }
850
851 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
852 self.inverted_index_cache.as_ref()
853 }
854
855 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
856 self.bloom_filter_index_cache.as_ref()
857 }
858
859 #[cfg(feature = "vector_index")]
860 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
861 self.vector_index_cache.as_ref()
862 }
863
864 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
865 self.puffin_metadata_cache.as_ref()
866 }
867
868 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
869 self.index_result_cache.as_ref()
870 }
871}
872
873pub fn selector_result_cache_miss() {
875 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
876}
877
878pub fn selector_result_cache_hit() {
880 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
881}
882
/// Builder for [`CacheManager`].
///
/// Sizes are in bytes. In `build`, a size of 0 disables the SST metadata,
/// vector, page, selector result, range result and index result caches (and
/// the vector index cache). The inverted index, bloom filter and puffin
/// metadata caches are always constructed.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated-vector cache.
    vector_cache_size: u64,
    /// Capacity of the page cache.
    page_cache_size: u64,
    /// Metadata capacity passed to the inverted index and bloom filter caches.
    index_metadata_size: u64,
    /// Content capacity passed to the inverted index and bloom filter caches.
    /// Also sizes the vector index cache.
    index_content_size: u64,
    /// Third argument of `InvertedIndexCache::new`/`BloomFilterIndexCache::new`
    /// (presumably the content page size; verify).
    index_content_page_size: u64,
    /// Capacity of the index result cache.
    index_result_cache_size: u64,
    /// Capacity of the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the manager as-is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache. Also the budget of the range
    /// result memory limiter.
    range_result_cache_size: u64,
}
898
899impl CacheManagerBuilder {
900 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
902 self.sst_meta_cache_size = bytes;
903 self
904 }
905
906 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
908 self.vector_cache_size = bytes;
909 self
910 }
911
912 pub fn page_cache_size(mut self, bytes: u64) -> Self {
914 self.page_cache_size = bytes;
915 self
916 }
917
918 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
920 self.write_cache = cache;
921 self
922 }
923
924 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
926 self.index_metadata_size = bytes;
927 self
928 }
929
930 pub fn index_content_size(mut self, bytes: u64) -> Self {
932 self.index_content_size = bytes;
933 self
934 }
935
936 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
938 self.index_content_page_size = bytes;
939 self
940 }
941
942 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
944 self.index_result_cache_size = bytes;
945 self
946 }
947
948 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
950 self.puffin_metadata_size = bytes;
951 self
952 }
953
954 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
956 self.selector_result_cache_size = bytes;
957 self
958 }
959
960 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
962 self.range_result_cache_size = bytes;
963 self
964 }
965
966 pub fn build(self) -> CacheManager {
968 fn to_str(cause: RemovalCause) -> &'static str {
969 match cause {
970 RemovalCause::Expired => "expired",
971 RemovalCause::Explicit => "explicit",
972 RemovalCause::Replaced => "replaced",
973 RemovalCause::Size => "size",
974 }
975 }
976
977 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
978 Cache::builder()
979 .max_capacity(self.sst_meta_cache_size)
980 .weigher(meta_cache_weight)
981 .eviction_listener(|k, v, cause| {
982 let size = meta_cache_weight(&k, &v);
983 CACHE_BYTES
984 .with_label_values(&[SST_META_TYPE])
985 .sub(size.into());
986 CACHE_EVICTION
987 .with_label_values(&[SST_META_TYPE, to_str(cause)])
988 .inc();
989 })
990 .build()
991 });
992 let vector_cache = (self.vector_cache_size != 0).then(|| {
993 Cache::builder()
994 .max_capacity(self.vector_cache_size)
995 .weigher(vector_cache_weight)
996 .eviction_listener(|k, v, cause| {
997 let size = vector_cache_weight(&k, &v);
998 CACHE_BYTES
999 .with_label_values(&[VECTOR_TYPE])
1000 .sub(size.into());
1001 CACHE_EVICTION
1002 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
1003 .inc();
1004 })
1005 .build()
1006 });
1007 let page_cache = (self.page_cache_size != 0).then(|| {
1008 Cache::builder()
1009 .max_capacity(self.page_cache_size)
1010 .weigher(page_cache_weight)
1011 .eviction_listener(|k, v, cause| {
1012 let size = page_cache_weight(&k, &v);
1013 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
1014 CACHE_EVICTION
1015 .with_label_values(&[PAGE_TYPE, to_str(cause)])
1016 .inc();
1017 })
1018 .build()
1019 });
1020 let inverted_index_cache = InvertedIndexCache::new(
1021 self.index_metadata_size,
1022 self.index_content_size,
1023 self.index_content_page_size,
1024 );
1025 let bloom_filter_index_cache = BloomFilterIndexCache::new(
1027 self.index_metadata_size,
1028 self.index_content_size,
1029 self.index_content_page_size,
1030 );
1031 #[cfg(feature = "vector_index")]
1032 let vector_index_cache = (self.index_content_size != 0)
1033 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1034 let index_result_cache = (self.index_result_cache_size != 0)
1035 .then(|| IndexResultCache::new(self.index_result_cache_size));
1036 let puffin_metadata_cache =
1037 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1038 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1039 Cache::builder()
1040 .max_capacity(self.selector_result_cache_size)
1041 .weigher(selector_result_cache_weight)
1042 .eviction_listener(|k, v, cause| {
1043 let size = selector_result_cache_weight(&k, &v);
1044 CACHE_BYTES
1045 .with_label_values(&[SELECTOR_RESULT_TYPE])
1046 .sub(size.into());
1047 CACHE_EVICTION
1048 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1049 .inc();
1050 })
1051 .build()
1052 });
1053 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1054 Cache::builder()
1055 .max_capacity(self.range_result_cache_size)
1056 .weigher(range_result_cache_weight)
1057 .eviction_listener(move |k, v, cause| {
1058 let size = range_result_cache_weight(&k, &v);
1059 CACHE_BYTES
1060 .with_label_values(&[RANGE_RESULT_TYPE])
1061 .sub(size.into());
1062 CACHE_EVICTION
1063 .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1064 .inc();
1065 })
1066 .build()
1067 });
1068 CacheManager {
1069 sst_meta_cache,
1070 vector_cache,
1071 page_cache,
1072 write_cache: self.write_cache,
1073 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1074 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1075 #[cfg(feature = "vector_index")]
1076 vector_index_cache,
1077 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1078 selector_result_cache,
1079 range_result_cache,
1080 range_result_cache_size: self.range_result_cache_size,
1081 range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1082 self.range_result_cache_size as usize,
1083 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1084 )),
1085 index_result_cache,
1086 }
1087 }
1088}
1089
1090fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1091 (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1093}
1094
1095fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1096 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1098}
1099
1100fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1101 (k.estimated_size() + v.estimated_size()) as u32
1102}
1103
1104fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1105 (mem::size_of_val(k) + v.estimated_size()) as u32
1106}
1107
1108fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1109 (k.estimated_size() + v.estimated_size()) as u32
1110}
1111
1112fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1114 if value.is_some() {
1115 CACHE_HIT.with_label_values(&[cache_type]).inc();
1116 } else {
1117 CACHE_MISS.with_label_values(&[cache_type]).inc();
1118 }
1119 value
1120}
1121
1122#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1124struct SstMetaKey(RegionId, FileId);
1125
1126impl SstMetaKey {
1127 fn estimated_size(&self) -> usize {
1129 mem::size_of::<Self>()
1130 }
1131}
1132
/// Locates a column within an SST row group: region, file, row group index and
/// column index. Not referenced elsewhere in this part of the file.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region the SST belongs to.
    region_id: RegionId,
    /// Id of the SST file.
    file_id: FileId,
    /// Index of the row group.
    row_group_idx: usize,
    /// Index of the column.
    column_idx: usize,
}
1145
1146#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1151pub struct PageKey {
1152 file_id: FileId,
1154 row_group_idx: usize,
1156 ranges: Vec<Range<u64>>,
1158}
1159
1160impl PageKey {
1161 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1163 PageKey {
1164 file_id,
1165 row_group_idx,
1166 ranges,
1167 }
1168 }
1169
1170 fn estimated_size(&self) -> usize {
1172 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1173 }
1174}
1175
1176#[derive(Default)]
1179pub struct PageValue {
1180 pub compressed: Vec<Bytes>,
1182 pub page_size: u64,
1184}
1185
1186impl PageValue {
1187 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1189 PageValue {
1190 compressed: bytes,
1191 page_size,
1192 }
1193 }
1194
1195 fn estimated_size(&self) -> usize {
1197 mem::size_of::<Self>()
1198 + self.page_size as usize
1199 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1200 }
1201}
1202
/// Key of the selector result cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// Id of the SST file.
    pub file_id: FileId,
    /// Index of the row group.
    pub row_group_idx: usize,
    /// Selector whose result is cached.
    pub selector: TimeSeriesRowSelector,
}
1213
/// Cached selector output in one of the two batch formats.
pub enum SelectorResult {
    /// Batches in the primary key format ([`Batch`]).
    PrimaryKey(Vec<Batch>),
    /// Batches in the flat format ([`RecordBatch`]).
    Flat(Vec<RecordBatch>),
}
1221
/// Value of the selector result cache.
pub struct SelectorResultValue {
    /// The cached batches.
    pub result: SelectorResult,
    /// Projection stored alongside the result (presumably the column indices the
    /// result was read with — confirm against callers).
    pub projection: Vec<usize>,
}
1229
1230impl SelectorResultValue {
1231 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1233 SelectorResultValue {
1234 result: SelectorResult::PrimaryKey(result),
1235 projection,
1236 }
1237 }
1238
1239 pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1241 SelectorResultValue {
1242 result: SelectorResult::Flat(result),
1243 projection,
1244 }
1245 }
1246
1247 fn estimated_size(&self) -> usize {
1249 match &self.result {
1250 SelectorResult::PrimaryKey(batches) => {
1251 batches.iter().map(|batch| batch.memory_size()).sum()
1252 }
1253 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1254 }
1255 }
1256}
1257
/// Maps (region id, file id) to cached SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a (data type, value) pair to a cached vector (presumably a vector of
/// that value repeated — see `get_repeated_vector` / `put_repeated_vector`).
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [`PageKey`] to the cached pages.
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a [`SelectorResultKey`] to a cached selector result.
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a [`RangeScanCacheKey`] to a cached range scan result.
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1270
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::{
        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
    };
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::sst::parquet::row_selection::RowGroupSelection;

    #[tokio::test]
    async fn test_disable_cache() {
        // The default manager has no SST meta, vector or page cache, so every put
        // below is dropped and every get misses.
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        // The cached parquet metadata must not keep the region metadata entry in
        // its key-value metadata.
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        // The supplied region metadata is stored as-is, so the same `Arc` comes back.
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }

    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        // The decoded region metadata is charged more than its JSON length, and
        // the total weight is key + parquet metadata + decoded region metadata.
        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }

    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(Vec::new(), Vec::new()));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        // Compaction and disabled strategies neither read nor write the range
        // result cache; the entry inserted above must survive their puts.
        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }

    #[test]
    fn test_range_result_cache_size_configures_limiter() {
        let cache_size = 3 * 1024_u64;
        let cache = CacheManager::builder()
            .range_result_cache_size(cache_size)
            .build();

        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
        assert_eq!(
            cache.range_result_memory_limiter().permit_bytes(),
            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
        );
        assert_eq!(
            cache.range_result_memory_limiter().available_permits(),
            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
        );
    }

    #[tokio::test]
    async fn range_result_memory_limiter_rejects_oversized_request() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        assert_eq!(limiter.available_permits(), 2);

        let err = limiter.acquire(10 * 1024).await.unwrap_err();
        assert!(
            err.to_string().contains("exceeds limiter capacity"),
            "unexpected error: {err}"
        );
        // A rejected request must not consume any permits.
        assert_eq!(limiter.available_permits(), 2);
    }

    #[tokio::test]
    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        let permit = limiter.acquire(2 * 1024).await.unwrap();
        assert_eq!(limiter.available_permits(), 0);
        // Dropping the permit returns its capacity to the limiter.
        drop(permit);
        assert_eq!(limiter.available_permits(), 2);
    }

    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        // Evicting through the manager clears every index-related cache.
        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Repopulate the caches, then evict again through `CacheStrategy::EnableAll`.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }

    /// Builds region metadata with `column_count` nullable string columns plus a
    /// millisecond timestamp column, used to get large region metadata in weight
    /// tests.
    ///
    /// Columns with id below 32 are tags (and form the primary key); the rest are
    /// fields. Every string column carries one extra column-schema metadata entry.
    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
        let region_id = RegionId::new(1024, 7);
        let mut builder = RegionMetadataBuilder::new(region_id);
        let mut primary_key = Vec::new();

        for column_id in 0..column_count {
            let semantic_type = if column_id < 32 {
                primary_key.push(column_id);
                SemanticType::Tag
            } else {
                SemanticType::Field
            };
            let mut column_schema = ColumnSchema::new(
                format!("wide_column_{column_id}"),
                ConcreteDataType::string_datatype(),
                true,
            );
            column_schema
                .mut_metadata()
                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: column_count,
        });
        builder.primary_key(primary_key);

        builder.build().unwrap()
    }
}