1use std::fmt;
18use std::fmt::{Debug, Formatter};
19use std::num::NonZeroU64;
20use std::str::FromStr;
21use std::sync::atomic::{AtomicBool, Ordering};
22use std::sync::Arc;
23
24use common_base::readable_size::ReadableSize;
25use common_time::Timestamp;
26use serde::{Deserialize, Serialize};
27use smallvec::SmallVec;
28use snafu::{ResultExt, Snafu};
29use store_api::region_request::PathType;
30use store_api::storage::RegionId;
31use uuid::Uuid;
32
33use crate::sst::file_purger::FilePurgerRef;
34use crate::sst::location;
35
/// Integer type used for the `level` of a file (see `FileMeta::level`).
pub type Level = u8;
/// Level limit constant, currently `2`.
///
/// NOTE(review): whether this bound is inclusive or exclusive is not visible in
/// this file — confirm at the use sites.
pub const MAX_LEVEL: Level = 2;
40
// Error produced when a string cannot be parsed into a `FileId`; it wraps the
// underlying `uuid::Error` as its source (see `FileId::parse_str`).
//
// NOTE(review): these are deliberately plain `//` comments. SNAFU can use a
// `///` doc comment as the `Display` message of the error, so adding one here
// could change the rendered error text — confirm before converting.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    // Underlying UUID parse failure.
    source: uuid::Error,
}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
48pub struct FileId(Uuid);
49
50impl FileId {
51 pub fn random() -> FileId {
53 FileId(Uuid::new_v4())
54 }
55
56 pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
58 Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
59 }
60
61 pub fn as_bytes(&self) -> &[u8] {
63 self.0.as_bytes()
64 }
65}
66
67impl From<FileId> for Uuid {
68 fn from(value: FileId) -> Self {
69 value.0
70 }
71}
72
73impl fmt::Display for FileId {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(f, "{}", self.0)
76 }
77}
78
79impl FromStr for FileId {
80 type Err = ParseIdError;
81
82 fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
83 FileId::parse_str(s)
84 }
85}
86
87#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
91pub struct RegionFileId {
92 region_id: RegionId,
94 file_id: FileId,
96}
97
98impl RegionFileId {
99 pub fn new(region_id: RegionId, file_id: FileId) -> Self {
101 Self { region_id, file_id }
102 }
103
104 pub fn region_id(&self) -> RegionId {
106 self.region_id
107 }
108
109 pub fn file_id(&self) -> FileId {
111 self.file_id
112 }
113}
114
115impl fmt::Display for RegionFileId {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 write!(f, "{}/{}", self.region_id, self.file_id)
118 }
119}
120
/// Pair of timestamps describing the time range of a file (see `FileMeta::time_range`).
///
/// NOTE(review): presumably `(start, end)` with both ends inclusive, since
/// `overlaps` treats ranges that merely touch as overlapping — confirm.
pub type FileTimeRange = (Timestamp, Timestamp);
124
125pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
127 let (l, r) = if l.0 <= r.0 { (l, r) } else { (r, l) };
128 let (_, l_end) = l;
129 let (r_start, _) = r;
130
131 r_start <= l_end
132}
133
/// Metadata describing a file.
///
/// Because of `#[serde(default)]`, any field missing from the serialized form is
/// filled from `FileMeta::default()` on deserialization, so a zero/`None`/empty
/// value may simply mean "not recorded".
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region associated with the file.
    pub region_id: RegionId,
    /// Id of the file.
    pub file_id: FileId,
    /// Time range of the file.
    /// NOTE(review): presumably the min/max timestamps of its rows — confirm.
    pub time_range: FileTimeRange,
    /// Level of the file.
    pub level: Level,
    /// Size of the file.
    /// NOTE(review): presumably in bytes (it is rendered through `ReadableSize` in `Debug`) — confirm.
    pub file_size: u64,
    /// Index types available for the file; empty when there is none.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file.
    /// NOTE(review): presumably in bytes — confirm.
    pub index_file_size: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    /// Sequence associated with the file, if any.
    /// NOTE(review): exact meaning of this sequence is not visible here — confirm against the writer.
    pub sequence: Option<NonZeroU64>,
}
171
172impl Debug for FileMeta {
173 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
174 let mut debug_struct = f.debug_struct("FileMeta");
175 debug_struct
176 .field("region_id", &self.region_id)
177 .field_with("file_id", |f| write!(f, "{} ", self.file_id))
178 .field_with("time_range", |f| {
179 write!(
180 f,
181 "({}, {}) ",
182 self.time_range.0.to_iso8601_string(),
183 self.time_range.1.to_iso8601_string()
184 )
185 })
186 .field("level", &self.level)
187 .field("file_size", &ReadableSize(self.file_size));
188 if !self.available_indexes.is_empty() {
189 debug_struct
190 .field("available_indexes", &self.available_indexes)
191 .field("index_file_size", &ReadableSize(self.index_file_size));
192 }
193 debug_struct
194 .field("num_rows", &self.num_rows)
195 .field("num_row_groups", &self.num_row_groups)
196 .field_with("sequence", |f| match self.sequence {
197 None => {
198 write!(f, "None")
199 }
200 Some(seq) => {
201 write!(f, "{}", seq)
202 }
203 })
204 .finish()
205 }
206}
207
/// Kind of index that can be listed in `FileMeta::available_indexes`.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom filter index.
    BloomFilterIndex,
}
218
219impl FileMeta {
220 pub fn exists_index(&self) -> bool {
221 !self.available_indexes.is_empty()
222 }
223
224 pub fn inverted_index_available(&self) -> bool {
226 self.available_indexes.contains(&IndexType::InvertedIndex)
227 }
228
229 pub fn fulltext_index_available(&self) -> bool {
231 self.available_indexes.contains(&IndexType::FulltextIndex)
232 }
233
234 pub fn bloom_filter_index_available(&self) -> bool {
236 self.available_indexes
237 .contains(&IndexType::BloomFilterIndex)
238 }
239
240 pub fn index_file_size(&self) -> u64 {
241 self.index_file_size
242 }
243
244 pub fn file_id(&self) -> RegionFileId {
246 RegionFileId::new(self.region_id, self.file_id)
247 }
248}
249
/// Handle to a file.
///
/// Cloning is cheap: all clones share one `FileHandleInner` through an [`Arc`],
/// so the inner state is dropped only when the last clone goes away.
#[derive(Clone)]
pub struct FileHandle {
    /// Shared metadata and flags of the file.
    inner: Arc<FileHandleInner>,
}
255
256impl fmt::Debug for FileHandle {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("FileHandle")
259 .field("meta", self.meta_ref())
260 .field("compacting", &self.compacting())
261 .field("deleted", &self.inner.deleted.load(Ordering::Relaxed))
262 .finish()
263 }
264}
265
266impl FileHandle {
267 pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
268 FileHandle {
269 inner: Arc::new(FileHandleInner::new(meta, file_purger)),
270 }
271 }
272
273 pub fn region_id(&self) -> RegionId {
275 self.inner.meta.region_id
276 }
277
278 pub fn file_id(&self) -> RegionFileId {
280 RegionFileId::new(self.inner.meta.region_id, self.inner.meta.file_id)
281 }
282
283 pub fn file_path(&self, file_dir: &str, path_type: PathType) -> String {
285 location::sst_file_path(file_dir, self.file_id(), path_type)
286 }
287
288 pub fn time_range(&self) -> FileTimeRange {
290 self.inner.meta.time_range
291 }
292
293 pub fn mark_deleted(&self) {
295 self.inner.deleted.store(true, Ordering::Relaxed);
296 }
297
298 pub fn compacting(&self) -> bool {
299 self.inner.compacting.load(Ordering::Relaxed)
300 }
301
302 pub fn set_compacting(&self, compacting: bool) {
303 self.inner.compacting.store(compacting, Ordering::Relaxed);
304 }
305
306 pub fn meta_ref(&self) -> &FileMeta {
308 &self.inner.meta
309 }
310
311 pub fn size(&self) -> u64 {
312 self.inner.meta.file_size
313 }
314
315 pub fn index_size(&self) -> u64 {
316 self.inner.meta.index_file_size
317 }
318
319 pub fn num_rows(&self) -> usize {
320 self.inner.meta.num_rows as usize
321 }
322
323 pub fn level(&self) -> Level {
324 self.inner.meta.level
325 }
326}
327
/// State shared by all clones of a `FileHandle`.
///
/// The purger is told about the file when this state is created (`new_file`)
/// and again when it is dropped (`remove_file`, together with the `deleted` flag).
struct FileHandleInner {
    /// Metadata of the file.
    meta: FileMeta,
    /// Flag read and written through `FileHandle::compacting` / `FileHandle::set_compacting`.
    compacting: AtomicBool,
    /// Flag set by `FileHandle::mark_deleted`.
    deleted: AtomicBool,
    /// Purger notified on creation and on drop.
    file_purger: FilePurgerRef,
}
337
338impl Drop for FileHandleInner {
339 fn drop(&mut self) {
340 self.file_purger
341 .remove_file(self.meta.clone(), self.deleted.load(Ordering::Relaxed));
342 }
343}
344
345impl FileHandleInner {
346 fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
347 file_purger.new_file(&meta);
348 FileHandleInner {
349 meta,
350 compacting: AtomicBool::new(false),
351 deleted: AtomicBool::new(false),
352 file_purger,
353 }
354 }
355}
356
#[cfg(test)]
mod tests {
    use super::*;

    /// A random id survives a round trip through its string form, both via
    /// `FileId::parse_str` and via `str::parse`.
    #[test]
    fn test_file_id() {
        let original = FileId::random();
        let text = original.to_string();
        assert_eq!(original.0.to_string(), text);

        let via_parse_str = FileId::parse_str(&text).unwrap();
        assert_eq!(original, via_parse_str);
        let via_from_str: FileId = text.parse().unwrap();
        assert_eq!(original, via_from_str);
    }

    /// An id serializes to a quoted JSON string of its display form and
    /// deserializes back to the same id.
    #[test]
    fn test_file_id_serialization() {
        let original = FileId::random();
        let encoded = serde_json::to_string(&original).unwrap();
        assert_eq!(format!("\"{original}\""), encoded);

        let decoded: FileId = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// Builds a minimal `FileMeta` fixture with one inverted index and zeroed sizes/counters.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        let available_indexes = [IndexType::InvertedIndex].into_iter().collect();
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes,
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    /// `FileMeta` survives a JSON round trip unchanged.
    #[test]
    fn test_deserialize_file_meta() {
        let expected = create_file_meta(FileId::random(), 0);
        let encoded = serde_json::to_string(&expected).unwrap();
        let decoded: FileMeta = serde_json::from_str(&encoded).unwrap();
        assert_eq!(expected, decoded);
    }

    /// JSON that omits some fields still deserializes; the missing fields take
    /// their default values.
    #[test]
    fn test_deserialize_from_string() {
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let expected_id = FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap();
        let expected = create_file_meta(expected_id, 0);

        let decoded: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(expected, decoded);
    }
}