1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_base::readable_size::ReadableSize;
32use common_telemetry::warn;
33use datatypes::arrow::record_batch::RecordBatch;
34use datatypes::value::Value;
35use datatypes::vectors::VectorRef;
36use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
37use index::result_cache::IndexResultCache;
38use moka::notification::RemovalCause;
39use moka::sync::Cache;
40use object_store::ObjectStore;
41use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
42use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
43use snafu::{OptionExt, ResultExt};
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
46
47use crate::cache::cache_size::parquet_meta_size;
48use crate::cache::file_cache::{FileType, IndexKey};
49use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
50#[cfg(feature = "vector_index")]
51use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
52use crate::cache::write_cache::WriteCacheRef;
53use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result, UnexpectedSnafu};
54use crate::memtable::record_batch_estimated_size;
55use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
56use crate::read::Batch;
57use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
58use crate::sst::file::{RegionFileId, RegionIndexId};
59use crate::sst::parquet::PARQUET_METADATA_KEY;
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::parquet::reader::MetadataCacheMetrics;
62
/// Metric label value used for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metric label value used for the repeated vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metric label value used for the page cache.
const PAGE_TYPE: &str = "page";
// NOTE(review): not referenced in the visible part of this file; presumably
// used by the child modules as a metric label -- confirm.
const FILE_TYPE: &str = "file";
// NOTE(review): not referenced in the visible part of this file; presumably
// used by the child modules as a metric label -- confirm.
const INDEX_TYPE: &str = "index";
/// Metric label value used for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metric label value used for the range result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
/// Byte limit used by [`RangeResultMemoryLimiter::default`].
const RANGE_RESULT_CONCAT_MEMORY_LIMIT: ReadableSize = ReadableSize::mb(512);
/// Number of bytes one permit of the range result memory limiter stands for.
const RANGE_RESULT_CONCAT_MEMORY_PERMIT: ReadableSize = ReadableSize::kb(1);
79
/// Semaphore-backed limiter for memory requested for range results.
///
/// Memory is accounted in permits; each permit stands for `permit_bytes` bytes.
#[derive(Debug)]
pub(crate) struct RangeResultMemoryLimiter {
    /// Semaphore that holds the permits.
    semaphore: Arc<tokio::sync::Semaphore>,
    /// Number of bytes represented by one permit (never zero).
    permit_bytes: usize,
    /// Number of permits the semaphore was created with.
    total_permits: usize,
}
86
87impl Default for RangeResultMemoryLimiter {
88 fn default() -> Self {
89 Self::new(
90 RANGE_RESULT_CONCAT_MEMORY_LIMIT.as_bytes() as usize,
91 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
92 )
93 }
94}
95
96impl RangeResultMemoryLimiter {
97 pub(crate) fn new(limit_bytes: usize, permit_bytes: usize) -> Self {
98 let permit_bytes = permit_bytes.max(1);
99 let total_permits = limit_bytes
100 .div_ceil(permit_bytes)
101 .clamp(1, tokio::sync::Semaphore::MAX_PERMITS);
102 Self {
103 semaphore: Arc::new(tokio::sync::Semaphore::new(total_permits)),
104 permit_bytes,
105 total_permits,
106 }
107 }
108
109 #[cfg(test)]
110 pub(crate) fn permit_bytes(&self) -> usize {
111 self.permit_bytes
112 }
113
114 #[cfg(test)]
115 pub(crate) fn available_permits(&self) -> usize {
116 self.semaphore.available_permits()
117 }
118
119 pub(crate) async fn acquire(&self, bytes: usize) -> Result<tokio::sync::SemaphorePermit<'_>> {
120 let permits = bytes.div_ceil(self.permit_bytes).max(1);
121 if permits > self.total_permits {
122 return UnexpectedSnafu {
123 reason: format!(
124 "range result memory request of {bytes} bytes exceeds limiter capacity of {} bytes",
125 self.total_permits.saturating_mul(self.permit_bytes)
126 ),
127 }
128 .fail();
129 }
130 self.semaphore
131 .acquire_many(permits as u32)
132 .await
133 .map_err(|_| {
134 UnexpectedSnafu {
135 reason: "range result memory limiter is unexpectedly closed",
136 }
137 .build()
138 })
139 }
140}
141
/// SST metadata as stored in the SST meta cache: parquet metadata together with
/// the region metadata.
#[derive(Debug)]
pub(crate) struct CachedSstMeta {
    /// Parquet metadata with the `PARQUET_METADATA_KEY` key-value entry removed.
    parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata, either supplied by the caller or decoded from the
    /// `PARQUET_METADATA_KEY` entry of the parquet key-value metadata.
    region_metadata: RegionMetadataRef,
    /// Weight added for the region metadata when the cache weight is computed.
    region_metadata_weight: usize,
}
152
153impl CachedSstMeta {
154 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
155 Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
156 }
157
158 pub(crate) fn try_new_with_region_metadata(
159 file_path: &str,
160 parquet_metadata: ParquetMetaData,
161 region_metadata: Option<RegionMetadataRef>,
162 ) -> Result<Self> {
163 let file_metadata = parquet_metadata.file_metadata();
164 let key_values = file_metadata
165 .key_value_metadata()
166 .context(InvalidParquetSnafu {
167 file: file_path,
168 reason: "missing key value meta",
169 })?;
170 let meta_value = key_values
171 .iter()
172 .find(|kv| kv.key == PARQUET_METADATA_KEY)
173 .with_context(|| InvalidParquetSnafu {
174 file: file_path,
175 reason: format!("key {} not found", PARQUET_METADATA_KEY),
176 })?;
177 let json = meta_value
178 .value
179 .as_ref()
180 .with_context(|| InvalidParquetSnafu {
181 file: file_path,
182 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
183 })?;
184 let region_metadata = match region_metadata {
185 Some(region_metadata) => region_metadata,
186 None => Arc::new(
187 store_api::metadata::RegionMetadata::from_json(json)
188 .context(InvalidMetadataSnafu)?,
189 ),
190 };
191 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
193 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
194
195 Ok(Self {
196 parquet_metadata,
197 region_metadata,
198 region_metadata_weight,
199 })
200 }
201
202 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
203 self.parquet_metadata.clone()
204 }
205
206 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
207 self.region_metadata.clone()
208 }
209}
210
211fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
212 let file_metadata = parquet_metadata.file_metadata();
213 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
214 let filtered = key_values
215 .iter()
216 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
217 .cloned()
218 .collect::<Vec<_>>();
219 (!filtered.is_empty()).then_some(filtered)
220 });
221 let stripped_file_metadata = FileMetaData::new(
222 file_metadata.version(),
223 file_metadata.num_rows(),
224 file_metadata.created_by().map(ToString::to_string),
225 filtered_key_values,
226 file_metadata.schema_descr_ptr(),
227 file_metadata.column_orders().cloned(),
228 );
229
230 let mut builder = parquet_metadata.into_builder();
231 let row_groups = builder.take_row_groups();
232 let column_index = builder.take_column_index();
233 let offset_index = builder.take_offset_index();
234
235 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
236 .set_row_groups(row_groups)
237 .set_column_index(column_index)
238 .set_offset_index(offset_index)
239 .build()
240}
241
/// Selects which caches of a [`CacheManager`] an operation is allowed to use.
#[derive(Clone)]
pub enum CacheStrategy {
    /// Every cache of the manager is used.
    EnableAll(CacheManagerRef),
    /// Only the SST metadata cache, the write cache and puffin cache eviction are
    /// used; every other getter returns `None` and every other put is a no-op.
    Compaction(CacheManagerRef),
    /// No cache is used.
    Disabled,
}
256
257impl CacheStrategy {
258 pub(crate) async fn get_sst_meta_data(
260 &self,
261 file_id: RegionFileId,
262 metrics: &mut MetadataCacheMetrics,
263 page_index_policy: PageIndexPolicy,
264 ) -> Option<Arc<CachedSstMeta>> {
265 match self {
266 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
267 cache_manager
268 .get_sst_meta_data(file_id, metrics, page_index_policy)
269 .await
270 }
271 CacheStrategy::Disabled => {
272 metrics.cache_miss += 1;
273 None
274 }
275 }
276 }
277
278 pub(crate) fn get_sst_meta_data_from_mem_cache(
280 &self,
281 file_id: RegionFileId,
282 ) -> Option<Arc<CachedSstMeta>> {
283 match self {
284 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
285 cache_manager.get_sst_meta_data_from_mem_cache(file_id)
286 }
287 CacheStrategy::Disabled => None,
288 }
289 }
290
291 pub fn get_parquet_meta_data_from_mem_cache(
293 &self,
294 file_id: RegionFileId,
295 ) -> Option<Arc<ParquetMetaData>> {
296 self.get_sst_meta_data_from_mem_cache(file_id)
297 .map(|metadata| metadata.parquet_metadata())
298 }
299
300 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
302 match self {
303 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
304 cache_manager.put_sst_meta_data(file_id, metadata);
305 }
306 CacheStrategy::Disabled => {}
307 }
308 }
309
310 pub fn put_parquet_meta_data(
312 &self,
313 file_id: RegionFileId,
314 metadata: Arc<ParquetMetaData>,
315 region_metadata: Option<RegionMetadataRef>,
316 ) {
317 match self {
318 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
319 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
320 }
321 CacheStrategy::Disabled => {}
322 }
323 }
324
325 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
327 match self {
328 CacheStrategy::EnableAll(cache_manager) => {
329 cache_manager.remove_parquet_meta_data(file_id);
330 }
331 CacheStrategy::Compaction(cache_manager) => {
332 cache_manager.remove_parquet_meta_data(file_id);
333 }
334 CacheStrategy::Disabled => {}
335 }
336 }
337
338 pub fn get_repeated_vector(
341 &self,
342 data_type: &ConcreteDataType,
343 value: &Value,
344 ) -> Option<VectorRef> {
345 match self {
346 CacheStrategy::EnableAll(cache_manager) => {
347 cache_manager.get_repeated_vector(data_type, value)
348 }
349 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
350 }
351 }
352
353 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
356 if let CacheStrategy::EnableAll(cache_manager) = self {
357 cache_manager.put_repeated_vector(value, vector);
358 }
359 }
360
361 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
364 match self {
365 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
366 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
367 }
368 }
369
370 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
373 if let CacheStrategy::EnableAll(cache_manager) = self {
374 cache_manager.put_pages(page_key, pages);
375 }
376 }
377
378 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
380 match self {
381 CacheStrategy::EnableAll(cache_manager) => {
382 cache_manager.evict_puffin_cache(file_id).await
383 }
384 CacheStrategy::Compaction(cache_manager) => {
385 cache_manager.evict_puffin_cache(file_id).await
386 }
387 CacheStrategy::Disabled => {}
388 }
389 }
390
391 pub fn get_selector_result(
394 &self,
395 selector_key: &SelectorResultKey,
396 ) -> Option<Arc<SelectorResultValue>> {
397 match self {
398 CacheStrategy::EnableAll(cache_manager) => {
399 cache_manager.get_selector_result(selector_key)
400 }
401 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
402 }
403 }
404
405 pub fn put_selector_result(
408 &self,
409 selector_key: SelectorResultKey,
410 result: Arc<SelectorResultValue>,
411 ) {
412 if let CacheStrategy::EnableAll(cache_manager) = self {
413 cache_manager.put_selector_result(selector_key, result);
414 }
415 }
416
417 #[allow(dead_code)]
420 pub(crate) fn get_range_result(
421 &self,
422 key: &RangeScanCacheKey,
423 ) -> Option<Arc<RangeScanCacheValue>> {
424 match self {
425 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
426 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
427 }
428 }
429
430 pub(crate) fn put_range_result(
433 &self,
434 key: RangeScanCacheKey,
435 result: Arc<RangeScanCacheValue>,
436 ) {
437 if let CacheStrategy::EnableAll(cache_manager) = self {
438 cache_manager.put_range_result(key, result);
439 }
440 }
441
442 pub(crate) fn has_range_result_cache(&self) -> bool {
444 match self {
445 CacheStrategy::EnableAll(cache_manager) => cache_manager.has_range_result_cache(),
446 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => false,
447 }
448 }
449
450 pub(crate) fn range_result_memory_limiter(&self) -> Option<&Arc<RangeResultMemoryLimiter>> {
451 match self {
452 CacheStrategy::EnableAll(cache_manager) => {
453 Some(cache_manager.range_result_memory_limiter())
454 }
455 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
456 }
457 }
458
459 pub(crate) fn range_result_cache_size(&self) -> Option<usize> {
460 match self {
461 CacheStrategy::EnableAll(cache_manager) => {
462 Some(cache_manager.range_result_cache_size())
463 }
464 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
465 }
466 }
467
468 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
471 match self {
472 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
473 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
474 CacheStrategy::Disabled => None,
475 }
476 }
477
478 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
481 match self {
482 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
483 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
484 }
485 }
486
487 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
490 match self {
491 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
492 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
493 }
494 }
495
496 #[cfg(feature = "vector_index")]
499 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
500 match self {
501 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
502 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
503 }
504 }
505
506 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
509 match self {
510 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
511 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
512 }
513 }
514
515 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
518 match self {
519 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
520 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
521 }
522 }
523
524 pub fn maybe_download_background(
526 &self,
527 index_key: IndexKey,
528 remote_path: String,
529 remote_store: ObjectStore,
530 file_size: u64,
531 ) {
532 if let CacheStrategy::EnableAll(cache_manager) = self
533 && let Some(write_cache) = cache_manager.write_cache()
534 {
535 write_cache.file_cache().maybe_download_background(
536 index_key,
537 remote_path,
538 remote_store,
539 file_size,
540 );
541 }
542 }
543}
544
/// Holds the caches of the engine.
///
/// A cache field that is `None` is treated as disabled by the accessor methods.
#[derive(Default)]
pub struct CacheManager {
    /// Cache for SST metadata.
    sst_meta_cache: Option<SstMetaCache>,
    /// Cache for repeated vectors, keyed by data type and value.
    vector_cache: Option<VectorCache>,
    /// Cache for pages.
    page_cache: Option<PageCache>,
    /// Write cache; also consulted for SST metadata and puffin file removal.
    write_cache: Option<WriteCacheRef>,
    /// Cache for inverted indexes.
    inverted_index_cache: Option<InvertedIndexCacheRef>,
    /// Cache for bloom filter indexes.
    bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
    /// Cache for vector indexes.
    #[cfg(feature = "vector_index")]
    vector_index_cache: Option<VectorIndexCacheRef>,
    /// Cache for puffin metadata.
    puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
    /// Cache for selector results.
    selector_result_cache: Option<SelectorResultCache>,
    /// Cache for range scan results.
    range_result_cache: Option<RangeResultCache>,
    /// Capacity the range result cache was configured with.
    range_result_cache_size: u64,
    /// Limiter for memory requested for range results.
    range_result_memory_limiter: Arc<RangeResultMemoryLimiter>,
    /// Cache for index results.
    index_result_cache: Option<IndexResultCache>,
}

/// Shared reference to a [`CacheManager`].
pub type CacheManagerRef = Arc<CacheManager>;
580
581impl CacheManager {
582 pub fn builder() -> CacheManagerBuilder {
584 CacheManagerBuilder::default()
585 }
586
587 pub(crate) async fn get_sst_meta_data(
590 &self,
591 file_id: RegionFileId,
592 metrics: &mut MetadataCacheMetrics,
593 page_index_policy: PageIndexPolicy,
594 ) -> Option<Arc<CachedSstMeta>> {
595 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
596 metrics.mem_cache_hit += 1;
597 return Some(metadata);
598 }
599
600 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
601 if let Some(write_cache) = &self.write_cache
602 && let Some(metadata) = write_cache
603 .file_cache()
604 .get_sst_meta_data(key, metrics, page_index_policy)
605 .await
606 {
607 metrics.file_cache_hit += 1;
608 self.put_sst_meta_data(file_id, metadata.clone());
609 return Some(metadata);
610 }
611
612 metrics.cache_miss += 1;
613 None
614 }
615
616 pub(crate) async fn get_parquet_meta_data(
619 &self,
620 file_id: RegionFileId,
621 metrics: &mut MetadataCacheMetrics,
622 page_index_policy: PageIndexPolicy,
623 ) -> Option<Arc<ParquetMetaData>> {
624 self.get_sst_meta_data(file_id, metrics, page_index_policy)
625 .await
626 .map(|metadata| metadata.parquet_metadata())
627 }
628
629 pub(crate) fn get_sst_meta_data_from_mem_cache(
632 &self,
633 file_id: RegionFileId,
634 ) -> Option<Arc<CachedSstMeta>> {
635 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
636 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
637 update_hit_miss(value, SST_META_TYPE)
638 })
639 }
640
641 pub fn get_parquet_meta_data_from_mem_cache(
644 &self,
645 file_id: RegionFileId,
646 ) -> Option<Arc<ParquetMetaData>> {
647 self.get_sst_meta_data_from_mem_cache(file_id)
648 .map(|metadata| metadata.parquet_metadata())
649 }
650
651 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
653 if let Some(cache) = &self.sst_meta_cache {
654 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
655 CACHE_BYTES
656 .with_label_values(&[SST_META_TYPE])
657 .add(meta_cache_weight(&key, &metadata).into());
658 cache.insert(key, metadata);
659 }
660 }
661
662 pub fn put_parquet_meta_data(
664 &self,
665 file_id: RegionFileId,
666 metadata: Arc<ParquetMetaData>,
667 region_metadata: Option<RegionMetadataRef>,
668 ) {
669 if self.sst_meta_cache.is_some() {
670 let file_path = format!(
671 "region_id={}, file_id={}",
672 file_id.region_id(),
673 file_id.file_id()
674 );
675 match CachedSstMeta::try_new_with_region_metadata(
676 &file_path,
677 Arc::unwrap_or_clone(metadata),
678 region_metadata,
679 ) {
680 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
681 Err(err) => warn!(
682 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
683 file_id.region_id(),
684 file_id.file_id()
685 ),
686 }
687 }
688 }
689
690 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
692 if let Some(cache) = &self.sst_meta_cache {
693 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
694 }
695 }
696
697 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
699 self.sst_meta_cache
700 .as_ref()
701 .map(|cache| cache.weighted_size())
702 .unwrap_or(0)
703 }
704
705 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
707 self.sst_meta_cache.is_some()
708 }
709
710 pub fn get_repeated_vector(
712 &self,
713 data_type: &ConcreteDataType,
714 value: &Value,
715 ) -> Option<VectorRef> {
716 self.vector_cache.as_ref().and_then(|vector_cache| {
717 let value = vector_cache.get(&(data_type.clone(), value.clone()));
718 update_hit_miss(value, VECTOR_TYPE)
719 })
720 }
721
722 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
724 if let Some(cache) = &self.vector_cache {
725 let key = (vector.data_type(), value);
726 CACHE_BYTES
727 .with_label_values(&[VECTOR_TYPE])
728 .add(vector_cache_weight(&key, &vector).into());
729 cache.insert(key, vector);
730 }
731 }
732
733 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
735 self.page_cache.as_ref().and_then(|page_cache| {
736 let value = page_cache.get(page_key);
737 update_hit_miss(value, PAGE_TYPE)
738 })
739 }
740
741 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
743 if let Some(cache) = &self.page_cache {
744 CACHE_BYTES
745 .with_label_values(&[PAGE_TYPE])
746 .add(page_cache_weight(&page_key, &pages).into());
747 cache.insert(page_key, pages);
748 }
749 }
750
751 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
753 if let Some(cache) = &self.bloom_filter_index_cache {
754 cache.invalidate_file(file_id.file_id());
755 }
756
757 if let Some(cache) = &self.inverted_index_cache {
758 cache.invalidate_file(file_id.file_id());
759 }
760
761 if let Some(cache) = &self.index_result_cache {
762 cache.invalidate_file(file_id.file_id());
763 }
764
765 #[cfg(feature = "vector_index")]
766 if let Some(cache) = &self.vector_index_cache {
767 cache.invalidate_file(file_id.file_id());
768 }
769
770 if let Some(cache) = &self.puffin_metadata_cache {
771 cache.remove(&file_id.to_string());
772 }
773
774 if let Some(write_cache) = &self.write_cache {
775 write_cache
776 .remove(IndexKey::new(
777 file_id.region_id(),
778 file_id.file_id(),
779 FileType::Puffin(file_id.version),
780 ))
781 .await;
782 }
783 }
784
785 pub fn get_selector_result(
787 &self,
788 selector_key: &SelectorResultKey,
789 ) -> Option<Arc<SelectorResultValue>> {
790 self.selector_result_cache
791 .as_ref()
792 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
793 }
794
795 pub fn put_selector_result(
797 &self,
798 selector_key: SelectorResultKey,
799 result: Arc<SelectorResultValue>,
800 ) {
801 if let Some(cache) = &self.selector_result_cache {
802 CACHE_BYTES
803 .with_label_values(&[SELECTOR_RESULT_TYPE])
804 .add(selector_result_cache_weight(&selector_key, &result).into());
805 cache.insert(selector_key, result);
806 }
807 }
808
809 #[allow(dead_code)]
811 pub(crate) fn get_range_result(
812 &self,
813 key: &RangeScanCacheKey,
814 ) -> Option<Arc<RangeScanCacheValue>> {
815 self.range_result_cache
816 .as_ref()
817 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
818 }
819
820 pub(crate) fn put_range_result(
822 &self,
823 key: RangeScanCacheKey,
824 result: Arc<RangeScanCacheValue>,
825 ) {
826 if let Some(cache) = &self.range_result_cache {
827 CACHE_BYTES
828 .with_label_values(&[RANGE_RESULT_TYPE])
829 .add(range_result_cache_weight(&key, &result).into());
830 cache.insert(key, result);
831 }
832 }
833
834 pub(crate) fn has_range_result_cache(&self) -> bool {
836 self.range_result_cache.is_some()
837 }
838
839 pub(crate) fn range_result_memory_limiter(&self) -> &Arc<RangeResultMemoryLimiter> {
840 &self.range_result_memory_limiter
841 }
842
843 pub(crate) fn range_result_cache_size(&self) -> usize {
844 self.range_result_cache_size as usize
845 }
846
847 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
849 self.write_cache.as_ref()
850 }
851
852 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
853 self.inverted_index_cache.as_ref()
854 }
855
856 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
857 self.bloom_filter_index_cache.as_ref()
858 }
859
860 #[cfg(feature = "vector_index")]
861 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
862 self.vector_index_cache.as_ref()
863 }
864
865 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
866 self.puffin_metadata_cache.as_ref()
867 }
868
869 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
870 self.index_result_cache.as_ref()
871 }
872}
873
874pub fn selector_result_cache_miss() {
876 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
877}
878
879pub fn selector_result_cache_hit() {
881 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
882}
883
/// Builder for [`CacheManager`].
///
/// Every size defaults to zero and the write cache defaults to `None`.
#[derive(Default)]
pub struct CacheManagerBuilder {
    /// Capacity of the SST metadata cache; zero disables it.
    sst_meta_cache_size: u64,
    /// Capacity of the repeated vector cache; zero disables it.
    vector_cache_size: u64,
    /// Capacity of the page cache; zero disables it.
    page_cache_size: u64,
    /// Metadata size passed to the inverted index and bloom filter index caches.
    index_metadata_size: u64,
    /// Content size passed to the inverted index and bloom filter index caches
    /// (and the vector index cache when that feature is enabled).
    index_content_size: u64,
    /// Content page size passed to the inverted index and bloom filter index caches.
    index_content_page_size: u64,
    /// Capacity of the index result cache; zero disables it.
    index_result_cache_size: u64,
    /// Size passed to the puffin metadata cache.
    puffin_metadata_size: u64,
    /// Optional write cache handed to the manager as is.
    write_cache: Option<WriteCacheRef>,
    /// Capacity of the selector result cache; zero disables it.
    selector_result_cache_size: u64,
    /// Capacity of the range result cache; zero disables it. Also used as the
    /// byte limit of the range result memory limiter.
    range_result_cache_size: u64,
}
899
900impl CacheManagerBuilder {
901 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
903 self.sst_meta_cache_size = bytes;
904 self
905 }
906
907 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
909 self.vector_cache_size = bytes;
910 self
911 }
912
913 pub fn page_cache_size(mut self, bytes: u64) -> Self {
915 self.page_cache_size = bytes;
916 self
917 }
918
919 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
921 self.write_cache = cache;
922 self
923 }
924
925 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
927 self.index_metadata_size = bytes;
928 self
929 }
930
931 pub fn index_content_size(mut self, bytes: u64) -> Self {
933 self.index_content_size = bytes;
934 self
935 }
936
937 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
939 self.index_content_page_size = bytes;
940 self
941 }
942
943 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
945 self.index_result_cache_size = bytes;
946 self
947 }
948
949 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
951 self.puffin_metadata_size = bytes;
952 self
953 }
954
955 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
957 self.selector_result_cache_size = bytes;
958 self
959 }
960
961 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
963 self.range_result_cache_size = bytes;
964 self
965 }
966
967 pub fn build(self) -> CacheManager {
969 fn to_str(cause: RemovalCause) -> &'static str {
970 match cause {
971 RemovalCause::Expired => "expired",
972 RemovalCause::Explicit => "explicit",
973 RemovalCause::Replaced => "replaced",
974 RemovalCause::Size => "size",
975 }
976 }
977
978 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
979 Cache::builder()
980 .max_capacity(self.sst_meta_cache_size)
981 .weigher(meta_cache_weight)
982 .eviction_listener(|k, v, cause| {
983 let size = meta_cache_weight(&k, &v);
984 CACHE_BYTES
985 .with_label_values(&[SST_META_TYPE])
986 .sub(size.into());
987 CACHE_EVICTION
988 .with_label_values(&[SST_META_TYPE, to_str(cause)])
989 .inc();
990 })
991 .build()
992 });
993 let vector_cache = (self.vector_cache_size != 0).then(|| {
994 Cache::builder()
995 .max_capacity(self.vector_cache_size)
996 .weigher(vector_cache_weight)
997 .eviction_listener(|k, v, cause| {
998 let size = vector_cache_weight(&k, &v);
999 CACHE_BYTES
1000 .with_label_values(&[VECTOR_TYPE])
1001 .sub(size.into());
1002 CACHE_EVICTION
1003 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
1004 .inc();
1005 })
1006 .build()
1007 });
1008 let page_cache = (self.page_cache_size != 0).then(|| {
1009 Cache::builder()
1010 .max_capacity(self.page_cache_size)
1011 .weigher(page_cache_weight)
1012 .eviction_listener(|k, v, cause| {
1013 let size = page_cache_weight(&k, &v);
1014 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
1015 CACHE_EVICTION
1016 .with_label_values(&[PAGE_TYPE, to_str(cause)])
1017 .inc();
1018 })
1019 .build()
1020 });
1021 let inverted_index_cache = InvertedIndexCache::new(
1022 self.index_metadata_size,
1023 self.index_content_size,
1024 self.index_content_page_size,
1025 );
1026 let bloom_filter_index_cache = BloomFilterIndexCache::new(
1028 self.index_metadata_size,
1029 self.index_content_size,
1030 self.index_content_page_size,
1031 );
1032 #[cfg(feature = "vector_index")]
1033 let vector_index_cache = (self.index_content_size != 0)
1034 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
1035 let index_result_cache = (self.index_result_cache_size != 0)
1036 .then(|| IndexResultCache::new(self.index_result_cache_size));
1037 let puffin_metadata_cache =
1038 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
1039 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
1040 Cache::builder()
1041 .max_capacity(self.selector_result_cache_size)
1042 .weigher(selector_result_cache_weight)
1043 .eviction_listener(|k, v, cause| {
1044 let size = selector_result_cache_weight(&k, &v);
1045 CACHE_BYTES
1046 .with_label_values(&[SELECTOR_RESULT_TYPE])
1047 .sub(size.into());
1048 CACHE_EVICTION
1049 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
1050 .inc();
1051 })
1052 .build()
1053 });
1054 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
1055 Cache::builder()
1056 .max_capacity(self.range_result_cache_size)
1057 .weigher(range_result_cache_weight)
1058 .eviction_listener(move |k, v, cause| {
1059 let size = range_result_cache_weight(&k, &v);
1060 CACHE_BYTES
1061 .with_label_values(&[RANGE_RESULT_TYPE])
1062 .sub(size.into());
1063 CACHE_EVICTION
1064 .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
1065 .inc();
1066 })
1067 .build()
1068 });
1069 CacheManager {
1070 sst_meta_cache,
1071 vector_cache,
1072 page_cache,
1073 write_cache: self.write_cache,
1074 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
1075 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
1076 #[cfg(feature = "vector_index")]
1077 vector_index_cache,
1078 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
1079 selector_result_cache,
1080 range_result_cache,
1081 range_result_cache_size: self.range_result_cache_size,
1082 range_result_memory_limiter: Arc::new(RangeResultMemoryLimiter::new(
1083 self.range_result_cache_size as usize,
1084 RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize,
1085 )),
1086 index_result_cache,
1087 }
1088 }
1089}
1090
1091fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
1092 (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
1094}
1095
1096fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
1097 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
1099}
1100
1101fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
1102 (k.estimated_size() + v.estimated_size()) as u32
1103}
1104
1105fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
1106 (mem::size_of_val(k) + v.estimated_size()) as u32
1107}
1108
1109fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
1110 (k.estimated_size() + v.estimated_size()) as u32
1111}
1112
1113fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1115 if value.is_some() {
1116 CACHE_HIT.with_label_values(&[cache_type]).inc();
1117 } else {
1118 CACHE_MISS.with_label_values(&[cache_type]).inc();
1119 }
1120 value
1121}
1122
1123#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1125struct SstMetaKey(RegionId, FileId);
1126
1127impl SstMetaKey {
1128 fn estimated_size(&self) -> usize {
1130 mem::size_of::<Self>()
1131 }
1132}
1133
/// Path made of a region id, a file id, a row group index and a column index.
// NOTE(review): presumably identifies the pages of one column chunk; the
// visible part of this file does not use it -- confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnPagePath {
    /// Region id component of the path.
    region_id: RegionId,
    /// File id component of the path.
    file_id: FileId,
    /// Row group index component of the path.
    row_group_idx: usize,
    /// Column index component of the path.
    column_idx: usize,
}
1146
1147#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1152pub struct PageKey {
1153 file_id: FileId,
1155 row_group_idx: usize,
1157 ranges: Vec<Range<u64>>,
1159}
1160
1161impl PageKey {
1162 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1164 PageKey {
1165 file_id,
1166 row_group_idx,
1167 ranges,
1168 }
1169 }
1170
1171 fn estimated_size(&self) -> usize {
1173 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1174 }
1175}
1176
1177#[derive(Default)]
1180pub struct PageValue {
1181 pub compressed: Vec<Bytes>,
1183 pub page_size: u64,
1185}
1186
1187impl PageValue {
1188 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1190 PageValue {
1191 compressed: bytes,
1192 page_size,
1193 }
1194 }
1195
1196 fn estimated_size(&self) -> usize {
1198 mem::size_of::<Self>()
1199 + self.page_size as usize
1200 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1201 }
1202}
1203
/// Key of the selector result cache.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SelectorResultKey {
    /// File id component of the key.
    pub file_id: FileId,
    /// Row group index component of the key.
    pub row_group_idx: usize,
    /// Selector component of the key.
    pub selector: TimeSeriesRowSelector,
}
1214
/// Cached result of a selector, stored in one of two batch representations.
pub enum SelectorResult {
    /// Result stored as [`Batch`]es.
    // NOTE(review): presumably the primary-key format of the SST — confirm.
    PrimaryKey(Vec<Batch>),
    /// Result stored as Arrow [`RecordBatch`]es.
    // NOTE(review): presumably the flat format of the SST — confirm.
    Flat(Vec<RecordBatch>),
}
1222
1223pub struct SelectorResultValue {
1225 pub result: SelectorResult,
1227 pub read_cols: ParquetReadColumns,
1229}
1230
1231impl SelectorResultValue {
1232 pub fn new(result: Vec<Batch>, read_cols: ParquetReadColumns) -> SelectorResultValue {
1234 SelectorResultValue {
1235 result: SelectorResult::PrimaryKey(result),
1236 read_cols,
1237 }
1238 }
1239
1240 pub fn new_flat(
1242 result: Vec<RecordBatch>,
1243 read_cols: ParquetReadColumns,
1244 ) -> SelectorResultValue {
1245 SelectorResultValue {
1246 result: SelectorResult::Flat(result),
1247 read_cols,
1248 }
1249 }
1250
1251 fn estimated_size(&self) -> usize {
1253 match &self.result {
1254 SelectorResult::PrimaryKey(batches) => {
1255 batches.iter().map(|batch| batch.memory_size()).sum()
1256 }
1257 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1258 }
1259 }
1260}
1261
/// Maps an [`SstMetaKey`] (region id, file id) to the cached SST metadata.
type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
/// Maps a (data type, value) pair to a cached vector.
// NOTE(review): accessed through `get_repeated_vector`/`put_repeated_vector` in
// the tests, so presumably the vector repeats the value — confirm.
type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
/// Maps a [`PageKey`] to the cached [`PageValue`].
type PageCache = Cache<PageKey, Arc<PageValue>>;
/// Maps a [`SelectorResultKey`] to the cached [`SelectorResultValue`].
type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
/// Maps a [`RangeScanCacheKey`] to the cached [`RangeScanCacheValue`].
type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1274
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::Int64Vector;
    use puffin::file_metadata::FileMetadata;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::ColumnId;

    use super::*;
    use crate::cache::index::bloom_filter_index::Tag;
    use crate::cache::index::result_cache::PredicateKey;
    use crate::cache::test_util::{
        parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
    };
    use crate::read::range_cache::{
        RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
    };
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::row_selection::RowGroupSelection;

    /// A default `CacheManager` has its caches disabled: puts are accepted
    /// but every subsequent get returns `None`.
    #[tokio::test]
    async fn test_disable_cache() {
        let cache = CacheManager::default();
        assert!(cache.sst_meta_cache.is_none());
        assert!(cache.vector_cache.is_none());
        assert!(cache.page_cache.is_none());

        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let metadata = parquet_meta();
        let mut metrics = MetadataCacheMetrics::default();
        cache.put_parquet_meta_data(file_id, metadata, None);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );

        let value = Value::Int64(10);
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );

        let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_none());

        assert!(cache.write_cache().is_none());
    }

    /// Parquet metadata can be put, read back as SST metadata, and removed.
    /// The cached parquet metadata must not keep the `PARQUET_METADATA_KEY`
    /// key-value entry.
    #[tokio::test]
    async fn test_parquet_meta_cache() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
        let (metadata, region_metadata) = sst_parquet_meta();
        cache.put_parquet_meta_data(file_id, metadata, None);
        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert_eq!(region_metadata, cached.region_metadata());
        assert!(
            cached
                .parquet_metadata()
                .file_metadata()
                .key_value_metadata()
                .is_none_or(|key_values| {
                    key_values
                        .iter()
                        .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
                })
        );
        cache.remove_parquet_meta_data(file_id);
        assert!(
            cache
                .get_parquet_meta_data(file_id, &mut metrics, Default::default())
                .await
                .is_none()
        );
    }

    /// When region metadata is supplied on put, the cached entry hands back
    /// that very `Arc` (pointer equality), not a re-decoded copy.
    #[tokio::test]
    async fn test_parquet_meta_cache_with_provided_region_metadata() {
        let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
        let mut metrics = MetadataCacheMetrics::default();
        let region_id = RegionId::new(1, 1);
        let file_id = RegionFileId::new(region_id, FileId::random());
        let (metadata, region_metadata) = sst_parquet_meta();

        cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));

        let cached = cache
            .get_sst_meta_data(file_id, &mut metrics, Default::default())
            .await
            .unwrap();
        assert!(Arc::ptr_eq(&region_metadata, &cached.region_metadata()));
    }

    /// The SST metadata weight is the key size plus the parquet metadata size
    /// plus the region metadata weight, and that last term exceeds the length
    /// of the region metadata's JSON form.
    #[test]
    fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
        let region_metadata = Arc::new(wide_region_metadata(128));
        let json_len = region_metadata.to_json().unwrap().len();
        let metadata = sst_parquet_meta_with_region_metadata(region_metadata.clone());
        let cached = Arc::new(
            CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(metadata)).unwrap(),
        );
        let key = SstMetaKey(region_metadata.region_id, FileId::random());

        assert!(cached.region_metadata_weight > json_len);
        assert_eq!(
            meta_cache_weight(&key, &cached) as usize,
            key.estimated_size()
                + parquet_meta_size(&cached.parquet_metadata)
                + cached.region_metadata_weight
        );
    }

    /// A repeated vector can be read back by data type and value after a put.
    #[test]
    fn test_repeated_vector_cache() {
        let cache = CacheManager::builder().vector_cache_size(4096).build();
        let value = Value::Int64(10);
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
                .is_none()
        );
        let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
        cache.put_repeated_vector(value.clone(), vector.clone());
        let cached = cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
            .unwrap();
        assert_eq!(vector, cached);
    }

    /// Pages can be read back by key after a put.
    #[test]
    fn test_page_cache() {
        let cache = CacheManager::builder().page_cache_size(1000).build();
        let file_id = FileId::random();
        let key = PageKey::new(file_id, 0, vec![(0..10), (10..20)]);
        assert!(cache.get_pages(&key).is_none());
        let pages = Arc::new(PageValue::default());
        cache.put_pages(key.clone(), pages);
        assert!(cache.get_pages(&key).is_some());
    }

    /// A selector result can be read back by key after a put.
    #[test]
    fn test_selector_result_cache() {
        let cache = CacheManager::builder()
            .selector_result_cache_size(1000)
            .build();
        let file_id = FileId::random();
        let key = SelectorResultKey {
            file_id,
            row_group_idx: 0,
            selector: TimeSeriesRowSelector::LastRow,
        };
        assert!(cache.get_selector_result(&key).is_none());
        let result = Arc::new(SelectorResultValue::new(
            Vec::new(),
            ParquetReadColumns::from_deduped(Vec::new()),
        ));
        cache.put_selector_result(key, result);
        assert!(cache.get_selector_result(&key).is_some());
    }

    /// Range results are visible through the manager and through
    /// `CacheStrategy::EnableAll`, while the `Compaction` and `Disabled`
    /// strategies return `None` even though the manager holds the entry.
    #[test]
    fn test_range_result_cache() {
        let cache = Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        );

        let key = RangeScanCacheKey {
            region_id: RegionId::new(1, 1),
            row_groups: vec![(FileId::random(), 0)],
            scan: ScanRequestFingerprintBuilder {
                read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
                read_column_types: vec![],
                filters: vec!["tag_0 = 1".to_string()],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: true,
                merge_mode: crate::region::options::MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };
        let value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

        assert!(cache.get_range_result(&key).is_none());
        cache.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let enable_all = CacheStrategy::EnableAll(cache.clone());
        assert!(enable_all.get_range_result(&key).is_some());

        let compaction = CacheStrategy::Compaction(cache.clone());
        assert!(compaction.get_range_result(&key).is_none());
        compaction.put_range_result(key.clone(), value.clone());
        assert!(cache.get_range_result(&key).is_some());

        let disabled = CacheStrategy::Disabled;
        assert!(disabled.get_range_result(&key).is_none());
        disabled.put_range_result(key.clone(), value);
        assert!(cache.get_range_result(&key).is_some());
    }

    /// The range result cache size also sizes the memory limiter: the number
    /// of permits is the cache size divided by the permit size, rounded up.
    #[test]
    fn test_range_result_cache_size_configures_limiter() {
        let cache_size = 3 * 1024_u64;
        let cache = CacheManager::builder()
            .range_result_cache_size(cache_size)
            .build();

        assert_eq!(cache.range_result_cache_size(), cache_size as usize);
        assert_eq!(
            cache.range_result_memory_limiter().permit_bytes(),
            RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize
        );
        assert_eq!(
            cache.range_result_memory_limiter().available_permits(),
            (cache_size as usize).div_ceil(RANGE_RESULT_CONCAT_MEMORY_PERMIT.as_bytes() as usize)
        );
    }

    /// A request larger than the limiter capacity fails with an error and
    /// leaves the available permits untouched.
    #[tokio::test]
    async fn range_result_memory_limiter_rejects_oversized_request() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        assert_eq!(limiter.available_permits(), 2);

        let err = limiter.acquire(10 * 1024).await.unwrap_err();
        assert!(
            err.to_string().contains("exceeds limiter capacity"),
            "unexpected error: {err}"
        );
        assert_eq!(limiter.available_permits(), 2);
    }

    /// A request equal to the capacity takes every permit, and dropping the
    /// returned permit gives them all back.
    #[tokio::test]
    async fn range_result_memory_limiter_allows_request_up_to_capacity() {
        let limiter = RangeResultMemoryLimiter::new(2 * 1024, 1024);
        let permit = limiter.acquire(2 * 1024).await.unwrap();
        assert_eq!(limiter.available_permits(), 0);
        drop(permit);
        assert_eq!(limiter.available_permits(), 2);
    }

    /// `evict_puffin_cache` removes the entries of an index from the bloom
    /// filter, inverted index, index result and puffin metadata caches, both
    /// when called on the manager and when called through `CacheStrategy`.
    #[tokio::test]
    async fn test_evict_puffin_cache_clears_all_entries() {
        use std::collections::{BTreeMap, HashMap};

        let cache = CacheManager::builder()
            .index_metadata_size(128)
            .index_content_size(128)
            .index_content_page_size(64)
            .index_result_cache_size(128)
            .puffin_metadata_size(128)
            .build();
        let cache = Arc::new(cache);

        let region_id = RegionId::new(1, 1);
        let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
        let column_id: ColumnId = 1;

        let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
        let inverted_cache = cache.inverted_index_cache().unwrap().clone();
        let result_cache = cache.index_result_cache().unwrap();
        let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();

        let bloom_key = (
            index_id.file_id(),
            index_id.version,
            column_id,
            Tag::Skipping,
        );
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
        let selection = Arc::new(RowGroupSelection::default());
        result_cache.put(predicate.clone(), index_id.file_id(), selection);
        let file_id_str = index_id.to_string();
        let metadata = Arc::new(FileMetadata {
            blobs: Vec::new(),
            properties: HashMap::new(),
        });
        puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);

        assert!(bloom_cache.get_metadata(bloom_key).is_some());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_some()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());

        cache.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());

        // Refill every cache, then evict through the strategy wrapper.
        bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
        inverted_cache.put_metadata(
            (index_id.file_id(), index_id.version),
            Arc::new(InvertedIndexMetas::default()),
        );
        result_cache.put(
            predicate.clone(),
            index_id.file_id(),
            Arc::new(RowGroupSelection::default()),
        );
        puffin_metadata_cache.put_metadata(
            file_id_str.clone(),
            Arc::new(FileMetadata {
                blobs: Vec::new(),
                properties: HashMap::new(),
            }),
        );

        let strategy = CacheStrategy::EnableAll(cache.clone());
        strategy.evict_puffin_cache(index_id).await;

        assert!(bloom_cache.get_metadata(bloom_key).is_none());
        assert!(
            inverted_cache
                .get_metadata((index_id.file_id(), index_id.version))
                .is_none()
        );
        assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
        assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
    }

    /// Builds region metadata with `column_count` nullable string columns plus
    /// a millisecond timestamp column named `ts`.
    ///
    /// Columns with an id below 32 are tags and form the primary key; the
    /// others are fields. Every string column also carries one metadata entry
    /// so that the metadata is large.
    fn wide_region_metadata(column_count: u32) -> RegionMetadata {
        let region_id = RegionId::new(1024, 7);
        let mut builder = RegionMetadataBuilder::new(region_id);
        let mut primary_key = Vec::new();

        for column_id in 0..column_count {
            let semantic_type = if column_id < 32 {
                primary_key.push(column_id);
                SemanticType::Tag
            } else {
                SemanticType::Field
            };
            let mut column_schema = ColumnSchema::new(
                format!("wide_column_{column_id}"),
                ConcreteDataType::string_datatype(),
                true,
            );
            column_schema
                .mut_metadata()
                .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type,
                column_id,
            });
        }

        // The timestamp column takes the next free column id.
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts",
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: column_count,
        });
        builder.primary_key(primary_key);

        builder.build().unwrap()
    }
}