1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error};
25use common_time::Timestamp;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use store_api::metadata::ColumnMetadata;
30use store_api::region_request::PathType;
31use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
32
33use crate::access_layer::AccessLayerRef;
34use crate::cache::CacheManagerRef;
35use crate::cache::file_cache::{FileType, IndexKey};
36use crate::sst::file_purger::FilePurgerRef;
37use crate::sst::location;
38
39fn serialize_partition_expr<S>(
41 partition_expr: &Option<PartitionExpr>,
42 serializer: S,
43) -> Result<S::Ok, S::Error>
44where
45 S: serde::Serializer,
46{
47 use serde::ser::Error;
48
49 match partition_expr {
50 None => serializer.serialize_none(),
51 Some(expr) => {
52 let json_str = expr.as_json_str().map_err(S::Error::custom)?;
53 serializer.serialize_some(&json_str)
54 }
55 }
56}
57
58fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
59where
60 D: serde::Deserializer<'de>,
61{
62 use serde::de::Error;
63
64 let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
65 match opt_json_str {
66 None => Ok(None),
67 Some(json_str) => {
68 if json_str.is_empty() {
69 Ok(None)
71 } else {
72 PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
74 }
75 }
76 }
77}
78
/// Compaction level of an SST file.
pub type Level = u8;
/// Maximum SST level; valid levels are assumed to fall in `0..MAX_LEVEL` —
/// confirm against the compaction code.
pub const MAX_LEVEL: Level = 2;
/// Inline small vector of index types; avoids heap allocation for the common
/// case of a file carrying at most a few indexes.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
85
/// Globally unique id of an SST file: the owning region plus the file id.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// Region that owns the file.
    region_id: RegionId,
    /// Id of the file within the region.
    file_id: FileId,
}
96
impl RegionFileId {
    /// Creates an id from a region id and a file id.
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    /// Returns the region that owns the file.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Returns the id of the file within the region.
    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}
113
114impl fmt::Display for RegionFileId {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 write!(f, "{}/{}", self.region_id, self.file_id)
117 }
118}
119
/// Id of an index file: the indexed SST file plus the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    /// Id of the SST file the index belongs to.
    pub file_id: RegionFileId,
    /// Version of the index file.
    pub version: IndexVersion,
}
126
127impl RegionIndexId {
128 pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
129 Self { file_id, version }
130 }
131
132 pub fn region_id(&self) -> RegionId {
133 self.file_id.region_id
134 }
135
136 pub fn file_id(&self) -> FileId {
137 self.file_id.file_id
138 }
139}
140
141impl fmt::Display for RegionIndexId {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 if self.version == 0 {
144 write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
145 } else {
146 write!(
147 f,
148 "{}/{}.{}",
149 self.file_id.region_id, self.file_id.file_id, self.version
150 )
151 }
152 }
153}
154
/// Time range of an SST file as `(start, end)` timestamps; `overlaps` treats
/// both endpoints as inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
158
159pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
161 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
162 let (_, l_end) = l;
163 let (r_start, _) = r;
164
165 r_start <= l_end
166}
167
/// Metadata of an SST file persisted in the region manifest.
///
/// `#[serde(default)]` lets older serialized payloads that lack newer fields
/// still deserialize with zero/empty defaults.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that owns the file.
    pub region_id: RegionId,
    /// Id of the file within the region.
    pub file_id: FileId,
    /// Time range of data in the file as `(start, end)` timestamps.
    pub time_range: FileTimeRange,
    /// Compaction level of the file.
    pub level: Level,
    /// Size of the SST file in bytes.
    pub file_size: u64,
    /// Largest uncompressed row group size in the file, in bytes.
    pub max_row_group_uncompressed_size: u64,
    /// Index types available for this file.
    pub available_indexes: IndexTypes,
    /// Per-column record of which index types were created.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Version of the index file.
    pub index_version: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    // NOTE(review): exact semantics of `sequence` (e.g. max sequence of rows
    // in the file) are defined by the writer — confirm before relying on it.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression of the region when the file was written; stored
    /// as a JSON string via the custom (de)serializers above.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    pub num_series: u64,
}
237
impl Debug for FileMeta {
    /// Compact debug formatting: sizes are human-readable via [`ReadableSize`],
    /// timestamps are ISO 8601, and index fields are printed only when the
    /// file actually has indexes.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        // Skip index fields entirely for files without indexes to keep the
        // output short.
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series)
            .finish()
    }
}
280
/// Type of index created for an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
    /// Vector index; only available with the `vector_index` feature.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}
294
/// Indexes created for one column of an SST file.
///
/// `#[serde(default)]` keeps deserialization compatible with older payloads
/// missing these fields.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// Id of the indexed column.
    pub column_id: ColumnId,
    /// Index types created for the column.
    pub created_indexes: IndexTypes,
}
312
313impl FileMeta {
314 pub fn exists_index(&self) -> bool {
315 !self.available_indexes.is_empty()
316 }
317
318 pub fn index_version(&self) -> Option<IndexVersion> {
319 if self.exists_index() {
320 Some(self.index_version)
321 } else {
322 None
323 }
324 }
325
326 pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
328 self.exists_index() && other.exists_index() && self.index_version >= other.index_version
329 }
330
331 pub fn inverted_index_available(&self) -> bool {
333 self.available_indexes.contains(&IndexType::InvertedIndex)
334 }
335
336 pub fn fulltext_index_available(&self) -> bool {
338 self.available_indexes.contains(&IndexType::FulltextIndex)
339 }
340
341 pub fn bloom_filter_index_available(&self) -> bool {
343 self.available_indexes
344 .contains(&IndexType::BloomFilterIndex)
345 }
346
347 #[cfg(feature = "vector_index")]
349 pub fn vector_index_available(&self) -> bool {
350 self.available_indexes.contains(&IndexType::VectorIndex)
351 }
352
353 pub fn index_file_size(&self) -> u64 {
354 self.index_file_size
355 }
356
357 pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
359 let id_to_indexes = self
360 .indexes
361 .iter()
362 .map(|index| (index.column_id, index.created_indexes.clone()))
363 .collect::<std::collections::HashMap<_, _>>();
364 for column in metadata {
365 if !column.column_schema.is_indexed() {
366 continue;
367 }
368 if let Some(indexes) = id_to_indexes.get(&column.column_id) {
369 if column.column_schema.is_inverted_indexed()
370 && !indexes.contains(&IndexType::InvertedIndex)
371 {
372 return false;
373 }
374 if column.column_schema.is_fulltext_indexed()
375 && !indexes.contains(&IndexType::FulltextIndex)
376 {
377 return false;
378 }
379 if column.column_schema.is_skipping_indexed()
380 && !indexes.contains(&IndexType::BloomFilterIndex)
381 {
382 return false;
383 }
384 } else {
385 return false;
386 }
387 }
388 true
389 }
390
391 pub fn file_id(&self) -> RegionFileId {
393 RegionFileId::new(self.region_id, self.file_id)
394 }
395
396 pub fn index_id(&self) -> RegionIndexId {
398 RegionIndexId::new(self.file_id(), self.index_version)
399 }
400}
401
/// Cheaply cloneable handle to an SST file.
///
/// When the last clone is dropped, the inner state notifies the file purger
/// (see [`FileHandleInner`]'s `Drop` impl).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
407
408impl fmt::Debug for FileHandle {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("FileHandle")
411 .field("meta", self.meta_ref())
412 .field("compacting", &self.compacting())
413 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
414 .finish()
415 }
416}
417
impl FileHandle {
    /// Creates a handle, registering the file with `file_purger`.
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region that owns the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the id of the file.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the id of the file's index.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
    }

    /// Returns the storage path of the SST file under `table_dir`.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted; the purger acts on this flag when the last
    /// handle is dropped.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns `true` if the file is being compacted.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets whether the file is being compacted.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns `true` if the file's index is marked outdated.
    pub fn index_outdated(&self) -> bool {
        self.inner.index_outdated.load(Ordering::Relaxed)
    }

    /// Sets whether the file's index is outdated.
    pub fn set_index_outdated(&self, index_outdated: bool) {
        self.inner
            .index_outdated
            .store(index_outdated, Ordering::Relaxed);
    }

    /// Returns a reference to the file's metadata.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Returns the purger tracking this file.
    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    /// Returns the size of the SST file in bytes.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    /// Returns the compaction level of the file.
    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    /// Returns `true` if the file has been marked deleted.
    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}
502
/// Shared state behind [`FileHandle`]; dropped when the last handle goes away.
struct FileHandleInner {
    /// Metadata of the file.
    meta: FileMeta,
    /// Whether the file is currently being compacted.
    compacting: AtomicBool,
    /// Whether the file has been marked for deletion.
    deleted: AtomicBool,
    /// Whether the file's index is outdated.
    index_outdated: AtomicBool,
    /// Purger notified on creation and on drop.
    file_purger: FilePurgerRef,
}
513
impl Drop for FileHandleInner {
    /// Notifies the purger when the last handle is dropped, passing the
    /// `deleted` and `index_outdated` flags so it can decide whether to
    /// remove the underlying files.
    fn drop(&mut self) {
        // NOTE(review): the flags are stored with `Relaxed` elsewhere but
        // loaded with `Acquire` here; `Arc`'s drop already provides the
        // needed synchronization — confirm the ordering choice is intentional.
        self.file_purger.remove_file(
            self.meta.clone(),
            self.deleted.load(Ordering::Acquire),
            self.index_outdated.load(Ordering::Acquire),
        );
    }
}
523
impl FileHandleInner {
    /// Creates the inner state, registering the file with the purger so it is
    /// tracked until the last handle drops. All flags start `false`.
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            index_outdated: AtomicBool::new(false),
            file_purger,
        }
    }
}
537
/// Deletes the SST files (and their index files) identified by `file_ids`
/// (pairs of file id and index version) for `region_id`.
///
/// Cached parquet metadata is evicted first, then each SST/index pair is
/// deleted via `access_layer`. Deletion errors are logged and skipped so the
/// remaining files are still processed. Finally, write-cache entries and the
/// puffin stager are purged for every file id — including files whose
/// deletion failed.
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[(FileId, u64)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    // Evict cached parquet metadata before touching files on disk.
    if let Some(cache) = &cache_manager {
        for (file_id, _) in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for (file_id, index_version) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer
            .delete_sst(
                &region_file_id,
                &RegionIndexId::new(region_file_id, *index_version),
            )
            .await
        {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                // Best-effort: log and continue with the remaining files.
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    debug!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    // Purge caches and stager state for all requested files.
    for (file_id, index_version) in file_ids {
        purge_index_cache_stager(
            region_id,
            delete_index,
            access_layer,
            cache_manager,
            *file_id,
            *index_version,
        )
        .await;
    }
    Ok(())
}
597
/// Deletes the index file identified by `region_index_id` and purges the
/// related write-cache and stager entries.
///
/// Unlike [`delete_files`], the deletion error is propagated to the caller.
pub async fn delete_index(
    region_index_id: RegionIndexId,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    access_layer.delete_index(region_index_id).await?;

    // `delete_index` flag is `true`: the puffin cache entry must go too.
    purge_index_cache_stager(
        region_index_id.region_id(),
        true,
        access_layer,
        cache_manager,
        region_index_id.file_id(),
        region_index_id.version,
    )
    .await;

    Ok(())
}
617
/// Purges write-cache entries and puffin stager state for one file.
///
/// When `delete_index` is `true`, the puffin (index) entry is evicted from the
/// write cache in addition to the parquet entry. Stager purge failures are
/// logged but not propagated.
async fn purge_index_cache_stager(
    region_id: RegionId,
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
    file_id: FileId,
    index_version: u64,
) {
    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
        if delete_index {
            write_cache
                .remove(IndexKey::new(
                    region_id,
                    file_id,
                    FileType::Puffin(index_version),
                ))
                .await;
        }

        // Always drop the parquet entry for this file.
        write_cache
            .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
            .await;
    }

    // Best-effort stager cleanup; failure is logged, not returned.
    if let Err(e) = access_layer
        .puffin_manager_factory()
        .purge_stager(RegionIndexId::new(
            RegionFileId::new(region_id, file_id),
            index_version,
        ))
        .await
    {
        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
            file_id, index_version, region_id);
    }
}
657
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
    };
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    // Builds a FileMeta with an inverted index on column 0; every other field
    // is zero/empty except the given file id and level.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    // FileMeta round-trips through serde_json unchanged.
    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    // A hand-written JSON payload deserializes; missing fields take defaults.
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    // partition_expr serializes as a JSON string and round-trips; None stays
    // None.
    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
            num_series: 0,
        };

        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The expression is stored as a nested JSON string, not an object.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    // Legacy payload forms for partition_expr: JSON string, empty string,
    // null, and absent all deserialize without error.
    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Empty string means "no expression".
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Explicit null also means "no expression".
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Absent field falls back to the default (None).
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());
    }

    // Payloads written before the `indexes` field existed deserialize with an
    // empty `indexes` vector.
    #[test]
    fn test_file_meta_indexes_backward_compatibility() {
        let json_old_file_meta = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "available_indexes": ["InvertedIndex"],
            "level": 0,
            "file_size": 0,
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0
        }"#;

        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();

        assert_eq!(deserialized_file_meta.indexes, vec![]);

        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);

        assert_eq!(
            deserialized_file_meta.file_id,
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
        );
    }
    // Exercises is_index_consistent_with_region over matching, superset,
    // missing-type, wrong-column, unindexed, empty, and multi-column cases.
    #[test]
    fn test_is_index_consistent_with_region() {
        fn new_column_meta(
            id: ColumnId,
            name: &str,
            inverted: bool,
            fulltext: bool,
            skipping: bool,
        ) -> ColumnMetadata {
            let mut column_schema =
                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
            if inverted {
                column_schema = column_schema.with_inverted_index(true);
            }
            if fulltext {
                column_schema = column_schema
                    .with_fulltext_options(FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::English,
                        false,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ))
                    .unwrap();
            }
            if skipping {
                column_schema = column_schema
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        1024,
                        0.01,
                        datatypes::schema::SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();
            }

            ColumnMetadata {
                column_schema,
                semantic_type: api::v1::SemanticType::Tag,
                column_id: id,
            }
        }

        // Exact match: consistent.
        let mut file_meta = FileMeta {
            indexes: vec![ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            ..Default::default()
        };
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // File has extra index types: still consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([
                IndexType::InvertedIndex,
                IndexType::BloomFilterIndex,
            ]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // Region requires fulltext that the file lacks: inconsistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Index recorded for a different column: inconsistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 2,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Column not indexed in the region: file's extra index is ignored.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // File has no index records at all: inconsistent.
        file_meta.indexes = vec![];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Multi-column: column 2 is missing its skipping index.
        file_meta.indexes = vec![
            ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            },
            ColumnIndexMetadata {
                column_id: 2,
                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
            },
        ];
        let region_meta = vec![
            new_column_meta(1, "tag1", true, false, false),
            new_column_meta(2, "tag2", false, true, true),
        ];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
    }
}