use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use common_base::readable_size::ReadableSize;
use common_telemetry::{debug, error};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::metadata::ColumnMetadata;
use store_api::region_request::PathType;
use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;

/// Serializes an optional [PartitionExpr] as its JSON string representation.
fn serialize_partition_expr<S>(
    partition_expr: &Option<PartitionExpr>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    use serde::ser::Error;

    match partition_expr {
        None => serializer.serialize_none(),
        Some(expr) => {
            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
            serializer.serialize_some(&json_str)
        }
    }
}

/// Deserializes an optional [PartitionExpr] from its JSON string representation.
///
/// `null` and the empty string both deserialize to `None` for backward compatibility.
fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;

    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
    match opt_json_str {
        None => Ok(None),
        Some(json_str) => {
            if json_str.is_empty() {
                Ok(None)
            } else {
                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
            }
        }
    }
}

/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Collection of index types available in a file.
pub type IndexTypes = SmallVec<[IndexType; 4]>;

/// Identifies a SST file by its region id and file id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    region_id: RegionId,
    file_id: FileId,
}

impl RegionFileId {
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}

impl fmt::Display for RegionFileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}

/// Identifies an index file by the SST file it belongs to and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    pub file_id: RegionFileId,
    pub version: IndexVersion,
}

impl RegionIndexId {
    pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
        Self { file_id, version }
    }

    pub fn region_id(&self) -> RegionId {
        self.file_id.region_id
    }

    pub fn file_id(&self) -> FileId {
        self.file_id.file_id
    }
}

impl fmt::Display for RegionIndexId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if self.version == 0 {
            write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
        } else {
            write!(
                f,
                "{}/{}.{}",
                self.file_id.region_id, self.file_id.file_id, self.version
            )
        }
    }
}

/// Time range (min and max timestamps) of a SST file. Both ends are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

/// Checks whether two inclusive time ranges overlap with each other.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}

/// Metadata of a SST file.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that owns the file.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Timestamp range of data in the file.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Maximum uncompressed size among the row groups in the file.
    pub max_row_group_uncompressed_size: u64,
    /// Available index types of the file.
    pub available_indexes: IndexTypes,
    /// Per-column index metadata of the file.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Version of the index file.
    pub index_version: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Sequence associated with the file, if any.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression associated with the file, persisted as a JSON string.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    pub num_series: u64,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series)
            .finish()
    }
}

/// Type of index associated with a SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

/// Index metadata of a column in a SST file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Indexes that have been created for the column.
    pub created_indexes: IndexTypes,
}

impl FileMeta {
    /// Returns true if the file has any index.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if both files have indexes and this file's index version is
    /// not older than `other`'s.
    pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
        self.exists_index() && other.exists_index() && self.index_version >= other.index_version
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a full-text index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file in bytes.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }

    /// Returns true if the file's indexes cover every indexed column in the
    /// region metadata.
    pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
        let id_to_indexes = self
            .indexes
            .iter()
            .map(|index| (index.column_id, index.created_indexes.clone()))
            .collect::<std::collections::HashMap<_, _>>();
        for column in metadata {
            if !column.column_schema.is_indexed() {
                continue;
            }
            if let Some(indexes) = id_to_indexes.get(&column.column_id) {
                if column.column_schema.is_inverted_indexed()
                    && !indexes.contains(&IndexType::InvertedIndex)
                {
                    return false;
                }
                if column.column_schema.is_fulltext_indexed()
                    && !indexes.contains(&IndexType::FulltextIndex)
                {
                    return false;
                }
                if column.column_schema.is_skipping_indexed()
                    && !indexes.contains(&IndexType::BloomFilterIndex)
                {
                    return false;
                }
            } else {
                return false;
            }
        }
        true
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }

    /// Returns the [RegionIndexId] of the file's index.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.index_version)
    }
}

/// Handle to a SST file.
///
/// Cloning a handle is cheap; all clones share the same inner state, and the file
/// purger is notified when the last handle is dropped (see `FileHandleInner`).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the [RegionFileId] of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the [RegionIndexId] of the file's index.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
    }

    /// Returns the complete file path of the file under `table_dir`.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted so it can be purged when the last handle is dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    pub fn index_outdated(&self) -> bool {
        self.inner.index_outdated.load(Ordering::Relaxed)
    }

    pub fn set_index_outdated(&self, index_outdated: bool) {
        self.inner
            .index_outdated
            .store(index_outdated, Ordering::Relaxed);
    }

    /// Returns a reference to the file metadata.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    /// Returns the size of the data file in bytes.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}

/// Inner state of a [FileHandle], shared by all of its clones.
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    index_outdated: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Notifies the purger when the last handle is dropped, passing along whether
        // the file was marked deleted and whether its index is outdated.
        self.file_purger.remove_file(
            self.meta.clone(),
            self.deleted.load(Ordering::Acquire),
            self.index_outdated.load(Ordering::Acquire),
        );
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            index_outdated: AtomicBool::new(false),
            file_purger,
        }
    }
}

/// Deletes SST files from storage and purges related cache entries.
///
/// `file_ids` contains pairs of file id and index version. When `delete_index` is
/// true, cached index (Puffin) entries are removed as well. Failures to delete
/// individual files are logged and do not abort the remaining deletions.
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[(FileId, u64)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    // Removes cached parquet metadata for the files before deleting them.
    if let Some(cache) = &cache_manager {
        for (file_id, _) in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for (file_id, index_version) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer
            .delete_sst(
                &region_file_id,
                &RegionIndexId::new(region_file_id, *index_version),
            )
            .await
        {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    debug!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    for (file_id, index_version) in file_ids {
        purge_index_cache_stager(
            region_id,
            delete_index,
            access_layer,
            cache_manager,
            *file_id,
            *index_version,
        )
        .await;
    }
    Ok(())
}

/// Deletes an index file from storage and purges related cache and stager entries.
pub async fn delete_index(
    region_index_id: RegionIndexId,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    access_layer.delete_index(region_index_id).await?;

    purge_index_cache_stager(
        region_index_id.region_id(),
        true,
        access_layer,
        cache_manager,
        region_index_id.file_id(),
        region_index_id.version,
    )
    .await;

    Ok(())
}

/// Removes write cache entries for a file and purges its puffin stager.
///
/// The cached index (Puffin) entry is only removed when `delete_index` is true.
async fn purge_index_cache_stager(
    region_id: RegionId,
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
    file_id: FileId,
    index_version: u64,
) {
    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
        // Removes the cached index (Puffin) file only when the index is being deleted.
        if delete_index {
            write_cache
                .remove(IndexKey::new(
                    region_id,
                    file_id,
                    FileType::Puffin(index_version),
                ))
                .await;
        }

        // Removes the cached parquet file entry as well.
        write_cache
            .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
            .await;
    }

    if let Err(e) = access_layer
        .puffin_manager_factory()
        .purge_stager(RegionIndexId::new(
            RegionFileId::new(region_id, file_id),
            index_version,
        ))
        .await
    {
        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
            file_id, index_version, region_id);
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
    };
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

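    // A minimal sanity check, not from the original source, for `overlaps`,
    // assuming `Timestamp::new_millisecond` for constructing millisecond timestamps.
    #[test]
    fn test_overlaps() {
        let range = |start, end| {
            (
                Timestamp::new_millisecond(start),
                Timestamp::new_millisecond(end),
            )
        };
        // Both ends are inclusive, so ranges that only touch still overlap.
        assert!(overlaps(&range(0, 10), &range(10, 20)));
        assert!(overlaps(&range(5, 15), &range(0, 10)));
        // Disjoint ranges do not overlap, regardless of argument order.
        assert!(!overlaps(&range(0, 10), &range(11, 20)));
        assert!(!overlaps(&range(11, 20), &range(0, 10)));
    }
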
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
            \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
            \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

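    // An illustrative sketch, not from the original source, of the `Display` impls:
    // a `RegionIndexId` with version 0 renders exactly like its `RegionFileId`,
    // while a non-zero version is appended after a dot.
    #[test]
    fn test_region_index_id_display() {
        let region_file_id = RegionFileId::new(RegionId::new(1, 2), FileId::random());

        let v0 = RegionIndexId::new(region_file_id, 0);
        assert_eq!(v0.to_string(), region_file_id.to_string());

        let v3 = RegionIndexId::new(region_file_id, 3);
        assert_eq!(v3.to_string(), format!("{}.{}", region_file_id, 3));
    }
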
    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
            num_series: 0,
        };

        // The meta round-trips through JSON.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The partition expression is serialized as a JSON string.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // `None` round-trips as well.
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // A partition expression stored as a JSON string.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // An empty partition expression string deserializes to `None`.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // A `null` partition expression deserializes to `None`.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // A missing partition expression field also deserializes to `None`.
        let json_without_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_missing: FileMeta = serde_json::from_str(json_without_expr).unwrap();
        assert!(file_meta_missing.partition_expr.is_none());
    }

    #[test]
    fn test_file_meta_indexes_backward_compatibility() {
        // Older manifests do not carry the `indexes` field.
        let json_old_file_meta = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "available_indexes": ["InvertedIndex"],
            "level": 0,
            "file_size": 0,
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0
        }"#;

        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();

        // The missing `indexes` field defaults to an empty vector.
        assert_eq!(deserialized_file_meta.indexes, vec![]);

        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);

        assert_eq!(
            deserialized_file_meta.file_id,
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
        );
    }
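
    // An illustrative sketch, not from the original source, of `is_index_up_to_date`:
    // both files must have an index, and `self` must not have an older index version.
    #[test]
    fn test_is_index_up_to_date() {
        let file_id = FileId::random();
        let older = create_file_meta(file_id, 0);
        let newer = FileMeta {
            index_version: 1,
            ..create_file_meta(file_id, 0)
        };

        assert!(newer.is_index_up_to_date(&older));
        assert!(!older.is_index_up_to_date(&newer));

        // A file without any available index is never considered up to date.
        let no_index = FileMeta {
            available_indexes: IndexTypes::default(),
            ..create_file_meta(file_id, 0)
        };
        assert!(!no_index.is_index_up_to_date(&older));
        assert!(!older.is_index_up_to_date(&no_index));
    }
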
    #[test]
    fn test_is_index_consistent_with_region() {
        fn new_column_meta(
            id: ColumnId,
            name: &str,
            inverted: bool,
            fulltext: bool,
            skipping: bool,
        ) -> ColumnMetadata {
            let mut column_schema =
                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
            if inverted {
                column_schema = column_schema.with_inverted_index(true);
            }
            if fulltext {
                column_schema = column_schema
                    .with_fulltext_options(FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::English,
                        false,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ))
                    .unwrap();
            }
            if skipping {
                column_schema = column_schema
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        1024,
                        0.01,
                        datatypes::schema::SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();
            }

            ColumnMetadata {
                column_schema,
                semantic_type: api::v1::SemanticType::Tag,
                column_id: id,
            }
        }

        // The file's index covers the only indexed column.
        let mut file_meta = FileMeta {
            indexes: vec![ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            ..Default::default()
        };
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // Extra indexes in the file are still consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([
                IndexType::InvertedIndex,
                IndexType::BloomFilterIndex,
            ]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The region requires a fulltext index that the file lacks.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // The file indexes a different column than the region requires.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 2,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // A column without any index requirement is skipped, so the file is consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The file has no index metadata at all.
        file_meta.indexes = vec![];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // The second column requires a skipping (bloom filter) index that the file lacks.
        file_meta.indexes = vec![
            ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            },
            ColumnIndexMetadata {
                column_id: 2,
                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
            },
        ];
        let region_meta = vec![
            new_column_meta(1, "tag1", true, false, false),
            new_column_meta(2, "tag2", false, true, true),
        ];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
    }
}