// mito2/sst/file.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Structures to describe metadata of files.
16
17use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::{Arc, RwLock};
22
23use base64::prelude::{BASE64_STANDARD, Engine};
24use bytes::Bytes;
25use common_base::readable_size::ReadableSize;
26use common_telemetry::{debug, error};
27use common_time::Timestamp;
28use partition::expr::PartitionExpr;
29use serde::{Deserialize, Serialize};
30use smallvec::SmallVec;
31use store_api::metadata::ColumnMetadata;
32use store_api::region_request::PathType;
33use store_api::storage::{ColumnId, FileId, IndexVersion, RegionId};
34
35use crate::access_layer::AccessLayerRef;
36use crate::cache::CacheManagerRef;
37use crate::cache::file_cache::{FileType, IndexKey};
38use crate::sst::file_purger::FilePurgerRef;
39use crate::sst::location;
40
41/// Custom serde functions for Bytes fields serialized as base64 strings.
42fn serialize_bytes_option<S>(bytes: &Option<Bytes>, serializer: S) -> Result<S::Ok, S::Error>
43where
44    S: serde::Serializer,
45{
46    match bytes {
47        None => serializer.serialize_none(),
48        Some(b) => serializer.serialize_some(&BASE64_STANDARD.encode(b)),
49    }
50}
51
52fn deserialize_bytes_option<'de, D>(deserializer: D) -> Result<Option<Bytes>, D::Error>
53where
54    D: serde::Deserializer<'de>,
55{
56    let opt: Option<String> = Option::deserialize(deserializer)?;
57    match opt {
58        None => Ok(None),
59        Some(s) => {
60            let decoded = BASE64_STANDARD
61                .decode(&s)
62                .map_err(serde::de::Error::custom)?;
63            Ok(Some(Bytes::from(decoded)))
64        }
65    }
66}
67
68/// Custom serde functions for partition_expr field in FileMeta
69fn serialize_partition_expr<S>(
70    partition_expr: &Option<PartitionExpr>,
71    serializer: S,
72) -> Result<S::Ok, S::Error>
73where
74    S: serde::Serializer,
75{
76    use serde::ser::Error;
77
78    match partition_expr {
79        None => serializer.serialize_none(),
80        Some(expr) => {
81            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
82            serializer.serialize_some(&json_str)
83        }
84    }
85}
86
87fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
88where
89    D: serde::Deserializer<'de>,
90{
91    use serde::de::Error;
92
93    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
94    match opt_json_str {
95        None => Ok(None),
96        Some(json_str) => {
97            if json_str.is_empty() {
98                // Empty string represents explicit "single-region/no-partition" designation
99                Ok(None)
100            } else {
101                // Parse the JSON string to PartitionExpr
102                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
103            }
104        }
105    }
106}
107
/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;
/// Type to store index types for a column.
///
/// Backed by a `SmallVec` so the common case (a handful of index types) stays inline.
pub type IndexTypes = SmallVec<[IndexType; 4]>;
114
/// Cross-region file id.
///
/// It contains a region id and a file id. The string representation is
/// `{region_id}/{file_id}` (see the [fmt::Display] impl).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// The region that creates the file.
    region_id: RegionId,
    /// The id of the file.
    file_id: FileId,
}
125
126impl RegionFileId {
127    /// Creates a new [RegionFileId] from `region_id` and `file_id`.
128    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
129        Self { region_id, file_id }
130    }
131
132    /// Gets the region id.
133    pub fn region_id(&self) -> RegionId {
134        self.region_id
135    }
136
137    /// Gets the file id.
138    pub fn file_id(&self) -> FileId {
139        self.file_id
140    }
141}
142
143impl fmt::Display for RegionFileId {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        write!(f, "{}/{}", self.region_id, self.file_id)
146    }
147}
148
/// Unique identifier for an index file, combining the SST file ID and the index version.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionIndexId {
    /// The SST file this index belongs to.
    pub file_id: RegionFileId,
    /// Version of the index file; 0 is the initial version (legacy-compatible
    /// file name without a version suffix — see the [fmt::Display] impl).
    pub version: IndexVersion,
}
155
156impl RegionIndexId {
157    pub fn new(file_id: RegionFileId, version: IndexVersion) -> Self {
158        Self { file_id, version }
159    }
160
161    pub fn region_id(&self) -> RegionId {
162        self.file_id.region_id
163    }
164
165    pub fn file_id(&self) -> FileId {
166        self.file_id.file_id
167    }
168}
169
170impl fmt::Display for RegionIndexId {
171    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172        if self.version == 0 {
173            write!(f, "{}/{}", self.file_id.region_id, self.file_id.file_id)
174        } else {
175            write!(
176                f,
177                "{}/{}.{}",
178                self.file_id.region_id, self.file_id.file_id, self.version
179            )
180        }
181    }
182}
183
/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);
187
188/// Checks if two inclusive timestamp ranges overlap with each other.
189pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
190    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
191    let (_, l_end) = l;
192    let (r_start, _) = r;
193
194    r_start <= l_end
195}
196
/// Metadata of a SST file.
///
/// The struct-level `#[serde(default)]` keeps deserialization compatible with
/// metas written by older versions: fields absent from the input fall back to
/// their `Default` values.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that created the file. The region id may not be the id of the current region.
    pub region_id: RegionId,
    /// Compared to normal file names, FileId ignores the extension.
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
    /// Maximum uncompressed row group size of the file. 0 means unknown.
    pub max_row_group_uncompressed_size: u64,
    /// Available indexes of the file.
    pub available_indexes: IndexTypes,
    /// Created indexes of the file for each column.
    ///
    /// This is essentially a more granular, column-level version of `available_indexes`,
    /// primarily used for manual index building in the asynchronous index construction mode.
    ///
    /// For backward compatibility, older `FileMeta` versions might only contain `available_indexes`.
    /// In such cases, we cannot deduce specific column index information from `available_indexes` alone.
    /// Therefore, defaulting this `indexes` field to an empty list during deserialization is a
    /// reasonable and necessary step to ensure column information consistency.
    pub indexes: Vec<ColumnIndexMetadata>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// Version of the index file.
    /// Used to generate the index file name: "{file_id}.{index_version}.puffin".
    /// Default is 0 (which maps to "{file_id}.puffin" for compatibility).
    pub index_version: u64,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any rows,
    /// but instead means the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any row
    /// groups, but instead means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence in this file.
    ///
    /// This sequence is the only sequence in this file. And it's retrieved from the max
    /// sequence of the rows on generating this file.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression from the region metadata when the file is created.
    ///
    /// This is stored as a PartitionExpr object in memory for convenience,
    /// but serialized as JSON string for manifest compatibility.
    /// Compatibility behavior:
    /// - None: no partition expr was set when the file was created (legacy files).
    /// - Some(expr): partition expression from region metadata.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    ///
    /// The number is 0 if the series number is not available.
    pub num_series: u64,
    /// Minimum primary key value in the file, encoded as bytes (base64 in JSON).
    /// `None` if the primary key range is not available (e.g., legacy files).
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_bytes_option",
        deserialize_with = "deserialize_bytes_option"
    )]
    pub primary_key_min: Option<Bytes>,
    /// Maximum primary key value in the file, encoded as bytes (base64 in JSON).
    /// `None` if the primary key range is not available (e.g., legacy files).
    #[serde(
        default,
        skip_serializing_if = "Option::is_none",
        serialize_with = "serialize_bytes_option",
        deserialize_with = "deserialize_bytes_option"
    )]
    pub primary_key_max: Option<Bytes>,
}
284
impl Debug for FileMeta {
    /// Compact debug formatting: sizes are wrapped in [ReadableSize], timestamps
    /// are rendered as ISO-8601, and optional sections (index info, primary key
    /// range) are printed only when present.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size))
            .field(
                "max_row_group_uncompressed_size",
                &ReadableSize(self.max_row_group_uncompressed_size),
            );
        // Index fields are only meaningful when at least one index exists.
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("indexes", &self.indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series);
        // Print only the *lengths* of the primary key bounds to keep output short
        // (the encoded bytes themselves are opaque and potentially large).
        if self.primary_key_min.is_some() || self.primary_key_max.is_some() {
            debug_struct
                .field(
                    "primary_key_min",
                    &self.primary_key_min.as_ref().map(|b| b.len()),
                )
                .field(
                    "primary_key_max",
                    &self.primary_key_max.as_ref().map(|b| b.len()),
                );
        }
        debug_struct.finish()
    }
}
338
/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
    /// Vector index (HNSW). Only available with the `vector_index` feature.
    #[cfg(feature = "vector_index")]
    VectorIndex,
}
352
/// Metadata of indexes created for a specific column in an SST file.
///
/// This structure tracks which index types have been successfully created for a column.
/// It provides more granular, column-level index information compared to the file-level
/// `available_indexes` field in [`FileMeta`].
///
/// This is primarily used for:
/// - Manual index building in asynchronous index construction mode
/// - Verifying index consistency between files and region metadata
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct ColumnIndexMetadata {
    /// The column ID this index metadata applies to.
    pub column_id: ColumnId,
    /// List of index types that have been successfully created for this column.
    pub created_indexes: IndexTypes,
}
370
371impl FileMeta {
372    /// Returns the primary key range if both min and max are present.
373    pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
374        match (&self.primary_key_min, &self.primary_key_max) {
375            (Some(min), Some(max)) => Some((min.clone(), max.clone())),
376            _ => None,
377        }
378    }
379
380    pub fn exists_index(&self) -> bool {
381        !self.available_indexes.is_empty()
382    }
383
384    pub fn index_version(&self) -> Option<IndexVersion> {
385        if self.exists_index() {
386            Some(self.index_version)
387        } else {
388            None
389        }
390    }
391
392    /// Whether the index file is up-to-date comparing to another file meta.
393    pub fn is_index_up_to_date(&self, other: &FileMeta) -> bool {
394        self.exists_index() && other.exists_index() && self.index_version >= other.index_version
395    }
396
397    /// Returns true if the file has an inverted index
398    pub fn inverted_index_available(&self) -> bool {
399        self.available_indexes.contains(&IndexType::InvertedIndex)
400    }
401
402    /// Returns true if the file has a fulltext index
403    pub fn fulltext_index_available(&self) -> bool {
404        self.available_indexes.contains(&IndexType::FulltextIndex)
405    }
406
407    /// Returns true if the file has a bloom filter index.
408    pub fn bloom_filter_index_available(&self) -> bool {
409        self.available_indexes
410            .contains(&IndexType::BloomFilterIndex)
411    }
412
413    /// Returns true if the file has a vector index.
414    #[cfg(feature = "vector_index")]
415    pub fn vector_index_available(&self) -> bool {
416        self.available_indexes.contains(&IndexType::VectorIndex)
417    }
418
419    pub fn index_file_size(&self) -> u64 {
420        self.index_file_size
421    }
422
423    /// Check whether the file index is consistent with the given region metadata.
424    pub fn is_index_consistent_with_region(&self, metadata: &[ColumnMetadata]) -> bool {
425        let id_to_indexes = self
426            .indexes
427            .iter()
428            .map(|index| (index.column_id, index.created_indexes.clone()))
429            .collect::<std::collections::HashMap<_, _>>();
430        for column in metadata {
431            if !column.column_schema.is_indexed() {
432                continue;
433            }
434            if let Some(indexes) = id_to_indexes.get(&column.column_id) {
435                if column.column_schema.is_inverted_indexed()
436                    && !indexes.contains(&IndexType::InvertedIndex)
437                {
438                    return false;
439                }
440                if column.column_schema.is_fulltext_indexed()
441                    && !indexes.contains(&IndexType::FulltextIndex)
442                {
443                    return false;
444                }
445                if column.column_schema.is_skipping_indexed()
446                    && !indexes.contains(&IndexType::BloomFilterIndex)
447                {
448                    return false;
449                }
450            } else {
451                return false;
452            }
453        }
454        true
455    }
456
457    /// Returns the cross-region file id.
458    pub fn file_id(&self) -> RegionFileId {
459        RegionFileId::new(self.region_id, self.file_id)
460    }
461
462    /// Returns the RegionIndexId for this file.
463    pub fn index_id(&self) -> RegionIndexId {
464        RegionIndexId::new(self.file_id(), self.index_version)
465    }
466}
467
/// Handle to a SST file.
///
/// Cloning is cheap: all handles share one [FileHandleInner] through an [Arc].
/// Dropping the last handle notifies the file purger (see the `Drop` impl of
/// [FileHandleInner]).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
473
474impl fmt::Debug for FileHandle {
475    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
476        f.debug_struct("FileHandle")
477            .field("meta", self.meta_ref())
478            .field("compacting", &self.compacting())
479            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
480            .finish()
481    }
482}
483
impl FileHandle {
    /// Creates a handle for `meta`, registering the file with `file_purger`.
    ///
    /// The initial primary key range is taken from the meta (may be `None` for
    /// legacy files).
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        let pk_range = meta.primary_key_range();
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger, pk_range)),
        }
    }

    /// Test-only constructor that overrides the primary key range instead of
    /// reading it from `meta`.
    #[cfg(test)]
    pub fn new_with_primary_key_range(
        meta: FileMeta,
        file_purger: FilePurgerRef,
        primary_key_range: Option<(Bytes, Bytes)>,
    ) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger, primary_key_range)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the cross-region file id.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the RegionIndexId for this file.
    pub fn index_id(&self) -> RegionIndexId {
        RegionIndexId::new(self.file_id(), self.inner.meta.index_version)
    }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Mark the file as deleted and will delete it on drop asynchronously.
    ///
    /// The actual removal happens in [FileHandleInner]'s `Drop` impl, which
    /// forwards this flag to the file purger.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns whether the file is currently part of a compaction.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets the compacting flag.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns whether the index of this file has been marked outdated.
    pub fn index_outdated(&self) -> bool {
        self.inner.index_outdated.load(Ordering::Relaxed)
    }

    /// Sets the index-outdated flag; forwarded to the purger on drop.
    pub fn set_index_outdated(&self, index_outdated: bool) {
        self.inner
            .index_outdated
            .store(index_outdated, Ordering::Relaxed);
    }

    /// Returns a reference to the [FileMeta].
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Returns a clone of the shared file purger.
    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    /// Returns the size of the SST file in bytes.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file (0 when unknown; see [FileMeta::num_rows]).
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    /// Returns the SST level of the file.
    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    /// Returns whether the file has been marked deleted.
    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }

    /// Returns the current primary key range, cloned out of the shared lock.
    pub fn primary_key_range(&self) -> Option<(Bytes, Bytes)> {
        self.inner.primary_key_range.read().unwrap().clone()
    }

    /// Replaces the primary key range (e.g. once computed lazily).
    pub(crate) fn set_primary_key_range(&self, primary_key_range: (Bytes, Bytes)) {
        *self.inner.primary_key_range.write().unwrap() = Some(primary_key_range);
    }
}
588
/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    /// Immutable metadata of the file.
    meta: FileMeta,
    /// Whether the file is currently part of a compaction.
    compacting: AtomicBool,
    /// Whether the file has been marked for deletion; forwarded to the purger on drop.
    deleted: AtomicBool,
    /// Whether the index of this file is outdated; forwarded to the purger on drop.
    index_outdated: AtomicBool,
    /// Primary key range, mutable so it can be filled in after construction
    /// (see `FileHandle::set_primary_key_range`).
    primary_key_range: RwLock<Option<(Bytes, Bytes)>>,
    /// Purger notified when this inner (the last handle) is dropped.
    file_purger: FilePurgerRef,
}
600
impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Last handle gone: hand the file back to the purger together with the
        // deleted / index-outdated flags so it can remove the file if needed.
        // NOTE(review): the flags are stored with Ordering::Relaxed elsewhere in
        // this file but loaded with Acquire here; Acquire has no matching Release
        // store, though the Arc drop path already provides synchronization —
        // confirm whether the stronger ordering is intentional.
        self.file_purger.remove_file(
            self.meta.clone(),
            self.deleted.load(Ordering::Acquire),
            self.index_outdated.load(Ordering::Acquire),
        );
    }
}
610
impl FileHandleInner {
    /// There should only be one `FileHandleInner` for each file on a datanode.
    ///
    /// Registers the file with the purger (`new_file`) before constructing the
    /// inner state; all mutable flags start cleared.
    fn new(
        meta: FileMeta,
        file_purger: FilePurgerRef,
        primary_key_range: Option<(Bytes, Bytes)>,
    ) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            index_outdated: AtomicBool::new(false),
            primary_key_range: RwLock::new(primary_key_range),
            file_purger,
        }
    }
}
629
630/// Delete files for a region.
631/// - `region_id`: Region id.
632/// - `file_ids`: List of (file id, index version) tuples to delete.
633/// - `delete_index`: Whether to delete the index file from the cache.
634/// - `access_layer`: Access layer to delete files.
635/// - `cache_manager`: Cache manager to remove files from cache.
636pub async fn delete_files(
637    region_id: RegionId,
638    file_ids: &[(FileId, u64)],
639    delete_index: bool,
640    access_layer: &AccessLayerRef,
641    cache_manager: &Option<CacheManagerRef>,
642) -> crate::error::Result<()> {
643    // Remove meta of the file from cache.
644    if let Some(cache) = &cache_manager {
645        for (file_id, _) in file_ids {
646            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
647        }
648    }
649    let mut attempted_files = Vec::with_capacity(file_ids.len());
650    let mut index_ids = Vec::new();
651
652    for (file_id, index_version) in file_ids {
653        let region_file_id = RegionFileId::new(region_id, *file_id);
654        attempted_files.push(*file_id);
655        index_ids.extend(
656            (0..=*index_version).map(|version| RegionIndexId::new(region_file_id, version)),
657        );
658    }
659
660    access_layer
661        .delete_ssts(region_id, &attempted_files)
662        .await?;
663    access_layer.delete_indexes(&index_ids).await?;
664
665    debug!(
666        "Attempted to delete {} files for region {}: {:?}",
667        attempted_files.len(),
668        region_id,
669        attempted_files
670    );
671
672    for (file_id, index_version) in file_ids {
673        purge_index_cache_stager(
674            region_id,
675            delete_index,
676            access_layer,
677            cache_manager,
678            *file_id,
679            *index_version,
680        )
681        .await;
682    }
683    Ok(())
684}
685
686pub async fn delete_index(
687    region_index_id: RegionIndexId,
688    access_layer: &AccessLayerRef,
689    cache_manager: &Option<CacheManagerRef>,
690) -> crate::error::Result<()> {
691    delete_index_and_purge(region_index_id, access_layer, cache_manager).await?;
692
693    Ok(())
694}
695
696pub async fn delete_indexes(
697    index_ids: &[RegionIndexId],
698    access_layer: &AccessLayerRef,
699    cache_manager: &Option<CacheManagerRef>,
700) -> crate::error::Result<()> {
701    if index_ids.is_empty() {
702        return Ok(());
703    }
704
705    if let Err(e) = access_layer.delete_indexes(index_ids).await {
706        error!(e; "Failed to batch delete index files");
707
708        for index_id in index_ids {
709            delete_index_and_purge(*index_id, access_layer, cache_manager).await?;
710        }
711
712        return Ok(());
713    }
714
715    purge_indexes(index_ids, access_layer, cache_manager).await;
716
717    Ok(())
718}
719
720async fn delete_index_and_purge(
721    index_id: RegionIndexId,
722    access_layer: &AccessLayerRef,
723    cache_manager: &Option<CacheManagerRef>,
724) -> crate::error::Result<()> {
725    access_layer.delete_index(index_id).await?;
726    purge_index_cache_stager(
727        index_id.region_id(),
728        true,
729        access_layer,
730        cache_manager,
731        index_id.file_id(),
732        index_id.version,
733    )
734    .await;
735    Ok(())
736}
737
738async fn purge_indexes(
739    index_ids: &[RegionIndexId],
740    access_layer: &AccessLayerRef,
741    cache_manager: &Option<CacheManagerRef>,
742) {
743    for index_id in index_ids {
744        purge_index_cache_stager(
745            index_id.region_id(),
746            true,
747            access_layer,
748            cache_manager,
749            index_id.file_id(),
750            index_id.version,
751        )
752        .await;
753    }
754}
755
756async fn purge_index_cache_stager(
757    region_id: RegionId,
758    delete_index: bool,
759    access_layer: &AccessLayerRef,
760    cache_manager: &Option<CacheManagerRef>,
761    file_id: FileId,
762    index_version: u64,
763) {
764    if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
765        // Removes index file from the cache.
766        if delete_index {
767            write_cache
768                .remove(IndexKey::new(
769                    region_id,
770                    file_id,
771                    FileType::Puffin(index_version),
772                ))
773                .await;
774        }
775
776        // Remove the SST file from the cache.
777        write_cache
778            .remove(IndexKey::new(region_id, file_id, FileType::Parquet))
779            .await;
780    }
781
782    // Purges index content in the stager.
783    if let Err(e) = access_layer
784        .puffin_manager_factory()
785        .purge_stager(RegionIndexId::new(
786            RegionFileId::new(region_id, file_id),
787            index_version,
788        ))
789        .await
790    {
791        error!(e; "Failed to purge stager with index file, file_id: {}, index_version: {}, region: {}",
792                file_id, index_version, region_id);
793    }
794}
795
796#[cfg(test)]
797mod tests {
798    use std::str::FromStr;
799
800    use datatypes::prelude::ConcreteDataType;
801    use datatypes::schema::{
802        ColumnSchema, FulltextAnalyzer, FulltextBackend, FulltextOptions, SkippingIndexOptions,
803    };
804    use datatypes::value::Value;
805    use partition::expr::{PartitionExpr, col};
806
807    use super::*;
808
809    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
810        FileMeta {
811            region_id: 0.into(),
812            file_id,
813            time_range: FileTimeRange::default(),
814            level,
815            file_size: 0,
816            max_row_group_uncompressed_size: 0,
817            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
818            indexes: vec![ColumnIndexMetadata {
819                column_id: 0,
820                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
821            }],
822            index_file_size: 0,
823            index_version: 0,
824            num_rows: 0,
825            num_row_groups: 0,
826            sequence: None,
827            partition_expr: None,
828            num_series: 0,
829            ..Default::default()
830        }
831    }
832
833    #[test]
834    fn test_deserialize_file_meta() {
835        let file_meta = create_file_meta(FileId::random(), 0);
836        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
837        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
838        assert_eq!(file_meta, deserialized_file_meta.unwrap());
839    }
840
841    #[test]
842    fn test_deserialize_from_string() {
843        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
844        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
845        \"available_indexes\":[\"InvertedIndex\"],\"indexes\":[{\"column_id\": 0, \"created_indexes\": [\"InvertedIndex\"]}],\"level\":0}";
846        let file_meta = create_file_meta(
847            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
848            0,
849        );
850        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
851        assert_eq!(file_meta, deserialized_file_meta);
852    }
853
854    #[test]
855    fn test_file_meta_with_partition_expr() {
856        let file_id = FileId::random();
857        let partition_expr = PartitionExpr::new(
858            col("a"),
859            partition::expr::RestrictedOp::GtEq,
860            Value::UInt32(10).into(),
861        );
862
863        let file_meta_with_partition = FileMeta {
864            region_id: 0.into(),
865            file_id,
866            time_range: FileTimeRange::default(),
867            level: 0,
868            file_size: 0,
869            max_row_group_uncompressed_size: 0,
870            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
871            indexes: vec![ColumnIndexMetadata {
872                column_id: 0,
873                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
874            }],
875            index_file_size: 0,
876            index_version: 0,
877            num_rows: 0,
878            num_row_groups: 0,
879            sequence: None,
880            partition_expr: Some(partition_expr.clone()),
881            num_series: 0,
882            ..Default::default()
883        };
884
885        // Test serialization/deserialization
886        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
887        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
888        assert_eq!(file_meta_with_partition, deserialized);
889
890        // Verify the serialized JSON contains the expected partition expression string
891        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
892        assert!(serialized_value["partition_expr"].as_str().is_some());
893        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
894        assert!(partition_expr_json.contains("\"Column\":\"a\""));
895        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));
896
897        // Test with None (legacy files)
898        let file_meta_none = FileMeta {
899            partition_expr: None,
900            ..file_meta_with_partition.clone()
901        };
902        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
903        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
904        assert_eq!(file_meta_none, deserialized_none);
905    }
906
907    #[test]
908    fn test_file_meta_partition_expr_backward_compatibility() {
909        // Test that we can deserialize old JSON format with partition_expr as string
910        let json_with_partition_expr = r#"{
911            "region_id": 0,
912            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
913            "time_range": [
914                {"value": 0, "unit": "Millisecond"},
915                {"value": 0, "unit": "Millisecond"}
916            ],
917            "level": 0,
918            "file_size": 0,
919            "available_indexes": ["InvertedIndex"],
920            "index_file_size": 0,
921            "num_rows": 0,
922            "num_row_groups": 0,
923            "sequence": null,
924            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
925        }"#;
926
927        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
928        assert!(file_meta.partition_expr.is_some());
929        let expr = file_meta.partition_expr.unwrap();
930        assert_eq!(format!("{}", expr), "a >= 10");
931
932        // Test empty partition expression string
933        let json_with_empty_expr = r#"{
934            "region_id": 0,
935            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
936            "time_range": [
937                {"value": 0, "unit": "Millisecond"},
938                {"value": 0, "unit": "Millisecond"}
939            ],
940            "level": 0,
941            "file_size": 0,
942            "available_indexes": [],
943            "index_file_size": 0,
944            "num_rows": 0,
945            "num_row_groups": 0,
946            "sequence": null,
947            "partition_expr": ""
948        }"#;
949
950        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
951        assert!(file_meta_empty.partition_expr.is_none());
952
953        // Test null partition expression
954        let json_with_null_expr = r#"{
955            "region_id": 0,
956            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
957            "time_range": [
958                {"value": 0, "unit": "Millisecond"},
959                {"value": 0, "unit": "Millisecond"}
960            ],
961            "level": 0,
962            "file_size": 0,
963            "available_indexes": [],
964            "index_file_size": 0,
965            "num_rows": 0,
966            "num_row_groups": 0,
967            "sequence": null,
968            "partition_expr": null
969        }"#;
970
971        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
972        assert!(file_meta_null.partition_expr.is_none());
973
974        // Test partition expression doesn't exist
975        let json_with_empty_expr = r#"{
976            "region_id": 0,
977            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
978            "time_range": [
979                {"value": 0, "unit": "Millisecond"},
980                {"value": 0, "unit": "Millisecond"}
981            ],
982            "level": 0,
983            "file_size": 0,
984            "available_indexes": [],
985            "index_file_size": 0,
986            "num_rows": 0,
987            "num_row_groups": 0,
988            "sequence": null
989        }"#;
990
991        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
992        assert!(file_meta_empty.partition_expr.is_none());
993    }
994
995    #[test]
996    fn test_file_meta_indexes_backward_compatibility() {
997        // Old FileMeta format without the 'indexes' field
998        let json_old_file_meta = r#"{
999            "region_id": 0,
1000            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
1001            "time_range": [
1002                {"value": 0, "unit": "Millisecond"},
1003                {"value": 0, "unit": "Millisecond"}
1004            ],
1005            "available_indexes": ["InvertedIndex"],
1006            "level": 0,
1007            "file_size": 0,
1008            "index_file_size": 0,
1009            "num_rows": 0,
1010            "num_row_groups": 0
1011        }"#;
1012
1013        let deserialized_file_meta: FileMeta = serde_json::from_str(json_old_file_meta).unwrap();
1014
1015        // Verify backward compatibility: indexes field should default to empty vec
1016        assert_eq!(deserialized_file_meta.indexes, vec![]);
1017
1018        let expected_indexes: IndexTypes = SmallVec::from_iter([IndexType::InvertedIndex]);
1019        assert_eq!(deserialized_file_meta.available_indexes, expected_indexes);
1020
1021        assert_eq!(
1022            deserialized_file_meta.file_id,
1023            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap()
1024        );
1025    }
1026    #[test]
1027    fn test_is_index_consistent_with_region() {
1028        fn new_column_meta(
1029            id: ColumnId,
1030            name: &str,
1031            inverted: bool,
1032            fulltext: bool,
1033            skipping: bool,
1034        ) -> ColumnMetadata {
1035            let mut column_schema =
1036                ColumnSchema::new(name, ConcreteDataType::string_datatype(), true);
1037            if inverted {
1038                column_schema = column_schema.with_inverted_index(true);
1039            }
1040            if fulltext {
1041                column_schema = column_schema
1042                    .with_fulltext_options(FulltextOptions::new_unchecked(
1043                        true,
1044                        FulltextAnalyzer::English,
1045                        false,
1046                        FulltextBackend::Bloom,
1047                        1000,
1048                        0.01,
1049                    ))
1050                    .unwrap();
1051            }
1052            if skipping {
1053                column_schema = column_schema
1054                    .with_skipping_options(SkippingIndexOptions::new_unchecked(
1055                        1024,
1056                        0.01,
1057                        datatypes::schema::SkippingIndexType::BloomFilter,
1058                    ))
1059                    .unwrap();
1060            }
1061
1062            ColumnMetadata {
1063                column_schema,
1064                semantic_type: api::v1::SemanticType::Tag,
1065                column_id: id,
1066            }
1067        }
1068
1069        // Case 1: Perfect match. File has exactly the required indexes.
1070        let mut file_meta = FileMeta {
1071            indexes: vec![ColumnIndexMetadata {
1072                column_id: 1,
1073                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1074            }],
1075            ..Default::default()
1076        };
1077        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
1078        assert!(file_meta.is_index_consistent_with_region(&region_meta));
1079
1080        // Case 2: Superset match. File has more indexes than required.
1081        file_meta.indexes = vec![ColumnIndexMetadata {
1082            column_id: 1,
1083            created_indexes: SmallVec::from_iter([
1084                IndexType::InvertedIndex,
1085                IndexType::BloomFilterIndex,
1086            ]),
1087        }];
1088        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
1089        assert!(file_meta.is_index_consistent_with_region(&region_meta));
1090
1091        // Case 3: Missing index type. File has the column but lacks the required index type.
1092        file_meta.indexes = vec![ColumnIndexMetadata {
1093            column_id: 1,
1094            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1095        }];
1096        let region_meta = vec![new_column_meta(1, "tag1", true, true, false)]; // Requires fulltext too
1097        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1098
1099        // Case 4: Missing column. Region requires an index on a column not in the file's index list.
1100        file_meta.indexes = vec![ColumnIndexMetadata {
1101            column_id: 2, // File only has index for column 2
1102            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1103        }];
1104        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)]; // Requires index on column 1
1105        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1106
1107        // Case 5: No indexes required by region. Should always be consistent.
1108        file_meta.indexes = vec![ColumnIndexMetadata {
1109            column_id: 1,
1110            created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1111        }];
1112        let region_meta = vec![new_column_meta(1, "tag1", false, false, false)]; // No index required
1113        assert!(file_meta.is_index_consistent_with_region(&region_meta));
1114
1115        // Case 6: Empty file indexes. Region requires an index.
1116        file_meta.indexes = vec![];
1117        let region_meta = vec![new_column_meta(1, "tag1", true, false, false)];
1118        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1119
1120        // Case 7: Multiple columns, one is inconsistent.
1121        file_meta.indexes = vec![
1122            ColumnIndexMetadata {
1123                column_id: 1,
1124                created_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
1125            },
1126            ColumnIndexMetadata {
1127                column_id: 2, // Column 2 is missing the required BloomFilterIndex
1128                created_indexes: SmallVec::from_iter([IndexType::FulltextIndex]),
1129            },
1130        ];
1131        let region_meta = vec![
1132            new_column_meta(1, "tag1", true, false, false),
1133            new_column_meta(2, "tag2", false, true, true), // Requires Fulltext and BloomFilter
1134        ];
1135        assert!(!file_meta.is_index_consistent_with_region(&region_meta));
1136    }
1137}