mito2/sst/file.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Structures to describe metadata of files.

use std::fmt;
use std::fmt::{Debug, Formatter};
use std::num::NonZeroU64;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

use common_base::readable_size::ReadableSize;
use common_telemetry::{error, info};
use common_time::Timestamp;
use partition::expr::PartitionExpr;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use store_api::region_request::PathType;
use store_api::storage::{FileId, RegionId};

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheManagerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::sst::file_purger::FilePurgerRef;
use crate::sst::location;

/// Custom serde functions for the `partition_expr` field in [FileMeta].
fn serialize_partition_expr<S>(
    partition_expr: &Option<PartitionExpr>,
    serializer: S,
) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    use serde::ser::Error;

    match partition_expr {
        None => serializer.serialize_none(),
        Some(expr) => {
            let json_str = expr.as_json_str().map_err(S::Error::custom)?;
            serializer.serialize_some(&json_str)
        }
    }
}

fn deserialize_partition_expr<'de, D>(deserializer: D) -> Result<Option<PartitionExpr>, D::Error>
where
    D: serde::Deserializer<'de>,
{
    use serde::de::Error;

    let opt_json_str: Option<String> = Option::deserialize(deserializer)?;
    match opt_json_str {
        None => Ok(None),
        Some(json_str) => {
            if json_str.is_empty() {
                // Empty string represents explicit "single-region/no-partition" designation
                Ok(None)
            } else {
                // Parse the JSON string to PartitionExpr
                PartitionExpr::from_json_str(&json_str).map_err(D::Error::custom)
            }
        }
    }
}

/// Type to store SST level.
pub type Level = u8;
/// Maximum level of SSTs.
pub const MAX_LEVEL: Level = 2;

/// Cross-region file id.
///
/// It contains a region id and a file id. The string representation is `{region_id}/{file_id}`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegionFileId {
    /// The region that creates the file.
    region_id: RegionId,
    /// The id of the file.
    file_id: FileId,
}

impl RegionFileId {
    /// Creates a new [RegionFileId] from `region_id` and `file_id`.
    pub fn new(region_id: RegionId, file_id: FileId) -> Self {
        Self { region_id, file_id }
    }

    /// Gets the region id.
    pub fn region_id(&self) -> RegionId {
        self.region_id
    }

    /// Gets the file id.
    pub fn file_id(&self) -> FileId {
        self.file_id
    }
}

impl fmt::Display for RegionFileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}/{}", self.region_id, self.file_id)
    }
}

/// Time range (min and max timestamps) of a SST file.
/// Both min and max are inclusive.
pub type FileTimeRange = (Timestamp, Timestamp);

/// Checks if two inclusive timestamp ranges overlap with each other.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    let (_, l_end) = l;
    let (r_start, _) = r;

    r_start <= l_end
}

/// Metadata of a SST file.
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region that created the file. The region id may not be the id of the current region.
    pub region_id: RegionId,
    /// Compared to normal file names, a [FileId] ignores the extension.
    pub file_id: FileId,
    /// Timestamp range of file. The timestamps have the same time unit as the
    /// data in the SST.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the file.
    pub file_size: u64,
    /// Available indexes of the file.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    pub index_file_size: u64,
    /// Number of rows in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any rows;
    /// it means the number of rows is unknown.
    pub num_rows: u64,
    /// Number of row groups in the file.
    ///
    /// For historical reasons, this field might be missing in old files. Thus
    /// the default value `0` doesn't mean the file doesn't contain any row groups;
    /// it means the number of row groups is unknown.
    pub num_row_groups: u64,
    /// Sequence in this file.
    ///
    /// This sequence is the only sequence in this file. It's retrieved from the max
    /// sequence of the rows when generating this file.
    pub sequence: Option<NonZeroU64>,
    /// Partition expression from the region metadata when the file is created.
    ///
    /// This is stored as a PartitionExpr object in memory for convenience,
    /// but serialized as a JSON string for manifest compatibility.
    /// Compatibility behavior:
    /// - None: no partition expr was set when the file was created (legacy files).
    /// - Some(expr): partition expression from region metadata.
    #[serde(
        serialize_with = "serialize_partition_expr",
        deserialize_with = "deserialize_partition_expr"
    )]
    pub partition_expr: Option<PartitionExpr>,
}

impl Debug for FileMeta {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("FileMeta");
        debug_struct
            .field("region_id", &self.region_id)
            .field_with("file_id", |f| write!(f, "{} ", self.file_id))
            .field_with("time_range", |f| {
                write!(
                    f,
                    "({}, {}) ",
                    self.time_range.0.to_iso8601_string(),
                    self.time_range.1.to_iso8601_string()
                )
            })
            .field("level", &self.level)
            .field("file_size", &ReadableSize(self.file_size));
        if !self.available_indexes.is_empty() {
            debug_struct
                .field("available_indexes", &self.available_indexes)
                .field("index_file_size", &ReadableSize(self.index_file_size));
        }
        debug_struct
            .field("num_rows", &self.num_rows)
            .field("num_row_groups", &self.num_row_groups)
            .field_with("sequence", |f| match self.sequence {
                None => {
                    write!(f, "None")
                }
                Some(seq) => {
                    write!(f, "{}", seq)
                }
            })
            .field("partition_expr", &self.partition_expr)
            .finish()
    }
}

/// Type of index.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}

impl FileMeta {
    pub fn exists_index(&self) -> bool {
        !self.available_indexes.is_empty()
    }

    /// Returns true if the file has an inverted index.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Returns true if the file has a fulltext index.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns true if the file has a bloom filter index.
    pub fn bloom_filter_index_available(&self) -> bool {
        self.available_indexes
            .contains(&IndexType::BloomFilterIndex)
    }

    pub fn index_file_size(&self) -> u64 {
        self.index_file_size
    }

    /// Returns the cross-region file id.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.region_id, self.file_id)
    }
}

/// Handle to a SST file.
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}

impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("FileHandle")
            .field("meta", self.meta_ref())
            .field("compacting", &self.compacting())
            .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
            .finish()
    }
}

impl FileHandle {
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        FileHandle {
            inner: Arc::new(FileHandleInner::new(meta, file_purger)),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.inner.meta.region_id
    }

    /// Returns the cross-region file id.
    pub fn file_id(&self) -> RegionFileId {
        RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
    }

    /// Returns the complete file path of the file.
    pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
        location::sst_file_path(file_dir, self.file_id(), path_type)
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.inner.meta.time_range
    }

    /// Marks the file as deleted; the file will be deleted asynchronously on drop.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the [FileMeta].
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    pub fn file_purger(&self) -> FilePurgerRef {
        self.inner.file_purger.clone()
    }

    pub fn size(&self) -> u64 {
        self.inner.meta.file_size
    }

    pub fn index_size(&self) -> u64 {
        self.inner.meta.index_file_size
    }

    pub fn num_rows(&self) -> usize {
        self.inner.meta.num_rows as usize
    }

    pub fn level(&self) -> Level {
        self.inner.meta.level
    }

    pub fn is_deleted(&self) -> bool {
        self.inner.deleted.load(Ordering::Relaxed)
    }
}

/// Inner data of [FileHandle].
///
/// Contains meta of the file, and other mutable info like whether the file is compacting.
struct FileHandleInner {
    meta: FileMeta,
    compacting: AtomicBool,
    deleted: AtomicBool,
    file_purger: FilePurgerRef,
}

impl Drop for FileHandleInner {
    fn drop(&mut self) {
        self.file_purger
            .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
    }
}

impl FileHandleInner {
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        file_purger.new_file(&meta);
        FileHandleInner {
            meta,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
            file_purger,
        }
    }
}

/// Deletes the given SST files (and their index files) from storage, removes related
/// cache entries, and purges staged index content.
pub async fn delete_files(
    region_id: RegionId,
    file_ids: &[FileId],
    delete_index: bool,
    access_layer: &AccessLayerRef,
    cache_manager: &Option<CacheManagerRef>,
) -> crate::error::Result<()> {
    // Remove meta of the file from cache.
    if let Some(cache) = &cache_manager {
        for file_id in file_ids {
            cache.remove_parquet_meta_data(RegionFileId::new(region_id, *file_id));
        }
    }
    let mut deleted_files = Vec::with_capacity(file_ids.len());

    for file_id in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);
        match access_layer.delete_sst(&region_file_id).await {
            Ok(_) => {
                deleted_files.push(*file_id);
            }
            Err(e) => {
                error!(e; "Failed to delete sst and index file for {}", region_file_id);
            }
        }
    }

    info!(
        "Deleted {} files for region {}: {:?}",
        deleted_files.len(),
        region_id,
        deleted_files
    );

    for file_id in file_ids {
        let region_file_id = RegionFileId::new(region_id, *file_id);

        if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache()) {
            // Removes index file from the cache.
            if delete_index {
                write_cache
                    .remove(IndexKey::new(region_id, *file_id, FileType::Puffin))
                    .await;
            }

            // Remove the SST file from the cache.
            write_cache
                .remove(IndexKey::new(region_id, *file_id, FileType::Parquet))
                .await;
        }

        // Purges index content in the stager.
        if let Err(e) = access_layer
            .puffin_manager_factory()
            .purge_stager(region_file_id)
            .await
        {
            error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
                    file_id, region_id);
        }
    }
    Ok(())
}

#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use datatypes::value::Value;
    use partition::expr::{PartitionExpr, col};

    use super::*;

    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }
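
    // A minimal illustrative test (not part of the original file) for the `overlaps`
    // helper above, showing that both ends of a `FileTimeRange` are inclusive.
    // Assumes `Timestamp::new_millisecond` as the constructor.
    #[test]
    fn test_overlaps_inclusive() {
        let ts = Timestamp::new_millisecond;
        // Ranges that touch at a single point overlap because both ends are inclusive.
        assert!(overlaps(&(ts(0), ts(10)), &(ts(10), ts(20))));
        // Argument order does not matter.
        assert!(overlaps(&(ts(10), ts(20)), &(ts(0), ts(10))));
        // Disjoint ranges do not overlap.
        assert!(!overlaps(&(ts(0), ts(9)), &(ts(10), ts(20))));
    }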

    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }
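
    // A small illustrative test (not part of the original file): `RegionFileId` displays
    // as `{region_id}/{file_id}`, and `FileMeta::file_id` builds the cross-region id from
    // the meta's own region id and file id.
    #[test]
    fn test_region_file_id_display() {
        let file_id = FileId::random();
        let region_id: RegionId = 0.into();
        let region_file_id = RegionFileId::new(region_id, file_id);
        assert_eq!(
            format!("{}", region_file_id),
            format!("{}/{}", region_id, file_id)
        );

        // `create_file_meta` uses region id 0, so the cross-region ids match.
        let file_meta = create_file_meta(file_id, 0);
        assert_eq!(region_file_id, file_meta.file_id());
    }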

    #[test]
    fn test_file_meta_with_partition_expr() {
        let file_id = FileId::random();
        let partition_expr = PartitionExpr::new(
            col("a"),
            partition::expr::RestrictedOp::GtEq,
            Value::UInt32(10).into(),
        );

        let file_meta_with_partition = FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level: 0,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
            partition_expr: Some(partition_expr.clone()),
        };

        // Test serialization/deserialization
        let serialized = serde_json::to_string(&file_meta_with_partition).unwrap();
        let deserialized: FileMeta = serde_json::from_str(&serialized).unwrap();
        assert_eq!(file_meta_with_partition, deserialized);

        // Verify the serialized JSON contains the expected partition expression string
        let serialized_value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(serialized_value["partition_expr"].as_str().is_some());
        let partition_expr_json = serialized_value["partition_expr"].as_str().unwrap();
        assert!(partition_expr_json.contains("\"Column\":\"a\""));
        assert!(partition_expr_json.contains("\"op\":\"GtEq\""));

        // Test with None (legacy files)
        let file_meta_none = FileMeta {
            partition_expr: None,
            ..file_meta_with_partition.clone()
        };
        let serialized_none = serde_json::to_string(&file_meta_none).unwrap();
        let deserialized_none: FileMeta = serde_json::from_str(&serialized_none).unwrap();
        assert_eq!(file_meta_none, deserialized_none);
    }
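
    // An illustrative check (not part of the original file): when `partition_expr` is
    // `None`, `serialize_partition_expr` writes the field as JSON `null` instead of
    // omitting it.
    #[test]
    fn test_partition_expr_serialized_as_null() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized = serde_json::to_string(&file_meta).unwrap();
        let value: serde_json::Value = serde_json::from_str(&serialized).unwrap();
        assert!(value["partition_expr"].is_null());
    }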

    #[test]
    fn test_file_meta_partition_expr_backward_compatibility() {
        // Test that we can deserialize old JSON format with partition_expr as string
        let json_with_partition_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": ["InvertedIndex"],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": "{\"Expr\":{\"lhs\":{\"Column\":\"a\"},\"op\":\"GtEq\",\"rhs\":{\"Value\":{\"UInt32\":10}}}}"
        }"#;

        let file_meta: FileMeta = serde_json::from_str(json_with_partition_expr).unwrap();
        assert!(file_meta.partition_expr.is_some());
        let expr = file_meta.partition_expr.unwrap();
        assert_eq!(format!("{}", expr), "a >= 10");

        // Test empty partition expression string
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": ""
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());

        // Test null partition expression
        let json_with_null_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null,
            "partition_expr": null
        }"#;

        let file_meta_null: FileMeta = serde_json::from_str(json_with_null_expr).unwrap();
        assert!(file_meta_null.partition_expr.is_none());

        // Test partition expression doesn't exist
        let json_with_empty_expr = r#"{
            "region_id": 0,
            "file_id": "bc5896ec-e4d8-4017-a80d-f2de73188d55",
            "time_range": [
                {"value": 0, "unit": "Millisecond"},
                {"value": 0, "unit": "Millisecond"}
            ],
            "level": 0,
            "file_size": 0,
            "available_indexes": [],
            "index_file_size": 0,
            "num_rows": 0,
            "num_row_groups": 0,
            "sequence": null
        }"#;

        let file_meta_empty: FileMeta = serde_json::from_str(json_with_empty_expr).unwrap();
        assert!(file_meta_empty.partition_expr.is_none());
    }
617}