1use std::fmt;
18use std::num::NonZeroU64;
19use std::str::FromStr;
20use std::sync::atomic::{AtomicBool, Ordering};
21use std::sync::Arc;
22
23use common_time::Timestamp;
24use serde::{Deserialize, Serialize};
25use smallvec::SmallVec;
26use snafu::{ResultExt, Snafu};
27use store_api::storage::RegionId;
28use uuid::Uuid;
29
30use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
31use crate::sst::location;
32
/// Storage level of an SST file.
pub type Level = u8;
/// Maximum level of SSTs. NOTE(review): presumably the number of levels,
/// i.e. valid levels are `0..MAX_LEVEL` — confirm against compaction code.
pub const MAX_LEVEL: Level = 2;
37
/// Error returned when a string cannot be parsed into a [`FileId`].
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    // Underlying UUID parse failure.
    source: uuid::Error,
}
42
/// Unique id of an SST file, backed by a UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);
46
47impl FileId {
48 pub fn random() -> FileId {
50 FileId(Uuid::new_v4())
51 }
52
53 pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
55 Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
56 }
57
58 pub fn as_parquet(&self) -> String {
60 format!("{}{}", self, ".parquet")
61 }
62
63 pub fn as_puffin(&self) -> String {
65 format!("{}{}", self, ".puffin")
66 }
67
68 pub fn as_bytes(&self) -> &[u8] {
70 self.0.as_bytes()
71 }
72}
73
74impl From<FileId> for Uuid {
75 fn from(value: FileId) -> Self {
76 value.0
77 }
78}
79
80impl fmt::Display for FileId {
81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
82 write!(f, "{}", self.0)
83 }
84}
85
86impl FromStr for FileId {
87 type Err = ParseIdError;
88
89 fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
90 FileId::parse_str(s)
91 }
92}
93
/// Time range of an SST file; `overlaps` treats `.0` as the start and `.1`
/// as the end, with an inclusive end comparison.
pub type FileTimeRange = (Timestamp, Timestamp);
97
98pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
100 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
101 let (_, l_end) = l;
102 let (r_start, _) = r;
103
104 r_start <= l_end
105}
106
/// Serializable metadata of an SST file.
///
/// `serde(default)` lets older serialized forms that lack newer fields
/// deserialize with the field defaults.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Unique id of the file.
    pub file_id: FileId,
    /// Timestamp range of the file's data; see [`overlaps`] for how the
    /// endpoints are interpreted.
    pub time_range: FileTimeRange,
    /// SST level of the file.
    pub level: Level,
    /// Size of the SST file in bytes.
    pub file_size: u64,
    /// Index types available for this file; empty means no index.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the companion index file in bytes.
    pub index_file_size: u64,
    /// Number of rows in the file. NOTE(review): 0 may also mean "unknown"
    /// for files serialized before this field existed (`serde(default)`) — confirm.
    pub num_rows: u64,
    /// Number of row groups in the file. Same caveat as `num_rows` for old files.
    pub num_row_groups: u64,
    /// Sequence number associated with the file; `None` when absent.
    pub sequence: Option<NonZeroU64>,
}
144
/// Type of an index that can accompany an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}
155
156impl FileMeta {
157 pub fn exists_index(&self) -> bool {
158 !self.available_indexes.is_empty()
159 }
160
161 pub fn inverted_index_available(&self) -> bool {
163 self.available_indexes.contains(&IndexType::InvertedIndex)
164 }
165
166 pub fn fulltext_index_available(&self) -> bool {
168 self.available_indexes.contains(&IndexType::FulltextIndex)
169 }
170
171 pub fn bloom_filter_index_available(&self) -> bool {
173 self.available_indexes
174 .contains(&IndexType::BloomFilterIndex)
175 }
176
177 pub fn index_file_size(&self) -> u64 {
178 self.index_file_size
179 }
180}
181
/// Cheaply-cloneable handle to an SST file; state is shared behind an [`Arc`].
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
187
188impl fmt::Debug for FileHandle {
189 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190 f.debug_struct("FileHandle")
191 .field("region_id", &self.inner.meta.region_id)
192 .field("file_id", &self.inner.meta.file_id)
193 .field("time_range", &self.inner.meta.time_range)
194 .field("size", &self.inner.meta.file_size)
195 .field("level", &self.inner.meta.level)
196 .field("compacting", &self.inner.compacting)
197 .field("deleted", &self.inner.deleted)
198 .finish()
199 }
200}
201
202impl FileHandle {
203 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
204 FileHandle {
205 inner: Arc::new(FileHandleInner::new(meta, file_purger)),
206 }
207 }
208
209 pub fn region_id(&self) -> RegionId {
211 self.inner.meta.region_id
212 }
213
214 pub fn file_id(&self) -> FileId {
216 self.inner.meta.file_id
217 }
218
219 pub fn file_path(&self, file_dir: &str) -> String {
221 location::sst_file_path(file_dir, self.file_id())
222 }
223
224 pub fn time_range(&self) -> FileTimeRange {
226 self.inner.meta.time_range
227 }
228
229 pub fn mark_deleted(&self) {
231 self.inner.deleted.store(true, Ordering::Relaxed);
232 }
233
234 pub fn compacting(&self) -> bool {
235 self.inner.compacting.load(Ordering::Relaxed)
236 }
237
238 pub fn set_compacting(&self, compacting: bool) {
239 self.inner.compacting.store(compacting, Ordering::Relaxed);
240 }
241
242 pub fn meta_ref(&self) -> &FileMeta {
244 &self.inner.meta
245 }
246
247 pub fn size(&self) -> u64 {
248 self.inner.meta.file_size
249 }
250
251 pub fn index_size(&self) -> u64 {
252 self.inner.meta.index_file_size
253 }
254
255 pub fn num_rows(&self) -> usize {
256 self.inner.meta.num_rows as usize
257 }
258}
259
/// Shared state behind [`FileHandle`].
struct FileHandleInner {
    // Metadata of the file.
    meta: FileMeta,
    // Whether the file takes part in an ongoing compaction.
    compacting: AtomicBool,
    // Whether the file is marked deleted; checked in `drop` to trigger purging.
    deleted: AtomicBool,
    // Purger notified when a deleted file's last handle is dropped.
    file_purger: FilePurgerRef,
}
269
270impl Drop for FileHandleInner {
271 fn drop(&mut self) {
272 if self.deleted.load(Ordering::Relaxed) {
273 self.file_purger.send_request(PurgeRequest {
274 file_meta: self.meta.clone(),
275 });
276 }
277 }
278}
279
280impl FileHandleInner {
281 fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
282 FileHandleInner {
283 meta,
284 compacting: AtomicBool::new(false),
285 deleted: AtomicBool::new(false),
286 file_purger,
287 }
288 }
289}
290
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_id() {
        // Display, parse_str and FromStr must all round-trip the same id.
        let original = FileId::random();
        let text = original.to_string();
        assert_eq!(original.0.to_string(), text);

        assert_eq!(original, FileId::parse_str(&text).unwrap());
        let via_from_str: FileId = text.parse().unwrap();
        assert_eq!(original, via_from_str);
    }

    #[test]
    fn test_file_id_serialization() {
        // A FileId serializes as a plain quoted UUID string.
        let id = FileId::random();
        let json = serde_json::to_string(&id).unwrap();
        assert_eq!(format!("\"{id}\""), json);

        let round_tripped: FileId = serde_json::from_str(&json).unwrap();
        assert_eq!(id, round_tripped);
    }

    #[test]
    fn test_file_id_as_parquet() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        let file_name = id.as_parquet();
        assert_eq!("67e55044-10b1-426f-9247-bb680e5fe0c8.parquet", file_name);
    }

    /// Builds a `FileMeta` fixture with the given id and level, an inverted
    /// index attached, and every other field at its default.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            file_id,
            level,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            region_id: 0.into(),
            // Remaining fields (time_range, sizes, counts, sequence) are
            // all zero/None defaults.
            ..FileMeta::default()
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        // serialize -> deserialize must be the identity on FileMeta.
        let expected = create_file_meta(FileId::random(), 0);
        let json = serde_json::to_string(&expected).unwrap();
        let actual: FileMeta = serde_json::from_str(&json).unwrap();
        assert_eq!(expected, actual);
    }

    #[test]
    fn test_deserialize_from_string() {
        // Fields absent from the JSON (sizes, counts, sequence) fall back to
        // their defaults thanks to `#[serde(default)]`.
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let expected = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let actual: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(expected, actual);
    }
}