1pub(crate) mod cache_size;
18
19pub(crate) mod file_cache;
20pub(crate) mod index;
21pub(crate) mod manifest_cache;
22#[cfg(test)]
23pub(crate) mod test_util;
24pub(crate) mod write_cache;
25
26use std::mem;
27use std::ops::Range;
28use std::sync::Arc;
29
30use bytes::Bytes;
31use common_telemetry::warn;
32use datatypes::arrow::record_batch::RecordBatch;
33use datatypes::value::Value;
34use datatypes::vectors::VectorRef;
35use index::bloom_filter_index::{BloomFilterIndexCache, BloomFilterIndexCacheRef};
36use index::result_cache::IndexResultCache;
37use moka::notification::RemovalCause;
38use moka::sync::Cache;
39use object_store::ObjectStore;
40use parquet::file::metadata::{FileMetaData, PageIndexPolicy, ParquetMetaData};
41use puffin::puffin_manager::cache::{PuffinMetadataCache, PuffinMetadataCacheRef};
42use snafu::{OptionExt, ResultExt};
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::{ConcreteDataType, FileId, RegionId, TimeSeriesRowSelector};
45
46use crate::cache::cache_size::parquet_meta_size;
47use crate::cache::file_cache::{FileType, IndexKey};
48use crate::cache::index::inverted_index::{InvertedIndexCache, InvertedIndexCacheRef};
49#[cfg(feature = "vector_index")]
50use crate::cache::index::vector_index::{VectorIndexCache, VectorIndexCacheRef};
51use crate::cache::write_cache::WriteCacheRef;
52use crate::error::{InvalidMetadataSnafu, InvalidParquetSnafu, Result};
53use crate::memtable::record_batch_estimated_size;
54use crate::metrics::{CACHE_BYTES, CACHE_EVICTION, CACHE_HIT, CACHE_MISS};
55use crate::read::Batch;
56use crate::read::range_cache::{RangeScanCacheKey, RangeScanCacheValue};
57use crate::sst::file::{RegionFileId, RegionIndexId};
58use crate::sst::parquet::PARQUET_METADATA_KEY;
59use crate::sst::parquet::reader::MetadataCacheMetrics;
60
/// Metrics label value for the SST metadata cache.
const SST_META_TYPE: &str = "sst_meta";
/// Metrics label value for the repeated-vector cache.
const VECTOR_TYPE: &str = "vector";
/// Metrics label value for the page cache.
const PAGE_TYPE: &str = "page";
/// Label value `"file"`.
///
/// NOTE(review): not referenced in the visible part of this module; presumably
/// used by a submodule — confirm.
const FILE_TYPE: &str = "file";
/// Label value `"index"`.
///
/// NOTE(review): not referenced in the visible part of this module; presumably
/// used by a submodule — confirm.
const INDEX_TYPE: &str = "index";
/// Metrics label value for the selector result cache.
const SELECTOR_RESULT_TYPE: &str = "selector_result";
/// Metrics label value for the range scan result cache.
const RANGE_RESULT_TYPE: &str = "range_result";
75
76#[derive(Debug)]
81pub(crate) struct CachedSstMeta {
82 parquet_metadata: Arc<ParquetMetaData>,
83 region_metadata: RegionMetadataRef,
84 region_metadata_weight: usize,
85}
86
87impl CachedSstMeta {
88 pub(crate) fn try_new(file_path: &str, parquet_metadata: ParquetMetaData) -> Result<Self> {
89 Self::try_new_with_region_metadata(file_path, parquet_metadata, None)
90 }
91
92 pub(crate) fn try_new_with_region_metadata(
93 file_path: &str,
94 parquet_metadata: ParquetMetaData,
95 region_metadata: Option<RegionMetadataRef>,
96 ) -> Result<Self> {
97 let file_metadata = parquet_metadata.file_metadata();
98 let key_values = file_metadata
99 .key_value_metadata()
100 .context(InvalidParquetSnafu {
101 file: file_path,
102 reason: "missing key value meta",
103 })?;
104 let meta_value = key_values
105 .iter()
106 .find(|kv| kv.key == PARQUET_METADATA_KEY)
107 .with_context(|| InvalidParquetSnafu {
108 file: file_path,
109 reason: format!("key {} not found", PARQUET_METADATA_KEY),
110 })?;
111 let json = meta_value
112 .value
113 .as_ref()
114 .with_context(|| InvalidParquetSnafu {
115 file: file_path,
116 reason: format!("No value for key {}", PARQUET_METADATA_KEY),
117 })?;
118 let region_metadata = match region_metadata {
119 Some(region_metadata) => region_metadata,
120 None => Arc::new(
121 store_api::metadata::RegionMetadata::from_json(json)
122 .context(InvalidMetadataSnafu)?,
123 ),
124 };
125 let region_metadata_weight = region_metadata.estimated_size().max(json.len());
127 let parquet_metadata = Arc::new(strip_region_metadata_from_parquet(parquet_metadata));
128
129 Ok(Self {
130 parquet_metadata,
131 region_metadata,
132 region_metadata_weight,
133 })
134 }
135
136 pub(crate) fn parquet_metadata(&self) -> Arc<ParquetMetaData> {
137 self.parquet_metadata.clone()
138 }
139
140 pub(crate) fn region_metadata(&self) -> RegionMetadataRef {
141 self.region_metadata.clone()
142 }
143}
144
145fn strip_region_metadata_from_parquet(parquet_metadata: ParquetMetaData) -> ParquetMetaData {
146 let file_metadata = parquet_metadata.file_metadata();
147 let filtered_key_values = file_metadata.key_value_metadata().and_then(|key_values| {
148 let filtered = key_values
149 .iter()
150 .filter(|kv| kv.key != PARQUET_METADATA_KEY)
151 .cloned()
152 .collect::<Vec<_>>();
153 (!filtered.is_empty()).then_some(filtered)
154 });
155 let stripped_file_metadata = FileMetaData::new(
156 file_metadata.version(),
157 file_metadata.num_rows(),
158 file_metadata.created_by().map(ToString::to_string),
159 filtered_key_values,
160 file_metadata.schema_descr_ptr(),
161 file_metadata.column_orders().cloned(),
162 );
163
164 let mut builder = parquet_metadata.into_builder();
165 let row_groups = builder.take_row_groups();
166 let column_index = builder.take_column_index();
167 let offset_index = builder.take_offset_index();
168
169 parquet::file::metadata::ParquetMetaDataBuilder::new(stripped_file_metadata)
170 .set_row_groups(row_groups)
171 .set_column_index(column_index)
172 .set_offset_index(offset_index)
173 .build()
174}
175
176#[derive(Clone)]
178pub enum CacheStrategy {
179 EnableAll(CacheManagerRef),
182 Compaction(CacheManagerRef),
187 Disabled,
189}
190
191impl CacheStrategy {
192 pub(crate) async fn get_sst_meta_data(
194 &self,
195 file_id: RegionFileId,
196 metrics: &mut MetadataCacheMetrics,
197 page_index_policy: PageIndexPolicy,
198 ) -> Option<Arc<CachedSstMeta>> {
199 match self {
200 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
201 cache_manager
202 .get_sst_meta_data(file_id, metrics, page_index_policy)
203 .await
204 }
205 CacheStrategy::Disabled => {
206 metrics.cache_miss += 1;
207 None
208 }
209 }
210 }
211
212 pub(crate) fn get_sst_meta_data_from_mem_cache(
214 &self,
215 file_id: RegionFileId,
216 ) -> Option<Arc<CachedSstMeta>> {
217 match self {
218 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
219 cache_manager.get_sst_meta_data_from_mem_cache(file_id)
220 }
221 CacheStrategy::Disabled => None,
222 }
223 }
224
225 pub fn get_parquet_meta_data_from_mem_cache(
227 &self,
228 file_id: RegionFileId,
229 ) -> Option<Arc<ParquetMetaData>> {
230 self.get_sst_meta_data_from_mem_cache(file_id)
231 .map(|metadata| metadata.parquet_metadata())
232 }
233
234 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
236 match self {
237 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
238 cache_manager.put_sst_meta_data(file_id, metadata);
239 }
240 CacheStrategy::Disabled => {}
241 }
242 }
243
244 pub fn put_parquet_meta_data(
246 &self,
247 file_id: RegionFileId,
248 metadata: Arc<ParquetMetaData>,
249 region_metadata: Option<RegionMetadataRef>,
250 ) {
251 match self {
252 CacheStrategy::EnableAll(cache_manager) | CacheStrategy::Compaction(cache_manager) => {
253 cache_manager.put_parquet_meta_data(file_id, metadata, region_metadata);
254 }
255 CacheStrategy::Disabled => {}
256 }
257 }
258
259 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
261 match self {
262 CacheStrategy::EnableAll(cache_manager) => {
263 cache_manager.remove_parquet_meta_data(file_id);
264 }
265 CacheStrategy::Compaction(cache_manager) => {
266 cache_manager.remove_parquet_meta_data(file_id);
267 }
268 CacheStrategy::Disabled => {}
269 }
270 }
271
272 pub fn get_repeated_vector(
275 &self,
276 data_type: &ConcreteDataType,
277 value: &Value,
278 ) -> Option<VectorRef> {
279 match self {
280 CacheStrategy::EnableAll(cache_manager) => {
281 cache_manager.get_repeated_vector(data_type, value)
282 }
283 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
284 }
285 }
286
287 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
290 if let CacheStrategy::EnableAll(cache_manager) = self {
291 cache_manager.put_repeated_vector(value, vector);
292 }
293 }
294
295 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
298 match self {
299 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_pages(page_key),
300 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
301 }
302 }
303
304 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
307 if let CacheStrategy::EnableAll(cache_manager) = self {
308 cache_manager.put_pages(page_key, pages);
309 }
310 }
311
312 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
314 match self {
315 CacheStrategy::EnableAll(cache_manager) => {
316 cache_manager.evict_puffin_cache(file_id).await
317 }
318 CacheStrategy::Compaction(cache_manager) => {
319 cache_manager.evict_puffin_cache(file_id).await
320 }
321 CacheStrategy::Disabled => {}
322 }
323 }
324
325 pub fn get_selector_result(
328 &self,
329 selector_key: &SelectorResultKey,
330 ) -> Option<Arc<SelectorResultValue>> {
331 match self {
332 CacheStrategy::EnableAll(cache_manager) => {
333 cache_manager.get_selector_result(selector_key)
334 }
335 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
336 }
337 }
338
339 pub fn put_selector_result(
342 &self,
343 selector_key: SelectorResultKey,
344 result: Arc<SelectorResultValue>,
345 ) {
346 if let CacheStrategy::EnableAll(cache_manager) = self {
347 cache_manager.put_selector_result(selector_key, result);
348 }
349 }
350
351 #[allow(dead_code)]
354 pub(crate) fn get_range_result(
355 &self,
356 key: &RangeScanCacheKey,
357 ) -> Option<Arc<RangeScanCacheValue>> {
358 match self {
359 CacheStrategy::EnableAll(cache_manager) => cache_manager.get_range_result(key),
360 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
361 }
362 }
363
364 pub(crate) fn put_range_result(
367 &self,
368 key: RangeScanCacheKey,
369 result: Arc<RangeScanCacheValue>,
370 ) {
371 if let CacheStrategy::EnableAll(cache_manager) = self {
372 cache_manager.put_range_result(key, result);
373 }
374 }
375
376 pub fn write_cache(&self) -> Option<&WriteCacheRef> {
379 match self {
380 CacheStrategy::EnableAll(cache_manager) => cache_manager.write_cache(),
381 CacheStrategy::Compaction(cache_manager) => cache_manager.write_cache(),
382 CacheStrategy::Disabled => None,
383 }
384 }
385
386 pub fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
389 match self {
390 CacheStrategy::EnableAll(cache_manager) => cache_manager.inverted_index_cache(),
391 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
392 }
393 }
394
395 pub fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
398 match self {
399 CacheStrategy::EnableAll(cache_manager) => cache_manager.bloom_filter_index_cache(),
400 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
401 }
402 }
403
404 #[cfg(feature = "vector_index")]
407 pub fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
408 match self {
409 CacheStrategy::EnableAll(cache_manager) => cache_manager.vector_index_cache(),
410 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
411 }
412 }
413
414 pub fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
417 match self {
418 CacheStrategy::EnableAll(cache_manager) => cache_manager.puffin_metadata_cache(),
419 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
420 }
421 }
422
423 pub fn index_result_cache(&self) -> Option<&IndexResultCache> {
426 match self {
427 CacheStrategy::EnableAll(cache_manager) => cache_manager.index_result_cache(),
428 CacheStrategy::Compaction(_) | CacheStrategy::Disabled => None,
429 }
430 }
431
432 pub fn maybe_download_background(
434 &self,
435 index_key: IndexKey,
436 remote_path: String,
437 remote_store: ObjectStore,
438 file_size: u64,
439 ) {
440 if let CacheStrategy::EnableAll(cache_manager) = self
441 && let Some(write_cache) = cache_manager.write_cache()
442 {
443 write_cache.file_cache().maybe_download_background(
444 index_key,
445 remote_path,
446 remote_store,
447 file_size,
448 );
449 }
450 }
451}
452
453#[derive(Default)]
457pub struct CacheManager {
458 sst_meta_cache: Option<SstMetaCache>,
460 vector_cache: Option<VectorCache>,
462 page_cache: Option<PageCache>,
464 write_cache: Option<WriteCacheRef>,
466 inverted_index_cache: Option<InvertedIndexCacheRef>,
468 bloom_filter_index_cache: Option<BloomFilterIndexCacheRef>,
470 #[cfg(feature = "vector_index")]
472 vector_index_cache: Option<VectorIndexCacheRef>,
473 puffin_metadata_cache: Option<PuffinMetadataCacheRef>,
475 selector_result_cache: Option<SelectorResultCache>,
477 range_result_cache: Option<RangeResultCache>,
479 index_result_cache: Option<IndexResultCache>,
481}
482
483pub type CacheManagerRef = Arc<CacheManager>;
484
485impl CacheManager {
486 pub fn builder() -> CacheManagerBuilder {
488 CacheManagerBuilder::default()
489 }
490
491 pub(crate) async fn get_sst_meta_data(
494 &self,
495 file_id: RegionFileId,
496 metrics: &mut MetadataCacheMetrics,
497 page_index_policy: PageIndexPolicy,
498 ) -> Option<Arc<CachedSstMeta>> {
499 if let Some(metadata) = self.get_sst_meta_data_from_mem_cache(file_id) {
500 metrics.mem_cache_hit += 1;
501 return Some(metadata);
502 }
503
504 let key = IndexKey::new(file_id.region_id(), file_id.file_id(), FileType::Parquet);
505 if let Some(write_cache) = &self.write_cache
506 && let Some(metadata) = write_cache
507 .file_cache()
508 .get_sst_meta_data(key, metrics, page_index_policy)
509 .await
510 {
511 metrics.file_cache_hit += 1;
512 self.put_sst_meta_data(file_id, metadata.clone());
513 return Some(metadata);
514 }
515
516 metrics.cache_miss += 1;
517 None
518 }
519
520 pub(crate) async fn get_parquet_meta_data(
523 &self,
524 file_id: RegionFileId,
525 metrics: &mut MetadataCacheMetrics,
526 page_index_policy: PageIndexPolicy,
527 ) -> Option<Arc<ParquetMetaData>> {
528 self.get_sst_meta_data(file_id, metrics, page_index_policy)
529 .await
530 .map(|metadata| metadata.parquet_metadata())
531 }
532
533 pub(crate) fn get_sst_meta_data_from_mem_cache(
536 &self,
537 file_id: RegionFileId,
538 ) -> Option<Arc<CachedSstMeta>> {
539 self.sst_meta_cache.as_ref().and_then(|sst_meta_cache| {
540 let value = sst_meta_cache.get(&SstMetaKey(file_id.region_id(), file_id.file_id()));
541 update_hit_miss(value, SST_META_TYPE)
542 })
543 }
544
545 pub fn get_parquet_meta_data_from_mem_cache(
548 &self,
549 file_id: RegionFileId,
550 ) -> Option<Arc<ParquetMetaData>> {
551 self.get_sst_meta_data_from_mem_cache(file_id)
552 .map(|metadata| metadata.parquet_metadata())
553 }
554
555 pub(crate) fn put_sst_meta_data(&self, file_id: RegionFileId, metadata: Arc<CachedSstMeta>) {
557 if let Some(cache) = &self.sst_meta_cache {
558 let key = SstMetaKey(file_id.region_id(), file_id.file_id());
559 CACHE_BYTES
560 .with_label_values(&[SST_META_TYPE])
561 .add(meta_cache_weight(&key, &metadata).into());
562 cache.insert(key, metadata);
563 }
564 }
565
566 pub fn put_parquet_meta_data(
568 &self,
569 file_id: RegionFileId,
570 metadata: Arc<ParquetMetaData>,
571 region_metadata: Option<RegionMetadataRef>,
572 ) {
573 if self.sst_meta_cache.is_some() {
574 let file_path = format!(
575 "region_id={}, file_id={}",
576 file_id.region_id(),
577 file_id.file_id()
578 );
579 match CachedSstMeta::try_new_with_region_metadata(
580 &file_path,
581 Arc::unwrap_or_clone(metadata),
582 region_metadata,
583 ) {
584 Ok(metadata) => self.put_sst_meta_data(file_id, Arc::new(metadata)),
585 Err(err) => warn!(
586 err; "Failed to decode region metadata while caching parquet metadata, region_id: {}, file_id: {}",
587 file_id.region_id(),
588 file_id.file_id()
589 ),
590 }
591 }
592 }
593
594 pub fn remove_parquet_meta_data(&self, file_id: RegionFileId) {
596 if let Some(cache) = &self.sst_meta_cache {
597 cache.remove(&SstMetaKey(file_id.region_id(), file_id.file_id()));
598 }
599 }
600
601 pub(crate) fn sst_meta_cache_weighted_size(&self) -> u64 {
603 self.sst_meta_cache
604 .as_ref()
605 .map(|cache| cache.weighted_size())
606 .unwrap_or(0)
607 }
608
609 pub(crate) fn sst_meta_cache_enabled(&self) -> bool {
611 self.sst_meta_cache.is_some()
612 }
613
614 pub fn get_repeated_vector(
616 &self,
617 data_type: &ConcreteDataType,
618 value: &Value,
619 ) -> Option<VectorRef> {
620 self.vector_cache.as_ref().and_then(|vector_cache| {
621 let value = vector_cache.get(&(data_type.clone(), value.clone()));
622 update_hit_miss(value, VECTOR_TYPE)
623 })
624 }
625
626 pub fn put_repeated_vector(&self, value: Value, vector: VectorRef) {
628 if let Some(cache) = &self.vector_cache {
629 let key = (vector.data_type(), value);
630 CACHE_BYTES
631 .with_label_values(&[VECTOR_TYPE])
632 .add(vector_cache_weight(&key, &vector).into());
633 cache.insert(key, vector);
634 }
635 }
636
637 pub fn get_pages(&self, page_key: &PageKey) -> Option<Arc<PageValue>> {
639 self.page_cache.as_ref().and_then(|page_cache| {
640 let value = page_cache.get(page_key);
641 update_hit_miss(value, PAGE_TYPE)
642 })
643 }
644
645 pub fn put_pages(&self, page_key: PageKey, pages: Arc<PageValue>) {
647 if let Some(cache) = &self.page_cache {
648 CACHE_BYTES
649 .with_label_values(&[PAGE_TYPE])
650 .add(page_cache_weight(&page_key, &pages).into());
651 cache.insert(page_key, pages);
652 }
653 }
654
655 pub async fn evict_puffin_cache(&self, file_id: RegionIndexId) {
657 if let Some(cache) = &self.bloom_filter_index_cache {
658 cache.invalidate_file(file_id.file_id());
659 }
660
661 if let Some(cache) = &self.inverted_index_cache {
662 cache.invalidate_file(file_id.file_id());
663 }
664
665 if let Some(cache) = &self.index_result_cache {
666 cache.invalidate_file(file_id.file_id());
667 }
668
669 #[cfg(feature = "vector_index")]
670 if let Some(cache) = &self.vector_index_cache {
671 cache.invalidate_file(file_id.file_id());
672 }
673
674 if let Some(cache) = &self.puffin_metadata_cache {
675 cache.remove(&file_id.to_string());
676 }
677
678 if let Some(write_cache) = &self.write_cache {
679 write_cache
680 .remove(IndexKey::new(
681 file_id.region_id(),
682 file_id.file_id(),
683 FileType::Puffin(file_id.version),
684 ))
685 .await;
686 }
687 }
688
689 pub fn get_selector_result(
691 &self,
692 selector_key: &SelectorResultKey,
693 ) -> Option<Arc<SelectorResultValue>> {
694 self.selector_result_cache
695 .as_ref()
696 .and_then(|selector_result_cache| selector_result_cache.get(selector_key))
697 }
698
699 pub fn put_selector_result(
701 &self,
702 selector_key: SelectorResultKey,
703 result: Arc<SelectorResultValue>,
704 ) {
705 if let Some(cache) = &self.selector_result_cache {
706 CACHE_BYTES
707 .with_label_values(&[SELECTOR_RESULT_TYPE])
708 .add(selector_result_cache_weight(&selector_key, &result).into());
709 cache.insert(selector_key, result);
710 }
711 }
712
713 #[allow(dead_code)]
715 pub(crate) fn get_range_result(
716 &self,
717 key: &RangeScanCacheKey,
718 ) -> Option<Arc<RangeScanCacheValue>> {
719 self.range_result_cache
720 .as_ref()
721 .and_then(|cache| update_hit_miss(cache.get(key), RANGE_RESULT_TYPE))
722 }
723
724 pub(crate) fn put_range_result(
726 &self,
727 key: RangeScanCacheKey,
728 result: Arc<RangeScanCacheValue>,
729 ) {
730 if let Some(cache) = &self.range_result_cache {
731 CACHE_BYTES
732 .with_label_values(&[RANGE_RESULT_TYPE])
733 .add(range_result_cache_weight(&key, &result).into());
734 cache.insert(key, result);
735 }
736 }
737
738 pub(crate) fn write_cache(&self) -> Option<&WriteCacheRef> {
740 self.write_cache.as_ref()
741 }
742
743 pub(crate) fn inverted_index_cache(&self) -> Option<&InvertedIndexCacheRef> {
744 self.inverted_index_cache.as_ref()
745 }
746
747 pub(crate) fn bloom_filter_index_cache(&self) -> Option<&BloomFilterIndexCacheRef> {
748 self.bloom_filter_index_cache.as_ref()
749 }
750
751 #[cfg(feature = "vector_index")]
752 pub(crate) fn vector_index_cache(&self) -> Option<&VectorIndexCacheRef> {
753 self.vector_index_cache.as_ref()
754 }
755
756 pub(crate) fn puffin_metadata_cache(&self) -> Option<&PuffinMetadataCacheRef> {
757 self.puffin_metadata_cache.as_ref()
758 }
759
760 pub(crate) fn index_result_cache(&self) -> Option<&IndexResultCache> {
761 self.index_result_cache.as_ref()
762 }
763}
764
765pub fn selector_result_cache_miss() {
767 CACHE_MISS.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
768}
769
770pub fn selector_result_cache_hit() {
772 CACHE_HIT.with_label_values(&[SELECTOR_RESULT_TYPE]).inc()
773}
774
775#[derive(Default)]
777pub struct CacheManagerBuilder {
778 sst_meta_cache_size: u64,
779 vector_cache_size: u64,
780 page_cache_size: u64,
781 index_metadata_size: u64,
782 index_content_size: u64,
783 index_content_page_size: u64,
784 index_result_cache_size: u64,
785 puffin_metadata_size: u64,
786 write_cache: Option<WriteCacheRef>,
787 selector_result_cache_size: u64,
788 range_result_cache_size: u64,
789}
790
791impl CacheManagerBuilder {
792 pub fn sst_meta_cache_size(mut self, bytes: u64) -> Self {
794 self.sst_meta_cache_size = bytes;
795 self
796 }
797
798 pub fn vector_cache_size(mut self, bytes: u64) -> Self {
800 self.vector_cache_size = bytes;
801 self
802 }
803
804 pub fn page_cache_size(mut self, bytes: u64) -> Self {
806 self.page_cache_size = bytes;
807 self
808 }
809
810 pub fn write_cache(mut self, cache: Option<WriteCacheRef>) -> Self {
812 self.write_cache = cache;
813 self
814 }
815
816 pub fn index_metadata_size(mut self, bytes: u64) -> Self {
818 self.index_metadata_size = bytes;
819 self
820 }
821
822 pub fn index_content_size(mut self, bytes: u64) -> Self {
824 self.index_content_size = bytes;
825 self
826 }
827
828 pub fn index_content_page_size(mut self, bytes: u64) -> Self {
830 self.index_content_page_size = bytes;
831 self
832 }
833
834 pub fn index_result_cache_size(mut self, bytes: u64) -> Self {
836 self.index_result_cache_size = bytes;
837 self
838 }
839
840 pub fn puffin_metadata_size(mut self, bytes: u64) -> Self {
842 self.puffin_metadata_size = bytes;
843 self
844 }
845
846 pub fn selector_result_cache_size(mut self, bytes: u64) -> Self {
848 self.selector_result_cache_size = bytes;
849 self
850 }
851
852 pub fn range_result_cache_size(mut self, bytes: u64) -> Self {
854 self.range_result_cache_size = bytes;
855 self
856 }
857
858 pub fn build(self) -> CacheManager {
860 fn to_str(cause: RemovalCause) -> &'static str {
861 match cause {
862 RemovalCause::Expired => "expired",
863 RemovalCause::Explicit => "explicit",
864 RemovalCause::Replaced => "replaced",
865 RemovalCause::Size => "size",
866 }
867 }
868
869 let sst_meta_cache = (self.sst_meta_cache_size != 0).then(|| {
870 Cache::builder()
871 .max_capacity(self.sst_meta_cache_size)
872 .weigher(meta_cache_weight)
873 .eviction_listener(|k, v, cause| {
874 let size = meta_cache_weight(&k, &v);
875 CACHE_BYTES
876 .with_label_values(&[SST_META_TYPE])
877 .sub(size.into());
878 CACHE_EVICTION
879 .with_label_values(&[SST_META_TYPE, to_str(cause)])
880 .inc();
881 })
882 .build()
883 });
884 let vector_cache = (self.vector_cache_size != 0).then(|| {
885 Cache::builder()
886 .max_capacity(self.vector_cache_size)
887 .weigher(vector_cache_weight)
888 .eviction_listener(|k, v, cause| {
889 let size = vector_cache_weight(&k, &v);
890 CACHE_BYTES
891 .with_label_values(&[VECTOR_TYPE])
892 .sub(size.into());
893 CACHE_EVICTION
894 .with_label_values(&[VECTOR_TYPE, to_str(cause)])
895 .inc();
896 })
897 .build()
898 });
899 let page_cache = (self.page_cache_size != 0).then(|| {
900 Cache::builder()
901 .max_capacity(self.page_cache_size)
902 .weigher(page_cache_weight)
903 .eviction_listener(|k, v, cause| {
904 let size = page_cache_weight(&k, &v);
905 CACHE_BYTES.with_label_values(&[PAGE_TYPE]).sub(size.into());
906 CACHE_EVICTION
907 .with_label_values(&[PAGE_TYPE, to_str(cause)])
908 .inc();
909 })
910 .build()
911 });
912 let inverted_index_cache = InvertedIndexCache::new(
913 self.index_metadata_size,
914 self.index_content_size,
915 self.index_content_page_size,
916 );
917 let bloom_filter_index_cache = BloomFilterIndexCache::new(
919 self.index_metadata_size,
920 self.index_content_size,
921 self.index_content_page_size,
922 );
923 #[cfg(feature = "vector_index")]
924 let vector_index_cache = (self.index_content_size != 0)
925 .then(|| Arc::new(VectorIndexCache::new(self.index_content_size)));
926 let index_result_cache = (self.index_result_cache_size != 0)
927 .then(|| IndexResultCache::new(self.index_result_cache_size));
928 let puffin_metadata_cache =
929 PuffinMetadataCache::new(self.puffin_metadata_size, &CACHE_BYTES);
930 let selector_result_cache = (self.selector_result_cache_size != 0).then(|| {
931 Cache::builder()
932 .max_capacity(self.selector_result_cache_size)
933 .weigher(selector_result_cache_weight)
934 .eviction_listener(|k, v, cause| {
935 let size = selector_result_cache_weight(&k, &v);
936 CACHE_BYTES
937 .with_label_values(&[SELECTOR_RESULT_TYPE])
938 .sub(size.into());
939 CACHE_EVICTION
940 .with_label_values(&[SELECTOR_RESULT_TYPE, to_str(cause)])
941 .inc();
942 })
943 .build()
944 });
945 let range_result_cache = (self.range_result_cache_size != 0).then(|| {
946 Cache::builder()
947 .max_capacity(self.range_result_cache_size)
948 .weigher(range_result_cache_weight)
949 .eviction_listener(move |k, v, cause| {
950 let size = range_result_cache_weight(&k, &v);
951 CACHE_BYTES
952 .with_label_values(&[RANGE_RESULT_TYPE])
953 .sub(size.into());
954 CACHE_EVICTION
955 .with_label_values(&[RANGE_RESULT_TYPE, to_str(cause)])
956 .inc();
957 })
958 .build()
959 });
960 CacheManager {
961 sst_meta_cache,
962 vector_cache,
963 page_cache,
964 write_cache: self.write_cache,
965 inverted_index_cache: Some(Arc::new(inverted_index_cache)),
966 bloom_filter_index_cache: Some(Arc::new(bloom_filter_index_cache)),
967 #[cfg(feature = "vector_index")]
968 vector_index_cache,
969 puffin_metadata_cache: Some(Arc::new(puffin_metadata_cache)),
970 selector_result_cache,
971 range_result_cache,
972 index_result_cache,
973 }
974 }
975}
976
977fn meta_cache_weight(k: &SstMetaKey, v: &Arc<CachedSstMeta>) -> u32 {
978 (k.estimated_size() + parquet_meta_size(&v.parquet_metadata) + v.region_metadata_weight) as u32
980}
981
982fn vector_cache_weight(_k: &(ConcreteDataType, Value), v: &VectorRef) -> u32 {
983 (mem::size_of::<ConcreteDataType>() + mem::size_of::<Value>() + v.memory_size()) as u32
985}
986
987fn page_cache_weight(k: &PageKey, v: &Arc<PageValue>) -> u32 {
988 (k.estimated_size() + v.estimated_size()) as u32
989}
990
991fn selector_result_cache_weight(k: &SelectorResultKey, v: &Arc<SelectorResultValue>) -> u32 {
992 (mem::size_of_val(k) + v.estimated_size()) as u32
993}
994
995fn range_result_cache_weight(k: &RangeScanCacheKey, v: &Arc<RangeScanCacheValue>) -> u32 {
996 (k.estimated_size() + v.estimated_size()) as u32
997}
998
999fn update_hit_miss<T>(value: Option<T>, cache_type: &str) -> Option<T> {
1001 if value.is_some() {
1002 CACHE_HIT.with_label_values(&[cache_type]).inc();
1003 } else {
1004 CACHE_MISS.with_label_values(&[cache_type]).inc();
1005 }
1006 value
1007}
1008
1009#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1011struct SstMetaKey(RegionId, FileId);
1012
1013impl SstMetaKey {
1014 fn estimated_size(&self) -> usize {
1016 mem::size_of::<Self>()
1017 }
1018}
1019
1020#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1022pub struct ColumnPagePath {
1023 region_id: RegionId,
1025 file_id: FileId,
1027 row_group_idx: usize,
1029 column_idx: usize,
1031}
1032
1033#[derive(Debug, Clone, PartialEq, Eq, Hash)]
1038pub struct PageKey {
1039 file_id: FileId,
1041 row_group_idx: usize,
1043 ranges: Vec<Range<u64>>,
1045}
1046
1047impl PageKey {
1048 pub fn new(file_id: FileId, row_group_idx: usize, ranges: Vec<Range<u64>>) -> PageKey {
1050 PageKey {
1051 file_id,
1052 row_group_idx,
1053 ranges,
1054 }
1055 }
1056
1057 fn estimated_size(&self) -> usize {
1059 mem::size_of::<Self>() + mem::size_of_val(self.ranges.as_slice())
1060 }
1061}
1062
1063#[derive(Default)]
1066pub struct PageValue {
1067 pub compressed: Vec<Bytes>,
1069 pub page_size: u64,
1071}
1072
1073impl PageValue {
1074 pub fn new(bytes: Vec<Bytes>, page_size: u64) -> PageValue {
1076 PageValue {
1077 compressed: bytes,
1078 page_size,
1079 }
1080 }
1081
1082 fn estimated_size(&self) -> usize {
1084 mem::size_of::<Self>()
1085 + self.page_size as usize
1086 + self.compressed.iter().map(mem::size_of_val).sum::<usize>()
1087 }
1088}
1089
1090#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
1092pub struct SelectorResultKey {
1093 pub file_id: FileId,
1095 pub row_group_idx: usize,
1097 pub selector: TimeSeriesRowSelector,
1099}
1100
1101pub enum SelectorResult {
1103 PrimaryKey(Vec<Batch>),
1105 Flat(Vec<RecordBatch>),
1107}
1108
1109pub struct SelectorResultValue {
1111 pub result: SelectorResult,
1113 pub projection: Vec<usize>,
1115}
1116
1117impl SelectorResultValue {
1118 pub fn new(result: Vec<Batch>, projection: Vec<usize>) -> SelectorResultValue {
1120 SelectorResultValue {
1121 result: SelectorResult::PrimaryKey(result),
1122 projection,
1123 }
1124 }
1125
1126 pub fn new_flat(result: Vec<RecordBatch>, projection: Vec<usize>) -> SelectorResultValue {
1128 SelectorResultValue {
1129 result: SelectorResult::Flat(result),
1130 projection,
1131 }
1132 }
1133
1134 fn estimated_size(&self) -> usize {
1136 match &self.result {
1137 SelectorResult::PrimaryKey(batches) => {
1138 batches.iter().map(|batch| batch.memory_size()).sum()
1139 }
1140 SelectorResult::Flat(batches) => batches.iter().map(record_batch_estimated_size).sum(),
1141 }
1142 }
1143}
1144
1145type SstMetaCache = Cache<SstMetaKey, Arc<CachedSstMeta>>;
1147type VectorCache = Cache<(ConcreteDataType, Value), VectorRef>;
1151type PageCache = Cache<PageKey, Arc<PageValue>>;
1153type SelectorResultCache = Cache<SelectorResultKey, Arc<SelectorResultValue>>;
1155type RangeResultCache = Cache<RangeScanCacheKey, Arc<RangeScanCacheValue>>;
1157
1158#[cfg(test)]
1159mod tests {
1160 use std::sync::Arc;
1161
1162 use api::v1::SemanticType;
1163 use api::v1::index::{BloomFilterMeta, InvertedIndexMetas};
1164 use datatypes::schema::ColumnSchema;
1165 use datatypes::vectors::Int64Vector;
1166 use puffin::file_metadata::FileMetadata;
1167 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
1168 use store_api::storage::ColumnId;
1169
1170 use super::*;
1171 use crate::cache::index::bloom_filter_index::Tag;
1172 use crate::cache::index::result_cache::PredicateKey;
1173 use crate::cache::test_util::{
1174 parquet_meta, sst_parquet_meta, sst_parquet_meta_with_region_metadata,
1175 };
1176 use crate::read::range_cache::{
1177 RangeScanCacheKey, RangeScanCacheValue, ScanRequestFingerprintBuilder,
1178 };
1179 use crate::sst::parquet::row_selection::RowGroupSelection;
1180
1181 #[tokio::test]
1182 async fn test_disable_cache() {
1183 let cache = CacheManager::default();
1184 assert!(cache.sst_meta_cache.is_none());
1185 assert!(cache.vector_cache.is_none());
1186 assert!(cache.page_cache.is_none());
1187
1188 let region_id = RegionId::new(1, 1);
1189 let file_id = RegionFileId::new(region_id, FileId::random());
1190 let metadata = parquet_meta();
1191 let mut metrics = MetadataCacheMetrics::default();
1192 cache.put_parquet_meta_data(file_id, metadata, None);
1193 assert!(
1194 cache
1195 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1196 .await
1197 .is_none()
1198 );
1199
1200 let value = Value::Int64(10);
1201 let vector: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
1202 cache.put_repeated_vector(value.clone(), vector.clone());
1203 assert!(
1204 cache
1205 .get_repeated_vector(&ConcreteDataType::int64_datatype(), &value)
1206 .is_none()
1207 );
1208
1209 let key = PageKey::new(file_id.file_id(), 1, vec![Range { start: 0, end: 5 }]);
1210 let pages = Arc::new(PageValue::default());
1211 cache.put_pages(key.clone(), pages);
1212 assert!(cache.get_pages(&key).is_none());
1213
1214 assert!(cache.write_cache().is_none());
1215 }
1216
1217 #[tokio::test]
1218 async fn test_parquet_meta_cache() {
1219 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1220 let mut metrics = MetadataCacheMetrics::default();
1221 let region_id = RegionId::new(1, 1);
1222 let file_id = RegionFileId::new(region_id, FileId::random());
1223 assert!(
1224 cache
1225 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1226 .await
1227 .is_none()
1228 );
1229 let (metadata, region_metadata) = sst_parquet_meta();
1230 cache.put_parquet_meta_data(file_id, metadata, None);
1231 let cached = cache
1232 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1233 .await
1234 .unwrap();
1235 assert_eq!(region_metadata, cached.region_metadata());
1236 assert!(
1237 cached
1238 .parquet_metadata()
1239 .file_metadata()
1240 .key_value_metadata()
1241 .is_none_or(|key_values| {
1242 key_values
1243 .iter()
1244 .all(|key_value| key_value.key != PARQUET_METADATA_KEY)
1245 })
1246 );
1247 cache.remove_parquet_meta_data(file_id);
1248 assert!(
1249 cache
1250 .get_parquet_meta_data(file_id, &mut metrics, Default::default())
1251 .await
1252 .is_none()
1253 );
1254 }
1255
1256 #[tokio::test]
1257 async fn test_parquet_meta_cache_with_provided_region_metadata() {
1258 let cache = CacheManager::builder().sst_meta_cache_size(2000).build();
1259 let mut metrics = MetadataCacheMetrics::default();
1260 let region_id = RegionId::new(1, 1);
1261 let file_id = RegionFileId::new(region_id, FileId::random());
1262 let (metadata, region_metadata) = sst_parquet_meta();
1263
1264 cache.put_parquet_meta_data(file_id, metadata, Some(region_metadata.clone()));
1265
1266 let cached = cache
1267 .get_sst_meta_data(file_id, &mut metrics, Default::default())
1268 .await
1269 .unwrap();
1270 assert!(Arc::ptr_eq(®ion_metadata, &cached.region_metadata()));
1271 }
1272
/// The weight reported by `meta_cache_weight` must equal key size + parquet
/// metadata size + `region_metadata_weight`, and that last term must exceed
/// the length of the region metadata's JSON form.
#[test]
fn test_meta_cache_weight_accounts_for_decoded_region_metadata() {
    let region_metadata = Arc::new(wide_region_metadata(128));
    let serialized_len = region_metadata.to_json().unwrap().len();

    let parquet = sst_parquet_meta_with_region_metadata(region_metadata.clone());
    let entry = CachedSstMeta::try_new("test.parquet", Arc::unwrap_or_clone(parquet)).unwrap();
    let entry = Arc::new(entry);
    let key = SstMetaKey(region_metadata.region_id, FileId::random());

    assert!(entry.region_metadata_weight > serialized_len);

    let expected_weight = key.estimated_size()
        + parquet_meta_size(&entry.parquet_metadata)
        + entry.region_metadata_weight;
    assert_eq!(meta_cache_weight(&key, &entry) as usize, expected_weight);
}
1291
/// With the vector cache enabled, a repeated vector is missing before the
/// put and returned unchanged afterwards.
#[test]
fn test_repeated_vector_cache() {
    let manager = CacheManager::builder().vector_cache_size(4096).build();
    let int64 = ConcreteDataType::int64_datatype();
    let repeated = Value::Int64(10);

    assert!(manager.get_repeated_vector(&int64, &repeated).is_none());

    let column: VectorRef = Arc::new(Int64Vector::from_slice([10, 10, 10, 10]));
    manager.put_repeated_vector(repeated.clone(), column.clone());

    let hit = manager.get_repeated_vector(&int64, &repeated).unwrap();
    assert_eq!(column, hit);
}
1308
/// With the page cache enabled, a page entry is absent before the put and
/// present afterwards.
#[test]
fn test_page_cache() {
    let manager = CacheManager::builder().page_cache_size(1000).build();
    let page_key = PageKey::new(FileId::random(), 0, vec![0..10, 10..20]);

    assert!(manager.get_pages(&page_key).is_none());

    manager.put_pages(page_key.clone(), Arc::new(PageValue::default()));
    assert!(manager.get_pages(&page_key).is_some());
}
1319
/// With the selector result cache enabled, an entry is absent before the put
/// and present afterwards.
#[test]
fn test_selector_result_cache() {
    let manager = CacheManager::builder()
        .selector_result_cache_size(1000)
        .build();
    let key = SelectorResultKey {
        file_id: FileId::random(),
        row_group_idx: 0,
        selector: TimeSeriesRowSelector::LastRow,
    };

    assert!(manager.get_selector_result(&key).is_none());

    let empty_result = SelectorResultValue::new(Vec::new(), Vec::new());
    manager.put_selector_result(key, Arc::new(empty_result));
    assert!(manager.get_selector_result(&key).is_some());
}
1336
/// Range result cache behavior, checked directly on the manager and through
/// the `EnableAll`, `Compaction` and `Disabled` cache strategies.
#[test]
fn test_range_result_cache() {
    let manager = Arc::new(
        CacheManager::builder()
            .range_result_cache_size(1024 * 1024)
            .build(),
    );

    let scan = ScanRequestFingerprintBuilder {
        read_column_ids: vec![],
        read_column_types: vec![],
        filters: vec!["tag_0 = 1".to_string()],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: true,
        merge_mode: crate::region::options::MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();
    let key = RangeScanCacheKey {
        region_id: RegionId::new(1, 1),
        row_groups: vec![(FileId::random(), 0)],
        scan,
    };
    let cached_value = Arc::new(RangeScanCacheValue::new(Vec::new(), 0));

    // Direct access on the manager: miss, put, hit.
    assert!(manager.get_range_result(&key).is_none());
    manager.put_range_result(key.clone(), cached_value.clone());
    assert!(manager.get_range_result(&key).is_some());

    // `EnableAll` sees the entry stored on the manager.
    let enable_all = CacheStrategy::EnableAll(manager.clone());
    assert!(enable_all.get_range_result(&key).is_some());

    // `Compaction` returns nothing for the key; the manager still holds the
    // entry after a put through this strategy.
    let compaction = CacheStrategy::Compaction(manager.clone());
    assert!(compaction.get_range_result(&key).is_none());
    compaction.put_range_result(key.clone(), cached_value.clone());
    assert!(manager.get_range_result(&key).is_some());

    // `Disabled` returns nothing for the key; the manager still holds the
    // entry after a put through this strategy.
    let disabled = CacheStrategy::Disabled;
    assert!(disabled.get_range_result(&key).is_none());
    disabled.put_range_result(key.clone(), cached_value);
    assert!(manager.get_range_result(&key).is_some());
}
1380
1381 #[tokio::test]
1382 async fn test_evict_puffin_cache_clears_all_entries() {
1383 use std::collections::{BTreeMap, HashMap};
1384
1385 let cache = CacheManager::builder()
1386 .index_metadata_size(128)
1387 .index_content_size(128)
1388 .index_content_page_size(64)
1389 .index_result_cache_size(128)
1390 .puffin_metadata_size(128)
1391 .build();
1392 let cache = Arc::new(cache);
1393
1394 let region_id = RegionId::new(1, 1);
1395 let index_id = RegionIndexId::new(RegionFileId::new(region_id, FileId::random()), 0);
1396 let column_id: ColumnId = 1;
1397
1398 let bloom_cache = cache.bloom_filter_index_cache().unwrap().clone();
1399 let inverted_cache = cache.inverted_index_cache().unwrap().clone();
1400 let result_cache = cache.index_result_cache().unwrap();
1401 let puffin_metadata_cache = cache.puffin_metadata_cache().unwrap().clone();
1402
1403 let bloom_key = (
1404 index_id.file_id(),
1405 index_id.version,
1406 column_id,
1407 Tag::Skipping,
1408 );
1409 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1410 inverted_cache.put_metadata(
1411 (index_id.file_id(), index_id.version),
1412 Arc::new(InvertedIndexMetas::default()),
1413 );
1414 let predicate = PredicateKey::new_bloom(Arc::new(BTreeMap::new()));
1415 let selection = Arc::new(RowGroupSelection::default());
1416 result_cache.put(predicate.clone(), index_id.file_id(), selection);
1417 let file_id_str = index_id.to_string();
1418 let metadata = Arc::new(FileMetadata {
1419 blobs: Vec::new(),
1420 properties: HashMap::new(),
1421 });
1422 puffin_metadata_cache.put_metadata(file_id_str.clone(), metadata);
1423
1424 assert!(bloom_cache.get_metadata(bloom_key).is_some());
1425 assert!(
1426 inverted_cache
1427 .get_metadata((index_id.file_id(), index_id.version))
1428 .is_some()
1429 );
1430 assert!(result_cache.get(&predicate, index_id.file_id()).is_some());
1431 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_some());
1432
1433 cache.evict_puffin_cache(index_id).await;
1434
1435 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1436 assert!(
1437 inverted_cache
1438 .get_metadata((index_id.file_id(), index_id.version))
1439 .is_none()
1440 );
1441 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1442 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1443
1444 bloom_cache.put_metadata(bloom_key, Arc::new(BloomFilterMeta::default()));
1446 inverted_cache.put_metadata(
1447 (index_id.file_id(), index_id.version),
1448 Arc::new(InvertedIndexMetas::default()),
1449 );
1450 result_cache.put(
1451 predicate.clone(),
1452 index_id.file_id(),
1453 Arc::new(RowGroupSelection::default()),
1454 );
1455 puffin_metadata_cache.put_metadata(
1456 file_id_str.clone(),
1457 Arc::new(FileMetadata {
1458 blobs: Vec::new(),
1459 properties: HashMap::new(),
1460 }),
1461 );
1462
1463 let strategy = CacheStrategy::EnableAll(cache.clone());
1464 strategy.evict_puffin_cache(index_id).await;
1465
1466 assert!(bloom_cache.get_metadata(bloom_key).is_none());
1467 assert!(
1468 inverted_cache
1469 .get_metadata((index_id.file_id(), index_id.version))
1470 .is_none()
1471 );
1472 assert!(result_cache.get(&predicate, index_id.file_id()).is_none());
1473 assert!(puffin_metadata_cache.get_metadata(&file_id_str).is_none());
1474 }
1475
1476 fn wide_region_metadata(column_count: u32) -> RegionMetadata {
1477 let region_id = RegionId::new(1024, 7);
1478 let mut builder = RegionMetadataBuilder::new(region_id);
1479 let mut primary_key = Vec::new();
1480
1481 for column_id in 0..column_count {
1482 let semantic_type = if column_id < 32 {
1483 primary_key.push(column_id);
1484 SemanticType::Tag
1485 } else {
1486 SemanticType::Field
1487 };
1488 let mut column_schema = ColumnSchema::new(
1489 format!("wide_column_{column_id}"),
1490 ConcreteDataType::string_datatype(),
1491 true,
1492 );
1493 column_schema
1494 .mut_metadata()
1495 .insert(format!("cache_key_{column_id}"), "cache_value".repeat(4));
1496 builder.push_column_metadata(ColumnMetadata {
1497 column_schema,
1498 semantic_type,
1499 column_id,
1500 });
1501 }
1502
1503 builder.push_column_metadata(ColumnMetadata {
1504 column_schema: ColumnSchema::new(
1505 "ts",
1506 ConcreteDataType::timestamp_millisecond_datatype(),
1507 false,
1508 ),
1509 semantic_type: SemanticType::Timestamp,
1510 column_id: column_count,
1511 });
1512 builder.primary_key(primary_key);
1513
1514 builder.build().unwrap()
1515 }
1516}