mito2/sst/file.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structures to describe metadata of files.

use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;

/// Custom serde functions for partition_expr field in FileMeta
fn serialize_partition_expr<S>(
    partition_expr: &Option<PartitionExpr>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    use serde::ser::Error;

    match partition_expr {
        None => serializer.serialize_none(),
        Some(expr) => {
            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
            serializer.serialize_some(&json_str)
        }
    }
}

fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;

    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
    match opt_json_str {
        None => Ok(None),
        Some(json_str) => {
            if json_str.is_empty() {
                // Empty string represents explicit "single-region/no-partition" designation
                Ok(None)
            } else {
                // Parse the JSON string to PartitionExpr
                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
            }
        }
    }
}

/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;

/// Cross-region file id.
///
/// It contains a region id and a file id. The string representation is `{region_id}/{file_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// The region that creates the file.
    region_id: RegionId,
    /// The id of the file.
    file_id: FileId,
}

impl RegionFileId {
    /// Creates a new [RegionFileId] from `region_id` and `file_id`.
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    /// Gets the region id.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Gets the file id.
    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}

impl fmt::Display for RegionFileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}
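
// A minimal illustrative sketch of the `{region_id}/{file_id}` display form
// documented above. It only relies on constructors already used in this
// file's tests (`0.into()` for a `RegionId` and `FileId::random()`).
#[cfg(test)]
mod region_file_id_display_example {
    use super::*;

    #[test]
    fn display_is_region_id_slash_file_id() {
        let region_id: RegionId = 0.into();
        let file_id = FileId::random();
        let id = RegionFileId::new(region_id, file_id);

        // The string form joins the two ids with a slash.
        assert_eq!(id.to_string(), format!("{}/{}", region_id, file_id));
        // Accessors return the parts used to build the id.
        assert_eq!(id.region_id(), region_id);
        assert_eq!(id.file_id(), file_id);
    }
}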

/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

/// Checks if two inclusive timestamp ranges overlap with each other.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}
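
// A minimal sketch of the inclusive-overlap semantics of `overlaps`: ranges
// that merely touch at a single timestamp still count as overlapping. It
// assumes `Timestamp::new_millisecond` is available from `common_time`.
#[cfg(test)]
mod overlaps_example {
    use super::*;

    fn range_ms(start: i64, end: i64) -> FileTimeRange {
        (
            Timestamp::new_millisecond(start),
            Timestamp::new_millisecond(end),
        )
    }

    #[test]
    fn inclusive_bounds_overlap() {
        // Touching at a single point counts as overlapping.
        assert!(overlaps(&range_ms(0, 10), &range_ms(10, 20)));
        // Argument order does not matter.
        assert!(overlaps(&range_ms(10, 20), &range_ms(0, 10)));
        // Disjoint ranges do not overlap.
        assert!(!overlaps(&range_ms(0, 9), &range_ms(10, 20)));
    }
}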

/// Metadata of a SST file.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that created the file. The region id may not be the id of the current region.
    pub region_id: RegionId,
    /// Compared to a normal file name, the `FileId` does not include the extension.
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
    /// Available indexes of the file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// File ID of the index file.
    ///
    /// When this field is None, it means the index file id is the same as the file id.
    /// Only meaningful when index_file_size > 0.
    /// Used for rebuilding index files.
    pub index_file_id: Option<FileId>,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file contains no rows; it means
    /// the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file contains no row groups; it
    /// means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence of this file.
    ///
    /// This is the only sequence associated with this file. It is taken from the
    /// max sequence of the rows when the file was generated.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression from the region metadata when the file was created.
    ///
    /// This is stored as a PartitionExpr object in memory for convenience,
    /// but serialized as a JSON string for manifest compatibility.
    /// Compatibility behavior:
    /// - None: no partition expr was set when the file was created (legacy files).
    /// - Some(expr): partition expression from region metadata.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
    /// Number of series in the file.
    ///
    /// The number is 0 if the series number is not available.
    pub num_series: u64,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size));
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .field("num_series", &self.num_series)
            .finish()
    }
}

/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

impl FileMeta {
    /// Returns true if the file has any available index.
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a fulltext index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    /// Returns the size of the index file.
    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }

    /// Returns the cross-region file id.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }

    /// Returns the cross-region index file id.
    /// If the index file id is not set, returns the file id.
    pub fn index_file_id(&self) -> RegionFileId {
        if let Some(index_file_id) = self.index_file_id {
            RegionFileId::new(self.region_id, index_file_id)
        } else {
            self.file_id()
        }
    }
}
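
// A minimal sketch of the `index_file_id` fallback documented above: when no
// dedicated index file id is recorded, the SST's own file id is reused. It
// uses `FileMeta::default()` (derived above) for the remaining fields.
#[cfg(test)]
mod file_meta_index_file_id_example {
    use super::*;

    #[test]
    fn index_file_id_falls_back_to_file_id() {
        let file_id = FileId::random();
        let meta = FileMeta {
            file_id,
            index_file_id: None,
            ..Default::default()
        };
        // Without a dedicated index file id, the SST file id is reused.
        assert_eq!(meta.index_file_id().file_id(), file_id);

        let dedicated = FileId::random();
        let meta = FileMeta {
            file_id,
            index_file_id: Some(dedicated),
            ..Default::default()
        };
        // With a dedicated index file id, that id is returned instead.
        assert_eq!(meta.index_file_id().file_id(), dedicated);
    }
}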

/// Handle to a SST file.
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    /// Creates a new [FileHandle] from the `meta` and the `file_purger`.
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the cross-region file id.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the cross-region index file id.
    /// If the index file id is not set, returns the file id.
    pub fn index_file_id(&self) -> RegionFileId {
        if let Some(index_file_id) = self.inner.meta.index_file_id {
            RegionFileId::new(self.inner.meta.region_id, index_file_id)
        } else {
            self.file_id()
        }
    }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, table_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(table_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted so that it will be deleted asynchronously on drop.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Returns true if the file is being compacted.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets whether the file is being compacted.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the [FileMeta].
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Returns the file purger bound to this handle.
    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    /// Returns the size of the SST file.
    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    /// Returns the size of the index file.
    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    /// Returns the number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    /// Returns the SST level of the file.
    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    /// Returns true if the file is marked as deleted.
    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}

/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        self.file_purger
            .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

/// Deletes SST files and their index files for `region_id`, and removes the
/// related entries from the caches and the index stager.
///
/// Each element of `file_ids` is a `(file_id, index_file_id)` pair.
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[(FileId, FileId)],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    // Removes meta of the files from the cache.
    if let Some(cache) = &cache_manager {
        for (file_id, _) in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for (file_id, index_file_id) in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer
            .delete_sst(
                &RegionFileId::new(region_id, *file_id),
                &RegionFileId::new(region_id, *index_file_id),
            )
            .await
        {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    info!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    for (file_id, index_file_id) in file_ids {
        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
            // Removes the index file from the cache.
            if delete_index {
                write_cache
                    .remove(IndexKey::new(region_id, *index_file_id, FileType::Puffin))
                    .await;
            }

            // Removes the SST file from the cache.
            write_cache
                .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
                .await;
        }

        // Purges index content in the stager.
        if let Err(e) = access_layer
            .puffin_manager_factory()
            .purge_stager(RegionFileId::new(region_id, *index_file_id))
            .await
        {
            error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
                    index_file_id, region_id);
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            index_file_id: None,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
            num_series: 0,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }

    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            index_file_id: None,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
            num_series: 0,
        };

        // Test serialization/deserialization
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // Verify the serialized JSON contains the expected partition expression string
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // Test with None (legacy files)
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Test that we can deserialize old JSON format with partition_expr as string
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Test empty partition expression string
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Test null partition expression
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Test partition expression doesn't exist
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());
    }
}