1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22
23use base64::prelude::{BASE64_STANDARD, Engine};
24use bytes::Bytes;
25use common_base::readable_size::ReadableSize;
26use common_telemetry::{debug, error};
27use common_time::Timestamp;
28use partition::expr::PartitionExpr;
29use serde::{Deserialize, Serialize};
30use smallvec::SmallVec;
31use store_api::metadata::ColumnMetadata;
32use store_api::region_request::PathType;
33use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
34
35use crate::access_layer::AccessLayerRef;
36use crate::cache::CacheManagerRef;
37use crate::cache::file_cache::{FileType, IndexKey};
38use crate::sst::file_purger::FilePurgerRef;
39use crate::sst::location;
40
41fn serialize_bytes_option<S>(bytes: &Option<Bytes>, serializer: S) -> Result<S::Ok, S::Error>
43where
44 S: serde::Serializer,
45{
46 match bytes {
47 None => serializer.serialize_none(),
48 Some(b) => serializer.serialize_some(&BASE64_STANDARD.encode(b)),
49 }
50}
51
52fn deserialize_bytes_option<'de, D>(deserializer: D) -> Result<Option<Bytes>, D::Error>
53where
54 D: serde::Deserializer<'de>,
55{
56 let opt: Option<String> = Option::deserialize(deserializer)?;
57 match opt {
58 None => Ok(None),
59 Some(s) => {
60 let decoded = BASE64_STANDARD
61 .decode(&s)
62 .map_err(serde::de::Error::custom)?;
63 Ok(Some(Bytes::from(decoded)))
64 }
65 }
66}
67
68fn serialize_partition_expr<S>(
70 partition_expr: &Option<PartitionExpr>,
71 serializer: S,
72) -> Result<S::Ok, S::Error>
73where
74 S: serde::Serializer,
75{
76 use serde::ser::Error;
77
78 match partition_expr {
79 None => serializer.serialize_none(),
80 Some(expr) => {
81 let json_str = expr.as_json_str().map_err(S::Error::custom)?;
82 serializer.serialize_some(&json_str)
83 }
84 }
85}
86
87fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
88where
89 D: serde::Deserializer<'de>,
90{
91 use serde::de::Error;
92
93 let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
94 match opt_json_str {
95 None => Ok(None),
96 Some(json_str) => {
97 if json_str.is_empty() {
98 Ok(None)
100 } else {
101 PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
103 }
104 }
105 }
106}
107
/// Compaction level of an SST file.
pub type Level = u8;
/// Number of SST levels (levels are expected to be `0..MAX_LEVEL`).
pub const MAX_LEVEL: Level = 2;
/// Index types built for a file; inline capacity of 4 keeps the common case
/// off the heap.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
119pub struct RegionFileId {
120 region_id: RegionId,
122 file_id: FileId,
124}
125
126impl RegionFileId {
127 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
129 Self { region_id, file_id }
130 }
131
132 pub fn region_id(&self) -> RegionId {
134 self.region_id
135 }
136
137 pub fn file_id(&self) -> FileId {
139 self.file_id
140 }
141}
142
143impl fmt::Display for RegionFileId {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145 write!(f, "{}/{}", self.region_id, self.file_id)
146 }
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
151pub struct RegionIndexId {
152 pub file_id: RegionFileId,
153 pub version: IndexVersion,
154}
155
156impl RegionIndexId {
157 pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
158 Self { file_id, version }
159 }
160
161 pub fn region_id(&self) -> RegionId {
162 self.file_id.region_id
163 }
164
165 pub fn file_id(&self) -> FileId {
166 self.file_id.file_id
167 }
168}
169
170impl fmt::Display for RegionIndexId {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 if self.version == 0 {
173 write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
174 } else {
175 write!(
176 f,
177 "{}/{}.{}",
178 self.file_id.region_id, self.file_id.file_id, self.version
179 )
180 }
181 }
182}
183
184pub type FileTimeRange = (Timestamp, Timestamp);
187
188pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
190 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
191 let (_, l_end) = l;
192 let (r_start, _) = r;
193
194 r_start <= l_end
195}
196
/// Metadata of an SST file, persisted in the region manifest.
///
/// `#[serde(default)]` keeps deserialization backward compatible: fields
/// missing from older manifests fall back to their `Default` values.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    // Region that owns the file.
    pub region_id: RegionId,
    // Id of the file inside the region.
    pub file_id: FileId,
    // (min, max) timestamps of rows in the file.
    pub time_range: FileTimeRange,
    // Compaction level of the file.
    pub level: Level,
    // Size of the SST file in bytes.
    pub file_size: u64,
    // Largest uncompressed row group size in the file, in bytes.
    pub max_row_group_uncompressed_size: u64,
    // Index types available for this file; empty means no index.
    pub available_indexes: IndexTypes,
    // Per-column breakdown of which index types were created.
    pub indexes: Vec<ColumnIndexMetadata>,
    // Size of the index file in bytes.
    pub index_file_size: u64,
    // Version of the index file (0 is the original, unversioned form).
    pub index_version: u64,
    // Number of rows in the file.
    pub num_rows: u64,
    // Number of row groups in the file.
    pub num_row_groups: u64,
    // NOTE(review): presumably the sequence number associated with the file's
    // rows; `NonZeroU64` makes `Option` free of overhead — confirm semantics
    // with the manifest writer.
    pub sequence: Option<NonZeroU64>,
    // Partition expression, stored as a nested JSON string (see the
    // serialize/deserialize helpers above).
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    // Number of distinct series (primary keys) in the file.
    pub num_series: u64,
    // Minimum primary key in the file, base64-encoded when serialized.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_bytes_option",
        deserialize_with = "deserialize_bytes_option"
    )]
    pub primary_key_min: Option<Bytes>,
    // Maximum primary key in the file, base64-encoded when serialized.
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_bytes_option",
        deserialize_with = "deserialize_bytes_option"
    )]
    pub primary_key_max: Option<Bytes>,
}
284
impl Debug for FileMeta {
    /// Compact debug output: sizes are rendered human-readable and the
    /// optional sections (index info, primary-key bounds) are emitted only
    /// when present.
    ///
    /// NOTE(review): `field_with` is the unstable `debug_closure_helpers`
    /// std feature — requires a nightly toolchain; confirm the build setup.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        // Index details are only interesting when an index exists.
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series);
        // Only the lengths of the key bounds are printed, not the raw bytes.
        if self.primary_key_min.is_some() || self.primary_key_max.is_some() {
            debug_struct
                .field(
                    "primary_key_min",
                    &self.primary_key_min.as_ref().map(|b| b.len()),
                )
                .field(
                    "primary_key_max",
                    &self.primary_key_max.as_ref().map(|b| b.len()),
                );
        }
        debug_struct.finish()
    }
}
338
/// Type of an index built for an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
    /// Vector index; only available with the `vector_index` feature.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}
352
/// Index types created for one column of a file.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// Id of the column.
    pub column_id: ColumnId,
    /// Index types that were built for this column.
    pub created_indexes: IndexTypes,
}
370
371impl FileMeta {
372 pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
374 match (&self.primary_key_min, &self.primary_key_max) {
375 (Some(min), Some(max)) => Some((min.clone(), max.clone())),
376 _ => None,
377 }
378 }
379
380 pub fn exists_index(&self) -> bool {
381 !self.available_indexes.is_empty()
382 }
383
384 pub fn index_version(&self) -> Option<IndexVersion> {
385 if self.exists_index() {
386 Some(self.index_version)
387 } else {
388 None
389 }
390 }
391
392 pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
394 self.exists_index() && other.exists_index() && self.index_version >= other.index_version
395 }
396
397 pub fn inverted_index_available(&self) -> bool {
399 self.available_indexes.contains(&IndexType::InvertedIndex)
400 }
401
402 pub fn fulltext_index_available(&self) -> bool {
404 self.available_indexes.contains(&IndexType::FulltextIndex)
405 }
406
407 pub fn bloom_filter_index_available(&self) -> bool {
409 self.available_indexes
410 .contains(&IndexType::BloomFilterIndex)
411 }
412
413 #[cfg(feature = "vector_index")]
415 pub fn vector_index_available(&self) -> bool {
416 self.available_indexes.contains(&IndexType::VectorIndex)
417 }
418
419 pub fn index_file_size(&self) -> u64 {
420 self.index_file_size
421 }
422
423 pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
425 let id_to_indexes = self
426 .indexes
427 .iter()
428 .map(|index| (index.column_id, index.created_indexes.clone()))
429 .collect::<std::collections::HashMap<_, _>>();
430 for column in metadata {
431 if !column.column_schema.is_indexed() {
432 continue;
433 }
434 if let Some(indexes) = id_to_indexes.get(&column.column_id) {
435 if column.column_schema.is_inverted_indexed()
436 && !indexes.contains(&IndexType::InvertedIndex)
437 {
438 return false;
439 }
440 if column.column_schema.is_fulltext_indexed()
441 && !indexes.contains(&IndexType::FulltextIndex)
442 {
443 return false;
444 }
445 if column.column_schema.is_skipping_indexed()
446 && !indexes.contains(&IndexType::BloomFilterIndex)
447 {
448 return false;
449 }
450 } else {
451 return false;
452 }
453 }
454 true
455 }
456
457 pub fn file_id(&self) -> RegionFileId {
459 RegionFileId::new(self.region_id, self.file_id)
460 }
461
462 pub fn index_id(&self) -> RegionIndexId {
464 RegionIndexId::new(self.file_id(), self.index_version)
465 }
466}
467
/// Handle to an SST file; cloning is cheap (shared `Arc` to the inner state).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
473
474impl fmt::Debug for FileHandle {
475 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476 f.debug_struct("FileHandle")
477 .field("meta", self.meta_ref())
478 .field("compacting", &self.compacting())
479 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
480 .finish()
481 }
482}
483
484impl FileHandle {
485 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
486 let pk_range = meta.primary_key_range();
487 FileHandle {
488 inner: Arc::new(FileHandleInner::new(meta, file_purger, pk_range)),
489 }
490 }
491
492 #[cfg(test)]
493 pub fn new_with_primary_key_range(
494 meta: FileMeta,
495 file_purger: FilePurgerRef,
496 primary_key_range: Option<(Bytes, Bytes)>,
497 ) -> FileHandle {
498 FileHandle {
499 inner: Arc::new(FileHandleInner::new(meta, file_purger, primary_key_range)),
500 }
501 }
502
503 pub fn region_id(&self) -> RegionId {
505 self.inner.meta.region_id
506 }
507
508 pub fn file_id(&self) -> RegionFileId {
510 RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
511 }
512
513 pub fn index_id(&self) -> RegionIndexId {
515 RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
516 }
517
518 pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
520 location::sst_file_path(table_dir, self.file_id(), path_type)
521 }
522
523 pub fn time_range(&self) -> FileTimeRange {
525 self.inner.meta.time_range
526 }
527
528 pub fn mark_deleted(&self) {
530 self.inner.deleted.store(true, Ordering::Relaxed);
531 }
532
533 pub fn compacting(&self) -> bool {
534 self.inner.compacting.load(Ordering::Relaxed)
535 }
536
537 pub fn set_compacting(&self, compacting: bool) {
538 self.inner.compacting.store(compacting, Ordering::Relaxed);
539 }
540
541 pub fn index_outdated(&self) -> bool {
542 self.inner.index_outdated.load(Ordering::Relaxed)
543 }
544
545 pub fn set_index_outdated(&self, index_outdated: bool) {
546 self.inner
547 .index_outdated
548 .store(index_outdated, Ordering::Relaxed);
549 }
550
551 pub fn meta_ref(&self) -> &FileMeta {
553 &self.inner.meta
554 }
555
556 pub fn file_purger(&self) -> FilePurgerRef {
557 self.inner.file_purger.clone()
558 }
559
560 pub fn size(&self) -> u64 {
561 self.inner.meta.file_size
562 }
563
564 pub fn index_size(&self) -> u64 {
565 self.inner.meta.index_file_size
566 }
567
568 pub fn num_rows(&self) -> usize {
569 self.inner.meta.num_rows as usize
570 }
571
572 pub fn level(&self) -> Level {
573 self.inner.meta.level
574 }
575
576 pub fn is_deleted(&self) -> bool {
577 self.inner.deleted.load(Ordering::Relaxed)
578 }
579
580 pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
581 self.inner.primary_key_range.read().unwrap().clone()
582 }
583
584 pub(crate) fn set_primary_key_range(&self, primary_key_range: (Bytes, Bytes)) {
585 *self.inner.primary_key_range.write().unwrap() = Some(primary_key_range);
586 }
587}
588
/// Shared state behind [`FileHandle`]; its `Drop` impl notifies the purger
/// when the last handle goes away.
struct FileHandleInner {
    // Metadata of the file.
    meta: FileMeta,
    // Whether the file is currently part of a compaction.
    compacting: AtomicBool,
    // Whether the file has been marked for deletion.
    deleted: AtomicBool,
    // Whether the file's index has been marked outdated.
    index_outdated: AtomicBool,
    // Cached primary key range; may be filled lazily after creation.
    primary_key_range: RwLock<Option<(Bytes, Bytes)>>,
    // Purger notified on creation and on drop.
    file_purger: FilePurgerRef,
}
600
impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Last reference gone: hand the file back to the purger, telling it
        // whether the file was marked deleted and whether its index is stale.
        // NOTE(review): the flags are stored with `Relaxed` elsewhere but
        // loaded with `Acquire` here — confirm the intended ordering contract.
        self.file_purger.remove_file(
            self.meta.clone(),
            self.deleted.load(Ordering::Acquire),
            self.index_outdated.load(Ordering::Acquire),
        );
    }
}
610
611impl FileHandleInner {
612 fn new(
614 meta: FileMeta,
615 file_purger: FilePurgerRef,
616 primary_key_range: Option<(Bytes, Bytes)>,
617 ) -> FileHandleInner {
618 file_purger.new_file(&meta);
619 FileHandleInner {
620 meta,
621 compacting: AtomicBool::new(false),
622 deleted: AtomicBool::new(false),
623 index_outdated: AtomicBool::new(false),
624 primary_key_range: RwLock::new(primary_key_range),
625 file_purger,
626 }
627 }
628}
629
630pub async fn delete_files(
637 region_id: RegionId,
638 file_ids: &[(FileId, u64)],
639 delete_index: bool,
640 access_layer: &AccessLayerRef,
641 cache_manager: &Option<CacheManagerRef>,
642) -> crate::error::Result<()> {
643 if let Some(cache) = &cache_manager {
645 for (file_id, _) in file_ids {
646 cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
647 }
648 }
649 let mut attempted_files = Vec::with_capacity(file_ids.len());
650 let mut index_ids = Vec::new();
651
652 for (file_id, index_version) in file_ids {
653 let region_file_id = RegionFileId::new(region_id, *file_id);
654 attempted_files.push(*file_id);
655 index_ids.extend(
656 (0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)),
657 );
658 }
659
660 access_layer
661 .delete_ssts(region_id, &attempted_files)
662 .await?;
663 access_layer.delete_indexes(&index_ids).await?;
664
665 debug!(
666 "Attempted to delete {} files for region {}: {:?}",
667 attempted_files.len(),
668 region_id,
669 attempted_files
670 );
671
672 for (file_id, index_version) in file_ids {
673 purge_index_cache_stager(
674 region_id,
675 delete_index,
676 access_layer,
677 cache_manager,
678 *file_id,
679 *index_version,
680 )
681 .await;
682 }
683 Ok(())
684}
685
686pub async fn delete_index(
687 region_index_id: RegionIndexId,
688 access_layer: &AccessLayerRef,
689 cache_manager: &Option<CacheManagerRef>,
690) -> crate::error::Result<()> {
691 delete_index_and_purge(region_index_id, access_layer, cache_manager).await?;
692
693 Ok(())
694}
695
696pub async fn delete_indexes(
697 index_ids: &[RegionIndexId],
698 access_layer: &AccessLayerRef,
699 cache_manager: &Option<CacheManagerRef>,
700) -> crate::error::Result<()> {
701 if index_ids.is_empty() {
702 return Ok(());
703 }
704
705 if let Err(e) = access_layer.delete_indexes(index_ids).await {
706 error!(e; "Failed to batch delete index files");
707
708 for index_id in index_ids {
709 delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
710 }
711
712 return Ok(());
713 }
714
715 purge_indexes(index_ids, access_layer, cache_manager).await;
716
717 Ok(())
718}
719
720async fn delete_index_and_purge(
721 index_id: RegionIndexId,
722 access_layer: &AccessLayerRef,
723 cache_manager: &Option<CacheManagerRef>,
724) -> crate::error::Result<()> {
725 access_layer.delete_index(index_id).await?;
726 purge_index_cache_stager(
727 index_id.region_id(),
728 true,
729 access_layer,
730 cache_manager,
731 index_id.file_id(),
732 index_id.version,
733 )
734 .await;
735 Ok(())
736}
737
738async fn purge_indexes(
739 index_ids: &[RegionIndexId],
740 access_layer: &AccessLayerRef,
741 cache_manager: &Option<CacheManagerRef>,
742) {
743 for index_id in index_ids {
744 purge_index_cache_stager(
745 index_id.region_id(),
746 true,
747 access_layer,
748 cache_manager,
749 index_id.file_id(),
750 index_id.version,
751 )
752 .await;
753 }
754}
755
756async fn purge_index_cache_stager(
757 region_id: RegionId,
758 delete_index: bool,
759 access_layer: &AccessLayerRef,
760 cache_manager: &Option<CacheManagerRef>,
761 file_id: FileId,
762 index_version: u64,
763) {
764 if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
765 if delete_index {
767 write_cache
768 .remove(IndexKey::new(
769 region_id,
770 file_id,
771 FileType::Puffin(index_version),
772 ))
773 .await;
774 }
775
776 write_cache
778 .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
779 .await;
780 }
781
782 if let Err(e) = access_layer
784 .puffin_manager_factory()
785 .purge_stager(RegionIndexId::new(
786 RegionFileId::new(region_id, file_id),
787 index_version,
788 ))
789 .await
790 {
791 error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
792 file_id, index_version, region_id);
793 }
794}
795
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::{
        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
    };
    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    /// Builds a minimal `FileMeta` whose column 0 carries an inverted index.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
            ..Default::default()
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        // JSON round trip must preserve every field.
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        // A hand-written JSON document without the optional fields still
        // deserializes into the expected meta (serde defaults kick in).
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
            \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
            \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            max_row_group_uncompressed_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            indexes: vec![ColumnIndexMetadata {
                column_id: 0,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            index_file_size: 0,
            index_version: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
            num_series: 0,
            ..Default::default()
        };

        // Round trip keeps the expression intact.
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // The expression is stored as a nested JSON *string*, not an object.
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // `None` round-trips too.
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Current format: a JSON-string-encoded expression.
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Legacy empty string decodes to `None`.
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Explicit null decodes to `None`.
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Field entirely absent also decodes to `None` (serde default).
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());
    }

    #[test]
    fn test_file_meta_indexes_backward_compatibility() {
        // Old manifests carry `available_indexes` but no per-column `indexes`.
        let json_old_file_meta = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "available_indexes": ["InvertedIndex"],
            "level": 0,
            "file_size": 0,
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0
        }"#;

        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();

        // `indexes` defaults to empty while `available_indexes` is preserved.
        assert_eq!(deserialized_file_meta.indexes, vec![]);

        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);

        assert_eq!(
            deserialized_file_meta.file_id,
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
        );
    }

    #[test]
    fn test_is_index_consistent_with_region() {
        // Builds a string tag column with the requested index options.
        fn new_column_meta(
            id: ColumnId,
            name: &str,
            inverted: bool,
            fulltext: bool,
            skipping: bool,
        ) -> ColumnMetadata {
            let mut column_schema =
                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
            if inverted {
                column_schema = column_schema.with_inverted_index(true);
            }
            if fulltext {
                column_schema = column_schema
                    .with_fulltext_options(FulltextOptions::new_unchecked(
                        true,
                        FulltextAnalyzer::English,
                        false,
                        FulltextBackend::Bloom,
                        1000,
                        0.01,
                    ))
                    .unwrap();
            }
            if skipping {
                column_schema = column_schema
                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
                        1024,
                        0.01,
                        datatypes::schema::SkippingIndexType::BloomFilter,
                    ))
                    .unwrap();
            }

            ColumnMetadata {
                column_schema,
                semantic_type: api::v1::SemanticType::Tag,
                column_id: id,
            }
        }

        // Exact match: file index equals the region's expectation.
        let mut file_meta = FileMeta {
            indexes: vec![ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            }],
            ..Default::default()
        };
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // File has extra index types: still consistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([
                IndexType::InvertedIndex,
                IndexType::BloomFilterIndex,
            ]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // Region expects a fulltext index the file lacks: inconsistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Indexed column id does not match: inconsistent.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 2,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Region column not indexed: extra file index is ignored.
        file_meta.indexes = vec![ColumnIndexMetadata {
            column_id: 1,
            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
        }];
        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)];
        assert!(file_meta.is_index_consistent_with_region(&region_meta));

        // File records no indexes while region expects one: inconsistent.
        file_meta.indexes = vec![];
        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));

        // Multiple columns, one missing a required skipping index: inconsistent.
        file_meta.indexes = vec![
            ColumnIndexMetadata {
                column_id: 1,
                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            },
            ColumnIndexMetadata {
                column_id: 2,
                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
            },
        ];
        let region_meta = vec![
            new_column_meta(1, "tag1", true, false, false),
            new_column_meta(2, "tag2", false, true, true),
        ];
        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
    }
}