// store_api/storage/file.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt;
17use std::fmt::Debug;
18use std::str::FromStr;
19
20use serde::{Deserialize, Serialize};
21use snafu::{ResultExt, Snafu};
22use uuid::Uuid;
23
24use crate::ManifestVersion;
25use crate::storage::RegionId;
26
/// Index version.
///
/// Identifies a particular version of a file's index; used by
/// [FileRef::index_version] and in [GcReport::deleted_indexes] entries.
pub type IndexVersion = u64;
29
/// Error returned when parsing a [FileId] from a string fails.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    /// The underlying UUID parse error.
    source: uuid::Error,
}
34
/// Unique id for an SST file, backed by a UUID.
///
/// Displays and serializes as the UUID's canonical string form (see the
/// round-trip tests in this file).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);
38
39impl FileId {
40    /// Returns a new unique [FileId] randomly.
41    pub fn random() -> FileId {
42        FileId(Uuid::new_v4())
43    }
44
45    /// Parses id from string.
46    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
47        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
48    }
49
50    /// Converts [FileId] as byte slice.
51    pub fn as_bytes(&self) -> &[u8] {
52        self.0.as_bytes()
53    }
54}
55
56impl From<FileId> for Uuid {
57    fn from(value: FileId) -> Self {
58        value.0
59    }
60}
61
62impl fmt::Display for FileId {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        write!(f, "{}", self.0)
65    }
66}
67
68impl FromStr for FileId {
69    type Err = ParseIdError;
70
71    fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
72        FileId::parse_str(s)
73    }
74}
75
/// Reference to a file within a region, optionally pinned to a specific
/// index version.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct FileRef {
    /// Region that holds the referenced file.
    pub region_id: RegionId,
    /// Id of the referenced file.
    pub file_id: FileId,
    /// Index version pinned by this reference, if any. Note that
    /// `Some(0)` and `None` are distinct values (see equality tests below).
    pub index_version: Option<IndexVersion>,
}
82
83impl FileRef {
84    pub fn new(region_id: RegionId, file_id: FileId, index_version: Option<IndexVersion>) -> Self {
85        Self {
86            region_id,
87            file_id,
88            index_version,
89        }
90    }
91}
92
/// The tmp file manifest which records a table's file references.
/// Also records the manifest version at the time these tmp files were read.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct FileRefsManifest {
    /// File references held per region.
    pub file_refs: HashMap<RegionId, HashSet<FileRef>>,
    /// Manifest version when this manifest is read for its files
    pub manifest_version: HashMap<RegionId, ManifestVersion>,
    /// Cross-region file ownership mapping.
    ///
    /// Key is the source/original region id (before repartition); value is the set of
    /// target/destination region ids (after repartition) that currently hold files
    /// originally coming from that source region.
    pub cross_region_refs: HashMap<RegionId, HashSet<RegionId>>,
}
108
/// Result of a garbage-collection run.
#[derive(Clone, Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct GcReport {
    /// deleted files per region
    /// TODO(discord9): change to `RemovedFile`?
    pub deleted_files: HashMap<RegionId, Vec<FileId>>,
    /// Deleted index files per region, identified by file id plus index version.
    pub deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
    /// Regions that need retry in next gc round, usually because their tmp ref files are outdated
    pub need_retry_regions: HashSet<RegionId>,
    /// Regions successfully processed in this GC run
    pub processed_regions: HashSet<RegionId>,
}
120
121impl GcReport {
122    pub fn new(
123        deleted_files: HashMap<RegionId, Vec<FileId>>,
124        deleted_indexes: HashMap<RegionId, Vec<(FileId, IndexVersion)>>,
125        need_retry_regions: HashSet<RegionId>,
126    ) -> Self {
127        Self {
128            deleted_files,
129            deleted_indexes,
130            need_retry_regions,
131            processed_regions: HashSet::new(),
132        }
133    }
134
135    pub fn merge(&mut self, other: GcReport) {
136        for (region, files) in other.deleted_files {
137            let self_files = self.deleted_files.entry(region).or_default();
138            let dedup: HashSet<FileId> = HashSet::from_iter(
139                std::mem::take(self_files)
140                    .into_iter()
141                    .chain(files.iter().cloned()),
142            );
143            *self_files = dedup.into_iter().collect();
144        }
145        for (region, files) in other.deleted_indexes {
146            let self_files = self.deleted_indexes.entry(region).or_default();
147            let dedup: HashSet<(FileId, IndexVersion)> = HashSet::from_iter(
148                std::mem::take(self_files)
149                    .into_iter()
150                    .chain(files.iter().cloned()),
151            );
152            *self_files = dedup.into_iter().collect();
153        }
154        self.need_retry_regions.extend(other.need_retry_regions);
155        self.processed_regions.extend(other.processed_regions);
156        // Remove regions that have succeeded from need_retry_regions
157        self.need_retry_regions
158            .retain(|region| !self.deleted_files.contains_key(region));
159    }
160}
161
#[cfg(test)]
mod tests {

    use super::*;

    #[test]
    fn test_file_id() {
        // Round-trip a random id through its string form, both via
        // `parse_str` and via the `FromStr` impl.
        let original = FileId::random();
        let text = original.to_string();
        assert_eq!(original.0.to_string(), text);

        assert_eq!(original, FileId::parse_str(&text).unwrap());
        let via_from_str: FileId = text.parse().unwrap();
        assert_eq!(original, via_from_str);
    }

    #[test]
    fn test_file_id_serialization() {
        // A FileId serializes as a bare JSON string of its UUID.
        let id = FileId::random();
        let encoded = serde_json::to_string(&id).unwrap();
        assert_eq!(format!("\"{id}\""), encoded);

        let decoded: FileId = serde_json::from_str(&encoded).unwrap();
        assert_eq!(id, decoded);
    }

    #[test]
    fn test_file_refs_manifest_serialization() {
        let region_a = RegionId::new(1024, 1);
        let region_b = RegionId::new(1024, 2);

        // Build the manifest with struct-literal syntax instead of
        // mutating a default instance.
        let manifest = FileRefsManifest {
            file_refs: HashMap::from([
                (
                    region_a,
                    HashSet::from([FileRef::new(region_a, FileId::random(), None)]),
                ),
                (
                    region_b,
                    HashSet::from([FileRef::new(region_b, FileId::random(), None)]),
                ),
            ]),
            manifest_version: HashMap::from([(region_a, 10), (region_b, 20)]),
            cross_region_refs: HashMap::from([
                (region_a, HashSet::from([region_b])),
                (region_b, HashSet::from([region_a])),
            ]),
        };

        let json = serde_json::to_string(&manifest).unwrap();
        let roundtripped: FileRefsManifest = serde_json::from_str(&json).unwrap();
        assert_eq!(manifest, roundtripped);
    }

    #[test]
    fn test_file_ref_new() {
        let region = RegionId::new(1024, 1);
        let file = FileId::random();

        // With an explicit index version.
        let versioned = FileRef::new(region, file, Some(42));
        assert_eq!(versioned.region_id, region);
        assert_eq!(versioned.file_id, file);
        assert_eq!(versioned.index_version, Some(42));

        // Without an index version.
        let unversioned = FileRef::new(region, file, None);
        assert_eq!(unversioned.region_id, region);
        assert_eq!(unversioned.file_id, file);
        assert_eq!(unversioned.index_version, None);
    }

    #[test]
    fn test_file_ref_equality() {
        let region = RegionId::new(1024, 1);
        let file = FileId::random();

        let with_v10_a = FileRef::new(region, file, Some(10));
        let with_v10_b = FileRef::new(region, file, Some(10));
        let with_v20 = FileRef::new(region, file, Some(20));
        let without_version = FileRef::new(region, file, None);

        assert_eq!(with_v10_a, with_v10_b);
        assert_ne!(with_v10_a, with_v20);
        assert_ne!(with_v10_a, without_version);
        assert_ne!(with_v20, without_version);

        // Some(0) must not compare equal to None.
        assert_ne!(FileRef::new(region, file, Some(0)), without_version);
    }

    #[test]
    fn test_file_ref_serialization() {
        let region = RegionId::new(1024, 1);
        let file = FileId::random();

        // Round-trip with an index version present.
        let with_version = FileRef::new(region, file, Some(12345));
        let decoded: FileRef =
            serde_json::from_str(&serde_json::to_string(&with_version).unwrap()).unwrap();
        assert_eq!(with_version, decoded);
        assert_eq!(decoded.index_version, Some(12345));

        // Round-trip with no index version.
        let without_version = FileRef::new(region, file, None);
        let decoded: FileRef =
            serde_json::from_str(&serde_json::to_string(&without_version).unwrap()).unwrap();
        assert_eq!(without_version, decoded);
        assert_eq!(decoded.index_version, None);
    }
}
272}