1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error};
25use common_time::Timestamp;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use store_api::metadata::ColumnMetadata;
30use store_api::region_request::PathType;
31use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
32
33use crate::access_layer::AccessLayerRef;
34use crate::cache::CacheManagerRef;
35use crate::cache::file_cache::{FileType, IndexKey};
36use crate::sst::file_purger::FilePurgerRef;
37use crate::sst::location;
38
39fn serialize_partition_expr<S>(
41 partition_expr: &Option<PartitionExpr>,
42 serializer: S,
43) -> Result<S::Ok, S::Error>
44where
45 S: serde::Serializer,
46{
47 use serde::ser::Error;
48
49 match partition_expr {
50 None => serializer.serialize_none(),
51 Some(expr) => {
52 let json_str = expr.as_json_str().map_err(S::Error::custom)?;
53 serializer.serialize_some(&json_str)
54 }
55 }
56}
57
58fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
59where
60 D: serde::Deserializer<'de>,
61{
62 use serde::de::Error;
63
64 let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
65 match opt_json_str {
66 None => Ok(None),
67 Some(json_str) => {
68 if json_str.is_empty() {
69 Ok(None)
71 } else {
72 PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
74 }
75 }
76 }
77}
78
/// Level of an SST file, as stored in [`FileMeta::level`].
pub type Level = u8;
/// Upper bound for [`Level`].
// NOTE(review): whether this bound is inclusive is not visible in this file — confirm at use sites.
pub const MAX_LEVEL: Level = 2;
/// Small vector of [`IndexType`] that keeps up to 4 entries inline.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
85
/// Identifier of a file qualified by the region it belongs to.
///
/// Its `Display` form is `{region_id}/{file_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// Region component of the identifier.
    region_id: RegionId,
    /// File component of the identifier.
    file_id: FileId,
}
96
97impl RegionFileId {
98 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
100 Self { region_id, file_id }
101 }
102
103 pub fn region_id(&self) -> RegionId {
105 self.region_id
106 }
107
108 pub fn file_id(&self) -> FileId {
110 self.file_id
111 }
112}
113
114impl fmt::Display for RegionFileId {
115 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116 write!(f, "{}/{}", self.region_id, self.file_id)
117 }
118}
119
/// Identifier of one version of the index belonging to a region-qualified file.
///
/// Its `Display` form is `{region_id}/{file_id}` for version `0` and
/// `{region_id}/{file_id}.{version}` otherwise.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    /// Region-qualified id of the file the index belongs to.
    pub file_id: RegionFileId,
    /// Version of the index.
    pub version: IndexVersion,
}
126
127impl RegionIndexId {
128 pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
129 Self { file_id, version }
130 }
131
132 pub fn region_id(&self) -> RegionId {
133 self.file_id.region_id
134 }
135
136 pub fn file_id(&self) -> FileId {
137 self.file_id.file_id
138 }
139}
140
141impl fmt::Display for RegionIndexId {
142 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143 if self.version == 0 {
144 write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
145 } else {
146 write!(
147 f,
148 "{}/{}.{}",
149 self.file_id.region_id, self.file_id.file_id, self.version
150 )
151 }
152 }
153}
154
/// Time range of a file as a `(start, end)` pair of timestamps.
pub type FileTimeRange = (Timestamp, Timestamp);
158
159pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
161 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
162 let (_, l_end) = l;
163 let (r_start, _) = r;
164
165 r_start <= l_end
166}
167
/// Metadata of an SST file.
///
/// `#[serde(default)]` makes every field that is absent from the serialized
/// form fall back to its `Default` value when deserializing.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Time range of the file as `(start, end)`.
    pub time_range: FileTimeRange,
    /// Level of the file.
    pub level: Level,
    /// Size of the file (rendered through `ReadableSize` in the `Debug` output).
    pub file_size: u64,
    /// Maximum uncompressed size of a row group (rendered through `ReadableSize`
    /// in the `Debug` output).
    pub max_row_group_uncompressed_size: u64,
    /// Index types available for this file; an empty list means the file has no index
    /// (see `exists_index`).
    pub available_indexes: IndexTypes,
    /// Index types created per column; consulted by `is_index_consistent_with_region`.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file (rendered through `ReadableSize` in the `Debug` output).
    pub index_file_size: u64,
    /// Version of the index; combined with the file id to form a [`RegionIndexId`].
    pub index_version: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Optional non-zero sequence number.
    // NOTE(review): the exact meaning of this sequence is not visible in this file — confirm.
    pub sequence: Option<NonZeroU64>,
    /// Optional partition expression.
    ///
    /// Serialized as a JSON string produced by `PartitionExpr::as_json_str`; a `null`
    /// or empty string deserializes to `None`.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    pub num_series: u64,
}
237
238impl Debug for FileMeta {
239 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
240 let mut debug_struct = f.debug_struct("FileMeta");
241 debug_struct
242 .field("region_id", &self.region_id)
243 .field_with("file_id", |f| write!(f, "{} ", self.file_id))
244 .field_with("time_range", |f| {
245 write!(
246 f,
247 "({}, {}) ",
248 self.time_range.0.to_iso8601_string(),
249 self.time_range.1.to_iso8601_string()
250 )
251 })
252 .field("level", &self.level)
253 .field("file_size", &ReadableSize(self.file_size))
254 .field(
255 "max_row_group_uncompressed_size",
256 &ReadableSize(self.max_row_group_uncompressed_size),
257 );
258 if !self.available_indexes.is_empty() {
259 debug_struct
260 .field("available_indexes", &self.available_indexes)
261 .field("indexes", &self.indexes)
262 .field("index_file_size", &ReadableSize(self.index_file_size));
263 }
264 debug_struct
265 .field("num_rows", &self.num_rows)
266 .field("num_row_groups", &self.num_row_groups)
267 .field_with("sequence", |f| match self.sequence {
268 None => {
269 write!(f, "None")
270 }
271 Some(seq) => {
272 write!(f, "{}", seq)
273 }
274 })
275 .field("partition_expr", &self.partition_expr)
276 .field("num_series", &self.num_series)
277 .finish()
278 }
279}
280
/// Type of index that can be created for an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index; this is the type expected for columns whose schema
    /// is skipping-indexed (see `FileMeta::is_index_consistent_with_region`).
    BloomFilterIndex,
    /// Vector index; only compiled in with the `vector_index` feature.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}
294
/// Index types that were created for a single column of a file.
///
/// `#[serde(default)]` makes fields absent from the serialized form fall back
/// to their `Default` values when deserializing.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Index types created for the column.
    pub created_indexes: IndexTypes,
}
312
313impl FileMeta {
314 pub fn exists_index(&self) -> bool {
315 !self.available_indexes.is_empty()
316 }
317
318 pub fn index_version(&self) -> Option<IndexVersion> {
319 if self.exists_index() {
320 Some(self.index_version)
321 } else {
322 None
323 }
324 }
325
326 pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
328 self.exists_index() && other.exists_index() && self.index_version >= other.index_version
329 }
330
331 pub fn inverted_index_available(&self) -> bool {
333 self.available_indexes.contains(&IndexType::InvertedIndex)
334 }
335
336 pub fn fulltext_index_available(&self) -> bool {
338 self.available_indexes.contains(&IndexType::FulltextIndex)
339 }
340
341 pub fn bloom_filter_index_available(&self) -> bool {
343 self.available_indexes
344 .contains(&IndexType::BloomFilterIndex)
345 }
346
347 #[cfg(feature = "vector_index")]
349 pub fn vector_index_available(&self) -> bool {
350 self.available_indexes.contains(&IndexType::VectorIndex)
351 }
352
353 pub fn index_file_size(&self) -> u64 {
354 self.index_file_size
355 }
356
357 pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
359 let id_to_indexes = self
360 .indexes
361 .iter()
362 .map(|index| (index.column_id, index.created_indexes.clone()))
363 .collect::<std::collections::HashMap<_, _>>();
364 for column in metadata {
365 if !column.column_schema.is_indexed() {
366 continue;
367 }
368 if let Some(indexes) = id_to_indexes.get(&column.column_id) {
369 if column.column_schema.is_inverted_indexed()
370 && !indexes.contains(&IndexType::InvertedIndex)
371 {
372 return false;
373 }
374 if column.column_schema.is_fulltext_indexed()
375 && !indexes.contains(&IndexType::FulltextIndex)
376 {
377 return false;
378 }
379 if column.column_schema.is_skipping_indexed()
380 && !indexes.contains(&IndexType::BloomFilterIndex)
381 {
382 return false;
383 }
384 } else {
385 return false;
386 }
387 }
388 true
389 }
390
391 pub fn file_id(&self) -> RegionFileId {
393 RegionFileId::new(self.region_id, self.file_id)
394 }
395
396 pub fn index_id(&self) -> RegionIndexId {
398 RegionIndexId::new(self.file_id(), self.index_version)
399 }
400}
401
/// Handle to an SST file.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same
/// metadata and flags.
#[derive(Clone)]
pub struct FileHandle {
    /// Shared state of the handle.
    inner: Arc<FileHandleInner>,
}
407
408impl fmt::Debug for FileHandle {
409 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410 f.debug_struct("FileHandle")
411 .field("meta", self.meta_ref())
412 .field("compacting", &self.compacting())
413 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
414 .finish()
415 }
416}
417
418impl FileHandle {
419 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
420 FileHandle {
421 inner: Arc::new(FileHandleInner::new(meta, file_purger)),
422 }
423 }
424
425 pub fn region_id(&self) -> RegionId {
427 self.inner.meta.region_id
428 }
429
430 pub fn file_id(&self) -> RegionFileId {
432 RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
433 }
434
435 pub fn index_id(&self) -> RegionIndexId {
437 RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
438 }
439
440 pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
442 location::sst_file_path(table_dir, self.file_id(), path_type)
443 }
444
445 pub fn time_range(&self) -> FileTimeRange {
447 self.inner.meta.time_range
448 }
449
450 pub fn mark_deleted(&self) {
452 self.inner.deleted.store(true, Ordering::Relaxed);
453 }
454
455 pub fn compacting(&self) -> bool {
456 self.inner.compacting.load(Ordering::Relaxed)
457 }
458
459 pub fn set_compacting(&self, compacting: bool) {
460 self.inner.compacting.store(compacting, Ordering::Relaxed);
461 }
462
463 pub fn index_outdated(&self) -> bool {
464 self.inner.index_outdated.load(Ordering::Relaxed)
465 }
466
467 pub fn set_index_outdated(&self, index_outdated: bool) {
468 self.inner
469 .index_outdated
470 .store(index_outdated, Ordering::Relaxed);
471 }
472
473 pub fn meta_ref(&self) -> &FileMeta {
475 &self.inner.meta
476 }
477
478 pub fn file_purger(&self) -> FilePurgerRef {
479 self.inner.file_purger.clone()
480 }
481
482 pub fn size(&self) -> u64 {
483 self.inner.meta.file_size
484 }
485
486 pub fn index_size(&self) -> u64 {
487 self.inner.meta.index_file_size
488 }
489
490 pub fn num_rows(&self) -> usize {
491 self.inner.meta.num_rows as usize
492 }
493
494 pub fn level(&self) -> Level {
495 self.inner.meta.level
496 }
497
498 pub fn is_deleted(&self) -> bool {
499 self.inner.deleted.load(Ordering::Relaxed)
500 }
501}
502
/// Shared state behind a [`FileHandle`].
///
/// Construction reports the file to the purger via `new_file`; dropping it
/// calls the purger's `remove_file` with the metadata and the current
/// `deleted` / `index_outdated` flags.
struct FileHandleInner {
    /// Metadata of the file.
    meta: FileMeta,
    /// Flag toggled through `FileHandle::set_compacting`.
    compacting: AtomicBool,
    /// Flag set through `FileHandle::mark_deleted`.
    deleted: AtomicBool,
    /// Flag toggled through `FileHandle::set_index_outdated`.
    index_outdated: AtomicBool,
    /// Purger notified on construction and on drop.
    file_purger: FilePurgerRef,
}
513
514impl Drop for FileHandleInner {
515 fn drop(&mut self) {
516 self.file_purger.remove_file(
517 self.meta.clone(),
518 self.deleted.load(Ordering::Acquire),
519 self.index_outdated.load(Ordering::Acquire),
520 );
521 }
522}
523
524impl FileHandleInner {
525 fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
527 file_purger.new_file(&meta);
528 FileHandleInner {
529 meta,
530 compacting: AtomicBool::new(false),
531 deleted: AtomicBool::new(false),
532 index_outdated: AtomicBool::new(false),
533 file_purger,
534 }
535 }
536}
537
538pub async fn delete_files(
545 region_id: RegionId,
546 file_ids: &[(FileId, u64)],
547 delete_index: bool,
548 access_layer: &AccessLayerRef,
549 cache_manager: &Option<CacheManagerRef>,
550) -> crate::error::Result<()> {
551 if let Some(cache) = &cache_manager {
553 for (file_id, _) in file_ids {
554 cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
555 }
556 }
557 let mut attempted_files = Vec::with_capacity(file_ids.len());
558 let mut index_ids = Vec::new();
559
560 for (file_id, index_version) in file_ids {
561 let region_file_id = RegionFileId::new(region_id, *file_id);
562 attempted_files.push(*file_id);
563 index_ids.extend(
564 (0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)),
565 );
566 }
567
568 access_layer
569 .delete_ssts(region_id, &attempted_files)
570 .await?;
571 access_layer.delete_indexes(&index_ids).await?;
572
573 debug!(
574 "Attempted to delete {} files for region {}: {:?}",
575 attempted_files.len(),
576 region_id,
577 attempted_files
578 );
579
580 for (file_id, index_version) in file_ids {
581 purge_index_cache_stager(
582 region_id,
583 delete_index,
584 access_layer,
585 cache_manager,
586 *file_id,
587 *index_version,
588 )
589 .await;
590 }
591 Ok(())
592}
593
594pub async fn delete_index(
595 region_index_id: RegionIndexId,
596 access_layer: &AccessLayerRef,
597 cache_manager: &Option<CacheManagerRef>,
598) -> crate::error::Result<()> {
599 delete_index_and_purge(region_index_id, access_layer, cache_manager).await?;
600
601 Ok(())
602}
603
604pub async fn delete_indexes(
605 index_ids: &[RegionIndexId],
606 access_layer: &AccessLayerRef,
607 cache_manager: &Option<CacheManagerRef>,
608) -> crate::error::Result<()> {
609 if index_ids.is_empty() {
610 return Ok(());
611 }
612
613 if let Err(e) = access_layer.delete_indexes(index_ids).await {
614 error!(e; "Failed to batch delete index files");
615
616 for index_id in index_ids {
617 delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
618 }
619
620 return Ok(());
621 }
622
623 purge_indexes(index_ids, access_layer, cache_manager).await;
624
625 Ok(())
626}
627
628async fn delete_index_and_purge(
629 index_id: RegionIndexId,
630 access_layer: &AccessLayerRef,
631 cache_manager: &Option<CacheManagerRef>,
632) -> crate::error::Result<()> {
633 access_layer.delete_index(index_id).await?;
634 purge_index_cache_stager(
635 index_id.region_id(),
636 true,
637 access_layer,
638 cache_manager,
639 index_id.file_id(),
640 index_id.version,
641 )
642 .await;
643 Ok(())
644}
645
646async fn purge_indexes(
647 index_ids: &[RegionIndexId],
648 access_layer: &AccessLayerRef,
649 cache_manager: &Option<CacheManagerRef>,
650) {
651 for index_id in index_ids {
652 purge_index_cache_stager(
653 index_id.region_id(),
654 true,
655 access_layer,
656 cache_manager,
657 index_id.file_id(),
658 index_id.version,
659 )
660 .await;
661 }
662}
663
664async fn purge_index_cache_stager(
665 region_id: RegionId,
666 delete_index: bool,
667 access_layer: &AccessLayerRef,
668 cache_manager: &Option<CacheManagerRef>,
669 file_id: FileId,
670 index_version: u64,
671) {
672 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
673 if delete_index {
675 write_cache
676 .remove(IndexKey::new(
677 region_id,
678 file_id,
679 FileType::Puffin(index_version),
680 ))
681 .await;
682 }
683
684 write_cache
686 .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
687 .await;
688 }
689
690 if let Err(e) = access_layer
692 .puffin_manager_factory()
693 .purge_stager(RegionIndexId::new(
694 RegionFileId::new(region_id, file_id),
695 index_version,
696 ))
697 .await
698 {
699 error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
700 file_id, index_version, region_id);
701 }
702}
703
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
    };
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    /// Builds a `FileMeta` for region 0 with an inverted index on column 0 and
    /// every other field zeroed or empty.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    /// A `FileMeta` survives a JSON round trip unchanged.
    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    /// Fields missing from the JSON fall back to their defaults.
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    /// The partition expression is serialized as a JSON string and round-trips,
    /// both when present and when absent.
    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            partition_expr: Some(partition_expr.clone()),
            ..create_file_meta(file_id, 0)
        };

        // Round trip with a partition expression.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The expression is stored as a JSON string, not as a nested object.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // Round trip without a partition expression.
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    /// A JSON-string expression parses; an empty string, `null`, and a missing
    /// field all deserialize to `None`.
    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Expression given as a JSON string.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Empty string.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Explicit null.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Field missing entirely.
        let json_without_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_missing: FileMeta = serde_json::from_str(json_without_expr).unwrap();
        assert!(file_meta_missing.partition_expr.is_none());
    }

    /// JSON without the `indexes` field still deserializes, with `indexes` empty.
    #[test]
    fn test_file_meta_indexes_backward_compatibility() {
        let json_old_file_meta = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "available_indexes": ["InvertedIndex"],
            "level": 0,
            "file_size": 0,
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0
        }"#;

        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();

        assert_eq!(deserialized_file_meta.indexes, vec![]);

        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);

        assert_eq!(
            deserialized_file_meta.file_id,
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
        );
    }

    #[test]
    fn test_is_index_consistent_with_region() {
        /// Builds a nullable string tag column with the requested index options.
        fn new_column_meta(
            id: ColumnId,
            name: &str,
            inverted: bool,
            fulltext: bool,
            skipping: bool,
        ) -> ColumnMetadata {
            let mut column_schema =
                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
            if inverted {
                column_schema = column_schema.with_inverted_index(true);
            }
            if fulltext {
                column_schema = column_schema
                    .with_fulltext_options(FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::English,
                        false,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ))
                    .unwrap();
            }
            if skipping {
                column_schema = column_schema
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        1024,
                        0.01,
                        datatypes::schema::SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();
            }

            ColumnMetadata {
                column_schema,
                semantic_type: api::v1::SemanticType::Tag,
                column_id: id,
            }
        }

        // The file has exactly the index the column asks for.
        let mut file_meta = FileMeta {
            indexes: vec![ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            ..Default::default()
        };
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The file has an extra index type: still consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([
                IndexType::InvertedIndex,
                IndexType::BloomFilterIndex,
            ]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The column also asks for a full-text index the file lacks.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // The file's index is recorded for a different column.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 2,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // The column is not indexed at all, so it is skipped.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // The file records no indexes while the column is indexed.
        file_meta.indexes = vec![];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Two columns: the second one is missing its bloom filter index.
        file_meta.indexes = vec![
            ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            },
            ColumnIndexMetadata {
                column_id: 2,
                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
            },
        ];
        let region_meta = vec![
            new_column_meta(1, "tag1", true, false, false),
            new_column_meta(2, "tag2", false, true, true),
        ];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
    }
}