// mito2/sst/file.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structures to describe metadata of files.
16
17use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicBool, Ordering};
22
23use common_base::readable_size::ReadableSize;
24use common_telemetry::{debug, error};
25use common_time::Timestamp;
26use partition::expr::PartitionExpr;
27use serde::{Deserialize, Serialize};
28use smallvec::SmallVec;
29use store_api::metadata::ColumnMetadata;
30use store_api::region_request::PathType;
31use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
32
33use crate::access_layer::AccessLayerRef;
34use crate::cache::CacheManagerRef;
35use crate::cache::file_cache::{FileType, IndexKey};
36use crate::sst::file_purger::FilePurgerRef;
37use crate::sst::location;
38
39/// Custom serde functions for partition_expr field in FileMeta
40fn serialize_partition_expr<S>(
41    partition_expr: &Option<PartitionExpr>,
42    serializer: S,
43) -> Result<S::Ok, S::Error>
44where
45    S: serde::Serializer,
46{
47    use serde::ser::Error;
48
49    match partition_expr {
50        None => serializer.serialize_none(),
51        Some(expr) => {
52            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
53            serializer.serialize_some(&json_str)
54        }
55    }
56}
57
58fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
59where
60    D: serde::Deserializer<'de>,
61{
62    use serde::de::Error;
63
64    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
65    match opt_json_str {
66        None => Ok(None),
67        Some(json_str) => {
68            if json_str.is_empty() {
69                // Empty string represents explicit "single-region/no-partition" designation
70                Ok(None)
71            } else {
72                // Parse the JSON string to PartitionExpr
73                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
74            }
75        }
76    }
77}
78
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
///
/// Inline capacity of 4 keeps the common case (few index kinds per column) off the heap.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
85
/// Cross-region file id.
///
/// It contains a region id and a file id. The string representation is `{region_id}/{file_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// The region that creates the file.
    region_id: RegionId,
    /// The id of the file.
    file_id: FileId,
}
96
97impl RegionFileId {
98    /// Creates a new [RegionFileId] from `region_id` and `file_id`.
99    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
100        Self { region_id, file_id }
101    }
102
103    /// Gets the region id.
104    pub fn region_id(&self) -> RegionId {
105        self.region_id
106    }
107
108    /// Gets the file id.
109    pub fn file_id(&self) -> FileId {
110        self.file_id
111    }
112}
113
114impl fmt::Display for RegionFileId {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        write!(f, "{}/{}", self.region_id, self.file_id)
117    }
118}
119
/// Unique identifier for an index file, combining the SST file ID and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    /// Cross-region id of the SST file the index belongs to.
    pub file_id: RegionFileId,
    /// Version of the index file; 0 is the legacy/default version.
    pub version: IndexVersion,
}
126
127impl RegionIndexId {
128    pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
129        Self { file_id, version }
130    }
131
132    pub fn region_id(&self) -> RegionId {
133        self.file_id.region_id
134    }
135
136    pub fn file_id(&self) -> FileId {
137        self.file_id.file_id
138    }
139}
140
141impl fmt::Display for RegionIndexId {
142    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
143        if self.version == 0 {
144            write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
145        } else {
146            write!(
147                f,
148                "{}/{}.{}",
149                self.file_id.region_id, self.file_id.file_id, self.version
150            )
151        }
152    }
153}
154
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
158
159/// Checks if two inclusive timestamp ranges overlap with each other.
160pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
161    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
162    let (_, l_end) = l;
163    let (r_start, _) = r;
164
165    r_start <= l_end
166}
167
/// Metadata of a SST file.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that created the file. The region id may not be the id of the current region.
    pub region_id: RegionId,
    /// Compared to normal file names, [FileId] ignores the extension.
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file in bytes.
    pub file_size: u64,
    /// Maximum uncompressed row group size of the file. 0 means unknown.
    pub max_row_group_uncompressed_size: u64,
    /// Available indexes of the file.
    pub available_indexes: IndexTypes,
    /// Created indexes of the file for each column.
    ///
    /// This is essentially a more granular, column-level version of `available_indexes`,
    /// primarily used for manual index building in the asynchronous index construction mode.
    ///
    /// For backward compatibility, older `FileMeta` versions might only contain `available_indexes`.
    /// In such cases, we cannot deduce specific column index information from `available_indexes` alone.
    /// Therefore, defaulting this `indexes` field to an empty list during deserialization is a
    /// reasonable and necessary step to ensure column information consistency.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// Version of the index file.
    /// Used to generate the index file name: "{file_id}.{index_version}.puffin".
    /// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
    pub index_version: u64,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any rows,
    /// but instead means the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any row
    /// groups, but instead means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence in this file.
    ///
    /// This sequence is the only sequence in this file. And it's retrieved from the max
    /// sequence of the rows on generating this file.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression from the region metadata when the file is created.
    ///
    /// This is stored as a PartitionExpr object in memory for convenience,
    /// but serialized as JSON string for manifest compatibility.
    /// Compatibility behavior:
    /// - None: no partition expr was set when the file was created (legacy files).
    /// - Some(expr): partition expression from region metadata.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    ///
    /// The number is 0 if the series number is not available.
    pub num_series: u64,
}
237
impl Debug for FileMeta {
    // Compact human-readable form: sizes rendered through `ReadableSize`,
    // timestamps as ISO-8601, and index-related fields shown only when the
    // file actually carries indexes.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        // Omit index fields entirely for files without any index.
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series)
            .finish()
    }
}
280
/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
    /// Vector index (HNSW). Only available with the `vector_index` feature.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}
294
/// Metadata of indexes created for a specific column in an SST file.
///
/// This structure tracks which index types have been successfully created for a column.
/// It provides more granular, column-level index information compared to the file-level
/// `available_indexes` field in [`FileMeta`].
///
/// This is primarily used for:
/// - Manual index building in asynchronous index construction mode
/// - Verifying index consistency between files and region metadata
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// The column ID this index metadata applies to.
    pub column_id: ColumnId,
    /// List of index types that have been successfully created for this column.
    pub created_indexes: IndexTypes,
}
312
313impl FileMeta {
314    pub fn exists_index(&self) -> bool {
315        !self.available_indexes.is_empty()
316    }
317
318    pub fn index_version(&self) -> Option<IndexVersion> {
319        if self.exists_index() {
320            Some(self.index_version)
321        } else {
322            None
323        }
324    }
325
326    /// Whether the index file is up-to-date comparing to another file meta.    
327    pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
328        self.exists_index() && other.exists_index() && self.index_version >= other.index_version
329    }
330
331    /// Returns true if the file has an inverted index
332    pub fn inverted_index_available(&self) -> bool {
333        self.available_indexes.contains(&IndexType::InvertedIndex)
334    }
335
336    /// Returns true if the file has a fulltext index
337    pub fn fulltext_index_available(&self) -> bool {
338        self.available_indexes.contains(&IndexType::FulltextIndex)
339    }
340
341    /// Returns true if the file has a bloom filter index.
342    pub fn bloom_filter_index_available(&self) -> bool {
343        self.available_indexes
344            .contains(&IndexType::BloomFilterIndex)
345    }
346
347    /// Returns true if the file has a vector index.
348    #[cfg(feature = "vector_index")]
349    pub fn vector_index_available(&self) -> bool {
350        self.available_indexes.contains(&IndexType::VectorIndex)
351    }
352
353    pub fn index_file_size(&self) -> u64 {
354        self.index_file_size
355    }
356
357    /// Check whether the file index is consistent with the given region metadata.
358    pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
359        let id_to_indexes = self
360            .indexes
361            .iter()
362            .map(|index| (index.column_id, index.created_indexes.clone()))
363            .collect::<std::collections::HashMap<_, _>>();
364        for column in metadata {
365            if !column.column_schema.is_indexed() {
366                continue;
367            }
368            if let Some(indexes) = id_to_indexes.get(&column.column_id) {
369                if column.column_schema.is_inverted_indexed()
370                    && !indexes.contains(&IndexType::InvertedIndex)
371                {
372                    return false;
373                }
374                if column.column_schema.is_fulltext_indexed()
375                    && !indexes.contains(&IndexType::FulltextIndex)
376                {
377                    return false;
378                }
379                if column.column_schema.is_skipping_indexed()
380                    && !indexes.contains(&IndexType::BloomFilterIndex)
381                {
382                    return false;
383                }
384            } else {
385                return false;
386            }
387        }
388        true
389    }
390
391    /// Returns the cross-region file id.
392    pub fn file_id(&self) -> RegionFileId {
393        RegionFileId::new(self.region_id, self.file_id)
394    }
395
396    /// Returns the RegionIndexId for this file.
397    pub fn index_id(&self) -> RegionIndexId {
398        RegionIndexId::new(self.file_id(), self.index_version)
399    }
400}
401
/// Handle to a SST file.
///
/// Cheaply cloneable; all clones share one [FileHandleInner] whose drop
/// notifies the file purger.
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
407
408impl fmt::Debug for FileHandle {
409    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
410        f.debug_struct("FileHandle")
411            .field("meta", self.meta_ref())
412            .field("compacting", &self.compacting())
413            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
414            .finish()
415    }
416}
417
418impl FileHandle {
419    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
420        FileHandle {
421            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
422        }
423    }
424
425    /// Returns the region id of the file.
426    pub fn region_id(&self) -> RegionId {
427        self.inner.meta.region_id
428    }
429
430    /// Returns the cross-region file id.
431    pub fn file_id(&self) -> RegionFileId {
432        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
433    }
434
435    /// Returns the RegionIndexId for this file.
436    pub fn index_id(&self) -> RegionIndexId {
437        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
438    }
439
440    /// Returns the complete file path of the file.
441    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
442        location::sst_file_path(table_dir, self.file_id(), path_type)
443    }
444
445    /// Returns the time range of the file.
446    pub fn time_range(&self) -> FileTimeRange {
447        self.inner.meta.time_range
448    }
449
450    /// Mark the file as deleted and will delete it on drop asynchronously
451    pub fn mark_deleted(&self) {
452        self.inner.deleted.store(true, Ordering::Relaxed);
453    }
454
455    pub fn compacting(&self) -> bool {
456        self.inner.compacting.load(Ordering::Relaxed)
457    }
458
459    pub fn set_compacting(&self, compacting: bool) {
460        self.inner.compacting.store(compacting, Ordering::Relaxed);
461    }
462
463    pub fn index_outdated(&self) -> bool {
464        self.inner.index_outdated.load(Ordering::Relaxed)
465    }
466
467    pub fn set_index_outdated(&self, index_outdated: bool) {
468        self.inner
469            .index_outdated
470            .store(index_outdated, Ordering::Relaxed);
471    }
472
473    /// Returns a reference to the [FileMeta].
474    pub fn meta_ref(&self) -> &FileMeta {
475        &self.inner.meta
476    }
477
478    pub fn file_purger(&self) -> FilePurgerRef {
479        self.inner.file_purger.clone()
480    }
481
482    pub fn size(&self) -> u64 {
483        self.inner.meta.file_size
484    }
485
486    pub fn index_size(&self) -> u64 {
487        self.inner.meta.index_file_size
488    }
489
490    pub fn num_rows(&self) -> usize {
491        self.inner.meta.num_rows as usize
492    }
493
494    pub fn level(&self) -> Level {
495        self.inner.meta.level
496    }
497
498    pub fn is_deleted(&self) -> bool {
499        self.inner.deleted.load(Ordering::Relaxed)
500    }
501}
502
/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    /// Immutable metadata of the file.
    meta: FileMeta,
    /// Whether the file is part of an ongoing compaction.
    compacting: AtomicBool,
    /// Whether the file has been marked for deletion.
    deleted: AtomicBool,
    /// Whether the index file no longer matches the data.
    index_outdated: AtomicBool,
    /// Purger notified on creation and on drop of this inner state.
    file_purger: FilePurgerRef,
}
513
514impl Drop for FileHandleInner {
515    fn drop(&mut self) {
516        self.file_purger.remove_file(
517            self.meta.clone(),
518            self.deleted.load(Ordering::Acquire),
519            self.index_outdated.load(Ordering::Acquire),
520        );
521    }
522}
523
524impl FileHandleInner {
525    /// There should only be one `FileHandleInner` for each file on a datanode
526    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
527        file_purger.new_file(&meta);
528        FileHandleInner {
529            meta,
530            compacting: AtomicBool::new(false),
531            deleted: AtomicBool::new(false),
532            index_outdated: AtomicBool::new(false),
533            file_purger,
534        }
535    }
536}
537
538/// Delete files for a region.
539/// - `region_id`: Region id.
540/// - `file_ids`: List of (file id, index version) tuples to delete.
541/// - `delete_index`: Whether to delete the index file from the cache.
542/// - `access_layer`: Access layer to delete files.
543/// - `cache_manager`: Cache manager to remove files from cache.
544pub async fn delete_files(
545    region_id: RegionId,
546    file_ids: &[(FileId, u64)],
547    delete_index: bool,
548    access_layer: &AccessLayerRef,
549    cache_manager: &Option<CacheManagerRef>,
550) -> crate::error::Result<()> {
551    // Remove meta of the file from cache.
552    if let Some(cache) = &cache_manager {
553        for (file_id, _) in file_ids {
554            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
555        }
556    }
557    let mut attempted_files = Vec::with_capacity(file_ids.len());
558    let mut index_ids = Vec::new();
559
560    for (file_id, index_version) in file_ids {
561        let region_file_id = RegionFileId::new(region_id, *file_id);
562        attempted_files.push(*file_id);
563        index_ids.extend(
564            (0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)),
565        );
566    }
567
568    access_layer
569        .delete_ssts(region_id, &attempted_files)
570        .await?;
571    access_layer.delete_indexes(&index_ids).await?;
572
573    debug!(
574        "Attempted to delete {} files for region {}: {:?}",
575        attempted_files.len(),
576        region_id,
577        attempted_files
578    );
579
580    for (file_id, index_version) in file_ids {
581        purge_index_cache_stager(
582            region_id,
583            delete_index,
584            access_layer,
585            cache_manager,
586            *file_id,
587            *index_version,
588        )
589        .await;
590    }
591    Ok(())
592}
593
594pub async fn delete_index(
595    region_index_id: RegionIndexId,
596    access_layer: &AccessLayerRef,
597    cache_manager: &Option<CacheManagerRef>,
598) -> crate::error::Result<()> {
599    delete_index_and_purge(region_index_id, access_layer, cache_manager).await?;
600
601    Ok(())
602}
603
604pub async fn delete_indexes(
605    index_ids: &[RegionIndexId],
606    access_layer: &AccessLayerRef,
607    cache_manager: &Option<CacheManagerRef>,
608) -> crate::error::Result<()> {
609    if index_ids.is_empty() {
610        return Ok(());
611    }
612
613    if let Err(e) = access_layer.delete_indexes(index_ids).await {
614        error!(e; "Failed to batch delete index files");
615
616        for index_id in index_ids {
617            delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
618        }
619
620        return Ok(());
621    }
622
623    purge_indexes(index_ids, access_layer, cache_manager).await;
624
625    Ok(())
626}
627
628async fn delete_index_and_purge(
629    index_id: RegionIndexId,
630    access_layer: &AccessLayerRef,
631    cache_manager: &Option<CacheManagerRef>,
632) -> crate::error::Result<()> {
633    access_layer.delete_index(index_id).await?;
634    purge_index_cache_stager(
635        index_id.region_id(),
636        true,
637        access_layer,
638        cache_manager,
639        index_id.file_id(),
640        index_id.version,
641    )
642    .await;
643    Ok(())
644}
645
646async fn purge_indexes(
647    index_ids: &[RegionIndexId],
648    access_layer: &AccessLayerRef,
649    cache_manager: &Option<CacheManagerRef>,
650) {
651    for index_id in index_ids {
652        purge_index_cache_stager(
653            index_id.region_id(),
654            true,
655            access_layer,
656            cache_manager,
657            index_id.file_id(),
658            index_id.version,
659        )
660        .await;
661    }
662}
663
/// Evicts a file's entries from the write cache and purges its index content
/// from the puffin stager.
///
/// - `delete_index`: when true, the puffin (index) file for `index_version` is
///   also evicted from the write cache; the SST (parquet) entry is always evicted.
/// - Stager purge failures are logged, not propagated.
async fn purge_index_cache_stager(
    region_id: RegionId,
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
    file_id: FileId,
    index_version: u64,
) {
    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
        // Removes index file from the cache.
        if delete_index {
            write_cache
                .remove(IndexKey::new(
                    region_id,
                    file_id,
                    FileType::Puffin(index_version),
                ))
                .await;
        }

        // Remove the SST file from the cache.
        write_cache
            .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
            .await;
    }

    // Purges index content in the stager.
    if let Err(e) = access_layer
        .puffin_manager_factory()
        .purge_stager(RegionIndexId::new(
            RegionFileId::new(region_id, file_id),
            index_version,
        ))
        .await
    {
        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
                file_id, index_version, region_id);
    }
}
703
704#[cfg(test)]
705mod tests {
706    use std::str::FromStr;
707
708    use datatypes::prelude::ConcreteDataType;
709    use datatypes::schema::{
710        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
711    };
712    use datatypes::value::Value;
713    use partition::expr::{PartitionExpr, col};
714
715    use super::*;
716
717    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
718        FileMeta {
719            region_id: 0.into(),
720            file_id,
721            time_range: FileTimeRange::default(),
722            level,
723            file_size: 0,
724            max_row_group_uncompressed_size: 0,
725            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
726            indexes: vec![ColumnIndexMetadata {
727                column_id: 0,
728                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
729            }],
730            index_file_size: 0,
731            index_version: 0,
732            num_rows: 0,
733            num_row_groups: 0,
734            sequence: None,
735            partition_expr: None,
736            num_series: 0,
737        }
738    }
739
740    #[test]
741    fn test_deserialize_file_meta() {
742        let file_meta = create_file_meta(FileId::random(), 0);
743        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
744        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
745        assert_eq!(file_meta, deserialized_file_meta.unwrap());
746    }
747
748    #[test]
749    fn test_deserialize_from_string() {
750        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
751        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
752        \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
753        let file_meta = create_file_meta(
754            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
755            0,
756        );
757        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
758        assert_eq!(file_meta, deserialized_file_meta);
759    }
760
761    #[test]
762    fn test_file_meta_with_partition_expr() {
763        let file_id = FileId::random();
764        let partition_expr = PartitionExpr::new(
765            col("a"),
766            partition::expr::RestrictedOp::GtEq,
767            Value::UInt32(10).into(),
768        );
769
770        let file_meta_with_partition = FileMeta {
771            region_id: 0.into(),
772            file_id,
773            time_range: FileTimeRange::default(),
774            level: 0,
775            file_size: 0,
776            max_row_group_uncompressed_size: 0,
777            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
778            indexes: vec![ColumnIndexMetadata {
779                column_id: 0,
780                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
781            }],
782            index_file_size: 0,
783            index_version: 0,
784            num_rows: 0,
785            num_row_groups: 0,
786            sequence: None,
787            partition_expr: Some(partition_expr.clone()),
788            num_series: 0,
789        };
790
791        // Test serialization/deserialization
792        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
793        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
794        assert_eq!(file_meta_with_partition, deserialized);
795
796        // Verify the serialized JSON contains the expected partition expression string
797        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
798        assert!(serialized_value["partition_expr"].as_str().is_some());
799        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
800        assert!(partition_expr_json.contains("\"Column\":\"a\""));
801        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));
802
803        // Test with None (legacy files)
804        let file_meta_none = FileMeta {
805            partition_expr: None,
806            ..file_meta_with_partition.clone()
807        };
808        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
809        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
810        assert_eq!(file_meta_none, deserialized_none);
811    }
812
813    #[test]
814    fn test_file_meta_partition_expr_backward_compatibility() {
815        // Test that we can deserialize old JSON format with partition_expr as string
816        let json_with_partition_expr = r#"{
817            "region_id": 0,
818            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
819            "time_range": [
820                {"value": 0, "unit": "Millisecond"},
821                {"value": 0, "unit": "Millisecond"}
822            ],
823            "level": 0,
824            "file_size": 0,
825            "available_indexes": ["InvertedIndex"],
826            "index_file_size": 0,
827            "num_rows": 0,
828            "num_row_groups": 0,
829            "sequence": null,
830            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
831        }"#;
832
833        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
834        assert!(file_meta.partition_expr.is_some());
835        let expr = file_meta.partition_expr.unwrap();
836        assert_eq!(format!("{}", expr), "a >= 10");
837
838        // Test empty partition expression string
839        let json_with_empty_expr = r#"{
840            "region_id": 0,
841            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
842            "time_range": [
843                {"value": 0, "unit": "Millisecond"},
844                {"value": 0, "unit": "Millisecond"}
845            ],
846            "level": 0,
847            "file_size": 0,
848            "available_indexes": [],
849            "index_file_size": 0,
850            "num_rows": 0,
851            "num_row_groups": 0,
852            "sequence": null,
853            "partition_expr": ""
854        }"#;
855
856        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
857        assert!(file_meta_empty.partition_expr.is_none());
858
859        // Test null partition expression
860        let json_with_null_expr = r#"{
861            "region_id": 0,
862            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
863            "time_range": [
864                {"value": 0, "unit": "Millisecond"},
865                {"value": 0, "unit": "Millisecond"}
866            ],
867            "level": 0,
868            "file_size": 0,
869            "available_indexes": [],
870            "index_file_size": 0,
871            "num_rows": 0,
872            "num_row_groups": 0,
873            "sequence": null,
874            "partition_expr": null
875        }"#;
876
877        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
878        assert!(file_meta_null.partition_expr.is_none());
879
880        // Test partition expression doesn't exist
881        let json_with_empty_expr = r#"{
882            "region_id": 0,
883            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
884            "time_range": [
885                {"value": 0, "unit": "Millisecond"},
886                {"value": 0, "unit": "Millisecond"}
887            ],
888            "level": 0,
889            "file_size": 0,
890            "available_indexes": [],
891            "index_file_size": 0,
892            "num_rows": 0,
893            "num_row_groups": 0,
894            "sequence": null
895        }"#;
896
897        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
898        assert!(file_meta_empty.partition_expr.is_none());
899    }
900
901    #[test]
902    fn test_file_meta_indexes_backward_compatibility() {
903        // Old FileMeta format without the 'indexes' field
904        let json_old_file_meta = r#"{
905            "region_id": 0,
906            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
907            "time_range": [
908                {"value": 0, "unit": "Millisecond"},
909                {"value": 0, "unit": "Millisecond"}
910            ],
911            "available_indexes": ["InvertedIndex"],
912            "level": 0,
913            "file_size": 0,
914            "index_file_size": 0,
915            "num_rows": 0,
916            "num_row_groups": 0
917        }"#;
918
919        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();
920
921        // Verify backward compatibility: indexes field should default to empty vec
922        assert_eq!(deserialized_file_meta.indexes, vec![]);
923
924        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
925        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);
926
927        assert_eq!(
928            deserialized_file_meta.file_id,
929            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
930        );
931    }
932    #[test]
933    fn test_is_index_consistent_with_region() {
934        fn new_column_meta(
935            id: ColumnId,
936            name: &str,
937            inverted: bool,
938            fulltext: bool,
939            skipping: bool,
940        ) -> ColumnMetadata {
941            let mut column_schema =
942                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
943            if inverted {
944                column_schema = column_schema.with_inverted_index(true);
945            }
946            if fulltext {
947                column_schema = column_schema
948                    .with_fulltext_options(FulltextOptions::new_unchecked(
949                        true,
950                        FulltextAnalyzer::English,
951                        false,
952                        FulltextBackend::Bloom,
953                        1000,
954                        0.01,
955                    ))
956                    .unwrap();
957            }
958            if skipping {
959                column_schema = column_schema
960                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
961                        1024,
962                        0.01,
963                        datatypes::schema::SkippingIndexType::BloomFilter,
964                    ))
965                    .unwrap();
966            }
967
968            ColumnMetadata {
969                column_schema,
970                semantic_type: api::v1::SemanticType::Tag,
971                column_id: id,
972            }
973        }
974
975        // Case 1: Perfect match. File has exactly the required indexes.
976        let mut file_meta = FileMeta {
977            indexes: vec![ColumnIndexMetadata {
978                column_id: 1,
979                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
980            }],
981            ..Default::default()
982        };
983        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
984        assert!(file_meta.is_index_consistent_with_region(&region_meta));
985
986        // Case 2: Superset match. File has more indexes than required.
987        file_meta.indexes = vec![ColumnIndexMetadata {
988            column_id: 1,
989            created_indexes: SmallVec::from_iter([
990                IndexType::InvertedIndex,
991                IndexType::BloomFilterIndex,
992            ]),
993        }];
994        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
995        assert!(file_meta.is_index_consistent_with_region(&region_meta));
996
997        // Case 3: Missing index type. File has the column but lacks the required index type.
998        file_meta.indexes = vec![ColumnIndexMetadata {
999            column_id: 1,
1000            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1001        }];
1002        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)]; // Requires fulltext too
1003        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1004
1005        // Case 4: Missing column. Region requires an index on a column not in the file's index list.
1006        file_meta.indexes = vec![ColumnIndexMetadata {
1007            column_id: 2, // File only has index for column 2
1008            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1009        }];
1010        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)]; // Requires index on column 1
1011        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1012
1013        // Case 5: No indexes required by region. Should always be consistent.
1014        file_meta.indexes = vec![ColumnIndexMetadata {
1015            column_id: 1,
1016            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1017        }];
1018        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)]; // No index required
1019        assert!(file_meta.is_index_consistent_with_region(&region_meta));
1020
1021        // Case 6: Empty file indexes. Region requires an index.
1022        file_meta.indexes = vec![];
1023        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
1024        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1025
1026        // Case 7: Multiple columns, one is inconsistent.
1027        file_meta.indexes = vec![
1028            ColumnIndexMetadata {
1029                column_id: 1,
1030                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1031            },
1032            ColumnIndexMetadata {
1033                column_id: 2, // Column 2 is missing the required BloomFilterIndex
1034                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
1035            },
1036        ];
1037        let region_meta = vec![
1038            new_column_meta(1, "tag1", true, false, false),
1039            new_column_meta(2, "tag2", false, true, true), // Requires Fulltext and BloomFilter
1040        ];
1041        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1042    }
1043}