store_api/storage/
file.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt;
17use std::fmt::Debug;
18use std::str::FromStr;
19
20use serde::{Deserialize, Serialize};
21use snafu::{ResultExt, Snafu};
22use uuid::Uuid;
23
24use crate::ManifestVersion;
25use crate::storage::RegionId;
26
/// Version number of an index.
///
/// Carried optionally by [FileRef] (`index_version`) and paired with a [FileId] in
/// `GcReport::deleted_indexes`.
pub type IndexVersion = u64;
29
30#[derive(Debug, Snafu, PartialEq)]
31pub struct ParseIdError {
32    source: uuid::Error,
33}
34
35/// Unique id for [SST File].
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
37pub struct FileId(Uuid);
38
39impl FileId {
40    /// Returns a new unique [FileId] randomly.
41    pub fn random() -> FileId {
42        FileId(Uuid::new_v4())
43    }
44
45    /// Parses id from string.
46    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
47        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
48    }
49
50    /// Converts [FileId] as byte slice.
51    pub fn as_bytes(&self) -> &[u8] {
52        self.0.as_bytes()
53    }
54}
55
56impl From<FileId> for Uuid {
57    fn from(value: FileId) -> Self {
58        value.0
59    }
60}
61
62impl fmt::Display for FileId {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        write!(f, "{}", self.0)
65    }
66}
67
68impl FromStr for FileId {
69    type Err = ParseIdError;
70
71    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
72        FileId::parse_str(s)
73    }
74}
75
76#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
77pub struct FileRef {
78    pub region_id: RegionId,
79    pub file_id: FileId,
80    pub index_version: Option<IndexVersion>,
81}
82
83impl FileRef {
84    pub fn new(region_id: RegionId, file_id: FileId, index_version: Option<IndexVersion>) -> Self {
85        Self {
86            region_id,
87            file_id,
88            index_version,
89        }
90    }
91}
92
93/// The tmp file manifest which record a table's file references.
94/// Also record the manifest version when these tmp files are read.
95#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
96pub struct FileRefsManifest {
97    pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
98    /// Manifest version when this manifest is read for its files
99    pub manifest_version: HashMap<RegionId, ManifestVersion>,
100    /// Cross-region file ownership mapping.
101    ///
102    /// Key is the source/original region id (before repartition); value is the set of
103    /// target/destination region ids (after repartition) that currently hold files
104    /// originally coming from that source region.
105    ///
106    pub cross_region_refs: HashMap<RegionId, HashSet<RegionId>>,
107}
108
109#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
110pub struct GcReport {
111    /// deleted files per region
112    /// TODO(discord9): change to `RemovedFile`?
113    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
114    pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
115    /// Regions that need retry in next gc round, usually because their tmp ref files are outdated
116    pub need_retry_regions: HashSet<RegionId>,
117}
118
119impl GcReport {
120    pub fn new(
121        deleted_files: HashMap<RegionId, Vec<FileId>>,
122        deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
123        need_retry_regions: HashSet<RegionId>,
124    ) -> Self {
125        Self {
126            deleted_files,
127            deleted_indexes,
128            need_retry_regions,
129        }
130    }
131
132    pub fn merge(&mut self, other: GcReport) {
133        for (region, files) in other.deleted_files {
134            let self_files = self.deleted_files.entry(region).or_default();
135            let dedup: HashSet<FileId> = HashSet::from_iter(
136                std::mem::take(self_files)
137                    .into_iter()
138                    .chain(files.iter().cloned()),
139            );
140            *self_files = dedup.into_iter().collect();
141        }
142        self.need_retry_regions.extend(other.need_retry_regions);
143        // Remove regions that have succeeded from need_retry_regions
144        self.need_retry_regions
145            .retain(|region| !self.deleted_files.contains_key(region));
146    }
147}
148
#[cfg(test)]
mod tests {
    use super::*;

    /// A random id survives a round trip through its string form, both via
    /// `FileId::parse_str` and via `str::parse`.
    #[test]
    fn test_file_id() {
        let original = FileId::random();
        let text = original.to_string();
        assert_eq!(original.0.to_string(), text);

        let via_parse_str = FileId::parse_str(&text).unwrap();
        assert_eq!(original, via_parse_str);
        let via_from_str: FileId = text.parse().unwrap();
        assert_eq!(original, via_from_str);
    }

    /// A `FileId` is serialized to JSON as the quoted string form of the id.
    #[test]
    fn test_file_id_serialization() {
        let original = FileId::random();
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(format!("\"{original}\""), encoded);

        let decoded = serde_json::from_str::<FileId>(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// A populated `FileRefsManifest` survives a JSON round trip unchanged.
    #[test]
    fn test_file_refs_manifest_serialization() {
        let r0 = RegionId::new(1024, 1);
        let r1 = RegionId::new(1024, 2);
        let manifest = FileRefsManifest {
            file_refs: HashMap::from([
                (r0, HashSet::from([FileRef::new(r0, FileId::random(), None)])),
                (r1, HashSet::from([FileRef::new(r1, FileId::random(), None)])),
            ]),
            manifest_version: HashMap::from([(r0, 10), (r1, 20)]),
            cross_region_refs: HashMap::from([
                (r0, HashSet::from([r1])),
                (r1, HashSet::from([r0])),
            ]),
        };

        let encoded = serde_json::to_string(&manifest).unwrap();
        let decoded: FileRefsManifest = serde_json::from_str(&encoded).unwrap();
        assert_eq!(manifest, decoded);
    }

    /// `FileRef::new` stores its arguments as given, with and without an index version.
    #[test]
    fn test_file_ref_new() {
        let region_id = RegionId::new(1024, 1);
        let file_id = FileId::random();

        let cases: [Option<IndexVersion>; 2] = [Some(42), None];
        for index_version in cases {
            let file_ref = FileRef::new(region_id, file_id, index_version);
            assert_eq!(file_ref.region_id, region_id);
            assert_eq!(file_ref.file_id, file_id);
            assert_eq!(file_ref.index_version, index_version);
        }
    }

    /// The index version takes part in `FileRef` equality; `Some(0)` differs from `None`.
    #[test]
    fn test_file_ref_equality() {
        let region_id = RegionId::new(1024, 1);
        let file_id = FileId::random();
        let with_version =
            |version: Option<IndexVersion>| FileRef::new(region_id, file_id, version);

        let ten = with_version(Some(10));
        let ten_again = with_version(Some(10));
        let twenty = with_version(Some(20));
        let unversioned = with_version(None);

        assert_eq!(ten, ten_again);
        assert_ne!(ten, twenty);
        assert_ne!(ten, unversioned);
        assert_ne!(twenty, unversioned);

        // Some(0) must not be treated as equal to None.
        let zero = with_version(Some(0));
        assert_ne!(zero, unversioned);
    }

    /// A `FileRef` survives a JSON round trip, with and without an index version.
    #[test]
    fn test_file_ref_serialization() {
        let region_id = RegionId::new(1024, 1);
        let file_id = FileId::random();

        let cases: [Option<IndexVersion>; 2] = [Some(12345), None];
        for index_version in cases {
            let file_ref = FileRef::new(region_id, file_id, index_version);

            let encoded = serde_json::to_string(&file_ref).unwrap();
            let decoded: FileRef = serde_json::from_str(&encoded).unwrap();

            assert_eq!(file_ref, decoded);
            assert_eq!(decoded.index_version, index_version);
        }
    }
}
259}