use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;

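/// Serializes an optional [PartitionExpr] into its JSON string representation.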
fn serialize_partition_expr<S>(
    partition_expr: &Option<PartitionExpr>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    use serde::ser::Error;

    match partition_expr {
        None => serializer.serialize_none(),
        Some(expr) => {
            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
            serializer.serialize_some(&json_str)
        }
    }
}

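/// Deserializes an optional [PartitionExpr] from its JSON string representation.
/// Both `null` and an empty string deserialize to `None`.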
fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;

    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
    match opt_json_str {
        None => Ok(None),
        Some(json_str) => {
            if json_str.is_empty() {
                Ok(None)
            } else {
                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
            }
        }
    }
}

/// SST file level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Collection of index types, stored inline for the common small case.
pub type IndexTypes = SmallVec<[IndexType; 4]>;

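/// A [FileId] together with the [RegionId] of the region that owns the file.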
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    region_id: RegionId,
    file_id: FileId,
}

impl RegionFileId {
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}

impl fmt::Display for RegionFileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}

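/// Identifies an index file by the SST's [RegionFileId] and the index version.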
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    pub file_id: RegionFileId,
    pub version: IndexVersion,
}

impl RegionIndexId {
    pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
        Self { file_id, version }
    }

    pub fn region_id(&self) -> RegionId {
        self.file_id.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id.file_id
    }
}

impl fmt::Display for RegionIndexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.version == 0 {
            write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
        } else {
            write!(
                f,
                "{}/{}.{}",
                self.file_id.region_id, self.file_id.file_id, self.version
            )
        }
    }
}

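/// Time range of an SST file, as a `(start, end)` timestamp pair.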
pub type FileTimeRange = (Timestamp, Timestamp);

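/// Checks whether two inclusive time ranges overlap with each other.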
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    // Order the ranges so `l` starts first; the ranges overlap iff the later
    // range starts before (or exactly when) the earlier one ends.
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}

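/// Metadata of an SST file.
///
/// All fields have serde defaults so metadata written by older versions
/// (with fewer fields) can still be deserialized.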
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that the file belongs to.
    pub region_id: RegionId,
    /// Id of the SST file.
    pub file_id: FileId,
    /// Timestamp range of the file.
    pub time_range: FileTimeRange,
    /// Level of the file.
    pub level: Level,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Largest uncompressed row group size in the file, in bytes.
    pub max_row_group_uncompressed_size: u64,
    /// Index types available for the file.
    pub available_indexes: IndexTypes,
    /// Index metadata created for each column.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Version of the index file.
    pub index_version: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Sequence number associated with the file, if any.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression of the file, serialized as a JSON string.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    pub num_series: u64,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series)
            .finish()
    }
}

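/// Type of index that can be built for an SST file.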
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
    /// Vector index.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}

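/// Index metadata of a single column in an SST file.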
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Index types created for the column.
    pub created_indexes: IndexTypes,
}

impl FileMeta {
    /// Returns true if the file has any index built.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns the index version, or `None` if the file has no index.
    pub fn index_version(&self) -> Option<IndexVersion> {
        if self.exists_index() {
            Some(self.index_version)
        } else {
            None
        }
    }

    /// Returns true if both files have indexes and this file's index version is
    /// not older than `other`'s.
    pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
        self.exists_index() && other.exists_index() && self.index_version >= other.index_version
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a full-text index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file in bytes.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }

    /// Returns true if every indexed column in `metadata` has the corresponding
    /// index types created in this file.
    pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
        let id_to_indexes = self
            .indexes
            .iter()
            .map(|index| (index.column_id, index.created_indexes.clone()))
            .collect::<std::collections::HashMap<_, _>>();
        for column in metadata {
            if !column.column_schema.is_indexed() {
                continue;
            }
            if let Some(indexes) = id_to_indexes.get(&column.column_id) {
                if column.column_schema.is_inverted_indexed()
                    && !indexes.contains(&IndexType::InvertedIndex)
                {
                    return false;
                }
                if column.column_schema.is_fulltext_indexed()
                    && !indexes.contains(&IndexType::FulltextIndex)
                {
                    return false;
                }
                if column.column_schema.is_skipping_indexed()
                    && !indexes.contains(&IndexType::BloomFilterIndex)
                {
                    return false;
                }
            } else {
                return false;
            }
        }
        true
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }

    /// Returns the [RegionIndexId] of the file's index.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.index_version)
    }
}

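/// Cheaply cloneable handle to an SST file.
///
/// When the last handle is dropped, [FileHandleInner]'s `Drop` impl notifies the
/// file purger so it can remove the file if it was marked deleted.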
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the [RegionIndexId] of the file's index.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
    }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted so it can be removed when the last handle is dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    pub fn index_outdated(&self) -> bool {
        self.inner.index_outdated.load(Ordering::Relaxed)
    }

    pub fn set_index_outdated(&self, index_outdated: bool) {
        self.inner
            .index_outdated
            .store(index_outdated, Ordering::Relaxed);
    }

    /// Returns a reference to the file metadata.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}

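/// Inner state shared by clones of a [FileHandle].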
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    index_outdated: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Notifies the purger that the last handle is gone, passing the deleted
        // and index outdated flags.
        self.file_purger.remove_file(
            self.meta.clone(),
            self.deleted.load(Ordering::Acquire),
            self.index_outdated.load(Ordering::Acquire),
        );
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            index_outdated: AtomicBool::new(false),
            file_purger,
        }
    }
}

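/// Deletes SST files identified by `(file id, index version)` pairs from the
/// access layer, then purges related cache and stager entries. `delete_index`
/// controls whether cached index (Puffin) entries are removed as well.
/// Failures on individual files are logged and the remaining files are still processed.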
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[(FileId, u64)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    if let Some(cache) = &cache_manager {
        for (file_id, _) in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for (file_id, index_version) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer
            .delete_sst(
                &region_file_id,
                &RegionIndexId::new(region_file_id, *index_version),
            )
            .await
        {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    debug!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    for (file_id, index_version) in file_ids {
        purge_index_cache_stager(
            region_id,
            delete_index,
            access_layer,
            cache_manager,
            *file_id,
            *index_version,
        )
        .await;
    }
    Ok(())
}

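/// Deletes the index file identified by `region_index_id` from the access layer
/// and purges related cache and stager entries.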
pub async fn delete_index(
    region_index_id: RegionIndexId,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    access_layer.delete_index(region_index_id).await?;

    purge_index_cache_stager(
        region_index_id.region_id(),
        true,
        access_layer,
        cache_manager,
        region_index_id.file_id(),
        region_index_id.version,
    )
    .await;

    Ok(())
}

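/// Removes the file's entries from the write cache (the Puffin index entry only
/// when `delete_index` is true, the Parquet entry unconditionally) and purges the
/// puffin stager for the index.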
async fn purge_index_cache_stager(
    region_id: RegionId,
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
    file_id: FileId,
    index_version: u64,
) {
    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
        if delete_index {
            write_cache
                .remove(IndexKey::new(
                    region_id,
                    file_id,
                    FileType::Puffin(index_version),
                ))
                .await;
        }

        write_cache
            .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
            .await;
    }

    if let Err(e) = access_layer
        .puffin_manager_factory()
        .purge_stager(RegionIndexId::new(
            RegionFileId::new(region_id, file_id),
            index_version,
        ))
        .await
    {
        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
            file_id, index_version, region_id);
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
    };
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
            \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
            \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
            num_series: 0,
        };

        // The metadata round-trips through JSON.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The partition expression is stored as a JSON string field.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // A `None` partition expression also round-trips.
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Old-format metadata with a serialized partition expression string.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // An empty partition expression string deserializes to `None`.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // A null partition expression deserializes to `None`.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // A missing partition expression field also deserializes to `None`.
        let json_with_missing_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_missing: FileMeta = serde_json::from_str(json_with_missing_expr).unwrap();
        assert!(file_meta_missing.partition_expr.is_none());
    }

    #[test]
    fn test_file_meta_indexes_backward_compatibility() {
        // Old `FileMeta` JSON without the `indexes` field.
        let json_old_file_meta = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "available_indexes": ["InvertedIndex"],
            "level": 0,
            "file_size": 0,
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0
        }"#;

        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();

        // The missing `indexes` field defaults to an empty vector.
        assert_eq!(deserialized_file_meta.indexes, vec![]);

        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);

        assert_eq!(
            deserialized_file_meta.file_id,
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
        );
    }

    #[test]
    fn test_is_index_consistent_with_region() {
        fn new_column_meta(
            id: ColumnId,
            name: &str,
            inverted: bool,
            fulltext: bool,
            skipping: bool,
        ) -> ColumnMetadata {
            let mut column_schema =
                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
            if inverted {
                column_schema = column_schema.with_inverted_index(true);
            }
            if fulltext {
                column_schema = column_schema
                    .with_fulltext_options(FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::English,
                        false,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ))
                    .unwrap();
            }
            if skipping {
                column_schema = column_schema
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        1024,
                        0.01,
                        datatypes::schema::SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();
            }

            ColumnMetadata {
                column_schema,
                semantic_type: api::v1::SemanticType::Tag,
                column_id: id,
            }
        }

        // The indexed column has a matching created index.
        let mut file_meta = FileMeta {
            indexes: vec![ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            ..Default::default()
        };
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // Extra created indexes in the file are still consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([
                IndexType::InvertedIndex,
                IndexType::BloomFilterIndex,
            ]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The region requires a fulltext index the file doesn't have.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // The indexed column has no index metadata in the file.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 2,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Unindexed columns are ignored by the check.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The file has no index metadata at all.
        file_meta.indexes = vec![];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // One of multiple indexed columns is missing a required index.
        file_meta.indexes = vec![
            ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            },
            ColumnIndexMetadata {
                column_id: 2,
                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
            },
        ];
        let region_meta = vec![
            new_column_meta(1, "tag1", true, false, false),
            new_column_meta(2, "tag2", false, true, true),
        ];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
    }
}